-
Notifications
You must be signed in to change notification settings - Fork 267
Refactors the bolts from storm to be independent workers in online. With... #395
Conversation
…ith just a base bolt in storm to do the functions there
|
||
} | ||
|
||
object InflightTuples { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like a metrics concept.
// If we incremented on something that was 0 or negative | ||
// And not in a failed state, then this is an error | ||
if((newS.counter - by <= 0) && !newS.failed) { | ||
throw new Exception("Invalid call on an inputstate, we had already decremented to 0 and not failed.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be in incrBy. It should not allow a faulty transition (or at least, it should be a method on the state, so we don't change states in a fault.
} | ||
} | ||
|
||
def fail(fn: (T => Unit)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar: def fail(fn: T => U): U = {
@johnynek should be all updated per comments |
val futWithFail = tick.onFailure { thr => | ||
responses.put(((List(), Failure(thr)))) | ||
} | ||
val fut = handleSuccess(futWithFail) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line 46 - 52 is duplicated on 58 - 63. Can we make one function and call that in both places?
Refactors the bolts from storm to be independent workers in online. With...
twitterGH-388: Fix CMS test issue caused by roundtripping depth->delta->depth
... just a base bolt in storm to do the functions there