
Provides Prioritised delivering of Zero Streams Frames #718

Merged (6 commits) on Feb 22, 2020

Conversation

OlegDokuka
Member

@OlegDokuka OlegDokuka commented Nov 19, 2019

Motivation

In the current implementation, RSocketRequester sends all the data over the UnboundedProcessor, which is, in a nutshell, an MpscUnboundedArrayQueue and therefore has no prioritization mechanism. In general, there is no need for such functionality unless it comes to delivering critical internal frames/payloads such as the KeepAlive frame or the Lease frame, which SHOULD be delivered as soon as possible. The problem arises when the UnboundedProcessor is overwhelmed by other frames:

RSocket requester = ...;
// Fire requests in a tight loop: frames are enqueued into the send
// processor far faster than the transport can drain them.
while (true) {
  requester.fireAndForget(...).subscribe();
  requester.requestResponse(...).subscribe();
}

The above example shows how a client can quickly overwhelm its own queue, so frames such as KEEPALIVE or LEASE end up stuck behind the backlog and are delivered with a significant delay. In particular, a delayed KEEPALIVE can cause an unwanted cancellation of an otherwise alive connection.

Proposal

To make sure that all the critical frames are delivered as soon as possible, we can add a kind of priority channel; more concretely, a separate MpscUnboundedArrayQueue inside the UnboundedProcessor dedicated to prioritized delivery of stream 0 frames.

In turn, under the hood, the Processor will be drained as in the following example:

void drainRegular(Subscriber<? super T> a) {
    int missed = 1;

    final Queue<T> q = queue;          // regular frames
    final Queue<T> pq = priorityQueue; // stream 0 frames (KEEPALIVE, LEASE)

    for (; ; ) {

      long r = requested;
      long e = 0L;

      while (r != e) {
        boolean d = done;

        T t;
        boolean empty;

        // Always drain the priority queue first so that stream 0 frames
        // bypass any backlog of regular frames.
        if (!pq.isEmpty()) {
          t = pq.poll();
          empty = false;
        } else {
          t = q.poll();
          empty = t == null;
        }

        if (checkTerminated(d, empty, a, q, pq)) {
          return;
        }

        if (empty) {
          break;
        }

        a.onNext(t);

        e++;
      }

      if (r == e) {
        // Terminal state is only reported once both queues are drained.
        if (checkTerminated(done, q.isEmpty() && pq.isEmpty(), a, q, pq)) {
          return;
        }
      }

      if (e != 0 && r != Long.MAX_VALUE) {
        // Account for the emitted elements against the outstanding demand.
        REQUESTED.addAndGet(this, -e);
      }

      missed = WIP.addAndGet(this, -missed);
      if (missed == 0) {
        break;
      }
    }
  }
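
For illustration, here is a minimal, self-contained sketch of the two-queue idea. Only the onNext/onNextPrioritized split mirrors the actual change; the class name PrioritizingSink, the use of ConcurrentLinkedQueue, and the empty drain() stub are simplifying assumptions rather than the real UnboundedProcessor internals:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch only: regular frames and stream 0 frames go into separate queues,
// and the drain loop (drainRegular above) always polls the priority queue first.
final class PrioritizingSink<T> {
  private final Queue<T> queue = new ConcurrentLinkedQueue<>();         // regular frames
  private final Queue<T> priorityQueue = new ConcurrentLinkedQueue<>(); // stream 0 frames

  void onNext(T t) {
    queue.offer(t);
    drain();
  }

  void onNextPrioritized(T t) {
    priorityQueue.offer(t);
    drain();
  }

  private void drain() {
    // In the real processor this is the drainRegular loop shown above,
    // guarded by a work-in-progress counter and the subscriber's demand.
  }
}

With this split, a KEEPALIVE enqueued via onNextPrioritized is written to the connection ahead of any backlog of regular frames.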

Benchmarks

The benchmarks have shown that the performance impact is insignificant (within a couple of percent for standalone UnboundedProcessor measurements), and no difference was observed in the standard end-to-end RSocket test.

@@ -110,7 +110,7 @@
         new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout);
     this.keepAliveFramesAcceptor =
         keepAliveHandler.start(
-            keepAliveSupport, sendProcessor::onNext, this::tryTerminateOnKeepAlive);
+            keepAliveSupport, sendProcessor::onNextPrioritized, this::tryTerminateOnKeepAlive);
Member


We should tighten this up in the spec: whether it's all of stream 0 prioritised, or only KEEPALIVE and LEASE as you have implemented. I'll generally defer to @robertroeser for this, but I have put my thoughts on the issue.

Member Author


Definitely. I have already created an issue on that!

  }

  @Override
  public int getBufferSize() {
-    return Queues.capacity(this.queue);
+    return Integer.MAX_VALUE;
Member


Good catch

@@ -321,23 +346,29 @@ public void cancel() {

  @Override
  public T peek() {
    if (!priorityQueue.isEmpty()) {
      return priorityQueue.peek();
    }
    return queue.peek();
  }
Member


This looks racy after the change (checking isEmpty before calling peek). Does Reactor promise this won't be used concurrently in practice?

Member Author


I guess this code is never used in Reactor, so it is just a forced part of the Queue API. But indeed, it is racy, and I guess I could do nothing about that.

Member


I meant Reactor in the sense that it is a FluxProcessor and a Fuseable.QueueSubscription: are there guarantees within the Reactor framework that the threading model is safe here? It sounds like you are saying it's known to be safe within our rsocket project.

Member Author


So, by default it is


	interface QueueSubscription<T> extends Queue<T>, Subscription {
		
		String NOT_SUPPORTED_MESSAGE = "Although QueueSubscription extends Queue it is purely internal" +
				" and only guarantees support for poll/clear/size/isEmpty." +
				" Instances shouldn't be used/exposed as Queue outside of Reactor operators.";

		...

		
		@Override
		@Nullable
		default T peek() {
			throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
		}

and as the error message says, only poll is supposed to be used. So I don't think it matters at all, and the best I can do is remove the peek operation altogether.

@simonbasle @smaldini can you please correct me if I'm wrong

Contributor


you are correct @OlegDokuka

@He-Pin

He-Pin commented Dec 5, 2019

Akka has a ControlAwareMailbox for this too.
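
For readers unfamiliar with that mailbox, a minimal sketch of the analogy; the message type KeepAliveTick is hypothetical, while the akka.dispatch.ControlMessage marker and the control-aware mailbox are existing Akka concepts:

import akka.dispatch.ControlMessage;

// When an actor is configured with akka.dispatch.UnboundedControlAwareMailbox,
// messages implementing the ControlMessage marker are dequeued before ordinary
// messages, analogous to prioritizing stream 0 frames in this PR.
public final class KeepAliveTick implements ControlMessage {}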

@OlegDokuka force-pushed the bugfix/prioritization branch 2 times, most recently from c3fb896 to 0ee435e on December 16, 2019 at 21:01
@linux-china
Contributor

Any news about this feature?

@mostroverkhov
Member

@linux-china why did you need this? It is flawed, and RSocket as a protocol has everything needed to guarantee that the outgoing queue does not grow unbounded.

@linux-china
Contributor

linux-china commented Mar 1, 2020

@mostroverkhov Now we use metadataPush to push some critical information, such as configuration for a Spring Boot app, broker cluster changes, and app status changes. The metadata push is based on stream id 0, and it is very good for pushing such critical messages so the app can respond ASAP. With prioritised delivering, it is good for metadataPush and keep-alive checking.

For example, now we want to implement a token bucket to control message sending; if the stream id 0 messages sit in this queue, the requester and responder cannot exchange critical information by metadataPush.

@mostroverkhov
Member

mostroverkhov commented Mar 2, 2020

@linux-china

With prioritised metadataPush

Signalling additional capacity to the peer while ignoring existing enqueued messages (whose size may be estimated by the time-to-wire delay) removes the natural negative feedback loop that keeps endpoints stable: responses will start to time out, will likely be retried by the loadbalancer, and will overwhelm the remaining endpoints.

With prioritised delivering, it's good for keep-alive checking

A keep-alive frame can have data attached to it. Most likely this data will be RTT, as in the HTTP/2 PING frame. Having the RTT include both RSocket peers' outgoing queue latencies in addition to the network latency gives true information about RSocket health. A network-only RTT of 1 ms with prioritized keep-alives is not useful if 1000 ms (instead of a target of e.g. 5 ms) is spent in the outgoing queues; in such a case this is just false information: such an RSocket is not healthy and I want to aggressively reduce the allowed request permits for it.

@OlegDokuka
Member Author

OlegDokuka commented Mar 2, 2020

@mostroverkhov

  • responses will start to time out, will likely be retried by the loadbalancer, and will overwhelm the remaining endpoints.

this is the downside of the current implementation and nothing more.

Keep-alive frame can have data attached to It. Most likely this data will be RTT as in http2 ping frame.

Funny, the same HTTP/2 spec says that

Receivers of a PING frame that does not include an ACK flag MUST send a PING frame with the ACK flag set in response, with an identical payload. PING responses SHOULD be given higher priority than any other frame.

So I would rather say that this PR makes even more sense than before.

@mostroverkhov
Member

@OlegDokuka you are conflating rsocket keepalives with connection keep-alives, as pointed out by Steve Gury in rsocket/rsocket#280 (comment). As I said above, network-only RTT is useless for load estimation, and prioritizing keep-alives as introduced by this PR just masks a problem when you have a 1 ms keep-alive RTT but requests time out after 5 sec without even hitting the network.

@OlegDokuka
Member Author

@mostroverkhov looking back into the history of the protocol and keepalive development, I found this discussion (rsocket/rsocket#8 (comment)), which states that keepalive is more about identifying connection and rsocket problems than about identifying how many messages are enqueued at the application level. Following what @stevegury said, in order to identify queueing time it is better to use a simple request-response at the level of application logic.

@stevegury correct me if I'm wrong.

Also, looking over all the issues related to keepalive, it seems that in all cases keepalive is mentioned in the context of the client and the connection, not in the context of the user's application (rsocket/rsocket#58 (comment)).

@mostroverkhov feel free to open a ticket at rsocket-spec repo if you have any concerns

@OlegDokuka OlegDokuka linked an issue Mar 25, 2020 that may be closed by this pull request
@rstoyanchev rstoyanchev modified the milestones: 1.0, 1.0.0-RC7 Apr 17, 2020

Successfully merging this pull request may close these issues.

No keep-alive acks in the process of receiving data