From cc5a8916c1fcdc75cb83c5b04059b2f76fddb130 Mon Sep 17 00:00:00 2001
From: Itamar Haber
Date: Sun, 19 Jun 2016 12:47:10 -0700
Subject: [PATCH 1/4] Adds merge operation

---
 src/tdigest.c | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/src/tdigest.c b/src/tdigest.c
index 254ca98..2714f92 100644
--- a/src/tdigest.c
+++ b/src/tdigest.c
@@ -322,6 +322,29 @@ double tdigestQuantile(struct TDigest *t, double q) {
     return t->max;
 }
 
+/* Merges t2 into t1 by re-adding each of t2's centroids, and then each of
+ * its still-buffered points, to t1 through tdigestAdd().
+ * NOTE(review): t1 and t2 are assumed to be distinct digests; tdigestAdd()
+ * presumably rewrites the arrays walked here when they alias -- confirm. */
+void tdigestMerge(struct TDigest *t1, struct TDigest *t2) {
+    int i;
+    int remaining = t2->num_buffered_pts;
+    struct Point *p = t2->buffered_pts;
+
+    for (i = 0; i < t2->num_centroids; i++) {
+        tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
+    }
+
+    /* The buffer needs its own counter: reusing i here would walk
+     * num_centroids nodes instead of num_buffered_pts. The pointer check
+     * stops the walk if the list is shorter than the recorded count. */
+    while (remaining > 0 && p) {
+        tdigestAdd(t1, p->value, p->weight);
+        p = p->next;
+        remaining--;
+    }
+}
+
 void tdigestFree(struct TDigest *t) {
     while (t->buffered_pts) {
         struct Point *p = t->buffered_pts;

From 60391a72e48fe80d7921bf1cdee1558c81743306 Mon Sep 17 00:00:00 2001
From: Itamar Haber
Date: Sun, 19 Jun 2016 12:47:45 -0700
Subject: [PATCH 2/4] Adds TDIGEST.MERGE command

---
 src/command.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 72 insertions(+)

diff --git a/src/command.c b/src/command.c
index 1745d79..7c9dc7f 100644
--- a/src/command.c
+++ b/src/command.c
@@ -117,6 +117,73 @@ static int TDigestTypeAdd_RedisCommand(RedisModuleCtx *ctx,
     return REDISMODULE_OK;
 }
 
+/* tdigestMerge() is defined in tdigest.c by the previous patch of this
+ * series, which only touches that file, so it is declared here. */
+void tdigestMerge(struct TDigest *t1, struct TDigest *t2);
+
+/* TDIGEST.MERGE destkey sourcekey [sourcekey ...]
+ *
+ * Folds every existing source digest into destkey and replies "OK".
+ * destkey is created with DEFAULT_COMPRESSION when it is empty, source keys
+ * that are empty are skipped, and a key of any other type aborts the
+ * command with a WRONGTYPE error before anything is modified. */
+static int TDigestTypeMerge_RedisCommand(RedisModuleCtx *ctx,
+        RedisModuleString **argv, int argc) {
+    RedisModule_AutoMemory(ctx); /* Use automatic memory management. */
+    int num_keys = 0;
+    int i;
+
+    /* Key-position request: report every argument after the command name. */
+    if (RedisModule_IsKeysPositionRequest(ctx)) {
+        for (i = 1; i < argc; i++) {
+            RedisModule_KeyAtPos(ctx, i);
+        }
+        return REDISMODULE_OK;
+    }
+
+    if (argc < 3)
+        return RedisModule_WrongArity(ctx);
+
+    /* Validate all keys are either empty or tdigest. PoolAlloc takes a size
+     * in bytes, so the key count is scaled by the pointer size. */
+    RedisModuleKey **keys = RedisModule_PoolAlloc(ctx,
+            sizeof(*keys) * (size_t)(argc - 1));
+    for (i = 1; i < argc; i++) {
+        RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[i],
+                REDISMODULE_READ | REDISMODULE_WRITE);
+        int type = RedisModule_KeyType(key);
+        if (type != REDISMODULE_KEYTYPE_EMPTY
+                && RedisModule_ModuleTypeGetType(key) != TDigestType) {
+            return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+        }
+        /* Slot 0 is always the destination; empty sources are dropped. */
+        if (i == 1 || type != REDISMODULE_KEYTYPE_EMPTY) {
+            keys[num_keys++] = key;
+        }
+    }
+
+    struct TDigest *t1, *t2;
+    if (RedisModule_KeyType(keys[0]) == REDISMODULE_KEYTYPE_EMPTY) {
+        t1 = tdigestNew(DEFAULT_COMPRESSION);
+        RedisModule_ModuleTypeSetValue(keys[0], TDigestType, t1);
+    } else {
+        t1 = RedisModule_ModuleTypeGetValue(keys[0]);
+    }
+
+    /* Merge the centroids and the still-buffered points of each source. */
+    for (i = 1; i < num_keys; i++) {
+        t2 = RedisModule_ModuleTypeGetValue(keys[i]);
+        /* destkey repeated as a source: a digest is not merged into itself. */
+        if (t2 != t1)
+            tdigestMerge(t1, t2);
+    }
+
+    RedisModule_ReplyWithSimpleString(ctx, "OK");
+    RedisModule_ReplicateVerbatim(ctx);
+
+    return REDISMODULE_OK;
+}
+
 /* TDIGEST.CDF key value [value ...] */
 static int TDigestTypeCDF_RedisCommand(RedisModuleCtx *ctx,
         RedisModuleString **argv, int argc) {
@@ -324,6 +391,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx) {
             1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
 
+    if (RedisModule_CreateCommand(ctx, "tdigest.merge",
+            TDigestTypeMerge_RedisCommand, "write deny-oom getkeys-api", 0, 0,
+            0) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
     if (RedisModule_CreateCommand(ctx, "tdigest.cdf",
             TDigestTypeCDF_RedisCommand, "readonly", 1, 1, 1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;

From d3dff5cbaa07ace65e43cf02620061f14ac0cb48 Mon Sep 17 00:00:00 2001
From: Itamar Haber
Date: Sun, 19 Jun 2016 12:49:11 -0700
Subject: [PATCH 3/4] Adds tests for merge command

---
 test/__init__.py         |  6 ++++++
 test/test_integration.py | 51 ++++++++++++++++++++++++++++-------------
 2 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/test/__init__.py b/test/__init__.py
index 6c2af7c..5fe4b71 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -52,6 +52,12 @@ def tdigest_add(self, key, value, count, *args):
         cmd_args.extend(map(str, args))
         return self.client.execute_command(*cmd_args)
 
+    def tdigest_merge(self, destkey, sourcekey, *args):
+        # Sends TDIGEST.MERGE destkey sourcekey [sourcekey ...].
+        cmd_args = ['TDIGEST.MERGE', destkey, sourcekey]
+        cmd_args.extend(map(str, args))
+        return self.client.execute_command(*cmd_args)
+
     def tdigest_cdf(self, key, value, *args):
         cmd_args = ['TDIGEST.CDF', key, str(value)]
         cmd_args.extend(map(str, args))
diff --git a/test/test_integration.py b/test/test_integration.py
index 40f440f..53fb259 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -19,33 +19,50 @@ def cdf(x, values):
 
 def run_test_for_dist(redis, distfn):
     key = distfn.__name__
+    keydest = key + ':dest'
+    key0 = key + ':0'
+    key1 = key + ':1'
+    testkeys = [key, keydest, key0]
 
     redis.tdigest_new(key)
+    redis.tdigest_new(key0)
+    redis.tdigest_new(key1)
 
     quantiles = [0.001, 0.01, 0.1, 0.5, 0.9, 0.99, 0.999]
     values = []
 
-    for _ in xrange(NUM_VALUES):
+    for i in xrange(NUM_VALUES):
         v = distfn()
         redis.tdigest_add(key, v, 1)
+        # Alternate samples between key0 and key1 so their union equals key.
+        if 0 == i % 2:
+            redis.tdigest_add(key0, v, 1)
+        else:
+            redis.tdigest_add(key1, v, 1)
         values.append(v)
+
+    # keydest is never created above; key0 already holds half the samples.
+    redis.tdigest_merge(keydest, key0, key1)
+    redis.tdigest_merge(key0, key1)
+
     values = sorted(values)
-    soft_errs = 0
-
-    redis.tdigest_meta(key)
-
-    for i, q in enumerate(quantiles):
-        ix = NUM_VALUES * quantiles[i] - 0.5;
-        idx = int(math.floor(ix))
-        p = ix - idx;
-        x = values[idx] * (1 - p) + values[idx + 1] * p;
-        estimate_x = float(redis.tdigest_quantile(key, q)[0])
-        estimate_q = float(redis.tdigest_cdf(key, x)[0])
-
-        assert abs(q - estimate_q) < 0.005
-        if abs(cdf(estimate_x, values) - q) > 0.005:
-            soft_errs += 1
-    assert soft_errs < 3
+
+    # Every digest in testkeys should now summarise the full sample set.
+    for k in testkeys:
+        soft_errs = 0
+        redis.tdigest_meta(k)
+        for i, q in enumerate(quantiles):
+            ix = NUM_VALUES * quantiles[i] - 0.5;
+            idx = int(math.floor(ix))
+            p = ix - idx;
+            x = values[idx] * (1 - p) + values[idx + 1] * p;
+            estimate_x = float(redis.tdigest_quantile(k, q)[0])
+            estimate_q = float(redis.tdigest_cdf(k, x)[0])
+
+            assert abs(q - estimate_q) < 0.005
+            if abs(cdf(estimate_x, values) - q) > 0.005:
+                soft_errs += 1
+        assert soft_errs < 3
 
 
 def test_uniform(redis, flushdb):

From e022a535403798889f91657f974175620d443f4a Mon Sep 17 00:00:00 2001
From: Itamar Haber
Date: Sun, 19 Jun 2016 12:49:38 -0700
Subject: [PATCH 4/4] Updates README with TDIGEST.MERGE

---
 README.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/README.md b/README.md
index c72609c..5ca0ee9 100644
--- a/README.md
+++ b/README.md
@@ -40,6 +40,12 @@ Adds a `value` with the specified `count`. If key is is missing, an empty t-dige
 
 *Reply:* `long long`
 
+#### `TDIGEST.MERGE destkey sourcekey [sourcekey ...]`
+
+Merges one or more `sourcekey` t-digests into `destkey`. If `destkey` is missing, an empty t-digest structure is initialized with a default compression of `400`. Any `sourcekey` that is missing is skipped; if any key holds a value that is not a t-digest, the command fails with a `WRONGTYPE` error and nothing is merged.
+
+*Reply:* `"OK"`
+
 #### `TDIGEST.CDF key value [value ...]`
 
 Returns the cumulative distribution for all provided values. `value` must be a double. The cumulative distribution returned for all values is between `0..1`.