1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-05 13:59:01 -05:00

chore: make race condition more visible by removing op_node_http_wait_for_connection

This commit is contained in:
Satya Rohith 2024-10-17 20:51:58 +05:30
parent 2e017a8b78
commit 8c0f6b02c2
No known key found for this signature in database
GPG key ID: B2705CF40523EB05
6 changed files with 76 additions and 82 deletions

View file

@ -334,7 +334,6 @@ deno_core::extension!(deno_node,
ops::http::op_node_http_fetch_response_upgrade,
ops::http::op_node_http_request_with_conn<P>,
ops::http::op_node_http_await_response,
ops::http::op_node_http_wait_for_connection,
ops::http2::op_http2_connect,
ops::http2::op_http2_poll_client_connection,
ops::http2::op_http2_client_request,

View file

@ -66,17 +66,6 @@ pub struct NodeHttpResponse {
pub error: Option<String>,
}
#[derive(Debug)]
pub struct NodeHttpConnReady {
recv: tokio::sync::oneshot::Receiver<()>,
}
impl deno_core::Resource for NodeHttpConnReady {
fn name(&self) -> Cow<str> {
"nodeHttpConnReady".into()
}
}
pub struct NodeHttpClientResponse {
response:
Pin<Box<dyn Future<Output = Result<Response<Incoming>, Error>> + Send>>,
@ -107,11 +96,11 @@ pub async fn op_node_http_request_with_conn<P>(
#[smi] body: Option<ResourceId>,
#[smi] conn_rid: ResourceId,
encrypted: bool,
) -> Result<(ResourceId, ResourceId), AnyError>
) -> Result<ResourceId, AnyError>
where
P: crate::NodePermissions + 'static,
{
let (_handle, mut sender, receiver) = if encrypted {
let (_handle, mut sender) = if encrypted {
let resource_rc = state
.borrow_mut()
.resource_table
@ -123,18 +112,12 @@ where
let tcp_stream = read_half.unsplit(write_half);
let io = TokioIo::new(tcp_stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
let (notify, receiver) = tokio::sync::oneshot::channel::<()>();
// Spawn a task to poll the connection, driving the HTTP state
(
tokio::task::spawn(async move {
let _ = notify.send(());
conn.await?;
Ok::<_, AnyError>(())
}),
sender,
receiver,
)
} else {
let resource_rc = state
@ -149,17 +132,13 @@ where
let io = TokioIo::new(tcp_stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
let (notify, receiver) = tokio::sync::oneshot::channel::<()>();
// Spawn a task to poll the connection, driving the HTTP state
(
tokio::task::spawn(async move {
let _ = notify.send(());
conn.await?;
Ok::<_, AnyError>(())
}),
sender,
receiver,
)
};
@ -236,28 +215,7 @@ where
response: res,
url: url.clone(),
});
let conn_rid = state
.borrow_mut()
.resource_table
.add(NodeHttpConnReady { recv: receiver });
Ok((rid, conn_rid))
}
#[op2(async)]
#[serde]
pub async fn op_node_http_wait_for_connection(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<ResourceId, AnyError> {
let resource = state
.borrow_mut()
.resource_table
.take::<NodeHttpConnReady>(rid)?;
let resource =
Rc::try_unwrap(resource).map_err(|_| bad_resource("NodeHttpConnReady"));
let resource = resource?;
resource.recv.await?;
Ok(rid)
}

View file

@ -68,7 +68,7 @@ export function OutgoingMessage() {
// Queue that holds all currently pending data, until the response will be
// assigned to the socket (until it will its turn in the HTTP pipeline).
this.outputData = [];
this.pendingWrites = 0;
// `outputSize` is an approximate measure of how much data is queued on this
// response. `_onPendingData` will be invoked to update similar global
// per-connection counter. That counter will be used to pause/unpause the
@ -520,7 +520,7 @@ Object.defineProperties(
_send(data: any, encoding?: string | null, callback?: () => void) {
// if socket is ready, write the data after headers are written.
// if socket is not ready, buffer data in outputbuffer.
if (this.socket && !this.socket.connecting) {
if (this.socket && !this.socket.connecting && this.pendingWrites === 0) {
if (!this._headerSent) {
this._writeHeader();
this._headerSent = true;
@ -538,38 +538,39 @@ Object.defineProperties(
throw new ERR_METHOD_NOT_IMPLEMENTED("_writeHeader()");
},
async _flushBuffer() {
const outputLength = this.outputData.length;
if (outputLength <= 0 || !this.socket || !this._bodyWriter) {
return undefined;
}
// async _flushBuffer() {
// const outputLength = this.outputData.length;
// if (outputLength <= 0 || !this.socket || !this._bodyWriter) {
// return undefined;
// }
const outputData = this.outputData;
let ret;
// Retain for(;;) loop for performance reasons
// Refs: https://github.com/nodejs/node/pull/30958
for (let i = 0; i < outputLength; i++) {
let { data, encoding, callback } = outputData[i];
if (typeof data === "string") {
data = Buffer.from(data, encoding);
}
if (data instanceof Buffer) {
data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
}
await this._bodyWriter.ready;
ret = await this._bodyWriter.write(data).then(() => {
callback?.();
this.emit("drain");
}).catch((e) => {
this._requestSendError = e;
});
}
// const outputData = this.outputData;
// let ret;
// // Retain for(;;) loop for performance reasons
// // Refs: https://github.com/nodejs/node/pull/30958
// for (let i = 0; i < outputLength; i++) {
// let { data, encoding, callback } = outputData[i];
// if (typeof data === "string") {
// data = Buffer.from(data, encoding);
// }
// if (data instanceof Buffer) {
// data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
// }
// await this._bodyWriter.ready;
// ret = await this._bodyWriter.write(data).then(() => {
// callback?.();
// this.emit("drain");
// }).catch((e) => {
// this._requestSendError = e;
// });
// }
this.outputData = [];
this.outputSize = 0;
// this.outputData = [];
// this.pendingWrites = 0;
// this.outputSize = 0;
return ret;
},
// return ret;
// },
_writeRaw(
// deno-lint-ignore no-explicit-any
@ -577,6 +578,15 @@ Object.defineProperties(
encoding?: string | null,
callback?: () => void,
) {
this.pendingWrites = this.pendingWrites + 1;
const time = Date.now();
console.trace(
"write invoked:",
time.toString().slice(-6),
"pending",
this.pendingWrites,
this.outputData.length,
);
if (typeof data === "string") {
data = Buffer.from(data, encoding);
}
@ -588,8 +598,23 @@ Object.defineProperties(
if (this._bodyWriter.desiredSize > 0) {
this._bodyWriter.write(data).then(() => {
callback?.();
this.pendingWrites = this.pendingWrites - 1;
console.log(
"write done:",
time.toString().slice(-6),
"pending",
this.pendingWrites,
this.outputData.length,
);
this.emit("drain");
}).catch((e) => {
console.log(
"write error:",
time.toString().slice(-6),
"pending",
this.pendingWrites,
this.outputData.length,
);
this._requestSendError = e;
});
}

View file

@ -2359,7 +2359,7 @@ var require_readable = __commonJS({
return readableAddChunk(this, chunk, encoding, true);
};
function readableAddChunk(stream, chunk, encoding, addToFront) {
debug("readableAddChunk", chunk);
console.trace("readableAddChunk", chunk ? new TextDecoder().decode(chunk.slice(-20)) : chunk);
const state = stream._readableState;
let err;
if (!state.objectMode) {

View file

@ -8,7 +8,6 @@ import {
op_node_http_await_response,
op_node_http_fetch_response_upgrade,
op_node_http_request_with_conn,
op_node_http_wait_for_connection,
op_tls_start,
} from "ext:core/ops";
@ -460,7 +459,7 @@ class ClientRequest extends OutgoingMessage {
alpnProtocols: ["http/1.0", "http/1.1"],
});
}
const [rid, connRid] = await op_node_http_request_with_conn(
const rid = await op_node_http_request_with_conn(
this.method,
url,
headers,
@ -468,7 +467,6 @@ class ClientRequest extends OutgoingMessage {
baseConnRid,
this._encrypted,
);
await op_node_http_wait_for_connection(connRid);
// Emit request ready to let the request body to be written.
this.emit("requestReady");
const res = await op_node_http_await_response(rid);
@ -623,6 +621,7 @@ class ClientRequest extends OutgoingMessage {
// sets up the request.
this._flushHeaders();
this.once("requestReady", () => {
console.log("flushing Body");
this._flushBody();
});
});
@ -663,6 +662,12 @@ class ClientRequest extends OutgoingMessage {
}
const finish = async () => {
try {
console.log(
"finish(): outputData:",
this.outputData.length,
"pendingWrites",
this.pendingWrites,
);
await this._bodyWriter.ready;
await this._bodyWriter?.close();
} catch {
@ -678,11 +683,14 @@ class ClientRequest extends OutgoingMessage {
}
};
if (this.socket && this._bodyWriter && this.outputData.length === 0) {
console.log("pendingWrites", this.pendingWrites, "outputData", this.outputData.length);
if (this.socket && this._bodyWriter && this.pendingWrites === 0) {
finish();
} else {
console.log("setting drain event", this.pendingWrites, "outputData", this.outputData.length);
this.on("drain", () => {
if (this.outputData.length === 0) {
console.log("drain event", this.pendingWrites);
if (this.pendingWrites === 0) {
finish();
}
});

View file

@ -282,6 +282,10 @@ impl BoundedBufferChannelInner {
}
pub fn close(&mut self) {
eprintln!(
"stream_resource: close() len={} current_size={} closed={}",
self.len, self.current_size, self.closed
);
self.closed = true;
// Wake up reads and writes, since they'll both be able to proceed forever now
if let Some(waker) = self.write_waker.take() {