// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:async/async.dart';
import 'package:test/test.dart';

import 'utils.dart';

/// Exercises every [StreamQueue] request kind (`next`, `peek`, `hasNext`,
/// `skip`, `take`, `lookAhead`, `rest`, `cancel`), the transaction APIs
/// (`startTransaction`, `withTransaction`, `cancelable`), and how the queue
/// listens to, pauses and cancels its source stream.
void main() {
  group('source stream', () {
    test('is listened to on first request, paused between requests', () async {
      var controller = StreamController();
      var events = StreamQueue(controller.stream);
      await flushMicrotasks();
      expect(controller.hasListener, isFalse);

      var next = events.next;
      expect(controller.hasListener, isTrue);
      expect(controller.isPaused, isFalse);

      controller.add(1);

      expect(await next, 1);
      expect(controller.hasListener, isTrue);
      expect(controller.isPaused, isTrue);

      next = events.next;
      expect(controller.hasListener, isTrue);
      expect(controller.isPaused, isFalse);

      controller.add(2);

      expect(await next, 2);
      expect(controller.hasListener, isTrue);
      expect(controller.isPaused, isTrue);

      // Deliberately not awaited: the listener must be gone synchronously.
      events.cancel();
      expect(controller.hasListener, isFalse);
    });
  });

  group('eventsDispatched', () {
    test('increments after a next future completes', () async {
      var events = StreamQueue(createStream());

      expect(events.eventsDispatched, equals(0));
      await flushMicrotasks();
      expect(events.eventsDispatched, equals(0));

      var next = events.next;
      expect(events.eventsDispatched, equals(0));

      await next;
      expect(events.eventsDispatched, equals(1));

      await events.next;
      expect(events.eventsDispatched, equals(2));
    });

    test('increments multiple times for multi-value requests', () async {
      var events = StreamQueue(createStream());
      await events.take(3);
      expect(events.eventsDispatched, equals(3));
    });

    test('increments multiple times for an accepted transaction', () async {
      var events = StreamQueue(createStream());
      await events.withTransaction((queue) async {
        await queue.next;
        await queue.next;
        return true;
      });
      expect(events.eventsDispatched, equals(2));
    });

    test("doesn't increment for rest requests", () async {
      var events = StreamQueue(createStream());
      await events.rest.toList();
      expect(events.eventsDispatched, equals(0));
    });
  });

  group('lookAhead operation', () {
    test('as simple list of events', () async {
      var events = StreamQueue(createStream());
      expect(await events.lookAhead(4), [1, 2, 3, 4]);
      expect(await events.next, 1);
      expect(await events.lookAhead(2), [2, 3]);
      expect(await events.take(2), [2, 3]);
      expect(await events.next, 4);
      await events.cancel();
    });

    test('of 0 events', () async {
      var events = StreamQueue(createStream());
      expect(events.lookAhead(0), completion([]));
      expect(events.next, completion(1));
      expect(events.lookAhead(0), completion([]));
      expect(events.next, completion(2));
      expect(events.lookAhead(0), completion([]));
      expect(events.next, completion(3));
      expect(events.lookAhead(0), completion([]));
      expect(events.next, completion(4));
      expect(events.lookAhead(0), completion([]));
      expect(events.lookAhead(5), completion([]));
      expect(events.next, throwsStateError);
      await events.cancel();
    });

    test('with bad arguments throws', () async {
      var events = StreamQueue(createStream());
      expect(() => events.lookAhead(-1), throwsArgumentError);
      expect(await events.next, 1); // Did not consume event.
      expect(() => events.lookAhead(-1), throwsArgumentError);
      expect(await events.next, 2); // Did not consume event.
      await events.cancel();
    });

    test('of too many arguments', () async {
      var events = StreamQueue(createStream());
      expect(await events.lookAhead(6), [1, 2, 3, 4]);
      await events.cancel();
    });

    test('too large later', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.lookAhead(6), [3, 4]);
      await events.cancel();
    });

    test('error', () async {
      var events = StreamQueue(createErrorStream());
      expect(events.lookAhead(4), throwsA('To err is divine!'));
      expect(events.take(4), throwsA('To err is divine!'));
      expect(await events.next, 4);
      await events.cancel();
    });
  });

  group('next operation', () {
    test('simple sequence of requests', () async {
      var events = StreamQueue(createStream());
      for (var i = 1; i <= 4; i++) {
        expect(await events.next, i);
      }
      expect(events.next, throwsStateError);
    });

    test('multiple requests at the same time', () async {
      var events = StreamQueue(createStream());
      var result = await Future.wait([
        events.next,
        events.next,
        events.next,
        events.next,
      ]);
      expect(result, [1, 2, 3, 4]);
      await events.cancel();
    });

    test('sequence of requests with error', () async {
      var events = StreamQueue(createErrorStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(events.next, throwsA('To err is divine!'));
      expect(await events.next, 4);
      await events.cancel();
    });
  });

  group('skip operation', () {
    test('of two elements in the middle of sequence', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.skip(2), 0);
      expect(await events.next, 4);
      await events.cancel();
    });

    test('with negative/bad arguments throws', () async {
      var events = StreamQueue(createStream());
      expect(() => events.skip(-1), throwsArgumentError);
      expect(await events.next, 1); // Did not consume event.
      expect(() => events.skip(-1), throwsArgumentError);
      expect(await events.next, 2); // Did not consume event.
      await events.cancel();
    });

    test('of 0 elements works', () async {
      var events = StreamQueue(createStream());
      expect(events.skip(0), completion(0));
      expect(events.next, completion(1));
      expect(events.skip(0), completion(0));
      expect(events.next, completion(2));
      expect(events.skip(0), completion(0));
      expect(events.next, completion(3));
      expect(events.skip(0), completion(0));
      expect(events.next, completion(4));
      expect(events.skip(0), completion(0));
      expect(events.skip(5), completion(5));
      expect(events.next, throwsStateError);
      await events.cancel();
    });

    test('of too many events ends at stream start', () async {
      var events = StreamQueue(createStream());
      // Four events are available, so two of the six could not be skipped.
      expect(await events.skip(6), 2);
      await events.cancel();
    });

    test('of too many events after some events', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.skip(6), 4);
      await events.cancel();
    });

    test('of too many events ends at stream end', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.next, 3);
      expect(await events.next, 4);
      expect(await events.skip(2), 2);
      await events.cancel();
    });

    test('of events with error', () async {
      var events = StreamQueue(createErrorStream());
      expect(events.skip(4), throwsA('To err is divine!'));
      expect(await events.next, 4);
      await events.cancel();
    });

    test('of events with error, and skip again after', () async {
      var events = StreamQueue(createErrorStream());
      expect(events.skip(4), throwsA('To err is divine!'));
      expect(events.skip(2), completion(1));
      await events.cancel();
    });

    test('multiple skips at same time complete in order.', () async {
      var events = StreamQueue(createStream());
      var skip1 = events.skip(1);
      var skip2 = events.skip(0);
      var skip3 = events.skip(4);
      var skip4 = events.skip(1);
      var index = 0;
      // Check that futures complete in order.
      //
      // Each callback checks both the skip result and the position at which
      // it ran relative to the other callbacks.
      Func1Required<int?> sequence(int expectedValue, int sequenceIndex) =>
          (value) {
            expect(value, expectedValue);
            expect(index, sequenceIndex);
            index++;
            return null;
          };
      await Future.wait([
        skip1.then(sequence(0, 0)),
        skip2.then(sequence(0, 1)),
        skip3.then(sequence(1, 2)),
        skip4.then(sequence(1, 3)),
      ]);
      await events.cancel();
    });
  });

  group('take operation', () {
    test('as simple take of events', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.take(2), [2, 3]);
      expect(await events.next, 4);
      await events.cancel();
    });

    test('of 0 events', () async {
      var events = StreamQueue(createStream());
      expect(events.take(0), completion([]));
      expect(events.next, completion(1));
      expect(events.take(0), completion([]));
      expect(events.next, completion(2));
      expect(events.take(0), completion([]));
      expect(events.next, completion(3));
      expect(events.take(0), completion([]));
      expect(events.next, completion(4));
      expect(events.take(0), completion([]));
      expect(events.take(5), completion([]));
      expect(events.next, throwsStateError);
      await events.cancel();
    });

    test('with bad arguments throws', () async {
      var events = StreamQueue(createStream());
      expect(() => events.take(-1), throwsArgumentError);
      expect(await events.next, 1); // Did not consume event.
      expect(() => events.take(-1), throwsArgumentError);
      expect(await events.next, 2); // Did not consume event.
      await events.cancel();
    });

    test('of too many arguments', () async {
      var events = StreamQueue(createStream());
      expect(await events.take(6), [1, 2, 3, 4]);
      await events.cancel();
    });

    test('too large later', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.take(6), [3, 4]);
      await events.cancel();
    });

    test('error', () async {
      var events = StreamQueue(createErrorStream());
      expect(events.take(4), throwsA('To err is divine!'));
      expect(await events.next, 4);
      await events.cancel();
    });
  });

  group('rest operation', () {
    test('after single next', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.rest.toList(), [2, 3, 4]);
    });

    test('at start', () async {
      var events = StreamQueue(createStream());
      expect(await events.rest.toList(), [1, 2, 3, 4]);
    });

    test('at end', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.next, 3);
      expect(await events.next, 4);
      expect(await events.rest.toList(), isEmpty);
    });

    test('after end', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.next, 3);
      expect(await events.next, 4);
      expect(events.next, throwsStateError);
      expect(await events.rest.toList(), isEmpty);
    });

    test('after receiving done requested before', () async {
      var events = StreamQueue(createStream());
      var next1 = events.next;
      var next2 = events.next;
      var next3 = events.next;
      var rest = events.rest;
      for (var i = 0; i < 10; i++) {
        await flushMicrotasks();
      }
      expect(await next1, 1);
      expect(await next2, 2);
      expect(await next3, 3);
      expect(await rest.toList(), [4]);
    });

    test('with an error event error', () async {
      var events = StreamQueue(createErrorStream());
      expect(await events.next, 1);
      var rest = events.rest;
      var events2 = StreamQueue(rest);
      expect(await events2.next, 2);
      expect(events2.next, throwsA('To err is divine!'));
      expect(await events2.next, 4);
    });

    test('closes the events, prevents other operations', () async {
      var events = StreamQueue(createStream());
      var stream = events.rest;
      expect(() => events.next, throwsStateError);
      expect(() => events.skip(1), throwsStateError);
      expect(() => events.take(1), throwsStateError);
      expect(() => events.rest, throwsStateError);
      expect(() => events.cancel(), throwsStateError);
      expect(stream.toList(), completion([1, 2, 3, 4]));
    });

    test('forwards to underlying stream', () async {
      var cancel = Completer();
      var controller = StreamController(onCancel: () => cancel.future);
      var events = StreamQueue(controller.stream);
      expect(controller.hasListener, isFalse);
      var next = events.next;
      expect(controller.hasListener, isTrue);
      expect(controller.isPaused, isFalse);

      controller.add(1);
      expect(await next, 1);
      expect(controller.isPaused, isTrue);

      var rest = events.rest;
      var subscription = rest.listen(null);
      expect(controller.hasListener, isTrue);
      expect(controller.isPaused, isFalse);

      dynamic lastEvent;
      subscription.onData((value) => lastEvent = value);

      controller.add(2);

      await flushMicrotasks();
      expect(lastEvent, 2);
      expect(controller.hasListener, isTrue);
      expect(controller.isPaused, isFalse);

      subscription.pause();
      expect(controller.isPaused, isTrue);

      controller.add(3);

      await flushMicrotasks();
      expect(lastEvent, 2);
      subscription.resume();

      await flushMicrotasks();
      expect(lastEvent, 3);

      var cancelFuture = subscription.cancel();
      expect(controller.hasListener, isFalse);
      cancel.complete(42);
      expect(cancelFuture, completion(42));
    });
  });

  group('peek operation', () {
    test('peeks one event', () async {
      var events = StreamQueue(createStream());
      expect(await events.peek, 1);
      expect(await events.next, 1);
      expect(await events.peek, 2);
      expect(await events.take(2), [2, 3]);
      expect(await events.peek, 4);
      expect(await events.next, 4);
      // Throws at end.
      expect(events.peek, throwsA(anything));
      await events.cancel();
    });

    test('multiple requests at the same time', () async {
      var events = StreamQueue(createStream());
      var result = await Future.wait([
        events.peek,
        events.peek,
        events.next,
        events.peek,
        events.peek,
      ]);
      expect(result, [1, 1, 1, 2, 2]);
      await events.cancel();
    });

    test('sequence of requests with error', () async {
      var events = StreamQueue(createErrorStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(events.peek, throwsA('To err is divine!'));
      // Error stays in queue.
      expect(events.peek, throwsA('To err is divine!'));
      expect(events.next, throwsA('To err is divine!'));
      expect(await events.next, 4);
      await events.cancel();
    });
  });

  group('cancel operation', () {
    test('closes the events, prevents any other operation', () async {
      var events = StreamQueue(createStream());
      await events.cancel();
      expect(() => events.lookAhead(1), throwsStateError);
      expect(() => events.next, throwsStateError);
      expect(() => events.peek, throwsStateError);
      expect(() => events.skip(1), throwsStateError);
      expect(() => events.take(1), throwsStateError);
      expect(() => events.rest, throwsStateError);
      expect(() => events.cancel(), throwsStateError);
    });

    test(
      'cancels underlying subscription when called before any event',
      () async {
        var cancelFuture = Future.value(42);
        var controller = StreamController(onCancel: () => cancelFuture);
        var events = StreamQueue(controller.stream);
        expect(await events.cancel(), 42);
      },
    );

    test('cancels underlying subscription, returns result', () async {
      var cancelFuture = Future.value(42);
      var controller = StreamController(onCancel: () => cancelFuture);
      var events = StreamQueue(controller.stream);
      controller.add(1);
      expect(await events.next, 1);
      expect(await events.cancel(), 42);
    });

    group('with immediate: true', () {
      test('closes the events, prevents any other operation', () async {
        var events = StreamQueue(createStream());
        await events.cancel(immediate: true);
        expect(() => events.next, throwsStateError);
        expect(() => events.skip(1), throwsStateError);
        expect(() => events.take(1), throwsStateError);
        expect(() => events.rest, throwsStateError);
        expect(() => events.cancel(), throwsStateError);
      });

      test('cancels the underlying subscription immediately', () async {
        var controller = StreamController();
        controller.add(1);

        var events = StreamQueue(controller.stream);
        expect(await events.next, 1);
        expect(controller.hasListener, isTrue);

        await events.cancel(immediate: true);
        expect(controller.hasListener, isFalse);
      });

      test(
        'cancels the underlying subscription when called before any event',
        () async {
          var cancelFuture = Future.value(42);
          var controller = StreamController(onCancel: () => cancelFuture);

          var events = StreamQueue(controller.stream);
          expect(await events.cancel(immediate: true), 42);
        },
      );

      test('closes pending requests', () async {
        var events = StreamQueue(createStream());
        expect(await events.next, 1);
        expect(events.next, throwsStateError);
        expect(events.hasNext, completion(isFalse));

        await events.cancel(immediate: true);
      });

      test(
        'returns the result of closing the underlying subscription',
        () async {
          var controller = StreamController(
            onCancel: () => Future.value(42),
          );
          var events = StreamQueue(controller.stream);
          expect(await events.cancel(immediate: true), 42);
        },
      );

      test(
        "listens and then cancels a stream that hasn't been listened to yet",
        () async {
          var wasListened = false;
          var controller = StreamController(
            onListen: () => wasListened = true,
          );
          var events = StreamQueue(controller.stream);
          expect(wasListened, isFalse);
          expect(controller.hasListener, isFalse);

          await events.cancel(immediate: true);
          expect(wasListened, isTrue);
          expect(controller.hasListener, isFalse);
        },
      );
    });
  });

  group('hasNext operation', () {
    test('true at start', () async {
      var events = StreamQueue(createStream());
      expect(await events.hasNext, isTrue);
    });

    test('true after start', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.hasNext, isTrue);
    });

    test('false at end', () async {
      var events = StreamQueue(createStream());
      for (var i = 1; i <= 4; i++) {
        expect(await events.next, i);
      }
      expect(await events.hasNext, isFalse);
    });

    test('true when enqueued', () async {
      var events = StreamQueue(createStream());
      var values = [];
      for (var i = 1; i <= 3; i++) {
        events.next.then(values.add);
      }
      expect(values, isEmpty);
      expect(await events.hasNext, isTrue);
      expect(values, [1, 2, 3]);
    });

    test('false when enqueued', () async {
      var events = StreamQueue(createStream());
      var values = [];
      for (var i = 1; i <= 4; i++) {
        events.next.then(values.add);
      }
      expect(values, isEmpty);
      expect(await events.hasNext, isFalse);
      expect(values, [1, 2, 3, 4]);
    });

    test('true when data event', () async {
      var controller = StreamController();
      var events = StreamQueue(controller.stream);

      bool? hasNext;
      events.hasNext.then((result) {
        hasNext = result;
      });
      await flushMicrotasks();
      expect(hasNext, isNull);
      controller.add(42);
      expect(hasNext, isNull);
      await flushMicrotasks();
      expect(hasNext, isTrue);
    });

    test('true when error event', () async {
      var controller = StreamController();
      var events = StreamQueue(controller.stream);

      bool? hasNext;
      events.hasNext.then((result) {
        hasNext = result;
      });
      await flushMicrotasks();
      expect(hasNext, isNull);
      controller.addError('BAD');
      expect(hasNext, isNull);
      await flushMicrotasks();
      expect(hasNext, isTrue);
      expect(events.next, throwsA('BAD'));
    });

    test('- hasNext after hasNext', () async {
      var events = StreamQueue(createStream());
      expect(await events.hasNext, true);
      expect(await events.hasNext, true);
      expect(await events.next, 1);
      expect(await events.hasNext, true);
      expect(await events.hasNext, true);
      expect(await events.next, 2);
      expect(await events.hasNext, true);
      expect(await events.hasNext, true);
      expect(await events.next, 3);
      expect(await events.hasNext, true);
      expect(await events.hasNext, true);
      expect(await events.next, 4);
      expect(await events.hasNext, false);
      expect(await events.hasNext, false);
    });

    test('- next after true', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.hasNext, true);
      expect(await events.next, 2);
      expect(await events.next, 3);
    });

    test('- next after true, enqueued', () async {
      var events = StreamQueue(createStream());
      var responses = [];
      events.next.then(responses.add);
      events.hasNext.then(responses.add);
      events.next.then(responses.add);
      do {
        await flushMicrotasks();
      } while (responses.length < 3);
      expect(responses, [1, true, 2]);
    });

    test('- skip 0 after true', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.hasNext, true);
      expect(await events.skip(0), 0);
      expect(await events.next, 2);
    });

    test('- skip 1 after true', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.hasNext, true);
      expect(await events.skip(1), 0);
      expect(await events.next, 3);
    });

    test('- skip 2 after true', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.hasNext, true);
      expect(await events.skip(2), 0);
      expect(await events.next, 4);
    });

    test('- take 0 after true', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.hasNext, true);
      expect(await events.take(0), isEmpty);
      expect(await events.next, 2);
    });

    test('- take 1 after true', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.hasNext, true);
      expect(await events.take(1), [2]);
      expect(await events.next, 3);
    });

    test('- take 2 after true', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.hasNext, true);
      expect(await events.take(2), [2, 3]);
      expect(await events.next, 4);
    });

    test('- rest after true', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.hasNext, true);
      var stream = events.rest;
      expect(await stream.toList(), [2, 3, 4]);
    });

    test('- rest after true, at last', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.next, 3);
      expect(await events.hasNext, true);
      var stream = events.rest;
      expect(await stream.toList(), [4]);
    });

    test('- rest after false', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.next, 3);
      expect(await events.next, 4);
      expect(await events.hasNext, false);
      var stream = events.rest;
      expect(await stream.toList(), isEmpty);
    });

    test('- cancel after true on data', () async {
      var events = StreamQueue(createStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.hasNext, true);
      expect(await events.cancel(), null);
    });

    test('- cancel after true on error', () async {
      var events = StreamQueue(createErrorStream());
      expect(await events.next, 1);
      expect(await events.next, 2);
      expect(await events.hasNext, true);
      expect(await events.cancel(), null);
    });
  });

  group('startTransaction operation produces a transaction that', () {
    late StreamQueue events;
    late StreamQueueTransaction transaction;
    late StreamQueue queue1;
    late StreamQueue queue2;
    setUp(() async {
      events = StreamQueue(createStream());
      expect(await events.next, 1);
      transaction = events.startTransaction();
      queue1 = transaction.newQueue();
      queue2 = transaction.newQueue();
    });

    group('emits queues that', () {
      test('independently emit events', () async {
        expect(await queue1.next, 2);
        expect(await queue2.next, 2);
        expect(await queue2.next, 3);
        expect(await queue1.next, 3);
        expect(await queue1.next, 4);
        expect(await queue2.next, 4);
        expect(await queue1.hasNext, isFalse);
        expect(await queue2.hasNext, isFalse);
      });

      test('queue requests for events', () async {
        expect(queue1.next, completion(2));
        expect(queue2.next, completion(2));
        expect(queue2.next, completion(3));
        expect(queue1.next, completion(3));
        expect(queue1.next, completion(4));
        expect(queue2.next, completion(4));
        expect(queue1.hasNext, completion(isFalse));
        expect(queue2.hasNext, completion(isFalse));
      });

      test('independently emit errors', () async {
        events = StreamQueue(createErrorStream());
        expect(await events.next, 1);
        transaction = events.startTransaction();
        queue1 = transaction.newQueue();
        queue2 = transaction.newQueue();

        expect(queue1.next, completion(2));
        expect(queue2.next, completion(2));
        expect(queue2.next, throwsA('To err is divine!'));
        expect(queue1.next, throwsA('To err is divine!'));
        expect(queue1.next, completion(4));
        expect(queue2.next, completion(4));
        expect(queue1.hasNext, completion(isFalse));
        expect(queue2.hasNext, completion(isFalse));
      });
    });

    group('when rejected', () {
      test('further original requests use the previous state', () async {
        expect(await queue1.next, 2);
        expect(await queue2.next, 2);
        expect(await queue2.next, 3);

        await flushMicrotasks();
        transaction.reject();

        expect(await events.next, 2);
        expect(await events.next, 3);
        expect(await events.next, 4);
        expect(await events.hasNext, isFalse);
      });

      test('pending original requests use the previous state', () async {
        expect(await queue1.next, 2);
        expect(await queue2.next, 2);
        expect(await queue2.next, 3);
        expect(events.next, completion(2));
        expect(events.next, completion(3));
        expect(events.next, completion(4));
        expect(events.hasNext, completion(isFalse));

        await flushMicrotasks();
        transaction.reject();
      });

      test(
        'further child requests act as though the stream was closed',
        () async {
          expect(await queue1.next, 2);
          transaction.reject();

          expect(await queue1.hasNext, isFalse);
          expect(queue1.next, throwsStateError);
        },
      );

      test(
        'pending child requests act as though the stream was closed',
        () async {
          expect(await queue1.next, 2);
          expect(queue1.hasNext, completion(isFalse));
          expect(queue1.next, throwsStateError);
          transaction.reject();
        },
      );

      // Regression test.
      test('pending child rest requests emit no more events', () async {
        var controller = StreamController();
        var events = StreamQueue(controller.stream);
        var transaction = events.startTransaction();
        var queue = transaction.newQueue();

        // This should emit no more events after the transaction is rejected.
        queue.rest.listen(
          expectAsync1((_) {}, count: 3),
          onDone: expectAsync0(() {}, count: 0),
        );

        controller.add(1);
        controller.add(2);
        controller.add(3);
        await flushMicrotasks();

        transaction.reject();
        await flushMicrotasks();

        // These shouldn't affect the result of `queue.rest.toList()`.
        controller.add(4);
        controller.add(5);
      });

      test("child requests' cancel() may still be called explicitly", () async {
        transaction.reject();
        await queue1.cancel();
      });

      test('calls to commit() or reject() fail', () async {
        transaction.reject();
        expect(transaction.reject, throwsStateError);
        expect(() => transaction.commit(queue1), throwsStateError);
      });

      test('before the transaction emits any events, does nothing', () async {
        var controller = StreamController();
        var events = StreamQueue(controller.stream);

        // Queue a request before the transaction, but don't let it complete
        // until we're done with the transaction.
        expect(events.next, completion(equals(1)));
        events.startTransaction().reject();
        expect(events.next, completion(equals(2)));

        await flushMicrotasks();
        controller.add(1);
        await flushMicrotasks();
        controller.add(2);
        await flushMicrotasks();
        controller.close();
      });

      test(
          'can reject a transaction where one copy is fully consumed '
          'in a transaction and a second copy is made', () async {
        // Regression test for https://github.com/dart-lang/async/issues/229
        final queue = StreamQueue(Stream.fromIterable([0]));
        final transaction = queue.startTransaction();

        final copy1 = transaction.newQueue();
        final inner1 = copy1.startTransaction();
        final innerCopy1 = inner1.newQueue();
        await innerCopy1.next;

        transaction.newQueue();

        transaction.reject();
        expect(await queue.next, 0);
        expect(await queue.hasNext, isFalse);
      }, skip: 'https://github.com/dart-lang/async/issues/229');
    });

    group('when committed', () {
      test('further original requests use the committed state', () async {
        expect(await queue1.next, 2);
        await flushMicrotasks();
        transaction.commit(queue1);
        expect(await events.next, 3);
      });

      test('pending original requests use the committed state', () async {
        expect(await queue1.next, 2);
        expect(events.next, completion(3));
        await flushMicrotasks();
        transaction.commit(queue1);
      });

      test(
        'further child requests act as though the stream was closed',
        () async {
          expect(await queue2.next, 2);
          transaction.commit(queue2);

          expect(await queue1.hasNext, isFalse);
          expect(queue1.next, throwsStateError);
        },
      );

      test(
        'pending child requests act as though the stream was closed',
        () async {
          expect(await queue2.next, 2);
          expect(queue1.hasNext, completion(isFalse));
          expect(queue1.next, throwsStateError);
          transaction.commit(queue2);
        },
      );

      test('further requests act as though the stream was closed', () async {
        expect(await queue1.next, 2);
        transaction.commit(queue1);

        expect(await queue1.hasNext, isFalse);
        expect(queue1.next, throwsStateError);
      });

      test('cancel() may still be called explicitly', () async {
        expect(await queue1.next, 2);
        transaction.commit(queue1);
        await queue1.cancel();
      });

      test('throws if there are pending requests', () async {
        expect(await queue1.next, 2);
        expect(queue1.hasNext, completion(isTrue));
        expect(() => transaction.commit(queue1), throwsStateError);
      });

      test('calls to commit() or reject() fail', () async {
        transaction.commit(queue1);
        expect(transaction.reject, throwsStateError);
        expect(() => transaction.commit(queue1), throwsStateError);
      });

      test('before the transaction emits any events, does nothing', () async {
        var controller = StreamController();
        var events = StreamQueue(controller.stream);

        // Queue a request before the transaction, but don't let it complete
        // until we're done with the transaction.
        expect(events.next, completion(equals(1)));
        var transaction = events.startTransaction();
        transaction.commit(transaction.newQueue());
        expect(events.next, completion(equals(2)));

        await flushMicrotasks();
        controller.add(1);
        await flushMicrotasks();
        controller.add(2);
        await flushMicrotasks();
        controller.close();
      });
    });
  });

  group('withTransaction operation', () {
    late StreamQueue events;
    setUp(() async {
      events = StreamQueue(createStream());
      expect(await events.next, 1);
    });

    test('passes a copy of the parent queue', () async {
      await events.withTransaction(
        expectAsync1((queue) async {
          expect(await queue.next, 2);
          expect(await queue.next, 3);
          expect(await queue.next, 4);
          expect(await queue.hasNext, isFalse);
          return true;
        }),
      );
    });

    test(
        'the parent queue continues from the child position if it returns '
        'true', () async {
      await events.withTransaction(
        expectAsync1((queue) async {
          expect(await queue.next, 2);
          return true;
        }),
      );

      expect(await events.next, 3);
    });

    test(
        'the parent queue continues from its original position if it returns '
        'false', () async {
      await events.withTransaction(
        expectAsync1((queue) async {
          expect(await queue.next, 2);
          return false;
        }),
      );

      expect(await events.next, 2);
    });

    test('the parent queue continues from the child position if it throws', () {
      expect(
        events.withTransaction(
          expectAsync1((queue) async {
            expect(await queue.next, 2);
            throw 'oh no';
          }),
        ),
        throwsA('oh no'),
      );

      expect(events.next, completion(3));
    });

    test('returns whether the transaction succeeded', () {
      expect(events.withTransaction((_) async => true), completion(isTrue));
      expect(events.withTransaction((_) async => false), completion(isFalse));
    });
  });

  group('cancelable operation', () {
    late StreamQueue events;
    setUp(() async {
      events = StreamQueue(createStream());
      expect(await events.next, 1);
    });

    test('passes a copy of the parent queue', () async {
      await events.cancelable(
        expectAsync1((queue) async {
          expect(await queue.next, 2);
          expect(await queue.next, 3);
          expect(await queue.next, 4);
          expect(await queue.hasNext, isFalse);
        }),
      ).value;
    });

    test(
      'the parent queue continues from the child position by default',
      () async {
        await events.cancelable(
          expectAsync1((queue) async {
            expect(await queue.next, 2);
          }),
        ).value;

        expect(await events.next, 3);
      },
    );

    test(
        'the parent queue continues from the child position if an error is '
        'thrown', () async {
      expect(
        events.cancelable(
          expectAsync1((queue) async {
            expect(await queue.next, 2);
            throw 'oh no';
          }),
        ).value,
        throwsA('oh no'),
      );

      expect(events.next, completion(3));
    });

    test(
      'the parent queue continues from the original position if canceled',
      () async {
        var operation = events.cancelable(
          expectAsync1((queue) async {
            expect(await queue.next, 2);
          }),
        );
        operation.cancel();

        expect(await events.next, 2);
      },
    );

    test('forwards the value from the callback', () async {
      expect(
        await events.cancelable(
          expectAsync1((queue) async {
            expect(await queue.next, 2);
            return 'value';
          }),
        ).value,
        'value',
      );
    });
  });

  test('all combinations sequential skip/next/take operations', () async {
    // Takes all combinations of two of next, skip and take, then ends with
    // doing rest. Each of the first rounds do 10 events of each type,
    // the rest does 20 elements.
    var eventCount = 20 * (3 * 3 + 1);
    var events = StreamQueue(createLongStream(eventCount));

    // Test expecting [startIndex .. startIndex + 9] as events using
    // `next`.
    void nextTest(int startIndex) {
      for (var i = 0; i < 10; i++) {
        expect(events.next, completion(startIndex + i));
      }
    }

    // Test expecting 10 events to be skipped.
    //
    // [startIndex] is unused; it is accepted so that all three helpers share
    // one signature and can be stored in the same list.
    void skipTest(int startIndex) {
      expect(events.skip(10), completion(0));
    }

    // Test expecting [startIndex .. startIndex + 9] as events using
    // `take(10)`.
    void takeTest(int startIndex) {
      expect(
        events.take(10),
        completion(List.generate(10, (i) => startIndex + i)),
      );
    }

    var tests = [nextTest, skipTest, takeTest];

    var counter = 0;
    // Run through all pairs of two tests and run them.
    for (var i = 0; i < tests.length; i++) {
      for (var j = 0; j < tests.length; j++) {
        tests[i](counter);
        tests[j](counter + 10);
        counter += 20;
      }
    }
    // Then expect 20 more events as a `rest` call.
    expect(
      events.rest.toList(),
      completion(List.generate(20, (i) => counter + i)),
    );
  });
}

/// A unary function whose single required argument and result share type [T].
typedef Func1Required<T> = T Function(T value);

/// Returns a stream that emits 1, 2, 3 and 4, then closes.
///
/// Microtasks are flushed between consecutive events.
Stream createStream() async* {
  yield 1;
  await flushMicrotasks();
  yield 2;
  await flushMicrotasks();
  yield 3;
  await flushMicrotasks();
  yield 4;
}

/// Returns a stream that emits 1, 2, the error `'To err is divine!'`, then 4,
/// and finally closes.
///
/// The events are added to a [StreamController] by an async closure that is
/// started before this function returns, flushing microtasks between events.
Stream createErrorStream() {
  var controller = StreamController();
  () async {
    controller.add(1);
    await flushMicrotasks();
    controller.add(2);
    await flushMicrotasks();
    controller.addError('To err is divine!');
    await flushMicrotasks();
    controller.add(4);
    await flushMicrotasks();
    controller.close();
  }();
  return controller.stream;
}

/// Returns a stream that emits the integers from 0 up to, but not including,
/// [eventCount], then closes.
Stream createLongStream(int eventCount) async* {
  for (var i = 0; i < eventCount; i++) {
    yield i;
  }
}