diff --git a/ext/node/ops/http2.rs b/ext/node/ops/http2.rs index e206db61bf..abf7eae5d8 100644 --- a/ext/node/ops/http2.rs +++ b/ext/node/ops/http2.rs @@ -344,6 +344,7 @@ pub async fn op_http2_client_send_data( state: Rc>, #[smi] stream_rid: ResourceId, #[buffer] data: JsBuffer, + end_of_stream: bool, ) -> Result<(), AnyError> { let resource = state .borrow() @@ -351,8 +352,7 @@ pub async fn op_http2_client_send_data( .get::(stream_rid)?; let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; - // TODO(bartlomieju): handle end of stream - stream.send_data(data.to_vec().into(), false)?; + stream.send_data(data.to_vec().into(), end_of_stream)?; Ok(()) } diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts index 7a9d91097d..2a3b4f7f37 100644 --- a/ext/node/polyfills/http2.ts +++ b/ext/node/polyfills/http2.ts @@ -978,7 +978,7 @@ export class ClientHttp2Stream extends Duplex { return; } - shutdownWritable(this, cb); + shutdownWritable(this, cb, this.#rid); } // TODO(bartlomieju): needs a proper cleanup @@ -1176,15 +1176,30 @@ export class ClientHttp2Stream extends Duplex { } } -function shutdownWritable(stream, callback) { +function shutdownWritable(stream, callback, streamRid) { debugHttp2(">>> shutdownWritable", callback); const state = stream[kState]; if (state.shutdownWritableCalled) { + debugHttp2(">>> shutdownWritable() already called"); return callback(); } state.shutdownWritableCalled = true; - onStreamTrailers(stream); - callback(); + if (state.flags & STREAM_FLAGS_HAS_TRAILERS) { + onStreamTrailers(stream); + callback(); + } else { + op_http2_client_send_data(streamRid, new Uint8Array(), true) + .then(() => { + callback(); + stream[kMaybeDestroy](); + core.tryClose(streamRid); + }) + .catch((e) => { + callback(e); + core.tryClose(streamRid); + stream._destroy(e); + }); + } // TODO(bartlomieju): might have to add "finish" event listener here, // check it. }