From 017f88ee99b0fe40221e6af92e0b6a976fbaf2ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 26 Aug 2019 13:48:40 +0200 Subject: [PATCH] fix: shared queue requires aligned buffer (#2816) --- cli/ops/dispatch_json.rs | 5 ++++- core/isolate.rs | 10 ++++----- core/shared_queue.rs | 48 +++++++++++++++++++++++++++------------- tools/http_benchmark.py | 4 ++-- 4 files changed, 44 insertions(+), 23 deletions(-) diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index a575aedb3e..77d76f39f7 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -37,7 +37,10 @@ fn serialize_result( Ok(v) => json!({ "ok": v, "promiseId": promise_id }), Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }), }; - let vec = serde_json::to_vec(&value).unwrap(); + let mut vec = serde_json::to_vec(&value).unwrap(); + debug!("JSON response pre-align, len={}", vec.len()); + // Align to 32bit word, padding with the space character. + vec.resize((vec.len() + 3usize) & !3usize, b' '); vec.into_boxed_slice() } diff --git a/core/isolate.rs b/core/isolate.rs index e0acbfd9f2..a5e659c9a7 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -785,12 +785,12 @@ pub mod tests { Mode::AsyncImmediate => { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); - let buf = vec![43u8].into_boxed_slice(); + let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); Op::Async(Box::new(futures::future::ok(buf))) } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); - let buf = vec![43u8].into_boxed_slice(); + let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); Op::Sync(buf) } Mode::OverflowResSync => { @@ -804,7 +804,7 @@ pub mod tests { } Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); - let buf = vec![43u8].into_boxed_slice(); + let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); Op::Async(Box::new(futures::future::ok(buf))) } Mode::OverflowResAsync => { @@ -1211,7 +1211,7 @@ pub mod tests { let control = new Uint8Array(100 * 1024 * 1024); let response = Deno.core.dispatch(99, control); assert(response instanceof Uint8Array); - assert(response.length == 1); + assert(response.length == 4); assert(response[0] == 43); assert(asyncRecv == 0); "#, @@ -1251,7 +1251,7 @@ pub mod tests { let asyncRecv = 0; Deno.core.setAsyncHandler((opId, buf) => { assert(opId == 99); - assert(buf.byteLength === 1); + assert(buf.byteLength === 4); assert(buf[0] === 43); asyncRecv++; }); diff --git a/core/shared_queue.rs b/core/shared_queue.rs index 11c8e21271..5f9554ad21 100644 --- a/core/shared_queue.rs +++ b/core/shared_queue.rs @@ -159,9 +159,19 @@ impl SharedQueue { Some((op_id, &self.bytes[off..end])) } + /// Because JS-side may cast `record` to Int32Array it is required + /// that `record`'s length is divisible by 4. pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool { let off = self.head(); let end = off + record.len(); + debug!( + "rust:shared_queue:pre-push: op={}, off={}, end={}, len={}", + op_id, + off, + end, + record.len() + ); + assert_eq!(record.len() % 4, 0); let index = self.num_records(); if end > self.bytes.len() || index >= MAX_RECORDS { debug!("WARNING the sharedQueue overflowed"); @@ -195,31 +205,31 @@ mod tests { let h = q.head(); assert!(h > 0); - let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice(); + let r = vec![1u8, 2, 3, 4].into_boxed_slice(); let len = r.len() + h; assert!(q.push(0, &r)); assert_eq!(q.head(), len); - let r = vec![6, 7].into_boxed_slice(); + let r = vec![5, 6, 7, 8].into_boxed_slice(); assert!(q.push(0, &r)); - let r = vec![8, 9, 10, 11].into_boxed_slice(); + let r = vec![9, 10, 11, 12].into_boxed_slice(); assert!(q.push(0, &r)); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 3); let (_op_id, r) = q.shift().unwrap(); - assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice()); + assert_eq!(r, vec![1, 2, 3, 4].as_slice()); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 2); let (_op_id, r) = q.shift().unwrap(); - assert_eq!(r, vec![6, 7].as_slice()); + assert_eq!(r, vec![5, 6, 7, 8].as_slice()); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 1); let (_op_id, r) = q.shift().unwrap(); - assert_eq!(r, vec![8, 9, 10, 11].as_slice()); + assert_eq!(r, vec![9, 10, 11, 12].as_slice()); assert_eq!(q.num_records(), 0); assert_eq!(q.size(), 0); @@ -239,21 +249,21 @@ mod tests { #[test] fn overflow() { let mut q = SharedQueue::new(RECOMMENDED_SIZE); - assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 1))); + assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 4))); assert_eq!(q.size(), 1); - assert!(!q.push(0, &alloc_buf(2))); + assert!(!q.push(0, &alloc_buf(8))); assert_eq!(q.size(), 1); - assert!(q.push(0, &alloc_buf(1))); + assert!(q.push(0, &alloc_buf(4))); assert_eq!(q.size(), 2); let (_op_id, buf) = q.shift().unwrap(); - assert_eq!(buf.len(), RECOMMENDED_SIZE - 1); + assert_eq!(buf.len(), RECOMMENDED_SIZE - 4); assert_eq!(q.size(), 1); - assert!(!q.push(0, &alloc_buf(1))); + assert!(!q.push(0, &alloc_buf(4))); let (_op_id, buf) = q.shift().unwrap(); - assert_eq!(buf.len(), 1); + assert_eq!(buf.len(), 4); assert_eq!(q.size(), 0); } @@ -261,11 +271,19 @@ mod tests { fn full_records() { let mut q = SharedQueue::new(RECOMMENDED_SIZE); for _ in 0..MAX_RECORDS { - assert!(q.push(0, &alloc_buf(1))) + assert!(q.push(0, &alloc_buf(4))) } - assert_eq!(q.push(0, &alloc_buf(1)), false); + assert_eq!(q.push(0, &alloc_buf(4)), false); // Even if we shift one off, we still cannot push a new record. let _ignored = q.shift().unwrap(); - assert_eq!(q.push(0, &alloc_buf(1)), false); + assert_eq!(q.push(0, &alloc_buf(4)), false); + } + + #[test] + #[should_panic] + fn bad_buf_length() { + let mut q = SharedQueue::new(RECOMMENDED_SIZE); + // check that `record` that has length not a multiple of 4 will cause panic + q.push(0, &alloc_buf(3)); } } diff --git a/tools/http_benchmark.py b/tools/http_benchmark.py index e3674d0954..2394ad160f 100755 --- a/tools/http_benchmark.py +++ b/tools/http_benchmark.py @@ -28,7 +28,7 @@ def get_addr(port=None): def deno_tcp(deno_exe): addr = get_addr() deno_cmd = [deno_exe, "run", "--allow-net", "tools/deno_tcp.ts", addr] - print "http_benchmark testing DENO." + print "http_benchmark testing DENO tcp." return run(deno_cmd, addr) @@ -38,7 +38,7 @@ def deno_tcp_current_thread(deno_exe): deno_exe, "run", "--current-thread", "--allow-net", "tools/deno_tcp.ts", addr ] - print "http_benchmark testing DENO." + print "http_benchmark testing DENO tcp (single-thread)." return run(deno_cmd, addr)