// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. part of "dart:async"; /// Wraps an [_EventSink] so it exposes only the [EventSink] interface. class _EventSinkWrapper implements EventSink { _EventSink _sink; _EventSinkWrapper(this._sink); void add(T data) { _sink._add(data); } void addError(Object error, [StackTrace? stackTrace]) { _sink._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error)); } void close() { _sink._close(); } } /// A StreamSubscription that pipes data through a sink. /// /// The constructor of this class takes a [_SinkMapper] which maps from /// [EventSink] to [EventSink]. The input to the mapper is the output of /// the transformation. The returned sink is the transformation's input. class _SinkTransformerStreamSubscription extends _BufferingStreamSubscription { /// The transformer's input sink. late EventSink _transformerSink; /// The subscription to the input stream. StreamSubscription? _subscription; _SinkTransformerStreamSubscription( Stream source, _SinkMapper mapper, void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError, ) // We set the adapter's target only when the user is allowed to send data. : super(onData, onError, onDone, cancelOnError) { _transformerSink = mapper(_EventSinkWrapper(this)); _subscription = source.listen( _handleData, onError: _handleError, onDone: _handleDone, ); } // _EventSink interface. /// Adds an event to this subscriptions. /// /// Contrary to normal [_BufferingStreamSubscription]s we may receive /// events when the stream is already closed. Report them as state /// error. void _add(T data) { if (_isClosed) { throw StateError("Stream is already closed"); } super._add(data); } /// Adds an error event to this subscriptions. /// /// Contrary to normal [_BufferingStreamSubscription]s we may receive /// events when the stream is already closed. Report them as state /// error. void _addError(Object error, StackTrace stackTrace) { if (_isClosed) { throw StateError("Stream is already closed"); } super._addError(error, stackTrace); } /// Adds a close event to this subscriptions. /// /// Contrary to normal [_BufferingStreamSubscription]s we may receive /// events when the stream is already closed. Report them as state /// error. void _close() { if (_isClosed) { throw StateError("Stream is already closed"); } super._close(); } // _BufferingStreamSubscription hooks. void _onPause() { _subscription?.pause(); } void _onResume() { _subscription?.resume(); } Future? _onCancel() { var subscription = _subscription; if (subscription != null) { _subscription = null; return subscription.cancel(); } return null; } void _handleData(S data) { try { _transformerSink.add(data); } catch (e, s) { _addError(e, s); } } void _handleError(Object error, StackTrace stackTrace) { try { _transformerSink.addError(error, stackTrace); } catch (e, s) { if (identical(e, error)) { _addError(error, stackTrace); } else { _addError(e, s); } } } void _handleDone() { try { _subscription = null; _transformerSink.close(); } catch (e, s) { _addError(e, s); } } } typedef EventSink _SinkMapper(EventSink output); /// A [StreamTransformer] for [Sink]-mappers. /// /// A Sink-mapper takes an [EventSink] (its output) and returns another /// [EventSink] (its input). /// /// Note that this class can be `const`. class _StreamSinkTransformer extends StreamTransformerBase { final _SinkMapper _sinkMapper; const _StreamSinkTransformer(this._sinkMapper); Stream bind(Stream stream) => _BoundSinkStream(stream, _sinkMapper); } /// The result of binding a [StreamTransformer] for [Sink]-mappers. /// /// It contains the bound Stream and the sink-mapper. Only when the user starts /// listening to this stream is the sink-mapper invoked. The result is used /// to create a StreamSubscription that transforms events. class _BoundSinkStream extends Stream { final _SinkMapper _sinkMapper; final Stream _stream; bool get isBroadcast => _stream.isBroadcast; _BoundSinkStream(this._stream, this._sinkMapper); StreamSubscription listen( void onData(T event)?, { Function? onError, void onDone()?, bool? cancelOnError, }) { StreamSubscription subscription = _SinkTransformerStreamSubscription( _stream, _sinkMapper, onData, onError, onDone, cancelOnError ?? false, ); return subscription; } } /// Data-handler coming from [StreamTransformer.fromHandlers]. typedef void _TransformDataHandler(S data, EventSink sink); /// Error-handler coming from [StreamTransformer.fromHandlers]. typedef void _TransformErrorHandler( Object error, StackTrace stackTrace, EventSink sink, ); /// Done-handler coming from [StreamTransformer.fromHandlers]. typedef void _TransformDoneHandler(EventSink sink); /// Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. /// /// This way we can reuse the code from [_StreamSinkTransformer]. class _HandlerEventSink implements EventSink { final _TransformDataHandler? _handleData; final _TransformErrorHandler? _handleError; final _TransformDoneHandler? _handleDone; /// The output sink where the handlers should send their data into. /// Set to `null` when closed. EventSink? _sink; _HandlerEventSink( this._handleData, this._handleError, this._handleDone, EventSink this._sink, ); void add(S data) { var sink = _sink; if (sink == null) { throw StateError("Sink is closed"); } var handleData = _handleData; if (handleData != null) { handleData(data, sink); } else { sink.add(data as T); } } void addError(Object error, [StackTrace? stackTrace]) { var sink = _sink; if (sink == null) { throw StateError("Sink is closed"); } var handleError = _handleError; stackTrace ??= AsyncError.defaultStackTrace(error); if (handleError != null) { handleError(error, stackTrace, sink); } else { sink.addError(error, stackTrace); } } void close() { var sink = _sink; if (sink == null) return; _sink = null; var handleDone = _handleDone; if (handleDone != null) { handleDone(sink); } else { sink.close(); } } } /// A StreamTransformer that transformers events with the given handlers. /// /// Note that this transformer can only be used once. class _StreamHandlerTransformer extends _StreamSinkTransformer { _StreamHandlerTransformer({ void handleData(S data, EventSink sink)?, void handleError(Object error, StackTrace stackTrace, EventSink sink)?, void handleDone(EventSink sink)?, }) : super((EventSink outputSink) { return _HandlerEventSink( handleData, handleError, handleDone, outputSink, ); }); Stream bind(Stream stream) { return super.bind(stream); } } /// A StreamTransformer that overrides [StreamTransformer.bind] with a callback. class _StreamBindTransformer extends StreamTransformerBase { final Stream Function(Stream) _bind; _StreamBindTransformer(this._bind); Stream bind(Stream stream) => _bind(stream); } /// A closure mapping a stream and cancelOnError to a StreamSubscription. typedef StreamSubscription _SubscriptionTransformer( Stream stream, bool cancelOnError, ); /// A [StreamTransformer] that minimizes the number of additional classes. /// /// Instead of implementing three classes: a [StreamTransformer], a [Stream] /// (as the result of a `bind` call) and a [StreamSubscription] (which does the /// actual work), this class only requires a function that is invoked when the /// last bit (the subscription) of the transformer-workflow is needed. /// /// The given transformer function maps from Stream and cancelOnError to a /// `StreamSubscription`. As such it can also act on `cancel` events, making it /// fully general. class _StreamSubscriptionTransformer extends StreamTransformerBase { final _SubscriptionTransformer _onListen; const _StreamSubscriptionTransformer(this._onListen); Stream bind(Stream stream) => _BoundSubscriptionStream(stream, _onListen); } /// A stream transformed by a [_StreamSubscriptionTransformer]. /// /// When this stream is listened to it invokes the [_onListen] function with /// the stored [_stream]. Usually the transformer starts listening at this /// moment. class _BoundSubscriptionStream extends Stream { final _SubscriptionTransformer _onListen; final Stream _stream; bool get isBroadcast => _stream.isBroadcast; _BoundSubscriptionStream(this._stream, this._onListen); StreamSubscription listen( void onData(T event)?, { Function? onError, void onDone()?, bool? cancelOnError, }) { StreamSubscription result = _onListen(_stream, cancelOnError ?? false); result.onData(onData); result.onError(onError); result.onDone(onDone); return result; } }