Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: support generic Duplex streams #16269

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/api/http2.md
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,9 @@ added: v8.4.0
used to determine the padding. See [Using options.selectPadding][].
* `settings` {[Settings Object][]} The initial settings to send to the
remote peer upon connection.
* `createConnection` {Function} An optional callback that receives the `URL`
instance passed to `connect` and the `options` object, and returns any
[`Duplex`][] stream that is to be used as the connection for this session.
* `listener` {Function}
* Returns {Http2Session}

Expand Down
32 changes: 21 additions & 11 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const tls = require('tls');
const util = require('util');
const fs = require('fs');
const errors = require('internal/errors');
const { StreamWrap } = require('_stream_wrap');
const { Duplex } = require('stream');
const { URL } = require('url');
const { onServerStream,
Expand Down Expand Up @@ -695,10 +696,14 @@ class Http2Session extends EventEmitter {

// type { number } either NGHTTP2_SESSION_SERVER or NGHTTP2_SESSION_CLIENT
// options { Object }
// socket { net.Socket | tls.TLSSocket }
// socket { net.Socket | tls.TLSSocket | stream.Duplex }
constructor(type, options, socket) {
super();

if (!socket._handle || !socket._handle._externalStream) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is super nit picky but could we compare to undefined? Anytime we're dealing with an object, because of the existence of document.all (at least as far as V8 is concerned), these truthy/falsey checks are unusually expensive.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some things in Node’s source explicitly set ._handle on sockets to null (e.g. after disconnecting)… so I’d like to be consistent with TLS here, just to be on the safe side.

these truthy/falsey checks are unusually expensive.

I doubt this makes a noticeable difference compared to the overall cost of setting up the HTTP2 session objects anyway, V8 seems to optimize a function containing only this line reasonable well to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason I thought we were unsetting _handle but if we do null it then this is fine.

socket = new StreamWrap(socket);
}

// No validation is performed on the input parameters because this
// constructor is not exported directly for users.

Expand All @@ -723,7 +728,8 @@ class Http2Session extends EventEmitter {
this[kSocket] = socket;

// Do not use nagle's algorithm
socket.setNoDelay();
if (typeof socket.setNoDelay === 'function')
socket.setNoDelay();

// Disable TLS renegotiation on the socket
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit further down in this code there is a check for socket.connecting that explicitly defers actions until the socket is connected. With this change, the assumption would be that the non-socket Stream passed in is immediately available for use, which is a perfectly fine assumption to make, IMHO, but I want to make sure that is actually a safe assumption here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure what the alternative would be. We could document socket.connecting/socket.on('connect') as part of the implicit API here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would certainly be an option, but given that we're wrapping the custom socket, the likelihood of it even being an issue is pretty low. Let's go with this for now :-)

if (typeof socket.disableRenegotiation === 'function')
Expand Down Expand Up @@ -2429,15 +2435,19 @@ function connect(authority, options, listener) {
const host = authority.hostname || authority.host || 'localhost';

let socket;
switch (protocol) {
case 'http:':
socket = net.connect(port, host);
break;
case 'https:':
socket = tls.connect(port, host, initializeTLSOptions(options, host));
break;
default:
throw new errors.Error('ERR_HTTP2_UNSUPPORTED_PROTOCOL', protocol);
if (typeof options.createConnection === 'function') {
socket = options.createConnection(authority, options);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should likely type check the return value

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all we could do here is doing some duck-type checking, e.g. verifying that .write() and .on() are present…

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to? This is very much opt-in... What does http do for createConnection?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn’t do any explicit typechecking either, yes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I won't block on that. I much prefer the APIs to be stricter here and not gate based on what we do in http1 but I can live with this.

} else {
switch (protocol) {
case 'http:':
socket = net.connect(port, host);
break;
case 'https:':
socket = tls.connect(port, host, initializeTLSOptions(options, host));
break;
default:
throw new errors.Error('ERR_HTTP2_UNSUPPORTED_PROTOCOL', protocol);
}
}

socket.on('error', socketOnError);
Expand Down
85 changes: 52 additions & 33 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,55 @@ JSStream::JSStream(Environment* env, Local<Object> obj)
StreamBase(env) {
node::Wrap(obj, this);
MakeWeak<JSStream>(this);

set_alloc_cb({ OnAllocImpl, this });
set_read_cb({ OnReadImpl, this });
}


JSStream::~JSStream() {
}


void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
buf->base = Malloc(size);
buf->len = size;
}


void JSStream::OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
JSStream* wrap = static_cast<JSStream*>(ctx);
CHECK_NE(wrap, nullptr);
Environment* env = wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

if (nread < 0) {
if (buf != nullptr && buf->base != nullptr)
free(buf->base);
wrap->EmitData(nread, Local<Object>(), Local<Object>());
return;
}

if (nread == 0) {
if (buf->base != nullptr)
free(buf->base);
return;
}

CHECK_LE(static_cast<size_t>(nread), buf->len);
char* base = node::Realloc(buf->base, nread);

CHECK_EQ(pending, UV_UNKNOWN_HANDLE);

Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
wrap->EmitData(nread, obj, Local<Object>());
}


void* JSStream::Cast() {
return static_cast<void*>(this);
}
Expand All @@ -45,6 +87,8 @@ AsyncWrap* JSStream::GetAsyncWrap() {


bool JSStream::IsAlive() {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
v8::Local<v8::Value> fn = object()->Get(env()->isalive_string());
if (!fn->IsFunction())
return false;
Expand All @@ -54,25 +98,32 @@ bool JSStream::IsAlive() {


bool JSStream::IsClosing() {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
return MakeCallback(env()->isclosing_string(), 0, nullptr)
.ToLocalChecked()->IsTrue();
}


int JSStream::ReadStart() {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
return MakeCallback(env()->onreadstart_string(), 0, nullptr)
.ToLocalChecked()->Int32Value();
}


int JSStream::ReadStop() {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
return MakeCallback(env()->onreadstop_string(), 0, nullptr)
.ToLocalChecked()->Int32Value();
}


int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());

Local<Value> argv[] = {
req_wrap->object()
Expand All @@ -93,6 +144,7 @@ int JSStream::DoWrite(WriteWrap* w,
CHECK_EQ(send_handle, nullptr);

HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());

Local<Array> bufs_arr = Array::New(env()->isolate(), count);
Local<Object> buf;
Expand Down Expand Up @@ -124,37 +176,6 @@ void JSStream::New(const FunctionCallbackInfo<Value>& args) {
}


static void FreeCallback(char* data, void* hint) {
// Intentional no-op
}


void JSStream::DoAlloc(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

uv_buf_t buf;
wrap->OnAlloc(args[0]->Int32Value(), &buf);
Local<Object> vbuf = Buffer::New(
wrap->env(),
buf.base,
buf.len,
FreeCallback,
nullptr).ToLocalChecked();
return args.GetReturnValue().Set(vbuf);
}


void JSStream::DoRead(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

CHECK(Buffer::HasInstance(args[1]));
uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1]));
wrap->OnRead(args[0]->Int32Value(), &buf);
}


void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
CHECK(args[0]->IsObject());
Expand Down Expand Up @@ -220,8 +241,6 @@ void JSStream::Initialize(Local<Object> target,

AsyncWrap::AddWrapMethods(env, t);

env->SetProtoMethod(t, "doAlloc", DoAlloc);
env->SetProtoMethod(t, "doRead", DoRead);
env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite);
env->SetProtoMethod(t, "finishWrite", Finish<WriteWrap>);
env->SetProtoMethod(t, "finishShutdown", Finish<ShutdownWrap>);
Expand Down
8 changes: 6 additions & 2 deletions src/js_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ class JSStream : public AsyncWrap, public StreamBase {
AsyncWrap* GetAsyncWrap() override;

static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DoAlloc(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DoRead(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DoAfterWrite(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ReadBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
static void EmitEOF(const v8::FunctionCallbackInfo<v8::Value>& args);

static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);

template <class Wrap>
static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args);
};
Expand Down
9 changes: 9 additions & 0 deletions test/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This directory contains modules used to test the Node.js implementation.
* [Common module API](#common-module-api)
* [Countdown module](#countdown-module)
* [DNS module](#dns-module)
* [Duplex pair helper](#duplex-pair-helper)
* [Fixtures module](#fixtures-module)
* [WPT module](#wpt-module)

Expand Down Expand Up @@ -458,6 +459,14 @@ Reads a Domain String and returns a Buffer containing the domain.
Takes in a parsed Object and writes its fields to a DNS packet as a Buffer
object.

## Duplex pair helper

The `common/duplexpair` module exports a single function `makeDuplexPair`,
which returns an object `{ clientSide, serverSide }` where each side is a
`Duplex` stream connected to the other side.

There is no difference between client or server side beyond their names.

## Fixtures Module

The `common/fixtures` module provides convenience methods for working with
Expand Down
45 changes: 45 additions & 0 deletions test/common/duplexpair.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/* eslint-disable required-modules */
'use strict';
const { Duplex } = require('stream');
const assert = require('assert');

const kCallback = Symbol('Callback');
const kOtherSide = Symbol('Other');

class DuplexSocket extends Duplex {
constructor() {
super();
this[kCallback] = null;
this[kOtherSide] = null;
}

_read() {
const callback = this[kCallback];
if (callback) {
this[kCallback] = null;
callback();
}
}

_write(chunk, encoding, callback) {
assert.notStrictEqual(this[kOtherSide], null);
assert.strictEqual(this[kOtherSide][kCallback], null);
this[kOtherSide][kCallback] = callback;
this[kOtherSide].push(chunk);
}

_final(callback) {
this[kOtherSide].on('end', callback);
this[kOtherSide].push(null);
}
}

function makeDuplexPair() {
const clientSide = new DuplexSocket();
const serverSide = new DuplexSocket();
clientSide[kOtherSide] = serverSide;
serverSide[kOtherSide] = clientSide;
return { clientSide, serverSide };
}

module.exports = makeDuplexPair;
40 changes: 40 additions & 0 deletions test/parallel/test-http2-generic-streams-sendfile.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');
const fs = require('fs');
const makeDuplexPair = require('../common/duplexpair');

{
const server = http2.createServer();
server.on('stream', common.mustCall((stream, headers) => {
stream.respondWithFile(__filename);
}));

const { clientSide, serverSide } = makeDuplexPair();
server.emit('connection', serverSide);

const client = http2.connect('http://localhost:80', {
createConnection: common.mustCall(() => clientSide)
});

const req = client.request({ ':path': '/' });

req.on('response', common.mustCall((headers) => {
assert.strictEqual(headers[':status'], 200);
}));

req.setEncoding('utf8');
let data = '';
req.on('data', (chunk) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: ideally a mustCall?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just accumulating the data, and that’s checked in an assert (inside a mustCall) anyway

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mustCall's on data events can be problematic. If anything, a mustCallAtLeast would be ok.

Copy link
Member

@apapirovski apapirovski Oct 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this was strictly about the consistency of these calls. In other tests with this PR, data events have mustCall. It sets a good example for newer contributors when there's consistency to where we mustCall and where we don't. Or we could leave a comment explaining why not.

When I started writing tests and contributing to node I had a bit of confusion around this because it wasn't 100% consistent across tests.

But as I mentioned, this is just a very minor nit. :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fwiw, the other call is somewhat intentionally checking that the data is sent through in a single chunk :) I’ve added 2a666de1940b as a comment for that

data += chunk;
});
req.on('end', common.mustCall(() => {
assert.strictEqual(data, fs.readFileSync(__filename, 'utf8'));
clientSide.destroy();
clientSide.end();
}));
req.end();
}
Loading