// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:meta/meta.dart';

/// A [StreamTransformer] that allows the caller to forcibly close the
/// transformed [Stream](s).
///
/// When [close] is called, any stream (or streams) transformed by this
/// transformer that haven't already completed or been cancelled will emit a
/// done event and cancel their underlying subscriptions.
///
/// Note that unlike most [StreamTransformer]s, each instance of [StreamCloser]
/// has its own state (whether or not it's been closed), so it's a good idea to
/// construct a new one for each use unless you need to close multiple streams
/// at the same time.
@sealed
class StreamCloser<T> extends StreamTransformerBase<T, T> {
  /// The subscriptions to streams passed to [bind].
  ///
  /// A subscription is only present while it is live: it is removed when the
  /// source stream completes, when the transformed stream is cancelled, or
  /// when [close] is called.
  final _subscriptions = <StreamSubscription<T>>{};

  /// The controllers for streams returned by [bind].
  ///
  /// A controller is removed once its stream is done or cancelled, or when
  /// [close] is called.
  final _controllers = <StreamController<T>>{};

  /// Closes all transformed streams.
  ///
  /// Returns a future that completes when all inner subscriptions'
  /// [StreamSubscription.cancel] futures have completed. Note that a stream's
  /// subscription won't be canceled until the transformed stream has a
  /// listener.
  ///
  /// If a transformed stream is listened to after [close] is called, the
  /// original stream will be listened to and then the subscription immediately
  /// canceled. If that cancellation throws an error, it will be silently
  /// ignored.
  ///
  /// Calling [close] more than once has no additional effect: every call
  /// returns the same future as the first one.
  Future<void> close() => _closeFuture ??= () {
        var futures = [
          for (var subscription in _subscriptions) subscription.cancel(),
        ];
        _subscriptions.clear();

        // Snapshot the controllers before clearing the set so the deferred
        // callbacks below still have them.
        var controllers = _controllers.toList();
        _controllers.clear();

        // The controllers are not closed synchronously; each close is pushed
        // through two nested microtasks.
        // NOTE(review): presumably this lets events that are already queued
        // reach listeners before the done event - confirm before changing.
        scheduleMicrotask(() {
          for (var controller in controllers) {
            scheduleMicrotask(controller.close);
          }
        });

        return Future.wait(futures, eagerError: true);
      }();

  /// The future returned by the first call to [close], or `null` if [close]
  /// has not been called yet.
  Future<void>? _closeFuture;

  /// Whether [close] has been called.
  bool get isClosed => _closeFuture != null;

  /// Returns a stream that forwards the data and error events of [stream]
  /// until either [stream] completes or [close] is called.
  ///
  /// The returned stream is a broadcast stream exactly when [stream] is.
  /// [stream] is not listened to until the returned stream gets a listener.
  /// Pausing and resuming the returned stream is forwarded to the source
  /// subscription only when [stream] is not a broadcast stream.
  ///
  /// If [close] has already been called, the returned stream is closed right
  /// away and is not tracked by this closer.
  @override
  Stream<T> bind(Stream<T> stream) {
    var controller = stream.isBroadcast
        ? StreamController<T>.broadcast(sync: true)
        : StreamController<T>(sync: true);

    controller.onListen = () {
      if (isClosed) {
        // Ignore errors here, because otherwise there would be no way for the
        // user to handle them gracefully.
        stream.listen(null).cancel().catchError((_) {});
        return;
      }

      var subscription = stream.listen(
        controller.add,
        onError: controller.addError,
      );
      subscription.onDone(() {
        // The source finished on its own: stop tracking it and pass the done
        // event on to the transformed stream.
        _subscriptions.remove(subscription);
        _controllers.remove(controller);
        controller.close();
      });
      _subscriptions.add(subscription);

      if (!stream.isBroadcast) {
        controller.onPause = subscription.pause;
        controller.onResume = subscription.resume;
      }

      controller.onCancel = () {
        _controllers.remove(controller);

        // If the subscription has already been removed, that indicates that the
        // underlying stream has been cancelled by [close] and its cancellation
        // future has been handled there. In that case, we shouldn't forward it
        // here as well.
        if (_subscriptions.remove(subscription)) return subscription.cancel();
        return null;
      };
    };

    if (isClosed) {
      controller.close();
    } else {
      _controllers.add(controller);
    }

    return controller.stream;
  }
}