feat(ext/kv): key expiration (#20091)

Co-authored-by: Luca Casonato <hello@lcas.dev>
This commit is contained in:
Heyang Zhou 2023-08-18 17:34:16 +08:00 committed by GitHub
parent b5839eefcf
commit c77c836a23
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 334 additions and 60 deletions

View file

@ -1806,3 +1806,129 @@ Deno.test({
}
},
});
Deno.test({
name: "kv expiration",
async fn() {
const filename = await Deno.makeTempFile({ prefix: "kv_expiration_db" });
try {
await Deno.remove(filename);
} catch {
// pass
}
let db: Deno.Kv | null = null;
try {
db = await Deno.openKv(filename);
await db.set(["a"], 1, { expireIn: 1000 });
await db.set(["b"], 2, { expireIn: 1000 });
assertEquals((await db.get(["a"])).value, 1);
assertEquals((await db.get(["b"])).value, 2);
// Value overwrite should also reset expiration
await db.set(["b"], 2, { expireIn: 3600 * 1000 });
// Wait for expiration
await sleep(1000);
// Re-open to trigger immediate cleanup
db.close();
db = null;
db = await Deno.openKv(filename);
let ok = false;
for (let i = 0; i < 50; i++) {
await sleep(100);
if (
JSON.stringify(
(await db.getMany([["a"], ["b"]])).map((x) => x.value),
) === "[null,2]"
) {
ok = true;
break;
}
}
if (!ok) {
throw new Error("Values did not expire");
}
} finally {
if (db) {
try {
db.close();
} catch {
// pass
}
}
try {
await Deno.remove(filename);
} catch {
// pass
}
}
},
});
Deno.test({
name: "kv expiration with atomic",
async fn() {
const filename = await Deno.makeTempFile({ prefix: "kv_expiration_db" });
try {
await Deno.remove(filename);
} catch {
// pass
}
let db: Deno.Kv | null = null;
try {
db = await Deno.openKv(filename);
await db.atomic().set(["a"], 1, { expireIn: 1000 }).set(["b"], 2, {
expireIn: 1000,
}).commit();
assertEquals((await db.getMany([["a"], ["b"]])).map((x) => x.value), [
1,
2,
]);
// Wait for expiration
await sleep(1000);
// Re-open to trigger immediate cleanup
db.close();
db = null;
db = await Deno.openKv(filename);
let ok = false;
for (let i = 0; i < 50; i++) {
await sleep(100);
if (
JSON.stringify(
(await db.getMany([["a"], ["b"]])).map((x) => x.value),
) === "[null,null]"
) {
ok = true;
break;
}
}
if (!ok) {
throw new Error("Values did not expire");
}
} finally {
if (db) {
try {
db.close();
} catch {
// pass
}
}
try {
await Deno.remove(filename);
} catch {
// pass
}
}
},
});

View file

@ -1357,7 +1357,13 @@ declare namespace Deno {
* mutation is applied to the key.
*
* - `set` - Sets the value of the key to the given value, overwriting any
* existing value.
* existing value. Optionally an `expireIn` option can be specified to
* set a time-to-live (TTL) for the key. The TTL is specified in
* milliseconds, and the key will be deleted from the database at earliest
* after the specified number of milliseconds have elapsed. Once the
* specified duration has passed, the key may still be visible for some
* additional time. If the `expireIn` option is not specified, the key will
* not expire.
* - `delete` - Deletes the key from the database. The mutation is a no-op if
* the key does not exist.
* - `sum` - Adds the given value to the existing value of the key. Both the
@ -1379,7 +1385,7 @@ declare namespace Deno {
export type KvMutation =
& { key: KvKey }
& (
| { type: "set"; value: unknown }
| { type: "set"; value: unknown; expireIn?: number }
| { type: "delete" }
| { type: "sum"; value: KvU64 }
| { type: "max"; value: KvU64 }
@ -1591,8 +1597,15 @@ declare namespace Deno {
/**
* Add to the operation a mutation that sets the value of the specified key
* to the specified value if all checks pass during the commit.
*
* Optionally an `expireIn` option can be specified to set a time-to-live
* (TTL) for the key. The TTL is specified in milliseconds, and the key will
* be deleted from the database at earliest after the specified number of
* milliseconds have elapsed. Once the specified duration has passed, the
* key may still be visible for some additional time. If the `expireIn`
* option is not specified, the key will not expire.
*/
set(key: KvKey, value: unknown): this;
set(key: KvKey, value: unknown, options?: { expireIn?: number }): this;
/**
* Add to the operation a mutation that deletes the specified key if all
* checks pass during the commit.
@ -1721,8 +1734,19 @@ declare namespace Deno {
* const db = await Deno.openKv();
* await db.set(["foo"], "bar");
* ```
*
* Optionally an `expireIn` option can be specified to set a time-to-live
* (TTL) for the key. The TTL is specified in milliseconds, and the key will
* be deleted from the database at earliest after the specified number of
* milliseconds have elapsed. Once the specified duration has passed, the
* key may still be visible for some additional time. If the `expireIn`
* option is not specified, the key will not expire.
*/
set(key: KvKey, value: unknown): Promise<KvCommitResult>;
set(
key: KvKey,
value: unknown,
options?: { expireIn?: number },
): Promise<KvCommitResult>;
/**
* Delete the value for the given key from the database. If no value exists

View file

@ -130,12 +130,15 @@ class Kv {
});
}
async set(key: Deno.KvKey, value: unknown) {
async set(key: Deno.KvKey, value: unknown, options?: { expireIn?: number }) {
value = serializeValue(value);
const checks: Deno.AtomicCheck[] = [];
const expireAt = typeof options?.expireIn === "number"
? Date.now() + options.expireIn
: undefined;
const mutations = [
[key, "set", value],
[key, "set", value, expireAt],
];
const versionstamp = await core.opAsync(
@ -152,7 +155,7 @@ class Kv {
async delete(key: Deno.KvKey) {
const checks: Deno.AtomicCheck[] = [];
const mutations = [
[key, "delete", null],
[key, "delete", null, undefined],
];
const result = await core.opAsync(
@ -318,7 +321,7 @@ class AtomicOperation {
#rid: number;
#checks: [Deno.KvKey, string | null][] = [];
#mutations: [Deno.KvKey, string, RawValue | null][] = [];
#mutations: [Deno.KvKey, string, RawValue | null, number | undefined][] = [];
#enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][] = [];
constructor(rid: number) {
@ -337,6 +340,7 @@ class AtomicOperation {
const key = mutation.key;
let type: string;
let value: RawValue | null;
let expireAt: number | undefined = undefined;
switch (mutation.type) {
case "delete":
type = "delete";
@ -345,6 +349,10 @@ class AtomicOperation {
}
break;
case "set":
if (typeof mutation.expireIn === "number") {
expireAt = Date.now() + mutation.expireIn;
}
/* falls through */
case "sum":
case "min":
case "max":
@ -357,33 +365,40 @@ class AtomicOperation {
default:
throw new TypeError("Invalid mutation type");
}
this.#mutations.push([key, type, value]);
this.#mutations.push([key, type, value, expireAt]);
}
return this;
}
sum(key: Deno.KvKey, n: bigint): this {
this.#mutations.push([key, "sum", serializeValue(new KvU64(n))]);
this.#mutations.push([key, "sum", serializeValue(new KvU64(n)), undefined]);
return this;
}
min(key: Deno.KvKey, n: bigint): this {
this.#mutations.push([key, "min", serializeValue(new KvU64(n))]);
this.#mutations.push([key, "min", serializeValue(new KvU64(n)), undefined]);
return this;
}
max(key: Deno.KvKey, n: bigint): this {
this.#mutations.push([key, "max", serializeValue(new KvU64(n))]);
this.#mutations.push([key, "max", serializeValue(new KvU64(n)), undefined]);
return this;
}
set(key: Deno.KvKey, value: unknown): this {
this.#mutations.push([key, "set", serializeValue(value)]);
set(
key: Deno.KvKey,
value: unknown,
options?: { expireIn?: number },
): this {
const expireAt = typeof options?.expireIn === "number"
? Date.now() + options.expireIn
: undefined;
this.#mutations.push([key, "set", serializeValue(value), expireAt]);
return this;
}
delete(key: Deno.KvKey): this {
this.#mutations.push([key, "delete", null]);
this.#mutations.push([key, "delete", null, undefined]);
return this;
}

View file

@ -237,6 +237,7 @@ pub struct KvCheck {
pub struct KvMutation {
pub key: Vec<u8>,
pub kind: MutationKind,
pub expire_at: Option<u64>,
}
/// A request to enqueue a message to the database. This message is delivered

View file

@ -375,7 +375,7 @@ impl TryFrom<V8KvCheck> for KvCheck {
}
}
type V8KvMutation = (KvKey, String, Option<FromV8Value>);
type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
impl TryFrom<V8KvMutation> for KvMutation {
type Error = AnyError;
@ -396,7 +396,11 @@ impl TryFrom<V8KvMutation> for KvMutation {
)))
}
};
Ok(KvMutation { key, kind })
Ok(KvMutation {
key,
kind,
expire_at: value.3,
})
}
}

View file

@ -1,7 +1,6 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
use std::cell::Cell;
use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
@ -10,10 +9,12 @@ use std::path::PathBuf;
use std::rc::Rc;
use std::rc::Weak;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::SystemTime;
use async_trait::async_trait;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures;
@ -57,7 +58,7 @@ const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str =
const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str =
"select version from kv where k = ?";
const STATEMENT_KV_POINT_SET: &str =
"insert into kv (k, v, v_encoding, version) values (:k, :v, :v_encoding, :version) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version";
"insert into kv (k, v, v_encoding, version, expiration_ms) values (:k, :v, :v_encoding, :version, :expiration_ms) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version, expiration_ms = :expiration_ms";
const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?";
const STATEMENT_QUEUE_ADD_READY: &str = "insert into queue (ts, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)";
@ -79,7 +80,7 @@ create table if not exists migration_state(
)
";
const MIGRATIONS: [&str; 2] = [
const MIGRATIONS: [&str; 3] = [
"
create table data_version (
k integer primary key,
@ -112,12 +113,56 @@ create table queue_running(
primary key (deadline, id)
);
",
"
alter table kv add column seq integer not null default 0;
alter table data_version add column seq integer not null default 0;
alter table kv add column expiration_ms integer not null default -1;
create index kv_expiration_ms_idx on kv (expiration_ms);
",
];
const DISPATCH_CONCURRENCY_LIMIT: usize = 100;
const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1000, 5000, 30000, 60000];
const ERROR_USING_CLOSED_DATABASE: &str = "Attempted to use a closed database";
#[derive(Clone)]
struct ProtectedConn {
guard: Rc<AsyncRefCell<()>>,
conn: Arc<Mutex<Option<rusqlite::Connection>>>,
}
#[derive(Clone)]
struct WeakProtectedConn {
guard: Weak<AsyncRefCell<()>>,
conn: std::sync::Weak<Mutex<Option<rusqlite::Connection>>>,
}
impl ProtectedConn {
fn new(conn: rusqlite::Connection) -> Self {
Self {
guard: Rc::new(AsyncRefCell::new(())),
conn: Arc::new(Mutex::new(Some(conn))),
}
}
fn downgrade(&self) -> WeakProtectedConn {
WeakProtectedConn {
guard: Rc::downgrade(&self.guard),
conn: Arc::downgrade(&self.conn),
}
}
}
impl WeakProtectedConn {
fn upgrade(&self) -> Option<ProtectedConn> {
let guard = self.guard.upgrade()?;
let conn = self.conn.upgrade()?;
Some(ProtectedConn { guard, conn })
}
}
pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
pub default_storage_dir: Option<PathBuf>,
_permissions: PhantomData<P>,
@ -197,7 +242,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
}
})
.await?;
let conn = Rc::new(AsyncRefCell::new(Cell::new(Some(conn))));
let conn = ProtectedConn::new(conn);
SqliteDb::run_tx(conn.clone(), |tx| {
tx.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
@ -227,16 +272,35 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
})
.await?;
let expiration_watcher = spawn(watch_expiration(conn.clone()));
Ok(SqliteDb {
conn,
queue: OnceCell::new(),
expiration_watcher,
})
}
}
pub struct SqliteDb {
conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
conn: ProtectedConn,
queue: OnceCell<SqliteQueue>,
expiration_watcher: deno_core::task::JoinHandle<()>,
}
impl Drop for SqliteDb {
fn drop(&mut self) {
self.expiration_watcher.abort();
// The above `abort()` operation is asynchronous. It's not
// guaranteed that the sqlite connection will be closed immediately.
// So here we synchronously take the conn mutex and drop the connection.
//
// This blocks the event loop if the connection is still being used,
// but ensures correctness - deleting the database file after calling
// the `close` method will always work.
self.conn.conn.lock().unwrap().take();
}
}
async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>(
@ -263,10 +327,7 @@ async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>(
}
impl SqliteDb {
async fn run_tx<F, R>(
conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
f: F,
) -> Result<R, AnyError>
async fn run_tx<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError>
where
F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
+ Clone
@ -277,42 +338,38 @@ impl SqliteDb {
sqlite_retry_loop(|| Self::run_tx_inner(conn.clone(), f.clone())).await
}
async fn run_tx_inner<F, R>(
conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
f: F,
) -> Result<R, AnyError>
async fn run_tx_inner<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError>
where
F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
+ Send
+ 'static,
R: Send + 'static,
{
// Transactions need exclusive access to the connection. Wait until
// we can borrow_mut the connection.
let cell = conn.borrow_mut().await;
// `run_tx` runs in an asynchronous context. First acquire the async lock to
// coordinate with other async invocations.
let _guard_holder = conn.guard.borrow_mut().await;
// Take the db out of the cell and run the transaction via spawn_blocking.
let mut db = cell.take().unwrap();
let (result, db) = spawn_blocking(move || {
let result = {
match db.transaction() {
Ok(tx) => f(tx),
Err(e) => Err(e.into()),
}
// Then, take the synchronous lock. This operation is guaranteed to success without waiting,
// unless the database is being closed.
let db = conn.conn.clone();
spawn_blocking(move || {
let mut db = db.try_lock().ok();
let Some(db) = db.as_mut().and_then(|x| x.as_mut()) else {
return Err(type_error(ERROR_USING_CLOSED_DATABASE))
};
(result, db)
let result = match db.transaction() {
Ok(tx) => f(tx),
Err(e) => Err(e.into()),
};
result
})
.await
.unwrap();
// Put the db back into the cell.
cell.set(Some(db));
result
.unwrap()
}
}
pub struct DequeuedMessage {
conn: Weak<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
conn: WeakProtectedConn,
id: String,
payload: Option<Vec<u8>>,
waker_tx: mpsc::Sender<()>,
@ -341,7 +398,20 @@ impl QueueMessageHandle for DequeuedMessage {
tx.commit()?;
Ok(requeued)
})
.await?;
.await;
let requeued = match requeued {
Ok(x) => x,
Err(e) => {
// Silently ignore the error if the database has been closed
// This message will be delivered on the next run
if get_custom_error_class(&e) == Some("TypeError")
&& e.to_string() == ERROR_USING_CLOSED_DATABASE
{
return Ok(());
}
return Err(e);
}
};
if requeued {
// If the message was requeued, wake up the dequeue loop.
self.waker_tx.send(()).await?;
@ -360,7 +430,7 @@ impl QueueMessageHandle for DequeuedMessage {
type DequeueReceiver = mpsc::Receiver<(Vec<u8>, String)>;
struct SqliteQueue {
conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
conn: ProtectedConn,
dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>,
concurrency_limiter: Arc<Semaphore>,
waker_tx: mpsc::Sender<()>,
@ -368,7 +438,7 @@ struct SqliteQueue {
}
impl SqliteQueue {
fn new(conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>) -> Self {
fn new(conn: ProtectedConn) -> Self {
let conn_clone = conn.clone();
let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
let (waker_tx, waker_rx) = mpsc::channel::<()>(1);
@ -406,7 +476,7 @@ impl SqliteQueue {
let permit = self.concurrency_limiter.clone().acquire_owned().await?;
Ok(DequeuedMessage {
conn: Rc::downgrade(&self.conn),
conn: self.conn.downgrade(),
id,
payload: Some(payload),
waker_tx: self.waker_tx.clone(),
@ -424,7 +494,7 @@ impl SqliteQueue {
}
async fn dequeue_loop(
conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
conn: ProtectedConn,
dequeue_tx: mpsc::Sender<(Vec<u8>, String)>,
mut shutdown_rx: watch::Receiver<()>,
mut waker_rx: mpsc::Receiver<()>,
@ -511,7 +581,7 @@ impl SqliteQueue {
}
async fn get_earliest_ready_ts(
conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
conn: ProtectedConn,
) -> Result<Option<u64>, AnyError> {
SqliteDb::run_tx(conn.clone(), move |tx| {
let ts = tx
@ -527,7 +597,7 @@ impl SqliteQueue {
}
async fn requeue_inflight_messages(
conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
conn: ProtectedConn,
) -> Result<(), AnyError> {
loop {
let done = SqliteDb::run_tx(conn.clone(), move |tx| {
@ -608,7 +678,7 @@ impl SqliteQueue {
for key in keys_if_undelivered {
let changed = tx
.prepare_cached(STATEMENT_KV_POINT_SET)?
.execute(params![key, &data, &VALUE_ENCODING_V8, &version])?;
.execute(params![key, &data, &VALUE_ENCODING_V8, &version, -1i64])?;
assert_eq!(changed, 1);
}
}
@ -623,6 +693,31 @@ impl SqliteQueue {
}
}
async fn watch_expiration(db: ProtectedConn) {
loop {
// Scan for expired keys
let res = SqliteDb::run_tx(db.clone(), move |tx| {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
tx.prepare_cached(
"delete from kv where expiration_ms >= 0 and expiration_ms <= ?",
)?
.execute(params![now])?;
tx.commit()?;
Ok(())
})
.await;
if let Err(e) = res {
eprintln!("kv: Error in expiration watcher: {}", e);
}
let sleep_duration =
Duration::from_secs_f64(60.0 + rand::thread_rng().gen_range(0.0..30.0));
tokio::time::sleep(sleep_duration).await;
}
}
#[async_trait(?Send)]
impl Database for SqliteDb {
type QMH = DequeuedMessage;
@ -698,9 +793,17 @@ impl Database for SqliteDb {
match &mutation.kind {
MutationKind::Set(value) => {
let (value, encoding) = encode_value(value);
let changed = tx
.prepare_cached(STATEMENT_KV_POINT_SET)?
.execute(params![mutation.key, &value, &encoding, &version])?;
let changed =
tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![
mutation.key,
value,
&encoding,
&version,
mutation
.expire_at
.and_then(|x| i64::try_from(x).ok())
.unwrap_or(-1i64)
])?;
assert_eq!(changed, 1)
}
MutationKind::Delete => {
@ -845,7 +948,8 @@ fn mutate_le64(
key,
&new_value[..],
encoding,
new_version
new_version,
-1i64,
])?;
assert_eq!(changed, 1);