// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use std::cell::RefCell; use std::collections::HashMap; use std::io::Read; use std::time::Duration; use pretty_assertions::assert_eq; use regex::Regex; use reqwest::RequestBuilder; use test_util as util; use test_util::DenoChild; use tokio::time::timeout; struct ServeClient { child: RefCell, client: reqwest::Client, output_buf: RefCell>, endpoint: RefCell>, } impl Drop for ServeClient { fn drop(&mut self) { let mut child = self.child.borrow_mut(); child.kill().unwrap(); child.wait().unwrap(); } } struct ServeClientBuilder(util::TestCommandBuilder, Option); impl ServeClientBuilder { fn build(self) -> ServeClient { let Some(entry_point) = self.1 else { panic!("entry point required"); }; let cmd = self.0.arg(entry_point); let child = cmd.spawn().unwrap(); ServeClient::with_child(child) } fn map( self, f: impl FnOnce(util::TestCommandBuilder) -> util::TestCommandBuilder, ) -> Self { Self(f(self.0), self.1) } fn entry_point(self, file: impl AsRef) -> Self { Self(self.0, Some(file.as_ref().into())) } fn worker_count(self, n: Option) -> Self { self.map(|t| { let t = t.arg("--parallel"); if let Some(n) = n { t.env("DENO_JOBS", n.to_string()) } else { t } }) } fn new() -> Self { Self( util::deno_cmd() .env("NO_COLOR", "1") .current_dir(util::testdata_path()) .arg("serve") .arg("--port") .arg("0") .stdout_piped() .stderr_piped(), None, ) } } impl ServeClient { fn builder() -> ServeClientBuilder { ServeClientBuilder::new() } fn with_child(child: DenoChild) -> Self { Self { child: RefCell::new(child), output_buf: Default::default(), endpoint: Default::default(), client: reqwest::Client::builder() .add_root_certificate( reqwest::Certificate::from_pem(include_bytes!( "../testdata/tls/RootCA.crt" )) .unwrap(), ) // disable connection pooling so we create a new connection per request // which allows us to distribute requests across workers .pool_max_idle_per_host(0) .pool_idle_timeout(Duration::from_nanos(1)) .http2_prior_knowledge() .build() .unwrap(), } } fn kill(self) { let mut child = self.child.borrow_mut(); child.kill().unwrap(); child.wait().unwrap(); } fn output(self) -> String { let mut child = self.child.borrow_mut(); child.kill().unwrap(); let mut stderr = child.stderr.take().unwrap(); child.wait().unwrap(); let mut output_buf = self.output_buf.borrow_mut(); stderr.read_to_end(&mut output_buf).unwrap(); String::from_utf8(std::mem::take(&mut *output_buf)).unwrap() } fn get(&self) -> RequestBuilder { let endpoint = self.endpoint(); self.client.get(&*endpoint) } fn endpoint(&self) -> String { if let Some(e) = self.endpoint.borrow().as_ref() { return e.to_string(); }; let mut buffer = self.output_buf.borrow_mut(); let mut temp_buf = [0u8; 64]; let mut child = self.child.borrow_mut(); let stderr = child.stderr.as_mut().unwrap(); let port_regex = regex::bytes::Regex::new(r"Listening on https?:[^:]+:(\d+)/").unwrap(); let start = std::time::Instant::now(); // try to find the port number in the output // it may not be the first line, so we need to read the output in a loop let port = loop { if start.elapsed() > Duration::from_secs(5) { panic!( "timed out waiting for serve to start. serve output:\n{}", String::from_utf8_lossy(&buffer) ); } let read = stderr.read(&mut temp_buf).unwrap(); buffer.extend_from_slice(&temp_buf[..read]); if let Some(p) = port_regex .captures(&buffer) .and_then(|c| c.get(1)) .map(|v| std::str::from_utf8(v.as_bytes()).unwrap().to_owned()) { break p; } // this is technically blocking, but it's just a test and // I don't want to switch RefCell to Mutex just for this std::thread::sleep(Duration::from_millis(10)); }; eprintln!("stderr: {}", String::from_utf8_lossy(&temp_buf)); self .endpoint .replace(Some(format!("http://127.0.0.1:{port}"))); return self.endpoint.borrow().clone().unwrap(); } } #[tokio::test] async fn deno_serve_port_0() { let client = ServeClient::builder() .entry_point("./serve/port_0.ts") .build(); let res = client.get().send().await.unwrap(); assert_eq!(200, res.status()); let body = res.text().await.unwrap(); assert_eq!(body, "deno serve --port 0 works!"); client.kill(); } #[tokio::test] async fn deno_serve_no_args() { let client = ServeClient::builder() .entry_point("./serve/no_args.ts") .build(); let res = client.get().send().await.unwrap(); assert_eq!(200, res.status()); let body = res.text().await.unwrap(); assert_eq!(body, "deno serve with no args in fetch() works!"); } #[tokio::test] async fn deno_serve_parallel() { let client = ServeClient::builder() .entry_point("./serve/parallel.ts") .worker_count(Some(4)) .build(); let mut serve_counts = HashMap::::new(); tokio::time::sleep(Duration::from_millis(1000)).await; let serve_regex = Regex::new(r"\[serve\-worker\-(\d+)\s*\] serving request").unwrap(); for _ in 0..100 { let response = timeout(Duration::from_secs(2), client.get().send()) .await .unwrap() .unwrap(); assert_eq!(200, response.status()); let body = response.text().await.unwrap(); assert_eq!(body, "deno serve parallel"); tokio::time::sleep(Duration::from_millis(1)).await; } let output = client.output(); let listening_regex = Regex::new(r"Listening on http[\w:/\.]+ with (\d+) threads").unwrap(); eprintln!("serve output:\n{output}"); assert_eq!( listening_regex .captures(&output) .unwrap() .get(1) .unwrap() .as_str() .trim(), "4" ); // make sure all workers have at least started let mut started = [false; 4]; let start_regex = Regex::new(r"\[serve\-worker\-(\d+)\s*\] starting serve").unwrap(); for capture in start_regex.captures_iter(&output) { if let Some(worker_number) = capture.get(1).and_then(|m| m.as_str().parse::().ok()) { started[worker_number as usize] = true; } } assert!(started.iter().all(|&b| b)); for capture in serve_regex.captures_iter(&output) { if let Some(worker_number) = capture.get(1).and_then(|m| m.as_str().parse::().ok()) { *serve_counts.entry(worker_number).or_default() += 1; } } #[cfg(not(target_vendor = "apple"))] // FIXME: flaky on macOS, it tends to not distribute requests evenly assert!( serve_counts.values().filter(|&&n| n > 2).count() >= 2, "bad {serve_counts:?}" ); }