// Copyright 2021 The Dart Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

import 'package:logging/logging.dart';
import 'package:sse/client/sse_client.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket/web_socket.dart';

/// A minimal bidirectional socket abstraction exposing a [sink] for outgoing
/// data, a [stream] of incoming data, and a way to [close] the connection.
abstract class SocketClient {
  /// The sink that outgoing messages are written to.
  StreamSink<dynamic> get sink;

  /// The stream of incoming messages.
  Stream<dynamic> get stream;

  /// Closes the underlying connection.
  void close();
}

/// A [SocketClient] that forwards to an [SseClient].
class SseSocketClient extends SocketClient {
  final SseClient _client;

  SseSocketClient(this._client);

  @override
  StreamSink<dynamic> get sink => _client.sink;

  @override
  Stream<dynamic> get stream => _client.stream;

  @override
  void close() => _client.close();
}

/// A [SocketClient] that forwards to a [StreamChannelMixin].
class WebSocketClient extends SocketClient {
  final StreamChannelMixin<dynamic> _channel;

  WebSocketClient(this._channel);

  @override
  StreamSink<dynamic> get sink => _channel.sink;

  /// Every incoming event is converted to a string with `toString()`.
  @override
  Stream<dynamic> get stream =>
      _channel.stream.map((dynamic o) => o.toString());

  @override
  void close() => _channel.sink.close();
}

/// Signature of the callback invoked by [PersistentWebSocket] after a
/// connection has been re-established.
///
/// The callback receives the [PersistentWebSocket.sink] so it can send
/// messages over the new connection.
typedef ReconnectCallback = FutureOr<void> Function(StreamSink<dynamic>);

/// A [WebSocket] wrapper that can automatically re-establish a connection
/// when a connection is lost (e.g., upon entering "sleep" mode, flaky network
/// connections, etc.).
class PersistentWebSocket with StreamChannelMixin<dynamic> {
  PersistentWebSocket._(
    this.uri,
    this._ws, {
    required this.exponentialBackoffDelayMs,
    required this.maxRetryAttempts,
    required this.debugName,
    this.onReconnect,
    this.logger,
  });

  /// Creates a [PersistentWebSocket] instance connected to [uri].
  ///
  /// [debugName] is a string included in logs written by this class to provide
  /// additional information about the purpose of this connection.
  ///
  /// [maxRetryAttempts] sets the maximum number of retry attempts that can be
  /// made before giving up.
  ///
  /// [exponentialBackoffDelayMs] is the length of the initial delay before
  /// attempting to retry the connection in milliseconds. This delay doubles
  /// after each unsuccessful retry attempt.
  ///
  /// If [onReconnect] is provided, it is invoked with [sink] each time the
  /// connection has been re-established.
  ///
  /// If [logger] is provided, messages will be logged when attempting to
  /// re-establish a connection.
  ///
  /// No retries are attempted when making the initial web socket connection,
  /// so callers must be prepared to handle both [SocketException]s and
  /// [WebSocketException] thrown if the connection to [uri] fails.
  static Future<PersistentWebSocket> connect(
    Uri uri, {
    String debugName = kDefaultDebugName,
    int maxRetryAttempts = kDefaultMaxRetryAttempts,
    int exponentialBackoffDelayMs = kDefaultBackoffDelayMs,
    ReconnectCallback? onReconnect,
    Logger? logger,
  }) async {
    final socket = await WebSocket.connect(uri);
    return PersistentWebSocket._(
      uri,
      socket,
      exponentialBackoffDelayMs: exponentialBackoffDelayMs,
      maxRetryAttempts: maxRetryAttempts,
      logger: logger,
      onReconnect: onReconnect,
      debugName: debugName,
    );
  }

  /// Close code reported by [CloseReceived] that this class treats as a lost
  /// connection and therefore as a trigger for reconnection.
  static const _abnormalClosureCode = 1006;

  /// The logger used to report reconnection attempts, if any.
  final Logger? logger;

  static const kDefaultDebugName = 'Unknown';

  /// The debug name associated with this connection.
  ///
  /// Useful when trying to identify the context of a given connection. If
  /// [logger] is set, this name will be output as part of messages output
  /// while attempting to re-establish connections.
  final String debugName;

  static const kDefaultMaxRetryAttempts = 3;

  /// The number of retry attempts to make before giving up.
  final int maxRetryAttempts;

  static const kDefaultBackoffDelayMs = 100;

  /// The amount of time to wait before attempting to retry the connection.
  ///
  /// The retry delay is calculated using exponential backoff, where each
  /// successive delay before a retry attempt is twice as long as the last
  /// delay.
  final int exponentialBackoffDelayMs;

  /// Completes when the web socket connection is disposed of.
  ///
  /// The future is completed by the listen/retry loop, which only starts once
  /// [stream] has been listened to.
  Future<void> get done => _doneCompleter.future;
  final _doneCompleter = Completer<void>();

  /// The URI used to establish the web socket connection.
  final Uri uri;

  /// Invoked with [sink] after a connection has been re-established.
  ReconnectCallback? onReconnect;

  /// The currently active web socket; replaced on each successful reconnect.
  WebSocket _ws;

  // Listening to the incoming stream is what starts forwarding events from
  // the web socket (and the retry loop).
  late final _incomingStreamController = StreamController<dynamic>()
    ..onListen = _listenWithRetry;

  final _outgoingStreamController = StreamController<dynamic>();

  /// Used to indicate that the web socket was closed after [close] was
  /// invoked.
  var _closedManually = false;

  /// Sends [data] over the current web socket.
  ///
  /// Only [String] and [Uint8List] payloads are supported; anything else
  /// results in an [UnsupportedError].
  void _writeToWebSocket(dynamic data) {
    if (data is String) {
      _ws.sendText(data);
    } else if (data is Uint8List) {
      _ws.sendBytes(data);
    } else {
      throw UnsupportedError('Unexpected data type: ${data.runtimeType}');
    }
  }

  /// Closes [controller], waiting for the close to be delivered only when
  /// something is listening.
  ///
  /// The future returned by [StreamController.close] does not complete until
  /// the done event has been delivered to a listener, so awaiting it on a
  /// controller without a listener would never return.
  static Future<void> _closeController(
    StreamController<dynamic> controller,
  ) async {
    if (controller.hasListener) {
      await controller.close();
    } else {
      unawaited(controller.close());
    }
  }

  /// Close the web socket connection.
  ///
  /// Calling this more than once is a no-op. No reconnection attempts are
  /// started after this has been called.
  Future<void> close() async {
    if (_closedManually) {
      return;
    }
    _closedManually = true;
    await _closeController(_incomingStreamController);
    await _closeController(_outgoingStreamController);
    try {
      await _ws.close();
    } on WebSocketConnectionClosed {
      // The connection was already closed (e.g. by the peer, or after the
      // retry attempts were exhausted); there is nothing left to close.
    }
  }

  /// Forwards events between the web socket and the stream controllers,
  /// re-establishing the connection when it is lost.
  ///
  /// Runs once, when [stream] is first listened to. Completes [done] and
  /// closes the incoming stream when the loop ends: after a regular close,
  /// after [close] is called, or once [maxRetryAttempts] is reached.
  Future<void> _listenWithRetry() async {
    var retry = false;
    var retryCount = 0;

    _outgoingStreamController.stream.listen(_writeToWebSocket);

    // Waits for the current backoff delay, then schedules another connection
    // attempt unless the socket has been closed manually in the meantime.
    Future<void> attemptRetry(String message) async {
      // Exponential backoff: the configured initial delay doubles with each
      // consecutive failed attempt.
      await Future<void>.delayed(
        Duration(milliseconds: exponentialBackoffDelayMs << retryCount),
      );
      retryCount++;
      retry = !_closedManually;
      if (!_closedManually) {
        logger?.info('$message. Retrying ($retryCount / $maxRetryAttempts)');
      }
    }

    // Drops data that arrives after the incoming stream has been closed
    // instead of throwing from inside the event listener.
    void addIncoming(Object data) {
      if (!_incomingStreamController.isClosed) {
        _incomingStreamController.sink.add(data);
      }
    }

    do {
      final wsOnDoneCompleter = Completer<void>();

      // Check if the connection has been closed during an asynchronous gap.
      if (_closedManually) {
        break;
      }

      final retried = retry;

      // If the last web socket connection closed unexpectedly, try to
      // re-establish the connection.
      if (retry) {
        try {
          _ws = await WebSocket.connect(uri);
          // Reset the retry counter on a successful reconnection.
          retryCount = 0;
        } on Exception {
          await attemptRetry(
            'Failed to establish connection to $uri ($debugName).',
          );
          continue;
        }
        retry = false;
      }

      // Set once a CloseReceived event has been seen, in which case the
      // event handler (not onDone) is responsible for completing
      // wsOnDoneCompleter after any retry bookkeeping has finished.
      var closeEventSeen = false;

      late StreamSubscription<WebSocketEvent> eventsSub;
      eventsSub = _ws.events.listen(
        (e) async {
          switch (e) {
            case TextDataReceived(:final text):
              addIncoming(text);
            case BinaryDataReceived(:final data):
              addIncoming(data);
            case CloseReceived(:final code):
              closeEventSeen = true;
              if (code == _abnormalClosureCode) {
                await attemptRetry('Lost connection to $uri ($debugName).');
                await eventsSub.cancel();
              }
              wsOnDoneCompleter.complete();
          }
        },
        onDone: () {
          // The event stream can end without a CloseReceived event (e.g.
          // when the socket is closed locally). Unblock the loop in that
          // case so that [done] still completes.
          if (!closeEventSeen && !wsOnDoneCompleter.isCompleted) {
            wsOnDoneCompleter.complete();
          }
        },
      );

      if (retried) {
        await onReconnect?.call(sink);
      }

      // Wait for the web socket's onDone handler to run before attempting to
      // retry. Waiting on _ws.done can result in a race condition.
      await wsOnDoneCompleter.future;
    } while (retry && retryCount < maxRetryAttempts);

    _doneCompleter.complete();
    if (!_incomingStreamController.isClosed) {
      await _incomingStreamController.sink.close();
    }
  }

  @override
  StreamSink<dynamic> get sink => _outgoingStreamController.sink;

  @override
  Stream<dynamic> get stream => _incomingStreamController.stream.cast<dynamic>();
}