From 7a4602c8d3bd38ac8154c7c7ad2ddcaa69f1c707 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 10 Aug 2020 18:16:17 +0300 Subject: [PATCH 1/7] bumps reactor and netty version Signed-off-by: Oleh Dokuka --- build.gradle | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/build.gradle b/build.gradle index 1829a512f..a576f22f8 100644 --- a/build.gradle +++ b/build.gradle @@ -32,11 +32,10 @@ subprojects { apply plugin: 'io.spring.dependency-management' apply plugin: 'com.github.sherter.google-java-format' - ext['reactor-bom.version'] = 'Dysprosium-BUILD-SNAPSHOT' + ext['reactor-bom.version'] = 'Dysprosium-SR11' ext['logback.version'] = '1.2.3' - ext['findbugs.version'] = '3.0.2' - ext['netty-bom.version'] = '4.1.50.Final' - ext['netty-boringssl.version'] = '2.0.30.Final' + ext['netty-bom.version'] = '4.1.51.Final' + ext['netty-boringssl.version'] = '2.0.31.Final' ext['hdrhistogram.version'] = '2.1.10' ext['mockito.version'] = '3.2.0' ext['slf4j.version'] = '1.7.25' From 5a3dc4b12f8198b4ca080f503ce5c5ba5ff882ed Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 10 Aug 2020 19:27:19 +0300 Subject: [PATCH 2/7] updates project versions and docs Signed-off-by: Oleh Dokuka --- README.md | 8 ++++---- gradle.properties | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 5b5193329..d54a42dad 100644 --- a/README.md +++ b/README.md @@ -26,8 +26,8 @@ repositories { mavenCentral() } dependencies { - implementation 'io.rsocket:rsocket-core:1.0.1' - implementation 'io.rsocket:rsocket-transport-netty:1.0.1' + implementation 'io.rsocket:rsocket-core:1.0.2' + implementation 'io.rsocket:rsocket-transport-netty:1.0.2' } ``` @@ -40,8 +40,8 @@ repositories { maven { url 'https://oss.jfrog.org/oss-snapshot-local' } } dependencies { - implementation 'io.rsocket:rsocket-core:1.0.2-SNAPSHOT' - implementation 'io.rsocket:rsocket-transport-netty:1.0.2-SNAPSHOT' + implementation 'io.rsocket:rsocket-core:1.0.3-SNAPSHOT' + implementation 'io.rsocket:rsocket-transport-netty:1.0.3-SNAPSHOT' } ``` diff --git a/gradle.properties b/gradle.properties index 9021ebfab..09eb2d90f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,5 +11,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=1.0.2 -perfBaselineVersion=1.0.1 +version=1.0.3 +perfBaselineVersion=1.0.2 From 18050edda0dff73682d6643e8aca9c5a89ffbd51 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 22 Sep 2020 16:08:36 +0100 Subject: [PATCH 3/7] Avoid queueing in UnicastProcessor receivers Closes gh-887 Signed-off-by: Rossen Stoyanchev --- .../src/main/java/io/rsocket/core/RSocketRequester.java | 6 +++--- .../src/main/java/io/rsocket/core/RSocketResponder.java | 5 +++-- rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java | 6 ++++-- .../test/java/io/rsocket/integration/TestingStreaming.java | 3 +++ 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 2ecdec215..272194bb2 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -348,7 +348,7 @@ private Flux handleRequestStream(final Payload payload) { } final UnboundedProcessor sendProcessor = this.sendProcessor; - final UnicastProcessor receiver = UnicastProcessor.create(); + final UnicastProcessor receiver = UnicastProcessor.create(Queues.one().get()); final AtomicBoolean once = new AtomicBoolean(); return Flux.defer( @@ -456,7 +456,7 @@ private Flux handleChannel(Flux request) { private Flux handleChannel(Payload initialPayload, Flux inboundFlux) { final UnboundedProcessor sendProcessor = this.sendProcessor; - final UnicastProcessor receiver = UnicastProcessor.create(); + final UnicastProcessor receiver = UnicastProcessor.create(Queues.one().get()); return receiver .transform( diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 581605ff4..3e2c06e92 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ import reactor.core.Exceptions; import reactor.core.publisher.*; import reactor.util.annotation.Nullable; +import reactor.util.concurrent.Queues; /** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */ class RSocketResponder implements RSocket { @@ -537,7 +538,7 @@ protected void hookOnError(Throwable throwable) { } private void handleChannel(int streamId, Payload payload, long initialRequestN) { - UnicastProcessor frames = UnicastProcessor.create(); + UnicastProcessor frames = UnicastProcessor.create(Queues.one().get()); channelProcessors.put(streamId, frames); Flux payloads = diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index 1e7bb337f..d3e614e1c 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -158,13 +158,13 @@ public Flux requestChannel(Publisher payloads) { } @Test(timeout = 2000) - public void testStream() throws Exception { + public void testStream() { Flux responses = rule.crs.requestStream(DefaultPayload.create("Payload In")); StepVerifier.create(responses).expectNextCount(10).expectComplete().verify(); } @Test(timeout = 2000) - public void testChannel() throws Exception { + public void testChannel() { Flux requests = Flux.range(0, 10).map(i -> DefaultPayload.create("streaming in -> " + i)); Flux responses = rule.crs.requestChannel(requests); @@ -543,6 +543,7 @@ public Mono requestResponse(Payload payload) { @Override public Flux requestStream(Payload payload) { return Flux.range(1, 10) + .delaySubscription(Duration.ofMillis(100)) .map( i -> DefaultPayload.create("server got -> [" + payload.toString() + "]")); } @@ -556,6 +557,7 @@ public Flux requestChannel(Publisher payloads) { .subscribe(); return Flux.range(1, 10) + .delaySubscription(Duration.ofMillis(100)) .map( payload -> DefaultPayload.create("server got -> [" + payload.toString() + "]")); diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java index 7d34ba478..f583355e6 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java @@ -49,6 +49,7 @@ public void testRangeButThrowException() { } }) .map(l -> DefaultPayload.create("l -> " + l)) + .delaySubscription(Duration.ofMillis(100)) .cast(Payload.class))) .bind(serverTransport) .block(); @@ -71,6 +72,7 @@ public void testRangeOfConsumers() { payload -> Flux.range(1, 1000) .map(l -> DefaultPayload.create("l -> " + l)) + .delaySubscription(Duration.ofMillis(100)) .cast(Payload.class))) .bind(serverTransport) .block(); @@ -104,6 +106,7 @@ public void testSingleConsumer() { payload -> Flux.range(1, 10_000) .map(l -> DefaultPayload.create("l -> " + l)) + .delaySubscription(Duration.ofMillis(100)) .cast(Payload.class))) .bind(serverTransport) .block(); From 38a38e93ad06a8286788d70d7636a63f7ff5de4f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 23 Sep 2020 17:42:41 +0100 Subject: [PATCH 4/7] Refactoring in RequestOperator --- .../io/rsocket/core/RSocketRequester.java | 477 ++++++++---------- .../java/io/rsocket/core/RequestOperator.java | 36 +- .../java/io/rsocket/core/RSocketTest.java | 15 +- .../rsocket/integration/TestingStreaming.java | 3 - 4 files changed, 260 insertions(+), 271 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 272194bb2..a249ea888 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -64,7 +64,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Operators; import reactor.core.publisher.SignalType; import reactor.core.publisher.UnicastProcessor; import reactor.core.scheduler.Scheduler; @@ -267,68 +266,54 @@ private Mono handleRequestResponse(final Payload payload) { final UnboundedProcessor sendProcessor = this.sendProcessor; final UnicastProcessor receiver = UnicastProcessor.create(Queues.one().get()); - final AtomicBoolean once = new AtomicBoolean(); + return Mono.fromDirect( + new RequestOperator( + receiver.next(), "RequestResponseMono allows only a single subscriber") { - return Mono.defer( - () -> { - if (once.getAndSet(true)) { - return Mono.error( - new IllegalStateException("RequestResponseMono allows only a single subscriber")); - } + @Override + void hookOnFirstRequest(long n) { + if (isDisposed()) { + payload.release(); + final Throwable t = terminationError; + receiver.onError(t); + return; + } - return receiver - .next() - .transform( - Operators.lift( - (s, actual) -> - new RequestOperator(actual) { - - @Override - void hookOnFirstRequest(long n) { - if (isDisposed()) { - payload.release(); - final Throwable t = terminationError; - receiver.onError(t); - return; - } - - RequesterLeaseHandler lh = leaseHandler; - if (!lh.useLease()) { - payload.release(); - receiver.onError(lh.leaseError()); - return; - } - - int streamId = streamIdSupplier.nextStreamId(receivers); - this.streamId = streamId; - - ByteBuf requestResponseFrame = - RequestResponseFrameCodec.encodeReleasingPayload( - allocator, streamId, payload); - - receivers.put(streamId, receiver); - sendProcessor.onNext(requestResponseFrame); - } - - @Override - void hookOnCancel() { - if (receivers.remove(streamId, receiver)) { - sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); - } else { - if (this.firstRequest) { - payload.release(); - } - } - } - - @Override - public void hookOnTerminal(SignalType signalType) { - receivers.remove(streamId, receiver); - } - })) - .subscribeOn(serialScheduler) - .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); - }); + RequesterLeaseHandler lh = leaseHandler; + if (!lh.useLease()) { + payload.release(); + receiver.onError(lh.leaseError()); + return; + } + + int streamId = streamIdSupplier.nextStreamId(receivers); + this.streamId = streamId; + + ByteBuf requestResponseFrame = + RequestResponseFrameCodec.encodeReleasingPayload(allocator, streamId, payload); + + receivers.put(streamId, receiver); + sendProcessor.onNext(requestResponseFrame); + } + + @Override + void hookOnCancel() { + if (receivers.remove(streamId, receiver)) { + sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); + } else { + if (this.firstRequest) { + payload.release(); + } + } + } + + @Override + public void hookOnTerminal(SignalType signalType) { + receivers.remove(streamId, receiver); + } + }) + .subscribeOn(serialScheduler) + .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); } private Flux handleRequestStream(final Payload payload) { @@ -349,78 +334,64 @@ private Flux handleRequestStream(final Payload payload) { final UnboundedProcessor sendProcessor = this.sendProcessor; final UnicastProcessor receiver = UnicastProcessor.create(Queues.one().get()); - final AtomicBoolean once = new AtomicBoolean(); - return Flux.defer( - () -> { - if (once.getAndSet(true)) { - return Flux.error( - new IllegalStateException("RequestStreamFlux allows only a single subscriber")); - } + return Flux.from( + new RequestOperator(receiver, "RequestStreamFlux allows only a single subscriber") { - return receiver - .transform( - Operators.lift( - (s, actual) -> - new RequestOperator(actual) { - - @Override - void hookOnFirstRequest(long n) { - if (isDisposed()) { - payload.release(); - final Throwable t = terminationError; - receiver.onError(t); - return; - } - - RequesterLeaseHandler lh = leaseHandler; - if (!lh.useLease()) { - payload.release(); - receiver.onError(lh.leaseError()); - return; - } - - int streamId = streamIdSupplier.nextStreamId(receivers); - this.streamId = streamId; - - ByteBuf requestStreamFrame = - RequestStreamFrameCodec.encodeReleasingPayload( - allocator, streamId, n, payload); - - receivers.put(streamId, receiver); - - sendProcessor.onNext(requestStreamFrame); - } - - @Override - void hookOnRemainingRequests(long n) { - if (receiver.isDisposed()) { - return; - } - - sendProcessor.onNext( - RequestNFrameCodec.encode(allocator, streamId, n)); - } - - @Override - void hookOnCancel() { - if (receivers.remove(streamId, receiver)) { - sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); - } else { - if (this.firstRequest) { - payload.release(); - } - } - } - - @Override - void hookOnTerminal(SignalType signalType) { - receivers.remove(streamId); - } - })) - .subscribeOn(serialScheduler, false) - .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); - }); + @Override + void hookOnFirstRequest(long n) { + if (isDisposed()) { + payload.release(); + final Throwable t = terminationError; + receiver.onError(t); + return; + } + + RequesterLeaseHandler lh = leaseHandler; + if (!lh.useLease()) { + payload.release(); + receiver.onError(lh.leaseError()); + return; + } + + int streamId = streamIdSupplier.nextStreamId(receivers); + this.streamId = streamId; + + ByteBuf requestStreamFrame = + RequestStreamFrameCodec.encodeReleasingPayload(allocator, streamId, n, payload); + + receivers.put(streamId, receiver); + + sendProcessor.onNext(requestStreamFrame); + } + + @Override + void hookOnRemainingRequests(long n) { + if (receiver.isDisposed()) { + return; + } + + sendProcessor.onNext(RequestNFrameCodec.encode(allocator, streamId, n)); + } + + @Override + void hookOnCancel() { + if (receivers.remove(streamId, receiver)) { + sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); + } else { + if (this.firstRequest) { + payload.release(); + } + } + } + + @Override + void hookOnTerminal(SignalType signalType) { + receivers.remove(streamId); + } + }) + .subscribeOn(serialScheduler, false) + .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); } private Flux handleChannel(Flux request) { @@ -458,135 +429,133 @@ private Flux handleChannel(Payload initialPayload, Flux receiver = UnicastProcessor.create(Queues.one().get()); - return receiver - .transform( - Operators.lift( - (s, actual) -> - new RequestOperator(actual) { - - final BaseSubscriber upstreamSubscriber = - new BaseSubscriber() { - - boolean first = true; - - @Override - protected void hookOnSubscribe(Subscription subscription) { - // noops - } - - @Override - protected void hookOnNext(Payload payload) { - if (first) { - // need to skip first since we have already sent it - // no need to release it since it was released earlier on the - // request - // establishment - // phase - first = false; - request(1); - return; - } - if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) { - payload.release(); - cancel(); - final IllegalArgumentException t = - new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE); - // no need to send any errors. - sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); - receiver.onError(t); - return; - } - final ByteBuf frame = - PayloadFrameCodec.encodeNextReleasingPayload( - allocator, streamId, payload); - - sendProcessor.onNext(frame); - } - - @Override - protected void hookOnComplete() { - ByteBuf frame = PayloadFrameCodec.encodeComplete(allocator, streamId); - sendProcessor.onNext(frame); - } - - @Override - protected void hookOnError(Throwable t) { - ByteBuf frame = ErrorFrameCodec.encode(allocator, streamId, t); - sendProcessor.onNext(frame); - receiver.onError(t); - } - - @Override - protected void hookFinally(SignalType type) { - senders.remove(streamId, this); - } - }; - - @Override - void hookOnFirstRequest(long n) { - if (isDisposed()) { - initialPayload.release(); - final Throwable t = terminationError; - upstreamSubscriber.cancel(); - receiver.onError(t); - return; - } - - RequesterLeaseHandler lh = leaseHandler; - if (!lh.useLease()) { - initialPayload.release(); - receiver.onError(lh.leaseError()); - return; - } - - final int streamId = streamIdSupplier.nextStreamId(receivers); - this.streamId = streamId; - - final ByteBuf frame = - RequestChannelFrameCodec.encodeReleasingPayload( - allocator, streamId, false, n, initialPayload); - - senders.put(streamId, upstreamSubscriber); - receivers.put(streamId, receiver); - - inboundFlux - .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER) - .subscribe(upstreamSubscriber); - - sendProcessor.onNext(frame); + return Flux.from( + new RequestOperator( + receiver, "RequestStreamFlux allows only a " + "single subscriber") { + + final BaseSubscriber upstreamSubscriber = + new BaseSubscriber() { + + boolean first = true; + + @Override + protected void hookOnSubscribe(Subscription subscription) { + // noops + } + + @Override + protected void hookOnNext(Payload payload) { + if (first) { + // need to skip first since we have already sent it + // no need to release it since it was released earlier on the + // request + // establishment + // phase + first = false; + request(1); + return; + } + if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) { + payload.release(); + cancel(); + final IllegalArgumentException t = + new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE); + // no need to send any errors. + sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); + receiver.onError(t); + return; } + final ByteBuf frame = + PayloadFrameCodec.encodeNextReleasingPayload( + allocator, streamId, payload); + + sendProcessor.onNext(frame); + } + + @Override + protected void hookOnComplete() { + ByteBuf frame = PayloadFrameCodec.encodeComplete(allocator, streamId); + sendProcessor.onNext(frame); + } + + @Override + protected void hookOnError(Throwable t) { + ByteBuf frame = ErrorFrameCodec.encode(allocator, streamId, t); + sendProcessor.onNext(frame); + receiver.onError(t); + } + + @Override + protected void hookFinally(SignalType type) { + senders.remove(streamId, this); + } + }; + + @Override + void hookOnFirstRequest(long n) { + if (isDisposed()) { + initialPayload.release(); + final Throwable t = terminationError; + upstreamSubscriber.cancel(); + receiver.onError(t); + return; + } - @Override - void hookOnRemainingRequests(long n) { - if (receiver.isDisposed()) { - return; - } + RequesterLeaseHandler lh = leaseHandler; + if (!lh.useLease()) { + initialPayload.release(); + receiver.onError(lh.leaseError()); + return; + } - sendProcessor.onNext(RequestNFrameCodec.encode(allocator, streamId, n)); - } + final int streamId = streamIdSupplier.nextStreamId(receivers); + this.streamId = streamId; - @Override - void hookOnCancel() { - senders.remove(streamId, upstreamSubscriber); - if (receivers.remove(streamId, receiver)) { - sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); - } - } + final ByteBuf frame = + RequestChannelFrameCodec.encodeReleasingPayload( + allocator, streamId, false, n, initialPayload); - @Override - void hookOnTerminal(SignalType signalType) { - if (signalType == SignalType.ON_ERROR) { - upstreamSubscriber.cancel(); - } - receivers.remove(streamId, receiver); - } + senders.put(streamId, upstreamSubscriber); + receivers.put(streamId, receiver); - @Override - public void cancel() { - upstreamSubscriber.cancel(); - super.cancel(); - } - })) + inboundFlux + .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER) + .subscribe(upstreamSubscriber); + + sendProcessor.onNext(frame); + } + + @Override + void hookOnRemainingRequests(long n) { + if (receiver.isDisposed()) { + return; + } + + sendProcessor.onNext(RequestNFrameCodec.encode(allocator, streamId, n)); + } + + @Override + void hookOnCancel() { + senders.remove(streamId, upstreamSubscriber); + if (receivers.remove(streamId, receiver)) { + sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); + } + } + + @Override + void hookOnTerminal(SignalType signalType) { + if (signalType == SignalType.ON_ERROR) { + upstreamSubscriber.cancel(); + } + receivers.remove(streamId, receiver); + } + + @Override + public void cancel() { + upstreamSubscriber.cancel(); + super.cancel(); + } + }) .subscribeOn(serialScheduler, false); } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java index 6123b0492..dbca5fef2 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java @@ -1,14 +1,17 @@ package io.rsocket.core; import io.rsocket.Payload; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.CorePublisher; import reactor.core.CoreSubscriber; import reactor.core.Fuseable; import reactor.core.publisher.Operators; import reactor.core.publisher.SignalType; import reactor.util.context.Context; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + /** * This is a support class for handling of request input, intended for use with {@link * Operators#lift}. It ensures serial execution of cancellation vs first request signals and also @@ -16,9 +19,14 @@ * invocations. */ abstract class RequestOperator - implements CoreSubscriber, Fuseable.QueueSubscription { + implements CoreSubscriber, + CorePublisher, + Fuseable.QueueSubscription, + Fuseable { - final CoreSubscriber actual; + final String errorMessageOnSecondSubscription; + + CoreSubscriber actual; Subscription s; Fuseable.QueueSubscription qs; @@ -30,8 +38,25 @@ abstract class RequestOperator static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(RequestOperator.class, "wip"); - RequestOperator(CoreSubscriber actual) { - this.actual = actual; + RequestOperator(CorePublisher source, String errorMessageOnSecondSubscription) { + this.errorMessageOnSecondSubscription = errorMessageOnSecondSubscription; + source.subscribe(this); + WIP.lazySet(this, -1); + } + + @Override + public void subscribe(Subscriber actual) { + subscribe(Operators.toCoreSubscriber(actual)); + } + + @Override + public void subscribe(CoreSubscriber actual) { + if (this.wip == -1 && WIP.compareAndSet(this, -1, 0)) { + this.actual = actual; + actual.onSubscribe(this); + } else { + Operators.error(actual, new IllegalStateException(this.errorMessageOnSecondSubscription)); + } } /** @@ -129,7 +154,6 @@ public void onSubscribe(Subscription s) { if (s instanceof Fuseable.QueueSubscription) { this.qs = (Fuseable.QueueSubscription) s; } - this.actual.onSubscribe(this); } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index d3e614e1c..d78a1d032 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -16,8 +16,6 @@ package io.rsocket.core; -import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Payload; @@ -33,10 +31,6 @@ import io.rsocket.test.util.LocalDuplexConnection; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; -import java.time.Duration; -import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.Assertions; import org.junit.Rule; import org.junit.Test; @@ -52,6 +46,13 @@ import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicReference; + +import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK; + public class RSocketTest { @Rule public final SocketRule rule = new SocketRule(); @@ -543,7 +544,6 @@ public Mono requestResponse(Payload payload) { @Override public Flux requestStream(Payload payload) { return Flux.range(1, 10) - .delaySubscription(Duration.ofMillis(100)) .map( i -> DefaultPayload.create("server got -> [" + payload.toString() + "]")); } @@ -557,7 +557,6 @@ public Flux requestChannel(Publisher payloads) { .subscribe(); return Flux.range(1, 10) - .delaySubscription(Duration.ofMillis(100)) .map( payload -> DefaultPayload.create("server got -> [" + payload.toString() + "]")); diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java index f583355e6..7d34ba478 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java @@ -49,7 +49,6 @@ public void testRangeButThrowException() { } }) .map(l -> DefaultPayload.create("l -> " + l)) - .delaySubscription(Duration.ofMillis(100)) .cast(Payload.class))) .bind(serverTransport) .block(); @@ -72,7 +71,6 @@ public void testRangeOfConsumers() { payload -> Flux.range(1, 1000) .map(l -> DefaultPayload.create("l -> " + l)) - .delaySubscription(Duration.ofMillis(100)) .cast(Payload.class))) .bind(serverTransport) .block(); @@ -106,7 +104,6 @@ public void testSingleConsumer() { payload -> Flux.range(1, 10_000) .map(l -> DefaultPayload.create("l -> " + l)) - .delaySubscription(Duration.ofMillis(100)) .cast(Payload.class))) .bind(serverTransport) .block(); From b5952f6cebf9ba61c7ae0bc4afd1fc47624be836 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 23 Sep 2020 19:43:27 +0100 Subject: [PATCH 5/7] Fix formatting --- .../main/java/io/rsocket/core/RequestOperator.java | 3 +-- .../src/test/java/io/rsocket/core/RSocketTest.java | 13 ++++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java index dbca5fef2..f95a5f66c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java @@ -1,6 +1,7 @@ package io.rsocket.core; import io.rsocket.Payload; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CorePublisher; @@ -10,8 +11,6 @@ import reactor.core.publisher.SignalType; import reactor.util.context.Context; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - /** * This is a support class for handling of request input, intended for use with {@link * Operators#lift}. It ensures serial execution of cancellation vs first request signals and also diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index d78a1d032..7320d9ade 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -16,6 +16,8 @@ package io.rsocket.core; +import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Payload; @@ -31,6 +33,10 @@ import io.rsocket.test.util.LocalDuplexConnection; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.Assertions; import org.junit.Rule; import org.junit.Test; @@ -46,13 +52,6 @@ import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; -import java.time.Duration; -import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.atomic.AtomicReference; - -import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK; - public class RSocketTest { @Rule public final SocketRule rule = new SocketRule(); From c4f54673320eb14a2a24624e11087df89072afce Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 22 Sep 2020 18:18:09 +0100 Subject: [PATCH 6/7] Use static errors in default RSocket methods Closes gh-865 Signed-off-by: Rossen Stoyanchev --- .../src/main/java/io/rsocket/RSocket.java | 16 ++--- .../main/java/io/rsocket/RSocketAdapter.java | 67 +++++++++++++++++++ 2 files changed, 73 insertions(+), 10 deletions(-) create mode 100644 rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java diff --git a/rsocket-core/src/main/java/io/rsocket/RSocket.java b/rsocket-core/src/main/java/io/rsocket/RSocket.java index 773c93dc2..b05241365 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocket.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,8 +34,7 @@ public interface RSocket extends Availability, Closeable { * handled, otherwise errors. */ default Mono fireAndForget(Payload payload) { - payload.release(); - return Mono.error(new UnsupportedOperationException("Fire-and-Forget not implemented.")); + return RSocketAdapter.fireAndForget(payload); } /** @@ -46,8 +45,7 @@ default Mono fireAndForget(Payload payload) { * response. */ default Mono requestResponse(Payload payload) { - payload.release(); - return Mono.error(new UnsupportedOperationException("Request-Response not implemented.")); + return RSocketAdapter.requestResponse(payload); } /** @@ -57,8 +55,7 @@ default Mono requestResponse(Payload payload) { * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. */ default Flux requestStream(Payload payload) { - payload.release(); - return Flux.error(new UnsupportedOperationException("Request-Stream not implemented.")); + return RSocketAdapter.requestStream(payload); } /** @@ -68,7 +65,7 @@ default Flux requestStream(Payload payload) { * @return Stream of response payloads. */ default Flux requestChannel(Publisher payloads) { - return Flux.error(new UnsupportedOperationException("Request-Channel not implemented.")); + return RSocketAdapter.requestChannel(payloads); } /** @@ -79,8 +76,7 @@ default Flux requestChannel(Publisher payloads) { * handled, otherwise errors. */ default Mono metadataPush(Payload payload) { - payload.release(); - return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented.")); + return RSocketAdapter.metadataPush(payload); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java b/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java new file mode 100644 index 000000000..5e250f37f --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java @@ -0,0 +1,67 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Package private class with default implementations for use in {@link RSocket}. The main purpose + * is to hide static {@link UnsupportedOperationException} declarations. + */ +class RSocketAdapter { + + private static final Mono UNSUPPORTED_FIRE_AND_FORGET = + Mono.error(new UnsupportedOperationException("Fire-and-Forget not implemented.")); + + private static final Mono UNSUPPORTED_REQUEST_RESPONSE = + Mono.error(new UnsupportedOperationException("Request-Response not implemented.")); + + private static final Flux UNSUPPORTED_REQUEST_STREAM = + Flux.error(new UnsupportedOperationException("Request-Stream not implemented.")); + + private static final Flux UNSUPPORTED_REQUEST_CHANNEL = + Flux.error(new UnsupportedOperationException("Request-Channel not implemented.")); + + private static final Mono UNSUPPORTED_METADATA_PUSH = + Mono.error(new UnsupportedOperationException("Metadata-Push not implemented.")); + + static Mono fireAndForget(Payload payload) { + payload.release(); + return RSocketAdapter.UNSUPPORTED_FIRE_AND_FORGET; + } + + static Mono requestResponse(Payload payload) { + payload.release(); + return RSocketAdapter.UNSUPPORTED_REQUEST_RESPONSE; + } + + static Flux requestStream(Payload payload) { + payload.release(); + return RSocketAdapter.UNSUPPORTED_REQUEST_STREAM; + } + + static Flux requestChannel(Publisher payloads) { + return RSocketAdapter.UNSUPPORTED_REQUEST_CHANNEL; + } + + static Mono metadataPush(Payload payload) { + payload.release(); + return RSocketAdapter.UNSUPPORTED_METADATA_PUSH; + } +} From 4698adb9a4c6a203559f46d08a352a1a41fa25df Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 24 Sep 2020 14:20:57 +0100 Subject: [PATCH 7/7] Turn off suppressed exceptions (to squash) Signed-off-by: Rossen Stoyanchev --- .../main/java/io/rsocket/RSocketAdapter.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java b/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java index 5e250f37f..b5a64b8dd 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java @@ -23,23 +23,25 @@ /** * Package private class with default implementations for use in {@link RSocket}. The main purpose * is to hide static {@link UnsupportedOperationException} declarations. + * + * @since 1.0.3 */ class RSocketAdapter { private static final Mono UNSUPPORTED_FIRE_AND_FORGET = - Mono.error(new UnsupportedOperationException("Fire-and-Forget not implemented.")); + Mono.error(new UnsupportedInteractionException("Fire-and-Forget")); private static final Mono UNSUPPORTED_REQUEST_RESPONSE = - Mono.error(new UnsupportedOperationException("Request-Response not implemented.")); + Mono.error(new UnsupportedInteractionException("Request-Response")); private static final Flux UNSUPPORTED_REQUEST_STREAM = - Flux.error(new UnsupportedOperationException("Request-Stream not implemented.")); + Flux.error(new UnsupportedInteractionException("Request-Stream")); private static final Flux UNSUPPORTED_REQUEST_CHANNEL = - Flux.error(new UnsupportedOperationException("Request-Channel not implemented.")); + Flux.error(new UnsupportedInteractionException("Request-Channel")); private static final Mono UNSUPPORTED_METADATA_PUSH = - Mono.error(new UnsupportedOperationException("Metadata-Push not implemented.")); + Mono.error(new UnsupportedInteractionException("Metadata-Push")); static Mono fireAndForget(Payload payload) { payload.release(); @@ -64,4 +66,13 @@ static Mono metadataPush(Payload payload) { payload.release(); return RSocketAdapter.UNSUPPORTED_METADATA_PUSH; } + + private static class UnsupportedInteractionException extends RuntimeException { + + private static final long serialVersionUID = 5084623297446471999L; + + UnsupportedInteractionException(String interactionName) { + super(interactionName + " not implemented.", null, false, false); + } + } }