From 881fe46d3f8c3fdb2ecce9b63fa9ba211fd2a9bf Mon Sep 17 00:00:00 2001 From: Satya Rohith Date: Tue, 15 Oct 2024 16:46:09 +0530 Subject: [PATCH] fix double request initiation --- ext/node/ops/http.rs | 21 +++++- ext/node/polyfills/_http_outgoing.ts | 46 +++++++++--- ext/node/polyfills/http.ts | 106 ++++++++++++++++++--------- 3 files changed, 127 insertions(+), 46 deletions(-) diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index d5bc553805..8462e2a35b 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use std::cell::RefCell; +use std::fmt::Debug; use std::pin::Pin; use std::rc::Rc; use std::task::Context; @@ -82,6 +83,14 @@ pub struct NodeHttpClientResponse { url: String, } +impl Debug for NodeHttpClientResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NodeHttpClientResponse") + .field("url", &self.url) + .finish() + } +} + impl deno_core::Resource for NodeHttpClientResponse { fn name(&self) -> Cow { "nodeHttpClientResponse".into() @@ -266,11 +275,17 @@ pub async fn op_node_http_await_response( .resource_table .take::(rid)?; let resource = Rc::try_unwrap(resource) - .map_err(|_| bad_resource("NodeHttpClientResponse"))?; + .map_err(|_| bad_resource("NodeHttpClientResponse")); + eprintln!("resource: {resource:?}"); + let resource = resource?; - let res = resource.response.await?; + eprintln!("op_node_http_await_response: awating for res"); + let res = resource.response.await; + eprintln!("op_node_http_await_response: res: {res:?}"); + let res = res?; let status = res.status(); + eprintln!("op_node_http_await_response: {status}"); let mut res_headers = Vec::new(); for (key, val) in res.headers().iter() { res_headers.push((key.as_str().into(), val.as_bytes().into())); @@ -568,7 +583,7 @@ impl Stream for NodeHttpResourceToBodyAdapter { Ok(buf) if buf.is_empty() => Poll::Ready(None), Ok(buf) => { let bytes: Bytes = buf.to_vec().into(); - eprintln!("buf: {:?}", bytes.len()); + eprintln!("buf: {:?}", bytes); this.1 = Some(this.0.clone().read(64 * 1024)); Poll::Ready(Some(Ok(bytes))) } diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts index 7cf7aa911d..8bacf34b7d 100644 --- a/ext/node/polyfills/_http_outgoing.ts +++ b/ext/node/polyfills/_http_outgoing.ts @@ -391,7 +391,11 @@ Object.defineProperties( msg._implicitHeader(); } - return msg._send(chunk, encoding, callback); + try { + return msg._send(chunk, encoding, callback); + } catch (error) { + console.log("error from msg._send()", error); + } }, // deno-lint-ignore no-explicit-any @@ -510,24 +514,36 @@ Object.defineProperties( /** Right after socket is ready, we need to writeHeader() to setup the request and * client. This is invoked by onSocket(). */ _flushHeaders() { - if (this.socket) { - if (!this._headerSent) { - this._writeHeader(); - this._headerSent = true; - } + console.trace("_flushHeaders", { + socket: !!this.socket, + headerSent: this._headerSent, + }); + if (this.socket && !this._headerSent) { + this._headerSent = true; + this._writeHeader(); } else { // deno-lint-ignore no-console console.warn("socket not found"); } + console.trace("_flushHeaders after", { + socket: !!this.socket, + headerSent: this._headerSent, + }); }, // deno-lint-ignore no-explicit-any _send(data: any, encoding?: string | null, callback?: () => void) { - console.trace("send invoked"); + console.trace("send invoked", { + data: new TextDecoder().decode(data.slice(-20, -1)), + }); // if socket is ready, write the data after headers are written. // if socket is not ready, buffer data in outputbuffer. if (this.socket && !this.socket.connecting) { - if (!this._headerSent && this._header !== null) { + console.log("writing headers again:", { + headerSent: this._headerSent, + header: this._header, + }); + if (!this._headerSent) { this._writeHeader(); this._headerSent = true; } @@ -591,9 +607,20 @@ Object.defineProperties( data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); } if (data.buffer.byteLength > 0) { + console.log("waiting for bodyWriter to be ready", { + data: new TextDecoder().decode(data.slice(-20, -1)), + }); this._bodyWriter.ready.then(() => { + console.log("bodyWriter is ready", { + desirezedSize: this._bodyWriter.desiredSize, + }); if (this._bodyWriter.desiredSize > 0) { this._bodyWriter.write(data).then(() => { + console.log("writing done: ", { + last_bytes: new TextDecoder().decode(data.slice(-20, -1)), + length: data.length, + buffer: this.outputData.length, + }); callback?.(); if (this.outputData.length == 0) { this.emit("finish"); @@ -739,7 +766,8 @@ Object.defineProperties( const { header } = state; this._header = header + "\r\n"; - this._headerSent = false; + // console.log("_headerSent set to false"); + // this._headerSent = false; // Wait until the first body chunk, or close(), is sent to flush, // UNLESS we're sending Expect: 100-continue. diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 574d892468..0a827f7378 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -398,6 +398,7 @@ class ClientRequest extends OutgoingMessage { if (typeof optsWithoutSignal.createConnection === "function") { const oncreate = once((err, socket) => { if (err) { + console.log("emitting error", { err }); this.emit("error", err); } else { this.onSocket(socket); @@ -423,6 +424,7 @@ class ClientRequest extends OutgoingMessage { } _writeHeader() { + console.trace("_writeHeader invoked"); const url = this._createUrlStrFromOptions(); const headers = []; @@ -451,7 +453,7 @@ class ClientRequest extends OutgoingMessage { (async () => { try { const parsedUrl = new URL(url); - console.log("starting conn"); + console.trace("starting conn"); let baseConnRid = this.socket.rid; console.log("socket:", baseConnRid); if (this._encrypted) { @@ -463,21 +465,41 @@ class ClientRequest extends OutgoingMessage { alpnProtocols: ["http/1.0", "http/1.1"], }); } - const [rid, connRid] = await op_node_http_request_with_conn( - this.method, - url, - headers, - this._bodyWriteRid, - baseConnRid, - this._encrypted, - ); - console.log("request rid:", rid); + let rid; + let connRid; + try { + console.log( + "sending request with conn", + this._bodyWriteRid, + baseConnRid, + this._encrypted, + ); + [rid, connRid] = await op_node_http_request_with_conn( + this.method, + url, + headers, + this._bodyWriteRid, + baseConnRid, + this._encrypted, + ); + } catch (error) { + console.error("error from request with conn", error); + } // Emit request ready to let the request body to be written. - await op_node_http_wait_for_connection(connRid); + try { + await op_node_http_wait_for_connection(connRid); + } catch (error) { + console.error("error from wait for connection", error); + } console.log("request ready"); this.emit("requestReady"); - const res = await op_node_http_await_response(rid); - console.log("response received"); + let res; + try { + res = await op_node_http_await_response(rid); + console.log("response received", { res }); + } catch (error) { + console.log("error from await response", error); + } const incoming = new IncomingMessageForClient(this.socket); incoming.req = this; this.res = incoming; @@ -516,29 +538,43 @@ class ClientRequest extends OutgoingMessage { if (this.method === "CONNECT") { throw new Error("not implemented CONNECT"); } - - const upgradeRid = await op_node_http_fetch_response_upgrade( - res.responseRid, - ); + let upgradeRid; + try { + upgradeRid = await op_node_http_fetch_response_upgrade( + res.responseRid, + ); + } catch (error) { + console.log("error from fetch response upgrade", error); + } assert(typeof res.remoteAddrIp !== "undefined"); assert(typeof res.remoteAddrIp !== "undefined"); - const conn = new TcpConn( - upgradeRid, - { - transport: "tcp", - hostname: res.remoteAddrIp, - port: res.remoteAddrIp, - }, - // TODO(bartlomieju): figure out actual values - { - transport: "tcp", - hostname: "127.0.0.1", - port: 80, - }, - ); - const socket = new Socket({ - handle: new TCP(constants.SERVER, conn), - }); + let conn; + try { + conn = new TcpConn( + upgradeRid, + { + transport: "tcp", + hostname: res.remoteAddrIp, + port: res.remoteAddrIp, + }, + // TODO(bartlomieju): figure out actual values + { + transport: "tcp", + hostname: "127.0.0.1", + port: 80, + }, + ); + } catch (error) { + console.log("error from new connectin"); + } + let socket; + try { + socket = new Socket({ + handle: new TCP(constants.SERVER, conn), + }); + } catch (error) { + console.log("error from new Socket", error); + } this.upgradeOrConnect = true; @@ -568,8 +604,10 @@ class ClientRequest extends OutgoingMessage { // Node.js seems ignoring this error } else if (err.message.includes("The signal has been aborted")) { // Remap this error + console.log("emitting socket hung up error"); this.emit("error", connResetException("socket hang up")); } else { + console.log("emitting error event", err); this.emit("error", err); } }