Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #399 from twitter/feature/useMultiMerge
Browse files Browse the repository at this point in the history
Uses multimerge
  • Loading branch information
johnynek committed Dec 6, 2013
2 parents c784861 + f69c863 commit 0b1d7de
Showing 1 changed file with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,11 @@ class Summer[Key, Value: Semigroup, S, D](
// See MaxWaitingFutures for a todo around simplifying this.
buffer(wrappedData)
.map { kvs =>
kvs.iterator.map { case ((k, batchID), (tups, stamp, delta)) =>
(tups,
store.merge(((k, batchID), delta)).map { before =>
List((stamp, (k, (before, delta))))
}
.onSuccess { _ => successHandlerOpt.get.handlerFn.apply() }
)
store.multiMerge(kvs.mapValues(_._3)).map{case (innerKb, beforeF) =>
val (tups, stamp, delta) = kvs(innerKb)
val (k, _) = innerKb
(tups, beforeF.map(before => List((stamp, (k, (before, delta)))))
.onSuccess { _ => successHandlerOpt.get.handlerFn.apply() } )
}
.toList // force, but order does not matter, so we could optimize this
}
Expand Down

0 comments on commit 0b1d7de

Please sign in to comment.