Missing onComplete from requestChannel #613

Closed
rstoyanchev opened this issue Apr 9, 2019 · 3 comments · Fixed by #736
Labels: bug, superseded (issue is superseded by another)

@rstoyanchev
Contributor

rstoyanchev commented Apr 9, 2019

We have some intermittently failing tests. I modified one of the rsocket-java samples to demonstrate. Each iteration sends 10 messages over requestChannel and expects them to be echoed back, followed by onComplete. It repeats this in a loop up to 1000 times. Once in a while, the 10 items are echoed back but onComplete never arrives.

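import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public final class ChannelEchoClient {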

  public static void main(String[] args) {
    RSocketFactory.receive()
        .acceptor(new SocketAcceptorImpl())
        .transport(TcpServerTransport.create("localhost", 7000))
        .start()
        .subscribe();

    RSocket socket =
        RSocketFactory.connect()
            .transport(TcpClientTransport.create("localhost", 7000))
            .start()
            .block();

    for (int i = 0; i < 1000; i++) {
      System.out.println("Iteration: " + (i + 1));
      Flux<String> result =
          socket
              .requestChannel(
                  Flux.range(1, 10).map(idx -> DefaultPayload.create("Hello[" + idx + "]")))
              .map(Payload::getDataUtf8)
              .log();

      // Expect the 10 echoed items followed by onComplete, within 5 seconds.
      StepVerifier.create(result)
          .expectNext("Echo: Hello[1]")
          .expectNextCount(8)
          .expectNext("Echo: Hello[10]")
          .expectComplete()
          .verify(Duration.ofSeconds(5));
    }
  }

  private static class SocketAcceptorImpl implements SocketAcceptor {
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
      return Mono.just(
          new AbstractRSocket() {
            @Override
            public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
              return Flux.from(payloads)
                  .delayElements(Duration.ofMillis(10))
                  .map(Payload::getDataUtf8)
                  .map(s -> "Echo: " + s)
                  .map(DefaultPayload::create);
            }
          });
    }
  }
}

I don't know whether the expectation is valid, but it holds most of the time. I believe the failures started 2-3 weeks ago, possibly around the time we upgraded from 0.11.17 to 0.12.1-RC3 snapshots, but I'm not 100% sure since our CI server only keeps 100 builds. The oldest failures I could find are from 2 weeks ago.
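
When chasing an intermittent failure like this, it can help to separate "every item arrived but no terminal signal followed" from "items went missing". Below is a minimal, dependency-free sketch of that check, written against the JDK's java.util.concurrent.Flow rather than rsocket-java or Reactor. CompletionProbe, Outcome, and run are hypothetical names made up for illustration, and SubmissionPublisher only stands in for the echoed channel; it does not reproduce the bug itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of rsocket-java or Reactor). */
final class CompletionProbe implements Flow.Subscriber<String> {

  enum Outcome { COMPLETED, ERRORED, TIMED_OUT }

  private final List<String> received = new ArrayList<>();
  private final CountDownLatch terminated = new CountDownLatch(1);
  private volatile Throwable error;

  @Override
  public void onSubscribe(Flow.Subscription subscription) {
    subscription.request(Long.MAX_VALUE); // unbounded demand
  }

  @Override
  public void onNext(String item) {
    synchronized (received) {
      received.add(item);
    }
  }

  @Override
  public void onError(Throwable t) {
    error = t;
    terminated.countDown();
  }

  @Override
  public void onComplete() {
    terminated.countDown();
  }

  /** Waits for a terminal signal; TIMED_OUT means neither onComplete nor onError arrived. */
  Outcome await(long timeout, TimeUnit unit) {
    try {
      if (!terminated.await(timeout, unit)) {
        return Outcome.TIMED_OUT;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return Outcome.TIMED_OUT;
    }
    return error == null ? Outcome.COMPLETED : Outcome.ERRORED;
  }

  /** Snapshot of the items seen so far. */
  List<String> received() {
    synchronized (received) {
      return new ArrayList<>(received);
    }
  }

  /** Publishes `count` echo strings; sends onComplete only if `complete` is true. */
  static CompletionProbe run(int count, boolean complete) {
    CompletionProbe probe = new CompletionProbe();
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    publisher.subscribe(probe);
    for (int i = 1; i <= count; i++) {
      publisher.submit("Echo: Hello[" + i + "]");
    }
    if (complete) {
      publisher.close(); // delivers onComplete after the pending items
    }
    return probe;
  }
}
```

With a probe like this, a failing iteration would show up as TIMED_OUT, and received() could then be checked to see whether all 10 echoes were delivered before the wait expired.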

rstoyanchev added a commit to spring-projects/spring-framework that referenced this issue Apr 9, 2019
1) Tests use a timeout to avoid hanging issues
2) Some tests adjusted to work around potential rsocket-java issue
rsocket/rsocket-java#613
@OlegDokuka OlegDokuka self-assigned this Apr 9, 2019
@spencergibb
Contributor

I think I see something similar every so often in CI builds.

@robertroeser
Member

#618 should fix this issue

@lksvenoy-r7

See #641 as I am observing this issue in 0.12.2-RC2 (not locally, but externally)
