// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. part of "dart:async"; /// Runs user code and takes actions depending on success or failure. _runUserCode( T userCode(), onSuccess(T value), onError(Object error, StackTrace stackTrace), ) { try { onSuccess(userCode()); } catch (error, stackTrace) { AsyncError? replacement = _interceptError(error, stackTrace); if (replacement != null) { onError(replacement.error, replacement.stackTrace); } else { onError(error, stackTrace); } } } /** Helper function to cancel a subscription and wait for the potential future, before completing with an error. */ void _cancelAndError( StreamSubscription subscription, _Future future, AsyncError error, ) { var cancelFuture = subscription.cancel(); if (!identical(cancelFuture, Future._nullFuture)) { cancelFuture.whenComplete(() => future._completeErrorObject(error)); } else { future._completeErrorObject(error); } } void _cancelAndErrorWithReplacement( StreamSubscription subscription, _Future future, Object error, StackTrace stackTrace, ) { _cancelAndError( subscription, future, _interceptCaughtError(error, stackTrace), ); } /// Helper function to make an onError argument to [_runUserCode]. /// /// The error is already an asynchronous error, so is not intercepted. void Function(Object error, StackTrace stackTrace) _cancelAndErrorClosure( StreamSubscription subscription, _Future future, ) { return (Object error, StackTrace stackTrace) { _cancelAndError(subscription, future, AsyncError(error, stackTrace)); }; } // Helper function to cancel a subscription and wait for the potential future, // before completing with a value. void _cancelAndValue(StreamSubscription subscription, _Future future, value) { var cancelFuture = subscription.cancel(); if (!identical(cancelFuture, Future._nullFuture)) { cancelFuture.whenComplete(() => future._complete(value)); } else { future._complete(value); } } /// A [Stream] that forwards subscriptions to another stream. /// /// This stream implements [Stream], but forwards all subscriptions /// to an underlying stream, and wraps the returned subscription to /// modify the events on the way. /// /// This class is intended for internal use only. abstract class _ForwardingStream extends Stream { final Stream _source; _ForwardingStream(this._source); bool get isBroadcast => _source.isBroadcast; StreamSubscription listen( void onData(T value)?, { Function? onError, void onDone()?, bool? cancelOnError, }) { return _createSubscription(onData, onError, onDone, cancelOnError ?? false); } StreamSubscription _createSubscription( void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError, ) { return new _ForwardingStreamSubscription( this, onData, onError, onDone, cancelOnError, ); } // Override the following methods in subclasses to change the behavior. void _handleData(S data, _EventSink sink); void _handleError(Object error, StackTrace stackTrace, _EventSink sink) { sink._addError(error, stackTrace); } void _handleDone(_EventSink sink) { sink._close(); } } /// Abstract superclass for subscriptions that forward to other subscriptions. class _ForwardingStreamSubscription extends _BufferingStreamSubscription { final _ForwardingStream _stream; StreamSubscription? _subscription; _ForwardingStreamSubscription( this._stream, void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError, ) : super(onData, onError, onDone, cancelOnError) { _subscription = _stream._source.listen( _handleData, onError: _handleError, onDone: _handleDone, ); } // _StreamSink interface. // Transformers sending more than one event have no way to know if the stream // is canceled or closed after the first, so we just ignore remaining events. void _add(T data) { if (_isClosed) return; super._add(data); } void _addError(Object error, StackTrace stackTrace) { if (_isClosed) return; super._addError(error, stackTrace); } // StreamSubscription callbacks. void _onPause() { _subscription?.pause(); } void _onResume() { _subscription?.resume(); } Future? _onCancel() { var subscription = _subscription; if (subscription != null) { _subscription = null; return subscription.cancel(); } return null; } // Methods used as listener on source subscription. void _handleData(S data) { _stream._handleData(data, this); } void _handleError(error, StackTrace stackTrace) { _stream._handleError(error, stackTrace, this); } void _handleDone() { _stream._handleDone(this); } } // ------------------------------------------------------------------- // Stream transformers used by the default Stream implementation. // ------------------------------------------------------------------- void _addErrorWithReplacement( _EventSink sink, Object error, StackTrace stackTrace, ) { var replacement = _interceptError(error, stackTrace); if (replacement != null) { error = replacement.error; stackTrace = replacement.stackTrace; } sink._addError(error, stackTrace); } class _WhereStream extends _ForwardingStream { final bool Function(T) _test; _WhereStream(Stream source, bool test(T value)) : _test = test, super(source); void _handleData(T inputEvent, _EventSink sink) { bool satisfies; try { satisfies = _test(inputEvent); } catch (e, s) { _addErrorWithReplacement(sink, e, s); return; } if (satisfies) { sink._add(inputEvent); } } } typedef T _Transformation(S value); /// A stream pipe that converts data events before passing them on. class _MapStream extends _ForwardingStream { final _Transformation _transform; _MapStream(Stream source, T transform(S event)) : this._transform = transform, super(source); void _handleData(S inputEvent, _EventSink sink) { T outputEvent; try { outputEvent = _transform(inputEvent); } catch (e, s) { _addErrorWithReplacement(sink, e, s); return; } sink._add(outputEvent); } } /// A stream pipe that converts data events before passing them on. class _ExpandStream extends _ForwardingStream { final _Transformation> _expand; _ExpandStream(Stream source, Iterable expand(S event)) : this._expand = expand, super(source); void _handleData(S inputEvent, _EventSink sink) { try { for (T value in _expand(inputEvent)) { sink._add(value); } } catch (e, s) { // If either _expand or iterating the generated iterator throws, // we abort the iteration. _addErrorWithReplacement(sink, e, s); } } } /// A stream pipe that converts or disposes error events /// before passing them on. class _HandleErrorStream extends _ForwardingStream { final void Function(Object, StackTrace) _onError; final bool Function(Object)? _test; _HandleErrorStream(Stream source, this._onError, this._test) : super(source); void _handleData(T data, _EventSink sink) { sink._add(data); } void _handleError(Object error, StackTrace stackTrace, _EventSink sink) { bool matches = true; var test = _test; if (test != null) { try { matches = test(error); } catch (e, s) { _addErrorWithReplacement(sink, e, s); return; } } if (matches) { try { _onError(error, stackTrace); } catch (e, s) { if (identical(e, error)) { sink._addError(error, stackTrace); } else { _addErrorWithReplacement(sink, e, s); } return; } } else { sink._addError(error, stackTrace); } } } class _TakeStream extends _ForwardingStream { final int _count; _TakeStream(Stream source, int count) : this._count = count, super(source); StreamSubscription _createSubscription( void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError, ) { if (_count == 0) { _source.listen(null).cancel(); return new _DoneStreamSubscription(onDone); } return new _StateStreamSubscription( this, onData, onError, onDone, cancelOnError, _count, ); } void _handleData(T inputEvent, _EventSink sink) { var subscription = sink as _StateStreamSubscription; int count = subscription._subState; if (count > 0) { sink._add(inputEvent); count -= 1; subscription._subState = count; if (count == 0) { // Closing also unsubscribes all subscribers, which unsubscribes // this from source. sink._close(); } } } } /// A [_ForwardingStreamSubscription] with one extra state field. /// /// Use by several different classes, storing an integer, bool or general. class _StateStreamSubscription extends _ForwardingStreamSubscription { S _subState; _StateStreamSubscription( _ForwardingStream stream, void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError, this._subState, ) : super(stream, onData, onError, onDone, cancelOnError); } class _TakeWhileStream extends _ForwardingStream { final bool Function(T) _test; _TakeWhileStream(Stream source, bool test(T value)) : this._test = test, super(source); void _handleData(T inputEvent, _EventSink sink) { bool satisfies; try { satisfies = _test(inputEvent); } catch (e, s) { _addErrorWithReplacement(sink, e, s); // The test didn't say true. Didn't say false either, but we stop anyway. sink._close(); return; } if (satisfies) { sink._add(inputEvent); } else { sink._close(); } } } class _SkipStream extends _ForwardingStream { final int _count; _SkipStream(Stream source, int count) : this._count = count, super(source) { // This test is done early to avoid handling an async error // in the _handleData method. RangeError.checkNotNegative(count, "count"); } StreamSubscription _createSubscription( void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError, ) { return new _StateStreamSubscription( this, onData, onError, onDone, cancelOnError, _count, ); } void _handleData(T inputEvent, _EventSink sink) { var subscription = sink as _StateStreamSubscription; int count = subscription._subState; if (count > 0) { subscription._subState = count - 1; return; } sink._add(inputEvent); } } class _SkipWhileStream extends _ForwardingStream { final bool Function(T) _test; _SkipWhileStream(Stream source, bool test(T value)) : this._test = test, super(source); StreamSubscription _createSubscription( void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError, ) { return new _StateStreamSubscription( this, onData, onError, onDone, cancelOnError, false, ); } void _handleData(T inputEvent, _EventSink sink) { var subscription = sink as _StateStreamSubscription; bool hasFailed = subscription._subState; if (hasFailed) { sink._add(inputEvent); return; } bool satisfies; try { satisfies = _test(inputEvent); } catch (e, s) { _addErrorWithReplacement(sink, e, s); // A failure to return a boolean is considered "not matching". subscription._subState = true; return; } if (!satisfies) { subscription._subState = true; sink._add(inputEvent); } } } class _DistinctStream extends _ForwardingStream { static final _SENTINEL = new Object(); final bool Function(T, T)? _equals; _DistinctStream(Stream source, bool equals(T a, T b)?) : _equals = equals, super(source); StreamSubscription _createSubscription( void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError, ) { return new _StateStreamSubscription( this, onData, onError, onDone, cancelOnError, _SENTINEL, ); } void _handleData(T inputEvent, _EventSink sink) { var subscription = sink as _StateStreamSubscription; var previous = subscription._subState; if (identical(previous, _SENTINEL)) { // First event. Cannot use [_equals]. subscription._subState = inputEvent; sink._add(inputEvent); } else { T previousEvent = previous as T; var equals = _equals; bool isEqual; try { if (equals == null) { isEqual = (previousEvent == inputEvent); } else { isEqual = equals(previousEvent, inputEvent); } } catch (e, s) { _addErrorWithReplacement(sink, e, s); return; } if (!isEqual) { sink._add(inputEvent); subscription._subState = inputEvent; } } } }