feat(ext/kv): key-value store (#18232)

This commit adds an unstable `Deno.openKv()` API that allows opening
a key-value database at a specified path.

---------

Co-authored-by: Luca Casonato <hello@lcas.dev>
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
Heyang Zhou 2023-03-22 12:13:24 +08:00 committed by GitHub
parent 8bcffff9dc
commit 92ebf4afe5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 3831 additions and 2 deletions

15
Cargo.lock generated
View file

@ -1078,6 +1078,20 @@ dependencies = [
"winapi",
]
[[package]]
name = "deno_kv"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"base64 0.13.1",
"deno_core",
"hex",
"num-bigint",
"rusqlite",
"serde",
]
[[package]]
name = "deno_lint"
version = "0.43.0"
@ -1188,6 +1202,7 @@ dependencies = [
"deno_fs",
"deno_http",
"deno_io",
"deno_kv",
"deno_napi",
"deno_net",
"deno_node",

View file

@ -23,6 +23,7 @@ members = [
"ext/fs",
"ext/http",
"ext/io",
"ext/kv",
"ext/net",
"ext/node",
"ext/url",
@ -67,6 +68,7 @@ deno_http = { version = "0.88.0", path = "./ext/http" }
deno_io = { version = "0.3.0", path = "./ext/io" }
deno_net = { version = "0.85.0", path = "./ext/net" }
deno_node = { version = "0.30.0", path = "./ext/node" }
deno_kv = { version = "0.1.0", path = "./ext/kv" }
deno_tls = { version = "0.80.0", path = "./ext/tls" }
deno_url = { version = "0.93.0", path = "./ext/url" }
deno_web = { version = "0.124.0", path = "./ext/web" }
@ -91,6 +93,7 @@ encoding_rs = "=0.8.31"
flate2 = "=1.0.24"
fs3 = "0.5.0"
futures = "0.3.21"
hex = "0.4"
http = "0.2.9"
hyper = "0.14.18"
indexmap = { version = "1.9.2", features = ["serde"] }

View file

@ -8,6 +8,7 @@ use deno_core::Extension;
use deno_core::ExtensionFileSource;
use deno_core::ExtensionFileSourceCode;
use deno_runtime::deno_cache::SqliteBackedCache;
use deno_runtime::deno_kv::sqlite::SqliteDbHandler;
use deno_runtime::permissions::PermissionsContainer;
use deno_runtime::*;
@ -353,6 +354,10 @@ fn create_cli_snapshot(snapshot_path: PathBuf) {
None,
),
deno_tls::deno_tls::init_ops(),
deno_kv::deno_kv::init_ops(
SqliteDbHandler::<PermissionsContainer>::new(None),
false, // No --unstable.
),
deno_napi::deno_napi::init_ops::<PermissionsContainer>(),
deno_http::deno_http::init_ops(),
deno_io::deno_io::init_ops(Default::default()),

933
cli/tests/unit/kv_test.ts Normal file
View file

@ -0,0 +1,933 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import {
assert,
assertEquals,
AssertionError,
assertRejects,
assertThrows,
} from "./test_util.ts";
// Registers a Deno.test that runs `fn` against a fresh in-memory KV
// database, guaranteeing the database is closed even if the body throws.
function dbTest(name: string, fn: (db: Deno.Kv) => Promise<void>) {
  Deno.test({
    name,
    fn: async () => {
      const db: Deno.Kv = await Deno.openKv(":memory:");
      try {
        await fn(db);
      } finally {
        await db.close();
      }
    },
  });
}
// End-to-end smoke test: get on a missing key yields a null entry,
// each set bumps the versionstamp, and delete restores the null entry.
dbTest("basic read-write-delete and versionstamps", async (db) => {
  // Fetch ["a"] and assert its value and versionstamp in one step.
  const expectEntry = async (value: unknown, versionstamp: string | null) => {
    const entry = await db.get(["a"]);
    assertEquals(entry.key, ["a"]);
    assertEquals(entry.value, value);
    assertEquals(entry.versionstamp, versionstamp);
  };
  await expectEntry(null, null);
  await db.set(["a"], "b");
  await expectEntry("b", "00000000000000010000");
  await db.set(["a"], "c");
  await expectEntry("c", "00000000000000020000");
  await db.delete(["a"]);
  await expectEntry(null, null);
});
// Structured-clone round-trip cases: each value below must come back from
// get() deep-equal to what was passed to set().
const VALUE_CASES = [
{ name: "string", value: "hello" },
{ name: "number", value: 42 },
{ name: "bigint", value: 42n },
{ name: "boolean", value: true },
{ name: "null", value: null },
{ name: "undefined", value: undefined },
{ name: "Date", value: new Date(0) },
{ name: "Uint8Array", value: new Uint8Array([1, 2, 3]) },
{ name: "ArrayBuffer", value: new ArrayBuffer(3) },
{ name: "array", value: [1, 2, 3] },
{ name: "object", value: { a: 1, b: 2 } },
{ name: "nested array", value: [[1, 2], [3, 4]] },
{ name: "nested object", value: { a: { b: 1 } } },
];
// One test per case; assertEquals performs a deep comparison.
for (const { name, value } of VALUE_CASES) {
dbTest(`set and get ${name} value`, async (db) => {
await db.set(["a"], value);
const result = await db.get(["a"]);
assertEquals(result.key, ["a"]);
assertEquals(result.value, value);
});
}
// Self-referential objects must survive the structured-clone round trip
// with the cycle intact.
dbTest("set and get recursive object", async (db) => {
  // deno-lint-ignore no-explicit-any
  const cyclic: any = { a: undefined };
  cyclic.a = cyclic;
  await db.set(["a"], cyclic);
  const result = await db.get(["a"]);
  assertEquals(result.key, ["a"]);
  // deno-lint-ignore no-explicit-any
  const roundTripped: any = result.value;
  // The clone must point back at itself, not at a copy.
  assert(roundTripped.a === roundTripped);
});
// Valid key shapes: every KvKeyPart type (string, number, bigint, boolean,
// Uint8Array) alone, nested, and mixed — each must round-trip via get().
const keys = [
["a"],
["a", "b"],
["a", "b", "c"],
[1],
["a", 1],
["a", 1, "b"],
[1n],
["a", 1n],
["a", 1n, "b"],
[true],
["a", true],
["a", true, "b"],
[new Uint8Array([1, 2, 3])],
["a", new Uint8Array([1, 2, 3])],
["a", new Uint8Array([1, 2, 3]), "b"],
[1, 1n, true, new Uint8Array([1, 2, 3]), "a"],
];
for (const key of keys) {
dbTest(`set and get ${Deno.inspect(key)} key`, async (db) => {
await db.set(key, "b");
const result = await db.get(key);
assertEquals(result.key, key);
assertEquals(result.value, "b");
});
}
// Key shapes that are NOT valid KvKeys: null/undefined parts, the empty
// key, plain objects, Dates, raw ArrayBuffers, and nested arrays. Each
// must cause set() to reject.
const INVALID_KEYS = [
[null],
[undefined],
[],
[{}],
[new Date()],
[new ArrayBuffer(3)],
[new Uint8Array([1, 2, 3]).buffer],
[["a", "b"]],
];
for (const key of INVALID_KEYS) {
dbTest(`set and get invalid key ${Deno.inspect(key)}`, async (db) => {
await assertRejects(
async () => {
// @ts-ignore - we are testing invalid keys
await db.set(key, "b");
},
Error,
);
});
}
// Optimistic concurrency: an atomic op guarded by a versionstamp check
// commits only when the stored versionstamp still matches.
dbTest("compare and mutate", async (db) => {
await db.set(["t"], "1");
const currentValue = await db.get(["t"]);
assertEquals(currentValue.versionstamp, "00000000000000010000");
let ok = await db.atomic()
.check({ key: ["t"], versionstamp: currentValue.versionstamp })
.set(currentValue.key, "2")
.commit();
assertEquals(ok, true);
const newValue = await db.get(["t"]);
assertEquals(newValue.versionstamp, "00000000000000020000");
assertEquals(newValue.value, "2");
// Re-using the stale versionstamp must fail and leave the value intact.
ok = await db.atomic()
.check({ key: ["t"], versionstamp: currentValue.versionstamp })
.set(currentValue.key, "3")
.commit();
assertEquals(ok, false);
const newValue2 = await db.get(["t"]);
assertEquals(newValue2.versionstamp, "00000000000000020000");
assertEquals(newValue2.value, "2");
});
// A check with `versionstamp: null` asserts the key does NOT exist:
// it passes on first commit and fails once the key has been created.
dbTest("compare and mutate not exists", async (db) => {
let ok = await db.atomic()
.check({ key: ["t"], versionstamp: null })
.set(["t"], "1")
.commit();
assertEquals(ok, true);
const newValue = await db.get(["t"]);
assertEquals(newValue.versionstamp, "00000000000000010000");
assertEquals(newValue.value, "1");
ok = await db.atomic()
.check({ key: ["t"], versionstamp: null })
.set(["t"], "2")
.commit();
assertEquals(ok, false);
});
// Multiple checks in one atomic op: all must pass for the commit to
// apply; if any single check fails, no mutation is applied.
dbTest("compare multiple and mutate", async (db) => {
await db.set(["t1"], "1");
await db.set(["t2"], "2");
const currentValue1 = await db.get(["t1"]);
assertEquals(currentValue1.versionstamp, "00000000000000010000");
const currentValue2 = await db.get(["t2"]);
assertEquals(currentValue2.versionstamp, "00000000000000020000");
const ok = await db.atomic()
.check({ key: ["t1"], versionstamp: currentValue1.versionstamp })
.check({ key: ["t2"], versionstamp: currentValue2.versionstamp })
.set(currentValue1.key, "3")
.set(currentValue2.key, "4")
.commit();
assertEquals(ok, true);
// Both writes landed in the same commit, so they share a versionstamp.
const newValue1 = await db.get(["t1"]);
assertEquals(newValue1.versionstamp, "00000000000000030000");
assertEquals(newValue1.value, "3");
const newValue2 = await db.get(["t2"]);
assertEquals(newValue2.versionstamp, "00000000000000030000");
assertEquals(newValue2.value, "4");
// just one of the two checks failed
const ok2 = await db.atomic()
.check({ key: ["t1"], versionstamp: newValue1.versionstamp })
.check({ key: ["t2"], versionstamp: null })
.set(newValue1.key, "5")
.set(newValue2.key, "6")
.commit();
assertEquals(ok2, false);
const newValue3 = await db.get(["t1"]);
assertEquals(newValue3.versionstamp, "00000000000000030000");
assertEquals(newValue3.value, "3");
const newValue4 = await db.get(["t2"]);
assertEquals(newValue4.versionstamp, "00000000000000030000");
assertEquals(newValue4.value, "4");
});
// Mutations inside one atomic operation apply in insertion order: a
// later delete wins over an earlier set, and vice versa.
dbTest("atomic mutation ordering (set before delete)", async (db) => {
  await db.set(["a"], "1");
  const committed = await db.atomic()
    .set(["a"], "2")
    .delete(["a"])
    .commit();
  assert(committed);
  const entry = await db.get(["a"]);
  assertEquals(entry.value, null);
});
dbTest("atomic mutation ordering (delete before set)", async (db) => {
  await db.set(["a"], "1");
  const committed = await db.atomic()
    .delete(["a"])
    .set(["a"], "2")
    .commit();
  assert(committed);
  const entry = await db.get(["a"]);
  assertEquals(entry.value, "2");
});
// The `mutate()` form of set/delete must behave like the dedicated
// set()/delete() helpers, including delete on a missing key (no-op).
dbTest("atomic mutation type=set", async (db) => {
const ok = await db.atomic()
.mutate({ key: ["a"], value: "1", type: "set" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assertEquals(result.value, "1");
});
dbTest("atomic mutation type=set overwrite", async (db) => {
await db.set(["a"], "1");
const ok = await db.atomic()
.mutate({ key: ["a"], value: "2", type: "set" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assertEquals(result.value, "2");
});
dbTest("atomic mutation type=delete", async (db) => {
await db.set(["a"], "1");
const ok = await db.atomic()
.mutate({ key: ["a"], type: "delete" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assertEquals(result.value, null);
});
dbTest("atomic mutation type=delete no exists", async (db) => {
const ok = await db.atomic()
.mutate({ key: ["a"], type: "delete" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assertEquals(result.value, null);
});
// `sum` mutations add KvU64 operands; a missing key behaves as 0, and
// addition wraps around modulo 2^64 (u64 semantics).
dbTest("atomic mutation type=sum", async (db) => {
await db.set(["a"], new Deno.KvU64(10n));
const ok = await db.atomic()
.mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "sum" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assertEquals(result.value, new Deno.KvU64(11n));
});
dbTest("atomic mutation type=sum no exists", async (db) => {
const ok = await db.atomic()
.mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "sum" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assert(result.value);
assertEquals(result.value, new Deno.KvU64(1n));
});
dbTest("atomic mutation type=sum wrap around", async (db) => {
// u64::MAX + 10 wraps to 9; MAX again wraps 9 down to 8.
await db.set(["a"], new Deno.KvU64(0xffffffffffffffffn));
const ok = await db.atomic()
.mutate({ key: ["a"], value: new Deno.KvU64(10n), type: "sum" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assertEquals(result.value, new Deno.KvU64(9n));
const ok2 = await db.atomic()
.mutate({
key: ["a"],
value: new Deno.KvU64(0xffffffffffffffffn),
type: "sum",
})
.commit();
assert(ok2);
const result2 = await db.get(["a"]);
assertEquals(result2.value, new Deno.KvU64(8n));
});
// A `sum` mutation requires the existing database value to be a
// Deno.KvU64; summing into a plain number must reject with a TypeError.
dbTest("atomic mutation type=sum wrong type in db", async (db) => {
  await db.set(["a"], 1);
  // FIX: the rejection must be awaited — without `await`, assertRejects
  // runs as a floating promise and a failure would never fail the test.
  await assertRejects(
    async () => {
      await db.atomic()
        .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "sum" })
        .commit();
    },
    TypeError,
    "Failed to perform 'sum' mutation on a non-U64 value in the database",
  );
});
// A `sum` mutation's operand itself must be a Deno.KvU64; a plain number
// operand must reject with a TypeError.
dbTest("atomic mutation type=sum wrong type in mutation", async (db) => {
  await db.set(["a"], new Deno.KvU64(1n));
  // FIX: await the assertRejects promise so a failed expectation
  // actually fails the test instead of being silently dropped.
  await assertRejects(
    async () => {
      await db.atomic()
        // @ts-expect-error wrong type is intentional
        .mutate({ key: ["a"], value: 1, type: "sum" })
        .commit();
    },
    TypeError,
    "Failed to perform 'sum' mutation on a non-U64 operand",
  );
});
// `min` keeps the smaller of the stored KvU64 and the operand; on a
// missing key the operand is stored as-is.
dbTest("atomic mutation type=min", async (db) => {
await db.set(["a"], new Deno.KvU64(10n));
const ok = await db.atomic()
.mutate({ key: ["a"], value: new Deno.KvU64(5n), type: "min" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assertEquals(result.value, new Deno.KvU64(5n));
// A larger operand leaves the stored minimum untouched.
const ok2 = await db.atomic()
.mutate({ key: ["a"], value: new Deno.KvU64(15n), type: "min" })
.commit();
assert(ok2);
const result2 = await db.get(["a"]);
assertEquals(result2.value, new Deno.KvU64(5n));
});
dbTest("atomic mutation type=min no exists", async (db) => {
const ok = await db.atomic()
.mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "min" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assert(result.value);
assertEquals(result.value, new Deno.KvU64(1n));
});
// A `min` mutation requires the existing database value to be a
// Deno.KvU64; a plain number in the database must reject.
dbTest("atomic mutation type=min wrong type in db", async (db) => {
  await db.set(["a"], 1);
  // FIX: await the assertRejects promise (was a floating promise).
  await assertRejects(
    async () => {
      await db.atomic()
        .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "min" })
        .commit();
    },
    TypeError,
    "Failed to perform 'min' mutation on a non-U64 value in the database",
  );
});
// A `min` mutation's operand must be a Deno.KvU64; a plain number
// operand must reject with a TypeError.
dbTest("atomic mutation type=min wrong type in mutation", async (db) => {
  await db.set(["a"], new Deno.KvU64(1n));
  // FIX: await the assertRejects promise (was a floating promise).
  await assertRejects(
    async () => {
      await db.atomic()
        // @ts-expect-error wrong type is intentional
        .mutate({ key: ["a"], value: 1, type: "min" })
        .commit();
    },
    TypeError,
    "Failed to perform 'min' mutation on a non-U64 operand",
  );
});
// `max` keeps the larger of the stored KvU64 and the operand; on a
// missing key the operand is stored as-is.
dbTest("atomic mutation type=max", async (db) => {
await db.set(["a"], new Deno.KvU64(10n));
// A smaller operand leaves the stored maximum untouched.
const ok = await db.atomic()
.mutate({ key: ["a"], value: new Deno.KvU64(5n), type: "max" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assertEquals(result.value, new Deno.KvU64(10n));
const ok2 = await db.atomic()
.mutate({ key: ["a"], value: new Deno.KvU64(15n), type: "max" })
.commit();
assert(ok2);
const result2 = await db.get(["a"]);
assertEquals(result2.value, new Deno.KvU64(15n));
});
dbTest("atomic mutation type=max no exists", async (db) => {
const ok = await db.atomic()
.mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "max" })
.commit();
assert(ok);
const result = await db.get(["a"]);
assert(result.value);
assertEquals(result.value, new Deno.KvU64(1n));
});
// A `max` mutation requires the existing database value to be a
// Deno.KvU64; a plain number in the database must reject.
dbTest("atomic mutation type=max wrong type in db", async (db) => {
  await db.set(["a"], 1);
  // FIX: await the assertRejects promise (was a floating promise).
  await assertRejects(
    async () => {
      await db.atomic()
        .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "max" })
        .commit();
    },
    TypeError,
    "Failed to perform 'max' mutation on a non-U64 value in the database",
  );
});
// A `max` mutation's operand must be a Deno.KvU64; a plain number
// operand must reject with a TypeError.
dbTest("atomic mutation type=max wrong type in mutation", async (db) => {
  await db.set(["a"], new Deno.KvU64(1n));
  // FIX: await the assertRejects promise (was a floating promise).
  await assertRejects(
    async () => {
      await db.atomic()
        // @ts-expect-error wrong type is intentional
        .mutate({ key: ["a"], value: 1, type: "max" })
        .commit();
    },
    TypeError,
    "Failed to perform 'max' mutation on a non-U64 operand",
  );
});
// Deno.KvU64 boxes an unsigned 64-bit integer: verify deep equality,
// range validation at construction, immutability, and unboxing.
Deno.test("KvU64 comparison", () => {
  const first = new Deno.KvU64(1n);
  const second = new Deno.KvU64(1n);
  assertEquals(first, second);
  // Boxes holding different values must not compare equal.
  assertThrows(() => {
    assertEquals(first, new Deno.KvU64(2n));
  }, AssertionError);
});
Deno.test("KvU64 overflow", () => {
  // 2^64 is one past the largest representable u64.
  assertThrows(() => new Deno.KvU64(2n ** 64n), RangeError);
});
Deno.test("KvU64 underflow", () => {
  // Negative values are outside the u64 range.
  assertThrows(() => new Deno.KvU64(-1n), RangeError);
});
Deno.test("KvU64 frozen", () => {
  const boxed = new Deno.KvU64(1n);
  assertThrows(() => {
    // @ts-expect-error value is readonly
    boxed.value = 2n;
  }, TypeError);
});
Deno.test("KvU64 unbox", () => {
  const boxed = new Deno.KvU64(1n);
  assertEquals(boxed.value, 1n);
});
// Drains a KV list iterator into an array of its entries, in order.
async function collect(iter: Deno.KvListIterator): Promise<Deno.KvEntry[]> {
  const collected: Deno.KvEntry[] = [];
  for await (const item of iter) {
    collected.push(item);
  }
  return collected;
}
// Seeds a fixed tree of keys under ["a"] and ["b"] in a single atomic
// commit, so every entry shares one versionstamp ("...010000").
async function setupData(db: Deno.Kv) {
  const seed: [Deno.KvKey, number][] = [
    [["a"], -1],
    [["a", "a"], 0],
    [["a", "b"], 1],
    [["a", "c"], 2],
    [["a", "d"], 3],
    [["a", "e"], 4],
    [["b"], 99],
    [["b", "a"], 100],
  ];
  let op = db.atomic();
  for (const [key, value] of seed) {
    op = op.set(key, value);
  }
  await op.commit();
}
// list() with a prefix selector: a prefix matches strictly-longer keys
// (["a"] itself is excluded). `start` is inclusive, `end` is exclusive,
// and `reverse` flips the iteration order without changing the range.
dbTest("list prefix", async (db) => {
await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }));
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix empty", async (db) => {
await setupData(db);
const entries = await collect(db.list({ prefix: ["c"] }));
assertEquals(entries.length, 0);
const entries2 = await collect(db.list({ prefix: ["a", "f"] }));
assertEquals(entries2.length, 0);
});
dbTest("list prefix with start", async (db) => {
await setupData(db);
// start ["a","c"] is inclusive: the "c" entry is returned.
const entries = await collect(db.list({ prefix: ["a"], start: ["a", "c"] }));
assertEquals(entries, [
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix with start empty", async (db) => {
await setupData(db);
const entries = await collect(db.list({ prefix: ["a"], start: ["a", "f"] }));
assertEquals(entries.length, 0);
});
dbTest("list prefix with end", async (db) => {
await setupData(db);
// end ["a","c"] is exclusive: the "c" entry is omitted.
const entries = await collect(db.list({ prefix: ["a"], end: ["a", "c"] }));
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix with end empty", async (db) => {
await setupData(db);
const entries = await collect(db.list({ prefix: ["a"], end: ["a", "a"] }));
assertEquals(entries.length, 0);
});
dbTest("list prefix reverse", async (db) => {
await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }, { reverse: true }));
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix reverse with start", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"], start: ["a", "c"] }, { reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix reverse with start empty", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"], start: ["a", "f"] }, { reverse: true }),
);
assertEquals(entries.length, 0);
});
dbTest("list prefix reverse with end", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"], end: ["a", "c"] }, { reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix reverse with end empty", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"], end: ["a", "a"] }, { reverse: true }),
);
assertEquals(entries.length, 0);
});
// `limit` caps the number of entries returned; `batchSize` only changes
// internal paging and must not affect the observable result set.
dbTest("list prefix limit", async (db) => {
await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }, { limit: 2 }));
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix limit reverse", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { limit: 2, reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix with small batch size", async (db) => {
await setupData(db);
// batchSize 2 forces multiple internal fetches for the 5 entries.
const entries = await collect(db.list({ prefix: ["a"] }, { batchSize: 2 }));
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix with small batch size reverse", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { batchSize: 2, reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix with small batch size and limit", async (db) => {
await setupData(db);
// limit applies across batches, not per batch.
const entries = await collect(
db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3 }),
);
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix with small batch size and limit reverse", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3, reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
]);
});
// After an iterator is exhausted, its `cursor` property encodes the
// resume position; passing it to a new list() continues where the
// previous iteration stopped (in either direction).
dbTest("list prefix with manual cursor", async (db) => {
await setupData(db);
const iterator = db.list({ prefix: ["a"] }, { limit: 2 });
const values = await collect(iterator);
assertEquals(values, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
]);
const cursor = iterator.cursor;
// Cursor is an opaque base64-ish token; pinned here to catch format drift.
assertEquals(cursor, "AmIA");
const iterator2 = db.list({ prefix: ["a"] }, { cursor });
const values2 = await collect(iterator2);
assertEquals(values2, [
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
]);
});
dbTest("list prefix with manual cursor reverse", async (db) => {
await setupData(db);
const iterator = db.list({ prefix: ["a"] }, { limit: 2, reverse: true });
const values = await collect(iterator);
assertEquals(values, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
]);
const cursor = iterator.cursor;
assertEquals(cursor, "AmQA");
const iterator2 = db.list({ prefix: ["a"] }, { cursor, reverse: true });
const values2 = await collect(iterator2);
assertEquals(values2, [
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
]);
});
// list() with a start/end range selector: start is inclusive, end is
// exclusive, and unlike a prefix selector the exact start key itself
// (e.g. ["a"]) can be included in the results.
dbTest("list range", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }),
);
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
]);
});
dbTest("list range reverse", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }, { reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
]);
});
dbTest("list range with limit", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }, { limit: 3 }),
);
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
]);
});
dbTest("list range with limit reverse", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }, {
limit: 3,
reverse: true,
}),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
]);
});
dbTest("list range nesting", async (db) => {
await setupData(db);
// A range starting at ["a"] includes the parent key ["a"] itself.
const entries = await collect(db.list({ start: ["a"], end: ["a", "d"] }));
assertEquals(entries, [
{ key: ["a"], value: -1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
]);
});
dbTest("list range short", async (db) => {
await setupData(db);
const entries = await collect(
db.list({ start: ["a", "b"], end: ["a", "d"] }),
);
assertEquals(entries, [
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
]);
});
// Cursors also work with start/end range selectors, resuming the same
// range in either direction.
dbTest("list range with manual cursor", async (db) => {
await setupData(db);
const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, {
limit: 2,
});
const entries = await collect(iterator);
assertEquals(entries, [
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
]);
const cursor = iterator.cursor;
const iterator2 = db.list({ start: ["a", "b"], end: ["a", "z"] }, {
cursor,
});
const entries2 = await collect(iterator2);
assertEquals(entries2, [
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
]);
});
dbTest("list range with manual cursor reverse", async (db) => {
await setupData(db);
const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, {
limit: 2,
reverse: true,
});
const entries = await collect(iterator);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
]);
const cursor = iterator.cursor;
const iterator2 = db.list({ start: ["a", "b"], end: ["a", "z"] }, {
cursor,
reverse: true,
});
const entries2 = await collect(iterator2);
assertEquals(entries2, [
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
]);
});
// A selector must be either a prefix (optionally with start OR end) or
// a start+end pair; mixing all three, or giving only one bound, rejects.
dbTest("list invalid selector", async (db) => {
await setupData(db);
await assertRejects(async () => {
await collect(
db.list({ prefix: ["a"], start: ["a", "b"], end: ["a", "c"] }),
);
}, TypeError);
await assertRejects(async () => {
await collect(
// @ts-expect-error missing end
db.list({ start: ["a", "b"] }),
);
}, TypeError);
await assertRejects(async () => {
await collect(
// @ts-expect-error missing start
db.list({ end: ["a", "b"] }),
);
}, TypeError);
});
// Versionstamps are 20 hex characters (10 bytes). Empty strings,
// non-hex characters ("xx" * 10), and wrong lengths ("aa" * 11 = 22
// chars) must all be rejected by check().
dbTest("invalid versionstamp in atomic check rejects", async (db) => {
await assertRejects(async () => {
await db.atomic().check({ key: ["a"], versionstamp: "" }).commit();
}, TypeError);
await assertRejects(async () => {
await db.atomic().check({ key: ["a"], versionstamp: "xx".repeat(10) })
.commit();
}, TypeError);
await assertRejects(async () => {
await db.atomic().check({ key: ["a"], versionstamp: "aa".repeat(11) })
.commit();
}, TypeError);
});
// Malformed mutations must reject: "set" without a value, "delete" with
// a value, and unknown mutation types (with or without a value).
dbTest("invalid mutation type rejects", async (db) => {
await assertRejects(async () => {
await db.atomic()
// @ts-expect-error invalid type + value combo
.mutate({ key: ["a"], type: "set" })
.commit();
}, TypeError);
await assertRejects(async () => {
await db.atomic()
// @ts-expect-error invalid type + value combo
.mutate({ key: ["a"], type: "delete", value: "123" })
.commit();
}, TypeError);
await assertRejects(async () => {
await db.atomic()
// @ts-expect-error invalid type
.mutate({ key: ["a"], type: "foobar" })
.commit();
}, TypeError);
await assertRejects(async () => {
await db.atomic()
// @ts-expect-error invalid type
.mutate({ key: ["a"], type: "foobar", value: "123" })
.commit();
}, TypeError);
});

View file

@ -7,6 +7,7 @@ export {
assert,
assertEquals,
assertFalse,
AssertionError,
assertMatch,
assertNotEquals,
assertNotStrictEquals,

View file

@ -1518,6 +1518,505 @@ declare namespace Deno {
* @category HTTP Server
*/
export function upgradeHttpRaw(request: Request): [Deno.Conn, Uint8Array];
/** **UNSTABLE**: New API, yet to be vetted.
*
* Open a new {@linkcode Deno.Kv} connection to persist data.
*
* When a path is provided, the database will be persisted to disk at that
* path. Read and write access to the file is required.
*
* When no path is provided, the database will be opened in a default path for
* the current script. This location is persistent across script runs and is
* keyed on the origin storage key (the same key that is used to determine
* `localStorage` persistence). More information about the origin storage key
* can be found in the Deno Manual.
*
* @tags allow-read, allow-write
* @category KV
*/
export function openKv(path?: string): Promise<Deno.Kv>;
/** **UNSTABLE**: New API, yet to be vetted.
 *
 * A key to be persisted in a {@linkcode Deno.Kv}. A key is a sequence
 * of {@linkcode Deno.KvKeyPart}s.
 *
 * Keys are ordered lexicographically by their parts. The first part is the
 * most significant, and the last part is the least significant. The order of
 * the parts is determined by both the type and the value of the part. The
 * relative significance of the types can be found in documentation for the
 * {@linkcode Deno.KvKeyPart} type.
 *
 * @category KV
 */
export type KvKey = readonly KvKeyPart[];

/** **UNSTABLE**: New API, yet to be vetted.
 *
 * A single part of a {@linkcode Deno.KvKey}. Parts are ordered
 * lexicographically, first by their type, and within a given type by their
 * value.
 *
 * The ordering of types is as follows:
 *
 * 1. `Uint8Array`
 * 2. `string`
 * 3. `number`
 * 4. `bigint`
 * 5. `boolean`
 *
 * Within a given type, the ordering is as follows:
 *
 * - `Uint8Array` is ordered by the byte ordering of the array
 * - `string` is ordered by the byte ordering of the UTF-8 encoding of the
 *   string
 * - `number` is ordered following this pattern: `-NaN`
 *   < `-Infinity` < `-100.0` < `-1.0` < -`0.5` < `-0.0` < `0.0` < `0.5`
 *   < `1.0` < `100.0` < `Infinity` < `NaN`
 * - `bigint` is ordered by mathematical ordering, with the largest negative
 *   number being the least first value, and the largest positive number
 *   being the last value
 * - `boolean` is ordered by `false` < `true`
 *
 * This means that the part `1.0` (a number) is ordered before the part `2.0`
 * (also a number), but is greater than the part `0n` (a bigint), because
 * `1.0` is a number and `0n` is a bigint, and type ordering has precedence
 * over the ordering of values within a type.
 *
 * @category KV
 */
export type KvKeyPart = Uint8Array | string | number | bigint | boolean;

/** **UNSTABLE**: New API, yet to be vetted.
 *
 * Consistency level of a KV operation.
 *
 * - `strong` - This operation must be strongly-consistent.
 * - `eventual` - Eventually-consistent behavior is allowed.
 *
 * @category KV
 */
export type KvConsistencyLevel = "strong" | "eventual";

/** **UNSTABLE**: New API, yet to be vetted.
 *
 * A selector that selects the range of data returned by a list operation on a
 * {@linkcode Deno.Kv}.
 *
 * The selector can either be a prefix selector or a range selector. A prefix
 * selector selects all keys that start with the given prefix (optionally
 * starting at a given key). A range selector selects all keys that are
 * lexicographically between the given start and end keys. The `start` key of
 * a range is inclusive, while the `end` key is exclusive.
 *
 * @category KV
 */
export type KvListSelector =
  | { prefix: KvKey }
  | { prefix: KvKey; start: KvKey }
  | { prefix: KvKey; end: KvKey }
  | { start: KvKey; end: KvKey };

/** **UNSTABLE**: New API, yet to be vetted.
 *
 * A mutation to a key in a {@linkcode Deno.Kv}. A mutation is a
 * combination of a key, a value, and a type. The type determines how the
 * mutation is applied to the key.
 *
 * - `set` - Sets the value of the key to the given value, overwriting any
 *   existing value.
 * - `delete` - Deletes the key from the database. The mutation is a no-op if
 *   the key does not exist.
 * - `sum` - Adds the given value to the existing value of the key. Both the
 *   value specified in the mutation, and any existing value must be of type
 *   `Deno.KvU64`. If the key does not exist, the value is set to the given
 *   value (summed with 0).
 * - `max` - Sets the value of the key to the maximum of the existing value
 *   and the given value. Both the value specified in the mutation, and any
 *   existing value must be of type `Deno.KvU64`. If the key does not exist,
 *   the value is set to the given value.
 * - `min` - Sets the value of the key to the minimum of the existing value
 *   and the given value. Both the value specified in the mutation, and any
 *   existing value must be of type `Deno.KvU64`. If the key does not exist,
 *   the value is set to the given value.
 *
 * @category KV
 */
export type KvMutation =
  & { key: KvKey }
  & (
    | { type: "set"; value: unknown }
    | { type: "delete" }
    | { type: "sum"; value: KvU64 }
    | { type: "max"; value: KvU64 }
    | { type: "min"; value: KvU64 }
  );
/** **UNSTABLE**: New API, yet to be vetted.
 *
 * An iterator over a range of data entries in a {@linkcode Deno.Kv}.
 *
 * The cursor getter returns the cursor that can be used to resume the
 * iteration from the current position in the future.
 *
 * @category KV
 */
export class KvListIterator implements AsyncIterableIterator<KvEntry> {
  /**
   * Returns the cursor of the current position in the iteration. This cursor
   * can be used to resume the iteration from the current position in the
   * future by passing it to the `cursor` option of the `list` method.
   *
   * Accessing this getter before the first iteration step (when no initial
   * cursor was provided to the `list` call) throws an error.
   */
  get cursor(): string;

  next(): Promise<IteratorResult<KvEntry, any>>;
  [Symbol.asyncIterator](): AsyncIterableIterator<KvEntry>;
}

/** **UNSTABLE**: New API, yet to be vetted.
 *
 * A versioned pair of key and value in a {@linkcode Deno.Kv}.
 *
 * The `versionstamp` is a string that represents the current version of the
 * key-value pair. It can be used to perform atomic operations on the KV store
 * by passing it to the `check` method of a {@linkcode Deno.AtomicOperation}.
 * A `null` versionstamp indicates that no value exists for the given key in
 * the KV store.
 *
 * @category KV
 */
export interface KvEntry {
  key: KvKey;
  value: unknown;
  versionstamp: string | null;
}

/** **UNSTABLE**: New API, yet to be vetted.
 *
 * Options for listing key-value pairs in a {@linkcode Deno.Kv}.
 *
 * @category KV
 */
export interface KvListOptions {
  /**
   * The maximum number of key-value pairs to return. If not specified, all
   * matching key-value pairs will be returned.
   */
  limit?: number;
  /**
   * The cursor to resume the iteration from. If not specified, the iteration
   * will start from the beginning.
   */
  cursor?: string;
  /**
   * Whether to reverse the order of the returned key-value pairs. If not
   * specified, the order will be ascending from the start of the range as per
   * the lexicographical ordering of the keys. If `true`, the order will be
   * descending from the end of the range.
   *
   * The default value is `false`.
   */
  reverse?: boolean;
  /**
   * The consistency level of the list operation. The default consistency
   * level is "strong". Some use cases can benefit from using a weaker
   * consistency level. For more information on consistency levels, see the
   * documentation for {@linkcode Deno.KvConsistencyLevel}.
   *
   * List operations are performed in batches (in sizes specified by the
   * `batchSize` option). The consistency level of the list operation is
   * applied to each batch individually. This means that while each batch is
   * guaranteed to be consistent within itself, the entire list operation may
   * not be consistent across batches because a mutation may be applied to a
   * key-value pair between batches, in a batch that has already been returned
   * by the list operation.
   */
  consistency?: KvConsistencyLevel;
  /**
   * The size of the batches in which the list operation is performed. Larger
   * or smaller batch sizes may positively or negatively affect the
   * performance of a list operation depending on the specific use case and
   * iteration behavior. Slow iterating queries may benefit from using a
   * smaller batch size for increased overall consistency, while fast
   * iterating queries may benefit from using a larger batch size for better
   * performance.
   *
   * The default batch size is equal to the `limit` option, or 100 if this is
   * unset. The maximum value for this option is 500. Larger values will be
   * clamped.
   */
  batchSize?: number;
}

/** **UNSTABLE**: New API, yet to be vetted.
 *
 * A check to perform as part of a {@linkcode Deno.AtomicOperation}. The check
 * will fail if the versionstamp for the key-value pair in the KV store does
 * not match the given versionstamp. A check with a `null` versionstamp checks
 * that the key-value pair does not currently exist in the KV store.
 *
 * @category KV
 */
export interface AtomicCheck {
  key: KvKey;
  versionstamp: string | null;
}

/** **UNSTABLE**: New API, yet to be vetted.
 *
 * An operation on a {@linkcode Deno.Kv} that can be performed
 * atomically. Atomic operations do not auto-commit, and must be committed
 * explicitly by calling the `commit` method.
 *
 * Atomic operations can be used to perform multiple mutations on the KV store
 * in a single atomic transaction. They can also be used to perform
 * conditional mutations by specifying one or more
 * {@linkcode Deno.AtomicCheck}s that ensure that a mutation is only performed
 * if the key-value pair in the KV has a specific versionstamp. If any of the
 * checks fail, the entire operation will fail and no mutations will be made.
 *
 * The ordering of mutations is guaranteed to be the same as the ordering of
 * the mutations specified in the operation. Checks are performed before any
 * mutations are performed. The ordering of checks is unobservable.
 *
 * Atomic operations can be used to implement optimistic locking, where a
 * mutation is only performed if the key-value pair in the KV store has not
 * been modified since the last read. This can be done by specifying a check
 * that ensures that the versionstamp of the key-value pair matches the
 * versionstamp that was read. If the check fails, the mutation will not be
 * performed and the operation will fail. One can then retry the read-modify-
 * write operation in a loop until it succeeds.
 *
 * The `commit` method of an atomic operation returns a boolean indicating
 * whether checks passed and mutations were performed. If the operation failed
 * because of a failed check, the return value will be `false`. If the
 * operation failed for any other reason (storage error, invalid value, etc.),
 * an exception will be thrown.
 *
 * @category KV
 */
export class AtomicOperation {
  /**
   * Add to the operation a check that ensures that the versionstamp of the
   * key-value pair in the KV store matches the given versionstamp. If the
   * check fails, the entire operation will fail and no mutations will be
   * performed during the commit.
   */
  check(...checks: AtomicCheck[]): this;
  /**
   * Add to the operation a mutation that performs the specified mutation on
   * the specified key if all checks pass during the commit. The types and
   * semantics of all available mutations are described in the documentation
   * for {@linkcode Deno.KvMutation}.
   */
  mutate(...mutations: KvMutation[]): this;
  /**
   * Add to the operation a mutation that sets the value of the specified key
   * to the specified value if all checks pass during the commit.
   */
  set(key: KvKey, value: unknown): this;
  /**
   * Add to the operation a mutation that deletes the specified key if all
   * checks pass during the commit.
   */
  delete(key: KvKey): this;
  /**
   * Commit the operation to the KV store. Returns a boolean indicating
   * whether checks passed and mutations were performed. If the operation
   * failed because of a failed check, the return value will be `false`. If
   * the operation failed for any other reason (storage error, invalid value,
   * etc.), an exception will be thrown.
   *
   * If the commit returns `false`, one may create a new atomic operation with
   * updated checks and mutations and attempt to commit it again. See the note
   * on optimistic locking in the documentation for {@linkcode Deno.AtomicOperation}.
   */
  commit(): Promise<boolean>;
}
/** **UNSTABLE**: New API, yet to be vetted.
 *
 * A key-value database that can be used to store and retrieve data.
 *
 * Data is stored as key-value pairs, where the key is a {@linkcode Deno.KvKey}
 * and the value is an arbitrary structured-serializable JavaScript value.
 * Keys are ordered lexicographically as described in the documentation for
 * {@linkcode Deno.KvKey}. Keys are unique within a database, and the last
 * value set for a given key is the one that is returned when reading the
 * key. Keys can be deleted from the database, in which case they will no
 * longer be returned when reading keys.
 *
 * Values can be any structured-serializable JavaScript value (objects,
 * arrays, strings, numbers, etc.). The special value {@linkcode Deno.KvU64}
 * can be used to store 64-bit unsigned integers in the database. This special
 * value can not be nested within other objects or arrays. In addition to the
 * regular database mutation operations, the unsigned 64-bit integer value
 * also supports `sum`, `max`, and `min` mutations.
 *
 * Keys are versioned on write by assigning the key an ever-increasing
 * "versionstamp". The versionstamp represents the version of a key-value pair
 * in the database at some point in time, and can be used to perform
 * transactional operations on the database without requiring any locking.
 * This is enabled by atomic operations, which can have conditions that ensure
 * that the operation only succeeds if the versionstamp of the key-value pair
 * matches an expected versionstamp.
 *
 * Keys have a maximum length of 2048 bytes after serialization. Values have a
 * maximum length of 16 KiB after serialization. Serialization of both keys
 * and values is somewhat opaque, but one can usually assume that the
 * serialization of any value is about the same length as the resulting string
 * of a JSON serialization of that same value.
 *
 * @category KV
 */
export class Kv {
  /**
   * Retrieve the value and versionstamp for the given key from the database
   * in the form of a {@linkcode Deno.KvEntry}. If no value exists for the key,
   * the returned entry will have a `null` value and versionstamp.
   *
   * ```ts
   * const db = await Deno.openKv();
   * const result = await db.get(["foo"]);
   * result.key; // ["foo"]
   * result.value; // "bar"
   * result.versionstamp; // "00000000000000010000"
   * ```
   *
   * The `consistency` option can be used to specify the consistency level
   * for the read operation. The default consistency level is "strong". Some
   * use cases can benefit from using a weaker consistency level. For more
   * information on consistency levels, see the documentation for
   * {@linkcode Deno.KvConsistencyLevel}.
   */
  get(
    key: KvKey,
    options?: { consistency?: KvConsistencyLevel },
  ): Promise<KvEntry>;

  /**
   * Retrieve multiple values and versionstamps from the database in the form
   * of an array of {@linkcode Deno.KvEntry} objects. The returned array will
   * have the same length as the `keys` array, and the entries will be in the
   * same order as the keys. If no value exists for a given key, the returned
   * entry will have a `null` value and versionstamp.
   *
   * ```ts
   * const db = await Deno.openKv();
   * const result = await db.getMany([["foo"], ["baz"]]);
   * result[0].key; // ["foo"]
   * result[0].value; // "bar"
   * result[0].versionstamp; // "00000000000000010000"
   * result[1].key; // ["baz"]
   * result[1].value; // null
   * result[1].versionstamp; // null
   * ```
   *
   * The `consistency` option can be used to specify the consistency level
   * for the read operation. The default consistency level is "strong". Some
   * use cases can benefit from using a weaker consistency level. For more
   * information on consistency levels, see the documentation for
   * {@linkcode Deno.KvConsistencyLevel}.
   */
  getMany(
    keys: KvKey[],
    options?: { consistency?: KvConsistencyLevel },
  ): Promise<KvEntry[]>;

  /**
   * Set the value for the given key in the database. If a value already
   * exists for the key, it will be overwritten.
   *
   * ```ts
   * const db = await Deno.openKv();
   * await db.set(["foo"], "bar");
   * ```
   */
  set(key: KvKey, value: unknown): Promise<void>;

  /**
   * Delete the value for the given key from the database. If no value exists
   * for the key, this operation is a no-op.
   *
   * ```ts
   * const db = await Deno.openKv();
   * await db.delete(["foo"]);
   * ```
   */
  delete(key: KvKey): Promise<void>;

  /**
   * Retrieve a list of keys in the database. The returned list is an
   * {@linkcode Deno.KvListIterator} which can be used to iterate over the
   * entries in the database.
   *
   * Each list operation must specify a selector which is used to specify the
   * range of keys to return. The selector can either be a prefix selector, or
   * a range selector:
   *
   * - A prefix selector selects all keys that start with the given prefix of
   *   key parts. For example, the selector `["users"]` will select all keys
   *   that start with the prefix `["users"]`, such as `["users", "alice"]`
   *   and `["users", "bob"]`. Note that you can not partially match a key
   *   part, so the selector `["users", "a"]` will not match the key
   *   `["users", "alice"]`. A prefix selector may specify a `start` key that
   *   is used to skip over keys that are lexicographically less than the
   *   start key.
   * - A range selector selects all keys that are lexicographically between
   *   the given start and end keys (including the start, and excluding the
   *   end). For example, the selector `["users", "a"], ["users", "n"]` will
   *   select all keys that start with the prefix `["users"]` and have a
   *   second key part that is lexicographically between `a` and `n`, such as
   *   `["users", "alice"]`, `["users", "bob"]`, and `["users", "mike"]`, but
   *   not `["users", "noa"]` or `["users", "zoe"]`.
   *
   * ```ts
   * const db = await Deno.openKv();
   * const entries = db.list({ prefix: ["users"] });
   * for await (const entry of entries) {
   *   entry.key; // ["users", "alice"]
   *   entry.value; // { name: "Alice" }
   *   entry.versionstamp; // "00000000000000010000"
   * }
   * ```
   *
   * The `options` argument can be used to specify additional options for the
   * list operation. See the documentation for {@linkcode Deno.KvListOptions}
   * for more information.
   */
  list(selector: KvListSelector, options?: KvListOptions): KvListIterator;

  /**
   * Create a new {@linkcode Deno.AtomicOperation} object which can be used to
   * perform an atomic transaction on the database. This does not perform any
   * operations on the database - the atomic transaction must be committed
   * explicitly using the {@linkcode Deno.AtomicOperation.commit} method once
   * all checks and mutations have been added to the operation.
   */
  atomic(): AtomicOperation;

  /**
   * Close the database connection. This will prevent any further operations
   * from being performed on the database, but will wait for any in-flight
   * operations to complete before closing the underlying database connection.
   */
  close(): Promise<void>;
}

/** **UNSTABLE**: New API, yet to be vetted.
 *
 * Wrapper type for 64-bit unsigned integers for use as values in a
 * {@linkcode Deno.Kv}.
 *
 * Instances are frozen on construction and are therefore immutable.
 *
 * @category KV
 */
export class KvU64 {
  /** Create a new `KvU64` instance from the given bigint value. If the value
   * is signed or greater than 64-bits, an error will be thrown. */
  constructor(value: bigint);
  /** The value of this unsigned 64-bit integer, represented as a bigint. */
  readonly value: bigint;
}
}
/** **UNSTABLE**: New API, yet to be vetted.

469
ext/kv/01_db.ts Normal file
View file

@ -0,0 +1,469 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// @ts-ignore internal api
const {
  ObjectGetPrototypeOf,
  AsyncGeneratorPrototype,
} = globalThis.__bootstrap.primordials;
const core = Deno.core;
const ops = core.ops;
// Synchronously encodes the active list selector (prefix/start/end, each
// possibly null) together with the boundary key of the last yielded entry
// into an opaque cursor string, via the "op_kv_encode_cursor" op.
const encodeCursor: (
  selector: [Deno.KvKey | null, Deno.KvKey | null, Deno.KvKey | null],
  boundaryKey: Deno.KvKey,
) => string = (selector, boundaryKey) =>
  ops.op_kv_encode_cursor(selector, boundaryKey);
// Opens the KV database at `path` through the "op_kv_database_open" op and
// wraps the resulting resource id in a `Kv` instance.
async function openKv(path: string) {
  const rid = await core.opAsync("op_kv_database_open", path);
  return new Kv(rid);
}
// Shape of an entry as returned by the "op_kv_snapshot_read" op, before the
// raw value has been decoded into a user-facing value.
interface RawKvEntry {
  key: Deno.KvKey;
  value: RawValue;
  versionstamp: string;
}
// Wire representation of a stored value:
// - "v8":    V8-serialized structured-clone data
// - "bytes": raw bytes stored as-is (surfaced as Uint8Array)
// - "u64":   a 64-bit unsigned integer (surfaced as KvU64)
type RawValue = {
  kind: "v8";
  value: Uint8Array;
} | {
  kind: "bytes";
  value: Uint8Array;
} | {
  kind: "u64";
  value: bigint;
};
/**
 * Implementation of `Deno.Kv`, bound to an open database resource id.
 *
 * All reads are forwarded to the "op_kv_snapshot_read" op and all writes to
 * the "op_kv_atomic_write" op on the wrapped resource.
 */
class Kv {
  #rid: number;

  constructor(rid: number) {
    this.#rid = rid;
  }

  /** Start a new atomic operation against this database. */
  atomic() {
    return new AtomicOperation(this.#rid);
  }

  /**
   * Read a single entry via a one-element snapshot read over the exact key.
   * Returns `{ key, value: null, versionstamp: null }` when the key is
   * absent.
   */
  async get(key: Deno.KvKey, opts?: { consistency?: Deno.KvConsistencyLevel }) {
    key = convertKey(key);
    const [entries]: [RawKvEntry[]] = await core.opAsync(
      "op_kv_snapshot_read",
      this.#rid,
      [[
        null,
        key,
        null,
        1,
        false,
        null,
      ]],
      opts?.consistency ?? "strong",
    );
    if (!entries.length) {
      return {
        key,
        value: null,
        versionstamp: null,
      };
    }
    return deserializeValue(entries[0]);
  }

  /** Unconditionally set `key` to `value` via a single-mutation atomic write. */
  async set(key: Deno.KvKey, value: unknown) {
    key = convertKey(key);
    value = serializeValue(value);
    const checks: Deno.AtomicCheck[] = [];
    const mutations = [
      [key, "set", value],
    ];
    const result = await core.opAsync(
      "op_kv_atomic_write",
      this.#rid,
      checks,
      mutations,
      [],
    );
    if (!result) throw new TypeError("Failed to set value");
  }

  /** Unconditionally delete `key` via a single-mutation atomic write. */
  async delete(key: Deno.KvKey) {
    key = convertKey(key);
    const checks: Deno.AtomicCheck[] = [];
    const mutations = [
      [key, "delete", null],
    ];
    const result = await core.opAsync(
      "op_kv_atomic_write",
      this.#rid,
      checks,
      mutations,
      [],
    );
    // Fix: previously reported "Failed to set value" (copy-pasted from
    // `set()`); the message now names the operation that actually failed.
    if (!result) throw new TypeError("Failed to delete value");
  }

  /**
   * Create a lazy iterator over a key range. `batchSize` defaults to `limit`
   * (or 100 when `limit` is unset) and is clamped to a maximum of 500.
   */
  list(
    selector: Deno.KvListSelector,
    options: {
      limit?: number;
      batchSize?: number;
      cursor?: string;
      reverse?: boolean;
      consistency?: Deno.KvConsistencyLevel;
    } = {},
  ): KvListIterator {
    if (options.limit !== undefined && options.limit <= 0) {
      throw new Error("limit must be positive");
    }
    let batchSize = options.batchSize ?? (options.limit ?? 100);
    if (batchSize <= 0) throw new Error("batchSize must be positive");
    if (batchSize > 500) batchSize = 500;
    return new KvListIterator({
      limit: options.limit,
      selector,
      cursor: options.cursor,
      reverse: options.reverse ?? false,
      consistency: options.consistency ?? "strong",
      batchSize,
      pullBatch: this.#pullBatch(batchSize),
    });
  }

  /**
   * Build the callback the iterator uses to fetch and decode one batch of at
   * most `batchSize` raw entries from the current cursor position.
   */
  #pullBatch(batchSize: number): (
    selector: Deno.KvListSelector,
    cursor: string | undefined,
    reverse: boolean,
    consistency: Deno.KvConsistencyLevel,
  ) => Promise<Deno.KvEntry[]> {
    return async (selector, cursor, reverse, consistency) => {
      const [entries]: [RawKvEntry[]] = await core.opAsync(
        "op_kv_snapshot_read",
        this.#rid,
        [[
          "prefix" in selector ? selector.prefix : null,
          "start" in selector ? selector.start : null,
          "end" in selector ? selector.end : null,
          batchSize,
          reverse,
          cursor,
        ]],
        consistency,
      );
      return entries.map(deserializeValue);
    };
  }

  /** Release the underlying database resource. */
  close() {
    core.close(this.#rid);
  }
}
/**
 * Implementation of `Deno.AtomicOperation`. Checks and mutations are
 * accumulated locally and submitted in a single "op_kv_atomic_write" op when
 * `commit()` is called.
 */
class AtomicOperation {
  #rid: number;

  // [key, versionstamp] pairs; a `null` versionstamp asserts key absence.
  #checks: [Deno.KvKey, string | null][] = [];
  // [key, type, value] triples, applied in insertion order on commit.
  #mutations: [Deno.KvKey, string, RawValue | null][] = [];

  constructor(rid: number) {
    this.#rid = rid;
  }

  check(...checks: Deno.AtomicCheck[]): this {
    for (const check of checks) {
      this.#checks.push([convertKey(check.key), check.versionstamp]);
    }
    return this;
  }

  mutate(...mutations: Deno.KvMutation[]): this {
    for (const mutation of mutations) {
      const key = convertKey(mutation.key);
      let type: string;
      let value: RawValue | null;
      switch (mutation.type) {
        case "delete":
          type = "delete";
          // Fix: `value` was previously left unassigned in this branch, so
          // `undefined` (rather than `null`) was pushed into #mutations and
          // definite-assignment analysis failed. Deletes carry no value.
          value = null;
          if (mutation.value) {
            throw new TypeError("invalid mutation 'delete' with value");
          }
          break;
        case "set":
        case "sum":
        case "min":
        case "max":
          type = mutation.type;
          if (!("value" in mutation)) {
            throw new TypeError(`invalid mutation '${type}' without value`);
          }
          value = serializeValue(mutation.value);
          break;
        default:
          throw new TypeError("Invalid mutation type");
      }
      this.#mutations.push([key, type, value]);
    }
    return this;
  }

  set(key: Deno.KvKey, value: unknown): this {
    this.#mutations.push([convertKey(key), "set", serializeValue(value)]);
    return this;
  }

  delete(key: Deno.KvKey): this {
    this.#mutations.push([convertKey(key), "delete", null]);
    return this;
  }

  /** Submit all queued checks and mutations; resolves to `false` when a
   * versionstamp check failed (no mutations applied). */
  async commit(): Promise<boolean> {
    const result = await core.opAsync(
      "op_kv_atomic_write",
      this.#rid,
      this.#checks,
      this.#mutations,
      [], // TODO(@losfair): enqueue
    );
    return result;
  }

  // Guard against `await op` — an AtomicOperation is not a thenable.
  then() {
    throw new TypeError(
      "`Deno.AtomicOperation` is not a promise. Did you forget to call `commit()`?",
    );
  }
}
// Inclusive bounds of the unsigned 64-bit integer range.
const MIN_U64 = 0n;
const MAX_U64 = 0xffffffffffffffffn;

/** Validates that `value` is a bigint inside [MIN_U64, MAX_U64]; throws the
 * appropriate TypeError/RangeError otherwise. */
function assertValidU64(value: bigint): void {
  if (typeof value !== "bigint") {
    throw new TypeError("value must be a bigint");
  } else if (value < MIN_U64) {
    throw new RangeError("value must be a positive bigint");
  } else if (value > MAX_U64) {
    throw new RangeError("value must be a 64-bit unsigned integer");
  }
}

/** Immutable wrapper marking a bigint as an unsigned 64-bit integer value. */
class KvU64 {
  /** The wrapped value, always within the u64 range. */
  readonly value: bigint;

  constructor(value: bigint) {
    assertValidU64(value);
    this.value = value;
    // Instances are frozen so the wrapped value can never be mutated.
    Object.freeze(this);
  }
}
/** Normalizes a bare key part into a one-element key; arrays pass through. */
function convertKey(key: Deno.KvKey | Deno.KvKeyPart): Deno.KvKey {
  return Array.isArray(key) ? key : [key as Deno.KvKeyPart];
}
/**
 * Decodes a raw wire entry into a user-facing `Deno.KvEntry`, dispatching on
 * the raw value's `kind` tag ("v8" | "bytes" | "u64").
 */
function deserializeValue(entry: RawKvEntry): Deno.KvEntry {
  const { kind, value } = entry.value;
  if (kind === "v8") {
    // Structured-clone payload: decode with the V8 deserializer.
    return { ...entry, value: core.deserialize(value) };
  }
  if (kind === "bytes") {
    // Raw bytes pass through unchanged.
    return { ...entry, value };
  }
  if (kind === "u64") {
    // 64-bit unsigned integers are surfaced as KvU64 wrappers.
    return { ...entry, value: new KvU64(value) };
  }
  throw new TypeError("Invalid value type");
}
/**
 * Encodes a user-supplied value into its raw wire form: `Uint8Array` maps to
 * "bytes", `KvU64` to "u64", and everything else is V8-serialized as "v8".
 */
function serializeValue(value: unknown): RawValue {
  if (value instanceof Uint8Array) {
    return { kind: "bytes", value };
  }
  if (value instanceof KvU64) {
    return { kind: "u64", value: value.value };
  }
  return { kind: "v8", value: core.serialize(value) };
}
// This gets the %AsyncIteratorPrototype% object (which exists but is not a
// global). We extend the KvListIterator iterator from, so that we immediately
// support async iterator helpers once they land. The %AsyncIterator% does not
// yet actually exist however, so right now the AsyncIterator binding refers to
// %Object%. I know.
// Once AsyncIterator is a global, we can just use it (from primordials), rather
// than doing this here.
const AsyncIteratorPrototype = ObjectGetPrototypeOf(AsyncGeneratorPrototype);
const AsyncIterator = AsyncIteratorPrototype.constructor;

/**
 * Async iterator over a key range. Entries are pulled from the database in
 * batches of `batchSize` via the `pullBatch` callback and buffered; `next()`
 * drains the buffer one entry at a time and refills it on demand.
 */
class KvListIterator extends AsyncIterator
  implements AsyncIterator<Deno.KvEntry> {
  // Frozen, validated copy of the selector used for every batch pull.
  #selector: Deno.KvListSelector;
  // Buffered entries, stored reversed so `pop()` yields them in order.
  #entries: Deno.KvEntry[] | null = null;
  // Lazily produces the cursor for the last yielded entry; null until the
  // iteration has a position (or an initial cursor was supplied).
  #cursorGen: (() => string) | null = null;
  // True once the iterator is fused (exhausted or limit reached).
  #done = false;
  // True once a short batch indicated there is no more data to pull.
  #lastBatch = false;
  #pullBatch: (
    selector: Deno.KvListSelector,
    cursor: string | undefined,
    reverse: boolean,
    consistency: Deno.KvConsistencyLevel,
  ) => Promise<Deno.KvEntry[]>;
  #limit: number | undefined;
  // Number of entries yielded so far, compared against #limit.
  #count = 0;
  #reverse: boolean;
  #batchSize: number;
  #consistency: Deno.KvConsistencyLevel;

  constructor(
    { limit, selector, cursor, reverse, consistency, batchSize, pullBatch }: {
      limit?: number;
      selector: Deno.KvListSelector;
      cursor?: string;
      reverse: boolean;
      batchSize: number;
      consistency: Deno.KvConsistencyLevel;
      pullBatch: (
        selector: Deno.KvListSelector,
        cursor: string | undefined,
        reverse: boolean,
        consistency: Deno.KvConsistencyLevel,
      ) => Promise<Deno.KvEntry[]>;
    },
  ) {
    super();
    // Defensively copy and freeze the selector keys so later mutation by the
    // caller cannot change the range being iterated.
    let prefix: Deno.KvKey | undefined;
    let start: Deno.KvKey | undefined;
    let end: Deno.KvKey | undefined;
    if ("prefix" in selector && selector.prefix !== undefined) {
      prefix = Object.freeze([...selector.prefix]);
    }
    if ("start" in selector && selector.start !== undefined) {
      start = Object.freeze([...selector.start]);
    }
    if ("end" in selector && selector.end !== undefined) {
      end = Object.freeze([...selector.end]);
    }
    // Validate the selector shape: prefix with at most one of start/end, or
    // a start+end range without prefix.
    if (prefix) {
      if (start && end) {
        throw new TypeError(
          "Selector can not specify both 'start' and 'end' key when specifying 'prefix'.",
        );
      }
      if (start) {
        this.#selector = { prefix, start };
      } else if (end) {
        this.#selector = { prefix, end };
      } else {
        this.#selector = { prefix };
      }
    } else {
      if (start && end) {
        this.#selector = { start, end };
      } else {
        throw new TypeError(
          "Selector must specify either 'prefix' or both 'start' and 'end' key.",
        );
      }
    }
    Object.freeze(this.#selector);
    this.#pullBatch = pullBatch;
    this.#limit = limit;
    this.#reverse = reverse;
    this.#consistency = consistency;
    this.#batchSize = batchSize;
    this.#cursorGen = cursor ? () => cursor : null;
  }

  /** Cursor for the current position; throws before the first iteration
   * step unless an initial cursor was provided. */
  get cursor(): string {
    if (this.#cursorGen === null) {
      throw new Error("Cannot get cursor before first iteration");
    }
    return this.#cursorGen();
  }

  async next(): Promise<IteratorResult<Deno.KvEntry>> {
    // Fused or limit exceeded
    if (
      this.#done ||
      (this.#limit !== undefined && this.#count >= this.#limit)
    ) {
      return { done: true, value: undefined };
    }

    // Attempt to fill the buffer
    if (!this.#entries?.length && !this.#lastBatch) {
      const batch = await this.#pullBatch(
        this.#selector,
        this.#cursorGen ? this.#cursorGen() : undefined,
        this.#reverse,
        this.#consistency,
      );

      // Reverse the batch so we can pop from the end
      batch.reverse();
      this.#entries = batch;

      // Last batch, do not attempt to pull more
      if (batch.length < this.#batchSize) {
        this.#lastBatch = true;
      }
    }

    const entry = this.#entries?.pop();
    if (!entry) {
      // Exhausted: fuse the iterator and pin the cursor to the empty string.
      this.#done = true;
      this.#cursorGen = () => "";
      return { done: true, value: undefined };
    }

    // Defer cursor encoding until it is actually requested.
    this.#cursorGen = () => {
      const selector = this.#selector;
      return encodeCursor([
        "prefix" in selector ? selector.prefix : null,
        "start" in selector ? selector.start : null,
        "end" in selector ? selector.end : null,
      ], entry.key);
    };
    this.#count++;
    return {
      done: false,
      value: entry,
    };
  }

  [Symbol.asyncIterator](): AsyncIterator<Deno.KvEntry> {
    return this;
  }
}

export { Kv, KvListIterator, KvU64, openKv };

24
ext/kv/Cargo.toml Normal file
View file

@ -0,0 +1,24 @@
# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
[package]
name = "deno_kv"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
readme = "README.md"
repository.workspace = true
description = "Implementation of the Deno database API"
[lib]
path = "lib.rs"
[dependencies]
anyhow.workspace = true
async-trait.workspace = true
base64.workspace = true
deno_core.workspace = true
hex.workspace = true
num-bigint.workspace = true
rusqlite.workspace = true
serde.workspace = true

559
ext/kv/codec.rs Normal file
View file

@ -0,0 +1,559 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

// Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/main/foundationdb/src/tuple/pack.rs

use crate::Key;
use crate::KeyPart;

// Type tags of the FoundationDB tuple encoding. The numeric order of the
// tags determines the cross-type sort order of encoded key parts.
//const NIL: u8 = 0x00;
const BYTES: u8 = 0x01;
const STRING: u8 = 0x02;
//const NESTED: u8 = 0x05;
// Integers occupy the whole tag range [NEGINTSTART, POSINTEND]; the tag also
// encodes the sign and (for small magnitudes) the byte length.
const NEGINTSTART: u8 = 0x0b;
const INTZERO: u8 = 0x14;
const POSINTEND: u8 = 0x1d;
//const FLOAT: u8 = 0x20;
const DOUBLE: u8 = 0x21;
const FALSE: u8 = 0x26;
const TRUE: u8 = 0x27;
// Byte that escapes a literal 0x00 inside BYTES/STRING elements.
const ESCAPE: u8 = 0xff;
// Canonical quiet-NaN bit patterns (positive and negative sign).
const CANONICAL_NAN_POS: u64 = 0x7ff8000000000000u64;
const CANONICAL_NAN_NEG: u64 = 0xfff8000000000000u64;

/// Maps every NaN bit pattern onto a canonical quiet NaN, preserving only
/// the sign bit, so that key encoding is deterministic. Non-NaN values are
/// returned unchanged.
pub fn canonicalize_f64(n: f64) -> f64 {
  if !n.is_nan() {
    return n;
  }
  let bits = if n.is_sign_negative() {
    CANONICAL_NAN_NEG
  } else {
    CANONICAL_NAN_POS
  };
  f64::from_bits(bits)
}
/// Serializes `key` into the FoundationDB tuple encoding: each part is
/// emitted as a type tag followed by its payload. Strings and byte arrays
/// are NUL-terminated with embedded 0x00 bytes escaped; integers, doubles
/// and booleans are self-delimiting.
///
/// Returns `InvalidInput` for an empty key, or the error propagated from
/// bigint encoding.
pub fn encode_key(key: &Key) -> std::io::Result<Vec<u8>> {
  // Disallow empty key
  if key.0.is_empty() {
    return Err(std::io::Error::new(
      std::io::ErrorKind::InvalidInput,
      "key should not be empty",
    ));
  }

  let mut output: Vec<u8> = vec![];
  for part in &key.0 {
    match part {
      KeyPart::String(key) => {
        output.push(STRING);
        escape_raw_bytes_into(&mut output, key.as_bytes());
        // NUL terminator ends the (escaped) string payload.
        output.push(0);
      }
      KeyPart::Int(key) => {
        // The bigint encoder writes its own tag byte.
        bigint::encode_into(&mut output, key)?;
      }
      KeyPart::Float(key) => {
        double::encode_into(&mut output, *key);
      }
      KeyPart::Bytes(key) => {
        output.push(BYTES);
        escape_raw_bytes_into(&mut output, key);
        // NUL terminator ends the (escaped) bytes payload.
        output.push(0);
      }
      KeyPart::False => {
        output.push(FALSE);
      }
      KeyPart::True => {
        output.push(TRUE);
      }
    }
  }
  Ok(output)
}
/// Parses a tuple-encoded key produced by `encode_key` back into a `Key`.
/// Repeatedly reads a type tag and dispatches to the matching payload
/// parser until the input is exhausted.
///
/// Returns `InvalidInput` for empty input, `InvalidData` for unknown tags or
/// invalid UTF-8 in string parts, and `UnexpectedEof` for truncated payloads.
pub fn decode_key(mut bytes: &[u8]) -> std::io::Result<Key> {
  // Disallow empty key
  if bytes.is_empty() {
    return Err(std::io::Error::new(
      std::io::ErrorKind::InvalidInput,
      "key should not be empty",
    ));
  }

  let mut key = Key(vec![]);
  while !bytes.is_empty() {
    // Consume the tag byte, then hand the remainder to the payload parser,
    // which returns the slice following the payload.
    let tag = bytes[0];
    bytes = &bytes[1..];

    let next_bytes = match tag {
      self::STRING => {
        let (next_bytes, data) = parse_slice(bytes)?;
        let data = String::from_utf8(data).map_err(|_| {
          std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid utf8")
        })?;
        key.0.push(KeyPart::String(data));
        next_bytes
      }
      // Any tag inside the integer range is an int; the tag itself encodes
      // sign and small-magnitude length.
      self::NEGINTSTART..=self::POSINTEND => {
        let (next_bytes, data) = bigint::decode_from(bytes, tag)?;
        key.0.push(KeyPart::Int(data));
        next_bytes
      }
      self::DOUBLE => {
        let (next_bytes, data) = double::decode_from(bytes)?;
        key.0.push(KeyPart::Float(data));
        next_bytes
      }
      self::BYTES => {
        let (next_bytes, data) = parse_slice(bytes)?;
        key.0.push(KeyPart::Bytes(data));
        next_bytes
      }
      // Booleans carry no payload beyond the tag.
      self::FALSE => {
        key.0.push(KeyPart::False);
        bytes
      }
      self::TRUE => {
        key.0.push(KeyPart::True);
        bytes
      }
      _ => {
        return Err(std::io::Error::new(
          std::io::ErrorKind::InvalidData,
          "invalid tag",
        ))
      }
    };

    bytes = next_bytes;
  }

  Ok(key)
}
/// Appends `x` to `out`, escaping every 0x00 byte as the pair `0x00 0xff`
/// so that a bare NUL can later serve as the element terminator.
fn escape_raw_bytes_into(out: &mut Vec<u8>, x: &[u8]) {
  for &byte in x {
    if byte == 0 {
      out.extend_from_slice(&[0, ESCAPE]);
    } else {
      out.push(byte);
    }
  }
}
mod bigint {
use num_bigint::BigInt;
use num_bigint::Sign;
use super::parse_byte;
use super::parse_bytes;
const MAX_SZ: usize = 8;
// Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/7415e116d5d96c2630976058de28e439eed7e809/foundationdb/src/tuple/pack.rs#L575
pub fn encode_into(out: &mut Vec<u8>, key: &BigInt) -> std::io::Result<()> {
if key.sign() == Sign::NoSign {
out.push(super::INTZERO);
return Ok(());
}
let (sign, mut bytes) = key.to_bytes_be();
let n = bytes.len();
match sign {
Sign::Minus => {
if n <= MAX_SZ {
out.push(super::INTZERO - n as u8);
} else {
out.extend_from_slice(&[super::NEGINTSTART, bigint_n(n)? ^ 0xff]);
}
invert(&mut bytes);
out.extend_from_slice(&bytes);
}
Sign::NoSign => unreachable!(),
Sign::Plus => {
if n <= MAX_SZ {
out.push(super::INTZERO + n as u8);
} else {
out.extend_from_slice(&[super::POSINTEND, bigint_n(n)?]);
}
out.extend_from_slice(&bytes);
}
}
Ok(())
}
pub fn decode_from(
input: &[u8],
tag: u8,
) -> std::io::Result<(&[u8], BigInt)> {
if super::INTZERO <= tag && tag <= super::INTZERO + MAX_SZ as u8 {
let n = (tag - super::INTZERO) as usize;
let (input, bytes) = parse_bytes(input, n)?;
Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes)))
} else if super::INTZERO - MAX_SZ as u8 <= tag && tag < super::INTZERO {
let n = (super::INTZERO - tag) as usize;
let (input, bytes) = parse_bytes(input, n)?;
Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes))))
} else if tag == super::NEGINTSTART {
let (input, raw_length) = parse_byte(input)?;
let n = usize::from(raw_length ^ 0xff);
let (input, bytes) = parse_bytes(input, n)?;
Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes))))
} else if tag == super::POSINTEND {
let (input, raw_length) = parse_byte(input)?;
let n: usize = usize::from(raw_length);
let (input, bytes) = parse_bytes(input, n)?;
Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes)))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("unknown bigint tag: {}", tag),
))
}
}
fn invert(bytes: &mut [u8]) {
// The ones' complement of a binary number is defined as the value
// obtained by inverting all the bits in the binary representation
// of the number (swapping 0s for 1s and vice versa).
for byte in bytes.iter_mut() {
*byte = !*byte;
}
}
/// Returns a new vector containing the ones' complement of every byte
/// (all bits flipped).
fn inverted(bytes: &[u8]) -> Vec<u8> {
  let mut out = Vec::with_capacity(bytes.len());
  for &b in bytes {
    out.push(!b);
  }
  out
}
/// Narrows a magnitude byte length to `u8`, failing with `InvalidInput`
/// when the value needs more than 255 bytes to be represented.
fn bigint_n(n: usize) -> std::io::Result<u8> {
  match u8::try_from(n) {
    Ok(len) => Ok(len),
    Err(_) => Err(std::io::Error::new(
      std::io::ErrorKind::InvalidInput,
      "BigUint requires more than 255 bytes to be represented",
    )),
  }
}
}
mod double {
  // Mask with only the sign bit of the given integer type set.
  macro_rules! sign_bit {
    ($type:ident) => {
      (1 << (std::mem::size_of::<$type>() * 8 - 1))
    };
  }

  /// Maps an f64 to big-endian bytes whose unsigned byte order matches the
  /// IEEE-754 total order: negative values get all bits flipped, positive
  /// values get only the sign bit flipped.
  fn f64_to_ux_be_bytes(f: f64) -> [u8; 8] {
    let u = if f.is_sign_negative() {
      f.to_bits() ^ ::std::u64::MAX
    } else {
      f.to_bits() ^ sign_bit!(u64)
    };
    u.to_be_bytes()
  }

  /// Appends the DOUBLE tag and the order-preserving encoding of `x`.
  /// The value is canonicalized first so equal keys encode identically.
  pub fn encode_into(out: &mut Vec<u8>, x: f64) {
    out.push(super::DOUBLE);
    out.extend_from_slice(&f64_to_ux_be_bytes(super::canonicalize_f64(x)));
  }

  /// Reads 8 bytes and reverses the transform applied by
  /// `f64_to_ux_be_bytes`.
  pub fn decode_from(input: &[u8]) -> std::io::Result<(&[u8], f64)> {
    let (input, bytes) = super::parse_bytes(input, 8)?;
    let mut arr = [0u8; 8];
    arr.copy_from_slice(bytes);
    let u = u64::from_be_bytes(arr);
    Ok((
      input,
      f64::from_bits(if (u & sign_bit!(u64)) == 0 {
        // Encoded sign bit clear means the original value was negative.
        u ^ ::std::u64::MAX
      } else {
        u ^ sign_bit!(u64)
      }),
    ))
  }
}
/// Splits off the first `num` bytes of `input`, returning
/// `(remaining, taken)`, or `UnexpectedEof` if `input` is too short.
#[inline]
fn parse_bytes(input: &[u8], num: usize) -> std::io::Result<(&[u8], &[u8])> {
  if num > input.len() {
    return Err(std::io::ErrorKind::UnexpectedEof.into());
  }
  let (taken, remaining) = input.split_at(num);
  Ok((remaining, taken))
}
/// Splits off the first byte of `input`, returning `(remaining, byte)`,
/// or `UnexpectedEof` if `input` is empty.
#[inline]
fn parse_byte(input: &[u8]) -> std::io::Result<(&[u8], u8)> {
  match input.split_first() {
    Some((&first, remaining)) => Ok((remaining, first)),
    None => Err(std::io::ErrorKind::UnexpectedEof.into()),
  }
}
/// Decodes a zero-terminated, zero-escaped byte string: a `0x00` followed by
/// ESCAPE decodes to a literal zero byte, while a `0x00` followed by anything
/// else terminates the slice. Returns the input past the terminator and the
/// decoded bytes, or `UnexpectedEof` if no terminator is found.
fn parse_slice(input: &[u8]) -> std::io::Result<(&[u8], Vec<u8>)> {
  let mut decoded: Vec<u8> = Vec::new();
  let mut idx = 0usize;
  loop {
    match input.get(idx).copied() {
      None => return Err(std::io::ErrorKind::UnexpectedEof.into()),
      Some(0) => {
        idx += 1;
        if input.get(idx).copied() == Some(ESCAPE) {
          // Escaped zero: emit a literal 0x00 and keep scanning.
          decoded.push(0);
          idx += 1;
        } else {
          // Unescaped zero terminates the slice.
          return Ok((&input[idx..], decoded));
        }
      }
      Some(byte) => {
        decoded.push(byte);
        idx += 1;
      }
    }
  }
}
#[cfg(test)]
mod tests {
  use num_bigint::BigInt;
  use std::cmp::Ordering;

  use crate::Key;
  use crate::KeyPart;

  use super::decode_key;
  use super::encode_key;

  /// Asserts that a key survives an encode/decode round trip, both by value
  /// and by Debug representation (which distinguishes e.g. NaN payloads).
  fn roundtrip(key: Key) {
    let bytes = encode_key(&key).unwrap();
    let decoded = decode_key(&bytes).unwrap();
    assert_eq!(&key, &decoded);
    assert_eq!(format!("{:?}", key), format!("{:?}", decoded));
  }

  /// Asserts that logical key ordering agrees with encoded byte ordering —
  /// the core invariant of the order-preserving codec.
  fn check_order(a: Key, b: Key, expected: Ordering) {
    let a_bytes = encode_key(&a).unwrap();
    let b_bytes = encode_key(&b).unwrap();
    assert_eq!(a.cmp(&b), expected);
    assert_eq!(a_bytes.cmp(&b_bytes), expected);
  }

  /// Asserts the exact serialized bytes of a key and that those bytes
  /// decode back to the same key.
  fn check_bijection(key: Key, serialized: &[u8]) {
    let bytes = encode_key(&key).unwrap();
    assert_eq!(&bytes[..], serialized);
    let decoded = decode_key(serialized).unwrap();
    assert_eq!(&key, &decoded);
  }

  // Round-trips one key containing every KeyPart variant, including the
  // float edge cases (-NaN, infinities, signed zeros).
  #[test]
  fn simple_roundtrip() {
    roundtrip(Key(vec![
      KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00]),
      KeyPart::String("foo".to_string()),
      KeyPart::Float(-f64::NAN),
      KeyPart::Float(-f64::INFINITY),
      KeyPart::Float(-42.1),
      KeyPart::Float(-0.0),
      KeyPart::Float(0.0),
      KeyPart::Float(42.1),
      KeyPart::Float(f64::INFINITY),
      KeyPart::Float(f64::NAN),
      KeyPart::Int(BigInt::from(-10000)),
      KeyPart::Int(BigInt::from(-1)),
      KeyPart::Int(BigInt::from(0)),
      KeyPart::Int(BigInt::from(1)),
      KeyPart::Int(BigInt::from(10000)),
      KeyPart::False,
      KeyPart::True,
    ]));
  }

  // Byte arrays order lexicographically; a proper prefix sorts first.
  #[test]
  #[rustfmt::skip]
  fn order_bytes() {
    check_order(
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
      Ordering::Equal,
    );
    check_order(
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]),
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
      Ordering::Greater,
    );
    check_order(
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]),
      Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
      Ordering::Greater,
    );
  }

  // Cross-type ordering: bytes < string < int < float < false < true.
  #[test]
  #[rustfmt::skip]
  fn order_tags() {
    check_order(
      Key(vec![KeyPart::Bytes(vec![])]),
      Key(vec![KeyPart::String("".into())]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::String("".into())]),
      Key(vec![KeyPart::Int(BigInt::from(0))]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Int(BigInt::from(0))]),
      Key(vec![KeyPart::Float(0.0)]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Float(0.0)]),
      Key(vec![KeyPart::False]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::False]),
      Key(vec![KeyPart::True]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::True]),
      Key(vec![KeyPart::Bytes(vec![])]),
      Ordering::Greater,
    );
  }

  // Float total order: -NaN < -Inf < negatives < -0.0 < 0.0 < positives
  // < Inf < NaN.
  #[test]
  #[rustfmt::skip]
  fn order_floats() {
    check_order(
      Key(vec![KeyPart::Float(-f64::NAN)]),
      Key(vec![KeyPart::Float(-f64::INFINITY)]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Float(-f64::INFINITY)]),
      Key(vec![KeyPart::Float(-10.0)]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Float(-10.0)]),
      Key(vec![KeyPart::Float(-0.0)]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Float(-0.0)]),
      Key(vec![KeyPart::Float(0.0)]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Float(0.0)]),
      Key(vec![KeyPart::Float(10.0)]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Float(10.0)]),
      Key(vec![KeyPart::Float(f64::INFINITY)]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Float(f64::INFINITY)]),
      Key(vec![KeyPart::Float(f64::NAN)]),
      Ordering::Less,
    );
  }

  // Integer ordering follows mathematical ordering across magnitudes.
  #[test]
  #[rustfmt::skip]
  fn order_ints() {
    check_order(
      Key(vec![KeyPart::Int(BigInt::from(-10000))]),
      Key(vec![KeyPart::Int(BigInt::from(-100))]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Int(BigInt::from(-100))]),
      Key(vec![KeyPart::Int(BigInt::from(-1))]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Int(BigInt::from(-1))]),
      Key(vec![KeyPart::Int(BigInt::from(0))]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Int(BigInt::from(0))]),
      Key(vec![KeyPart::Int(BigInt::from(1))]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Int(BigInt::from(1))]),
      Key(vec![KeyPart::Int(BigInt::from(100))]),
      Ordering::Less,
    );
    check_order(
      Key(vec![KeyPart::Int(BigInt::from(100))]),
      Key(vec![KeyPart::Int(BigInt::from(10000))]),
      Ordering::Less,
    );
  }

  // Two NaNs with different payload bits must compare equal and encode
  // identically (canonicalization).
  #[test]
  #[rustfmt::skip]
  fn float_canonicalization() {
    let key1 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000001))]);
    let key2 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000002))]);

    assert_eq!(key1, key2);
    assert_eq!(encode_key(&key1).unwrap(), encode_key(&key2).unwrap());
  }

  // Pins the exact wire format for strings, zero-byte escaping, and
  // multi-part keys.
  #[test]
  #[rustfmt::skip]
  fn explicit_bijection() {
    // string
    check_bijection(
      Key(vec![KeyPart::String("hello".into())]),
      &[0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00],
    );

    // zero byte escape
    check_bijection(
      Key(vec![KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08])]),
      &[0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00],
    );

    // array
    check_bijection(
      Key(vec![
        KeyPart::String("hello".into()),
        KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08]),
      ]),
      &[
        0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, /* string */
        0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00, /* bytes */
      ],
    );
  }
}

294
ext/kv/interface.rs Normal file
View file

@ -0,0 +1,294 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::cmp::Ordering;
use std::num::NonZeroU32;
use std::rc::Rc;
use async_trait::async_trait;
use deno_core::error::AnyError;
use deno_core::OpState;
use num_bigint::BigInt;
use crate::codec::canonicalize_f64;
/// Factory for opening [Database] instances. `path` is the caller-supplied
/// database location (backend-specific), or `None` for the default store.
#[async_trait(?Send)]
pub trait DatabaseHandler {
  type DB: Database + 'static;

  async fn open(
    &self,
    state: Rc<RefCell<OpState>>,
    path: Option<String>,
  ) -> Result<Self::DB, AnyError>;
}
/// A key-value database supporting consistent multi-range snapshot reads
/// and atomic check-and-write batches.
#[async_trait(?Send)]
pub trait Database {
  /// Reads the given ranges as a single consistent snapshot.
  async fn snapshot_read(
    &self,
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
  ) -> Result<Vec<ReadRangeOutput>, AnyError>;

  /// Applies an [AtomicWrite]; returns whether the write committed.
  async fn atomic_write(&self, write: AtomicWrite) -> Result<bool, AnyError>;
}
/// Options for a snapshot read.
pub struct SnapshotReadOptions {
  pub consistency: Consistency,
}

/// The consistency of a read.
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum Consistency {
  Strong,
  Eventual,
}

/// A key is for a KV pair. It is a vector of KeyParts.
///
/// The ordering of the keys is defined by the ordering of the KeyParts. The
/// first KeyPart is the most significant, and the last KeyPart is the least
/// significant.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)]
pub struct Key(pub Vec<KeyPart>);
/// A key part is a single item in a key. It can be a boolean, a double
/// float, a variable precision signed integer, a UTF-8 string, or an
/// arbitrary byte array.
///
/// The ordering of a KeyPart is dependent on the type of the KeyPart.
///
/// Between different types, the ordering is as follows: arbitrary byte array <
/// UTF-8 string < variable precision signed integer < double float < false < true.
///
/// Within a type, the ordering is as follows:
/// - For a **boolean**, false is less than true.
/// - For a **double float**, the ordering must follow -NaN < -Infinity < -100.0 < -1.0 < -0.5 < -0.0 < 0.0 < 0.5 < 1.0 < 100.0 < Infinity < NaN.
/// - For a **variable precision signed integer**, the ordering must follow mathematical ordering.
/// - For a **UTF-8 string**, the ordering must follow the UTF-8 byte ordering.
/// - For an **arbitrary byte array**, the ordering must follow the byte ordering.
///
/// This means that the key part `1.0` is less than the key part `2.0`, but is
/// greater than the key part `0n`, because `1.0` is a double float and `0n`
/// is a variable precision signed integer, and the ordering between types
/// takes precedence over the ordering within a type.
#[derive(Clone, Debug)]
pub enum KeyPart {
  Bytes(Vec<u8>),
  String(String),
  Int(BigInt),
  Float(f64),
  False,
  True,
}

impl KeyPart {
  /// Rank used to order key parts of *different* types; must stay in sync
  /// with the cross-type ordering documented on [KeyPart].
  fn tag_ordering(&self) -> u8 {
    match self {
      KeyPart::Bytes(_) => 0,
      KeyPart::String(_) => 1,
      KeyPart::Int(_) => 2,
      KeyPart::Float(_) => 3,
      KeyPart::False => 4,
      KeyPart::True => 5,
    }
  }
}
// Eq/Ord are implemented manually because f64 is not Eq/Ord; equality is
// defined via the total order below, which canonicalizes NaNs first.
impl Eq for KeyPart {}

impl PartialEq for KeyPart {
  fn eq(&self, other: &Self) -> bool {
    self.cmp(other) == Ordering::Equal
  }
}

impl Ord for KeyPart {
  fn cmp(&self, other: &Self) -> Ordering {
    match (self, other) {
      (KeyPart::Bytes(b1), KeyPart::Bytes(b2)) => b1.cmp(b2),
      (KeyPart::String(s1), KeyPart::String(s2)) => {
        s1.as_bytes().cmp(s2.as_bytes())
      }
      (KeyPart::Int(i1), KeyPart::Int(i2)) => i1.cmp(i2),
      (KeyPart::Float(f1), KeyPart::Float(f2)) => {
        // total_cmp gives the IEEE-754 total order (-NaN < ... < NaN).
        canonicalize_f64(*f1).total_cmp(&canonicalize_f64(*f2))
      }
      // Mixed types fall back to the per-type tag rank.
      _ => self.tag_ordering().cmp(&other.tag_ordering()),
    }
  }
}

impl PartialOrd for KeyPart {
  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
    Some(self.cmp(other))
  }
}
/// A request to read a range of keys from the database.
///
/// The range is inclusive of the start and exclusive of the end. The start
/// may not be greater than the end.
///
/// At most `limit` entries are returned; `reverse` scans the range from the
/// end towards the start.
pub struct ReadRange {
  pub start: Vec<u8>,
  pub end: Vec<u8>,
  pub limit: NonZeroU32,
  pub reverse: bool,
}

/// A response to a `ReadRange` request.
pub struct ReadRangeOutput {
  pub entries: Vec<KvEntry>,
}

/// A versionstamp is a 10 byte array that is used to represent the version of
/// a key in the database.
type Versionstamp = [u8; 10];

/// A key-value entry with a versionstamp.
pub struct KvEntry {
  pub key: Vec<u8>,
  pub value: Value,
  pub versionstamp: Versionstamp,
}
/// A serialized value for a KV pair as stored in the database. All values
/// **can** be serialized into the V8 representation, but not all values are.
///
/// The V8 representation is an opaque byte array that is only meaningful to
/// the V8 engine. It is guaranteed to be backwards compatible. Because this
/// representation is opaque, it is not possible to inspect or modify the value
/// without deserializing it.
///
/// The inability to inspect or modify the value without deserializing it means
/// that these values can not be quickly modified when performing atomic
/// read-modify-write operations on the database (because the database may not
/// have the ability to deserialize the V8 value into a modifiable value).
///
/// Because of this constraint, there are more specialized representations for
/// certain types of values that can be used in atomic read-modify-write
/// operations. These specialized representations are:
///
/// - **Bytes**: an arbitrary byte array.
/// - **U64**: a 64-bit unsigned integer.
pub enum Value {
  V8(Vec<u8>),
  Bytes(Vec<u8>),
  U64(u64),
}
/// A request to perform an atomic check-modify-write operation on the database.
///
/// The operation is performed atomically, meaning that the operation will
/// either succeed or fail. If the operation fails, then the database will be
/// left in the same state as before the operation was attempted. If the
/// operation succeeds, then the database will be left in a new state.
///
/// The operation is performed by first checking the database for the current
/// state of the keys, defined by the `checks` field. If the current state of
/// the keys does not match the expected state, then the operation fails. If
/// the current state of the keys matches the expected state, then the
/// mutations are applied to the database.
///
/// All checks and mutations are performed atomically.
///
/// The mutations are performed in the order that they are specified in the
/// `mutations` field. The order of checks is not specified, and is also not
/// important because this ordering is un-observable.
pub struct AtomicWrite {
  pub checks: Vec<KvCheck>,
  pub mutations: Vec<KvMutation>,
  pub enqueues: Vec<Enqueue>,
}

/// A request to perform a check on a key in the database. The check is not
/// performed on the value of the key, but rather on the versionstamp of the
/// key. A `None` versionstamp checks that the key does not exist.
pub struct KvCheck {
  pub key: Vec<u8>,
  pub versionstamp: Option<Versionstamp>,
}

/// A request to perform a mutation on a key in the database. The mutation is
/// performed on the value of the key.
///
/// The type of mutation is specified by the `kind` field. The action performed
/// by each mutation kind is specified in the docs for [MutationKind].
pub struct KvMutation {
  pub key: Vec<u8>,
  pub kind: MutationKind,
}

/// A request to enqueue a message to the database. This message is delivered
/// to a listener of the queue at least once.
///
/// ## Retry
///
/// When the delivery of a message fails, it is retried for a finite number
/// of times. Each retry happens after a backoff period. The backoff periods
/// are specified by the `backoff_schedule` field in milliseconds. If
/// unspecified, the default backoff schedule of the platform (CLI or Deploy)
/// is used.
///
/// If all retry attempts failed, the message is written to the KV under all
/// keys specified in `keys_if_undelivered`.
pub struct Enqueue {
  pub payload: Vec<u8>,
  pub deadline_ms: u64,
  pub keys_if_undelivered: Vec<Vec<u8>>,
  pub backoff_schedule: Option<Vec<u32>>,
}
/// The type of mutation to perform on a key in the database.
///
/// ## Set
///
/// The set mutation sets the value of the key to the specified value. It
/// discards the previous value of the key, if any.
///
/// This operand supports all [Value] types.
///
/// ## Delete
///
/// The delete mutation deletes the value of the key.
///
/// ## Sum
///
/// The sum mutation adds the specified value to the existing value of the key.
///
/// This operand supports only the value type [Value::U64]. The existing value
/// in the database must match the type of the value specified in the mutation.
/// If the key does not exist in the database, then the value specified in the
/// mutation is used as the new value of the key.
///
/// ## Min
///
/// The min mutation sets the value of the key to the minimum of the existing
/// value of the key and the specified value.
///
/// This operand supports only the value type [Value::U64]. The existing value
/// in the database must match the type of the value specified in the mutation.
/// If the key does not exist in the database, then the value specified in the
/// mutation is used as the new value of the key.
///
/// ## Max
///
/// The max mutation sets the value of the key to the maximum of the existing
/// value of the key and the specified value.
///
/// This operand supports only the value type [Value::U64]. The existing value
/// in the database must match the type of the value specified in the mutation.
/// If the key does not exist in the database, then the value specified in the
/// mutation is used as the new value of the key.
pub enum MutationKind {
  Set(Value),
  Delete,
  Sum(Value),
  Min(Value),
  Max(Value),
}

541
ext/kv/lib.rs Normal file
View file

@ -0,0 +1,541 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
pub mod codec;
mod interface;
pub mod sqlite;
use std::borrow::Cow;
use std::cell::RefCell;
use std::num::NonZeroU32;
use std::rc::Rc;
use codec::decode_key;
use codec::encode_key;
use deno_core::anyhow::Context;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::serde_v8::AnyValue;
use deno_core::serde_v8::BigInt;
use deno_core::ByteString;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
use serde::Serialize;
pub use crate::interface::*;
/// Gates unstable APIs behind the `--unstable` CLI flag.
struct UnstableChecker {
  pub unstable: bool,
}

impl UnstableChecker {
  // NOTE(bartlomieju): keep in sync with `cli/program_state.rs`
  /// Prints an error and terminates the process (exit code 70) when the
  /// unstable flag is not set.
  pub fn check_unstable(&self, api_name: &str) {
    if !self.unstable {
      eprintln!(
        "Unstable API '{api_name}'. The --unstable flag must be provided."
      );
      std::process::exit(70);
    }
  }
}
// Wires the KV ops and the `01_db.ts` ESM module into a deno_core extension,
// generic over the concrete DatabaseHandler backend. The handler and the
// unstable flag are stashed in OpState for the ops to retrieve.
deno_core::extension!(deno_kv,
  // TODO(bartlomieju): specify deps
  deps = [ ],
  parameters = [ DBH: DatabaseHandler ],
  ops = [
    op_kv_database_open<DBH>,
    op_kv_snapshot_read<DBH>,
    op_kv_atomic_write<DBH>,
    op_kv_encode_cursor,
  ],
  esm = [ "01_db.ts" ],
  options = {
    handler: DBH,
    unstable: bool,
  },
  state = |state, options| {
    state.put(Rc::new(options.handler));
    state.put(UnstableChecker { unstable: options.unstable })
  }
);
/// Resource-table wrapper around an open database; the Rc lets ops keep the
/// database alive across awaits without holding the OpState borrow.
struct DatabaseResource<DB: Database + 'static> {
  db: Rc<DB>,
}

impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
  fn name(&self) -> Cow<str> {
    "database".into()
  }
}
/// Opens a database via the registered [DatabaseHandler] and returns the
/// resource id of the new [DatabaseResource]. Requires `--unstable`.
#[op]
async fn op_kv_database_open<DBH>(
  state: Rc<RefCell<OpState>>,
  path: Option<String>,
) -> Result<ResourceId, AnyError>
where
  DBH: DatabaseHandler + 'static,
{
  let handler = {
    let state = state.borrow();
    // Exits the process when --unstable was not passed.
    state
      .borrow::<UnstableChecker>()
      .check_unstable("Deno.openKv");
    state.borrow::<Rc<DBH>>().clone()
  };
  // The OpState borrow is released before awaiting the (possibly slow) open.
  let db = handler.open(state.clone(), path).await?;
  let rid = state
    .borrow_mut()
    .resource_table
    .add(DatabaseResource { db: Rc::new(db) });
  Ok(rid)
}
/// A key as received from / sent to V8: one [AnyValue] per key part.
type KvKey = Vec<AnyValue>;

impl From<AnyValue> for KeyPart {
  fn from(value: AnyValue) -> Self {
    match value {
      // Booleans must map to the same-valued KeyPart. The previous mapping
      // was inverted (false -> True, true -> False), which flipped the sort
      // order of boolean key parts relative to the documented ordering
      // (false < true) and produced incompatible on-disk encodings.
      AnyValue::Bool(false) => KeyPart::False,
      AnyValue::Bool(true) => KeyPart::True,
      AnyValue::Number(n) => KeyPart::Float(n),
      AnyValue::BigInt(n) => KeyPart::Int(n),
      AnyValue::String(s) => KeyPart::String(s),
      AnyValue::Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
    }
  }
}

impl From<KeyPart> for AnyValue {
  fn from(value: KeyPart) -> Self {
    match value {
      // Mirror of the conversion above; kept in sync so keys round-trip.
      KeyPart::False => AnyValue::Bool(false),
      KeyPart::True => AnyValue::Bool(true),
      KeyPart::Float(n) => AnyValue::Number(n),
      KeyPart::Int(n) => AnyValue::BigInt(n),
      KeyPart::String(s) => AnyValue::String(s),
      KeyPart::Bytes(buf) => AnyValue::Buffer(buf.into()),
    }
  }
}
/// Wire representation of a [Value] exchanged with V8, serialized as a
/// `{ kind, value }` tagged object.
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
enum V8Value {
  V8(ZeroCopyBuf),
  Bytes(ZeroCopyBuf),
  U64(BigInt),
}

impl TryFrom<V8Value> for Value {
  type Error = AnyError;
  fn try_from(value: V8Value) -> Result<Self, AnyError> {
    Ok(match value {
      V8Value::V8(buf) => Value::V8(buf.to_vec()),
      V8Value::Bytes(buf) => Value::Bytes(buf.to_vec()),
      // Fails when the JS bigint does not fit into a u64.
      V8Value::U64(n) => Value::U64(num_bigint::BigInt::from(n).try_into()?),
    })
  }
}

impl From<Value> for V8Value {
  fn from(value: Value) -> Self {
    match value {
      Value::V8(buf) => V8Value::V8(buf.into()),
      Value::Bytes(buf) => V8Value::Bytes(buf.into()),
      Value::U64(n) => V8Value::U64(num_bigint::BigInt::from(n).into()),
    }
  }
}
/// V8-facing form of a [KvEntry]: decoded key parts, tagged value, and the
/// versionstamp as a hex string.
#[derive(Deserialize, Serialize)]
struct V8KvEntry {
  key: KvKey,
  value: V8Value,
  versionstamp: ByteString,
}

impl TryFrom<KvEntry> for V8KvEntry {
  type Error = AnyError;
  fn try_from(entry: KvEntry) -> Result<Self, AnyError> {
    Ok(V8KvEntry {
      // Unpack the stored key bytes back into individual key parts.
      key: decode_key(&entry.key)?
        .0
        .into_iter()
        .map(Into::into)
        .collect(),
      value: entry.value.into(),
      // 10 raw bytes -> 20 hex characters.
      versionstamp: hex::encode(entry.versionstamp).into(),
    })
  }
}
/// V8-facing mirror of [Consistency].
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
enum V8Consistency {
  Strong,
  Eventual,
}

impl From<V8Consistency> for Consistency {
  fn from(value: V8Consistency) -> Self {
    match value {
      V8Consistency::Strong => Consistency::Strong,
      V8Consistency::Eventual => Consistency::Eventual,
    }
  }
}

// (prefix, start, end, limit, reverse, cursor)
type SnapshotReadRange = (
  Option<KvKey>,
  Option<KvKey>,
  Option<KvKey>,
  u32,
  bool,
  Option<ByteString>,
);
/// Reads one or more key ranges from the database identified by `rid`,
/// resolving each (selector, cursor) pair into a concrete byte range first.
#[op]
async fn op_kv_snapshot_read<DBH>(
  state: Rc<RefCell<OpState>>,
  rid: ResourceId,
  ranges: Vec<SnapshotReadRange>,
  consistency: V8Consistency,
) -> Result<Vec<Vec<V8KvEntry>>, AnyError>
where
  DBH: DatabaseHandler + 'static,
{
  let db = {
    let state = state.borrow();
    let resource =
      state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
    resource.db.clone()
  };
  // Turn each selector + optional pagination cursor into [start, end).
  let read_ranges = ranges
    .into_iter()
    .map(|(prefix, start, end, limit, reverse, cursor)| {
      let selector = RawSelector::from_tuple(prefix, start, end)?;
      let (start, end) =
        decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?;
      Ok(ReadRange {
        start,
        end,
        limit: NonZeroU32::new(limit)
          .with_context(|| "limit must be greater than 0")?,
        reverse,
      })
    })
    .collect::<Result<Vec<_>, AnyError>>()?;
  let opts = SnapshotReadOptions {
    consistency: consistency.into(),
  };
  let output_ranges = db.snapshot_read(read_ranges, opts).await?;
  // Convert entries back into their V8-facing representation.
  let output_ranges = output_ranges
    .into_iter()
    .map(|x| {
      x.entries
        .into_iter()
        .map(TryInto::try_into)
        .collect::<Result<Vec<_>, AnyError>>()
    })
    .collect::<Result<Vec<_>, AnyError>>()?;
  Ok(output_ranges)
}
// (key, versionstamp as hex) as received from JS.
type V8KvCheck = (KvKey, Option<ByteString>);

impl TryFrom<V8KvCheck> for KvCheck {
  type Error = AnyError;
  fn try_from(value: V8KvCheck) -> Result<Self, AnyError> {
    let versionstamp = match value.1 {
      Some(data) => {
        // Versionstamps travel as 20 hex chars; decode into 10 raw bytes.
        let mut out = [0u8; 10];
        hex::decode_to_slice(data, &mut out)
          .map_err(|_| type_error("invalid versionstamp"))?;
        Some(out)
      }
      None => None,
    };
    Ok(KvCheck {
      key: encode_v8_key(value.0)?,
      versionstamp,
    })
  }
}
// (key, operation name, optional operand) as received from JS.
type V8KvMutation = (KvKey, String, Option<V8Value>);

impl TryFrom<V8KvMutation> for KvMutation {
  type Error = AnyError;
  fn try_from(value: V8KvMutation) -> Result<Self, AnyError> {
    let key = encode_v8_key(value.0)?;
    let kind = match (value.1.as_str(), value.2) {
      ("set", Some(value)) => MutationKind::Set(value.try_into()?),
      ("delete", None) => MutationKind::Delete,
      ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
      ("min", Some(value)) => MutationKind::Min(value.try_into()?),
      ("max", Some(value)) => MutationKind::Max(value.try_into()?),
      // Unknown op names and operand-arity mismatches are rejected.
      (op, Some(_)) => {
        return Err(type_error(format!("invalid mutation '{op}' with value")))
      }
      (op, None) => {
        return Err(type_error(format!(
          "invalid mutation '{op}' without value"
        )))
      }
    };
    Ok(KvMutation { key, kind })
  }
}
// (payload, deadline_ms, keys_if_undelivered, backoff_schedule).
type V8Enqueue = (ZeroCopyBuf, u64, Vec<KvKey>, Option<Vec<u32>>);

impl TryFrom<V8Enqueue> for Enqueue {
  type Error = AnyError;
  fn try_from(value: V8Enqueue) -> Result<Self, AnyError> {
    Ok(Enqueue {
      payload: value.0.to_vec(),
      deadline_ms: value.1,
      keys_if_undelivered: value
        .2
        .into_iter()
        .map(encode_v8_key)
        .collect::<std::io::Result<_>>()?,
      backoff_schedule: value.3,
    })
  }
}

/// Encodes a JS key (vector of [AnyValue] parts) into its packed byte form.
fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
  encode_key(&Key(key.into_iter().map(From::from).collect()))
}
/// A range selector as supplied by JS: either a key prefix with optional
/// start/end refinements, or an explicit start/end pair.
enum RawSelector {
  Prefixed {
    prefix: Vec<u8>,
    start: Option<Vec<u8>>,
    end: Option<Vec<u8>>,
  },
  Range {
    start: Vec<u8>,
    end: Vec<u8>,
  },
}

impl RawSelector {
  /// Validates the (prefix, start, end) combinations the API allows; any
  /// other combination is a type error.
  fn from_tuple(
    prefix: Option<KvKey>,
    start: Option<KvKey>,
    end: Option<KvKey>,
  ) -> Result<Self, AnyError> {
    let prefix = prefix.map(encode_v8_key).transpose()?;
    let start = start.map(encode_v8_key).transpose()?;
    let end = end.map(encode_v8_key).transpose()?;
    match (prefix, start, end) {
      (Some(prefix), None, None) => Ok(Self::Prefixed {
        prefix,
        start: None,
        end: None,
      }),
      (Some(prefix), Some(start), None) => Ok(Self::Prefixed {
        prefix,
        start: Some(start),
        end: None,
      }),
      (Some(prefix), None, Some(end)) => Ok(Self::Prefixed {
        prefix,
        start: None,
        end: Some(end),
      }),
      (None, Some(start), Some(end)) => Ok(Self::Range { start, end }),
      (None, Some(start), None) => {
        // A lone start key selects exactly that key: the end bound is its
        // immediate successor (start + 0x00).
        let end = start.iter().copied().chain(Some(0)).collect();
        Ok(Self::Range { start, end })
      }
      _ => Err(type_error("invalid range")),
    }
  }

  /// Explicit lower bound, if one was supplied.
  fn start(&self) -> Option<&[u8]> {
    match self {
      Self::Prefixed { start, .. } => start.as_deref(),
      Self::Range { start, .. } => Some(start),
    }
  }

  /// Explicit upper bound, if one was supplied.
  fn end(&self) -> Option<&[u8]> {
    match self {
      Self::Prefixed { end, .. } => end.as_deref(),
      Self::Range { end, .. } => Some(end),
    }
  }

  /// Prefix shared by every key in the selector; cursors are encoded
  /// relative to this.
  fn common_prefix(&self) -> &[u8] {
    match self {
      Self::Prefixed { prefix, .. } => prefix,
      Self::Range { start, end } => common_prefix_for_bytes(start, end),
    }
  }

  /// Concrete scan lower bound: the explicit start, or `prefix + 0x00`
  /// (the smallest key strictly under the prefix).
  fn range_start_key(&self) -> Vec<u8> {
    match self {
      Self::Prefixed {
        start: Some(start), ..
      } => start.clone(),
      Self::Range { start, .. } => start.clone(),
      Self::Prefixed { prefix, .. } => {
        prefix.iter().copied().chain(Some(0)).collect()
      }
    }
  }

  /// Concrete scan upper bound: the explicit end, or `prefix + 0xff`
  /// (past every key under the prefix).
  fn range_end_key(&self) -> Vec<u8> {
    match self {
      Self::Prefixed { end: Some(end), .. } => end.clone(),
      Self::Range { end, .. } => end.clone(),
      Self::Prefixed { prefix, .. } => {
        prefix.iter().copied().chain(Some(0xff)).collect()
      }
    }
  }
}
/// Returns the longest common prefix of `a` and `b`, borrowed from `a`.
fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] {
  let len = a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count();
  &a[..len]
}
/// Encodes a boundary key as an opaque cursor: the URL-safe base64 of the
/// key with the selector's common prefix stripped. Fails when the boundary
/// key does not lie under that prefix.
fn encode_cursor(
  selector: &RawSelector,
  boundary_key: &[u8],
) -> Result<String, AnyError> {
  let common_prefix = selector.common_prefix();
  if !boundary_key.starts_with(common_prefix) {
    return Err(type_error("invalid boundary key"));
  }
  Ok(base64::encode_config(
    &boundary_key[common_prefix.len()..],
    base64::URL_SAFE,
  ))
}
/// Resolves a selector plus an optional pagination cursor into the concrete
/// `[first_key, last_key)` byte range to scan.
fn decode_selector_and_cursor(
  selector: &RawSelector,
  reverse: bool,
  cursor: Option<&ByteString>,
) -> Result<(Vec<u8>, Vec<u8>), AnyError> {
  // No cursor: scan the selector's full range.
  let Some(cursor) = cursor else {
    return Ok((selector.range_start_key(), selector.range_end_key()));
  };

  let common_prefix = selector.common_prefix();
  let cursor = base64::decode_config(cursor, base64::URL_SAFE)
    .map_err(|_| type_error("invalid cursor"))?;

  let first_key: Vec<u8>;
  let last_key: Vec<u8>;

  if reverse {
    // Reverse scans resume strictly before the cursor key: the cursor
    // becomes the (exclusive) upper bound.
    first_key = selector.range_start_key();
    last_key = common_prefix
      .iter()
      .copied()
      .chain(cursor.iter().copied())
      .collect();
  } else {
    // Forward scans resume strictly after the cursor key: appending 0x00
    // yields the cursor key's immediate successor.
    first_key = common_prefix
      .iter()
      .copied()
      .chain(cursor.iter().copied())
      .chain(Some(0))
      .collect();
    last_key = selector.range_end_key();
  }

  // Defend against out-of-bounds reading: a forged cursor must not widen
  // the range beyond the selector's own explicit bounds.
  if let Some(start) = selector.start() {
    if &first_key[..] < start {
      return Err(type_error("cursor out of bounds"));
    }
  }
  if let Some(end) = selector.end() {
    if &last_key[..] > end {
      return Err(type_error("cursor out of bounds"));
    }
  }

  Ok((first_key, last_key))
}
/// Performs an atomic batch of version checks, mutations and enqueues
/// against the database identified by `rid`. Returns whether the write
/// committed.
#[op]
async fn op_kv_atomic_write<DBH>(
  state: Rc<RefCell<OpState>>,
  rid: ResourceId,
  checks: Vec<V8KvCheck>,
  mutations: Vec<V8KvMutation>,
  enqueues: Vec<V8Enqueue>,
) -> Result<bool, AnyError>
where
  DBH: DatabaseHandler + 'static,
{
  let db = {
    let state = state.borrow();
    let resource =
      state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
    resource.db.clone()
  };

  // Convert each V8-facing tuple into its internal form, attaching context
  // so validation failures identify which kind of input was malformed.
  let checks = checks
    .into_iter()
    .map(TryInto::try_into)
    .collect::<Result<_, AnyError>>()
    .with_context(|| "invalid check")?;
  let mutations = mutations
    .into_iter()
    .map(TryInto::try_into)
    .collect::<Result<_, AnyError>>()
    .with_context(|| "invalid mutation")?;
  let enqueues = enqueues
    .into_iter()
    .map(TryInto::try_into)
    .collect::<Result<_, AnyError>>()
    .with_context(|| "invalid enqueue")?;

  let atomic_write = AtomicWrite {
    checks,
    mutations,
    enqueues,
  };

  let result = db.atomic_write(atomic_write).await?;

  Ok(result)
}
// (prefix, start, end)
type EncodeCursorRangeSelector = (Option<KvKey>, Option<KvKey>, Option<KvKey>);

/// Computes the opaque pagination cursor for `boundary_key` within the
/// given range selector.
#[op]
fn op_kv_encode_cursor(
  (prefix, start, end): EncodeCursorRangeSelector,
  boundary_key: KvKey,
) -> Result<String, AnyError> {
  let selector = RawSelector::from_tuple(prefix, start, end)?;
  let boundary_key = encode_v8_key(boundary_key)?;
  let cursor = encode_cursor(&selector, &boundary_key)?;
  Ok(cursor)
}

348
ext/kv/sqlite.rs Normal file
View file

@ -0,0 +1,348 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::OpState;
use rusqlite::params;
use rusqlite::OptionalExtension;
use rusqlite::Transaction;
use crate::AtomicWrite;
use crate::Database;
use crate::DatabaseHandler;
use crate::KvEntry;
use crate::MutationKind;
use crate::ReadRange;
use crate::ReadRangeOutput;
use crate::SnapshotReadOptions;
use crate::Value;
// Bumps the global data version counter and returns the new value.
const STATEMENT_INC_AND_GET_DATA_VERSION: &str =
  "update data_version set version = version + 1 where k = 0 returning version";
// Forward / reverse range scans over the kv table, bounded by [start, end).
const STATEMENT_KV_RANGE_SCAN: &str =
  "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k asc limit ?";
const STATEMENT_KV_RANGE_SCAN_REVERSE: &str =
  "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k desc limit ?";
// Point lookups: value-only and version-only variants.
const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str =
  "select v, v_encoding from kv where k = ?";
const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str =
  "select version from kv where k = ?";
// Upsert of a single kv row.
const STATEMENT_KV_POINT_SET: &str =
  "insert into kv (k, v, v_encoding, version) values (:k, :v, :v_encoding, :version) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version";
const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?";

// Tracks which schema migrations have been applied.
const STATEMENT_CREATE_MIGRATION_TABLE: &str = "
create table if not exists migration_state(
  k integer not null primary key,
  version integer not null
)
";

// Schema migrations, applied in order; `index + 1` is the schema version.
const MIGRATIONS: [&str; 2] = [
  "
create table data_version (
  k integer primary key,
  version integer not null
);
insert into data_version (k, version) values (0, 0);
create table kv (
  k blob primary key,
  v blob not null,
  v_encoding integer not null,
  version integer not null
) without rowid;
",
  "
create table queue (
  ts integer not null,
  id text not null,
  data blob not null,
  backoff_schedule text not null,
  keys_if_undelivered blob not null,
  primary key (ts, id)
);
create table queue_running(
  deadline integer not null,
  id text not null,
  data blob not null,
  backoff_schedule text not null,
  keys_if_undelivered blob not null,
  primary key (deadline, id)
);
",
];
/// [DatabaseHandler] backed by SQLite. When `default_storage_dir` is set,
/// unnamed databases are stored in `<dir>/kv.sqlite3`; otherwise an unnamed
/// database is opened in memory.
pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
  pub default_storage_dir: Option<PathBuf>,
  _permissions: PhantomData<P>,
}

/// Permission hooks consulted before opening an explicitly-pathed database.
pub trait SqliteDbHandlerPermissions {
  fn check_read(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError>;
  fn check_write(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError>;
}

impl<P: SqliteDbHandlerPermissions> SqliteDbHandler<P> {
  pub fn new(default_storage_dir: Option<PathBuf>) -> Self {
    Self {
      default_storage_dir,
      _permissions: PhantomData,
    }
  }
}
#[async_trait(?Send)]
impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
  type DB = SqliteDb;

  async fn open(
    &self,
    state: Rc<RefCell<OpState>>,
    path: Option<String>,
  ) -> Result<Self::DB, AnyError> {
    // Resolve the backing connection: explicit path (permission-checked),
    // the configured default storage dir, or a transient in-memory DB.
    let conn = match (path.as_deref(), &self.default_storage_dir) {
      (Some(":memory:") | None, None) => {
        rusqlite::Connection::open_in_memory()?
      }
      (Some(path), _) => {
        let path = Path::new(path);
        {
          let mut state = state.borrow_mut();
          let permissions = state.borrow_mut::<P>();
          permissions.check_read(path, "Deno.openKv")?;
          permissions.check_write(path, "Deno.openKv")?;
        }
        rusqlite::Connection::open(path)?
      }
      (None, Some(path)) => {
        std::fs::create_dir_all(path)?;
        let path = path.join("kv.sqlite3");
        rusqlite::Connection::open(&path)?
      }
    };

    conn.pragma_update(None, "journal_mode", "wal")?;
    conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;

    // Apply any migrations newer than the recorded schema version.
    let current_version: usize = conn
      .query_row(
        "select version from migration_state where k = 0",
        [],
        |row| row.get(0),
      )
      .optional()?
      .unwrap_or(0);

    for (i, migration) in MIGRATIONS.iter().enumerate() {
      let version = i + 1;
      if version > current_version {
        conn.execute_batch(migration)?;
        conn.execute(
          "replace into migration_state (k, version) values(?, ?)",
          [&0, &version],
        )?;
      }
    }

    Ok(SqliteDb(RefCell::new(conn)))
  }
}
/// A KV database backed by one SQLite connection. The connection sits in a
/// `RefCell` because the `Database` methods take `&self` while rusqlite's
/// `transaction()` requires `&mut Connection`.
pub struct SqliteDb(RefCell<rusqlite::Connection>);
#[async_trait(?Send)]
impl Database for SqliteDb {
  /// Reads multiple key ranges inside a single transaction, so all ranges
  /// observe one consistent snapshot of the database.
  async fn snapshot_read(
    &self,
    requests: Vec<ReadRange>,
    _options: SnapshotReadOptions,
  ) -> Result<Vec<ReadRangeOutput>, AnyError> {
    let mut responses = Vec::with_capacity(requests.len());
    let mut db = self.0.borrow_mut();
    // The transaction is never committed: it is read-only and is rolled
    // back when dropped at the end of this function.
    let tx = db.transaction()?;
    for request in requests {
      // Forward and reverse scans use distinct prepared statements.
      let mut stmt = tx.prepare_cached(if request.reverse {
        STATEMENT_KV_RANGE_SCAN_REVERSE
      } else {
        STATEMENT_KV_RANGE_SCAN
      })?;
      let entries = stmt
        .query_map(
          (
            request.start.as_slice(),
            request.end.as_slice(),
            request.limit.get(),
          ),
          |row| {
            let key: Vec<u8> = row.get(0)?;
            let value: Vec<u8> = row.get(1)?;
            let encoding: i64 = row.get(2)?;
            let value = decode_value(value, encoding);
            let version: i64 = row.get(3)?;
            Ok(KvEntry {
              key,
              value,
              versionstamp: version_to_versionstamp(version),
            })
          },
        )?
        .collect::<Result<Vec<_>, rusqlite::Error>>()?;
      responses.push(ReadRangeOutput { entries });
    }
    Ok(responses)
  }

  /// Applies an atomic write: every check must match the key's current
  /// versionstamp, then all mutations are applied under one freshly
  /// allocated version. Returns `Ok(false)` — committing nothing — when any
  /// check fails.
  async fn atomic_write(&self, write: AtomicWrite) -> Result<bool, AnyError> {
    let mut db = self.0.borrow_mut();
    let tx = db.transaction()?;
    // Verify optimistic-concurrency checks first. The early return drops
    // `tx`, rolling the transaction back.
    for check in write.checks {
      let real_versionstamp = tx
        .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
        .query_row([check.key.as_slice()], |row| row.get(0))
        .optional()?
        .map(version_to_versionstamp);
      if real_versionstamp != check.versionstamp {
        return Ok(false);
      }
    }
    // One new version is shared by all mutations in this write.
    let version: i64 = tx
      .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
      .query_row([], |row| row.get(0))?;
    for mutation in write.mutations {
      match mutation.kind {
        MutationKind::Set(value) => {
          let (value, encoding) = encode_value(&value);
          let changed = tx
            .prepare_cached(STATEMENT_KV_POINT_SET)?
            .execute(params![mutation.key, &value, &encoding, &version])?;
          // The upsert must touch exactly one row.
          assert_eq!(changed, 1)
        }
        MutationKind::Delete => {
          let changed = tx
            .prepare_cached(STATEMENT_KV_POINT_DELETE)?
            .execute(params![mutation.key])?;
          // Zero rows when the key was already absent.
          assert!(changed == 0 || changed == 1)
        }
        MutationKind::Sum(operand) => {
          // Wrapping (modular) addition on a little-endian u64 cell.
          mutate_le64(&tx, &mutation.key, "sum", &operand, version, |a, b| {
            a.wrapping_add(b)
          })?;
        }
        MutationKind::Min(operand) => {
          mutate_le64(&tx, &mutation.key, "min", &operand, version, |a, b| {
            a.min(b)
          })?;
        }
        MutationKind::Max(operand) => {
          mutate_le64(&tx, &mutation.key, "max", &operand, version, |a, b| {
            a.max(b)
          })?;
        }
      }
    }
    // TODO(@losfair): enqueues
    tx.commit()?;
    Ok(true)
  }
}
/// Read-modify-write of a little-endian u64 cell inside transaction `tx`.
///
/// The stored value (if any) is combined with `operand` via `mutate` and the
/// result is written back under `new_version`. A missing key is treated as
/// "set to the operand". Errors if either the operand or the stored value is
/// not a u64.
fn mutate_le64(
  tx: &Transaction,
  key: &[u8],
  op_name: &str,
  operand: &Value,
  new_version: i64,
  mutate: impl FnOnce(u64, u64) -> u64,
) -> Result<(), AnyError> {
  // The operand must itself be a u64.
  let rhs = match *operand {
    Value::U64(n) => n,
    _ => {
      return Err(type_error(format!(
        "Failed to perform '{op_name}' mutation on a non-U64 operand"
      )))
    }
  };

  // Fetch and decode the current value for this key, if present.
  let current = tx
    .prepare_cached(STATEMENT_KV_POINT_GET_VALUE_ONLY)?
    .query_row([key], |row| {
      let raw: Vec<u8> = row.get(0)?;
      let encoding: i64 = row.get(1)?;
      Ok(decode_value(raw, encoding))
    })
    .optional()?;

  let combined = match current {
    None => rhs,
    Some(Value::U64(lhs)) => mutate(lhs, rhs),
    Some(_) => {
      return Err(type_error(format!(
        "Failed to perform '{op_name}' mutation on a non-U64 value in the database"
      )))
    }
  };

  // Re-encode and upsert under the new version.
  let new_value = Value::U64(combined);
  let (encoded, encoding) = encode_value(&new_value);
  let changed = tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![
    key,
    &encoded[..],
    encoding,
    new_version
  ])?;
  assert_eq!(changed, 1);
  Ok(())
}
/// Expands a database version into the 10-byte versionstamp format: the
/// version in big-endian byte order, padded with two trailing zero bytes.
fn version_to_versionstamp(version: i64) -> [u8; 10] {
  let be = version.to_be_bytes();
  [be[0], be[1], be[2], be[3], be[4], be[5], be[6], be[7], 0, 0]
}
// Tags stored in the `v_encoding` column describing how the `v` blob is
// encoded (see `decode_value` / `encode_value`).
/// `v` is an opaque v8-serialized payload (`Value::V8`).
const VALUE_ENCODING_V8: i64 = 1;
/// `v` is a little-endian u64 (`Value::U64`), used by sum/min/max mutations.
const VALUE_ENCODING_LE64: i64 = 2;
/// `v` is raw bytes (`Value::Bytes`).
const VALUE_ENCODING_BYTES: i64 = 3;
/// Decodes a raw row payload according to its `v_encoding` tag.
///
/// Panics on an unrecognized tag (`todo!`) and on a LE64 payload whose
/// length is not exactly 8 bytes — both indicate database corruption.
fn decode_value(value: Vec<u8>, encoding: i64) -> crate::Value {
  match encoding {
    VALUE_ENCODING_LE64 => {
      let mut le_bytes = [0u8; 8];
      le_bytes.copy_from_slice(&value);
      crate::Value::U64(u64::from_le_bytes(le_bytes))
    }
    VALUE_ENCODING_V8 => crate::Value::V8(value),
    VALUE_ENCODING_BYTES => crate::Value::Bytes(value),
    _ => todo!(),
  }
}
/// Encodes a value into a `(payload, v_encoding)` pair for storage.
///
/// V8 and byte payloads are borrowed as-is; u64 values are serialized to
/// owned little-endian bytes. Inverse of `decode_value`.
fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
  match value {
    crate::Value::V8(bytes) => (Cow::Borrowed(bytes), VALUE_ENCODING_V8),
    crate::Value::Bytes(bytes) => (Cow::Borrowed(bytes), VALUE_ENCODING_BYTES),
    crate::Value::U64(n) => {
      (Cow::Owned(n.to_le_bytes().to_vec()), VALUE_ENCODING_LE64)
    }
  }
}

View file

@ -18,7 +18,7 @@ aes.workspace = true
cbc.workspace = true
deno_core.workspace = true
digest = { version = "0.10.5", features = ["core-api", "std"] }
hex = "0.4.3"
hex.workspace = true
idna = "0.3.0"
indexmap.workspace = true
md-5 = "0.10.5"

View file

@ -47,6 +47,7 @@ deno_http.workspace = true
deno_io.workspace = true
deno_net.workspace = true
deno_node.workspace = true
deno_kv.workspace = true
deno_tls.workspace = true
deno_url.workspace = true
deno_web.workspace = true
@ -71,6 +72,7 @@ deno_flash.workspace = true
deno_fs.workspace = true
deno_http.workspace = true
deno_io.workspace = true
deno_kv.workspace = true
deno_napi.workspace = true
deno_net.workspace = true
deno_node.workspace = true

View file

@ -200,6 +200,24 @@ mod startup_snapshot {
}
}
impl deno_kv::sqlite::SqliteDbHandlerPermissions for Permissions {
fn check_read(
&mut self,
_path: &Path,
_api_name: &str,
) -> Result<(), AnyError> {
unreachable!("snapshotting!")
}
fn check_write(
&mut self,
_path: &Path,
_api_name: &str,
) -> Result<(), AnyError> {
unreachable!("snapshotting!")
}
}
deno_core::extension!(runtime,
deps = [
deno_webidl,
@ -289,6 +307,10 @@ mod startup_snapshot {
None,
),
deno_tls::deno_tls::init_ops_and_esm(),
deno_kv::deno_kv::init_ops_and_esm(
deno_kv::sqlite::SqliteDbHandler::<Permissions>::new(None),
false, // No --unstable
),
deno_napi::deno_napi::init_ops_and_esm::<Permissions>(),
deno_http::deno_http::init_ops_and_esm(),
deno_io::deno_io::init_ops_and_esm(Default::default()),

View file

@ -23,6 +23,7 @@ import * as signals from "ext:runtime/40_signals.js";
import * as tty from "ext:runtime/40_tty.js";
// TODO(bartlomieju): this is funky we have two `http` imports
import * as httpRuntime from "ext:runtime/40_http.js";
import * as kv from "ext:deno_kv/01_db.ts";
const denoNs = {
metrics: core.metrics,
@ -169,6 +170,10 @@ const denoNsUnstable = {
funlockSync: fs.funlockSync,
upgradeHttp: http.upgradeHttp,
upgradeHttpRaw: flash.upgradeHttpRaw,
openKv: kv.openKv,
Kv: kv.Kv,
KvU64: kv.KvU64,
KvListIterator: kv.KvListIterator,
};
export { denoNs, denoNsUnstable };

View file

@ -11,6 +11,7 @@ pub use deno_flash;
pub use deno_fs;
pub use deno_http;
pub use deno_io;
pub use deno_kv;
pub use deno_napi;
pub use deno_net;
pub use deno_node;

View file

@ -1967,6 +1967,18 @@ impl deno_ffi::FfiPermissions for PermissionsContainer {
}
}
impl deno_kv::sqlite::SqliteDbHandlerPermissions for PermissionsContainer {
#[inline(always)]
fn check_read(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError> {
self.0.lock().read.check(p, Some(api_name))
}
#[inline(always)]
fn check_write(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError> {
self.0.lock().write.check(p, Some(api_name))
}
}
fn unit_permission_from_flag_bool(
flag: bool,
name: &'static str,

View file

@ -34,6 +34,7 @@ use deno_core::SharedArrayBufferStore;
use deno_core::Snapshot;
use deno_core::SourceMapGetter;
use deno_io::Stdio;
use deno_kv::sqlite::SqliteDbHandler;
use deno_node::RequireNpmResolver;
use deno_tls::rustls::RootCertStore;
use deno_web::create_entangled_message_port;
@ -431,6 +432,10 @@ impl WebWorker {
options.unsafely_ignore_certificate_errors.clone(),
),
deno_tls::deno_tls::init_ops(),
deno_kv::deno_kv::init_ops(
SqliteDbHandler::<PermissionsContainer>::new(None),
unstable,
),
deno_napi::deno_napi::init_ops::<PermissionsContainer>(),
deno_http::deno_http::init_ops(),
deno_io::deno_io::init_ops(Some(options.stdio)),

View file

@ -30,6 +30,7 @@ use deno_core::SharedArrayBufferStore;
use deno_core::Snapshot;
use deno_core::SourceMapGetter;
use deno_io::Stdio;
use deno_kv::sqlite::SqliteDbHandler;
use deno_node::RequireNpmResolver;
use deno_tls::rustls::RootCertStore;
use deno_web::BlobStore;
@ -253,6 +254,12 @@ impl MainWorker {
options.unsafely_ignore_certificate_errors.clone(),
),
deno_tls::deno_tls::init_ops(),
deno_kv::deno_kv::init_ops(
SqliteDbHandler::<PermissionsContainer>::new(
options.origin_storage_dir.clone(),
),
unstable,
),
deno_napi::deno_napi::init_ops::<PermissionsContainer>(),
deno_http::deno_http::init_ops(),
deno_io::deno_io::init_ops(Some(options.stdio)),

View file

@ -13,6 +13,7 @@ use crate::magic::transl8::visit_magic;
use crate::magic::transl8::FromV8;
use crate::magic::transl8::MagicType;
use crate::payload::ValueType;
use crate::AnyValue;
use crate::BigInt;
use crate::ByteString;
use crate::DetachedBuffer;
@ -135,6 +136,7 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de>
self.deserialize_f64(visitor)
}
}
ValueType::BigInt => Err(Error::UnsupportedType),
ValueType::String => self.deserialize_string(visitor),
ValueType::Array => self.deserialize_seq(visitor),
ValueType::Object => self.deserialize_map(visitor),
@ -172,7 +174,6 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de>
{
self.deserialize_f64(visitor)
}
fn deserialize_f64<V>(self, visitor: V) -> Result<V::Value>
where
V: Visitor<'de>,
@ -355,6 +356,9 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de>
magic::Value::MAGIC_NAME => {
visit_magic(visitor, magic::Value::from_v8(self.scope, self.input)?)
}
AnyValue::MAGIC_NAME => {
visit_magic(visitor, AnyValue::from_v8(self.scope, self.input)?)
}
_ => {
// Regular struct
let obj = v8::Local::<v8::Object>::try_from(self.input)

View file

@ -28,6 +28,7 @@ pub enum Error {
ExpectedUtf8,
ExpectedLatin1,
UnsupportedType,
LengthMismatch,
}

View file

@ -15,6 +15,7 @@ pub use de::Deserializer;
pub use error::Error;
pub use error::Result;
pub use keys::KeyCache;
pub use magic::any_value::AnyValue;
pub use magic::bigint::BigInt;
pub use magic::buffer::ZeroCopyBuf;
pub use magic::bytestring::ByteString;

View file

@ -0,0 +1,66 @@
use num_bigint::BigInt;
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use super::buffer::ZeroCopyBuf;
use super::transl8::FromV8;
use super::transl8::ToV8;
use crate::magic::transl8::impl_magic;
use crate::Error;
/// An untagged enum type that can be any of number, string, bool, bigint, or
/// buffer.
#[derive(Debug)]
pub enum AnyValue {
  /// Bytes of a v8 ArrayBufferView.
  Buffer(ZeroCopyBuf),
  String(String),
  Number(f64),
  /// Arbitrary-precision integer, backed by `num_bigint::BigInt`.
  BigInt(BigInt),
  Bool(bool),
}

// Registers `AnyValue` with serde_v8's magic-type machinery so it is
// (de)serialized via the `ToV8`/`FromV8` impls below.
impl_magic!(AnyValue);
impl ToV8 for AnyValue {
  /// Converts this value into a v8 value in `scope`.
  ///
  /// Buffers go through `ZeroCopyBuf::to_v8`; bigints are cloned into the
  /// serde_v8 `BigInt` wrapper first; the remaining variants use the
  /// generic `crate::to_v8` path.
  fn to_v8<'a>(
    &mut self,
    scope: &mut v8::HandleScope<'a>,
  ) -> Result<v8::Local<'a, v8::Value>, crate::Error> {
    match self {
      Self::Buffer(buf) => buf.to_v8(scope),
      Self::String(s) => crate::to_v8(scope, s),
      Self::Number(num) => crate::to_v8(scope, num),
      Self::BigInt(bigint) => {
        crate::to_v8(scope, crate::BigInt::from(bigint.clone()))
      }
      Self::Bool(b) => crate::to_v8(scope, b),
    }
  }
}
impl FromV8 for AnyValue {
  /// Converts a v8 value into the matching `AnyValue` variant.
  ///
  /// Accepts strings, numbers, bigints, ArrayBufferViews and booleans, in
  /// that order of checks; any other value is rejected with an error.
  fn from_v8(
    scope: &mut v8::HandleScope,
    value: v8::Local<v8::Value>,
  ) -> Result<Self, crate::Error> {
    if value.is_string() {
      crate::from_v8(scope, value).map(AnyValue::String)
    } else if value.is_number() {
      crate::from_v8(scope, value).map(AnyValue::Number)
    } else if value.is_big_int() {
      // Goes through serde_v8's BigInt wrapper, then into num_bigint.
      let wrapped = crate::BigInt::from_v8(scope, value)?;
      Ok(AnyValue::BigInt(wrapped.into()))
    } else if value.is_array_buffer_view() {
      ZeroCopyBuf::from_v8(scope, value).map(AnyValue::Buffer)
    } else if value.is_boolean() {
      crate::from_v8(scope, value).map(AnyValue::Bool)
    } else {
      Err(Error::Message(
        "expected string, number, bigint, ArrayBufferView, boolean".into(),
      ))
    }
  }
}

View file

@ -1,4 +1,5 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
pub mod any_value;
pub mod bigint;
pub mod buffer;
pub mod bytestring;

View file

@ -9,6 +9,7 @@ pub enum ValueType {
Null,
Bool,
Number,
BigInt,
String,
Array,
ArrayBuffer,
@ -26,6 +27,8 @@ impl ValueType {
return Self::String;
} else if v.is_array() {
return Self::Array;
} else if v.is_big_int() {
return Self::BigInt;
} else if v.is_array_buffer() {
return Self::ArrayBuffer;
} else if v.is_array_buffer_view() {

View file

@ -14,6 +14,7 @@ use crate::magic::transl8::opaque_recv;
use crate::magic::transl8::MagicType;
use crate::magic::transl8::ToV8;
use crate::magic::transl8::MAGIC_FIELD;
use crate::AnyValue;
use crate::BigInt;
use crate::ByteString;
use crate::DetachedBuffer;
@ -274,6 +275,7 @@ pub enum StructSerializers<'a, 'b, 'c> {
ExternalPointer(MagicalSerializer<'a, 'b, 'c, magic::ExternalPointer>),
Magic(MagicalSerializer<'a, 'b, 'c, magic::Value<'a>>),
ZeroCopyBuf(MagicalSerializer<'a, 'b, 'c, ZeroCopyBuf>),
MagicAnyValue(MagicalSerializer<'a, 'b, 'c, AnyValue>),
MagicDetached(MagicalSerializer<'a, 'b, 'c, DetachedBuffer>),
MagicByteString(MagicalSerializer<'a, 'b, 'c, ByteString>),
MagicU16String(MagicalSerializer<'a, 'b, 'c, U16String>),
@ -295,6 +297,7 @@ impl<'a, 'b, 'c> ser::SerializeStruct for StructSerializers<'a, 'b, 'c> {
StructSerializers::ExternalPointer(s) => s.serialize_field(key, value),
StructSerializers::Magic(s) => s.serialize_field(key, value),
StructSerializers::ZeroCopyBuf(s) => s.serialize_field(key, value),
StructSerializers::MagicAnyValue(s) => s.serialize_field(key, value),
StructSerializers::MagicDetached(s) => s.serialize_field(key, value),
StructSerializers::MagicByteString(s) => s.serialize_field(key, value),
StructSerializers::MagicU16String(s) => s.serialize_field(key, value),
@ -311,6 +314,7 @@ impl<'a, 'b, 'c> ser::SerializeStruct for StructSerializers<'a, 'b, 'c> {
StructSerializers::ExternalPointer(s) => s.end(),
StructSerializers::Magic(s) => s.end(),
StructSerializers::ZeroCopyBuf(s) => s.end(),
StructSerializers::MagicAnyValue(s) => s.end(),
StructSerializers::MagicDetached(s) => s.end(),
StructSerializers::MagicByteString(s) => s.end(),
StructSerializers::MagicU16String(s) => s.end(),
@ -588,6 +592,10 @@ impl<'a, 'b, 'c> ser::Serializer for Serializer<'a, 'b, 'c> {
let m = MagicalSerializer::<ZeroCopyBuf>::new(self.scope);
Ok(StructSerializers::ZeroCopyBuf(m))
}
AnyValue::MAGIC_NAME => {
let m = MagicalSerializer::<AnyValue>::new(self.scope);
Ok(StructSerializers::MagicAnyValue(m))
}
DetachedBuffer::MAGIC_NAME => {
let m = MagicalSerializer::<DetachedBuffer>::new(self.scope);
Ok(StructSerializers::MagicDetached(m))