[vm/concurrency] Allow closures as entrypoints in Isolate.spawn calls

We already allow sending closures (if immutable via sharing
otherwise via copying). This CL makes use of this to allow the argument
to `Isolate.spawn()` to be any closure.

Similar to normal message sending, the spawn will fail if the closure
cannot be sent (or causes an error on the new isolate, e.g. rehashing
error).

Issue https://github.com/dart-lang/sdk/issues/46623

TEST=vm/dart{_2,}/isolates/closure_entrypoint_test

Change-Id: Iab342267d87bd87bc8c0c82d16aec58a69a3df44
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/212295
Reviewed-by: Alexander Aprelev <aam@google.com>
Commit-Queue: Martin Kustermann <kustermann@google.com>
This commit is contained in:
Martin Kustermann 2021-09-03 19:08:37 +00:00 committed by commit-bot@chromium.org
parent 21cbb2ae3a
commit 5f9ec9f52a
7 changed files with 397 additions and 103 deletions

View file

@ -21,6 +21,7 @@
#include "vm/message_handler.h"
#include "vm/message_snapshot.h"
#include "vm/object.h"
#include "vm/object_graph_copy.h"
#include "vm/object_store.h"
#include "vm/port.h"
#include "vm/resolver.h"
@ -319,6 +320,7 @@ class IsolateSpawnState {
Dart_Port origin_id,
const char* script_url,
const Function& func,
PersistentHandle* closure_tuple_handle,
SerializedObjectBuffer* message_buffer,
const char* package_config,
bool paused,
@ -353,10 +355,16 @@ class IsolateSpawnState {
const char* class_name() const { return class_name_; }
const char* function_name() const { return function_name_; }
const char* debug_name() const { return debug_name_; }
bool is_spawn_uri() const { return library_url_ == nullptr; }
bool is_spawn_uri() const {
return library_url_ == nullptr && // No top-level entrypoint.
closure_tuple_handle_ == nullptr; // No closure entrypoint.
}
bool paused() const { return paused_; }
bool errors_are_fatal() const { return errors_are_fatal_; }
Dart_IsolateFlags* isolate_flags() { return &isolate_flags_; }
PersistentHandle* closure_tuple_handle() const {
return closure_tuple_handle_;
}
ObjectPtr ResolveFunction();
ObjectPtr BuildArgs(Thread* thread);
@ -376,6 +384,7 @@ class IsolateSpawnState {
const char* class_name_ = nullptr;
const char* function_name_ = nullptr;
const char* debug_name_;
PersistentHandle* closure_tuple_handle_ = nullptr;
IsolateGroup* isolate_group_;
std::unique_ptr<Message> serialized_args_;
std::unique_ptr<Message> serialized_message_;
@ -396,6 +405,7 @@ IsolateSpawnState::IsolateSpawnState(Dart_Port parent_port,
Dart_Port origin_id,
const char* script_url,
const Function& func,
PersistentHandle* closure_tuple_handle,
SerializedObjectBuffer* message_buffer,
const char* package_config,
bool paused,
@ -411,25 +421,34 @@ IsolateSpawnState::IsolateSpawnState(Dart_Port parent_port,
script_url_(script_url),
package_config_(package_config),
debug_name_(debug_name),
closure_tuple_handle_(closure_tuple_handle),
isolate_group_(isolate_group),
serialized_args_(nullptr),
serialized_message_(message_buffer->StealMessage()),
paused_(paused),
errors_are_fatal_(errors_are_fatal) {
// Either we have a top-level function or we have a closure.
ASSERT((closure_tuple_handle_ != nullptr) == func.IsNull());
auto thread = Thread::Current();
auto isolate = thread->isolate();
auto zone = thread->zone();
const auto& cls = Class::Handle(zone, func.Owner());
const auto& lib = Library::Handle(zone, cls.library());
const auto& lib_url = String::Handle(zone, lib.url());
library_url_ = NewConstChar(lib_url.ToCString());
String& func_name = String::Handle(zone);
func_name = func.name();
function_name_ = NewConstChar(String::ScrubName(func_name));
if (!cls.IsTopLevel()) {
const auto& class_name = String::Handle(zone, cls.Name());
class_name_ = NewConstChar(class_name.ToCString());
if (!func.IsNull()) {
auto zone = thread->zone();
const auto& cls = Class::Handle(zone, func.Owner());
const auto& lib = Library::Handle(zone, cls.library());
const auto& lib_url = String::Handle(zone, lib.url());
library_url_ = NewConstChar(lib_url.ToCString());
String& func_name = String::Handle(zone);
func_name = func.name();
function_name_ = NewConstChar(String::ScrubName(func_name));
if (!cls.IsTopLevel()) {
const auto& class_name = String::Handle(zone, cls.Name());
class_name_ = NewConstChar(class_name.ToCString());
}
} else {
ASSERT(closure_tuple_handle != nullptr);
}
// Inherit flags from spawning isolate.
@ -446,14 +465,14 @@ IsolateSpawnState::IsolateSpawnState(Dart_Port parent_port,
Dart_Port on_exit_port,
Dart_Port on_error_port,
const char* debug_name,
IsolateGroup* group)
IsolateGroup* isolate_group)
: parent_port_(parent_port),
on_exit_port_(on_exit_port),
on_error_port_(on_error_port),
script_url_(script_url),
package_config_(package_config),
debug_name_(debug_name),
isolate_group_(group),
isolate_group_(isolate_group),
serialized_args_(args_buffer->StealMessage()),
serialized_message_(message_buffer->StealMessage()),
isolate_flags_(),
@ -734,20 +753,32 @@ class SpawnIsolateTask : public ThreadPool::Task {
bool EnqueueEntrypointInvocationAndNotifySpawner(Thread* thread) {
auto isolate = thread->isolate();
auto zone = thread->zone();
const bool is_spawn_uri = state_->is_spawn_uri();
// Step 1) Resolve the entrypoint function.
auto& result = Object::Handle(zone, state_->ResolveFunction());
const bool is_spawn_uri = state_->is_spawn_uri();
if (result.IsError()) {
ASSERT(is_spawn_uri);
ReportError("Failed to resolve entrypoint function.");
return false;
auto& entrypoint_closure = Closure::Handle(zone);
if (state_->closure_tuple_handle() != nullptr) {
const auto& result = Object::Handle(
zone,
ReadObjectGraphCopyMessage(thread, state_->closure_tuple_handle()));
if (result.IsError()) {
ReportError(
"Failed to deserialize the passed entrypoint to the new isolate.");
return false;
}
entrypoint_closure = Closure::RawCast(result.ptr());
} else {
const auto& result = Object::Handle(zone, state_->ResolveFunction());
if (result.IsError()) {
ASSERT(is_spawn_uri);
ReportError("Failed to resolve entrypoint function.");
return false;
}
ASSERT(result.IsFunction());
auto& func = Function::Handle(zone, Function::Cast(result).ptr());
func = func.ImplicitClosureFunction();
entrypoint_closure = func.ImplicitStaticClosure();
}
ASSERT(result.IsFunction());
auto& func = Function::Handle(zone, Function::Cast(result).ptr());
func = func.ImplicitClosureFunction();
const auto& entrypoint_closure =
Object::Handle(zone, func.ImplicitStaticClosure());
// Step 2) Enqueue delayed invocation of entrypoint callback.
const auto& args_obj = Object::Handle(zone, state_->BuildArgs(thread));
@ -776,7 +807,8 @@ class SpawnIsolateTask : public ThreadPool::Task {
const auto& entry_point =
Function::Handle(zone, lib.LookupLocalFunction(entry_name));
ASSERT(entry_point.IsFunction() && !entry_point.IsNull());
result = DartEntry::InvokeFunction(entry_point, args);
const auto& result =
Object::Handle(zone, DartEntry::InvokeFunction(entry_point, args));
if (result.IsError()) {
ReportError("Failed to enqueue delayed entrypoint invocation.");
return false;
@ -858,10 +890,23 @@ static const char* String2UTF8(const String& str) {
return result;
}
static FunctionPtr GetTopLevelFunction(Zone* zone, const Instance& closure) {
if (closure.IsClosure()) {
auto& func = Function::Handle(zone);
func = Closure::Cast(closure).function();
if (func.IsImplicitClosureFunction() && func.is_static()) {
ASSERT(Closure::Cast(closure).context() == Context::null());
// Get the parent function so that we get the right function name.
return func.parent_function();
}
}
return Function::null();
}
DEFINE_NATIVE_ENTRY(Isolate_spawnFunction, 0, 10) {
GET_NON_NULL_NATIVE_ARGUMENT(SendPort, port, arguments->NativeArgAt(0));
GET_NON_NULL_NATIVE_ARGUMENT(String, script_uri, arguments->NativeArgAt(1));
GET_NON_NULL_NATIVE_ARGUMENT(Instance, closure, arguments->NativeArgAt(2));
GET_NON_NULL_NATIVE_ARGUMENT(Closure, closure, arguments->NativeArgAt(2));
GET_NON_NULL_NATIVE_ARGUMENT(Instance, message, arguments->NativeArgAt(3));
GET_NON_NULL_NATIVE_ARGUMENT(Bool, paused, arguments->NativeArgAt(4));
GET_NATIVE_ARGUMENT(Bool, fatalErrors, arguments->NativeArgAt(5));
@ -870,51 +915,62 @@ DEFINE_NATIVE_ENTRY(Isolate_spawnFunction, 0, 10) {
GET_NATIVE_ARGUMENT(String, packageConfig, arguments->NativeArgAt(8));
GET_NATIVE_ARGUMENT(String, debugName, arguments->NativeArgAt(9));
if (closure.IsClosure()) {
Function& func = Function::Handle();
func = Closure::Cast(closure).function();
if (func.IsImplicitClosureFunction() && func.is_static()) {
#if defined(DEBUG)
Context& ctx = Context::Handle();
ctx = Closure::Cast(closure).context();
ASSERT(ctx.IsNull());
#endif
// Get the parent function so that we get the right function name.
func = func.parent_function();
bool fatal_errors = fatalErrors.IsNull() ? true : fatalErrors.value();
Dart_Port on_exit_port = onExit.IsNull() ? ILLEGAL_PORT : onExit.Id();
Dart_Port on_error_port = onError.IsNull() ? ILLEGAL_PORT : onError.Id();
// We first try to serialize the message. In case the message is not
// serializable this will throw an exception.
SerializedObjectBuffer message_buffer;
message_buffer.set_message(WriteMessage(
/* can_send_any_object */ true,
/* same_group */ FLAG_enable_isolate_groups, message, ILLEGAL_PORT,
Message::kNormalPriority));
const char* utf8_package_config =
packageConfig.IsNull() ? NULL : String2UTF8(packageConfig);
const char* utf8_debug_name =
debugName.IsNull() ? NULL : String2UTF8(debugName);
std::unique_ptr<IsolateSpawnState> state(new IsolateSpawnState(
port.Id(), isolate->origin_id(), String2UTF8(script_uri), func,
&message_buffer, utf8_package_config, paused.value(), fatal_errors,
on_exit_port, on_error_port, utf8_debug_name, isolate->group()));
// Since this is a call to Isolate.spawn, copy the parent isolate's code.
state->isolate_flags()->copy_parent_code = true;
isolate->group()->thread_pool()->Run<SpawnIsolateTask>(isolate,
std::move(state));
return Object::null();
const auto& func = Function::Handle(zone, GetTopLevelFunction(zone, closure));
PersistentHandle* closure_tuple_handle = nullptr;
if (func.IsNull()) {
if (!FLAG_enable_isolate_groups) {
const String& msg = String::Handle(String::New(
"Isolate.spawn expects to be passed a static or top-level function"));
Exceptions::ThrowArgumentError(msg);
} else {
// We have a non-toplevel closure that we might need to copy.
// Result will be [<closure-copy>, <objects-in-msg-to-rehash>]
const auto& closure_copy_tuple = Object::Handle(
zone, CopyMutableObjectGraph(closure)); // Throws if it fails.
ASSERT(closure_copy_tuple.IsArray());
ASSERT(Object::Handle(zone, Array::Cast(closure_copy_tuple).At(0))
.IsClosure());
closure_tuple_handle =
isolate->group()->api_state()->AllocatePersistentHandle();
closure_tuple_handle->set_ptr(closure_copy_tuple.ptr());
}
}
const String& msg = String::Handle(String::New(
"Isolate.spawn expects to be passed a static or top-level function"));
Exceptions::ThrowArgumentError(msg);
bool fatal_errors = fatalErrors.IsNull() ? true : fatalErrors.value();
Dart_Port on_exit_port = onExit.IsNull() ? ILLEGAL_PORT : onExit.Id();
Dart_Port on_error_port = onError.IsNull() ? ILLEGAL_PORT : onError.Id();
// We first try to serialize the message. In case the message is not
// serializable this will throw an exception.
SerializedObjectBuffer message_buffer;
message_buffer.set_message(WriteMessage(
/* can_send_any_object */ true,
/* same_group */ FLAG_enable_isolate_groups, message, ILLEGAL_PORT,
Message::kNormalPriority));
const char* utf8_package_config =
packageConfig.IsNull() ? NULL : String2UTF8(packageConfig);
const char* utf8_debug_name =
debugName.IsNull() ? NULL : String2UTF8(debugName);
if (closure_tuple_handle != nullptr && utf8_debug_name == nullptr) {
ASSERT(func.IsNull());
const auto& closure_function = Function::Handle(zone, closure.function());
utf8_debug_name =
NewConstChar(closure_function.QualifiedUserVisibleNameCString());
}
std::unique_ptr<IsolateSpawnState> state(new IsolateSpawnState(
port.Id(), isolate->origin_id(), String2UTF8(script_uri), func,
closure_tuple_handle, &message_buffer, utf8_package_config,
paused.value(), fatal_errors, on_exit_port, on_error_port,
utf8_debug_name, isolate->group()));
// Since this is a call to Isolate.spawn, copy the parent isolate's code.
state->isolate_flags()->copy_parent_code = true;
isolate->group()->thread_pool()->Run<SpawnIsolateTask>(isolate,
std::move(state));
return Object::null();
}

View file

@ -0,0 +1,111 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// VMOptions=--enable-isolate-groups --no-enable-fast-object-copy
// VMOptions=--enable-isolate-groups --enable-fast-object-copy
// VMOptions=--enable-isolate-groups --no-enable-fast-object-copy --gc-on-foc-slow-path --force-evacuation
// VMOptions=--enable-isolate-groups --enable-fast-object-copy --gc-on-foc-slow-path --force-evacuation
// The tests in this file will only succeed when isolate groups are enabled
// (hence the VMOptions above).
import 'dart:async';
import 'dart:isolate';
import 'package:expect/expect.dart';
import 'fast_object_copy_test.dart' show ClassWithNativeFields;
class HashThrower {
static bool throwOnHashCode = true;
const HashThrower();
int get hashCode => throwOnHashCode ? throw 'failing' : 2;
bool operator ==(other) => identical(this, other);
}
Future testWithClosure<T>(
void Function(SendPort) entrypoint, T expectedResult) async {
final rp = ReceivePort();
try {
await Isolate.spawn(entrypoint, rp.sendPort);
Expect.equals(expectedResult, await rp.first);
} finally {
rp.close();
}
}
class ClosureEntrypointTester {
int instanceValue = 42;
void send42(SendPort sendPort) => sendPort.send(42);
Future run() async {
await noCapturedVariablesTest();
await capturedInt();
await capturedInstanceInt();
await captureThisViaMethodTearOff();
await captureInvalidObject();
await captureRehashThrower();
}
Future noCapturedVariablesTest() async {
print('noCapturedVariablesTest');
await testWithClosure((SendPort s) => s.send(42), 42);
}
Future capturedInt() async {
print('capturedInt');
int value = 42;
await testWithClosure((SendPort s) => s.send(value), 42);
}
Future capturedInstanceInt() async {
print('capturedInstanceValue');
await testWithClosure((SendPort s) => s.send(this.instanceValue), 42);
}
Future captureThisViaMethodTearOff() async {
print('captureThisViaMethodTearOff');
await testWithClosure(send42, 42);
}
Future captureInvalidObject() async {
print('captureInvalidObject');
final invalidObject = ClassWithNativeFields();
send42(SendPort sendPort) {
'$invalidObject'; // Use an object that cannot be copied.
sendPort.send(42);
}
throwsAsync<ArgumentError>(() => testWithClosure(send42, 42));
}
Future captureRehashThrower() async {
print('captureRehashThrower');
HashThrower.throwOnHashCode = false;
final hashThrower = {HashThrower()};
send42(SendPort sendPort) {
'$hashThrower'; // Use an object that cannot be deserialized.
sendPort.send(42);
}
throwsAsync<IsolateSpawnException>(() => testWithClosure(send42, 42));
}
Future throwsAsync<T>(Future Function() fun) async {
try {
await fun();
} catch (e) {
if (e is T) return;
rethrow;
}
throw 'Function failed to throw ArgumentError';
}
}
main() async {
await ClosureEntrypointTester().run();
}

View file

@ -0,0 +1,113 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// VMOptions=--enable-isolate-groups --no-enable-fast-object-copy
// VMOptions=--enable-isolate-groups --enable-fast-object-copy
// VMOptions=--enable-isolate-groups --no-enable-fast-object-copy --gc-on-foc-slow-path --force-evacuation
// VMOptions=--enable-isolate-groups --enable-fast-object-copy --gc-on-foc-slow-path --force-evacuation
// The tests in this file will only succeed when isolate groups are enabled
// (hence the VMOptions above).
// @dart = 2.9
import 'dart:async';
import 'dart:isolate';
import 'package:expect/expect.dart';
import 'fast_object_copy_test.dart' show ClassWithNativeFields;
class HashThrower {
static bool throwOnHashCode = true;
const HashThrower();
int get hashCode => throwOnHashCode ? throw 'failing' : 2;
bool operator ==(other) => identical(this, other);
}
Future testWithClosure<T>(
void Function(SendPort) entrypoint, T expectedResult) async {
final rp = ReceivePort();
try {
await Isolate.spawn(entrypoint, rp.sendPort);
Expect.equals(expectedResult, await rp.first);
} finally {
rp.close();
}
}
class ClosureEntrypointTester {
int instanceValue = 42;
void send42(SendPort sendPort) => sendPort.send(42);
Future run() async {
await noCapturedVariablesTest();
await capturedInt();
await capturedInstanceInt();
await captureThisViaMethodTearOff();
await captureInvalidObject();
await captureRehashThrower();
}
Future noCapturedVariablesTest() async {
print('noCapturedVariablesTest');
await testWithClosure((SendPort s) => s.send(42), 42);
}
Future capturedInt() async {
print('capturedInt');
int value = 42;
await testWithClosure((SendPort s) => s.send(value), 42);
}
Future capturedInstanceInt() async {
print('capturedInstanceValue');
await testWithClosure((SendPort s) => s.send(this.instanceValue), 42);
}
Future captureThisViaMethodTearOff() async {
print('captureThisViaMethodTearOff');
await testWithClosure(send42, 42);
}
Future captureInvalidObject() async {
print('captureInvalidObject');
final invalidObject = ClassWithNativeFields();
send42(SendPort sendPort) {
'$invalidObject'; // Use an object that cannot be copied.
sendPort.send(42);
}
throwsAsync<ArgumentError>(() => testWithClosure(send42, 42));
}
Future captureRehashThrower() async {
print('captureRehashThrower');
HashThrower.throwOnHashCode = false;
final hashThrower = {HashThrower()};
send42(SendPort sendPort) {
'$hashThrower'; // Use an object that cannot be deserialized.
sendPort.send(42);
}
throwsAsync<IsolateSpawnException>(() => testWithClosure(send42, 42));
}
Future throwsAsync<T>(Future Function() fun) async {
try {
await fun();
} catch (e) {
if (e is T) return;
rethrow;
}
throw 'Function failed to throw ArgumentError';
}
}
main() async {
await ClosureEntrypointTester().run();
}

View file

@ -3722,40 +3722,43 @@ std::unique_ptr<Message> WriteApiMessage(Zone* zone,
return serializer.Finish(dest_port, priority);
}
ObjectPtr ReadObjectGraphCopyMessage(Thread* thread, PersistentHandle* handle) {
// msg_array = [
// <message>,
// <collection-lib-objects-to-rehash>,
// <core-lib-objects-to-rehash>,
// ]
Zone* zone = thread->zone();
Object& msg_obj = Object::Handle(zone);
const auto& msg_array = Array::Handle(zone, Array::RawCast(handle->ptr()));
ASSERT(msg_array.Length() == 3);
msg_obj = msg_array.At(0);
if (msg_array.At(1) != Object::null()) {
const auto& objects_to_rehash = Object::Handle(zone, msg_array.At(1));
auto& result = Object::Handle(zone);
result = DartLibraryCalls::RehashObjectsInDartCollection(thread,
objects_to_rehash);
if (result.ptr() != Object::null()) {
msg_obj = result.ptr();
}
}
if (msg_array.At(2) != Object::null()) {
const auto& objects_to_rehash = Object::Handle(zone, msg_array.At(2));
auto& result = Object::Handle(zone);
result =
DartLibraryCalls::RehashObjectsInDartCore(thread, objects_to_rehash);
if (result.ptr() != Object::null()) {
msg_obj = result.ptr();
}
}
return msg_obj.ptr();
}
ObjectPtr ReadMessage(Thread* thread, Message* message) {
if (message->IsRaw()) {
return message->raw_obj();
} else if (message->IsPersistentHandle()) {
// msg_array = [
// <message>,
// <collection-lib-objects-to-rehash>,
// <core-lib-objects-to-rehash>,
// ]
Zone* zone = thread->zone();
Object& msg_obj = Object::Handle(zone);
const auto& msg_array = Array::Handle(
zone, Array::RawCast(message->persistent_handle()->ptr()));
ASSERT(msg_array.Length() == 3);
msg_obj = msg_array.At(0);
if (msg_array.At(1) != Object::null()) {
const auto& objects_to_rehash = Object::Handle(zone, msg_array.At(1));
auto& result = Object::Handle(zone);
result = DartLibraryCalls::RehashObjectsInDartCollection(
thread, objects_to_rehash);
if (result.ptr() != Object::null()) {
msg_obj = result.ptr();
}
}
if (msg_array.At(2) != Object::null()) {
const auto& objects_to_rehash = Object::Handle(zone, msg_array.At(2));
auto& result = Object::Handle(zone);
result =
DartLibraryCalls::RehashObjectsInDartCore(thread, objects_to_rehash);
if (result.ptr() != Object::null()) {
msg_obj = result.ptr();
}
}
return msg_obj.ptr();
return ReadObjectGraphCopyMessage(thread, message->persistent_handle());
} else {
RELEASE_ASSERT(message->IsSnapshot());
MessageDeserializer deserializer(thread, message);

View file

@ -24,6 +24,8 @@ std::unique_ptr<Message> WriteApiMessage(Zone* zone,
Dart_Port dest_port,
Message::Priority priority);
ObjectPtr ReadObjectGraphCopyMessage(Thread* thread, PersistentHandle* handle);
ObjectPtr ReadMessage(Thread* thread, Message* message);
Dart_CObject* ReadApiMessage(Zone* zone, Message* message);

View file

@ -1710,7 +1710,12 @@ class ObjectGraphCopier {
thread_->isolate()->set_forward_table_old(nullptr);
}
// Result will be [<msg>, <objects-in-msg-to-rehash>]
// Result will be
// [
// <message>,
// <collection-lib-objects-to-rehash>,
// <core-lib-objects-to-rehash>,
// ]
ObjectPtr CopyObjectGraph(const Object& root) {
const char* volatile exception_msg = nullptr;
auto& result = Object::Handle(zone_);

View file

@ -13,9 +13,13 @@ class ObjectPtr;
// Makes a transitive copy of the object graph referenced by [object]. Will not
// copy objects that can be safely shared - due to being immutable.
//
// The result will be an array of length 2 of the format
// The result will be an array of length 3 of the format
//
// [<copy-of-root>, <array-of-objects-to-rehash / null>]
// [
// <message>,
// <collection-lib-objects-to-rehash>,
// <core-lib-objects-to-rehash>,
// ]
//
// If the array of objects to rehash is not `null` the receiver should re-hash
// those objects.