-
Notifications
You must be signed in to change notification settings - Fork 354
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes behavior of RequestChannel #736
Conversation
Did you miss a file? or is this a draft? |
@yschimke pushed fix |
e24f3c2
to
ff009c8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is way too much for me to digest on a single commit, but I still have small general remarks. I didn't review the logic rework itself in any depth
rsocket-core/src/main/java/io/rsocket/internal/FluxSwitchOnFirst.java
Outdated
Show resolved
Hide resolved
suppressed by #761 |
d321e68
to
c853a39
Compare
5339a88
to
268e1ae
Compare
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
268e1ae
to
ab0ce37
Compare
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
.expectNextCount(3) | ||
.expectComplete() | ||
.verify(getTimeout()); | ||
|
||
Assertions.assertThat(requested.get()) | ||
.isEqualTo(257L); // 257 because of eager behavior of limitRate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rstoyanchev for your attention. Here is the result of eager prefetch 1L (from the switchOnFirst + 256 extra from limitRate)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh... found a bug because of skip does extra request 1 but it should not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed. prefetch should be exactly 256
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
9ca64fe
to
fb97f43
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall.
Just one minor comment below, and also checkAvailable()
seems to be missing @Nullable
which results in several IDE warnings in RSocketRequester
.
// need to skip first since we have already sent it | ||
first = false; | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The if (first)
check could be replaced with inboundFlux.skip(1)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was with skip initially, but, I figured out that skip does extra request(1) when it skips, so we have extra 1 element requested (I mentioned that in the comments that expected prefetch is 256, but it was 257)
The strategy is the following
- we do at the very beginning
request(1)
(from theswitchOnFirst
) to obtain the first element and then do the transformation. - Then, we send that obtained element as a part of
RequestChannel
frame so it appears on the moment of delivery - Then, in order to not break backpressure because of the eager first element prefetch, we reduce 1 from the first upcoming request to the
switchOnFirst
. - However, the first prefetched element is sent to the flux along the way, and that would be OK if we have just normal routing case, but, in case of RSocket, the first payload has already been sent so if we are not going to
skip
the first element, it will be delivered twice - Here comes the issue, if we use
skip
thenskip
operator think that we need to fulfill the demand by requesting extra one element, but in fact we just need not skip butDROP
element instead without doingrequest(1)
This PR provides a fix for the incorrect behavior of RequestChannel.
At this moment there is a couple of issues with that:
1.1) There was an incorrect expectation of signal ordering which is
requestN
from the local downstream, onNext from the local upstream. In fact, signal order can be totally different and while we wait for first onNext signal, nothing prevents to perform a few morerequestN
which is going to break correctness (see https://github.com/rsocket/rsocket-java/pull/736/files#diff-a87c12662c200847a65e4b44b57e2278R236)1.2) There was an incorrect behavior in case upstream terminates earlier than the remote responder so the downstream observed onComplete before it should do so. The RSocket-Spec says that the stream can be half-closed until both sides send the complete signal. (https://github.com/rsocket/rsocket-java/pull/736/files#diff-b0fea90eb4e3536e65de608e97f637f9L369)
I found that there could be racing between sending first frame and cancelation, so I added an additional flag that ensures that we released the given frame and no leaks going to appear. (see https://github.com/rsocket/rsocket-java/pull/736/files#diff-b0fea90eb4e3536e65de608e97f637f9R394)
In order to implement correct behavior of request channel I reimplemented the switchOnFirst operator to follow the behavior defined by RSocket-Spec (https://github.com/rsocket/rsocket-java/pull/736/files#diff-ec51e7f167b7b31f5b3c4b25f5233e8dR1)
This PR is a temporary fix on the upcoming rewriting of every stream to state-machine it is done in the RSocket-CPP https://github.com/rsocket/rsocket-cpp/tree/master/rsocket/statemachine
Signed-off-by: Oleh Dokuka shadowgun@i.ua