diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 5db15cca18..7380a13f85 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -334,7 +334,6 @@ deno_core::extension!(deno_node, ops::http::op_node_http_fetch_response_upgrade, ops::http::op_node_http_request_with_conn

, ops::http::op_node_http_await_response, - ops::http::op_node_http_wait_for_connection, ops::http2::op_http2_connect, ops::http2::op_http2_poll_client_connection, ops::http2::op_http2_client_request, diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index 57e0530f40..1524af1a98 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -66,17 +66,6 @@ pub struct NodeHttpResponse { pub error: Option, } -#[derive(Debug)] -pub struct NodeHttpConnReady { - recv: tokio::sync::oneshot::Receiver<()>, -} - -impl deno_core::Resource for NodeHttpConnReady { - fn name(&self) -> Cow { - "nodeHttpConnReady".into() - } -} - pub struct NodeHttpClientResponse { response: Pin, Error>> + Send>>, @@ -107,11 +96,11 @@ pub async fn op_node_http_request_with_conn

( #[smi] body: Option, #[smi] conn_rid: ResourceId, encrypted: bool, -) -> Result<(ResourceId, ResourceId), AnyError> +) -> Result where P: crate::NodePermissions + 'static, { - let (_handle, mut sender, receiver) = if encrypted { + let (_handle, mut sender) = if encrypted { let resource_rc = state .borrow_mut() .resource_table @@ -123,18 +112,12 @@ where let tcp_stream = read_half.unsplit(write_half); let io = TokioIo::new(tcp_stream); let (sender, conn) = hyper::client::conn::http1::handshake(io).await?; - - let (notify, receiver) = tokio::sync::oneshot::channel::<()>(); - - // Spawn a task to poll the connection, driving the HTTP state ( tokio::task::spawn(async move { - let _ = notify.send(()); conn.await?; Ok::<_, AnyError>(()) }), sender, - receiver, ) } else { let resource_rc = state @@ -149,17 +132,13 @@ where let io = TokioIo::new(tcp_stream); let (sender, conn) = hyper::client::conn::http1::handshake(io).await?; - let (notify, receiver) = tokio::sync::oneshot::channel::<()>(); - // Spawn a task to poll the connection, driving the HTTP state ( tokio::task::spawn(async move { - let _ = notify.send(()); conn.await?; Ok::<_, AnyError>(()) }), sender, - receiver, ) }; @@ -236,28 +215,7 @@ where response: res, url: url.clone(), }); - let conn_rid = state - .borrow_mut() - .resource_table - .add(NodeHttpConnReady { recv: receiver }); - Ok((rid, conn_rid)) -} - -#[op2(async)] -#[serde] -pub async fn op_node_http_wait_for_connection( - state: Rc>, - #[smi] rid: ResourceId, -) -> Result { - let resource = state - .borrow_mut() - .resource_table - .take::(rid)?; - let resource = - Rc::try_unwrap(resource).map_err(|_| bad_resource("NodeHttpConnReady")); - let resource = resource?; - resource.recv.await?; Ok(rid) } diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts index b3bbcdb3ff..81fdd8dd70 100644 --- a/ext/node/polyfills/_http_outgoing.ts +++ b/ext/node/polyfills/_http_outgoing.ts @@ -68,7 +68,7 @@ export function OutgoingMessage() { // Queue that holds all currently pending data, until the response will be // assigned to the socket (until it will its turn in the HTTP pipeline). this.outputData = []; - + this.pendingWrites = 0; // `outputSize` is an approximate measure of how much data is queued on this // response. `_onPendingData` will be invoked to update similar global // per-connection counter. That counter will be used to pause/unpause the @@ -520,7 +520,7 @@ Object.defineProperties( _send(data: any, encoding?: string | null, callback?: () => void) { // if socket is ready, write the data after headers are written. // if socket is not ready, buffer data in outputbuffer. - if (this.socket && !this.socket.connecting) { + if (this.socket && !this.socket.connecting && this.pendingWrites === 0) { if (!this._headerSent) { this._writeHeader(); this._headerSent = true; @@ -538,38 +538,39 @@ Object.defineProperties( throw new ERR_METHOD_NOT_IMPLEMENTED("_writeHeader()"); }, - async _flushBuffer() { - const outputLength = this.outputData.length; - if (outputLength <= 0 || !this.socket || !this._bodyWriter) { - return undefined; - } + // async _flushBuffer() { + // const outputLength = this.outputData.length; + // if (outputLength <= 0 || !this.socket || !this._bodyWriter) { + // return undefined; + // } - const outputData = this.outputData; - let ret; - // Retain for(;;) loop for performance reasons - // Refs: https://github.com/nodejs/node/pull/30958 - for (let i = 0; i < outputLength; i++) { - let { data, encoding, callback } = outputData[i]; - if (typeof data === "string") { - data = Buffer.from(data, encoding); - } - if (data instanceof Buffer) { - data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); - } - await this._bodyWriter.ready; - ret = await this._bodyWriter.write(data).then(() => { - callback?.(); - this.emit("drain"); - }).catch((e) => { - this._requestSendError = e; - }); - } + // const outputData = this.outputData; + // let ret; + // // Retain for(;;) loop for performance reasons + // // Refs: https://github.com/nodejs/node/pull/30958 + // for (let i = 0; i < outputLength; i++) { + // let { data, encoding, callback } = outputData[i]; + // if (typeof data === "string") { + // data = Buffer.from(data, encoding); + // } + // if (data instanceof Buffer) { + // data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); + // } + // await this._bodyWriter.ready; + // ret = await this._bodyWriter.write(data).then(() => { + // callback?.(); + // this.emit("drain"); + // }).catch((e) => { + // this._requestSendError = e; + // }); + // } - this.outputData = []; - this.outputSize = 0; + // this.outputData = []; + // this.pendingWrites = 0; + // this.outputSize = 0; - return ret; - }, + // return ret; + // }, _writeRaw( // deno-lint-ignore no-explicit-any @@ -577,6 +578,15 @@ Object.defineProperties( encoding?: string | null, callback?: () => void, ) { + this.pendingWrites = this.pendingWrites + 1; + const time = Date.now(); + console.trace( + "write invoked:", + time.toString().slice(-6), + "pending", + this.pendingWrites, + this.outputData.length, + ); if (typeof data === "string") { data = Buffer.from(data, encoding); } @@ -588,8 +598,23 @@ Object.defineProperties( if (this._bodyWriter.desiredSize > 0) { this._bodyWriter.write(data).then(() => { callback?.(); + this.pendingWrites = this.pendingWrites - 1; + console.log( + "write done:", + time.toString().slice(-6), + "pending", + this.pendingWrites, + this.outputData.length, + ); this.emit("drain"); }).catch((e) => { + console.log( + "write error:", + time.toString().slice(-6), + "pending", + this.pendingWrites, + this.outputData.length, + ); this._requestSendError = e; }); } diff --git a/ext/node/polyfills/_stream.mjs b/ext/node/polyfills/_stream.mjs index 02640abcd9..2df432d6b0 100644 --- a/ext/node/polyfills/_stream.mjs +++ b/ext/node/polyfills/_stream.mjs @@ -2359,7 +2359,7 @@ var require_readable = __commonJS({ return readableAddChunk(this, chunk, encoding, true); }; function readableAddChunk(stream, chunk, encoding, addToFront) { - debug("readableAddChunk", chunk); + console.trace("readableAddChunk", chunk ? new TextDecoder().decode(chunk.slice(-20)) : chunk); const state = stream._readableState; let err; if (!state.objectMode) { diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 15a755feae..0e2a32a6e8 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -8,7 +8,6 @@ import { op_node_http_await_response, op_node_http_fetch_response_upgrade, op_node_http_request_with_conn, - op_node_http_wait_for_connection, op_tls_start, } from "ext:core/ops"; @@ -460,7 +459,7 @@ class ClientRequest extends OutgoingMessage { alpnProtocols: ["http/1.0", "http/1.1"], }); } - const [rid, connRid] = await op_node_http_request_with_conn( + const rid = await op_node_http_request_with_conn( this.method, url, headers, @@ -468,7 +467,6 @@ class ClientRequest extends OutgoingMessage { baseConnRid, this._encrypted, ); - await op_node_http_wait_for_connection(connRid); // Emit request ready to let the request body to be written. this.emit("requestReady"); const res = await op_node_http_await_response(rid); @@ -623,6 +621,7 @@ class ClientRequest extends OutgoingMessage { // sets up the request. this._flushHeaders(); this.once("requestReady", () => { + console.log("flushing Body"); this._flushBody(); }); }); @@ -663,6 +662,12 @@ class ClientRequest extends OutgoingMessage { } const finish = async () => { try { + console.log( + "finish(): outputData:", + this.outputData.length, + "pendingWrites", + this.pendingWrites, + ); await this._bodyWriter.ready; await this._bodyWriter?.close(); } catch { @@ -678,11 +683,14 @@ class ClientRequest extends OutgoingMessage { } }; - if (this.socket && this._bodyWriter && this.outputData.length === 0) { + console.log("pendingWrites", this.pendingWrites, "outputData", this.outputData.length); + if (this.socket && this._bodyWriter && this.pendingWrites === 0) { finish(); } else { + console.log("setting drain event", this.pendingWrites, "outputData", this.outputData.length); this.on("drain", () => { - if (this.outputData.length === 0) { + console.log("drain event", this.pendingWrites); + if (this.pendingWrites === 0) { finish(); } }); diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 78487883b6..7f9540693e 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -282,6 +282,10 @@ impl BoundedBufferChannelInner { } pub fn close(&mut self) { + eprintln!( + "stream_resource: close() len={} current_size={} closed={}", + self.len, self.current_size, self.closed + ); self.closed = true; // Wake up reads and writes, since they'll both be able to proceed forever now if let Some(waker) = self.write_waker.take() {