1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-12 00:54:02 -05:00

fix(ext/web): Prevent (De-)CompressionStream resource leak on stream cancellation (#21199)

Based on #21074 and #20741 I was looking for further potential use cases
of `TransformStream` `cancel()` method, so here go `CompressionStream`
and `DecompressionStream`.

Fixes #14212
This commit is contained in:
Florian Schwalm 2024-02-13 22:45:23 +01:00 committed by GitHub
parent 365d788648
commit 082f8128b8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 84 additions and 11 deletions

View file

@ -50,9 +50,12 @@ class CompressionStream {
maybeEnqueue(controller, output); maybeEnqueue(controller, output);
}, },
flush(controller) { flush(controller) {
const output = op_compression_finish(rid); const output = op_compression_finish(rid, true);
maybeEnqueue(controller, output); maybeEnqueue(controller, output);
}, },
cancel: (_reason) => {
op_compression_finish(rid, false);
},
}); });
this[webidl.brand] = webidl.brand; this[webidl.brand] = webidl.brand;
@ -109,9 +112,12 @@ class DecompressionStream {
maybeEnqueue(controller, output); maybeEnqueue(controller, output);
}, },
flush(controller) { flush(controller) {
const output = op_compression_finish(rid); const output = op_compression_finish(rid, true);
maybeEnqueue(controller, output); maybeEnqueue(controller, output);
}, },
cancel: (_reason) => {
op_compression_finish(rid, false);
},
}); });
this[webidl.brand] = webidl.brand; this[webidl.brand] = webidl.brand;

View file

@ -116,25 +116,35 @@ pub fn op_compression_write(
pub fn op_compression_finish( pub fn op_compression_finish(
state: &mut OpState, state: &mut OpState,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
report_errors: bool,
) -> Result<ToJsBuffer, AnyError> { ) -> Result<ToJsBuffer, AnyError> {
let resource = state.resource_table.take::<CompressionResource>(rid)?; let resource = state.resource_table.take::<CompressionResource>(rid)?;
let resource = Rc::try_unwrap(resource).unwrap(); let resource = Rc::try_unwrap(resource).unwrap();
let inner = resource.0.into_inner(); let inner = resource.0.into_inner();
let out: Vec<u8> = match inner { let out = match inner {
Inner::DeflateDecoder(d) => { Inner::DeflateDecoder(d) => {
d.finish().map_err(|e| type_error(e.to_string()))? d.finish().map_err(|e| type_error(e.to_string()))
} }
Inner::DeflateEncoder(d) => { Inner::DeflateEncoder(d) => {
d.finish().map_err(|e| type_error(e.to_string()))? d.finish().map_err(|e| type_error(e.to_string()))
} }
Inner::DeflateRawDecoder(d) => { Inner::DeflateRawDecoder(d) => {
d.finish().map_err(|e| type_error(e.to_string()))? d.finish().map_err(|e| type_error(e.to_string()))
} }
Inner::DeflateRawEncoder(d) => { Inner::DeflateRawEncoder(d) => {
d.finish().map_err(|e| type_error(e.to_string()))? d.finish().map_err(|e| type_error(e.to_string()))
} }
Inner::GzDecoder(d) => d.finish().map_err(|e| type_error(e.to_string()))?, Inner::GzDecoder(d) => d.finish().map_err(|e| type_error(e.to_string())),
Inner::GzEncoder(d) => d.finish().map_err(|e| type_error(e.to_string()))?, Inner::GzEncoder(d) => d.finish().map_err(|e| type_error(e.to_string())),
}; };
Ok(out.into()) match out {
Err(err) => {
if report_errors {
Err(err)
} else {
Ok(Vec::with_capacity(0).into())
}
}
Ok(out) => Ok(out.into()),
}
} }

View file

@ -1,5 +1,5 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
import { assertEquals, fail } from "./test_util.ts"; import { assertEquals, assertRejects, fail } from "./test_util.ts";
const { const {
core, core,
@ -476,3 +476,60 @@ for (const packetCount of [1, 1024]) {
assertEquals(await promise, "resource closed"); assertEquals(await promise, "resource closed");
}); });
} }
Deno.test(async function compressionStreamWritableMayBeAborted() {
await Promise.all([
new CompressionStream("gzip").writable.getWriter().abort(),
new CompressionStream("deflate").writable.getWriter().abort(),
new CompressionStream("deflate-raw").writable.getWriter().abort(),
]);
});
Deno.test(async function compressionStreamReadableMayBeCancelled() {
await Promise.all([
new CompressionStream("gzip").readable.getReader().cancel(),
new CompressionStream("deflate").readable.getReader().cancel(),
new CompressionStream("deflate-raw").readable.getReader().cancel(),
]);
});
Deno.test(async function decompressionStreamWritableMayBeAborted() {
await Promise.all([
new DecompressionStream("gzip").writable.getWriter().abort(),
new DecompressionStream("deflate").writable.getWriter().abort(),
new DecompressionStream("deflate-raw").writable.getWriter().abort(),
]);
});
Deno.test(async function decompressionStreamReadableMayBeCancelled() {
await Promise.all([
new DecompressionStream("gzip").readable.getReader().cancel(),
new DecompressionStream("deflate").readable.getReader().cancel(),
new DecompressionStream("deflate-raw").readable.getReader().cancel(),
]);
});
Deno.test(async function decompressionStreamValidGzipDoesNotThrow() {
const cs = new CompressionStream("gzip");
const ds = new DecompressionStream("gzip");
cs.readable.pipeThrough(ds);
const writer = cs.writable.getWriter();
await writer.write(new Uint8Array([1]));
writer.releaseLock();
await cs.writable.close();
let result = new Uint8Array();
for await (const chunk of ds.readable.values()) {
result = new Uint8Array([...result, ...chunk]);
}
assertEquals(result, new Uint8Array([1]));
});
Deno.test(async function decompressionStreamInvalidGzipStillReported() {
await assertRejects(
async () => {
await new DecompressionStream("gzip").writable.close();
},
TypeError,
"corrupt gzip stream does not have a matching checksum",
);
});