// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';

import 'event.dart';
import 'paths.dart';
import 'testing.dart';

/// Buffers [FileSystemEvent] streams into batches of events.
///
/// Two batching strategies are available: "nearby microtask" and "buffered by
/// path".
extension BatchEvents on Stream<FileSystemEvent> {
  /// Batches all events that are sent at the same time.
  ///
  /// When multiple events are synchronously added to a stream controller, the
  /// [StreamController] implementation uses [scheduleMicrotask] to schedule the
  /// asynchronous firing of each event. In order to recreate the synchronous
  /// batches, this collates all the events that are received in "nearby"
  /// microtasks.
  ///
  /// Converts to [Event] using [Event.checkAndConvert], discarding events for
  /// which it returns `null`.
  ///
  /// When the source stream is done, any events still waiting in the current
  /// batch are emitted as a final batch before the returned stream closes.
  Stream<List<Event>> batchNearbyMicrotasksAndConvertEvents() {
    var batch = <Event>[];
    return StreamTransformer<FileSystemEvent, List<Event>>.fromHandlers(
      handleData: (event, sink) {
        var convertedEvent = Event.checkAndConvert(event);
        if (convertedEvent == null) return;
        batch.add(convertedEvent);

        // [Timer.run] schedules an event that runs after any microtasks that
        // have been scheduled.
        Timer.run(() {
          // An earlier timer, or `handleDone`, may already have flushed the
          // batch; in that case there is nothing to emit.
          if (batch.isEmpty) return;
          // Emit a copy, because `batch` is cleared and reused.
          sink.add(batch.toList());
          batch.clear();
        });
      },
      handleDone: (sink) {
        // Flush before closing so no converted event is dropped. Clearing the
        // batch also turns any still-pending [Timer.run] callback into a
        // no-op, so nothing is added to the sink after it is closed.
        if (batch.isNotEmpty) {
          sink.add(batch.toList());
          batch.clear();
        }
        sink.close();
      },
    ).bind(this);
  }

  /// Batches events by path.
  ///
  /// Events are buffered per path. A path's events are emitted once the most
  /// recent event for that path is older than [duration]. Rather than emitting
  /// split by path, all pending events are periodically checked and the events
  /// of every old-enough path are emitted together in one batch.
  ///
  /// Converts to [Event] using [Event.checkAndConvert], discarding events for
  /// which it returns `null`.
  ///
  /// When the source stream is done, all events still buffered are emitted as
  /// a final batch before the returned stream closes.
  Stream<List<Event>> batchBufferedByPathAndConvertEvents({
    required Duration duration,
  }) {
    final batcher = _PathBufferedBatcher(duration);
    return StreamTransformer<FileSystemEvent, List<Event>>.fromHandlers(
      handleData: batcher.handleData,
      handleDone: batcher.handleDone,
    ).bind(this);
  }
}

/// Implementation of [BatchEvents.batchBufferedByPathAndConvertEvents].
///
/// Holds events per path in [bufferedEvents] and uses a single [Timer] at a
/// time to check which paths are ready to be emitted.
class _PathBufferedBatcher {
  /// Events that have not been emitted yet, keyed by [Event.absolutePath].
  // NOTE(review): the key type is assumed to be [AbsolutePath] from
  // `paths.dart`, matching the type of `Event.absolutePath` - confirm.
  final bufferedEvents = <AbsolutePath, _BufferedEvents>{};

  /// How long a path must go without new events before its events are emitted.
  ///
  /// Also the delay of the timer that triggers each [checkAndEmit].
  final Duration duration;

  /// Whether a timer that will call [checkAndEmit] is currently running.
  bool checkAndEmitIsPending = false;

  /// The running timer, if any, kept so that [handleDone] can cancel it.
  Timer? _checkAndEmitTimer;

  _PathBufferedBatcher(this.duration);

  /// Adds events to [bufferedEvents].
  ///
  /// [event] is converted with [Event.checkAndConvert] and ignored if that
  /// returns `null`. Each event returned by [Event.splitIfMove] is buffered
  /// under its own [Event.absolutePath].
  ///
  /// Calls [maybeScheduleCheckAndEmit] to schedule a [checkAndEmit] if none is
  /// already pending.
  void handleData(FileSystemEvent event, Sink<List<Event>> sink) {
    final convertedEvent = Event.checkAndConvert(event);
    if (convertedEvent == null) return;

    for (final splitEvent in convertedEvent.splitIfMove()) {
      bufferedEvents
          .putIfAbsent(splitEvent.absolutePath, _BufferedEvents.new)
          .add(splitEvent);
    }

    maybeScheduleCheckAndEmit(sink);
  }

  /// If there is no timer running and there are events buffered, starts a timer
  /// with delay [duration] that will call [checkAndEmit] on [sink].
  void maybeScheduleCheckAndEmit(Sink<List<Event>> sink) {
    if (checkAndEmitIsPending) return;
    if (bufferedEvents.isEmpty) return;
    checkAndEmitIsPending = true;
    _checkAndEmitTimer = Timer(duration, () => checkAndEmit(sink));
  }

  /// Emits to [sink] the events of every path whose most recent event is
  /// strictly older than [duration], as a single batch.
  ///
  /// Age is measured against [overridableDateTimeNow]. Nothing is added to
  /// [sink] if no path is old enough.
  ///
  /// If any events remain, calls [maybeScheduleCheckAndEmit] to schedule
  /// another check.
  void checkAndEmit(Sink<List<Event>> sink) {
    checkAndEmitIsPending = false;
    _checkAndEmitTimer = null;

    final events = <Event>[];
    final sendEventsBefore = overridableDateTimeNow().subtract(duration);
    // Iterate over a copy of the entries because entries are removed from
    // `bufferedEvents` inside the loop.
    for (final entry in bufferedEvents.entries.toList()) {
      if (entry.value.lastUpdated.isBefore(sendEventsBefore)) {
        events.addAll(entry.value.events);
        bufferedEvents.remove(entry.key);
      }
    }

    if (events.isNotEmpty) {
      sink.add(events);
    }

    maybeScheduleCheckAndEmit(sink);
  }

  /// Flushes buffered events and closes the [sink].
  ///
  /// Cancels the pending [checkAndEmit] timer, if any: everything buffered is
  /// emitted here, so the timer would have nothing left to do.
  void handleDone(Sink<List<Event>> sink) {
    _checkAndEmitTimer?.cancel();
    _checkAndEmitTimer = null;
    checkAndEmitIsPending = false;

    if (bufferedEvents.isNotEmpty) {
      sink.add(bufferedEvents.values.expand((x) => x.events).toList());
      bufferedEvents.clear();
    }
    sink.close();
  }
}

/// The events buffered for one path, with the time the last one was added.
class _BufferedEvents {
  /// The buffered events, in the order they were added.
  final List<Event> events = [];

  DateTime _lastUpdated;

  /// Creates an empty buffer whose [lastUpdated] is [overridableDateTimeNow].
  _BufferedEvents() : _lastUpdated = overridableDateTimeNow();

  /// Appends [event] and sets [lastUpdated] to [overridableDateTimeNow].
  void add(Event event) {
    events.add(event);
    _lastUpdated = overridableDateTimeNow();
  }

  /// When this buffer was created or last had an event added, whichever is
  /// later.
  DateTime get lastUpdated => _lastUpdated;
}