mirror of
https://github.com/dart-lang/sdk
synced 2024-10-01 19:14:49 +00:00
[vm/isolate] Add TransferableTypedData class that allows low-cost passing of Uint8List between isolates.
TransferableTypedData instances are one-use kind of thing: once receiver materializes it, it can't be used again, once sender sends it out to an isolate, sender can't send it to different isolate. Example of use: sender isolate: ``` Future<TransferableTypedData> consolidateHttpClientResponseBytes(HttpClientResponse response) { final completer = Completer<TransferableTypedData>(); final chunks = <Uint8List>[]; response.listen((List<int> chunk) { chunks.add(chunk); }, onDone: () { completer.complete(TransferableTypedData.fromList(chunks)); }); return completer.future; } ... sendPort.send(await consolidateHttpClientResponseBytes(response)); ``` receiver isolate: ``` RawReceivePort port = RawReceivePort((TransferableTypedData transferable) { Uint8List content = transferable.materialize().asUint8List(); ... }); ``` 31959[tr] and 31960[tr] tests were inspired by dartbug.com/31959, dartbug.com/31960 that this CL attempts to address: ``` ╰─➤ out/ReleaseX64/dart 31960.dart sending... 163ms for round-trip sending... 81ms for round-trip sending... 20ms for round-trip sending... 14ms for round-trip sending... 20ms for round-trip sending... 14ms for round-trip ``` (notice no "since last checking" pauses") vs ``` ╰─➤ out/ReleaseX64/dart 31960.dart sending... 154ms since last checkin 174ms for round-trip sending... 68ms since last checkin 9ms since last checkin 171ms for round-trip sending... 13ms since last checkin 108ms for round-trip sending... 14ms since last checkin 108ms for round-trip sending... 14ms since last checkin 107ms for round-trip ``` Change-Id: I0fcb5ce285394f498c3f1db4414204531f98199d Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/99623 Commit-Queue: Alexander Aprelev <aam@google.com> Reviewed-by: Ryan Macnak <rmacnak@google.com> Reviewed-by: Lasse R.H. Nielsen <lrn@google.com> Reviewed-by: Martin Kustermann <kustermann@google.com>
This commit is contained in:
parent
527dffd21b
commit
4ccae238ea
|
@ -1,4 +1,12 @@
|
|||
## 2.3.2-dev.XX.0
|
||||
|
||||
### Core library changes
|
||||
|
||||
#### `dart:isolate`
|
||||
|
||||
* `TransferableTypedData` class was added to facilitate faster cross-isolate
|
||||
communication of `Uint8List` data.
|
||||
|
||||
(Add new changes here, and they will be copied to the change section for the
|
||||
next dev version)
|
||||
|
||||
|
|
|
@ -434,4 +434,120 @@ DEFINE_NATIVE_ENTRY(Isolate_sendOOB, 0, 2) {
|
|||
return Object::null();
|
||||
}
|
||||
|
||||
static void ExternalTypedDataFinalizer(void* isolate_callback_data,
|
||||
Dart_WeakPersistentHandle handle,
|
||||
void* peer) {
|
||||
free(peer);
|
||||
}
|
||||
|
||||
static intptr_t GetUint8SizeOrThrow(const Instance& instance) {
|
||||
// From the Dart side we are guaranteed that the type of [instance] is a
|
||||
// subtype of TypedData.
|
||||
if (instance.IsTypedDataBase()) {
|
||||
return TypedDataBase::Cast(instance).LengthInBytes();
|
||||
}
|
||||
|
||||
// This can happen if [instance] is `null` or an instance of a 3rd party class
|
||||
// which implements [TypedData].
|
||||
Exceptions::ThrowArgumentError(instance);
|
||||
}
|
||||
|
||||
DEFINE_NATIVE_ENTRY(TransferableTypedData_factory, 0, 2) {
|
||||
ASSERT(
|
||||
TypeArguments::CheckedHandle(zone, arguments->NativeArgAt(0)).IsNull());
|
||||
|
||||
GET_NON_NULL_NATIVE_ARGUMENT(Instance, array_instance,
|
||||
arguments->NativeArgAt(1));
|
||||
|
||||
Array& array = Array::Handle();
|
||||
intptr_t array_length;
|
||||
if (array_instance.IsGrowableObjectArray()) {
|
||||
const auto& growable_array = GrowableObjectArray::Cast(array_instance);
|
||||
array ^= growable_array.data();
|
||||
array_length = growable_array.Length();
|
||||
} else if (array_instance.IsArray()) {
|
||||
array ^= Array::Cast(array_instance).raw();
|
||||
array_length = array.Length();
|
||||
} else {
|
||||
Exceptions::ThrowArgumentError(array_instance);
|
||||
UNREACHABLE();
|
||||
}
|
||||
Instance& instance = Instance::Handle();
|
||||
unsigned long long total_bytes = 0;
|
||||
const unsigned long kMaxBytes =
|
||||
TypedData::MaxElements(kTypedDataUint8ArrayCid);
|
||||
for (intptr_t i = 0; i < array_length; i++) {
|
||||
instance ^= array.At(i);
|
||||
total_bytes += GetUint8SizeOrThrow(instance);
|
||||
if (total_bytes > kMaxBytes) {
|
||||
const Array& error_args = Array::Handle(Array::New(3));
|
||||
error_args.SetAt(0, array);
|
||||
error_args.SetAt(1, String::Handle(String::New("data")));
|
||||
error_args.SetAt(2,
|
||||
String::Handle(String::NewFormatted(
|
||||
"Aggregated list exceeds max size %ld", kMaxBytes)));
|
||||
Exceptions::ThrowByType(Exceptions::kArgumentValue, error_args);
|
||||
UNREACHABLE();
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t* data = reinterpret_cast<uint8_t*>(malloc(total_bytes));
|
||||
if (data == nullptr) {
|
||||
const Instance& exception =
|
||||
Instance::Handle(thread->isolate()->object_store()->out_of_memory());
|
||||
Exceptions::Throw(thread, exception);
|
||||
UNREACHABLE();
|
||||
}
|
||||
intptr_t offset = 0;
|
||||
for (intptr_t i = 0; i < array_length; i++) {
|
||||
instance ^= array.At(i);
|
||||
|
||||
{
|
||||
NoSafepointScope no_safepoint;
|
||||
const auto& typed_data = TypedDataBase::Cast(instance);
|
||||
const intptr_t length_in_bytes = typed_data.LengthInBytes();
|
||||
|
||||
void* source = typed_data.DataAddr(0);
|
||||
// The memory does not overlap.
|
||||
memcpy(data + offset, source, length_in_bytes);
|
||||
offset += length_in_bytes;
|
||||
}
|
||||
}
|
||||
ASSERT(static_cast<unsigned long>(offset) == total_bytes);
|
||||
return TransferableTypedData::New(data, total_bytes);
|
||||
}
|
||||
|
||||
DEFINE_NATIVE_ENTRY(TransferableTypedData_materialize, 0, 1) {
|
||||
GET_NON_NULL_NATIVE_ARGUMENT(TransferableTypedData, t,
|
||||
arguments->NativeArgAt(0));
|
||||
|
||||
void* peer;
|
||||
{
|
||||
NoSafepointScope no_safepoint;
|
||||
peer = thread->heap()->GetPeer(t.raw());
|
||||
// Assume that object's Peer is only used to track transferrability state.
|
||||
ASSERT(peer != nullptr);
|
||||
}
|
||||
|
||||
TransferableTypedDataPeer* tpeer =
|
||||
reinterpret_cast<TransferableTypedDataPeer*>(peer);
|
||||
const intptr_t length = tpeer->length();
|
||||
uint8_t* data = tpeer->data();
|
||||
if (data == nullptr) {
|
||||
const auto& error = String::Handle(String::New(
|
||||
"Attempt to materialize object that was transferred already."));
|
||||
Exceptions::ThrowArgumentError(error);
|
||||
UNREACHABLE();
|
||||
}
|
||||
tpeer->ClearData();
|
||||
|
||||
const ExternalTypedData& typed_data = ExternalTypedData::Handle(
|
||||
ExternalTypedData::New(kExternalTypedDataUint8ArrayCid, data, length,
|
||||
thread->heap()->SpaceForExternal(length)));
|
||||
FinalizablePersistentHandle::New(thread->isolate(), typed_data,
|
||||
/* peer= */ data,
|
||||
&ExternalTypedDataFinalizer, length);
|
||||
return typed_data.raw();
|
||||
}
|
||||
|
||||
} // namespace dart
|
||||
|
|
|
@ -7,12 +7,13 @@
|
|||
/// used by patches of that library. We plan to change this when we have a
|
||||
/// shared front end and simply use parts.
|
||||
|
||||
import "dart:_internal" show VMLibraryHooks, patch;
|
||||
import "dart:_internal" show ClassID, VMLibraryHooks, patch;
|
||||
|
||||
import "dart:async"
|
||||
show Completer, Future, Stream, StreamController, StreamSubscription, Timer;
|
||||
|
||||
import "dart:collection" show HashMap;
|
||||
import "dart:typed_data" show ByteBuffer, TypedData, Uint8List;
|
||||
|
||||
/// These are the additional parts of this patch library:
|
||||
// part "timer_impl.dart";
|
||||
|
@ -671,3 +672,33 @@ class Isolate {
|
|||
|
||||
static String _getCurrentRootUriStr() native "Isolate_getCurrentRootUriStr";
|
||||
}
|
||||
|
||||
@patch
|
||||
abstract class TransferableTypedData {
|
||||
@patch
|
||||
factory TransferableTypedData.fromList(List<TypedData> chunks) {
|
||||
if (chunks == null) {
|
||||
throw ArgumentError(chunks);
|
||||
}
|
||||
final int cid = ClassID.getID(chunks);
|
||||
if (cid != ClassID.cidArray &&
|
||||
cid != ClassID.cidGrowableObjectArray &&
|
||||
cid != ClassID.cidImmutableArray) {
|
||||
chunks = List.unmodifiable(chunks);
|
||||
}
|
||||
return _TransferableTypedDataImpl(chunks);
|
||||
}
|
||||
}
|
||||
|
||||
@pragma("vm:entry-point")
|
||||
class _TransferableTypedDataImpl implements TransferableTypedData {
|
||||
factory _TransferableTypedDataImpl(List<TypedData> list)
|
||||
native "TransferableTypedData_factory";
|
||||
|
||||
ByteBuffer materialize() {
|
||||
return _materializeIntoUint8List().buffer;
|
||||
}
|
||||
|
||||
Uint8List _materializeIntoUint8List()
|
||||
native "TransferableTypedData_materialize";
|
||||
}
|
||||
|
|
90
runtime/tests/vm/dart/issue_31959_31960_test.dart
Normal file
90
runtime/tests/vm/dart/issue_31959_31960_test.dart
Normal file
|
@ -0,0 +1,90 @@
|
|||
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
|
||||
import 'dart:async';
|
||||
import 'dart:isolate';
|
||||
import 'dart:math';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'package:async_helper/async_helper.dart' show asyncStart, asyncEnd;
|
||||
import 'package:expect/expect.dart';
|
||||
|
||||
Uint8List generateSampleList(final size) {
|
||||
final list = Uint8List(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
list[i] = i % 243;
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
void validateReceivedList(final expectedSize, final list) {
|
||||
Expect.equals(expectedSize, list.length);
|
||||
// probe few elements
|
||||
for (int i = 0; i < list.length; i += max<num>(1, expectedSize ~/ 1000)) {
|
||||
Expect.equals(i % 243, list[i]);
|
||||
}
|
||||
}
|
||||
|
||||
Future<Null> testSend(
|
||||
bool transferable, int toIsolateSize, int fromIsolateSize) async {
|
||||
asyncStart();
|
||||
final port = ReceivePort();
|
||||
final inbox = StreamIterator(port);
|
||||
await Isolate.spawn(isolateMain,
|
||||
[transferable, toIsolateSize, fromIsolateSize, port.sendPort]);
|
||||
await inbox.moveNext();
|
||||
final outbox = inbox.current;
|
||||
final workWatch = Stopwatch();
|
||||
final data = generateSampleList(toIsolateSize);
|
||||
int count = 10;
|
||||
workWatch.start();
|
||||
while (count-- > 0) {
|
||||
outbox.send(transferable ? TransferableTypedData.fromList([data]) : data);
|
||||
await inbox.moveNext();
|
||||
validateReceivedList(
|
||||
fromIsolateSize,
|
||||
transferable
|
||||
? inbox.current.materialize().asUint8List()
|
||||
: inbox.current);
|
||||
}
|
||||
print('total ${workWatch.elapsedMilliseconds}ms');
|
||||
outbox.send(null);
|
||||
port.close();
|
||||
asyncEnd();
|
||||
}
|
||||
|
||||
main() async {
|
||||
asyncStart();
|
||||
int bignum = 100 * 1000 * 1000;
|
||||
await testSend(false, bignum, 1); // none
|
||||
await testSend(true, bignum, 1); // 31959tr
|
||||
await testSend(false, bignum, 1); // 31960
|
||||
await testSend(true, bignum, 1); // 31960tr
|
||||
asyncEnd();
|
||||
}
|
||||
|
||||
Future<Null> isolateMain(List config) async {
|
||||
bool transferable = config[0];
|
||||
int toIsolateSize = config[1];
|
||||
int fromIsolateSize = config[2];
|
||||
SendPort outbox = config[3];
|
||||
|
||||
final port = ReceivePort();
|
||||
final inbox = StreamIterator(port);
|
||||
outbox.send(port.sendPort);
|
||||
final data = generateSampleList(fromIsolateSize);
|
||||
while (true) {
|
||||
await inbox.moveNext();
|
||||
if (inbox.current == null) {
|
||||
break;
|
||||
}
|
||||
validateReceivedList(
|
||||
toIsolateSize,
|
||||
transferable
|
||||
? inbox.current.materialize().asUint8List()
|
||||
: inbox.current);
|
||||
outbox.send(transferable ? TransferableTypedData.fromList([data]) : data);
|
||||
}
|
||||
port.close();
|
||||
}
|
130
runtime/tests/vm/dart/transferable_test.dart
Normal file
130
runtime/tests/vm/dart/transferable_test.dart
Normal file
|
@ -0,0 +1,130 @@
|
|||
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
|
||||
// Test that validates that transferables are faster than regular typed data.
|
||||
|
||||
import 'dart:async';
|
||||
import 'dart:isolate';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import "package:expect/expect.dart";
|
||||
|
||||
const int toIsolateSize = 100 * 1024 * 1024;
|
||||
const int fromIsolateSize = 100 * 1024 * 1024;
|
||||
|
||||
const int nIterations = 5;
|
||||
|
||||
int iteration;
|
||||
bool keepTimerRunning;
|
||||
|
||||
main() async {
|
||||
keepTimerRunning = true;
|
||||
|
||||
print('--- standard');
|
||||
iteration = nIterations;
|
||||
final stopwatch = new Stopwatch()..start();
|
||||
await runBatch(useTransferable: false);
|
||||
final standard = stopwatch.elapsedMilliseconds;
|
||||
|
||||
print('--- transferable');
|
||||
iteration = nIterations;
|
||||
stopwatch.reset();
|
||||
await runBatch(useTransferable: true);
|
||||
final transferable = stopwatch.elapsedMilliseconds;
|
||||
print(
|
||||
'standard($standard ms)/transferable($transferable ms): ${standard / transferable}x');
|
||||
Expect.isTrue(standard / transferable > 1.2);
|
||||
keepTimerRunning = false;
|
||||
}
|
||||
|
||||
packageList(Uint8List data, bool useTransferable) {
|
||||
return useTransferable
|
||||
? TransferableTypedData.fromList(<Uint8List>[data])
|
||||
: data;
|
||||
}
|
||||
|
||||
packageByteData(ByteData data, bool useTransferable) {
|
||||
return useTransferable
|
||||
? TransferableTypedData.fromList(<Uint8List>[data.buffer.asUint8List()])
|
||||
: data;
|
||||
}
|
||||
|
||||
class StartMessage {
|
||||
final SendPort sendPort;
|
||||
final bool useTransferable;
|
||||
|
||||
StartMessage(this.sendPort, this.useTransferable);
|
||||
}
|
||||
|
||||
runBatch({bool useTransferable}) async {
|
||||
Timer.run(idleTimer);
|
||||
final port = ReceivePort();
|
||||
final inbox = StreamIterator<dynamic>(port);
|
||||
final worker = await Isolate.spawn(
|
||||
isolateMain, StartMessage(port.sendPort, useTransferable),
|
||||
paused: true);
|
||||
final workerCompleted = Completer<bool>();
|
||||
final workerExitedPort = ReceivePort()
|
||||
..listen((_) => workerCompleted.complete(true));
|
||||
worker.addOnExitListener(workerExitedPort.sendPort);
|
||||
worker.resume(worker.pauseCapability);
|
||||
|
||||
await inbox.moveNext();
|
||||
final outbox = inbox.current;
|
||||
final workWatch = new Stopwatch();
|
||||
final data = new Uint8List(toIsolateSize);
|
||||
|
||||
while (iteration-- > 0) {
|
||||
final packagedData = packageList(data, useTransferable);
|
||||
workWatch.start();
|
||||
outbox.send(packagedData);
|
||||
await inbox.moveNext();
|
||||
|
||||
final received = inbox.current;
|
||||
final receivedData =
|
||||
received is TransferableTypedData ? received.materialize() : received;
|
||||
int time = workWatch.elapsedMilliseconds;
|
||||
print('${time}ms for round-trip');
|
||||
workWatch.reset();
|
||||
}
|
||||
outbox.send(null);
|
||||
|
||||
await workerCompleted.future;
|
||||
workerExitedPort.close();
|
||||
port.close();
|
||||
}
|
||||
|
||||
Future<Null> isolateMain(StartMessage startMessage) async {
|
||||
final port = new ReceivePort();
|
||||
final inbox = new StreamIterator<dynamic>(port);
|
||||
startMessage.sendPort.send(port.sendPort);
|
||||
final data = Uint8List.view(new Uint8List(fromIsolateSize).buffer);
|
||||
while (true) {
|
||||
await inbox.moveNext();
|
||||
final received = inbox.current;
|
||||
if (received == null) {
|
||||
break;
|
||||
}
|
||||
final receivedData =
|
||||
received is TransferableTypedData ? received.materialize() : received;
|
||||
|
||||
final packagedData = packageList(data, startMessage.useTransferable);
|
||||
|
||||
startMessage.sendPort.send(packagedData);
|
||||
}
|
||||
port.close();
|
||||
}
|
||||
|
||||
final Stopwatch idleWatch = new Stopwatch();
|
||||
|
||||
void idleTimer() {
|
||||
idleWatch.stop();
|
||||
final time = idleWatch.elapsedMilliseconds;
|
||||
if (time > 5) print('${time}ms since last checkin');
|
||||
idleWatch.reset();
|
||||
idleWatch.start();
|
||||
if (keepTimerRunning) {
|
||||
Timer.run(idleTimer);
|
||||
}
|
||||
}
|
117
runtime/tests/vm/dart/transferable_throws_test.dart
Normal file
117
runtime/tests/vm/dart/transferable_throws_test.dart
Normal file
|
@ -0,0 +1,117 @@
|
|||
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
|
||||
// Test that ensures correct exception is thrown when attempting to use
|
||||
// transferred transferables.
|
||||
|
||||
import 'dart:async';
|
||||
import 'dart:collection';
|
||||
import 'dart:core';
|
||||
import 'dart:io';
|
||||
import 'dart:isolate';
|
||||
import 'dart:typed_data';
|
||||
import 'dart:math';
|
||||
|
||||
import "package:expect/expect.dart";
|
||||
|
||||
throwsIfMaterializeAfterSend() {
|
||||
final rp = ReceivePort();
|
||||
final transferable = TransferableTypedData.fromList([Uint8List(1024)]);
|
||||
rp.sendPort.send(transferable);
|
||||
Expect.throwsArgumentError(() => transferable.materialize());
|
||||
rp.close();
|
||||
}
|
||||
|
||||
throwsIfSendMoreThanOnce() {
|
||||
final rp = ReceivePort();
|
||||
final bytes = Uint8List(1024);
|
||||
final transferable = TransferableTypedData.fromList([bytes]);
|
||||
rp.sendPort.send(transferable);
|
||||
Expect.throwsArgumentError(() => rp.sendPort.send(transferable));
|
||||
rp.close();
|
||||
}
|
||||
|
||||
throwsIfMaterializeMoreThanOnce() {
|
||||
final transferable = TransferableTypedData.fromList([Uint8List(1024)]);
|
||||
transferable.materialize();
|
||||
Expect.throwsArgumentError(() => transferable.materialize());
|
||||
}
|
||||
|
||||
throwsIfReceiverMaterializesMoreThanOnce() async {
|
||||
final rp = ReceivePort();
|
||||
final completer = Completer<List>();
|
||||
final isolateErrors = ReceivePort()..listen((e) => completer.complete(e));
|
||||
await Isolate.spawn(
|
||||
receiver, TransferableTypedData.fromList([Uint8List(1024)]),
|
||||
onError: isolateErrors.sendPort);
|
||||
final error = await completer.future;
|
||||
Expect.equals(
|
||||
error[0],
|
||||
"Invalid argument(s): Attempt to materialize object that was"
|
||||
" transferred already.");
|
||||
isolateErrors.close();
|
||||
rp.close();
|
||||
}
|
||||
|
||||
void receiver(final transferable) {
|
||||
transferable.materialize();
|
||||
transferable.materialize();
|
||||
}
|
||||
|
||||
throwsIfCummulativeListIsTooLargeOn32bitPlatform() {
|
||||
try {
|
||||
int maxUint8ListSize = pow(2, 30);
|
||||
// Check whether we are on 32-bit or 64-bit platform.
|
||||
new Uint8List(maxUint8ListSize);
|
||||
// On 64-bit platform we will have difficulty allocating large enough
|
||||
// Uint8List to verify "too large" use case, so do nothing.
|
||||
return;
|
||||
} catch (_) {}
|
||||
|
||||
var halfmax = new Uint8List(pow(2, 29) - 1);
|
||||
Expect.throwsArgumentError(
|
||||
() => TransferableTypedData.fromList([halfmax, halfmax, Uint8List(2)]));
|
||||
}
|
||||
|
||||
throwsIfCummulativeListCantBeAllocated() {
|
||||
// Attempt to create total 1tb uint8list which should fail on 32 and 64-bit
|
||||
// platforms.
|
||||
final bytes100MB = Uint8List(100 * 1024 * 1024);
|
||||
final total1TB = List<Uint8List>.filled(10000, bytes100MB);
|
||||
// Try to make a 1 TB transferable.
|
||||
Expect.throws(() => TransferableTypedData.fromList(total1TB));
|
||||
}
|
||||
|
||||
class MyList<T> extends ListBase<T> {
|
||||
@override
|
||||
int length;
|
||||
|
||||
@override
|
||||
T operator [](int index) => null;
|
||||
@override
|
||||
void operator []=(int index, T value) {}
|
||||
}
|
||||
|
||||
class MyTypedData implements TypedData {
|
||||
noSuchMethod(_) {}
|
||||
}
|
||||
|
||||
main() {
|
||||
throwsIfMaterializeAfterSend();
|
||||
throwsIfSendMoreThanOnce();
|
||||
throwsIfMaterializeMoreThanOnce();
|
||||
throwsIfReceiverMaterializesMoreThanOnce();
|
||||
throwsIfCummulativeListIsTooLargeOn32bitPlatform();
|
||||
if (!Platform.isMacOS) {
|
||||
// this test crashes the process on mac.
|
||||
throwsIfCummulativeListCantBeAllocated();
|
||||
}
|
||||
|
||||
Expect.throwsArgumentError(() => TransferableTypedData.fromList(null));
|
||||
Expect.throwsArgumentError(() => TransferableTypedData.fromList([null]));
|
||||
Expect.throwsArgumentError(
|
||||
() => TransferableTypedData.fromList(MyList<Uint8List>()));
|
||||
Expect.throwsArgumentError(
|
||||
() => TransferableTypedData.fromList([MyTypedData()]));
|
||||
}
|
|
@ -35,6 +35,7 @@ dart/snapshot_version_test: RuntimeError
|
|||
[ $hot_reload || $hot_reload_rollback ]
|
||||
dart/compilation_trace_test: Pass, Slow
|
||||
dart/type_feedback_test: Pass, Slow
|
||||
dart/issue_31959_31960_test: SkipSlow
|
||||
|
||||
[ $compiler != dartk || ($arch != x64 && $arch != simarm && $arch != arm) || $hot_reload || $hot_reload_rollback ]
|
||||
dart/entrypoints/jit/*: SkipByDesign # Only supported in the Dart 2 JIT and AOT, and test optimizations - hence disabled on hotreload bots.
|
||||
|
|
|
@ -387,7 +387,9 @@ namespace dart {
|
|||
V(Ffi_fromFunction, 1) \
|
||||
V(Ffi_dl_open, 1) \
|
||||
V(Ffi_dl_lookup, 2) \
|
||||
V(Ffi_dl_getHandle, 1)
|
||||
V(Ffi_dl_getHandle, 1) \
|
||||
V(TransferableTypedData_factory, 2) \
|
||||
V(TransferableTypedData_materialize, 1)
|
||||
|
||||
// List of bootstrap native entry points used in the dart:mirror library.
|
||||
#define MIRRORS_BOOTSTRAP_NATIVE_LIST(V) \
|
||||
|
|
|
@ -78,7 +78,8 @@ namespace dart {
|
|||
V(WeakProperty) \
|
||||
V(MirrorReference) \
|
||||
V(LinkedHashMap) \
|
||||
V(UserTag)
|
||||
V(UserTag) \
|
||||
V(TransferableTypedData)
|
||||
|
||||
#define CLASS_LIST_ARRAYS(V) \
|
||||
V(Array) \
|
||||
|
|
|
@ -274,17 +274,6 @@ static Dart_Handle GetNativeFieldsOfArgument(NativeArguments* arguments,
|
|||
current_func, field_count, num_fields);
|
||||
}
|
||||
|
||||
Heap::Space SpaceForExternal(Thread* thread, intptr_t size) {
|
||||
Heap* heap = thread->heap();
|
||||
// If 'size' would be a significant fraction of new space, then use old.
|
||||
static const int kExtNewRatio = 16;
|
||||
if (size > (heap->CapacityInWords(Heap::kNew) * kWordSize) / kExtNewRatio) {
|
||||
return Heap::kOld;
|
||||
} else {
|
||||
return Heap::kNew;
|
||||
}
|
||||
}
|
||||
|
||||
static RawObject* Send0Arg(const Instance& receiver, const String& selector) {
|
||||
const intptr_t kTypeArgsLen = 0;
|
||||
const intptr_t kNumArgs = 1;
|
||||
|
@ -2527,7 +2516,7 @@ Dart_NewExternalLatin1String(const uint8_t* latin1_array,
|
|||
return Api::NewHandle(
|
||||
T,
|
||||
String::NewExternal(latin1_array, length, peer, external_allocation_size,
|
||||
callback, SpaceForExternal(T, length)));
|
||||
callback, T->heap()->SpaceForExternal(length)));
|
||||
}
|
||||
|
||||
DART_EXPORT Dart_Handle
|
||||
|
@ -2549,7 +2538,7 @@ Dart_NewExternalUTF16String(const uint16_t* utf16_array,
|
|||
return Api::NewHandle(
|
||||
T,
|
||||
String::NewExternal(utf16_array, length, peer, external_allocation_size,
|
||||
callback, SpaceForExternal(T, bytes)));
|
||||
callback, T->heap()->SpaceForExternal(bytes)));
|
||||
}
|
||||
|
||||
DART_EXPORT Dart_Handle Dart_StringToCString(Dart_Handle object,
|
||||
|
@ -3433,8 +3422,9 @@ static Dart_Handle NewExternalTypedData(
|
|||
Zone* zone = thread->zone();
|
||||
intptr_t bytes = length * ExternalTypedData::ElementSizeInBytes(cid);
|
||||
const ExternalTypedData& result = ExternalTypedData::Handle(
|
||||
zone, ExternalTypedData::New(cid, reinterpret_cast<uint8_t*>(data),
|
||||
length, SpaceForExternal(thread, bytes)));
|
||||
zone,
|
||||
ExternalTypedData::New(cid, reinterpret_cast<uint8_t*>(data), length,
|
||||
thread->heap()->SpaceForExternal(bytes)));
|
||||
if (callback != NULL) {
|
||||
AllocateFinalizableHandle(thread, result, peer, external_allocation_size,
|
||||
callback);
|
||||
|
|
|
@ -15,6 +15,7 @@ struct FinalizableData {
|
|||
void* data;
|
||||
void* peer;
|
||||
Dart_WeakPersistentHandleFinalizer callback;
|
||||
Dart_WeakPersistentHandleFinalizer successful_write_callback;
|
||||
};
|
||||
|
||||
class MessageFinalizableData {
|
||||
|
@ -23,18 +24,24 @@ class MessageFinalizableData {
|
|||
|
||||
~MessageFinalizableData() {
|
||||
for (intptr_t i = position_; i < records_.length(); i++) {
|
||||
records_[i].callback(NULL, NULL, records_[i].peer);
|
||||
records_[i].callback(nullptr, nullptr, records_[i].peer);
|
||||
}
|
||||
}
|
||||
|
||||
void Put(intptr_t external_size,
|
||||
/// If [successful_write_callback] is provided, it's invoked when message
|
||||
/// was serialized successfully.
|
||||
/// [callback] is invoked when serialization failed.
|
||||
void Put(
|
||||
intptr_t external_size,
|
||||
void* data,
|
||||
void* peer,
|
||||
Dart_WeakPersistentHandleFinalizer callback) {
|
||||
Dart_WeakPersistentHandleFinalizer callback,
|
||||
Dart_WeakPersistentHandleFinalizer successful_write_callback = nullptr) {
|
||||
FinalizableData finalizable_data;
|
||||
finalizable_data.data = data;
|
||||
finalizable_data.peer = peer;
|
||||
finalizable_data.callback = callback;
|
||||
finalizable_data.successful_write_callback = successful_write_callback;
|
||||
records_.Add(finalizable_data);
|
||||
external_size_ += external_size;
|
||||
}
|
||||
|
@ -44,6 +51,15 @@ class MessageFinalizableData {
|
|||
return records_[position_++];
|
||||
}
|
||||
|
||||
void SerializationSucceeded() {
|
||||
for (intptr_t i = position_; i < records_.length(); i++) {
|
||||
if (records_[i].successful_write_callback != nullptr) {
|
||||
records_[i].successful_write_callback(nullptr, nullptr,
|
||||
records_[i].peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
intptr_t external_size() const { return external_size_; }
|
||||
|
||||
private:
|
||||
|
|
|
@ -1031,6 +1031,16 @@ void Heap::PrintStatsToTimeline(TimelineEventScope* event, GCReason reason) {
|
|||
#endif // !defined(PRODUCT)
|
||||
}
|
||||
|
||||
Heap::Space Heap::SpaceForExternal(intptr_t size) const {
|
||||
// If 'size' would be a significant fraction of new space, then use old.
|
||||
static const int kExtNewRatio = 16;
|
||||
if (size > (CapacityInWords(Heap::kNew) * kWordSize) / kExtNewRatio) {
|
||||
return Heap::kOld;
|
||||
} else {
|
||||
return Heap::kNew;
|
||||
}
|
||||
}
|
||||
|
||||
NoHeapGrowthControlScope::NoHeapGrowthControlScope()
|
||||
: ThreadStackResource(Thread::Current()) {
|
||||
Heap* heap = reinterpret_cast<Isolate*>(isolate())->heap();
|
||||
|
|
|
@ -301,6 +301,7 @@ class Heap {
|
|||
}
|
||||
void MakeTLABIterable(Thread* thread);
|
||||
void AbandonRemainingTLAB(Thread* thread);
|
||||
Space SpaceForExternal(intptr_t size) const;
|
||||
|
||||
void CollectOnNextAllocation();
|
||||
|
||||
|
|
|
@ -1527,6 +1527,11 @@ RawError* Object::Init(Isolate* isolate,
|
|||
RegisterPrivateClass(cls, Symbols::_SendPortImpl(), isolate_lib);
|
||||
pending_classes.Add(cls);
|
||||
|
||||
cls = Class::New<TransferableTypedData>();
|
||||
RegisterPrivateClass(cls, Symbols::_TransferableTypedDataImpl(),
|
||||
isolate_lib);
|
||||
pending_classes.Add(cls);
|
||||
|
||||
const Class& stacktrace_cls = Class::Handle(zone, Class::New<StackTrace>());
|
||||
RegisterPrivateClass(stacktrace_cls, Symbols::_StackTrace(), core_lib);
|
||||
pending_classes.Add(stacktrace_cls);
|
||||
|
@ -2059,6 +2064,8 @@ RawError* Object::Init(Isolate* isolate,
|
|||
|
||||
cls = Class::New<MirrorReference>();
|
||||
cls = Class::New<UserTag>();
|
||||
|
||||
cls = Class::New<TransferableTypedData>();
|
||||
}
|
||||
return Error::null();
|
||||
}
|
||||
|
@ -21410,6 +21417,40 @@ const char* SendPort::ToCString() const {
|
|||
return "SendPort";
|
||||
}
|
||||
|
||||
static void TransferableTypedDataFinalizer(void* isolate_callback_data,
|
||||
Dart_WeakPersistentHandle handle,
|
||||
void* peer) {
|
||||
delete (reinterpret_cast<TransferableTypedDataPeer*>(peer));
|
||||
}
|
||||
|
||||
RawTransferableTypedData* TransferableTypedData::New(uint8_t* data,
|
||||
intptr_t length,
|
||||
Heap::Space space) {
|
||||
TransferableTypedDataPeer* peer = new TransferableTypedDataPeer(data, length);
|
||||
|
||||
Thread* thread = Thread::Current();
|
||||
TransferableTypedData& result = TransferableTypedData::Handle();
|
||||
{
|
||||
RawObject* raw =
|
||||
Object::Allocate(TransferableTypedData::kClassId,
|
||||
TransferableTypedData::InstanceSize(), space);
|
||||
NoSafepointScope no_safepoint;
|
||||
thread->heap()->SetPeer(raw, peer);
|
||||
result ^= raw;
|
||||
}
|
||||
// Set up finalizer so it frees allocated memory if handle is
|
||||
// garbage-collected.
|
||||
peer->set_handle(FinalizablePersistentHandle::New(
|
||||
thread->isolate(), result, peer, &TransferableTypedDataFinalizer,
|
||||
length));
|
||||
|
||||
return result.raw();
|
||||
}
|
||||
|
||||
const char* TransferableTypedData::ToCString() const {
|
||||
return "TransferableTypedData";
|
||||
}
|
||||
|
||||
const char* Closure::ToCString() const {
|
||||
Zone* zone = Thread::Current()->zone();
|
||||
const Function& fun = Function::Handle(zone, function());
|
||||
|
|
|
@ -8522,12 +8522,22 @@ class TypedDataBase : public Instance {
|
|||
}
|
||||
}
|
||||
|
||||
void* DataAddr(intptr_t byte_offset) const {
|
||||
ASSERT((byte_offset == 0) ||
|
||||
((byte_offset > 0) && (byte_offset < LengthInBytes())));
|
||||
return reinterpret_cast<void*>(Validate(raw_ptr()->data_) + byte_offset);
|
||||
}
|
||||
|
||||
protected:
|
||||
void SetLength(intptr_t value) const {
|
||||
ASSERT(value <= Smi::kMaxValue);
|
||||
StoreSmi(&raw_ptr()->length_, Smi::New(value));
|
||||
}
|
||||
|
||||
virtual uint8_t* Validate(uint8_t* data) const {
|
||||
return UnsafeMutableNonPointer(data);
|
||||
}
|
||||
|
||||
private:
|
||||
friend class Class;
|
||||
|
||||
|
@ -8551,13 +8561,6 @@ class TypedData : public TypedDataBase {
|
|||
// architecture.
|
||||
static const intptr_t kHashBits = 30;
|
||||
|
||||
void* DataAddr(intptr_t byte_offset) const {
|
||||
ASSERT((byte_offset == 0) ||
|
||||
((byte_offset > 0) && (byte_offset < LengthInBytes())));
|
||||
return reinterpret_cast<void*>(UnsafeMutableNonPointer(raw_ptr()->data()) +
|
||||
byte_offset);
|
||||
}
|
||||
|
||||
virtual bool CanonicalizeEquals(const Instance& other) const;
|
||||
virtual uint32_t CanonicalizeHash() const;
|
||||
|
||||
|
@ -8698,12 +8701,6 @@ class ExternalTypedData : public TypedDataBase {
|
|||
// snapshot. Should be independent of word size.
|
||||
static const int kDataSerializationAlignment = 8;
|
||||
|
||||
void* DataAddr(intptr_t byte_offset) const {
|
||||
ASSERT((byte_offset == 0) ||
|
||||
((byte_offset > 0) && (byte_offset < LengthInBytes())));
|
||||
return reinterpret_cast<void*>(raw_ptr()->data_ + byte_offset);
|
||||
}
|
||||
|
||||
#define TYPED_GETTER_SETTER(name, type) \
|
||||
type Get##name(intptr_t byte_offset) const { \
|
||||
return ReadUnaligned(reinterpret_cast<type*>(DataAddr(byte_offset))); \
|
||||
|
@ -8757,6 +8754,8 @@ class ExternalTypedData : public TypedDataBase {
|
|||
}
|
||||
|
||||
protected:
|
||||
virtual uint8_t* Validate(uint8_t* data) const { return data; }
|
||||
|
||||
void SetLength(intptr_t value) const {
|
||||
ASSERT(value <= Smi::kMaxValue);
|
||||
StoreSmi(&raw_ptr()->length_, Smi::New(value));
|
||||
|
@ -8829,6 +8828,9 @@ class TypedDataView : public TypedDataBase {
|
|||
|
||||
RawSmi* offset_in_bytes() const { return raw_ptr()->offset_in_bytes_; }
|
||||
|
||||
protected:
|
||||
virtual uint8_t* Validate(uint8_t* data) const { return data; }
|
||||
|
||||
private:
|
||||
void RecomputeDataField() { raw()->RecomputeDataField(); }
|
||||
|
||||
|
@ -9195,6 +9197,50 @@ class SendPort : public Instance {
|
|||
friend class Class;
|
||||
};
|
||||
|
||||
// This is allocated when new instance of TransferableTypedData is created in
|
||||
// [TransferableTypedData::New].
|
||||
class TransferableTypedDataPeer {
|
||||
public:
|
||||
// [data] backing store should be malloc'ed, not new'ed.
|
||||
TransferableTypedDataPeer(uint8_t* data, intptr_t length)
|
||||
: data_(data), length_(length), handle_(nullptr) {}
|
||||
|
||||
~TransferableTypedDataPeer() { free(data_); }
|
||||
|
||||
uint8_t* data() const { return data_; }
|
||||
intptr_t length() const { return length_; }
|
||||
FinalizablePersistentHandle* handle() const { return handle_; }
|
||||
void set_handle(FinalizablePersistentHandle* handle) { handle_ = handle; }
|
||||
|
||||
void ClearData() {
|
||||
data_ = nullptr;
|
||||
length_ = 0;
|
||||
handle_ = nullptr;
|
||||
}
|
||||
|
||||
private:
|
||||
uint8_t* data_;
|
||||
intptr_t length_;
|
||||
FinalizablePersistentHandle* handle_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(TransferableTypedDataPeer);
|
||||
};
|
||||
|
||||
class TransferableTypedData : public Instance {
|
||||
public:
|
||||
static RawTransferableTypedData* New(uint8_t* data,
|
||||
intptr_t len,
|
||||
Heap::Space space = Heap::kNew);
|
||||
|
||||
static intptr_t InstanceSize() {
|
||||
return RoundedAllocationSize(sizeof(RawTransferableTypedData));
|
||||
}
|
||||
|
||||
private:
|
||||
FINAL_HEAP_OBJECT_IMPLEMENTATION(TransferableTypedData, Instance);
|
||||
friend class Class;
|
||||
};
|
||||
|
||||
// Internal stacktrace object used in exceptions for printing stack traces.
|
||||
class StackTrace : public Instance {
|
||||
public:
|
||||
|
|
|
@ -1455,6 +1455,10 @@ void SendPort::PrintJSONImpl(JSONStream* stream, bool ref) const {
|
|||
Instance::PrintJSONImpl(stream, ref);
|
||||
}
|
||||
|
||||
void TransferableTypedData::PrintJSONImpl(JSONStream* stream, bool ref) const {
|
||||
Instance::PrintJSONImpl(stream, ref);
|
||||
}
|
||||
|
||||
void ClosureData::PrintJSONImpl(JSONStream* stream, bool ref) const {
|
||||
Object::PrintJSONImpl(stream, ref);
|
||||
}
|
||||
|
|
|
@ -478,6 +478,7 @@ NULL_VISITOR(Float64x2)
|
|||
NULL_VISITOR(Bool)
|
||||
NULL_VISITOR(Capability)
|
||||
NULL_VISITOR(SendPort)
|
||||
NULL_VISITOR(TransferableTypedData)
|
||||
REGULAR_VISITOR(Pointer)
|
||||
NULL_VISITOR(DynamicLibrary)
|
||||
VARIABLE_NULL_VISITOR(Instructions, Instructions::Size(raw_obj))
|
||||
|
|
|
@ -723,6 +723,7 @@ class RawObject {
|
|||
friend class ObjectOffsetTrait; // GetClassId
|
||||
friend class WriteBarrierUpdateVisitor; // CheckHeapPointerStore
|
||||
friend class OffsetsTable;
|
||||
friend class RawTransferableTypedData; // GetClassId
|
||||
|
||||
DISALLOW_ALLOCATION();
|
||||
DISALLOW_IMPLICIT_CONSTRUCTORS(RawObject);
|
||||
|
@ -2411,6 +2412,11 @@ class RawReceivePort : public RawInstance {
|
|||
VISIT_TO(RawObject*, handler_)
|
||||
};
|
||||
|
||||
class RawTransferableTypedData : public RawInstance {
|
||||
RAW_HEAP_OBJECT_IMPLEMENTATION(TransferableTypedData);
|
||||
VISIT_NOTHING();
|
||||
};
|
||||
|
||||
// VM type for capturing stacktraces when exceptions are thrown,
|
||||
// Currently we don't have any interface that this object is supposed
|
||||
// to implement so we just support the 'toString' method which
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
|
||||
#include "vm/dart_api_state.h"
|
||||
#include "vm/message.h"
|
||||
#include "vm/native_entry.h"
|
||||
#include "vm/object.h"
|
||||
|
@ -2133,6 +2134,68 @@ void RawSendPort::WriteTo(SnapshotWriter* writer,
|
|||
writer->Write<uint64_t>(ptr()->origin_id_);
|
||||
}
|
||||
|
||||
RawTransferableTypedData* TransferableTypedData::ReadFrom(
|
||||
SnapshotReader* reader,
|
||||
intptr_t object_id,
|
||||
intptr_t tags,
|
||||
Snapshot::Kind kind,
|
||||
bool as_reference) {
|
||||
ASSERT(reader != nullptr);
|
||||
|
||||
ASSERT(!Snapshot::IsFull(kind));
|
||||
const intptr_t length = reader->Read<int32_t>();
|
||||
|
||||
const FinalizableData finalizable_data =
|
||||
static_cast<MessageSnapshotReader*>(reader)->finalizable_data()->Take();
|
||||
uint8_t* data = reinterpret_cast<uint8_t*>(finalizable_data.data);
|
||||
auto& transferableTypedData = TransferableTypedData::ZoneHandle(
|
||||
reader->zone(), TransferableTypedData::New(data, length));
|
||||
reader->AddBackRef(object_id, &transferableTypedData, kIsDeserialized);
|
||||
return transferableTypedData.raw();
|
||||
}
|
||||
|
||||
void RawTransferableTypedData::WriteTo(SnapshotWriter* writer,
|
||||
intptr_t object_id,
|
||||
Snapshot::Kind kind,
|
||||
bool as_reference) {
|
||||
ASSERT(writer != nullptr);
|
||||
ASSERT(GetClassId() == kTransferableTypedDataCid);
|
||||
void* peer = writer->thread()->heap()->GetPeer(this);
|
||||
// Assume that object's Peer is only used to track transferrability state.
|
||||
ASSERT(peer != nullptr);
|
||||
TransferableTypedDataPeer* tpeer =
|
||||
reinterpret_cast<TransferableTypedDataPeer*>(peer);
|
||||
intptr_t length = tpeer->length(); // In bytes.
|
||||
void* data = tpeer->data();
|
||||
if (data == nullptr) {
|
||||
writer->SetWriteException(
|
||||
Exceptions::kArgument,
|
||||
"Illegal argument in isolate message"
|
||||
" : (TransferableTypedData has been transferred already)");
|
||||
return;
|
||||
}
|
||||
|
||||
// Write out the serialization header value for this object.
|
||||
writer->WriteInlinedObjectHeader(object_id);
|
||||
|
||||
writer->WriteIndexedObject(GetClassId());
|
||||
writer->WriteTags(writer->GetObjectTags(this));
|
||||
writer->Write<int32_t>(length);
|
||||
|
||||
static_cast<MessageWriter*>(writer)->finalizable_data()->Put(
|
||||
length, data, tpeer,
|
||||
// Finalizer does nothing - in case of failure to serialize,
|
||||
// [data] remains wrapped in sender's [TransferableTypedData].
|
||||
[](void* data, Dart_WeakPersistentHandle handle, void* peer) {},
|
||||
// This is invoked on successful serialization of the message
|
||||
[](void* data, Dart_WeakPersistentHandle handle, void* peer) {
|
||||
TransferableTypedDataPeer* tpeer =
|
||||
reinterpret_cast<TransferableTypedDataPeer*>(peer);
|
||||
tpeer->handle()->EnsureFreeExternal(Isolate::Current());
|
||||
tpeer->ClearData();
|
||||
});
|
||||
}
|
||||
|
||||
RawStackTrace* StackTrace::ReadFrom(SnapshotReader* reader,
|
||||
intptr_t object_id,
|
||||
intptr_t tags,
|
||||
|
|
|
@ -45,7 +45,8 @@ static bool IsBootstrapedClassId(intptr_t class_id) {
|
|||
RawObject::IsStringClassId(class_id) ||
|
||||
RawObject::IsTypedDataClassId(class_id) ||
|
||||
RawObject::IsExternalTypedDataClassId(class_id) ||
|
||||
RawObject::IsTypedDataViewClassId(class_id) || class_id == kNullCid);
|
||||
RawObject::IsTypedDataViewClassId(class_id) || class_id == kNullCid ||
|
||||
class_id == kTransferableTypedDataCid);
|
||||
}
|
||||
|
||||
static bool IsObjectStoreTypeId(intptr_t index) {
|
||||
|
@ -1483,6 +1484,8 @@ std::unique_ptr<Message> MessageWriter::WriteMessage(
|
|||
}
|
||||
if (has_exception) {
|
||||
ThrowException(exception_type(), exception_msg());
|
||||
} else {
|
||||
finalizable_data_->SerializationSucceeded();
|
||||
}
|
||||
|
||||
MessageFinalizableData* finalizable_data = finalizable_data_;
|
||||
|
|
|
@ -442,6 +442,7 @@ class SnapshotReader : public BaseReader {
|
|||
friend class Script;
|
||||
friend class SignatureData;
|
||||
friend class SubtypeTestCache;
|
||||
friend class TransferableTypedData;
|
||||
friend class Type;
|
||||
friend class TypedDataView;
|
||||
friend class TypeArguments;
|
||||
|
@ -715,6 +716,7 @@ class SnapshotWriter : public BaseWriter {
|
|||
friend class RawScript;
|
||||
friend class RawStackTrace;
|
||||
friend class RawSubtypeTestCache;
|
||||
friend class RawTransferableTypedData;
|
||||
friend class RawType;
|
||||
friend class RawTypedDataView;
|
||||
friend class RawTypeRef;
|
||||
|
|
|
@ -287,6 +287,7 @@ class ObjectPointerVisitor;
|
|||
V(ThrowNewInvocation, "_throwNewInvocation") \
|
||||
V(TopLevel, "::") \
|
||||
V(TruncDivOperator, "~/") \
|
||||
V(TransferableTypedData, "TransferableTypedData") \
|
||||
V(TryFinallyReturnValue, ":try_finally_return_value") \
|
||||
V(TwoByteString, "_TwoByteString") \
|
||||
V(TwoNewlines, "\n\n") \
|
||||
|
@ -403,6 +404,7 @@ class ObjectPointerVisitor;
|
|||
V(_RawReceivePortImpl, "_RawReceivePortImpl") \
|
||||
V(_RegExp, "_RegExp") \
|
||||
V(_SendPortImpl, "_SendPortImpl") \
|
||||
V(_TransferableTypedDataImpl, "_TransferableTypedDataImpl") \
|
||||
V(_Smi, "_Smi") \
|
||||
V(_SourceLocation, "_SourceLocation") \
|
||||
V(_SpecialTypeMirror, "_SpecialTypeMirror") \
|
||||
|
|
|
@ -116,6 +116,13 @@ class Capability {
|
|||
factory Capability() => _unsupported();
|
||||
}
|
||||
|
||||
@patch
|
||||
abstract class TransferableTypedData {
|
||||
@patch
|
||||
factory TransferableTypedData.fromList(List<TypedData> list) =>
|
||||
_unsupported();
|
||||
}
|
||||
|
||||
@NoReifyGeneric()
|
||||
T _unsupported<T>() {
|
||||
throw UnsupportedError('dart:isolate is not supported on dart4web');
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
import "dart:async";
|
||||
import 'dart:_foreign_helper' show JS;
|
||||
import 'dart:_js_helper' show patch;
|
||||
import "dart:typed_data" show ByteData, TypedData, Uint8List;
|
||||
|
||||
@patch
|
||||
class Isolate {
|
||||
|
@ -146,6 +147,14 @@ class Capability {
|
|||
}
|
||||
}
|
||||
|
||||
@patch
|
||||
abstract class TransferableTypedData {
|
||||
@patch
|
||||
factory TransferableTypedData.fromList(List<TypedData> list) {
|
||||
throw new UnsupportedError('TransferableTypedData.fromList');
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the base path added to Uri.base to resolve `package:` Uris.
|
||||
///
|
||||
/// This is used by `Isolate.resolvePackageUri` to load resources. The default
|
||||
|
|
|
@ -18,6 +18,7 @@ library dart.isolate;
|
|||
|
||||
import "dart:async";
|
||||
import "dart:_internal" show Since;
|
||||
import "dart:typed_data" show ByteBuffer, TypedData, Uint8List;
|
||||
|
||||
part "capability.dart";
|
||||
|
||||
|
@ -753,3 +754,36 @@ class RemoteError implements Error {
|
|||
stackTrace = new StackTrace.fromString(stackDescription);
|
||||
String toString() => _description;
|
||||
}
|
||||
|
||||
/*
|
||||
* An efficiently transferable sequence of byte values.
|
||||
*
|
||||
* A [TransferableTypedData] is created from a number of bytes.
|
||||
* This will take time proportional to the number of bytes.
|
||||
*
|
||||
* The [TransferableTypedData] can be moved between isolates, so
|
||||
* sending it through a send port will only take constant time.
|
||||
*
|
||||
* When sent this way, the local transferable can no longer be materialized,
|
||||
* and the received object is now the only way to materialize the data.
|
||||
*/
|
||||
@Since("2.3.2")
|
||||
abstract class TransferableTypedData {
|
||||
/**
|
||||
* Creates a new [TransferableTypedData] containing the bytes of [list].
|
||||
*
|
||||
* It must be possible to create a single [Uint8List] containing the
|
||||
* bytes, so if there are more bytes than what the platform allows in
|
||||
* a single [Uint8List], then creation fails.
|
||||
*/
|
||||
external factory TransferableTypedData.fromList(List<TypedData> list);
|
||||
|
||||
/**
|
||||
* Creates a new [ByteBuffer] containing the bytes stored in this [TransferableTypedData].
|
||||
*
|
||||
* The [TransferableTypedData] is a cross-isolate single-use resource.
|
||||
* This method must not be called more than once on the same underlying
|
||||
* transferable bytes, even if the calls occur in different isolates.
|
||||
*/
|
||||
ByteBuffer materialize();
|
||||
}
|
||||
|
|
83
tests/lib_2/isolate/transferable_failed_to_send_test.dart
Normal file
83
tests/lib_2/isolate/transferable_failed_to_send_test.dart
Normal file
|
@ -0,0 +1,83 @@
|
|||
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
|
||||
import "dart:io" show ServerSocket;
|
||||
import "dart:isolate";
|
||||
import "dart:typed_data" show ByteData;
|
||||
|
||||
import "package:expect/expect.dart";
|
||||
import "package:async_helper/async_helper.dart";
|
||||
|
||||
void main() async {
|
||||
final port = new ReceivePort();
|
||||
|
||||
// Sending a socket object will result in an error.
|
||||
final socket = await ServerSocket.bind('localhost', 0);
|
||||
|
||||
final x = new ByteData(4);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x.setUint8(i, i);
|
||||
}
|
||||
{
|
||||
final transferableFirst = TransferableTypedData.fromList([x]);
|
||||
Expect.throwsArgumentError(
|
||||
() => port.sendPort.send(<dynamic>[transferableFirst, socket]));
|
||||
// Once TransferableTypedData was sent even if attempt failed, it can't be
|
||||
// materialized.
|
||||
// This need to be changed so that on failed send we should not detach the
|
||||
// buffer form the transferrable. The order should not matter (i.e. if the
|
||||
// error happens before or after the serializer hits a transferrable object)
|
||||
|
||||
final data1 = transferableFirst.materialize().asUint8List();
|
||||
Expect.equals(x.lengthInBytes, data1.length);
|
||||
for (int i = 0; i < data1.length; i++) {
|
||||
Expect.equals(i, data1[i]);
|
||||
}
|
||||
}
|
||||
{
|
||||
final transferableFirst = TransferableTypedData.fromList([x]);
|
||||
Expect.throwsArgumentError(() => port.sendPort
|
||||
.send(<dynamic>[transferableFirst, transferableFirst, socket]));
|
||||
// Once TransferableTypedData was sent even if attempt failed, it can't be
|
||||
// materialized.
|
||||
// This need to be changed so that on failed send we should not detach the
|
||||
// buffer form the transferrable. The order should not matter (i.e. if the
|
||||
// error happens before or after the serializer hits a transferrable object)
|
||||
|
||||
final data1 = transferableFirst.materialize().asUint8List();
|
||||
Expect.equals(x.lengthInBytes, data1.length);
|
||||
for (int i = 0; i < data1.length; i++) {
|
||||
Expect.equals(i, data1[i]);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
final transferableSecond = TransferableTypedData.fromList([x]);
|
||||
Expect.throwsArgumentError(
|
||||
() => port.sendPort.send(<dynamic>[socket, transferableSecond]));
|
||||
// Once TransferableTypedData was sent even if attempt failed, it can't be
|
||||
// materialized.
|
||||
final data2 = transferableSecond.materialize().asUint8List();
|
||||
Expect.equals(x.lengthInBytes, data2.length);
|
||||
for (int i = 0; i < data2.length; i++) {
|
||||
Expect.equals(i, data2[i]);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
final transferableSecond = TransferableTypedData.fromList([x]);
|
||||
Expect.throwsArgumentError(() => port.sendPort
|
||||
.send(<dynamic>[socket, transferableSecond, transferableSecond]));
|
||||
// Once TransferableTypedData was sent even if attempt failed, it can't be
|
||||
// materialized.
|
||||
final data2 = transferableSecond.materialize().asUint8List();
|
||||
Expect.equals(x.lengthInBytes, data2.length);
|
||||
for (int i = 0; i < data2.length; i++) {
|
||||
Expect.equals(i, data2[i]);
|
||||
}
|
||||
}
|
||||
|
||||
socket.close();
|
||||
port.close();
|
||||
}
|
284
tests/lib_2/isolate/transferable_test.dart
Normal file
284
tests/lib_2/isolate/transferable_test.dart
Normal file
|
@ -0,0 +1,284 @@
|
|||
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
|
||||
import "dart:async";
|
||||
import "dart:collection";
|
||||
import "dart:isolate";
|
||||
import "dart:typed_data";
|
||||
import "package:expect/expect.dart";
|
||||
|
||||
const large = 2 * 1024 * 1024;
|
||||
|
||||
void child(replyPort) {
|
||||
print("Child start");
|
||||
|
||||
print("Child ByteData");
|
||||
dynamic x = new ByteData(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x.setInt8(i, i);
|
||||
}
|
||||
replyPort.send(TransferableTypedData.fromList([x.buffer.asUint8List()]));
|
||||
|
||||
print("Child Uint8List");
|
||||
x = new Uint8List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
replyPort.send(TransferableTypedData.fromList([x]));
|
||||
|
||||
print("Child Uint8List.view");
|
||||
x = new Uint8List.view(x.buffer, 1, 2);
|
||||
replyPort.send(TransferableTypedData.fromList([x]));
|
||||
|
||||
print("Child Int8List");
|
||||
x = new Int8List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
replyPort.send(TransferableTypedData.fromList([x]));
|
||||
|
||||
print("Child Uint16List");
|
||||
x = new Uint16List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
replyPort.send(TransferableTypedData.fromList([x]));
|
||||
|
||||
print("Child Int16List");
|
||||
x = new Int16List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
replyPort.send(TransferableTypedData.fromList([x]));
|
||||
|
||||
print("Child Uint32List");
|
||||
x = new Uint32List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
replyPort.send(TransferableTypedData.fromList([x]));
|
||||
|
||||
print("Child Int32List");
|
||||
x = new Int32List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
replyPort.send(TransferableTypedData.fromList([x]));
|
||||
|
||||
print("Child Uint64List");
|
||||
x = new Uint64List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
replyPort.send(TransferableTypedData.fromList([x]));
|
||||
|
||||
print("Child Int64List");
|
||||
x = new Int64List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
replyPort.send(TransferableTypedData.fromList([x]));
|
||||
|
||||
print("Child two Uint8Lists");
|
||||
x = new Uint8List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
replyPort.send([
|
||||
TransferableTypedData.fromList([x]),
|
||||
TransferableTypedData.fromList([x])
|
||||
]);
|
||||
|
||||
print("Child same Uint8List twice - materialize first");
|
||||
x = new Uint8List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
var tr = TransferableTypedData.fromList([x]);
|
||||
replyPort.send([tr, tr]);
|
||||
|
||||
print("Child same Uint8List twice - materialize second");
|
||||
x = new Uint8List(large);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
x[i] = i;
|
||||
}
|
||||
tr = TransferableTypedData.fromList([x]);
|
||||
replyPort.send([tr, tr]);
|
||||
|
||||
print("Child done");
|
||||
}
|
||||
|
||||
Future<void> main(List<String> args) async {
|
||||
print("Parent start");
|
||||
|
||||
ReceivePort port = new ReceivePort();
|
||||
Isolate.spawn(child, port.sendPort);
|
||||
StreamIterator<dynamic> incoming = new StreamIterator<dynamic>(port);
|
||||
|
||||
print("Parent ByteData");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
dynamic x = incoming.current.materialize().asByteData();
|
||||
Expect.isTrue(x is ByteData);
|
||||
Expect.equals(large, x.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x.getUint8(i));
|
||||
}
|
||||
|
||||
print("Parent Uint8List");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
x = incoming.current.materialize();
|
||||
Expect.isTrue(x is ByteBuffer);
|
||||
x = x.asUint8List();
|
||||
Expect.equals(large, x.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x[i]);
|
||||
}
|
||||
|
||||
print("Parent Uint8List view");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
x = incoming.current.materialize().asUint8List();
|
||||
Expect.equals(1, x[0]);
|
||||
Expect.equals(2, x[1]);
|
||||
|
||||
print("Parent Int8");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
x = incoming.current.materialize().asInt8List();
|
||||
Expect.equals(large, x.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x[i]);
|
||||
}
|
||||
|
||||
print("Parent Uint16");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
x = incoming.current.materialize().asUint16List();
|
||||
Expect.equals(large, x.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x[i]);
|
||||
}
|
||||
|
||||
print("Parent Int16");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
x = incoming.current.materialize().asInt16List();
|
||||
Expect.equals(large, x.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x[i]);
|
||||
}
|
||||
|
||||
print("Parent Uint32");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
x = incoming.current.materialize().asUint32List();
|
||||
Expect.equals(large, x.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x[i]);
|
||||
}
|
||||
|
||||
print("Parent Int32");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
x = incoming.current.materialize().asInt32List();
|
||||
Expect.equals(large, x.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x[i]);
|
||||
}
|
||||
|
||||
print("Parent Uint64");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
x = incoming.current.materialize().asUint64List();
|
||||
Expect.equals(large, x.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x[i]);
|
||||
}
|
||||
|
||||
print("Parent Int64");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
x = incoming.current.materialize().asInt64List();
|
||||
Expect.equals(large, x.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x[i]);
|
||||
}
|
||||
|
||||
print("Parent two Uint8Lists");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
final x1 = incoming.current[0].materialize().asUint8List();
|
||||
final x2 = incoming.current[1].materialize().asUint8List();
|
||||
Expect.equals(large, x1.length);
|
||||
Expect.equals(large, x2.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, x1[i]);
|
||||
Expect.equals(i, x2[i]);
|
||||
}
|
||||
|
||||
print("Parent same Uint8Lists twice, materialize first");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
final tr0 = incoming.current[0].materialize().asUint8List();
|
||||
Expect.throwsArgumentError(() => incoming.current[1].materialize());
|
||||
Expect.equals(large, tr0.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, tr0[i]);
|
||||
}
|
||||
|
||||
print("Parent same Uint8Lists twice, materialize second");
|
||||
Expect.isTrue(await incoming.moveNext());
|
||||
final tr1 = incoming.current[1].materialize().asUint8List();
|
||||
Expect.throwsArgumentError(() => incoming.current[0].materialize());
|
||||
Expect.equals(large, tr1.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Expect.equals(i, tr1[i]);
|
||||
}
|
||||
|
||||
port.close();
|
||||
print("Parent done");
|
||||
|
||||
testCreateMaterializeInSameIsolate();
|
||||
testIterableToList();
|
||||
testUserExtendedList();
|
||||
}
|
||||
|
||||
testCreateMaterializeInSameIsolate() {
|
||||
// Test same-isolate operation of TransferableTypedData.
|
||||
final Uint8List bytes = new Uint8List(large);
|
||||
for (int i = 0; i < bytes.length; ++i) {
|
||||
bytes[i] = i % 256;
|
||||
}
|
||||
final tr = TransferableTypedData.fromList([bytes]);
|
||||
Expect.listEquals(bytes, tr.materialize().asUint8List());
|
||||
}
|
||||
|
||||
testIterableToList() {
|
||||
// Test that iterable.toList() can be used as an argument.
|
||||
final list1 = Uint8List(10);
|
||||
for (int i = 0; i < list1.length; i++) {
|
||||
list1[i] = i;
|
||||
}
|
||||
final list2 = Uint8List(20);
|
||||
for (int i = 0; i < list2.length; i++) {
|
||||
list2[i] = i + list1.length;
|
||||
}
|
||||
final map = {list1: true, list2: true};
|
||||
Iterable<Uint8List> iterable = map.keys;
|
||||
final result = TransferableTypedData.fromList(iterable.toList())
|
||||
.materialize()
|
||||
.asUint8List();
|
||||
for (int i = 0; i < result.length; i++) {
|
||||
Expect.equals(i, result[i]);
|
||||
}
|
||||
}
|
||||
|
||||
class MyList<E> extends ListBase<E> {
|
||||
List<E> _source;
|
||||
MyList(this._source);
|
||||
int get length => _source.length;
|
||||
void set length(int length) {
|
||||
_source.length = length;
|
||||
}
|
||||
|
||||
E operator [](int index) => _source[index];
|
||||
void operator []=(int index, E value) {
|
||||
_source[index] = value;
|
||||
}
|
||||
}
|
||||
|
||||
testUserExtendedList() {
|
||||
final list = MyList<TypedData>([Uint8List(10)]);
|
||||
TransferableTypedData.fromList(list);
|
||||
}
|
Loading…
Reference in a new issue