feat(unstable): add Deno.jupyter.broadcast API (#20656)

Closes https://github.com/denoland/deno/issues/20591

---------

Co-authored-by: Kyle Kelley <rgbkrk@gmail.com>
This commit is contained in:
Bartek Iwańczuk 2023-09-27 02:21:06 +02:00 committed by GitHub
parent d39659332c
commit 46a4bd5178
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 226 additions and 50 deletions

View file

@ -331,6 +331,7 @@ deno_core::extension!(
esm = [
dir "js",
"40_testing.js",
"40_jupyter.js",
"99_main.js"
],
customizer = |ext: &mut deno_core::Extension| {

16
cli/js/40_jupyter.js Normal file
View file

@ -0,0 +1,16 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
const core = globalThis.Deno.core;
const internals = globalThis.__bootstrap.internals;
import { denoNsUnstable } from "ext:runtime/90_deno_ns.js";
function enableJupyter() {
denoNsUnstable.jupyter = {
async broadcast(msgType, content) {
await core.opAsync("op_jupyter_broadcast", msgType, content);
},
};
}
internals.enableJupyter = enableJupyter;

View file

@ -1,3 +1,4 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import "ext:cli/40_testing.js";
import "ext:cli/40_jupyter.js";
import "ext:cli/runtime/js/99_main.js";

81
cli/ops/jupyter.rs Normal file
View file

@ -0,0 +1,81 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use crate::tools::jupyter::jupyter_msg::Connection;
use crate::tools::jupyter::jupyter_msg::JupyterMessage;
use crate::tools::jupyter::server::StdioMsg;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::serde_json;
use deno_core::Op;
use deno_core::OpState;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
deno_core::extension!(deno_jupyter,
ops = [
op_jupyter_broadcast,
],
options = {
sender: mpsc::UnboundedSender<StdioMsg>,
},
middleware = |op| match op.name {
"op_print" => op_print::DECL,
_ => op,
},
state = |state, options| {
state.put(options.sender);
},
);
#[op2(async)]
pub async fn op_jupyter_broadcast(
state: Rc<RefCell<OpState>>,
#[string] message_type: String,
#[serde] content: serde_json::Value,
) -> Result<(), AnyError> {
let (iopub_socket, last_execution_request) = {
let s = state.borrow();
(
s.borrow::<Arc<Mutex<Connection<zeromq::PubSocket>>>>()
.clone(),
s.borrow::<Rc<RefCell<Option<JupyterMessage>>>>().clone(),
)
};
let maybe_last_request = last_execution_request.borrow().clone();
if let Some(last_request) = maybe_last_request {
last_request
.new_message(&message_type)
.with_content(content)
.send(&mut *iopub_socket.lock().await)
.await?;
}
Ok(())
}
#[op2(fast)]
pub fn op_print(
state: &mut OpState,
#[string] msg: &str,
is_err: bool,
) -> Result<(), AnyError> {
let sender = state.borrow_mut::<mpsc::UnboundedSender<StdioMsg>>();
if is_err {
if let Err(err) = sender.send(StdioMsg::Stderr(msg.into())) {
eprintln!("Failed to send stderr message: {}", err);
}
return Ok(());
}
if let Err(err) = sender.send(StdioMsg::Stdout(msg.into())) {
eprintln!("Failed to send stdout message: {}", err);
}
Ok(())
}

View file

@ -9,6 +9,7 @@ use deno_core::Extension;
use deno_core::OpState;
pub mod bench;
pub mod jupyter;
pub mod testing;
pub fn cli_exts(npm_resolver: Arc<CliNpmResolver>) -> Vec<Extension> {

View file

@ -45,6 +45,7 @@ util::unit_test_factory!(
internals_test,
intl_test,
io_test,
jupyter_test,
kv_test,
kv_queue_test_no_db_close,
kv_queue_undelivered_test,

View file

@ -665,9 +665,58 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"id": "8e93df23-06eb-414b-98d4-51fbebb53d1f",
"metadata": {},
"outputs": [
{
"ename": "TypeError",
"evalue": "Cannot read properties of undefined (reading 'broadcast')",
"output_type": "error",
"traceback": [
"Stack trace:",
"TypeError: Cannot read properties of undefined (reading 'broadcast')",
" at <anonymous>:2:20"
]
}
],
"source": [
"await Deno.jupyter.broadcast(\"display_data\", {\n",
" data: { \"text/html\": \"<b>Processing.</b>\" },\n",
" metadata: {},\n",
" transient: { display_id: \"progress\" }\n",
"});\n",
"\n",
"await new Promise((resolve) => setTimeout(resolve, 500));\n",
"\n",
"await Deno.jupyter.broadcast(\"update_display_data\", {\n",
" data: { \"text/html\": \"<b>Processing..</b>\" },\n",
" metadata: {},\n",
" transient: { display_id: \"progress\" }\n",
"});\n",
"\n",
"await new Promise((resolve) => setTimeout(resolve, 500));\n",
"\n",
"await Deno.jupyter.broadcast(\"update_display_data\", {\n",
" data: { \"text/html\": \"<b>Processing...</b>\" },\n",
" metadata: {},\n",
" transient: { display_id: \"progress\" }\n",
"});\n",
"\n",
"await new Promise((resolve) => setTimeout(resolve, 500));\n",
"\n",
"await Deno.jupyter.broadcast(\"update_display_data\", {\n",
" data: { \"text/html\": \"<b>Complete ✅</b>\" },\n",
" metadata: {},\n",
" transient: { display_id: \"progress\" }\n",
"});"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0181f28e",
"metadata": {},
"outputs": [],
"source": []
}

View file

@ -0,0 +1,9 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { assertThrows } from "./test_util.ts";
Deno.test("Deno.jupyter is not available", () => {
assertThrows(
() => Deno.jupyter,
"Deno.jupyter is only available in `deno jupyter` subcommand.",
);
});

View file

@ -2,24 +2,23 @@
use crate::args::Flags;
use crate::args::JupyterFlags;
use crate::ops;
use crate::tools::repl;
use crate::util::logger;
use crate::CliFactory;
use deno_core::anyhow::Context;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::op2;
use deno_core::located_script_name;
use deno_core::resolve_url_or_path;
use deno_core::serde::Deserialize;
use deno_core::serde_json;
use deno_core::Op;
use deno_core::OpState;
use deno_runtime::permissions::Permissions;
use deno_runtime::permissions::PermissionsContainer;
use tokio::sync::mpsc;
mod install;
mod jupyter_msg;
mod server;
pub(crate) mod jupyter_msg;
pub(crate) mod server;
pub async fn kernel(
flags: Flags,
@ -59,7 +58,7 @@ pub async fn kernel(
let npm_resolver = factory.npm_resolver().await?.clone();
let resolver = factory.resolver().await?.clone();
let worker_factory = factory.create_cli_main_worker_factory().await?;
let (stdio_tx, stdio_rx) = mpsc::unbounded();
let (stdio_tx, stdio_rx) = mpsc::unbounded_channel();
let conn_file =
std::fs::read_to_string(&connection_filepath).with_context(|| {
@ -77,11 +76,15 @@ pub async fn kernel(
.create_custom_worker(
main_module.clone(),
permissions,
vec![deno_jupyter::init_ops(stdio_tx)],
vec![ops::jupyter::deno_jupyter::init_ops(stdio_tx)],
Default::default(),
)
.await?;
worker.setup_repl().await?;
worker.execute_script_static(
located_script_name!(),
"Deno[Deno.internal].enableJupyter();",
)?;
let worker = worker.into_main_worker();
let repl_session =
repl::ReplSession::initialize(cli_options, npm_resolver, resolver, worker)
@ -92,43 +95,6 @@ pub async fn kernel(
Ok(())
}
deno_core::extension!(deno_jupyter,
options = {
sender: mpsc::UnboundedSender<server::StdioMsg>,
},
middleware = |op| match op.name {
"op_print" => op_print::DECL,
_ => op,
},
state = |state, options| {
state.put(options.sender);
},
);
#[op2(fast)]
pub fn op_print(
state: &mut OpState,
#[string] msg: &str,
is_err: bool,
) -> Result<(), AnyError> {
let sender = state.borrow_mut::<mpsc::UnboundedSender<server::StdioMsg>>();
if is_err {
if let Err(err) =
sender.unbounded_send(server::StdioMsg::Stderr(msg.into()))
{
eprintln!("Failed to send stderr message: {}", err);
}
return Ok(());
}
if let Err(err) = sender.unbounded_send(server::StdioMsg::Stdout(msg.into()))
{
eprintln!("Failed to send stdout message: {}", err);
}
Ok(())
}
#[derive(Debug, Deserialize)]
pub struct ConnectionSpec {
ip: String,

View file

@ -12,12 +12,11 @@ use crate::tools::repl;
use crate::tools::repl::cdp;
use deno_core::error::AnyError;
use deno_core::futures;
use deno_core::futures::channel::mpsc;
use deno_core::futures::StreamExt;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use zeromq::SocketRecv;
use zeromq::SocketSend;
@ -44,7 +43,7 @@ impl JupyterServer {
pub async fn start(
spec: ConnectionSpec,
mut stdio_rx: mpsc::UnboundedReceiver<StdioMsg>,
repl_session: repl::ReplSession,
mut repl_session: repl::ReplSession,
) -> Result<(), AnyError> {
let mut heartbeat =
bind_socket::<zeromq::RepSocket>(&spec, spec.hb_port).await?;
@ -59,6 +58,14 @@ impl JupyterServer {
let iopub_socket = Arc::new(Mutex::new(iopub_socket));
let last_execution_request = Rc::new(RefCell::new(None));
// Store `iopub_socket` in the op state so it's accessible to the runtime API.
{
let op_state_rc = repl_session.worker.js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();
op_state.put(iopub_socket.clone());
op_state.put(last_execution_request.clone());
}
let cancel_handle = CancelHandle::new_rc();
let cancel_handle2 = CancelHandle::new_rc();
@ -90,7 +97,7 @@ impl JupyterServer {
});
let handle4 = deno_core::unsync::spawn(async move {
while let Some(stdio_msg) = stdio_rx.next().await {
while let Some(stdio_msg) = stdio_rx.recv().await {
Self::handle_stdio_msg(
iopub_socket.clone(),
last_execution_request.clone(),

View file

@ -1947,6 +1947,39 @@ declare namespace Deno {
*/
shutdown(): Promise<void>;
}
/**
* A namespace containing runtime APIs available in Jupyter notebooks.
*
* When accessed outside of Jupyter notebook context an error will be thrown.
*
* @category Jupyter */
export namespace jupyter {
/**
* Broadcast a message on IO pub channel.
*
* ```
* await Deno.jupyter.broadcast("display_data", {
* data: { "text/html": "<b>Processing.</b>" },
* metadata: {},
* transient: { display_id: "progress" }
* });
*
* await new Promise((resolve) => setTimeout(resolve, 500));
*
* await Deno.jupyter.broadcast("update_display_data", {
* data: { "text/html": "<b>Processing..</b>" },
* metadata: {},
* transient: { display_id: "progress" }
* });
* ```
*
* @category Jupyter */
export function broadcast(
msgType: string,
content: Record<string, unknown>,
): Promise<void>;
}
}
/** **UNSTABLE**: New API, yet to be vetted.

View file

@ -2,6 +2,7 @@
const core = globalThis.Deno.core;
const ops = core.ops;
import * as timers from "ext:deno_web/02_timers.js";
import * as httpClient from "ext:deno_fetch/22_http_client.js";
import * as console from "ext:deno_console/01_console.js";

View file

@ -542,6 +542,16 @@ function bootstrapMainRuntime(runtimeOptions) {
if (unstableFlag) {
ObjectAssign(finalDenoNs, denoNsUnstable);
// TODO(bartlomieju): this is not ideal, but because we use `ObjectAssign`
// above any properties that are defined elsewhere using `Object.defineProperty`
// are lost.
ObjectDefineProperty(finalDenoNs, "jupyter", {
get() {
throw new Error(
"Deno.jupyter is only available in `deno jupyter` subcommand.",
);
},
});
}
// Setup `Deno` global - we're actually overriding already existing global