1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

refactor(node/http): don't use readablestream for writing to request (#19282)

Refactors the internal usage of a readablestream to write to the
resource directly

---------

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
Leo Kettmeir 2023-05-27 15:42:20 +02:00 committed by GitHub
parent d0c5ff42f4
commit be59e93220
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 157 additions and 201 deletions

View file

@ -232,7 +232,8 @@
"test-child-process-spawnsync-maxbuf.js",
"test-child-process-spawnsync-validation-errors.js",
"test-child-process-spawnsync.js",
"test-client-request-destroy.js",
// TODO(crowlKats): socket is not yet polyfilled
// "test-client-request-destroy.js",
"test-console-async-write-error.js",
"test-console-group.js",
"test-console-log-stdio-broken-dest.js",

View file

@ -1,20 +0,0 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
'use strict';
// Test that http.ClientRequest,prototype.destroy() returns `this`.
require('../common');
const assert = require('assert');
const http = require('http');
const clientRequest = new http.ClientRequest({ createConnection: () => {} });
assert.strictEqual(clientRequest.destroyed, false);
assert.strictEqual(clientRequest.destroy(), clientRequest);
assert.strictEqual(clientRequest.destroyed, true);
assert.strictEqual(clientRequest.destroy(), clientRequest);

View file

@ -9,6 +9,7 @@ use deno_core::error::AnyError;
use deno_core::located_script_name;
use deno_core::op;
use deno_core::serde_json;
use deno_core::url::Url;
use deno_core::JsRuntime;
use deno_core::ModuleSpecifier;
use deno_fs::sync::MaybeSend;
@ -41,12 +42,24 @@ pub use resolution::NodeResolutionMode;
pub use resolution::NodeResolver;
pub trait NodePermissions {
fn check_net_url(
&mut self,
url: &Url,
api_name: &str,
) -> Result<(), AnyError>;
fn check_read(&self, path: &Path) -> Result<(), AnyError>;
}
pub(crate) struct AllowAllNodePermissions;
impl NodePermissions for AllowAllNodePermissions {
fn check_net_url(
&mut self,
_url: &Url,
_api_name: &str,
) -> Result<(), AnyError> {
Ok(())
}
fn check_read(&self, _path: &Path) -> Result<(), AnyError> {
Ok(())
}
@ -206,7 +219,7 @@ deno_core::extension!(deno_node,
ops::zlib::op_zlib_write_async,
ops::zlib::op_zlib_init,
ops::zlib::op_zlib_reset,
ops::http::op_node_http_request,
ops::http::op_node_http_request<P>,
op_node_build_os,
ops::require::op_require_init_paths,
ops::require::op_require_node_module_paths<P>,

View file

@ -24,14 +24,17 @@ use reqwest::Body;
use reqwest::Method;
#[op]
pub fn op_node_http_request(
pub fn op_node_http_request<P>(
state: &mut OpState,
method: ByteString,
url: String,
headers: Vec<(ByteString, ByteString)>,
client_rid: Option<u32>,
has_body: bool,
) -> Result<FetchReturn, AnyError> {
) -> Result<FetchReturn, AnyError>
where
P: crate::NodePermissions + 'static,
{
let client = if let Some(rid) = client_rid {
let r = state.resource_table.get::<HttpClientResource>(rid)?;
r.client.clone()
@ -42,6 +45,11 @@ pub fn op_node_http_request(
let method = Method::from_bytes(&method)?;
let url = Url::parse(&url)?;
{
let permissions = state.borrow_mut::<P>();
permissions.check_net_url(&url, "ClientRequest")?;
}
let mut header_map = HeaderMap::new();
for (key, value) in headers {
let name = HeaderName::from_bytes(&key)

View file

@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
const core = globalThis.__bootstrap.core;
import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs";
import assert from "ext:deno_node/internal/assert.mjs";
import EE from "ext:deno_node/events.ts";
@ -137,12 +138,6 @@ export class OutgoingMessage extends Stream {
this._keepAliveTimeout = 0;
this._onPendingData = nop;
this.stream = new ReadableStream({
start: (controller) => {
this.controller = controller;
},
});
}
get writableFinished() {
@ -374,21 +369,30 @@ export class OutgoingMessage extends Stream {
return headers;
}
controller: ReadableStreamDefaultController;
write(
chunk: string | Uint8Array | Buffer,
encoding: string | null,
// TODO(crowlKats): use callback
_callback: () => void,
callback: () => void,
): boolean {
if (typeof chunk === "string") {
chunk = Buffer.from(chunk, encoding);
}
if (chunk instanceof Buffer) {
chunk = new Uint8Array(chunk.buffer);
}
if (
(typeof chunk === "string" && chunk.length > 0) ||
((chunk instanceof Buffer || chunk instanceof Uint8Array) &&
chunk.buffer.byteLength > 0)
) {
if (typeof chunk === "string") {
chunk = Buffer.from(chunk, encoding);
}
if (chunk instanceof Buffer) {
chunk = new Uint8Array(chunk.buffer);
}
this.controller.enqueue(chunk);
core.writeAll(this._bodyWriteRid, chunk).then(() => {
callback?.();
this.emit("drain");
}).catch((e) => {
this._requestSendError = e;
});
}
return false;
}
@ -400,18 +404,8 @@ export class OutgoingMessage extends Stream {
}
// deno-lint-ignore no-explicit-any
end(chunk: any, encoding: any, _callback: any) {
if (typeof chunk === "function") {
callback = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === "function") {
callback = encoding;
encoding = null;
}
// TODO(crowlKats): finish
return this;
end(_chunk: any, _encoding: any, _callback: any) {
notImplemented("OutgoingMessage.end");
}
flushHeaders() {

View file

@ -38,6 +38,7 @@ import { Agent, globalAgent } from "ext:deno_node/_http_agent.mjs";
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
import { notImplemented } from "ext:deno_node/_utils.ts";
import {
connResetException,
ERR_HTTP_HEADERS_SENT,
@ -500,6 +501,14 @@ class ClientRequest extends OutgoingMessage {
delete optsWithoutSignal.signal;
}
if (options!.createConnection) {
notImplemented("ClientRequest.options.createConnection");
}
if (options!.lookup) {
notImplemented("ClientRequest.options.lookup");
}
// initiate connection
// TODO(crowlKats): finish this
/*if (this.agent) {
@ -547,61 +556,14 @@ class ClientRequest extends OutgoingMessage {
const client = this._getClient() ?? createHttpClient({ http2: false });
this._client = client;
const req = core.ops.op_node_http_request(
this._req = core.ops.op_node_http_request(
this.method,
url,
headers,
client.rid,
this.method === "POST" || this.method === "PATCH",
);
this._req = req;
if (req.requestBodyRid !== null) {
const reader = this.stream.getReader();
(async () => {
let done = false;
while (!done) {
let val;
try {
const res = await reader.read();
done = res.done;
val = res.value;
} catch (err) {
//if (terminator.aborted) break;
// TODO(lucacasonato): propagate error into response body stream
this._requestSendError = err;
this._requestSendErrorSet = true;
break;
}
if (done) break;
try {
await core.writeAll(req.requestBodyRid, val);
} catch (err) {
//if (terminator.aborted) break;
await reader.cancel(err);
// TODO(lucacasonato): propagate error into response body stream
this._requestSendError = err;
this._requestSendErrorSet = true;
break;
}
}
if (done /*&& !terminator.aborted*/) {
try {
await core.shutdown(req.requestBodyRid);
} catch (err) {
// TODO(bartlomieju): fix this conditional
// deno-lint-ignore no-constant-condition
if (true) {
this._requestSendError = err;
this._requestSendErrorSet = true;
}
}
}
//WeakMapPrototypeDelete(requestBodyReaders, req);
core.tryClose(req.requestBodyRid);
})();
}
this._bodyWriteRid = this._req.requestBodyRid;
}
_getClient(): Deno.HttpClient | undefined {
@ -645,112 +607,92 @@ class ClientRequest extends OutgoingMessage {
}
}
// TODO(bartlomieju): use callback here
// deno-lint-ignore no-explicit-any
end(chunk?: any, encoding?: any, _cb?: any): this {
end(chunk?: any, encoding?: any, cb?: any): this {
this.finished = true;
if (chunk !== undefined) {
if (chunk !== undefined && chunk !== null) {
this.write(chunk, encoding);
}
this.controller.close();
core.opAsync("op_fetch_send", this._req.requestRid).then((res) => {
if (this._timeout) {
this._timeout.onabort = null;
(async () => {
try {
const [res, _] = await Promise.all([
core.opAsync("op_fetch_send", this._req.requestRid),
(async () => {
if (this._bodyWriteRid) {
try {
await core.shutdown(this._bodyWriteRid);
} catch (err) {
this._requestSendError = err;
}
core.tryClose(this._bodyWriteRid);
try {
cb?.();
} catch (_) {
//
}
}
})(),
]);
if (this._timeout) {
this._timeout.onabort = null;
}
this._client.close();
const incoming = new IncomingMessageForClient(this.socket);
// TODO(@crowlKats):
// incoming.httpVersionMajor = versionMajor;
// incoming.httpVersionMinor = versionMinor;
// incoming.httpVersion = `${versionMajor}.${versionMinor}`;
// incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders ||
// parser.joinDuplicateHeaders;
incoming.url = res.url;
incoming.statusCode = res.status;
incoming.statusMessage = res.statusText;
incoming._addHeaderLines(
res.headers,
Object.entries(res.headers).flat().length,
);
incoming._bodyRid = res.responseRid;
if (this._req.cancelHandleRid !== null) {
core.tryClose(this._req.cancelHandleRid);
}
this.emit("response", incoming);
} catch (err) {
if (this._req.cancelHandleRid !== null) {
core.tryClose(this._req.cancelHandleRid);
}
if (this._requestSendError !== undefined) {
// if the request body stream errored, we want to propagate that error
// instead of the original error from opFetchSend
throw new TypeError(
"Failed to fetch: request body stream errored",
{
cause: this._requestSendError,
},
);
}
if (
err.message.includes("connection closed before message completed")
) {
// Node.js seems ignoring this error
} else if (err.message.includes("The signal has been aborted")) {
// Remap this error
this.emit("error", connResetException("socket hang up"));
} else {
this.emit("error", err);
}
}
this._client.close();
const incoming = new IncomingMessageForClient(this.socket);
// TODO(@crowlKats):
// incoming.httpVersionMajor = versionMajor;
// incoming.httpVersionMinor = versionMinor;
// incoming.httpVersion = `${versionMajor}.${versionMinor}`;
// incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders ||
// parser.joinDuplicateHeaders;
incoming.url = res.url;
incoming.statusCode = res.status;
incoming.statusMessage = res.statusText;
incoming._addHeaderLines(
res.headers,
Object.entries(res.headers).flat().length,
);
incoming._bodyRid = res.responseRid;
if (this._req.cancelHandleRid !== null) {
core.tryClose(this._req.cancelHandleRid);
}
this.emit("response", incoming);
}).catch((err) => {
if (this._req.cancelHandleRid !== null) {
core.tryClose(this._req.cancelHandleRid);
}
if (this._requestSendErrorSet) {
// if the request body stream errored, we want to propagate that error
// instead of the original error from opFetchSend
throw new TypeError("Failed to fetch: request body stream errored", {
cause: this._requestSendError,
});
}
if (err.message.includes("connection closed before message completed")) {
// Node.js seems ignoring this error
} else if (err.message.includes("The signal has been aborted")) {
// Remap this error
this.emit("error", connResetException("socket hang up"));
} else {
this.emit("error", err);
}
});
})();
}
/*
override async _final() {
if (this.controller) {
this.controller.close();
}
const body = await this._createBody(this.body, this.opts);
const client = await this._createCustomClient();
const opts = {
body,
method: this.opts.method,
client,
headers: this.opts.headers,
signal: this.opts.signal ?? undefined,
};
const mayResponse = fetch(this._createUrlStrFromOptions(this.opts), opts)
.catch((e) => {
if (e.message.includes("connection closed before message completed")) {
// Node.js seems ignoring this error
} else if (e.message.includes("The signal has been aborted")) {
// Remap this error
this.emit("error", connResetException("socket hang up"));
} else {
this.emit("error", e);
}
return undefined;
});
const res = new IncomingMessageForClient(
await mayResponse,
this._createSocket(),
);
this.emit("response", res);
if (client) {
res.on("end", () => {
client.close();
});
}
if (this.opts.timeout != undefined) {
clearTimeout(this.opts.timeout);
this.opts.timeout = undefined;
}
this.cb?.(res);
}*/
abort() {
if (this.aborted) {

View file

@ -122,6 +122,13 @@ mod startup_snapshot {
}
impl deno_node::NodePermissions for Permissions {
fn check_net_url(
&mut self,
_url: &deno_core::url::Url,
_api_name: &str,
) -> Result<(), deno_core::error::AnyError> {
unreachable!("snapshotting!")
}
fn check_read(&self, _p: &Path) -> Result<(), deno_core::error::AnyError> {
unreachable!("snapshotting!")
}

View file

@ -13,6 +13,7 @@ use deno_core::serde::Deserializer;
use deno_core::serde::Serialize;
use deno_core::serde_json;
use deno_core::url;
use deno_core::url::Url;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
use log;
@ -1871,6 +1872,15 @@ impl PermissionsContainer {
}
impl deno_node::NodePermissions for PermissionsContainer {
#[inline(always)]
fn check_net_url(
&mut self,
url: &Url,
api_name: &str,
) -> Result<(), AnyError> {
self.0.lock().net.check_url(url, Some(api_name))
}
#[inline(always)]
fn check_read(&self, path: &Path) -> Result<(), AnyError> {
self.0.lock().read.check(path, None)

View file

@ -3,7 +3,7 @@
NOTE: This file should not be manually edited. Please edit 'cli/tests/node_compat/config.json' and run 'tools/node_compat/setup.ts' instead.
Total: 2934
Total: 2935
- [abort/test-abort-backtrace.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-backtrace.js)
- [abort/test-abort-fatal-error.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-fatal-error.js)
@ -359,6 +359,7 @@ Total: 2934
- [parallel/test-cli-syntax-eval.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cli-syntax-eval.js)
- [parallel/test-cli-syntax-piped-bad.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cli-syntax-piped-bad.js)
- [parallel/test-cli-syntax-piped-good.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cli-syntax-piped-good.js)
- [parallel/test-client-request-destroy.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-client-request-destroy.js)
- [parallel/test-cluster-accept-fail.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-accept-fail.js)
- [parallel/test-cluster-advanced-serialization.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-advanced-serialization.js)
- [parallel/test-cluster-basic.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-basic.js)