diff --git a/doc/api/http2.md b/doc/api/http2.md index c85de0092a72a2..3da60175fe1b1a 100644 --- a/doc/api/http2.md +++ b/doc/api/http2.md @@ -1598,6 +1598,9 @@ added: v8.4.0 used to determine the padding. See [Using options.selectPadding][]. * `settings` {[Settings Object][]} The initial settings to send to the remote peer upon connection. + * `createConnection` {Function} An optional callback that receives the `URL` + instance passed to `connect` and the `options` object, and returns any + [`Duplex`][] stream that is to be used as the connection for this session. * `listener` {Function} * Returns {Http2Session} diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 5eb34ecdfd20e0..903ec0ced1444d 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -13,6 +13,7 @@ const tls = require('tls'); const util = require('util'); const fs = require('fs'); const errors = require('internal/errors'); +const { StreamWrap } = require('_stream_wrap'); const { Duplex } = require('stream'); const { URL } = require('url'); const { onServerStream, @@ -695,10 +696,14 @@ class Http2Session extends EventEmitter { // type { number } either NGHTTP2_SESSION_SERVER or NGHTTP2_SESSION_CLIENT // options { Object } - // socket { net.Socket | tls.TLSSocket } + // socket { net.Socket | tls.TLSSocket | stream.Duplex } constructor(type, options, socket) { super(); + if (!socket._handle || !socket._handle._externalStream) { + socket = new StreamWrap(socket); + } + // No validation is performed on the input parameters because this // constructor is not exported directly for users. @@ -723,7 +728,8 @@ class Http2Session extends EventEmitter { this[kSocket] = socket; // Do not use nagle's algorithm - socket.setNoDelay(); + if (typeof socket.setNoDelay === 'function') + socket.setNoDelay(); // Disable TLS renegotiation on the socket if (typeof socket.disableRenegotiation === 'function') @@ -2429,15 +2435,19 @@ function connect(authority, options, listener) { const host = authority.hostname || authority.host || 'localhost'; let socket; - switch (protocol) { - case 'http:': - socket = net.connect(port, host); - break; - case 'https:': - socket = tls.connect(port, host, initializeTLSOptions(options, host)); - break; - default: - throw new errors.Error('ERR_HTTP2_UNSUPPORTED_PROTOCOL', protocol); + if (typeof options.createConnection === 'function') { + socket = options.createConnection(authority, options); + } else { + switch (protocol) { + case 'http:': + socket = net.connect(port, host); + break; + case 'https:': + socket = tls.connect(port, host, initializeTLSOptions(options, host)); + break; + default: + throw new errors.Error('ERR_HTTP2_UNSUPPORTED_PROTOCOL', protocol); + } } socket.on('error', socketOnError); diff --git a/src/js_stream.cc b/src/js_stream.cc index b62dcf3ef5b407..a279970c1bbfca 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -27,6 +27,9 @@ JSStream::JSStream(Environment* env, Local obj) StreamBase(env) { node::Wrap(obj, this); MakeWeak(this); + + set_alloc_cb({ OnAllocImpl, this }); + set_read_cb({ OnReadImpl, this }); } @@ -34,6 +37,45 @@ JSStream::~JSStream() { } +void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) { + buf->base = Malloc(size); + buf->len = size; +} + + +void JSStream::OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + JSStream* wrap = static_cast(ctx); + CHECK_NE(wrap, nullptr); + Environment* env = wrap->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + if (nread < 0) { + if (buf != nullptr && buf->base != nullptr) + free(buf->base); + wrap->EmitData(nread, Local(), Local()); + return; + } + + if (nread == 0) { + if (buf->base != nullptr) + free(buf->base); + return; + } + + CHECK_LE(static_cast(nread), buf->len); + char* base = node::Realloc(buf->base, nread); + + CHECK_EQ(pending, UV_UNKNOWN_HANDLE); + + Local obj = Buffer::New(env, base, nread).ToLocalChecked(); + wrap->EmitData(nread, obj, Local()); +} + + void* JSStream::Cast() { return static_cast(this); } @@ -45,6 +87,8 @@ AsyncWrap* JSStream::GetAsyncWrap() { bool JSStream::IsAlive() { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); v8::Local fn = object()->Get(env()->isalive_string()); if (!fn->IsFunction()) return false; @@ -54,18 +98,24 @@ bool JSStream::IsAlive() { bool JSStream::IsClosing() { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); return MakeCallback(env()->isclosing_string(), 0, nullptr) .ToLocalChecked()->IsTrue(); } int JSStream::ReadStart() { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); return MakeCallback(env()->onreadstart_string(), 0, nullptr) .ToLocalChecked()->Int32Value(); } int JSStream::ReadStop() { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); return MakeCallback(env()->onreadstop_string(), 0, nullptr) .ToLocalChecked()->Int32Value(); } @@ -73,6 +123,7 @@ int JSStream::ReadStop() { int JSStream::DoShutdown(ShutdownWrap* req_wrap) { HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); Local argv[] = { req_wrap->object() @@ -93,6 +144,7 @@ int JSStream::DoWrite(WriteWrap* w, CHECK_EQ(send_handle, nullptr); HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); Local bufs_arr = Array::New(env()->isolate(), count); Local buf; @@ -124,37 +176,6 @@ void JSStream::New(const FunctionCallbackInfo& args) { } -static void FreeCallback(char* data, void* hint) { - // Intentional no-op -} - - -void JSStream::DoAlloc(const FunctionCallbackInfo& args) { - JSStream* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); - - uv_buf_t buf; - wrap->OnAlloc(args[0]->Int32Value(), &buf); - Local vbuf = Buffer::New( - wrap->env(), - buf.base, - buf.len, - FreeCallback, - nullptr).ToLocalChecked(); - return args.GetReturnValue().Set(vbuf); -} - - -void JSStream::DoRead(const FunctionCallbackInfo& args) { - JSStream* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); - - CHECK(Buffer::HasInstance(args[1])); - uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1])); - wrap->OnRead(args[0]->Int32Value(), &buf); -} - - void JSStream::DoAfterWrite(const FunctionCallbackInfo& args) { JSStream* wrap; CHECK(args[0]->IsObject()); @@ -220,8 +241,6 @@ void JSStream::Initialize(Local target, AsyncWrap::AddWrapMethods(env, t); - env->SetProtoMethod(t, "doAlloc", DoAlloc); - env->SetProtoMethod(t, "doRead", DoRead); env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite); env->SetProtoMethod(t, "finishWrite", Finish); env->SetProtoMethod(t, "finishShutdown", Finish); diff --git a/src/js_stream.h b/src/js_stream.h index fc0b7abe15a633..a4a67ae3372620 100644 --- a/src/js_stream.h +++ b/src/js_stream.h @@ -38,12 +38,16 @@ class JSStream : public AsyncWrap, public StreamBase { AsyncWrap* GetAsyncWrap() override; static void New(const v8::FunctionCallbackInfo& args); - static void DoAlloc(const v8::FunctionCallbackInfo& args); - static void DoRead(const v8::FunctionCallbackInfo& args); static void DoAfterWrite(const v8::FunctionCallbackInfo& args); static void ReadBuffer(const v8::FunctionCallbackInfo& args); static void EmitEOF(const v8::FunctionCallbackInfo& args); + static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); + static void OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); + template static void Finish(const v8::FunctionCallbackInfo& args); }; diff --git a/test/common/README.md b/test/common/README.md index 93106ad076e2e1..46eb001df86cf6 100644 --- a/test/common/README.md +++ b/test/common/README.md @@ -8,6 +8,7 @@ This directory contains modules used to test the Node.js implementation. * [Common module API](#common-module-api) * [Countdown module](#countdown-module) * [DNS module](#dns-module) +* [Duplex pair helper](#duplex-pair-helper) * [Fixtures module](#fixtures-module) * [WPT module](#wpt-module) @@ -458,6 +459,14 @@ Reads a Domain String and returns a Buffer containing the domain. Takes in a parsed Object and writes its fields to a DNS packet as a Buffer object. +## Duplex pair helper + +The `common/duplexpair` module exports a single function `makeDuplexPair`, +which returns an object `{ clientSide, serverSide }` where each side is a +`Duplex` stream connected to the other side. + +There is no difference between client or server side beyond their names. + ## Fixtures Module The `common/fixtures` module provides convenience methods for working with diff --git a/test/common/duplexpair.js b/test/common/duplexpair.js new file mode 100644 index 00000000000000..ea5bd86a041b24 --- /dev/null +++ b/test/common/duplexpair.js @@ -0,0 +1,45 @@ +/* eslint-disable required-modules */ +'use strict'; +const { Duplex } = require('stream'); +const assert = require('assert'); + +const kCallback = Symbol('Callback'); +const kOtherSide = Symbol('Other'); + +class DuplexSocket extends Duplex { + constructor() { + super(); + this[kCallback] = null; + this[kOtherSide] = null; + } + + _read() { + const callback = this[kCallback]; + if (callback) { + this[kCallback] = null; + callback(); + } + } + + _write(chunk, encoding, callback) { + assert.notStrictEqual(this[kOtherSide], null); + assert.strictEqual(this[kOtherSide][kCallback], null); + this[kOtherSide][kCallback] = callback; + this[kOtherSide].push(chunk); + } + + _final(callback) { + this[kOtherSide].on('end', callback); + this[kOtherSide].push(null); + } +} + +function makeDuplexPair() { + const clientSide = new DuplexSocket(); + const serverSide = new DuplexSocket(); + clientSide[kOtherSide] = serverSide; + serverSide[kOtherSide] = clientSide; + return { clientSide, serverSide }; +} + +module.exports = makeDuplexPair; diff --git a/test/parallel/test-http2-generic-streams-sendfile.js b/test/parallel/test-http2-generic-streams-sendfile.js new file mode 100644 index 00000000000000..1054574a8b1ca2 --- /dev/null +++ b/test/parallel/test-http2-generic-streams-sendfile.js @@ -0,0 +1,40 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); +const fs = require('fs'); +const makeDuplexPair = require('../common/duplexpair'); + +{ + const server = http2.createServer(); + server.on('stream', common.mustCall((stream, headers) => { + stream.respondWithFile(__filename); + })); + + const { clientSide, serverSide } = makeDuplexPair(); + server.emit('connection', serverSide); + + const client = http2.connect('http://localhost:80', { + createConnection: common.mustCall(() => clientSide) + }); + + const req = client.request({ ':path': '/' }); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 200); + })); + + req.setEncoding('utf8'); + let data = ''; + req.on('data', (chunk) => { + data += chunk; + }); + req.on('end', common.mustCall(() => { + assert.strictEqual(data, fs.readFileSync(__filename, 'utf8')); + clientSide.destroy(); + clientSide.end(); + })); + req.end(); +} diff --git a/test/parallel/test-http2-generic-streams.js b/test/parallel/test-http2-generic-streams.js new file mode 100644 index 00000000000000..d97e86a5ecea55 --- /dev/null +++ b/test/parallel/test-http2-generic-streams.js @@ -0,0 +1,45 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); +const makeDuplexPair = require('../common/duplexpair'); + +{ + const testData = '

Hello World

'; + const server = http2.createServer(); + server.on('stream', common.mustCall((stream, headers) => { + stream.respond({ + 'content-type': 'text/html', + ':status': 200 + }); + stream.end(testData); + })); + + const { clientSide, serverSide } = makeDuplexPair(); + server.emit('connection', serverSide); + + const client = http2.connect('http://localhost:80', { + createConnection: common.mustCall(() => clientSide) + }); + + const req = client.request({ ':path': '/' }); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 200); + })); + + req.setEncoding('utf8'); + // Note: This is checking that this small amount of data is passed through in + // a single chunk, which is unusual for our test suite but seems like a + // reasonable assumption here. + req.on('data', common.mustCall((data) => { + assert.strictEqual(data, testData); + })); + req.on('end', common.mustCall(() => { + clientSide.destroy(); + clientSide.end(); + })); + req.end(); +} diff --git a/test/parallel/test-wrap-js-stream-duplex.js b/test/parallel/test-wrap-js-stream-duplex.js new file mode 100644 index 00000000000000..6bd860e6ba1f56 --- /dev/null +++ b/test/parallel/test-wrap-js-stream-duplex.js @@ -0,0 +1,22 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const StreamWrap = require('_stream_wrap'); +const { PassThrough } = require('stream'); +const { Socket } = require('net'); + +{ + const wrap = new StreamWrap(new PassThrough()); + assert(wrap instanceof Socket); + wrap.on('data', common.mustCall((d) => assert.strictEqual(`${d}`, 'foo'))); + wrap.on('end', common.mustNotCall()); + wrap.write('foo'); +} + +{ + const wrap = new StreamWrap(new PassThrough()); + assert(wrap instanceof Socket); + wrap.on('data', common.mustCall((d) => assert.strictEqual(`${d}`, 'foo'))); + wrap.on('end', common.mustCall()); + wrap.end('foo'); +}