// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use std::cell::RefCell; use std::rc::Rc; use crate::remote::RemoteDbHandlerPermissions; use crate::sqlite::SqliteDbHandler; use crate::sqlite::SqliteDbHandlerPermissions; use crate::AtomicWrite; use crate::CommitResult; use crate::Database; use crate::DatabaseHandler; use crate::QueueMessageHandle; use crate::ReadRange; use crate::ReadRangeOutput; use crate::SnapshotReadOptions; use async_trait::async_trait; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::OpState; pub struct MultiBackendDbHandler { backends: Vec<(&'static [&'static str], Box)>, } impl MultiBackendDbHandler { pub fn new( backends: Vec<(&'static [&'static str], Box)>, ) -> Self { Self { backends } } pub fn remote_or_sqlite< P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static, >( default_storage_dir: Option, ) -> Self { Self::new(vec![ ( &["https://", "http://"], Box::new(crate::remote::RemoteDbHandler::

::new()), ), ( &[""], Box::new(SqliteDbHandler::

::new(default_storage_dir)), ), ]) } } #[async_trait(?Send)] impl DatabaseHandler for MultiBackendDbHandler { type DB = Box; async fn open( &self, state: Rc>, path: Option, ) -> Result { for (prefixes, handler) in &self.backends { for &prefix in *prefixes { if prefix.is_empty() { return handler.dyn_open(state.clone(), path.clone()).await; } let Some(path) = &path else { continue; }; if path.starts_with(prefix) { return handler.dyn_open(state.clone(), Some(path.clone())).await; } } } Err(type_error(format!( "No backend supports the given path: {:?}", path ))) } } #[async_trait(?Send)] pub trait DynamicDbHandler { async fn dyn_open( &self, state: Rc>, path: Option, ) -> Result, AnyError>; } #[async_trait(?Send)] impl DatabaseHandler for Box { type DB = Box; async fn open( &self, state: Rc>, path: Option, ) -> Result { (**self).dyn_open(state, path).await } } #[async_trait(?Send)] impl DynamicDbHandler for T where T: DatabaseHandler, DB: Database + 'static, { async fn dyn_open( &self, state: Rc>, path: Option, ) -> Result, AnyError> { Ok(Box::new(self.open(state, path).await?)) } } #[async_trait(?Send)] pub trait DynamicDb { async fn dyn_snapshot_read( &self, state: Rc>, requests: Vec, options: SnapshotReadOptions, ) -> Result, AnyError>; async fn dyn_atomic_write( &self, state: Rc>, write: AtomicWrite, ) -> Result, AnyError>; async fn dyn_dequeue_next_message( &self, state: Rc>, ) -> Result, AnyError>; fn dyn_close(&self); } #[async_trait(?Send)] impl Database for Box { type QMH = Box; async fn snapshot_read( &self, state: Rc>, requests: Vec, options: SnapshotReadOptions, ) -> Result, AnyError> { (**self).dyn_snapshot_read(state, requests, options).await } async fn atomic_write( &self, state: Rc>, write: AtomicWrite, ) -> Result, AnyError> { (**self).dyn_atomic_write(state, write).await } async fn dequeue_next_message( &self, state: Rc>, ) -> Result, AnyError> { (**self).dyn_dequeue_next_message(state).await } fn close(&self) { (**self).dyn_close() } } #[async_trait(?Send)] impl DynamicDb for T where T: Database, QMH: QueueMessageHandle + 'static, { async fn dyn_snapshot_read( &self, state: Rc>, requests: Vec, options: SnapshotReadOptions, ) -> Result, AnyError> { Ok(self.snapshot_read(state, requests, options).await?) } async fn dyn_atomic_write( &self, state: Rc>, write: AtomicWrite, ) -> Result, AnyError> { Ok(self.atomic_write(state, write).await?) } async fn dyn_dequeue_next_message( &self, state: Rc>, ) -> Result, AnyError> { Ok(Box::new(self.dequeue_next_message(state).await?)) } fn dyn_close(&self) { self.close() } } #[async_trait(?Send)] impl QueueMessageHandle for Box { async fn take_payload(&mut self) -> Result, AnyError> { (**self).take_payload().await } async fn finish(&self, success: bool) -> Result<(), AnyError> { (**self).finish(success).await } }