denoland-deno/ext/cache/sqlite.rs
Luca Casonato 3b6b75bb46
feat(core): improve resource read & write traits (#16115)
This commit introduces two new buffer wrapper types to `deno_core`. The
main benefit of these new wrappers is that they can wrap a number of
different underlying buffer types. This allows for a more flexible read
and write API on resources that will require less copying of data
between different buffer representations.

- `BufView` is a read-only view onto a buffer. It can be backed by
`ZeroCopyBuf`, `Vec<u8>`, and `bytes::Bytes`.
- `BufViewMut` is a read-write view onto a buffer. It can be cheaply
converted into a `BufView`. It can be backed by `ZeroCopyBuf` or
`Vec<u8>`.

Both new buffer views have a cursor. This means that the start point of
the view can be constrained to write / read from just a slice of the
view. Only the start point of the slice can be adjusted. The end point
is fixed. To adjust the end point, the underlying buffer needs to be
truncated.
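
The rough shape of such a cursored view is sketched below. This is a
simplified illustration with made-up names (`DemoBufView`, `Backing`), not
the actual `deno_core` types or API; it assumes the `bytes` crate is
available.

```rust
// Simplified illustration of a cursored, read-only buffer view. The names
// are invented for this sketch; the real `BufView` has more backings and a
// richer API.
use bytes::Bytes;

enum Backing {
  Vec(Vec<u8>),
  Bytes(Bytes),
}

struct DemoBufView {
  backing: Backing,
  cursor: usize, // start offset; the end of the view is fixed
}

impl DemoBufView {
  fn from_vec(v: Vec<u8>) -> Self {
    Self { backing: Backing::Vec(v), cursor: 0 }
  }

  fn total_len(&self) -> usize {
    match &self.backing {
      Backing::Vec(v) => v.len(),
      Backing::Bytes(b) => b.len(),
    }
  }

  /// Constrain the view to start `n` bytes further in. There is no way to
  /// move the end point; the underlying buffer would have to be truncated.
  fn advance_cursor(&mut self, n: usize) {
    self.cursor = (self.cursor + n).min(self.total_len());
  }

  /// The readable slice: everything from the cursor to the fixed end.
  fn as_slice(&self) -> &[u8] {
    match &self.backing {
      Backing::Vec(v) => &v[self.cursor..],
      Backing::Bytes(b) => &b[self.cursor..],
    }
  }
}
```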

Readable resources have been changed to better cater to resources that
do not support BYOB reads. The basic `read` method now returns a
`BufView` instead of taking a `ZeroCopyBuf` to fill. This allows the
operation to return buffers that the resource has already allocated,
instead of forcing the caller to allocate the buffer. BYOB reads are
still very useful for resources that support them, so a new `read_byob`
method has been added that takes a `BufViewMut` to fill. `op_read` uses
`read_byob` when the resource supports it, and falls back to `read` (which
requires an additional copy) when it does not. For
Rust->JS reads this change should have no impact, but for Rust->Rust
reads, this allows the caller to avoid an additional copy in many
scenarios. This combined with the support for `BufView` to be backed by
`bytes::Bytes` allows us to avoid one data copy when piping from a
`fetch` response into an `ext/http` response.
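
A minimal sketch of that fallback shape (hypothetical names and synchronous
signatures for brevity; the real resource methods are async and use
`BufView`/`BufViewMut`):

```rust
// Hypothetical, simplified sketch of the "prefer BYOB, fall back to an
// owned read plus a copy" pattern.
trait DemoRead {
  /// Owned read: the resource returns a buffer it already holds or allocates.
  fn read(&mut self) -> Vec<u8>;

  /// BYOB read: fill the caller's buffer. `None` means "not supported".
  fn read_byob(&mut self, _buf: &mut [u8]) -> Option<usize> {
    None
  }
}

/// Prefer the BYOB path; otherwise do an owned read and pay one extra copy.
fn demo_op_read(res: &mut dyn DemoRead, buf: &mut [u8]) -> usize {
  if let Some(n) = res.read_byob(buf) {
    return n; // filled directly, no extra copy
  }
  let owned = res.read();
  let n = owned.len().min(buf.len());
  buf[..n].copy_from_slice(&owned[..n]); // the extra copy in the fallback
  n
}
```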

Writable resources have been changed to take a `BufView` instead of a
`ZeroCopyBuf` as an argument. This allows for less copying of data in
certain scenarios, as described above. Additionally, a new
`Resource::write_all` method has been added that takes a `BufView` and
repeatedly writes to the resource until the entire buffer has
been written. Certain resources like files can override this method to
provide a more efficient `write_all` implementation.
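
Such a default `write_all` is essentially a loop over `write`; a rough
sketch with hypothetical, synchronous signatures:

```rust
use std::io;

// Hypothetical, simplified sketch of a write_all default built on write.
trait DemoWrite {
  /// Write some prefix of `buf`, returning how many bytes were accepted.
  fn write(&mut self, buf: &[u8]) -> io::Result<usize>;

  /// Keep calling `write` until the whole buffer has been written.
  fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> {
    while !buf.is_empty() {
      let n = self.write(buf)?;
      if n == 0 {
        return Err(io::ErrorKind::WriteZero.into());
      }
      buf = &buf[n..]; // conceptually: advance the BufView cursor
    }
    Ok(())
  }
}
```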
2022-10-09 14:49:25 +00:00


// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use async_trait::async_trait;
use deno_core::error::AnyError;
use deno_core::parking_lot::Mutex;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::ByteString;
use deno_core::Resource;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use std::borrow::Cow;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;

use crate::deserialize_headers;
use crate::get_header;
use crate::serialize_headers;
use crate::vary_header_matches;
use crate::Cache;
use crate::CacheDeleteRequest;
use crate::CacheMatchRequest;
use crate::CacheMatchResponseMeta;
use crate::CachePutRequest;
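
/// A `Cache` implementation backed by a SQLite database for metadata, with
/// response bodies stored as files under `cache_storage_dir`.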
#[derive(Clone)]
pub struct SqliteBackedCache {
pub connection: Arc<Mutex<Connection>>,
pub cache_storage_dir: PathBuf,
}
impl SqliteBackedCache {
pub fn new(cache_storage_dir: PathBuf) -> Self {
std::fs::create_dir_all(&cache_storage_dir)
.expect("failed to create cache dir");
let path = cache_storage_dir.join("cache_metadata.db");
let connection = rusqlite::Connection::open(&path).unwrap_or_else(|_| {
panic!("failed to open cache db at {}", path.display())
});
connection
.execute(
"CREATE TABLE IF NOT EXISTS cache_storage (
id INTEGER PRIMARY KEY,
cache_name TEXT NOT NULL UNIQUE
)",
(),
)
.expect("failed to create cache_storage table");
connection
.execute(
"CREATE TABLE IF NOT EXISTS request_response_list (
id INTEGER PRIMARY KEY,
cache_id INTEGER NOT NULL,
request_url TEXT NOT NULL,
request_headers BLOB NOT NULL,
response_headers BLOB NOT NULL,
response_status INTEGER NOT NULL,
response_status_text TEXT,
response_body_key TEXT,
last_inserted_at INTEGER UNSIGNED NOT NULL,
FOREIGN KEY (cache_id) REFERENCES cache_storage(id) ON DELETE CASCADE,
UNIQUE (cache_id, request_url)
)",
(),
)
.expect("failed to create request_response_list table");
SqliteBackedCache {
connection: Arc::new(Mutex::new(connection)),
cache_storage_dir,
}
}
}
#[async_trait]
impl Cache for SqliteBackedCache {
/// Open a cache storage. Internally, this creates a row in the
/// sqlite db if the cache doesn't exist and returns the internal id
/// of the cache.
async fn storage_open(&self, cache_name: String) -> Result<i64, AnyError> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
tokio::task::spawn_blocking(move || {
let db = db.lock();
db.execute(
"INSERT OR IGNORE INTO cache_storage (cache_name) VALUES (?1)",
params![cache_name],
)?;
let cache_id = db.query_row(
"SELECT id FROM cache_storage WHERE cache_name = ?1",
params![cache_name],
|row| {
let id: i64 = row.get(0)?;
Ok(id)
},
)?;
let responses_dir = get_responses_dir(cache_storage_dir, cache_id);
std::fs::create_dir_all(&responses_dir)?;
Ok::<i64, AnyError>(cache_id)
})
.await?
}
/// Check if a cache with the provided name exists.
/// Note: this doesn't check the disk, it only checks the sqlite db.
async fn storage_has(&self, cache_name: String) -> Result<bool, AnyError> {
let db = self.connection.clone();
tokio::task::spawn_blocking(move || {
let db = db.lock();
let cache_exists = db.query_row(
"SELECT count(cache_name) FROM cache_storage WHERE cache_name = ?1",
params![cache_name],
|row| {
let count: i64 = row.get(0)?;
Ok(count > 0)
},
)?;
Ok::<bool, AnyError>(cache_exists)
})
.await?
}
/// Delete a cache storage. Internally, this deletes the row in the sqlite db.
async fn storage_delete(&self, cache_name: String) -> Result<bool, AnyError> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
tokio::task::spawn_blocking(move || {
let db = db.lock();
let maybe_cache_id = db
.query_row(
"DELETE FROM cache_storage WHERE cache_name = ?1 RETURNING id",
params![cache_name],
|row| {
let id: i64 = row.get(0)?;
Ok(id)
},
)
.optional()?;
if let Some(cache_id) = maybe_cache_id {
let cache_dir = cache_storage_dir.join(cache_id.to_string());
if cache_dir.exists() {
std::fs::remove_dir_all(cache_dir)?;
}
}
Ok::<bool, AnyError>(maybe_cache_id.is_some())
})
.await?
}
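
/// Put a request/response pair into the cache. If the response has a body,
/// this returns a `CachePutResource` that the body must be written to; the
/// database row is only inserted once that resource is shut down. Entries
/// without a body are inserted immediately and `None` is returned.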
async fn put(
&self,
request_response: CachePutRequest,
) -> Result<Option<Rc<dyn Resource>>, AnyError> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let response_body_key = if request_response.response_has_body {
Some(hash(&format!(
"{}_{}",
&request_response.request_url,
now.as_nanos()
)))
} else {
None
};
if let Some(body_key) = response_body_key {
let responses_dir =
get_responses_dir(cache_storage_dir, request_response.cache_id);
let response_path = responses_dir.join(&body_key);
let file = tokio::fs::File::create(response_path).await?;
Ok(Some(Rc::new(CachePutResource {
file: AsyncRefCell::new(file),
db,
put_request: request_response,
response_body_key: body_key,
start_time: now.as_secs(),
})))
} else {
insert_cache_asset(db, request_response, None).await?;
Ok(None)
}
}
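
/// Match a request against the cache, returning the stored response
/// metadata and, when a body was stored, a `CacheResponseResource` that
/// reads it from disk. Respects the `Vary` header of the cached response.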
async fn r#match(
&self,
request: CacheMatchRequest,
) -> Result<
Option<(CacheMatchResponseMeta, Option<Rc<dyn Resource>>)>,
AnyError,
> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
let query_result = tokio::task::spawn_blocking(move || {
let db = db.lock();
let result = db.query_row(
"SELECT response_body_key, response_headers, response_status, response_status_text, request_headers
FROM request_response_list
WHERE cache_id = ?1 AND request_url = ?2",
(request.cache_id, &request.request_url),
|row| {
let response_body_key: Option<String> = row.get(0)?;
let response_headers: Vec<u8> = row.get(1)?;
let response_status: u16 = row.get(2)?;
let response_status_text: String = row.get(3)?;
let request_headers: Vec<u8> = row.get(4)?;
let response_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&response_headers);
let request_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&request_headers);
Ok((CacheMatchResponseMeta { request_headers, response_headers, response_status, response_status_text }, response_body_key))
},
);
result.optional()
})
.await??;
match query_result {
Some((cache_meta, Some(response_body_key))) => {
// From https://w3c.github.io/ServiceWorker/#request-matches-cached-item-algorithm
// If there's Vary header in the response, ensure all the
// headers of the cached request match the query request.
if let Some(vary_header) =
get_header("vary", &cache_meta.response_headers)
{
if !vary_header_matches(
&vary_header,
&request.request_headers,
&cache_meta.request_headers,
) {
return Ok(None);
}
}
let response_path =
get_responses_dir(cache_storage_dir, request.cache_id)
.join(response_body_key);
let file = tokio::fs::File::open(response_path).await?;
return Ok(Some((
cache_meta,
Some(Rc::new(CacheResponseResource::new(file))),
)));
}
Some((cache_meta, None)) => {
return Ok(Some((cache_meta, None)));
}
None => return Ok(None),
}
}
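
/// Delete a request/response pair from the cache. Returns `true` if an
/// entry was removed (see the TODO below about the body file on disk).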
async fn delete(
&self,
request: CacheDeleteRequest,
) -> Result<bool, AnyError> {
let db = self.connection.clone();
tokio::task::spawn_blocking(move || {
// TODO(@satyarohith): remove the response body from disk if one exists
let db = db.lock();
let rows_affected = db.execute(
"DELETE FROM request_response_list WHERE cache_id = ?1 AND request_url = ?2",
(request.cache_id, &request.request_url),
)?;
Ok::<bool, AnyError>(rows_affected > 0)
})
.await?
}
}
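
/// Insert the metadata row for a cached request/response pair. When a body
/// key and put-start time are given, the existing row's `last_inserted_at`
/// timestamp is consulted so that concurrent puts of the same URL do not
/// overwrite each other.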
async fn insert_cache_asset(
db: Arc<Mutex<rusqlite::Connection>>,
put: CachePutRequest,
body_key_start_time: Option<(String, u64)>,
) -> Result<Option<String>, deno_core::anyhow::Error> {
tokio::task::spawn_blocking(move || {
let maybe_response_body = {
let db = db.lock();
let mut response_body_key = None;
if let Some((body_key, start_time)) = body_key_start_time {
response_body_key = Some(body_key);
let last_inserted_at = db.query_row("
SELECT last_inserted_at FROM request_response_list
WHERE cache_id = ?1 AND request_url = ?2",
(put.cache_id, &put.request_url), |row| {
let last_inserted_at: i64 = row.get(0)?;
Ok(last_inserted_at)
}).optional()?;
if let Some(last_inserted) = last_inserted_at {
// Some other worker has already inserted this resource into the cache.
if start_time > (last_inserted as u64) {
return Ok(None);
}
}
}
db.query_row(
"INSERT OR REPLACE INTO request_response_list
(cache_id, request_url, request_headers, response_headers,
response_body_key, response_status, response_status_text, last_inserted_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
RETURNING response_body_key",
(
put.cache_id,
put.request_url,
serialize_headers(&put.request_headers),
serialize_headers(&put.response_headers),
response_body_key,
put.response_status,
put.response_status_text,
SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
),
|row| {
let response_body_key: Option<String> = row.get(0)?;
Ok(response_body_key)
},
)?
};
Ok::<Option<String>, AnyError>(maybe_response_body)
}).await?
}
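
/// Directory holding the response body files for a cache:
/// `<cache_storage_dir>/<cache_id>/responses`.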
#[inline]
fn get_responses_dir(cache_storage_dir: PathBuf, cache_id: i64) -> PathBuf {
cache_storage_dir
.join(cache_id.to_string())
.join("responses")
}
impl deno_core::Resource for SqliteBackedCache {
fn name(&self) -> std::borrow::Cow<str> {
"SqliteBackedCache".into()
}
}
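
/// Resource that streams a response body to a file on disk during a cache
/// put. The metadata row is only written (via `insert_cache_asset`) when
/// the resource is shut down, so partially written bodies never become
/// visible.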
pub struct CachePutResource {
pub db: Arc<Mutex<rusqlite::Connection>>,
pub put_request: CachePutRequest,
pub response_body_key: String,
pub file: AsyncRefCell<tokio::fs::File>,
pub start_time: u64,
}
impl CachePutResource {
async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
file.write_all(data).await?;
Ok(data.len())
}
async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
file.flush().await?;
file.sync_all().await?;
insert_cache_asset(
self.db.clone(),
self.put_request.clone(),
Some((self.response_body_key.clone(), self.start_time)),
)
.await?;
Ok(())
}
}
impl Resource for CachePutResource {
fn name(&self) -> Cow<str> {
"CachePutResource".into()
}
deno_core::impl_writable!();
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
}
}
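
/// Resource that reads a cached response body back from its file on disk.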
pub struct CacheResponseResource {
file: AsyncRefCell<tokio::fs::File>,
}
impl CacheResponseResource {
fn new(file: tokio::fs::File) -> Self {
Self {
file: AsyncRefCell::new(file),
}
}
async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
let nread = file.read(data).await?;
Ok(nread)
}
}
impl Resource for CacheResponseResource {
deno_core::impl_readable_byob!();
fn name(&self) -> Cow<str> {
"CacheResponseResource".into()
}
}
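
/// Hex-encoded SHA-256 digest of `token`; used to derive response body file
/// names.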
pub fn hash(token: &str) -> String {
use sha2::Digest;
format!("{:x}", sha2::Sha256::digest(token.as_bytes()))
}