mirror of
https://github.com/denoland/deno
synced 2024-10-04 07:09:20 +00:00
refactor(jupyter): move communication methods out of data structs (#23622)
Moves the communication methods out of the data structs and onto the `Connection` struct.
This commit is contained in:
parent
f2216c90a7
commit
486437fee1
|
@ -50,12 +50,16 @@ pub async fn op_jupyter_broadcast(
|
||||||
|
|
||||||
let maybe_last_request = last_execution_request.borrow().clone();
|
let maybe_last_request = last_execution_request.borrow().clone();
|
||||||
if let Some(last_request) = maybe_last_request {
|
if let Some(last_request) = maybe_last_request {
|
||||||
last_request
|
(*iopub_socket.lock().await)
|
||||||
.new_message(&message_type)
|
.send(
|
||||||
.with_content(content)
|
&last_request
|
||||||
.with_metadata(metadata)
|
.new_message(&message_type)
|
||||||
.with_buffers(buffers.into_iter().map(|b| b.to_vec().into()).collect())
|
.with_content(content)
|
||||||
.send(&mut *iopub_socket.lock().await)
|
.with_metadata(metadata)
|
||||||
|
.with_buffers(
|
||||||
|
buffers.into_iter().map(|b| b.to_vec().into()).collect(),
|
||||||
|
),
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,14 +16,14 @@ use uuid::Uuid;
|
||||||
|
|
||||||
use crate::util::time::utc_now;
|
use crate::util::time::utc_now;
|
||||||
|
|
||||||
pub(crate) struct Connection<S> {
|
pub struct Connection<S> {
|
||||||
pub(crate) socket: S,
|
socket: S,
|
||||||
/// Will be None if our key was empty (digest authentication disabled).
|
/// Will be None if our key was empty (digest authentication disabled).
|
||||||
pub(crate) mac: Option<hmac::Key>,
|
mac: Option<hmac::Key>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: zeromq::Socket> Connection<S> {
|
impl<S: zeromq::Socket> Connection<S> {
|
||||||
pub(crate) fn new(socket: S, key: &str) -> Self {
|
pub fn new(socket: S, key: &str) -> Self {
|
||||||
let mac = if key.is_empty() {
|
let mac = if key.is_empty() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
|
@ -33,21 +33,107 @@ impl<S: zeromq::Socket> Connection<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<S: zeromq::SocketSend + zeromq::SocketRecv> Connection<S> {
|
||||||
|
pub async fn single_heartbeat(&mut self) -> Result<(), AnyError> {
|
||||||
|
self.socket.recv().await?;
|
||||||
|
self
|
||||||
|
.socket
|
||||||
|
.send(zeromq::ZmqMessage::from(b"ping".to_vec()))
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: zeromq::SocketRecv> Connection<S> {
|
||||||
|
pub async fn read(&mut self) -> Result<JupyterMessage, AnyError> {
|
||||||
|
let multipart = self.socket.recv().await?;
|
||||||
|
let raw_message = RawMessage::from_multipart(multipart, self.mac.as_ref())?;
|
||||||
|
JupyterMessage::from_raw_message(raw_message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: zeromq::SocketSend> Connection<S> {
|
||||||
|
pub async fn send(
|
||||||
|
&mut self,
|
||||||
|
message: &JupyterMessage,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
// If performance is a concern, we can probably avoid the clone and to_vec calls with a bit
|
||||||
|
// of refactoring.
|
||||||
|
let mut jparts: Vec<Bytes> = vec![
|
||||||
|
serde_json::to_string(&message.header)
|
||||||
|
.unwrap()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec()
|
||||||
|
.into(),
|
||||||
|
serde_json::to_string(&message.parent_header)
|
||||||
|
.unwrap()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec()
|
||||||
|
.into(),
|
||||||
|
serde_json::to_string(&message.metadata)
|
||||||
|
.unwrap()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec()
|
||||||
|
.into(),
|
||||||
|
serde_json::to_string(&message.content)
|
||||||
|
.unwrap()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec()
|
||||||
|
.into(),
|
||||||
|
];
|
||||||
|
jparts.extend_from_slice(&message.buffers);
|
||||||
|
let raw_message = RawMessage {
|
||||||
|
zmq_identities: message.zmq_identities.clone(),
|
||||||
|
jparts,
|
||||||
|
};
|
||||||
|
self.send_raw(raw_message).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_raw(
|
||||||
|
&mut self,
|
||||||
|
raw_message: RawMessage,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
let hmac = if let Some(key) = &self.mac {
|
||||||
|
let ctx = digest(key, &raw_message.jparts);
|
||||||
|
let tag = ctx.sign();
|
||||||
|
HEXLOWER.encode(tag.as_ref())
|
||||||
|
} else {
|
||||||
|
String::new()
|
||||||
|
};
|
||||||
|
let mut parts: Vec<bytes::Bytes> = Vec::new();
|
||||||
|
for part in &raw_message.zmq_identities {
|
||||||
|
parts.push(part.to_vec().into());
|
||||||
|
}
|
||||||
|
parts.push(DELIMITER.into());
|
||||||
|
parts.push(hmac.as_bytes().to_vec().into());
|
||||||
|
for part in &raw_message.jparts {
|
||||||
|
parts.push(part.to_vec().into());
|
||||||
|
}
|
||||||
|
// ZmqMessage::try_from only fails if parts is empty, which it never
|
||||||
|
// will be here.
|
||||||
|
let message = zeromq::ZmqMessage::try_from(parts).unwrap();
|
||||||
|
self.socket.send(message).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn digest(mac: &hmac::Key, jparts: &[Bytes]) -> hmac::Context {
|
||||||
|
let mut hmac_ctx = hmac::Context::with_key(mac);
|
||||||
|
for part in jparts {
|
||||||
|
hmac_ctx.update(part);
|
||||||
|
}
|
||||||
|
hmac_ctx
|
||||||
|
}
|
||||||
|
|
||||||
struct RawMessage {
|
struct RawMessage {
|
||||||
zmq_identities: Vec<Bytes>,
|
zmq_identities: Vec<Bytes>,
|
||||||
jparts: Vec<Bytes>,
|
jparts: Vec<Bytes>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RawMessage {
|
impl RawMessage {
|
||||||
pub(crate) async fn read<S: zeromq::SocketRecv>(
|
pub fn from_multipart(
|
||||||
connection: &mut Connection<S>,
|
|
||||||
) -> Result<RawMessage, AnyError> {
|
|
||||||
Self::from_multipart(connection.socket.recv().await?, connection)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn from_multipart<S>(
|
|
||||||
multipart: zeromq::ZmqMessage,
|
multipart: zeromq::ZmqMessage,
|
||||||
connection: &Connection<S>,
|
mac: Option<&hmac::Key>,
|
||||||
) -> Result<RawMessage, AnyError> {
|
) -> Result<RawMessage, AnyError> {
|
||||||
let delimiter_index = multipart
|
let delimiter_index = multipart
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -65,7 +151,7 @@ impl RawMessage {
|
||||||
jparts,
|
jparts,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(key) = &connection.mac {
|
if let Some(key) = mac {
|
||||||
let sig = HEXLOWER.decode(&expected_hmac)?;
|
let sig = HEXLOWER.decode(&expected_hmac)?;
|
||||||
let mut msg = Vec::new();
|
let mut msg = Vec::new();
|
||||||
for part in &raw_message.jparts {
|
for part in &raw_message.jparts {
|
||||||
|
@ -79,45 +165,10 @@ impl RawMessage {
|
||||||
|
|
||||||
Ok(raw_message)
|
Ok(raw_message)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send<S: zeromq::SocketSend>(
|
|
||||||
self,
|
|
||||||
connection: &mut Connection<S>,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
let hmac = if let Some(key) = &connection.mac {
|
|
||||||
let ctx = self.digest(key);
|
|
||||||
let tag = ctx.sign();
|
|
||||||
HEXLOWER.encode(tag.as_ref())
|
|
||||||
} else {
|
|
||||||
String::new()
|
|
||||||
};
|
|
||||||
let mut parts: Vec<bytes::Bytes> = Vec::new();
|
|
||||||
for part in &self.zmq_identities {
|
|
||||||
parts.push(part.to_vec().into());
|
|
||||||
}
|
|
||||||
parts.push(DELIMITER.into());
|
|
||||||
parts.push(hmac.as_bytes().to_vec().into());
|
|
||||||
for part in &self.jparts {
|
|
||||||
parts.push(part.to_vec().into());
|
|
||||||
}
|
|
||||||
// ZmqMessage::try_from only fails if parts is empty, which it never
|
|
||||||
// will be here.
|
|
||||||
let message = zeromq::ZmqMessage::try_from(parts).unwrap();
|
|
||||||
connection.socket.send(message).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn digest(&self, mac: &hmac::Key) -> hmac::Context {
|
|
||||||
let mut hmac_ctx = hmac::Context::with_key(mac);
|
|
||||||
for part in &self.jparts {
|
|
||||||
hmac_ctx.update(part);
|
|
||||||
}
|
|
||||||
hmac_ctx
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub(crate) struct JupyterMessage {
|
pub struct JupyterMessage {
|
||||||
zmq_identities: Vec<Bytes>,
|
zmq_identities: Vec<Bytes>,
|
||||||
header: serde_json::Value,
|
header: serde_json::Value,
|
||||||
parent_header: serde_json::Value,
|
parent_header: serde_json::Value,
|
||||||
|
@ -129,12 +180,6 @@ pub(crate) struct JupyterMessage {
|
||||||
const DELIMITER: &[u8] = b"<IDS|MSG>";
|
const DELIMITER: &[u8] = b"<IDS|MSG>";
|
||||||
|
|
||||||
impl JupyterMessage {
|
impl JupyterMessage {
|
||||||
pub(crate) async fn read<S: zeromq::SocketRecv>(
|
|
||||||
connection: &mut Connection<S>,
|
|
||||||
) -> Result<JupyterMessage, AnyError> {
|
|
||||||
Self::from_raw_message(RawMessage::read(connection).await?)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn from_raw_message(
|
fn from_raw_message(
|
||||||
raw_message: RawMessage,
|
raw_message: RawMessage,
|
||||||
) -> Result<JupyterMessage, AnyError> {
|
) -> Result<JupyterMessage, AnyError> {
|
||||||
|
@ -156,32 +201,32 @@ impl JupyterMessage {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn message_type(&self) -> &str {
|
pub fn message_type(&self) -> &str {
|
||||||
self.header["msg_type"].as_str().unwrap_or("")
|
self.header["msg_type"].as_str().unwrap_or("")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn store_history(&self) -> bool {
|
pub fn store_history(&self) -> bool {
|
||||||
self.content["store_history"].as_bool().unwrap_or(true)
|
self.content["store_history"].as_bool().unwrap_or(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn silent(&self) -> bool {
|
pub fn silent(&self) -> bool {
|
||||||
self.content["silent"].as_bool().unwrap_or(false)
|
self.content["silent"].as_bool().unwrap_or(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn code(&self) -> &str {
|
pub fn code(&self) -> &str {
|
||||||
self.content["code"].as_str().unwrap_or("")
|
self.content["code"].as_str().unwrap_or("")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn cursor_pos(&self) -> usize {
|
pub fn cursor_pos(&self) -> usize {
|
||||||
self.content["cursor_pos"].as_u64().unwrap_or(0) as usize
|
self.content["cursor_pos"].as_u64().unwrap_or(0) as usize
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn comm_id(&self) -> &str {
|
pub fn comm_id(&self) -> &str {
|
||||||
self.content["comm_id"].as_str().unwrap_or("")
|
self.content["comm_id"].as_str().unwrap_or("")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new child message of this message. ZMQ identities are not transferred.
|
// Creates a new child message of this message. ZMQ identities are not transferred.
|
||||||
pub(crate) fn new_message(&self, msg_type: &str) -> JupyterMessage {
|
pub fn new_message(&self, msg_type: &str) -> JupyterMessage {
|
||||||
let mut header = self.header.clone();
|
let mut header = self.header.clone();
|
||||||
header["msg_type"] = serde_json::Value::String(msg_type.to_owned());
|
header["msg_type"] = serde_json::Value::String(msg_type.to_owned());
|
||||||
header["username"] = serde_json::Value::String("kernel".to_owned());
|
header["username"] = serde_json::Value::String("kernel".to_owned());
|
||||||
|
@ -200,7 +245,7 @@ impl JupyterMessage {
|
||||||
|
|
||||||
// Creates a reply to this message. This is a child with the message type determined
|
// Creates a reply to this message. This is a child with the message type determined
|
||||||
// automatically by replacing "request" with "reply". ZMQ identities are transferred.
|
// automatically by replacing "request" with "reply". ZMQ identities are transferred.
|
||||||
pub(crate) fn new_reply(&self) -> JupyterMessage {
|
pub fn new_reply(&self) -> JupyterMessage {
|
||||||
let mut reply =
|
let mut reply =
|
||||||
self.new_message(&self.message_type().replace("_request", "_reply"));
|
self.new_message(&self.message_type().replace("_request", "_reply"));
|
||||||
reply.zmq_identities = self.zmq_identities.clone();
|
reply.zmq_identities = self.zmq_identities.clone();
|
||||||
|
@ -208,21 +253,18 @@ impl JupyterMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[must_use = "Need to send this message for it to have any effect"]
|
#[must_use = "Need to send this message for it to have any effect"]
|
||||||
pub(crate) fn comm_close_message(&self) -> JupyterMessage {
|
pub fn comm_close_message(&self) -> JupyterMessage {
|
||||||
self.new_message("comm_close").with_content(json!({
|
self.new_message("comm_close").with_content(json!({
|
||||||
"comm_id": self.comm_id()
|
"comm_id": self.comm_id()
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn with_content(
|
pub fn with_content(mut self, content: serde_json::Value) -> JupyterMessage {
|
||||||
mut self,
|
|
||||||
content: serde_json::Value,
|
|
||||||
) -> JupyterMessage {
|
|
||||||
self.content = content;
|
self.content = content;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn with_metadata(
|
pub fn with_metadata(
|
||||||
mut self,
|
mut self,
|
||||||
metadata: serde_json::Value,
|
metadata: serde_json::Value,
|
||||||
) -> JupyterMessage {
|
) -> JupyterMessage {
|
||||||
|
@ -230,46 +272,10 @@ impl JupyterMessage {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn with_buffers(mut self, buffers: Vec<Bytes>) -> JupyterMessage {
|
pub fn with_buffers(mut self, buffers: Vec<Bytes>) -> JupyterMessage {
|
||||||
self.buffers = buffers;
|
self.buffers = buffers;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn send<S: zeromq::SocketSend>(
|
|
||||||
&self,
|
|
||||||
connection: &mut Connection<S>,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
// If performance is a concern, we can probably avoid the clone and to_vec calls with a bit
|
|
||||||
// of refactoring.
|
|
||||||
let mut jparts: Vec<Bytes> = vec![
|
|
||||||
serde_json::to_string(&self.header)
|
|
||||||
.unwrap()
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec()
|
|
||||||
.into(),
|
|
||||||
serde_json::to_string(&self.parent_header)
|
|
||||||
.unwrap()
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec()
|
|
||||||
.into(),
|
|
||||||
serde_json::to_string(&self.metadata)
|
|
||||||
.unwrap()
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec()
|
|
||||||
.into(),
|
|
||||||
serde_json::to_string(&self.content)
|
|
||||||
.unwrap()
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec()
|
|
||||||
.into(),
|
|
||||||
];
|
|
||||||
jparts.extend_from_slice(&self.buffers);
|
|
||||||
let raw_message = RawMessage {
|
|
||||||
zmq_identities: self.zmq_identities.clone(),
|
|
||||||
jparts,
|
|
||||||
};
|
|
||||||
raw_message.send(connection).await
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for JupyterMessage {
|
impl fmt::Debug for JupyterMessage {
|
||||||
|
|
|
@ -28,8 +28,8 @@ use tokio::sync::mpsc;
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
|
|
||||||
mod install;
|
mod install;
|
||||||
pub(crate) mod jupyter_msg;
|
pub mod jupyter_msg;
|
||||||
pub(crate) mod server;
|
pub mod server;
|
||||||
|
|
||||||
pub async fn kernel(
|
pub async fn kernel(
|
||||||
flags: Flags,
|
flags: Flags,
|
||||||
|
|
|
@ -18,8 +18,6 @@ use deno_core::CancelFuture;
|
||||||
use deno_core::CancelHandle;
|
use deno_core::CancelHandle;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use zeromq::SocketRecv;
|
|
||||||
use zeromq::SocketSend;
|
|
||||||
|
|
||||||
use super::jupyter_msg::Connection;
|
use super::jupyter_msg::Connection;
|
||||||
use super::jupyter_msg::JupyterMessage;
|
use super::jupyter_msg::JupyterMessage;
|
||||||
|
@ -67,7 +65,6 @@ impl JupyterServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
let cancel_handle = CancelHandle::new_rc();
|
let cancel_handle = CancelHandle::new_rc();
|
||||||
let cancel_handle2 = CancelHandle::new_rc();
|
|
||||||
|
|
||||||
let mut server = Self {
|
let mut server = Self {
|
||||||
execution_count: 0,
|
execution_count: 0,
|
||||||
|
@ -82,11 +79,14 @@ impl JupyterServer {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let handle2 = deno_core::unsync::spawn(async move {
|
let handle2 = deno_core::unsync::spawn({
|
||||||
if let Err(err) =
|
let cancel_handle = cancel_handle.clone();
|
||||||
Self::handle_control(control_socket, cancel_handle2).await
|
async move {
|
||||||
{
|
if let Err(err) =
|
||||||
eprintln!("Control error: {}", err);
|
Self::handle_control(control_socket, cancel_handle).await
|
||||||
|
{
|
||||||
|
eprintln!("Control error: {}", err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -129,13 +129,11 @@ impl JupyterServer {
|
||||||
StdioMsg::Stderr(text) => ("stderr", text),
|
StdioMsg::Stderr(text) => ("stderr", text),
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = exec_request
|
let result = (*iopub_socket.lock().await)
|
||||||
.new_message("stream")
|
.send(&exec_request.new_message("stream").with_content(json!({
|
||||||
.with_content(json!({
|
|
||||||
"name": name,
|
"name": name,
|
||||||
"text": text
|
"text": text
|
||||||
}))
|
})))
|
||||||
.send(&mut *iopub_socket.lock().await)
|
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if let Err(err) = result {
|
if let Err(err) = result {
|
||||||
|
@ -148,11 +146,7 @@ impl JupyterServer {
|
||||||
connection: &mut Connection<zeromq::RepSocket>,
|
connection: &mut Connection<zeromq::RepSocket>,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
loop {
|
loop {
|
||||||
connection.socket.recv().await?;
|
connection.single_heartbeat().await?;
|
||||||
connection
|
|
||||||
.socket
|
|
||||||
.send(zeromq::ZmqMessage::from(b"ping".to_vec()))
|
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,13 +155,11 @@ impl JupyterServer {
|
||||||
cancel_handle: Rc<CancelHandle>,
|
cancel_handle: Rc<CancelHandle>,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
loop {
|
loop {
|
||||||
let msg = JupyterMessage::read(&mut connection).await?;
|
let msg = connection.read().await?;
|
||||||
match msg.message_type() {
|
match msg.message_type() {
|
||||||
"kernel_info_request" => {
|
"kernel_info_request" => {
|
||||||
msg
|
connection
|
||||||
.new_reply()
|
.send(&msg.new_reply().with_content(kernel_info()))
|
||||||
.with_content(kernel_info())
|
|
||||||
.send(&mut connection)
|
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
"shutdown_request" => {
|
"shutdown_request" => {
|
||||||
|
@ -191,7 +183,7 @@ impl JupyterServer {
|
||||||
mut connection: Connection<zeromq::RouterSocket>,
|
mut connection: Connection<zeromq::RouterSocket>,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
loop {
|
loop {
|
||||||
let msg = JupyterMessage::read(&mut connection).await?;
|
let msg = connection.read().await?;
|
||||||
self.handle_shell_message(msg, &mut connection).await?;
|
self.handle_shell_message(msg, &mut connection).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -201,25 +193,23 @@ impl JupyterServer {
|
||||||
msg: JupyterMessage,
|
msg: JupyterMessage,
|
||||||
connection: &mut Connection<zeromq::RouterSocket>,
|
connection: &mut Connection<zeromq::RouterSocket>,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
msg
|
self
|
||||||
.new_message("status")
|
.send_iopub(
|
||||||
.with_content(json!({"execution_state": "busy"}))
|
&msg
|
||||||
.send(&mut *self.iopub_socket.lock().await)
|
.new_message("status")
|
||||||
|
.with_content(json!({"execution_state": "busy"})),
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
match msg.message_type() {
|
match msg.message_type() {
|
||||||
"kernel_info_request" => {
|
"kernel_info_request" => {
|
||||||
msg
|
connection
|
||||||
.new_reply()
|
.send(&msg.new_reply().with_content(kernel_info()))
|
||||||
.with_content(kernel_info())
|
|
||||||
.send(connection)
|
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
"is_complete_request" => {
|
"is_complete_request" => {
|
||||||
msg
|
connection
|
||||||
.new_reply()
|
.send(&msg.new_reply().with_content(json!({"status": "complete"})))
|
||||||
.with_content(json!({"status": "complete"}))
|
|
||||||
.send(connection)
|
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
"execute_request" => {
|
"execute_request" => {
|
||||||
|
@ -228,10 +218,7 @@ impl JupyterServer {
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
"comm_open" => {
|
"comm_open" => {
|
||||||
msg
|
self.send_iopub(&msg.comm_close_message()).await?;
|
||||||
.comm_close_message()
|
|
||||||
.send(&mut *self.iopub_socket.lock().await)
|
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
"complete_request" => {
|
"complete_request" => {
|
||||||
let user_code = msg.code();
|
let user_code = msg.code();
|
||||||
|
@ -259,16 +246,14 @@ impl JupyterServer {
|
||||||
.map(|item| item.range.end)
|
.map(|item| item.range.end)
|
||||||
.unwrap_or(cursor_pos);
|
.unwrap_or(cursor_pos);
|
||||||
|
|
||||||
msg
|
connection
|
||||||
.new_reply()
|
.send(&msg.new_reply().with_content(json!({
|
||||||
.with_content(json!({
|
|
||||||
"status": "ok",
|
"status": "ok",
|
||||||
"matches": matches,
|
"matches": matches,
|
||||||
"cursor_start": cursor_start,
|
"cursor_start": cursor_start,
|
||||||
"cursor_end": cursor_end,
|
"cursor_end": cursor_end,
|
||||||
"metadata": {},
|
"metadata": {},
|
||||||
}))
|
})))
|
||||||
.send(connection)
|
|
||||||
.await?;
|
.await?;
|
||||||
} else {
|
} else {
|
||||||
let expr = get_expr_from_line_at_pos(user_code, cursor_pos);
|
let expr = get_expr_from_line_at_pos(user_code, cursor_pos);
|
||||||
|
@ -307,16 +292,14 @@ impl JupyterServer {
|
||||||
|
|
||||||
(candidates, cursor_pos - expr.len())
|
(candidates, cursor_pos - expr.len())
|
||||||
};
|
};
|
||||||
msg
|
connection
|
||||||
.new_reply()
|
.send(&msg.new_reply().with_content(json!({
|
||||||
.with_content(json!({
|
|
||||||
"status": "ok",
|
"status": "ok",
|
||||||
"matches": completions,
|
"matches": completions,
|
||||||
"cursor_start": cursor_start,
|
"cursor_start": cursor_start,
|
||||||
"cursor_end": cursor_pos,
|
"cursor_end": cursor_pos,
|
||||||
"metadata": {},
|
"metadata": {},
|
||||||
}))
|
})))
|
||||||
.send(connection)
|
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -328,10 +311,12 @@ impl JupyterServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
msg
|
self
|
||||||
.new_message("status")
|
.send_iopub(
|
||||||
.with_content(json!({"execution_state": "idle"}))
|
&msg
|
||||||
.send(&mut *self.iopub_socket.lock().await)
|
.new_message("status")
|
||||||
|
.with_content(json!({"execution_state": "idle"})),
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -346,13 +331,11 @@ impl JupyterServer {
|
||||||
}
|
}
|
||||||
*self.last_execution_request.borrow_mut() = Some(msg.clone());
|
*self.last_execution_request.borrow_mut() = Some(msg.clone());
|
||||||
|
|
||||||
msg
|
self
|
||||||
.new_message("execute_input")
|
.send_iopub(&msg.new_message("execute_input").with_content(json!({
|
||||||
.with_content(json!({
|
|
||||||
"execution_count": self.execution_count,
|
"execution_count": self.execution_count,
|
||||||
"code": msg.code()
|
"code": msg.code()
|
||||||
}))
|
})))
|
||||||
.send(&mut *self.iopub_socket.lock().await)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let result = self
|
let result = self
|
||||||
|
@ -363,22 +346,18 @@ impl JupyterServer {
|
||||||
let evaluate_response = match result {
|
let evaluate_response = match result {
|
||||||
Ok(eval_response) => eval_response,
|
Ok(eval_response) => eval_response,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
msg
|
self
|
||||||
.new_message("error")
|
.send_iopub(&msg.new_message("error").with_content(json!({
|
||||||
.with_content(json!({
|
|
||||||
"ename": err.to_string(),
|
"ename": err.to_string(),
|
||||||
"evalue": " ", // Fake value, otherwise old Jupyter frontends don't show the error
|
"evalue": " ", // Fake value, otherwise old Jupyter frontends don't show the error
|
||||||
"traceback": [],
|
"traceback": [],
|
||||||
}))
|
})))
|
||||||
.send(&mut *self.iopub_socket.lock().await)
|
|
||||||
.await?;
|
.await?;
|
||||||
msg
|
connection
|
||||||
.new_reply()
|
.send(&msg.new_reply().with_content(json!({
|
||||||
.with_content(json!({
|
|
||||||
"status": "error",
|
"status": "error",
|
||||||
"execution_count": self.execution_count,
|
"execution_count": self.execution_count,
|
||||||
}))
|
})))
|
||||||
.send(connection)
|
|
||||||
.await?;
|
.await?;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -393,14 +372,12 @@ impl JupyterServer {
|
||||||
publish_result(&mut self.repl_session, &result, self.execution_count)
|
publish_result(&mut self.repl_session, &result, self.execution_count)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
msg
|
connection
|
||||||
.new_reply()
|
.send(&msg.new_reply().with_content(json!({
|
||||||
.with_content(json!({
|
|
||||||
"status": "ok",
|
"status": "ok",
|
||||||
"execution_count": self.execution_count,
|
"execution_count": self.execution_count,
|
||||||
// FIXME: also include user_expressions
|
// FIXME: also include user_expressions
|
||||||
}))
|
})))
|
||||||
.send(connection)
|
|
||||||
.await?;
|
.await?;
|
||||||
// Let's sleep here for a few ms, so we give a chance to the task that is
|
// Let's sleep here for a few ms, so we give a chance to the task that is
|
||||||
// handling stdout and stderr streams to receive and flush the content.
|
// handling stdout and stderr streams to receive and flush the content.
|
||||||
|
@ -479,27 +456,30 @@ impl JupyterServer {
|
||||||
message
|
message
|
||||||
};
|
};
|
||||||
|
|
||||||
msg
|
self
|
||||||
.new_message("error")
|
.send_iopub(&msg.new_message("error").with_content(json!({
|
||||||
.with_content(json!({
|
|
||||||
"ename": ename,
|
"ename": ename,
|
||||||
"evalue": evalue,
|
"evalue": evalue,
|
||||||
"traceback": traceback,
|
"traceback": traceback,
|
||||||
}))
|
})))
|
||||||
.send(&mut *self.iopub_socket.lock().await)
|
|
||||||
.await?;
|
.await?;
|
||||||
msg
|
connection
|
||||||
.new_reply()
|
.send(&msg.new_reply().with_content(json!({
|
||||||
.with_content(json!({
|
|
||||||
"status": "error",
|
"status": "error",
|
||||||
"execution_count": self.execution_count,
|
"execution_count": self.execution_count,
|
||||||
}))
|
})))
|
||||||
.send(connection)
|
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn send_iopub(
|
||||||
|
&mut self,
|
||||||
|
message: &JupyterMessage,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
self.iopub_socket.lock().await.send(message).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn bind_socket<S: zeromq::Socket>(
|
async fn bind_socket<S: zeromq::Socket>(
|
||||||
|
|
Loading…
Reference in a new issue