1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

core: Behavior shouldn't be generic

We always pass around Box<[u8]>, and adding this generic is an
unnecessary complication.

Add deno_core_http_bench_test to test.py

sharedQueue works on deno_core_http_bench
This commit is contained in:
Ryan Dahl 2019-03-14 19:17:52 -04:00
parent 76c73ec61e
commit 1811318097
12 changed files with 787 additions and 355 deletions

View file

@ -13,8 +13,7 @@ group("default") {
":deno",
":hyper_hello",
":test_rs",
"core:deno_core_http_bench",
"core:deno_core_test",
"core:default",
"libdeno:test_cc",
]
}

View file

@ -1,5 +1,19 @@
import("//build_extra/rust/rust.gni")
group("default") {
testonly = true
deps = [
":deno_core_http_bench",
":deno_core_http_bench_test",
":deno_core_test",
]
}
deno_core_deps = [
"../libdeno:libdeno_static_lib",
"../libdeno:v8",
]
# deno_core does not depend on flatbuffers nor tokio.
main_extern = [
"$rust_build:futures",
@ -10,28 +24,36 @@ main_extern = [
rust_crate("deno_core") {
source_root = "lib.rs"
deps = deno_core_deps
extern = main_extern
deps = [
"../libdeno:libdeno_static_lib",
]
}
rust_test("deno_core_test") {
source_root = "lib.rs"
deps = deno_core_deps
extern = main_extern
deps = [
"../libdeno:libdeno_static_lib",
]
}
http_bench_extern = [
"$rust_build:futures",
"$rust_build:lazy_static",
"$rust_build:libc",
"$rust_build:log",
"$rust_build:tokio",
":deno_core",
]
if (is_win) {
http_bench_extern += [ "$rust_build:winapi" ]
}
rust_executable("deno_core_http_bench") {
source_root = "http_bench.rs"
extern = [
"$rust_build:futures",
"$rust_build:lazy_static",
"$rust_build:libc",
"$rust_build:log",
"$rust_build:tokio",
":deno_core"
]
deps = deno_core_deps
extern = http_bench_extern
}
rust_test("deno_core_http_bench_test") {
source_root = "http_bench.rs"
deps = deno_core_deps
extern = http_bench_extern
}

View file

@ -6,64 +6,21 @@ const OP_ACCEPT = 2;
const OP_READ = 3;
const OP_WRITE = 4;
const OP_CLOSE = 5;
const INDEX_START = 0;
const INDEX_END = 1;
const NUM_RECORDS = 128;
const RECORD_SIZE = 4;
const shared32 = new Int32Array(libdeno.shared);
function idx(i, off) {
return 2 + i * RECORD_SIZE + off;
}
function recordsPush(promiseId, opId, arg, result) {
let i = shared32[INDEX_END];
if (i >= NUM_RECORDS) {
return false;
}
shared32[idx(i, 0)] = promiseId;
shared32[idx(i, 1)] = opId;
shared32[idx(i, 2)] = arg;
shared32[idx(i, 3)] = result;
shared32[INDEX_END]++;
return true;
}
function recordsShift() {
if (shared32[INDEX_START] == shared32[INDEX_END]) {
return null;
}
const i = shared32[INDEX_START];
const record = {
promiseId: shared32[idx(i, 0)],
opId: shared32[idx(i, 1)],
arg: shared32[idx(i, 2)],
result: shared32[idx(i, 3)]
};
shared32[INDEX_START]++;
return record;
}
function recordsReset() {
shared32[INDEX_START] = 0;
shared32[INDEX_END] = 0;
}
function recordsSize() {
return shared32[INDEX_END] - shared32[INDEX_START];
}
const requestBuf = new Uint8Array(64 * 1024);
const responseBuf = new Uint8Array(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
.split("")
.map(c => c.charCodeAt(0))
);
const promiseMap = new Map();
let nextPromiseId = 1;
function assert(cond) {
if (!cond) {
throw Error("assert");
}
}
function createResolvable() {
let methods;
const promise = new Promise((resolve, reject) => {
@ -72,36 +29,73 @@ function createResolvable() {
return Object.assign(promise, methods);
}
const scratch32 = new Int32Array(4);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength
);
assert(scratchBytes.byteLength === 4 * 4);
// Toggle what method we send with. false = legacy.
// AFAICT This has no effect on performance.
const sendWithShared = true;
function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
if (sendWithShared) {
Deno._sharedQueue.push(scratchBytes);
libdeno.send(null, zeroCopy);
} else {
libdeno.send(scratchBytes, zeroCopy);
}
}
/** Returns Promise<number> */
function sendAsync(opId, arg, zeroCopyData) {
function sendAsync(opId, arg, zeroCopy = null) {
const promiseId = nextPromiseId++;
const p = createResolvable();
recordsReset();
recordsPush(promiseId, opId, arg, -1);
promiseMap.set(promiseId, p);
libdeno.send(null, zeroCopyData);
send(promiseId, opId, arg, zeroCopy);
return p;
}
/** Returns u32 number */
function sendSync(opId, arg) {
recordsReset();
recordsPush(0, opId, arg, -1);
libdeno.send();
if (recordsSize() != 1) {
throw Error("Expected sharedSimple to have size 1");
}
let { result } = recordsShift();
return result;
function recordFromBuf(buf) {
assert(buf.byteLength === 16);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
return {
promiseId: buf32[0],
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
};
}
function handleAsyncMsgFromRust() {
while (recordsSize() > 0) {
const { promiseId, result } = recordsShift();
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);
function recv() {
const buf = Deno._sharedQueue.shift();
if (!buf) {
return null;
}
return recordFromBuf(buf);
}
/** Returns i32 number */
function sendSync(opId, arg) {
send(0, opId, arg);
const record = recv();
assert(recv() == null);
return record.result;
}
function handleAsyncMsgFromRust(buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);
}
/** Listens on 0.0.0.0:4500, returns rid. */
@ -147,12 +141,12 @@ async function serve(rid) {
}
async function main() {
libdeno.recv(handleAsyncMsgFromRust);
Deno._setAsyncHandler(handleAsyncMsgFromRust);
libdeno.print("http_bench.js start\n");
const listenerRid = listen();
libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}`);
libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`);
while (true) {
const rid = await accept(listenerRid);
// libdeno.print(`accepted ${rid}`);

View file

@ -16,7 +16,6 @@ use deno_core::*;
use futures::future::lazy;
use std::collections::HashMap;
use std::env;
use std::mem;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@ -29,13 +28,7 @@ const OP_READ: i32 = 3;
const OP_WRITE: i32 = 4;
const OP_CLOSE: i32 = 5;
const INDEX_START: usize = 0;
const INDEX_END: usize = 1;
const NUM_RECORDS: usize = 128;
const RECORD_SIZE: usize = 4;
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
pub op_id: i32,
@ -43,48 +36,84 @@ pub struct Record {
pub result: i32,
}
impl Into<Buf> for Record {
fn into(self) -> Buf {
let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 16];
unsafe { Box::from_raw(ptr) }
}
}
impl From<&[u8]> for Record {
fn from(s: &[u8]) -> Record {
let ptr = s.as_ptr() as *const i32;
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
}
}
}
impl From<Buf> for Record {
fn from(buf: Buf) -> Record {
assert_eq!(buf.len(), 4 * 4);
//let byte_len = buf.len();
let ptr = Box::into_raw(buf) as *mut [i32; 4];
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
assert_eq!(ints.len(), 4);
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
}
}
}
#[test]
fn test_record_from() {
let r = Record {
promise_id: 1,
op_id: 2,
arg: 3,
result: 4,
};
let expected = r.clone();
let buf: Buf = r.into();
#[cfg(target_endian = "little")]
assert_eq!(
buf,
vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
);
let actual = Record::from(buf);
assert_eq!(actual, expected);
// TODO test From<&[u8]> for Record
}
pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
struct HttpBench {
shared32: Vec<i32>,
}
struct HttpBench();
impl HttpBench {
fn new() -> Self {
let mut shared32 = Vec::<i32>::new();
let n = 2 + 4 * NUM_RECORDS;
shared32.resize(n, 0);
shared32[INDEX_START] = 0;
shared32[INDEX_END] = 0;
Self { shared32 }
}
}
fn idx(i: usize, off: usize) -> usize {
2 + i * RECORD_SIZE + off
}
impl Behavior<Record> for HttpBench {
impl Behavior for HttpBench {
fn startup_snapshot(&mut self) -> Option<deno_buf> {
None
}
fn startup_shared(&mut self) -> Option<deno_buf> {
let ptr = self.shared32.as_ptr() as *const u8;
let len = mem::size_of::<i32>() * self.shared32.len();
Some(unsafe { deno_buf::from_raw_parts(ptr, len) })
}
fn resolve(&mut self, _specifier: &str, _referrer: deno_mod) -> deno_mod {
// HttpBench doesn't do ES modules.
unimplemented!()
}
fn recv(
fn dispatch(
&mut self,
record: Record,
control: &[u8],
zero_copy_buf: deno_buf,
) -> (bool, Box<Op<Record>>) {
) -> (bool, Box<Op>) {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {
OP_LISTEN => {
@ -125,51 +154,22 @@ impl Behavior<Record> for HttpBench {
eprintln!("unexpected err {}", err);
record_b.result = -1;
Ok(record_b)
}).then(|result| -> Result<Buf, ()> {
let record = result.unwrap();
Ok(record.into())
}),
);
(is_sync, op)
}
fn records_reset(&mut self) {
self.shared32[INDEX_START] = 0;
self.shared32[INDEX_END] = 0;
}
fn records_push(&mut self, record: Record) -> bool {
debug!("push {:?}", record);
let i = self.shared32[INDEX_END] as usize;
if i >= NUM_RECORDS {
return false;
}
self.shared32[idx(i, 0)] = record.promise_id;
self.shared32[idx(i, 1)] = record.op_id;
self.shared32[idx(i, 2)] = record.arg;
self.shared32[idx(i, 3)] = record.result;
self.shared32[INDEX_END] += 1;
true
}
fn records_shift(&mut self) -> Option<Record> {
let i = self.shared32[INDEX_START] as usize;
if i == self.shared32[INDEX_END] as usize {
return None;
}
let record = Record {
promise_id: self.shared32[idx(i, 0)],
op_id: self.shared32[idx(i, 1)],
arg: self.shared32[idx(i, 2)],
result: self.shared32[idx(i, 3)],
};
self.shared32[INDEX_START] += 1;
Some(record)
}
}
fn main() {
let js_source = include_str!("http_bench.js");
let main_future = lazy(move || {
let isolate = deno_core::Isolate::new(HttpBench::new());
let isolate = deno_core::Isolate::new(HttpBench());
isolate.shared_init();
// TODO currently isolate.execute() must be run inside tokio, hence the
// lazy(). It would be nice to not have that contraint. Probably requires

View file

@ -3,6 +3,8 @@ use crate::js_errors::JSError;
use crate::libdeno;
use crate::libdeno::deno_buf;
use crate::libdeno::deno_mod;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::Async;
use futures::Future;
use futures::Poll;
@ -11,19 +13,20 @@ use std::ffi::CStr;
use std::ffi::CString;
use std::sync::{Once, ONCE_INIT};
pub type Op<R> = dyn Future<Item = R, Error = ()> + Send;
pub type Buf = Box<[u8]>;
pub type Op = dyn Future<Item = Buf, Error = ()> + Send;
struct PendingOp<R> {
op: Box<Op<R>>,
struct PendingOp {
op: Box<Op>,
polled_recently: bool,
zero_copy_id: usize, // non-zero if associated zero-copy buffer.
}
impl<R> Future for PendingOp<R> {
type Item = R;
impl Future for PendingOp {
type Item = Buf;
type Error = ();
fn poll(&mut self) -> Poll<R, ()> {
fn poll(&mut self) -> Poll<Buf, ()> {
// Do not call poll on ops we've already polled this turn.
if self.polled_recently {
Ok(Async::NotReady)
@ -32,7 +35,7 @@ impl<R> Future for PendingOp<R> {
let op = &mut self.op;
op.poll().map_err(|()| {
// Ops should not error. If an op experiences an error it needs to
// encode that error into the record R, so it can be returned to JS.
// encode that error into a buf, so it can be returned to JS.
panic!("ops should not error")
})
}
@ -40,35 +43,21 @@ impl<R> Future for PendingOp<R> {
}
/// Defines the behavior of an Isolate.
pub trait Behavior<R> {
pub trait Behavior {
/// Called exactly once when an Isolate is created to retrieve the startup
/// snapshot.
fn startup_snapshot(&mut self) -> Option<deno_buf>;
/// Called exactly once when an Isolate is created to provide the
/// backing memory for the libdeno.shared SharedArrayBuffer.
fn startup_shared(&mut self) -> Option<deno_buf>;
/// Called during mod_instantiate() to resolve imports.
fn resolve(&mut self, specifier: &str, referrer: deno_mod) -> deno_mod;
/// Called whenever libdeno.send() is called in JavaScript. zero_copy_buf
/// corresponds to the second argument of libdeno.send().
fn recv(&mut self, record: R, zero_copy_buf: deno_buf) -> (bool, Box<Op<R>>);
// TODO(ry) Remove records_reset().
// TODO(ry) Abstract records_* and startup_shared() methods into standalone
// trait called Shared. It should, however, wait until integration with
// existing Deno codebase is complete.
/// Clears the shared buffer.
fn records_reset(&mut self);
/// Returns false if not enough room.
fn records_push(&mut self, record: R) -> bool;
/// Returns none if empty.
fn records_shift(&mut self) -> Option<R>;
fn dispatch(
&mut self,
control: &[u8],
zero_copy_buf: deno_buf,
) -> (bool, Box<Op>);
}
/// A single execution context of JavaScript. Corresponds roughly to the "Web
@ -77,18 +66,19 @@ pub trait Behavior<R> {
/// pending ops have completed.
///
/// Ops are created in JavaScript by calling libdeno.send(), and in Rust by
/// implementing Behavior::recv. An Op corresponds exactly to a Promise in
/// implementing Behavior::dispatch. An Op corresponds exactly to a Promise in
/// JavaScript.
pub struct Isolate<R, B: Behavior<R>> {
pub struct Isolate<B: Behavior> {
libdeno_isolate: *const libdeno::isolate,
behavior: B,
pending_ops: Vec<PendingOp<R>>,
shared: SharedQueue,
pending_ops: Vec<PendingOp>,
polled_recently: bool,
}
unsafe impl<R, B: Behavior<R>> Send for Isolate<R, B> {}
unsafe impl<B: Behavior> Send for Isolate<B> {}
impl<R, B: Behavior<R>> Drop for Isolate<R, B> {
impl<B: Behavior> Drop for Isolate<B> {
fn drop(&mut self) {
unsafe { libdeno::deno_delete(self.libdeno_isolate) }
}
@ -96,22 +86,21 @@ impl<R, B: Behavior<R>> Drop for Isolate<R, B> {
static DENO_INIT: Once = ONCE_INIT;
impl<R, B: Behavior<R>> Isolate<R, B> {
impl<B: Behavior> Isolate<B> {
pub fn new(mut behavior: B) -> Self {
DENO_INIT.call_once(|| {
unsafe { libdeno::deno_init() };
});
let shared = SharedQueue::new(RECOMMENDED_SIZE);
let config = libdeno::deno_config {
will_snapshot: 0,
load_snapshot: match behavior.startup_snapshot() {
Some(s) => s,
None => libdeno::deno_buf::empty(),
},
shared: match behavior.startup_shared() {
Some(s) => s,
None => libdeno::deno_buf::empty(),
},
shared: shared.as_deno_buf(),
recv_cb: Self::pre_dispatch,
};
let libdeno_isolate = unsafe { libdeno::deno_new(config) };
@ -119,28 +108,50 @@ impl<R, B: Behavior<R>> Isolate<R, B> {
Self {
libdeno_isolate,
behavior,
shared,
pending_ops: Vec::new(),
polled_recently: false,
}
}
/// Executes a bit of built-in JavaScript to provide Deno._sharedQueue.
pub fn shared_init(&self) {
js_check(self.execute("shared_queue.js", include_str!("shared_queue.js")));
}
extern "C" fn pre_dispatch(
user_data: *mut c_void,
control_buf: deno_buf,
control_argv0: deno_buf,
zero_copy_buf: deno_buf,
) {
let isolate = unsafe { Isolate::<R, B>::from_raw_ptr(user_data) };
assert_eq!(control_buf.len(), 0);
let isolate = unsafe { Isolate::<B>::from_raw_ptr(user_data) };
let zero_copy_id = zero_copy_buf.zero_copy_id;
let req_record = isolate.behavior.records_shift().unwrap();
let control_shared = isolate.shared.shift();
isolate.behavior.records_reset();
let (is_sync, op) = if control_argv0.len() > 0 {
// The user called libdeno.send(control)
isolate
.behavior
.dispatch(control_argv0.as_ref(), zero_copy_buf)
} else if let Some(c) = control_shared {
// The user called Deno._sharedQueue.push(control)
isolate.behavior.dispatch(&c, zero_copy_buf)
} else {
// The sharedQueue is empty. The shouldn't happen usually, but it's also
// not technically a failure.
#[cfg(test)]
unreachable!();
#[cfg(not(test))]
return;
};
// At this point the SharedQueue should be empty.
assert_eq!(isolate.shared.size(), 0);
let (is_sync, op) = isolate.behavior.recv(req_record, zero_copy_buf);
if is_sync {
let res_record = op.wait().unwrap();
let push_success = isolate.behavior.records_push(res_record);
let push_success = isolate.shared.push(res_record);
assert!(push_success);
// TODO check that if JSError thrown during respond(), that it will be
// picked up.
@ -295,7 +306,7 @@ impl<R, B: Behavior<R>> Isolate<R, B> {
specifier_ptr: *const libc::c_char,
referrer: deno_mod,
) -> deno_mod {
let isolate = unsafe { Isolate::<R, B>::from_raw_ptr(user_data) };
let isolate = unsafe { Isolate::<B>::from_raw_ptr(user_data) };
let specifier_c: &CStr = unsafe { CStr::from_ptr(specifier_ptr) };
let specifier: &str = specifier_c.to_str().unwrap();
isolate.behavior.resolve(specifier, referrer)
@ -319,7 +330,7 @@ impl Drop for LockerScope {
}
}
impl<R, B: Behavior<R>> Future for Isolate<R, B> {
impl<B: Behavior> Future for Isolate<B> {
type Item = ();
type Error = JSError;
@ -336,22 +347,18 @@ impl<R, B: Behavior<R>> Future for Isolate<R, B> {
while !self.polled_recently {
let mut completed_count = 0;
debug!("poll loop");
self.polled_recently = true;
self.behavior.records_reset();
assert_eq!(self.shared.size(), 0);
let mut i = 0;
while i != self.pending_ops.len() {
while i < self.pending_ops.len() {
let pending = &mut self.pending_ops[i];
match pending.poll() {
Err(()) => panic!("unexpectd error"),
Err(()) => panic!("unexpected error"),
Ok(Async::NotReady) => {
i += 1;
}
Ok(Async::Ready(record)) => {
Ok(Async::Ready(buf)) => {
let completed = self.pending_ops.remove(i);
completed_count += 1;
@ -359,15 +366,16 @@ impl<R, B: Behavior<R>> Future for Isolate<R, B> {
self.zero_copy_release(completed.zero_copy_id);
}
self.behavior.records_push(record);
self.shared.push(buf);
}
}
}
if completed_count > 0 {
debug!("respond");
self.respond()?;
debug!("after respond");
// The other side should have shifted off all the messages.
assert_eq!(self.shared.size(), 0);
self.shared.reset();
}
}
@ -385,99 +393,33 @@ impl<R, B: Behavior<R>> Future for Isolate<R, B> {
}
}
pub fn js_check(r: Result<(), JSError>) {
if let Err(e) = r {
panic!(e.to_string());
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
fn js_check(r: Result<(), JSError>) {
if let Err(e) = r {
panic!(e.to_string());
}
}
struct TestBehavior {
recv_count: usize,
resolve_count: usize,
push_count: usize,
shift_count: usize,
reset_count: usize,
mod_map: HashMap<String, deno_mod>,
}
impl TestBehavior {
fn new() -> Self {
Self {
recv_count: 0,
resolve_count: 0,
push_count: 0,
shift_count: 0,
reset_count: 0,
mod_map: HashMap::new(),
}
}
fn register(&mut self, name: &str, id: deno_mod) {
self.mod_map.insert(name.to_string(), id);
}
}
impl Behavior<()> for TestBehavior {
fn startup_snapshot(&mut self) -> Option<deno_buf> {
None
}
fn startup_shared(&mut self) -> Option<deno_buf> {
None
}
fn recv(
&mut self,
_record: (),
_zero_copy_buf: deno_buf,
) -> (bool, Box<Op<()>>) {
self.recv_count += 1;
(false, Box::new(futures::future::ok(())))
}
fn resolve(&mut self, specifier: &str, _referrer: deno_mod) -> deno_mod {
self.resolve_count += 1;
match self.mod_map.get(specifier) {
Some(id) => *id,
None => 0,
}
}
fn records_reset(&mut self) {
self.reset_count += 1;
}
fn records_push(&mut self, _record: ()) -> bool {
self.push_count += 1;
true
}
fn records_shift(&mut self) -> Option<()> {
self.shift_count += 1;
Some(())
}
}
use crate::test_util::*;
#[test]
fn test_recv() {
fn test_dispatch() {
let behavior = TestBehavior::new();
let isolate = Isolate::new(behavior);
js_check(isolate.execute(
"filename.js",
r#"
libdeno.send();
let control = new Uint8Array([42]);
libdeno.send(control);
async function main() {
libdeno.send();
libdeno.send(control);
}
main();
"#,
));
assert_eq!(isolate.behavior.recv_count, 2);
assert_eq!(isolate.behavior.dispatch_count, 2);
}
#[test]
@ -491,10 +433,11 @@ mod tests {
r#"
import { b } from 'b.js'
if (b() != 'b') throw Error();
libdeno.send();
let control = new Uint8Array([42]);
libdeno.send(control);
"#,
).unwrap();
assert_eq!(isolate.behavior.recv_count, 0);
assert_eq!(isolate.behavior.dispatch_count, 0);
assert_eq!(isolate.behavior.resolve_count, 0);
let imports = isolate.mod_get_imports(mod_a);
@ -507,16 +450,16 @@ mod tests {
assert_eq!(imports.len(), 0);
js_check(isolate.mod_instantiate(mod_b));
assert_eq!(isolate.behavior.recv_count, 0);
assert_eq!(isolate.behavior.dispatch_count, 0);
assert_eq!(isolate.behavior.resolve_count, 0);
isolate.behavior.register("b.js", mod_b);
js_check(isolate.mod_instantiate(mod_a));
assert_eq!(isolate.behavior.recv_count, 0);
assert_eq!(isolate.behavior.dispatch_count, 0);
assert_eq!(isolate.behavior.resolve_count, 1);
js_check(isolate.mod_evaluate(mod_a));
assert_eq!(isolate.behavior.recv_count, 1);
assert_eq!(isolate.behavior.dispatch_count, 1);
assert_eq!(isolate.behavior.resolve_count, 1);
}
@ -525,11 +468,13 @@ mod tests {
let behavior = TestBehavior::new();
let mut isolate = Isolate::new(behavior);
isolate.shared_init();
js_check(isolate.execute(
"setup.js",
r#"
let nrecv = 0;
libdeno.recv(() => {
Deno._setAsyncHandler((buf) => {
nrecv++;
});
function assertEq(actual, expected) {
@ -539,32 +484,77 @@ mod tests {
}
"#,
));
assert_eq!(isolate.behavior.recv_count, 0);
assert_eq!(isolate.behavior.dispatch_count, 0);
js_check(isolate.execute(
"check1.js",
r#"
assertEq(nrecv, 0);
libdeno.send();
let control = new Uint8Array([42]);
libdeno.send(control);
assertEq(nrecv, 0);
"#,
));
assert_eq!(isolate.behavior.recv_count, 1);
assert_eq!(isolate.behavior.dispatch_count, 1);
assert_eq!(Ok(Async::Ready(())), isolate.poll());
assert_eq!(isolate.behavior.recv_count, 1);
assert_eq!(isolate.behavior.dispatch_count, 1);
js_check(isolate.execute(
"check2.js",
r#"
assertEq(nrecv, 1);
libdeno.send();
libdeno.send(control);
assertEq(nrecv, 1);
"#,
));
assert_eq!(isolate.behavior.recv_count, 2);
assert_eq!(isolate.behavior.dispatch_count, 2);
assert_eq!(Ok(Async::Ready(())), isolate.poll());
js_check(isolate.execute("check3.js", "assertEq(nrecv, 2)"));
assert_eq!(isolate.behavior.recv_count, 2);
assert_eq!(isolate.behavior.dispatch_count, 2);
// We are idle, so the next poll should be the last.
assert_eq!(Ok(Async::Ready(())), isolate.poll());
}
#[test]
fn test_shared() {
let behavior = TestBehavior::new();
let mut isolate = Isolate::new(behavior);
isolate.shared_init();
js_check(isolate.execute(
"setup.js",
r#"
let nrecv = 0;
Deno._setAsyncHandler((buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
nrecv++;
});
function assert(cond) {
if (!cond) {
throw Error("assert");
}
}
"#,
));
assert_eq!(isolate.behavior.dispatch_count, 0);
js_check(isolate.execute(
"send1.js",
r#"
let control = new Uint8Array([42]);
Deno._sharedQueue.push(control);
libdeno.send();
assert(nrecv === 0);
Deno._sharedQueue.push(control);
libdeno.send();
assert(nrecv === 0);
"#,
));
assert_eq!(isolate.behavior.dispatch_count, 2);
assert_eq!(Ok(Async::Ready(())), isolate.poll());
js_check(isolate.execute("send1.js", "assert(nrecv === 2);"));
}
}

View file

@ -8,6 +8,9 @@ mod flags;
mod isolate;
mod js_errors;
mod libdeno;
mod shared_queue;
#[cfg(test)]
mod test_util;
pub use crate::flags::v8_set_flags;
pub use crate::isolate::*;

View file

@ -1,49 +0,0 @@
use crate::libdeno::deno_buf;
use std::mem;
// TODO this is where we abstract flatbuffers at.
// TODO make these constants private to this file.
const INDEX_NUM_RECORDS: usize = 0;
const INDEX_RECORDS: usize = 1;
pub const RECORD_OFFSET_PROMISE_ID: usize = 0;
pub const RECORD_OFFSET_OP: usize = 1;
pub const RECORD_OFFSET_ARG: usize = 2;
pub const RECORD_OFFSET_RESULT: usize = 3;
const RECORD_SIZE: usize = 4;
const NUM_RECORDS: usize = 100;
/// Represents the shared buffer between JS and Rust.
/// Used for FFI.
pub struct Shared(Vec<i32>);
impl Shared {
pub fn new() -> Shared {
let mut vec = Vec::<i32>::new();
vec.resize(INDEX_RECORDS + RECORD_SIZE * NUM_RECORDS, 0);
Shared(vec)
}
pub fn set_record(&mut self, i: usize, off: usize, value: i32) {
assert!(i < NUM_RECORDS);
self.0[INDEX_RECORDS + RECORD_SIZE * i + off] = value;
}
pub fn get_record(&self, i: usize, off: usize) -> i32 {
assert!(i < NUM_RECORDS);
return self.0[INDEX_RECORDS + RECORD_SIZE * i + off];
}
pub fn set_num_records(&mut self, num_records: i32) {
self.0[INDEX_NUM_RECORDS] = num_records;
}
pub fn get_num_records(&self) -> i32 {
return self.0[INDEX_NUM_RECORDS];
}
pub fn as_deno_buf(&mut self) -> deno_buf {
let ptr = self.0.as_mut_ptr() as *mut u8;
let len = mem::size_of::<i32>() * self.0.len();
unsafe { deno_buf::from_raw_parts(ptr, len) }
}
}

127
core/shared_queue.js Normal file
View file

@ -0,0 +1,127 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
(window => {
const MAX_RECORDS = 100;
const INDEX_NUM_RECORDS = 0;
const INDEX_NUM_SHIFTED_OFF = 1;
const INDEX_HEAD = 2;
const INDEX_OFFSETS = 3;
const INDEX_RECORDS = 3 + MAX_RECORDS;
const HEAD_INIT = 4 * INDEX_RECORDS;
let sharedBytes = null;
let shared32 = null;
if (!window["Deno"]) {
window["Deno"] = {};
}
function assert(cond) {
if (!cond) {
throw Error("assert");
}
}
function reset() {
shared32.fill(0, 0, INDEX_RECORDS);
shared32[INDEX_HEAD] = HEAD_INIT;
}
function head() {
return shared32[INDEX_HEAD];
}
function numRecords() {
return shared32[INDEX_NUM_RECORDS];
}
function setEnd(index, end) {
shared32[INDEX_OFFSETS + index] = end;
}
function getEnd(index) {
if (index < numRecords()) {
return shared32[INDEX_OFFSETS + index];
} else {
return null;
}
}
function getOffset(index) {
if (index < numRecords()) {
if (index == 0) {
return HEAD_INIT;
} else {
return shared32[INDEX_OFFSETS + index - 1];
}
} else {
return null;
}
}
function push(buf) {
let off = head();
let end = off + buf.byteLength;
let index = numRecords();
if (end > shared32.byteLength) {
console.log("shared_queue.ts push fail");
return false;
}
setEnd(index, end);
assert(end - off == buf.byteLength);
sharedBytes.set(buf, off);
shared32[INDEX_NUM_RECORDS] += 1;
shared32[INDEX_HEAD] = end;
return true;
}
/// Returns null if empty.
function shift() {
let i = shared32[INDEX_NUM_SHIFTED_OFF];
if (i >= numRecords()) {
return null;
}
let off = getOffset(i);
let end = getEnd(i);
shared32[INDEX_NUM_SHIFTED_OFF] += 1;
return sharedBytes.subarray(off, end);
}
function size() {
return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
}
let asyncHandler = null;
function setAsyncHandler(cb) {
assert(asyncHandler == null);
asyncHandler = cb;
}
function handleAsyncMsgFromRust() {
let buf;
while ((buf = shift()) != null) {
asyncHandler(buf);
}
}
function init(shared) {
assert(shared.byteLength > 0);
assert(sharedBytes == null);
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
// Callers should not call libdeno.recv, use setAsyncHandler.
libdeno.recv(handleAsyncMsgFromRust);
}
window.Deno._setAsyncHandler = setAsyncHandler;
window.Deno._sharedQueue = {
head,
numRecords,
size,
push,
reset,
shift
};
init(libdeno.shared);
})(this);

225
core/shared_queue.rs Normal file
View file

@ -0,0 +1,225 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
use crate::isolate::Buf;
use crate::libdeno::deno_buf;
const MAX_RECORDS: usize = 100;
/// Total number of records added.
const INDEX_NUM_RECORDS: usize = 0;
/// Number of records that have been shifted off.
const INDEX_NUM_SHIFTED_OFF: usize = 1;
/// The head is the number of initialized bytes in SharedQueue.
/// It grows monotonically.
const INDEX_HEAD: usize = 2;
const INDEX_OFFSETS: usize = 3;
const INDEX_RECORDS: usize = 3 + MAX_RECORDS;
/// Byte offset of where the records begin. Also where the head starts.
const HEAD_INIT: usize = 4 * INDEX_RECORDS;
/// A rough guess at how big we should make the shared buffer in bytes.
pub const RECOMMENDED_SIZE: usize = 128 * MAX_RECORDS;
pub struct SharedQueue {
bytes: Vec<u8>,
}
impl SharedQueue {
pub fn new(len: usize) -> Self {
let mut bytes = Vec::new();
bytes.resize(HEAD_INIT + len, 0);
let mut q = Self { bytes };
q.reset();
q
}
pub fn as_deno_buf(&self) -> deno_buf {
let ptr = self.bytes.as_ptr();
let len = self.bytes.len();
unsafe { deno_buf::from_raw_parts(ptr, len) }
}
/// Clears the shared buffer.
pub fn reset(&mut self) {
let s: &mut [u32] = self.as_u32_slice_mut();
for i in 0..INDEX_RECORDS {
s[i] = 0;
}
s[INDEX_HEAD] = HEAD_INIT as u32;
}
fn as_u32_slice<'a>(&'a self) -> &'a [u32] {
let p = self.bytes.as_ptr() as *const u32;
unsafe { std::slice::from_raw_parts(p, self.bytes.len() / 4) }
}
fn as_u32_slice_mut<'a>(&'a mut self) -> &'a mut [u32] {
let p = self.bytes.as_mut_ptr() as *mut u32;
unsafe { std::slice::from_raw_parts_mut(p, self.bytes.len() / 4) }
}
pub fn size(&self) -> usize {
let s = self.as_u32_slice();
(s[INDEX_NUM_RECORDS] - s[INDEX_NUM_SHIFTED_OFF]) as usize
}
fn num_records(&self) -> usize {
let s = self.as_u32_slice();
s[INDEX_NUM_RECORDS] as usize
}
fn head(&self) -> usize {
let s = self.as_u32_slice();
s[INDEX_HEAD] as usize
}
fn set_end(&mut self, index: usize, end: usize) {
let s = self.as_u32_slice_mut();
s[INDEX_OFFSETS + index] = end as u32;
}
fn get_end(&self, index: usize) -> Option<usize> {
if index < self.num_records() {
let s = self.as_u32_slice();
Some(s[INDEX_OFFSETS + index] as usize)
} else {
None
}
}
fn get_offset(&self, index: usize) -> Option<usize> {
if index < self.num_records() {
Some(if index == 0 {
HEAD_INIT
} else {
let s = self.as_u32_slice();
s[INDEX_OFFSETS + index - 1] as usize
})
} else {
None
}
}
/// Returns none if empty.
pub fn shift<'a>(&'a mut self) -> Option<&'a [u8]> {
let u32_slice = self.as_u32_slice();
let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize;
if i >= self.num_records() {
assert_eq!(self.size(), 0);
return None;
}
let off = self.get_offset(i).unwrap();
let end = self.get_end(i).unwrap();
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_SHIFTED_OFF] += 1;
Some(&self.bytes[off..end])
}
pub fn push(&mut self, record: Buf) -> bool {
let off = self.head();
let end = off + record.len();
let index = self.num_records();
if end > self.bytes.len() {
eprintln!("WARNING the sharedQueue overflowed");
return false;
}
self.set_end(index, end);
assert_eq!(end - off, record.len());
self.bytes[off..end].copy_from_slice(&record);
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_RECORDS] += 1;
u32_slice[INDEX_HEAD] = end as u32;
true
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::isolate::js_check;
use crate::isolate::Isolate;
use crate::test_util::*;
use futures::Async;
use futures::Future;
#[test]
fn basic() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
let h = q.head();
assert!(h > 0);
let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
let len = r.len() + h;
assert!(q.push(r));
assert_eq!(q.head(), len);
let r = vec![6, 7].into_boxed_slice();
assert!(q.push(r));
let r = vec![8, 9, 10, 11].into_boxed_slice();
assert!(q.push(r));
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 3);
let r = q.shift().unwrap();
assert_eq!(r.as_ref(), vec![1, 2, 3, 4, 5].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 2);
let r = q.shift().unwrap();
assert_eq!(r.as_ref(), vec![6, 7].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 1);
let r = q.shift().unwrap();
assert_eq!(r.as_ref(), vec![8, 9, 10, 11].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 0);
assert!(q.shift().is_none());
assert!(q.shift().is_none());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 0);
q.reset();
assert_eq!(q.num_records(), 0);
assert_eq!(q.size(), 0);
}
fn alloc_buf(byte_length: usize) -> Buf {
let mut v = Vec::new();
v.resize(byte_length, 0);
v.into_boxed_slice()
}
#[test]
fn overflow() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
assert!(q.push(alloc_buf(RECOMMENDED_SIZE - 1)));
assert_eq!(q.size(), 1);
assert!(!q.push(alloc_buf(2)));
assert_eq!(q.size(), 1);
assert!(q.push(alloc_buf(1)));
assert_eq!(q.size(), 2);
assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1);
assert_eq!(q.size(), 1);
assert_eq!(q.shift().unwrap().len(), 1);
assert_eq!(q.size(), 0);
assert!(!q.push(alloc_buf(1)));
}
#[test]
fn test_js() {
let behavior = TestBehavior::new();
let mut isolate = Isolate::new(behavior);
isolate.shared_init();
js_check(
isolate
.execute("shared_queue_test.js", include_str!("shared_queue_test.js")),
);
assert_eq!(Ok(Async::Ready(())), isolate.poll());
}
}

65
core/shared_queue_test.js Normal file
View file

@ -0,0 +1,65 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
function assert(cond) {
if (!cond) {
throw Error("assert");
}
}
function main() {
const q = Deno._sharedQueue;
let h = q.head();
assert(h > 0);
let r = new Uint8Array([1, 2, 3, 4, 5]);
let len = r.byteLength + h;
assert(q.push(r));
assert(q.head() == len);
r = new Uint8Array([6, 7]);
assert(q.push(r));
r = new Uint8Array([8, 9, 10, 11]);
assert(q.push(r));
assert(q.numRecords() == 3);
assert(q.size() == 3);
r = q.shift();
assert(r.byteLength == 5);
assert(r[0] == 1);
assert(r[1] == 2);
assert(r[2] == 3);
assert(r[3] == 4);
assert(r[4] == 5);
assert(q.numRecords() == 3);
assert(q.size() == 2);
r = q.shift();
assert(r.byteLength == 2);
assert(r[0] == 6);
assert(r[1] == 7);
assert(q.numRecords() == 3);
assert(q.size() == 1);
r = q.shift();
assert(r.byteLength == 4);
assert(r[0] == 8);
assert(r[1] == 9);
assert(r[2] == 10);
assert(r[3] == 11);
assert(q.numRecords() == 3);
assert(q.size() == 0);
assert(q.shift() == null);
assert(q.shift() == null);
assert(q.numRecords() == 3);
assert(q.size() == 0);
q.reset();
assert(q.numRecords() == 0);
assert(q.size() == 0);
libdeno.print("shared_queue_test.js ok\n");
}
main();

51
core/test_util.rs Normal file
View file

@ -0,0 +1,51 @@
use crate::isolate::Behavior;
use crate::isolate::Op;
use crate::libdeno::deno_buf;
use crate::libdeno::deno_mod;
use std::collections::HashMap;
pub struct TestBehavior {
pub dispatch_count: usize,
pub resolve_count: usize,
pub mod_map: HashMap<String, deno_mod>,
}
impl TestBehavior {
pub fn new() -> Self {
Self {
dispatch_count: 0,
resolve_count: 0,
mod_map: HashMap::new(),
}
}
pub fn register(&mut self, name: &str, id: deno_mod) {
self.mod_map.insert(name.to_string(), id);
}
}
impl Behavior for TestBehavior {
fn startup_snapshot(&mut self) -> Option<deno_buf> {
None
}
fn dispatch(
&mut self,
control: &[u8],
_zero_copy_buf: deno_buf,
) -> (bool, Box<Op>) {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
self.dispatch_count += 1;
let buf = vec![43u8].into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
}
fn resolve(&mut self, specifier: &str, _referrer: deno_mod) -> deno_mod {
self.resolve_count += 1;
match self.mod_map.get(specifier) {
Some(id) => *id,
None => 0,
}
}
}

View file

@ -90,6 +90,11 @@ def main(argv):
check_exists(deno_core_test)
run([deno_core_test])
deno_core_http_bench_test = os.path.join(
build_dir, "deno_core_http_bench_test" + executable_suffix)
check_exists(deno_core_http_bench_test)
run([deno_core_http_bench_test])
unit_tests(deno_exe)
prefetch_test(deno_exe)