mirror of
https://github.com/denoland/deno.git
synced 2024-12-02 17:01:14 -05:00
refactor: dedupe op_node_http_request_with_conn and op_node_http_request_with_tls_conn
This commit is contained in:
parent
5c0ecf6a8e
commit
7471848dc4
3 changed files with 50 additions and 139 deletions
|
@ -326,7 +326,6 @@ deno_core::extension!(deno_node,
|
||||||
ops::zlib::brotli::op_brotli_decompress_stream_end,
|
ops::zlib::brotli::op_brotli_decompress_stream_end,
|
||||||
ops::http::op_node_http_fetch_response_upgrade,
|
ops::http::op_node_http_fetch_response_upgrade,
|
||||||
ops::http::op_node_http_request_with_conn<P>,
|
ops::http::op_node_http_request_with_conn<P>,
|
||||||
ops::http::op_node_http_request_with_tls_conn<P>,
|
|
||||||
ops::http::op_node_http_await_response,
|
ops::http::op_node_http_await_response,
|
||||||
ops::http::op_node_http_wait_for_connection,
|
ops::http::op_node_http_wait_for_connection,
|
||||||
ops::http2::op_http2_connect,
|
ops::http2::op_http2_connect,
|
||||||
|
|
|
@ -96,30 +96,60 @@ pub async fn op_node_http_request_with_conn<P>(
|
||||||
#[serde] headers: Vec<(ByteString, ByteString)>,
|
#[serde] headers: Vec<(ByteString, ByteString)>,
|
||||||
#[smi] body: Option<ResourceId>,
|
#[smi] body: Option<ResourceId>,
|
||||||
#[smi] conn_rid: ResourceId,
|
#[smi] conn_rid: ResourceId,
|
||||||
|
encrypted: bool,
|
||||||
) -> Result<(ResourceId, ResourceId), AnyError>
|
) -> Result<(ResourceId, ResourceId), AnyError>
|
||||||
where
|
where
|
||||||
P: crate::NodePermissions + 'static,
|
P: crate::NodePermissions + 'static,
|
||||||
{
|
{
|
||||||
// Establish the connection/client.
|
let (_handle, mut sender, receiver) = if encrypted {
|
||||||
let resource_rc = state
|
let resource_rc = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.take::<TcpStreamResource>(conn_rid)?;
|
.take::<TlsStreamResource>(conn_rid)?;
|
||||||
let resource = Rc::try_unwrap(resource_rc)
|
let resource = Rc::try_unwrap(resource_rc)
|
||||||
.map_err(|_e| bad_resource("TCP stream is currently in use"))?;
|
.map_err(|_e| bad_resource("TLS stream is currently in use"))?;
|
||||||
let (read_half, write_half) = resource.into_inner();
|
let (read_half, write_half) = resource.into_inner();
|
||||||
let tcp_stream = read_half.reunite(write_half)?;
|
let tcp_stream = read_half.unsplit(write_half);
|
||||||
let io = TokioIo::new(tcp_stream);
|
let io = TokioIo::new(tcp_stream);
|
||||||
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
|
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
|
||||||
|
|
||||||
let (notify, receiver) = tokio::sync::oneshot::channel::<()>();
|
let (notify, receiver) = tokio::sync::oneshot::channel::<()>();
|
||||||
|
|
||||||
// Spawn a task to poll the connection, driving the HTTP state
|
// Spawn a task to poll the connection, driving the HTTP state
|
||||||
let _handle = tokio::task::spawn(async move {
|
(
|
||||||
let _ = notify.send(());
|
tokio::task::spawn(async move {
|
||||||
conn.await?;
|
let _ = notify.send(());
|
||||||
Ok::<_, AnyError>(())
|
conn.await?;
|
||||||
});
|
Ok::<_, AnyError>(())
|
||||||
|
}),
|
||||||
|
sender,
|
||||||
|
receiver,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
let resource_rc = state
|
||||||
|
.borrow_mut()
|
||||||
|
.resource_table
|
||||||
|
.take::<TcpStreamResource>(conn_rid)?;
|
||||||
|
let resource = Rc::try_unwrap(resource_rc)
|
||||||
|
.map_err(|_e| bad_resource("TCP stream is currently in use"))?;
|
||||||
|
let (read_half, write_half) = resource.into_inner();
|
||||||
|
let tcp_stream = read_half.reunite(write_half)?;
|
||||||
|
let io = TokioIo::new(tcp_stream);
|
||||||
|
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
|
||||||
|
|
||||||
|
let (notify, receiver) = tokio::sync::oneshot::channel::<()>();
|
||||||
|
|
||||||
|
// Spawn a task to poll the connection, driving the HTTP state
|
||||||
|
(
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
let _ = notify.send(());
|
||||||
|
conn.await?;
|
||||||
|
Ok::<_, AnyError>(())
|
||||||
|
}),
|
||||||
|
sender,
|
||||||
|
receiver,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
// Create the request.
|
// Create the request.
|
||||||
let method = Method::from_bytes(&method)?;
|
let method = Method::from_bytes(&method)?;
|
||||||
|
@ -202,122 +232,6 @@ where
|
||||||
Ok((rid, conn_rid))
|
Ok((rid, conn_rid))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(@satyarohith): deduplicate the code.
|
|
||||||
#[op2(async)]
|
|
||||||
#[serde]
|
|
||||||
pub async fn op_node_http_request_with_tls_conn<P>(
|
|
||||||
state: Rc<RefCell<OpState>>,
|
|
||||||
#[serde] method: ByteString,
|
|
||||||
#[string] url: String,
|
|
||||||
#[serde] headers: Vec<(ByteString, ByteString)>,
|
|
||||||
#[smi] body: Option<ResourceId>,
|
|
||||||
#[smi] conn_rid: ResourceId,
|
|
||||||
) -> Result<(ResourceId, ResourceId), AnyError>
|
|
||||||
where
|
|
||||||
P: crate::NodePermissions + 'static,
|
|
||||||
{
|
|
||||||
// Establish the connection/client.
|
|
||||||
let resource_rc = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.take::<TlsStreamResource>(conn_rid)?;
|
|
||||||
let resource = Rc::try_unwrap(resource_rc)
|
|
||||||
.map_err(|_e| bad_resource("TLS stream is currently in use"))?;
|
|
||||||
let (read_half, write_half) = resource.into_inner();
|
|
||||||
let tls_stream = read_half.unsplit(write_half);
|
|
||||||
let io = TokioIo::new(tls_stream);
|
|
||||||
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
|
|
||||||
|
|
||||||
let (notify, receiver) = tokio::sync::oneshot::channel::<()>();
|
|
||||||
|
|
||||||
// Spawn a task to poll the connection, driving the HTTP state
|
|
||||||
let _handle = tokio::task::spawn(async move {
|
|
||||||
let _ = notify.send(());
|
|
||||||
conn.await?;
|
|
||||||
Ok::<_, AnyError>(())
|
|
||||||
});
|
|
||||||
|
|
||||||
// Create the request.
|
|
||||||
let method = Method::from_bytes(&method)?;
|
|
||||||
let mut url_parsed = Url::parse(&url)?;
|
|
||||||
let maybe_authority = deno_fetch::extract_authority(&mut url_parsed);
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut state_ = state.borrow_mut();
|
|
||||||
let permissions = state_.borrow_mut::<P>();
|
|
||||||
permissions.check_net_url(&url_parsed, "ClientRequest")?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut header_map = HeaderMap::new();
|
|
||||||
for (key, value) in headers {
|
|
||||||
let name = HeaderName::from_bytes(&key)
|
|
||||||
.map_err(|err| type_error(err.to_string()))?;
|
|
||||||
let v = HeaderValue::from_bytes(&value)
|
|
||||||
.map_err(|err| type_error(err.to_string()))?;
|
|
||||||
|
|
||||||
header_map.append(name, v);
|
|
||||||
}
|
|
||||||
|
|
||||||
let (body, con_len) = if let Some(body) = body {
|
|
||||||
(
|
|
||||||
BodyExt::boxed(NodeHttpResourceToBodyAdapter::new(
|
|
||||||
state.borrow_mut().resource_table.take_any(body)?,
|
|
||||||
)),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
// POST and PUT requests should always have a 0 length content-length,
|
|
||||||
// if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
|
|
||||||
let len = if matches!(method, Method::POST | Method::PUT) {
|
|
||||||
Some(0)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
(
|
|
||||||
http_body_util::Empty::new()
|
|
||||||
.map_err(|never| match never {})
|
|
||||||
.boxed(),
|
|
||||||
len,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut request = http::Request::new(body);
|
|
||||||
*request.method_mut() = method.clone();
|
|
||||||
let path = url_parsed.path();
|
|
||||||
let query = url_parsed.query();
|
|
||||||
*request.uri_mut() = query
|
|
||||||
.map(|query| format!("{}?{}", path, query))
|
|
||||||
.unwrap_or_else(|| path.to_string())
|
|
||||||
.parse()
|
|
||||||
.map_err(|_| type_error("Invalid URL"))?;
|
|
||||||
*request.headers_mut() = header_map;
|
|
||||||
|
|
||||||
if let Some((username, password)) = maybe_authority {
|
|
||||||
request.headers_mut().insert(
|
|
||||||
AUTHORIZATION,
|
|
||||||
deno_fetch::basic_auth(&username, password.as_deref()),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if let Some(len) = con_len {
|
|
||||||
request.headers_mut().insert(CONTENT_LENGTH, len.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let res = sender.send_request(request).map_err(Error::from).boxed();
|
|
||||||
let rid = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.add(NodeHttpClientResponse {
|
|
||||||
response: res,
|
|
||||||
url: url.clone(),
|
|
||||||
});
|
|
||||||
let conn_rid = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.add(NodeHttpConnReady { recv: receiver });
|
|
||||||
|
|
||||||
Ok((rid, conn_rid))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op2(async)]
|
#[op2(async)]
|
||||||
#[serde]
|
#[serde]
|
||||||
pub async fn op_node_http_wait_for_connection(
|
pub async fn op_node_http_wait_for_connection(
|
||||||
|
|
|
@ -461,15 +461,13 @@ class ClientRequest extends OutgoingMessage {
|
||||||
alpnProtocols: ["http/1.0", "http/1.1"],
|
alpnProtocols: ["http/1.0", "http/1.1"],
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
const op = this._encrypted
|
const [rid, connRid] = await op_node_http_request_with_conn(
|
||||||
? op_node_http_request_with_tls_conn
|
|
||||||
: op_node_http_request_with_conn;
|
|
||||||
const [rid, connRid] = await op(
|
|
||||||
this.method,
|
this.method,
|
||||||
url,
|
url,
|
||||||
headers,
|
headers,
|
||||||
this._bodyWriteRid,
|
this._bodyWriteRid,
|
||||||
baseConnRid,
|
baseConnRid,
|
||||||
|
this._encrypted,
|
||||||
);
|
);
|
||||||
// Emit request ready to let the request body to be written.
|
// Emit request ready to let the request body to be written.
|
||||||
await op_node_http_wait_for_connection(connRid);
|
await op_node_http_wait_for_connection(connRid);
|
||||||
|
|
Loading…
Reference in a new issue