// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. part of "dart:_http"; const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; const String _clientNoContextTakeover = "client_no_context_takeover"; const String _serverNoContextTakeover = "server_no_context_takeover"; const String _clientMaxWindowBits = "client_max_window_bits"; const String _serverMaxWindowBits = "server_max_window_bits"; // Matches _WebSocketOpcode. class _WebSocketMessageType { static const int NONE = 0; static const int TEXT = 1; static const int BINARY = 2; } class _WebSocketOpcode { static const int CONTINUATION = 0; static const int TEXT = 1; static const int BINARY = 2; static const int RESERVED_3 = 3; static const int RESERVED_4 = 4; static const int RESERVED_5 = 5; static const int RESERVED_6 = 6; static const int RESERVED_7 = 7; static const int CLOSE = 8; static const int PING = 9; static const int PONG = 10; static const int RESERVED_B = 11; static const int RESERVED_C = 12; static const int RESERVED_D = 13; static const int RESERVED_E = 14; static const int RESERVED_F = 15; } class _EncodedString { final List bytes; _EncodedString(this.bytes); } /// Stores the header and integer value derived from negotiation of /// client_max_window_bits and server_max_window_bits. headerValue will be /// set in the Websocket response headers. class _CompressionMaxWindowBits { String headerValue; int maxWindowBits; _CompressionMaxWindowBits(this.headerValue, this.maxWindowBits); String toString() => headerValue; } /// The web socket protocol transformer handles the protocol byte stream /// which is supplied through the `handleData`. As the protocol is processed, /// it'll output frame data as either a List or String. /// /// Important information about usage: Be sure you use cancelOnError, so the /// socket will be closed when the processor encounter an error. Not using it /// will lead to undefined behaviour. class _WebSocketProtocolTransformer extends StreamTransformerBase< List, dynamic /*List|_WebSocketPing|_WebSocketPong*/ > implements EventSink> { static const int START = 0; static const int LEN_FIRST = 1; static const int LEN_REST = 2; static const int MASK = 3; static const int PAYLOAD = 4; static const int CLOSED = 5; static const int FAILURE = 6; static const int FIN = 0x80; static const int RSV1 = 0x40; static const int RSV2 = 0x20; static const int RSV3 = 0x10; static const int OPCODE = 0xF; int _state = START; bool _fin = false; bool _compressed = false; int _opcode = -1; int _len = -1; bool _masked = false; int _remainingLenBytes = -1; int _remainingMaskingKeyBytes = 4; int _remainingPayloadBytes = -1; int _unmaskingIndex = 0; int _currentMessageType = _WebSocketMessageType.NONE; int closeCode = WebSocketStatus.abnormalClosure; String closeReason = ""; EventSink|_WebSocketPing|_WebSocketPong*/>? _eventSink; final bool _serverSide; final Uint8List _maskingBytes = Uint8List(4); final BytesBuilder _payload = BytesBuilder(copy: false); final _WebSocketPerMessageDeflate? _deflate; _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); Stream|_WebSocketPing|_WebSocketPong*/> bind( Stream> stream, ) { return Stream.eventTransformed(stream, (EventSink eventSink) { if (_eventSink != null) { throw StateError("WebSocket transformer already used."); } _eventSink = eventSink; return this; }); } void addError(Object error, [StackTrace? stackTrace]) { // TODO(40614): Remove once non-nullability is sound. ArgumentError.checkNotNull(error, "error"); _eventSink!.addError(error, stackTrace); } void close() { _eventSink!.close(); } /// Process data received from the underlying communication channel. void add(List bytes) { var buffer = bytes is Uint8List ? bytes : Uint8List.fromList(bytes); int index = 0; int lastIndex = buffer.length; if (_state == CLOSED) { throw WebSocketException("Data on closed connection"); } if (_state == FAILURE) { throw WebSocketException("Data on failed connection"); } while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { int byte = buffer[index]; if (_state <= LEN_REST) { if (_state == START) { _fin = (byte & FIN) != 0; if ((byte & (RSV2 | RSV3)) != 0) { // The RSV2, RSV3 bits must both be zero. throw WebSocketException("Protocol error"); } _opcode = (byte & OPCODE); if (_opcode != _WebSocketOpcode.CONTINUATION) { if ((byte & RSV1) != 0) { _compressed = true; } else { _compressed = false; } } if (_opcode <= _WebSocketOpcode.BINARY) { if (_opcode == _WebSocketOpcode.CONTINUATION) { if (_currentMessageType == _WebSocketMessageType.NONE) { throw WebSocketException("Protocol error"); } } else { assert( _opcode == _WebSocketOpcode.TEXT || _opcode == _WebSocketOpcode.BINARY, ); if (_currentMessageType != _WebSocketMessageType.NONE) { throw WebSocketException("Protocol error"); } _currentMessageType = _opcode; } } else if (_opcode >= _WebSocketOpcode.CLOSE && _opcode <= _WebSocketOpcode.PONG) { // Control frames cannot be fragmented. if (!_fin) throw WebSocketException("Protocol error"); } else { throw WebSocketException("Protocol error"); } _state = LEN_FIRST; } else if (_state == LEN_FIRST) { _masked = (byte & 0x80) != 0; _len = byte & 0x7F; if (_isControlFrame() && _len > 125) { throw WebSocketException("Protocol error"); } if (_len == 126) { _len = 0; _remainingLenBytes = 2; _state = LEN_REST; } else if (_len == 127) { _len = 0; _remainingLenBytes = 8; _state = LEN_REST; } else { assert(_len < 126); _lengthDone(); } } else { assert(_state == LEN_REST); _len = _len << 8 | byte; _remainingLenBytes--; if (_remainingLenBytes == 0) { _lengthDone(); } } } else { if (_state == MASK) { _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte; if (_remainingMaskingKeyBytes == 0) { _maskDone(); } } else { assert(_state == PAYLOAD); // The payload is not handled one byte at a time but in blocks. int payloadLength = min(lastIndex - index, _remainingPayloadBytes); _remainingPayloadBytes -= payloadLength; // Unmask payload if masked. if (_masked) { _unmask(index, payloadLength, buffer); } // Control frame and data frame share _payloads. _payload.add( Uint8List.view( buffer.buffer, buffer.offsetInBytes + index, payloadLength, ), ); index += payloadLength; if (_isControlFrame()) { if (_remainingPayloadBytes == 0) _controlFrameEnd(); } else { if (_currentMessageType != _WebSocketMessageType.TEXT && _currentMessageType != _WebSocketMessageType.BINARY) { throw WebSocketException("Protocol error"); } if (_remainingPayloadBytes == 0) _messageFrameEnd(); } // Hack - as we always do index++ below. index--; } } // Move to the next byte. index++; } } void _unmask(int index, int length, Uint8List buffer) { const int BLOCK_SIZE = 16; // Skip Int32x4-version if message is small. if (length >= BLOCK_SIZE) { // Start by aligning to 16 bytes. final int startOffset = BLOCK_SIZE - (index & 15); final int end = index + startOffset; for (int i = index; i < end; i++) { buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; } index += startOffset; length -= startOffset; final int blockCount = length ~/ BLOCK_SIZE; if (blockCount > 0) { // Create mask block. int mask = 0; for (int i = 3; i >= 0; i--) { mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; } Int32x4 blockMask = Int32x4(mask, mask, mask, mask); Int32x4List blockBuffer = Int32x4List.view( buffer.buffer, buffer.offsetInBytes + index, blockCount, ); for (int i = 0; i < blockBuffer.length; i++) { blockBuffer[i] ^= blockMask; } final int bytes = blockCount * BLOCK_SIZE; index += bytes; length -= bytes; } } // Handle end. final int end = index + length; for (int i = index; i < end; i++) { buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; } } void _lengthDone() { if (_masked) { if (!_serverSide) { throw WebSocketException("Received masked frame from server"); } _state = MASK; } else { if (_serverSide) { throw WebSocketException("Received unmasked frame from client"); } _remainingPayloadBytes = _len; _startPayload(); } } void _maskDone() { _remainingPayloadBytes = _len; _startPayload(); } void _startPayload() { // If there is no actual payload perform callbacks without // going through the PAYLOAD state. if (_remainingPayloadBytes == 0) { if (_isControlFrame()) { switch (_opcode) { case _WebSocketOpcode.CLOSE: _state = CLOSED; closeCode = WebSocketStatus.noStatusReceived; _eventSink!.close(); break; case _WebSocketOpcode.PING: _eventSink!.add(_WebSocketPing()); break; case _WebSocketOpcode.PONG: _eventSink!.add(_WebSocketPong()); break; } _prepareForNextFrame(); } else { _messageFrameEnd(); } } else { _state = PAYLOAD; } } void _messageFrameEnd() { if (_fin) { var bytes = _payload.takeBytes(); var deflate = _deflate; if (deflate != null && _compressed) { bytes = deflate.processIncomingMessage(bytes); } switch (_currentMessageType) { case _WebSocketMessageType.TEXT: _eventSink!.add(utf8.decode(bytes)); break; case _WebSocketMessageType.BINARY: _eventSink!.add(bytes); break; } _currentMessageType = _WebSocketMessageType.NONE; } _prepareForNextFrame(); } void _controlFrameEnd() { switch (_opcode) { case _WebSocketOpcode.CLOSE: closeCode = WebSocketStatus.noStatusReceived; var payload = _payload.takeBytes(); if (payload.isNotEmpty) { if (payload.length == 1) { throw WebSocketException("Protocol error"); } closeCode = payload[0] << 8 | payload[1]; if (closeCode == WebSocketStatus.noStatusReceived) { throw WebSocketException("Protocol error"); } if (payload.length > 2) { closeReason = utf8.decode(payload.sublist(2)); } } _state = CLOSED; _eventSink!.close(); break; case _WebSocketOpcode.PING: _eventSink!.add(_WebSocketPing(_payload.takeBytes())); break; case _WebSocketOpcode.PONG: _eventSink!.add(_WebSocketPong(_payload.takeBytes())); break; } _prepareForNextFrame(); } bool _isControlFrame() { return _opcode == _WebSocketOpcode.CLOSE || _opcode == _WebSocketOpcode.PING || _opcode == _WebSocketOpcode.PONG; } void _prepareForNextFrame() { if (_state != CLOSED && _state != FAILURE) _state = START; _fin = false; _opcode = -1; _len = -1; _remainingLenBytes = -1; _remainingMaskingKeyBytes = 4; _remainingPayloadBytes = -1; _unmaskingIndex = 0; } } class _WebSocketPing { final List? payload; _WebSocketPing([this.payload]); } class _WebSocketPong { final List? payload; _WebSocketPong([this.payload]); } typedef /*String|Future*/ _ProtocolSelector = Function(List protocols); class _WebSocketTransformerImpl extends StreamTransformerBase implements WebSocketTransformer { final StreamController _controller = StreamController( sync: true, ); final _ProtocolSelector? _protocolSelector; final CompressionOptions _compression; _WebSocketTransformerImpl(this._protocolSelector, this._compression); Stream bind(Stream stream) { stream.listen( (request) { _upgrade(request, _protocolSelector, _compression) .then((WebSocket webSocket) => _controller.add(webSocket)) .catchError(_controller.addError); }, onDone: () { _controller.close(); }, ); return _controller.stream; } static List _tokenizeFieldValue(String headerValue) { List tokens = []; int start = 0; int index = 0; while (index < headerValue.length) { if (headerValue[index] == ",") { tokens.add(headerValue.substring(start, index)); start = index + 1; } else if (headerValue[index] == " " || headerValue[index] == "\t") { start++; } index++; } tokens.add(headerValue.substring(start, index)); return tokens; } static Future _upgrade( HttpRequest request, _ProtocolSelector? protocolSelector, CompressionOptions compression, ) { var response = request.response; if (!_isUpgradeRequest(request)) { // Send error response. response ..statusCode = HttpStatus.badRequest ..close(); return Future.error( WebSocketException("Invalid WebSocket upgrade request"), ); } Future upgrade(String? protocol) { // Send the upgrade response. response ..statusCode = HttpStatus.switchingProtocols ..headers.add(HttpHeaders.connectionHeader, "Upgrade") ..headers.add(HttpHeaders.upgradeHeader, "websocket"); String key = request.headers.value("Sec-WebSocket-Key")!; _SHA1 sha1 = _SHA1(); sha1.add("$key$_webSocketGUID".codeUnits); String accept = base64Encode(sha1.close()); response.headers.add("Sec-WebSocket-Accept", accept); if (protocol != null) { response.headers.add("Sec-WebSocket-Protocol", protocol); } var deflate = _negotiateCompression(request, response, compression); response.headers.contentLength = 0; return response.detachSocket().then( (socket) => _WebSocketImpl._fromSocket( socket, protocol, compression, true, deflate, ), ); } var protocols = request.headers['Sec-WebSocket-Protocol']; if (protocols != null && protocolSelector != null) { // The suggested protocols can be spread over multiple lines, each // consisting of multiple protocols. To unify all of them, first join // the lists with ', ' and then tokenize. var tokenizedProtocols = _tokenizeFieldValue(protocols.join(', ')); return Future(() => protocolSelector(tokenizedProtocols)) .then((protocol) { if (!tokenizedProtocols.contains(protocol)) { throw WebSocketException( "Selected protocol is not in the list of available protocols", ); } return protocol; }) .catchError((error) { response ..statusCode = HttpStatus.internalServerError ..close(); throw error; }) .then(upgrade); } else { return upgrade(null); } } static _WebSocketPerMessageDeflate? _negotiateCompression( HttpRequest request, HttpResponse response, CompressionOptions compression, ) { var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); extensionHeader ??= ""; var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) { var info = compression._createHeader(hv); response.headers.add("Sec-WebSocket-Extensions", info.headerValue); var serverNoContextTakeover = (hv.parameters.containsKey(_serverNoContextTakeover) && compression.serverNoContextTakeover); var clientNoContextTakeover = (hv.parameters.containsKey(_clientNoContextTakeover) && compression.clientNoContextTakeover); var deflate = _WebSocketPerMessageDeflate( serverNoContextTakeover: serverNoContextTakeover, clientNoContextTakeover: clientNoContextTakeover, serverMaxWindowBits: info.maxWindowBits, clientMaxWindowBits: info.maxWindowBits, serverSide: true, ); return deflate; } return null; } static bool _isUpgradeRequest(HttpRequest request) { if (request.method != "GET") { return false; } var connectionHeader = request.headers[HttpHeaders.connectionHeader]; if (connectionHeader == null) { return false; } bool isUpgrade = false; for (var value in connectionHeader) { if (value.toLowerCase() == "upgrade") { isUpgrade = true; break; } } if (!isUpgrade) return false; String? upgrade = request.headers.value(HttpHeaders.upgradeHeader); if (upgrade == null || upgrade.toLowerCase() != "websocket") { return false; } String? version = request.headers.value("Sec-WebSocket-Version"); if (version == null || version != "13") { return false; } String? key = request.headers.value("Sec-WebSocket-Key"); if (key == null) { return false; } return true; } } class _WebSocketPerMessageDeflate { bool serverNoContextTakeover; bool clientNoContextTakeover; int clientMaxWindowBits; int serverMaxWindowBits; bool serverSide; RawZLibFilter? decoder; RawZLibFilter? encoder; _WebSocketPerMessageDeflate({ this.clientMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS, this.serverMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS, this.serverNoContextTakeover = false, this.clientNoContextTakeover = false, this.serverSide = false, }); RawZLibFilter _ensureDecoder() => decoder ??= RawZLibFilter.inflateFilter( windowBits: serverSide ? clientMaxWindowBits : serverMaxWindowBits, raw: true, ); RawZLibFilter _ensureEncoder() => encoder ??= RawZLibFilter.deflateFilter( windowBits: serverSide ? serverMaxWindowBits : clientMaxWindowBits, raw: true, ); Uint8List processIncomingMessage(List msg) { var decoder = _ensureDecoder(); var data = []; data.addAll(msg); data.addAll(const [0x00, 0x00, 0xff, 0xff]); decoder.process(data, 0, data.length); final result = BytesBuilder(); while (true) { final out = decoder.processed(); if (out == null) break; result.add(out); } if ((serverSide && clientNoContextTakeover) || (!serverSide && serverNoContextTakeover)) { this.decoder = null; } return result.takeBytes(); } List processOutgoingMessage(List msg) { var encoder = _ensureEncoder(); var result = []; Uint8List buffer; if (msg is! Uint8List) { for (var i = 0; i < msg.length; i++) { if (msg[i] < 0 || 255 < msg[i]) { throw ArgumentError( "List element is not a byte value " "(value ${msg[i]} at index $i)", ); } } buffer = Uint8List.fromList(msg); } else { buffer = msg; } encoder.process(buffer, 0, buffer.length); while (true) { final out = encoder.processed(); if (out == null) break; result.addAll(out); } if ((!serverSide && clientNoContextTakeover) || (serverSide && serverNoContextTakeover)) { this.encoder = null; } if (result.length > 4) { result = result.sublist(0, result.length - 4); } // RFC 7692 7.2.3.6. "Generating an Empty Fragment" says that if the // compression library doesn't generate any data when the buffer is empty, // then an empty uncompressed deflate block is used for this purpose. The // 0x00 block has the BFINAL header bit set to 0 and the BTYPE header set to // 00 along with 5 bits of padding. This block decodes to zero bytes. if (result.isEmpty) { return [0x00]; } return result; } } // TODO(ajohnsen): Make this transformer reusable. class _WebSocketOutgoingTransformer extends StreamTransformerBase> implements EventSink { final _WebSocketImpl webSocket; EventSink>? _eventSink; final _WebSocketPerMessageDeflate? _deflateHelper; _WebSocketOutgoingTransformer(this.webSocket) : _deflateHelper = webSocket._deflate; Stream> bind(Stream stream) { return Stream>.eventTransformed(stream, ( EventSink> eventSink, ) { if (_eventSink != null) { throw StateError("WebSocket transformer already used"); } _eventSink = eventSink; return this; }); } void add(message) { if (message is _WebSocketPong) { addFrame(_WebSocketOpcode.PONG, message.payload); return; } if (message is _WebSocketPing) { addFrame(_WebSocketOpcode.PING, message.payload); return; } List? data; int opcode; if (message != null) { List messageData; if (message is String) { opcode = _WebSocketOpcode.TEXT; messageData = utf8.encode(message); } else if (message is List) { opcode = _WebSocketOpcode.BINARY; messageData = message; } else if (message is _EncodedString) { opcode = _WebSocketOpcode.TEXT; messageData = message.bytes; } else { throw ArgumentError(message); } var deflateHelper = _deflateHelper; if (deflateHelper != null) { messageData = deflateHelper.processOutgoingMessage(messageData); } data = messageData; } else { opcode = _WebSocketOpcode.TEXT; } addFrame(opcode, data); } void addError(Object error, [StackTrace? stackTrace]) { // TODO(40614): Remove once non-nullability is sound. ArgumentError.checkNotNull(error, "error"); _eventSink!.addError(error, stackTrace); } void close() { int? code = webSocket._outCloseCode; String? reason = webSocket._outCloseReason; List? data; if (code != null) { data = [ (code >> 8) & 0xFF, code & 0xFF, if (reason != null) ...utf8.encode(reason), ]; } addFrame(_WebSocketOpcode.CLOSE, data); _eventSink!.close(); } void addFrame(int opcode, List? data) { createFrame( opcode, data, webSocket._serverSide, _deflateHelper != null && (opcode == _WebSocketOpcode.TEXT || opcode == _WebSocketOpcode.BINARY), ).forEach((e) { _eventSink!.add(e); }); } static Iterable> createFrame( int opcode, List? data, bool serverSide, bool compressed, ) { bool mask = !serverSide; // Masking not implemented for server. int dataLength = data == null ? 0 : data.length; // Determine the header size. int headerSize = (mask) ? 6 : 2; if (dataLength > 65535) { headerSize += 8; } else if (dataLength > 125) { headerSize += 2; } Uint8List header = Uint8List(headerSize); int index = 0; // Set FIN and opcode. var hoc = _WebSocketProtocolTransformer.FIN | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) | (opcode & _WebSocketProtocolTransformer.OPCODE); header[index++] = hoc; // Determine size and position of length field. int lengthBytes = 1; if (dataLength > 65535) { header[index++] = 127; lengthBytes = 8; } else if (dataLength > 125) { header[index++] = 126; lengthBytes = 2; } // Write the length in network byte order into the header. for (int i = 0; i < lengthBytes; i++) { header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; } if (mask) { header[1] |= 1 << 7; var maskBytes = _CryptoUtils.getRandomBytes(4); header.setRange(index, index + 4, maskBytes); index += 4; if (data != null) { Uint8List list; // If this is a text message just do the masking inside the // encoded data. if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { list = data; } else { if (data is Uint8List) { list = Uint8List.fromList(data); } else { list = Uint8List(data.length); for (int i = 0; i < data.length; i++) { if (data[i] < 0 || 255 < data[i]) { throw ArgumentError( "List element is not a byte value " "(value ${data[i]} at index $i)", ); } list[i] = data[i]; } } } const int BLOCK_SIZE = 16; int blockCount = list.length ~/ BLOCK_SIZE; if (blockCount > 0) { // Create mask block. int mask = 0; for (int i = 3; i >= 0; i--) { mask = (mask << 8) | maskBytes[i]; } Int32x4 blockMask = Int32x4(mask, mask, mask, mask); Int32x4List blockBuffer = Int32x4List.view( list.buffer, list.offsetInBytes, blockCount, ); for (int i = 0; i < blockBuffer.length; i++) { blockBuffer[i] ^= blockMask; } } // Handle end. for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { list[i] ^= maskBytes[i & 3]; } data = list; } } assert(index == headerSize); if (data == null) { return [header]; } else { return [header, data]; } } } class _WebSocketConsumer implements StreamConsumer { final _WebSocketImpl webSocket; final Socket socket; StreamController? _controller; StreamSubscription? _subscription; bool _issuedPause = false; bool _closed = false; final Completer _closeCompleter = Completer(); Completer? _completer; _WebSocketConsumer(this.webSocket, this.socket); void _onListen() { _subscription?.cancel(); } void _onPause() { var subscription = _subscription; if (subscription != null) { subscription.pause(); } else { _issuedPause = true; } } void _onResume() { var subscription = _subscription; if (subscription != null) { subscription.resume(); } else { _issuedPause = false; } } void _cancel() { var subscription = _subscription; if (subscription != null) { _subscription = null; subscription.cancel(); } } StreamController _ensureController() { var controller = _controller; if (controller != null) return controller; controller = _controller = StreamController( sync: true, onPause: _onPause, onResume: _onResume, onCancel: _onListen, ); var stream = controller.stream.transform( _WebSocketOutgoingTransformer(webSocket), ); socket .addStream(stream) .then( (_) { _done(); _closeCompleter.complete(webSocket); }, onError: (Object error, StackTrace stackTrace) { _closed = true; _cancel(); if (error is ArgumentError) { if (!_done(error, stackTrace)) { _closeCompleter.completeError(error, stackTrace); } } else { _done(); _closeCompleter.complete(webSocket); } }, ); return controller; } bool _done([Object? error, StackTrace? stackTrace]) { var completer = _completer; if (completer == null) return false; if (error != null) { completer.completeError(error, stackTrace); } else { completer.complete(webSocket); } _completer = null; return true; } Future addStream(Stream stream) { if (_closed) { stream.listen(null).cancel(); return Future.value(webSocket); } _ensureController(); var completer = _completer = Completer(); var subscription = _subscription = stream.listen( (data) { _controller!.add(data); }, onDone: _done, onError: _done, cancelOnError: true, ); if (_issuedPause) { subscription.pause(); _issuedPause = false; } return completer.future; } Future close() { _ensureController().close(); return _closeCompleter.future.then( (_) => socket.close().catchError((_) {}).then((_) => webSocket), ); } void add(data) { if (_closed) return; var controller = _ensureController(); // Stop sending message if _controller has been closed. // https://github.com/dart-lang/sdk/issues/37441 if (controller.isClosed) return; controller.add(data); } void closeSocket() { _closed = true; _cancel(); close(); } } class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { // Use default Map so we keep order. static final Map _webSockets = {}; static const int DEFAULT_WINDOW_BITS = 15; static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; final String? protocol; final StreamController _controller; StreamSubscription? _subscription; late StreamSink _sink; final Socket _socket; final bool _serverSide; int _readyState = WebSocket.connecting; bool _writeClosed = false; int? _closeCode; String? _closeReason; Duration? _pingInterval; Timer? _pingTimer; late _WebSocketConsumer _consumer; int? _outCloseCode; String? _outCloseReason; Timer? _closeTimer; _WebSocketPerMessageDeflate? _deflate; static final HttpClient _httpClient = HttpClient(); static Future connect( String url, Iterable? protocols, Map? headers, { CompressionOptions compression = CompressionOptions.compressionDefault, HttpClient? customClient, }) { Uri uri = Uri.parse(url); if (!uri.isScheme("ws") && !uri.isScheme("wss")) { throw WebSocketException("Unsupported URL scheme '${uri.scheme}'"); } Random random = Random(); // Generate 16 random bytes. Uint8List nonceData = Uint8List(16); for (int i = 0; i < 16; i++) { nonceData[i] = random.nextInt(256); } String nonce = base64Encode(nonceData); final callerStackTrace = StackTrace.current; uri = Uri( scheme: uri.isScheme("wss") ? "https" : "http", userInfo: uri.userInfo, host: uri.host, port: uri.port, path: uri.path, query: uri.query, fragment: uri.fragment, ); return (customClient ?? _httpClient) .openUrl("GET", uri) .then((request) { if (uri.userInfo.isNotEmpty) { // If the URL contains user information use that for basic // authorization. String auth = base64Encode(utf8.encode(uri.userInfo)); request.headers.set(HttpHeaders.authorizationHeader, "Basic $auth"); } if (headers != null) { headers.forEach( (field, value) => request.headers.add(field, value), ); } // Setup the initial handshake. request.headers ..set(HttpHeaders.connectionHeader, "Upgrade") ..set(HttpHeaders.upgradeHeader, "websocket") ..set("Sec-WebSocket-Key", nonce) ..set("Cache-Control", "no-cache") ..set("Sec-WebSocket-Version", "13"); if (protocols != null) { request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); } if (compression.enabled) { request.headers.add( "Sec-WebSocket-Extensions", compression._createHeader(), ); } return request.close(); }) .then((response) { Future error(String message, [int? httpStatusCode]) { // Flush data. response.detachSocket().then((socket) { socket.destroy(); }); return Future.error( WebSocketException(message, httpStatusCode), callerStackTrace, ); } var connectionHeader = response.headers[HttpHeaders.connectionHeader]; if (response.statusCode != HttpStatus.switchingProtocols || connectionHeader == null || !connectionHeader.any( (value) => value.toLowerCase() == "upgrade", ) || response.headers .value(HttpHeaders.upgradeHeader)! .toLowerCase() != "websocket") { return error( "Connection to '$uri' was not upgraded to websocket", response.statusCode, ); } String? accept = response.headers.value("Sec-WebSocket-Accept"); if (accept == null) { return error( "Response did not contain a 'Sec-WebSocket-Accept' header", ); } _SHA1 sha1 = _SHA1(); sha1.add("$nonce$_webSocketGUID".codeUnits); List expectedAccept = sha1.close(); List receivedAccept = base64Decode(accept); if (expectedAccept.length != receivedAccept.length) { return error( "Response header 'Sec-WebSocket-Accept' is the wrong length", ); } for (int i = 0; i < expectedAccept.length; i++) { if (expectedAccept[i] != receivedAccept[i]) { return error("Bad response 'Sec-WebSocket-Accept' header"); } } var protocol = response.headers.value('Sec-WebSocket-Protocol'); _WebSocketPerMessageDeflate? deflate = negotiateClientCompression( response, compression, ); return response.detachSocket().then( (socket) => _WebSocketImpl._fromSocket( socket, protocol, compression, false, deflate, ), ); }); } static _WebSocketPerMessageDeflate? negotiateClientCompression( HttpClientResponse response, CompressionOptions compression, ) { String extensionHeader = response.headers.value('Sec-WebSocket-Extensions') ?? ""; var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) { var serverNoContextTakeover = hv.parameters.containsKey( _serverNoContextTakeover, ); var clientNoContextTakeover = hv.parameters.containsKey( _clientNoContextTakeover, ); int getWindowBits(String type) { var o = hv.parameters[type]; if (o == null) { return DEFAULT_WINDOW_BITS; } return int.tryParse(o) ?? DEFAULT_WINDOW_BITS; } return _WebSocketPerMessageDeflate( clientMaxWindowBits: getWindowBits(_clientMaxWindowBits), serverMaxWindowBits: getWindowBits(_serverMaxWindowBits), clientNoContextTakeover: clientNoContextTakeover, serverNoContextTakeover: serverNoContextTakeover, ); } return null; } _WebSocketImpl._fromSocket( this._socket, this.protocol, CompressionOptions compression, [ this._serverSide = false, _WebSocketPerMessageDeflate? deflate, ]) : _controller = StreamController(sync: true) { _consumer = _WebSocketConsumer(this, _socket); _sink = _StreamSinkImpl(_consumer); _readyState = WebSocket.open; _deflate = deflate; var transformer = _WebSocketProtocolTransformer(_serverSide, deflate); var subscription = _subscription = transformer .bind(_socket) .listen( (data) { if (data is _WebSocketPing) { if (!_writeClosed) _consumer.add(_WebSocketPong(data.payload)); } else if (data is _WebSocketPong) { // Simply set pingInterval, as it'll cancel any timers. pingInterval = _pingInterval; } else { _controller.add(data); } }, onError: (Object error, StackTrace stackTrace) { _closeTimer?.cancel(); if (error is FormatException) { _close(WebSocketStatus.invalidFramePayloadData); } else { _close(WebSocketStatus.protocolError); } // An error happened, set the close code set above. _closeCode = _outCloseCode; _closeReason = _outCloseReason; _controller.close(); }, onDone: () { _closeTimer?.cancel(); if (_readyState == WebSocket.open) { _readyState = WebSocket.closing; if (!_isReservedStatusCode(transformer.closeCode)) { _close(transformer.closeCode, transformer.closeReason); } else { _close(); } _readyState = WebSocket.closed; } // Protocol close, use close code from transformer. _closeCode = transformer.closeCode; _closeReason = transformer.closeReason; _controller.close(); }, cancelOnError: true, ); subscription.pause(); _controller ..onListen = subscription.resume ..onCancel = () { _subscription!.cancel(); _subscription = null; } ..onPause = subscription.pause ..onResume = subscription.resume; _webSockets[_serviceId] = this; } StreamSubscription listen( void onData(message)?, { Function? onError, void Function()? onDone, bool? cancelOnError, }) { return _controller.stream.listen( onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError, ); } Duration? get pingInterval => _pingInterval; void set pingInterval(Duration? interval) { if (_writeClosed) return; _pingTimer?.cancel(); _pingInterval = interval; if (interval == null) return; _pingTimer = Timer(interval, () { if (_writeClosed) return; _consumer.add(_WebSocketPing()); _pingTimer = Timer(interval, () { _closeTimer?.cancel(); // No pong received. _close(WebSocketStatus.goingAway); _closeCode = _outCloseCode; _closeReason = _outCloseReason; _controller.close(); }); }); } int get readyState => _readyState; String get extensions => ""; int? get closeCode => _closeCode; String? get closeReason => _closeReason; void add(data) { _sink.add(data); } void addUtf8Text(List bytes) { // TODO(40614): Remove once non-nullability is sound. ArgumentError.checkNotNull(bytes, "bytes"); _sink.add(_EncodedString(bytes)); } void addError(Object error, [StackTrace? stackTrace]) { _sink.addError(error, stackTrace); } Future addStream(Stream stream) => _sink.addStream(stream); Future get done => _sink.done; Future close([int? code, String? reason]) { if (_isReservedStatusCode(code)) { throw WebSocketException("Reserved status code $code"); } if (_outCloseCode == null) { _outCloseCode = code; _outCloseReason = reason; } if (!_controller.isClosed) { // If a close has not yet been received from the other end then // 1) make sure to listen on the stream so the close frame will be // processed if received. // 2) set a timer terminate the connection if a close frame is // not received. if (!_controller.hasListener && _subscription != null) { _controller.stream.drain().catchError((_) {}); } _closeTimer ??= Timer(const Duration(seconds: 5), () { // Reuse code and reason from the local close. _closeCode = _outCloseCode; _closeReason = _outCloseReason; _subscription?.cancel(); _controller.close(); _webSockets.remove(_serviceId); }); } return _sink.close(); } static String? get userAgent => _httpClient.userAgent; static set userAgent(String? userAgent) { _httpClient.userAgent = userAgent; } void _close([int? code, String? reason]) { if (_writeClosed) return; if (_outCloseCode == null) { _outCloseCode = code; _outCloseReason = reason; } _pingTimer?.cancel(); _writeClosed = true; _consumer.closeSocket(); _webSockets.remove(_serviceId); } static bool _isReservedStatusCode(int? code) { return code != null && (code < WebSocketStatus.normalClosure || code == WebSocketStatus.reserved1004 || code == WebSocketStatus.noStatusReceived || code == WebSocketStatus.abnormalClosure || (code > WebSocketStatus.internalServerError && code < WebSocketStatus.reserved1015) || (code >= WebSocketStatus.reserved1015 && code < 3000)); } }