Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: further work on node http client #19211

Merged
merged 15 commits into from
May 23, 2023
28 changes: 25 additions & 3 deletions cli/tests/unit_node/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,13 +470,35 @@ Deno.test("[node/http] server unref", async () => {
res.statusCode = status;
res.end("");
});
// This should let the program to exit without waiting for the

// This should let the program to exit without waiting for the
// server to close.
server.unref();

server.listen(async () => {
});
`);
assertEquals(statusCode, 0);
});

Deno.test("[node/http] ClientRequest handle non-string headers", async () => {
// deno-lint-ignore no-explicit-any
let headers: any;
const def = deferred();
const req = http.request("http://localhost:4545/echo_server", {
method: "POST",
headers: { 1: 2 },
}, (resp) => {
headers = resp.headers;

resp.on("data", () => {});

resp.on("end", () => {
def.resolve();
});
});
req.once("error", (e) => def.reject(e));
req.end();
await def;
assertEquals(headers!["1"], "2");
});
9 changes: 6 additions & 3 deletions ext/node/polyfills/_http_outgoing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ export class OutgoingMessage extends Stream {
this[kOutHeaders] = headers = Object.create(null);
}

headers[name.toLowerCase()] = [name, value];
name = name.toString();
headers[name.toLowerCase()] = [name, value.toString()];
return this;
}

Expand All @@ -262,6 +263,8 @@ export class OutgoingMessage extends Stream {
validateHeaderName(name);
validateHeaderValue(name, value);

name = name.toString();

const field = name.toLowerCase();
const headers = this[kOutHeaders];
if (headers === null || !headers[field]) {
Expand All @@ -276,10 +279,10 @@ export class OutgoingMessage extends Stream {
const existingValues = headers[field][1];
if (Array.isArray(value)) {
for (let i = 0, length = value.length; i < length; i++) {
existingValues.push(value[i]);
existingValues.push(value[i].toString());
}
} else {
existingValues.push(value);
existingValues.push(value.toString());
}

return this;
Expand Down
51 changes: 24 additions & 27 deletions ext/node/polyfills/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
} from "ext:deno_node/internal/errors.ts";
import { getTimerDuration } from "ext:deno_node/internal/timers.mjs";
import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js";
import { createHttpClient } from "ext:deno_fetch/22_http_client.js";

enum STATUS_CODES {
/** RFC 7231, 6.2.1 */
Expand Down Expand Up @@ -541,13 +542,14 @@ class ClientRequest extends OutgoingMessage {
}
}

const client = this._getClient();
const client = this._getClient() ?? createHttpClient({ http2: false });
this._client = client;

const req = core.ops.op_node_http_request(
this.method,
url,
headers,
client,
client.rid,
this.method === "POST" || this.method === "PATCH",
);

Expand Down Expand Up @@ -652,6 +654,10 @@ class ClientRequest extends OutgoingMessage {
this.controller.close();

core.opAsync("op_fetch_send", this._req.requestRid).then((res) => {
if (this._timeout) {
this._timeout.onabort = null;
}
this._client.close();
const incoming = new IncomingMessageForClient(this.socket);

// TODO(@crowlKats):
Expand All @@ -665,7 +671,10 @@ class ClientRequest extends OutgoingMessage {
incoming.statusCode = res.status;
incoming.statusMessage = res.statusText;

incoming._addHeaderLines(res.headers);
incoming._addHeaderLines(
res.headers,
Object.entries(res.headers).flat().length,
);
incoming._bodyRid = res.responseRid;

if (this._req.cancelHandleRid !== null) {
Expand Down Expand Up @@ -793,31 +802,19 @@ class ClientRequest extends OutgoingMessage {
}${path}${search}${hash}`;
}

setTimeout(timeout: number, callback?: () => void) {
if (timeout == 0) {
// Node's underlying Socket implementation expects a 0 value to disable the
// existing timeout.
if (this.opts.timeout) {
clearTimeout(this.opts.timeout);
this.opts.timeout = undefined;
this.opts.signal = undefined;
}

return;
setTimeout(msecs: number, callback?: () => void) {
if (this._ended || this._timeout) {
return this;
}

const controller = new AbortController();
this.opts.signal = controller.signal;

this.opts.timeout = setTimeout(() => {
controller.abort();
msecs = getTimerDuration(msecs, "msecs");
if (callback) this.once("timeout", callback);

this.emit("timeout");
const timeout = AbortSignal.timeout(msecs);
timeout.onabort = () => this.emit("timeout");
this._timeout = timeout;

if (callback !== undefined) {
callback();
}
}, timeout);
return this;
}

_processHeader(headers, key, value, validate) {
Expand Down Expand Up @@ -860,7 +857,7 @@ function isCookieField(s) {

function isContentDispositionField(s) {
return s.length === 19 &&
StringPrototypeToLowerCase(s) === "content-disposition";
s.toLowerCase() === "content-disposition";
}

const kHeaders = Symbol("kHeaders");
Expand Down Expand Up @@ -1111,7 +1108,7 @@ export class IncomingMessageForClient extends NodeReadable {
}

_addHeaderLineDistinct(field, value, dest) {
field = StringPrototypeToLowerCase(field);
field = field.toLowerCase();
if (!dest[field]) {
dest[field] = [value];
} else {
Expand Down Expand Up @@ -1256,7 +1253,7 @@ function matchKnownFields(field, lowercased) {
if (lowercased) {
return "\u0000" + field;
}
return matchKnownFields(StringPrototypeToLowerCase(field), true);
return matchKnownFields(field.toLowerCase(), true);
}

function onError(self, error, cb) {
Expand Down
10 changes: 3 additions & 7 deletions ext/node/polyfills/https.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
type RequestOptions,
} from "ext:deno_node/http.ts";
import { Agent as HttpAgent } from "ext:deno_node/_http_agent.mjs";
import { createHttpClient } from "ext:deno_fetch/22_http_client.js";

export class Server {
constructor() {
Expand Down Expand Up @@ -80,7 +81,7 @@ class HttpsClientRequest extends ClientRequest {
return undefined;
}
if (caCerts !== undefined) {
return Deno.createHttpClient({ caCerts });
return createHttpClient({ caCerts, http2: false });
}
// const status = await Deno.permissions.query({
// name: "env",
Expand All @@ -97,13 +98,8 @@ class HttpsClientRequest extends ClientRequest {
}
const caCert = Deno.readTextFileSync(certFilename);
caCerts = [caCert];
return Deno.createHttpClient({ caCerts });
return createHttpClient({ caCerts, http2: false });
}

/*override _createSocket(): Socket {
// deno-lint-ignore no-explicit-any
return { authorized: true } as any;
}*/
}

/** Makes a request to an https server. */
Expand Down
11 changes: 1 addition & 10 deletions test_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,16 +734,7 @@ async fn main_server(
*response.status_mut() =
StatusCode::from_bytes(status.as_bytes()).unwrap();
}
if let Some(content_type) = parts.headers.get("content-type") {
response
.headers_mut()
.insert("content-type", content_type.clone());
}
if let Some(user_agent) = parts.headers.get("user-agent") {
response
.headers_mut()
.insert("user-agent", user_agent.clone());
}
response.headers_mut().extend(parts.headers);
Ok(response)
}
(&hyper::Method::POST, "/echo_multipart_file") => {
Expand Down