Skip to content

Commit

Permalink
HTTPCORE-708: H2 stream multiplexer incorrectly handles multiple fram…
Browse files Browse the repository at this point in the history
…e fragments in a single input chunk
  • Loading branch information
ok2c committed Mar 10, 2022
1 parent 0ed656a commit 64c4c7b
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,11 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio
if (connState == ConnectionHandshake.SHUTDOWN) {
ioSession.clearEvent(SelectionKey.OP_READ);
} else {
if (src != null) {
inputBuffer.put(src);
}
RawFrame frame;
while ((frame = inputBuffer.read(ioSession)) != null) {
for (;;) {
final RawFrame frame = inputBuffer.read(src, ioSession);
if (frame == null) {
break;
}
if (streamListener != null) {
streamListener.onFrameInput(this, frame.getStreamId(), frame);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,37 @@ public void put(final ByteBuffer src) {
buffer.flip();
}

public RawFrame read(final ReadableByteChannel channel) throws IOException {
/**
* Attempts to read a complete frame from the given source buffer and the underlying data
* channel. The source buffer is consumed first. More data can be read from the channel
* if required.
*
* @param src the source buffer or {@code null} if not available.
* @param channel the underlying data channel.
*
* @return a complete frame or {@code null} a complete frame cannot be read.
*
* @since 5.1
*/
public RawFrame read(final ByteBuffer src, final ReadableByteChannel channel) throws IOException {
for (;;) {
if (src != null) {
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
final int remaining = buffer.remaining();
if (remaining >= src.remaining()) {
buffer.put(src);
} else {
final int limit = src.limit();
src.limit(remaining);
buffer.put(src);
src.limit(limit);
}
buffer.flip();
}
switch (state) {
case HEAD_EXPECTED:
if (buffer.remaining() >= FrameConsts.HEAD_LEN) {
Expand Down Expand Up @@ -150,6 +179,15 @@ public RawFrame read(final ReadableByteChannel channel) throws IOException {
return null;
}

/**
* Attempts to read a complete frame from the underlying data channel.
*
* @param channel the underlying data channel.
*/
public RawFrame read(final ReadableByteChannel channel) throws IOException {
return read(null, channel);
}

public void reset() {
buffer.compact();
state = State.HEAD_EXPECTED;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.hc.core5.http2.impl.nio;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http2.H2ConnectionException;
import org.apache.hc.core5.http2.WritableByteChannelMock;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.frame.DefaultFrameFactory;
import org.apache.hc.core5.http2.frame.FrameConsts;
import org.apache.hc.core5.http2.frame.FrameFactory;
import org.apache.hc.core5.http2.frame.FrameType;
import org.apache.hc.core5.http2.frame.RawFrame;
import org.apache.hc.core5.http2.frame.StreamIdGenerator;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

public class TestAbstractH2StreamMultiplexer {

@Mock
ProtocolIOSession protocolIOSession;
@Mock
HttpProcessor httpProcessor;
@Mock
H2StreamListener h2StreamListener;

@Before
public void prepareMocks() {
MockitoAnnotations.initMocks(this);
}

static class H2StreamMultiplexerImpl extends AbstractH2StreamMultiplexer {

public H2StreamMultiplexerImpl(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
final StreamIdGenerator idGenerator,
final HttpProcessor httpProcessor,
final CharCodingConfig charCodingConfig,
final H2Config h2Config,
final H2StreamListener streamListener) {
super(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener);
}

@Override
void acceptHeaderFrame() throws H2ConnectionException {
}

@Override
void acceptPushRequest() throws H2ConnectionException {
}

@Override
void acceptPushFrame() throws H2ConnectionException {
}

@Override
H2StreamHandler createRemotelyInitiatedStream(
final H2StreamChannel channel,
final HttpProcessor httpProcessor,
final BasicHttpConnectionMetrics connMetrics,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException {
return null;
}

@Override
H2StreamHandler createLocallyInitiatedStream(
final ExecutableCommand command,
final H2StreamChannel channel,
final HttpProcessor httpProcessor,
final BasicHttpConnectionMetrics connMetrics) throws IOException {
return null;
}
}

@Test
public void testInputOneFrame() throws Exception {
final WritableByteChannelMock writableChannel = new WritableByteChannelMock(1024);
final FrameOutputBuffer outbuffer = new FrameOutputBuffer(16 * 1024);

final byte[] data = new byte[FrameConsts.MIN_FRAME_SIZE];
for (int i = 0; i < FrameConsts.MIN_FRAME_SIZE; i++) {
data[i] = (byte)(i % 16);
}

final RawFrame frame = new RawFrame(FrameType.DATA.getValue(), 0, 1, ByteBuffer.wrap(data));
outbuffer.write(frame, writableChannel);
final byte[] bytes = writableChannel.toByteArray();

final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl(
protocolIOSession,
DefaultFrameFactory.INSTANCE,
StreamIdGenerator.ODD,
httpProcessor,
CharCodingConfig.DEFAULT,
H2Config.custom()
.setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE)
.build(),
h2StreamListener);

Assert.assertThrows(H2ConnectionException.class, new ThrowingRunnable() {
@Override
public void run() throws Throwable {
streamMultiplexer.onInput(ByteBuffer.wrap(bytes));
}
});

Mockito.verify(h2StreamListener).onFrameInput(
Mockito.same(streamMultiplexer),
Mockito.eq(1),
Mockito.<RawFrame>any());

Mockito.reset(h2StreamListener);

Assert.assertThrows(H2ConnectionException.class, new ThrowingRunnable() {
@Override
public void run() throws Throwable {
int pos = 0;
int remaining = bytes.length;
while (remaining > 0) {
final int chunk = Math.min(2048, remaining);
streamMultiplexer.onInput(ByteBuffer.wrap(bytes, pos, chunk));
pos += chunk;
remaining -= chunk;
}
}
});

Mockito.verify(h2StreamListener).onFrameInput(
Mockito.same(streamMultiplexer),
Mockito.eq(1),
Mockito.<RawFrame>any());
}

@Test
public void testInputMultipleFrames() throws Exception {
final WritableByteChannelMock writableChannel = new WritableByteChannelMock(1024);
final FrameOutputBuffer outbuffer = new FrameOutputBuffer(16 * 1024);

final byte[] data = new byte[FrameConsts.MIN_FRAME_SIZE];
for (int i = 0; i < FrameConsts.MIN_FRAME_SIZE; i++) {
data[i] = (byte) (i % 16);
}

final RawFrame frame1 = new RawFrame(FrameType.DATA.getValue(), 0, 1, ByteBuffer.wrap(data));
outbuffer.write(frame1, writableChannel);
final RawFrame frame2 = new RawFrame(FrameType.DATA.getValue(), 0, 1, ByteBuffer.wrap(data));
outbuffer.write(frame2, writableChannel);
final byte[] bytes = writableChannel.toByteArray();

final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl(
protocolIOSession,
DefaultFrameFactory.INSTANCE,
StreamIdGenerator.ODD,
httpProcessor,
CharCodingConfig.DEFAULT,
H2Config.custom()
.setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE)
.build(),
h2StreamListener);

Assert.assertThrows(H2ConnectionException.class, new ThrowingRunnable() {
@Override
public void run() throws Throwable {
streamMultiplexer.onInput(ByteBuffer.wrap(bytes));
}
});
Mockito.verify(h2StreamListener).onFrameInput(
Mockito.same(streamMultiplexer),
Mockito.eq(1),
Mockito.<RawFrame>any());

Mockito.reset(h2StreamListener);

Assert.assertThrows(H2ConnectionException.class, new ThrowingRunnable() {
@Override
public void run() throws Throwable {
int pos = 0;
int remaining = bytes.length;
while (remaining > 0) {
final int chunk = Math.min(4096, remaining);
streamMultiplexer.onInput(ByteBuffer.wrap(bytes, pos, chunk));
pos += chunk;
remaining -= chunk;
}
}
});
Mockito.verify(h2StreamListener).onFrameInput(
Mockito.same(streamMultiplexer),
Mockito.eq(1),
Mockito.<RawFrame>any());
}

}

0 comments on commit 64c4c7b

Please sign in to comment.