1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-05 13:59:01 -05:00

introduce more ops to wait for connection ready

This commit is contained in:
Satya Rohith 2024-09-13 14:22:52 +05:30
parent 821d5a5653
commit 29fe1768a8
No known key found for this signature in database
GPG key ID: B2705CF40523EB05
5 changed files with 134 additions and 58 deletions

View file

@ -353,6 +353,8 @@ deno_core::extension!(deno_node,
ops::zlib::brotli::op_brotli_decompress_stream_end,
ops::http::op_node_http_fetch_response_upgrade,
ops::http::op_node_http_request_with_conn<P>,
ops::http::op_node_http_await_response,
ops::http::op_node_http_wait_for_connection,
ops::http2::op_http2_connect,
ops::http2::op_http2_poll_client_connection,
ops::http2::op_http2_client_request,

View file

@ -4,11 +4,14 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use bytes::Bytes;
use deno_core::anyhow;
use deno_core::anyhow::Error;
use deno_core::error::bad_resource;
use deno_core::error::type_error;
use deno_core::error::AnyError;
@ -17,6 +20,7 @@ use deno_core::futures::Future;
use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt;
use deno_core::op2;
use deno_core::serde::Serialize;
use deno_core::unsync::spawn;
@ -39,8 +43,10 @@ use http::header::HeaderValue;
use http::header::AUTHORIZATION;
use http::header::CONTENT_LENGTH;
use http::Method;
use http::Response;
use http_body_util::BodyExt;
use hyper::body::Frame;
use hyper::body::Incoming;
use hyper_util::rt::TokioIo;
use std::cmp::min;
use tokio::io::AsyncReadExt;
@ -60,6 +66,29 @@ pub struct NodeHttpResponse {
pub error: Option<String>,
}
pub struct NodeHttpConnReady {
recv: tokio::sync::oneshot::Receiver<()>,
}
impl deno_core::Resource for NodeHttpConnReady {
fn name(&self) -> Cow<str> {
"nodeHttpConnReady".into()
}
}
pub struct NodeHttpClientResponse {
response:
Pin<Box<dyn Future<Output = Result<Response<Incoming>, Error>> + Send>>,
url: String,
connection_started: Arc<AtomicBool>,
}
impl deno_core::Resource for NodeHttpClientResponse {
fn name(&self) -> Cow<str> {
"nodeHttpClientResponse".into()
}
}
#[op2(async)]
#[serde]
pub async fn op_node_http_request_with_conn<P>(
@ -69,7 +98,7 @@ pub async fn op_node_http_request_with_conn<P>(
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] body: Option<ResourceId>,
#[smi] conn_rid: ResourceId,
) -> Result<NodeHttpResponse, AnyError>
) -> Result<(ResourceId, ResourceId), AnyError>
where
P: crate::NodePermissions + 'static,
{
@ -85,11 +114,18 @@ where
let io = TokioIo::new(tcp_stream);
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
let connection_started = Arc::new(AtomicBool::new(false));
let conn_start = connection_started.clone();
let (notify, receiver) = tokio::sync::oneshot::channel::<()>();
// Spawn a task to poll the connection, driving the HTTP state
let _handle = tokio::task::spawn(async move {
eprintln!("connection started");
eprintln!("rs: connection started");
conn_start.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = notify.send(());
conn.await?;
eprintln!("connection completed");
eprintln!("rs: connection completed");
Ok::<_, AnyError>(())
});
@ -156,12 +192,64 @@ where
request.headers_mut().insert(CONTENT_LENGTH, len.into());
}
// eprintln!("rs: sending request: {request:?}");
eprintln!("rs: sending request: {request:?}");
// let req_fut = sender.send_request(request);
// let res = tokio::time::timeout(Duration::from_secs(10), req_fut).await??;
let res = sender.send_request(request).await?;
let res = sender.send_request(request).map_err(Error::from).boxed();
let rid = state
.borrow_mut()
.resource_table
.add(NodeHttpClientResponse {
response: res,
url: url.clone(),
connection_started,
});
let conn_rid = state
.borrow_mut()
.resource_table
.add(NodeHttpConnReady { recv: receiver });
Ok((rid, conn_rid))
}
#[op2(async)]
#[serde]
pub async fn op_node_http_wait_for_connection(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<ResourceId, AnyError> {
let resource = state
.borrow_mut()
.resource_table
.take::<NodeHttpConnReady>(rid)?;
let resource =
Rc::try_unwrap(resource).map_err(|_| bad_resource("NodeHttpConnReady"))?;
resource.recv.await?;
Ok(rid)
}
#[op2(async)]
#[serde]
pub async fn op_node_http_await_response(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<NodeHttpResponse, AnyError> {
let resource = state
.borrow_mut()
.resource_table
.take::<NodeHttpClientResponse>(rid)?;
let resource = Rc::try_unwrap(resource)
.map_err(|_| bad_resource("NodeHttpClientResponse"))?;
eprintln!(
"rs: awaiting response: {}",
resource
.connection_started
.load(std::sync::atomic::Ordering::Relaxed)
);
let res = resource.response.await?;
// let res = tokio::time::timeout(Duration::from_secs(10), req_fut).await??;
// eprintln!("rs: received response");
eprintln!("rs: received response");
let status = res.status();
let mut res_headers = Vec::new();
@ -196,7 +284,7 @@ where
status: status.as_u16(),
status_text: status.canonical_reason().unwrap_or("").to_string(),
headers: res_headers,
url,
url: resource.url,
response_rid,
content_length,
remote_addr_ip,
@ -461,6 +549,7 @@ impl Stream for NodeHttpResourceToBodyAdapter {
Poll::Ready(res) => match res {
Ok(buf) if buf.is_empty() => Poll::Ready(None),
Ok(buf) => {
println!("rs: reading: {len}", len = buf.len());
this.1 = Some(this.0.clone().read(64 * 1024));
Poll::Ready(Some(Ok(buf.to_vec().into())))
}

View file

@ -509,30 +509,25 @@ Object.defineProperties(
/** Right after socket is ready, we need to writeHeader() to setup the request and
* client. This is invoked by onSocket(). */
_flushHeaders() {
// console.log("flushHeaders");
console.log("flushHeaders");
if (this.socket) {
if (!this._headerSent && this._header !== null) {
this._writeHeader();
this._headerSent = true;
}
} else {
// console.log("socket not found");
console.log("socket not found");
}
},
// deno-lint-ignore no-explicit-any
_send(data: any, encoding?: string | null, callback?: () => void) {
console.log("writing data:", data, "socket:", this.socket);
if (this.socket) {
console.log("im never invoked");
if (!this._headerSent && this._header !== null) {
this._writeHeader();
this._headerSent = true;
}
if (this._headerSent) {
return this._writeRaw(data, encoding, callback);
}
return this._writeRaw(data, encoding, callback);
} else {
console.log("pushing data to outputData");
this.outputData.push({ data, encoding, callback });
@ -543,39 +538,6 @@ Object.defineProperties(
throw new ERR_METHOD_NOT_IMPLEMENTED("_writeHeader()");
},
async _flushBuffer() {
const outputLength = this.outputData.length;
if (outputLength <= 0 || !this.socket || !this._bodyWriter) {
return undefined;
}
const outputData = this.outputData;
let ret;
// Retain for(;;) loop for performance reasons
// Refs: https://github.com/nodejs/node/pull/30958
for (let i = 0; i < outputLength; i++) {
let { data, encoding, callback } = outputData[i];
if (typeof data === "string") {
data = Buffer.from(data, encoding);
}
if (data instanceof Buffer) {
data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
}
ret = await this._bodyWriter.write(data).then(() => {
callback?.();
this.emit("drain");
}).catch((e) => {
this._requestSendError = e;
});
console.log("flushing data:", data, ret);
}
this.outputData = [];
this.outputSize = 0;
return ret;
},
_writeRaw(
// deno-lint-ignore no-explicit-any
data: any,
@ -590,11 +552,23 @@ Object.defineProperties(
data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
}
if (data.buffer.byteLength > 0) {
this._bodyWriter.write(data).then(() => {
callback?.();
this.emit("drain");
}).catch((e) => {
this._requestSendError = e;
console.log(
"writing to",
this._bodyWriteRid,
"desired size",
this._bodyWriter.desiredSize,
"writer",
this._bodyWriter,
);
this._bodyWriter.ready.then(() => {
if (this._bodyWriter.desiredSize > 0) {
this._bodyWriter.write(data).then(() => {
callback?.();
this.emit("drain");
}).catch((e) => {
this._requestSendError = e;
});
}
});
}
return false;

View file

@ -5,8 +5,10 @@
import { core, primordials } from "ext:core/mod.js";
import {
op_node_http_await_response,
op_node_http_fetch_response_upgrade,
op_node_http_request_with_conn,
op_node_http_wait_for_connection,
} from "ext:core/ops";
import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
@ -414,14 +416,14 @@ class ClientRequest extends OutgoingMessage {
}
} else {
debug("CLIENT use net.createConnection", optsWithoutSignal);
// console.log("use net.createConnection");
console.log("use net.createConnection");
this.onSocket(netCreateConnection(optsWithoutSignal));
}
}
}
_writeHeader() {
// console.trace("_writeHeader invoked");
console.trace("_writeHeader invoked");
const url = this._createUrlStrFromOptions();
const headers = [];
@ -460,15 +462,20 @@ class ClientRequest extends OutgoingMessage {
(async () => {
try {
// console.trace("js: sending request", this.socket.rid);
console.trace("js: sending request", this.socket.rid);
// console.log("this.socket", this.socket);
const res = await op_node_http_request_with_conn(
const [rid, connRid] = await op_node_http_request_with_conn(
this.method,
url,
headers,
this._bodyWriteRid,
this.socket.rid,
);
console.log("js: request sent", { rid, connRid });
// Emit request ready to let the request body to be written.
await op_node_http_wait_for_connection(connRid);
this.emit("requestReady");
const res = await op_node_http_await_response(rid);
console.log({ res });
// if (this._req.cancelHandleRid !== null) {
// core.tryClose(this._req.cancelHandleRid);
@ -605,7 +612,9 @@ class ClientRequest extends OutgoingMessage {
// Note: the order is important, as the headers flush
// sets up the request.
this._flushHeaders();
this._flushBody();
this.on("requestReady", () => {
this._flushBody();
});
});
this.socket = socket;
this.emit("socket", socket);

View file

@ -376,8 +376,10 @@ export class TCP extends ConnectionWrap {
transport: "tcp",
};
console.log("deno.connect start");
Deno.connect(connectOptions).then(
(conn: Deno.Conn) => {
console.log("deno.connect success");
// Incorrect / backwards, but correcting the local address and port with
// what was actually used given we can't actually specify these in Deno.
const localAddr = conn.localAddr as Deno.NetAddr;