From 46a4bd5178f5aed22041422c431b5ab6f697865d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 27 Sep 2023 02:21:06 +0200 Subject: [PATCH] feat(unstable): add `Deno.jupyter.broadcast` API (#20656) Closes https://github.com/denoland/deno/issues/20591 --------- Co-authored-by: Kyle Kelley --- cli/build.rs | 1 + cli/js/40_jupyter.js | 16 ++++ cli/js/99_main.js | 1 + cli/ops/jupyter.rs | 81 +++++++++++++++++++ cli/ops/mod.rs | 1 + cli/tests/integration/js_unit_tests.rs | 1 + .../testdata/jupyter/integration_test.ipynb | 51 +++++++++++- cli/tests/unit/jupyter_test.ts | 9 +++ cli/tools/jupyter/mod.rs | 56 +++---------- cli/tools/jupyter/server.rs | 15 +++- cli/tsc/dts/lib.deno.unstable.d.ts | 33 ++++++++ runtime/js/90_deno_ns.js | 1 + runtime/js/99_main.js | 10 +++ 13 files changed, 226 insertions(+), 50 deletions(-) create mode 100644 cli/js/40_jupyter.js create mode 100644 cli/ops/jupyter.rs create mode 100644 cli/tests/unit/jupyter_test.ts diff --git a/cli/build.rs b/cli/build.rs index b1377485b1..4435cd243d 100644 --- a/cli/build.rs +++ b/cli/build.rs @@ -331,6 +331,7 @@ deno_core::extension!( esm = [ dir "js", "40_testing.js", + "40_jupyter.js", "99_main.js" ], customizer = |ext: &mut deno_core::Extension| { diff --git a/cli/js/40_jupyter.js b/cli/js/40_jupyter.js new file mode 100644 index 0000000000..8cdf0789a9 --- /dev/null +++ b/cli/js/40_jupyter.js @@ -0,0 +1,16 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +const core = globalThis.Deno.core; +const internals = globalThis.__bootstrap.internals; + +import { denoNsUnstable } from "ext:runtime/90_deno_ns.js"; + +function enableJupyter() { + denoNsUnstable.jupyter = { + async broadcast(msgType, content) { + await core.opAsync("op_jupyter_broadcast", msgType, content); + }, + }; +} + +internals.enableJupyter = enableJupyter; diff --git a/cli/js/99_main.js b/cli/js/99_main.js index dc9d74fb06..37342f6cd6 100644 --- a/cli/js/99_main.js +++ b/cli/js/99_main.js @@ -1,3 +1,4 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. import "ext:cli/40_testing.js"; +import "ext:cli/40_jupyter.js"; import "ext:cli/runtime/js/99_main.js"; diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs new file mode 100644 index 0000000000..765b062e5c --- /dev/null +++ b/cli/ops/jupyter.rs @@ -0,0 +1,81 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::cell::RefCell; +use std::rc::Rc; +use std::sync::Arc; + +use crate::tools::jupyter::jupyter_msg::Connection; +use crate::tools::jupyter::jupyter_msg::JupyterMessage; +use crate::tools::jupyter::server::StdioMsg; +use deno_core::error::AnyError; +use deno_core::op2; +use deno_core::serde_json; +use deno_core::Op; +use deno_core::OpState; +use tokio::sync::mpsc; +use tokio::sync::Mutex; + +deno_core::extension!(deno_jupyter, + ops = [ + op_jupyter_broadcast, + ], + options = { + sender: mpsc::UnboundedSender, + }, + middleware = |op| match op.name { + "op_print" => op_print::DECL, + _ => op, + }, + state = |state, options| { + state.put(options.sender); + }, +); + +#[op2(async)] +pub async fn op_jupyter_broadcast( + state: Rc>, + #[string] message_type: String, + #[serde] content: serde_json::Value, +) -> Result<(), AnyError> { + let (iopub_socket, last_execution_request) = { + let s = state.borrow(); + + ( + s.borrow::>>>() + .clone(), + s.borrow::>>>().clone(), + ) + }; + + let maybe_last_request = last_execution_request.borrow().clone(); + if let Some(last_request) = maybe_last_request { + last_request + .new_message(&message_type) + .with_content(content) + .send(&mut *iopub_socket.lock().await) + .await?; + } + + Ok(()) +} + +#[op2(fast)] +pub fn op_print( + state: &mut OpState, + #[string] msg: &str, + is_err: bool, +) -> Result<(), AnyError> { + let sender = state.borrow_mut::>(); + + if is_err { + if let Err(err) = sender.send(StdioMsg::Stderr(msg.into())) { + eprintln!("Failed to send stderr message: {}", err); + } + return Ok(()); + } + + if let Err(err) = sender.send(StdioMsg::Stdout(msg.into())) { + eprintln!("Failed to send stdout message: {}", err); + } + Ok(()) +} diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index d4d8a84ba2..ad4e4bd4e6 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -9,6 +9,7 @@ use deno_core::Extension; use deno_core::OpState; pub mod bench; +pub mod jupyter; pub mod testing; pub fn cli_exts(npm_resolver: Arc) -> Vec { diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs index 4bb2ef73b8..a13db02d7a 100644 --- a/cli/tests/integration/js_unit_tests.rs +++ b/cli/tests/integration/js_unit_tests.rs @@ -45,6 +45,7 @@ util::unit_test_factory!( internals_test, intl_test, io_test, + jupyter_test, kv_test, kv_queue_test_no_db_close, kv_queue_undelivered_test, diff --git a/cli/tests/testdata/jupyter/integration_test.ipynb b/cli/tests/testdata/jupyter/integration_test.ipynb index e9347750d9..1d48c8fc9c 100644 --- a/cli/tests/testdata/jupyter/integration_test.ipynb +++ b/cli/tests/testdata/jupyter/integration_test.ipynb @@ -665,9 +665,58 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "8e93df23-06eb-414b-98d4-51fbebb53d1f", "metadata": {}, + "outputs": [ + { + "ename": "TypeError", + "evalue": "Cannot read properties of undefined (reading 'broadcast')", + "output_type": "error", + "traceback": [ + "Stack trace:", + "TypeError: Cannot read properties of undefined (reading 'broadcast')", + " at :2:20" + ] + } + ], + "source": [ + "await Deno.jupyter.broadcast(\"display_data\", {\n", + " data: { \"text/html\": \"Processing.\" },\n", + " metadata: {},\n", + " transient: { display_id: \"progress\" }\n", + "});\n", + "\n", + "await new Promise((resolve) => setTimeout(resolve, 500));\n", + "\n", + "await Deno.jupyter.broadcast(\"update_display_data\", {\n", + " data: { \"text/html\": \"Processing..\" },\n", + " metadata: {},\n", + " transient: { display_id: \"progress\" }\n", + "});\n", + "\n", + "await new Promise((resolve) => setTimeout(resolve, 500));\n", + "\n", + "await Deno.jupyter.broadcast(\"update_display_data\", {\n", + " data: { \"text/html\": \"Processing...\" },\n", + " metadata: {},\n", + " transient: { display_id: \"progress\" }\n", + "});\n", + "\n", + "await new Promise((resolve) => setTimeout(resolve, 500));\n", + "\n", + "await Deno.jupyter.broadcast(\"update_display_data\", {\n", + " data: { \"text/html\": \"Complete ✅\" },\n", + " metadata: {},\n", + " transient: { display_id: \"progress\" }\n", + "});" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0181f28e", + "metadata": {}, "outputs": [], "source": [] } diff --git a/cli/tests/unit/jupyter_test.ts b/cli/tests/unit/jupyter_test.ts new file mode 100644 index 0000000000..40eaf4623d --- /dev/null +++ b/cli/tests/unit/jupyter_test.ts @@ -0,0 +1,9 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +import { assertThrows } from "./test_util.ts"; + +Deno.test("Deno.jupyter is not available", () => { + assertThrows( + () => Deno.jupyter, + "Deno.jupyter is only available in `deno jupyter` subcommand.", + ); +}); diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs index 7d7104060e..fb0860e368 100644 --- a/cli/tools/jupyter/mod.rs +++ b/cli/tools/jupyter/mod.rs @@ -2,24 +2,23 @@ use crate::args::Flags; use crate::args::JupyterFlags; +use crate::ops; use crate::tools::repl; use crate::util::logger; use crate::CliFactory; use deno_core::anyhow::Context; use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; -use deno_core::op2; +use deno_core::located_script_name; use deno_core::resolve_url_or_path; use deno_core::serde::Deserialize; use deno_core::serde_json; -use deno_core::Op; -use deno_core::OpState; use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; +use tokio::sync::mpsc; mod install; -mod jupyter_msg; -mod server; +pub(crate) mod jupyter_msg; +pub(crate) mod server; pub async fn kernel( flags: Flags, @@ -59,7 +58,7 @@ pub async fn kernel( let npm_resolver = factory.npm_resolver().await?.clone(); let resolver = factory.resolver().await?.clone(); let worker_factory = factory.create_cli_main_worker_factory().await?; - let (stdio_tx, stdio_rx) = mpsc::unbounded(); + let (stdio_tx, stdio_rx) = mpsc::unbounded_channel(); let conn_file = std::fs::read_to_string(&connection_filepath).with_context(|| { @@ -77,11 +76,15 @@ pub async fn kernel( .create_custom_worker( main_module.clone(), permissions, - vec![deno_jupyter::init_ops(stdio_tx)], + vec![ops::jupyter::deno_jupyter::init_ops(stdio_tx)], Default::default(), ) .await?; worker.setup_repl().await?; + worker.execute_script_static( + located_script_name!(), + "Deno[Deno.internal].enableJupyter();", + )?; let worker = worker.into_main_worker(); let repl_session = repl::ReplSession::initialize(cli_options, npm_resolver, resolver, worker) @@ -92,43 +95,6 @@ pub async fn kernel( Ok(()) } -deno_core::extension!(deno_jupyter, - options = { - sender: mpsc::UnboundedSender, - }, - middleware = |op| match op.name { - "op_print" => op_print::DECL, - _ => op, - }, - state = |state, options| { - state.put(options.sender); - }, -); - -#[op2(fast)] -pub fn op_print( - state: &mut OpState, - #[string] msg: &str, - is_err: bool, -) -> Result<(), AnyError> { - let sender = state.borrow_mut::>(); - - if is_err { - if let Err(err) = - sender.unbounded_send(server::StdioMsg::Stderr(msg.into())) - { - eprintln!("Failed to send stderr message: {}", err); - } - return Ok(()); - } - - if let Err(err) = sender.unbounded_send(server::StdioMsg::Stdout(msg.into())) - { - eprintln!("Failed to send stdout message: {}", err); - } - Ok(()) -} - #[derive(Debug, Deserialize)] pub struct ConnectionSpec { ip: String, diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs index 2028c7d25f..c54dcd2756 100644 --- a/cli/tools/jupyter/server.rs +++ b/cli/tools/jupyter/server.rs @@ -12,12 +12,11 @@ use crate::tools::repl; use crate::tools::repl::cdp; use deno_core::error::AnyError; use deno_core::futures; -use deno_core::futures::channel::mpsc; -use deno_core::futures::StreamExt; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::CancelFuture; use deno_core::CancelHandle; +use tokio::sync::mpsc; use tokio::sync::Mutex; use zeromq::SocketRecv; use zeromq::SocketSend; @@ -44,7 +43,7 @@ impl JupyterServer { pub async fn start( spec: ConnectionSpec, mut stdio_rx: mpsc::UnboundedReceiver, - repl_session: repl::ReplSession, + mut repl_session: repl::ReplSession, ) -> Result<(), AnyError> { let mut heartbeat = bind_socket::(&spec, spec.hb_port).await?; @@ -59,6 +58,14 @@ impl JupyterServer { let iopub_socket = Arc::new(Mutex::new(iopub_socket)); let last_execution_request = Rc::new(RefCell::new(None)); + // Store `iopub_socket` in the op state so it's accessible to the runtime API. + { + let op_state_rc = repl_session.worker.js_runtime.op_state(); + let mut op_state = op_state_rc.borrow_mut(); + op_state.put(iopub_socket.clone()); + op_state.put(last_execution_request.clone()); + } + let cancel_handle = CancelHandle::new_rc(); let cancel_handle2 = CancelHandle::new_rc(); @@ -90,7 +97,7 @@ impl JupyterServer { }); let handle4 = deno_core::unsync::spawn(async move { - while let Some(stdio_msg) = stdio_rx.next().await { + while let Some(stdio_msg) = stdio_rx.recv().await { Self::handle_stdio_msg( iopub_socket.clone(), last_execution_request.clone(), diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index c8b857dc67..9f9a4914bb 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1947,6 +1947,39 @@ declare namespace Deno { */ shutdown(): Promise; } + + /** + * A namespace containing runtime APIs available in Jupyter notebooks. + * + * When accessed outside of Jupyter notebook context an error will be thrown. + * + * @category Jupyter */ + export namespace jupyter { + /** + * Broadcast a message on IO pub channel. + * + * ``` + * await Deno.jupyter.broadcast("display_data", { + * data: { "text/html": "Processing." }, + * metadata: {}, + * transient: { display_id: "progress" } + * }); + * + * await new Promise((resolve) => setTimeout(resolve, 500)); + * + * await Deno.jupyter.broadcast("update_display_data", { + * data: { "text/html": "Processing.." }, + * metadata: {}, + * transient: { display_id: "progress" } + * }); + * ``` + * + * @category Jupyter */ + export function broadcast( + msgType: string, + content: Record, + ): Promise; + } } /** **UNSTABLE**: New API, yet to be vetted. diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 00c6d6b475..5364a60eeb 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -2,6 +2,7 @@ const core = globalThis.Deno.core; const ops = core.ops; + import * as timers from "ext:deno_web/02_timers.js"; import * as httpClient from "ext:deno_fetch/22_http_client.js"; import * as console from "ext:deno_console/01_console.js"; diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 8f74e146e0..15e4936b13 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -542,6 +542,16 @@ function bootstrapMainRuntime(runtimeOptions) { if (unstableFlag) { ObjectAssign(finalDenoNs, denoNsUnstable); + // TODO(bartlomieju): this is not ideal, but because we use `ObjectAssign` + // above any properties that are defined elsewhere using `Object.defineProperty` + // are lost. + ObjectDefineProperty(finalDenoNs, "jupyter", { + get() { + throw new Error( + "Deno.jupyter is only available in `deno jupyter` subcommand.", + ); + }, + }); } // Setup `Deno` global - we're actually overriding already existing global