// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:async/async.dart';
import 'package:test/test.dart';

import 'utils.dart';

/// Exercises [StreamCloser] in three situations: the closer is never closed,
/// a stream is bound before the closer is closed, and a stream is bound after
/// the closer is closed.
void main() {
  // A fresh closer is created before every test so that no state leaks
  // between cases.
  late StreamCloser closer;

  setUp(() {
    closer = StreamCloser();
  });

  group('when the closer is never closed', () {
    test('forwards data and done events', () {
      final collected = createStream().transform(closer).toList();
      expect(collected, completion([1, 2, 3, 4]));
    });

    test('forwards error events', () {
      final collected = Stream.error('oh no').transform(closer).toList();
      expect(collected, throwsA('oh no'));
    });

    test('transforms a broadcast stream into a broadcast stream', () {
      final output = const Stream.empty().transform(closer);
      expect(output.isBroadcast, isTrue);
    });

    test("doesn't eagerly listen", () {
      final source = StreamController();
      final output = source.stream.transform(closer);

      // Binding alone must not subscribe to the source.
      expect(source.hasListener, isFalse);

      output.listen(null);
      expect(source.hasListener, isTrue);
    });

    test('forwards pause and resume', () {
      final source = StreamController();
      final output = source.stream.transform(closer);
      final listener = output.listen(null);

      expect(source.isPaused, isFalse);

      listener.pause();
      expect(source.isPaused, isTrue);

      listener.resume();
      expect(source.isPaused, isFalse);
    });

    test('forwards cancel', () {
      var wasCancelled = false;
      final source = StreamController(
        onCancel: () => wasCancelled = true,
      );
      final output = source.stream.transform(closer);
      expect(wasCancelled, isFalse);

      final listener = output.listen(null);
      expect(wasCancelled, isFalse);

      listener.cancel();
      expect(wasCancelled, isTrue);
    });

    test('forwards errors from cancel', () {
      final source = StreamController(onCancel: () => throw 'oh no');
      final cancellation =
          source.stream.transform(closer).listen(null).cancel();
      expect(cancellation, throwsA('oh no'));
    });
  });

  group('when a stream is added before the closer is closed', () {
    test(
      'the stream emits a close event once the closer is closed',
      () async {
        final events = StreamQueue(createStream().transform(closer));

        // Consume the first two values so the closer is closed mid-stream.
        await expectLater(events, emits(1));
        await expectLater(events, emits(2));

        expect(closer.close(), completes);
        expect(events, emitsDone);
      },
    );

    test('the inner subscription is canceled once the closer is closed', () {
      var wasCancelled = false;
      final source = StreamController(
        onCancel: () => wasCancelled = true,
      );

      expect(source.stream.transform(closer), emitsDone);
      expect(closer.close(), completes);
      expect(wasCancelled, isTrue);
    });

    test('closer.close() forwards errors from StreamSubscription.cancel()', () {
      final source = StreamController(onCancel: () => throw 'oh no');

      expect(source.stream.transform(closer), emitsDone);
      expect(closer.close(), throwsA('oh no'));
    });

    test(
      'closer.close() works even if a stream has already completed',
      () async {
        final collected = await createStream().transform(closer).toList();
        expect(collected, equals([1, 2, 3, 4]));

        expect(closer.close(), completes);
      },
    );

    test(
      'closer.close() works even if a stream has already been canceled',
      () async {
        // The cancel future is deliberately not awaited here.
        createStream().transform(closer).listen(null).cancel();

        expect(closer.close(), completes);
      },
    );

    group('but listened afterwards', () {
      test('the output stream immediately emits done', () {
        final output = createStream().transform(closer);

        expect(closer.close(), completes);
        expect(output, emitsDone);
      });

      test(
          'the underlying subscription is never listened if the stream is '
          'never listened', () async {
        // `count: 0` makes the test fail if onListen is ever invoked.
        final source = StreamController(
          onListen: expectAsync0(() {}, count: 0),
        );
        source.stream.transform(closer);

        expect(closer.close(), completes);
        await pumpEventQueue();
      });

      test(
          'the underlying subscription is listened and then canceled once the '
          'stream is listened', () {
        // Both callbacks must fire exactly once before the test may finish.
        final source = StreamController(
          onListen: expectAsync0(() {}),
          onCancel: expectAsync0(() {}),
        );
        final output = source.stream.transform(closer);

        expect(closer.close(), completes);
        output.listen(null);
      });

      test('Subscription.cancel() errors are silently ignored', () async {
        final source = StreamController(
          onCancel: expectAsync0(() => throw 'oh no'),
        );
        final output = source.stream.transform(closer);

        expect(closer.close(), completes);
        output.listen(null);

        // Give any unhandled error a chance to surface before the test ends.
        await pumpEventQueue();
      });
    });
  });

  group('when a stream is added after the closer is closed', () {
    test('the output stream immediately emits done', () {
      expect(closer.close(), completes);

      final output = createStream().transform(closer);
      expect(output, emitsDone);
    });

    test(
        'the underlying subscription is never listened if the stream is never '
        'listened', () async {
      expect(closer.close(), completes);

      // `count: 0` makes the test fail if onListen is ever invoked.
      final source = StreamController(
        onListen: expectAsync0(() {}, count: 0),
      );
      source.stream.transform(closer);

      await pumpEventQueue();
    });

    test(
        'the underlying subscription is listened and then canceled once the '
        'stream is listened', () {
      expect(closer.close(), completes);

      // Both callbacks must fire exactly once before the test may finish.
      final source = StreamController(
        onListen: expectAsync0(() {}),
        onCancel: expectAsync0(() {}),
      );
      source.stream.transform(closer).listen(null);
    });

    test('Subscription.cancel() errors are silently ignored', () async {
      expect(closer.close(), completes);

      final source = StreamController(
        onCancel: expectAsync0(() => throw 'oh no'),
      );
      source.stream.transform(closer).listen(null);

      // Give any unhandled error a chance to surface before the test ends.
      await pumpEventQueue();
    });
  });
}

/// Returns a stream that emits 1, 2, 3 and 4, awaiting [flushMicrotasks]
/// between consecutive values.
Stream createStream() async* {
  for (var value = 1; value <= 4; value++) {
    // No flush precedes the first value or follows the last one.
    if (value > 1) await flushMicrotasks();
    yield value;
  }
}