// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. // ignore_for_file: unawaited_futures import 'dart:async'; import 'package:async/async.dart'; import 'package:test/test.dart'; void main() { late StreamController controller; late StreamSplitter splitter; setUp(() { controller = StreamController(); splitter = StreamSplitter(controller.stream); }); test( "a branch that's created before the stream starts to replay it", () async { var events = []; var branch = splitter.split(); splitter.close(); branch.listen(events.add); controller.add(1); await flushMicrotasks(); expect(events, equals([1])); controller.add(2); await flushMicrotasks(); expect(events, equals([1, 2])); controller.add(3); await flushMicrotasks(); expect(events, equals([1, 2, 3])); controller.close(); }, ); test('a branch replays error events as well as data events', () { var branch = splitter.split(); splitter.close(); controller.add(1); controller.addError('error'); controller.add(3); controller.close(); var count = 0; branch.listen( expectAsync1((value) { expect(count, anyOf(0, 2)); expect(value, equals(count + 1)); count++; }, count: 2), onError: expectAsync1((error) { expect(count, equals(1)); expect(error, equals('error')); count++; }), onDone: expectAsync0(() { expect(count, equals(3)); }), ); }); test( "a branch that's created in the middle of a stream replays it", () async { controller.add(1); controller.add(2); await flushMicrotasks(); var branch = splitter.split(); splitter.close(); controller.add(3); controller.add(4); controller.close(); expect(branch.toList(), completion(equals([1, 2, 3, 4]))); }, ); test( "a branch that's created after the stream is finished replays it", () async { controller.add(1); controller.add(2); controller.add(3); controller.close(); await flushMicrotasks(); expect(splitter.split().toList(), completion(equals([1, 2, 3]))); splitter.close(); }, ); test('creates single-subscription branches', () async { var branch = splitter.split(); expect(branch.isBroadcast, isFalse); branch.listen(null); expect(() => branch.listen(null), throwsStateError); expect(() => branch.listen(null), throwsStateError); }); test('multiple branches each replay the stream', () async { var branch1 = splitter.split(); controller.add(1); controller.add(2); await flushMicrotasks(); var branch2 = splitter.split(); controller.add(3); controller.close(); await flushMicrotasks(); var branch3 = splitter.split(); splitter.close(); expect(branch1.toList(), completion(equals([1, 2, 3]))); expect(branch2.toList(), completion(equals([1, 2, 3]))); expect(branch3.toList(), completion(equals([1, 2, 3]))); }); test("a branch doesn't close until the source stream closes", () async { var branch = splitter.split(); splitter.close(); var closed = false; branch.last.then((_) => closed = true); controller.add(1); controller.add(2); controller.add(3); await flushMicrotasks(); expect(closed, isFalse); controller.close(); await flushMicrotasks(); expect(closed, isTrue); }); test("the source stream isn't listened to until a branch is", () async { expect(controller.hasListener, isFalse); var branch = splitter.split(); splitter.close(); await flushMicrotasks(); expect(controller.hasListener, isFalse); branch.listen(null); await flushMicrotasks(); expect(controller.hasListener, isTrue); }); test('the source stream is paused when all branches are paused', () async { var branch1 = splitter.split(); var branch2 = splitter.split(); var branch3 = splitter.split(); splitter.close(); var subscription1 = branch1.listen(null); var subscription2 = branch2.listen(null); var subscription3 = branch3.listen(null); subscription1.pause(); await flushMicrotasks(); expect(controller.isPaused, isFalse); subscription2.pause(); await flushMicrotasks(); expect(controller.isPaused, isFalse); subscription3.pause(); await flushMicrotasks(); expect(controller.isPaused, isTrue); subscription2.resume(); await flushMicrotasks(); expect(controller.isPaused, isFalse); }); test('the source stream is paused when all branches are canceled', () async { var branch1 = splitter.split(); var branch2 = splitter.split(); var branch3 = splitter.split(); var subscription1 = branch1.listen(null); var subscription2 = branch2.listen(null); var subscription3 = branch3.listen(null); subscription1.cancel(); await flushMicrotasks(); expect(controller.isPaused, isFalse); subscription2.cancel(); await flushMicrotasks(); expect(controller.isPaused, isFalse); subscription3.cancel(); await flushMicrotasks(); expect(controller.isPaused, isTrue); var branch4 = splitter.split(); splitter.close(); await flushMicrotasks(); expect(controller.isPaused, isTrue); branch4.listen(null); await flushMicrotasks(); expect(controller.isPaused, isFalse); }); test( "the source stream is canceled when it's closed after all branches have " 'been canceled', () async { var branch1 = splitter.split(); var branch2 = splitter.split(); var branch3 = splitter.split(); var subscription1 = branch1.listen(null); var subscription2 = branch2.listen(null); var subscription3 = branch3.listen(null); subscription1.cancel(); await flushMicrotasks(); expect(controller.hasListener, isTrue); subscription2.cancel(); await flushMicrotasks(); expect(controller.hasListener, isTrue); subscription3.cancel(); await flushMicrotasks(); expect(controller.hasListener, isTrue); splitter.close(); expect(controller.hasListener, isFalse); }); test( 'the source stream is canceled when all branches are canceled after it ' 'has been closed', () async { var branch1 = splitter.split(); var branch2 = splitter.split(); var branch3 = splitter.split(); splitter.close(); var subscription1 = branch1.listen(null); var subscription2 = branch2.listen(null); var subscription3 = branch3.listen(null); subscription1.cancel(); await flushMicrotasks(); expect(controller.hasListener, isTrue); subscription2.cancel(); await flushMicrotasks(); expect(controller.hasListener, isTrue); subscription3.cancel(); await flushMicrotasks(); expect(controller.hasListener, isFalse); }); test( "a splitter that's closed before any branches are added never listens " 'to the source stream', () { splitter.close(); // This would throw an error if the stream had already been listened to. controller.stream.listen(null); }); test( 'splitFrom splits a source stream into the designated number of ' 'branches', () { var branches = StreamSplitter.splitFrom(controller.stream, 5); controller.add(1); controller.add(2); controller.add(3); controller.close(); expect(branches[0].toList(), completion(equals([1, 2, 3]))); expect(branches[1].toList(), completion(equals([1, 2, 3]))); expect(branches[2].toList(), completion(equals([1, 2, 3]))); expect(branches[3].toList(), completion(equals([1, 2, 3]))); expect(branches[4].toList(), completion(equals([1, 2, 3]))); }); } /// Wait for all microtasks to complete. Future flushMicrotasks() => Future.delayed(Duration.zero);