// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:built_value/serializer.dart';
import 'package:web_socket_channel/io.dart';

import 'constants.dart';
import 'data/build_request.dart';
import 'data/build_status.dart';
import 'data/build_target.dart';
import 'data/build_target_request.dart';
import 'data/serializers.dart';
import 'data/server_log.dart';
import 'data/shutdown_notification.dart';
import 'src/file_wait.dart';

/// Reads the daemon port from the port file of [workingDirectory].
///
/// Throws [MissingPortFile] if [waitForFile] returns `false` for the port
/// file, and a [FormatException] if the file content is not an integer.
Future<int> _existingPort(String workingDirectory) async {
  final portFile = File(portFilePath(workingDirectory));
  if (!await waitForFile(portFile)) throw MissingPortFile();
  return int.parse(await portFile.readAsString());
}

/// Forwards the startup output of [process] to [logHandler] and waits until
/// the daemon prints an action message (see [_isActionMessage]).
///
/// Every stderr line is reported as a [Level.SEVERE] log. Stdout lines seen
/// before the action message are reported either as parsed [ServerLog]s or as
/// [Level.INFO] logs.
///
/// Throws [VersionSkew] or [OptionsSkew] if the daemon prints the matching
/// action message, and a [StateError] if stdout ends before any action message
/// is seen.
Future<void> _handleDaemonStartup(
  Process process,
  void Function(ServerLog) logHandler,
) async {
  // NOTE(review): this subscription is never cancelled, so stderr lines keep
  // being forwarded after startup has finished. Presumably intentional -
  // confirm before changing.
  process.stderr
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .listen((line) {
    logHandler(
      ServerLog((b) {
        b.level = Level.SEVERE;
        b.message = line;
      }),
    );
  });

  // Broadcast so that both the log forwarder below and `firstWhere` can
  // listen to the same lines.
  final stdout = process.stdout
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .asBroadcastStream();

  // The daemon may log critical information prior to it successfully
  // starting. Capture this data and forward to the logHandler.
  //
  // Whenever we see a `logStartMarker` we will parse everything between that
  // and the `logEndMarker` as a `ServerLog`. Everything else is considered a
  // normal INFO level log.
  StringBuffer? nextLogRecord;
  final sub = stdout.where((line) => !_isActionMessage(line)).listen((line) {
    // Copy to a local so the buffer can be used without a null assertion.
    final record = nextLogRecord;
    if (record != null) {
      if (line == logEndMarker) {
        try {
          logHandler(
            serializers.deserialize(jsonDecode(record.toString()))
                as ServerLog,
          );
        } catch (e, s) {
          logHandler(
            ServerLog(
              (builder) => builder
                ..message = 'Failed to read log message:\n$record'
                ..level = Level.SEVERE
                ..error = '$e'
                ..stackTrace = '$s',
            ),
          );
        }
        nextLogRecord = null;
      } else {
        record.writeln(line);
      }
    } else if (line == logStartMarker) {
      nextLogRecord = StringBuffer();
    } else {
      logHandler(
        ServerLog((b) {
          b.level = Level.INFO;
          b.message = line;
        }),
      );
    }
  });

  try {
    final daemonAction = await stdout.firstWhere(
      _isActionMessage,
      orElse: () => throw StateError('Unable to start build daemon.'),
    );

    if (daemonAction == versionSkew) {
      throw VersionSkew();
    } else if (daemonAction == optionsSkew) {
      throw OptionsSkew();
    }
  } finally {
    // Stop forwarding stdout on every exit path, including failed startups,
    // so the subscription is not leaked when an exception is thrown.
    await sub.cancel();
  }
}

/// Whether [line] is one of the messages the daemon prints to signal the
/// outcome of its startup rather than a regular log line.
bool _isActionMessage(String line) =>
    line == versionSkew || line == readyToConnectLog || line == optionsSkew;

/// A client of the build daemon.
///
/// Handles starting and connecting to the build daemon.
///
/// Example:
/// https://pub.dev/packages/build_daemon#-example-tab-
class BuildDaemonClient {
  final _buildResults = StreamController<BuildResults>.broadcast();
  final _shutdownNotifications =
      StreamController<ShutdownNotification>.broadcast();
  final Serializers _serializers;
  final IOWebSocketChannel _channel;

  /// Opens a web socket to `localhost` on [port] and dispatches every
  /// incoming message by type: [ServerLog]s go to [logHandler], while
  /// [BuildResults] and [ShutdownNotification]s are added to the matching
  /// broadcast stream.
  BuildDaemonClient._(
    int port,
    this._serializers,
    void Function(ServerLog) logHandler,
  ) : _channel = IOWebSocketChannel.connect('ws://localhost:$port') {
    _channel.stream
        .listen((data) {
          final message = _serializers.deserialize(jsonDecode(data as String));
          if (message is ServerLog) {
            logHandler(message);
          } else if (message is BuildResults) {
            _buildResults.add(message);
          } else if (message is ShutdownNotification) {
            _shutdownNotifications.add(message);
          } else {
            // In practice we should never reach this state due to the
            // deserialize call.
            throw StateError(
              'Unexpected message from the Dart Build Daemon\n $message',
            );
          }
        })
        // TODO(grouma) - Implement proper error handling.
        .onError(print);
  }

  /// The [BuildResults] messages received from the daemon.
  Stream<BuildResults> get buildResults => _buildResults.stream;

  /// The [ShutdownNotification] messages received from the daemon.
  Stream<ShutdownNotification> get shutdownNotifications =>
      _shutdownNotifications.stream;

  /// Completes when the sink of the web socket channel is done.
  Future<void> get finished => _channel.sink.done;

  /// Registers a build target to be built upon any file change.
  void registerBuildTarget(BuildTarget target) => _channel.sink.add(
        jsonEncode(
          _serializers.serialize(
            BuildTargetRequest((b) => b..target = target),
          ),
        ),
      );

  /// Builds all registered targets, including those not from this client.
  ///
  /// Note this will wait for any ongoing build to finish before starting a new
  /// one.
  void startBuild() {
    final request = BuildRequest();
    _channel.sink.add(jsonEncode(_serializers.serialize(request)));
  }

  /// Closes the sink of the web socket channel.
  Future<void> close() => _channel.sink.close();

  /// Connects to the current daemon instance.
  ///
  /// The options of the running daemon are checked against [daemonCommand].
  /// If there is a mismatch, an exception is thrown.
  ///
  /// If one is not running, a new daemon instance will be started.
  ///
  /// The first element of [daemonCommand] is the executable; the remaining
  /// elements are passed as arguments, followed by a flag carrying
  /// [buildMode]. Throws an [ArgumentError] if [daemonCommand] is empty.
  ///
  /// If startup fails (for example with [VersionSkew] or [OptionsSkew]), the
  /// spawned process is killed and the error is rethrown.
  static Future<BuildDaemonClient> connect(
    String workingDirectory,
    List<String> daemonCommand, {
    Serializers? serializersOverride,
    void Function(ServerLog)? logHandler,
    bool includeParentEnvironment = true,
    Map<String, String>? environment,
    BuildMode buildMode = BuildMode.Auto,
  }) async {
    if (daemonCommand.isEmpty) {
      throw ArgumentError.value(
        daemonCommand,
        'daemonCommand',
        'Must contain at least the executable to run.',
      );
    }
    logHandler ??= (_) {};

    // Build a fresh list so the caller's command is never mutated.
    final daemonArgs = [
      ...daemonCommand.skip(1),
      '--$buildModeFlag=$buildMode',
    ];

    final process = await Process.start(
      daemonCommand.first,
      daemonArgs,
      mode: ProcessStartMode.detachedWithStdio,
      workingDirectory: workingDirectory,
      environment: environment,
      includeParentEnvironment: includeParentEnvironment,
    );

    try {
      await _handleDaemonStartup(process, logHandler);
    } catch (_) {
      process.kill();
      rethrow;
    }

    return connectUnchecked(
      workingDirectory,
      serializersOverride: serializersOverride,
      logHandler: logHandler,
    );
  }

  /// Connects to the current daemon instance.
  ///
  /// Does not check the options the daemon is running with, so this is
  /// primarily useful in tests where the daemon has just been launched.
  ///
  /// To connect and check the options, use [connect].
  static Future<BuildDaemonClient> connectUnchecked(
    String workingDirectory, {
    Serializers? serializersOverride,
    void Function(ServerLog)? logHandler,
  }) async {
    logHandler ??= (_) {};
    final daemonSerializers = serializersOverride ?? serializers;

    return BuildDaemonClient._(
      await _existingPort(workingDirectory),
      daemonSerializers,
      logHandler,
    );
  }
}

/// Thrown when the port file for the running daemon instance can't be found.
class MissingPortFile implements Exception {}

/// Thrown if the client requests conflicting options with the current daemon
/// instance.
class OptionsSkew implements Exception {}

/// Thrown if the current daemon instance version does not match that of the
/// client.
class VersionSkew implements Exception {}