diff --git a/.github/workflows/pr-builder.yml b/.github/workflows/pr-builder.yml new file mode 100644 index 0000000000..4077e50564 --- /dev/null +++ b/.github/workflows/pr-builder.yml @@ -0,0 +1,27 @@ +name: YCSB build with Maven Wrapper + +on: + pull_request: + branches: [ main ] + paths-ignore: + - '**.md' + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + java: [11, 17] + + steps: + - uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3 + + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@5ffc13f4174014e2d4d4572b3d74c3fa61aeb2c2 # 3.11.0 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + + - name: Build with Maven Wrapper + run: ./mvnw clean install site \ No newline at end of file diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000000..f9ba8cf65f --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,9 @@ +# Microsoft Open Source Code of Conduct + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). + +Resources: + +- [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/) +- [Microsoft Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) +- Contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with questions or concerns diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000..9e841e7a26 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/README.md b/README.md index 84fca3011d..cc3150d608 100644 --- a/README.md +++ b/README.md @@ -1,80 +1,34 @@ - +Additionally, the “operationcount” type in YCSB core, has been changed to long from int to allow for large “operationcount” value. -YCSB -==================================== -[![Build Status](https://travis-ci.org/brianfrankcooper/YCSB.png?branch=master)](https://travis-ci.org/brianfrankcooper/YCSB) +We will keep this fork up to date with the upstream YCSB repository and push changes from here, back to the upstream YCSB repository if it becomes active again. +## Contributing +This project welcomes contributions and suggestions. Most contributions require you to agree to a +Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us +the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com. +When you submit a pull request, a CLA bot will automatically determine whether you need to provide +a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions +provided by the bot. You will only need to do this once across all repos using our CLA. -Links ------ -* To get here, use https://ycsb.site -* [Our project docs](https://github.com/brianfrankcooper/YCSB/wiki) -* [The original announcement from Yahoo!](https://labs.yahoo.com/news/yahoo-cloud-serving-benchmark/) +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). +For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or +contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. -Getting Started ---------------- +## Trademarks -1. Download the [latest release of YCSB](https://github.com/brianfrankcooper/YCSB/releases/latest): - - ```sh - curl -O --location https://github.com/brianfrankcooper/YCSB/releases/download/0.17.0/ycsb-0.17.0.tar.gz - tar xfvz ycsb-0.17.0.tar.gz - cd ycsb-0.17.0 - ``` - -2. Set up a database to benchmark. There is a README file under each binding - directory. - -3. Run YCSB command. - - On Linux: - ```sh - bin/ycsb.sh load basic -P workloads/workloada - bin/ycsb.sh run basic -P workloads/workloada - ``` - - On Windows: - ```bat - bin/ycsb.bat load basic -P workloads\workloada - bin/ycsb.bat run basic -P workloads\workloada - ``` - - Running the `ycsb` command without any argument will print the usage. - - See https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload - for a detailed documentation on how to run a workload. - - See https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties for - the list of available workload properties. - - -Building from source --------------------- - -YCSB requires the use of Maven 3; if you use Maven 2, you may see [errors -such as these](https://github.com/brianfrankcooper/YCSB/issues/406). - -To build the full distribution, with all database bindings: - - mvn clean package - -To build a single database binding: - - mvn -pl site.ycsb:mongodb-binding -am clean package +This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft +trademarks or logos is subject to and must follow +[Microsoft's Trademark & Brand Guidelines](https://www.microsoft.com/en-us/legal/intellectualproperty/trademarks/usage/general). +Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. +Any use of third-party trademarks or logos are subject to those third-party's policies. diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000000..869fdfe2b2 --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,41 @@ + + +## Security + +Microsoft takes the security of our software products and services seriously, which includes all source code repositories managed through our GitHub organizations, which include [Microsoft](https://github.com/Microsoft), [Azure](https://github.com/Azure), [DotNet](https://github.com/dotnet), [AspNet](https://github.com/aspnet), [Xamarin](https://github.com/xamarin), and [our GitHub organizations](https://opensource.microsoft.com/). + +If you believe you have found a security vulnerability in any Microsoft-owned repository that meets [Microsoft's definition of a security vulnerability](https://aka.ms/opensource/security/definition), please report it to us as described below. + +## Reporting Security Issues + +**Please do not report security vulnerabilities through public GitHub issues.** + +Instead, please report them to the Microsoft Security Response Center (MSRC) at [https://msrc.microsoft.com/create-report](https://aka.ms/opensource/security/create-report). + +If you prefer to submit without logging in, send email to [secure@microsoft.com](mailto:secure@microsoft.com). If possible, encrypt your message with our PGP key; please download it from the [Microsoft Security Response Center PGP Key page](https://aka.ms/opensource/security/pgpkey). + +You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Additional information can be found at [microsoft.com/msrc](https://aka.ms/opensource/security/msrc). + +Please include the requested information listed below (as much as you can provide) to help us better understand the nature and scope of the possible issue: + + * Type of issue (e.g. buffer overflow, SQL injection, cross-site scripting, etc.) + * Full paths of source file(s) related to the manifestation of the issue + * The location of the affected source code (tag/branch/commit or direct URL) + * Any special configuration required to reproduce the issue + * Step-by-step instructions to reproduce the issue + * Proof-of-concept or exploit code (if possible) + * Impact of the issue, including how an attacker might exploit the issue + +This information will help us triage your report more quickly. + +If you are reporting for a bug bounty, more complete reports can contribute to a higher bounty award. Please visit our [Microsoft Bug Bounty Program](https://aka.ms/opensource/security/bounty) page for more details about our active programs. + +## Preferred Languages + +We prefer all communications to be in English. + +## Policy + +Microsoft follows the principle of [Coordinated Vulnerability Disclosure](https://aka.ms/opensource/security/cvd). + + diff --git a/SUPPORT.md b/SUPPORT.md new file mode 100644 index 0000000000..291d4d4373 --- /dev/null +++ b/SUPPORT.md @@ -0,0 +1,25 @@ +# TODO: The maintainer of this repo has not yet edited this file + +**REPO OWNER**: Do you want Customer Service & Support (CSS) support for this product/project? + +- **No CSS support:** Fill out this template with information about how to file issues and get help. +- **Yes CSS support:** Fill out an intake form at [aka.ms/onboardsupport](https://aka.ms/onboardsupport). CSS will work with/help you to determine next steps. +- **Not sure?** Fill out an intake as though the answer were "Yes". CSS will help you decide. + +*Then remove this first heading from this SUPPORT.MD file before publishing your repo.* + +# Support + +## How to file issues and get help + +This project uses GitHub Issues to track bugs and feature requests. Please search the existing +issues before filing new issues to avoid duplicates. For new issues, file your bug or +feature request as a new Issue. + +For help and questions about using this project, please **REPO MAINTAINER: INSERT INSTRUCTIONS HERE +FOR HOW TO ENGAGE REPO OWNERS OR COMMUNITY FOR HELP. COULD BE A STACK OVERFLOW TAG OR OTHER +CHANNEL. WHERE WILL YOU HELP PEOPLE?**. + +## Microsoft Support Policy + +Support for this **PROJECT or PRODUCT** is limited to the resources listed above. diff --git a/azurecosmos/README.md b/azurecosmos/README.md index 9af889b4c5..25838f223e 100644 --- a/azurecosmos/README.md +++ b/azurecosmos/README.md @@ -125,4 +125,4 @@ following location: ### 3. FAQs ### 4. Example command -./bin/ycsb run azurecosmos -P workloads/workloadc -p azurecosmos.primaryKey= -p azurecosmos.uri=https://.documents.azure.com:443/ -p recordcount=100 -p operationcount=100 +./bin/ycsb run azurecosmos -P workloads/workloadc -p azurecosmos.primaryKey= -p azurecosmos.uri=https://.documents.azure.com:443/ -p recordcount=100 -p operationcount=100 \ No newline at end of file diff --git a/azurecosmos/conf/azurecosmos.properties b/azurecosmos/conf/azurecosmos.properties index ac2a6c69e9..f3a79a632f 100644 --- a/azurecosmos/conf/azurecosmos.properties +++ b/azurecosmos/conf/azurecosmos.properties @@ -16,8 +16,8 @@ # See https://docs.microsoft.com/en-us/azure/cosmos-db/performance-tips-java-sdk-v4-sql for details on some of the options below. # Azure Cosmos DB host uri (ex: https://p3rf.documents.azure.com:443/) and primary key. -# azurecosmos.primaryKey = -# azurecosmos.uri = +# azurecosmos.primaryKey = +# azurecosmos.uri = # Database to be used, if not specified 'ycsb' will be used. # azurecosmos.databaseName = ycsb @@ -31,6 +31,10 @@ # The default is false to reduce output size. # azurecosmos.includeExceptionStackInLog = false +# Determines if full request diagnostics need to be printed for high latency requests. +# The default is -1(no diagnostics at all)to reduce output size. +# azurecosmos.diagnosticsLatencyThresholdInMS = -1 + # The value to be appended to the user-agent header. # In most cases, you should leave this as "azurecosmos-ycsb". # azurecosmos.userAgent = azurecosmos-ycsb @@ -54,7 +58,7 @@ # Set the maximum retry duration in seconds. # azurecosmos.maxRetryWaitTimeInSeconds = 30 -# Set the value of the connection pool size in gateway mode. +# Set the value of the connection pool size in gateway mode. # azurecosmos.gatewayMaxConnectionPoolSize = 30 # Set the value of the max connections per endpoint in direct mode. @@ -82,3 +86,18 @@ # Sets the preferred page size when scanning. # Default value is -1. # azurecosmos.preferredPageSize = -1 + +# output file location +# exportfile = + +# application insight connection String +# azurecosmos.appInsightConnectionString = + +# Cosmos Client Diagnostics logs options +# azurecosmos.pointOperationLatencyThresholdInMS = +# azurecosmos.nonPointOperationLatencyThresholdInMS = +# azurecosmos.requestChargeThreshold = + +# Sets the comma separated preferred region list. +# Default value is null. +# azurecosmos.preferredRegionList = diff --git a/azurecosmos/pom.xml b/azurecosmos/pom.xml index 1b45895fa7..5c8da86b33 100644 --- a/azurecosmos/pom.xml +++ b/azurecosmos/pom.xml @@ -34,27 +34,38 @@ LICENSE file. com.azure azure-cosmos ${azurecosmos.version} - - - org.slf4j - slf4j-api - 1.7.5 - - - org.slf4j - slf4j-log4j12 - 1.7.5 - - - log4j - log4j - 1.2.17 - - + + + org.apache.logging.log4j + log4j-api + 2.18.0 + + + org.apache.logging.log4j + log4j-core + 2.18.0 + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.18.0 + + site.ycsb core ${project.version} provided + + io.micrometer + micrometer-registry-azure-monitor + 1.9.2 + compile + + + com.microsoft.azure + applicationinsights-agent + 3.5.1 + diff --git a/azurecosmos/src/main/java/site/ycsb/db/AzureCosmosClient.java b/azurecosmos/src/main/java/site/ycsb/db/AzureCosmosClient.java index 040485d301..37355f603d 100644 --- a/azurecosmos/src/main/java/site/ycsb/db/AzureCosmosClient.java +++ b/azurecosmos/src/main/java/site/ycsb/db/AzureCosmosClient.java @@ -16,32 +16,21 @@ package site.ycsb.db; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.Vector; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosClient; import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosContainer; import com.azure.cosmos.CosmosDatabase; +import com.azure.cosmos.CosmosDiagnosticsThresholds; import com.azure.cosmos.CosmosException; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.GatewayConnectionConfig; import com.azure.cosmos.ThrottlingRetryOptions; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.models.CosmosClientTelemetryConfig; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchOperations; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; @@ -51,15 +40,35 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; import site.ycsb.ByteIterator; import site.ycsb.DB; import site.ycsb.DBException; import site.ycsb.Status; import site.ycsb.StringByteIterator; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + /** - * Azure Cosmos DB Java SDK 4.6.0 client for YCSB. + * Azure Cosmos DB Java V4 SDK client for YCSB. */ public class AzureCosmosClient extends DB { @@ -74,11 +83,22 @@ public class AzureCosmosClient extends DB { private static final int DEFAULT_MAX_DEGREE_OF_PARALLELISM = -1; private static final int DEFAULT_MAX_BUFFERED_ITEM_COUNT = 0; private static final int DEFAULT_PREFERRED_PAGE_SIZE = -1; - public static final int NUM_UPDATE_ATTEMPTS = 4; + private static final int DEFAULT_DIAGNOSTICS_LATENCY_THRESHOLD_IN_MS = -1; private static final boolean DEFAULT_INCLUDE_EXCEPTION_STACK_IN_LOG = false; private static final String DEFAULT_USER_AGENT = "azurecosmos-ycsb"; private static final Logger LOGGER = LoggerFactory.getLogger(AzureCosmosClient.class); + private static final Marker CREATE_DIAGNOSTIC = MarkerFactory.getMarker("CREATE_DIAGNOSTIC"); + private static final Marker READ_DIAGNOSTIC = MarkerFactory.getMarker("READ_DIAGNOSTIC"); + private static final Marker PATCH_DIAGNOSTIC = MarkerFactory.getMarker("PATCH_DIAGNOSTIC"); + private static final Marker DELETE_DIAGNOSTIC = MarkerFactory.getMarker("DELETE_DIAGNOSTIC"); + private static final Marker QUERY_DIAGNOSTIC = MarkerFactory.getMarker("QUERY_DIAGNOSTIC"); + private static final Marker CREATE_EXCEPTION = MarkerFactory.getMarker("CREATE_EXCEPTION"); + private static final Marker READ_EXCEPTION = MarkerFactory.getMarker("READ_EXCEPTION"); + private static final Marker PATCH_EXCEPTION = MarkerFactory.getMarker("PATCH_EXCEPTION"); + private static final Marker DELETE_EXCEPTION = MarkerFactory.getMarker("DELETE_EXCEPTION"); + private static final Marker QUERY_EXCEPTION = MarkerFactory.getMarker("QUERY_EXCEPTION"); + /** * Count the number of times initialized to teardown on the last @@ -93,10 +113,27 @@ public class AzureCosmosClient extends DB { private static int maxDegreeOfParallelism; private static int maxBufferedItemCount; private static int preferredPageSize; + private static int diagnosticsLatencyThresholdInMS; private static boolean includeExceptionStackInLog; private static Map containerCache; private static String userAgent; + private static Counter readSuccessCounter; + private static Counter readFailureCounter; + private static Timer readSuccessLatencyTimer; + + private static Counter scanSuccessCounter; + private static Counter scanFailureCounter; + private static Timer scanSuccessLatencyTimer; + + private static Counter writeSuccessCounter; + private static Counter writeFailureCounter; + private static Timer writeSuccessLatencyTimer; + + private static Counter updateSuccessCounter; + private static Counter updateFailureCounter; + private static Timer updateSuccessLatencyTimer; + @Override public void init() throws DBException { INIT_COUNT.incrementAndGet(); @@ -122,7 +159,7 @@ private void initAzureCosmosClient() throws DBException { } String uri = this.getStringProperty("azurecosmos.uri", null); - if (primaryKey == null || primaryKey.isEmpty()) { + if (uri == null || uri.isEmpty()) { throw new DBException("Missing uri required to connect to the database."); } @@ -141,6 +178,10 @@ private void initAzureCosmosClient() throws DBException { AzureCosmosClient.preferredPageSize = this.getIntProperty("azurecosmos.preferredPageSize", DEFAULT_PREFERRED_PAGE_SIZE); + AzureCosmosClient.diagnosticsLatencyThresholdInMS = this.getIntProperty( + "azurecosmos.diagnosticsLatencyThresholdInMS", + DEFAULT_DIAGNOSTICS_LATENCY_THRESHOLD_IN_MS); + AzureCosmosClient.includeExceptionStackInLog = this.getBooleanProperty("azurecosmos.includeExceptionStackInLog", DEFAULT_INCLUDE_EXCEPTION_STACK_IN_LOG); @@ -182,6 +223,21 @@ private void initAzureCosmosClient() throws DBException { gatewayConnectionConfig.setIdleConnectionTimeout(Duration.ofSeconds(gatewayIdleConnectionTimeoutInSeconds)); } + String preferredRegions = this.getStringProperty("azurecosmos.preferredRegionList", null); + List preferredRegionList = null; + if (StringUtils.isNotEmpty(preferredRegions)) { + preferredRegions = preferredRegions.trim(); + preferredRegionList = new ArrayList<>(Arrays.asList(preferredRegions.split(","))); + } + + int pointOperationLatencyThresholdInMS = this.getIntProperty("azurecosmos.pointOperationLatencyThresholdInMS", + 100); + + int nonPointOperationLatencyThresholdInMS = this.getIntProperty("azurecosmos.nonPointOperationLatencyThresholdInMS", + 500); + + int requestChargeThreshold = this.getIntProperty("azurecosmos.requestChargeThreshold", 100); + try { LOGGER.info( "Creating Cosmos DB client {}, useGateway={}, consistencyLevel={}," @@ -192,8 +248,18 @@ private void initAzureCosmosClient() throws DBException { AzureCosmosClient.maxDegreeOfParallelism, AzureCosmosClient.maxBufferedItemCount, AzureCosmosClient.preferredPageSize); - CosmosClientBuilder builder = new CosmosClientBuilder().endpoint(uri).key(primaryKey) - .throttlingRetryOptions(retryOptions).consistencyLevel(consistencyLevel).userAgentSuffix(userAgent); + CosmosClientBuilder builder = new CosmosClientBuilder() + .endpoint(uri) + .key(primaryKey) + .throttlingRetryOptions(retryOptions) + .consistencyLevel(consistencyLevel) + .userAgentSuffix(userAgent) + .clientTelemetryConfig(new CosmosClientTelemetryConfig() + .diagnosticsThresholds( + new CosmosDiagnosticsThresholds() + .setPointOperationLatencyThreshold(Duration.ofMillis(pointOperationLatencyThresholdInMS)) + .setNonPointOperationLatencyThreshold(Duration.ofMillis(nonPointOperationLatencyThresholdInMS)) + .setRequestChargeThreshold(requestChargeThreshold))); if (useGateway) { builder = builder.gatewayMode(gatewayConnectionConfig); @@ -201,6 +267,10 @@ private void initAzureCosmosClient() throws DBException { builder = builder.directMode(directConnectionConfig); } + if (preferredRegionList != null && preferredRegionList.size() > 0) { + builder.preferredRegions(preferredRegionList); + } + AzureCosmosClient.client = builder.buildClient(); LOGGER.info("Azure Cosmos DB connection created to {}", uri); } catch (IllegalArgumentException e) { @@ -223,6 +293,11 @@ private void initAzureCosmosClient() throws DBException { throw new DBException( "Invalid database name (" + AzureCosmosClient.databaseName + ") or failed to read database.", e); } + + String appInsightConnectionString = this.getStringProperty("azurecosmos.appInsightConnectionString", null); + if (appInsightConnectionString != null) { + registerMeter(); + } } private String getStringProperty(String propertyName, String defaultValue) { @@ -249,6 +324,18 @@ private int getIntProperty(String propertyName, int defaultValue) { } } + private double getDoubleProperty(String propertyName, double defaultValue) { + String stringVal = getProperties().getProperty(propertyName, null); + if (stringVal == null) { + return defaultValue; + } + try { + return Double.parseDouble(stringVal); + } catch (NumberFormatException e) { + return defaultValue; + } + } + /** * Cleanup any state for this DB. Called once per DB instance; there is one DB * instance per client thread. @@ -284,6 +371,7 @@ public void cleanup() throws DBException { @Override public Status read(String table, String key, Set fields, Map result) { try { + long st = System.nanoTime(); CosmosContainer container = AzureCosmosClient.containerCache.get(table); if (container == null) { container = AzureCosmosClient.database.getContainer(table); @@ -310,10 +398,29 @@ public Status read(String table, String key, Set fields, Map 0 && + response.getDiagnostics().getDuration().compareTo(Duration.ofMillis(diagnosticsLatencyThresholdInMS)) > 0) { + LOGGER.warn(READ_DIAGNOSTIC, response.getDiagnostics().toString()); + } + + if (readSuccessLatencyTimer != null) { + long en = System.nanoTime(); + long latency = (en - st) / 1000; + readSuccessLatencyTimer.record(latency, TimeUnit.MICROSECONDS); + readSuccessCounter.increment(); + } return Status.OK; } catch (CosmosException e) { - LOGGER.error("Failed to read key {} in collection {} in database {}", key, table, AzureCosmosClient.databaseName, - e); + int statusCode = e.getStatusCode(); + if (!AzureCosmosClient.includeExceptionStackInLog) { + e = null; + } + LOGGER.error(READ_EXCEPTION, "Failed to read key {} in collection {} in database {} statusCode {}", key, table, + AzureCosmosClient.databaseName, statusCode, e); + if (readFailureCounter != null) { + readFailureCounter.increment(); + } return Status.NOT_FOUND; } } @@ -322,7 +429,6 @@ public Status read(String table, String key, Set fields, Map fields, Map fields, - Vector> result) { + Vector> result) { try { + long st = System.nanoTime(); CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions(); queryOptions.setMaxDegreeOfParallelism(AzureCosmosClient.maxDegreeOfParallelism); queryOptions.setMaxBufferedItemCount(AzureCosmosClient.maxBufferedItemCount); @@ -354,7 +461,8 @@ public Status scan(String table, String startkey, int recordcount, Set f Iterator> pageIterator = pagedIterable .iterableByPage(AzureCosmosClient.preferredPageSize).iterator(); while (pageIterator.hasNext()) { - List pageDocs = pageIterator.next().getResults(); + FeedResponse feedResponse = pageIterator.next(); + List pageDocs = feedResponse.getResults(); for (ObjectNode doc : pageDocs) { Map stringResults = new HashMap<>(doc.size()); Iterator> nodeIterator = doc.fields(); @@ -367,13 +475,24 @@ public Status scan(String table, String startkey, int recordcount, Set f result.add(byteResults); } } + + if (scanSuccessLatencyTimer != null) { + long en = System.nanoTime(); + long latency = (en - st) / 1000; + scanSuccessLatencyTimer.record(latency, TimeUnit.MICROSECONDS); + scanSuccessCounter.increment(); + } return Status.OK; } catch (CosmosException e) { + int statusCode = e.getStatusCode(); if (!AzureCosmosClient.includeExceptionStackInLog) { e = null; } - LOGGER.error("Failed to query key {} from collection {} in database {}", startkey, table, - AzureCosmosClient.databaseName, e); + LOGGER.error(QUERY_EXCEPTION, "Failed to query key {} from collection {} in database {} statusCode {}", + startkey, table, AzureCosmosClient.databaseName, statusCode, e); + } + if (scanFailureCounter != null) { + scanFailureCounter.increment(); } return Status.ERROR; } @@ -390,45 +509,45 @@ public Status scan(String table, String startkey, int recordcount, Set f */ @Override public Status update(String table, String key, Map values) { + try { + long st = System.nanoTime(); + CosmosContainer container = AzureCosmosClient.containerCache.get(table); + if (container == null) { + container = AzureCosmosClient.database.getContainer(table); + AzureCosmosClient.containerCache.put(table, container); + } - String readEtag = ""; - - // Azure Cosmos DB does not have patch support. Until then, we need to read - // the document, update it, and then write it back. - // This could be made more efficient by using a stored procedure - // and doing the read/modify write on the server side. Perhaps - // that will be a future improvement. - for (int attempt = 0; attempt < NUM_UPDATE_ATTEMPTS; attempt++) { - try { - CosmosContainer container = AzureCosmosClient.containerCache.get(table); - if (container == null) { - container = AzureCosmosClient.database.getContainer(table); - AzureCosmosClient.containerCache.put(table, container); - } - - CosmosItemResponse response = container.readItem(key, new PartitionKey(key), ObjectNode.class); - readEtag = response.getETag(); - ObjectNode node = response.getItem(); - - for (Entry pair : values.entrySet()) { - node.put(pair.getKey(), pair.getValue().toString()); - } + CosmosPatchOperations cosmosPatchOperations = CosmosPatchOperations.create(); + for (Entry pair : values.entrySet()) { + cosmosPatchOperations.replace("/" + pair.getKey(), pair.getValue().toString()); + } - CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); - requestOptions.setIfMatchETag(readEtag); - PartitionKey pk = new PartitionKey(key); - container.replaceItem(node, key, pk, requestOptions); + PartitionKey pk = new PartitionKey(key); + CosmosItemResponse response = container.patchItem(key, pk, cosmosPatchOperations, ObjectNode.class); + if (diagnosticsLatencyThresholdInMS > 0 && + response.getDiagnostics().getDuration().compareTo(Duration.ofMillis(diagnosticsLatencyThresholdInMS)) > 0) { + LOGGER.warn(PATCH_DIAGNOSTIC, response.getDiagnostics().toString()); + } - return Status.OK; - } catch (CosmosException e) { - if (!AzureCosmosClient.includeExceptionStackInLog) { - e = null; - } - LOGGER.error("Failed to update key {} to collection {} in database {} on attempt {}", key, table, - AzureCosmosClient.databaseName, attempt, e); + if (updateSuccessLatencyTimer != null) { + long en = System.nanoTime(); + long latency = (en - st) / 1000; + updateSuccessLatencyTimer.record(latency, TimeUnit.MICROSECONDS); + updateSuccessCounter.increment(); + } + return Status.OK; + } catch (CosmosException e) { + int statusCode = e.getStatusCode(); + if (!AzureCosmosClient.includeExceptionStackInLog) { + e = null; } + LOGGER.error(PATCH_EXCEPTION, "Failed to update key {} to collection {} in database {} statusCode {}", key, table, + AzureCosmosClient.databaseName, statusCode, e); } + if (updateFailureCounter != null) { + updateFailureCounter.increment(); + } return Status.ERROR; } @@ -446,7 +565,7 @@ public Status insert(String table, String key, Map values) if (LOGGER.isDebugEnabled()) { LOGGER.debug("Insert key: {} into table: {}", key, table); } - + long st = System.nanoTime(); try { CosmosContainer container = AzureCosmosClient.containerCache.get(table); if (container == null) { @@ -457,21 +576,39 @@ public Status insert(String table, String key, Map values) ObjectNode node = OBJECT_MAPPER.createObjectNode(); node.put("id", key); + for (Map.Entry pair : values.entrySet()) { node.put(pair.getKey(), pair.getValue().toString()); } + CosmosItemResponse response; if (AzureCosmosClient.useUpsert) { - container.upsertItem(node, pk, new CosmosItemRequestOptions()); + response = container.upsertItem(node, pk, new CosmosItemRequestOptions()); } else { - container.createItem(node, pk, new CosmosItemRequestOptions()); + response = container.createItem(node, pk, new CosmosItemRequestOptions()); + } + + if (diagnosticsLatencyThresholdInMS > 0 && + response.getDiagnostics().getDuration().compareTo(Duration.ofMillis(diagnosticsLatencyThresholdInMS)) > 0) { + LOGGER.warn(CREATE_DIAGNOSTIC, response.getDiagnostics().toString()); + } + + if (writeSuccessLatencyTimer != null) { + long en = System.nanoTime(); + long latency = (en - st) / 1000; + writeSuccessLatencyTimer.record(latency, TimeUnit.MICROSECONDS); + writeSuccessCounter.increment(); } return Status.OK; } catch (CosmosException e) { + int statusCode = e.getStatusCode(); if (!AzureCosmosClient.includeExceptionStackInLog) { e = null; } - LOGGER.error("Failed to insert key {} to collection {} in database {}", key, table, - AzureCosmosClient.databaseName, e); + LOGGER.error(CREATE_EXCEPTION, "Failed to insert key {} to collection {} in database {} statusCode {}", key, + table, AzureCosmosClient.databaseName, statusCode, e); + } + if (writeFailureCounter != null) { + writeFailureCounter.increment(); } return Status.ERROR; } @@ -487,14 +624,22 @@ public Status delete(String table, String key) { container = AzureCosmosClient.database.getContainer(table); AzureCosmosClient.containerCache.put(table, container); } - container.deleteItem(key, new PartitionKey(key), new CosmosItemRequestOptions()); + CosmosItemResponse response = container.deleteItem(key, + new PartitionKey(key), + new CosmosItemRequestOptions()); + if (diagnosticsLatencyThresholdInMS > 0 && + response.getDiagnostics().getDuration().compareTo(Duration.ofMillis(diagnosticsLatencyThresholdInMS)) > 0) { + LOGGER.warn(DELETE_DIAGNOSTIC, response.getDiagnostics().toString()); + } return Status.OK; - } catch (Exception e) { + } catch (CosmosException e) { + int statusCode = e.getStatusCode(); if (!AzureCosmosClient.includeExceptionStackInLog) { e = null; } - LOGGER.error("Failed to delete key {} in collection {}", key, table, e); + LOGGER.error(DELETE_EXCEPTION, "Failed to delete key {} in collection {} database {} statusCode {}", key, table, + AzureCosmosClient.databaseName, statusCode, e); } return Status.ERROR; } @@ -514,4 +659,42 @@ private String createSelectTop(Set fields, int top) { return result.toString(); } } -} + + private void registerMeter() { + if (this.getDoubleProperty("readproportion", 0) > 0) { + readSuccessCounter = Metrics.globalRegistry.counter("Read Successful Operations"); + readFailureCounter = Metrics.globalRegistry.counter("Read Unsuccessful Operations"); + readSuccessLatencyTimer = Timer.builder("Read Successful Latency") + .publishPercentiles(0.5, 0.95, 0.99, 0.999, 0.9999) + .publishPercentileHistogram() + .register(Metrics.globalRegistry); + } + + if (this.getDoubleProperty("insertproportion", 0) > 0) { + writeSuccessCounter = Metrics.globalRegistry.counter("Write Successful Operations"); + writeFailureCounter = Metrics.globalRegistry.counter("Write Unsuccessful Operations"); + writeSuccessLatencyTimer = Timer.builder("Write Successful Latency") + .publishPercentiles(0.5, 0.95, 0.99, 0.999, 0.9999) + .publishPercentileHistogram() + .register(Metrics.globalRegistry); + } + + if (this.getDoubleProperty("scanproportion", 0) > 0) { + scanSuccessCounter = Metrics.globalRegistry.counter("Scan Successful Operations"); + scanFailureCounter = Metrics.globalRegistry.counter("Scan Unsuccessful Operations"); + scanSuccessLatencyTimer = Timer.builder("Scan Successful Latency") + .publishPercentiles(0.5, 0.95, 0.99, 0.999, 0.9999) + .publishPercentileHistogram() + .register(Metrics.globalRegistry); + } + + if (this.getDoubleProperty("updateproportion", 0) > 0) { + updateSuccessCounter = Metrics.globalRegistry.counter("Update Successful Operations"); + updateFailureCounter = Metrics.globalRegistry.counter("Update Unsuccessful Operations"); + updateSuccessLatencyTimer = Timer.builder("Update Successful Latency") + .publishPercentiles(0.5, 0.95, 0.99, 0.999, 0.9999) + .publishPercentileHistogram() + .register(Metrics.globalRegistry); + } + } +} \ No newline at end of file diff --git a/azurecosmos/src/main/resources/log4j.properties b/azurecosmos/src/main/resources/log4j.properties deleted file mode 100644 index 49a3f1a7d2..0000000000 --- a/azurecosmos/src/main/resources/log4j.properties +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright (c) 2018 YCSB contributors. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you -# may not use this file except in compliance with the License. You -# may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. See the License for the specific language governing -# permissions and limitations under the License. See accompanying -# LICENSE file. - -#define the console appender -log4j.appender.consoleAppender = org.apache.log4j.ConsoleAppender - -# now define the layout for the appender -log4j.appender.consoleAppender.layout = org.apache.log4j.PatternLayout -log4j.appender.consoleAppender.layout.ConversionPattern=%-4r [%t] %-5p %c %x -%m%n - -# now map our console appender as a root logger, means all log messages will go -# to this appender -log4j.rootLogger = INFO, consoleAppender - -# Set HTTP components' logger to INFO -log4j.category.org.apache.http=INFO -log4j.category.org.apache.http.wire=INFO -log4j.category.org.apache.http.headers=INFO diff --git a/azurecosmos/src/main/resources/log4j2.xml b/azurecosmos/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..5f5766503a --- /dev/null +++ b/azurecosmos/src/main/resources/log4j2.xml @@ -0,0 +1,174 @@ + + + + %-4r [%t] %-5p %c %x -%m%n + /tmp/cosmos_client_logs/cosmos_diagnostics + /tmp/cosmos_client_logs/cosmos_exceptions + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/bin/bindings.properties b/bin/bindings.properties index 5c767c7599..a0e9bc10b8 100755 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -59,6 +59,7 @@ kudu:site.ycsb.db.KuduYCSBClient memcached:site.ycsb.db.MemcachedClient mongodb:site.ycsb.db.MongoDbClient mongodb-async:site.ycsb.db.AsyncMongoDbClient +mongodbreactivestreams:site.ycsb.db.MongoDbReactiveStreamsClient nosqldb:site.ycsb.db.NoSqlDbClient orientdb:site.ycsb.db.OrientDBClient postgrenosql:site.ycsb.postgrenosql.PostgreNoSQLDBClient diff --git a/bin/ycsb b/bin/ycsb index b5c85e35bb..99922122ee 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -88,6 +88,7 @@ DATABASES = { "maprjsondb" : "site.ycsb.db.mapr.MapRJSONDBClient", "mongodb" : "site.ycsb.db.MongoDbClient", "mongodb-async": "site.ycsb.db.AsyncMongoDbClient", + "mongodbreactivestreams" : "site.ycsb.db.MongoDbReactiveStreamsClient", "nosqldb" : "site.ycsb.db.NoSqlDbClient", "orientdb" : "site.ycsb.db.OrientDBClient", "postgrenosql" : "site.ycsb.postgrenosql.PostgreNoSQLDBClient", diff --git a/core/src/main/java/site/ycsb/Client.java b/core/src/main/java/site/ycsb/Client.java index bd96821170..990ff9b1e8 100644 --- a/core/src/main/java/site/ycsb/Client.java +++ b/core/src/main/java/site/ycsb/Client.java @@ -408,18 +408,18 @@ private static List initDb(String dbname, Properties props, int th final List clients = new ArrayList<>(threadcount); try (final TraceScope span = tracer.newScope(CLIENT_INIT_SPAN)) { - int opcount; + long opcount; if (dotransactions) { - opcount = Integer.parseInt(props.getProperty(OPERATION_COUNT_PROPERTY, "0")); + opcount = Long.parseLong(props.getProperty(OPERATION_COUNT_PROPERTY, "0")); } else { if (props.containsKey(INSERT_COUNT_PROPERTY)) { - opcount = Integer.parseInt(props.getProperty(INSERT_COUNT_PROPERTY, "0")); + opcount = Long.parseLong(props.getProperty(INSERT_COUNT_PROPERTY, "0")); } else { - opcount = Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT)); + opcount = Long.parseLong(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT)); } } if (threadcount > opcount && opcount > 0){ - threadcount = opcount; + threadcount = (int) opcount; System.out.println("Warning: the threadcount is bigger than recordcount, the threadcount will be recordcount!"); } for (int threadid = 0; threadid < threadcount; threadid++) { @@ -432,7 +432,7 @@ private static List initDb(String dbname, Properties props, int th break; } - int threadopcount = opcount / threadcount; + long threadopcount = opcount / threadcount; // ensure correct number of operations, in case opcount is not a multiple of threadcount if (threadid < opcount % threadcount) { diff --git a/core/src/main/java/site/ycsb/ClientThread.java b/core/src/main/java/site/ycsb/ClientThread.java index fcfc13267d..20dacf2fd7 100644 --- a/core/src/main/java/site/ycsb/ClientThread.java +++ b/core/src/main/java/site/ycsb/ClientThread.java @@ -34,7 +34,7 @@ public class ClientThread implements Runnable { private DB db; private boolean dotransactions; private Workload workload; - private int opcount; + private long opcount; private double targetOpsPerMs; private int opsdone; @@ -56,7 +56,7 @@ public class ClientThread implements Runnable { * @param targetperthreadperms target number of operations per thread per ms * @param completeLatch The latch tracking the completion of all clients. */ - public ClientThread(DB db, boolean dotransactions, Workload workload, Properties props, int opcount, + public ClientThread(DB db, boolean dotransactions, Workload workload, Properties props, long opcount, double targetperthreadperms, CountDownLatch completeLatch) { this.db = db; this.dotransactions = dotransactions; @@ -81,7 +81,7 @@ public void setThreadCount(final int threadCount) { threadcount = threadCount; } - public int getOpsDone() { + public long getOpsDone() { return opsdone; } @@ -179,8 +179,8 @@ private void throttleNanos(long startTimeNanos) { /** * The total amount of work this thread is still expected to do. */ - int getOpsTodo() { - int todo = opcount - opsdone; + long getOpsTodo() { + long todo = opcount - opsdone; return todo < 0 ? 0 : todo; } } diff --git a/core/src/main/java/site/ycsb/workloads/CoreWorkload.java b/core/src/main/java/site/ycsb/workloads/CoreWorkload.java index 082cb0dc2b..29a9889f3a 100644 --- a/core/src/main/java/site/ycsb/workloads/CoreWorkload.java +++ b/core/src/main/java/site/ycsb/workloads/CoreWorkload.java @@ -448,7 +448,7 @@ public void init(Properties p) throws WorkloadException { long insertstart = Long.parseLong(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT)); long insertcount= - Integer.parseInt(p.getProperty(INSERT_COUNT_PROPERTY, String.valueOf(recordcount - insertstart))); + Long.parseLong(p.getProperty(INSERT_COUNT_PROPERTY, String.valueOf(recordcount - insertstart))); // Confirm valid values for insertstart and insertcount in relation to recordcount if (recordcount < (insertstart + insertcount)) { System.err.println("Invalid combination of insertstart, insertcount and recordcount."); diff --git a/distribution/pom.xml b/distribution/pom.xml index 742a42f511..0165a36495 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -184,6 +184,11 @@ LICENSE file. mongodb-binding ${project.version} + + site.ycsb + mongodbreactivestreams-binding + ${project.version} + site.ycsb nosqldb-binding diff --git a/mongodbreactivestreams/README.md b/mongodbreactivestreams/README.md new file mode 100644 index 0000000000..b60de309e3 --- /dev/null +++ b/mongodbreactivestreams/README.md @@ -0,0 +1,140 @@ + + +## Quick Start + +This section describes how to run YCSB on MongoDB. + +### 1. Start MongoDB + +First, download MongoDB and start `mongod`. For example, to start MongoDB +on x86-64 Linux box: + + wget http://fastdl.mongodb.org/linux/mongodb-linux-x86_64-x.x.x.tgz + tar xfvz mongodb-linux-x86_64-*.tgz + mkdir /tmp/mongodb + cd mongodb-linux-x86_64-* + ./bin/mongod --dbpath /tmp/mongodb + +Replace x.x.x above with the latest stable release version for MongoDB. +See http://docs.mongodb.org/manual/installation/ for installation steps for various operating systems. + +### 2. Install Java and Maven + +Go to http://www.oracle.com/technetwork/java/javase/downloads/index.html + +and get the url to download the rpm into your server. For example: + + wget http://download.oracle.com/otn-pub/java/jdk/7u40-b43/jdk-7u40-linux-x64.rpm?AuthParam=11232426132 -o jdk-7u40-linux-x64.rpm + rpm -Uvh jdk-7u40-linux-x64.rpm + +Or install via yum/apt-get + + sudo yum install java-devel + +Download MVN from http://maven.apache.org/download.cgi + + wget http://ftp.heanet.ie/mirrors/www.apache.org/dist/maven/maven-3/3.1.1/binaries/apache-maven-3.1.1-bin.tar.gz + sudo tar xzf apache-maven-*-bin.tar.gz -C /usr/local + cd /usr/local + sudo ln -s apache-maven-* maven + sudo vi /etc/profile.d/maven.sh + +Add the following to `maven.sh` + + export M2_HOME=/usr/local/maven + export PATH=${M2_HOME}/bin:${PATH} + +Reload bash and test mvn + + bash + mvn -version + +### 3. Set Up YCSB + +Download the YCSB zip file and compile: + + curl -O --location https://github.com/brianfrankcooper/YCSB/releases/download/0.5.0/ycsb-0.5.0.tar.gz + tar xfvz ycsb-0.5.0.tar.gz + cd ycsb-0.5.0 + +### 4. Run YCSB + +Now you are ready to run! First, lets load the data: + + ./bin/ycsb load mongodbreactivestreams -s -P workloads/workloada > outputLoad.txt + +Then, run the workload: + + ./bin/ycsb run mongodbreactivestreams -s -P workloads/workloada > outputRun.txt + +See the next section for the list of configuration parameters for MongoDB. + +## Log Level Control +Due to the mongodb driver defaulting to a log level of DEBUG, the log4j2.xml file is included with this module restricts the org.mongodb logging to WARN. + +## MongoDB Configuration Parameters + +- `mongodb.url` + - This should be a MongoDB URI or connection string. + - See http://docs.mongodb.org/manual/reference/connection-string/ for the standard options. + - Default value is `mongodb://localhost:27017/ycsb?w=1` + - Default value of database is `ycsb` + +- `mongodb.batchsize` + - Useful for the insert workload as it will submit the inserts in batches inproving throughput. + - Default value is `1`. + +- `mongodb.upsert` + - Determines if the insert operation performs an update with the upsert operation or a insert. + Upserts have the advantage that they will continue to work for a partially loaded data set. + - Setting to `true` uses updates, `false` uses insert operations. + - Default value is `false`. + +- `mongodb.writeConcern` + - **Deprecated** - Use the `w` and `journal` options on the MongoDB URI provided by the `mongodb.url`. + - Allowed values are : + - `errors_ignored` + - `unacknowledged` + - `acknowledged` + - `journaled` + - `replica_acknowledged` + - `majority` + - Default value is `acknowledged`. + +- `mongodb.readPreference` + - **Deprecated** - Use the `readPreference` options on the MongoDB URI provided by the `mongodb.url`. + - Allowed values are : + - `primary` + - `primary_preferred` + - `secondary` + - `secondary_preferred` + - `nearest` + - Default value is `primary`. + +- `mongodb.maxconnections` + - **Deprecated** - Use the `maxPoolSize` options on the MongoDB URI provided by the `mongodb.url`. + - Default value is `100`. + +- `mongodb.threadsAllowedToBlockForConnectionMultiplier` + - **Deprecated** - Use the `waitQueueMultiple` options on the MongoDB URI provided by the `mongodb.url`. + - Default value is `5`. + +For example: + + ./bin/ycsb load mongodbreactivestreams -s -P workloads/workloada -p mongodb.url=mongodb://localhost:27017/ycsb?w=0 + diff --git a/mongodbreactivestreams/conf/mongodb.properties b/mongodbreactivestreams/conf/mongodb.properties new file mode 100644 index 0000000000..728bd68943 --- /dev/null +++ b/mongodbreactivestreams/conf/mongodb.properties @@ -0,0 +1,28 @@ +# Copyright (c) 2022 YCSB contributors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. + +# Mongo DB host uri +# mongodb.url = + +# Set to true to allow inserts to update existing documents. +# If this is false and a document already exists, the insert will fail. +# "true" or "false" +# mongodb.upsert = false Database to be used, if not specified 'ycsb' will be used. + +# Database to be used, if not specified 'ycsb' will be used. +# mongodb.databaseName = ycsb + +# Set insert batchsize, default 1 - to be YCSB-original equivalent +# mongodb.batchsize = 1 diff --git a/mongodbreactivestreams/pom.xml b/mongodbreactivestreams/pom.xml new file mode 100644 index 0000000000..5ea45448e1 --- /dev/null +++ b/mongodbreactivestreams/pom.xml @@ -0,0 +1,68 @@ + + + + + + 4.0.0 + + site.ycsb + binding-parent + 0.18.0-SNAPSHOT + ../binding-parent + + + mongodbreactivestreams-binding + MongoDB ReactiveStreams Binding + jar + + + + org.mongodb + mongodb-driver-reactivestreams + ${mongodbreactivestreams.version} + + + org.apache.logging.log4j + log4j-api + 2.18.0 + + + org.apache.logging.log4j + log4j-core + 2.18.0 + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.18.0 + + + site.ycsb + core + ${project.version} + provided + + + junit + junit + 4.13.1 + test + + + + diff --git a/mongodbreactivestreams/src/main/java/site/ycsb/db/MongoDbReactiveStreamsClient.java b/mongodbreactivestreams/src/main/java/site/ycsb/db/MongoDbReactiveStreamsClient.java new file mode 100644 index 0000000000..54ab7d5b9b --- /dev/null +++ b/mongodbreactivestreams/src/main/java/site/ycsb/db/MongoDbReactiveStreamsClient.java @@ -0,0 +1,611 @@ +/* + * Copyright (c) 2022, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package site.ycsb.db; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoInterruptedException; +import com.mongodb.MongoTimeoutException; +import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; +import com.mongodb.bulk.BulkWriteResult; +import com.mongodb.client.model.InsertManyOptions; +import com.mongodb.client.model.ReplaceOneModel; +import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.client.result.DeleteResult; +import com.mongodb.client.result.InsertManyResult; +import com.mongodb.client.result.InsertOneResult; +import com.mongodb.client.result.UpdateResult; +import com.mongodb.reactivestreams.client.FindPublisher; +import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoClients; +import com.mongodb.reactivestreams.client.MongoCollection; +import com.mongodb.reactivestreams.client.MongoDatabase; +import org.bson.Document; +import org.bson.types.Binary; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import site.ycsb.ByteArrayByteIterator; +import site.ycsb.ByteIterator; +import site.ycsb.DB; +import site.ycsb.DBException; +import site.ycsb.Status; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * MongoDB Reactive Streams client for YCSB framework. + */ +public class MongoDbReactiveStreamsClient extends DB { + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbReactiveStreamsClient.class); + private static final String DEFAULT_DATABASE_NAME = "ycsb"; + private static final boolean DEFAULT_USE_UPSERT = false; + private static final int DEFAULT_BATCH_SIZE = 1; + /** + * Used to include a field in a response. + */ + protected static final int INCLUDE = 1; + + /** + * The database to use. + */ + private static String databaseName; + + /** + * The write concern for the requests. + */ + private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + + /** + * The connection to MongoDB. + */ + private static MongoClient mongoClient; + + /** + * The database to MongoDB. + */ + private static MongoDatabase database; + + /** + * The batch size to use for inserts. + */ + private static int batchSize; + + /** + * If true then use updates with the upsert option for inserts. + */ + private static boolean useUpsert; + + /** + * The default read preference for the test. + */ + private static ReadPreference readPreference; + + /** + * The default write concern for the test. + */ + private static WriteConcern writeConcern; + + // /** The bulk inserts pending for the thread. */ + private final List bulkInserts = new ArrayList(); + + private static final ReplaceOptions REPLACE_WITH_UPSERT = new ReplaceOptions() + .upsert(true); + + private static final InsertManyOptions INSERT_UNORDERED = + new InsertManyOptions().ordered(false); + + /** + * Cleanup any state for this DB. Called once per DB instance; there is one DB + * instance per client thread. + */ + @Override + public final void cleanup() throws DBException { + if (INIT_COUNT.decrementAndGet() == 0) { + try { + mongoClient.close(); + } catch (final Exception e1) { + LOGGER.error("Could not close MongoDB connection pool: " + e1.toString()); + e1.printStackTrace(); + return; + } finally { + mongoClient = null; + database = null; + } + } + } + + /** + * Delete a record from the database. + * + * @param table The name of the table + * @param key The record key of the record to delete. + * @return Zero on success, a non-zero error code on error. See this class's + * description for a discussion of error codes. + */ + @Override + public final Status delete(final String table, final String key) { + try { + final MongoCollection collection = database.getCollection(table); + final Document q = new Document("_id", key); //Builders.start().add("_id", key).build(); + OperationSubscriber deleteSubscriber = new OperationSubscriber(); + collection.withWriteConcern(writeConcern).deleteOne(q).subscribe(deleteSubscriber); + if (deleteSubscriber.first() == null || deleteSubscriber.first().getDeletedCount() <= 0) { + LOGGER.error("Nothing deleted for key " + key); + return Status.NOT_FOUND; + } + return Status.OK; + } catch (final Exception e) { + LOGGER.error(e.toString()); + return Status.ERROR; + } + } + + /** + * Initialize any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + @Override + public final void init() throws DBException { + INIT_COUNT.incrementAndGet(); + + synchronized (MongoDbReactiveStreamsClient.class) { + if (mongoClient != null) { + return; + } + + // Set insert batchsize, default 1 - to be YCSB-original equivalent + batchSize = this.getIntProperty("mongodb.batchsize", DEFAULT_BATCH_SIZE); + + // Set is inserts are done as upserts. Defaults to false. + useUpsert = this.getBooleanProperty("mongodb.upsert", DEFAULT_USE_UPSERT); + + // Just use the standard connection format URL + // http://docs.mongodb.org/manual/reference/connection-string/ + // to configure the client. + String url = this.getStringProperty("mongodb.url", "mongodb://localhost:27017/ycsb?w=1"); + url = OptionsSupport.updateUrl(url, getProperties()); + + if (!url.startsWith("mongodb://")) { + LOGGER.error("ERROR: Invalid URL: '" + url + + "'. Must be of the form " + + "'mongodb://:,:/database?" + + "options'. See " + + "http://docs.mongodb.org/manual/reference/connection-string/."); + System.exit(1); + } + + ConnectionString connectionString = new ConnectionString(url); + try { + databaseName = connectionString.getDatabase(); + if ((databaseName == null) || databaseName.isEmpty()) { + // Default database is "ycsb" if database is not + // specified in URL + databaseName = this.getStringProperty("mongodb.databaseName", DEFAULT_DATABASE_NAME); + } + MongoClientSettings settings = MongoClientSettings.builder() + .applyConnectionString(connectionString) + .retryWrites(false) + .build(); + + mongoClient = MongoClients.create(settings); + + readPreference = settings.getReadPreference(); + writeConcern = settings.getWriteConcern(); + + database = mongoClient.getDatabase(databaseName); + LOGGER.info("mongo connection created with " + url); + } catch (final Exception e1) { + LOGGER.error("Could not initialize MongoDB connection pool for Loader: " + + e1.toString()); + e1.printStackTrace(); + return; + } + } + } + + /** + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table The name of the table + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error. See the {@link DB} + * class's description for a discussion of error codes. + */ + @Override + public final Status insert(String table, String key, + Map values) { + try { + final MongoCollection collection = database.getCollection(table); + final Document toInsert = new Document("_id", key); + for (final Map.Entry entry : values.entrySet()) { + toInsert.put(entry.getKey(), entry.getValue().toArray()); + } + + OperationSubscriber insertSubscriber; + // Do an upsert. + if (batchSize <= 1) { + long result; + if (useUpsert) { + insertSubscriber = new OperationSubscriber(); + collection.replaceOne(new Document("_id", toInsert.get("_id")), toInsert, REPLACE_WITH_UPSERT) + .subscribe(insertSubscriber); + } else { + insertSubscriber = new OperationSubscriber(); + collection.insertOne(toInsert).subscribe(insertSubscriber); + } + insertSubscriber.await(); + } else { + // Use a bulk insert. + bulkInserts.add(toInsert); + if (bulkInserts.size() == batchSize) { + if (useUpsert) { + List> updates = + new ArrayList>(bulkInserts.size()); + for (Document doc : bulkInserts) { + updates.add(new ReplaceOneModel( + new Document("_id", doc.get("_id")), + doc, REPLACE_WITH_UPSERT)); + } + insertSubscriber = new OperationSubscriber(); + collection.bulkWrite(updates).subscribe(insertSubscriber); + } else { + insertSubscriber = new OperationSubscriber(); + collection.insertMany(bulkInserts, INSERT_UNORDERED).subscribe(insertSubscriber); + } + insertSubscriber.await(); + bulkInserts.clear(); + } else { + return Status.BATCHED_OK; + } + } + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + /** + * Read a record from the database. Each field/value pair from the result will + * be stored in a HashMap. + * + * @param table The name of the table + * @param key The record key of the record to read. + * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error or "not found". + */ + @Override + public final Status read(final String table, final String key, + final Set fields, final Map result) { + try { + MongoCollection collection = database.getCollection(table); + OperationSubscriber readSubscriber = new OperationSubscriber(); + Document query = new Document("_id", key); + FindPublisher findPublisher = collection.find(query); + if (fields != null) { + Document projection = new Document(); + for (String field : fields) { + projection.put(field, INCLUDE); + } + findPublisher.projection(projection); + } + findPublisher.subscribe(readSubscriber); + Document queryResult = readSubscriber.first(); + if (queryResult != null) { + fillMap(result, queryResult); + } + return queryResult != null ? Status.OK : Status.NOT_FOUND; + } catch (final Exception e) { + LOGGER.error(e.toString()); + return Status.ERROR; + } + } + + /** + * Perform a range scan for a set of records in the database. Each field/value + * pair from the result will be stored in a HashMap. + * + * @param table The name of the table + * @param startkey The record key of the first record to read. + * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set field/value + * pairs for one record + * @return Zero on success, a non-zero error code on error. See the {@link DB} + * class's description for a discussion of error codes. + */ + @Override + public final Status scan(String table, String startkey, + int recordcount, Set fields, + Vector> result) { + try { + MongoCollection collection = database.getCollection(table); + Document scanRange = new Document("$gte", startkey); + Document query = new Document("_id", scanRange); + Document sort = new Document("_id", INCLUDE); + + FindPublisher findPublisher = collection.find(query).sort(sort).limit(1); + if (fields != null) { + Document projection = new Document(); + for (String fieldName : fields) { + projection.put(fieldName, INCLUDE); + } + findPublisher.projection(projection); + } + + result.ensureCapacity(recordcount); + + QuerySubscriber querySubscriber = new QuerySubscriber(result); + findPublisher.subscribe(querySubscriber); + querySubscriber.await(); + + return Status.OK; + } catch (final Exception e) { + LOGGER.error((e.toString())); + return Status.ERROR; + } + } + + /** + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table The name of the table + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error. See the {@link DB} + * class's description for a discussion of error codes. + */ + @Override + public final Status update(String table, String key, + Map values) { + try { + MongoCollection collection = database.getCollection(table); + Document query = new Document("_id", key); + Document fieldsToSet = new Document(); + for (Map.Entry entry : values.entrySet()) { + fieldsToSet.put(entry.getKey(), entry.getValue().toArray()); + } + Document update = new Document("$set", fieldsToSet); + OperationSubscriber updateSubscriber = new OperationSubscriber(); + collection.updateOne(query, update).subscribe(updateSubscriber); + UpdateResult result = updateSubscriber.first(); + if (result.wasAcknowledged() && result.getMatchedCount() == 0) { + LOGGER.error("Nothing updated for key " + key); + return Status.NOT_FOUND; + } + return Status.OK; + } catch (final Exception e) { + LOGGER.error(e.toString()); + return Status.ERROR; + } + } + + /** + * Fills the map with the ByteIterators from the document. + * + * @param result The map to fill. + * @param queryResult The document to fill from. + */ + protected static final void fillMap(final Map result, + final Document queryResult) { + for (Map.Entry entry : queryResult.entrySet()) { + if (entry.getValue() instanceof Binary) { + result.put(entry.getKey(), + new ByteArrayByteIterator(((Binary) entry.getValue()).getData())); + } + } + } + + private String getStringProperty(String propertyName, String defaultValue) { + return getProperties().getProperty(propertyName, defaultValue); + } + + private boolean getBooleanProperty(String propertyName, boolean defaultValue) { + String stringVal = getProperties().getProperty(propertyName, null); + if (stringVal == null) { + return defaultValue; + } + return Boolean.parseBoolean(stringVal); + } + + private int getIntProperty(String propertyName, int defaultValue) { + String stringVal = getProperties().getProperty(propertyName, null); + if (stringVal == null) { + return defaultValue; + } + try { + return Integer.parseInt(stringVal); + } catch (NumberFormatException e) { + return defaultValue; + } + } + + /** + * A Subscriber that stores the publishers results and provides a latch so can block on completion. + * + * @param The publishers result type + */ + public abstract static class ObservableSubscriber implements Subscriber { + private final List received; + private final List errors; + private final CountDownLatch latch; + private volatile Subscription subscription; + private volatile boolean completed; + + /** + * Construct an instance. + */ + public ObservableSubscriber() { + this.received = new ArrayList<>(); + this.errors = new ArrayList<>(); + this.latch = new CountDownLatch(1); + } + + @Override + public void onSubscribe(final Subscription sub) { + this.subscription = sub; + } + + @Override + public void onNext(final T t) { + received.add(t); + } + + @Override + public void onError(final Throwable throwable) { + if (throwable instanceof RuntimeException) { + errors.add((RuntimeException) throwable); + } else { + errors.add(new RuntimeException("Unexpected exception", throwable)); + } + onComplete(); + } + + @Override + public void onComplete() { + completed = true; + latch.countDown(); + } + + /** + * Get received elements. + * + * @return the list of received elements. + */ + public List getReceived() { + return received; + } + + /** + * Get received elements. + * + * @return the list of receive elements + */ + public List get() { + return await().getReceived(); + } + + /** + * Get the first received element. + * + * @return the first received element + */ + public T first() { + List receivedElements = await().getReceived(); + return receivedElements.size() > 0 ? receivedElements.get(0) : null; + } + + /** + * Await completion or error. + * + * @return this + */ + public ObservableSubscriber await() { + return await(300, TimeUnit.SECONDS); + } + + /** + * Await completion or error. + * + * @param timeout how long to wait + * @param unit the time unit + * @return this + */ + public ObservableSubscriber await(final long timeout, final TimeUnit unit) { + subscription.request(Integer.MAX_VALUE); + try { + if (!latch.await(timeout, unit)) { + throw new MongoTimeoutException("Publisher onComplete timed out"); + } + } catch (InterruptedException e) { + throw new MongoInterruptedException("Interrupted waiting for observeration", e); + } + if (!errors.isEmpty()) { + throw errors.get(0); + } + return this; + } + } + + /** + * A CRUD operation Subscriber. + * + * @param The publishers result type + */ + public static class OperationSubscriber extends ObservableSubscriber { + @Override + public void onSubscribe(final Subscription s) { + super.onSubscribe(s); + s.request(Integer.MAX_VALUE); + } + } + + /** + * A Subscriber that prints the json version of each document. + */ + public static class PrintDocumentSubscriber extends OperationSubscriber { + + @Override + public void onNext(final Document document) { + super.onNext(document); + LOGGER.info(document.toJson()); + } + } + + /** + * A Query Subscriber. + */ + public static class QuerySubscriber extends ObservableSubscriber { + private Vector> result; + + public QuerySubscriber(Vector> result) { + this.result = result; + } + + @Override + public void onSubscribe(final Subscription s) { + super.onSubscribe(s); + s.request(Integer.MAX_VALUE); + } + + @Override + public void onNext(final Document t) { + LOGGER.info(t.toJson()); + HashMap resultMap = + new HashMap(); + fillMap(resultMap, t); + result.add(resultMap); + } + } +} diff --git a/mongodbreactivestreams/src/main/java/site/ycsb/db/OptionsSupport.java b/mongodbreactivestreams/src/main/java/site/ycsb/db/OptionsSupport.java new file mode 100644 index 0000000000..619a35017d --- /dev/null +++ b/mongodbreactivestreams/src/main/java/site/ycsb/db/OptionsSupport.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package site.ycsb.db; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * OptionsSupport provides methods for handling legacy options. + * + * @author rjm + */ +public final class OptionsSupport { + private static final Logger LOGGER = LoggerFactory.getLogger(OptionsSupport.class); + /** Value for an unavailable property. */ + private static final String UNAVAILABLE = "n/a"; + + /** + * Updates the URL with the appropriate attributes if legacy properties are + * set and the URL does not have the property already set. + * + * @param url + * The URL to update. + * @param props + * The legacy properties. + * @return The updated URL. + */ + public static String updateUrl(String url, Properties props) { + String result = url; + + // max connections. + final String maxConnections = + props.getProperty("mongodb.maxconnections", UNAVAILABLE).toLowerCase(); + if (!UNAVAILABLE.equals(maxConnections)) { + result = addUrlOption(result, "maxPoolSize", maxConnections); + } + + // Blocked thread multiplier. + final String threadsAllowedToBlockForConnectionMultiplier = + props + .getProperty( + "mongodb.threadsAllowedToBlockForConnectionMultiplier", + UNAVAILABLE).toLowerCase(); + if (!UNAVAILABLE.equals(threadsAllowedToBlockForConnectionMultiplier)) { + result = + addUrlOption(result, "waitQueueMultiple", + threadsAllowedToBlockForConnectionMultiplier); + } + + // write concern + String writeConcernType = + props.getProperty("mongodb.writeConcern", UNAVAILABLE).toLowerCase(); + if (!UNAVAILABLE.equals(writeConcernType)) { + if ("errors_ignored".equals(writeConcernType)) { + result = addUrlOption(result, "w", "0"); + } else if ("unacknowledged".equals(writeConcernType)) { + result = addUrlOption(result, "w", "0"); + } else if ("acknowledged".equals(writeConcernType)) { + result = addUrlOption(result, "w", "1"); + } else if ("journaled".equals(writeConcernType)) { + result = addUrlOption(result, "journal", "true"); // this is the + // documented option + // name + result = addUrlOption(result, "j", "true"); // but keep this until + // MongoDB Java driver + // supports "journal" option + } else if ("replica_acknowledged".equals(writeConcernType)) { + result = addUrlOption(result, "w", "2"); + } else if ("majority".equals(writeConcernType)) { + result = addUrlOption(result, "w", "majority"); + } else { + LOGGER.error("WARNING: Invalid writeConcern: '" + + writeConcernType + "' will be ignored. " + + "Must be one of [ unacknowledged | acknowledged | " + + "journaled | replica_acknowledged | majority ]"); + } + } + + // read preference + String readPreferenceType = + props.getProperty("mongodb.readPreference", UNAVAILABLE).toLowerCase(); + if (!UNAVAILABLE.equals(readPreferenceType)) { + if ("primary".equals(readPreferenceType)) { + result = addUrlOption(result, "readPreference", "primary"); + } else if ("primary_preferred".equals(readPreferenceType)) { + result = addUrlOption(result, "readPreference", "primaryPreferred"); + } else if ("secondary".equals(readPreferenceType)) { + result = addUrlOption(result, "readPreference", "secondary"); + } else if ("secondary_preferred".equals(readPreferenceType)) { + result = addUrlOption(result, "readPreference", "secondaryPreferred"); + } else if ("nearest".equals(readPreferenceType)) { + result = addUrlOption(result, "readPreference", "nearest"); + } else { + LOGGER.error("WARNING: Invalid readPreference: '" + + readPreferenceType + "' will be ignored. " + + "Must be one of [ primary | primary_preferred | " + + "secondary | secondary_preferred | nearest ]"); + } + } + + return result; + } + + /** + * Adds an option to the url if it does not already contain the option. + * + * @param url + * The URL to append the options to. + * @param name + * The name of the option. + * @param value + * The value for the option. + * @return The updated URL. + */ + private static String addUrlOption(String url, String name, String value) { + String fullName = name + "="; + if (!url.contains(fullName)) { + if (url.contains("?")) { + return url + "&" + fullName + value; + } + return url + "?" + fullName + value; + } + return url; + } + + /** + * Hidden Constructor. + */ + private OptionsSupport() { + // Nothing. + } +} diff --git a/mongodbreactivestreams/src/main/java/site/ycsb/db/package-info.java b/mongodbreactivestreams/src/main/java/site/ycsb/db/package-info.java new file mode 100644 index 0000000000..3260a4ddab --- /dev/null +++ b/mongodbreactivestreams/src/main/java/site/ycsb/db/package-info.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2022, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for MongoDB. + * For additional details on using and configuring the binding see the + * accompanying README.md. + */ +package site.ycsb.db; + diff --git a/mongodbreactivestreams/src/main/resources/log4j2.xml b/mongodbreactivestreams/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..d3dbafd56c --- /dev/null +++ b/mongodbreactivestreams/src/main/resources/log4j2.xml @@ -0,0 +1,19 @@ + + + + %-4r [%t] %-5p %c %x -%m%n + + + + + + + + + + + + + + + diff --git a/mongodbreactivestreams/src/test/java/site/ycsb/db/MongoDbReactiveStreamsClientTest.java b/mongodbreactivestreams/src/test/java/site/ycsb/db/MongoDbReactiveStreamsClientTest.java new file mode 100644 index 0000000000..62ede28522 --- /dev/null +++ b/mongodbreactivestreams/src/test/java/site/ycsb/db/MongoDbReactiveStreamsClientTest.java @@ -0,0 +1,347 @@ +/* + * Copyright (c) 2022, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package site.ycsb.db; + +import org.junit.BeforeClass; +import org.junit.Test; +import site.ycsb.ByteArrayByteIterator; +import site.ycsb.ByteIterator; +import site.ycsb.DB; +import site.ycsb.Status; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNoException; + +/** + * MongoDbReactiveStreamsClientTest provides runs the basic DB test cases. + *

+ * The tests will be skipped if MongoDB is not running on port 27017 on the + * local machine. See the README.md for how to get MongoDB running. + *

+ */ +public class MongoDbReactiveStreamsClientTest { + + /** The default port for MongoDB. */ + private static final int MONGODB_DEFAULT_PORT = 27017; + + /** The client to use. */ + private DB myClient = null; + + /** + * Verifies the mongod process (or some process) is running on port 27017, if + * not the tests are skipped. + */ + @BeforeClass + public static void setUpBeforeClass() { + // Test if we can connect. + Socket socket = null; + try { + // Connect + socket = new Socket(InetAddress.getLocalHost(), MONGODB_DEFAULT_PORT); + assertThat("Socket is not bound.", socket.getLocalPort(), not(-1)); + } catch (IOException connectFailed) { + assumeNoException("MongoDB is not running. Skipping tests.", + connectFailed); + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException ignore) { + // Ignore. + } + } + socket = null; + } + } + + /** + * Test method for {@link DB#insert}, {@link DB#read}, and {@link DB#delete} . + */ + @Test + public void testInsertReadDelete() { + final DB client = getDB(); + + final String table = getClass().getSimpleName(); + final String id = "delete"; + + HashMap inserted = + new HashMap(); + inserted.put("a", new ByteArrayByteIterator(new byte[] { 1, 2, 3, 4 })); + Status result = client.insert(table, id, inserted); + assertThat("Insert did not return success (0).", result, is(Status.OK)); + + HashMap read = new HashMap(); + Set keys = Collections.singleton("a"); + result = client.read(table, id, keys, read); + assertThat("Read did not return success (0).", result, is(Status.OK)); + for (String key : keys) { + ByteIterator iter = read.get(key); + + assertThat("Did not read the inserted field: " + key, iter, + notNullValue()); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 1))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 2))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 3))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 4))); + assertFalse(iter.hasNext()); + } + + result = client.delete(table, id); + assertThat("Delete did not return success (0).", result, is(Status.OK)); + + read.clear(); + result = client.read(table, id, null, read); + assertThat("Read, after delete, did not return not found (1).", result, + is(Status.NOT_FOUND)); + assertThat("Found the deleted fields.", read.size(), is(0)); + + result = client.delete(table, id); + assertThat("Delete did not return not found (1).", result, is(Status.NOT_FOUND)); + } + + /** + * Test method for {@link DB#insert}, {@link DB#read}, and {@link DB#update} . + */ + @Test + public void testInsertReadUpdate() { + DB client = getDB(); + + final String table = getClass().getSimpleName(); + final String id = "update"; + + HashMap inserted = + new HashMap(); + inserted.put("a", new ByteArrayByteIterator(new byte[] { 1, 2, 3, 4 })); + Status result = client.insert(table, id, inserted); + assertThat("Insert did not return success (0).", result, is(Status.OK)); + + HashMap read = new HashMap(); + Set keys = Collections.singleton("a"); + result = client.read(table, id, keys, read); + assertThat("Read did not return success (0).", result, is(Status.OK)); + for (String key : keys) { + ByteIterator iter = read.get(key); + + assertThat("Did not read the inserted field: " + key, iter, + notNullValue()); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 1))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 2))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 3))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 4))); + assertFalse(iter.hasNext()); + } + + HashMap updated = new HashMap(); + updated.put("a", new ByteArrayByteIterator(new byte[] { 5, 6, 7, 8 })); + result = client.update(table, id, updated); + assertThat("Update did not return success (0).", result, is(Status.OK)); + + read.clear(); + result = client.read(table, id, null, read); + assertThat("Read, after update, did not return success (0).", result, is(Status.OK)); + for (String key : keys) { + ByteIterator iter = read.get(key); + + assertThat("Did not read the inserted field: " + key, iter, + notNullValue()); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 5))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 6))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 7))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 8))); + assertFalse(iter.hasNext()); + } + } + + /** + * Test method for {@link DB#insert}, {@link DB#read}, and {@link DB#update} . + */ + @Test + public void testInsertReadUpdateWithUpsert() { + Properties props = new Properties(); + props.setProperty("mongodb.upsert", "true"); + DB client = getDB(props); + + final String table = getClass().getSimpleName(); + final String id = "updateWithUpsert"; + + HashMap inserted = + new HashMap(); + inserted.put("a", new ByteArrayByteIterator(new byte[] { 1, 2, 3, 4 })); + Status result = client.insert(table, id, inserted); + assertThat("Insert did not return success (0).", result, is(Status.OK)); + + HashMap read = new HashMap(); + Set keys = Collections.singleton("a"); + result = client.read(table, id, keys, read); + assertThat("Read did not return success (0).", result, is(Status.OK)); + for (String key : keys) { + ByteIterator iter = read.get(key); + + assertThat("Did not read the inserted field: " + key, iter, + notNullValue()); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 1))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 2))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 3))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 4))); + assertFalse(iter.hasNext()); + } + + HashMap updated = new HashMap(); + updated.put("a", new ByteArrayByteIterator(new byte[] { 5, 6, 7, 8 })); + result = client.update(table, id, updated); + assertThat("Update did not return success (0).", result, is(Status.OK)); + + read.clear(); + result = client.read(table, id, null, read); + assertThat("Read, after update, did not return success (0).", result, is(Status.OK)); + for (String key : keys) { + ByteIterator iter = read.get(key); + + assertThat("Did not read the inserted field: " + key, iter, + notNullValue()); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 5))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 6))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 7))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 8))); + assertFalse(iter.hasNext()); + } + } + + /** + * Test method for {@link DB#scan}. + */ + @Test + public void testScan() { + final DB client = getDB(); + + final String table = getClass().getSimpleName(); + + // Insert a bunch of documents. + for (int i = 0; i < 100; ++i) { + HashMap inserted = + new HashMap(); + inserted.put("a", new ByteArrayByteIterator(new byte[] { + (byte) (i & 0xFF), (byte) (i >> 8 & 0xFF), (byte) (i >> 16 & 0xFF), + (byte) (i >> 24 & 0xFF) })); + Status result = client.insert(table, padded(i), inserted); + assertThat("Insert did not return success (0).", result, is(Status.OK)); + } + + Set keys = Collections.singleton("a"); + Vector> results = + new Vector>(); + Status result = client.scan(table, "00050", 5, null, results); + assertThat("Read did not return success (0).", result, is(Status.OK)); + assertThat(results.size(), is(5)); + for (int i = 0; i < 5; ++i) { + Map read = results.get(i); + for (String key : keys) { + ByteIterator iter = read.get(key); + + assertThat("Did not read the inserted field: " + key, iter, + notNullValue()); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) ((i + 50) & 0xFF)))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), + is(Byte.valueOf((byte) ((i + 50) >> 8 & 0xFF)))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), + is(Byte.valueOf((byte) ((i + 50) >> 16 & 0xFF)))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), + is(Byte.valueOf((byte) ((i + 50) >> 24 & 0xFF)))); + assertFalse(iter.hasNext()); + } + } + } + + /** + * Gets the test DB. + * + * @return The test DB. + */ + DB getDB() { + return getDB(new Properties()); + } + + DB getDB(Properties props) { + if( myClient == null ) { + myClient = instantiateClient(); + myClient.setProperties(props); + try { + myClient.init(); + } catch (Exception error) { + assumeNoException(error); + } + } + return myClient; + } + /** + * Creates a zero padded integer. + * + * @param i + * The integer to padd. + * @return The padded integer. + */ + private String padded(int i) { + String result = String.valueOf(i); + while (result.length() < 5) { + result = "0" + result; + } + return result; + } + + DB instantiateClient() { + return new MongoDbReactiveStreamsClient(); + } +} \ No newline at end of file diff --git a/mongodbreactivestreams/src/test/java/site/ycsb/db/OptionsSupportTest.java b/mongodbreactivestreams/src/test/java/site/ycsb/db/OptionsSupportTest.java new file mode 100644 index 0000000000..3a4d08d015 --- /dev/null +++ b/mongodbreactivestreams/src/test/java/site/ycsb/db/OptionsSupportTest.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package site.ycsb.db; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static site.ycsb.db.OptionsSupport.updateUrl; + +/** + * OptionsSupportTest provides tests for the OptionsSupport class. + * + * @author rjm + */ +public class OptionsSupportTest { + + /** + * Test method for {@link OptionsSupport#updateUrl(String, Properties)} for + * {@code mongodb.maxconnections}. + */ + @Test + public void testUpdateUrlMaxConnections() { + assertThat( + updateUrl("mongodb://locahost:27017/", + props("mongodb.maxconnections", "1234")), + is("mongodb://locahost:27017/?maxPoolSize=1234")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.maxconnections", "1234")), + is("mongodb://locahost:27017/?foo=bar&maxPoolSize=1234")); + assertThat( + updateUrl("mongodb://locahost:27017/?maxPoolSize=1", + props("mongodb.maxconnections", "1234")), + is("mongodb://locahost:27017/?maxPoolSize=1")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", props("foo", "1234")), + is("mongodb://locahost:27017/?foo=bar")); + } + + /** + * Test method for {@link OptionsSupport#updateUrl(String, Properties)} for + * {@code mongodb.threadsAllowedToBlockForConnectionMultiplier}. + */ + @Test + public void testUpdateUrlWaitQueueMultiple() { + assertThat( + updateUrl( + "mongodb://locahost:27017/", + props("mongodb.threadsAllowedToBlockForConnectionMultiplier", + "1234")), + is("mongodb://locahost:27017/?waitQueueMultiple=1234")); + assertThat( + updateUrl( + "mongodb://locahost:27017/?foo=bar", + props("mongodb.threadsAllowedToBlockForConnectionMultiplier", + "1234")), + is("mongodb://locahost:27017/?foo=bar&waitQueueMultiple=1234")); + assertThat( + updateUrl( + "mongodb://locahost:27017/?waitQueueMultiple=1", + props("mongodb.threadsAllowedToBlockForConnectionMultiplier", + "1234")), is("mongodb://locahost:27017/?waitQueueMultiple=1")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", props("foo", "1234")), + is("mongodb://locahost:27017/?foo=bar")); + } + + /** + * Test method for {@link OptionsSupport#updateUrl(String, Properties)} for + * {@code mongodb.threadsAllowedToBlockForConnectionMultiplier}. + */ + @Test + public void testUpdateUrlWriteConcern() { + assertThat( + updateUrl("mongodb://locahost:27017/", + props("mongodb.writeConcern", "errors_ignored")), + is("mongodb://locahost:27017/?w=0")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.writeConcern", "unacknowledged")), + is("mongodb://locahost:27017/?foo=bar&w=0")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.writeConcern", "acknowledged")), + is("mongodb://locahost:27017/?foo=bar&w=1")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.writeConcern", "journaled")), + is("mongodb://locahost:27017/?foo=bar&journal=true&j=true")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.writeConcern", "replica_acknowledged")), + is("mongodb://locahost:27017/?foo=bar&w=2")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.writeConcern", "majority")), + is("mongodb://locahost:27017/?foo=bar&w=majority")); + + // w already exists. + assertThat( + updateUrl("mongodb://locahost:27017/?w=1", + props("mongodb.writeConcern", "acknowledged")), + is("mongodb://locahost:27017/?w=1")); + + // Unknown options + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", props("foo", "1234")), + is("mongodb://locahost:27017/?foo=bar")); + } + + /** + * Test method for {@link OptionsSupport#updateUrl(String, Properties)} for + * {@code mongodb.threadsAllowedToBlockForConnectionMultiplier}. + */ + @Test + public void testUpdateUrlReadPreference() { + assertThat( + updateUrl("mongodb://locahost:27017/", + props("mongodb.readPreference", "primary")), + is("mongodb://locahost:27017/?readPreference=primary")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.readPreference", "primary_preferred")), + is("mongodb://locahost:27017/?foo=bar&readPreference=primaryPreferred")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.readPreference", "secondary")), + is("mongodb://locahost:27017/?foo=bar&readPreference=secondary")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.readPreference", "secondary_preferred")), + is("mongodb://locahost:27017/?foo=bar&readPreference=secondaryPreferred")); + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", + props("mongodb.readPreference", "nearest")), + is("mongodb://locahost:27017/?foo=bar&readPreference=nearest")); + + // readPreference already exists. + assertThat( + updateUrl("mongodb://locahost:27017/?readPreference=primary", + props("mongodb.readPreference", "secondary")), + is("mongodb://locahost:27017/?readPreference=primary")); + + // Unknown options + assertThat( + updateUrl("mongodb://locahost:27017/?foo=bar", props("foo", "1234")), + is("mongodb://locahost:27017/?foo=bar")); + } + + /** + * Factory method for a {@link Properties} object. + * + * @param key + * The key for the property to set. + * @param value + * The value for the property to set. + * @return The {@link Properties} with the property added. + */ + private Properties props(String key, String value) { + Properties props = new Properties(); + + props.setProperty(key, value); + + return props; + } + +} diff --git a/pom.xml b/pom.xml index de2d5eea23..6a2b52f13a 100644 --- a/pom.xml +++ b/pom.xml @@ -115,7 +115,7 @@ LICENSE file. 3.1.2 4.4.1 1.8.2 - 4.8.0 + 4.52.0 4.0.0 3.0.0 2.0.1 @@ -135,6 +135,7 @@ LICENSE file. 1.1.8-mapr-1710 3.11.0 2.0.1 + 4.9.0 2.1.1 2.2.37 UTF-8 @@ -187,6 +188,7 @@ LICENSE file. maprjsondb memcached mongodb + mongodbreactivestreams nosqldb orientdb postgrenosql