Web Socket compression - take two

Resolve issues with typed data tests and compression.
Uses HeaderValues to parse permessage_deflate option.

Includes changes from https://codereview.chromium.org/1208473005/

BUG=https://github.com/dart-lang/sdk/issues/14441
R=sgjesse@google.com

Committed: 59bb84127b

Reverted: d1e01e205a

Additional fixes for failing tests.

Review URL: https://codereview.chromium.org/1390353005 .
This commit is contained in:
Søren Gjesse 2015-11-06 08:59:02 +01:00
parent 9efe59cc02
commit 95bb2159d9
6 changed files with 818 additions and 277 deletions

View file

@ -683,9 +683,11 @@ abstract class HeaderValue {
*/
static HeaderValue parse(String value,
{String parameterSeparator: ";",
String valueSeparator: null,
bool preserveBackslash: false}) {
return _HeaderValue.parse(value,
parameterSeparator: parameterSeparator,
valueSeparator: valueSeparator,
preserveBackslash: preserveBackslash);
}

View file

@ -630,10 +630,11 @@ class _HeaderValue implements HeaderValue {
static _HeaderValue parse(String value,
{parameterSeparator: ";",
valueSeparator: null,
preserveBackslash: false}) {
// Parse the string.
var result = new _HeaderValue();
result._parse(value, parameterSeparator, preserveBackslash);
result._parse(value, parameterSeparator, valueSeparator, preserveBackslash);
return result;
}
@ -664,7 +665,10 @@ class _HeaderValue implements HeaderValue {
return sb.toString();
}
void _parse(String s, String parameterSeparator, bool preserveBackslash) {
void _parse(String s,
String parameterSeparator,
String valueSeparator,
bool preserveBackslash) {
int index = 0;
bool done() => index == s.length;
@ -681,6 +685,7 @@ class _HeaderValue implements HeaderValue {
while (!done()) {
if (s[index] == " " ||
s[index] == "\t" ||
s[index] == valueSeparator ||
s[index] == parameterSeparator) break;
index++;
}
@ -705,14 +710,17 @@ class _HeaderValue implements HeaderValue {
String parseParameterName() {
int start = index;
while (!done()) {
if (s[index] == " " || s[index] == "\t" || s[index] == "=") break;
if (s[index] == " " ||
s[index] == "\t" ||
s[index] == "=" ||
s[index] == valueSeparator) break;
index++;
}
return s.substring(start, index).toLowerCase();
}
String parseParameterValue() {
if (s[index] == "\"") {
if (!done() && s[index] == "\"") {
// Parse quoted value.
StringBuffer sb = new StringBuffer();
index++;
@ -735,7 +743,8 @@ class _HeaderValue implements HeaderValue {
return sb.toString();
} else {
// Parse non-quoted value.
return parseValue();
var val = parseValue();
return val == "" ? null : val;
}
}
@ -744,8 +753,16 @@ class _HeaderValue implements HeaderValue {
if (done()) return;
String name = parseParameterName();
skipWS();
expect("=");
if (done()) {
parameters[name] = null;
return;
}
maybeExpect("=");
skipWS();
if(done()) {
parameters[name] = null;
return;
}
String value = parseParameterValue();
if (name == 'charset' && this is _ContentType) {
// Charset parameter of ContentTypes are always lower-case.
@ -754,6 +771,8 @@ class _HeaderValue implements HeaderValue {
parameters[name] = value;
skipWS();
if (done()) return;
// TODO: Implement support for multi-valued parameters.
if(s[index] == valueSeparator) return;
expect(parameterSeparator);
}
}
@ -800,7 +819,7 @@ class _ContentType extends _HeaderValue implements ContentType {
static _ContentType parse(String value) {
var result = new _ContentType._();
result._parse(value, ";", false);
result._parse(value, ";", null, false);
int index = result._value.indexOf("/");
if (index == -1 || index == (result._value.length - 1)) {
result._primaryType = result._value.trim().toLowerCase();

View file

@ -12,7 +12,7 @@ abstract class WebSocketStatus {
static const int GOING_AWAY = 1001;
static const int PROTOCOL_ERROR = 1002;
static const int UNSUPPORTED_DATA = 1003;
static const int RESERVED_1004 = 1004;
static const int RESERVED_1004 = 1004;
static const int NO_STATUS_RECEIVED = 1005;
static const int ABNORMAL_CLOSURE = 1006;
static const int INVALID_FRAME_PAYLOAD_DATA = 1007;
@ -23,6 +23,141 @@ abstract class WebSocketStatus {
static const int RESERVED_1015 = 1015;
}
/**
* The [CompressionOptions] class allows you to control
* the options of WebSocket compression.
*/
class CompressionOptions {
/**
* Default WebSocket Compression options.
* Compression will be enabled with the following options:
* clientNoContextTakeover: false
* serverNoContextTakeover: false
* clientMaxWindowBits: 15
* serverMaxWindowBits: 15
*/
static const CompressionOptions DEFAULT = const CompressionOptions();
/**
* Disables WebSocket Compression.
*/
static const CompressionOptions OFF =
const CompressionOptions(enabled: false);
/**
* Control whether the client will reuse it's compression instances.
*/
final bool clientNoContextTakeover;
/**
* Control whether the server will reuse it's compression instances.
*/
final bool serverNoContextTakeover;
/**
* Sets the Max Window Bits for the Client.
*/
final int clientMaxWindowBits;
/**
* Sets the Max Window Bits for the Server.
*/
final int serverMaxWindowBits;
/**
* Enables or disables WebSocket compression.
*/
final bool enabled;
const CompressionOptions(
{this.clientNoContextTakeover: false,
this.serverNoContextTakeover: false,
this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
this.enabled: true});
/// Parses list of requested server headers to return server compression
/// response headers. Uses [serverMaxWindowBits] value if set, otherwise will
/// attempt to use value from headers. Defaults to
/// [WebSocket.DEFAULT_WINDOW_BITS]
List _createServerResponseHeader(HeaderValue requested) {
var info = new List(2);
int mwb;
var part = requested.parameters[_serverMaxWindowBits];
if (part != null) {
if (part.length >= 2 && part.startsWith('0')) {
throw new ArgumentError("Illegal 0 padding on value.");
} else {
mwb = serverMaxWindowBits == null
? int.parse(part,
onError: (source) => _WebSocketImpl.DEFAULT_WINDOW_BITS)
: serverMaxWindowBits;
info[0] = "; server_max_window_bits=${mwb}";
info[1] = mwb;
}
} else {
info[1] = _WebSocketImpl.DEFAULT_WINDOW_BITS;
}
return info;
}
/// Returns default values for client compression request headers.
List _createClientRequestHeader(HeaderValue requested) {
var info = new List(2);
info[1] = _WebSocketImpl.DEFAULT_WINDOW_BITS;
if (requested != null &&
requested.parameters[_clientMaxWindowBits] != null) {
info[0] = "; client_max_window_bits=${info[1]}";
} else {
info[0] = "; client_max_window_bits";
}
return info;
}
/// Create a Compression Header. If [requested] is null or contains
/// client request headers, returns Client compression request headers.
/// If [requested] contains server response headers this method returns
/// a Server compression response header.
List _createHeader([HeaderValue requested]) {
if (!enabled) {
return ["", 0];
}
var info = new List(2);
var header = _WebSocketImpl.PER_MESSAGE_DEFLATE;
if (clientNoContextTakeover &&
(requested != null &&
requested.parameters.containsKey(_clientNoContextTakeover))) {
header += "; client_no_context_takeover";
}
if (serverNoContextTakeover &&
(requested != null &&
requested.parameters.containsKey(_serverNoContextTakeover))) {
header += "; server_no_context_takeover";
}
if (requested == null ||
requested.parameters.containsKey(_clientMaxWindowBits)) {
var clientList = _createClientRequestHeader(requested);
header += clientList[0];
info[1] = clientList[1];
} else {
var headerList = _createServerResponseHeader(requested);
header += headerList[0];
info[1] = headerList[1];
}
info[0] = header;
return info;
}
}
/**
* The [WebSocketTransformer] provides the ability to upgrade a
* [HttpRequest] to a [WebSocket] connection. It supports both
@ -53,7 +188,6 @@ abstract class WebSocketStatus {
*/
abstract class WebSocketTransformer
implements StreamTransformer<HttpRequest, WebSocket> {
/**
* Create a new [WebSocketTransformer].
*
@ -62,9 +196,15 @@ abstract class WebSocketTransformer
* [protocolSelector] is should return either a [String] or a [Future]
* completing with a [String]. The [String] must exist in the list of
* protocols.
*
* If [compression] is provided, the [WebSocket] created will be configured
* to negotiate with the specified [CompressionOptions]. If none is specified
* then the [WebSocket] will be created with the default [CompressionOptions].
*/
factory WebSocketTransformer({protocolSelector(List<String> protocols)})
=> new _WebSocketTransformerImpl(protocolSelector);
factory WebSocketTransformer(
{protocolSelector(List<String> protocols),
CompressionOptions compression: CompressionOptions.DEFAULT}) =>
new _WebSocketTransformerImpl(protocolSelector, compression);
/**
* Upgrades a [HttpRequest] to a [WebSocket] connection. If the
@ -78,10 +218,16 @@ abstract class WebSocketTransformer
* [protocolSelector] is should return either a [String] or a [Future]
* completing with a [String]. The [String] must exist in the list of
* protocols.
*
* If [compression] is provided, the [WebSocket] created will be configured
* to negotiate with the specified [CompressionOptions]. If none is specified
* then the [WebSocket] will be created with the default [CompressionOptions].
*/
static Future<WebSocket> upgrade(HttpRequest request,
{protocolSelector(List<String> protocols)}) {
return _WebSocketTransformerImpl._upgrade(request, protocolSelector);
{protocolSelector(List<String> protocols),
CompressionOptions compression: CompressionOptions.DEFAULT}) {
return _WebSocketTransformerImpl._upgrade(
request, protocolSelector, compression);
}
/**
@ -92,7 +238,6 @@ abstract class WebSocketTransformer
}
}
/**
* A two-way HTTP communication object for client or server applications.
*
@ -152,9 +297,10 @@ abstract class WebSocket implements Stream, StreamSink {
* authentication when setting up the connection.
*/
static Future<WebSocket> connect(String url,
{Iterable<String> protocols,
Map<String, dynamic> headers}) =>
_WebSocketImpl.connect(url, protocols, headers);
{Iterable<String> protocols,
Map<String, dynamic> headers,
CompressionOptions compression: CompressionOptions.DEFAULT}) =>
_WebSocketImpl.connect(url, protocols, headers, compression: compression);
@Deprecated('This constructor will be removed in Dart 2.0. Use `implements`'
' instead of `extends` if implementing this abstract class.')
@ -174,14 +320,21 @@ abstract class WebSocket implements Stream, StreamSink {
* [serverSide] must be passed explicitly. If it's `false`, the WebSocket will
* act as the client and mask the messages it sends. If it's `true`, it will
* act as the server and will not mask its messages.
*
* If [compression] is provided, the [WebSocket] created will be configured
* to negotiate with the specified [CompressionOptions]. If none is specified
* then the [WebSocket] will be created with the default [CompressionOptions].
*/
factory WebSocket.fromUpgradedSocket(Socket socket, {String protocol,
bool serverSide}) {
factory WebSocket.fromUpgradedSocket(Socket socket,
{String protocol,
bool serverSide,
CompressionOptions compression: CompressionOptions.DEFAULT}) {
if (serverSide == null) {
throw new ArgumentError("The serverSide argument must be passed "
"explicitly to WebSocket.fromUpgradedSocket.");
}
return new _WebSocketImpl._fromSocket(socket, protocol, serverSide);
return new _WebSocketImpl._fromSocket(
socket, protocol, compression, serverSide);
}
/**
@ -238,9 +391,10 @@ abstract class WebSocket implements Stream, StreamSink {
Future addStream(Stream stream);
}
class WebSocketException implements IOException {
final String message;
const WebSocketException([this.message = ""]);
String toString() => "WebSocketException: $message";
}

View file

@ -5,6 +5,10 @@
part of dart.io;
const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
const String _clientNoContextTakeover = "client_no_context_takeover";
const String _serverNoContextTakeover = "server_no_context_takeover";
const String _clientMaxWindowBits = "client_max_window_bits";
const String _serverMaxWindowBits = "server_max_window_bits";
// Matches _WebSocketOpcode.
class _WebSocketMessageType {
@ -13,7 +17,6 @@ class _WebSocketMessageType {
static const int BINARY = 2;
}
class _WebSocketOpcode {
static const int CONTINUATION = 0;
static const int TEXT = 1;
@ -38,8 +41,8 @@ class _WebSocketOpcode {
* which is supplied through the [:handleData:]. As the protocol is processed,
* it'll output frame data as either a List<int> or String.
*
* Important infomation about usage: Be sure you use cancelOnError, so the
* socket will be closed when the processer encounter an error. Not using it
* Important information about usage: Be sure you use cancelOnError, so the
* socket will be closed when the processor encounter an error. Not using it
* will lead to undefined behaviour.
*/
// TODO(ajohnsen): make this transformer reusable?
@ -51,9 +54,15 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
static const int PAYLOAD = 4;
static const int CLOSED = 5;
static const int FAILURE = 6;
static const int FIN = 0x80;
static const int RSV1 = 0x40;
static const int RSV2 = 0x20;
static const int RSV3 = 0x10;
static const int OPCODE = 0xF;
int _state = START;
bool _fin = false;
bool _compressed = false;
int _opcode = -1;
int _len = -1;
bool _masked = false;
@ -71,18 +80,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
final List _maskingBytes = new List(4);
final BytesBuilder _payload = new BytesBuilder(copy: false);
_WebSocketProtocolTransformer([this._serverSide = false]);
_WebSocketPerMessageDeflate _deflate;
_WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
Stream bind(Stream stream) {
return new Stream.eventTransformed(
stream,
(EventSink eventSink) {
if (_eventSink != null) {
throw new StateError("WebSocket transformer already used.");
}
_eventSink = eventSink;
return this;
});
return new Stream.eventTransformed(stream, (EventSink eventSink) {
if (_eventSink != null) {
throw new StateError("WebSocket transformer already used.");
}
_eventSink = eventSink;
return this;
});
}
void addError(Object error, [StackTrace stackTrace]) =>
@ -94,9 +102,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
* Process data received from the underlying communication channel.
*/
void add(Uint8List buffer) {
int count = buffer.length;
int index = 0;
int lastIndex = count;
int lastIndex = buffer.length;
if (_state == CLOSED) {
throw new WebSocketException("Data on closed connection");
}
@ -107,12 +114,20 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
int byte = buffer[index];
if (_state <= LEN_REST) {
if (_state == START) {
_fin = (byte & 0x80) != 0;
if ((byte & 0x70) != 0) {
// The RSV1, RSV2 bits RSV3 must be all zero.
_fin = (byte & FIN) != 0;
if((byte & (RSV2 | RSV3)) != 0) {
// The RSV2, RSV3 bits must both be zero.
throw new WebSocketException("Protocol error");
}
_opcode = (byte & 0xF);
if ((byte & RSV1) != 0) {
_compressed = true;
} else {
_compressed = false;
}
_opcode = (byte & OPCODE);
if (_opcode <= _WebSocketOpcode.BINARY) {
if (_opcode == _WebSocketOpcode.CONTINUATION) {
if (_currentMessageType == _WebSocketMessageType.NONE) {
@ -120,14 +135,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
}
} else {
assert(_opcode == _WebSocketOpcode.TEXT ||
_opcode == _WebSocketOpcode.BINARY);
_opcode == _WebSocketOpcode.BINARY);
if (_currentMessageType != _WebSocketMessageType.NONE) {
throw new WebSocketException("Protocol error");
}
_currentMessageType = _opcode;
}
} else if (_opcode >= _WebSocketOpcode.CLOSE &&
_opcode <= _WebSocketOpcode.PONG) {
_opcode <= _WebSocketOpcode.PONG) {
// Control frames cannot be fragmented.
if (!_fin) throw new WebSocketException("Protocol error");
} else {
@ -176,15 +191,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
_unmask(index, payloadLength, buffer);
}
// Control frame and data frame share _payloads.
_payload.add(
new Uint8List.view(buffer.buffer, index, payloadLength));
_payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
index += payloadLength;
if (_isControlFrame()) {
if (_remainingPayloadBytes == 0) _controlFrameEnd();
} else {
if (_currentMessageType != _WebSocketMessageType.TEXT &&
_currentMessageType != _WebSocketMessageType.BINARY) {
throw new WebSocketException("Protocol error");
throw new WebSocketException("Protocol error");
}
if (_remainingPayloadBytes == 0) _messageFrameEnd();
}
@ -219,8 +233,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
}
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
Int32x4List blockBuffer = new Int32x4List.view(
buffer.buffer, index, blockCount);
Int32x4List blockBuffer =
new Int32x4List.view(buffer.buffer, index, blockCount);
for (int i = 0; i < blockBuffer.length; i++) {
blockBuffer[i] ^= blockMask;
}
@ -284,12 +298,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
void _messageFrameEnd() {
if (_fin) {
var bytes = _payload.takeBytes();
if (_deflate != null && _compressed) {
bytes = _deflate.processIncomingMessage(bytes);
}
switch (_currentMessageType) {
case _WebSocketMessageType.TEXT:
_eventSink.add(UTF8.decode(_payload.takeBytes()));
_eventSink.add(UTF8.decode(bytes));
break;
case _WebSocketMessageType.BINARY:
_eventSink.add(_payload.takeBytes());
_eventSink.add(bytes);
break;
}
_currentMessageType = _WebSocketMessageType.NONE;
@ -331,8 +350,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
bool _isControlFrame() {
return _opcode == _WebSocketOpcode.CLOSE ||
_opcode == _WebSocketOpcode.PING ||
_opcode == _WebSocketOpcode.PONG;
_opcode == _WebSocketOpcode.PING ||
_opcode == _WebSocketOpcode.PONG;
}
void _prepareForNextFrame() {
@ -347,31 +366,29 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
}
}
class _WebSocketPing {
final List<int> payload;
_WebSocketPing([this.payload = null]);
}
class _WebSocketPong {
final List<int> payload;
_WebSocketPong([this.payload = null]);
}
class _WebSocketTransformerImpl implements WebSocketTransformer {
final StreamController<WebSocket> _controller =
new StreamController<WebSocket>(sync: true);
final Function _protocolSelector;
final CompressionOptions _compression;
_WebSocketTransformerImpl(this._protocolSelector);
_WebSocketTransformerImpl(this._protocolSelector, this._compression);
Stream<WebSocket> bind(Stream<HttpRequest> stream) {
stream.listen((request) {
_upgrade(request, _protocolSelector)
.then((WebSocket webSocket) => _controller.add(webSocket))
.catchError(_controller.addError);
_upgrade(request, _protocolSelector, _compression)
.then((WebSocket webSocket) => _controller.add(webSocket))
.catchError(_controller.addError);
}, onDone: () {
_controller.close();
});
@ -379,13 +396,14 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
return _controller.stream;
}
static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) {
static Future<WebSocket> _upgrade(
HttpRequest request, _protocolSelector, CompressionOptions compression) {
var response = request.response;
if (!_isUpgradeRequest(request)) {
// Send error response.
response
..statusCode = HttpStatus.BAD_REQUEST
..close();
..statusCode = HttpStatus.BAD_REQUEST
..close();
return new Future.error(
new WebSocketException("Invalid WebSocket upgrade request"));
}
@ -393,9 +411,9 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
Future upgrade(String protocol) {
// Send the upgrade response.
response
..statusCode = HttpStatus.SWITCHING_PROTOCOLS
..headers.add(HttpHeaders.CONNECTION, "Upgrade")
..headers.add(HttpHeaders.UPGRADE, "websocket");
..statusCode = HttpStatus.SWITCHING_PROTOCOLS
..headers.add(HttpHeaders.CONNECTION, "Upgrade")
..headers.add(HttpHeaders.UPGRADE, "websocket");
String key = request.headers.value("Sec-WebSocket-Key");
_SHA1 sha1 = new _SHA1();
sha1.add("$key$_webSocketGUID".codeUnits);
@ -404,10 +422,13 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
if (protocol != null) {
response.headers.add("Sec-WebSocket-Protocol", protocol);
}
var deflate = _negotiateCompression(request, response, compression);
response.headers.contentLength = 0;
return response.detachSocket()
.then((socket) => new _WebSocketImpl._fromSocket(
socket, protocol, true));
return response.detachSocket().then((socket) =>
new _WebSocketImpl._fromSocket(
socket, protocol, compression, true, deflate));
}
var protocols = request.headers['Sec-WebSocket-Protocol'];
@ -416,26 +437,53 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
// consisting of multiple protocols. To unify all of them, first join
// the lists with ', ' and then tokenize.
protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
return new Future(() => _protocolSelector(protocols))
.then((protocol) {
if (protocols.indexOf(protocol) < 0) {
throw new WebSocketException(
"Selected protocol is not in the list of available protocols");
}
return protocol;
})
.catchError((error) {
response
..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
..close();
throw error;
})
.then(upgrade);
return new Future(() => _protocolSelector(protocols)).then((protocol) {
if (protocols.indexOf(protocol) < 0) {
throw new WebSocketException(
"Selected protocol is not in the list of available protocols");
}
return protocol;
}).catchError((error) {
response
..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
..close();
throw error;
}).then(upgrade);
} else {
return upgrade(null);
}
}
static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
HttpResponse response, CompressionOptions compression) {
var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
if (extensionHeader == null) {
extensionHeader = "";
}
var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) {
var info = compression._createHeader(hv);
response.headers.add("Sec-WebSocket-Extensions", info[0]);
var serverNoContextTakeover =
hv.parameters.containsKey(_serverNoContextTakeover);
var clientNoContextTakeover =
hv.parameters.containsKey(_clientNoContextTakeover);
var deflate = new _WebSocketPerMessageDeflate(
serverNoContextTakeover: serverNoContextTakeover,
clientNoContextTakeover: clientNoContextTakeover,
serverMaxWindowBits: info[1],
clientMaxWindowBits: info[1],
serverSide: true);
return deflate;
}
return null;
}
static bool _isUpgradeRequest(HttpRequest request) {
if (request.method != "GET") {
return false;
@ -464,24 +512,127 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
}
}
class _WebSocketPerMessageDeflate {
bool serverNoContextTakeover;
bool clientNoContextTakeover;
int clientMaxWindowBits;
int serverMaxWindowBits;
bool serverSide;
_Filter decoder;
_Filter encoder;
_WebSocketPerMessageDeflate(
{this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
this.serverNoContextTakeover: false,
this.clientNoContextTakeover: false,
this.serverSide: false});
void _ensureDecoder() {
if (decoder == null) {
decoder = _Filter._newZLibInflateFilter(
serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
}
}
void _ensureEncoder() {
if (encoder == null) {
encoder = _Filter._newZLibDeflateFilter(
false,
ZLibOption.DEFAULT_LEVEL,
serverSide ? serverMaxWindowBits : clientMaxWindowBits,
ZLibOption.DEFAULT_MEM_LEVEL,
ZLibOption.STRATEGY_DEFAULT,
null,
true);
}
}
Uint8List processIncomingMessage(List<int> msg) {
_ensureDecoder();
var data = [];
data.addAll(msg);
data.addAll(const [0x00, 0x00, 0xff, 0xff]);
decoder.process(data, 0, data.length);
var reuse =
!(serverSide ? clientNoContextTakeover : serverNoContextTakeover);
var result = [];
var out;
while ((out = decoder.processed(flush: reuse)) != null) {
result.addAll(out);
}
decoder.processed(flush: reuse);
if (!reuse) {
decoder.end();
decoder = null;
}
return new Uint8List.fromList(result);
}
List<int> processOutgoingMessage(List<int> msg) {
_ensureEncoder();
var reuse =
!(serverSide ? serverNoContextTakeover : clientNoContextTakeover);
var result = [];
Uint8List buffer;
var out;
if (msg is! Uint8List) {
for (var i = 0; i < msg.length; i++) {
if (msg[i] < 0 || 255 < msg[i]) {
throw new ArgumentError("List element is not a byte value "
"(value ${msg[i]} at index $i)");
}
}
buffer = new Uint8List.fromList(msg);
} else {
buffer = msg;
}
encoder.process(buffer, 0, buffer.length);
while ((out = encoder.processed(flush: reuse)) != null) {
result.addAll(out);
}
if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) {
encoder.end();
encoder = null;
}
if (result.length > 4) {
result = result.sublist(0, result.length - 4);
}
return result;
}
}
// TODO(ajohnsen): Make this transformer reusable.
class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
final _WebSocketImpl webSocket;
EventSink _eventSink;
_WebSocketOutgoingTransformer(this.webSocket);
_WebSocketPerMessageDeflate _deflateHelper;
_WebSocketOutgoingTransformer(this.webSocket) {
_deflateHelper = webSocket._deflate;
}
Stream bind(Stream stream) {
return new Stream.eventTransformed(
stream,
(EventSink eventSink) {
if (_eventSink != null) {
throw new StateError("WebSocket transformer already used");
}
_eventSink = eventSink;
return this;
});
return new Stream.eventTransformed(stream, (EventSink eventSink) {
if (_eventSink != null) {
throw new StateError("WebSocket transformer already used");
}
_eventSink = eventSink;
return this;
});
}
void add(message) {
@ -500,12 +651,16 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
opcode = _WebSocketOpcode.TEXT;
data = UTF8.encode(message);
} else {
if (message is !List<int>) {
if (message is! List<int>) {
throw new ArgumentError(message);
}
opcode = _WebSocketOpcode.BINARY;
data = message;
}
if (_deflateHelper != null) {
data = _deflateHelper.processOutgoingMessage(data);
}
} else {
opcode = _WebSocketOpcode.TEXT;
}
@ -531,11 +686,19 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
_eventSink.close();
}
void addFrame(int opcode, List<int> data) =>
createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
void addFrame(int opcode, List<int> data) => createFrame(
opcode,
data,
webSocket._serverSide,
_deflateHelper != null &&
(opcode == _WebSocketOpcode.TEXT ||
opcode == _WebSocketOpcode.BINARY)).forEach((e) {
_eventSink.add(e);
});
static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
bool mask = !serverSide; // Masking not implemented for server.
static Iterable createFrame(
int opcode, List<int> data, bool serverSide, bool compressed) {
bool mask = !serverSide; // Masking not implemented for server.
int dataLength = data == null ? 0 : data.length;
// Determine the header size.
int headerSize = (mask) ? 6 : 2;
@ -546,11 +709,15 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
}
Uint8List header = new Uint8List(headerSize);
int index = 0;
// Set FIN and opcode.
header[index++] = 0x80 | opcode;
var hoc = _WebSocketProtocolTransformer.FIN
| (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
| (opcode & _WebSocketProtocolTransformer.OPCODE);
header[index++] = hoc;
// Determine size and position of length field.
int lengthBytes = 1;
int firstLengthByte = 1;
if (dataLength > 65535) {
header[index++] = 127;
lengthBytes = 8;
@ -580,8 +747,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
list = new Uint8List(data.length);
for (int i = 0; i < data.length; i++) {
if (data[i] < 0 || 255 < data[i]) {
throw new ArgumentError(
"List element is not a byte value "
throw new ArgumentError("List element is not a byte value "
"(value ${data[i]} at index $i)");
}
list[i] = data[i];
@ -597,8 +763,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
mask = (mask << 8) | maskBytes[i];
}
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
Int32x4List blockBuffer = new Int32x4List.view(
list.buffer, 0, blockCount);
Int32x4List blockBuffer =
new Int32x4List.view(list.buffer, 0, blockCount);
for (int i = 0; i < blockBuffer.length; i++) {
blockBuffer[i] ^= blockMask;
}
@ -619,7 +785,6 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
}
}
class _WebSocketConsumer implements StreamConsumer {
final _WebSocketImpl webSocket;
final Socket socket;
@ -664,28 +829,28 @@ class _WebSocketConsumer implements StreamConsumer {
_ensureController() {
if (_controller != null) return;
_controller = new StreamController(sync: true,
onPause: _onPause,
onResume: _onResume,
onCancel: _onListen);
var stream = _controller.stream.transform(
new _WebSocketOutgoingTransformer(webSocket));
socket.addStream(stream)
.then((_) {
_done();
_closeCompleter.complete(webSocket);
}, onError: (error, StackTrace stackTrace) {
_closed = true;
_cancel();
if (error is ArgumentError) {
if (!_done(error, stackTrace)) {
_closeCompleter.completeError(error, stackTrace);
}
} else {
_done();
_closeCompleter.complete(webSocket);
}
});
_controller = new StreamController(
sync: true,
onPause: _onPause,
onResume: _onResume,
onCancel: _onListen);
var stream = _controller.stream
.transform(new _WebSocketOutgoingTransformer(webSocket));
socket.addStream(stream).then((_) {
_done();
_closeCompleter.complete(webSocket);
}, onError: (error, StackTrace stackTrace) {
_closed = true;
_cancel();
if (error is ArgumentError) {
if (!_done(error, stackTrace)) {
_closeCompleter.completeError(error, stackTrace);
}
} else {
_done();
_closeCompleter.complete(webSocket);
}
});
}
bool _done([error, StackTrace stackTrace]) {
@ -706,13 +871,9 @@ class _WebSocketConsumer implements StreamConsumer {
}
_ensureController();
_completer = new Completer();
_subscription = stream.listen(
(data) {
_controller.add(data);
},
onDone: _done,
onError: _done,
cancelOnError: true);
_subscription = stream.listen((data) {
_controller.add(data);
}, onDone: _done, onError: _done, cancelOnError: true);
if (_issuedPause) {
_subscription.pause();
_issuedPause = false;
@ -742,10 +903,11 @@ class _WebSocketConsumer implements StreamConsumer {
}
}
class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
// Use default Map so we keep order.
static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
static const int DEFAULT_WINDOW_BITS = 15;
static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
final String protocol;
@ -766,11 +928,13 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
int _outCloseCode;
String _outCloseReason;
Timer _closeTimer;
_WebSocketPerMessageDeflate _deflate;
static final HttpClient _httpClient = new HttpClient();
static Future<WebSocket> connect(
String url, Iterable<String> protocols, Map<String, dynamic> headers) {
String url, Iterable<String> protocols, Map<String, dynamic> headers,
{CompressionOptions compression: CompressionOptions.DEFAULT}) {
Uri uri = Uri.parse(url);
if (uri.scheme != "ws" && uri.scheme != "wss") {
throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
@ -784,144 +948,182 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
}
String nonce = _CryptoUtils.bytesToBase64(nonceData);
uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http",
userInfo: uri.userInfo,
host: uri.host,
port: uri.port,
path: uri.path,
query: uri.query,
fragment: uri.fragment);
return _httpClient.openUrl("GET", uri)
.then((request) {
if (uri.userInfo != null && !uri.userInfo.isEmpty) {
// If the URL contains user information use that for basic
// authorization.
String auth =
_CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
}
if (headers != null) {
headers.forEach((field, value) => request.headers.add(field, value));
}
// Setup the initial handshake.
uri = new Uri(
scheme: uri.scheme == "wss" ? "https" : "http",
userInfo: uri.userInfo,
host: uri.host,
port: uri.port,
path: uri.path,
query: uri.query,
fragment: uri.fragment);
return _httpClient.openUrl("GET", uri).then((request) {
if (uri.userInfo != null && !uri.userInfo.isEmpty) {
// If the URL contains user information use that for basic
// authorization.
String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
}
if (headers != null) {
headers.forEach((field, value) => request.headers.add(field, value));
}
// Setup the initial handshake.
request.headers
..set(HttpHeaders.CONNECTION, "Upgrade")
..set(HttpHeaders.UPGRADE, "websocket")
..set("Sec-WebSocket-Key", nonce)
..set("Cache-Control", "no-cache")
..set("Sec-WebSocket-Version", "13");
if (protocols != null) {
request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
}
if (compression.enabled) {
request.headers
..set(HttpHeaders.CONNECTION, "Upgrade")
..set(HttpHeaders.UPGRADE, "websocket")
..set("Sec-WebSocket-Key", nonce)
..set("Cache-Control", "no-cache")
..set("Sec-WebSocket-Version", "13");
if (protocols != null) {
request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
.add("Sec-WebSocket-Extensions", compression._createHeader());
}
return request.close();
}).then((response) {
void error(String message) {
// Flush data.
response.detachSocket().then((socket) {
socket.destroy();
});
throw new WebSocketException(message);
}
if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
response.headers[HttpHeaders.CONNECTION] == null ||
!response.headers[HttpHeaders.CONNECTION]
.any((value) => value.toLowerCase() == "upgrade") ||
response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
"websocket") {
error("Connection to '$uri' was not upgraded to websocket");
}
String accept = response.headers.value("Sec-WebSocket-Accept");
if (accept == null) {
error("Response did not contain a 'Sec-WebSocket-Accept' header");
}
_SHA1 sha1 = new _SHA1();
sha1.add("$nonce$_webSocketGUID".codeUnits);
List<int> expectedAccept = sha1.close();
List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
if (expectedAccept.length != receivedAccept.length) {
error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
}
for (int i = 0; i < expectedAccept.length; i++) {
if (expectedAccept[i] != receivedAccept[i]) {
error("Bad response 'Sec-WebSocket-Accept' header");
}
return request.close();
})
.then((response) {
void error(String message) {
// Flush data.
response.detachSocket().then((socket) {
socket.destroy();
});
throw new WebSocketException(message);
}
if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
response.headers[HttpHeaders.CONNECTION] == null ||
!response.headers[HttpHeaders.CONNECTION].any(
(value) => value.toLowerCase() == "upgrade") ||
response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
"websocket") {
error("Connection to '$uri' was not upgraded to websocket");
}
String accept = response.headers.value("Sec-WebSocket-Accept");
if (accept == null) {
error("Response did not contain a 'Sec-WebSocket-Accept' header");
}
_SHA1 sha1 = new _SHA1();
sha1.add("$nonce$_webSocketGUID".codeUnits);
List<int> expectedAccept = sha1.close();
List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
if (expectedAccept.length != receivedAccept.length) {
error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
}
for (int i = 0; i < expectedAccept.length; i++) {
if (expectedAccept[i] != receivedAccept[i]) {
error("Bad response 'Sec-WebSocket-Accept' header");
}
}
var protocol = response.headers.value('Sec-WebSocket-Protocol');
return response.detachSocket()
.then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
});
}
var protocol = response.headers.value('Sec-WebSocket-Protocol');
_WebSocketPerMessageDeflate deflate =
negotiateClientCompression(response, compression);
return response.detachSocket().then((socket) =>
new _WebSocketImpl._fromSocket(
socket, protocol, compression, false, deflate));
});
}
_WebSocketImpl._fromSocket(this._socket, this.protocol,
[this._serverSide = false]) {
static _WebSocketPerMessageDeflate negotiateClientCompression(
HttpClientResponse response, CompressionOptions compression) {
String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
if (extensionHeader == null) {
extensionHeader = "";
}
var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) {
var serverNoContextTakeover =
hv.parameters.containsKey(_serverNoContextTakeover);
var clientNoContextTakeover =
hv.parameters.containsKey(_clientNoContextTakeover);
int getWindowBits(String type) {
var o = hv.parameters[type];
if (o == null) {
return DEFAULT_WINDOW_BITS;
}
o = int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS);
return o;
}
return new _WebSocketPerMessageDeflate(
clientMaxWindowBits: getWindowBits(_clientMaxWindowBits),
serverMaxWindowBits: getWindowBits(_serverMaxWindowBits),
clientNoContextTakeover: clientNoContextTakeover,
serverNoContextTakeover: serverNoContextTakeover);
}
return null;
}
_WebSocketImpl._fromSocket(
this._socket, this.protocol, CompressionOptions compression,
[this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
_consumer = new _WebSocketConsumer(this, _socket);
_sink = new _StreamSinkImpl(_consumer);
_readyState = WebSocket.OPEN;
_deflate = deflate;
var transformer = new _WebSocketProtocolTransformer(_serverSide);
_subscription = _socket.transform(transformer).listen(
(data) {
if (data is _WebSocketPing) {
if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
} else if (data is _WebSocketPong) {
// Simply set pingInterval, as it'll cancel any timers.
pingInterval = _pingInterval;
} else {
_controller.add(data);
}
},
onError: (error) {
if (_closeTimer != null) _closeTimer.cancel();
if (error is FormatException) {
_close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
} else {
_close(WebSocketStatus.PROTOCOL_ERROR);
}
// An error happened, set the close code set above.
_closeCode = _outCloseCode;
_closeReason = _outCloseReason;
_controller.close();
},
onDone: () {
if (_closeTimer != null) _closeTimer.cancel();
if (_readyState == WebSocket.OPEN) {
_readyState = WebSocket.CLOSING;
if (!_isReservedStatusCode(transformer.closeCode)) {
_close(transformer.closeCode, transformer.closeReason);
} else {
_close();
}
_readyState = WebSocket.CLOSED;
}
// Protocol close, use close code from transformer.
_closeCode = transformer.closeCode;
_closeReason = transformer.closeReason;
_controller.close();
},
cancelOnError: true);
var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
_subscription = _socket.transform(transformer).listen((data) {
if (data is _WebSocketPing) {
if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
} else if (data is _WebSocketPong) {
// Simply set pingInterval, as it'll cancel any timers.
pingInterval = _pingInterval;
} else {
_controller.add(data);
}
}, onError: (error, stackTrace) {
if (_closeTimer != null) _closeTimer.cancel();
if (error is FormatException) {
_close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
} else {
_close(WebSocketStatus.PROTOCOL_ERROR);
}
// An error happened, set the close code set above.
_closeCode = _outCloseCode;
_closeReason = _outCloseReason;
_controller.close();
}, onDone: () {
if (_closeTimer != null) _closeTimer.cancel();
if (_readyState == WebSocket.OPEN) {
_readyState = WebSocket.CLOSING;
if (!_isReservedStatusCode(transformer.closeCode)) {
_close(transformer.closeCode, transformer.closeReason);
} else {
_close();
}
_readyState = WebSocket.CLOSED;
}
// Protocol close, use close code from transformer.
_closeCode = transformer.closeCode;
_closeReason = transformer.closeReason;
_controller.close();
}, cancelOnError: true);
_subscription.pause();
_controller = new StreamController(sync: true,
onListen: _subscription.resume,
onCancel: () {
_subscription.cancel();
_subscription = null;
},
onPause: _subscription.pause,
onResume: _subscription.resume);
_controller = new StreamController(
sync: true, onListen: _subscription.resume, onCancel: () {
_subscription.cancel();
_subscription = null;
}, onPause: _subscription.pause, onResume: _subscription.resume);
_webSockets[_serviceId] = this;
try { _socket._owner = this; } catch (_) {}
try {
_socket._owner = this;
} catch (_) {}
}
StreamSubscription listen(void onData(message),
{Function onError,
void onDone(),
bool cancelOnError}) {
{Function onError, void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError);
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
Duration get pingInterval => _pingInterval;
@ -1027,13 +1229,12 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
static bool _isReservedStatusCode(int code) {
return code != null &&
(code < WebSocketStatus.NORMAL_CLOSURE ||
(code < WebSocketStatus.NORMAL_CLOSURE ||
code == WebSocketStatus.RESERVED_1004 ||
code == WebSocketStatus.NO_STATUS_RECEIVED ||
code == WebSocketStatus.ABNORMAL_CLOSURE ||
(code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
code < WebSocketStatus.RESERVED_1015) ||
(code >= WebSocketStatus.RESERVED_1015 &&
code < 3000));
code < WebSocketStatus.RESERVED_1015) ||
(code >= WebSocketStatus.RESERVED_1015 && code < 3000));
}
}

View file

@ -0,0 +1,153 @@
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// VMOptions=
// VMOptions=--short_socket_read
// VMOptions=--short_socket_write
// VMOptions=--short_socket_read --short_socket_write
import "dart:async";
import "dart:convert";
import "dart:io";
import "dart:typed_data";
import "package:async_helper/async_helper.dart";
import "package:crypto/crypto.dart";
import "package:expect/expect.dart";
import "package:path/path.dart";
const WEB_SOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
const String HOST_NAME = 'localhost';
String localFile(path) => Platform.script.resolve(path).toFilePath();
SecurityContext serverContext = new SecurityContext()
..useCertificateChain(localFile('certificates/server_chain.pem'))
..usePrivateKey(localFile('certificates/server_key.pem'),
password: 'dartdart');
class SecurityConfiguration {
final bool secure;
SecurityConfiguration({bool this.secure});
Future<HttpServer> createServer({int backlog: 0}) =>
secure ? HttpServer.bindSecure(HOST_NAME,
0,
serverContext,
backlog: backlog)
: HttpServer.bind(HOST_NAME,
0,
backlog: backlog);
Future<WebSocket> createClient(int port) =>
// TODO(whesse): Add client context argument to WebSocket.connect
WebSocket.connect('${secure ? "wss" : "ws"}://$HOST_NAME:$port/');
void testCompressionSupport({server: false,
client: false,
contextTakeover: false}) {
asyncStart();
var clientOptions = new CompressionOptions(
enabled: client,
serverNoContextTakeover: contextTakeover,
clientNoContextTakeover: contextTakeover);
var serverOptions = new CompressionOptions(
enabled: server,
serverNoContextTakeover: contextTakeover,
clientNoContextTakeover: contextTakeover);
createServer().then((server) {
server.listen((request) {
Expect.isTrue(WebSocketTransformer.isUpgradeRequest(request));
WebSocketTransformer.upgrade(request, compression: serverOptions)
.then((webSocket) {
webSocket.listen((message) {
Expect.equals("Hello World", message);
webSocket.add(message);
webSocket.close();
});
webSocket.add("Hello World");
});
});
var url = '${secure ? "wss" : "ws"}://$HOST_NAME:${server.port}/';
WebSocket.connect(url, compression: clientOptions).then((websocket) {
var future = websocket.listen((message) {
Expect.equals("Hello World", message);
}).asFuture();
websocket.add("Hello World");
return future;
}).then((_) {
server.close();
asyncEnd();
});
});
}
void testCompressionHeaders() {
asyncStart();
createServer().then((server) {
server.listen((request) {
Expect.equals('Upgrade', request.headers.value(HttpHeaders.CONNECTION));
Expect.equals('websocket', request.headers.value(HttpHeaders.UPGRADE));
var key = request.headers.value('Sec-WebSocket-Key');
var sha1 = new SHA1()..add("$key$WEB_SOCKET_GUID".codeUnits);
var accept = CryptoUtils.bytesToBase64(sha1.close());
request.response
..statusCode = HttpStatus.SWITCHING_PROTOCOLS
..headers.add(HttpHeaders.CONNECTION, "Upgrade")
..headers.add(HttpHeaders.UPGRADE, "websocket")
..headers.add("Sec-WebSocket-Accept", accept)
..headers.add("Sec-WebSocket-Extensions",
"permessage-deflate;"
// Test quoted values and space padded =
'server_max_window_bits="10"; client_max_window_bits = 12'
'client_no_context_takeover; server_no_context_takeover');
request.response.contentLength = 0;
request.response.detachSocket().then((socket) {
return new WebSocket.fromUpgradedSocket(socket, serverSide: true);
}).then((websocket) {
websocket.add("Hello");
websocket.close();
asyncEnd();
});
});
var url = '${secure ? "wss" : "ws"}://$HOST_NAME:${server.port}/';
WebSocket.connect(url).then((websocket) {
return websocket.listen((message) {
Expect.equals("Hello", message);
websocket.close();
}).asFuture();
}).then((_) => server.close());
});
}
void runTests() {
// No compression or takeover
testCompressionSupport();
// compression no takeover
testCompressionSupport(server: true, client: true);
// compression and context takeover.
testCompressionSupport(server: true, client: true, contextTakeover: true);
// Compression on client but not server. No take over
testCompressionSupport(client: true);
// Compression on server but not client.
testCompressionSupport(server: true);
testCompressionHeaders();
}
}
main() {
new SecurityConfiguration(secure: false).runTests();
// TODO(whesse): Make WebSocket.connect() take an optional context: parameter.
// new SecurityConfiguration(secure: true).runTests();
}

View file

@ -15,13 +15,17 @@ import "dart:typed_data";
Future<HttpServer> createServer() => HttpServer.bind("127.0.0.1", 0);
Future<WebSocket> createClient(int port) =>
WebSocket.connect('ws://127.0.0.1:$port/');
Future<WebSocket> createClient(int port, bool compression) =>
compression ? WebSocket.connect('ws://127.0.0.1:$port/')
: WebSocket.connect('ws://127.0.0.1:$port/',
compression: CompressionOptions.OFF);
void test(expected, testData) {
void test(expected, testData, compression) {
createServer().then((server) {
var messageCount = 0;
server.transform(new WebSocketTransformer()).listen((webSocket) {
var transformer = compression ? new WebSocketTransformer()
: new WebSocketTransformer(compression: CompressionOptions.OFF);
server.transform(transformer).listen((webSocket) {
webSocket.listen(
(message) {
Expect.listEquals(expected, message);
@ -31,7 +35,7 @@ void test(expected, testData) {
onDone: () => Expect.equals(testData.length, messageCount));
});
createClient(server.port).then((webSocket) {
createClient(server.port, compression).then((webSocket) {
var messageCount = 0;
webSocket.listen(
(message) {
@ -45,7 +49,7 @@ void test(expected, testData) {
});
}
testUintLists() {
testUintLists({bool compression: false}) {
var fillData = new List.generate(256, (index) => index);
var testData = [
new Uint8List(256),
@ -55,10 +59,10 @@ testUintLists() {
new Uint64List(256),
];
testData.forEach((list) => list.setAll(0, fillData));
test(fillData, testData);
test(fillData, testData, compression);
}
testIntLists() {
testIntLists({bool compression: false}) {
var fillData = new List.generate(128, (index) => index);
var testData = [
new Int8List(128),
@ -67,18 +71,20 @@ testIntLists() {
new Int64List(128),
];
testData.forEach((list) => list.setAll(0, fillData));
test(fillData, testData);
test(fillData, testData, compression);
}
void testOutOfRangeClient() {
void testOutOfRangeClient({bool compression: false}) {
createServer().then((server) {
var messageCount = 0;
server.transform(new WebSocketTransformer()).listen((webSocket) {
var transformer = compression ? new WebSocketTransformer()
: new WebSocketTransformer(compression: CompressionOptions.OFF);
server.transform(transformer).listen((webSocket) {
webSocket.listen((message) => Expect.fail("No message expected"));
});
Future clientError(data) {
return createClient(server.port).then((webSocket) {
return createClient(server.port, compression).then((webSocket) {
var messageCount = 0;
webSocket.listen((message) => Expect.fail("No message expected"));
webSocket.add(data);
@ -129,7 +135,7 @@ void testOutOfRangeClient() {
});
}
void testOutOfRangeServer() {
void testOutOfRangeServer({bool compression: false}) {
var futures = [];
var testData = [];
var data;
@ -175,7 +181,9 @@ void testOutOfRangeServer() {
createServer().then((server) {
var messageCount = 0;
server.transform(new WebSocketTransformer()).listen((webSocket) {
var transformer = compression ? new WebSocketTransformer()
: new WebSocketTransformer(compression: CompressionOptions.OFF);
server.transform(transformer).listen((webSocket) {
webSocket.listen((message) {
messageCount++;
webSocket.add(testData[message[0]]);
@ -187,7 +195,7 @@ void testOutOfRangeServer() {
Future x(int i) {
var completer = new Completer();
createClient(server.port).then((webSocket) {
createClient(server.port, compression).then((webSocket) {
webSocket.listen((message) => Expect.fail("No message expected"),
onDone: () => completer.complete(true),
onError: (e) => completer.completeError(e));
@ -204,7 +212,11 @@ void testOutOfRangeServer() {
main() {
testUintLists();
testUintLists(compression: true);
testIntLists();
testIntLists(compression: true);
testOutOfRangeClient();
testOutOfRangeClient(compression: true);
// testOutOfRangeServer();
// testOutOfRangeServer(compression: true);
}