[samples/ffi] Sample for asynchronous native port calls

Issue: https://github.com/dart-lang/sdk/issues/37022#issuecomment-567122704

Change-Id: I774befa1d9843c043883038e59c0f8b629bf3c77
Cq-Include-Trybots: luci.dart.try:vm-ffi-android-debug-arm-try,vm-ffi-android-debug-arm64-try,app-kernel-linux-debug-x64-try,vm-kernel-linux-debug-ia32-try,vm-kernel-win-debug-x64-try,vm-kernel-win-debug-ia32-try,vm-kernel-precomp-linux-debug-x64-try,vm-dartkb-linux-release-x64-abi-try,vm-kernel-precomp-android-release-arm64-try,vm-kernel-asan-linux-release-x64-try,vm-kernel-linux-release-simarm-try,vm-kernel-linux-release-simarm64-try,vm-kernel-precomp-android-release-arm_x64-try,vm-kernel-precomp-obfuscate-linux-release-x64-try,dart-sdk-linux-try,analyzer-analysis-server-linux-try,analyzer-linux-release-try,front-end-linux-release-x64-try,vm-kernel-precomp-win-release-x64-try,vm-kernel-mac-debug-x64-try
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/134822
Commit-Queue: Daco Harkes <dacoharkes@google.com>
Reviewed-by: Martin Kustermann <kustermann@google.com>
This commit is contained in:
Daco Harkes 2020-02-18 08:42:16 +00:00 committed by commit-bot@chromium.org
parent 76ef075b71
commit cb60e4b78e
3 changed files with 402 additions and 0 deletions

View file

@ -460,4 +460,273 @@ DART_EXPORT void ExecuteCallback(Work* work_ptr) {
printf("C Da: ExecuteCallback done.\n");
}
////////////////////////////////////////////////////////////////////////////////
// Functions for async callbacks example.
//
// sample_native_port_call.dart
Dart_Port send_port_;
static void FreeFinalizer(void*, Dart_WeakPersistentHandle, void* value) {
free(value);
}
class PendingCall {
public:
PendingCall(void** buffer, size_t* length)
: response_buffer_(buffer), response_length_(length) {
receive_port_ =
Dart_NewNativePort("cpp-response", &PendingCall::HandleResponse,
/*handle_concurrently=*/false);
}
~PendingCall() { Dart_CloseNativePort(receive_port_); }
Dart_Port port() const { return receive_port_; }
void PostAndWait(Dart_Port port, Dart_CObject* object) {
std::unique_lock<std::mutex> lock(mutex);
const bool success = Dart_PostCObject(send_port_, object);
if (!success) FATAL("Failed to send message, invalid port or isolate died");
printf("C : Waiting for result.\n");
while (!notified) {
cv.wait(lock);
}
}
static void HandleResponse(Dart_Port p, Dart_CObject* message) {
if (message->type != Dart_CObject_kArray) {
FATAL("C : Wrong Data: message->type != Dart_CObject_kArray.\n");
}
Dart_CObject** c_response_args = message->value.as_array.values;
Dart_CObject* c_pending_call = c_response_args[0];
Dart_CObject* c_message = c_response_args[1];
printf("C : HandleResponse (call: %" Px ", message: %" Px ").\n",
reinterpret_cast<intptr_t>(c_pending_call),
reinterpret_cast<intptr_t>(c_message));
auto pending_call = reinterpret_cast<PendingCall*>(
c_pending_call->type == Dart_CObject_kInt64
? c_pending_call->value.as_int64
: c_pending_call->value.as_int32);
pending_call->ResolveCall(c_message);
}
private:
static bool NonEmptyBuffer(void** value) { return *value != nullptr; }
void ResolveCall(Dart_CObject* bytes) {
assert(bytes->type == Dart_CObject_kTypedData);
if (bytes->type != Dart_CObject_kTypedData) {
FATAL("C : Wrong Data: bytes->type != Dart_CObject_kTypedData.\n");
}
const intptr_t response_length = bytes->value.as_typed_data.length;
const uint8_t* response_buffer = bytes->value.as_typed_data.values;
printf("C : ResolveCall(length: %" Pd ", buffer: %" Px ").\n",
response_length, reinterpret_cast<intptr_t>(response_buffer));
void* buffer = malloc(response_length);
memmove(buffer, response_buffer, response_length);
*response_buffer_ = buffer;
*response_length_ = response_length;
printf("C : Notify result ready.\n");
notified = true;
cv.notify_one();
}
std::mutex mutex;
std::condition_variable cv;
bool notified = false;
Dart_Port receive_port_;
void** response_buffer_;
size_t* response_length_;
};
// Do a callback to Dart in a blocking way, being interested in the result.
//
// Dart returns `a + 3`.
uint8_t MyCallback1(uint8_t a) {
const char* methodname = "myCallback1";
size_t request_length = sizeof(uint8_t) * 1;
void* request_buffer = malloc(request_length); // FreeFinalizer.
reinterpret_cast<uint8_t*>(request_buffer)[0] = a; // Populate buffer.
void* response_buffer = nullptr;
size_t response_length = 0;
PendingCall pending_call(&response_buffer, &response_length);
Dart_CObject c_send_port;
c_send_port.type = Dart_CObject_kSendPort;
c_send_port.value.as_send_port.id = pending_call.port();
c_send_port.value.as_send_port.origin_id = ILLEGAL_PORT;
Dart_CObject c_pending_call;
c_pending_call.type = Dart_CObject_kInt64;
c_pending_call.value.as_int64 = reinterpret_cast<int64_t>(&pending_call);
Dart_CObject c_method_name;
c_method_name.type = Dart_CObject_kString;
c_method_name.value.as_string = const_cast<char*>(methodname);
Dart_CObject c_request_data;
c_request_data.type = Dart_CObject_kExternalTypedData;
c_request_data.value.as_external_typed_data.type = Dart_TypedData_kUint8;
c_request_data.value.as_external_typed_data.length = request_length;
c_request_data.value.as_external_typed_data.data =
static_cast<uint8_t*>(request_buffer);
c_request_data.value.as_external_typed_data.peer = request_buffer;
c_request_data.value.as_external_typed_data.callback = FreeFinalizer;
Dart_CObject* c_request_arr[] = {&c_send_port, &c_pending_call,
&c_method_name, &c_request_data};
Dart_CObject c_request;
c_request.type = Dart_CObject_kArray;
c_request.value.as_array.values = c_request_arr;
c_request.value.as_array.length =
sizeof(c_request_arr) / sizeof(c_request_arr[0]);
printf("C : Dart_PostCObject(request: %" Px ", call: %" Px ").\n",
reinterpret_cast<intptr_t>(&c_request),
reinterpret_cast<intptr_t>(&c_pending_call));
pending_call.PostAndWait(send_port_, &c_request);
printf("C : Received result.\n");
const intptr_t result = reinterpret_cast<uint8_t*>(response_buffer)[0];
free(response_buffer);
return result;
}
// Do a callback to Dart in a non-blocking way.
//
// Dart sums all numbers posted to it.
void MyCallback2(uint8_t a) {
const char* methodname = "myCallback2";
void* request_buffer = malloc(sizeof(uint8_t) * 1); // FreeFinalizer.
reinterpret_cast<uint8_t*>(request_buffer)[0] = a; // Populate buffer.
const size_t request_length = sizeof(uint8_t) * 1;
Dart_CObject c_send_port;
c_send_port.type = Dart_CObject_kNull;
Dart_CObject c_pending_call;
c_pending_call.type = Dart_CObject_kNull;
Dart_CObject c_method_name;
c_method_name.type = Dart_CObject_kString;
c_method_name.value.as_string = const_cast<char*>(methodname);
Dart_CObject c_request_data;
c_request_data.type = Dart_CObject_kExternalTypedData;
c_request_data.value.as_external_typed_data.type = Dart_TypedData_kUint8;
c_request_data.value.as_external_typed_data.length = request_length;
c_request_data.value.as_external_typed_data.data =
static_cast<uint8_t*>(request_buffer);
c_request_data.value.as_external_typed_data.peer = request_buffer;
c_request_data.value.as_external_typed_data.callback = FreeFinalizer;
Dart_CObject* c_request_arr[] = {&c_send_port, &c_pending_call,
&c_method_name, &c_request_data};
Dart_CObject c_request;
c_request.type = Dart_CObject_kArray;
c_request.value.as_array.values = c_request_arr;
c_request.value.as_array.length =
sizeof(c_request_arr) / sizeof(c_request_arr[0]);
printf("C : Dart_PostCObject(request: %" Px ", call: %" Px ").\n",
reinterpret_cast<intptr_t>(&c_request),
reinterpret_cast<intptr_t>(&c_pending_call));
Dart_PostCObject(send_port_, &c_request);
}
// Simulated work for Thread #1.
//
// Simulates heavy work with sleeps.
void Work1_2() {
printf("C T1: Work1 Start.\n");
SleepOnAnyOS(1);
const intptr_t val1 = 3;
printf("C T1: MyCallback1(%" Pd ").\n", val1);
const intptr_t val2 = MyCallback1(val1); // val2 = 6.
printf("C T1: MyCallback1 returned %" Pd ".\n", val2);
SleepOnAnyOS(1);
const intptr_t val3 = val2 - 1; // val3 = 5.
printf("C T1: MyCallback2(%" Pd ").\n", val3);
MyCallback2(val3); // Post 5 to Dart.
printf("C T1: Work1 Done.\n");
}
// Simulated work for Thread #2.
//
// Simulates lighter work, no sleeps.
void Work2_2() {
printf("C T2: Work2 Start.\n");
const intptr_t val1 = 5;
printf("C T2: MyCallback2(%" Pd ").\n", val1);
MyCallback2(val1); // Post 5 to Dart.
const intptr_t val2 = 1;
printf("C T2: MyCallback1(%" Pd ").\n", val2);
const intptr_t val3 = MyCallback1(val2); // val3 = 4.
printf("C T2: MyCallback1 returned %" Pd ".\n", val3);
printf("C T2: MyCallback2(%" Pd ").\n", val3);
MyCallback2(val3); // Post 4 to Dart.
printf("C T2: Work2 Done.\n");
}
// Simulator that simulates concurrent work with multiple threads.
class SimulateWork2 {
public:
static void StartWorkSimulator() {
running_work_simulator_ = new SimulateWork2();
running_work_simulator_->Start();
}
static void StopWorkSimulator() {
running_work_simulator_->Stop();
delete running_work_simulator_;
running_work_simulator_ = nullptr;
}
private:
static SimulateWork2* running_work_simulator_;
void Start() {
printf("C Da: Starting SimulateWork.\n");
printf("C Da: Starting worker threads.\n");
thread1 = new std::thread(Work1_2);
thread2 = new std::thread(Work2_2);
printf("C Da: Started SimulateWork.\n");
}
void Stop() {
printf("C Da: Stopping SimulateWork.\n");
printf("C Da: Waiting for worker threads to finish.\n");
thread1->join();
thread2->join();
delete thread1;
delete thread2;
printf("C Da: Stopped SimulateWork.\n");
}
std::thread* thread1;
std::thread* thread2;
};
SimulateWork2* SimulateWork2::running_work_simulator_ = 0;
DART_EXPORT void RegisterSendPort(Dart_Port send_port) {
send_port_ = send_port;
}
DART_EXPORT void StartWorkSimulator2() {
SimulateWork2::StartWorkSimulator();
}
DART_EXPORT void StopWorkSimulator2() {
SimulateWork2::StopWorkSimulator();
}
} // namespace dart

View file

@ -7,7 +7,9 @@
// SharedObjects=ffi_test_dynamic_library ffi_test_functions
import 'sample_async_callback.dart' as sample0;
import 'sample_native_port_call.dart' as sample1;
main() {
sample0.main();
sample1.main();
}

View file

@ -0,0 +1,131 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// Sample showing how to do calls from C into Dart through native ports.
//
// This sample does not use FFI callbacks to do the callbacks at all. Instead,
// it sends a message to Dart through native ports, decodes the message in Dart
// does a method call in Dart and sends the result back to C through a native
// port.
//
// The disadvantage of this approach compared to `sample_async_callback.dart`
// is that it requires more boilerplate, because it does not use the automatic
// marshalling of data of the FFI.
//
// The advantage is that finalizers can be used when passing ownership of data
// (buffers) from C to Dart.
import 'dart:ffi';
import 'dart:isolate';
import 'dart:typed_data';
import 'package:expect/expect.dart';
import '../dylib_utils.dart';
var globalResult = 0;
var numCallbacks1 = 0;
var numCallbacks2 = 0;
main() async {
print("Dart = Dart mutator thread executing Dart.");
print("C Da = Dart mutator thread executing C.");
print("C T1 = Some C thread executing C.");
print("C T2 = Some C thread executing C.");
print("C = C T1 or C T2.");
print("Dart: Setup.");
final interactiveCppRequests = ReceivePort()..listen(handleCppRequests);
final int nativePort = interactiveCppRequests.sendPort.nativePort;
registerSendPort(nativePort);
print("Dart: Tell C to start worker threads.");
startWorkSimulator2();
// We need to yield control in order to be able to receive messages.
while (numCallbacks2 < 3) {
print("Dart: Yielding (able to receive messages on port).");
await asyncSleep(500);
}
print("Dart: Received expected number of callbacks.");
Expect.equals(2, numCallbacks1);
Expect.equals(3, numCallbacks2);
Expect.equals(14, globalResult);
print("Dart: Tell C to stop worker threads.");
stopWorkSimulator2();
interactiveCppRequests.close();
print("Dart: Done.");
}
int myCallback1(int a) {
print("Dart: myCallback1($a).");
numCallbacks1++;
return a + 3;
}
void myCallback2(int a) {
print("Dart: myCallback2($a).");
globalResult += a;
numCallbacks2++;
}
class CppRequest {
final SendPort replyPort;
final int pendingCall;
final String method;
final Uint8List data;
factory CppRequest.fromCppMessage(List message) {
return CppRequest._(message[0], message[1], message[2], message[3]);
}
CppRequest._(this.replyPort, this.pendingCall, this.method, this.data);
String toString() => 'CppRequest(method: $method, ${data.length} bytes)';
}
class CppResponse {
final int pendingCall;
final Uint8List data;
CppResponse(this.pendingCall, this.data);
List toCppMessage() => List.from([pendingCall, data], growable: false);
String toString() => 'CppResponse(message: ${data.length})';
}
void handleCppRequests(dynamic message) {
final cppRequest = CppRequest.fromCppMessage(message);
print('Dart: Got message: $cppRequest');
if (cppRequest.method == 'myCallback1') {
// Use the data in any way you like. Here we just take the first byte as
// the argument to the function.
final int argument = cppRequest.data[0];
final int result = myCallback1(argument);
final cppResponse =
CppResponse(cppRequest.pendingCall, Uint8List.fromList([result]));
print('Dart: Responding: $cppResponse');
cppRequest.replyPort.send(cppResponse.toCppMessage());
} else if (cppRequest.method == 'myCallback2') {
final int argument = cppRequest.data[0];
myCallback2(argument);
}
}
final dl = dlopenPlatformSpecific("ffi_test_functions");
final registerSendPort = dl.lookupFunction<Void Function(Int64 sendPort),
void Function(int sendPort)>('RegisterSendPort');
final startWorkSimulator2 =
dl.lookupFunction<Void Function(), void Function()>('StartWorkSimulator2');
final stopWorkSimulator2 =
dl.lookupFunction<Void Function(), void Function()>('StopWorkSimulator2');
Future asyncSleep(int ms) {
return new Future.delayed(Duration(milliseconds: ms), () => true);
}