Skip to content

Commit

Permalink
Update WebSocketHeadersSample (#839)
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev authored May 15, 2020
1 parent d8da87a commit 1181272
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,117 +17,85 @@
package io.rsocket.examples.transport.ws;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.fragmentation.ReassemblyDuplexConnection;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.WebsocketDuplexConnection;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.ByteBufPayload;
import java.time.Duration;
import java.util.HashMap;
import org.reactivestreams.Publisher;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class WebSocketHeadersSample {
static final Payload payload1 = ByteBufPayload.create("Hello ");

private static final Logger logger = LoggerFactory.getLogger(WebSocketHeadersSample.class);

public static void main(String[] args) {

ServerTransport.ConnectionAcceptor acceptor =
RSocketServer.create(SocketAcceptor.with(new ServerRSocket()))
ServerTransport.ConnectionAcceptor connectionAcceptor =
RSocketServer.create(SocketAcceptor.forRequestResponse(Mono::just))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.asConnectionAcceptor();

DisposableServer disposableServer =
DisposableServer server =
HttpServer.create()
.host("localhost")
.port(0)
.route(
routes ->
routes.ws(
routes.get(
"/",
(in, out) -> {
if (in.headers().containsValue("Authorization", "test", true)) {
DuplexConnection connection =
new ReassemblyDuplexConnection(
new WebsocketDuplexConnection((Connection) in));
return acceptor.apply(connection).then(out.neverComplete());
(req, res) -> {
if (req.requestHeaders().containsValue("Authorization", "test", true)) {
return res.sendWebsocket(
(in, out) ->
connectionAcceptor
.apply(new WebsocketDuplexConnection((Connection) in))
.then(out.neverComplete()));
}

return out.sendClose(
HttpResponseStatus.UNAUTHORIZED.code(),
HttpResponseStatus.UNAUTHORIZED.reasonPhrase());
res.status(HttpResponseStatus.UNAUTHORIZED);
return res.send();
}))
.bindNow();

logger.debug(
"\n\nStart of Authorized WebSocket Connection\n----------------------------------\n");

WebsocketClientTransport clientTransport =
WebsocketClientTransport.create(disposableServer.host(), disposableServer.port());
WebsocketClientTransport.create(server.host(), server.port());

clientTransport.setTransportHeaders(
() -> {
HashMap<String, String> map = new HashMap<>();
map.put("Authorization", "test");
return map;
});
clientTransport.setTransportHeaders(() -> Collections.singletonMap("Authorization", "test"));

RSocket socket =
RSocket clientRSocket =
RSocketConnector.create()
.keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(clientTransport)
.block();

Flux.range(0, 100)
.concatMap(i -> socket.fireAndForget(payload1.retain()))
// .doOnNext(p -> {
//// System.out.println(p.getDataUtf8());
// p.release();
// })
Flux.range(1, 100)
.concatMap(i -> clientRSocket.requestResponse(ByteBufPayload.create("Hello " + i)))
.doOnNext(payload -> logger.debug("Processed " + payload.getDataUtf8()))
.blockLast();
socket.dispose();

WebsocketClientTransport clientTransport2 =
WebsocketClientTransport.create(disposableServer.host(), disposableServer.port());

RSocket rSocket =
RSocketConnector.create()
.keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(clientTransport2)
.block();

// expect error here because of closed channel
rSocket.requestResponse(payload1).block();
}

private static class ServerRSocket implements RSocket {

@Override
public Mono<Void> fireAndForget(Payload payload) {
// System.out.println(payload.getDataUtf8());
payload.release();
return Mono.empty();
}
clientRSocket.dispose();

@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.just(payload);
}
logger.debug(
"\n\nStart of Unauthorized WebSocket Upgrade\n----------------------------------\n");

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).subscribeOn(Schedulers.single());
}
RSocketConnector.create()
.keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(WebsocketClientTransport.create(server.host(), server.port()))
.block();
}
}
3 changes: 1 addition & 2 deletions rsocket-examples/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
</appender>

<logger name="io.rsocket.examples" level="DEBUG"/>

<!-- Set this to DEBUG to log frames -->
<logger name="io.rsocket.FrameLogger" level="INFO"/>
<logger name="reactor.netty" level="INFO"/>

<root level="INFO">
<appender-ref ref="STDOUT"/>
Expand Down

0 comments on commit 1181272

Please sign in to comment.