mirror of
https://github.com/denoland/deno.git
synced 2025-01-03 04:48:52 -05:00
refactor: rewrite http_next ops to use op2 macro (#19934)
Ref #19915 --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
This commit is contained in:
parent
7f8bf2537d
commit
433ecc9047
2 changed files with 63 additions and 47 deletions
|
@ -726,7 +726,7 @@ function serveHttpOn(context, callback) {
|
||||||
try {
|
try {
|
||||||
// Attempt to pull as many requests out of the queue as possible before awaiting. This API is
|
// Attempt to pull as many requests out of the queue as possible before awaiting. This API is
|
||||||
// a synchronous, non-blocking API that returns u32::MAX if anything goes wrong.
|
// a synchronous, non-blocking API that returns u32::MAX if anything goes wrong.
|
||||||
while ((req = op_http_try_wait(rid)) !== 0xffffffff) {
|
while ((req = op_http_try_wait(rid)) !== -1) {
|
||||||
PromisePrototypeCatch(callback(req), promiseErrorHandler);
|
PromisePrototypeCatch(callback(req), promiseErrorHandler);
|
||||||
}
|
}
|
||||||
currentPromise = op_http_wait(rid);
|
currentPromise = op_http_wait(rid);
|
||||||
|
@ -741,7 +741,7 @@ function serveHttpOn(context, callback) {
|
||||||
}
|
}
|
||||||
throw new Deno.errors.Http(error);
|
throw new Deno.errors.Http(error);
|
||||||
}
|
}
|
||||||
if (req === 0xffffffff) {
|
if (req === -1) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
PromisePrototypeCatch(callback(req), promiseErrorHandler);
|
PromisePrototypeCatch(callback(req), promiseErrorHandler);
|
||||||
|
|
|
@ -117,10 +117,11 @@ impl<
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op2(fast)]
|
||||||
|
#[smi]
|
||||||
pub fn op_http_upgrade_raw(
|
pub fn op_http_upgrade_raw(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
slab_id: SlabId,
|
#[smi] slab_id: SlabId,
|
||||||
) -> Result<ResourceId, AnyError> {
|
) -> Result<ResourceId, AnyError> {
|
||||||
// Stage 1: extract the upgrade future
|
// Stage 1: extract the upgrade future
|
||||||
let upgrade = slab_get(slab_id).upgrade()?;
|
let upgrade = slab_get(slab_id).upgrade()?;
|
||||||
|
@ -184,11 +185,12 @@ pub fn op_http_upgrade_raw(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op2(async)]
|
||||||
|
#[smi]
|
||||||
pub async fn op_http_upgrade_websocket_next(
|
pub async fn op_http_upgrade_websocket_next(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
slab_id: SlabId,
|
#[smi] slab_id: SlabId,
|
||||||
headers: Vec<(ByteString, ByteString)>,
|
#[serde] headers: Vec<(ByteString, ByteString)>,
|
||||||
) -> Result<ResourceId, AnyError> {
|
) -> Result<ResourceId, AnyError> {
|
||||||
let mut http = slab_get(slab_id);
|
let mut http = slab_get(slab_id);
|
||||||
// Stage 1: set the response to 101 Switching Protocols and send it
|
// Stage 1: set the response to 101 Switching Protocols and send it
|
||||||
|
@ -290,10 +292,11 @@ where
|
||||||
array_value.into()
|
array_value.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op2]
|
||||||
|
#[serde]
|
||||||
pub fn op_http_get_request_header(
|
pub fn op_http_get_request_header(
|
||||||
slab_id: SlabId,
|
#[smi] slab_id: SlabId,
|
||||||
name: String,
|
#[string] name: String,
|
||||||
) -> Option<ByteString> {
|
) -> Option<ByteString> {
|
||||||
let http = slab_get(slab_id);
|
let http = slab_get(slab_id);
|
||||||
let value = http.request_parts().headers.get(name);
|
let value = http.request_parts().headers.get(name);
|
||||||
|
@ -382,11 +385,11 @@ pub fn op_http_read_request_body(
|
||||||
state.resource_table.add_rc(body_resource)
|
state.resource_table.add_rc(body_resource)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(fast)]
|
#[op2]
|
||||||
pub fn op_http_set_response_header(
|
pub fn op_http_set_response_header(
|
||||||
slab_id: SlabId,
|
#[smi] slab_id: SlabId,
|
||||||
name: ByteString,
|
#[serde] name: ByteString,
|
||||||
value: ByteString,
|
#[serde] value: ByteString,
|
||||||
) {
|
) {
|
||||||
let mut http = slab_get(slab_id);
|
let mut http = slab_get(slab_id);
|
||||||
let resp_headers = http.response().headers_mut();
|
let resp_headers = http.response().headers_mut();
|
||||||
|
@ -397,24 +400,22 @@ pub fn op_http_set_response_header(
|
||||||
resp_headers.append(name, value);
|
resp_headers.append(name, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(v8)]
|
#[op2]
|
||||||
fn op_http_set_response_headers(
|
pub fn op_http_set_response_headers(
|
||||||
scope: &mut v8::HandleScope,
|
scope: &mut v8::HandleScope,
|
||||||
slab_id: SlabId,
|
#[smi] slab_id: SlabId,
|
||||||
headers: serde_v8::Value,
|
headers: v8::Local<v8::Array>,
|
||||||
) {
|
) {
|
||||||
let mut http = slab_get(slab_id);
|
let mut http = slab_get(slab_id);
|
||||||
// TODO(mmastrac): Invalid headers should be handled?
|
// TODO(mmastrac): Invalid headers should be handled?
|
||||||
let resp_headers = http.response().headers_mut();
|
let resp_headers = http.response().headers_mut();
|
||||||
|
|
||||||
let arr = v8::Local::<v8::Array>::try_from(headers.v8_value).unwrap();
|
let len = headers.length();
|
||||||
|
|
||||||
let len = arr.length();
|
|
||||||
let header_len = len * 2;
|
let header_len = len * 2;
|
||||||
resp_headers.reserve(header_len.try_into().unwrap());
|
resp_headers.reserve(header_len.try_into().unwrap());
|
||||||
|
|
||||||
for i in 0..len {
|
for i in 0..len {
|
||||||
let item = arr.get_index(scope, i).unwrap();
|
let item = headers.get_index(scope, i).unwrap();
|
||||||
let pair = v8::Local::<v8::Array>::try_from(item).unwrap();
|
let pair = v8::Local::<v8::Array>::try_from(item).unwrap();
|
||||||
let name = pair.get_index(scope, 0).unwrap();
|
let name = pair.get_index(scope, 0).unwrap();
|
||||||
let value = pair.get_index(scope, 1).unwrap();
|
let value = pair.get_index(scope, 1).unwrap();
|
||||||
|
@ -429,10 +430,10 @@ fn op_http_set_response_headers(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op2]
|
||||||
pub fn op_http_set_response_trailers(
|
pub fn op_http_set_response_trailers(
|
||||||
slab_id: SlabId,
|
#[smi] slab_id: SlabId,
|
||||||
trailers: Vec<(ByteString, ByteString)>,
|
#[serde] trailers: Vec<(ByteString, ByteString)>,
|
||||||
) {
|
) {
|
||||||
let mut http = slab_get(slab_id);
|
let mut http = slab_get(slab_id);
|
||||||
let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len());
|
let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len());
|
||||||
|
@ -580,11 +581,11 @@ fn set_response(
|
||||||
response.body_mut().initialize(response_fn(compression))
|
response.body_mut().initialize(response_fn(compression))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(fast)]
|
#[op2(fast)]
|
||||||
pub fn op_http_set_response_body_resource(
|
pub fn op_http_set_response_body_resource(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
slab_id: SlabId,
|
#[smi] slab_id: SlabId,
|
||||||
stream_rid: ResourceId,
|
#[smi] stream_rid: ResourceId,
|
||||||
auto_close: bool,
|
auto_close: bool,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
// If the stream is auto_close, we will hold the last ref to it until the response is complete.
|
// If the stream is auto_close, we will hold the last ref to it until the response is complete.
|
||||||
|
@ -605,10 +606,11 @@ pub fn op_http_set_response_body_resource(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(fast)]
|
#[op2(fast)]
|
||||||
|
#[smi]
|
||||||
pub fn op_http_set_response_body_stream(
|
pub fn op_http_set_response_body_stream(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
slab_id: SlabId,
|
#[smi] slab_id: SlabId,
|
||||||
) -> Result<ResourceId, AnyError> {
|
) -> Result<ResourceId, AnyError> {
|
||||||
// TODO(mmastrac): what should this channel size be?
|
// TODO(mmastrac): what should this channel size be?
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||||
|
@ -619,8 +621,11 @@ pub fn op_http_set_response_body_stream(
|
||||||
Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
|
Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(fast)]
|
#[op2(fast)]
|
||||||
pub fn op_http_set_response_body_text(slab_id: SlabId, text: String) {
|
pub fn op_http_set_response_body_text(
|
||||||
|
#[smi] slab_id: SlabId,
|
||||||
|
#[string] text: String,
|
||||||
|
) {
|
||||||
if !text.is_empty() {
|
if !text.is_empty() {
|
||||||
set_response(slab_id, Some(text.len()), |compression| {
|
set_response(slab_id, Some(text.len()), |compression| {
|
||||||
ResponseBytesInner::from_vec(compression, text.into_bytes())
|
ResponseBytesInner::from_vec(compression, text.into_bytes())
|
||||||
|
@ -628,8 +633,11 @@ pub fn op_http_set_response_body_text(slab_id: SlabId, text: String) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(fast)]
|
#[op2(fast)]
|
||||||
pub fn op_http_set_response_body_bytes(slab_id: SlabId, buffer: &[u8]) {
|
pub fn op_http_set_response_body_bytes(
|
||||||
|
#[smi] slab_id: SlabId,
|
||||||
|
#[buffer] buffer: &[u8],
|
||||||
|
) {
|
||||||
if !buffer.is_empty() {
|
if !buffer.is_empty() {
|
||||||
set_response(slab_id, Some(buffer.len()), |compression| {
|
set_response(slab_id, Some(buffer.len()), |compression| {
|
||||||
ResponseBytesInner::from_slice(compression, buffer)
|
ResponseBytesInner::from_slice(compression, buffer)
|
||||||
|
@ -637,11 +645,11 @@ pub fn op_http_set_response_body_bytes(slab_id: SlabId, buffer: &[u8]) {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op2(async)]
|
||||||
pub async fn op_http_track(
|
pub async fn op_http_track(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
slab_id: SlabId,
|
#[smi] slab_id: SlabId,
|
||||||
server_rid: ResourceId,
|
#[smi] server_rid: ResourceId,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
let http = slab_get(slab_id);
|
let http = slab_get(slab_id);
|
||||||
let handle = http.body_promise();
|
let handle = http.body_promise();
|
||||||
|
@ -834,10 +842,11 @@ impl Drop for HttpJoinHandle {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(v8)]
|
#[op2]
|
||||||
|
#[serde]
|
||||||
pub fn op_http_serve<HTTP>(
|
pub fn op_http_serve<HTTP>(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
listener_rid: ResourceId,
|
#[smi] listener_rid: ResourceId,
|
||||||
) -> Result<(ResourceId, &'static str, String), AnyError>
|
) -> Result<(ResourceId, &'static str, String), AnyError>
|
||||||
where
|
where
|
||||||
HTTP: HttpPropertyExtractor,
|
HTTP: HttpPropertyExtractor,
|
||||||
|
@ -886,10 +895,11 @@ where
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(v8)]
|
#[op2]
|
||||||
|
#[serde]
|
||||||
pub fn op_http_serve_on<HTTP>(
|
pub fn op_http_serve_on<HTTP>(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
connection_rid: ResourceId,
|
#[smi] connection_rid: ResourceId,
|
||||||
) -> Result<(ResourceId, &'static str, String), AnyError>
|
) -> Result<(ResourceId, &'static str, String), AnyError>
|
||||||
where
|
where
|
||||||
HTTP: HttpPropertyExtractor,
|
HTTP: HttpPropertyExtractor,
|
||||||
|
@ -930,8 +940,9 @@ where
|
||||||
|
|
||||||
/// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything
|
/// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything
|
||||||
/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error.
|
/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error.
|
||||||
#[op(fast)]
|
#[op2(fast)]
|
||||||
pub fn op_http_try_wait(state: &mut OpState, rid: ResourceId) -> SlabId {
|
#[smi]
|
||||||
|
pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
|
||||||
// The resource needs to exist.
|
// The resource needs to exist.
|
||||||
let Ok(join_handle) = state
|
let Ok(join_handle) = state
|
||||||
.resource_table
|
.resource_table
|
||||||
|
@ -952,10 +963,11 @@ pub fn op_http_try_wait(state: &mut OpState, rid: ResourceId) -> SlabId {
|
||||||
id
|
id
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op2(async)]
|
||||||
|
#[smi]
|
||||||
pub async fn op_http_wait(
|
pub async fn op_http_wait(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
rid: ResourceId,
|
#[smi] rid: ResourceId,
|
||||||
) -> Result<SlabId, AnyError> {
|
) -> Result<SlabId, AnyError> {
|
||||||
// We will get the join handle initially, as we might be consuming requests still
|
// We will get the join handle initially, as we might be consuming requests still
|
||||||
let join_handle = state
|
let join_handle = state
|
||||||
|
@ -1088,11 +1100,15 @@ impl Resource for UpgradeStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(fast)]
|
#[op2(fast)]
|
||||||
pub fn op_can_write_vectored(state: &mut OpState, rid: ResourceId) -> bool {
|
pub fn op_can_write_vectored(
|
||||||
|
state: &mut OpState,
|
||||||
|
#[smi] rid: ResourceId,
|
||||||
|
) -> bool {
|
||||||
state.resource_table.get::<UpgradeStream>(rid).is_ok()
|
state.resource_table.get::<UpgradeStream>(rid).is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(bartlomieju): op2 doesn't want to handle `usize` in the return type
|
||||||
#[op]
|
#[op]
|
||||||
pub async fn op_raw_write_vectored(
|
pub async fn op_raw_write_vectored(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
|
|
Loading…
Reference in a new issue