From 51f9331ecb50afeafd0fa2ca8336e75aa374465e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Fri, 5 Oct 2018 19:21:15 +0200 Subject: [PATCH] Add deno.metrics() --- BUILD.gn | 1 + js/deno.ts | 1 + js/metrics.ts | 39 +++++++++++ js/metrics_test.ts | 24 +++++++ js/unit_tests.ts | 1 + src/isolate.rs | 154 ++++++++++++++++++++++++++++++++++++++++++- src/msg.fbs | 12 ++++ src/ops.rs | 36 +++++++++- tools/http_server.py | 3 +- 9 files changed, 266 insertions(+), 5 deletions(-) create mode 100644 js/metrics.ts create mode 100644 js/metrics_test.ts diff --git a/BUILD.gn b/BUILD.gn index 4f2bf2a5b3..16941c003f 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -84,6 +84,7 @@ ts_sources = [ "js/libdeno.ts", "js/main.ts", "js/make_temp_dir.ts", + "js/metrics.ts", "js/mkdir.ts", "js/mock_builtin.js", "js/net.ts", diff --git a/js/deno.ts b/js/deno.ts index a7350e03e7..2125445411 100644 --- a/js/deno.ts +++ b/js/deno.ts @@ -35,6 +35,7 @@ export { trace } from "./trace"; export { truncateSync, truncate } from "./truncate"; export { FileInfo } from "./file_info"; export { connect, dial, listen, Listener, Conn } from "./net"; +export { metrics } from "./metrics"; export const args: string[] = []; // Provide the compiler API in an obfuscated way diff --git a/js/metrics.ts b/js/metrics.ts new file mode 100644 index 0000000000..d76b781db6 --- /dev/null +++ b/js/metrics.ts @@ -0,0 +1,39 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. +import * as msg from "gen/msg_generated"; +import { flatbuffers } from "flatbuffers"; +import { assert } from "./util"; +import * as dispatch from "./dispatch"; + +interface Metrics { + opsDispatched: number; + opsCompleted: number; + bytesSentControl: number; + bytesSentData: number; + bytesReceived: number; +} + +export function metrics(): Metrics { + return res(dispatch.sendSync(...req())); +} + +function req(): [flatbuffers.Builder, msg.Any, flatbuffers.Offset] { + const builder = new flatbuffers.Builder(); + msg.Metrics.startMetrics(builder); + const inner = msg.Metrics.endMetrics(builder); + return [builder, msg.Any.Metrics, inner]; +} + +function res(baseRes: null | msg.Base): Metrics { + assert(baseRes !== null); + assert(msg.Any.MetricsRes === baseRes!.innerType()); + const res = new msg.MetricsRes(); + assert(baseRes!.inner(res) !== null); + + return { + opsDispatched: res.opsDispatched().toFloat64(), + opsCompleted: res.opsCompleted().toFloat64(), + bytesSentControl: res.bytesSentControl().toFloat64(), + bytesSentData: res.bytesSentData().toFloat64(), + bytesReceived: res.bytesReceived().toFloat64() + }; +} diff --git a/js/metrics_test.ts b/js/metrics_test.ts new file mode 100644 index 0000000000..6954ae2ce5 --- /dev/null +++ b/js/metrics_test.ts @@ -0,0 +1,24 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. +import { test, assert } from "./test_util.ts"; +import * as deno from "deno"; + +test(function metrics() { + const m1 = deno.metrics(); + assert(m1.opsDispatched > 0); + assert(m1.opsCompleted > 0); + assert(m1.bytesSentControl > 0); + assert(m1.bytesSentData >= 0); + assert(m1.bytesReceived > 0); + + // Write to stdout to ensure a "data" message gets sent instead of just + // control messages. + const dataMsg = new Uint8Array([41, 42, 43]); + deno.stdout.write(dataMsg); + + const m2 = deno.metrics(); + assert(m2.opsDispatched > m1.opsDispatched); + assert(m2.opsCompleted > m1.opsCompleted); + assert(m2.bytesSentControl > m1.bytesSentControl); + assert(m2.bytesSentData >= m1.bytesSentData + dataMsg.byteLength); + assert(m2.bytesReceived > m1.bytesReceived); +}); diff --git a/js/unit_tests.ts b/js/unit_tests.ts index ca152ad391..24fdac8234 100644 --- a/js/unit_tests.ts +++ b/js/unit_tests.ts @@ -25,3 +25,4 @@ import "./trace_test.ts"; import "./truncate_test.ts"; import "./v8_source_maps_test.ts"; import "../website/app_test.js"; +import "./metrics_test.ts"; diff --git a/src/isolate.rs b/src/isolate.rs index 2dec64501e..408259c5fe 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -55,6 +55,7 @@ pub struct IsolateState { pub argv: Vec, pub flags: flags::DenoFlags, tx: Mutex>>, + pub metrics: Mutex, } impl IsolateState { @@ -66,6 +67,32 @@ impl IsolateState { let tx = maybe_tx.unwrap(); tx.send((req_id, buf)).expect("tx.send error"); } + + fn metrics_op_dispatched( + &self, + bytes_sent_control: u64, + bytes_sent_data: u64, + ) { + let mut metrics = self.metrics.lock().unwrap(); + metrics.ops_dispatched += 1; + metrics.bytes_sent_control += bytes_sent_control; + metrics.bytes_sent_data += bytes_sent_data; + } + + fn metrics_op_completed(&self, bytes_received: u64) { + let mut metrics = self.metrics.lock().unwrap(); + metrics.ops_completed += 1; + metrics.bytes_received += bytes_received; + } +} + +#[derive(Default)] +pub struct Metrics { + pub ops_dispatched: u64, + pub ops_completed: u64, + pub bytes_sent_control: u64, + pub bytes_sent_data: u64, + pub bytes_received: u64, } static DENO_INIT: std::sync::Once = std::sync::ONCE_INIT; @@ -92,6 +119,7 @@ impl Isolate { argv: argv_rest, flags, tx: Mutex::new(Some(tx)), + metrics: Mutex::new(Metrics::default()), }), } } @@ -221,6 +249,10 @@ extern "C" fn pre_dispatch( control_buf: libdeno::deno_buf, data_buf: libdeno::deno_buf, ) { + // for metrics + let bytes_sent_control = control_buf.data_len as u64; + let bytes_sent_data = data_buf.data_len as u64; + // control_buf is only valid for the lifetime of this call, thus is // interpretted as a slice. let control_slice = unsafe { @@ -240,16 +272,23 @@ extern "C" fn pre_dispatch( let dispatch = isolate.dispatch; let (is_sync, op) = dispatch(isolate, control_slice, data_slice); + isolate + .state + .metrics_op_dispatched(bytes_sent_control, bytes_sent_data); + if is_sync { // Execute op synchronously. let buf = tokio_util::block_on(op).unwrap(); - if buf.len() != 0 { + let buf_size = buf.len(); + if buf_size != 0 { // Set the synchronous response, the value returned from isolate.send(). isolate.respond(req_id, buf); } + + isolate.state.metrics_op_completed(buf_size as u64); } else { // Execute op asynchronously. - let state = isolate.state.clone(); + let state = Arc::clone(&isolate.state); // TODO Ideally Tokio would could tell us how many tasks are executing, but // it cannot currently. Therefore we track top-level promises/tasks @@ -258,7 +297,9 @@ extern "C" fn pre_dispatch( let task = op .and_then(move |buf| { + let buf_size = buf.len(); state.send_to_js(req_id, buf); + state.metrics_op_completed(buf_size as u64); Ok(()) }).map_err(|_| ()); tokio::spawn(task); @@ -330,4 +371,113 @@ mod tests { let op = Box::new(futures::future::ok(control)); (true, op) } + + #[test] + fn test_metrics_sync() { + let argv = vec![String::from("./deno"), String::from("hello.js")]; + let mut isolate = Isolate::new(argv, metrics_dispatch_sync); + tokio_util::init(|| { + // Verify that metrics have been properly initialized. + { + let metrics = isolate.state.metrics.lock().unwrap(); + assert_eq!(metrics.ops_dispatched, 0); + assert_eq!(metrics.ops_completed, 0); + assert_eq!(metrics.bytes_sent_control, 0); + assert_eq!(metrics.bytes_sent_data, 0); + assert_eq!(metrics.bytes_received, 0); + } + + isolate + .execute( + "y.js", + r#" + const control = new Uint8Array([4, 5, 6]); + const data = new Uint8Array([42, 43, 44, 45, 46]); + libdeno.send(control, data); + "#, + ).expect("execute error"); + isolate.event_loop(); + let metrics = isolate.state.metrics.lock().unwrap(); + assert_eq!(metrics.ops_dispatched, 1); + assert_eq!(metrics.ops_completed, 1); + assert_eq!(metrics.bytes_sent_control, 3); + assert_eq!(metrics.bytes_sent_data, 5); + assert_eq!(metrics.bytes_received, 4); + }); + } + + #[test] + fn test_metrics_async() { + let argv = vec![String::from("./deno"), String::from("hello.js")]; + let mut isolate = Isolate::new(argv, metrics_dispatch_async); + tokio_util::init(|| { + // Verify that metrics have been properly initialized. + { + let metrics = isolate.state.metrics.lock().unwrap(); + assert_eq!(metrics.ops_dispatched, 0); + assert_eq!(metrics.ops_completed, 0); + assert_eq!(metrics.bytes_sent_control, 0); + assert_eq!(metrics.bytes_sent_data, 0); + assert_eq!(metrics.bytes_received, 0); + } + + isolate + .execute( + "y.js", + r#" + const control = new Uint8Array([4, 5, 6]); + const data = new Uint8Array([42, 43, 44, 45, 46]); + let r = libdeno.send(control, data); + if (r != null) throw Error("expected null"); + "#, + ).expect("execute error"); + + // Make sure relevant metrics are updated before task is executed. + { + let metrics = isolate.state.metrics.lock().unwrap(); + assert_eq!(metrics.ops_dispatched, 1); + assert_eq!(metrics.bytes_sent_control, 3); + assert_eq!(metrics.bytes_sent_data, 5); + // Note we cannot check ops_completed nor bytes_received because that + // would be a race condition. It might be nice to have use a oneshot + // with metrics_dispatch_async() to properly validate them. + } + + isolate.event_loop(); + + // Make sure relevant metrics are updated after task is executed. + { + let metrics = isolate.state.metrics.lock().unwrap(); + assert_eq!(metrics.ops_dispatched, 1); + assert_eq!(metrics.ops_completed, 1); + assert_eq!(metrics.bytes_sent_control, 3); + assert_eq!(metrics.bytes_sent_data, 5); + assert_eq!(metrics.bytes_received, 4); + } + }); + } + + fn metrics_dispatch_sync( + _isolate: &mut Isolate, + _control: &[u8], + _data: &'static mut [u8], + ) -> (bool, Box) { + // Send back some sync response + let vec: Vec = vec![1, 2, 3, 4]; + let control = vec.into_boxed_slice(); + let op = Box::new(futures::future::ok(control)); + (true, op) + } + + fn metrics_dispatch_async( + _isolate: &mut Isolate, + _control: &[u8], + _data: &'static mut [u8], + ) -> (bool, Box) { + // Send back some sync response + let vec: Vec = vec![1, 2, 3, 4]; + let control = vec.into_boxed_slice(); + let op = Box::new(futures::future::ok(control)); + (false, op) + } } diff --git a/src/msg.fbs b/src/msg.fbs index 869452ced6..40179fc14d 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -41,6 +41,8 @@ union Any { Accept, Dial, NewConn, + Metrics, + MetricsRes, } enum ErrorKind: byte { @@ -321,4 +323,14 @@ table NewConn { local_addr: string; } +table Metrics {} + +table MetricsRes { + ops_dispatched: uint64; + ops_completed: uint64; + bytes_sent_control: uint64; + bytes_sent_data: uint64; + bytes_received: uint64; +} + root_type Base; diff --git a/src/ops.rs b/src/ops.rs index 3aad572bdb..c32f536340 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -101,6 +101,7 @@ pub fn dispatch( msg::Any::Listen => op_listen, msg::Any::Accept => op_accept, msg::Any::Dial => op_dial, + msg::Any::Metrics => op_metrics, _ => panic!(format!( "Unhandled message {}", msg::enum_name_any(inner_type) @@ -465,7 +466,7 @@ where // fn blocking(is_sync: bool, f: F) -> Box // where F: FnOnce() -> DenoResult macro_rules! blocking { - ($is_sync:expr,$fn:expr) => { + ($is_sync:expr, $fn:expr) => { if $is_sync { // If synchronous, execute the function immediately on the main thread. Box::new(futures::future::result($fn())) @@ -1153,3 +1154,36 @@ fn op_dial( .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream)); Box::new(op) } + +fn op_metrics( + state: Arc, + base: &msg::Base, + data: &'static mut [u8], +) -> Box { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + + let metrics = state.metrics.lock().unwrap(); + + let builder = &mut FlatBufferBuilder::new(); + let inner = msg::MetricsRes::create( + builder, + &msg::MetricsResArgs { + ops_dispatched: metrics.ops_dispatched, + ops_completed: metrics.ops_completed, + bytes_sent_control: metrics.bytes_sent_control, + bytes_sent_data: metrics.bytes_sent_data, + bytes_received: metrics.bytes_received, + ..Default::default() + }, + ); + ok_future(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::MetricsRes, + ..Default::default() + }, + )) +} diff --git a/tools/http_server.py b/tools/http_server.py index d33f24d5d4..c627dfd5f6 100755 --- a/tools/http_server.py +++ b/tools/http_server.py @@ -55,5 +55,4 @@ def spawn(): if __name__ == '__main__': - s = server() - s.serve_forever() + spawn().join()