// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. import 'dart:async'; import 'dart:collection'; final _empty = Future.value(); /// Finds and returns every node in a graph who's nodes and edges are /// asynchronously resolved. /// /// Cycles are allowed. If this is an undirected graph the [edges] function /// may be symmetric. In this case the [roots] may be any node in each connected /// graph. /// /// [V] is the type of values in the graph nodes. [K] must be a type suitable /// for using as a Map or Set key. [edges] should return the next reachable /// nodes. /// /// There are no ordering guarantees. This is useful for ensuring some work is /// performed at every node in an asynchronous graph, but does not give /// guarantees that the work is done in topological order. /// /// If either [readNode] or [edges] throws the error will be forwarded /// through the result stream and no further nodes will be crawled, though some /// work may have already been started. /// /// Crawling is eager, so calls to [edges] may overlap with other calls that /// have not completed. If the [edges] callback needs to be limited or throttled /// that must be done by wrapping it before calling [crawlAsync]. Stream crawlAsync( Iterable roots, FutureOr Function(K) readNode, FutureOr> Function(K, V) edges, ) { final crawl = _CrawlAsync(roots, readNode, edges)..run(); return crawl.result.stream; } class _CrawlAsync { final result = StreamController(); final FutureOr Function(K) readNode; final FutureOr> Function(K, V) edges; final Iterable roots; final _seen = HashSet(); _CrawlAsync(this.roots, this.readNode, this.edges); /// Add all nodes in the graph to [result] and return a Future which fires /// after all nodes have been seen. Future run() async { try { await Future.wait(roots.map(_visit), eagerError: true); await result.close(); } catch (e, st) { result.addError(e, st); await result.close(); } } /// Resolve the node at [key] and output it, then start crawling all of it's /// edges. Future _crawlFrom(K key) async { final value = await readNode(key); if (result.isClosed) return; result.add(value); final next = await edges(key, value); await Future.wait(next.map(_visit), eagerError: true); } /// Synchronously record that [key] is being handled then start work on the /// node for [key]. /// /// The returned Future will complete only after the work for [key] and all /// transitively reachable nodes has either been finished, or will be finished /// by some other Future in [_seen]. Future _visit(K key) { if (_seen.contains(key)) return _empty; _seen.add(key); return _crawlFrom(key); } }