// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:sse/server/sse_handler.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import '../dds.dart';
import 'constants.dart';
import 'dap/adapters/dds_hosted_adapter.dart';
import 'dds_impl.dart';
import 'rpc_error_codes.dart';
import 'stream_manager.dart';

/// Representation of a single DDS client which manages the connection and
/// DDS request intercepting / forwarding.
class DartDevelopmentServiceClient {
  /// Creates a client which communicates over the web socket channel [ws].
  ///
  /// [dds] must be a [DartDevelopmentServiceImpl]; requests which are not
  /// handled by DDS are forwarded to [vmServicePeer].
  DartDevelopmentServiceClient.fromWebSocket(
    DartDevelopmentService dds,
    WebSocketChannel ws,
    json_rpc.Peer vmServicePeer,
  ) : this._(
          dds as DartDevelopmentServiceImpl,
          ws,
          vmServicePeer,
        );

  /// Creates a client which communicates over the SSE connection [sse].
  ///
  /// [dds] must be a [DartDevelopmentServiceImpl]; requests which are not
  /// handled by DDS are forwarded to [vmServicePeer].
  DartDevelopmentServiceClient.fromSSEConnection(
    DartDevelopmentService dds,
    SseConnection sse,
    json_rpc.Peer vmServicePeer,
  ) : this._(
          dds as DartDevelopmentServiceImpl,
          sse,
          vmServicePeer,
        );

  /// Wraps [connection] in a JSON RPC peer and registers the DDS method
  /// handlers on it.
  DartDevelopmentServiceClient._(
    this.dds,
    this.connection,
    json_rpc.Peer vmServicePeer,
  ) : _vmServicePeer = vmServicePeer {
    _clientPeer = json_rpc.Peer(
      // Manually create a StreamChannel instead of calling
      // .cast() as cast() results in addStream() being called,
      // binding the underlying sink. This results in a StateError being thrown
      // if we try and add directly to the sink, which we do for binary events
      // in StreamManager's streamNotify().
      StreamChannel(
        connection.stream.cast(),
        StreamController(sync: true)
          ..stream
              .cast()
              .listen((event) => connection.sink.add(event))
              .onDone(() => connection.sink.close()),
      ),
      strictProtocolChecks: false,
    );
    _registerJsonRpcMethods();
  }

  /// Start receiving JSON RPC requests from the client.
  ///
  /// Returned future completes when the peer is closed and the state owned by
  /// this client (stream subscriptions and Service ID zones) has been cleaned
  /// up.
  Future listen() => _clientPeer.listen().then(
        (_) async {
          dds.streamManager.clientDisconnect(this);
          for (final zone in createdServiceIdZones) {
            // Deleting the zones is best-effort. If the VM service connection
            // is gone there is nobody left to notify, and letting the
            // resulting StateError escape would complete this future with an
            // error.
            if (_vmServicePeer.isClosed) {
              break;
            }
            try {
              await _vmServicePeer.sendRequest('deleteIdZone', {
                'isolateId': zone.isolateId,
                'idZoneId': zone.serviceIdZoneId,
              });
            } on StateError {
              // The VM service peer closed while the request was in flight.
              break;
            } on json_rpc.RpcException {
              // The VM service rejected this deletion; keep cleaning up the
              // remaining zones.
            }
          }
        },
      );

  /// Close the connection to the client.
  Future close() async {
    // Cleanup the JSON RPC server for this connection if DDS has shutdown.
    await _clientPeer.close();
  }

  /// Send a JSON RPC notification to the client.
  ///
  /// Does nothing if the connection to the client is already closed.
  void sendNotification(String method, [dynamic parameters]) {
    if (_clientPeer.isClosed) {
      return;
    }
    _clientPeer.sendNotification(method, parameters);
  }

  /// Send a JSON RPC request to the client.
  ///
  /// Completes with `null` without sending anything if the connection to the
  /// client is already closed.
  Future sendRequest(String method, [dynamic parameters]) async {
    if (_clientPeer.isClosed) {
      return null;
    }
    return await _clientPeer.sendRequest(method, parameters);
  }

  /// Registers handlers for JSON RPC method endpoints.
  ///
  /// Methods without a dedicated handler go through two fallbacks, in
  /// registration order: service extension forwarding to other clients, then
  /// forwarding to the VM service.
  void _registerJsonRpcMethods() {
    /// jrpc endpoint for subscribing to a stream.
    ///
    /// Parameters:
    ///   'streamId': the stream to subscribe to.
    ///   '_includePrivateMembers': optional bool, defaults to false.
    _clientPeer.registerMethod('streamListen', (parameters) async {
      final streamId = parameters['streamId'].asString;
      final includePrivates =
          parameters['_includePrivateMembers'].asBoolOr(false);
      await dds.streamManager.streamListen(
        this,
        streamId,
        includePrivates: includePrivates,
      );
      return RPCResponses.success;
    });

    /// jrpc endpoint for cancelling a stream.
    ///
    /// Parameters:
    ///   'streamId': the stream to be cancelled.
    _clientPeer.registerMethod('streamCancel', (parameters) async {
      final streamId = parameters['streamId'].asString;
      await dds.streamManager.streamCancel(this, streamId);
      return RPCResponses.success;
    });

    /// jrpc endpoint for posting an event to a stream.
    ///
    /// Parameters:
    ///   'eventKind': the kind of event being sent.
    ///   'eventData': the data being sent over the stream.
    ///   'stream': the stream that is being posted to.
    _clientPeer.registerMethod('postEvent', (parameters) async {
      final eventKind = parameters['eventKind'].asString;
      final eventData = parameters['eventData'].asMap;
      final stream = parameters['stream'].asString;
      dds.streamManager.postEvent(stream, eventKind, eventData);
      return RPCResponses.success;
    });

    /// jrpc endpoint replacing this client's profiler user tag filters.
    ///
    /// Parameters:
    ///   'userTags': the list of user tags to filter on.
    _clientPeer.registerMethod('streamCpuSamplesWithUserTag',
        (parameters) async {
      final userTags = parameters['userTags'].asList.cast();
      profilerUserTagFilters.clear();
      profilerUserTagFilters.addAll(userTags);

      await dds.streamManager.updateUserTagSubscriptions(userTags);
      return RPCResponses.success;
    });

    /// jrpc endpoint for registering a service provided by this client.
    ///
    /// Parameters:
    ///   'service': the ID of the service being registered.
    ///   'alias': the alias stored for the service.
    ///
    /// Fails with `kServiceAlreadyRegistered` if this client has already
    /// registered a service with the same ID.
    _clientPeer.registerMethod('registerService', (parameters) async {
      final serviceId = parameters['service'].asString;
      final alias = parameters['alias'].asString;
      if (services.containsKey(serviceId)) {
        throw RpcErrorCodes.buildRpcException(
          RpcErrorCodes.kServiceAlreadyRegistered,
        );
      }
      services[serviceId] = alias;
      // Notify other clients that a new service extension is available.
      dds.streamManager.sendServiceRegisteredEvent(
        this,
        serviceId,
        alias,
      );
      return RPCResponses.success;
    });

    _clientPeer.registerMethod(
      'getClientName',
      (parameters) => {'type': 'ClientName', 'name': name},
    );

    _clientPeer.registerMethod(
      'setClientName',
      (parameters) => dds.clientManager.setClientName(this, parameters),
    );

    _clientPeer.registerMethod(
      'requirePermissionToResume',
      (parameters) =>
          dds.clientManager.requirePermissionToResume(this, parameters),
    );

    _clientPeer.registerMethod(
      'resume',
      (parameters) => dds.isolateManager.resumeIsolate(this, parameters),
    );

    _clientPeer.registerMethod(
      'readyToResume',
      (parameters) => dds.isolateManager.readyToResume(this, parameters),
    );

    _clientPeer.registerMethod(
      'requireUserPermissionToResume',
      (parameters) => dds.isolateManager.requireUserPermissionToResume(
        this,
        parameters,
      ),
    );

    /// jrpc endpoint returning the recorded history of a stream.
    ///
    /// Parameters:
    ///   'stream': the stream whose history is requested.
    ///
    /// Fails with an invalid params error if the stream manager has no
    /// history for the stream.
    _clientPeer.registerMethod('getStreamHistory', (parameters) {
      final stream = parameters['stream'].asString;
      final events = dds.streamManager.getStreamHistory(stream);
      if (events == null) {
        throw json_rpc.RpcException.invalidParams(
          "Event history is not collected for stream '$stream'",
        );
      }
      return {
        'type': 'StreamHistory',
        'history': events,
      };
    });

    _clientPeer.registerMethod(
      'getLogHistorySize',
      (parameters) => {
        'type': 'Size',
        'size': StreamManager
            .loggingRepositories[StreamManager.kLoggingStream]!.bufferSize,
      },
    );

    /// jrpc endpoint for resizing the logging stream's history buffer.
    ///
    /// Parameters:
    ///   'size': the new buffer size; must not be negative.
    _clientPeer.registerMethod('setLogHistorySize', (parameters) {
      final size = parameters['size'].asInt;
      if (size < 0) {
        throw json_rpc.RpcException.invalidParams(
          "'size' must be greater or equal to zero",
        );
      }
      StreamManager.loggingRepositories[StreamManager.kLoggingStream]!
          .resize(size);
      return RPCResponses.success;
    });

    _clientPeer.registerMethod('getDartDevelopmentServiceVersion',
        (parameters) async {
      final ddsVersion = DartDevelopmentService.protocolVersion.split('.');
      return {
        'type': 'Version',
        'major': int.parse(ddsVersion[0]),
        'minor': int.parse(ddsVersion[1]),
      };
    });

    /// jrpc endpoint returning the VM service's supported protocols with an
    /// additional entry describing the DDS protocol.
    _clientPeer.registerMethod('getSupportedProtocols', (parameters) async {
      final Map supportedProtocols =
          (await _vmServicePeer.sendRequest('getSupportedProtocols')) as Map;
      final ddsVersion = DartDevelopmentService.protocolVersion.split('.');
      final ddsProtocol = {
        'protocolName': 'DDS',
        'major': int.parse(ddsVersion[0]),
        'minor': int.parse(ddsVersion[1]),
      };
      // The cast view writes through to the list held by the response, so the
      // DDS entry ends up in the map that is returned.
      supportedProtocols['protocols']
          .cast<Map<String, dynamic>>()
          .add(ddsProtocol);
      return supportedProtocols;
    });

    _clientPeer.registerMethod(
      'getAvailableCachedCpuSamples',
      (_) => {
        'type': 'AvailableCachedCpuSamples',
        'cacheNames': [],
      },
    );

    /// jrpc endpoint which forwards `createIdZone` to the VM service and, when
    /// the response carries an 'id', records the new zone in
    /// [createdServiceIdZones].
    _clientPeer.registerMethod('createIdZone',
        (json_rpc.Parameters parameters) async {
      final response = await _vmServicePeer.sendRequest(
        parameters.method,
        parameters.value,
      ) as Map;
      if (response.containsKey('id')) {
        createdServiceIdZones.add((
          serviceIdZoneId: response['id']!,
          isolateId: parameters['isolateId'].asString,
        ));
      }
      return response;
    });

    _clientPeer.registerMethod(
      'getCachedCpuSamples',
      (_) => {
        'type': 'CachedCpuSamples',
        'userTag': '',
        'samplePeriod': -1,
        'maxStackDepth': -1,
        'sampleCount': -1,
        'timeOriginMicros': -1,
        'timeExtentMicros': -1,
        'pid': -1,
        'functions': [],
        'samples': [],
      },
    );

    _clientPeer.registerMethod(
      'getPerfettoVMTimelineWithCpuSamples',
      dds.isolateManager.getPerfettoVMTimelineWithCpuSamples,
    );

    // `evaluate` and `evaluateInFrame` actually consist of multiple RPC
    // invocations, including a call to `compileExpression` which can be
    // overridden by clients which provide their own implementation (e.g.,
    // Flutter Tools). We handle all of this in [_ExpressionEvaluator].
    _clientPeer.registerMethod(
      'evaluate',
      dds.expressionEvaluator.execute,
    );
    _clientPeer.registerMethod(
      'evaluateInFrame',
      dds.expressionEvaluator.execute,
    );

    _clientPeer.registerMethod(
      'lookupResolvedPackageUris',
      dds.packageUriConverter.convert,
    );

    _clientPeer.registerMethod(
      'sendDapRequest',
      (parameters) => dds.dapHandler.sendRequest(adapter, parameters),
    );

    // When invoked within a fallback, the next fallback will start executing.
    // The final fallback forwards the request to the VM service directly.
    Never nextFallback() => throw json_rpc.RpcException.methodNotFound('');

    // Handle service extension invocations.
    _clientPeer.registerFallback((parameters) async {
      bool hasNamespace(String method) => method.contains('.');
      String getMethod(String method) => method.split('.').last;
      String getNamespace(String method) => method.split('.').first;
      if (!hasNamespace(parameters.method)) {
        nextFallback();
      }
      // Lookup the client associated with the service extension's namespace.
      // If the client exists and that client has registered the specified
      // method, forward the request to that client.
      final method = getMethod(parameters.method);
      final namespace = getNamespace(parameters.method);
      final serviceClient = dds.clientManager.clients[namespace];
      if (serviceClient != null && serviceClient.services.containsKey(method)) {
        return await Future.any(
          [
            // Forward the request to the service client or...
            serviceClient.sendRequest(method, parameters.asMap).catchError((_) {
              throw RpcErrorCodes.buildRpcException(
                RpcErrorCodes.kServiceDisappeared,
              );
            }, test: (error) => error is StateError),
            // if the service client closes, return an error response.
            serviceClient._clientPeer.done.then(
              (_) => throw RpcErrorCodes.buildRpcException(
                RpcErrorCodes.kServiceDisappeared,
              ),
            ),
          ],
        );
      }
      throw json_rpc.RpcException(
        RpcErrorCodes.kMethodNotFound,
        'Unknown service: ${parameters.method}',
      );
    });

    // Unless otherwise specified, the request is forwarded to the VM service.
    // NOTE: This must be the last fallback registered.
    _clientPeer.registerFallback((parameters) async {
      // If _vmServicePeer closes in the middle of a request, this will throw
      // a StateError. Listeners in dds_impl.dart will handle shutting down the
      // DDS instance, so all we do here is report the failure to the
      // requesting client as a kServiceDisappeared RPC error.
      try {
        return await _vmServicePeer.sendRequest(
          parameters.method,
          parameters.value,
        );
      } on StateError {
        throw RpcErrorCodes.buildRpcException(
          RpcErrorCodes.kServiceDisappeared,
        );
      }
    });
  }

  /// Counter used to give each client a distinct [_id].
  static int _idCounter = 0;

  /// Identifier of this client, used to build [defaultClientName].
  final int _id = ++_idCounter;

  /// The name given to the client upon its creation.
  String get defaultClientName => 'client$_id';

  /// The current name associated with this client.
  ///
  /// This is `null` until the setter has been invoked.
  String? get name => _name;

  // NOTE: this should not be called directly except from:
  //   - `ClientManager._clearClientName`
  //   - `ClientManager._setClientNameHelper`
  /// Sets the client's name, falling back to [defaultClientName] when [n] is
  /// `null`.
  set name(String? n) => _name = n ?? defaultClientName;
  String? _name;

  /// The DDS instance this client is connected to.
  final DartDevelopmentServiceImpl dds;

  /// The underlying channel used to communicate with the client.
  final StreamChannel connection;

  /// Services registered by this client through `registerService`, mapping
  /// each service ID to its alias.
  final Map services = {};

  /// Pairs of 1) the ID of a Service ID zone created by this client and 2) the
  /// ID of the isolate in which that zone was created.
  final List<({String serviceIdZoneId, String isolateId})>
      createdServiceIdZones = [];

  /// User tags most recently supplied by this client through
  /// `streamCpuSamplesWithUserTag`.
  final Set profilerUserTagFilters = {};

  /// Peer connected to the VM service; receives every request that DDS does
  /// not handle itself.
  final json_rpc.Peer _vmServicePeer;

  /// Peer wrapping [connection]; assigned in the constructor body.
  late json_rpc.Peer _clientPeer;

  /// The adapter passed to the DAP handler for `sendDapRequest` calls.
  final DdsHostedAdapter adapter = DdsHostedAdapter();
}