// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. import 'dart:async'; /// An asynchronous operation that can be cancelled. /// /// The value of this operation is exposed as [value]. When this operation is /// cancelled, [value] won't complete either successfully or with an error. If /// [value] has already completed, cancelling the operation does nothing. class CancelableOperation { /// The completer that produced this operation. /// /// That completer is canceled when [cancel] is called. CancelableCompleter _completer; CancelableOperation._(this._completer); /// Creates a [CancelableOperation] with the same result as the [result] /// future. /// /// When this operation is canceled, [onCancel] will be called and any value /// or error later produced by [result] will be discarded. /// If [onCancel] returns a [Future], it will be returned by [cancel]. /// /// The [onCancel] function will be called synchronously /// when the new operation is canceled, and will be called at most once. /// /// Calling this constructor is equivalent to creating a /// [CancelableCompleter] and completing it with [result]. factory CancelableOperation.fromFuture( Future result, { FutureOr Function()? onCancel, }) => (CancelableCompleter(onCancel: onCancel)..complete(result)).operation; /// Creates a [CancelableOperation] which completes to [value]. /// /// Canceling this operation does nothing. /// /// Calling this constructor is equivalent to creating a /// [CancelableCompleter] and completing it with [value]. factory CancelableOperation.fromValue(T value) => (CancelableCompleter()..complete(value)).operation; /// Creates a [CancelableOperation] wrapping [subscription]. /// /// This overrides [StreamSubscription.onDone] and /// [StreamSubscription.onError] so that the returned operation will complete /// when the subscription completes or emits an error. /// When this operation is canceled or when it emits an error, the /// subscription will be canceled (unlike /// `CancelableOperation.fromFuture(subscription.asFuture())`). static CancelableOperation fromSubscription( StreamSubscription subscription, ) { var completer = CancelableCompleter(onCancel: subscription.cancel); subscription.onDone(completer.complete); subscription.onError((Object error, StackTrace stackTrace) { subscription.cancel().whenComplete(() { completer.completeError(error, stackTrace); }); }); return completer.operation; } /// Creates a [CancelableOperation] that completes with the value of the first /// of [operations] to complete. /// /// Once any of [operations] completes, its result is forwarded to the /// new [CancelableOperation] and the rest are cancelled. If the /// new operation is cancelled, all the [operations] are cancelled as /// well. static CancelableOperation race( Iterable> operations, ) { operations = operations.toList(); if (operations.isEmpty) { throw ArgumentError('May not be empty', 'operations'); } var done = false; // Note: if one or more of the completers have already completed, // they're not actually cancelled by this. Future cancelAll() { done = true; return Future.wait([ for (var operation in operations) if (!operation.isCanceled) operation.cancel(), ]); } var completer = CancelableCompleter(onCancel: cancelAll); for (var operation in operations) { operation.then( (value) { if (!done) cancelAll().whenComplete(() => completer.complete(value)); }, onError: (error, stackTrace) { if (!done) { cancelAll().whenComplete( () => completer.completeError(error, stackTrace), ); } }, propagateCancel: false, ); } return completer.operation; } /// The result of this operation, if not cancelled. /// /// This future will not complete if the operation is cancelled. /// Use [valueOrCancellation] for a future which completes /// both if the operation is cancelled and if it isn't. Future get value => _completer._inner?.future ?? Completer().future; /// Creates a [Stream] containing the result of this operation. /// /// This is like `value.asStream()`, but if a subscription to the stream is /// canceled, this operation is as well. Stream asStream() { var controller = StreamController( sync: true, onCancel: _completer._cancel, ); _completer._inner?.future.then( (value) { controller.add(value); controller.close(); }, onError: (Object error, StackTrace stackTrace) { controller.addError(error, stackTrace); controller.close(); }, ); return controller.stream; } /// Creates a [Future] that completes when this operation completes *or* when /// it's cancelled. /// /// If this operation completes, this completes to the same result as [value]. /// If this operation is cancelled, the returned future waits for the future /// returned by [cancel], then completes to [cancellationValue]. Future valueOrCancellation([T? cancellationValue]) { var completer = Completer.sync(); value.then(completer.complete, onError: completer.completeError); _completer._cancelCompleter?.future.then((_) { completer.complete(cancellationValue); }, onError: completer.completeError); return completer.future; } /// Creates a new cancelable operation to be completed when this operation /// completes normally or as an error, or is cancelled. /// /// If this operation completes normally the value is passed to [onValue] /// and the returned operation is completed with the result. /// /// If this operation completes as an error, and no [onError] callback is /// provided, the returned operation is completed with the same error and /// stack trace. /// If this operation completes as an error, and an [onError] callback is /// provided, the returned operation is completed with the result. /// /// If this operation is canceled, and no [onCancel] callback is provided, /// the returned operation is canceled. /// If this operation is canceled, and an [onCancel] callback is provided, /// the returned operation is completed with the result. /// /// At most one of [onValue], [onError], or [onCancel] will be called. /// If any of [onValue], [onError], or [onCancel] throw a synchronous error, /// or return a `Future` that completes as an error, the error will be /// forwarded through the returned operation. /// /// If the returned operation is canceled before this operation completes or /// is canceled, the [onValue], [onError], and [onCancel] callbacks will not /// be invoked. If [propagateCancel] is `true` (the default) then this /// operation is canceled as well. Pass `false` if there are multiple /// listeners on this operation and canceling the [onValue], [onError], and /// [onCancel] callbacks should not cancel the other listeners. CancelableOperation then( FutureOr Function(T) onValue, { FutureOr Function(Object, StackTrace)? onError, FutureOr Function()? onCancel, bool propagateCancel = true, }) => thenOperation( (value, completer) { completer.complete(onValue(value)); }, onError: onError == null ? null : (error, stackTrace, completer) { completer.complete(onError(error, stackTrace)); }, onCancel: onCancel == null ? null : (completer) { completer.complete(onCancel()); }, propagateCancel: propagateCancel, ); /// Creates a new cancelable operation to be completed when this operation /// completes normally or as an error, or is cancelled. /// /// If this operation completes normally the value is passed to [onValue] /// with a [CancelableCompleter] controlling the returned operation. /// /// If this operation completes as an error, and no [onError] callback is /// provided, the returned operation is completed with the same error and /// stack trace. /// If this operation completes as an error, and an [onError] callback is /// provided, the error and stack trace are passed to [onError] with a /// [CancelableCompleter] controlling the returned operation. /// /// If this operation is canceled, and no [onCancel] callback is provided, /// the returned operation is canceled. /// If this operation is canceled, and an [onCancel] callback is provided, /// the [onCancel] callback is called with a [CancelableCompleter] controlling /// the returned operation. /// /// At most one of [onValue], [onError], or [onCancel] will be called. /// If any of [onValue], [onError], or [onCancel] throw a synchronous error, /// or return a `Future` that completes as an error, the error will be /// forwarded through the returned operation. /// /// If the returned operation is canceled before this operation completes or /// is canceled, the [onValue], [onError], and [onCancel] callbacks will not /// be invoked. If [propagateCancel] is `true` (the default) then this /// operation is canceled as well. Pass `false` if there are multiple /// listeners on this operation and canceling the [onValue], [onError], and /// [onCancel] callbacks should not cancel the other listeners. CancelableOperation thenOperation( FutureOr Function(T, CancelableCompleter) onValue, { FutureOr Function(Object, StackTrace, CancelableCompleter)? onError, FutureOr Function(CancelableCompleter)? onCancel, bool propagateCancel = true, }) { final completer = CancelableCompleter( onCancel: propagateCancel ? _cancelIfNotCanceled : null, ); // if `_completer._inner` completes before `completer` is cancelled // call `onValue` or `onError` with the result, and complete `completer` // with the result of that call (unless cancelled in the meantime). // // If `_completer._cancelCompleter` completes (always with a value) // before `completer` is cancelled, then call `onCancel` (if supplied) // with that that value and complete `completer` with the result of that // call (unless cancelled in the meantime). // // If any of the callbacks throw synchronously, the `completer` is // completed with that error. // // If no `onCancel` is provided, and `_completer._cancelCompleter` // completes before `completer` is cancelled, // then cancel `cancelCompleter`. (Cancelling twice is safe.) _completer._inner?.future.then( (value) async { if (completer.isCanceled) return; try { await onValue(value, completer); } catch (error, stack) { completer.completeError(error, stack); } }, onError: onError == null ? completer.completeError // Is ignored if already cancelled. : (Object error, StackTrace stack) async { if (completer.isCanceled) return; try { await onError(error, stack, completer); } catch (error2, stack2) { completer.completeErrorIfPending( error2, identical(error, error2) ? stack : stack2, ); } }, ); final cancelForwarder = _CancelForwarder(completer, onCancel); if (_completer.isCanceled) { cancelForwarder._forward(); } else { (_completer._cancelForwarders ??= []).add(cancelForwarder); } return completer.operation; } /// Cancels this operation. /// /// If this operation [isCompleted] or [isCanceled] this call is ignored. /// Returns the result of the `onCancel` callback, if one exists. Future cancel() => _completer._cancel(); Future? _cancelIfNotCanceled() => isCanceled ? null : cancel(); /// Whether this operation has been canceled before it completed. bool get isCanceled => _completer._isCanceled; /// Whether the result of this operation is ready. /// /// When ready, the [value] future is completed with the result value /// or error, and this operation can no longer be cancelled. /// An operation may be complete before the listeners on [value] are invoked. bool get isCompleted => _completer._isCompleted; } /// A completer for a [CancelableOperation]. class CancelableCompleter { // The cancelable completer is in one of the following states: // * Initial: // _inner != null // _cancelCompleter != null // _mayComplete: true // // * Async-completed: `complete` called with a future while Initial. // _inner != null // _cancelCompleter != null // _mayComplete: false // // * Completed: `complete` called with a value or `completeError` called // while Initial, or the future passed in Async-completed completes // while AsyncCompleted. // _inner != null // _cancelCompleter == null // _mayComplete: false // // * Cancelled may-complete: `_cancel` called while Initial. // Allows calling `complete`/`completeError` even if it does nothing. // _inner == null // _cancelCompleter != null // _mayComplete: true // // * Cancelled can't-complete: `_cancel` called while Async-completed. // _inner == null // _cancelCompleter != null // _mayComplete: false /// The completer for the wrapped future. /// /// At most one of `_inner.future` and `_cancelCompleter.future` will /// ever complete. /// Set to `null` when when the operation is canceled, because then /// it's guaranteed that this completer will never complete. Completer? _inner = Completer(); /// Completed when `cancel` is called. /// /// At most one of `_inner.future` and `_cancelCompleter.future` will /// ever complete. /// Set to `null` when [_inner] is completed, because then it's /// guaranteed that this completer will never complete. Completer? _cancelCompleter = Completer(); /// The callback to call if the operation is canceled. final FutureOr Function()? _onCancel; /// Additional cancellations to forward during cancel. /// /// When a cancelable operation is chained through `then` or `thenOperation` a /// cancellation on the original operation will synchronously cancel the /// chained operations. List<_CancelForwarder>? _cancelForwarders; /// Whether [complete] or [completeError] may still be called. /// /// Set to false when calling either. /// /// When completing by calling [complete] with a future, /// it's still possible to cancel until the result is actually /// available. /// You are also allowed to call [complete] or [completeError] /// after the operation has been canceled, as long as you only call it once. /// It just won't do anything after the operation is cancelled. /// This value only guards the calls to [complete] and [completeError]. bool _mayComplete = true; /// The operation controlled by this completer. late final operation = CancelableOperation._(this); /// Creates a new completer for a [CancelableOperation]. /// /// The cancelable [operation] can be completed using /// [complete] or [completeError]. /// /// The [onCancel] function is called if the [operation] is canceled, /// by calling [CancelableOperation.cancel] /// before the operation has completed. /// If [onCancel] returns a [Future], /// that future is also returned by [CancelableOperation.cancel]. /// /// The [onCancel] function will be called at most once. CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel; /// Whether the [_inner] completer has been completed. /// /// At this point it's no longer possible to cancel the operation. bool get _isCompleted => _cancelCompleter == null; /// Whether the completer was canceled before the result was ready. /// /// At this point, it's no longer possible to complete the operation. bool get _isCanceled => _inner == null; /// Whether the [complete] or [completeError] have been called. /// /// Once this completer has been completed with either a result or error, /// neither method may be called again. /// /// If [complete] was called with a [Future] argument, this completer may be /// completed before it's [operation] is completed. In that case the /// [operation] may still be canceled before the result is available. bool get isCompleted => !_mayComplete; /// Whether the completer was canceled before the result was ready. bool get isCanceled => _isCanceled; /// Completes [operation] with [value]. /// /// If [value] is a [Future] the [operation] will complete /// with the result of that `Future` once it is available. /// In that case [isCompleted] will be `true` before the [operation] /// is complete. /// /// If the type [T] is not nullable [value] may be not be omitted or `null`. /// /// This method may not be called after either [complete] or [completeError] /// has been called once. /// The [isCompleted] is true when either of these methods have been called. void complete([FutureOr? value]) { if (!_mayComplete) throw StateError('Operation already completed'); _mayComplete = false; if (value is! Future) { _completeNow()?.complete(value); return; } if (_inner == null) { // Make sure errors from [value] aren't top-leveled. value.ignore(); return; } value.then( (result) { _completeNow()?.complete(result); }, onError: (Object error, StackTrace stackTrace) { _completeNow()?.completeError(error, stackTrace); }, ); } /// Makes this [CancelableCompleter.operation] complete with the same result /// as [result]. /// /// If [propagateCancel] is `true` (the default), and the [operation] of this /// completer is canceled before [result] completes, then [result] is also /// canceled. void completeOperation( CancelableOperation result, { bool propagateCancel = true, }) { if (!_mayComplete) throw StateError('Already completed'); _mayComplete = false; if (isCanceled) { if (propagateCancel) result.cancel(); result.value.ignore(); return; } result.then( (value) { _inner?.complete( value, ); // _inner is set to null if this.operation is cancelled. }, onError: (error, stack) { _inner?.completeError(error, stack); }, onCancel: () { operation.cancel(); }, ); if (propagateCancel) { _cancelCompleter?.future.whenComplete(result.cancel); } } /// Completer to use for completing with a result. /// /// Returns `null` if it's not possible to complete any more. /// Sets [_cancelCompleter] to `null` if returning non-`null`. Completer? _completeNow() { var inner = _inner; if (inner == null) return null; _cancelCompleter = null; return inner; } /// Completes [operation] with [error] and [stackTrace]. /// /// This method may not be called after either [complete] or [completeError] /// has been called once. /// The [isCompleted] is true when either of these methods have been called. void completeError(Object error, [StackTrace? stackTrace]) { if (!_mayComplete) throw StateError('Operation already completed'); _mayComplete = false; _completeNow()?.completeError(error, stackTrace); } /// Cancels the operation. /// /// If the operation has already completed, prior to being cancelled, /// this method does nothing. /// If the operation has already been cancelled, this method returns /// the same result as the first call to `_cancel`. /// /// The result of the operation may only be available some time after /// the completer has been completed (using [complete] or [completeError], /// which sets [isCompleted] to true) if completed with a [Future]. /// The completer can be cancelled until the result becomes available, /// even if [isCompleted] is true. Future _cancel() { var cancelCompleter = _cancelCompleter; if (cancelCompleter == null) return Future.value(null); if (_inner != null) { _inner = null; cancelCompleter.complete(_invokeCancelCallbacks()); } return cancelCompleter.future; } /// Invoke [_onCancel] and forward to other completers in [_cancelForwarders]. /// /// Returns the same value as [_onCancel]. Legacy uses may return a value /// despite the signature having `void` return. Future _invokeCancelCallbacks() async { final FutureOr toReturn = _onCancel?.call(); final isFuture = toReturn is Future; final cancelFutures = >[ if (isFuture) toReturn, ...?_cancelForwarders?.map(_forward).nonNulls, ]; final results = (isFuture && cancelFutures.length == 1) ? [await toReturn] : cancelFutures.isNotEmpty ? await Future.wait(cancelFutures) : const []; return isFuture ? results.first : toReturn; } } class _CancelForwarder { final CancelableCompleter completer; final FutureOr Function(CancelableCompleter)? onCancel; _CancelForwarder(this.completer, this.onCancel); Future? _forward() { if (completer.isCanceled) return null; final onCancel = this.onCancel; if (onCancel == null) return completer._cancel(); try { final result = onCancel(completer); if (result is Future) { return result.catchError(completer.completeErrorIfPending); } } catch (error, stack) { completer.completeErrorIfPending(error, stack); } return null; } } // Helper function to avoid a closure for `List<_CancelForwarder>.map`. Future? _forward(_CancelForwarder forwarder) => forwarder._forward(); extension on CancelableCompleter { void completeErrorIfPending(Object error, StackTrace stackTrace) { if (isCompleted) return; completeError(error, stackTrace); } }