// Copyright (c) 2022, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:math';

import 'package:async/async.dart';
import 'package:dwds/src/utilities/shared.dart';

/// Stream controller allowing to batch events.
///
/// Events of type [T] added to [sink] are buffered and re-emitted on [stream]
/// as lists, preserving the order in which they were added.
class BatchedStreamController<T> {
  static const _defaultBatchDelayMilliseconds = 1000;

  /// Upper bound, in milliseconds, on a single wait for the next input event.
  ///
  /// Derived from the batch delay in the constructor (one tenth of it, but
  /// never less than 1ms).
  final int _checkDelayMilliseconds;

  /// Minimum time, in milliseconds, between two emitted batches.
  final int _batchDelayMilliseconds;

  final StreamController<T> _inputController;

  /// Queue view over the input stream; assigned once in the constructor body.
  late final StreamQueue<T> _inputQueue;

  final StreamController<List<T>> _outputController;

  /// Completed once the batching loop has finished and flushed its buffer.
  final Completer<bool> _completer = Completer<bool>();

  /// Create batched stream controller.
  ///
  /// Collects events from input [sink] and emits them in batches to the
  /// output [stream] every [delay] milliseconds. Keeps the original order.
  BatchedStreamController({int delay = _defaultBatchDelayMilliseconds})
      : _batchDelayMilliseconds = delay,
        _checkDelayMilliseconds = max(delay ~/ 10, 1),
        _inputController = StreamController<T>(),
        _outputController = StreamController<List<T>>() {
    _inputQueue = StreamQueue<T>(_inputController.stream);
    safeUnawaited(_batchAndSendEvents());
  }

  /// Sink collecting events.
  StreamSink<T> get sink => _inputController.sink;

  /// Output stream of batch events.
  Stream<List<T>> get stream => _outputController.stream;

  /// Close the controller.
  ///
  /// Closes the input, waits for the batching loop to flush any buffered
  /// events, and then closes the output. The returned future completes with
  /// the result of closing the output controller.
  Future<dynamic> close() async {
    safeUnawaited(_inputController.close());
    await _completer.future;
    return _outputController.close();
  }

  /// Send events to the output in a batch every [_batchDelayMilliseconds].
  ///
  /// Runs until the input is closed and drained. On exit - including exit
  /// caused by an error thrown while reading the input - the remaining
  /// buffered events are emitted and [_completer] is completed so that
  /// [close] can never hang.
  Future<void> _batchAndSendEvents() async {
    final checkDelay = Duration(milliseconds: _checkDelayMilliseconds);
    final buffer = <T>[];

    // Emits a snapshot of the buffer (if any) and empties it.
    void flush() {
      if (buffer.isEmpty) return;
      _outputController.sink.add(List.of(buffer));
      buffer.clear();
    }

    // Batch events every `_batchDelayMilliseconds`.
    //
    // Note that events might arrive at random intervals, so collecting
    // a predetermined number of events to send in a batch might delay
    // the batch indefinitely. Instead, check for new events every
    // `_checkDelayMilliseconds` to make sure batches are sent in regular
    // intervals.
    //
    // A stopwatch is used instead of wall-clock timestamps so that system
    // clock adjustments cannot delay or prematurely trigger a batch.
    //
    // NOTE(review): every poll that times out appears to leave a pending
    // `hasNext` request inside the `StreamQueue` until the next event
    // arrives - confirm this is acceptable for long idle periods.
    final sinceLastSend = Stopwatch()..start();
    try {
      while (await _hasEventOrTimeOut(checkDelay)) {
        // Consume at most one event per iteration so the send check below
        // runs regularly even under a steady stream of input.
        if (await _hasEventDuring(checkDelay)) {
          buffer.add(await _inputQueue.next);
        }

        if (sinceLastSend.elapsedMilliseconds > _batchDelayMilliseconds) {
          sinceLastSend.reset();
          flush();
        }
      }
    } finally {
      // Reached on normal termination and when reading the input throws.
      // Any error still propagates to the caller of this method.
      flush();
      if (!_completer.isCompleted) {
        _completer.complete(true);
      }
    }
  }

  /// Whether the batching loop should keep running.
  ///
  /// Completes with `true` if the input has another event or if nothing was
  /// learned within [duration]; completes with `false` only when the input
  /// reports that it has no more events.
  Future<bool> _hasEventOrTimeOut(Duration duration) =>
      _inputQueue.hasNext.timeout(duration, onTimeout: () => true);

  /// Whether the input is known to have another event within [duration].
  ///
  /// Completes with `false` both on timeout and when the input has no more
  /// events.
  Future<bool> _hasEventDuring(Duration duration) =>
      _inputQueue.hasNext.timeout(duration, onTimeout: () => false);
}