// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. import 'dart:async'; import 'package:async/async.dart'; import 'src/close_guarantee_channel.dart'; import 'src/guarantee_channel.dart'; import 'src/stream_channel_transformer.dart'; export 'src/delegating_stream_channel.dart'; export 'src/disconnector.dart'; export 'src/json_document_transformer.dart'; export 'src/multi_channel.dart'; export 'src/stream_channel_completer.dart'; export 'src/stream_channel_controller.dart'; export 'src/stream_channel_transformer.dart'; /// An abstract class representing a two-way communication channel. /// /// Users should consider the [stream] emitting a "done" event to be the /// canonical indicator that the channel has closed. If they wish to close the /// channel, they should close the [sink]—canceling the stream subscription is /// not sufficient. Protocol errors may be emitted through the stream or through /// [sink].done, depending on their underlying cause. Note that the sink may /// silently drop events if the channel closes before [sink].close is called. /// /// Implementations are strongly encouraged to mix in or extend /// [StreamChannelMixin] to get default implementations of the various instance /// methods. Adding new methods to this interface will not be considered a /// breaking change if implementations are also added to [StreamChannelMixin]. /// /// Implementations must provide the following guarantees: /// /// * The stream is single-subscription, and must follow all the guarantees of /// single-subscription streams. /// /// * Closing the sink causes the stream to close before it emits any more /// events. /// /// * After the stream closes, the sink is automatically closed. If this /// happens, sink methods should silently drop their arguments until /// [sink].close is called. /// /// * If the stream closes before it has a listener, the sink should silently /// drop events if possible. /// /// * Canceling the stream's subscription has no effect on the sink. The channel /// must still be able to respond to the other endpoint closing the channel /// even after the subscription has been canceled. /// /// * The sink *either* forwards errors to the other endpoint *or* closes as /// soon as an error is added and forwards that error to the [sink].done /// future. /// /// These guarantees allow users to interact uniformly with all implementations, /// and ensure that either endpoint closing the stream produces consistent /// behavior. abstract class StreamChannel { /// The single-subscription stream that emits values from the other endpoint. Stream get stream; /// The sink for sending values to the other endpoint. StreamSink get sink; /// Creates a new [StreamChannel] that communicates over [stream] and [sink]. /// /// Note that this stream/sink pair must provide the guarantees listed in the /// [StreamChannel] documentation. If they don't do so natively, /// [StreamChannel.withGuarantees] should be used instead. factory StreamChannel(Stream stream, StreamSink sink) => _StreamChannel(stream, sink); /// Creates a new [StreamChannel] that communicates over [stream] and [sink]. /// /// Unlike [StreamChannel.new], this enforces the guarantees listed in the /// [StreamChannel] documentation. This makes it somewhat less efficient than /// just wrapping a stream and a sink directly, so [StreamChannel.new] should /// be used when the guarantees are provided natively. /// /// If [allowSinkErrors] is `false`, errors are not allowed to be passed to /// [sink]. If any are, the connection will close and the error will be /// forwarded to [sink].done. factory StreamChannel.withGuarantees(Stream stream, StreamSink sink, {bool allowSinkErrors = true}) => GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors); /// Creates a new [StreamChannel] that communicates over [stream] and [sink]. /// /// This specifically enforces the second guarantee: closing the sink causes /// the stream to close before it emits any more events. This guarantee is /// invalidated when an asynchronous gap is added between the original /// stream's event dispatch and the returned stream's, for example by /// transforming it with a [StreamTransformer]. This is a lighter-weight way /// of preserving that guarantee in particular than /// [StreamChannel.withGuarantees]. factory StreamChannel.withCloseGuarantee( Stream stream, StreamSink sink) => CloseGuaranteeChannel(stream, sink); /// Connects this to [other], so that any values emitted by either are sent /// directly to the other. void pipe(StreamChannel other); /// Transforms this using [transformer]. /// /// This is identical to calling `transformer.bind(channel)`. StreamChannel transform(StreamChannelTransformer transformer); /// Transforms only the [stream] component of this using [transformer]. StreamChannel transformStream(StreamTransformer transformer); /// Transforms only the [sink] component of this using [transformer]. StreamChannel transformSink(StreamSinkTransformer transformer); /// Returns a copy of this with [stream] replaced by [change]'s return /// value. StreamChannel changeStream(Stream Function(Stream) change); /// Returns a copy of this with [sink] replaced by [change]'s return /// value. StreamChannel changeSink(StreamSink Function(StreamSink) change); /// Returns a copy of this with the generic type coerced to [S]. /// /// If any events emitted by [stream] aren't of type [S], they're converted /// into [TypeError] events (`CastError` on some SDK versions). Similarly, if /// any events are added to [sink] that aren't of type [S], a [TypeError] is /// thrown. StreamChannel cast(); } /// An implementation of [StreamChannel] that simply takes a stream and a sink /// as parameters. /// /// This is distinct from [StreamChannel] so that it can use /// [StreamChannelMixin]. class _StreamChannel extends StreamChannelMixin { @override final Stream stream; @override final StreamSink sink; _StreamChannel(this.stream, this.sink); } /// A mixin that implements the instance methods of [StreamChannel] in terms of /// [stream] and [sink]. abstract mixin class StreamChannelMixin implements StreamChannel { @override void pipe(StreamChannel other) { stream.pipe(other.sink); other.stream.pipe(sink); } @override StreamChannel transform(StreamChannelTransformer transformer) => transformer.bind(this); @override StreamChannel transformStream(StreamTransformer transformer) => changeStream(transformer.bind); @override StreamChannel transformSink(StreamSinkTransformer transformer) => changeSink(transformer.bind); @override StreamChannel changeStream(Stream Function(Stream) change) => StreamChannel.withCloseGuarantee(change(stream), sink); @override StreamChannel changeSink(StreamSink Function(StreamSink) change) => StreamChannel.withCloseGuarantee(stream, change(sink)); @override StreamChannel cast() => StreamChannel( stream.cast(), StreamController(sync: true)..stream.cast().pipe(sink)); }