mirror of
https://github.com/denoland/deno.git
synced 2025-01-06 22:35:51 -05:00
clean up debug logs
This commit is contained in:
parent
be8c90de6f
commit
9c1b39ba2d
10 changed files with 8 additions and 194 deletions
|
@ -100,12 +100,10 @@ impl Resource for TcpStreamResource {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
|
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
|
||||||
eprintln!("shutdown TcpStreamResource");
|
|
||||||
Box::pin(self.shutdown())
|
Box::pin(self.shutdown())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn close(self: Rc<Self>) {
|
fn close(self: Rc<Self>) {
|
||||||
eprintln!("close TcpStreamResource");
|
|
||||||
self.cancel_read_ops();
|
self.cancel_read_ops();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -328,7 +328,6 @@ where
|
||||||
let rid = state_
|
let rid = state_
|
||||||
.resource_table
|
.resource_table
|
||||||
.add(TcpStreamResource::new(tcp_stream.into_split()));
|
.add(TcpStreamResource::new(tcp_stream.into_split()));
|
||||||
eprintln!("adding TcpStreamResource in op_net_connect_tcp: {rid}");
|
|
||||||
|
|
||||||
Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr)))
|
Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr)))
|
||||||
}
|
}
|
||||||
|
@ -655,7 +654,6 @@ pub fn op_set_nodelay_inner(
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
let resource: Rc<TcpStreamResource> =
|
let resource: Rc<TcpStreamResource> =
|
||||||
state.resource_table.get::<TcpStreamResource>(rid)?;
|
state.resource_table.get::<TcpStreamResource>(rid)?;
|
||||||
println!("setting nodelay (rid: {rid}): {nodelay}");
|
|
||||||
resource.set_nodelay(nodelay)
|
resource.set_nodelay(nodelay)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -676,7 +674,6 @@ pub fn op_set_keepalive_inner(
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
let resource: Rc<TcpStreamResource> =
|
let resource: Rc<TcpStreamResource> =
|
||||||
state.resource_table.get::<TcpStreamResource>(rid)?;
|
state.resource_table.get::<TcpStreamResource>(rid)?;
|
||||||
eprintln!("setting keepalive (rid: {rid}): {keepalive}");
|
|
||||||
resource.set_keepalive(keepalive)
|
resource.set_keepalive(keepalive)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -293,7 +293,6 @@ where
|
||||||
.borrow::<DefaultTlsOptions>()
|
.borrow::<DefaultTlsOptions>()
|
||||||
.root_cert_store()?;
|
.root_cert_store()?;
|
||||||
|
|
||||||
println!("used in op_tls_start");
|
|
||||||
let resource_rc = state
|
let resource_rc = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
|
|
|
@ -329,7 +329,6 @@ pub fn take_network_stream_resource(
|
||||||
// The stream we're attempting to unwrap may be in use somewhere else. If that's the case, we cannot proceed
|
// The stream we're attempting to unwrap may be in use somewhere else. If that's the case, we cannot proceed
|
||||||
// with the process of unwrapping this connection, so we just return a bad resource error.
|
// with the process of unwrapping this connection, so we just return a bad resource error.
|
||||||
// See also: https://github.com/denoland/deno/pull/16242
|
// See also: https://github.com/denoland/deno/pull/16242
|
||||||
println!("used in take_network_stream_resource");
|
|
||||||
if let Ok(resource_rc) = resource_table.take::<TcpStreamResource>(stream_rid)
|
if let Ok(resource_rc) = resource_table.take::<TcpStreamResource>(stream_rid)
|
||||||
{
|
{
|
||||||
// This TCP connection might be used somewhere else.
|
// This TCP connection might be used somewhere else.
|
||||||
|
|
|
@ -107,15 +107,8 @@ where
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.take::<TcpStreamResource>(conn_rid)?;
|
.take::<TcpStreamResource>(conn_rid)?;
|
||||||
eprintln!(
|
let resource = Rc::try_unwrap(resource_rc)
|
||||||
"rc: strong_count: {strong_count} weak_count: {weak_count}",
|
.map_err(|e| bad_resource("TCP stream is currently in use"))?;
|
||||||
strong_count = Rc::strong_count(&resource_rc),
|
|
||||||
weak_count = Rc::weak_count(&resource_rc)
|
|
||||||
);
|
|
||||||
let resource = Rc::try_unwrap(resource_rc).map_err(|e| {
|
|
||||||
eprintln!("error: {:?}", e);
|
|
||||||
bad_resource("TCP stream is currently in use")
|
|
||||||
})?;
|
|
||||||
let (read_half, write_half) = resource.into_inner();
|
let (read_half, write_half) = resource.into_inner();
|
||||||
let tcp_stream = read_half.reunite(write_half)?;
|
let tcp_stream = read_half.reunite(write_half)?;
|
||||||
let io = TokioIo::new(tcp_stream);
|
let io = TokioIo::new(tcp_stream);
|
||||||
|
@ -128,11 +121,9 @@ where
|
||||||
|
|
||||||
// Spawn a task to poll the connection, driving the HTTP state
|
// Spawn a task to poll the connection, driving the HTTP state
|
||||||
let _handle = tokio::task::spawn(async move {
|
let _handle = tokio::task::spawn(async move {
|
||||||
eprintln!("rs: connection started");
|
|
||||||
conn_start.store(true, std::sync::atomic::Ordering::Relaxed);
|
conn_start.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
let _ = notify.send(());
|
let _ = notify.send(());
|
||||||
conn.await?;
|
conn.await?;
|
||||||
eprintln!("rs: connection completed");
|
|
||||||
Ok::<_, AnyError>(())
|
Ok::<_, AnyError>(())
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -199,9 +190,6 @@ where
|
||||||
request.headers_mut().insert(CONTENT_LENGTH, len.into());
|
request.headers_mut().insert(CONTENT_LENGTH, len.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
eprintln!("rs: sending request: {request:?}");
|
|
||||||
// let req_fut = sender.send_request(request);
|
|
||||||
// let res = tokio::time::timeout(Duration::from_secs(10), req_fut).await??;
|
|
||||||
let res = sender.send_request(request).map_err(Error::from).boxed();
|
let res = sender.send_request(request).map_err(Error::from).boxed();
|
||||||
let rid = state
|
let rid = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
|
@ -248,15 +236,7 @@ pub async fn op_node_http_await_response(
|
||||||
let resource = Rc::try_unwrap(resource)
|
let resource = Rc::try_unwrap(resource)
|
||||||
.map_err(|_| bad_resource("NodeHttpClientResponse"))?;
|
.map_err(|_| bad_resource("NodeHttpClientResponse"))?;
|
||||||
|
|
||||||
eprintln!(
|
|
||||||
"rs: awaiting response: {}",
|
|
||||||
resource
|
|
||||||
.connection_started
|
|
||||||
.load(std::sync::atomic::Ordering::Relaxed)
|
|
||||||
);
|
|
||||||
let res = resource.response.await?;
|
let res = resource.response.await?;
|
||||||
// let res = tokio::time::timeout(Duration::from_secs(10), req_fut).await??;
|
|
||||||
eprintln!("rs: received response");
|
|
||||||
|
|
||||||
let status = res.status();
|
let status = res.status();
|
||||||
let mut res_headers = Vec::new();
|
let mut res_headers = Vec::new();
|
||||||
|
@ -280,7 +260,6 @@ pub async fn op_node_http_await_response(
|
||||||
let body = body.boxed();
|
let body = body.boxed();
|
||||||
|
|
||||||
let res = http::Response::from_parts(parts, body);
|
let res = http::Response::from_parts(parts, body);
|
||||||
println!("res: {res:?}");
|
|
||||||
|
|
||||||
let response_rid = state
|
let response_rid = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
|
@ -556,7 +535,6 @@ impl Stream for NodeHttpResourceToBodyAdapter {
|
||||||
Poll::Ready(res) => match res {
|
Poll::Ready(res) => match res {
|
||||||
Ok(buf) if buf.is_empty() => Poll::Ready(None),
|
Ok(buf) if buf.is_empty() => Poll::Ready(None),
|
||||||
Ok(buf) => {
|
Ok(buf) => {
|
||||||
println!("rs: reading: {len}", len = buf.len());
|
|
||||||
this.1 = Some(this.0.clone().read(64 * 1024));
|
this.1 = Some(this.0.clone().read(64 * 1024));
|
||||||
Poll::Ready(Some(Ok(buf.to_vec().into())))
|
Poll::Ready(Some(Ok(buf.to_vec().into())))
|
||||||
}
|
}
|
||||||
|
|
|
@ -326,7 +326,6 @@ Agent.prototype.createSocket = function createSocket(req, options, cb) {
|
||||||
});
|
});
|
||||||
|
|
||||||
const newSocket = this.createConnection(options, oncreate);
|
const newSocket = this.createConnection(options, oncreate);
|
||||||
console.log("agent: creating a connection", newSocket._handle.rid);
|
|
||||||
if (newSocket) {
|
if (newSocket) {
|
||||||
oncreate(null, newSocket);
|
oncreate(null, newSocket);
|
||||||
}
|
}
|
||||||
|
|
|
@ -498,11 +498,9 @@ Object.defineProperties(
|
||||||
const outputLength = this.outputData.length;
|
const outputLength = this.outputData.length;
|
||||||
if (socket && outputLength > 0) {
|
if (socket && outputLength > 0) {
|
||||||
const { data, encoding, callback } = this.outputData.shift();
|
const { data, encoding, callback } = this.outputData.shift();
|
||||||
console.log("flushBody: writing", { data });
|
|
||||||
this._writeRaw(data, encoding, callback);
|
this._writeRaw(data, encoding, callback);
|
||||||
if (this.outputData.length > 0) {
|
if (this.outputData.length > 0) {
|
||||||
this.on("drain", () => {
|
this.on("drain", () => {
|
||||||
console.log("drain emitted");
|
|
||||||
this._flushBody();
|
this._flushBody();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -512,19 +510,13 @@ Object.defineProperties(
|
||||||
/** Right after socket is ready, we need to writeHeader() to setup the request and
|
/** Right after socket is ready, we need to writeHeader() to setup the request and
|
||||||
* client. This is invoked by onSocket(). */
|
* client. This is invoked by onSocket(). */
|
||||||
_flushHeaders() {
|
_flushHeaders() {
|
||||||
console.log("flushHeaders");
|
|
||||||
if (this.socket) {
|
if (this.socket) {
|
||||||
console.log("socket found: ", {
|
|
||||||
headerSent: this._headerSent,
|
|
||||||
header: this._header,
|
|
||||||
});
|
|
||||||
if (!this._headerSent) {
|
if (!this._headerSent) {
|
||||||
console.log("_writeHeader invoked");
|
|
||||||
this._writeHeader();
|
this._writeHeader();
|
||||||
this._headerSent = true;
|
this._headerSent = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
console.log("socket not found");
|
console.warn("socket not found");
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -533,32 +525,15 @@ Object.defineProperties(
|
||||||
// if socket is ready, write the data after headers are written.
|
// if socket is ready, write the data after headers are written.
|
||||||
// if socket is not ready, buffer data in outputbuffer.
|
// if socket is not ready, buffer data in outputbuffer.
|
||||||
if (this.socket && !this.socket.connecting) {
|
if (this.socket && !this.socket.connecting) {
|
||||||
console.log("_send(): im never invoked");
|
|
||||||
if (!this._headerSent && this._header !== null) {
|
if (!this._headerSent && this._header !== null) {
|
||||||
this._writeHeader();
|
this._writeHeader();
|
||||||
this._headerSent = true;
|
this._headerSent = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this._headerSent) {
|
if (this._headerSent) {
|
||||||
console.log("_send(): writeRaw", data, encoding, callback);
|
|
||||||
return this._writeRaw(data, encoding, callback);
|
return this._writeRaw(data, encoding, callback);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// if (!this._listenerSet) {
|
|
||||||
// this._listenerSet = true;
|
|
||||||
// this.on("socket", (socket) => {
|
|
||||||
// console.log("socket rid:", socket._handle.rid);
|
|
||||||
// socket.on("ready", () => {
|
|
||||||
// if (!this._headerSent && this._header !== null) {
|
|
||||||
// this._writeHeader();
|
|
||||||
// this._headerSent = true;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// this._flushBuffer();
|
|
||||||
// });
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
console.log("_send(): pushing to outputData:", this.outputData.length);
|
|
||||||
this.outputData.push({ data, encoding, callback });
|
this.outputData.push({ data, encoding, callback });
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -591,7 +566,6 @@ Object.defineProperties(
|
||||||
}).catch((e) => {
|
}).catch((e) => {
|
||||||
this._requestSendError = e;
|
this._requestSendError = e;
|
||||||
});
|
});
|
||||||
console.log("flushing data:", data, ret);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
this.outputData = [];
|
this.outputData = [];
|
||||||
|
@ -606,7 +580,6 @@ Object.defineProperties(
|
||||||
encoding?: string | null,
|
encoding?: string | null,
|
||||||
callback?: () => void,
|
callback?: () => void,
|
||||||
) {
|
) {
|
||||||
console.trace("_writeRaw invoked:", data.length);
|
|
||||||
if (typeof data === "string") {
|
if (typeof data === "string") {
|
||||||
data = Buffer.from(data, encoding);
|
data = Buffer.from(data, encoding);
|
||||||
}
|
}
|
||||||
|
@ -614,20 +587,11 @@ Object.defineProperties(
|
||||||
data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
|
data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
|
||||||
}
|
}
|
||||||
if (data.buffer.byteLength > 0) {
|
if (data.buffer.byteLength > 0) {
|
||||||
console.log(
|
|
||||||
"writing to",
|
|
||||||
this._bodyWriteRid,
|
|
||||||
"desired size",
|
|
||||||
this._bodyWriter.desiredSize,
|
|
||||||
"writer",
|
|
||||||
this._bodyWriter,
|
|
||||||
);
|
|
||||||
this._bodyWriter.ready.then(() => {
|
this._bodyWriter.ready.then(() => {
|
||||||
if (this._bodyWriter.desiredSize > 0) {
|
if (this._bodyWriter.desiredSize > 0) {
|
||||||
this._bodyWriter.write(data).then(() => {
|
this._bodyWriter.write(data).then(() => {
|
||||||
callback?.();
|
callback?.();
|
||||||
if (this.outputData.length == 0) {
|
if (this.outputData.length == 0) {
|
||||||
console.log("emitting finish for", { data });
|
|
||||||
this.emit("finish");
|
this.emit("finish");
|
||||||
}
|
}
|
||||||
this.emit("drain");
|
this.emit("drain");
|
||||||
|
|
|
@ -50,7 +50,6 @@ import { Agent, globalAgent } from "node:_http_agent";
|
||||||
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
|
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
|
||||||
import { kEmptyObject, once } from "ext:deno_node/internal/util.mjs";
|
import { kEmptyObject, once } from "ext:deno_node/internal/util.mjs";
|
||||||
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
|
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
|
||||||
// import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
|
|
||||||
import { notImplemented } from "ext:deno_node/_utils.ts";
|
import { notImplemented } from "ext:deno_node/_utils.ts";
|
||||||
|
|
||||||
import {
|
import {
|
||||||
|
@ -64,10 +63,7 @@ import {
|
||||||
} from "ext:deno_node/internal/errors.ts";
|
} from "ext:deno_node/internal/errors.ts";
|
||||||
import { getTimerDuration } from "ext:deno_node/internal/timers.mjs";
|
import { getTimerDuration } from "ext:deno_node/internal/timers.mjs";
|
||||||
import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.ts";
|
import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.ts";
|
||||||
import { createHttpClient } from "ext:deno_fetch/22_http_client.js";
|
|
||||||
import { headersEntries } from "ext:deno_fetch/20_headers.js";
|
import { headersEntries } from "ext:deno_fetch/20_headers.js";
|
||||||
// import { timerId } from "ext:deno_web/03_abort_signal.js";
|
|
||||||
// import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js";
|
|
||||||
import { resourceForReadableStream } from "ext:deno_web/06_streams.js";
|
import { resourceForReadableStream } from "ext:deno_web/06_streams.js";
|
||||||
import { TcpConn } from "ext:deno_net/01_net.js";
|
import { TcpConn } from "ext:deno_net/01_net.js";
|
||||||
import { STATUS_CODES } from "node:_http_server";
|
import { STATUS_CODES } from "node:_http_server";
|
||||||
|
@ -388,7 +384,6 @@ class ClientRequest extends OutgoingMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.agent) {
|
if (this.agent) {
|
||||||
console.log("use this.agent");
|
|
||||||
this.agent.addRequest(this, optsWithoutSignal);
|
this.agent.addRequest(this, optsWithoutSignal);
|
||||||
} else {
|
} else {
|
||||||
// No agent, default to Connection:close.
|
// No agent, default to Connection:close.
|
||||||
|
@ -416,29 +411,13 @@ class ClientRequest extends OutgoingMessage {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
debug("CLIENT use net.createConnection", optsWithoutSignal);
|
debug("CLIENT use net.createConnection", optsWithoutSignal);
|
||||||
console.log("use net.createConnection");
|
|
||||||
this.onSocket(netCreateConnection(optsWithoutSignal));
|
this.onSocket(netCreateConnection(optsWithoutSignal));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_writeHeader() {
|
_writeHeader() {
|
||||||
console.trace("_writeHeader invoked am i working?");
|
|
||||||
const url = this._createUrlStrFromOptions();
|
const url = this._createUrlStrFromOptions();
|
||||||
console.log({ url });
|
|
||||||
// const headers = [];
|
|
||||||
// for (const key in this[kOutHeaders]) {
|
|
||||||
// if (Object.hasOwn(this[kOutHeaders], key)) {
|
|
||||||
// const entry = this[kOutHeaders][key];
|
|
||||||
// console.log("processing header");
|
|
||||||
// this._processHeader(headers, entry[0], entry[1], false);
|
|
||||||
// console.log("processing header done");
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// console.log("im here", { headers });
|
|
||||||
// const client = this._getClient() ?? createHttpClient({ http2: false });
|
|
||||||
// this._client = client;
|
|
||||||
|
|
||||||
if (
|
if (
|
||||||
this.method === "POST" || this.method === "PATCH" || this.method === "PUT"
|
this.method === "POST" || this.method === "PATCH" || this.method === "PUT"
|
||||||
|
@ -455,18 +434,8 @@ class ClientRequest extends OutgoingMessage {
|
||||||
this._bodyWriteRid = resourceForReadableStream(readable);
|
this._bodyWriteRid = resourceForReadableStream(readable);
|
||||||
}
|
}
|
||||||
|
|
||||||
// this._req = op_node_http_request(
|
|
||||||
// this.method,
|
|
||||||
// url,
|
|
||||||
// headers,
|
|
||||||
// client[internalRidSymbol],
|
|
||||||
// this._bodyWriteRid,
|
|
||||||
// );
|
|
||||||
|
|
||||||
(async () => {
|
(async () => {
|
||||||
try {
|
try {
|
||||||
console.trace("js: sending request", this.socket.rid);
|
|
||||||
// console.log("this.socket", this.socket);
|
|
||||||
const [rid, connRid] = await op_node_http_request_with_conn(
|
const [rid, connRid] = await op_node_http_request_with_conn(
|
||||||
this.method,
|
this.method,
|
||||||
url,
|
url,
|
||||||
|
@ -474,21 +443,10 @@ class ClientRequest extends OutgoingMessage {
|
||||||
this._bodyWriteRid,
|
this._bodyWriteRid,
|
||||||
this.socket.rid,
|
this.socket.rid,
|
||||||
);
|
);
|
||||||
console.log("js: request sent", { rid, connRid });
|
|
||||||
// Emit request ready to let the request body to be written.
|
// Emit request ready to let the request body to be written.
|
||||||
await op_node_http_wait_for_connection(connRid);
|
await op_node_http_wait_for_connection(connRid);
|
||||||
this.emit("requestReady");
|
this.emit("requestReady");
|
||||||
const res = await op_node_http_await_response(rid);
|
const res = await op_node_http_await_response(rid);
|
||||||
console.log({ status: res.status });
|
|
||||||
// if (this._req.cancelHandleRid !== null) {
|
|
||||||
// core.tryClose(this._req.cancelHandleRid);
|
|
||||||
// }
|
|
||||||
// if (this._timeout) {
|
|
||||||
// this._timeout.removeEventListener("abort", this._timeoutCb);
|
|
||||||
// webClearTimeout(this._timeout[timerId]);
|
|
||||||
// }
|
|
||||||
// this._client.close();
|
|
||||||
console.log("IncomingMessageForClient constructed");
|
|
||||||
const incoming = new IncomingMessageForClient(this.socket);
|
const incoming = new IncomingMessageForClient(this.socket);
|
||||||
incoming.req = this;
|
incoming.req = this;
|
||||||
this.res = incoming;
|
this.res = incoming;
|
||||||
|
@ -558,18 +516,10 @@ class ClientRequest extends OutgoingMessage {
|
||||||
this._closed = true;
|
this._closed = true;
|
||||||
this.emit("close");
|
this.emit("close");
|
||||||
} else {
|
} else {
|
||||||
console.log("emitting response");
|
incoming._bodyRid = res.responseRid;
|
||||||
{
|
|
||||||
console.log("_bodyRid set", res.responseRid);
|
|
||||||
incoming._bodyRid = res.responseRid;
|
|
||||||
}
|
|
||||||
this.emit("response", incoming);
|
this.emit("response", incoming);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// if (this._req.cancelHandleRid !== null) {
|
|
||||||
// core.tryClose(this._req.cancelHandleRid);
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (this._requestSendError !== undefined) {
|
if (this._requestSendError !== undefined) {
|
||||||
// if the request body stream errored, we want to propagate that error
|
// if the request body stream errored, we want to propagate that error
|
||||||
// instead of the original error from opFetchSend
|
// instead of the original error from opFetchSend
|
||||||
|
@ -613,19 +563,13 @@ class ClientRequest extends OutgoingMessage {
|
||||||
onSocket(socket, _err) {
|
onSocket(socket, _err) {
|
||||||
nextTick(() => {
|
nextTick(() => {
|
||||||
socket.on("connect", () => {
|
socket.on("connect", () => {
|
||||||
console.log("connect emitted");
|
|
||||||
// Flush the internal buffers once socket is ready.
|
// Flush the internal buffers once socket is ready.
|
||||||
// Note: the order is important, as the headers flush
|
// Note: the order is important, as the headers flush
|
||||||
// sets up the request.
|
// sets up the request.
|
||||||
try {
|
this._flushHeaders();
|
||||||
this._flushHeaders();
|
this.on("requestReady", () => {
|
||||||
this.on("requestReady", () => {
|
this._flushBody();
|
||||||
console.log("onSocket: flushing body");
|
});
|
||||||
this._flushBody();
|
|
||||||
});
|
|
||||||
} catch (error) {
|
|
||||||
console.log("socket error: ", error);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
this.emit("socket", socket);
|
this.emit("socket", socket);
|
||||||
|
@ -634,13 +578,8 @@ class ClientRequest extends OutgoingMessage {
|
||||||
|
|
||||||
// deno-lint-ignore no-explicit-any
|
// deno-lint-ignore no-explicit-any
|
||||||
end(chunk?: any, encoding?: any, cb?: any): this {
|
end(chunk?: any, encoding?: any, cb?: any): this {
|
||||||
console.log("end(): invoked");
|
|
||||||
this.on("drain", () => {
|
|
||||||
console.log("drain emitted");
|
|
||||||
});
|
|
||||||
// Do nothing if request is already destroyed.
|
// Do nothing if request is already destroyed.
|
||||||
if (this.destroyed) return this;
|
if (this.destroyed) return this;
|
||||||
console.log("end(): not destroyed");
|
|
||||||
|
|
||||||
if (typeof chunk === "function") {
|
if (typeof chunk === "function") {
|
||||||
cb = chunk;
|
cb = chunk;
|
||||||
|
@ -653,18 +592,10 @@ class ClientRequest extends OutgoingMessage {
|
||||||
|
|
||||||
this.finished = true;
|
this.finished = true;
|
||||||
if (chunk) {
|
if (chunk) {
|
||||||
console.log("end(): writing chunk", chunk);
|
|
||||||
this.write_(chunk, encoding, null, true);
|
this.write_(chunk, encoding, null, true);
|
||||||
} else if (!this._headerSent) {
|
} else if (!this._headerSent) {
|
||||||
if (this.socket && !this.socket.connecting) {
|
if (this.socket && !this.socket.connecting) {
|
||||||
console.log("end(): socket created and sending implicit header");
|
|
||||||
this._contentLength = 0;
|
this._contentLength = 0;
|
||||||
console.log(
|
|
||||||
"end(): _implicitHeader; socket.rid",
|
|
||||||
this.socket.rid,
|
|
||||||
"socket.connecting",
|
|
||||||
this.socket.connecting,
|
|
||||||
);
|
|
||||||
this._implicitHeader();
|
this._implicitHeader();
|
||||||
this._send("", "latin1");
|
this._send("", "latin1");
|
||||||
} else {
|
} else {
|
||||||
|
@ -674,26 +605,9 @@ class ClientRequest extends OutgoingMessage {
|
||||||
if (this.socket && this._bodyWriter) {
|
if (this.socket && this._bodyWriter) {
|
||||||
(async () => {
|
(async () => {
|
||||||
try {
|
try {
|
||||||
// const { promise, resolve } = Promise.withResolvers();
|
|
||||||
// if (this.outputData.length > 0) {
|
|
||||||
// this.on("flushBodyDone", () => {
|
|
||||||
// console.log("end(): flushBody done emitted");
|
|
||||||
// resolve(null);
|
|
||||||
// })
|
|
||||||
// } else {
|
|
||||||
// resolve(null);
|
|
||||||
// }
|
|
||||||
// // sleep for 10s
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 1000));
|
|
||||||
console.log("end(): closing bodyWriter", this._bodyWriter, {
|
|
||||||
buffer: this.outputData.length,
|
|
||||||
});
|
|
||||||
await this._bodyWriter.ready;
|
await this._bodyWriter.ready;
|
||||||
await this._bodyWriter?.close();
|
await this._bodyWriter?.close();
|
||||||
console.log("end(): bodyWriter closed");
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.log("err:", err);
|
|
||||||
console.log("end(): body writer closed", err);
|
|
||||||
// The readable stream resource is dropped right after
|
// The readable stream resource is dropped right after
|
||||||
// read is complete closing the writable stream resource.
|
// read is complete closing the writable stream resource.
|
||||||
// If we try to close the writer again, it will result in an
|
// If we try to close the writer again, it will result in an
|
||||||
|
@ -709,17 +623,9 @@ class ClientRequest extends OutgoingMessage {
|
||||||
this.on("finish", () => {
|
this.on("finish", () => {
|
||||||
(async () => {
|
(async () => {
|
||||||
try {
|
try {
|
||||||
console.log(
|
|
||||||
"end(): connect() closing bodyWriter",
|
|
||||||
this._bodyWriter,
|
|
||||||
{ buffer: this.outputData.length },
|
|
||||||
);
|
|
||||||
await this._bodyWriter.ready;
|
await this._bodyWriter.ready;
|
||||||
await this._bodyWriter?.close();
|
await this._bodyWriter?.close();
|
||||||
console.log("end(): bodyWriter closed");
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.log("err:", err);
|
|
||||||
console.log("end(): body writer closed", err);
|
|
||||||
// The readable stream resource is dropped right after
|
// The readable stream resource is dropped right after
|
||||||
// read is complete closing the writable stream resource.
|
// read is complete closing the writable stream resource.
|
||||||
// If we try to close the writer again, it will result in an
|
// If we try to close the writer again, it will result in an
|
||||||
|
@ -743,7 +649,6 @@ class ClientRequest extends OutgoingMessage {
|
||||||
}
|
}
|
||||||
this.aborted = true;
|
this.aborted = true;
|
||||||
this.emit("abort");
|
this.emit("abort");
|
||||||
//process.nextTick(emitAbortNT, this);
|
|
||||||
this.destroy();
|
this.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -779,46 +684,25 @@ class ClientRequest extends OutgoingMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
_createUrlStrFromOptions(): string {
|
_createUrlStrFromOptions(): string {
|
||||||
console.log("_createUrlStrFromOptions");
|
|
||||||
if (this.href) {
|
if (this.href) {
|
||||||
return this.href;
|
return this.href;
|
||||||
}
|
}
|
||||||
console.log("_createUrlStrFromOptions 2");
|
|
||||||
const protocol = this.protocol ?? this.defaultProtocol;
|
const protocol = this.protocol ?? this.defaultProtocol;
|
||||||
console.log("_createUrlStrFromOptions 3");
|
|
||||||
const auth = this.auth;
|
const auth = this.auth;
|
||||||
console.log("_createUrlStrFromOptions 4");
|
|
||||||
const host = this.host ?? this.hostname ?? "localhost";
|
const host = this.host ?? this.hostname ?? "localhost";
|
||||||
console.log("_createUrlStrFromOptions 5");
|
|
||||||
const hash = this.hash ? `#${this.hash}` : "";
|
const hash = this.hash ? `#${this.hash}` : "";
|
||||||
console.log("_createUrlStrFromOptions 6");
|
|
||||||
const defaultPort = this.agent?.defaultPort;
|
const defaultPort = this.agent?.defaultPort;
|
||||||
console.log("_createUrlStrFromOptions 7");
|
|
||||||
const port = this.port ?? defaultPort ?? 80;
|
const port = this.port ?? defaultPort ?? 80;
|
||||||
console.log("_createUrlStrFromOptions 8");
|
|
||||||
let path = this.path ?? "/";
|
let path = this.path ?? "/";
|
||||||
if (!path.startsWith("/")) {
|
if (!path.startsWith("/")) {
|
||||||
path = "/" + path;
|
path = "/" + path;
|
||||||
}
|
}
|
||||||
console.log("_createUrlStrFromOptions 9");
|
|
||||||
// try {
|
|
||||||
console.log({
|
|
||||||
url: `${protocol}//${auth ? `${auth}@` : ""}${host}${
|
|
||||||
port === 80 ? "" : `:${port}`
|
|
||||||
}${path}`,
|
|
||||||
});
|
|
||||||
const url = new URL(
|
const url = new URL(
|
||||||
`${protocol}//${auth ? `${auth}@` : ""}${host}${
|
`${protocol}//${auth ? `${auth}@` : ""}${host}${
|
||||||
port === 80 ? "" : `:${port}`
|
port === 80 ? "" : `:${port}`
|
||||||
}${path}`,
|
}${path}`,
|
||||||
);
|
);
|
||||||
console.log(url);
|
|
||||||
// } catch (error) {
|
|
||||||
// console.log({ error })
|
|
||||||
// }
|
|
||||||
console.log("_createUrlStrFromOptions 10");
|
|
||||||
url.hash = hash;
|
url.hash = hash;
|
||||||
console.log("_createUrlStrFromOptions end");
|
|
||||||
return url.href;
|
return url.href;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1040,7 +924,6 @@ export class IncomingMessageForClient extends NodeReadable {
|
||||||
const buf = new Uint8Array(16 * 1024);
|
const buf = new Uint8Array(16 * 1024);
|
||||||
|
|
||||||
core.read(this._bodyRid, buf).then((bytesRead) => {
|
core.read(this._bodyRid, buf).then((bytesRead) => {
|
||||||
console.log(`bytes read from ${this._bodyRid}:`, bytesRead);
|
|
||||||
if (bytesRead === 0) {
|
if (bytesRead === 0) {
|
||||||
this.push(null);
|
this.push(null);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -376,10 +376,8 @@ export class TCP extends ConnectionWrap {
|
||||||
transport: "tcp",
|
transport: "tcp",
|
||||||
};
|
};
|
||||||
|
|
||||||
console.log("deno.connect start");
|
|
||||||
Deno.connect(connectOptions).then(
|
Deno.connect(connectOptions).then(
|
||||||
(conn: Deno.Conn) => {
|
(conn: Deno.Conn) => {
|
||||||
console.log("deno.connect success");
|
|
||||||
// Incorrect / backwards, but correcting the local address and port with
|
// Incorrect / backwards, but correcting the local address and port with
|
||||||
// what was actually used given we can't actually specify these in Deno.
|
// what was actually used given we can't actually specify these in Deno.
|
||||||
const localAddr = conn.localAddr as Deno.NetAddr;
|
const localAddr = conn.localAddr as Deno.NetAddr;
|
||||||
|
|
|
@ -22,7 +22,6 @@ fn op_http_start(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
#[smi] tcp_stream_rid: ResourceId,
|
#[smi] tcp_stream_rid: ResourceId,
|
||||||
) -> Result<ResourceId, AnyError> {
|
) -> Result<ResourceId, AnyError> {
|
||||||
println!("used in op_http_start");
|
|
||||||
if let Ok(resource_rc) = state
|
if let Ok(resource_rc) = state
|
||||||
.resource_table
|
.resource_table
|
||||||
.take::<TcpStreamResource>(tcp_stream_rid)
|
.take::<TcpStreamResource>(tcp_stream_rid)
|
||||||
|
|
Loading…
Reference in a new issue