mirror of
https://github.com/denoland/deno.git
synced 2024-12-26 00:59:24 -05:00
fix: shared queue requires aligned buffer (#2816)
This commit is contained in:
parent
2235dd795d
commit
017f88ee99
4 changed files with 44 additions and 23 deletions
|
@ -37,7 +37,10 @@ fn serialize_result(
|
||||||
Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
|
Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
|
||||||
Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }),
|
Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }),
|
||||||
};
|
};
|
||||||
let vec = serde_json::to_vec(&value).unwrap();
|
let mut vec = serde_json::to_vec(&value).unwrap();
|
||||||
|
debug!("JSON response pre-align, len={}", vec.len());
|
||||||
|
// Align to 32bit word, padding with the space character.
|
||||||
|
vec.resize((vec.len() + 3usize) & !3usize, b' ');
|
||||||
vec.into_boxed_slice()
|
vec.into_boxed_slice()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -785,12 +785,12 @@ pub mod tests {
|
||||||
Mode::AsyncImmediate => {
|
Mode::AsyncImmediate => {
|
||||||
assert_eq!(control.len(), 1);
|
assert_eq!(control.len(), 1);
|
||||||
assert_eq!(control[0], 42);
|
assert_eq!(control[0], 42);
|
||||||
let buf = vec![43u8].into_boxed_slice();
|
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
|
||||||
Op::Async(Box::new(futures::future::ok(buf)))
|
Op::Async(Box::new(futures::future::ok(buf)))
|
||||||
}
|
}
|
||||||
Mode::OverflowReqSync => {
|
Mode::OverflowReqSync => {
|
||||||
assert_eq!(control.len(), 100 * 1024 * 1024);
|
assert_eq!(control.len(), 100 * 1024 * 1024);
|
||||||
let buf = vec![43u8].into_boxed_slice();
|
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
|
||||||
Op::Sync(buf)
|
Op::Sync(buf)
|
||||||
}
|
}
|
||||||
Mode::OverflowResSync => {
|
Mode::OverflowResSync => {
|
||||||
|
@ -804,7 +804,7 @@ pub mod tests {
|
||||||
}
|
}
|
||||||
Mode::OverflowReqAsync => {
|
Mode::OverflowReqAsync => {
|
||||||
assert_eq!(control.len(), 100 * 1024 * 1024);
|
assert_eq!(control.len(), 100 * 1024 * 1024);
|
||||||
let buf = vec![43u8].into_boxed_slice();
|
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
|
||||||
Op::Async(Box::new(futures::future::ok(buf)))
|
Op::Async(Box::new(futures::future::ok(buf)))
|
||||||
}
|
}
|
||||||
Mode::OverflowResAsync => {
|
Mode::OverflowResAsync => {
|
||||||
|
@ -1211,7 +1211,7 @@ pub mod tests {
|
||||||
let control = new Uint8Array(100 * 1024 * 1024);
|
let control = new Uint8Array(100 * 1024 * 1024);
|
||||||
let response = Deno.core.dispatch(99, control);
|
let response = Deno.core.dispatch(99, control);
|
||||||
assert(response instanceof Uint8Array);
|
assert(response instanceof Uint8Array);
|
||||||
assert(response.length == 1);
|
assert(response.length == 4);
|
||||||
assert(response[0] == 43);
|
assert(response[0] == 43);
|
||||||
assert(asyncRecv == 0);
|
assert(asyncRecv == 0);
|
||||||
"#,
|
"#,
|
||||||
|
@ -1251,7 +1251,7 @@ pub mod tests {
|
||||||
let asyncRecv = 0;
|
let asyncRecv = 0;
|
||||||
Deno.core.setAsyncHandler((opId, buf) => {
|
Deno.core.setAsyncHandler((opId, buf) => {
|
||||||
assert(opId == 99);
|
assert(opId == 99);
|
||||||
assert(buf.byteLength === 1);
|
assert(buf.byteLength === 4);
|
||||||
assert(buf[0] === 43);
|
assert(buf[0] === 43);
|
||||||
asyncRecv++;
|
asyncRecv++;
|
||||||
});
|
});
|
||||||
|
|
|
@ -159,9 +159,19 @@ impl SharedQueue {
|
||||||
Some((op_id, &self.bytes[off..end]))
|
Some((op_id, &self.bytes[off..end]))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Because JS-side may cast `record` to Int32Array it is required
|
||||||
|
/// that `record`'s length is divisible by 4.
|
||||||
pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
|
pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
|
||||||
let off = self.head();
|
let off = self.head();
|
||||||
let end = off + record.len();
|
let end = off + record.len();
|
||||||
|
debug!(
|
||||||
|
"rust:shared_queue:pre-push: op={}, off={}, end={}, len={}",
|
||||||
|
op_id,
|
||||||
|
off,
|
||||||
|
end,
|
||||||
|
record.len()
|
||||||
|
);
|
||||||
|
assert_eq!(record.len() % 4, 0);
|
||||||
let index = self.num_records();
|
let index = self.num_records();
|
||||||
if end > self.bytes.len() || index >= MAX_RECORDS {
|
if end > self.bytes.len() || index >= MAX_RECORDS {
|
||||||
debug!("WARNING the sharedQueue overflowed");
|
debug!("WARNING the sharedQueue overflowed");
|
||||||
|
@ -195,31 +205,31 @@ mod tests {
|
||||||
let h = q.head();
|
let h = q.head();
|
||||||
assert!(h > 0);
|
assert!(h > 0);
|
||||||
|
|
||||||
let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
|
let r = vec![1u8, 2, 3, 4].into_boxed_slice();
|
||||||
let len = r.len() + h;
|
let len = r.len() + h;
|
||||||
assert!(q.push(0, &r));
|
assert!(q.push(0, &r));
|
||||||
assert_eq!(q.head(), len);
|
assert_eq!(q.head(), len);
|
||||||
|
|
||||||
let r = vec![6, 7].into_boxed_slice();
|
let r = vec![5, 6, 7, 8].into_boxed_slice();
|
||||||
assert!(q.push(0, &r));
|
assert!(q.push(0, &r));
|
||||||
|
|
||||||
let r = vec![8, 9, 10, 11].into_boxed_slice();
|
let r = vec![9, 10, 11, 12].into_boxed_slice();
|
||||||
assert!(q.push(0, &r));
|
assert!(q.push(0, &r));
|
||||||
assert_eq!(q.num_records(), 3);
|
assert_eq!(q.num_records(), 3);
|
||||||
assert_eq!(q.size(), 3);
|
assert_eq!(q.size(), 3);
|
||||||
|
|
||||||
let (_op_id, r) = q.shift().unwrap();
|
let (_op_id, r) = q.shift().unwrap();
|
||||||
assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice());
|
assert_eq!(r, vec![1, 2, 3, 4].as_slice());
|
||||||
assert_eq!(q.num_records(), 3);
|
assert_eq!(q.num_records(), 3);
|
||||||
assert_eq!(q.size(), 2);
|
assert_eq!(q.size(), 2);
|
||||||
|
|
||||||
let (_op_id, r) = q.shift().unwrap();
|
let (_op_id, r) = q.shift().unwrap();
|
||||||
assert_eq!(r, vec![6, 7].as_slice());
|
assert_eq!(r, vec![5, 6, 7, 8].as_slice());
|
||||||
assert_eq!(q.num_records(), 3);
|
assert_eq!(q.num_records(), 3);
|
||||||
assert_eq!(q.size(), 1);
|
assert_eq!(q.size(), 1);
|
||||||
|
|
||||||
let (_op_id, r) = q.shift().unwrap();
|
let (_op_id, r) = q.shift().unwrap();
|
||||||
assert_eq!(r, vec![8, 9, 10, 11].as_slice());
|
assert_eq!(r, vec![9, 10, 11, 12].as_slice());
|
||||||
assert_eq!(q.num_records(), 0);
|
assert_eq!(q.num_records(), 0);
|
||||||
assert_eq!(q.size(), 0);
|
assert_eq!(q.size(), 0);
|
||||||
|
|
||||||
|
@ -239,21 +249,21 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn overflow() {
|
fn overflow() {
|
||||||
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
|
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
|
||||||
assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 1)));
|
assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 4)));
|
||||||
assert_eq!(q.size(), 1);
|
assert_eq!(q.size(), 1);
|
||||||
assert!(!q.push(0, &alloc_buf(2)));
|
assert!(!q.push(0, &alloc_buf(8)));
|
||||||
assert_eq!(q.size(), 1);
|
assert_eq!(q.size(), 1);
|
||||||
assert!(q.push(0, &alloc_buf(1)));
|
assert!(q.push(0, &alloc_buf(4)));
|
||||||
assert_eq!(q.size(), 2);
|
assert_eq!(q.size(), 2);
|
||||||
|
|
||||||
let (_op_id, buf) = q.shift().unwrap();
|
let (_op_id, buf) = q.shift().unwrap();
|
||||||
assert_eq!(buf.len(), RECOMMENDED_SIZE - 1);
|
assert_eq!(buf.len(), RECOMMENDED_SIZE - 4);
|
||||||
assert_eq!(q.size(), 1);
|
assert_eq!(q.size(), 1);
|
||||||
|
|
||||||
assert!(!q.push(0, &alloc_buf(1)));
|
assert!(!q.push(0, &alloc_buf(4)));
|
||||||
|
|
||||||
let (_op_id, buf) = q.shift().unwrap();
|
let (_op_id, buf) = q.shift().unwrap();
|
||||||
assert_eq!(buf.len(), 1);
|
assert_eq!(buf.len(), 4);
|
||||||
assert_eq!(q.size(), 0);
|
assert_eq!(q.size(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,11 +271,19 @@ mod tests {
|
||||||
fn full_records() {
|
fn full_records() {
|
||||||
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
|
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
|
||||||
for _ in 0..MAX_RECORDS {
|
for _ in 0..MAX_RECORDS {
|
||||||
assert!(q.push(0, &alloc_buf(1)))
|
assert!(q.push(0, &alloc_buf(4)))
|
||||||
}
|
}
|
||||||
assert_eq!(q.push(0, &alloc_buf(1)), false);
|
assert_eq!(q.push(0, &alloc_buf(4)), false);
|
||||||
// Even if we shift one off, we still cannot push a new record.
|
// Even if we shift one off, we still cannot push a new record.
|
||||||
let _ignored = q.shift().unwrap();
|
let _ignored = q.shift().unwrap();
|
||||||
assert_eq!(q.push(0, &alloc_buf(1)), false);
|
assert_eq!(q.push(0, &alloc_buf(4)), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[should_panic]
|
||||||
|
fn bad_buf_length() {
|
||||||
|
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
|
||||||
|
// check that `record` that has length not a multiple of 4 will cause panic
|
||||||
|
q.push(0, &alloc_buf(3));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ def get_addr(port=None):
|
||||||
def deno_tcp(deno_exe):
|
def deno_tcp(deno_exe):
|
||||||
addr = get_addr()
|
addr = get_addr()
|
||||||
deno_cmd = [deno_exe, "run", "--allow-net", "tools/deno_tcp.ts", addr]
|
deno_cmd = [deno_exe, "run", "--allow-net", "tools/deno_tcp.ts", addr]
|
||||||
print "http_benchmark testing DENO."
|
print "http_benchmark testing DENO tcp."
|
||||||
return run(deno_cmd, addr)
|
return run(deno_cmd, addr)
|
||||||
|
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ def deno_tcp_current_thread(deno_exe):
|
||||||
deno_exe, "run", "--current-thread", "--allow-net",
|
deno_exe, "run", "--current-thread", "--allow-net",
|
||||||
"tools/deno_tcp.ts", addr
|
"tools/deno_tcp.ts", addr
|
||||||
]
|
]
|
||||||
print "http_benchmark testing DENO."
|
print "http_benchmark testing DENO tcp (single-thread)."
|
||||||
return run(deno_cmd, addr)
|
return run(deno_cmd, addr)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue