mirror of
https://github.com/denoland/deno.git
synced 2025-01-03 04:48:52 -05:00
fix(ext/node): fix grpc error_handling example (#23755)
gRPC depends only on the END_STREAM flag to emit "trailers" event which is responsible to propagate the errors correctly. This patch uses Body::is_end_stream() to determine if a stream will end and set the END_STREAM flag. Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
parent
00e6d41a9d
commit
7893ab9f0b
2 changed files with 18 additions and 8 deletions
|
@ -423,7 +423,7 @@ pub struct Http2ClientResponse {
|
||||||
pub async fn op_http2_client_get_response(
|
pub async fn op_http2_client_get_response(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
#[smi] stream_rid: ResourceId,
|
#[smi] stream_rid: ResourceId,
|
||||||
) -> Result<Http2ClientResponse, AnyError> {
|
) -> Result<(Http2ClientResponse, bool), AnyError> {
|
||||||
let resource = state
|
let resource = state
|
||||||
.borrow()
|
.borrow()
|
||||||
.resource_table
|
.resource_table
|
||||||
|
@ -439,6 +439,7 @@ pub async fn op_http2_client_get_response(
|
||||||
for (key, val) in parts.headers.iter() {
|
for (key, val) in parts.headers.iter() {
|
||||||
res_headers.push((key.as_str().into(), val.as_bytes().into()));
|
res_headers.push((key.as_str().into(), val.as_bytes().into()));
|
||||||
}
|
}
|
||||||
|
let end_stream = body.is_end_stream();
|
||||||
|
|
||||||
let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
|
let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
|
||||||
let body_rid =
|
let body_rid =
|
||||||
|
@ -450,11 +451,14 @@ pub async fn op_http2_client_get_response(
|
||||||
trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
|
trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
|
||||||
trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
|
trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
|
||||||
});
|
});
|
||||||
Ok(Http2ClientResponse {
|
Ok((
|
||||||
headers: res_headers,
|
Http2ClientResponse {
|
||||||
body_rid,
|
headers: res_headers,
|
||||||
status_code: status.into(),
|
body_rid,
|
||||||
})
|
status_code: status.into(),
|
||||||
|
},
|
||||||
|
end_stream,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
enum DataOrTrailers {
|
enum DataOrTrailers {
|
||||||
|
|
|
@ -822,7 +822,7 @@ export class ClientHttp2Stream extends Duplex {
|
||||||
session[kDenoClientRid],
|
session[kDenoClientRid],
|
||||||
this.#rid,
|
this.#rid,
|
||||||
);
|
);
|
||||||
const response = await op_http2_client_get_response(
|
const [response, endStream] = await op_http2_client_get_response(
|
||||||
this.#rid,
|
this.#rid,
|
||||||
);
|
);
|
||||||
debugHttp2(">>> after get response", response);
|
debugHttp2(">>> after get response", response);
|
||||||
|
@ -831,7 +831,13 @@ export class ClientHttp2Stream extends Duplex {
|
||||||
...Object.fromEntries(response.headers),
|
...Object.fromEntries(response.headers),
|
||||||
};
|
};
|
||||||
debugHttp2(">>> emitting response", headers);
|
debugHttp2(">>> emitting response", headers);
|
||||||
this.emit("response", headers, 0);
|
this.emit(
|
||||||
|
"response",
|
||||||
|
headers,
|
||||||
|
endStream
|
||||||
|
? constants.NGHTTP2_FLAG_END_STREAM
|
||||||
|
: constants.NGHTTP2_FLAG_NONE,
|
||||||
|
);
|
||||||
this[kDenoResponse] = response;
|
this[kDenoResponse] = response;
|
||||||
this.emit("ready");
|
this.emit("ready");
|
||||||
})();
|
})();
|
||||||
|
|
Loading…
Reference in a new issue