From 72dd88bdec01b8e82d60dd1e1abee048bcbe67cd Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Tue, 8 Aug 2017 10:00:06 -0700 Subject: [PATCH] [core] Add the TimeSeriesWorkload class, a new type of workload that uses the existing YCSB API to generate time series data (numeric values associated with timestamps). See the TimeSeriesWorkload.java class Javadocs for details on how to use it with a client and workloads/tsworkload_template for parameters. Also add the BasicTSDB instance for printing/debugging the workloads. --- bin/bindings.properties | 1 + bin/ycsb | 5 +- .../src/main/java/com/yahoo/ycsb/BasicDB.java | 8 +- .../main/java/com/yahoo/ycsb/BasicTSDB.java | 273 ++++ .../ycsb/workloads/TimeSeriesWorkload.java | 1286 +++++++++++++++++ .../workloads/TestTimeSeriesWorkload.java | 550 +++++++ workloads/tsworkload_template | 283 ++++ workloads/tsworkloada | 46 + 8 files changed, 2446 insertions(+), 6 deletions(-) create mode 100644 core/src/main/java/com/yahoo/ycsb/BasicTSDB.java create mode 100644 core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java create mode 100644 core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java create mode 100644 workloads/tsworkload_template create mode 100644 workloads/tsworkloada diff --git a/bin/bindings.properties b/bin/bindings.properties index a2aeb9a64b..a3db9189c6 100644 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -32,6 +32,7 @@ arangodb:com.yahoo.ycsb.db.ArangoDBClient arangodb3:com.yahoo.ycsb.db.arangodb.ArangoDB3Client azuretablestorage:com.yahoo.ycsb.db.azuretablestorage.AzureClient basic:com.yahoo.ycsb.BasicDB +basicts:com.yahoo.ycsb.BasicTSDB cassandra-cql:com.yahoo.ycsb.db.CassandraCQLClient cassandra2-cql:com.yahoo.ycsb.db.CassandraCQLClient cloudspanner:com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient diff --git a/bin/ycsb b/bin/ycsb index 7fb7518024..59ad00dd0f 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -57,6 +57,7 @@ DATABASES = { "arangodb3" : "com.yahoo.ycsb.db.arangodb.ArangoDB3Client", "asynchbase" : "com.yahoo.ycsb.db.AsyncHBaseClient", "basic" : "com.yahoo.ycsb.BasicDB", + "basicts" : "com.yahoo.ycsb.BasicTSDB", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cloudspanner" : "com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient", @@ -269,8 +270,8 @@ def main(): warn("Running against a source checkout. In order to get our runtime " "dependencies we'll have to invoke Maven. Depending on the state " "of your system, this may take ~30-45 seconds") - db_location = "core" if binding == "basic" else binding - project = "core" if binding == "basic" else binding + "-binding" + db_location = "core" if (binding == "basic" or binding == "basicts") else binding + project = "core" if (binding == "basic" or binding == "basicts") else binding + "-binding" db_dir = os.path.join(ycsb_home, db_location) # goes first so we can rely on side-effect of package maven_says = get_classpath_from_maven(project) diff --git a/core/src/main/java/com/yahoo/ycsb/BasicDB.java b/core/src/main/java/com/yahoo/ycsb/BasicDB.java index 9e483f2147..15d1101053 100644 --- a/core/src/main/java/com/yahoo/ycsb/BasicDB.java +++ b/core/src/main/java/com/yahoo/ycsb/BasicDB.java @@ -47,16 +47,16 @@ public class BasicDB extends DB { protected static Map inserts; protected static Map deletes; - private boolean verbose; - private boolean randomizedelay; - private int todelay; + protected boolean verbose; + protected boolean randomizedelay; + protected int todelay; protected boolean count; public BasicDB() { todelay = 0; } - private void delay() { + protected void delay() { if (todelay > 0) { long delayNs; if (randomizedelay) { diff --git a/core/src/main/java/com/yahoo/ycsb/BasicTSDB.java b/core/src/main/java/com/yahoo/ycsb/BasicTSDB.java new file mode 100644 index 0000000000..42117ea9e9 --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/BasicTSDB.java @@ -0,0 +1,273 @@ +/** + * Copyright (c) 2017 YCSB contributors All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; + +import com.yahoo.ycsb.workloads.TimeSeriesWorkload; + +/** + * Basic DB for printing out time series workloads and/or tracking the distribution + * of keys and fields. + */ +public class BasicTSDB extends BasicDB { + + /** Time series workload specific counters. */ + protected static Map timestamps; + protected static Map floats; + protected static Map integers; + + private String timestampKey; + private String valueKey; + private String tagPairDelimiter; + private String queryTimeSpanDelimiter; + private long lastTimestamp; + + @Override + public void init() { + super.init(); + + synchronized (MUTEX) { + if (timestamps == null) { + timestamps = new HashMap(); + floats = new HashMap(); + integers = new HashMap(); + } + } + + timestampKey = getProperties().getProperty( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY, + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT); + valueKey = getProperties().getProperty( + TimeSeriesWorkload.VALUE_KEY_PROPERTY, + TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT); + tagPairDelimiter = getProperties().getProperty( + TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY, + TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY_DEFAULT); + queryTimeSpanDelimiter = getProperties().getProperty( + TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY, + TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT); + } + + public Status read(String table, String key, Set fields, Map result) { + delay(); + + if (verbose) { + StringBuilder sb = getStringBuilder(); + sb.append("READ ").append(table).append(" ").append(key).append(" [ "); + if (fields != null) { + for (String f : fields) { + sb.append(f).append(" "); + } + } else { + sb.append(""); + } + + sb.append("]"); + System.out.println(sb); + } + + if (count) { + Set filtered = null; + if (fields != null) { + filtered = new HashSet(); + for (final String field : fields) { + if (field.startsWith(timestampKey)) { + String[] parts = field.split(tagPairDelimiter); + if (parts[1].contains(queryTimeSpanDelimiter)) { + parts = parts[1].split(queryTimeSpanDelimiter); + lastTimestamp = Long.parseLong(parts[0]); + } else { + lastTimestamp = Long.parseLong(parts[1]); + } + synchronized(timestamps) { + Integer ctr = timestamps.get(lastTimestamp); + if (ctr == null) { + timestamps.put(lastTimestamp, 1); + } else { + timestamps.put(lastTimestamp, ctr + 1); + } + } + } else { + filtered.add(field); + } + } + } + incCounter(reads, hash(table, key, filtered)); + } + return Status.OK; + } + + @Override + public Status update(String table, String key, Map values) { + delay(); + + boolean isFloat = false; + + if (verbose) { + StringBuilder sb = getStringBuilder(); + sb.append("UPDATE ").append(table).append(" ").append(key).append(" [ "); + if (values != null) { + final TreeMap tree = new TreeMap(values); + for (Map.Entry entry : tree.entrySet()) { + if (entry.getKey().equals(timestampKey)) { + sb.append(entry.getKey()).append("=") + .append(Utils.bytesToLong(entry.getValue().toArray())).append(" "); + } else if (entry.getKey().equals(valueKey)) { + final NumericByteIterator it = (NumericByteIterator) entry.getValue(); + isFloat = it.isFloatingPoint(); + sb.append(entry.getKey()).append("=") + .append(isFloat ? it.getDouble() : it.getLong()).append(" "); + } else { + sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" "); + } + } + } + sb.append("]"); + System.out.println(sb); + } + + if (count) { + if (!verbose) { + isFloat = ((NumericByteIterator) values.get(valueKey)).isFloatingPoint(); + } + int hash = hash(table, key, values); + incCounter(updates, hash); + synchronized(timestamps) { + Integer ctr = timestamps.get(lastTimestamp); + if (ctr == null) { + timestamps.put(lastTimestamp, 1); + } else { + timestamps.put(lastTimestamp, ctr + 1); + } + } + if (isFloat) { + incCounter(floats, hash); + } else { + incCounter(integers, hash); + } + } + + return Status.OK; + } + + @Override + public Status insert(String table, String key, Map values) { + delay(); + + boolean isFloat = false; + + if (verbose) { + StringBuilder sb = getStringBuilder(); + sb.append("INSERT ").append(table).append(" ").append(key).append(" [ "); + if (values != null) { + final TreeMap tree = new TreeMap(values); + for (Map.Entry entry : tree.entrySet()) { + if (entry.getKey().equals(timestampKey)) { + sb.append(entry.getKey()).append("=") + .append(Utils.bytesToLong(entry.getValue().toArray())).append(" "); + } else if (entry.getKey().equals(valueKey)) { + final NumericByteIterator it = (NumericByteIterator) entry.getValue(); + isFloat = it.isFloatingPoint(); + sb.append(entry.getKey()).append("=") + .append(isFloat ? it.getDouble() : it.getLong()).append(" "); + } else { + sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" "); + } + } + } + sb.append("]"); + System.out.println(sb); + } + + if (count) { + if (!verbose) { + isFloat = ((NumericByteIterator) values.get(valueKey)).isFloatingPoint(); + } + int hash = hash(table, key, values); + incCounter(inserts, hash); + synchronized(timestamps) { + Integer ctr = timestamps.get(lastTimestamp); + if (ctr == null) { + timestamps.put(lastTimestamp, 1); + } else { + timestamps.put(lastTimestamp, ctr + 1); + } + } + if (isFloat) { + incCounter(floats, hash); + } else { + incCounter(integers, hash); + } + } + + return Status.OK; + } + + @Override + public void cleanup() { + super.cleanup(); + if (count && counter < 1) { + System.out.println("[TIMESTAMPS], Unique, " + timestamps.size()); + System.out.println("[FLOATS], Unique series, " + floats.size()); + System.out.println("[INTEGERS], Unique series, " + integers.size()); + + long minTs = Long.MAX_VALUE; + long maxTs = Long.MIN_VALUE; + for (final long ts : timestamps.keySet()) { + if (ts > maxTs) { + maxTs = ts; + } + if (ts < minTs) { + minTs = ts; + } + } + System.out.println("[TIMESTAMPS], Min, " + minTs); + System.out.println("[TIMESTAMPS], Max, " + maxTs); + } + } + + @Override + protected int hash(final String table, final String key, final Map values) { + final TreeMap sorted = new TreeMap(); + for (final Entry entry : values.entrySet()) { + if (entry.getKey().equals(valueKey)) { + continue; + } else if (entry.getKey().equals(timestampKey)) { + lastTimestamp = ((NumericByteIterator) entry.getValue()).getLong(); + entry.getValue().reset(); + continue; + } + sorted.put(entry.getKey(), entry.getValue()); + } + // yeah it's ugly but gives us a unique hash without having to add hashers + // to all of the ByteIterators. + StringBuilder buf = new StringBuilder().append(table).append(key); + for (final Entry entry : sorted.entrySet()) { + entry.getValue().reset(); + buf.append(entry.getKey()) + .append(entry.getValue().toString()); + } + return buf.toString().hashCode(); + } + +} \ No newline at end of file diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java new file mode 100644 index 0000000000..3a637eebd1 --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java @@ -0,0 +1,1286 @@ +/** + * Copyright (c) 2017 YCSB contributors All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb.workloads; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.TimeUnit; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.Client; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.NumericByteIterator; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.Utils; +import com.yahoo.ycsb.Workload; +import com.yahoo.ycsb.WorkloadException; +import com.yahoo.ycsb.generator.DiscreteGenerator; +import com.yahoo.ycsb.generator.Generator; +import com.yahoo.ycsb.generator.HotspotIntegerGenerator; +import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator; +import com.yahoo.ycsb.generator.NumberGenerator; +import com.yahoo.ycsb.generator.RandomDiscreteTimestampGenerator; +import com.yahoo.ycsb.generator.ScrambledZipfianGenerator; +import com.yahoo.ycsb.generator.SequentialGenerator; +import com.yahoo.ycsb.generator.UniformLongGenerator; +import com.yahoo.ycsb.generator.UnixEpochTimestampGenerator; +import com.yahoo.ycsb.generator.ZipfianGenerator; +import com.yahoo.ycsb.measurements.Measurements; + +/** + * A specialized workload dealing with time series data, i.e. series of discreet + * events associated with timestamps and identifiers. For this workload, identities + * consist of a {@link String} key and a set of {@link String} tag key/value + * pairs. + *

+ * For example: + * + * + * + * + * + * + *
Time Series KeyTag Keys/Values148322880014832288601483228920
AAAA=AA, AB=AA42.51.085.9
AAAA=AA, AB=AB-9.476.90.18
ABAA=AA, AB=AA-93.057.1-63.8
ABAA=AA, AB=AB7.656.1-0.3
+ *

+ * This table shows four time series with 3 measurements at three different timestamps. + * Keys, tags, timestamps and values (numeric only at this time) are generated by + * this workload. For details on properties and behavior, see the + * {@code workloads/tsworkload_template} file. The Javadocs will focus on implementation + * and how {@link DB} clients can parse the workload. + *

+ * In order to avoid having existing DB implementations implement a brand new interface + * this workload uses the existing APIs to encode a few special values that can be parsed + * by the client. The special values include the timestamp, numeric value and some + * query (read or scan) parameters. As an example on how to parse the fields, see + * {@link BasicTSDB}. + *

+ * Timestamps + *

+ * Timestamps are presented as Unix Epoch values in units of {@link TimeUnit#SECONDS}, + * {@link TimeUnit#MILLISECONDS} or {@link TimeUnit#NANOSECONDS} based on the + * {@code timestampunits} property. For calls to {@link DB#insert(String, String, java.util.Map)} + * and {@link DB#update(String, String, java.util.Map)}, the timestamp is added to the + * {@code values} map encoded in a {@link NumericByteIterator} with the key defined + * in the {@code timestampkey} property (defaulting to "YCSBTS"). To pull out the timestamp + * when iterating over the values map, cast the {@link ByteIterator} to a + * {@link NumericByteIterator} and call {@link NumericByteIterator#getLong()}. + *

+ * Note that for calls to {@link DB#update(String, String, java.util.Map)}, timestamps + * earlier than the timestamp generator's timestamp will be choosen at random to + * mimic a lambda architecture or old job re-reporting some data. + *

+ * For calls to {@link DB#read(String, String, java.util.Set, java.util.Map)} and + * {@link DB#scan(String, String, int, java.util.Set, Vector)}, timestamps + * are encoded in a {@link StringByteIterator} in a key/value format with the + * {@code tagpairdelimiter} separator. E.g {@code YCSBTS=1483228800}. If {@code querytimespan} + * has been set to a positive value then the value will include a range with the + * starting (oldest) timestamp followed by the {@code querytimespandelimiter} separator + * and the ending (most recent) timestamp. E.g. {@code YCSBTS=1483228800-1483228920}. + *

+ * For calls to {@link DB#delete(String, String)}, encoding is the same as reads and + * scans but key/value pairs are separated by the {@code deletedelimiter} property value. + *

+ * By default, the starting timestamp is the current system time without any rounding. + * All timestamps are then offsets from that starting value. + *

+ * Values + *

+ * Similar to timestamps, values are encoded in {@link NumericByteIterator}s and stored + * in the values map with the key defined in {@code valuekey} (defaulting to "YCSBV"). + * Values can either be 64 bit signed {@link long}s or double precision {@link double}s + * depending on the {@code valuetype} or {@code dataintegrity} properties. When parsing + * out the value, always call {@link NumericByteIterator#isFloatingPoint()} to determine + * whether or not to call {@link NumericByteIterator#getDouble()} (true) or + * {@link NumericByteIterator#getLong()} (false). + *

+ * When {@code dataintegrity} is set to true, then the value is always set to a + * 64 bit signed integer which is the Java hash code of the concatenation of the + * key and map of values (sorted on the map keys and skipping the timestamp and value + * entries) OR'd with the timestamp of the data point. See + * {@link #validationFunction(String, long, TreeMap)} for the implementation. + *

+ * Keys and Tags + *

+ * As mentioned, the workload generates strings for the keys and tags. On initialization + * three string generators are created using the {@link IncrementingPrintableStringGenerator} + * implementation. Then the generators fill three arrays with values based on the + * number of keys, the number of tags and the cardinality of each tag key/value pair. + * This implementation gives us time series like the example table where every string + * starts at something like "AA" (depending on the length of keys, tag keys and tag values) + * and continuing to "ZZ" wherein they rollover back to "AA". + *

+ * Each time series must have a unique set of tag keys, i.e. the key "AA" cannot appear + * more than once per time series. If the workload is configured for four tags with a + * tag key length of 2, the keys would be "AA", "AB", "AC" and "AD". + *

+ * Each tag key is then associated with a tag value. Tag values may appear more than once + * in each time series. E.g. time series will usually start with the tags "AA=AA", + * "AB=AA", "AC=AA" and "AD=AA". The {@code tagcardinality} property determines how many + * unique values will be generated per tag key. In the example table above, the + * {@code tagcardinality} property would have been set to {@code 1,2} meaning tag + * key "AA" would always have the tag value "AA" given a cardinality of 1. However + * tag key "AB" would have values "AA" and "AB" due to a cardinality of 2. This + * cardinality map, along with the number of unique time series keys determines how + * many unique time series are generated for the workload. Tag values share a common + * array of generated strings to save on memory. + *

+ * Operation Order + *

+ * The default behavior of the workload (for inserts and updates) is to generate a + * value for each time series for a given timestamp before incrementing to the next + * timestamp and writing values. This is an ideal workload and some time series + * databases are designed for this behavior. However in the real-world events will + * arrive grouped close to the current system time with a number of events being + * delayed, hence their timestamps are further in the past. The {@code delayedseries} + * property determines the percentage of time series that are delayed by up to + * {@code delayedintervals} intervals. E.g. setting this value to 0.05 means that + * 5% of the time series will be written with timestamps earlier than the timestamp + * generator's current time. + *

+ * Reads and Scans + *

+ * For benchmarking queries, some common tasks implemented by almost every time series + * data base are available and are passed in the fields {@link Set}: + *

+ * GroupBy - A common operation is to aggregate multiple time series into a + * single time series via common parameters. For example, a user may want to see the + * total network traffic in a data center so they'll issue a SQL query like: + * SELECT value FROM timeseriesdb GROUP BY datacenter ORDER BY SUM(value); + * If the {@code groupbyfunction} has been set to a group by function, then the fields + * will contain a key/value pair with the key set in {@code groupbykey}. E.g. + * {@code YCSBGB=SUM}. + *

+ * Additionally with grouping enabled, fields on tag keys where group bys should + * occur will only have the key defined and will not have a value or delimiter. E.g. + * if grouping on tag key "AA", the field will contain {@code AA} instead of {@code AA=AB}. + *

+ * Downsampling - Another common operation is to reduce the resolution of the + * queried time series when fetching a wide time range of data so fewer data points + * are returned. For example, a user may fetch a week of data but if the data is + * recorded on a 1 second interval, that would be over 600k data points so they + * may ask for a 1 hour downsampling (also called bucketing) wherein every hour, all + * of the data points for a "bucket" are aggregated into a single value. + *

+ * To enable downsampling, the {@code downsamplingfunction} property must be set to + * a supported function such as "SUM" and the {@code downsamplinginterval} must be + * set to a valid time interval with the same units as {@code timestampunits}, e.g. + * "3600" which would create 1 hour buckets if the time units were set to seconds. + * With downsampling, query fields will include a key/value pair with + * {@code downsamplingkey} as the key (defaulting to "YCSBDS") and the value being + * a concatenation of {@code downsamplingfunction} and {@code downsamplinginterval}, + * for example {@code YCSBDS=SUM60}. + *

+ * Timestamps - For every read, a random timestamp is selected from the interval + * set. If {@code querytimespan} has been set to a positive value, then the configured + * query time interval is added to the selected timestamp so the read passes the DB + * a range of times. Note that during the run phase, if no data was previously loaded, + * or if there are more {@code recordcount}s set for the run phase, reads may be sent + * to the DB with timestamps that are beyond the written data time range (or even the + * system clock of the DB). + *

+ * Deletes + *

+ * Because the delete API only accepts a single key, a full key and tag key/value + * pair map is flattened into a single string for parsing by the database. Common + * workloads include deleting a single time series (wherein all tag key and values are + * defined), deleting all series containing a tag key and value or deleting all of the + * time series sharing a common time series key. + *

+ * Right now the workload supports deletes with a key and for time series tag key/value + * pairs or a key with tags and a group by on one or more tags (meaning, delete all of + * the series with any value for the given tag key). The parameters are collapsed into + * a single string delimited with the character in the {@code deletedelimiter} property. + * For example, a delete request may look like: {@code AA:AA=AA:AA=AB} to delete the + * first time series in the table above. + *

+ * Threads + *

+ * For a multi-threaded execution, the number of time series keys set via the + * {@code fieldcount} property, must be greater than or equal to the number of + * threads set via {@code threads}. This is due to each thread choosing a subset + * of the total number of time series keys and being responsible for writing values + * for each time series containing those keys at each timestamp. Thus each thread + * will have it's own timestamp generator, incrementing each time every time series + * it is responsible for has had a value written. + *

+ * Each thread may, however, issue reads and scans for any time series in the + * complete set. + *

+ * Sparsity + *

+ * By default, during loads, every time series will have a data point written at every + * time stamp in the interval set. This is common in workloads where a sensor writes + * a value at regular intervals. However some time series are only reported under + * certain conditions. + *

+ * For example, a counter may track the number of errors over a + * time period for a web service and only report when the value is greater than 1. + * Or a time series may include tags such as a user ID and IP address when a request + * arrives at the web service and only report values when that combination is seen. + * This means the timeseries will not have a value at every timestamp and in + * some cases there may be only a single value! + *

+ * This workload has a {@code sparsity} parameter that can choose how often a + * time series should record a value. The default value of 0.0 means every series + * will get a value at every timestamp. A value of 0.95 will mean that for each + * series, only 5% of the timestamps in the interval will have a value. The distribution + * of values is random. + *

+ * Notes/Warnings + *

+ *

    + *
  • Because time series keys and tag key/values are generated and stored in memory, + * be careful of setting the cardinality too high for the JVM's heap.
  • + *
  • When running for data integrity, a number of settings are incompatible and will + * throw errors. Check the error messages for details.
  • + *
  • Databases that support keys only and can't store tags should order and then + * collapse the tag values using a delimiter. For example the series in the example + * table at the top could be written as: + *
      + *
    • {@code AA.AA.AA}
    • + *
    • {@code AA.AA.AB}
    • + *
    • {@code AB.AA.AA}
    • + *
    • {@code AB.AA.AB}
    • + *
  • + *
+ *

+ * TODOs + *

+ *

    + *
  • Support random time intervals. E.g. some series write every second, others every + * 60 seconds.
  • + *
  • Support random time series cardinality. Right now every series has the same + * cardinality.
  • + *
  • Truly random timetamps per time series. We could use bitmaps to determine if + * a series has had a value written for a given timestamp. Right now all of the series + * are in sync time-wise.
  • + *
  • Possibly a real-time load where values are written with the current system time. + * It's more of a bulk-loading operation now.
  • + *
+ */ +public class TimeSeriesWorkload extends Workload { + + /** + * The types of values written to the timeseries store. + */ + public enum ValueType { + INTEGERS("integers"), + FLOATS("floats"), + MIXED("mixednumbers"); + + protected final String name; + + ValueType(final String name) { + this.name = name; + } + + public static ValueType fromString(final String name) { + for (final ValueType type : ValueType.values()) { + if (type.name.equalsIgnoreCase(name)) { + return type; + } + } + throw new IllegalArgumentException("Unrecognized type: " + name); + } + } + + /** Name and default value for the timestamp key property. */ + public static final String TIMESTAMP_KEY_PROPERTY = "timestampkey"; + public static final String TIMESTAMP_KEY_PROPERTY_DEFAULT = "YCSBTS"; + + /** Name and default value for the value key property. */ + public static final String VALUE_KEY_PROPERTY = "valuekey"; + public static final String VALUE_KEY_PROPERTY_DEFAULT = "YCSBV"; + + /** Name and default value for the timestamp interval property. */ + public static final String TIMESTAMP_INTERVAL_PROPERTY = "timestampinterval"; + public static final String TIMESTAMP_INTERVAL_PROPERTY_DEFAULT = "60"; + + /** Name and default value for the timestamp units property. */ + public static final String TIMESTAMP_UNITS_PROPERTY = "timestampunits"; + public static final String TIMESTAMP_UNITS_PROPERTY_DEFAULT = "SECONDS"; + + /** Name and default value for the number of tags property. */ + public static final String TAG_COUNT_PROPERTY = "tagcount"; + public static final String TAG_COUNT_PROPERTY_DEFAULT = "4"; + + /** Name and default value for the tag value cardinality map property. */ + public static final String TAG_CARDINALITY_PROPERTY = "tagcardinality"; + public static final String TAG_CARDINALITY_PROPERTY_DEFAULT = "1, 2, 4, 8"; + + /** Name and default value for the tag key length property. */ + public static final String TAG_KEY_LENGTH_PROPERTY = "tagkeylength"; + public static final String TAG_KEY_LENGTH_PROPERTY_DEFAULT = "8"; + + /** Name and default value for the tag value length property. */ + public static final String TAG_VALUE_LENGTH_PROPERTY = "tagvaluelength"; + public static final String TAG_VALUE_LENGTH_PROPERTY_DEFAULT = "8"; + + /** Name and default value for the tag pair delimiter property. */ + public static final String PAIR_DELIMITER_PROPERTY = "tagpairdelimiter"; + public static final String PAIR_DELIMITER_PROPERTY_DEFAULT = "="; + + /** Name and default value for the delete string delimiter property. */ + public static final String DELETE_DELIMITER_PROPERTY = "deletedelimiter"; + public static final String DELETE_DELIMITER_PROPERTY_DEFAULT = ":"; + + /** Name and default value for the random timestamp write order property. */ + public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY = "randomwritetimestamporder"; + public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT = "false"; + + /** Name and default value for the random time series write order property. */ + public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY = "randomtimeseriesorder"; + public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT = "true"; + + /** Name and default value for the value types property. */ + public static final String VALUE_TYPE_PROPERTY = "valuetype"; + public static final String VALUE_TYPE_PROPERTY_DEFAULT = "floats"; + + /** Name and default value for the sparsity property. */ + public static final String SPARSITY_PROPERTY = "sparsity"; + public static final String SPARSITY_PROPERTY_DEFAULT = "0.00"; + + /** Name and default value for the delayed series percentage property. */ + public static final String DELAYED_SERIES_PROPERTY = "delayedseries"; + public static final String DELAYED_SERIES_PROPERTY_DEFAULT = "0.10"; + + /** Name and default value for the delayed series intervals property. */ + public static final String DELAYED_INTERVALS_PROPERTY = "delayedintervals"; + public static final String DELAYED_INTERVALS_PROPERTY_DEFAULT = "5"; + + /** Name and default value for the query time span property. */ + public static final String QUERY_TIMESPAN_PROPERTY = "querytimespan"; + public static final String QUERY_TIMESPAN_PROPERTY_DEFAULT = "0"; + + /** Name and default value for the randomized query time span property. */ + public static final String QUERY_RANDOM_TIMESPAN_PROPERTY = "queryrandomtimespan"; + public static final String QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT = "false"; + + /** Name and default value for the query time stamp delimiter property. */ + public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY = "querytimespandelimiter"; + public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT = ","; + + /** Name and default value for the group-by key property. */ + public static final String GROUPBY_KEY_PROPERTY = "groupbykey"; + public static final String GROUPBY_KEY_PROPERTY_DEFAULT = "YCSBGB"; + + /** Name and default value for the group-by function property. */ + public static final String GROUPBY_PROPERTY = "groupbyfunction"; + + /** Name and default value for the group-by key map property. */ + public static final String GROUPBY_KEYS_PROPERTY = "groupbykeys"; + + /** Name and default value for the downsampling key property. */ + public static final String DOWNSAMPLING_KEY_PROPERTY = "downsamplingkey"; + public static final String DOWNSAMPLING_KEY_PROPERTY_DEFAULT = "YCSBDS"; + + /** Name and default value for the downsampling function property. */ + public static final String DOWNSAMPLING_FUNCTION_PROPERTY = "downsamplingfunction"; + + /** Name and default value for the downsampling interval property. */ + public static final String DOWNSAMPLING_INTERVAL_PROPERTY = "downsamplinginterval"; + + /** The properties to pull settings from. */ + protected Properties properties; + + /** Generators for keys, tag keys and tag values. */ + protected Generator keyGenerator; + protected Generator tagKeyGenerator; + protected Generator tagValueGenerator; + + /** The timestamp key, defaults to "YCSBTS". */ + protected String timestampKey; + + /** The value key, defaults to "YCSBDS". */ + protected String valueKey; + + /** The number of time units in between timestamps. */ + protected int timestampInterval; + + /** The units of time the timestamp and various intervals represent. */ + protected TimeUnit timeUnits; + + /** Whether or not to randomize the timestamp order when writing. */ + protected boolean randomizeTimestampOrder; + + /** Whether or not to randomize (shuffle) the time series order. NOT compatible + * with data integrity. */ + protected boolean randomizeTimeseriesOrder; + + /** The type of values to generate when writing data. */ + protected ValueType valueType; + + /** Used to calculate an offset for each time series. */ + protected int[] cumulativeCardinality; + + /** The calculated total cardinality based on the config. */ + protected int totalCardinality; + + /** The calculated per-time-series-key cardinality. I.e. the number of unique + * tag key and value combinations. */ + protected int perKeyCardinality; + + /** How much data to scan for in each call. */ + protected NumberGenerator scanlength; + + /** A generator used to select a random time series key per read/scan. */ + protected NumberGenerator keychooser; + + /** A generator to select what operation to perform during the run phase. */ + protected DiscreteGenerator operationchooser; + + /** The maximum number of interval offsets from the starting timestamp. Calculated + * based on the number of records configured for the run. */ + protected int maxOffsets; + + /** The number of records or operations to perform for this run. */ + protected int recordcount; + + /** The number of tag pairs per time series. */ + protected int tagPairs; + + /** The table we'll write to. */ + protected String table; + + /** How many time series keys will be generated. */ + protected int numKeys; + + /** The generated list of possible time series key values. */ + protected String[] keys; + + /** The generated list of possible tag key values. */ + protected String[] tagKeys; + + /** The generated list of possible tag value values. */ + protected String[] tagValues; + + /** The cardinality for each tag key. */ + protected int[] tagCardinality; + + /** A helper to skip non-incrementing tag values. */ + protected int firstIncrementableCardinality; + + /** How sparse the data written should be. */ + protected double sparsity; + + /** The percentage of time series that should be delayed in writes. */ + protected double delayedSeries; + + /** The maximum number of intervals to delay a series. */ + protected int delayedIntervals; + + /** Optional query time interval during reads/scans. */ + protected int queryTimeSpan; + + /** Whether or not the actual interval should be randomly chosen, using + * queryTimeSpan as the maximum value. */ + protected boolean queryRandomTimeSpan; + + /** The delimiter for tag pairs in fields. */ + protected String tagPairDelimiter; + + /** The delimiter between parameters for the delete key. */ + protected String deleteDelimiter; + + /** The delimiter between timestamps for query time spans. */ + protected String queryTimeSpanDelimiter; + + /** Whether or not to issue group-by queries. */ + protected boolean groupBy; + + /** The key used for group-by tag keys. */ + protected String groupByKey; + + /** The function used for group-by's. */ + protected String groupByFunction; + + /** The tag keys to group on. */ + protected boolean[] groupBys; + + /** Whether or not to issue downsampling queries. */ + protected boolean downsample; + + /** The key used for downsampling tag keys. */ + protected String downsampleKey; + + /** The downsampling function. */ + protected String downsampleFunction; + + /** The downsampling interval. */ + protected int downsampleInterval; + + /** + * Set to true if want to check correctness of reads. Must also + * be set to true during loading phase to function. + */ + protected boolean dataintegrity; + + /** Measurements to write data integrity results to. */ + protected Measurements measurements = Measurements.getMeasurements(); + + @Override + public void init(final Properties p) throws WorkloadException { + properties = p; + recordcount = + Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, + Client.DEFAULT_RECORD_COUNT)); + if (recordcount == 0) { + recordcount = Integer.MAX_VALUE; + } + timestampKey = p.getProperty(TIMESTAMP_KEY_PROPERTY, TIMESTAMP_KEY_PROPERTY_DEFAULT); + valueKey = p.getProperty(VALUE_KEY_PROPERTY, VALUE_KEY_PROPERTY_DEFAULT); + operationchooser = CoreWorkload.createOperationGenerator(properties); + + final int maxscanlength = + Integer.parseInt(p.getProperty(CoreWorkload.MAX_SCAN_LENGTH_PROPERTY, + CoreWorkload.MAX_SCAN_LENGTH_PROPERTY_DEFAULT)); + String scanlengthdistrib = + p.getProperty(CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY, + CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); + + if (scanlengthdistrib.compareTo("uniform") == 0) { + scanlength = new UniformLongGenerator(1, maxscanlength); + } else if (scanlengthdistrib.compareTo("zipfian") == 0) { + scanlength = new ZipfianGenerator(1, maxscanlength); + } else { + throw new WorkloadException( + "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length"); + } + + randomizeTimestampOrder = Boolean.parseBoolean(p.getProperty( + RANDOMIZE_TIMESTAMP_ORDER_PROPERTY, + RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT)); + randomizeTimeseriesOrder = Boolean.parseBoolean(p.getProperty( + RANDOMIZE_TIMESERIES_ORDER_PROPERTY, + RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT)); + + // setup the cardinality + numKeys = Integer.parseInt(p.getProperty(CoreWorkload.FIELD_COUNT_PROPERTY, + CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); + tagPairs = Integer.parseInt(p.getProperty(TAG_COUNT_PROPERTY, + TAG_COUNT_PROPERTY_DEFAULT)); + sparsity = Double.parseDouble(p.getProperty(SPARSITY_PROPERTY, SPARSITY_PROPERTY_DEFAULT)); + tagCardinality = new int[tagPairs]; + + final String requestdistrib = + p.getProperty(CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY, + CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); + if (requestdistrib.compareTo("uniform") == 0) { + keychooser = new UniformLongGenerator(0, numKeys - 1); + } else if (requestdistrib.compareTo("sequential") == 0) { + keychooser = new SequentialGenerator(0, numKeys - 1); + } else if (requestdistrib.compareTo("zipfian") == 0) { + keychooser = new ScrambledZipfianGenerator(0, numKeys - 1); + //} else if (requestdistrib.compareTo("latest") == 0) { + // keychooser = new SkewedLatestGenerator(transactioninsertkeysequence); + } else if (requestdistrib.equals("hotspot")) { + double hotsetfraction = + Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_DATA_FRACTION, + CoreWorkload.HOTSPOT_DATA_FRACTION_DEFAULT)); + double hotopnfraction = + Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_OPN_FRACTION, + CoreWorkload.HOTSPOT_OPN_FRACTION_DEFAULT)); + keychooser = new HotspotIntegerGenerator(0, numKeys - 1, + hotsetfraction, hotopnfraction); + } else { + throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\""); + } + + // figure out the start timestamp based on the units, cardinality and interval + try { + timestampInterval = Integer.parseInt(p.getProperty( + TIMESTAMP_INTERVAL_PROPERTY, TIMESTAMP_INTERVAL_PROPERTY_DEFAULT)); + } catch (NumberFormatException nfe) { + throw new WorkloadException("Unable to parse the " + + TIMESTAMP_INTERVAL_PROPERTY, nfe); + } + + try { + timeUnits = TimeUnit.valueOf(p.getProperty(TIMESTAMP_UNITS_PROPERTY, + TIMESTAMP_UNITS_PROPERTY_DEFAULT).toUpperCase()); + } catch (IllegalArgumentException e) { + throw new WorkloadException("Unknown time unit type", e); + } + if (timeUnits == TimeUnit.NANOSECONDS || timeUnits == TimeUnit.MICROSECONDS) { + throw new WorkloadException("YCSB doesn't support " + timeUnits + + " at this time."); + } + + tagPairDelimiter = p.getProperty(PAIR_DELIMITER_PROPERTY, PAIR_DELIMITER_PROPERTY_DEFAULT); + deleteDelimiter = p.getProperty(DELETE_DELIMITER_PROPERTY, DELETE_DELIMITER_PROPERTY_DEFAULT); + dataintegrity = Boolean.parseBoolean( + p.getProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, + CoreWorkload.DATA_INTEGRITY_PROPERTY_DEFAULT)); + + queryTimeSpan = Integer.parseInt(p.getProperty(QUERY_TIMESPAN_PROPERTY, + QUERY_TIMESPAN_PROPERTY_DEFAULT)); + queryRandomTimeSpan = Boolean.parseBoolean(p.getProperty(QUERY_RANDOM_TIMESPAN_PROPERTY, + QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT)); + queryTimeSpanDelimiter = p.getProperty(QUERY_TIMESPAN_DELIMITER_PROPERTY, + QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT); + + groupByKey = p.getProperty(GROUPBY_KEY_PROPERTY, GROUPBY_KEY_PROPERTY_DEFAULT); + groupByFunction = p.getProperty(GROUPBY_PROPERTY); + if (groupByFunction != null && !groupByFunction.isEmpty()) { + final String groupByKeys = p.getProperty(GROUPBY_KEYS_PROPERTY); + if (groupByKeys == null || groupByKeys.isEmpty()) { + throw new WorkloadException("Group by was enabled but no keys were specified."); + } + final String[] gbKeys = groupByKeys.split(","); + if (gbKeys.length != tagKeys.length) { + throw new WorkloadException("Only " + gbKeys.length + " group by keys " + + "were specified but there were " + tagKeys.length + " tag keys given."); + } + groupBys = new boolean[gbKeys.length]; + for (int i = 0; i < gbKeys.length; i++) { + groupBys[i] = Integer.parseInt(gbKeys[i].trim()) == 0 ? false : true; + } + groupBy = true; + } + + downsampleKey = p.getProperty(DOWNSAMPLING_KEY_PROPERTY, DOWNSAMPLING_KEY_PROPERTY_DEFAULT); + downsampleFunction = p.getProperty(DOWNSAMPLING_FUNCTION_PROPERTY); + if (downsampleFunction != null && !downsampleFunction.isEmpty()) { + final String interval = p.getProperty(DOWNSAMPLING_INTERVAL_PROPERTY); + if (interval == null || interval.isEmpty()) { + throw new WorkloadException("'" + DOWNSAMPLING_INTERVAL_PROPERTY + "' was missing despite '" + + DOWNSAMPLING_FUNCTION_PROPERTY + "' being set."); + } + downsampleInterval = Integer.parseInt(interval); + downsample = true; + } + + delayedSeries = Double.parseDouble(p.getProperty(DELAYED_SERIES_PROPERTY, DELAYED_SERIES_PROPERTY_DEFAULT)); + delayedIntervals = Integer.parseInt(p.getProperty(DELAYED_INTERVALS_PROPERTY, DELAYED_INTERVALS_PROPERTY_DEFAULT)); + + valueType = ValueType.fromString(p.getProperty(VALUE_TYPE_PROPERTY, VALUE_TYPE_PROPERTY_DEFAULT)); + table = p.getProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT); + initKeysAndTags(); + validateSettings(); + } + + @Override + public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException { + if (properties == null) { + throw new WorkloadException("Workload has not been initialized."); + } + return new ThreadState(mythreadid, threadcount); + } + + @Override + public boolean doInsert(DB db, Object threadstate) { + if (threadstate == null) { + throw new IllegalStateException("Missing thread state."); + } + final Map tags = new TreeMap(); + final String key = ((ThreadState)threadstate).nextDataPoint(tags, true); + if (db.insert(table, key, tags) == Status.OK) { + return true; + } + return false; + } + + @Override + public boolean doTransaction(DB db, Object threadstate) { + if (threadstate == null) { + throw new IllegalStateException("Missing thread state."); + } + switch (operationchooser.nextString()) { + case "READ": + doTransactionRead(db, threadstate); + break; + case "UPDATE": + doTransactionUpdate(db, threadstate); + break; + case "INSERT": + doTransactionInsert(db, threadstate); + break; + case "SCAN": + doTransactionScan(db, threadstate); + break; + case "DELETE": + doTransactionDelete(db, threadstate); + break; + default: + return false; + } + return true; + } + + protected void doTransactionRead(final DB db, Object threadstate) { + final ThreadState state = (ThreadState) threadstate; + final String keyname = keys[keychooser.nextValue().intValue()]; + + int offsets = state.queryOffsetGenerator.nextValue().intValue(); + //int offsets = Utils.random().nextInt(maxOffsets - 1); + final long startTimestamp; + if (offsets > 0) { + startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); + } else { + startTimestamp = state.startTimestamp; + } + + // rando tags + Set fields = new HashSet(); + for (int i = 0; i < tagPairs; ++i) { + if (groupBy && groupBys[i]) { + fields.add(tagKeys[i]); + } else { + fields.add(tagKeys[i] + tagPairDelimiter + + tagValues[Utils.random().nextInt(tagCardinality[i])]); + } + } + + if (queryTimeSpan > 0) { + final long endTimestamp; + if (queryRandomTimeSpan) { + endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval)); + } else { + endTimestamp = startTimestamp + queryTimeSpan; + } + fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); + } else { + fields.add(timestampKey + tagPairDelimiter + startTimestamp); + } + if (groupBy) { + fields.add(groupByKey + tagPairDelimiter + groupByFunction); + } + if (downsample) { + fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval); + } + + final Map cells = new HashMap(); + final Status status = db.read(table, keyname, fields, cells); + + if (dataintegrity && status == Status.OK) { + verifyRow(keyname, cells); + } + } + + protected void doTransactionUpdate(final DB db, Object threadstate) { + if (threadstate == null) { + throw new IllegalStateException("Missing thread state."); + } + final Map tags = new TreeMap(); + final String key = ((ThreadState)threadstate).nextDataPoint(tags, false); + db.update(table, key, tags); + } + + protected void doTransactionInsert(final DB db, Object threadstate) { + doInsert(db, threadstate); + } + + protected void doTransactionScan(final DB db, Object threadstate) { + final ThreadState state = (ThreadState) threadstate; + + final String keyname = keys[Utils.random().nextInt(keys.length)]; + + // choose a random scan length + int len = scanlength.nextValue().intValue(); + + int offsets = Utils.random().nextInt(maxOffsets - 1); + final long startTimestamp; + if (offsets > 0) { + startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); + } else { + startTimestamp = state.startTimestamp; + } + + // rando tags + Set fields = new HashSet(); + for (int i = 0; i < tagPairs; ++i) { + if (groupBy && groupBys[i]) { + fields.add(tagKeys[i]); + } else { + fields.add(tagKeys[i] + tagPairDelimiter + + tagValues[Utils.random().nextInt(tagCardinality[i])]); + } + } + + if (queryTimeSpan > 0) { + final long endTimestamp; + if (queryRandomTimeSpan) { + endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval)); + } else { + endTimestamp = startTimestamp + queryTimeSpan; + } + fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); + } else { + fields.add(timestampKey + tagPairDelimiter + startTimestamp); + } + if (groupBy) { + fields.add(groupByKey + tagPairDelimiter + groupByFunction); + } + if (downsample) { + fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + tagPairDelimiter + downsampleInterval); + } + + final Vector> results = new Vector>(); + db.scan(table, keyname, len, fields, results); + } + + protected void doTransactionDelete(final DB db, Object threadstate) { + final ThreadState state = (ThreadState) threadstate; + + final StringBuilder buf = new StringBuilder().append(keys[Utils.random().nextInt(keys.length)]); + + int offsets = Utils.random().nextInt(maxOffsets - 1); + final long startTimestamp; + if (offsets > 0) { + startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); + } else { + startTimestamp = state.startTimestamp; + } + + // rando tags + for (int i = 0; i < tagPairs; ++i) { + if (groupBy && groupBys[i]) { + buf.append(deleteDelimiter) + .append(tagKeys[i]); + } else { + buf.append(deleteDelimiter).append(tagKeys[i] + tagPairDelimiter + + tagValues[Utils.random().nextInt(tagCardinality[i])]); + } + } + + if (queryTimeSpan > 0) { + final long endTimestamp; + if (queryRandomTimeSpan) { + endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval)); + } else { + endTimestamp = startTimestamp + queryTimeSpan; + } + buf.append(deleteDelimiter) + .append(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); + } else { + buf.append(deleteDelimiter) + .append(timestampKey + tagPairDelimiter + startTimestamp); + } + + db.delete(table, buf.toString()); + } + + /** + * Parses the values returned by a read or scan operation and determines whether + * or not the integer value matches the hash and timestamp of the original timestamp. + * Only works for raw data points, will not work for group-by's or downsampled data. + * @param key The time series key. + * @param cells The cells read by the DB. + * @return {@link Status#OK} if the data matched or {@link Status#UNEXPECTED_STATE} if + * the data did not match. + */ + protected Status verifyRow(final String key, final Map cells) { + Status verifyStatus = Status.UNEXPECTED_STATE; + long startTime = System.nanoTime(); + + double value = 0; + long timestamp = 0; + final TreeMap validationTags = new TreeMap(); + for (final Entry entry : cells.entrySet()) { + if (entry.getKey().equals(timestampKey)) { + final NumericByteIterator it = (NumericByteIterator) entry.getValue(); + timestamp = it.getLong(); + } else if (entry.getKey().equals(valueKey)) { + final NumericByteIterator it = (NumericByteIterator) entry.getValue(); + value = it.isFloatingPoint() ? it.getDouble() : it.getLong(); + } else { + validationTags.put(entry.getKey(), entry.getValue().toString()); + } + } + + if (validationFunction(key, timestamp, validationTags) == value) { + verifyStatus = Status.OK; + } + long endTime = System.nanoTime(); + measurements.measure("VERIFY", (int) (endTime - startTime) / 1000); + measurements.reportStatus("VERIFY", verifyStatus); + return verifyStatus; + } + + /** + * Function used for generating a deterministic hash based on the combination + * of metric, tags and timestamp. + * @param key A non-null string representing the key. + * @param timestamp A timestamp in the proper units for the workload. + * @param tags A non-null map of tag keys and values NOT including the YCSB + * key or timestamp. + * @return A hash value as an 8 byte integer. + */ + protected long validationFunction(final String key, final long timestamp, + final TreeMap tags) { + final StringBuilder validationBuffer = new StringBuilder(keys[0].length() + + (tagPairs * tagKeys[0].length()) + (tagPairs * tagCardinality[1])); + for (final Entry pair : tags.entrySet()) { + validationBuffer.append(pair.getKey()).append(pair.getValue()); + } + return (long) validationBuffer.toString().hashCode() ^ timestamp; + } + + /** + * Breaks out the keys, tags and cardinality initialization in another method + * to keep CheckStyle happy. + * @throws WorkloadException If something goes pear shaped. + */ + protected void initKeysAndTags() throws WorkloadException { + final int keyLength = Integer.parseInt(properties.getProperty( + CoreWorkload.FIELD_LENGTH_PROPERTY, + CoreWorkload.FIELD_LENGTH_PROPERTY_DEFAULT)); + final int tagKeyLength = Integer.parseInt(properties.getProperty( + TAG_KEY_LENGTH_PROPERTY, TAG_KEY_LENGTH_PROPERTY_DEFAULT)); + final int tagValueLength = Integer.parseInt(properties.getProperty( + TAG_VALUE_LENGTH_PROPERTY, TAG_VALUE_LENGTH_PROPERTY_DEFAULT)); + + keyGenerator = new IncrementingPrintableStringGenerator(keyLength); + tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeyLength); + tagValueGenerator = new IncrementingPrintableStringGenerator(tagValueLength); + + final int threads = Integer.parseInt(properties.getProperty(Client.THREAD_COUNT_PROPERTY, "1")); + final String tagCardinalityString = properties.getProperty( + TAG_CARDINALITY_PROPERTY, + TAG_CARDINALITY_PROPERTY_DEFAULT); + final String[] tagCardinalityParts = tagCardinalityString.split(","); + int idx = 0; + totalCardinality = numKeys; + perKeyCardinality = 1; + int maxCardinality = 0; + for (final String card : tagCardinalityParts) { + try { + tagCardinality[idx] = Integer.parseInt(card.trim()); + } catch (NumberFormatException nfe) { + throw new WorkloadException("Unable to parse cardinality: " + + card, nfe); + } + if (tagCardinality[idx] < 1) { + throw new WorkloadException("Cardinality must be greater than zero: " + + tagCardinality[idx]); + } + totalCardinality *= tagCardinality[idx]; + perKeyCardinality *= tagCardinality[idx]; + if (tagCardinality[idx] > maxCardinality) { + maxCardinality = tagCardinality[idx]; + } + ++idx; + if (idx >= tagPairs) { + // we have more cardinalities than tag keys so bail at this point. + break; + } + } + if (numKeys < threads) { + throw new WorkloadException("Field count " + numKeys + " (keys for time " + + "series workloads) must be greater or equal to the number of " + + "threads " + threads); + } + + // fill tags without explicit cardinality with 1 + if (idx < tagPairs) { + tagCardinality[idx++] = 1; + } + + for (int i = 0; i < tagCardinality.length; ++i) { + if (tagCardinality[i] > 1) { + firstIncrementableCardinality = i; + break; + } + } + + keys = new String[numKeys]; + tagKeys = new String[tagPairs]; + tagValues = new String[maxCardinality]; + for (int i = 0; i < numKeys; ++i) { + keys[i] = keyGenerator.nextString(); + } + + for (int i = 0; i < tagPairs; ++i) { + tagKeys[i] = tagKeyGenerator.nextString(); + } + + for (int i = 0; i < maxCardinality; i++) { + tagValues[i] = tagValueGenerator.nextString(); + } + if (randomizeTimeseriesOrder) { + Utils.shuffleArray(keys); + Utils.shuffleArray(tagValues); + } + + maxOffsets = (recordcount / totalCardinality) + 1; + final int[] keyAndTagCardinality = new int[tagPairs + 1]; + keyAndTagCardinality[0] = numKeys; + for (int i = 0; i < tagPairs; i++) { + keyAndTagCardinality[i + 1] = tagCardinality[i]; + } + + cumulativeCardinality = new int[keyAndTagCardinality.length]; + for (int i = 0; i < keyAndTagCardinality.length; i++) { + int cumulation = 1; + for (int x = i; x <= keyAndTagCardinality.length - 1; x++) { + cumulation *= keyAndTagCardinality[x]; + } + if (i > 0) { + cumulativeCardinality[i - 1] = cumulation; + } + } + cumulativeCardinality[cumulativeCardinality.length - 1] = 1; + } + + /** + * Makes sure the settings as given are compatible. + * @throws WorkloadException If one or more settings were invalid. + */ + protected void validateSettings() throws WorkloadException { + if (dataintegrity) { + if (valueType != ValueType.INTEGERS) { + throw new WorkloadException("Data integrity was enabled. 'valuetype' must " + + "be set to 'integers'."); + } + if (groupBy) { + throw new WorkloadException("Data integrity was enabled. 'groupbyfunction' must " + + "be empty or null."); + } + if (downsample) { + throw new WorkloadException("Data integrity was enabled. 'downsamplingfunction' must " + + "be empty or null."); + } + if (queryTimeSpan > 0) { + throw new WorkloadException("Data integrity was enabled. 'querytimespan' must " + + "be empty or 0."); + } + if (randomizeTimeseriesOrder) { + throw new WorkloadException("Data integrity was enabled. 'randomizetimeseriesorder' must " + + "be false."); + } + final String startTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY); + if (startTimestamp == null || startTimestamp.isEmpty()) { + throw new WorkloadException("Data integrity was enabled. 'insertstart' must " + + "be set to a Unix Epoch timestamp."); + } + } + } + + /** + * Thread state class holding thread local generators and indices. + */ + protected class ThreadState { + /** The timestamp generator for this thread. */ + protected final UnixEpochTimestampGenerator timestampGenerator; + + /** An offset generator to select a random offset for queries. */ + protected final NumberGenerator queryOffsetGenerator; + + /** The current write key index. */ + protected int keyIdx; + + /** The starting fence for writing keys. */ + protected int keyIdxStart; + + /** The ending fence for writing keys. */ + protected int keyIdxEnd; + + /** Indices for each tag value for writes. */ + protected int[] tagValueIdxs; + + /** Whether or not all time series have written values for the current timestamp. */ + protected boolean rollover; + + /** The starting timestamp. */ + protected long startTimestamp; + + /** + * Default ctor. + * @param threadID The zero based thread ID. + * @param threadCount The total number of threads. + * @throws WorkloadException If something went pear shaped. + */ + protected ThreadState(final int threadID, final int threadCount) throws WorkloadException { + int totalThreads = threadCount > 0 ? threadCount : 1; + + if (threadID >= totalThreads) { + throw new IllegalStateException("Thread ID " + threadID + " cannot be greater " + + "than or equal than the thread count " + totalThreads); + } + if (keys.length < threadCount) { + throw new WorkloadException("Thread count " + totalThreads + " must be greater " + + "than or equal to key count " + keys.length); + } + + int keysPerThread = keys.length / totalThreads; + keyIdx = keysPerThread * threadID; + keyIdxStart = keyIdx; + if (totalThreads - 1 == threadID) { + keyIdxEnd = keys.length; + } else { + keyIdxEnd = keyIdxStart + keysPerThread; + } + + tagValueIdxs = new int[tagPairs]; // all zeros + + final String startingTimestamp = + properties.getProperty(CoreWorkload.INSERT_START_PROPERTY); + if (startingTimestamp == null || startingTimestamp.isEmpty()) { + timestampGenerator = randomizeTimestampOrder ? + new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, maxOffsets) : + new UnixEpochTimestampGenerator(timestampInterval, timeUnits); + } else { + try { + timestampGenerator = randomizeTimestampOrder ? + new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, + Long.parseLong(startingTimestamp), maxOffsets) : + new UnixEpochTimestampGenerator(timestampInterval, timeUnits, + Long.parseLong(startingTimestamp)); + } catch (NumberFormatException nfe) { + throw new WorkloadException("Unable to parse the " + + CoreWorkload.INSERT_START_PROPERTY, nfe); + } + } + // Set the last value properly for the timestamp, otherwise it may start + // one interval ago. + startTimestamp = timestampGenerator.nextValue(); + // TODO - pick it + queryOffsetGenerator = new UniformLongGenerator(0, maxOffsets - 2); + } + + /** + * Generates the next write value for thread. + * @param map An initialized map to populate with tag keys and values as well + * as the timestamp and actual value. + * @param isInsert Whether or not it's an insert or an update. Updates will pick + * an older timestamp (if random isn't enabled). + * @return The next key to write. + */ + protected String nextDataPoint(final Map map, final boolean isInsert) { + int iterations = sparsity <= 0 ? 1 : + Utils.random().nextInt((int) ((double) perKeyCardinality * sparsity)); + if (iterations < 1) { + iterations = 1; + } + while (true) { + iterations--; + if (rollover) { + timestampGenerator.nextValue(); + rollover = false; + } + String key = null; + if (iterations <= 0) { + final TreeMap validationTags; + if (dataintegrity) { + validationTags = new TreeMap(); + } else { + validationTags = null; + } + key = keys[keyIdx]; + int overallIdx = keyIdx * cumulativeCardinality[0]; + for (int i = 0; i < tagPairs; ++i) { + int tvidx = tagValueIdxs[i]; + map.put(tagKeys[i], new StringByteIterator(tagValues[tvidx])); + if (dataintegrity) { + validationTags.put(tagKeys[i], tagValues[tvidx]); + } + if (delayedSeries > 0) { + overallIdx += (tvidx * cumulativeCardinality[i + 1]); + } + } + + if (!isInsert) { + final long delta = (timestampGenerator.currentValue() - startTimestamp) / timestampInterval; + final int intervals = Utils.random().nextInt((int) delta); + map.put(timestampKey, new NumericByteIterator(startTimestamp + (intervals * timestampInterval))); + } else if (delayedSeries > 0) { + // See if the series falls in a delay bucket and calculate an offset earlier + // than the current timestamp value if so. + double pct = (double) overallIdx / (double) totalCardinality; + if (pct < delayedSeries) { + int modulo = overallIdx % delayedIntervals; + if (modulo < 0) { + modulo *= -1; + } + map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue() - + timestampInterval * modulo)); + } else { + map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue())); + } + } else { + map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue())); + } + + if (dataintegrity) { + map.put(valueKey, new NumericByteIterator(validationFunction(key, + timestampGenerator.currentValue(), validationTags))); + } else { + switch (valueType) { + case INTEGERS: + map.put(valueKey, new NumericByteIterator(Utils.random().nextInt())); + break; + case FLOATS: + map.put(valueKey, new NumericByteIterator( + Utils.random().nextDouble() * (double) 100000)); + break; + case MIXED: + if (Utils.random().nextBoolean()) { + map.put(valueKey, new NumericByteIterator(Utils.random().nextInt())); + } else { + map.put(valueKey, new NumericByteIterator( + Utils.random().nextDouble() * (double) 100000)); + } + break; + default: + throw new IllegalStateException("Somehow we didn't have a value " + + "type configured that we support: " + valueType); + } + } + } + + boolean tagRollover = false; + for (int i = tagCardinality.length - 1; i >= 0; --i) { + if (tagCardinality[i] <= 1) { + // nothing to increment here + continue; + } + + if (tagValueIdxs[i] + 1 >= tagCardinality[i]) { + tagValueIdxs[i] = 0; + if (i == firstIncrementableCardinality) { + tagRollover = true; + } + } else { + ++tagValueIdxs[i]; + break; + } + } + + if (tagRollover) { + if (keyIdx + 1 >= keyIdxEnd) { + keyIdx = keyIdxStart; + rollover = true; + } else { + ++keyIdx; + } + } + + if (iterations <= 0) { + return key; + } + } + } + } + +} \ No newline at end of file diff --git a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java new file mode 100644 index 0000000000..55acc1f625 --- /dev/null +++ b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java @@ -0,0 +1,550 @@ +/** + * Copyright (c) 2017 YCSB contributors All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb.workloads; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.Vector; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.Client; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.NumericByteIterator; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.Utils; +import com.yahoo.ycsb.WorkloadException; +import com.yahoo.ycsb.measurements.Measurements; + +import org.testng.annotations.Test; + +public class TestTimeSeriesWorkload { + + @Test + public void twoThreads() throws Exception { + final Properties p = getUTProperties(); + Measurements.setProperties(p); + + final TimeSeriesWorkload wl = new TimeSeriesWorkload(); + wl.init(p); + Object threadState = wl.initThread(p, 0, 2); + + MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + + threadState = wl.initThread(p, 1, 2); + db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAB"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + } + + @Test (expectedExceptions = WorkloadException.class) + public void badTimeUnit() throws Exception { + final Properties p = new Properties(); + p.put(TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY, "foobar"); + getWorkload(p, true); + } + + @Test (expectedExceptions = WorkloadException.class) + public void failedToInitWorkloadBeforeThreadInit() throws Exception { + final Properties p = getUTProperties(); + final TimeSeriesWorkload wl = getWorkload(p, false); + //wl.init(p); // <-- we NEED this :( + final Object threadState = wl.initThread(p, 0, 2); + + final MockDB db = new MockDB(); + wl.doInsert(db, threadState); + } + + @Test (expectedExceptions = IllegalStateException.class) + public void failedToInitThread() throws Exception { + final Properties p = getUTProperties(); + final TimeSeriesWorkload wl = getWorkload(p, true); + + final MockDB db = new MockDB(); + wl.doInsert(db, null); + } + + @Test + public void insertOneKeyTwoTagsLowCardinality() throws Exception { + final Properties p = getUTProperties(); + p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1"); + final TimeSeriesWorkload wl = getWorkload(p, true); + final Object threadState = wl.initThread(p, 0, 1); + + final MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertTrue(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + } + + @Test + public void insertTwoKeysTwoTagsLowCardinality() throws Exception { + final Properties p = getUTProperties(); + + final TimeSeriesWorkload wl = getWorkload(p, true); + final Object threadState = wl.initThread(p, 0, 1); + + final MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + int metricCtr = 0; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertTrue(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + } + if (metricCtr++ > 1) { + assertEquals(db.keys.get(i), "AAAB"); + if (metricCtr >= 4) { + metricCtr = 0; + timestamp += 60; + } + } else { + assertEquals(db.keys.get(i), "AAAA"); + } + } + } + + @Test + public void insertTwoKeysTwoThreads() throws Exception { + final Properties p = getUTProperties(); + + final TimeSeriesWorkload wl = getWorkload(p, true); + Object threadState = wl.initThread(p, 0, 2); + + MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); // <-- key 1 + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertTrue(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + + threadState = wl.initThread(p, 1, 2); + db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAB"); // <-- key 2 + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + } + + @Test + public void insertThreeKeysTwoThreads() throws Exception { + // To make sure the distribution doesn't miss any metrics + final Properties p = getUTProperties(); + p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "3"); + + final TimeSeriesWorkload wl = getWorkload(p, true); + Object threadState = wl.initThread(p, 0, 2); + + MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertTrue(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + + threadState = wl.initThread(p, 1, 2); + db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + timestamp = 1451606400; + int metricCtr = 0; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + } + if (metricCtr++ > 1) { + assertEquals(db.keys.get(i), "AAAC"); + if (metricCtr >= 4) { + metricCtr = 0; + timestamp += 60; + } + } else { + assertEquals(db.keys.get(i), "AAAB"); + } + } + } + + @Test + public void insertWithValidation() throws Exception { + final Properties p = getUTProperties(); + p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1"); + p.put(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true"); + p.put(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); + final TimeSeriesWorkload wl = getWorkload(p, true); + final Object threadState = wl.initThread(p, 0, 1); + + final MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertFalse(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + + // validation check + final TreeMap validationTags = new TreeMap(); + for (final Entry entry : db.values.get(i).entrySet()) { + if (entry.getKey().equals(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT) || + entry.getKey().equals(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT)) { + continue; + } + validationTags.put(entry.getKey(), entry.getValue().toString()); + } + assertEquals(wl.validationFunction(db.keys.get(i), timestamp, validationTags), + ((NumericByteIterator) db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).getLong()); + + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + } + + @Test + public void read() throws Exception { + final Properties p = getUTProperties(); + final TimeSeriesWorkload wl = getWorkload(p, true); + final Object threadState = wl.initThread(p, 0, 1); + + final MockDB db = new MockDB(); + for (int i = 0; i < 20; i++) { + wl.doTransactionRead(db, threadState); + } + } + + @Test + public void verifyRow() throws Exception { + final Properties p = getUTProperties(); + final TimeSeriesWorkload wl = getWorkload(p, true); + + final TreeMap validationTags = new TreeMap(); + final HashMap cells = new HashMap(); + + validationTags.put("AA", "AAAA"); + cells.put("AA", new StringByteIterator("AAAA")); + validationTags.put("AB", "AAAB"); + cells.put("AB", new StringByteIterator("AAAB")); + long hash = wl.validationFunction("AAAA", 1451606400L, validationTags); + + cells.put(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT, new NumericByteIterator(1451606400L)); + cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash)); + + assertEquals(wl.verifyRow("AAAA", cells), Status.OK); + + // tweak the last value a bit + for (final ByteIterator it : cells.values()) { + it.reset(); + } + cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash + 1)); + assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE); + + // no value cell, returns an unexpected state + for (final ByteIterator it : cells.values()) { + it.reset(); + } + cells.remove(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT); + assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE); + } + + @Test + public void validateSettingsDataIntegrity() throws Exception { + Properties p = getUTProperties(); + + // data validation incompatibilities + p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true"); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); // now it's ok + p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "sum"); // now it's not + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, ""); + p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "sum"); + p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "60"); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, ""); + p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, ""); + p.setProperty(TimeSeriesWorkload.QUERY_TIMESPAN_PROPERTY, "60"); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p = getUTProperties(); + p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true"); + p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); + p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "true"); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false"); + p.setProperty(TimeSeriesWorkload.INSERT_START_PROPERTY, ""); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + } + + /** Helper method that generates unit testing defaults for the properties map */ + private Properties getUTProperties() { + final Properties p = new Properties(); + p.put(Client.RECORD_COUNT_PROPERTY, "10"); + p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "2"); + p.put(CoreWorkload.FIELD_LENGTH_PROPERTY, "4"); + p.put(TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY, "2"); + p.put(TimeSeriesWorkload.TAG_VALUE_LENGTH_PROPERTY, "4"); + p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "2"); + p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1,2"); + p.put(CoreWorkload.INSERT_START_PROPERTY, "1451606400"); + p.put(TimeSeriesWorkload.DELAYED_SERIES_PROPERTY, "0"); + p.put(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false"); + return p; + } + + /** Helper to setup the workload for testing. */ + private TimeSeriesWorkload getWorkload(final Properties p, final boolean init) + throws WorkloadException { + Measurements.setProperties(p); + if (!init) { + return new TimeSeriesWorkload(); + } else { + final TimeSeriesWorkload workload = new TimeSeriesWorkload(); + workload.init(p); + return workload; + } + } + + static class MockDB extends DB { + final List keys = new ArrayList(); + final List> values = + new ArrayList>(); + + @Override + public Status read(String table, String key, Set fields, + Map result) { + return Status.OK; + } + + @Override + public Status scan(String table, String startkey, int recordcount, + Set fields, Vector> result) { + // TODO Auto-generated method stub + return Status.OK; + } + + @Override + public Status update(String table, String key, + Map values) { + // TODO Auto-generated method stub + return Status.OK; + } + + @Override + public Status insert(String table, String key, + Map values) { + keys.add(key); + this.values.add(values); + return Status.OK; + } + + @Override + public Status delete(String table, String key) { + // TODO Auto-generated method stub + return Status.OK; + } + + public void dumpStdout() { + for (int i = 0; i < keys.size(); i++) { + System.out.print("[" + i + "] Key: " + keys.get(i) + " Values: {"); + int x = 0; + for (final Entry entry : values.get(i).entrySet()) { + if (x++ > 0) { + System.out.print(", "); + } + System.out.print("{" + entry.getKey() + " => "); + if (entry.getKey().equals("YCSBV")) { + System.out.print(new String(Utils.bytesToDouble(entry.getValue().toArray()) + "}")); + } else if (entry.getKey().equals("YCSBTS")) { + System.out.print(new String(Utils.bytesToLong(entry.getValue().toArray()) + "}")); + } else { + System.out.print(new String(entry.getValue().toArray()) + "}"); + } + } + System.out.println("}"); + } + } + } +} \ No newline at end of file diff --git a/workloads/tsworkload_template b/workloads/tsworkload_template new file mode 100644 index 0000000000..9b79a0096f --- /dev/null +++ b/workloads/tsworkload_template @@ -0,0 +1,283 @@ +# Copyright (c) 2017 YCSB contributors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. + +# Yahoo! Cloud System Benchmark +# Time Series Workload Template: Default Values +# +# File contains all properties that can be set to define a +# YCSB session. All properties are set to their default +# value if one exists. If not, the property is commented +# out. When a property has a finite number of settings, +# the default is enabled and the alternates are shown in +# comments below it. +# +# Use of each property is explained through comments in Client.java, +# CoreWorkload.java, TimeSeriesWorkload.java or on the YCSB wiki page: +# https://github.com/brianfrankcooper/YCSB/wiki/TimeSeriesWorkload + +# The name of the workload class to use. Always the following. +workload=com.yahoo.ycsb.workloads.TimeSeriesWorkload + +# The default is Java's Long.MAX_VALUE. +# The number of records in the table to be inserted in +# the load phase or the number of records already in the +# table before the run phase. +recordcount=1000000 + +# There is no default setting for operationcount but it is +# required to be set. +# The number of operations to use during the run phase. +operationcount=3000000 + +# The number of insertions to do, if different from recordcount. +# Used with insertstart to grow an existing table. +#insertcount= + +# ..::NOTE::.. This is different from the CoreWorkload! +# The starting timestamp of a run as a Unix Epoch numeral in the +# unit set in 'timestampunits'. This is used to determine what +# the first timestamp should be when writing or querying as well +# as how many offsets (based on 'timestampinterval'). +#insertstart= + +# The units represented by the 'insertstart' timestamp as well as +# durations such as 'timestampinterval', 'querytimespan', etc. +# For values, see https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html +# Note that only seconds through nanoseconds are supported. +timestampunits=SECONDS + +# The amount of time between each value in every time series in +# the units of 'timestampunits'. +timestampinterval=60 + +# ..::NOTE::.. This is different from the CoreWorkload! +# Represents the number of unique "metrics" or "keys" for time series. +# E.g. "sys.cpu" may be a single field or "metric" while there may be many +# time series sharing that key (perhaps a host tag with "web01" and "web02" +# as options). +fieldcount=16 + +# The number of characters in the "metric" or "key". +fieldlength=8 + +# --- TODO ---? +# The distribution used to choose the length of a field +fieldlengthdistribution=constant +#fieldlengthdistribution=uniform +#fieldlengthdistribution=zipfian + +# The number of unique tag combinations for each time series. E.g +# if this value is 4, each record will have a key and 4 tag combinations +# such as A=A, B=A, C=A, D=A. +tagcount=4 + +# The cardinality (number of unique values) of each tag value for +# every "metric" or field as a comma separated list. Each value must +# be a number from 1 to Java's Integer.MAX_VALUE and there must be +# 'tagcount' values. If there are more or fewer values than +#'tagcount' then either it is ignored or 1 is substituted respectively. +tagcardinality=1,2,4,8 + +# The length of each tag key in characters. +tagkeylength=8 + +# The length of each tag value in characters. +tagvaluelength=8 + +# The character separating tag keys from tag values when reads, deletes +# or scans are executed against a database. The default is the equals sign +# so a field passed in a read to a DB may look like 'AA=AB'. +tagpairdelimiter== + +# The delimiter between keys and tags when a delete is passed to the DB. +# E.g. if there was a key and a field, the request key would look like: +# 'AA:AA=AB' +deletedelimiter=: + +# Whether or not to randomize the timestamp order when performing inserts +# and updates against a DB. By default all writes perform with the +# timestamps moving linearly forward in time once all time series for a +# given key have been written. +randomwritetimestamporder=false + +# Whether or not to randomly shuffle the time series order when writing. +# This will shuffle the keys, tag keys and tag values. +# ************************************************************************ +# WARNING - When this is enabled, reads and scans will likely return many +# empty results as invalid tag combinations will be chosen. Likewise +# this setting is INCOMPATIBLE with data integrity checks. +# ************************************************************************ +randomtimeseriesorder=false + +# The type of numerical data generated for each data point. The values are +# 64 bit signed integers, double precision floating points or a random mix. +# For data integrity, this setting is ignored and values are switched to +# 64 bit signed ints. +#valuetype=integers +valuetype=floats +#valuetype=mixed + +# A value from 0 to 0.999999 representing how sparse each time series +# should be. The higher this value, the greater the time interval between +# values in a single series. For example, if sparsity is 0 and there are +# 10 time series with a 'timestampinterval' of 60 seconds with a total +# time range of 10 intervals, you would see 100 values written, one per +# timestamp interval per time series. If the sparsity is 0.50 then there +# would be only about 50 values written so some time series would have +# missing values at each interval. +sparsity=0.00 + +# The percentage of time series that are "lagging" behind the current +# timestamp of the writer. This is used to mimic a common behavior where +# most sources (agents, sensors, etc) are writing data in sync (same timestamp) +# but a subset are running behind due to buffering, latency issues, etc. +delayedSeries=0.10 + +# The maximum amount of delay for delayed series in interval counts. The +# actual delay is chosen based on a modulo of the series index. +delayedIntervals=5 + +# The fixed or maximum amount of time added to the start time of a +# read or scan operation to generate a query over a range of time +# instead of a single timestamp. Units are shared with 'timestampunits'. +# For example if the value is set to 3600 seconds (1 hour) then +# each read would pick a random start timestamp based on the +#'insertstart' value and number of intervals, then add 3600 seconds +# to create the end time of the query. If this value is 0 then reads +# will only provide a single timestamp. +# WARNING: Cannot be used with 'dataintegrity'. +querytimespan=0 + +# Whether or not reads should choose a random time span (aligned to +# the 'timestampinterval' value) for each read or scan request starting +# at 0 and reaching 'querytimespan' as the max. +queryrandomtimespan=false + +# A delimiter character used to separate the start and end timestamps +# of a read query when 'querytimespan' is enabled. +querytimespandelimiter=, + +# A unique key given to read, scan and delete operations when the +# operation should perform a group-by (multi-series aggregation) on one +# or more tags. If 'groupbyfunction' is set, this key will be given with +# the configured function. +groupbykey=YCSBGB + +# A function name (e.g. 'sum', 'max' or 'avg') passed during reads, +# scans and deletes to cause the database to perform a group-by +# operation on one or more tags. If this value is empty or null +# (default), group-by operations are not performed +#groupbyfunction= + +# A comma separated list of 0s or 1s to denote which of the tag keys +# should be grouped during group-by operations. The number of values +# must match the number of tags in 'tagcount'. +#groupbykeys=0,0,1,1 + +# A unique key given to read and scan operations when the operation +# should downsample the results of a query into lower resolution +# data. If 'downsamplingfunction' is set, this key will be given with +# the configured function. +downsamplingkey=YCSBDS + +# A function name (e.g. 'sum', 'max' or 'avg') passed during reads and +# scans to cause the database to perform a downsampling operation +# returning lower resolution data. If this value is empty or null +# (default), downsampling is not performed. +#downsamplingfunction= + +# A time interval for which to downsample the raw data into. Shares +# the same units as 'timestampinterval'. This value must be greater +# than 'timestampinterval'. E.g. if the timestamp interval for raw +# data is 60 seconds, the downsampling interval could be 3600 seconds +# to roll up the data into 1 hour buckets. +#downsamplinginterval= + +# What proportion of operations are reads +readproportion=0.10 + +# What proportion of operations are updates +updateproportion=0.00 + +# What proportion of operations are inserts +insertproportion=0.90 + +# The distribution of requests across the keyspace +requestdistribution=zipfian +#requestdistribution=uniform +#requestdistribution=latest + +# The name of the database table to run queries against +table=usertable + +# Whether or not data should be validated during writes and reads. If +# set then the data type is always a 64 bit signed integer and is the +# hash code of the key, timestamp and tags. +dataintegrity=false + +# How the latency measurements are presented +measurementtype=histogram +#measurementtype=timeseries +#measurementtype=raw +# When measurementtype is set to raw, measurements will be output +# as RAW datapoints in the following csv format: +# "operation, timestamp of the measurement, latency in us" +# +# Raw datapoints are collected in-memory while the test is running. Each +# data point consumes about 50 bytes (including java object overhead). +# For a typical run of 1 million to 10 million operations, this should +# fit into memory most of the time. If you plan to do 100s of millions of +# operations per run, consider provisioning a machine with larger RAM when using +# the RAW measurement type, or split the run into multiple runs. +# +# Optionally, you can specify an output file to save raw datapoints. +# Otherwise, raw datapoints will be written to stdout. +# The output file will be appended to if it already exists, otherwise +# a new output file will be created. +#measurement.raw.output_file = /tmp/your_output_file_for_this_run + +# JVM Reporting. +# +# Measure JVM information over time including GC counts, max and min memory +# used, max and min thread counts, max and min system load and others. This +# setting must be enabled in conjunction with the "-s" flag to run the status +# thread. Every "status.interval", the status thread will capture JVM +# statistics and record the results. At the end of the run, max and mins will +# be recorded. +# measurement.trackjvm = false + +# The range of latencies to track in the histogram (milliseconds) +histogram.buckets=1000 + +# Granularity for time series (in milliseconds) +timeseries.granularity=1000 + +# Latency reporting. +# +# YCSB records latency of failed operations separately from successful ones. +# Latency of all OK operations will be reported under their operation name, +# such as [READ], [UPDATE], etc. +# +# For failed operations: +# By default we don't track latency numbers of specific error status. +# We just report latency of all failed operation under one measurement name +# such as [READ-FAILED]. But optionally, user can configure to have either: +# 1. Record and report latency for each and every error status code by +# setting reportLatencyForEachError to true, or +# 2. Record and report latency for a select set of error status codes by +# providing a CSV list of Status codes via the "latencytrackederrors" +# property. +# reportlatencyforeacherror=false +# latencytrackederrors="" diff --git a/workloads/tsworkloada b/workloads/tsworkloada new file mode 100644 index 0000000000..868007f2fc --- /dev/null +++ b/workloads/tsworkloada @@ -0,0 +1,46 @@ +# Copyright (c) 2017 YCSB contributors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. + +# Yahoo! Cloud System Benchmark +# Workload A: Small cardinality consistent data for 2 days +# Application example: Typical monitoring of a single compute or small +# sensor station where 90% of the load is write and only 10% is read +# (it's usually much less). All writes are inserts. No sparsity so +# every series will have a value at every timestamp. +# +# Read/insert ratio: 10/90 +# Cardinality: 16 per key (field), 64 fields for a total of 1,024 +# time series. +workload=com.yahoo.ycsb.workloads.TimeSeriesWorkload + +recordcount=1474560 +operationcount=2949120 + +fieldlength=8 +fieldcount=64 +tagcount=4 +tagcardinality=1,2,4,2 + +sparsity=0.0 +delayedSeries=0.0 +delayedIntervals=0 + +timestampunits=SECONDS +timestampinterval=60 +querytimespan=3600 + +readproportion=0.10 +updateproportion=0.00 +insertproportion=0.90