fix(ext/kv): send queue wake messages across different kv instances (#20465)

fixes #20454

The current KV queue implementation assumes that `enqueue` and
`listenQueue` are called on the same instance of `Deno.Kv`. It's
possible that the same Deno process opens multiple KV instances pointing
to the same fs path, and in that case `listenQueue` should still get
notified of messages enqueued through a different KV instance.
This commit is contained in:
Igor Zinkovsky 2023-09-29 11:40:36 -07:00 committed by GitHub
parent 5edd102f3f
commit 61b91e10ad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 165 additions and 34 deletions

1
Cargo.lock generated
View file

@ -1430,6 +1430,7 @@ dependencies = [
"base64 0.13.1",
"chrono",
"deno_core",
"deno_node",
"deno_unsync 0.1.1",
"hex",
"log",

View file

@ -1819,6 +1819,40 @@ Deno.test({
},
});
// Regression test: a message enqueued through one `Deno.Kv` instance must be
// delivered to a `listenQueue` handler registered on a different instance
// that was opened on the same file path.
Deno.test({
name: "different kv instances for enqueue and queueListen",
async fn() {
// Back both instances with the same temporary file.
const filename = await Deno.makeTempFile({ prefix: "queue_db" });
try {
// Two independent handles opened on the same path.
const db0 = await Deno.openKv(filename);
const db1 = await Deno.openKv(filename);
// Resolved by the listener once a message has been received.
const promise = deferred();
let dequeuedMessage: unknown = null;
// Listen on db0 only; the returned promise is awaited during teardown.
const listener = db0.listenQueue((msg) => {
dequeuedMessage = msg;
promise.resolve();
});
try {
// Enqueue through the *other* instance.
const res = await db1.enqueue("test");
assert(res.ok);
assertNotEquals(res.versionstamp, null);
// Wait for the db0 listener to observe the message enqueued via db1.
await promise;
assertEquals(dequeuedMessage, "test");
} finally {
// Close the listening instance first, then wait for the listener promise
// to settle before closing the second instance.
// NOTE(review): presumably closing db0 is what lets `listener` settle —
// confirm against the `listenQueue` contract.
db0.close();
await listener;
db1.close();
}
} finally {
// Best-effort cleanup of the temporary database file.
try {
await Deno.remove(filename);
} catch {
// pass
}
}
},
});
Deno.test({
name: "queue graceful close",
async fn() {

View file

@ -19,6 +19,7 @@ async-trait.workspace = true
base64.workspace = true
chrono.workspace = true
deno_core.workspace = true
deno_node.workspace = true
deno_unsync = "0.1.1"
hex.workspace = true
log.workspace = true

View file

@ -2,7 +2,10 @@
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::env::current_dir;
use std::future::Future;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
@ -23,11 +26,14 @@ use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::OpState;
use deno_node::PathClean;
use rand::Rng;
use rusqlite::params;
use rusqlite::OpenFlags;
use rusqlite::OptionalExtension;
use rusqlite::Transaction;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::sync::OnceCell;
@ -212,30 +218,35 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
}
}
let conn = sqlite_retry_loop(|| {
let (conn, queue_waker_key) = sqlite_retry_loop(|| {
let path = path.clone();
let default_storage_dir = self.default_storage_dir.clone();
async move {
spawn_blocking(move || {
let conn = match (path.as_deref(), &default_storage_dir) {
(Some(":memory:"), _) | (None, None) => {
rusqlite::Connection::open_in_memory()?
}
(Some(path), _) => {
let flags =
OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
rusqlite::Connection::open_with_flags(path, flags)?
}
(None, Some(path)) => {
std::fs::create_dir_all(path)?;
let path = path.join("kv.sqlite3");
rusqlite::Connection::open(path)?
}
};
let (conn, queue_waker_key) =
match (path.as_deref(), &default_storage_dir) {
(Some(":memory:"), _) | (None, None) => {
(rusqlite::Connection::open_in_memory()?, None)
}
(Some(path), _) => {
let flags =
OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
let resolved_path = canonicalize_path(&PathBuf::from(path))?;
(
rusqlite::Connection::open_with_flags(path, flags)?,
Some(resolved_path),
)
}
(None, Some(path)) => {
std::fs::create_dir_all(path)?;
let path = path.join("kv.sqlite3");
(rusqlite::Connection::open(path.clone())?, Some(path))
}
};
conn.pragma_update(None, "journal_mode", "wal")?;
Ok::<_, AnyError>(conn)
Ok::<_, AnyError>((conn, queue_waker_key))
})
.await
.unwrap()
@ -277,6 +288,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
Ok(SqliteDb {
conn,
queue: OnceCell::new(),
queue_waker_key,
expiration_watcher,
})
}
@ -285,6 +297,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
pub struct SqliteDb {
conn: ProtectedConn,
queue: OnceCell<SqliteQueue>,
queue_waker_key: Option<PathBuf>,
expiration_watcher: deno_core::unsync::JoinHandle<()>,
}
@ -363,7 +376,7 @@ pub struct DequeuedMessage {
conn: WeakProtectedConn,
id: String,
payload: Option<Vec<u8>>,
waker_tx: mpsc::Sender<()>,
waker_tx: broadcast::Sender<()>,
_permit: OwnedSemaphorePermit,
}
@ -403,7 +416,7 @@ impl QueueMessageHandle for DequeuedMessage {
};
if requeued {
// If the message was requeued, wake up the dequeue loop.
self.waker_tx.send(()).await?;
let _ = self.waker_tx.send(());
}
Ok(())
}
@ -422,15 +435,18 @@ struct SqliteQueue {
conn: ProtectedConn,
dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>,
concurrency_limiter: Arc<Semaphore>,
waker_tx: mpsc::Sender<()>,
waker_tx: broadcast::Sender<()>,
shutdown_tx: watch::Sender<()>,
}
impl SqliteQueue {
fn new(conn: ProtectedConn) -> Self {
fn new(
conn: ProtectedConn,
waker_tx: broadcast::Sender<()>,
waker_rx: broadcast::Receiver<()>,
) -> Self {
let conn_clone = conn.clone();
let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
let (waker_tx, waker_rx) = mpsc::channel::<()>(1);
let (dequeue_tx, dequeue_rx) = mpsc::channel::<(Vec<u8>, String)>(64);
spawn(async move {
@ -486,11 +502,6 @@ impl SqliteQueue {
}))
}
async fn wake(&self) -> Result<(), AnyError> {
self.waker_tx.send(()).await?;
Ok(())
}
fn shutdown(&self) {
let _ = self.shutdown_tx.send(());
}
@ -499,7 +510,7 @@ impl SqliteQueue {
conn: ProtectedConn,
dequeue_tx: mpsc::Sender<(Vec<u8>, String)>,
mut shutdown_rx: watch::Receiver<()>,
mut waker_rx: mpsc::Receiver<()>,
mut waker_rx: broadcast::Receiver<()>,
) -> Result<(), AnyError> {
loop {
let messages = SqliteDb::run_tx(conn.clone(), move |tx| {
@ -575,7 +586,9 @@ impl SqliteQueue {
};
tokio::select! {
_ = sleep_fut => {}
x = waker_rx.recv() => if x.is_none() {return Ok(());},
x = waker_rx.recv() => {
if let Err(RecvError::Closed) = x {return Ok(());}
},
_ = shutdown_rx.changed() => return Ok(())
}
}
@ -773,7 +786,7 @@ impl Database for SqliteDb {
async fn atomic_write(
&self,
_state: Rc<RefCell<OpState>>,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
let write = Arc::new(write);
@ -892,8 +905,17 @@ impl Database for SqliteDb {
.await?;
if has_enqueues {
if let Some(queue) = self.queue.get() {
queue.wake().await?;
match self.queue.get() {
Some(queue) => {
let _ = queue.waker_tx.send(());
}
None => {
if let Some(waker_key) = &self.queue_waker_key {
let (waker_tx, _) =
shared_queue_waker_channel(waker_key, state.clone());
let _ = waker_tx.send(());
}
}
}
}
Ok(commit_result)
@ -901,11 +923,21 @@ impl Database for SqliteDb {
async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
state: Rc<RefCell<OpState>>,
) -> Result<Option<Self::QMH>, AnyError> {
let queue = self
.queue
.get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })
.get_or_init(|| async move {
let (waker_tx, waker_rx) = {
match &self.queue_waker_key {
Some(waker_key) => {
shared_queue_waker_channel(waker_key, state.clone())
}
None => broadcast::channel(1),
}
};
SqliteQueue::new(self.conn.clone(), waker_tx, waker_rx)
})
.await;
let handle = queue.dequeue().await?;
Ok(handle)
@ -1012,6 +1044,69 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
}
}
/// Registry of queue wake-up channels, keyed by database path.
///
/// `shared_queue_waker_channel` stores a single instance of this type in
/// `OpState` and hands out clones of / subscriptions to the per-path sender,
/// so every caller that passes the same key shares one broadcast channel.
pub struct QueueWaker {
// One broadcast sender per path key; entries are created lazily by
// `shared_queue_waker_channel`.
wakers_tx: HashMap<PathBuf, broadcast::Sender<()>>,
}
/// Returns a sender/receiver pair for the wake-up channel associated with
/// `waker_key`.
///
/// The channel registry (`QueueWaker`) lives in `state`; it is created on
/// first use, and a capacity-1 broadcast channel is created the first time a
/// given key is requested. Every call with the same key returns a clone of the
/// same sender together with a fresh subscription to it.
fn shared_queue_waker_channel(
  waker_key: &Path,
  state: Rc<RefCell<OpState>>,
) -> (broadcast::Sender<()>, broadcast::Receiver<()>) {
  let mut op_state = state.borrow_mut();

  // Install an empty registry the first time any queue asks for a channel.
  if op_state.try_borrow_mut::<QueueWaker>().is_none() {
    op_state.put(QueueWaker {
      wakers_tx: HashMap::new(),
    });
  }
  let registry = op_state.borrow_mut::<QueueWaker>();

  // Only the sender is stored; receivers are obtained via `subscribe`, so the
  // receiver created alongside the sender is dropped immediately.
  let shared_tx = registry
    .wakers_tx
    .entry(waker_key.to_path_buf())
    .or_insert_with(|| broadcast::channel::<()>(1).0);

  let tx = shared_tx.clone();
  let rx = shared_tx.subscribe();
  (tx, rx)
}
/// Same as Path::canonicalize, but also handles non-existing paths.
///
/// The path is first lexically cleaned. If it cannot be canonicalized because
/// it does not exist, trailing components are peeled off one at a time until
/// an existing ancestor is found; that ancestor is canonicalized and the
/// peeled components are appended back in their original order. A relative
/// path whose components are all missing is resolved against the current
/// working directory.
///
/// # Errors
///
/// Returns any I/O error other than `NotFound` reported by
/// `Path::canonicalize`, an error from `current_dir()`, or the `NotFound`
/// error itself when no existing ancestor can be found even after falling
/// back to the current working directory.
fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> {
  let mut path = path.to_path_buf().clean();
  // Components that do not exist yet, innermost first. They are kept as OS
  // strings so that non-UTF-8 file names are handled instead of panicking.
  let mut names_stack = Vec::new();
  // The working-directory fallback is allowed only once; repeating it could
  // never make progress and would loop forever.
  let mut used_cwd_fallback = false;
  loop {
    match path.canonicalize() {
      Ok(mut canonicalized_path) => {
        for name in names_stack.into_iter().rev() {
          canonicalized_path = canonicalized_path.join(name);
        }
        return Ok(canonicalized_path);
      }
      Err(err) if err.kind() == ErrorKind::NotFound => {
        match path.file_name().map(|name| name.to_os_string()) {
          Some(file_name) => {
            names_stack.push(file_name);
            // `parent()` is `Some` whenever `file_name()` is; the default
            // (empty path) only guards against an unexpected `None`.
            path = path.parent().map(Path::to_path_buf).unwrap_or_default();
          }
          None if used_cwd_fallback => return Err(err.into()),
          None => {
            // Nothing left to peel off (e.g. the empty remainder of a
            // relative path): continue from the current working directory.
            used_cwd_fallback = true;
            names_stack.push(path.as_os_str().to_os_string());
            path = current_dir()?;
          }
        }
      }
      Err(err) => return Err(err.into()),
    }
  }
}
fn is_conn_closed_error(e: &AnyError) -> bool {
get_custom_error_class(e) == Some("TypeError")
&& e.to_string() == ERROR_USING_CLOSED_DATABASE