1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-22 15:06:54 -05:00
denoland-deno/tests/integration/serve_tests.rs
Nathan Whitaker e92a05b551
feat(serve): Opt-in parallelism for deno serve (#24920)
Adds a `parallel` flag to `deno serve`. When present, we spawn multiple
workers to parallelize serving requests.


```bash
deno serve --parallel main.ts
```

Currently on Linux we use `SO_REUSEPORT` and rely on the fact that the
kernel will distribute connections in a round-robin manner.

On macOS and Windows, we sort of emulate this by cloning the underlying
file descriptor and passing a handle to each worker. The connections
will not be guaranteed to be fairly distributed (and in practice almost
certainly won't be), but the distribution is still spread enough to
provide a significant performance increase.

---
(Run on a MacBook Pro with an M3 Max, serving `deno.com`)

baseline:
```
❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000
Running 30s test @ http://127.0.0.1:8000
  2 threads and 125 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   239.78ms   13.56ms 330.54ms   79.12%
    Req/Sec   258.58     35.56   360.00     70.64%
  Latency Distribution
     50%  236.72ms
     75%  248.46ms
     90%  256.84ms
     99%  268.23ms
  15458 requests in 30.02s, 2.47GB read
Requests/sec:    514.89
Transfer/sec:     84.33MB
```

this PR (with `--parallel` flag)
```
❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000
Running 30s test @ http://127.0.0.1:8000
  2 threads and 125 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   117.40ms  142.84ms 590.45ms   79.07%
    Req/Sec     1.33k   175.19     1.77k    69.00%
  Latency Distribution
     50%   22.34ms
     75%  223.67ms
     90%  357.32ms
     99%  460.50ms
  79636 requests in 30.07s, 12.74GB read
Requests/sec:   2647.96
Transfer/sec:    433.71MB
```
2024-08-14 22:26:21 +00:00

244 lines
6.3 KiB
Rust

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::collections::HashMap;
use std::io::Read;
use std::time::Duration;
use pretty_assertions::assert_eq;
use regex::Regex;
use reqwest::RequestBuilder;
use test_util as util;
use test_util::DenoChild;
use tokio::time::timeout;
/// Test harness that owns a spawned `deno serve` child process together with
/// the HTTP client used to send requests to it.
///
/// The mutable pieces are wrapped in `RefCell` so that `&self` methods (such
/// as `endpoint` and `get`) can read from the child's stdout and cache results.
struct ServeClient {
/// The spawned child process; killed and waited on when the client is dropped.
child: RefCell<DenoChild>,
/// HTTP client used to build requests against the server.
client: reqwest::Client,
/// Every byte read from the child's stdout so far.
output_buf: RefCell<Vec<u8>>,
/// Cached `http://127.0.0.1:{port}` URL, filled in once the port has been
/// parsed from the child's stdout.
endpoint: RefCell<Option<String>>,
}
impl Drop for ServeClient {
  /// Kills the child process and waits for it to exit.
  ///
  /// When the current thread is already unwinding from a panic (for example a
  /// failed assertion in a test), cleanup errors are ignored: panicking again
  /// inside a destructor would abort the whole test process and bury the
  /// original failure. Outside of unwinding, cleanup errors still panic.
  fn drop(&mut self) {
    // `&mut self` guarantees exclusive access, so no runtime borrow check is
    // needed to reach the child.
    let child = self.child.get_mut();
    if std::thread::panicking() {
      // best effort only: never panic while already panicking
      let _ = child.kill();
      let _ = child.wait();
    } else {
      child.kill().unwrap();
      child.wait().unwrap();
    }
  }
}
/// Builder for a `ServeClient`: field `0` is the command being assembled and
/// field `1` is the entry point file, which must be set before `build`.
struct ServeClientBuilder(util::TestCommandBuilder, Option<String>);
impl ServeClientBuilder {
fn build(self) -> ServeClient {
let Some(entry_point) = self.1 else {
panic!("entry point required");
};
let cmd = self.0.arg(entry_point);
let child = cmd.spawn().unwrap();
ServeClient::with_child(child)
}
fn map(
self,
f: impl FnOnce(util::TestCommandBuilder) -> util::TestCommandBuilder,
) -> Self {
Self(f(self.0), self.1)
}
fn entry_point(self, file: impl AsRef<str>) -> Self {
Self(self.0, Some(file.as_ref().into()))
}
fn worker_count(self, n: Option<u64>) -> Self {
self.map(|t| {
let t = t.arg("--parallel");
if let Some(n) = n {
t.env("DENO_JOBS", n.to_string())
} else {
t
}
})
}
fn new() -> Self {
Self(
util::deno_cmd()
.current_dir(util::testdata_path())
.arg("serve")
.arg("--port")
.arg("0")
.stdout_piped(),
None,
)
}
}
impl ServeClient {
  /// Returns a builder preconfigured to run `deno serve` on an OS-assigned port.
  fn builder() -> ServeClientBuilder {
    ServeClientBuilder::new()
  }

  /// Wraps an already spawned child process.
  ///
  /// The HTTP client trusts the test root CA, keeps no idle connections and
  /// speaks HTTP/2 with prior knowledge.
  fn with_child(child: DenoChild) -> Self {
    Self {
      child: RefCell::new(child),
      output_buf: Default::default(),
      endpoint: Default::default(),
      client: reqwest::Client::builder()
        .add_root_certificate(
          reqwest::Certificate::from_pem(include_bytes!(
            "../testdata/tls/RootCA.crt"
          ))
          .unwrap(),
        )
        // disable connection pooling so we create a new connection per request
        // which allows us to distribute requests across workers
        .pool_max_idle_per_host(0)
        .pool_idle_timeout(Duration::from_nanos(1))
        .http2_prior_knowledge()
        .build()
        .unwrap(),
    }
  }

  /// Kills the server process and waits for it to exit.
  fn kill(self) {
    // The `Drop` impl performs the kill + wait; consuming `self` triggers it
    // exactly once instead of duplicating the logic here.
    drop(self);
  }

  /// Kills the server and returns everything it wrote to stdout, including
  /// the bytes already consumed while looking for the port.
  ///
  /// Panics if the output is not valid UTF-8.
  fn output(self) -> String {
    let mut child = self.child.borrow_mut();
    child.kill().unwrap();
    let mut stdout = child.stdout.take().unwrap();
    child.wait().unwrap();
    let mut output_buf = self.output_buf.borrow_mut();
    stdout.read_to_end(&mut output_buf).unwrap();
    String::from_utf8(std::mem::take(&mut *output_buf)).unwrap()
  }

  /// Starts building a GET request against the server's endpoint.
  fn get(&self) -> RequestBuilder {
    let endpoint = self.endpoint();
    self.client.get(endpoint.as_str())
  }

  /// Returns the `http://127.0.0.1:{port}` URL of the server.
  ///
  /// On the first call the port is parsed from the child's stdout and the
  /// result is cached. Every byte read is kept in `output_buf` so `output`
  /// can still return the complete stdout later.
  ///
  /// Panics if the port does not show up within the startup timeout or if the
  /// child closes stdout before printing it.
  fn endpoint(&self) -> String {
    const STARTUP_TIMEOUT: Duration = Duration::from_secs(5);

    if let Some(cached) = self.endpoint.borrow().as_ref() {
      return cached.clone();
    };
    let mut buffer = self.output_buf.borrow_mut();
    let mut temp_buf = [0u8; 64];
    let mut child = self.child.borrow_mut();
    let stdout = child.stdout.as_mut().unwrap();
    let port_regex = regex::bytes::Regex::new(r":(\d+)").unwrap();
    let start = std::time::Instant::now();
    // try to find the port number in the output
    // it may not be the first line, so we need to read the output in a loop
    let port = loop {
      if start.elapsed() > STARTUP_TIMEOUT {
        panic!(
          "timed out waiting for serve to start. serve output:\n{}",
          String::from_utf8_lossy(&buffer)
        );
      }
      let read = stdout.read(&mut temp_buf).unwrap();
      // a zero-length read means the child closed its stdout
      let eof = read == 0;
      buffer.extend_from_slice(&temp_buf[..read]);
      let found = port_regex
        .captures(&buffer)
        .and_then(|c| c.get(1))
        // A run of digits that reaches the end of the buffer may be cut off
        // by the chunk boundary; only trust it once a following byte has
        // arrived (or no more data can arrive).
        .filter(|m| eof || m.end() < buffer.len())
        .map(|m| std::str::from_utf8(m.as_bytes()).unwrap().to_owned());
      if let Some(p) = found {
        break p;
      }
      if eof {
        // no more output can arrive, so fail now instead of spinning until
        // the timeout
        panic!(
          "serve closed stdout before printing a port. serve output:\n{}",
          String::from_utf8_lossy(&buffer)
        );
      }
      // this is technically blocking, but it's just a test and
      // I don't want to switch RefCell to Mutex just for this
      std::thread::sleep(Duration::from_millis(10));
    };
    let endpoint = format!("http://127.0.0.1:{port}");
    self.endpoint.replace(Some(endpoint.clone()));
    endpoint
  }
}
/// `deno serve --port 0` should bind to a port and answer requests.
#[tokio::test]
async fn deno_serve_port_0() {
  let builder = ServeClient::builder().entry_point("./serve/port_0.ts");
  let serve = builder.build();

  let response = serve.get().send().await.unwrap();
  assert_eq!(200, response.status());

  let text = response.text().await.unwrap();
  assert_eq!(text, "deno serve --port 0 works!");

  serve.kill();
}
/// Serving the `no_args.ts` fixture should answer with the expected body.
#[tokio::test]
async fn deno_serve_no_args() {
  let builder = ServeClient::builder().entry_point("./serve/no_args.ts");
  let serve = builder.build();

  let response = serve.get().send().await.unwrap();
  assert_eq!(200, response.status());

  let text = response.text().await.unwrap();
  assert_eq!(text, "deno serve with no args in fetch() works!");
  // the server process is cleaned up when `serve` is dropped
}
/// With `--parallel` and four jobs, the server should report four threads and
/// at least two workers should each serve more than two of the 100 requests.
#[tokio::test]
async fn deno_serve_parallel() {
  let serve = ServeClient::builder()
    .entry_point("./serve/parallel.ts")
    .worker_count(Some(4))
    .build();

  // NOTE(review): presumably gives the workers time to come up — confirm
  tokio::time::sleep(Duration::from_millis(1000)).await;

  for _request in 0..100 {
    let pending = serve.get().send();
    let response = timeout(Duration::from_secs(2), pending)
      .await
      .unwrap()
      .unwrap();
    assert_eq!(200, response.status());
    let text = response.text().await.unwrap();
    assert_eq!(text, "deno serve parallel");
    tokio::time::sleep(Duration::from_millis(1)).await;
  }

  let output = serve.output();
  eprintln!("serve output:\n{output}");

  // the startup line must report the requested number of threads
  let listening_regex =
    Regex::new(r"Listening on http[\w:/\.]+ with (\d+) threads").unwrap();
  let thread_count = listening_regex
    .captures(&output)
    .and_then(|caps| caps.get(1))
    .map(|m| m.as_str().trim())
    .unwrap();
  assert_eq!(thread_count, "4");

  // tally how many requests each worker logged
  let serve_regex =
    Regex::new(r"\[serve\-worker\-(\d+)\s*\] serving request").unwrap();
  let serve_counts = serve_regex
    .captures_iter(&output)
    .filter_map(|caps| caps.get(1)?.as_str().parse::<u32>().ok())
    .fold(HashMap::<u32, u32>::new(), |mut counts, worker| {
      *counts.entry(worker).or_default() += 1;
      counts
    });

  let busy_workers = serve_counts.values().filter(|&&n| n > 2).count();
  assert!(busy_workers >= 2, "bad {serve_counts:?}");
}