Add a web socket client

The web socket client is a implemented as a class which is
created from from a HTTP client connection. This way it is
independent from the HTTP client and is just using the HTTP
client.

The protocol handling is mainly re-using the code from the web
socket server.

Change the handling of setting an onNoPendingWrites handler on a
socket output stream. Now setting a new onNoPendingWrites handler
will trigger a callback when there is no pending data to be
written.

Add socket detaching to the HTTP client as well. Add the ability
to get any unparsed data read from the socket for processing
after the protocol upgrade.

R=ager@google.com, ajohnsen@google.com, vsm@google.com, jacobr@google.com

BUG=dart:2001
TEST=tests/standalone/io/web_socket_test.dart

Review URL: https://chromiumcodereview.appspot.com//10262031

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@7224 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
sgjesse@google.com 2012-05-02 10:30:38 +00:00
parent 1d5a9ccb55
commit a108ef203f
7 changed files with 459 additions and 32 deletions

View file

@ -388,10 +388,10 @@ interface HttpResponse default _HttpResponse {
* socket is detached the HTTP server will no longer perform any
* operations on it.
*
* This is normally used when a HTTP upgraded request is received
* This is normally used when a HTTP upgrade request is received
* and the communication should continue with a different protocol.
*/
Socket detachSocket();
DetachedSocket detachSocket();
}
@ -485,6 +485,16 @@ interface HttpClientConnection {
* connecting or processing the HTTP request.
*/
void set onError(void callback(e));
/**
* Detach the underlying socket from the HTTP client. When the
* socket is detached the HTTP client will no longer perform any
* operations on it.
*
* This is normally used when a HTTP upgrade is negotiated and the
* communication should continue with a different protocol.
*/
DetachedSocket detachSocket();
}
@ -550,6 +560,19 @@ interface HttpClientResponse default _HttpClientResponse {
}
/**
* When detaching a socket from either the [:HttpServer:] or the
* [:HttpClient:] due to a HTTP connection upgrade there might be
* unparsed data already read from the socket. This unparsed data
* together with the detached socket is returned in an instance of
* this class.
*/
interface DetachedSocket default _DetachedSocket {
Socket get socket();
List<int> get unparsedData();
}
class HttpException implements Exception {
const HttpException([String this.message = ""]);
String toString() => "HttpException: $message";

View file

@ -467,7 +467,7 @@ class _HttpResponse extends _HttpRequestResponseBase implements HttpResponse {
return _outputStream;
}
Socket detachSocket() {
DetachedSocket detachSocket() {
if (_state >= DONE) throw new HttpException("Response closed");
// Ensure that headers are written.
if (_state == START) {
@ -602,7 +602,7 @@ class _HttpResponse extends _HttpRequestResponseBase implements HttpResponse {
// whether the content length is known.
if (_contentLength > 0) {
_headers.set("Content-Length", _contentLength.toString());
} else {
} else if (_contentLength < 0) {
_headers.set("Transfer-Encoding", "chunked");
}
@ -745,8 +745,10 @@ class _HttpConnectionBase implements Hashable {
int parsed = _httpParser.writeList(buffer, 0, bytesRead);
if (!_httpParser.upgrade) {
if (parsed != bytesRead) {
// TODO(sgjesse): Error handling.
_close();
if (_socket != null) {
// TODO(sgjesse): Error handling.
_close();
}
}
}
}
@ -766,16 +768,15 @@ class _HttpConnectionBase implements Hashable {
_onConnectionClosed(e);
}
Socket _detachSocket() {
DetachedSocket _detachSocket() {
_socket.onData = null;
// TODO(sgjesse): Handle getting the write handler when using output stream.
//_socket.onWrite = null;
_socket.onClosed = null;
_socket.onError = null;
_socket.outputStream.onNoPendingWrites = null;
Socket socket = _socket;
_socket = null;
if (onDetach) onDetach();
return socket;
if (onDetach != null) onDetach();
return new _DetachedSocket(socket, _httpParser.unparsedData);
}
abstract void _onConnectionClosed(e);
@ -1231,6 +1232,10 @@ class _HttpClientConnection
return _request;
}
DetachedSocket detachSocket() {
return _detachSocket();
}
void _onConnectionClosed(e) {
// Socket is closed either due to an error or due to normal socket close.
if (e != null) {
@ -1506,3 +1511,12 @@ class _HttpClient implements HttpClient {
Timer _evictionTimer;
bool _shutdown; // Has this HTTP client been shutdown?
}
class _DetachedSocket implements DetachedSocket {
_DetachedSocket(this._socket, this._unparsedData);
Socket get socket() => _socket;
List<int> get unparsedData() => _unparsedData;
Socket _socket;
List<int> _unparsedData;
}

View file

@ -419,6 +419,8 @@ class _HttpParser {
_expect(byte, _CharCode.LF);
if (_connectionUpgrade) {
_state = _State.UPGRADED;
_unparsedData =
buffer.getRange(index + 1, count - (index + 1 - offset));
if (headersComplete != null) headersComplete();
} else {
if (headersComplete != null) headersComplete();
@ -606,6 +608,8 @@ class _HttpParser {
bool get isIdle() => _state == _State.START;
List<int> get unparsedData() => _unparsedData;
void _bodyEnd() {
if (dataEnd != null) {
dataEnd(_messageType == _MessageType.RESPONSE && !_persistentConnection);
@ -695,6 +699,7 @@ class _HttpParser {
String _responseToMethod; // Indicates the method used for the request.
int _remainingContent;
List<int> _unparsedData; // Unparsed data after connection upgrade.
// Callbacks.
Function requestStart;
Function responseStart;

View file

@ -112,7 +112,7 @@ class _SocketOutputStream
if (!_pendingWrites.isEmpty()) {
// Mark the socket for close when all data is written.
_closing = true;
_socket._onWrite = _onWrite;
_setupWriteHander();
} else {
// Close the socket for writing.
_socket._closeWrite();
@ -128,9 +128,19 @@ class _SocketOutputStream
}
void set onNoPendingWrites(void callback()) {
if (_noPendingWritesTimer != null) {
_noPendingWritesTimer.cancel();
_noPendingWritesTimer = null;
}
_onNoPendingWrites = callback;
if (_onNoPendingWrites != null) {
_socket._onWrite = _onWrite;
if (_pendingWrites.isEmpty()) {
_noPendingWritesTimer = new Timer(0, (t) {
if (_onNoPendingWrites != null) _onNoPendingWrites();
});
} else {
_setupWriteHander();
}
}
}
@ -154,7 +164,7 @@ class _SocketOutputStream
assert(offset + len == buffer.length);
_pendingWrites.add(buffer, notWrittenOffset);
}
_socket._onWrite = _onWrite;
_setupWriteHander();
return false;
}
@ -186,6 +196,16 @@ class _SocketOutputStream
}
}
void _setupWriteHander() {
// Set up the callback for writing the pending data as the
// underlying socket becomes ready for writing.
if (_noPendingWritesTimer != null) {
_noPendingWritesTimer.cancel();
_noPendingWritesTimer = null;
}
_socket._onWrite = _onWrite;
}
bool _onSocketError(e) {
close();
if (_onError != null) {
@ -198,7 +218,8 @@ class _SocketOutputStream
Socket _socket;
_BufferList _pendingWrites;
var _onNoPendingWrites;
Function _onNoPendingWrites;
Timer _noPendingWritesTimer;
bool _closing = false;
bool _closed = false;
}

View file

@ -34,7 +34,7 @@ interface WebSocketHandler default _WebSocketHandler {
/**
* Web socket connection.
* Server web socket connection.
*/
interface WebSocketConnection {
/**
@ -71,6 +71,76 @@ interface WebSocketConnection {
}
/**
* Client web socket connection.
*/
interface WebSocketClientConnection default _WebSocketClientConnection {
/**
* Creates a new web socket client connection based on a HTTP client
* connection. The HTTP client connection must be freshly opened.
*/
WebSocketClientConnection(HttpClientConnection conn,
[List<String> protocols]);
/**
* Sets the callback to be called when the request object for the
* opening handshake request is ready. This callback can be used if
* one need to add additional headers to the opening handshake
* request.
*/
void set onRequest(void callback(HttpClientRequest request));
/**
* Sets the callback to be called when a web socket connection has
* been established.
*/
void set onOpen(void callback());
/**
* Sets the callback to be called when a message have been
* received. The type of [message] is either [:String:] or
* [:List<int>:] depending on whether it is a text or binary
* message. If the message is empty [message] will be [:null:].
*/
void set onMessage(void callback(message));
/**
* Sets the callback to be called when the web socket connection is
* closed.
*/
void set onClosed(void callback(int status, String reason));
/**
* Sets the callback to be called when the response object for the
* opening handshake did not cause a web socket connection
* upgrade. This will be called in case the response status code is
* not 101 (Switching Protocols). If this callback is not set the
* [:onError:] callback will be called if the server did not upgrade
* the connection.
*/
void set onNoUpgrade(void callback(HttpClientResponse response));
/**
* Sets the callback to be called when the web socket connection
* encountered an error.
*/
void set onError(void callback(e));
/**
* Sends a message. The [message] must be a [:String:] or a
* [:List<int>:]. To send an empty message use either an empty
* [:String:] or an empty [:List<int>:]. [:null:] cannot be used.
*/
send(message);
/**
* Close the web socket connection. The default value for [status]
* and [reason] are [:null:].
*/
close([int status, String reason]);
}
class WebSocketException implements Exception {
const WebSocketException([String this.message = ""]);
String toString() => "WebSocketException: $message";

View file

@ -2,6 +2,8 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
final String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
class _WebSocketMessageType {
static final int NONE = 0;
static final int BINARY = 1;
@ -252,7 +254,7 @@ class _WebSocketProtocolProcessor {
*/
void closed() {
if (_state == START || _state == CLOSED || _state == FAILURE) return;
_reportError(new WebSocketException("Protocol error"));
_reportError(new WebSocketException("Protocol error $_state"));
_state = CLOSED;
}
@ -277,6 +279,7 @@ class _WebSocketProtocolProcessor {
if (_remainingPayloadBytes == 0) {
if (_currentMessageType ==_WebSocketMessageType.CLOSE) {
if (onClosed != null) onClosed(null, null);
_state = CLOSED;
} else {
_frameEnd();
}
@ -342,15 +345,19 @@ class _WebSocketProtocolProcessor {
}
class _WebSocketConnection implements WebSocketConnection {
_WebSocketConnection(Socket this._socket) {
class _WebSocketConnectionBase {
void _socketReady(DetachedSocket detached) {
assert(detached.socket != null);
_socket = detached.socket;
_WebSocketProtocolProcessor processor = new _WebSocketProtocolProcessor();
processor.onMessageStart = _onWebSocketMessageStart;
processor.onMessageData = _onWebSocketMessageData;
processor.onMessageEnd = _onWebSocketMessageEnd;
processor.onClosed = _onWebSocketClosed;
processor.onError = _onWebSocketError;
if (detached.unparsedData != null) {
processor.update(detached.unparsedData, 0, detached.unparsedData.length);
}
_socket.onData = () {
int available = _socket.available();
List<int> data = new List<int>(available);
@ -364,14 +371,12 @@ class _WebSocketConnection implements WebSocketConnection {
// that as an error.
if (_closeTimer != null) _closeTimer.cancel();
} else {
if (_onError != null) {
_onError(new WebSocketException("Unexpected close"));
}
_reportError(new WebSocketException("Unexpected close"));
}
_socket.close();
};
_socket.onError = (e) {
if (_onError != null) _onError(e);
_reportError(e);
_socket.close();
};
}
@ -388,7 +393,7 @@ class _WebSocketConnection implements WebSocketConnection {
_onError = callback;
}
send(Object message) {
send(message) {
if (_closeSent) {
throw new WebSocketException("Connection closed");
}
@ -428,6 +433,7 @@ class _WebSocketConnection implements WebSocketConnection {
if (_closeReceived) {
// Close the socket when the close frame has been sent - if it
// does not take too long.
_socket.close(true);
_socket.outputStream.onNoPendingWrites = () {
if (_closeTimer != null) _closeTimer.cancel();
_socket.close();
@ -489,7 +495,7 @@ class _WebSocketConnection implements WebSocketConnection {
}
_onWebSocketError(e) {
if (_onError != null) _onError(e);
_reportError(e);
_socket.close();
}
@ -528,6 +534,14 @@ class _WebSocketConnection implements WebSocketConnection {
}
}
void _reportError(e) {
if (_onError != null) {
_onError(e);
} else {
throw e;
}
}
Socket _socket;
Timer _closeTimer;
@ -543,11 +557,20 @@ class _WebSocketConnection implements WebSocketConnection {
}
class _WebSocketConnection
extends _WebSocketConnectionBase implements WebSocketConnection {
_WebSocketConnection(DetachedSocket detached) {
_socketReady(detached);
}
}
class _WebSocketHandler implements WebSocketHandler {
void onRequest(HttpRequest request, HttpResponse response) {
// Check that this is a web socket upgrade.
if (!_isWebSocketUpgrade(request)) {
response.statusCode = HttpStatus.BAD_REQUEST;
response.outputStream.close();
return;
}
@ -555,15 +578,15 @@ class _WebSocketHandler implements WebSocketHandler {
response.statusCode = HttpStatus.SWITCHING_PROTOCOLS;
response.headers.add(HttpHeaders.CONNECTION, "Upgrade");
response.headers.add(HttpHeaders.UPGRADE, "websocket");
String x = request.headers.value("Sec-WebSocket-Key");
String y = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
String z = _Base64._encode(_Sha1._hash("$x$y".charCodes()));
response.headers.add("Sec-WebSocket-Accept", z);
String key = request.headers.value("Sec-WebSocket-Key");
String accept =
_Base64._encode(_Sha1._hash("$key$_webSocketGUID".charCodes()));
response.headers.add("Sec-WebSocket-Accept", accept);
response.contentLength = 0;
// Upgrade the connection and get the underlying socket.
Socket socket = response.detachSocket();
WebSocketConnection conn = new _WebSocketConnection(socket);
WebSocketConnection conn =
new _WebSocketConnection(response.detachSocket());
if (_onOpen != null) _onOpen(conn);
}
@ -572,6 +595,9 @@ class _WebSocketHandler implements WebSocketHandler {
}
bool _isWebSocketUpgrade(HttpRequest request) {
if (request.method != "GET") {
return false;
}
if (request.headers[HttpHeaders.CONNECTION] == null) {
return false;
}
@ -597,3 +623,113 @@ class _WebSocketHandler implements WebSocketHandler {
Function _onOpen;
}
class _WebSocketClientConnection
extends _WebSocketConnectionBase implements WebSocketClientConnection {
_WebSocketClientConnection(HttpClientConnection this._conn,
[List<String> protocols]) {
_conn.onRequest = _onHttpClientRequest;
_conn.onResponse = _onHttpClientResponse;
_conn.onError = (e) => _reportError(e);
}
void set onRequest(void callback(HttpClientRequest request)) {
_onRequest = callback;
}
void set onOpen(void callback()) {
_onOpen = callback;
}
void set onNoUpgrade(void callback(HttpClientResponse request)) {
_onNoUpgrade = callback;
}
void _onHttpClientRequest(HttpClientRequest request) {
if (_onRequest != null) {
_onRequest(request);
}
// Setup the initial handshake.
_generateNonce();
request.headers.add(HttpHeaders.CONNECTION, "upgrade");
request.headers.set(HttpHeaders.UPGRADE, "websocket");
request.headers.set("Sec-WebSocket-Key", _nonce);
request.headers.set("Sec-WebSocket-Version", "13");
request.contentLength = 0;
request.outputStream.close();
}
void _onHttpClientResponse(HttpClientResponse response) {
if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS) {
if (_onNoUpgrade != null) {
_onNoUpgrade(response);
} else {
_conn.detachSocket().socket.close();
throw new WebSocketException("Protocol upgrade refused");
}
return;
}
if (!_isWebSocketUpgrade(response)) {
_conn.detachSocket().socket.close();
throw new WebSocketException("Protocol upgrade failed");
return;
}
// Connection upgrade successful.
_socketReady(_conn.detachSocket());
if (_onOpen != null) _onOpen();
}
void _generateNonce() {
assert(_nonce == null);
void intToBigEndianBytes(int value, List<int> bytes, int offset) {
bytes[offset] = (value >> 24) & 0xFF;
bytes[offset + 1] = (value >> 16) & 0xFF;
bytes[offset + 2] = (value >> 8) & 0xFF;
bytes[offset + 3] = value & 0xFF;
}
// Generate 16 random bytes.
List<int> nonce = new List<int>(16);
for (int i = 0; i < 4; i++) {
int r = (Math.random() * 0x100000000).toInt();
intToBigEndianBytes(r, nonce, i * 4);
}
_nonce = _Base64._encode(nonce);
}
bool _isWebSocketUpgrade(HttpClientResponse response) {
if (response.headers[HttpHeaders.CONNECTION] == null) {
return false;
}
bool isUpgrade = false;
response.headers[HttpHeaders.CONNECTION].forEach((String value) {
if (value.toLowerCase() == "upgrade") isUpgrade = true;
});
if (!isUpgrade) return false;
String upgrade = response.headers.value(HttpHeaders.UPGRADE);
if (upgrade == null || upgrade.toLowerCase() != "websocket") {
return false;
}
String accept = response.headers.value("Sec-WebSocket-Accept");
if (accept == null) {
return false;
}
List<int> expectedAccept =
_Sha1._hash("$_nonce$_webSocketGUID".charCodes());
List<int> receivedAccept = _Base64._decode(accept);
if (expectedAccept.length != receivedAccept.length) return false;
for (int i = 0; i < expectedAccept.length; i++) {
if (expectedAccept[i] != receivedAccept[i]) return false;
}
return true;
}
Function _onRequest;
Function _onOpen;
Function _onNoUpgrade;
HttpClientConnection _conn;
String _nonce;
}

View file

@ -0,0 +1,158 @@
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
#import("dart:io");
void testRequestResponseClientCloses(
int totalConnections, int closeStatus, String closeReason) {
HttpServer server = new HttpServer();
HttpClient client = new HttpClient();
server.listen("127.0.0.1", 0, totalConnections);
// Create a web socket handler and set is as the HTTP server default
// handler.
WebSocketHandler wsHandler = new WebSocketHandler();
wsHandler.onOpen = (WebSocketConnection conn) {
var count = 0;
conn.onMessage = (Object message) => conn.send(message);
conn.onClosed = (status, reason) {
Expect.equals(closeStatus, status);
Expect.equals(closeReason, reason);
};
};
server.defaultRequestHandler = wsHandler.onRequest;
int closeCount = 0;
String messageText = "Hello, world!";
for (int i = 0; i < totalConnections; i++) {
int messageCount = 0;
HttpClientConnection conn = client.get("127.0.0.1", server.port, "/");
WebSocketClientConnection wsconn = new WebSocketClientConnection(conn);
wsconn.onOpen = () => wsconn.send(messageText);
wsconn.onMessage = (message) {
messageCount++;
if (messageCount < 10) {
Expect.equals(messageText, message);
wsconn.send(message);
} else {
wsconn.close(closeStatus, closeReason);
}
};
wsconn.onClosed = (status, reason) {
Expect.equals(closeStatus, status);
Expect.isNull(reason);
closeCount++;
if (closeCount == totalConnections) {
client.shutdown();
server.close();
}
};
}
}
void testRequestResponseServerCloses(
int totalConnections, int closeStatus, String closeReason) {
HttpServer server = new HttpServer();
HttpClient client = new HttpClient();
server.listen("127.0.0.1", 0, totalConnections);
// Create a web socket handler and set is as the HTTP server default
// handler.
int closeCount = 0;
WebSocketHandler wsHandler = new WebSocketHandler();
wsHandler.onOpen = (WebSocketConnection conn) {
String messageText = "Hello, world!";
int messageCount = 0;
conn.onMessage = (Object message) {
messageCount++;
if (messageCount < 10) {
Expect.equals(messageText, message);
conn.send(message);
} else {
conn.close(closeStatus, closeReason);
}
};
conn.onClosed = (status, reason) {
Expect.equals(closeStatus, status);
Expect.isNull(reason);
closeCount++;
if (closeCount == totalConnections) {
client.shutdown();
server.close();
}
};
conn.send(messageText);
};
server.defaultRequestHandler = wsHandler.onRequest;
for (int i = 0; i < totalConnections; i++) {
HttpClientConnection conn = client.get("127.0.0.1", server.port, "/");
WebSocketClientConnection wsconn = new WebSocketClientConnection(conn);
wsconn.onMessage = (message) => wsconn.send(message);
wsconn.onClosed = (status, reason) {
Expect.equals(closeStatus, status);
Expect.equals(closeReason, reason);
};
}
}
void testNoUpgrade() {
HttpServer server = new HttpServer();
HttpClient client = new HttpClient();
server.listen("127.0.0.1", 0, 5);
// Create a server which always responds with a redirect.
server.defaultRequestHandler = (request, response) {
response.statusCode = HttpStatus.MOVED_PERMANENTLY;
response.outputStream.close();
};
HttpClientConnection conn = client.get("127.0.0.1", server.port, "/");
WebSocketClientConnection wsconn = new WebSocketClientConnection(conn);
wsconn.onNoUpgrade = (response) {
Expect.equals(HttpStatus.MOVED_PERMANENTLY, response.statusCode);
client.shutdown();
server.close();
};
}
void testUsePOST() {
HttpServer server = new HttpServer();
HttpClient client = new HttpClient();
server.listen("127.0.0.1", 0, 5);
// Create a web socket handler and set is as the HTTP server default
// handler.
int closeCount = 0;
WebSocketHandler wsHandler = new WebSocketHandler();
wsHandler.onOpen = (WebSocketConnection conn) {
Expect.fail("No connection expected");
};
server.defaultRequestHandler = wsHandler.onRequest;
HttpClientConnection conn = client.post("127.0.0.1", server.port, "/");
WebSocketClientConnection wsconn = new WebSocketClientConnection(conn);
wsconn.onNoUpgrade = (response) {
Expect.equals(HttpStatus.BAD_REQUEST, response.statusCode);
client.shutdown();
server.close();
};
}
main() {
testRequestResponseClientCloses(2, null, null);
testRequestResponseClientCloses(2, 3001, null);
testRequestResponseClientCloses(2, 3002, "Got tired");
testRequestResponseServerCloses(1, null, null);
testRequestResponseServerCloses(2, 3001, null);
testRequestResponseServerCloses(2, 3002, "Got tired");
testNoUpgrade();
testUsePOST();
}