-
Notifications
You must be signed in to change notification settings - Fork 354
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implements dedicated Publisher
/Subscriber
for each request type #761
#761
Conversation
d71b8c0
to
7aa340b
Compare
7aa340b
to
406daee
Compare
BenchesBase Line RC7-SNAPSHOT
Current Branch
|
231cae8
to
a78a14d
Compare
Testing Matrix ChecklistGeneric
Racing Cases |
a78a14d
to
e7efe6d
Compare
5bd5fcb
to
2d6c698
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamManager
provides a way for requester Mono's and responder Subscriber's to access something back from RSocketRequester
and RSocketResponder
. This can be taken further to avoid passing other fields as well.
For example RSocketRequester
and RSocketResponder
could extend this (replacing StreamManager
and AbstractStreamManager
):
class RequesterResponderSupport {
private final int mtu;
private final int maxFrameLength;
private final int maxInboundPayloadSize;
private final PayloadDecoder payloadDecoder;
private final ByteBufAllocator allocator;
@Nullable
final StreamIdSupplier streamIdSupplier;
final IntObjectMap<FrameHandler> activeStreams;
private final UnboundedProcessor<ByteBuf> sendProcessor;
public RequesterResponderSupport(
int mtu,
int maxFrameLength,
int maxInboundPayloadSize,
PayloadDecoder payloadDecoder,
ByteBufAllocator allocator,
@Nullable StreamIdSupplier streamIdSupplier,
IntObjectMap<FrameHandler> activeStreams) {
this.activeStreams = activeStreams;
this.mtu = mtu;
this.maxFrameLength = maxFrameLength;
this.maxInboundPayloadSize = maxInboundPayloadSize;
this.payloadDecoder = payloadDecoder;
this.allocator = allocator;
this.streamIdSupplier = streamIdSupplier;
this.sendProcessor = new UnboundedProcessor<>();
}
public int getMtu() {
return mtu;
}
public int getMaxFrameLength() {
return maxFrameLength;
}
public int getMaxInboundPayloadSize() {
return maxInboundPayloadSize;
}
public PayloadDecoder getPayloadDecoder() {
return payloadDecoder;
}
public ByteBufAllocator getAllocator() {
return allocator;
}
public UnboundedProcessor<ByteBuf> getSendProcessor() {
return sendProcessor;
}
public synchronized int getNextId() {
if (this.streamIdSupplier != null) {
return this.streamIdSupplier.nextStreamId(this.activeStreams);
}
else {
throw new UnsupportedOperationException("Responder can not issue id");
}
}
public synchronized int addAndGetNextId(FrameHandler frameHandler) {
if (this.streamIdSupplier != null) {
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
final int streamId = this.streamIdSupplier.nextStreamId(activeStreams);
activeStreams.put(streamId, frameHandler);
return streamId;
}
else {
throw new UnsupportedOperationException("Responder can not issue id");
}
}
public FrameHandler get(int streamId) {
return this.activeStreams.get(streamId);
}
public boolean remove(int streamId, FrameHandler frameHandler) {
return this.activeStreams.remove(streamId, frameHandler);
}
}
Now the requester Mono's and Flux's can be created more easily:
@Override
public Flux<Payload> requestStream(Payload payload) {
return new RequestStreamFlux(payload, this);
}
Likewise for responder Subscriber's:
RequestResponseSubscriber subscriber = new RequestResponseSubscriber(streamId, frame, this);
There might be other opportunities for re-use as well through such a common base class.
f86226d
to
eaeb099
Compare
rsocket-core/src/test/java/io/rsocket/core/StateMachineAssert.java
Outdated
Show resolved
Hide resolved
AtomicLongFieldUpdater<T> updater, | ||
T instance, | ||
ReassembledFramesHolder reassembledFramesHolder, | ||
Subscription subscription, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The subscription is the RequesterFrameHandler
instance. Why don't we add cancel()
to RequesterFrameHandler
or have it extend Subscription
so that only RequesterFrameHandler
is passed in, making it easier to understand where cancel()
is handled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is to avoid mixing RSocket spec frame handling with the reactive streams. First of all, it makes it impossible to understand from where method was invoked, especially when it comes to requestChannel case. Initially, it was reactive streams interfaces, but after lots of issues with understanding where and how should I handle every call (depends on the inbound vs outbound) I decided to get rid of that idea and make things fully separate
rsocket-core/src/main/java/io/rsocket/core/ReassemblyUtils.java
Outdated
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java
Outdated
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java
Outdated
Show resolved
Hide resolved
final Payload p = this.payload; | ||
try { | ||
if (!isValid(this.mtu, this.maxFrameLength, p, false)) { | ||
lazyTerminate(STATE, this); | ||
Operators.error( | ||
actual, | ||
new IllegalArgumentException( | ||
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength))); | ||
p.release(); | ||
return; | ||
} | ||
} catch (IllegalReferenceCountException e) { | ||
lazyTerminate(STATE, this); | ||
Operators.error(actual, e); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't the payload be validated before subscription, basically as soon as it is provided? Maybe RSocketRequester
could hold this logic and make the check before creating a requester Mono/Flux, thus also re-using the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In any case, it is going to be code duplication. The only different - that code would be in a different place. Thus, once something needed to be adjusted - we would need to look at more places and not only to Requester / Responder operators
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't dig far deeper after yesterday's live review. it seems @rstoyanchev has already spotted quite a few elements to improve. overall I'd try to add more comments and documentations, especially on utils classes (like what's been done in StateUtils
👍)
rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java
Outdated
Show resolved
Hide resolved
eaeb099
to
e3b7998
Compare
93f9d5c
to
2854e04
Compare
Done. Ready for another round of review |
c531667
to
0b63bbf
Compare
Applied most of requested changes. Will be merging this one. More polishing can be done in the followups to this PR |
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
0b63bbf
to
8be980e
Compare
Publisher
/Subscriber
for each request type
Publisher
/Subscriber
for each request typePublisher
/Subscriber
for each request type #761
This PR provides fully reworked internals for all possible interactions for Rsocket requester and responder
fixes #742 #641 #613 #641 #760