// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. part of "dart:async"; @Since("3.0") extension FutureIterable on Iterable> { /// Waits for futures in parallel. /// /// Waits for all the futures in this iterable. /// Returns a list of the resulting values, /// in the same order as the futures which created them, /// if all futures are successful. /// /// Similar to [Future.wait], but reports errors using a /// [ParallelWaitError], which allows the caller to /// handle errors and dispose successful results if necessary. /// /// The returned future is completed when all the futures have completed. /// If any of the futures do not complete, nor does the returned future. /// /// If any future completes with an error, /// the returned future completes with a [ParallelWaitError]. /// The [ParallelWaitError.values] is a list of the values for /// successful futures and `null` for futures with errors. /// The [ParallelWaitError.errors] is a list of the same length, /// with `null` values for the successful futures /// and an [AsyncError] with the error for futures /// which completed with an error. Future> get wait { var results = [for (var f in this) _FutureResult(f)]; if (results.isEmpty) return Future>.value([]); @pragma('vm:awaiter-link') final c = Completer>.sync(); _FutureResult._waitAll(results, (errors) { if (errors == 0) { c.complete([for (var r in results) r.value]); } else { var errorList = [for (var r in results) r.errorOrNull]; c.completeError( ParallelWaitError, List>( [for (var r in results) r.valueOrNull], errorList, errorCount: errors, defaultError: errorList.firstWhere(_notNull), ), ); } }); return c.future; } } bool _notNull(Object? object) => object != null; /// Parallel operations on a record of futures. /// /// {@template record-parallel-wait} /// Waits for futures in parallel. /// /// Waits for all the futures in this record. /// Returns a record of the values, if all futures are successful. /// /// The returned future is completed when all the futures have completed. /// If any of the futures do not complete, nor does the returned future. /// /// If some futures complete with an error, /// the returned future completes with a [ParallelWaitError]. /// The [ParallelWaitError.values] is a record of the values of /// successful futures, and `null` for futures with errors. /// The [ParallelWaitError.errors] is a record of the same shape, /// with `null` values for the successful futures /// and an [AsyncError] with the error of futures /// which completed with an error. /// {@endtemplate} @Since("3.0") extension FutureRecord2 on (Future, Future) { /// {@macro record-parallel-wait} Future<(T1, T2)> get wait { @pragma('vm:awaiter-link') final c = Completer<(T1, T2)>.sync(); final v1 = _FutureResult($1); final v2 = _FutureResult($2); _FutureResult._waitAll([v1, v2], (int errors) { if (errors == 0) { c.complete((v1.value, v2.value)); } else { c.completeError( ParallelWaitError( (v1.valueOrNull, v2.valueOrNull), (v1.errorOrNull, v2.errorOrNull), errorCount: errors, defaultError: v1.errorOrNull ?? v2.errorOrNull, ), ); } }); return c.future; } } /// Parallel operations on a record of futures. @Since("3.0") extension FutureRecord3 on (Future, Future, Future) { /// {@macro record-parallel-wait} Future<(T1, T2, T3)> get wait { @pragma('vm:awaiter-link') final c = Completer<(T1, T2, T3)>.sync(); final v1 = _FutureResult($1); final v2 = _FutureResult($2); final v3 = _FutureResult($3); _FutureResult._waitAll([v1, v2, v3], (int errors) { if (errors == 0) { c.complete((v1.value, v2.value, v3.value)); } else { c.completeError( ParallelWaitError( (v1.valueOrNull, v2.valueOrNull, v3.valueOrNull), (v1.errorOrNull, v2.errorOrNull, v3.errorOrNull), errorCount: errors, defaultError: v1.errorOrNull ?? v2.errorOrNull ?? v3.errorOrNull, ), ); } }); return c.future; } } /// Parallel operations on a record of futures. @Since("3.0") extension FutureRecord4 on (Future, Future, Future, Future) { /// {@macro record-parallel-wait} Future<(T1, T2, T3, T4)> get wait { @pragma('vm:awaiter-link') final c = Completer<(T1, T2, T3, T4)>.sync(); final v1 = _FutureResult($1); final v2 = _FutureResult($2); final v3 = _FutureResult($3); final v4 = _FutureResult($4); _FutureResult._waitAll([v1, v2, v3, v4], (int errors) { if (errors == 0) { c.complete((v1.value, v2.value, v3.value, v4.value)); } else { c.completeError( ParallelWaitError( (v1.valueOrNull, v2.valueOrNull, v3.valueOrNull, v4.valueOrNull), (v1.errorOrNull, v2.errorOrNull, v3.errorOrNull, v4.errorOrNull), errorCount: errors, defaultError: v1.errorOrNull ?? v2.errorOrNull ?? v3.errorOrNull ?? v4.errorOrNull, ), ); } }); return c.future; } } /// Parallel operations on a record of futures. @Since("3.0") extension FutureRecord5 on (Future, Future, Future, Future, Future) { /// {@macro record-parallel-wait} Future<(T1, T2, T3, T4, T5)> get wait { @pragma('vm:awaiter-link') final c = Completer<(T1, T2, T3, T4, T5)>.sync(); final v1 = _FutureResult($1); final v2 = _FutureResult($2); final v3 = _FutureResult($3); final v4 = _FutureResult($4); final v5 = _FutureResult($5); _FutureResult._waitAll([v1, v2, v3, v4, v5], (int errors) { if (errors == 0) { c.complete((v1.value, v2.value, v3.value, v4.value, v5.value)); } else { c.completeError( ParallelWaitError( ( v1.valueOrNull, v2.valueOrNull, v3.valueOrNull, v4.valueOrNull, v5.valueOrNull, ), ( v1.errorOrNull, v2.errorOrNull, v3.errorOrNull, v4.errorOrNull, v5.errorOrNull, ), errorCount: errors, defaultError: v1.errorOrNull ?? v2.errorOrNull ?? v3.errorOrNull ?? v4.errorOrNull ?? v5.errorOrNull, ), ); } }); return c.future; } } /// Parallel operations on a record of futures. @Since("3.0") extension FutureRecord6 on ( Future, Future, Future, Future, Future, Future, ) { /// {@macro record-parallel-wait} Future<(T1, T2, T3, T4, T5, T6)> get wait { @pragma('vm:awaiter-link') final c = Completer<(T1, T2, T3, T4, T5, T6)>.sync(); final v1 = _FutureResult($1); final v2 = _FutureResult($2); final v3 = _FutureResult($3); final v4 = _FutureResult($4); final v5 = _FutureResult($5); final v6 = _FutureResult($6); _FutureResult._waitAll([v1, v2, v3, v4, v5, v6], (int errors) { if (errors == 0) { c.complete(( v1.value, v2.value, v3.value, v4.value, v5.value, v6.value, )); } else { c.completeError( ParallelWaitError( ( v1.valueOrNull, v2.valueOrNull, v3.valueOrNull, v4.valueOrNull, v5.valueOrNull, v6.valueOrNull, ), ( v1.errorOrNull, v2.errorOrNull, v3.errorOrNull, v4.errorOrNull, v5.errorOrNull, v6.errorOrNull, ), errorCount: errors, defaultError: v1.errorOrNull ?? v2.errorOrNull ?? v3.errorOrNull ?? v4.errorOrNull ?? v5.errorOrNull ?? v6.errorOrNull, ), ); } }); return c.future; } } /// Parallel operations on a record of futures. @Since("3.0") extension FutureRecord7 on ( Future, Future, Future, Future, Future, Future, Future, ) { /// {@macro record-parallel-wait} Future<(T1, T2, T3, T4, T5, T6, T7)> get wait { @pragma('vm:awaiter-link') final c = Completer<(T1, T2, T3, T4, T5, T6, T7)>.sync(); final v1 = _FutureResult($1); final v2 = _FutureResult($2); final v3 = _FutureResult($3); final v4 = _FutureResult($4); final v5 = _FutureResult($5); final v6 = _FutureResult($6); final v7 = _FutureResult($7); _FutureResult._waitAll([v1, v2, v3, v4, v5, v6, v7], (int errors) { if (errors == 0) { c.complete(( v1.value, v2.value, v3.value, v4.value, v5.value, v6.value, v7.value, )); } else { c.completeError( ParallelWaitError( ( v1.valueOrNull, v2.valueOrNull, v3.valueOrNull, v4.valueOrNull, v5.valueOrNull, v6.valueOrNull, v7.valueOrNull, ), ( v1.errorOrNull, v2.errorOrNull, v3.errorOrNull, v4.errorOrNull, v5.errorOrNull, v6.errorOrNull, v7.errorOrNull, ), errorCount: errors, defaultError: v1.errorOrNull ?? v2.errorOrNull ?? v3.errorOrNull ?? v4.errorOrNull ?? v5.errorOrNull ?? v6.errorOrNull ?? v7.errorOrNull, ), ); } }); return c.future; } } /// Parallel operations on a record of futures. @Since("3.0") extension FutureRecord8 on ( Future, Future, Future, Future, Future, Future, Future, Future, ) { /// {@macro record-parallel-wait} Future<(T1, T2, T3, T4, T5, T6, T7, T8)> get wait { @pragma('vm:awaiter-link') final c = Completer<(T1, T2, T3, T4, T5, T6, T7, T8)>.sync(); final v1 = _FutureResult($1); final v2 = _FutureResult($2); final v3 = _FutureResult($3); final v4 = _FutureResult($4); final v5 = _FutureResult($5); final v6 = _FutureResult($6); final v7 = _FutureResult($7); final v8 = _FutureResult($8); _FutureResult._waitAll([v1, v2, v3, v4, v5, v6, v7, v8], (int errors) { if (errors == 0) { c.complete(( v1.value, v2.value, v3.value, v4.value, v5.value, v6.value, v7.value, v8.value, )); } else { c.completeError( ParallelWaitError( ( v1.valueOrNull, v2.valueOrNull, v3.valueOrNull, v4.valueOrNull, v5.valueOrNull, v6.valueOrNull, v7.valueOrNull, v8.valueOrNull, ), ( v1.errorOrNull, v2.errorOrNull, v3.errorOrNull, v4.errorOrNull, v5.errorOrNull, v6.errorOrNull, v7.errorOrNull, v8.errorOrNull, ), errorCount: errors, defaultError: v1.errorOrNull ?? v2.errorOrNull ?? v3.errorOrNull ?? v4.errorOrNull ?? v5.errorOrNull ?? v6.errorOrNull ?? v7.errorOrNull ?? v8.errorOrNull, ), ); } }); return c.future; } } /// Parallel operations on a record of futures. @Since("3.0") extension FutureRecord9 on ( Future, Future, Future, Future, Future, Future, Future, Future, Future, ) { /// {@macro record-parallel-wait} Future<(T1, T2, T3, T4, T5, T6, T7, T8, T9)> get wait { @pragma('vm:awaiter-link') final c = Completer<(T1, T2, T3, T4, T5, T6, T7, T8, T9)>.sync(); final v1 = _FutureResult($1); final v2 = _FutureResult($2); final v3 = _FutureResult($3); final v4 = _FutureResult($4); final v5 = _FutureResult($5); final v6 = _FutureResult($6); final v7 = _FutureResult($7); final v8 = _FutureResult($8); final v9 = _FutureResult($9); _FutureResult._waitAll([v1, v2, v3, v4, v5, v6, v7, v8, v9], (int errors) { if (errors == 0) { c.complete(( v1.value, v2.value, v3.value, v4.value, v5.value, v6.value, v7.value, v8.value, v9.value, )); } else { c.completeError( ParallelWaitError( ( v1.valueOrNull, v2.valueOrNull, v3.valueOrNull, v4.valueOrNull, v5.valueOrNull, v6.valueOrNull, v7.valueOrNull, v8.valueOrNull, v9.valueOrNull, ), ( v1.errorOrNull, v2.errorOrNull, v3.errorOrNull, v4.errorOrNull, v5.errorOrNull, v6.errorOrNull, v7.errorOrNull, v8.errorOrNull, v9.errorOrNull, ), errorCount: errors, defaultError: v1.errorOrNull ?? v2.errorOrNull ?? v3.errorOrNull ?? v4.errorOrNull ?? v5.errorOrNull ?? v6.errorOrNull ?? v7.errorOrNull ?? v8.errorOrNull ?? v9.errorOrNull, ), ); } }); return c.future; } } /// Error thrown when waiting for multiple futures, when some have errors. /// /// The [V] and [E] types will have the same basic shape as the /// original collection of futures that was waited on. /// /// For example, if the original awaited futures were a record /// `(Future, ..., Future)`, /// the type `V` will be `(T1?, ..., Tn?)` which allows keeping the /// values of futures that completed with a value, /// and `E` will be `(AsyncError?, ..., AsyncError?)`, also with *n* /// fields, which can contain the errors for the futures which completed /// with an error. /// /// Waiting for a list or iterable of futures should provide /// a list of nullable values and errors of the same length. @Since("3.0") class ParallelWaitError extends Error { /// Values of successful futures. /// /// Has the same shape as the original collection of futures, /// with values for each successful future and `null` values /// for each failing future. final V values; /// Errors of failing futures. /// /// Has the same shape as the original collection of futures, /// with errors, typically [AsyncError], for each failing /// future and `null` values for each successful future. final E errors; /// An error which, if present, is included in the [toString] output. /// /// If the default error has a stack trace, it's also reported by the /// [stackTrace] getter, instead of where this [ParallelWaitError] was thrown. final AsyncError? _defaultError; /// Number of errors, if available. final int? _errorCount; /// Creates error with the provided [values] and [errors]. /// /// If [defaultError] is provided, its [AsyncError.error] is used in /// the [toString] of this parallel error, and its [AsyncError.stackTrace] /// is returned by [stackTrace]. /// /// If [errorCount] is provided, and it's greater than one, /// the number is reported in the [toString]. ParallelWaitError( this.values, this.errors, { @Since("3.4") int? errorCount, @Since("3.4") AsyncError? defaultError, }) : _defaultError = defaultError, _errorCount = errorCount; String toString() { if (_defaultError == null) { if (_errorCount == null || _errorCount <= 1) { return "ParallelWaitError"; } return "ParallelWaitError($_errorCount errors)"; } return "ParallelWaitError${_errorCount != null && _errorCount > 1 // ? "($_errorCount errors)" : ""}: ${_defaultError.error}"; } StackTrace? get stackTrace => _defaultError?.stackTrace ?? super.stackTrace; } /// The result of a future, when it has completed. /// /// Stores a value result in [value] and an error result in [error]. /// Then calls [onReady] with a 0 argument for a value, and 1 for an error. /// /// The [onReady] callback must be set synchronously, /// before the future has a chance to complete. /// /// Abstracted into a class of its own in order to reuse code. class _FutureResult { // Consider integrating directly into `_Future` as a `_FutureListener` // to avoid creating as many function tear-offs. final Future source; /// The value or `null`. /// /// Set when the future completes with a value. T? valueOrNull; /// Set when the future completes with an error or value. AsyncError? errorOrNull; _FutureResult(this.source); /// The value. /// /// Should only be used when the future is known to have completed with /// a value. T get value => valueOrNull ?? valueOrNull as T; void _wait(@pragma('vm:awaiter-link') void Function(int) whenReady) { source.then( (T value) { valueOrNull = value; whenReady(0); }, onError: (Object error, StackTrace stack) { errorOrNull = AsyncError(error, stack); whenReady(1); }, ); } /// Waits for a number of [_FutureResult]s to all have completed. /// /// List must not be empty. static void _waitAll( List<_FutureResult> results, @pragma('vm:awaiter-link') void Function(int) whenReady, ) { assert(results.isNotEmpty); var ready = 0; var errors = 0; void onReady(int error) { errors += error; if (++ready == results.length) { whenReady(errors); } } for (var r in results) { r._wait(onReady); } } static void _noop(_) {} }