Skip to content

Commit

Permalink
Apply FragmentationDuplexConnection from a single place (#836)
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev authored May 15, 2020
1 parent 585b5ef commit d8da87a
Show file tree
Hide file tree
Showing 32 changed files with 197 additions and 486 deletions.
19 changes: 10 additions & 9 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.fragmentation.FragmentationDuplexConnection;
import io.rsocket.fragmentation.ReassemblyDuplexConnection;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.ClientServerInputMultiplexer;
Expand Down Expand Up @@ -400,14 +401,7 @@ public RSocketConnector lease(Supplier<Leases<? extends LeaseStats>> supplier) {
* and Reassembly</a>
*/
public RSocketConnector fragment(int mtu) {
if (mtu > 0 && mtu < FragmentationDuplexConnection.MIN_MTU_SIZE || mtu < 0) {
String msg =
String.format(
"The smallest allowed mtu size is %d bytes, provided: %d",
FragmentationDuplexConnection.MIN_MTU_SIZE, mtu);
throw new IllegalArgumentException(msg);
}
this.mtu = mtu;
this.mtu = FragmentationDuplexConnection.assertMtu(mtu);
return this;
}

Expand Down Expand Up @@ -468,8 +462,15 @@ public Mono<RSocket> connect(ClientTransport transport) {
* @return a {@code Mono} with the connected RSocket
*/
public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {

Mono<DuplexConnection> connectionMono =
Mono.fromSupplier(transportSupplier).flatMap(t -> t.connect(mtu));
Mono.fromSupplier(transportSupplier)
.flatMap(ClientTransport::connect)
.map(
connection ->
mtu > 0
? new FragmentationDuplexConnection(connection, mtu, "client")
: new ReassemblyDuplexConnection(connection));
return connectionMono
.flatMap(
connection -> {
Expand Down
17 changes: 8 additions & 9 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.rsocket.exceptions.InvalidSetupException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.fragmentation.FragmentationDuplexConnection;
import io.rsocket.fragmentation.ReassemblyDuplexConnection;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
Expand Down Expand Up @@ -211,14 +212,7 @@ public RSocketServer lease(Supplier<Leases<?>> supplier) {
* and Reassembly</a>
*/
public RSocketServer fragment(int mtu) {
if (mtu > 0 && mtu < FragmentationDuplexConnection.MIN_MTU_SIZE || mtu < 0) {
String msg =
String.format(
"The smallest allowed mtu size is %d bytes, provided: %d",
FragmentationDuplexConnection.MIN_MTU_SIZE, mtu);
throw new IllegalArgumentException(msg);
}
this.mtu = mtu;
this.mtu = FragmentationDuplexConnection.assertMtu(mtu);
return this;
}

Expand Down Expand Up @@ -273,7 +267,7 @@ public <T extends Closeable> Mono<T> bind(ServerTransport<T> transport) {
@Override
public Mono<T> get() {
return transport
.start(duplexConnection -> acceptor(serverSetup, duplexConnection), mtu)
.start(duplexConnection -> acceptor(serverSetup, duplexConnection))
.doOnNext(c -> c.onClose().doFinally(v -> serverSetup.dispose()).subscribe());
}
});
Expand Down Expand Up @@ -305,6 +299,11 @@ public Mono<Void> apply(DuplexConnection connection) {
}

private Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection connection) {
connection =
mtu > 0
? new FragmentationDuplexConnection(connection, mtu, "server")
: new ReassemblyDuplexConnection(connection);

ClientServerInputMultiplexer multiplexer =
new ClientServerInputMultiplexer(connection, interceptors, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import io.netty.buffer.ByteBufUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameLengthCodec;
import io.rsocket.frame.FrameType;
import java.util.Objects;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* A {@link DuplexConnection} implementation that fragments and reassembles {@link ByteBuf}s.
Expand All @@ -46,15 +44,19 @@ public final class FragmentationDuplexConnection extends ReassemblyDuplexConnect
private final DuplexConnection delegate;
private final int mtu;
private final FrameReassembler frameReassembler;
private final boolean encodeLength;
private final String type;

public FragmentationDuplexConnection(
DuplexConnection delegate, int mtu, boolean encodeAndEncodeLength, String type) {
super(delegate, encodeAndEncodeLength);
/**
* Class constructor.
*
* @param delegate the underlying connection
* @param mtu the fragment size, greater than {@link #MIN_MTU_SIZE}
* @param type a label to use for logging purposes
*/
public FragmentationDuplexConnection(DuplexConnection delegate, int mtu, String type) {
super(delegate);

Objects.requireNonNull(delegate, "delegate must not be null");
this.encodeLength = encodeAndEncodeLength;
this.delegate = delegate;
this.mtu = assertMtu(mtu);
this.frameReassembler = new FrameReassembler(delegate.alloc());
Expand All @@ -67,32 +69,17 @@ private boolean shouldFragment(FrameType frameType, int readableBytes) {
return frameType.isFragmentable() && readableBytes > mtu;
}

/*TODO this is nullable and not returning empty to workaround javac 11.0.3 compiler issue on ubuntu (at least) */
@Nullable
public static <T> Mono<T> checkMtu(int mtu) {
if (isInsufficientMtu(mtu)) {
public static int assertMtu(int mtu) {
if (mtu > 0 && mtu < MIN_MTU_SIZE || mtu < 0) {
String msg =
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
return Mono.error(new IllegalArgumentException(msg));
} else {
return null;
}
}

private static int assertMtu(int mtu) {
if (isInsufficientMtu(mtu)) {
String msg =
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
String.format(
"The smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
throw new IllegalArgumentException(msg);
} else {
return mtu;
}
}

private static boolean isInsufficientMtu(int mtu) {
return mtu > 0 && mtu < MIN_MTU_SIZE || mtu < 0;
}

@Override
public Mono<Void> send(Publisher<ByteBuf> frames) {
return Flux.from(frames).concatMap(this::sendOne).then();
Expand All @@ -102,34 +89,22 @@ public Mono<Void> send(Publisher<ByteBuf> frames) {
public Mono<Void> sendOne(ByteBuf frame) {
FrameType frameType = FrameHeaderCodec.frameType(frame);
int readableBytes = frame.readableBytes();
if (shouldFragment(frameType, readableBytes)) {
if (logger.isDebugEnabled()) {
return delegate.send(
Flux.from(fragmentFrame(alloc(), mtu, frame, frameType, encodeLength))
.doOnNext(
byteBuf -> {
ByteBuf f = encodeLength ? FrameLengthCodec.frame(byteBuf) : byteBuf;
logger.debug(
"{} - stream id {} - frame type {} - \n {}",
type,
FrameHeaderCodec.streamId(f),
FrameHeaderCodec.frameType(f),
ByteBufUtil.prettyHexDump(f));
}));
} else {
return delegate.send(
Flux.from(fragmentFrame(alloc(), mtu, frame, frameType, encodeLength)));
}
} else {
return delegate.sendOne(encode(frame));
if (!shouldFragment(frameType, readableBytes)) {
return delegate.sendOne(frame);
}
}

private ByteBuf encode(ByteBuf frame) {
if (encodeLength) {
return FrameLengthCodec.encode(alloc(), frame.readableBytes(), frame);
} else {
return frame;
Flux<ByteBuf> fragments = Flux.from(fragmentFrame(alloc(), mtu, frame, frameType));
if (logger.isDebugEnabled()) {
fragments =
fragments.doOnNext(
byteBuf -> {
logger.debug(
"{} - stream id {} - frame type {} - \n {}",
type,
FrameHeaderCodec.streamId(byteBuf),
FrameHeaderCodec.frameType(byteBuf),
ByteBufUtil.prettyHexDump(byteBuf));
});
}
return delegate.send(fragments);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameLengthCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.frame.RequestChannelFrameCodec;
Expand All @@ -42,11 +41,7 @@
*/
final class FrameFragmenter {
static Publisher<ByteBuf> fragmentFrame(
ByteBufAllocator allocator,
int mtu,
final ByteBuf frame,
FrameType frameType,
boolean encodeLength) {
ByteBufAllocator allocator, int mtu, final ByteBuf frame, FrameType frameType) {
ByteBuf metadata = getMetadata(frame, frameType);
ByteBuf data = getData(frame, frameType);
int streamId = FrameHeaderCodec.streamId(frame);
Expand All @@ -66,7 +61,7 @@ public void accept(SynchronousSink<ByteBuf> sink) {
byteBuf = encodeFollowsFragment(allocator, mtu, streamId, metadata, data);
}

sink.next(encode(allocator, byteBuf, encodeLength));
sink.next(byteBuf);
if (!metadata.isReadable() && !data.isReadable()) {
sink.complete();
}
Expand Down Expand Up @@ -237,12 +232,4 @@ static ByteBuf getData(ByteBuf frame, FrameType frameType) {
}
return data;
}

static ByteBuf encode(ByteBufAllocator allocator, ByteBuf frame, boolean encodeLength) {
if (encodeLength) {
return FrameLengthCodec.encode(allocator, frame.readableBytes(), frame);
} else {
return frame;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameLengthCodec;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand All @@ -35,11 +34,10 @@
public class ReassemblyDuplexConnection implements DuplexConnection {
private final DuplexConnection delegate;
private final FrameReassembler frameReassembler;
private final boolean decodeLength;

public ReassemblyDuplexConnection(DuplexConnection delegate, boolean decodeLength) {
/** Constructor with the underlying delegate to receive frames from. */
public ReassemblyDuplexConnection(DuplexConnection delegate) {
Objects.requireNonNull(delegate, "delegate must not be null");
this.decodeLength = decodeLength;
this.delegate = delegate;
this.frameReassembler = new FrameReassembler(delegate.alloc());

Expand All @@ -56,23 +54,9 @@ public Mono<Void> sendOne(ByteBuf frame) {
return delegate.sendOne(frame);
}

private ByteBuf decode(ByteBuf frame) {
if (decodeLength) {
return FrameLengthCodec.frame(frame).retain();
} else {
return frame;
}
}

@Override
public Flux<ByteBuf> receive() {
return delegate
.receive()
.handle(
(byteBuf, sink) -> {
ByteBuf decode = decode(byteBuf);
frameReassembler.reassembleFrame(decode, sink);
});
return delegate.receive().handle(frameReassembler::reassembleFrame);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,11 +23,9 @@
public interface ClientTransport extends Transport {

/**
* Returns a {@code Publisher}, every subscription to which returns a single {@code
* DuplexConnection}.
* Return a {@code Mono} that connects for each subscriber.
*
* @param mtu The mtu used for fragmentation - if set to zero fragmentation will be disabled
* @return {@code Publisher}, every subscription returns a single {@code DuplexConnection}.
* @since 1.0.1
*/
Mono<DuplexConnection> connect(int mtu);
Mono<DuplexConnection> connect();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,14 +26,13 @@
public interface ServerTransport<T extends Closeable> extends Transport {

/**
* Starts this server.
* Start this server.
*
* @param acceptor An acceptor to process a newly accepted {@code DuplexConnection}
* @param mtu The mtu used for fragmentation - if set to zero fragmentation will be disabled
* @return A handle to retrieve information about a started server.
* @throws NullPointerException if {@code acceptor} is {@code null}
* @param acceptor to process a newly accepted connections with
* @return A handle for information about and control over the server.
* @since 1.0.1
*/
Mono<T> start(ConnectionAcceptor acceptor, int mtu);
Mono<T> start(ConnectionAcceptor acceptor);

/** A contract to accept a new {@code DuplexConnection}. */
interface ConnectionAcceptor extends Function<DuplexConnection, Publisher<Void>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public void shouldBeASharedReconnectableInstanceOfRSocketMono() {
@SuppressWarnings({"rawtype", "unchecked"})
public void shouldBeRetrieableConnectionSharedReconnectableInstanceOfRSocketMono() {
ClientTransport transport = Mockito.mock(ClientTransport.class);
Mockito.when(transport.connect(0))
Mockito.when(transport.connect())
.thenThrow(UncheckedIOException.class)
.thenThrow(UncheckedIOException.class)
.thenThrow(UncheckedIOException.class)
.thenThrow(UncheckedIOException.class)
.thenReturn(new TestClientTransport().connect(0));
.thenReturn(new TestClientTransport().connect());
Mono<RSocket> rSocketMono =
RSocketConnector.create()
.reconnect(
Expand All @@ -93,13 +93,13 @@ public void shouldBeRetrieableConnectionSharedReconnectableInstanceOfRSocketMono
@SuppressWarnings({"rawtype", "unchecked"})
public void shouldBeExaustedRetrieableConnectionSharedReconnectableInstanceOfRSocketMono() {
ClientTransport transport = Mockito.mock(ClientTransport.class);
Mockito.when(transport.connect(0))
Mockito.when(transport.connect())
.thenThrow(UncheckedIOException.class)
.thenThrow(UncheckedIOException.class)
.thenThrow(UncheckedIOException.class)
.thenThrow(UncheckedIOException.class)
.thenThrow(UncheckedIOException.class)
.thenReturn(new TestClientTransport().connect(0));
.thenReturn(new TestClientTransport().connect());
Mono<RSocket> rSocketMono =
RSocketConnector.create()
.reconnect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private static class SingleConnectionTransport implements ServerTransport<TestCl
private final TestDuplexConnection conn = new TestDuplexConnection(allocator);

@Override
public Mono<TestCloseable> start(ConnectionAcceptor acceptor, int mtu) {
public Mono<TestCloseable> start(ConnectionAcceptor acceptor) {
return Mono.just(new TestCloseable(acceptor, conn));
}

Expand Down
Loading

0 comments on commit d8da87a

Please sign in to comment.