Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix possible header / URI corruption when discardReadBytes() is calle… #385

Merged

Conversation

normanmaurer
Copy link
Member

…d in HTTPDecoder

Motivation:

When discardReadBytes() was called and we still did not pass the Head to the user it was quite possible that the headers / URI could be corrupted as the stored readerIndex did not matchup anymore.

Modifications:

  • Override most of the functionality of ByteToMessageDecoder in HTTPDecoder to better work with the provided state machine of http_parser
  • Remove allocations by removing the pendingInOut Array as its not needed anymore.
  • Add guards against re-entrance calls
  • Add unit test that shows that everything works as expected now.

Result:

Fixed bug and reduced allocations (8% - 10% perf win).

@normanmaurer normanmaurer requested review from Lukasa and weissi May 2, 2018 18:17
@normanmaurer normanmaurer force-pushed the decoder_fix_and_allocation_reduce branch from 3bc7012 to d6087cc Compare May 2, 2018 19:15
Copy link
Contributor

@Lukasa Lukasa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, generally looks really good! Notes inline.

@@ -82,6 +84,7 @@ private struct HTTPParserState {
let (index, length) = consumeSlice()
self.currentStatus = self.cumulationBuffer!.getString(at: index, length: length)!
case .body:
self.currentNameIndex = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ever hit this branch without having gone through .headerValue? It seems like should be an assertion instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep sounds correct... same for the slice imho

public func decoderRemoved(ctx: ChannelHandlerContext) {
// Remove the stored reference to ChannelHandlerContext
parser.data = UnsafeMutableRawPointer(bitPattern: 0x0000deadbeef0000)
private var decoding: Bool = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this up to the other state declarations rather than hiding it just above channelRead?

settings.on_message_begin = nil
}
// Guard against re-entrance calls of channelRead(...)
guard !decoding else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explicit self please. 😄

// also need to update the reader index to whatever it is now.
state.slice = (buffer.readerIndex, slice.length)
buffer.moveReaderIndex(forwardBy: state.readerIndexAdjustment)
// Needed to guard again re-entrace calls.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/entrace/entrant/

}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method's 80 lines long, can we factor it into some smaller sub-functions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// There was no change to the readable bytes of the cumulationBuffer by a re-entrant call of channelRead. Its safe to break the loop.
break
}
} while true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

repeat...while true is a strange idiom. I know you didn't add it, but can we swap this to just while true { }?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, no, let's use while var bufferSlice = self.cumulationBuffer, bufferSlice.readableBytes > 0. That will cover your loop exit condition at the bottom and also create the buffer slice we need.

return result
}

guard self.cumulationBuffer != nil else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better just to not nil the buffer out on channel closure? It'll simplify the code here, and the buffer will get dropped anyway, one way or another.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do this but this would also need changed in ByteToMessageDecoder and may increase memory usage if a user still hold a reference to the handler after the channel become inactive / the handler was removed.

WDYT ?

// Update readerIndex of the cumulationBuffer itself as we will refetch it in the next loop run if needed.
self.cumulationBuffer!.moveReaderIndex(forwardBy: result)

if bufferSlice.readableBytes == self.cumulationBuffer!.readableBytes + result {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we have an assertion above that bufferSlice.readableBytes == result, and then the next line is self.cumulationBuffer!.moveReaderIndex(forwardBy: result), I think this conditional is just a very complex way of spelling if self.cumulationBuffer!.readableBytes == 0.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep... refactored to use your while loop :)

return .continue
}
public func decode(ctx: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
return DecodingState.needMoreData
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a code comment here indicating that we don't actually expect this function to be called?

self.state.seenEOF = true

guard !self.decoding else {
// We are currently decoding, return early as we will handle it in the decoding loop.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is wrong: we don't handle the EOF in the decoding loop at all. We need to explicitly call c_nio_http_parser_execute with a length of 0, which the other code never does. You may want to add a test that triggers this case to validate that it doesn't work, and then we can fix it. 😄

Package.swift Outdated
@@ -51,6 +51,8 @@ var targets: [PackageDescription.Target] = [
dependencies: ["NIO", "NIOHTTP1", "CNIOSHA1"]),
.target(name: "NIOWebSocketServer",
dependencies: ["NIO", "NIOHTTP1", "NIOWebSocket"]),
.target(name: "NIOHTTP1HelloWorldServer",
dependencies: ["NIO", "NIOHTTP1", "NIOConcurrencyHelpers"]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason this isn't just part of NIOHTTP1Server?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should not be part of the pr at all... included by mistake.

}
let result = state.baseAddress!.withMemoryRebound(to: Int8.self, capacity: pointer.count, { p in
c_nio_http_parser_execute(&parser, &settings, p.advanced(by: bufferSlice.readerIndex), bufferSlice.readableBytes)
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the trailing closure syntax instead of including the closure in the parenthesis.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

self.state.currentError = HTTPParserError.httpError(fromCHTTPParserErrno: http_errno(rawValue: httpError))!
throw self.state.currentError!
}
return result
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block seems wildly too long to me: do we need the pointer to be valid for this long?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope let me move some stuff outside.

private func decodeHTTP(ctx: ChannelHandlerContext) throws {
// We need to refetch the cumulationBuffer on each loop as it may has changed due re-entrance calls of channelRead(...)
while let bufferSlice = self.cumulationBuffer, bufferSlice.readableBytes > 0 {
let result = try bufferSlice.withVeryUnsafeBytes { (pointer) -> size_t in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this use withVeryUnsafeBytes and then manipulate the pointer, instead of using withUnsafeReadableBytes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it makes things easier when we calculate the readerIndex based on the baseAddress.

state.baseAddress = nil
guard !self.state.seenEOF else {
// We need to notify the parser about the EOF as we received it while in http_parser_excecute.
self.notifyParserEOF(ctx: ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this notifies too early. While there are still bytes in the cumulation buffer, we need to parse them. This should be outside the while loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

if httpError != 0 {
self.state.currentError = HTTPParserError.httpError(fromCHTTPParserErrno: http_errno(rawValue: httpError))!
throw self.state.currentError!
guard self.cumulationBuffer != nil else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than have this guard, just change the movement of the reader index on line 439 to self.cumulationBuffer?.moveReaderIndex(forwardBy: result). That will cover the buffer being nild out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea

@normanmaurer normanmaurer force-pushed the decoder_fix_and_allocation_reduce branch from e3d2f5e to 27f3508 Compare May 3, 2018 09:13
@normanmaurer
Copy link
Member Author

@weissi @Lukasa PTAL again

fileprivate var state = HTTPParserState()

fileprivate init(type: HTTPMessageT.Type) {
/* this is a private init, the public versions only allow HTTPClientResponsePart and HTTPServerRequestPart */
assert(HTTPMessageT.self == HTTPClientResponsePart.self || HTTPMessageT.self == HTTPServerRequestPart.self)
}

deinit {
// Remove the stored reference to ChannelHandlerContext
self.parser.data = UnsafeMutableRawPointer(bitPattern: 0x0000deadbeef0000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to make @helje5's life easier and not cause needless merge conflicts, could you make this value 0xdeadbeef? That way it'll work on 32-bit platforms too

Copy link
Member

@weissi weissi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally looks really good but having a hard time to think all this though :)

assert(result == bufferSlice.readableBytes)

// Update readerIndex of the cumulationBuffer itself as we will refetch it in the next loop run if needed.
self.cumulationBuffer?.moveReaderIndex(forwardBy: result)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's marginally faster to use moveReaderIndex(to: writerIndex) I think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will mess up with the loop that handles re-entrance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah shoot you're right. Nevermind then.

let result = try buffer.withVeryUnsafeBytes { (pointer) -> size_t in
state.baseAddress = pointer.baseAddress!.assumingMemoryBound(to: UInt8.self)
if self.state.seenEOF {
// We need to notify the parser about the EOF as we received it while in http_parser_excecute.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: http_parser_execute.

@normanmaurer
Copy link
Member Author

@weissi @Lukasa PTAL again

@normanmaurer
Copy link
Member Author

@weissi @Lukasa ok now it does handle discarding of decoded bytes a lot better then before (the code is more complex tho :( ). PTAL again.

@normanmaurer normanmaurer force-pushed the decoder_fix_and_allocation_reduce branch from 5c984d4 to 0189aa5 Compare May 3, 2018 19:21
Copy link
Contributor

@Lukasa Lukasa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A quick partial review, will re-review tomorrow.

self.cumulationBuffer = nil

case .headerField, .headerValue:
if let headerStartIdx = self.state.headerStartIndex {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole body of this case is in the if block, so this may as well be guard.

if let error = state.currentError {
throw error
case .headerField, .headerValue:
guard let headerStartIdx = self.state.headerStartIndex else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it valid to be in this state without a headerStartIndex?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Lukasa yes as we reset it to nil if we discarded the decoded bytes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason not to set it to 0 instead and avoid this branch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes as otherwise we will try to increase the writerIndex etc later on which is even more expensive.

}

/// Will discard bytes till readerIndex if its needed and then call `fn`.
private func mayDiscardDecodedBytes(readerIndex: Int, _ fn: () -> Void) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the "readerIndex" label could be improved: maybe "upTo" so that this now reads mayDiscardDecodedBytes(upTo:).

fn()
}

self.cumulationBuffer!.moveReaderIndex(to: self.cumulationBuffer!.writerIndex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assert that at the start of this function the reader index was equal to the writer index?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done

@Lukasa
Copy link
Contributor

Lukasa commented May 4, 2018

Ok, this looks good to me.

@Lukasa Lukasa added the semver/patch No public API change. label May 4, 2018
@Lukasa Lukasa added this to the 1.7.0 milestone May 4, 2018
…d in HTTPDecoder

Motivation:

When discardReadBytes() was called and we still did not pass the Head to the user it was quite possible that the headers / URI could be corrupted as the stored readerIndex did not matchup anymore.

Modifications:

- Override most of the functionality of ByteToMessageDecoder in HTTPDecoder to better work with the provided state machine of http_parser
- Remove allocations by removing the pendingInOut Array as its not needed anymore.
- Add guards against re-entrance calls
- Add unit test that shows that everything works as expected now.

Result:

Fixed bug and reduced allocations (8% - 10% perf win).
@normanmaurer normanmaurer force-pushed the decoder_fix_and_allocation_reduce branch from 26a9275 to be9cd2f Compare May 4, 2018 17:11
@normanmaurer normanmaurer merged commit 62a9ff1 into apple:master May 4, 2018
@normanmaurer normanmaurer deleted the decoder_fix_and_allocation_reduce branch May 4, 2018 17:22
@normanmaurer
Copy link
Member Author

Thanks for the review.. merged

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
semver/patch No public API change.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants