[VM] Add Dart_InvokeVMServiceMethod() to dart_tools_api.h

This will allow any embedder to use a C++ API to talk to the vm-service.
Usecase is e.g. trigger a hot-reload without speaking websocket.

Change-Id: Ia54e5a73a0f0a4899e3e6d60694af0b394efb993
Reviewed-on: https://dart-review.googlesource.com/66384
Reviewed-by: Ryan Macnak <rmacnak@google.com>
This commit is contained in:
Martin Kustermann 2018-07-25 12:06:55 +00:00
parent 78462ecbdd
commit 33914226f2
8 changed files with 205 additions and 11 deletions

View file

@ -158,6 +158,29 @@ typedef void (*Dart_EmbedderInformationCallback)(
DART_EXPORT void Dart_SetEmbedderInformationCallback(
Dart_EmbedderInformationCallback callback);
/**
* Invoke a vm-service method and wait for its result.
*
* \param request_json The utf8-encoded json-rpc request.
* \param request_json_length The length of the json-rpc request.
*
* \param response_json The returned utf8-encoded json response, must be
* free()ed by caller.
* \param response_json_length The length of the returned json response.
* \param error An optional error, must be free()ed by caller.
*
* \return Whether the call was sucessfully performed.
*
* NOTE: This method does not need a current isolate and must not have the
* vm-isolate being the current isolate. It must be called after
* Dart_Initialize() and before Dart_Cleanup().
*/
DART_EXPORT bool Dart_InvokeVMServiceMethod(uint8_t* request_json,
intptr_t request_json_length,
uint8_t** response_json,
intptr_t* response_json_length,
char** error);
/*
* ========
* Event Streams

View file

@ -11,6 +11,7 @@
#include "vm/message.h"
#include "vm/native_message_handler.h"
#include "vm/port.h"
#include "vm/service_isolate.h"
namespace dart {
@ -95,6 +96,85 @@ DART_EXPORT bool Dart_CloseNativePort(Dart_Port native_port_id) {
return PortMap::ClosePort(native_port_id);
}
static Monitor* vm_service_calls_monitor = new Monitor();
DART_EXPORT bool Dart_InvokeVMServiceMethod(uint8_t* request_json,
intptr_t request_json_length,
uint8_t** response_json,
intptr_t* response_json_length,
char** error) {
Isolate* isolate = Isolate::Current();
ASSERT(isolate == nullptr || !isolate->is_service_isolate());
IsolateSaver saver(isolate);
// We only allow one isolate reload at a time. If this turns out to be on the
// critical path, we can change it to have a global datastructure which is
// mapping the reply ports to receive buffers.
MonitorLocker _(vm_service_calls_monitor);
static Monitor* vm_service_call_monitor = new Monitor();
static uint8_t* result_bytes = nullptr;
static intptr_t result_length = 0;
ASSERT(result_bytes == nullptr);
ASSERT(result_length == 0);
struct Utils {
static void HandleResponse(Dart_Port dest_port_id, Dart_CObject* message) {
MonitorLocker monitor(vm_service_call_monitor);
RELEASE_ASSERT(message->type == Dart_CObject_kTypedData);
RELEASE_ASSERT(message->value.as_typed_data.type ==
Dart_TypedData_kUint8);
result_length = message->value.as_typed_data.length;
result_bytes = reinterpret_cast<uint8_t*>(malloc(result_length));
memmove(result_bytes, message->value.as_typed_data.values, result_length);
monitor.Notify();
}
};
auto port = Dart_NewNativePort("service-rpc", &Utils::HandleResponse, false);
if (port == ILLEGAL_PORT) {
return Api::NewError("Was unable to create native port.");
}
// Before sending the message we'll lock the monitor, which the receiver
// will later on notify once the answer has been received.
MonitorLocker monitor(vm_service_call_monitor);
if (ServiceIsolate::SendServiceRpc(request_json, request_json_length, port)) {
// We posted successfully and expect the vm-service to send the reply, so
// we will wait for it now.
auto wait_result = monitor.Wait();
ASSERT(wait_result == Monitor::kNotified);
// The caller takes ownership of the data.
*response_json = result_bytes;
*response_json_length = result_length;
// Reset global data, which can be used by the next call (after the mutex
// has been released).
result_bytes = nullptr;
result_length = 0;
// After the data has been received, we will not get any more messages on
// this port and can safely close it now.
Dart_CloseNativePort(port);
return true;
} else {
// We couldn't post the message and will not receive any reply. Therefore we
// clean up the port and return an error.
Dart_CloseNativePort(port);
if (error != nullptr) {
*error = strdup("Was unable to post message to isolate.");
}
return false;
}
}
// --- Verification tools ---
DART_EXPORT Dart_Handle Dart_CompileAll() {

View file

@ -44,6 +44,8 @@ DEFINE_FLAG(bool,
#define VM_SERVICE_WEB_SERVER_CONTROL_MESSAGE_ID 3
#define VM_SERVICE_SERVER_INFO_MESSAGE_ID 4
#define VM_SERVICE_METHOD_CALL_FROM_NATIVE 5
static RawArray* MakeServiceControlMessage(Dart_Port port_id,
intptr_t code,
const String& name) {
@ -153,6 +155,39 @@ Dart_Port ServiceIsolate::LoadPort() {
return load_port_;
}
bool ServiceIsolate::SendServiceRpc(uint8_t* request_json,
intptr_t request_json_length,
Dart_Port reply_port) {
// Keep in sync with "sdk/lib/vmservice/vmservice.dart:_handleNativeRpcCall".
Dart_CObject opcode;
opcode.type = Dart_CObject_kInt32;
opcode.value.as_int32 = VM_SERVICE_METHOD_CALL_FROM_NATIVE;
Dart_CObject message;
message.type = Dart_CObject_kTypedData;
message.value.as_typed_data.type = Dart_TypedData_kUint8;
message.value.as_typed_data.length = request_json_length;
message.value.as_typed_data.values = request_json;
Dart_CObject send_port;
send_port.type = Dart_CObject_kSendPort;
send_port.value.as_send_port.id = reply_port;
send_port.value.as_send_port.origin_id = ILLEGAL_PORT;
Dart_CObject* request_array[] = {
&opcode,
&message,
&send_port,
};
Dart_CObject request;
request.type = Dart_CObject_kArray;
request.value.as_array.values = request_array;
request.value.as_array.length = ARRAY_SIZE(request_array);
return Dart_PostCObject(ServiceIsolate::Port(), &request);
}
bool ServiceIsolate::SendIsolateStartupMessage() {
if (!IsRunning()) {
return false;

View file

@ -29,6 +29,12 @@ class ServiceIsolate : public AllStatic {
static Dart_Port WaitForLoadPort();
static Dart_Port LoadPort();
// Returns `true` if the request was sucessfully sent. If it was, the
// [reply_port] will receive a Dart_TypedData_kUint8 response json.
static bool SendServiceRpc(uint8_t* request_json,
intptr_t request_json_length,
Dart_Port reply_port);
static void Run();
static bool SendIsolateStartupMessage();
static bool SendIsolateShutdownMessage();

View file

@ -11,4 +11,12 @@ class Constants {
static const int ISOLATE_SHUTDOWN_MESSAGE_ID = 2;
static const int WEB_SERVER_CONTROL_MESSAGE_ID = 3;
static const int SERVER_INFO_MESSAGE_ID = 4;
/// Signals an RPC coming from native code (instead of from a websocket
/// connection). These calls are limited to simple request-response and do
/// not allow arbitrary json-rpc messages.
///
/// The messages are an array of length 3:
/// (METHOD_CALL_FROM_NATIVE, String jsonRequest, PortId replyPort).
static const int METHOD_CALL_FROM_NATIVE = 5;
}

View file

@ -182,12 +182,8 @@ class Message {
..[5] = values;
if (!sendIsolateServiceMessage(sendPort, request)) {
receivePort.close();
_completer.complete(new Response.json({
'type': 'ServiceError',
'id': '',
'kind': 'InternalError',
'message': 'could not send message [${serial}] to isolate',
}));
_completer.complete(new Response.internalError(
'could not send message [${serial}] to isolate'));
}
return _completer.future;
}

View file

@ -42,6 +42,15 @@ class Response {
Response.json(Object value)
: this(ResponsePayloadKind.String, json.encode(value));
factory Response.internalError(String message) {
return new Response.json({
'type': 'ServiceError',
'id': '',
'kind': 'InternalError',
'message': message,
});
}
/// Construct response from the response [value] which can be either:
/// String: a string
/// Binary: a Uint8List

View file

@ -291,6 +291,37 @@ class VMService extends MessageRouter {
}
}
Future<Null> _handleNativeRpcCall(message, SendPort replyPort) async {
// Keep in sync with "runtime/vm/service_isolate.cc:InvokeServiceRpc".
Response response;
try {
final Message rpc = new Message.fromJsonRpc(
null, json.decode(utf8.decode(message as List<int>)));
if (rpc.type != MessageType.Request) {
response = new Response.internalError(
'The client sent a non-request json-rpc message.');
} else {
response = await routeRequest(this, rpc);
}
} catch (exception) {
response = new Response.internalError(
'The rpc call resulted in exception: $exception.');
}
List<int> bytes;
switch (response.kind) {
case ResponsePayloadKind.String:
bytes = utf8.encode(response.payload);
bytes = bytes is Uint8List ? bytes : new Uint8List.fromList(bytes);
break;
case ResponsePayloadKind.Binary:
case ResponsePayloadKind.Utf8String:
bytes = response.payload as Uint8List;
break;
}
replyPort.send(bytes);
}
Future _exit() async {
// Stop the server.
if (VMServiceEmbedderHooks.serverStop != null) {
@ -329,11 +360,17 @@ class VMService extends MessageRouter {
return;
}
if (message.length == 3) {
// This is a message interacting with the web server.
assert((message[0] == Constants.WEB_SERVER_CONTROL_MESSAGE_ID) ||
(message[0] == Constants.SERVER_INFO_MESSAGE_ID));
_serverMessageHandler(message[0], message[1], message[2]);
return;
final opcode = message[0];
if (opcode == Constants.METHOD_CALL_FROM_NATIVE) {
_handleNativeRpcCall(message[1], message[2]);
return;
} else {
// This is a message interacting with the web server.
assert((opcode == Constants.WEB_SERVER_CONTROL_MESSAGE_ID) ||
(opcode == Constants.SERVER_INFO_MESSAGE_ID));
_serverMessageHandler(message[0], message[1], message[2]);
return;
}
}
if (message.length == 4) {
// This is a message informing us of the birth or death of an