dart:io | Enable multithreaded secure networking encryption.

BUG=
R=ajohnsen@google.com, sgjesse@google.com

Review URL: https://codereview.chromium.org//16858011

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@24233 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
whesse@google.com 2013-06-20 15:06:43 +00:00
parent ff7d45d16f
commit 91deb19914
7 changed files with 677 additions and 371 deletions

View file

@ -1180,8 +1180,8 @@ static CObject* FileStatRequest(const CObjectArray& request) {
static void FileService(Dart_Port dest_port_id,
Dart_Port reply_port_id,
Dart_CObject* message) {
Dart_Port reply_port_id,
Dart_CObject* message) {
CObject* response = CObject::IllegalArgumentError();
CObjectArray request(message);
if (message->type == Dart_CObject_kArray) {

View file

@ -61,10 +61,11 @@ namespace bin {
V(SecureSocket_Handshake, 1) \
V(SecureSocket_Init, 1) \
V(SecureSocket_PeerCertificate, 1) \
V(SecureSocket_ProcessBuffer, 2) \
V(SecureSocket_RegisterBadCertificateCallback, 2) \
V(SecureSocket_RegisterHandshakeCompleteCallback, 2) \
V(SecureSocket_InitializeLibrary, 3) \
V(SecureSocket_NewServicePort, 0) \
V(SecureSocket_FilterPointer, 1) \
V(StringToSystemEncoding, 1) \
V(SystemEncodingToString, 1)

View file

@ -41,6 +41,13 @@ dart::Mutex SSLFilter::mutex_; // To protect library initialization.
// be null if only secure client sockets are used.
const char* SSLFilter::password_ = NULL;
// Forward declaration.
static void ProcessFilter(Dart_Port dest_port_id,
Dart_Port reply_port_id,
Dart_CObject* message);
NativeService SSLFilter::filter_service_("FilterService", ProcessFilter, 16);
static const int kSSLFilterNativeFieldIndex = 0;
static SSLFilter* GetFilter(Dart_NativeArguments args) {
@ -177,22 +184,6 @@ void FUNCTION_NAME(SecureSocket_RegisterBadCertificateCallback)(
}
void FUNCTION_NAME(SecureSocket_ProcessBuffer)(Dart_NativeArguments args) {
Dart_EnterScope();
Dart_Handle buffer_id_object = ThrowIfError(Dart_GetNativeArgument(args, 1));
int64_t buffer_id = DartUtils::GetIntegerValue(buffer_id_object);
if (buffer_id < 0 || buffer_id >= SSLFilter::kNumBuffers) {
Dart_ThrowException(DartUtils::NewDartArgumentError(
"Illegal argument to ProcessBuffer"));
}
intptr_t bytes_read =
GetFilter(args)->ProcessBuffer(static_cast<int>(buffer_id));
Dart_SetReturnValue(args, Dart_NewInteger(bytes_read));
Dart_ExitScope();
}
void FUNCTION_NAME(SecureSocket_InitializeLibrary)
(Dart_NativeArguments args) {
Dart_EnterScope();
@ -247,6 +238,131 @@ void FUNCTION_NAME(SecureSocket_PeerCertificate)
}
void FUNCTION_NAME(SecureSocket_FilterPointer)(Dart_NativeArguments args) {
Dart_EnterScope();
intptr_t filter_pointer = reinterpret_cast<intptr_t>(GetFilter(args));
Dart_SetReturnValue(args, Dart_NewInteger(filter_pointer));
Dart_ExitScope();
}
/**
* Pushes data through the SSL filter, reading and writing from circular
* buffers shared with Dart.
*
* The Dart _SecureFilterImpl class contains 4 ExternalByteArrays used to
* pass encrypted and plaintext data to and from the C++ SSLFilter object.
*
* ProcessFilter is called with a CObject array containing the pointer to
* the SSLFilter, encoded as an int, and the start and end positions of the
* valid data in the four circular buffers. The function only reads from
* the valid data area of the input buffers, and only writes to the free
* area of the output buffers. The function returns the new start and end
* positions in the buffers, but it only updates start for input buffers, and
* end for output buffers. Therefore, the Dart thread can simultaneously
* write to the free space and end pointer of input buffers, and read from
* the data space of output buffers, and modify the start pointer.
*
* When ProcessFilter returns, the Dart thread is responsible for combining
* the updated pointers from Dart and C++, to make the new valid state of
* the circular buffer.
*/
static void ProcessFilter(Dart_Port dest_port_id,
Dart_Port reply_port_id,
Dart_CObject* message) {
CObjectArray args(message);
CObjectIntptr filter_object(args[0]);
SSLFilter* filter = reinterpret_cast<SSLFilter*>(filter_object.Value());
bool in_handshake = CObjectBool(args[1]).Value();
int starts[SSLFilter::kNumBuffers];
int ends[SSLFilter::kNumBuffers];
for (int i = 0; i < SSLFilter::kNumBuffers; ++i) {
starts[i] = CObjectInt32(args[2 * i + 2]).Value();
ends[i] = CObjectInt32(args[2 * i + 3]).Value();
}
filter->ProcessAllBuffers(starts, ends, in_handshake);
for (int i = 0; i < SSLFilter::kNumBuffers; ++i) {
args[2 * i + 2]->AsApiCObject()->value.as_int32 = starts[i];
args[2 * i + 3]->AsApiCObject()->value.as_int32 = ends[i];
}
Dart_PostCObject(reply_port_id, args.AsApiCObject());
}
void SSLFilter::ProcessAllBuffers(int starts[kNumBuffers],
int ends[kNumBuffers],
bool in_handshake) {
for (int i = 0; i < kNumBuffers; ++i) {
if (in_handshake && (i == kReadPlaintext || i == kWritePlaintext)) continue;
int start = starts[i];
int end = ends[i];
int size = isBufferEncrypted(i) ? encrypted_buffer_size_ : buffer_size_;
switch (i) {
case kReadPlaintext:
case kWriteEncrypted:
// Write data to the circular buffer's free space. If the buffer
// is full, neither if statement is executed and nothing happens.
if (start <= end) {
// If the free space may be split into two segments,
// then the first is [end, size), unless start == 0.
// Then, since the last free byte is at position start - 2,
// the interval is [end, size - 1).
int buffer_end = (start == 0) ? size - 1 : size;
int bytes = (i == kReadPlaintext) ?
ProcessReadPlaintextBuffer(end, buffer_end) :
ProcessWriteEncryptedBuffer(end, buffer_end);
end += bytes;
ASSERT(end <= size);
if (end == size) end = 0;
}
if (start > end + 1) {
int bytes = (i == kReadPlaintext) ?
ProcessReadPlaintextBuffer(end, start - 1) :
ProcessWriteEncryptedBuffer(end, start - 1);
end += bytes;
ASSERT(end < start);
}
ends[i] = end;
break;
case kReadEncrypted:
// Read data from circular buffer.
if (end < start) {
// Data may be split into two segments. In this case,
// the first is [start, size).
int bytes = ProcessReadEncryptedBuffer(start, size);
start += bytes;
ASSERT(start <= size);
if (start == size) start = 0;
}
if (start < end) {
int bytes = ProcessReadEncryptedBuffer(start, end);
start += bytes;
ASSERT(start <= end);
}
starts[i] = start;
break;
case kWritePlaintext:
if (end < start) {
// Data is split into two segments, [start, size) and [0, end).
int bytes = ProcessWritePlaintextBuffer(start, size, 0, end);
start += bytes;
if (start >= size) start -= size;
} else {
int bytes = ProcessWritePlaintextBuffer(start, end, 0, 0);
start += bytes;
ASSERT(start <= end);
}
starts[i] = start;
break;
default:
UNREACHABLE();
}
}
}
static Dart_Handle X509FromCertificate(CERTCertificate* certificate) {
PRTime start_validity;
PRTime end_validity;
@ -307,15 +423,13 @@ void SSLFilter::InitializeBuffers(Dart_Handle dart_this) {
// Create SSLFilter buffers as ExternalUint8Array objects.
Dart_Handle dart_buffers_object = ThrowIfError(
Dart_GetField(dart_this, DartUtils::NewString("buffers")));
Dart_Handle dart_buffer_object =
Dart_ListGetAt(dart_buffers_object, kReadPlaintext);
Dart_Handle external_buffer_class =
Dart_InstanceGetClass(dart_buffer_object);
Dart_Handle secure_filter_impl_class =
Dart_InstanceGetClass(dart_this);
Dart_Handle dart_buffer_size = ThrowIfError(
Dart_GetField(external_buffer_class, DartUtils::NewString("SIZE")));
Dart_GetField(secure_filter_impl_class, DartUtils::NewString("SIZE")));
int64_t buffer_size = DartUtils::GetIntegerValue(dart_buffer_size);
Dart_Handle dart_encrypted_buffer_size = ThrowIfError(
Dart_GetField(external_buffer_class,
Dart_GetField(secure_filter_impl_class,
DartUtils::NewString("ENCRYPTED_SIZE")));
int64_t encrypted_buffer_size =
DartUtils::GetIntegerValue(dart_encrypted_buffer_size);
@ -333,7 +447,7 @@ void SSLFilter::InitializeBuffers(Dart_Handle dart_this) {
Dart_Handle data_identifier = DartUtils::NewString("data");
for (int i = 0; i < kNumBuffers; ++i) {
int size = isEncrypted(i) ? encrypted_buffer_size_ : buffer_size_;
int size = isBufferEncrypted(i) ? encrypted_buffer_size_ : buffer_size_;
dart_buffer_objects_[i] =
Dart_NewPersistentHandle(Dart_ListGetAt(dart_buffers_object, i));
ASSERT(dart_buffer_objects_[i] != NULL);
@ -494,6 +608,11 @@ void SSLFilter::Connect(const char* host_name,
ThrowPRException("Failed SSL_ImportFD call");
}
SSLVersionRange vrange;
vrange.min = SSL_LIBRARY_VERSION_3_0;
vrange.max = SSL_LIBRARY_VERSION_TLS_1_1;
SSL_VersionRangeSet(filter_, &vrange);
SECStatus status;
if (is_server) {
PK11_SetPasswordFunc(PasswordCallback);
@ -648,106 +767,112 @@ void SSLFilter::Destroy() {
}
intptr_t SSLFilter::ProcessBuffer(int buffer_index) {
int size = isEncrypted(buffer_index) ? encrypted_buffer_size_ : buffer_size_;
Dart_Handle buffer_object =
Dart_HandleFromPersistent(dart_buffer_objects_[buffer_index]);
Dart_Handle start_object = ThrowIfError(
Dart_GetField(buffer_object, Dart_HandleFromPersistent(string_start_)));
Dart_Handle length_object = ThrowIfError(
Dart_GetField(buffer_object, Dart_HandleFromPersistent(string_length_)));
int64_t unsafe_start = DartUtils::GetIntegerValue(start_object);
int64_t unsafe_length = DartUtils::GetIntegerValue(length_object);
ASSERT(unsafe_start >= 0);
ASSERT(unsafe_start < size);
ASSERT(unsafe_length >= 0);
ASSERT(unsafe_length <= size);
int start = static_cast<int>(unsafe_start);
int length = static_cast<int>(unsafe_length);
uint8_t* buffer = buffers_[buffer_index];
intptr_t SSLFilter::ProcessReadPlaintextBuffer(int start, int end) {
int length = end - start;
int bytes_processed = 0;
switch (buffer_index) {
case kReadPlaintext: {
int bytes_free = size - start - length;
bytes_processed = PR_Read(filter_,
buffer + start + length,
bytes_free);
if (bytes_processed < 0) {
ASSERT(bytes_processed == -1);
// TODO(whesse): Handle unexpected errors here.
PRErrorCode pr_error = PR_GetError();
if (PR_WOULD_BLOCK_ERROR != pr_error) {
ThrowPRException("Error reading plaintext from SSLFilter");
}
bytes_processed = 0;
if (length > 0) {
bytes_processed = PR_Read(filter_,
buffers_[kReadPlaintext] + start,
length);
if (bytes_processed < 0) {
ASSERT(bytes_processed == -1);
PRErrorCode pr_error = PR_GetError();
if (PR_WOULD_BLOCK_ERROR != pr_error) {
// TODO(11383): Handle unexpected errors here.
FATAL("Error reading plaintext from SSLFilter");
}
break;
}
case kWriteEncrypted: {
const uint8_t* buf1;
const uint8_t* buf2;
unsigned int len1;
unsigned int len2;
int bytes_free = size - start - length;
memio_Private* secret = memio_GetSecret(filter_);
memio_GetWriteParams(secret, &buf1, &len1, &buf2, &len2);
int bytes_to_send =
dart::Utils::Minimum(len1, static_cast<unsigned>(bytes_free));
if (bytes_to_send > 0) {
memmove(buffer + start + length, buf1, bytes_to_send);
bytes_processed = bytes_to_send;
}
bytes_to_send = dart::Utils::Minimum(len2,
static_cast<unsigned>(bytes_free - bytes_processed));
if (bytes_to_send > 0) {
memmove(buffer + start + length + bytes_processed, buf2,
bytes_to_send);
bytes_processed += bytes_to_send;
}
if (bytes_processed > 0) {
memio_PutWriteResult(secret, bytes_processed);
}
break;
}
case kReadEncrypted: {
if (length > 0) {
bytes_processed = length;
memio_Private* secret = memio_GetSecret(filter_);
uint8_t* filter_buf;
int free_bytes = memio_GetReadParams(secret, &filter_buf);
if (free_bytes < bytes_processed) bytes_processed = free_bytes;
memmove(filter_buf,
buffer + start,
bytes_processed);
memio_PutReadResult(secret, bytes_processed);
}
break;
}
case kWritePlaintext: {
if (length > 0) {
bytes_processed = PR_Write(filter_,
buffer + start,
length);
}
if (bytes_processed < 0) {
ASSERT(bytes_processed == -1);
// TODO(whesse): Handle unexpected errors here.
PRErrorCode pr_error = PR_GetError();
if (PR_WOULD_BLOCK_ERROR != pr_error) {
ThrowPRException("Error reading plaintext from SSLFilter");
}
bytes_processed = 0;
}
break;
bytes_processed = 0;
}
}
return bytes_processed;
}
intptr_t SSLFilter::ProcessWritePlaintextBuffer(int start1, int end1,
int start2, int end2) {
PRIOVec ranges[2];
uint8_t* buffer = buffers_[kWritePlaintext];
ranges[0].iov_base = reinterpret_cast<char*>(buffer + start1);
ranges[0].iov_len = end1 - start1;
ranges[1].iov_base = reinterpret_cast<char*>(buffer + start2);
ranges[1].iov_len = end2 - start2;
int bytes_processed = PR_Writev(filter_, ranges, 2, PR_INTERVAL_NO_TIMEOUT);
if (bytes_processed < 0) {
ASSERT(bytes_processed == -1);
PRErrorCode pr_error = PR_GetError();
if (PR_WOULD_BLOCK_ERROR != pr_error) {
// TODO(11383): Handle unexpected errors here.
FATAL("Error reading plaintext from SSLFilter");
}
bytes_processed = 0;
}
return bytes_processed;
}
intptr_t SSLFilter::ProcessReadEncryptedBuffer(int start, int end) {
int length = end - start;
int bytes_processed = 0;
if (length > 0) {
memio_Private* secret = memio_GetSecret(filter_);
uint8_t* filter_buf;
int free_bytes = memio_GetReadParams(secret, &filter_buf);
bytes_processed = dart::Utils::Minimum(length, free_bytes);
memmove(filter_buf, buffers_[kReadEncrypted] + start, bytes_processed);
memio_PutReadResult(secret, bytes_processed);
}
return bytes_processed;
}
intptr_t SSLFilter::ProcessWriteEncryptedBuffer(int start, int end) {
int length = end - start;
int bytes_processed = 0;
if (length > 0) {
uint8_t* buffer = buffers_[kWriteEncrypted];
const uint8_t* buf1;
const uint8_t* buf2;
unsigned int len1;
unsigned int len2;
memio_Private* secret = memio_GetSecret(filter_);
memio_GetWriteParams(secret, &buf1, &len1, &buf2, &len2);
int bytes_to_send =
dart::Utils::Minimum(len1, static_cast<unsigned>(length));
if (bytes_to_send > 0) {
memmove(buffer + start, buf1, bytes_to_send);
bytes_processed = bytes_to_send;
}
bytes_to_send = dart::Utils::Minimum(len2,
static_cast<unsigned>(length - bytes_processed));
if (bytes_to_send > 0) {
memmove(buffer + start + bytes_processed, buf2, bytes_to_send);
bytes_processed += bytes_to_send;
}
if (bytes_processed > 0) {
memio_PutWriteResult(secret, bytes_processed);
}
}
return bytes_processed;
}
Dart_Port SSLFilter::GetServicePort() {
return filter_service_.GetServicePort();
}
void FUNCTION_NAME(SecureSocket_NewServicePort)(Dart_NativeArguments args) {
Dart_EnterScope();
Dart_SetReturnValue(args, Dart_Null());
Dart_Port service_port = SSLFilter::GetServicePort();
if (service_port != ILLEGAL_PORT) {
// Return a send port for the service port.
Dart_Handle send_port = Dart_NewSendPort(service_port);
Dart_SetReturnValue(args, send_port);
}
Dart_ExitScope();
}
} // namespace bin
} // namespace dart

View file

@ -22,6 +22,7 @@
#include "bin/dartutils.h"
#include "bin/socket.h"
#include "bin/utils.h"
#include "bin/native_service.h"
namespace dart {
namespace bin {
@ -88,18 +89,27 @@ class SSLFilter {
Dart_Handle bad_certificate_callback() {
return Dart_HandleFromPersistent(bad_certificate_callback_);
}
intptr_t ProcessReadPlaintextBuffer(int start, int end);
intptr_t ProcessWritePlaintextBuffer(int start1, int end1,
int start2, int end2);
intptr_t ProcessReadEncryptedBuffer(int start, int end);
intptr_t ProcessWriteEncryptedBuffer(int start, int end);
void ProcessAllBuffers(int starts[kNumBuffers],
int ends[kNumBuffers],
bool in_handshake);
Dart_Handle PeerCertificate();
static void InitializeLibrary(const char* certificate_database,
const char* password,
bool use_builtin_root_certificates,
bool report_duplicate_initialization = true);
intptr_t ProcessBuffer(int bufferIndex);
Dart_Handle PeerCertificate();
static Dart_Port GetServicePort();
private:
static const int kMemioBufferSize = 20 * KB;
static bool library_initialized_;
static const char* password_;
static dart::Mutex mutex_; // To protect library initialization.
static NativeService filter_service_;
uint8_t* buffers_[kNumBuffers];
int buffer_size_;
@ -114,7 +124,7 @@ class SSLFilter {
char* client_certificate_name_;
PRFileDesc* filter_;
static bool isEncrypted(int i) {
static bool isBufferEncrypted(int i) {
return static_cast<BufferIndex>(i) >= kFirstEncrypted;
}
void InitializeBuffers(Dart_Handle dart_this);

View file

@ -15,6 +15,9 @@ patch class SecureSocket {
patch class _SecureFilter {
/* patch */ factory _SecureFilter() => new _SecureFilterImpl();
/* patch */ static SendPort _newServicePort()
native "SecureSocket_NewServicePort";
}
@ -49,10 +52,17 @@ class _SecureSocket extends _Socket implements SecureSocket {
class _SecureFilterImpl
extends NativeFieldWrapperClass1
implements _SecureFilter {
// Performance is improved if a full buffer of plaintext fits
// in the encrypted buffer, when encrypted.
static final int SIZE = 8 * 1024;
static final int ENCRYPTED_SIZE = 10 * 1024;
_SecureFilterImpl() {
buffers = new List<_ExternalBuffer>(_RawSecureSocket.NUM_BUFFERS);
for (int i = 0; i < _RawSecureSocket.NUM_BUFFERS; ++i) {
buffers[i] = new _ExternalBuffer();
buffers[i] = new _ExternalBuffer(_RawSecureSocket._isBufferEncrypted(i) ?
ENCRYPTED_SIZE :
SIZE);
}
}
@ -78,13 +88,14 @@ class _SecureFilterImpl
X509Certificate get peerCertificate native "SecureSocket_PeerCertificate";
int processBuffer(int bufferIndex) native "SecureSocket_ProcessBuffer";
void registerBadCertificateCallback(Function callback)
native "SecureSocket_RegisterBadCertificateCallback";
void registerHandshakeCompleteCallback(Function handshakeCompleteHandler)
native "SecureSocket_RegisterHandshakeCompleteCallback";
// This is a security issue, as it exposes a raw pointer to Dart code.
int _pointer() native "SecureSocket_FilterPointer";
List<_ExternalBuffer> buffers;
}

View file

@ -272,6 +272,9 @@ patch class _SecureFilter {
patch factory _SecureFilter() {
throw new UnsupportedError("_SecureFilter._SecureFilter");
}
patch static SendPort _newServicePort() {
throw new UnsupportedError("_SecureFilter._newServicePort");
}
}
patch class _StdIOUtils {

View file

@ -358,10 +358,23 @@ class X509Certificate {
}
class _FilterStatus {
bool progress = false; // The filter read or wrote data to the buffers.
bool readEmpty = true; // The read buffers and decryption filter are empty.
bool writeEmpty = true; // The write buffers and encryption filter are empty.
// These are set if a buffer changes state from empty or full.
bool readPlaintextNoLongerEmpty = false;
bool writePlaintextNoLongerFull = false;
bool readEncryptedNoLongerFull = false;
bool writeEncryptedNoLongerEmpty = false;
_FilterStatus();
}
class _RawSecureSocket extends Stream<RawSocketEvent>
implements RawSecureSocket {
// Status states
static final int NOT_CONNECTED = 200;
static final int HANDSHAKE = 201;
static final int CONNECTED = 202;
static final int CLOSED = 203;
@ -374,6 +387,9 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
static final int WRITE_ENCRYPTED = 3;
static final int NUM_BUFFERS = 4;
// Is a buffer identifier for an encrypted buffer?
static bool _isBufferEncrypted(int identifier) => identifier >= READ_ENCRYPTED;
RawSocket _socket;
final Completer<_RawSecureSocket> _handshakeComplete =
new Completer<_RawSecureSocket>();
@ -390,17 +406,23 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
final bool sendClientCertificate;
final Function onBadCertificate;
var _status = NOT_CONNECTED;
var _status = HANDSHAKE;
bool _writeEventsEnabled = true;
bool _readEventsEnabled = true;
int _pauseCount = 0;
bool _pendingReadEvent = false;
bool _socketClosedRead = false; // The network socket is closed for reading.
bool _socketClosedWrite = false; // The network socket is closed for writing.
bool _closedRead = false; // The secure socket has fired an onClosed event.
bool _closedWrite = false; // The secure socket has been closed for writing.
bool _filterReadEmpty = true; // There is no buffered data to read.
bool _filterWriteEmpty = true; // There is no buffered data to be written.
_FilterStatus _filterStatus = new _FilterStatus();
bool _connectPending = false;
bool _filterPending = false;
bool _filterActive = false;
_SecureFilter _secureFilter = new _SecureFilter();
int _filterPointer;
SendPort _filterService;
static Future<_RawSecureSocket> connect(
host,
@ -464,7 +486,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
// errors will be reported through the future or the stream.
_verifyFields();
_secureFilter.init();
if (_bufferedData != null) _readFromBuffered();
_filterPointer = _secureFilter._pointer();
_secureFilter.registerHandshakeCompleteCallback(
_secureHandshakeCompleteHandler);
if (onBadCertificate != null) {
@ -501,7 +523,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
requireClientCertificate,
requireClientCertificate,
sendClientCertificate);
_status = HANDSHAKE;
_secureHandshake();
})
.catchError((error) {
@ -514,10 +535,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
{void onError(error),
void onDone(),
bool cancelOnError}) {
if (_writeEventsEnabled) {
_writeEventsEnabled = false;
_controller.add(RawSocketEvent.WRITE);
}
_sendWriteEvent();
return _stream.listen(onData,
onError: onError,
onDone: onDone,
@ -559,7 +577,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
int available() {
if (_status != CONNECTED) return 0;
_readEncryptedData();
return _secureFilter.buffers[READ_PLAINTEXT].length;
}
@ -575,7 +592,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
}
_socketClosedWrite = true;
_socketClosedRead = true;
if (_secureFilter != null) {
if (!_filterActive && _secureFilter != null) {
_secureFilter.destroy();
_secureFilter = null;
}
@ -590,8 +607,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
if (direction == SocketDirection.SEND ||
direction == SocketDirection.BOTH) {
_closedWrite = true;
_writeEncryptedData();
if (_filterWriteEmpty) {
if (_filterStatus.writeEmpty) {
_socket.shutdown(SocketDirection.SEND);
_socketClosedWrite = true;
if (_closedRead) {
@ -613,99 +629,60 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
bool get writeEventsEnabled => _writeEventsEnabled;
void set writeEventsEnabled(bool value) {
if (value &&
_controller.hasListener &&
_secureFilter != null &&
_secureFilter.buffers[WRITE_PLAINTEXT].free > 0) {
Timer.run(() => _controller.add(RawSocketEvent.WRITE));
} else {
_writeEventsEnabled = value;
_writeEventsEnabled = value;
if (value) {
Timer.run(() => _sendWriteEvent());
}
}
bool get readEventsEnabled => _readEventsEnabled;
void set readEventsEnabled(bool value) {
_readEventsEnabled = value;
if (value &&
((_secureFilter != null &&
_secureFilter.buffers[READ_PLAINTEXT].length > 0) ||
_socketClosedRead)) {
// We might not have no underlying socket to set off read events.
Timer.run(_readHandler);
}
_readEventsEnabled = value;
_scheduleReadEvent();
}
List<int> read([int len]) {
List<int> read([int length]) {
if (length != null && (length is! int || length < 0)) {
throw new ArgumentError(
"Invalid length parameter in SecureSocket.read (length: $length)");
}
if (_closedRead) {
throw new SocketException("Reading from a closed socket");
}
if (_status != CONNECTED) {
return null;
}
var buffer = _secureFilter.buffers[READ_PLAINTEXT];
_readEncryptedData();
int toRead = buffer.length;
if (len != null) {
if (len is! int || len < 0) {
throw new ArgumentError(
"Invalid len parameter in SecureSocket.read (len: $len)");
}
if (len < toRead) {
toRead = len;
}
}
List<int> result = (toRead == 0) ? null :
buffer.data.sublist(buffer.start, buffer.start + toRead);
buffer.advanceStart(toRead);
// Set up a read event if the filter still has data.
if (!_filterReadEmpty) {
Timer.run(_readHandler);
}
if (_socketClosedRead) { // An onClose event is pending.
// _closedRead is false, since we are in a read call.
if (!_filterReadEmpty) {
// _filterReadEmpty may be out of date since read empties
// the plaintext buffer after calling _readEncryptedData.
// TODO(whesse): Fix this as part of fixing read.
_readEncryptedData();
}
if (_filterReadEmpty) {
// This can't be an else clause: the value of _filterReadEmpty changes.
// This must be asynchronous, because we are in a read call.
Timer.run(_closeHandler);
}
}
var result = _secureFilter.buffers[READ_PLAINTEXT].read(length);
_scheduleFilter();
return result;
}
// Write the data to the socket, and flush it as much as possible
// until it would block. If the write would block, _writeEncryptedData sets
// up handlers to flush the pipeline when possible.
// Write the data to the socket, and schedule the filter to encrypt it.
int write(List<int> data, [int offset, int bytes]) {
if (bytes != null && (bytes is! int || bytes < 0)) {
throw new ArgumentError(
"Invalid bytes parameter in SecureSocket.read (bytes: $bytes)");
}
if (offset != null && (offset is! int || offset < 0)) {
throw new ArgumentError(
"Invalid offset parameter in SecureSocket.read (offset: $offset)");
}
if (_closedWrite) {
_controller.addError(new SocketException("Writing to a closed socket"));
return 0;
}
if (_status != CONNECTED) return 0;
if (offset == null) offset = 0;
if (bytes == null) bytes = data.length - offset;
var buffer = _secureFilter.buffers[WRITE_PLAINTEXT];
if (bytes > buffer.free) {
bytes = buffer.free;
int written =
_secureFilter.buffers[WRITE_PLAINTEXT].write(data, offset, bytes);
if (written > 0) {
_filterStatus.writeEmpty = false;
}
if (bytes > 0) {
int startIndex = buffer.start + buffer.length;
buffer.data.setRange(startIndex, startIndex + bytes, data, offset);
buffer.length += bytes;
}
_writeEncryptedData(); // Tries to flush all pipeline stages.
return bytes;
_scheduleFilter();
return written;
}
X509Certificate get peerCertificate => _secureFilter.peerCertificate;
@ -715,28 +692,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
return _socket.setOption(option, enabled);
}
void _writeHandler() {
if (_status == CLOSED) return;
_writeEncryptedData();
if (_filterWriteEmpty && _closedWrite && !_socketClosedWrite) {
// Close _socket for write, by calling shutdown(), to avoid cloning the
// socket closing code in shutdown().
shutdown(SocketDirection.SEND);
}
if (_status == HANDSHAKE) {
try {
_secureHandshake();
} catch (e) { _reportError(e, "RawSecureSocket error"); }
} else if (_status == CONNECTED &&
_controller.hasListener &&
_writeEventsEnabled &&
_secureFilter.buffers[WRITE_PLAINTEXT].free > 0) {
// Reset the one-shot handler.
_writeEventsEnabled = false;
_controller.add(RawSocketEvent.WRITE);
}
}
void _eventDispatcher(RawSocketEvent event) {
if (event == RawSocketEvent.READ) {
_readHandler();
@ -747,56 +702,18 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
}
}
void _readFromBuffered() {
assert(_bufferedData != null);
var encrypted = _secureFilter.buffers[READ_ENCRYPTED];
var bytes = _bufferedData.length - _bufferedDataIndex;
int startIndex = encrypted.start + encrypted.length;
encrypted.data.setRange(startIndex,
startIndex + bytes,
_bufferedData,
_bufferedDataIndex);
encrypted.length += bytes;
_bufferedDataIndex += bytes;
if (_bufferedData.length == _bufferedDataIndex) {
_bufferedData = null;
}
void _readHandler() {
_readSocket();
_scheduleFilter();
}
void _readHandler() {
if (_status == CLOSED) {
return;
} else if (_status == HANDSHAKE) {
try {
_secureHandshake();
if (_status != HANDSHAKE) _readHandler();
} catch (e) { _reportError(e, "RawSecureSocket error"); }
} else {
if (_status != CONNECTED) {
// Cannot happen.
throw new SocketException("Internal SocketIO Error");
}
try {
_readEncryptedData();
} catch (e) { _reportError(e, "RawSecureSocket error"); }
if (!_filterReadEmpty) {
if (_readEventsEnabled) {
if (_secureFilter.buffers[READ_PLAINTEXT].length > 0) {
_controller.add(RawSocketEvent.READ);
}
if (_socketClosedRead) {
// Keep firing read events until we are paused or buffer is empty.
Timer.run(_readHandler);
}
}
} else if (_socketClosedRead) {
_closeHandler();
}
}
void _writeHandler() {
_writeSocket();
_scheduleFilter();
}
void _doneHandler() {
if (_filterReadEmpty) {
if (_filterStatus.readEmpty) {
_close();
}
}
@ -826,38 +743,59 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
if (_status == CONNECTED) {
if (_closedRead) return;
_socketClosedRead = true;
if (_filterReadEmpty) {
if (_filterStatus.readEmpty) {
_closedRead = true;
_controller.add(RawSocketEvent.READ_CLOSED);
if (_socketClosedWrite) {
_close();
}
} else {
_scheduleFilter();
}
} else if (_status == HANDSHAKE) {
_socketClosedRead = true;
if (_filterStatus.readEmpty) {
_reportError(
new SocketException('Connection terminated during handshake'),
'handshake error');
'RawSecureSocket error');
} else {
_secureHandshake();
}
}
}
void _secureHandshake() {
_readEncryptedData();
_secureFilter.handshake();
_writeEncryptedData();
try {
_secureFilter.handshake();
_filterStatus.writeEmpty = false;
_readSocket();
_writeSocket();
_scheduleFilter();
} catch (e) {
_reportError(e, "RawSecureSocket error");
}
}
void _secureHandshakeCompleteHandler() {
_status = CONNECTED;
if (_connectPending) {
_connectPending = false;
// If we complete the future synchronously, user code will run here,
// and modify the state of the RawSecureSocket. For example, it
// could close the socket, and set _filter to null.
// We don't want user code to run synchronously in this callback.
Timer.run(() => _handshakeComplete.complete(this));
}
}
void _onPauseStateChange() {
if (_controller.isPaused) {
_pauseCount++;
} else {
_pauseCount--;
if (_pauseCount == 0) {
_scheduleReadEvent();
_sendWriteEvent(); // Can send event synchronously.
}
}
if (!_socketClosedRead || !_socketClosedWrite) {
if (_controller.isPaused) {
_socketSubscription.pause();
@ -873,120 +811,337 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
}
}
void _readEncryptedData() {
// Read from the socket, and push it through the filter as far as
// possible.
var encrypted = _secureFilter.buffers[READ_ENCRYPTED];
var plaintext = _secureFilter.buffers[READ_PLAINTEXT];
bool progress = true;
while (progress) {
progress = false;
// Do not try to read plaintext from the filter while handshaking.
if ((_status == CONNECTED) && plaintext.free > 0) {
int bytes = _secureFilter.processBuffer(READ_PLAINTEXT);
if (bytes > 0) {
plaintext.length += bytes;
progress = true;
}
}
if (encrypted.length > 0) {
int bytes = _secureFilter.processBuffer(READ_ENCRYPTED);
if (bytes > 0) {
encrypted.advanceStart(bytes);
progress = true;
}
}
if (!_socketClosedRead && encrypted.free > 0) {
if (_bufferedData != null) {
_readFromBuffered();
progress = true;
} else {
List<int> data = _socket.read(encrypted.free);
if (data != null) {
int bytes = data.length;
int startIndex = encrypted.start + encrypted.length;
encrypted.data.setRange(startIndex, startIndex + bytes, data);
encrypted.length += bytes;
progress = true;
}
}
}
}
// If there is any data in any stages of the filter, there should
// be data in the plaintext buffer after this process.
// TODO(whesse): Verify that this is true, and there can be no
// partial encrypted block stuck in the secureFilter.
_filterReadEmpty = (plaintext.length == 0);
void _scheduleFilter() {
_filterPending = true;
_tryFilter();
}
void _writeEncryptedData() {
if (_socketClosedWrite) return;
var encrypted = _secureFilter.buffers[WRITE_ENCRYPTED];
var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT];
while (true) {
if (encrypted.length > 0) {
// Write from the filter to the socket.
int bytes = _socket.write(encrypted.data,
encrypted.start,
encrypted.length);
encrypted.advanceStart(bytes);
if (encrypted.length > 0) {
// The socket has blocked while we have data to write.
// We must be notified when it becomes unblocked.
_socket.writeEventsEnabled = true;
_filterWriteEmpty = false;
break;
void _tryFilter() {
if (_status == CLOSED) return;
if (_filterPending && !_filterActive) {
_filterActive = true;
_filterPending = false;
_pushAllFilterStages().then((status) {
_filterStatus = status;
_filterActive = false;
if (_status == CLOSED) {
_secureFilter.destroy();
_secureFilter = null;
return;
}
} else {
var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT];
if (plaintext.length > 0) {
int plaintext_bytes = _secureFilter.processBuffer(WRITE_PLAINTEXT);
plaintext.advanceStart(plaintext_bytes);
if (_filterStatus.writeEmpty && _closedWrite && !_socketClosedWrite) {
// Checks for and handles all cases of partially closed sockets.
shutdown(SocketDirection.SEND);
if (_status == CLOSED) return;
}
int bytes = _secureFilter.processBuffer(WRITE_ENCRYPTED);
if (bytes <= 0) {
// We know the WRITE_ENCRYPTED buffer is empty, and the
// filter wrote zero bytes to it, so the filter must be empty.
// Also, the WRITE_PLAINTEXT buffer must have been empty, or
// it would have written to the filter.
// TODO(whesse): Verify that the filter works this way.
_filterWriteEmpty = true;
break;
if (_filterStatus.readEmpty && _socketClosedRead && !_closedRead) {
if (_status == HANDSHAKE) {
_secureFilter.handshake();
if (_status == HANDSHAKE) {
_reportError(
new SocketException('Connection terminated during handshake'),
'RawSecureSocket error');
}
}
_closeHandler();
}
encrypted.length += bytes;
}
if (_status == CLOSED) return;
if (_filterStatus.progress) {
_filterPending = true;
if (_filterStatus.writePlaintextNoLongerFull) _sendWriteEvent();
if (_filterStatus.readEncryptedNoLongerFull) _readSocket();
if (_filterStatus.writeEncryptedNoLongerEmpty) _writeSocket();
if (_filterStatus.readPlaintextNoLongerEmpty) _scheduleReadEvent();
if (_status == HANDSHAKE) _secureHandshake();
}
_tryFilter();
});
}
}
List<int> _readSocketOrBufferedData(int bytes) {
if (_bufferedData != null) {
if (bytes > _bufferedData.length - _bufferedDataIndex) {
bytes = _bufferedData.length - _bufferedDataIndex;
}
var result = _bufferedData.sublist(_bufferedDataIndex,
_bufferedDataIndex + bytes);
_bufferedDataIndex += bytes;
if (_bufferedData.length == _bufferedDataIndex) {
_bufferedData = null;
}
return result;
} else if (!_socketClosedRead) {
try {
return _socket.read(bytes);
} catch (e) {
_reportError(e, "RawSecureSocket error reading encrypted socket");
return null;
}
} else {
return null;
}
}
void _readSocket() {
if (_status == CLOSED) return;
var buffer = _secureFilter.buffers[READ_ENCRYPTED];
if (buffer.writeFromSource(_readSocketOrBufferedData) > 0) {
_filterStatus.readEmpty = false;
}
}
void _writeSocket() {
if (_socketClosedWrite) return;
var buffer = _secureFilter.buffers[WRITE_ENCRYPTED];
if (buffer.readToSocket(_socket)) { // Returns true if blocked
_socket.writeEventsEnabled = true;
}
}
// If a read event should be sent, add it to the controller.
_scheduleReadEvent() {
if (!_pendingReadEvent &&
_readEventsEnabled &&
_pauseCount == 0 &&
_secureFilter != null &&
!_secureFilter.buffers[READ_PLAINTEXT].isEmpty) {
_pendingReadEvent = true;
Timer.run(_sendReadEvent);
}
}
_sendReadEvent() {
_pendingReadEvent = false;
if (_readEventsEnabled &&
_pauseCount == 0 &&
_secureFilter != null &&
!_secureFilter.buffers[READ_PLAINTEXT].isEmpty) {
_controller.add(RawSocketEvent.READ);
_scheduleReadEvent();
}
}
// If a write event should be sent, add it to the controller.
_sendWriteEvent() {
if (!_closedWrite &&
_writeEventsEnabled &&
_pauseCount == 0 &&
_secureFilter != null &&
_secureFilter.buffers[WRITE_PLAINTEXT].free > 0) {
_writeEventsEnabled = false;
_controller.add(RawSocketEvent.WRITE);
}
}
Future<_FilterStatus> _pushAllFilterStages() {
if (_filterService == null) {
_filterService = _SecureFilter._newServicePort();
}
List args = [_filterPointer, _status != CONNECTED];
var bufs = _secureFilter.buffers;
for (var i = 0; i < NUM_BUFFERS; ++i) {
args.add(bufs[i].start);
args.add(bufs[i].end);
}
return _filterService.call(args).then((response) {
bool wasInHandshake = response[1];
int start(int index) => response[2 * index + 2];
int end(int index) => response[2 * index + 3];
_FilterStatus status = new _FilterStatus();
// Compute writeEmpty as "write plaintext buffer and write encrypted
// buffer were empty when we started and are empty now".
status.writeEmpty = bufs[WRITE_PLAINTEXT].isEmpty &&
start(WRITE_ENCRYPTED) == end(WRITE_ENCRYPTED);
// If we were in handshake when this started, _writeEmpty may be false
// because the handshake wrote data after we checked.
if (wasInHandshake) status.writeEmpty = false;
// Compute readEmpty as "both read buffers were empty when we started
// and are empty now".
status.readEmpty = bufs[READ_ENCRYPTED].isEmpty &&
start(READ_PLAINTEXT) == end(READ_PLAINTEXT);
_ExternalBuffer buffer = bufs[WRITE_PLAINTEXT];
int new_start = start(WRITE_PLAINTEXT);
if (new_start != buffer.start) {
status.progress = true;
if (buffer.free == 0) {
status.writePlaintextNoLongerFull = true;
}
buffer.start = new_start;
}
buffer = bufs[READ_ENCRYPTED];
new_start = start(READ_ENCRYPTED);
if (new_start != buffer.start) {
status.progress = true;
if (buffer.free == 0) {
status.readEncryptedNoLongerFull = true;
}
buffer.start = new_start;
}
buffer = bufs[WRITE_ENCRYPTED];
int new_end = end(WRITE_ENCRYPTED);
if (new_end != buffer.end) {
status.progress = true;
if (buffer.length == 0) {
status.writeEncryptedNoLongerEmpty = true;
}
buffer.end = new_end;
}
buffer = bufs[READ_PLAINTEXT];
new_end = end(READ_PLAINTEXT);
if (new_end != buffer.end) {
status.progress = true;
if (buffer.length == 0) {
status.readPlaintextNoLongerEmpty = true;
}
buffer.end = new_end;
}
return status;
});
}
}
/**
* A circular buffer backed by an external byte array. Accessed from
* both C++ and Dart code in an unsynchronized way, with one reading
* and one writing. All updates to start and end are done by Dart code.
*/
class _ExternalBuffer {
// Performance is improved if a full buffer of plaintext fits
// in the encrypted buffer, when encrypted.
static final int SIZE = 8 * 1024;
static final int ENCRYPTED_SIZE = 10 * 1024;
_ExternalBuffer() : start = 0, length = 0;
_ExternalBuffer(this.size) {
start = size~/2;
end = size~/2;
}
// TODO(whesse): Consider making this a circular buffer. Only if it helps.
void advanceStart(int numBytes) {
start += numBytes;
length -= numBytes;
if (length == 0) {
start = 0;
void advanceStart(int bytes) {
assert(start > end || start + bytes <= end);
start += bytes;
if (start >= size) {
start -= size;
assert(start <= end);
assert(start < size);
}
}
int get free => data.length - (start + length);
void advanceEnd(int bytes) {
assert(start <= end || start > end + bytes);
end += bytes;
if (end >= size) {
end -= size;
assert(end < start);
assert(end < size);
}
}
bool get isEmpty => end == start;
int get length {
if (start > end) return size + end - start;
return end - start;
}
int get linearLength {
if (start > end) return size - start;
return end - start;
}
int get free {
if (start > end) return start - end - 1;
return size + start - end - 1;
}
int get linearFree {
if (start > end) return start - end - 1;
if (start == 0) return size - end - 1;
return size - end;
}
List<int> read(int bytes) {
if (bytes == null) {
bytes = length;
} else {
bytes = min(bytes, length);
}
if (bytes == 0) return null;
List<int> result = new Uint8List(bytes);
int bytesRead = 0;
// Loop over zero, one, or two linear data ranges.
while (bytesRead < bytes) {
int toRead = linearLength;
result.setRange(bytesRead,
bytesRead + toRead,
data,
start);
advanceStart(toRead);
bytesRead += toRead;
}
return result;
}
int write(List<int> inputData, int offset, int bytes) {
if (bytes > free) {
bytes = free;
}
int written = 0;
int toWrite = min(bytes, linearFree);
// Loop over zero, one, or two linear data ranges.
while (toWrite > 0) {
data.setRange(end, end + toWrite, inputData, offset);
advanceEnd(toWrite);
offset += toWrite;
written += toWrite;
toWrite = min(bytes - written, linearFree);
}
return written;
}
int writeFromSource(List<int> getData(int requested)) {
int written = 0;
int toWrite = linearFree;
// Loop over zero, one, or two linear data ranges.
while (toWrite > 0) {
// Source returns at most toWrite bytes, and it returns null when empty.
var inputData = getData(toWrite);
if (inputData == null) break;
var len = inputData.length;
data.setRange(end, end + len, inputData);
advanceEnd(len);
written += len;
toWrite = linearFree;
}
return written;
}
bool readToSocket(RawSocket socket) {
// Loop over zero, one, or two linear data ranges.
while (true) {
var toWrite = linearLength;
if (toWrite == 0) return false;
int bytes = socket.write(data, start, toWrite);
advanceStart(bytes);
if (bytes < toWrite) {
// The socket has blocked while we have data to write.
return true;
}
}
}
List data; // This will be a ExternalByteArray, backed by C allocated data.
int start;
int length;
int end;
final size;
}
abstract class _SecureFilter {
external factory _SecureFilter();
external static SendPort _newServicePort();
void connect(String hostName,
Uint8List addr,
int port,
@ -1002,6 +1157,7 @@ abstract class _SecureFilter {
int processBuffer(int bufferIndex);
void registerBadCertificateCallback(Function callback);
void registerHandshakeCompleteCallback(Function handshakeCompleteHandler);
int _pointer();
List<_ExternalBuffer> get buffers;
}