diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index 02f1a51cc..bbe09be5e 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -23,6 +23,7 @@ import io.rsocket.RSocket; import io.rsocket.SocketAcceptor; import io.rsocket.fragmentation.FragmentationDuplexConnection; +import io.rsocket.fragmentation.ReassemblyDuplexConnection; import io.rsocket.frame.SetupFrameCodec; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.internal.ClientServerInputMultiplexer; @@ -400,14 +401,7 @@ public RSocketConnector lease(Supplier> supplier) { * and Reassembly */ public RSocketConnector fragment(int mtu) { - if (mtu > 0 && mtu < FragmentationDuplexConnection.MIN_MTU_SIZE || mtu < 0) { - String msg = - String.format( - "The smallest allowed mtu size is %d bytes, provided: %d", - FragmentationDuplexConnection.MIN_MTU_SIZE, mtu); - throw new IllegalArgumentException(msg); - } - this.mtu = mtu; + this.mtu = FragmentationDuplexConnection.assertMtu(mtu); return this; } @@ -468,8 +462,15 @@ public Mono connect(ClientTransport transport) { * @return a {@code Mono} with the connected RSocket */ public Mono connect(Supplier transportSupplier) { + Mono connectionMono = - Mono.fromSupplier(transportSupplier).flatMap(t -> t.connect(mtu)); + Mono.fromSupplier(transportSupplier) + .flatMap(ClientTransport::connect) + .map( + connection -> + mtu > 0 + ? new FragmentationDuplexConnection(connection, mtu, "client") + : new ReassemblyDuplexConnection(connection)); return connectionMono .flatMap( connection -> { diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index c5734cecc..44dd8765e 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -26,6 +26,7 @@ import io.rsocket.exceptions.InvalidSetupException; import io.rsocket.exceptions.RejectedSetupException; import io.rsocket.fragmentation.FragmentationDuplexConnection; +import io.rsocket.fragmentation.ReassemblyDuplexConnection; import io.rsocket.frame.FrameHeaderCodec; import io.rsocket.frame.SetupFrameCodec; import io.rsocket.frame.decoder.PayloadDecoder; @@ -211,14 +212,7 @@ public RSocketServer lease(Supplier> supplier) { * and Reassembly */ public RSocketServer fragment(int mtu) { - if (mtu > 0 && mtu < FragmentationDuplexConnection.MIN_MTU_SIZE || mtu < 0) { - String msg = - String.format( - "The smallest allowed mtu size is %d bytes, provided: %d", - FragmentationDuplexConnection.MIN_MTU_SIZE, mtu); - throw new IllegalArgumentException(msg); - } - this.mtu = mtu; + this.mtu = FragmentationDuplexConnection.assertMtu(mtu); return this; } @@ -273,7 +267,7 @@ public Mono bind(ServerTransport transport) { @Override public Mono get() { return transport - .start(duplexConnection -> acceptor(serverSetup, duplexConnection), mtu) + .start(duplexConnection -> acceptor(serverSetup, duplexConnection)) .doOnNext(c -> c.onClose().doFinally(v -> serverSetup.dispose()).subscribe()); } }); @@ -305,6 +299,11 @@ public Mono apply(DuplexConnection connection) { } private Mono acceptor(ServerSetup serverSetup, DuplexConnection connection) { + connection = + mtu > 0 + ? new FragmentationDuplexConnection(connection, mtu, "server") + : new ReassemblyDuplexConnection(connection); + ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection, interceptors, false); diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java index 5d89bb9ad..1124698b0 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java @@ -22,7 +22,6 @@ import io.netty.buffer.ByteBufUtil; import io.rsocket.DuplexConnection; import io.rsocket.frame.FrameHeaderCodec; -import io.rsocket.frame.FrameLengthCodec; import io.rsocket.frame.FrameType; import java.util.Objects; import org.reactivestreams.Publisher; @@ -30,7 +29,6 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.annotation.Nullable; /** * A {@link DuplexConnection} implementation that fragments and reassembles {@link ByteBuf}s. @@ -46,15 +44,19 @@ public final class FragmentationDuplexConnection extends ReassemblyDuplexConnect private final DuplexConnection delegate; private final int mtu; private final FrameReassembler frameReassembler; - private final boolean encodeLength; private final String type; - public FragmentationDuplexConnection( - DuplexConnection delegate, int mtu, boolean encodeAndEncodeLength, String type) { - super(delegate, encodeAndEncodeLength); + /** + * Class constructor. + * + * @param delegate the underlying connection + * @param mtu the fragment size, greater than {@link #MIN_MTU_SIZE} + * @param type a label to use for logging purposes + */ + public FragmentationDuplexConnection(DuplexConnection delegate, int mtu, String type) { + super(delegate); Objects.requireNonNull(delegate, "delegate must not be null"); - this.encodeLength = encodeAndEncodeLength; this.delegate = delegate; this.mtu = assertMtu(mtu); this.frameReassembler = new FrameReassembler(delegate.alloc()); @@ -67,32 +69,17 @@ private boolean shouldFragment(FrameType frameType, int readableBytes) { return frameType.isFragmentable() && readableBytes > mtu; } - /*TODO this is nullable and not returning empty to workaround javac 11.0.3 compiler issue on ubuntu (at least) */ - @Nullable - public static Mono checkMtu(int mtu) { - if (isInsufficientMtu(mtu)) { + public static int assertMtu(int mtu) { + if (mtu > 0 && mtu < MIN_MTU_SIZE || mtu < 0) { String msg = - String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu); - return Mono.error(new IllegalArgumentException(msg)); - } else { - return null; - } - } - - private static int assertMtu(int mtu) { - if (isInsufficientMtu(mtu)) { - String msg = - String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu); + String.format( + "The smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu); throw new IllegalArgumentException(msg); } else { return mtu; } } - private static boolean isInsufficientMtu(int mtu) { - return mtu > 0 && mtu < MIN_MTU_SIZE || mtu < 0; - } - @Override public Mono send(Publisher frames) { return Flux.from(frames).concatMap(this::sendOne).then(); @@ -102,34 +89,22 @@ public Mono send(Publisher frames) { public Mono sendOne(ByteBuf frame) { FrameType frameType = FrameHeaderCodec.frameType(frame); int readableBytes = frame.readableBytes(); - if (shouldFragment(frameType, readableBytes)) { - if (logger.isDebugEnabled()) { - return delegate.send( - Flux.from(fragmentFrame(alloc(), mtu, frame, frameType, encodeLength)) - .doOnNext( - byteBuf -> { - ByteBuf f = encodeLength ? FrameLengthCodec.frame(byteBuf) : byteBuf; - logger.debug( - "{} - stream id {} - frame type {} - \n {}", - type, - FrameHeaderCodec.streamId(f), - FrameHeaderCodec.frameType(f), - ByteBufUtil.prettyHexDump(f)); - })); - } else { - return delegate.send( - Flux.from(fragmentFrame(alloc(), mtu, frame, frameType, encodeLength))); - } - } else { - return delegate.sendOne(encode(frame)); + if (!shouldFragment(frameType, readableBytes)) { + return delegate.sendOne(frame); } - } - - private ByteBuf encode(ByteBuf frame) { - if (encodeLength) { - return FrameLengthCodec.encode(alloc(), frame.readableBytes(), frame); - } else { - return frame; + Flux fragments = Flux.from(fragmentFrame(alloc(), mtu, frame, frameType)); + if (logger.isDebugEnabled()) { + fragments = + fragments.doOnNext( + byteBuf -> { + logger.debug( + "{} - stream id {} - frame type {} - \n {}", + type, + FrameHeaderCodec.streamId(byteBuf), + FrameHeaderCodec.frameType(byteBuf), + ByteBufUtil.prettyHexDump(byteBuf)); + }); } + return delegate.send(fragments); } } diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java index 4b8fd36e9..fcb6198a3 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java @@ -21,7 +21,6 @@ import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; import io.rsocket.frame.FrameHeaderCodec; -import io.rsocket.frame.FrameLengthCodec; import io.rsocket.frame.FrameType; import io.rsocket.frame.PayloadFrameCodec; import io.rsocket.frame.RequestChannelFrameCodec; @@ -42,11 +41,7 @@ */ final class FrameFragmenter { static Publisher fragmentFrame( - ByteBufAllocator allocator, - int mtu, - final ByteBuf frame, - FrameType frameType, - boolean encodeLength) { + ByteBufAllocator allocator, int mtu, final ByteBuf frame, FrameType frameType) { ByteBuf metadata = getMetadata(frame, frameType); ByteBuf data = getData(frame, frameType); int streamId = FrameHeaderCodec.streamId(frame); @@ -66,7 +61,7 @@ public void accept(SynchronousSink sink) { byteBuf = encodeFollowsFragment(allocator, mtu, streamId, metadata, data); } - sink.next(encode(allocator, byteBuf, encodeLength)); + sink.next(byteBuf); if (!metadata.isReadable() && !data.isReadable()) { sink.complete(); } @@ -237,12 +232,4 @@ static ByteBuf getData(ByteBuf frame, FrameType frameType) { } return data; } - - static ByteBuf encode(ByteBufAllocator allocator, ByteBuf frame, boolean encodeLength) { - if (encodeLength) { - return FrameLengthCodec.encode(allocator, frame.readableBytes(), frame); - } else { - return frame; - } - } } diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java index 6060c0c20..35d4f140e 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java @@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; -import io.rsocket.frame.FrameLengthCodec; import java.util.Objects; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -35,11 +34,10 @@ public class ReassemblyDuplexConnection implements DuplexConnection { private final DuplexConnection delegate; private final FrameReassembler frameReassembler; - private final boolean decodeLength; - public ReassemblyDuplexConnection(DuplexConnection delegate, boolean decodeLength) { + /** Constructor with the underlying delegate to receive frames from. */ + public ReassemblyDuplexConnection(DuplexConnection delegate) { Objects.requireNonNull(delegate, "delegate must not be null"); - this.decodeLength = decodeLength; this.delegate = delegate; this.frameReassembler = new FrameReassembler(delegate.alloc()); @@ -56,23 +54,9 @@ public Mono sendOne(ByteBuf frame) { return delegate.sendOne(frame); } - private ByteBuf decode(ByteBuf frame) { - if (decodeLength) { - return FrameLengthCodec.frame(frame).retain(); - } else { - return frame; - } - } - @Override public Flux receive() { - return delegate - .receive() - .handle( - (byteBuf, sink) -> { - ByteBuf decode = decode(byteBuf); - frameReassembler.reassembleFrame(decode, sink); - }); + return delegate.receive().handle(frameReassembler::reassembleFrame); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/transport/ClientTransport.java b/rsocket-core/src/main/java/io/rsocket/transport/ClientTransport.java index 25fd67097..3b8f624aa 100644 --- a/rsocket-core/src/main/java/io/rsocket/transport/ClientTransport.java +++ b/rsocket-core/src/main/java/io/rsocket/transport/ClientTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,11 +23,9 @@ public interface ClientTransport extends Transport { /** - * Returns a {@code Publisher}, every subscription to which returns a single {@code - * DuplexConnection}. + * Return a {@code Mono} that connects for each subscriber. * - * @param mtu The mtu used for fragmentation - if set to zero fragmentation will be disabled - * @return {@code Publisher}, every subscription returns a single {@code DuplexConnection}. + * @since 1.0.1 */ - Mono connect(int mtu); + Mono connect(); } diff --git a/rsocket-core/src/main/java/io/rsocket/transport/ServerTransport.java b/rsocket-core/src/main/java/io/rsocket/transport/ServerTransport.java index 3adc90cc8..92a9502a4 100644 --- a/rsocket-core/src/main/java/io/rsocket/transport/ServerTransport.java +++ b/rsocket-core/src/main/java/io/rsocket/transport/ServerTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,14 +26,13 @@ public interface ServerTransport extends Transport { /** - * Starts this server. + * Start this server. * - * @param acceptor An acceptor to process a newly accepted {@code DuplexConnection} - * @param mtu The mtu used for fragmentation - if set to zero fragmentation will be disabled - * @return A handle to retrieve information about a started server. - * @throws NullPointerException if {@code acceptor} is {@code null} + * @param acceptor to process a newly accepted connections with + * @return A handle for information about and control over the server. + * @since 1.0.1 */ - Mono start(ConnectionAcceptor acceptor, int mtu); + Mono start(ConnectionAcceptor acceptor); /** A contract to accept a new {@code DuplexConnection}. */ interface ConnectionAcceptor extends Function> { diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketReconnectTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketReconnectTest.java index dc76b5450..3233187c7 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketReconnectTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketReconnectTest.java @@ -64,12 +64,12 @@ public void shouldBeASharedReconnectableInstanceOfRSocketMono() { @SuppressWarnings({"rawtype", "unchecked"}) public void shouldBeRetrieableConnectionSharedReconnectableInstanceOfRSocketMono() { ClientTransport transport = Mockito.mock(ClientTransport.class); - Mockito.when(transport.connect(0)) + Mockito.when(transport.connect()) .thenThrow(UncheckedIOException.class) .thenThrow(UncheckedIOException.class) .thenThrow(UncheckedIOException.class) .thenThrow(UncheckedIOException.class) - .thenReturn(new TestClientTransport().connect(0)); + .thenReturn(new TestClientTransport().connect()); Mono rSocketMono = RSocketConnector.create() .reconnect( @@ -93,13 +93,13 @@ public void shouldBeRetrieableConnectionSharedReconnectableInstanceOfRSocketMono @SuppressWarnings({"rawtype", "unchecked"}) public void shouldBeExaustedRetrieableConnectionSharedReconnectableInstanceOfRSocketMono() { ClientTransport transport = Mockito.mock(ClientTransport.class); - Mockito.when(transport.connect(0)) + Mockito.when(transport.connect()) .thenThrow(UncheckedIOException.class) .thenThrow(UncheckedIOException.class) .thenThrow(UncheckedIOException.class) .thenThrow(UncheckedIOException.class) .thenThrow(UncheckedIOException.class) - .thenReturn(new TestClientTransport().connect(0)); + .thenReturn(new TestClientTransport().connect()); Mono rSocketMono = RSocketConnector.create() .reconnect( diff --git a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java index 2957a051e..b389531f1 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java @@ -137,7 +137,7 @@ private static class SingleConnectionTransport implements ServerTransport start(ConnectionAcceptor acceptor, int mtu) { + public Mono start(ConnectionAcceptor acceptor) { return Mono.just(new TestCloseable(acceptor, conn)); } diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java index 932df4283..8edd25daf 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java @@ -63,23 +63,23 @@ final class FragmentationDuplexConnectionTest { @Test void constructorInvalidMaxFragmentSize() { assertThatIllegalArgumentException() - .isThrownBy(() -> new FragmentationDuplexConnection(delegate, Integer.MIN_VALUE, false, "")) - .withMessage("smallest allowed mtu size is 64 bytes, provided: -2147483648"); + .isThrownBy(() -> new FragmentationDuplexConnection(delegate, Integer.MIN_VALUE, "")) + .withMessage("The smallest allowed mtu size is 64 bytes, provided: -2147483648"); } @DisplayName("constructor throws IllegalArgumentException with negative maxFragmentLength") @Test void constructorMtuLessThanMin() { assertThatIllegalArgumentException() - .isThrownBy(() -> new FragmentationDuplexConnection(delegate, 2, false, "")) - .withMessage("smallest allowed mtu size is 64 bytes, provided: 2"); + .isThrownBy(() -> new FragmentationDuplexConnection(delegate, 2, "")) + .withMessage("The smallest allowed mtu size is 64 bytes, provided: 2"); } @DisplayName("constructor throws NullPointerException with null delegate") @Test void constructorNullDelegate() { assertThatNullPointerException() - .isThrownBy(() -> new FragmentationDuplexConnection(null, 64, false, "")) + .isThrownBy(() -> new FragmentationDuplexConnection(null, 64, "")) .withMessage("delegate must not be null"); } @@ -93,7 +93,7 @@ void sendData() { when(delegate.onClose()).thenReturn(Mono.never()); when(delegate.alloc()).thenReturn(allocator); - new FragmentationDuplexConnection(delegate, 64, false, "").sendOne(encode.retain()); + new FragmentationDuplexConnection(delegate, 64, "").sendOne(encode.retain()); verify(delegate).send(publishers.capture()); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationIntegrationTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationIntegrationTest.java index ff62b56f2..0f15a1959 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationIntegrationTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationIntegrationTest.java @@ -35,8 +35,7 @@ void fragmentAndReassembleData() { frame.retain(); Publisher fragments = - FrameFragmenter.fragmentFrame( - allocator, 64, frame, FrameHeaderCodec.frameType(frame), false); + FrameFragmenter.fragmentFrame(allocator, 64, frame, FrameHeaderCodec.frameType(frame)); FrameReassembler reassembler = new FrameReassembler(allocator); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java index 60dbef74b..4548e4696 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java @@ -272,7 +272,7 @@ void fragmentData() { RequestResponseFrameCodec.encode(allocator, 1, true, null, Unpooled.wrappedBuffer(data)); Publisher fragments = - FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_RESPONSE, false); + FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_RESPONSE); StepVerifier.create(Flux.from(fragments).doOnError(Throwable::printStackTrace)) .expectNextCount(1) @@ -299,7 +299,7 @@ void fragmentMetadata() { allocator, 1, true, 10, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER); Publisher fragments = - FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_STREAM, false); + FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_STREAM); StepVerifier.create(Flux.from(fragments).doOnError(Throwable::printStackTrace)) .expectNextCount(1) @@ -326,7 +326,7 @@ void fragmentDataAndMetadata() { allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); Publisher fragments = - FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_RESPONSE, false); + FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_RESPONSE); StepVerifier.create(Flux.from(fragments).doOnError(Throwable::printStackTrace)) .assertNext( diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java index b083d6841..ac7e8c99b 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java @@ -86,7 +86,7 @@ void reassembleData() { when(delegate.onClose()).thenReturn(Mono.never()); when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, false) + new ReassemblyDuplexConnection(delegate) .receive() .as(StepVerifier::create) .assertNext( @@ -151,7 +151,7 @@ void reassembleMetadata() { when(delegate.onClose()).thenReturn(Mono.never()); when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, false) + new ReassemblyDuplexConnection(delegate) .receive() .as(StepVerifier::create) .assertNext( @@ -219,7 +219,7 @@ void reassembleMetadataAndData() { when(delegate.onClose()).thenReturn(Mono.never()); when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, false) + new ReassemblyDuplexConnection(delegate) .receive() .as(StepVerifier::create) .assertNext( @@ -240,7 +240,7 @@ void reassembleNonFragment() { when(delegate.onClose()).thenReturn(Mono.never()); when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, false) + new ReassemblyDuplexConnection(delegate) .receive() .as(StepVerifier::create) .assertNext( @@ -260,7 +260,7 @@ void reassembleNonFragmentableFrame() { when(delegate.onClose()).thenReturn(Mono.never()); when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, false) + new ReassemblyDuplexConnection(delegate) .receive() .as(StepVerifier::create) .assertNext( diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestClientTransport.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestClientTransport.java index a30e75875..259618300 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestClientTransport.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestClientTransport.java @@ -12,7 +12,7 @@ public class TestClientTransport implements ClientTransport { private final TestDuplexConnection testDuplexConnection = new TestDuplexConnection(allocator); @Override - public Mono connect(int mtu) { + public Mono connect() { return Mono.just(testDuplexConnection); } diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestServerTransport.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestServerTransport.java index 325496148..e7334f54a 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestServerTransport.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestServerTransport.java @@ -13,7 +13,7 @@ public class TestServerTransport implements ServerTransport { LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); @Override - public Mono start(ConnectionAcceptor acceptor, int mtu) { + public Mono start(ConnectionAcceptor acceptor) { conn.flatMap(acceptor::apply) .subscribe(ignored -> {}, err -> disposeConnection(), this::disposeConnection); return Mono.just( diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java index 2ab73116d..0653c3ceb 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java @@ -61,7 +61,7 @@ public static void main(String[] args) { if (in.headers().containsValue("Authorization", "test", true)) { DuplexConnection connection = new ReassemblyDuplexConnection( - new WebsocketDuplexConnection((Connection) in), false); + new WebsocketDuplexConnection((Connection) in)); return acceptor.apply(connection).then(out.neverComplete()); } diff --git a/rsocket-examples/src/test/java/io/rsocket/resume/DisconnectableClientTransport.java b/rsocket-examples/src/test/java/io/rsocket/resume/DisconnectableClientTransport.java index e29066f02..5824918bc 100644 --- a/rsocket-examples/src/test/java/io/rsocket/resume/DisconnectableClientTransport.java +++ b/rsocket-examples/src/test/java/io/rsocket/resume/DisconnectableClientTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,13 +33,13 @@ public DisconnectableClientTransport(ClientTransport clientTransport) { } @Override - public Mono connect(int mtu) { + public Mono connect() { return Mono.defer( () -> now() < nextConnectPermitMillis ? Mono.error(new ClosedChannelException()) : clientTransport - .connect(mtu) + .connect() .map( c -> { if (curConnection.compareAndSet(null, c)) { diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java index d69bd65e8..b80fc2337 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,12 +19,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; -import io.rsocket.fragmentation.FragmentationDuplexConnection; -import io.rsocket.fragmentation.ReassemblyDuplexConnection; import io.rsocket.internal.UnboundedProcessor; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; -import io.rsocket.transport.local.LocalServerTransport.ServerDuplexConnectionAcceptor; import java.util.Objects; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; @@ -72,10 +69,11 @@ public static LocalClientTransport create(String name, ByteBufAllocator allocato return new LocalClientTransport(name, allocator); } - private Mono connect() { + @Override + public Mono connect() { return Mono.defer( () -> { - ServerDuplexConnectionAcceptor server = LocalServerTransport.findServer(name); + ServerTransport.ConnectionAcceptor server = LocalServerTransport.findServer(name); if (server == null) { return Mono.error(new IllegalArgumentException("Could not find server: " + name)); } @@ -84,25 +82,10 @@ private Mono connect() { UnboundedProcessor out = new UnboundedProcessor<>(); MonoProcessor closeNotifier = MonoProcessor.create(); - server.accept(new LocalDuplexConnection(allocator, out, in, closeNotifier)); + server.apply(new LocalDuplexConnection(allocator, out, in, closeNotifier)).subscribe(); return Mono.just( (DuplexConnection) new LocalDuplexConnection(allocator, in, out, closeNotifier)); }); } - - @Override - public Mono connect(int mtu) { - Mono isError = FragmentationDuplexConnection.checkMtu(mtu); - Mono connect = isError != null ? isError : connect(); - - return connect.map( - duplexConnection -> { - if (mtu > 0) { - return new FragmentationDuplexConnection(duplexConnection, mtu, false, "client"); - } else { - return new ReassemblyDuplexConnection(duplexConnection, false); - } - }); - } } diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java index 382b4533a..c07713cb3 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +17,12 @@ package io.rsocket.transport.local; import io.rsocket.Closeable; -import io.rsocket.DuplexConnection; -import io.rsocket.fragmentation.FragmentationDuplexConnection; -import io.rsocket.fragmentation.ReassemblyDuplexConnection; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.util.annotation.Nullable; @@ -37,7 +33,7 @@ */ public final class LocalServerTransport implements ServerTransport { - private static final ConcurrentMap registry = + private static final ConcurrentMap registry = new ConcurrentHashMap<>(); private final String name; @@ -79,107 +75,64 @@ public static void dispose(String name) { } /** - * Retrieves an instance of {@link ServerDuplexConnectionAcceptor} based on the name of its {@code + * Retrieves an instance of {@link ConnectionAcceptor} based on the name of its {@code * LocalServerTransport}. Returns {@code null} if that server is not registered. * * @param name the name of the server to retrieve * @return the server if it has been registered, {@code null} otherwise * @throws NullPointerException if {@code name} is {@code null} */ - static @Nullable ServerDuplexConnectionAcceptor findServer(String name) { + static @Nullable ConnectionAcceptor findServer(String name) { Objects.requireNonNull(name, "name must not be null"); return registry.get(name); } + /** Return the name associated with this local server instance. */ + String getName() { + return name; + } + /** - * Returns a new {@link LocalClientTransport} that is connected to this {@code - * LocalServerTransport}. - * - * @return a new {@link LocalClientTransport} that is connected to this {@code - * LocalServerTransport} + * Return a new {@link LocalClientTransport} connected to this {@code LocalServerTransport} + * through its {@link #getName()}. */ public LocalClientTransport clientTransport() { return LocalClientTransport.create(name); } @Override - public Mono start(ConnectionAcceptor acceptor, int mtu) { + public Mono start(ConnectionAcceptor acceptor) { Objects.requireNonNull(acceptor, "acceptor must not be null"); - - Mono isError = FragmentationDuplexConnection.checkMtu(mtu); - return isError != null - ? isError - : Mono.create( - sink -> { - ServerDuplexConnectionAcceptor serverDuplexConnectionAcceptor = - new ServerDuplexConnectionAcceptor(name, acceptor, mtu); - - if (registry.putIfAbsent(name, serverDuplexConnectionAcceptor) != null) { - throw new IllegalStateException("name already registered: " + name); - } - - sink.success(serverDuplexConnectionAcceptor); - }); + return Mono.create( + sink -> { + ServerCloseable closeable = new ServerCloseable(name, acceptor); + if (registry.putIfAbsent(name, acceptor) != null) { + throw new IllegalStateException("name already registered: " + name); + } + sink.success(closeable); + }); } - /** - * Returns the name of this instance. - * - * @return the name of this instance - */ - String getName() { - return name; - } + static class ServerCloseable implements Closeable { - /** - * A {@link Consumer} of {@link DuplexConnection} that is called when a server has been created. - */ - static class ServerDuplexConnectionAcceptor implements Consumer, Closeable { + private final LocalSocketAddress address; private final ConnectionAcceptor acceptor; - private final LocalSocketAddress address; - private final MonoProcessor onClose = MonoProcessor.create(); - private final int mtu; - - /** - * Creates a new instance - * - * @param name the name of the server - * @param acceptor the {@link ConnectionAcceptor} to call when the server has been created - * @throws NullPointerException if {@code name} or {@code acceptor} is {@code null} - */ - ServerDuplexConnectionAcceptor(String name, ConnectionAcceptor acceptor, int mtu) { + ServerCloseable(String name, ConnectionAcceptor acceptor) { Objects.requireNonNull(name, "name must not be null"); - this.address = new LocalSocketAddress(name); - this.acceptor = Objects.requireNonNull(acceptor, "acceptor must not be null"); - this.mtu = mtu; - } - - @Override - public void accept(DuplexConnection duplexConnection) { - Objects.requireNonNull(duplexConnection, "duplexConnection must not be null"); - - if (mtu > 0) { - duplexConnection = - new FragmentationDuplexConnection(duplexConnection, mtu, false, "server"); - } else { - duplexConnection = new ReassemblyDuplexConnection(duplexConnection, false); - } - - acceptor.apply(duplexConnection).subscribe(); + this.acceptor = acceptor; } @Override public void dispose() { - if (!registry.remove(address.getName(), this)) { + if (!registry.remove(address.getName(), acceptor)) { throw new AssertionError(); } - onClose.onComplete(); } diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java index 4cfee9a01..ac4c13efe 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,8 +32,8 @@ void connect() { LocalServerTransport serverTransport = LocalServerTransport.createEphemeral(); serverTransport - .start(duplexConnection -> Mono.empty(), 0) - .flatMap(closeable -> LocalClientTransport.create(serverTransport.getName()).connect(0)) + .start(duplexConnection -> Mono.empty()) + .flatMap(closeable -> LocalClientTransport.create(serverTransport.getName()).connect()) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -43,7 +43,7 @@ void connect() { @Test void connectNoServer() { LocalClientTransport.create("test-name") - .connect(0) + .connect() .as(StepVerifier::create) .verifyErrorMessage("Could not find server: test-name"); } diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalServerTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalServerTransportTest.java index 1656ed08d..ed906f65b 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalServerTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalServerTransportTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ void findServer() { LocalServerTransport serverTransport = LocalServerTransport.createEphemeral(); serverTransport - .start(duplexConnection -> Mono.empty(), 0) + .start(duplexConnection -> Mono.empty()) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -97,7 +97,7 @@ void named() { @Test void start() { LocalServerTransport.createEphemeral() - .start(duplexConnection -> Mono.empty(), 0) + .start(duplexConnection -> Mono.empty()) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -107,7 +107,7 @@ void start() { @Test void startNullAcceptor() { assertThatNullPointerException() - .isThrownBy(() -> LocalServerTransport.createEphemeral().start(null, 0)) + .isThrownBy(() -> LocalServerTransport.createEphemeral().start(null)) .withMessage("acceptor must not be null"); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index b7081593c..618708bf0 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,10 @@ public TcpDuplexConnection(Connection connection) { * * @param encodeLength indicates if this connection should encode the length or not. * @param connection the {@link Connection} to for managing the server + * @deprecated as of 1.0.1 in favor of using {@link #TcpDuplexConnection(Connection)} and hence + * {@code encodeLength} should always be true. */ + @Deprecated public TcpDuplexConnection(Connection connection, boolean encodeLength) { this.encodeLength = encodeLength; this.connection = Objects.requireNonNull(connection, "connection must not be null"); diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java index 22f139310..98feebdfc 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,6 @@ package io.rsocket.transport.netty.client; import io.rsocket.DuplexConnection; -import io.rsocket.fragmentation.FragmentationDuplexConnection; -import io.rsocket.fragmentation.ReassemblyDuplexConnection; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; import io.rsocket.transport.netty.RSocketLengthCodec; @@ -93,21 +91,10 @@ public static TcpClientTransport create(TcpClient client) { } @Override - public Mono connect(int mtu) { - Mono isError = FragmentationDuplexConnection.checkMtu(mtu); - return isError != null - ? isError - : client - .doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec())) - .connect() - .map( - c -> { - if (mtu > 0) { - return new FragmentationDuplexConnection( - new TcpDuplexConnection(c, false), mtu, true, "client"); - } else { - return new ReassemblyDuplexConnection(new TcpDuplexConnection(c), false); - } - }); + public Mono connect() { + return client + .doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec())) + .connect() + .map(TcpDuplexConnection::new); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java index 747401210..9b8bea97a 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,6 @@ import static io.rsocket.transport.netty.UriUtils.isSecure; import io.rsocket.DuplexConnection; -import io.rsocket.fragmentation.FragmentationDuplexConnection; -import io.rsocket.fragmentation.ReassemblyDuplexConnection; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; import io.rsocket.transport.TransportHeaderAware; @@ -149,33 +147,19 @@ private static TcpClient createClient(URI uri) { } } - @Override - public Mono connect(int mtu) { - Mono isError = FragmentationDuplexConnection.checkMtu(mtu); - return isError != null - ? isError - : client - .headers(headers -> transportHeaders.get().forEach(headers::set)) - .websocket( - WebsocketClientSpec.builder().maxFramePayloadLength(FRAME_LENGTH_MASK).build()) - .uri(path) - .connect() - .map( - c -> { - DuplexConnection connection = new WebsocketDuplexConnection(c); - if (mtu > 0) { - connection = - new FragmentationDuplexConnection(connection, mtu, false, "client"); - } else { - connection = new ReassemblyDuplexConnection(connection, false); - } - return connection; - }); - } - @Override public void setTransportHeaders(Supplier> transportHeaders) { this.transportHeaders = Objects.requireNonNull(transportHeaders, "transportHeaders must not be null"); } + + @Override + public Mono connect() { + return client + .headers(headers -> transportHeaders.get().forEach(headers::set)) + .websocket(WebsocketClientSpec.builder().maxFramePayloadLength(FRAME_LENGTH_MASK).build()) + .uri(path) + .connect() + .map(WebsocketDuplexConnection::new); + } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java index 56dd59d45..311413a7d 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,6 @@ package io.rsocket.transport.netty.server; -import io.rsocket.DuplexConnection; -import io.rsocket.fragmentation.FragmentationDuplexConnection; -import io.rsocket.fragmentation.ReassemblyDuplexConnection; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; import io.rsocket.transport.netty.RSocketLengthCodec; @@ -60,7 +57,6 @@ public static TcpServerTransport create(int port) { */ public static TcpServerTransport create(String bindAddress, int port) { Objects.requireNonNull(bindAddress, "bindAddress must not be null"); - TcpServer server = TcpServer.create().host(bindAddress).port(port); return create(server); } @@ -74,7 +70,6 @@ public static TcpServerTransport create(String bindAddress, int port) { */ public static TcpServerTransport create(InetSocketAddress address) { Objects.requireNonNull(address, "address must not be null"); - return create(address.getHostName(), address.getPort()); } @@ -87,34 +82,22 @@ public static TcpServerTransport create(InetSocketAddress address) { */ public static TcpServerTransport create(TcpServer server) { Objects.requireNonNull(server, "server must not be null"); - return new TcpServerTransport(server); } @Override - public Mono start(ConnectionAcceptor acceptor, int mtu) { + public Mono start(ConnectionAcceptor acceptor) { Objects.requireNonNull(acceptor, "acceptor must not be null"); - Mono isError = FragmentationDuplexConnection.checkMtu(mtu); - return isError != null - ? isError - : server - .doOnConnection( - c -> { - c.addHandlerLast(new RSocketLengthCodec()); - DuplexConnection connection; - if (mtu > 0) { - connection = - new FragmentationDuplexConnection( - new TcpDuplexConnection(c, false), mtu, true, "server"); - } else { - connection = new ReassemblyDuplexConnection(new TcpDuplexConnection(c), false); - } - acceptor - .apply(connection) - .then(Mono.never()) - .subscribe(c.disposeSubscriber()); - }) - .bind() - .map(CloseableChannel::new); + return server + .doOnConnection( + c -> { + c.addHandlerLast(new RSocketLengthCodec()); + acceptor + .apply(new TcpDuplexConnection(c)) + .then(Mono.never()) + .subscribe(c.disposeSubscriber()); + }) + .bind() + .map(CloseableChannel::new); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java index bd19f18b0..9543abaaa 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,9 +19,6 @@ import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK; import io.rsocket.Closeable; -import io.rsocket.DuplexConnection; -import io.rsocket.fragmentation.FragmentationDuplexConnection; -import io.rsocket.fragmentation.ReassemblyDuplexConnection; import io.rsocket.transport.ServerTransport; import io.rsocket.transport.netty.WebsocketDuplexConnection; import java.util.Objects; @@ -63,16 +60,15 @@ public WebsocketRouteTransport( } @Override - public Mono start(ConnectionAcceptor acceptor, int mtu) { + public Mono start(ConnectionAcceptor acceptor) { Objects.requireNonNull(acceptor, "acceptor must not be null"); - return server .route( routes -> { routesBuilder.accept(routes); routes.ws( path, - newHandler(acceptor, mtu), + newHandler(acceptor), WebsocketServerSpec.builder().maxFramePayloadLength(FRAME_LENGTH_MASK).build()); }) .bind() @@ -101,14 +97,7 @@ public static BiFunction> n */ public static BiFunction> newHandler( ConnectionAcceptor acceptor, int mtu) { - return (in, out) -> { - DuplexConnection connection = new WebsocketDuplexConnection((Connection) in); - if (mtu > 0) { - connection = new FragmentationDuplexConnection(connection, mtu, false, "server"); - } else { - connection = new ReassemblyDuplexConnection(connection, false); - } - return acceptor.apply(connection).then(out.neverComplete()); - }; + return (in, out) -> + acceptor.apply(new WebsocketDuplexConnection((Connection) in)).then(out.neverComplete()); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java index 1a0b32cf0..802a7f817 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,9 +18,6 @@ import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK; -import io.rsocket.DuplexConnection; -import io.rsocket.fragmentation.FragmentationDuplexConnection; -import io.rsocket.fragmentation.ReassemblyDuplexConnection; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; import io.rsocket.transport.TransportHeaderAware; @@ -74,7 +71,6 @@ public static WebsocketServerTransport create(int port) { */ public static WebsocketServerTransport create(String bindAddress, int port) { Objects.requireNonNull(bindAddress, "bindAddress must not be null"); - HttpServer httpServer = HttpServer.create().host(bindAddress).port(port); return create(httpServer); } @@ -88,7 +84,6 @@ public static WebsocketServerTransport create(String bindAddress, int port) { */ public static WebsocketServerTransport create(InetSocketAddress address) { Objects.requireNonNull(address, "address must not be null"); - return create(address.getHostName(), address.getPort()); } @@ -101,7 +96,6 @@ public static WebsocketServerTransport create(InetSocketAddress address) { */ public static WebsocketServerTransport create(final HttpServer server) { Objects.requireNonNull(server, "server must not be null"); - return new WebsocketServerTransport(server); } @@ -112,33 +106,20 @@ public void setTransportHeaders(Supplier> transportHeaders) } @Override - public Mono start(ConnectionAcceptor acceptor, int mtu) { + public Mono start(ConnectionAcceptor acceptor) { Objects.requireNonNull(acceptor, "acceptor must not be null"); - - Mono isError = FragmentationDuplexConnection.checkMtu(mtu); - return isError != null - ? isError - : server - .handle( - (request, response) -> { - transportHeaders.get().forEach(response::addHeader); - return response.sendWebsocket( - (in, out) -> { - DuplexConnection connection = - new WebsocketDuplexConnection((Connection) in); - if (mtu > 0) { - connection = - new FragmentationDuplexConnection(connection, mtu, false, "server"); - } else { - connection = new ReassemblyDuplexConnection(connection, false); - } - return acceptor.apply(connection).then(out.neverComplete()); - }, - WebsocketServerSpec.builder() - .maxFramePayloadLength(FRAME_LENGTH_MASK) - .build()); - }) - .bind() - .map(CloseableChannel::new); + return server + .handle( + (request, response) -> { + transportHeaders.get().forEach(response::addHeader); + return response.sendWebsocket( + (in, out) -> + acceptor + .apply(new WebsocketDuplexConnection((Connection) in)) + .then(out.neverComplete()), + WebsocketServerSpec.builder().maxFramePayloadLength(FRAME_LENGTH_MASK).build()); + }) + .bind() + .map(CloseableChannel::new); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/client/TcpClientTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/client/TcpClientTransportTest.java index e0bdb9cd7..9f4f6d3dc 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/client/TcpClientTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/client/TcpClientTransportTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,8 +37,8 @@ void connect() { TcpServerTransport serverTransport = TcpServerTransport.create(address); serverTransport - .start(duplexConnection -> Mono.empty(), 0) - .flatMap(context -> TcpClientTransport.create(context.address()).connect(0)) + .start(duplexConnection -> Mono.empty()) + .flatMap(context -> TcpClientTransport.create(context.address()).connect()) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -47,7 +47,7 @@ void connect() { @DisplayName("create generates error if server not started") @Test void connectNoServer() { - TcpClientTransport.create(8000).connect(0).as(StepVerifier::create).verifyError(); + TcpClientTransport.create(8000).connect().as(StepVerifier::create).verifyError(); } @DisplayName("creates client with BindAddress") diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/client/WebsocketClientTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/client/WebsocketClientTransportTest.java index fc035c536..f94229848 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/client/WebsocketClientTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/client/WebsocketClientTransportTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package io.rsocket.transport.netty.client; -import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNullPointerException; @@ -24,13 +23,9 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.Collections; -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -39,52 +34,6 @@ @ExtendWith(MockitoExtension.class) final class WebsocketClientTransportTest { - @Test - @Disabled - public void testThatSetupWithUnSpecifiedFrameSizeShouldSetMaxFrameSize() { - ArgumentCaptor captor = ArgumentCaptor.forClass(Integer.class); - HttpClient httpClient = Mockito.spy(HttpClient.create()); - Mockito.doAnswer(a -> httpClient).when(httpClient).headers(Mockito.any()); - Mockito.doCallRealMethod().when(httpClient).websocket(captor.capture()); - - WebsocketClientTransport clientTransport = WebsocketClientTransport.create(httpClient, ""); - - clientTransport.connect(0).subscribe(); - - Assertions.assertThat(captor.getValue()).isEqualTo(FRAME_LENGTH_MASK); - } - - @Test - @Disabled - public void testThatSetupWithSpecifiedFrameSizeButLowerThanWsDefaultShouldSetToWsDefault() { - ArgumentCaptor captor = ArgumentCaptor.forClass(Integer.class); - HttpClient httpClient = Mockito.spy(HttpClient.create()); - Mockito.doAnswer(a -> httpClient).when(httpClient).headers(Mockito.any()); - Mockito.doCallRealMethod().when(httpClient).websocket(captor.capture()); - - WebsocketClientTransport clientTransport = WebsocketClientTransport.create(httpClient, ""); - - clientTransport.connect(65536 - 10000).subscribe(); - - Assertions.assertThat(captor.getValue()).isEqualTo(FRAME_LENGTH_MASK); - } - - @Test - @Disabled - public void - testThatSetupWithSpecifiedFrameSizeButHigherThanWsDefaultShouldSetToSpecifiedFrameSize() { - ArgumentCaptor captor = ArgumentCaptor.forClass(Integer.class); - HttpClient httpClient = Mockito.spy(HttpClient.create()); - Mockito.doAnswer(a -> httpClient).when(httpClient).headers(Mockito.any()); - Mockito.doCallRealMethod().when(httpClient).websocket(captor.capture()); - - WebsocketClientTransport clientTransport = WebsocketClientTransport.create(httpClient, ""); - - clientTransport.connect(65536 + 10000).subscribe(); - - Assertions.assertThat(captor.getValue()).isEqualTo(FRAME_LENGTH_MASK); - } - @DisplayName("connects to server") @Test void connect() { @@ -93,8 +42,8 @@ void connect() { WebsocketServerTransport serverTransport = WebsocketServerTransport.create(address); serverTransport - .start(duplexConnection -> Mono.empty(), 0) - .flatMap(context -> WebsocketClientTransport.create(context.address()).connect(0)) + .start(duplexConnection -> Mono.empty()) + .flatMap(context -> WebsocketClientTransport.create(context.address()).connect()) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -103,7 +52,7 @@ void connect() { @DisplayName("create generates error if server not started") @Test void connectNoServer() { - WebsocketClientTransport.create(8000).connect(0).as(StepVerifier::create).verifyError(); + WebsocketClientTransport.create(8000).connect().as(StepVerifier::create).verifyError(); } @DisplayName("creates client with BindAddress") diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/TcpServerTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/TcpServerTransportTest.java index b6cbfea34..709c9fa62 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/TcpServerTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/TcpServerTransportTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -87,7 +87,7 @@ void start() { TcpServerTransport serverTransport = TcpServerTransport.create(address); serverTransport - .start(duplexConnection -> Mono.empty(), 0) + .start(duplexConnection -> Mono.empty()) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -97,7 +97,7 @@ void start() { @Test void startNullAcceptor() { assertThatNullPointerException() - .isThrownBy(() -> TcpServerTransport.create("localhost", 8000).start(null, 0)) + .isThrownBy(() -> TcpServerTransport.create("localhost", 8000).start(null)) .withMessage("acceptor must not be null"); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/WebsocketRouteTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/WebsocketRouteTransportTest.java index e94bef13c..2670b4a4b 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/WebsocketRouteTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/WebsocketRouteTransportTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ void start() { new WebsocketRouteTransport(HttpServer.create(), routes -> {}, "/test-path"); serverTransport - .start(duplexConnection -> Mono.empty(), 0) + .start(duplexConnection -> Mono.empty()) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -76,7 +76,7 @@ void startNullAcceptor() { .isThrownBy( () -> new WebsocketRouteTransport(HttpServer.create(), routes -> {}, "/test-path") - .start(null, 0)) + .start(null)) .withMessage("acceptor must not be null"); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/WebsocketServerTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/WebsocketServerTransportTest.java index 249a3e12a..5ac2e05fb 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/WebsocketServerTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/WebsocketServerTransportTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,50 +44,7 @@ public void testThatSetupWithUnSpecifiedFrameSizeShouldSetMaxFrameSize() { WebsocketServerTransport serverTransport = WebsocketServerTransport.create(httpServer); - serverTransport.start(c -> Mono.empty(), 0).subscribe(); - - HttpServerRequest httpServerRequest = Mockito.mock(HttpServerRequest.class); - HttpServerResponse httpServerResponse = Mockito.mock(HttpServerResponse.class); - - captor.getValue().apply(httpServerRequest, httpServerResponse); - - Mockito.verify(httpServerResponse) - .sendWebsocket( - Mockito.nullable(String.class), Mockito.eq(FRAME_LENGTH_MASK), Mockito.any()); - } - - // @Test - public void testThatSetupWithSpecifiedFrameSizeButLowerThanWsDefaultShouldSetToWsDefault() { - ArgumentCaptor captor = ArgumentCaptor.forClass(BiFunction.class); - HttpServer httpServer = Mockito.spy(HttpServer.create()); - Mockito.doAnswer(a -> httpServer).when(httpServer).handle(captor.capture()); - Mockito.doAnswer(a -> Mono.empty()).when(httpServer).bind(); - - WebsocketServerTransport serverTransport = WebsocketServerTransport.create(httpServer); - - serverTransport.start(c -> Mono.empty(), 1000).subscribe(); - - HttpServerRequest httpServerRequest = Mockito.mock(HttpServerRequest.class); - HttpServerResponse httpServerResponse = Mockito.mock(HttpServerResponse.class); - - captor.getValue().apply(httpServerRequest, httpServerResponse); - - Mockito.verify(httpServerResponse) - .sendWebsocket( - Mockito.nullable(String.class), Mockito.eq(FRAME_LENGTH_MASK), Mockito.any()); - } - - // @Test - public void - testThatSetupWithSpecifiedFrameSizeButHigherThanWsDefaultShouldSetToSpecifiedFrameSize() { - ArgumentCaptor captor = ArgumentCaptor.forClass(BiFunction.class); - HttpServer httpServer = Mockito.spy(HttpServer.create()); - Mockito.doAnswer(a -> httpServer).when(httpServer).handle(captor.capture()); - Mockito.doAnswer(a -> Mono.empty()).when(httpServer).bind(); - - WebsocketServerTransport serverTransport = WebsocketServerTransport.create(httpServer); - - serverTransport.start(c -> Mono.empty(), 65536 + 1000).subscribe(); + serverTransport.start(c -> Mono.empty()).subscribe(); HttpServerRequest httpServerRequest = Mockito.mock(HttpServerRequest.class); HttpServerResponse httpServerResponse = Mockito.mock(HttpServerResponse.class); @@ -172,7 +129,7 @@ void start() { WebsocketServerTransport serverTransport = WebsocketServerTransport.create(address); serverTransport - .start(duplexConnection -> Mono.empty(), 0) + .start(duplexConnection -> Mono.empty()) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -182,7 +139,7 @@ void start() { @Test void startNullAcceptor() { assertThatNullPointerException() - .isThrownBy(() -> WebsocketServerTransport.create(8000).start(null, 0)) + .isThrownBy(() -> WebsocketServerTransport.create(8000).start(null)) .withMessage("acceptor must not be null"); } }