Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a naive merge operation #2

Merged
merged 4 commits into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ Adds a `value` with the specified `count`. If key is missing, an empty t-dige

*Reply:* `long long`

#### `TDIGEST.MERGE destkey sourcekey [sourcekey ...]`

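Merges the t-digests stored at one or more `sourcekey` arguments into `destkey`. Source keys that do not exist are skipped. If `destkey` is missing, an empty t-digest structure is initialized with a default compression of `400`. If any of the keys holds a value that is not a t-digest, an error is returned.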

*Reply:* `"OK"`

#### `TDIGEST.CDF key value [value ...]`

Returns the cumulative distribution for all provided values. `value` must be a double. The cumulative distribution returned for all values is between `0..1`.
Expand Down
59 changes: 59 additions & 0 deletions src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,60 @@ static int TDigestTypeAdd_RedisCommand(RedisModuleCtx *ctx,
return REDISMODULE_OK;
}

/* TDIGEST.MERGE destkey sourcekey [sourcekey ...]
 *
 * Adds the centroids of every non-empty source digest to the digest stored
 * at destkey. When destkey is empty a new digest is created with
 * DEFAULT_COMPRESSION and stored there first. Empty source keys are skipped.
 *
 * All keys are validated before anything is modified: if any key is neither
 * empty nor a t-digest, a WRONGTYPE error is sent as the reply and no merge
 * takes place.
 *
 * On success replies with the simple string "OK", replicates the command
 * verbatim and returns REDISMODULE_OK. Error paths return the result of the
 * corresponding RedisModule reply call. */
static int TDigestTypeMerge_RedisCommand(RedisModuleCtx *ctx,
        RedisModuleString **argv, int argc) {
    RedisModule_AutoMemory(ctx); /* Use automatic memory management. */
    int num_keys = 0;
    int i, j;

    /* Key-position query: every argument after the command name is a key. */
    if (RedisModule_IsKeysPositionRequest(ctx)) {
        for (i = 1; i < argc; i++) {
            RedisModule_KeyAtPos(ctx, i);
        }
        return REDISMODULE_OK;
    }

    if (argc < 3)
        return RedisModule_WrongArity(ctx);

    /* Validate all keys are either empty or tdigest.
     *
     * RedisModule_PoolAlloc takes a size in BYTES, so the request must be
     * scaled by the element size; asking for only (argc - 1) bytes would
     * make the stores into keys[] below write past the allocation. */
    RedisModuleKey **keys = RedisModule_PoolAlloc(ctx,
            sizeof(*keys) * (argc - 1));
    for (i = 1; i < argc; i++) {
        RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[i],
                REDISMODULE_READ | REDISMODULE_WRITE);
        int type = RedisModule_KeyType(key);
        if (type != REDISMODULE_KEYTYPE_EMPTY
                && RedisModule_ModuleTypeGetType(key) != TDigestType) {
            return RedisModule_ReplyWithError(ctx,
                    REDISMODULE_ERRORMSG_WRONGTYPE);
        }
        /* The destination (i == 1) is always kept, even when empty, so that
         * keys[0] is the destination; empty sources contribute nothing and
         * are dropped. */
        if (i == 1 || type != REDISMODULE_KEYTYPE_EMPTY) {
            keys[num_keys++] = key;
        }
    }

    struct TDigest *t1, *t2;
    if (RedisModule_KeyType(keys[0]) == REDISMODULE_KEYTYPE_EMPTY) {
        t1 = tdigestNew(DEFAULT_COMPRESSION);
        RedisModule_ModuleTypeSetValue(keys[0], TDigestType, t1);
    } else {
        t1 = RedisModule_ModuleTypeGetValue(keys[0]);
    }

    /* Add all centroids from sources to destination.
     * NOTE(review): only the centroids array is visited here; if a source
     * digest can hold not-yet-compressed buffered points, they are not
     * merged by this loop -- confirm whether that is intended. */
    for (i = 1; i < num_keys; i++) {
        t2 = RedisModule_ModuleTypeGetValue(keys[i]);
        for (j = 0; j < t2->num_centroids; j++) {
            tdigestAdd(t1, t2->centroids[j].mean, t2->centroids[j].weight);
        }
    }

    RedisModule_ReplyWithSimpleString(ctx, "OK");
    RedisModule_ReplicateVerbatim(ctx);

    return REDISMODULE_OK;
}

/* TDIGEST.CDF key value [value ...] */
static int TDigestTypeCDF_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv, int argc) {
Expand Down Expand Up @@ -324,6 +378,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx) {
1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx, "tdigest.merge",
TDigestTypeMerge_RedisCommand, "write deny-oom getkeys-api", 0, 0,
0) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx, "tdigest.cdf",
TDigestTypeCDF_RedisCommand, "readonly", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
Expand Down
15 changes: 15 additions & 0 deletions src/tdigest.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,21 @@ double tdigestQuantile(struct TDigest *t, double q) {
return t->max;
}

/*
 * Merge the contents of t2 into t1.
 *
 * Every centroid of t2 and every point on t2's buffered-points list is fed
 * to tdigestAdd() on t1 with its own weight. t2 is only read here.
 *
 * The buffered-point count is kept in its own variable: reusing the centroid
 * loop index for it would leave the counter equal to num_centroids after the
 * first loop, so the wrong number of buffered points would be walked.
 */
void tdigestMerge(struct TDigest *t1, struct TDigest *t2) {
    int i;
    int remaining = t2->num_buffered_pts;
    struct Point *p = t2->buffered_pts;

    for (i = 0; i < t2->num_centroids; i++) {
        tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
    }

    /* Stop on whichever comes first, the recorded count or the end of the
     * list, so a mismatch between the two can never dereference a null
     * pointer. */
    while (remaining > 0 && p) {
        tdigestAdd(t1, p->value, p->weight);
        p = p->next;
        remaining--;
    }
}

void tdigestFree(struct TDigest *t) {
while (t->buffered_pts) {
struct Point *p = t->buffered_pts;
Expand Down
5 changes: 5 additions & 0 deletions test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def tdigest_add(self, key, value, count, *args):
cmd_args.extend(map(str, args))
return self.client.execute_command(*cmd_args)

def tdigest_merge(self, destkey, sourcekey, *args):
    """Send TDIGEST.MERGE for destkey with one or more source keys.

    destkey and sourcekey are passed through unchanged; any additional
    positional arguments are converted with str() before being appended.
    Returns whatever self.client.execute_command returns.
    """
    extra_sources = [str(extra) for extra in args]
    command = ['TDIGEST.MERGE', destkey, sourcekey] + extra_sources
    return self.client.execute_command(*command)

def tdigest_cdf(self, key, value, *args):
cmd_args = ['TDIGEST.CDF', key, str(value)]
cmd_args.extend(map(str, args))
Expand Down
48 changes: 31 additions & 17 deletions test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,47 @@ def cdf(x, values):

def run_test_for_dist(redis, distfn):
key = distfn.__name__
keydest = key + ':dest'
key0 = key + ':0'
key1 = key + ':1'
testkeys = [key, keydest, key0]
redis.tdigest_new(key)
redis.tdigest_new(key0)
redis.tdigest_new(key1)

quantiles = [0.001, 0.01, 0.1, 0.5, 0.9, 0.99, 0.999]
values = []

for _ in xrange(NUM_VALUES):
for i in xrange(NUM_VALUES):
v = distfn()
redis.tdigest_add(key, v, 1)
if 0 == i % 2:
redis.tdigest_add(key0, v, 1)
else:
redis.tdigest_add(key1, v, 1)
values.append(v)


redis.tdigest_merge(keydest, key0, key1)
redis.tdigest_merge(key0, key1)

values = sorted(values)
soft_errs = 0

redis.tdigest_meta(key)

for i, q in enumerate(quantiles):
ix = NUM_VALUES * quantiles[i] - 0.5;
idx = int(math.floor(ix))
p = ix - idx;
x = values[idx] * (1 - p) + values[idx + 1] * p;
estimate_x = float(redis.tdigest_quantile(key, q)[0])
estimate_q = float(redis.tdigest_cdf(key, x)[0])

assert abs(q - estimate_q) < 0.005
if abs(cdf(estimate_x, values) - q) > 0.005:
soft_errs += 1
assert soft_errs < 3

for k in testkeys:
soft_errs = 0
redis.tdigest_meta(k)
for i, q in enumerate(quantiles):
ix = NUM_VALUES * quantiles[i] - 0.5;
idx = int(math.floor(ix))
p = ix - idx;
x = values[idx] * (1 - p) + values[idx + 1] * p;
estimate_x = float(redis.tdigest_quantile(k, q)[0])
estimate_q = float(redis.tdigest_cdf(k, x)[0])

assert abs(q - estimate_q) < 0.005
if abs(cdf(estimate_x, values) - q) > 0.005:
soft_errs += 1
assert soft_errs < 3


def test_uniform(redis, flushdb):
Expand Down