// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:io';
import 'dart:typed_data';

import 'package:file/memory.dart';
import 'package:flutter_tools/src/application_package.dart';
import 'package:flutter_tools/src/base/dds.dart';
import 'package:flutter_tools/src/base/file_system.dart';
import 'package:flutter_tools/src/base/logger.dart';
import 'package:flutter_tools/src/base/utils.dart';
import 'package:flutter_tools/src/daemon.dart';
import 'package:flutter_tools/src/device.dart';
import 'package:flutter_tools/src/device_vm_service_discovery_for_attach.dart';
import 'package:flutter_tools/src/proxied_devices/devices.dart';
import 'package:flutter_tools/src/proxied_devices/file_transfer.dart';
import 'package:flutter_tools/src/resident_runner.dart';
import 'package:test/fake.dart';

import '../../src/common.dart';
import '../../src/fake_devices.dart';

// Tests for the proxied-device client classes.
//
// Every test wires two [DaemonConnection]s back to back: whatever the
// "client" connection sends shows up on `serverDaemonConnection.incomingCommands`,
// and the test plays the role of the remote daemon by answering with
// `sendResponse` / `sendErrorResponse` / `sendEvent`.
void main() {
  late BufferLogger bufferLogger;
  late DaemonConnection serverDaemonConnection;
  late DaemonConnection clientDaemonConnection;
  setUp(() {
    bufferLogger = BufferLogger.test();
    final serverDaemonStreams = FakeDaemonStreams();
    serverDaemonConnection = DaemonConnection(
      daemonStreams: serverDaemonStreams,
      logger: bufferLogger,
    );
    final clientDaemonStreams = FakeDaemonStreams();
    clientDaemonConnection = DaemonConnection(
      daemonStreams: clientDaemonStreams,
      logger: bufferLogger,
    );

    // Cross-connect the two fakes: the output of one is the input of the other.
    serverDaemonStreams.inputs.addStream(clientDaemonStreams.outputs.stream);
    clientDaemonStreams.inputs.addStream(serverDaemonStreams.outputs.stream);
  });

  tearDown(() async {
    await serverDaemonConnection.dispose();
    await clientDaemonConnection.dispose();
  });

  group('ProxiedPortForwarder', () {
    testWithoutContext('works correctly without device id', () async {
      final fakeServerSocket = FakeServerSocket(200);
      final portForwarder = ProxiedPortForwarder(
        clientDaemonConnection,
        logger: bufferLogger,
        createSocketServer: (Logger logger, int? hostPort, bool? ipv6) async => fakeServerSocket,
      );
      final int result = await portForwarder.forward(100);
      expect(result, 200);

      final fakeSocket = FakeSocket();
      fakeServerSocket.controller.add(fakeSocket);

      final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
          .asBroadcastStream();

      DaemonMessage message = await broadcastOutput.first;

      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'proxy.connect');
      expect(message.data['params'], {'port': 100});

      const id = 'random_id';
      serverDaemonConnection.sendResponse(message.data['id']!, id);

      // Forwards the data received from socket to daemon.
      fakeSocket.controller.add(Uint8List.fromList([1, 2, 3]));
      message = await broadcastOutput.first;
      expect(message.data['method'], 'proxy.write');
      expect(message.data['params'], {'id': id});
      expect(message.binary, isNotNull);
      final List<List<int>> binary = await message.binary!.toList();
      expect(binary, <List<int>>[
        [1, 2, 3],
      ]);

      // Forwards data received as event to socket.
      expect(fakeSocket.addedData.isEmpty, true);
      serverDaemonConnection.sendEvent('proxy.data.$id', null, [4, 5, 6]);
      await pumpEventQueue();
      expect(fakeSocket.addedData.isNotEmpty, true);
      expect(fakeSocket.addedData[0], [4, 5, 6]);

      // Closes the socket after the remote end disconnects
      expect(fakeSocket.closeCalled, false);
      serverDaemonConnection.sendEvent('proxy.disconnected.$id');
      await pumpEventQueue();
      expect(fakeSocket.closeCalled, true);
    });

    testWithoutContext('handles errors', () async {
      final fakeServerSocket = FakeServerSocket(200);
      final portForwarder = ProxiedPortForwarder(
        FakeDaemonConnection(
          handledRequests: {
            'proxy.connect': '1', // id
          },
        ),
        logger: bufferLogger,
        createSocketServer: (Logger logger, int? hostPort, bool? ipv6) async => fakeServerSocket,
      );
      final int result = await portForwarder.forward(100);
      expect(result, 200);

      final fakeSocket = FakeSocket();
      fakeServerSocket.controller.add(fakeSocket);

      // 'proxy.write' is not in handledRequests, so the fake connection throws;
      // the test passes as long as that failure does not escape.
      fakeSocket.controller.add(Uint8List.fromList([1, 2, 3]));
      await pumpEventQueue();
    });

    testWithoutContext('forwards the port from the remote end with device id', () async {
      final fakeServerSocket = FakeServerSocket(400);
      final portForwarder = ProxiedPortForwarder(
        clientDaemonConnection,
        deviceId: 'device_id',
        logger: bufferLogger,
        createSocketServer: (Logger logger, int? hostPort, bool? ipv6) async => fakeServerSocket,
      );

      final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
          .asBroadcastStream();

      final Future<int> result = portForwarder.forward(300);

      DaemonMessage message = await broadcastOutput.first;
      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'device.forward');
      expect(message.data['params'], {'deviceId': 'device_id', 'devicePort': 300});

      serverDaemonConnection.sendResponse(message.data['id']!, {'hostPort': 350});

      expect(await result, 400);

      final fakeSocket = FakeSocket();
      fakeServerSocket.controller.add(fakeSocket);

      message = await broadcastOutput.first;

      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'proxy.connect');
      expect(message.data['params'], {'port': 350});

      const id = 'random_id';
      serverDaemonConnection.sendResponse(message.data['id']!, id);

      // Unforward will try to disconnect the remote port.
      portForwarder.forwardedPorts.single.dispose();
      expect(fakeServerSocket.closeCalled, true);

      message = await broadcastOutput.first;

      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'device.unforward');
      expect(message.data['params'], {
        'deviceId': 'device_id',
        'devicePort': 300,
        'hostPort': 350,
      });
    });

    group('socket done', () {
      late Stream<DaemonMessage> broadcastOutput;
      late FakeSocket fakeSocket;
      const id = 'random_id';

      setUp(() async {
        final fakeServerSocket = FakeServerSocket(400);
        final portForwarder = ProxiedPortForwarder(
          clientDaemonConnection,
          deviceId: 'device_id',
          logger: bufferLogger,
          createSocketServer: (Logger logger, int? hostPort, bool? ipv6) async => fakeServerSocket,
        );

        broadcastOutput = serverDaemonConnection.incomingCommands.asBroadcastStream();

        unawaited(portForwarder.forward(300));

        // Consumes the message.
        DaemonMessage message = await broadcastOutput.first;
        serverDaemonConnection.sendResponse(message.data['id']!, {
          'hostPort': 350,
        });

        fakeSocket = FakeSocket();
        fakeServerSocket.controller.add(fakeSocket);

        // Consumes the message.
        message = await broadcastOutput.first;
        serverDaemonConnection.sendResponse(message.data['id']!, id);

        // Pump the event queue so that the socket future error handler has a
        // chance to be listened to.
        await pumpEventQueue();
      });

      testWithoutContext('without error, should calls proxy.disconnect', () async {
        // It will try to disconnect the remote port when socket is done.
        fakeSocket.doneCompleter.complete(true);
        final DaemonMessage message = await broadcastOutput.first;

        expect(message.data['id'], isNotNull);
        expect(message.data['method'], 'proxy.disconnect');
        expect(message.data['params'], {'id': 'random_id'});
      });

      testWithoutContext('with error, should also calls proxy.disconnect', () async {
        fakeSocket.doneCompleter.complete(true);
        final DaemonMessage message = await broadcastOutput.first;

        expect(message.data['id'], isNotNull);
        expect(message.data['method'], 'proxy.disconnect');
        expect(message.data['params'], {'id': 'random_id'});

        // Send an error response and make sure that it won't crash the client.
        serverDaemonConnection.sendErrorResponse(
          message.data['id']!,
          'some error',
          StackTrace.current,
        );

        // Wait the event queue and make sure that it doesn't crash.
        await pumpEventQueue();
      });

      testWithoutContext('should not forward new data to socket after disconnection', () async {
        // Data will be forwarded before disconnection
        serverDaemonConnection.sendEvent('proxy.data.$id', null, [1, 2, 3]);
        await pumpEventQueue();
        expect(fakeSocket.addedData, <List<int>>[
          [1, 2, 3],
        ]);

        // It will try to disconnect the remote port when socket is done.
        fakeSocket.doneCompleter.complete(true);
        final DaemonMessage message = await broadcastOutput.first;

        expect(message.data['id'], isNotNull);
        expect(message.data['method'], 'proxy.disconnect');
        expect(message.data['params'], {'id': 'random_id'});
        await pumpEventQueue();

        serverDaemonConnection.sendEvent('proxy.data.$id', null, [4, 5, 6]);
        await pumpEventQueue();
        expect(fakeSocket.addedData, <List<int>>[
          [1, 2, 3],
        ]);
      });
    });

    testWithoutContext('disposes multiple sockets correctly', () async {
      final fakeServerSocket = FakeServerSocket(200);
      final portForwarder = ProxiedPortForwarder(
        clientDaemonConnection,
        logger: bufferLogger,
        createSocketServer: (Logger logger, int? hostPort, bool? ipv6) async => fakeServerSocket,
      );
      final int result = await portForwarder.forward(100);
      expect(result, 200);

      final fakeSocket1 = FakeSocket();
      final fakeSocket2 = FakeSocket();
      fakeServerSocket.controller.add(fakeSocket1);
      fakeServerSocket.controller.add(fakeSocket2);

      final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
          .asBroadcastStream();

      final DaemonMessage message1 = await broadcastOutput.first;

      expect(message1.data['id'], isNotNull);
      expect(message1.data['method'], 'proxy.connect');
      expect(message1.data['params'], {'port': 100});

      const id1 = 'random_id1';
      serverDaemonConnection.sendResponse(message1.data['id']!, id1);

      final DaemonMessage message2 = await broadcastOutput.first;

      expect(message2.data['id'], isNotNull);
      expect(message2.data['id'], isNot(message1.data['id']));
      expect(message2.data['method'], 'proxy.connect');
      expect(message2.data['params'], {'port': 100});

      const id2 = 'random_id2';
      serverDaemonConnection.sendResponse(message2.data['id']!, id2);

      await pumpEventQueue();

      // Closes the socket after port forwarder dispose.
      expect(fakeSocket1.closeCalled, false);
      expect(fakeSocket2.closeCalled, false);
      await portForwarder.dispose();
      expect(fakeSocket1.closeCalled, true);
      expect(fakeSocket2.closeCalled, true);
    });
  });

  // Device descriptions in the shape the tests send back for
  // 'device.discoverDevices' and feed to `deviceFromDaemonResult`.
  final fakeDevice = {
    'name': 'device-name',
    'id': 'device-id',
    'category': 'mobile',
    'platformType': 'android',
    'platform': 'android-arm',
    'emulator': true,
    'ephemeral': false,
    'sdk': 'Test SDK (1.2.3)',
    'capabilities': {
      'hotReload': true,
      'hotRestart': true,
      'screenshot': false,
      'flutterExit': true,
      'hardwareRendering': true,
      'startPaused': true,
    },
  };
  final fakeDevice2 = {
    'name': 'device-name2',
    'id': 'device-id2',
    'category': 'mobile',
    'platformType': 'android',
    'platform': 'android-arm',
    'emulator': true,
    'ephemeral': false,
    'sdk': 'Test SDK (1.2.3)',
    'capabilities': {
      'hotReload': true,
      'hotRestart': true,
      'screenshot': false,
      'flutterExit': true,
      'hardwareRendering': true,
      'startPaused': true,
    },
  };
  group('ProxiedDevice', () {
    testWithoutContext('calls stopApp without application package if not passed', () async {
      bufferLogger = BufferLogger.test();
      final proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);
      final ProxiedDevice device = proxiedDevices.deviceFromDaemonResult(fakeDevice);
      unawaited(device.stopApp(null, userIdentifier: 'user-id'));
      final DaemonMessage message = await serverDaemonConnection.incomingCommands.first;
      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'device.stopApp');
      expect(message.data['params'], {
        'deviceId': 'device-id',
        'userIdentifier': 'user-id',
      });
    });

    group('when launching an app with PrebuiltApplicationPackage', () {
      late MemoryFileSystem fileSystem;
      late FakePrebuiltApplicationPackage applicationPackage;
      const fileContent = [100, 120, 140];
      setUp(() {
        fileSystem = MemoryFileSystem.test()
          ..directory('dir').createSync()
          ..file('dir/foo').writeAsBytesSync(fileContent);
        applicationPackage = FakePrebuiltApplicationPackage(fileSystem.file('dir/foo'));
      });

      testWithoutContext('transfers file to the daemon', () async {
        bufferLogger = BufferLogger.test();
        final proxiedDevices = ProxiedDevices(
          clientDaemonConnection,
          logger: bufferLogger,
          deltaFileTransfer: false,
        );
        final ProxiedDevice device = proxiedDevices.deviceFromDaemonResult(fakeDevice);

        final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
            .asBroadcastStream();

        final Future<String> resultFuture = device.applicationPackageId(applicationPackage);

        // Send proxy.writeTempFile.
        final DaemonMessage writeTempFileMessage = await broadcastOutput.first;
        expect(writeTempFileMessage.data['id'], isNotNull);
        expect(writeTempFileMessage.data['method'], 'proxy.writeTempFile');
        expect(writeTempFileMessage.data['params'], {'path': 'foo'});
        expect(await writeTempFileMessage.binary?.first, fileContent);

        serverDaemonConnection.sendResponse(writeTempFileMessage.data['id']!);

        // Send device.uploadApplicationPackage.
        final DaemonMessage uploadApplicationPackageMessage = await broadcastOutput.first;
        expect(uploadApplicationPackageMessage.data['id'], isNotNull);
        expect(uploadApplicationPackageMessage.data['method'], 'device.uploadApplicationPackage');
        expect(uploadApplicationPackageMessage.data['params'], {
          'targetPlatform': 'android-arm',
          'applicationBinary': 'foo',
        });

        serverDaemonConnection.sendResponse(uploadApplicationPackageMessage.data['id']!, 'test_id');
        expect(await resultFuture, 'test_id');
      });

      testWithoutContext(
        'transfers file to the daemon with delta turned on, file not exist on remote',
        () async {
          bufferLogger = BufferLogger.test();
          final fileTransfer = FakeFileTransfer();
          final proxiedDevices = ProxiedDevices(
            clientDaemonConnection,
            logger: bufferLogger,
            fileTransfer: fileTransfer,
          );
          final ProxiedDevice device = proxiedDevices.deviceFromDaemonResult(fakeDevice);

          final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
              .asBroadcastStream();

          final Future<String> resultFuture = device.applicationPackageId(applicationPackage);

          // Send proxy.calculateFileHashes.
          final DaemonMessage calculateFileHashesMessage = await broadcastOutput.first;
          expect(calculateFileHashesMessage.data['id'], isNotNull);
          expect(calculateFileHashesMessage.data['method'], 'proxy.calculateFileHashes');
          expect(calculateFileHashesMessage.data['params'], {'path': 'foo'});

          // An empty response is sent for the hash request; the next message
          // asserted below is a full 'proxy.writeTempFile'.
          serverDaemonConnection.sendResponse(calculateFileHashesMessage.data['id']!);

          // Send proxy.writeTempFile.
          final DaemonMessage writeTempFileMessage = await broadcastOutput.first;
          expect(writeTempFileMessage.data['id'], isNotNull);
          expect(writeTempFileMessage.data['method'], 'proxy.writeTempFile');
          expect(writeTempFileMessage.data['params'], {'path': 'foo'});
          expect(await writeTempFileMessage.binary?.first, fileContent);

          serverDaemonConnection.sendResponse(writeTempFileMessage.data['id']!);

          // Send device.uploadApplicationPackage.
          final DaemonMessage uploadApplicationPackageMessage = await broadcastOutput.first;
          expect(uploadApplicationPackageMessage.data['id'], isNotNull);
          expect(uploadApplicationPackageMessage.data['method'], 'device.uploadApplicationPackage');
          expect(uploadApplicationPackageMessage.data['params'], {
            'targetPlatform': 'android-arm',
            'applicationBinary': 'foo',
          });

          serverDaemonConnection.sendResponse(
            uploadApplicationPackageMessage.data['id']!,
            'test_id',
          );
          expect(await resultFuture, 'test_id');
        },
      );

      testWithoutContext(
        'transfers file to the daemon with delta turned on, file exists on remote',
        () async {
          bufferLogger = BufferLogger.test();
          final fileTransfer = FakeFileTransfer();
          const blockHashes = BlockHashes(
            blockSize: 10,
            totalSize: 30,
            adler32: [1, 2, 3],
            md5: ['a', 'b', 'c'],
            fileMd5: 'abc',
          );
          const deltaBlocks = [
            FileDeltaBlock.fromSource(start: 10, size: 10),
            FileDeltaBlock.fromDestination(start: 30, size: 40),
          ];
          fileTransfer.binary = Uint8List.fromList([11, 12, 13]);
          fileTransfer.delta = deltaBlocks;

          final proxiedDevices = ProxiedDevices(
            clientDaemonConnection,
            logger: bufferLogger,
            fileTransfer: fileTransfer,
          );
          final ProxiedDevice device = proxiedDevices.deviceFromDaemonResult(fakeDevice);

          final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
              .asBroadcastStream();

          final Future<String> resultFuture = device.applicationPackageId(applicationPackage);

          // Send proxy.calculateFileHashes.
          final DaemonMessage calculateFileHashesMessage = await broadcastOutput.first;
          expect(calculateFileHashesMessage.data['id'], isNotNull);
          expect(calculateFileHashesMessage.data['method'], 'proxy.calculateFileHashes');
          expect(calculateFileHashesMessage.data['params'], {'path': 'foo'});

          serverDaemonConnection.sendResponse(
            calculateFileHashesMessage.data['id']!,
            blockHashes.toJson(),
          );

          // Send proxy.updateFile.
          final DaemonMessage updateFileMessage = await broadcastOutput.first;
          expect(updateFileMessage.data['id'], isNotNull);
          expect(updateFileMessage.data['method'], 'proxy.updateFile');
          expect(updateFileMessage.data['params'], {
            'path': 'foo',
            'delta': <Map<String, Object?>>[
              {'size': 10},
              {'start': 30, 'size': 40},
            ],
          });
          expect(await updateFileMessage.binary?.first, [11, 12, 13]);

          serverDaemonConnection.sendResponse(updateFileMessage.data['id']!);

          // Send device.uploadApplicationPackage.
          final DaemonMessage uploadApplicationPackageMessage = await broadcastOutput.first;
          expect(uploadApplicationPackageMessage.data['id'], isNotNull);
          expect(uploadApplicationPackageMessage.data['method'], 'device.uploadApplicationPackage');
          expect(uploadApplicationPackageMessage.data['params'], {
            'targetPlatform': 'android-arm',
            'applicationBinary': 'foo',
          });

          serverDaemonConnection.sendResponse(
            uploadApplicationPackageMessage.data['id']!,
            'test_id',
          );
          expect(await resultFuture, 'test_id');
        },
      );
    });
  });

  group('ProxiedDevices', () {
    testWithoutContext('devices respects the filter passed in', () async {
      bufferLogger = BufferLogger.test();
      final proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);

      final fakeFilter = FakeDeviceDiscoveryFilter();
      final supportedDevice = FakeDevice('Device', 'supported');
      fakeFilter.filteredDevices = [supportedDevice];

      final Future<List<Device>> resultFuture = proxiedDevices.devices(filter: fakeFilter);

      final DaemonMessage message = await serverDaemonConnection.incomingCommands.first;
      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'device.discoverDevices');

      serverDaemonConnection.sendResponse(message.data['id']!, <Map<String, Object?>>[
        fakeDevice,
        fakeDevice2,
      ]);

      final List<Device> result = await resultFuture;
      expect(result.length, 1);
      expect(result.first.id, supportedDevice.id);

      // The filter must have been offered both devices reported by the daemon.
      expect(fakeFilter.devices!.length, 2);
      expect(fakeFilter.devices![0].id, fakeDevice['id']);
      expect(fakeFilter.devices![1].id, fakeDevice2['id']);
    });

    testWithoutContext('publishes the devices on deviceNotifier after startPolling', () async {
      bufferLogger = BufferLogger.test();
      final proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);

      proxiedDevices.startPolling();

      final ItemListNotifier<Device> deviceNotifier = proxiedDevices.deviceNotifier;
      expect(deviceNotifier, isNotNull);

      final devicesAdded = <Device>[];
      deviceNotifier.onAdded.listen((Device device) {
        devicesAdded.add(device);
      });

      final DaemonMessage message = await serverDaemonConnection.incomingCommands.first;
      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'device.discoverDevices');

      serverDaemonConnection.sendResponse(message.data['id']!, <Map<String, Object?>>[
        fakeDevice,
        fakeDevice2,
      ]);

      await pumpEventQueue();

      expect(devicesAdded.length, 2);
      expect(devicesAdded[0].id, fakeDevice['id']);
      expect(devicesAdded[1].id, fakeDevice2['id']);
    });

    testWithoutContext('handles getDiagnostics', () async {
      bufferLogger = BufferLogger.test();
      final proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);

      final Future<List<String>> resultFuture = proxiedDevices.getDiagnostics();

      final DaemonMessage message = await serverDaemonConnection.incomingCommands.first;
      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'device.getDiagnostics');

      serverDaemonConnection.sendResponse(message.data['id']!, ['1', '2']);

      final List<String> result = await resultFuture;
      expect(result, ['1', '2']);
    });

    testWithoutContext(
      'returns empty result when daemon does not understand getDiagnostics',
      () async {
        bufferLogger = BufferLogger.test();
        final proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);

        final Future<List<String>> resultFuture = proxiedDevices.getDiagnostics();

        final DaemonMessage message = await serverDaemonConnection.incomingCommands.first;
        expect(message.data['id'], isNotNull);
        expect(message.data['method'], 'device.getDiagnostics');

        serverDaemonConnection.sendErrorResponse(
          message.data['id']!,
          'command not understood: device.getDiagnostics',
          StackTrace.current,
        );

        final List<String> result = await resultFuture;
        expect(result, isEmpty);
      },
    );
  });

  group('ProxiedDartDevelopmentService', () {
    testWithoutContext('forwards start and shutdown to remote', () async {
      final portForwarder = FakeProxiedPortForwarder();
      portForwarder.originalRemotePortReturnValue = 200;
      portForwarder.forwardReturnValue = 400;
      final devicePortForwarder = FakeProxiedPortForwarder();
      final dds = ProxiedDartDevelopmentService(
        clientDaemonConnection,
        'test_id',
        logger: bufferLogger,
        proxiedPortForwarder: portForwarder,
        devicePortForwarder: devicePortForwarder,
      );

      final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
          .asBroadcastStream();

      final Future<void> startFuture = dds.startDartDevelopmentService(
        Uri.parse('http://127.0.0.1:100/fake'),
        disableServiceAuthCodes: true,
        ddsPort: 150,
        ipv6: false,
      );

      final DaemonMessage startMessage = await broadcastOutput.first;
      expect(startMessage.data['id'], isNotNull);
      expect(startMessage.data['method'], 'device.startDartDevelopmentService');
      expect(startMessage.data['params'], {
        'deviceId': 'test_id',
        'vmServiceUri': 'http://127.0.0.1:200/fake',
        'disableServiceAuthCodes': true,
        'enableDevTools': false,
      });

      serverDaemonConnection.sendResponse(startMessage.data['id']!, const {
        'ddsUri': 'http://127.0.0.1:300/remote',
      });

      await startFuture;
      expect(portForwarder.receivedLocalForwardedPort, 100);
      expect(portForwarder.forwardedDevicePort, 300);
      expect(portForwarder.forwardedHostPort, 150);
      expect(portForwarder.forwardedIpv6, false);
      expect(dds.uri, Uri.parse('http://127.0.0.1:400/remote'));
      expect(
        bufferLogger.eventText.trim(),
        '{"name":"device.proxied_dds_forwarded","args":{"deviceId":"test_id","remoteUri":"http://127.0.0.1:300/remote","localUri":"http://127.0.0.1:400/remote"}}',
      );

      unawaited(dds.shutdown());

      final DaemonMessage shutdownMessage = await broadcastOutput.first;
      expect(shutdownMessage.data['id'], isNotNull);
      expect(shutdownMessage.data['method'], 'device.shutdownDartDevelopmentService');
      expect(shutdownMessage.data['params'], {'deviceId': 'test_id'});
    });

    testWithoutContext(
      'forwards start and shutdown to remote if port was forwarded by the device port forwarder',
      () async {
        final portForwarder = FakeProxiedPortForwarder();
        portForwarder.forwardReturnValue = 400;
        final devicePortForwarder = FakeProxiedPortForwarder();
        devicePortForwarder.originalRemotePortReturnValue = 200;
        final dds = ProxiedDartDevelopmentService(
          clientDaemonConnection,
          'test_id',
          logger: bufferLogger,
          proxiedPortForwarder: portForwarder,
          devicePortForwarder: devicePortForwarder,
        );

        final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
            .asBroadcastStream();

        final Future<void> startFuture = dds.startDartDevelopmentService(
          Uri.parse('http://127.0.0.1:100/fake'),
          disableServiceAuthCodes: true,
          ddsPort: 150,
          ipv6: false,
        );

        final DaemonMessage startMessage = await broadcastOutput.first;
        expect(startMessage.data['id'], isNotNull);
        expect(startMessage.data['method'], 'device.startDartDevelopmentService');
        expect(startMessage.data['params'], {
          'deviceId': 'test_id',
          'vmServiceUri': 'http://127.0.0.1:200/fake',
          'disableServiceAuthCodes': true,
          'enableDevTools': false,
        });

        serverDaemonConnection.sendResponse(startMessage.data['id']!, {
          'ddsUri': 'http://127.0.0.1:300/remote',
        });

        await startFuture;
        expect(portForwarder.receivedLocalForwardedPort, 100);
        expect(portForwarder.forwardedDevicePort, 300);
        expect(portForwarder.forwardedHostPort, 150);
        expect(portForwarder.forwardedIpv6, false);
        expect(dds.uri, Uri.parse('http://127.0.0.1:400/remote'));
        expect(
          bufferLogger.eventText.trim(),
          '{"name":"device.proxied_dds_forwarded","args":{"deviceId":"test_id","remoteUri":"http://127.0.0.1:300/remote","localUri":"http://127.0.0.1:400/remote"}}',
        );

        unawaited(dds.shutdown());

        final DaemonMessage shutdownMessage = await broadcastOutput.first;
        expect(shutdownMessage.data['id'], isNotNull);
        expect(shutdownMessage.data['method'], 'device.shutdownDartDevelopmentService');
        expect(shutdownMessage.data['params'], {'deviceId': 'test_id'});
      },
    );

    testWithoutContext(
      'starts a local dds if the VM service port is not a forwarded port',
      () async {
        final portForwarder = FakeProxiedPortForwarder();
        final devicePortForwarder = FakeProxiedPortForwarder();
        final localDds = FakeDartDevelopmentService();
        localDds.uri = Uri.parse('http://127.0.0.1:450/local');
        final dds = ProxiedDartDevelopmentService(
          clientDaemonConnection,
          'test_id',
          logger: bufferLogger,
          proxiedPortForwarder: portForwarder,
          devicePortForwarder: devicePortForwarder,
          localDds: localDds,
        );

        expect(localDds.startCalled, false);
        await dds.startDartDevelopmentService(
          Uri.parse('http://127.0.0.1:100/fake'),
          disableServiceAuthCodes: true,
          ddsPort: 150,
          ipv6: false,
        );

        expect(localDds.startCalled, true);
        expect(portForwarder.receivedLocalForwardedPort, 100);
        expect(portForwarder.forwardedDevicePort, null);
        expect(dds.uri, Uri.parse('http://127.0.0.1:450/local'));

        expect(localDds.shutdownCalled, false);
        await dds.shutdown();
        expect(localDds.shutdownCalled, true);

        // Nothing should have been sent to the remote daemon at any point.
        await serverDaemonConnection.dispose();
        expect(await serverDaemonConnection.incomingCommands.isEmpty, true);
      },
    );

    testWithoutContext(
      'starts a local dds if the remote VM does not support starting DDS',
      () async {
        final portForwarder = FakeProxiedPortForwarder();
        portForwarder.originalRemotePortReturnValue = 200;
        final devicePortForwarder = FakeProxiedPortForwarder();
        final localDds = FakeDartDevelopmentService();
        localDds.uri = Uri.parse('http://127.0.0.1:450/local');
        final dds = ProxiedDartDevelopmentService(
          clientDaemonConnection,
          'test_id',
          logger: bufferLogger,
          proxiedPortForwarder: portForwarder,
          devicePortForwarder: devicePortForwarder,
          localDds: localDds,
        );

        final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
            .asBroadcastStream();

        final Future<void> startFuture = dds.startDartDevelopmentService(
          Uri.parse('http://127.0.0.1:100/fake'),
          disableServiceAuthCodes: true,
          ddsPort: 150,
          ipv6: false,
        );

        expect(localDds.startCalled, false);
        final DaemonMessage startMessage = await broadcastOutput.first;
        expect(startMessage.data['id'], isNotNull);
        expect(startMessage.data['method'], 'device.startDartDevelopmentService');
        expect(startMessage.data['params'], {
          'deviceId': 'test_id',
          'vmServiceUri': 'http://127.0.0.1:200/fake',
          'disableServiceAuthCodes': true,
          'enableDevTools': false,
        });

        serverDaemonConnection.sendErrorResponse(
          startMessage.data['id']!,
          'command not understood: device.startDartDevelopmentService',
          StackTrace.current,
        );

        await startFuture;
        expect(localDds.startCalled, true);
        expect(portForwarder.receivedLocalForwardedPort, 100);
        expect(portForwarder.forwardedDevicePort, null);
        expect(dds.uri, Uri.parse('http://127.0.0.1:450/local'));

        expect(localDds.shutdownCalled, false);
        await dds.shutdown();
        expect(localDds.shutdownCalled, true);
      },
    );
  });

  group('ProxiedVMServiceDiscoveryForAttach', () {
    testWithoutContext('sends the request and forwards the port', () async {
      final portForwarder = FakeProxiedPortForwarder();
      portForwarder.forwardReturnValue = 400;
      final discovery = ProxiedVMServiceDiscoveryForAttach(
        clientDaemonConnection,
        'test_device',
        proxiedPortForwarder: portForwarder,
        fallbackDiscovery: () => throw UnimplementedError(),
        ipv6: false,
        logger: bufferLogger,
      );

      final uriCompleter = Completer<Uri>();

      // Start listening on the stream to trigger sending the request.
      discovery.uris.listen(uriCompleter.complete);

      final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
          .asBroadcastStream();

      final DaemonMessage startMessage = await broadcastOutput.first;
      expect(startMessage.data['id'], isNotNull);
      expect(startMessage.data['method'], 'device.startVMServiceDiscoveryForAttach');
      expect(startMessage.data['params'], {
        'deviceId': 'test_device',
        'appId': null,
        'fuchsiaModule': null,
        'filterDevicePort': null,
        'ipv6': false,
      });

      serverDaemonConnection.sendResponse(startMessage.data['id']!, 'request_id');
      serverDaemonConnection.sendEvent(
        'device.VMServiceDiscoveryForAttach.request_id',
        'http://127.0.0.1:300/auth_code',
      );

      expect(await uriCompleter.future, Uri.parse('http://127.0.0.1:400/auth_code'));
      expect(portForwarder.forwardedDevicePort, 300);
      expect(portForwarder.forwardedHostPort, null);
    });

    testWithoutContext('sends additional information, and forwards the correct port', () async {
      final portForwarder = FakeProxiedPortForwarder();
      portForwarder.forwardReturnValue = 400;
      final discovery = ProxiedVMServiceDiscoveryForAttach(
        clientDaemonConnection,
        'test_device',
        proxiedPortForwarder: portForwarder,
        fallbackDiscovery: () => throw UnimplementedError(),
        appId: 'test_app_id',
        fuchsiaModule: 'test_fuchsia_module',
        filterDevicePort: 100,
        expectedHostPort: 200,
        ipv6: false,
        logger: bufferLogger,
      );

      final uriCompleter = Completer<Uri>();

      // Start listening on the stream to trigger sending the request.
      discovery.uris.listen(uriCompleter.complete);

      final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
          .asBroadcastStream();

      final DaemonMessage startMessage = await broadcastOutput.first;
      expect(startMessage.data['id'], isNotNull);
      expect(startMessage.data['method'], 'device.startVMServiceDiscoveryForAttach');
      expect(startMessage.data['params'], {
        'deviceId': 'test_device',
        'appId': 'test_app_id',
        'fuchsiaModule': 'test_fuchsia_module',
        'filterDevicePort': 100,
        'ipv6': false,
      });

      serverDaemonConnection.sendResponse(startMessage.data['id']!, 'request_id');
      serverDaemonConnection.sendEvent(
        'device.VMServiceDiscoveryForAttach.request_id',
        'http://127.0.0.1:300/auth_code',
      );

      expect(await uriCompleter.future, Uri.parse('http://127.0.0.1:400/auth_code'));
      expect(portForwarder.forwardedDevicePort, 300);
      expect(portForwarder.forwardedHostPort, 200);
    });

    testWithoutContext(
      'use the fallback discovery if the remote daemon does not support proxied discovery',
      () async {
        final portForwarder = FakeProxiedPortForwarder();
        final fallbackUri = Stream.value(Uri.parse('http://127.0.0.1:500/fallback_auth_code'));
        final discovery = ProxiedVMServiceDiscoveryForAttach(
          clientDaemonConnection,
          'test_device',
          proxiedPortForwarder: portForwarder,
          fallbackDiscovery: () => FakeVMServiceDiscoveryForAttach(fallbackUri),
          ipv6: false,
          logger: bufferLogger,
        );

        final uriCompleter = Completer<Uri>();

        // Start listening on the stream to trigger sending the request.
        discovery.uris.listen(uriCompleter.complete);

        final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
            .asBroadcastStream();

        final DaemonMessage startMessage = await broadcastOutput.first;
        expect(startMessage.data['id'], isNotNull);
        expect(startMessage.data['method'], 'device.startVMServiceDiscoveryForAttach');
        expect(startMessage.data['params'], {
          'deviceId': 'test_device',
          'appId': null,
          'fuchsiaModule': null,
          'filterDevicePort': null,
          'ipv6': false,
        });

        // Name the method that was actually requested, mirroring what a daemon
        // that lacks this command would report.
        serverDaemonConnection.sendErrorResponse(
          startMessage.data['id']!,
          'command not understood: device.startVMServiceDiscoveryForAttach',
          StackTrace.current,
        );

        expect(await uriCompleter.future, Uri.parse('http://127.0.0.1:500/fallback_auth_code'));
        expect(portForwarder.forwardedDevicePort, null);
        expect(portForwarder.forwardedHostPort, null);
      },
    );

    testWithoutContext('forwards other error from the daemon', () async {
      final portForwarder = FakeProxiedPortForwarder();
      final fallbackUri = Stream.value(Uri.parse('http://127.0.0.1:500/fallback_auth_code'));
      final discovery = ProxiedVMServiceDiscoveryForAttach(
        clientDaemonConnection,
        'test_device',
        proxiedPortForwarder: portForwarder,
        fallbackDiscovery: () => FakeVMServiceDiscoveryForAttach(fallbackUri),
        ipv6: false,
        logger: bufferLogger,
      );

      // Start listening on the stream to trigger sending the request.
      final Future<Uri> uriFuture = discovery.uris.first;

      final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands
          .asBroadcastStream();

      final DaemonMessage startMessage = await broadcastOutput.first;
      expect(startMessage.data['id'], isNotNull);
      expect(startMessage.data['method'], 'device.startVMServiceDiscoveryForAttach');
      expect(startMessage.data['params'], {
        'deviceId': 'test_device',
        'appId': null,
        'fuchsiaModule': null,
        'filterDevicePort': null,
        'ipv6': false,
      });

      serverDaemonConnection.sendErrorResponse(
        startMessage.data['id']!,
        'other error',
        StackTrace.current,
      );

      // Await the failure so the port-forwarder checks below run only after
      // the error has been fully processed by the client.
      await expectLater(uriFuture, throwsA('other error'));
      expect(portForwarder.forwardedDevicePort, null);
      expect(portForwarder.forwardedHostPort, null);
    });

    testWithoutContext('forwards the port forwarder error', () async {
      final portForwarder = FakeProxiedPortForwarder();
      portForwarder.forwardThrowException = TestException();
      final discovery = ProxiedVMServiceDiscoveryForAttach(
        clientDaemonConnection,
        'test_device',
        proxiedPortForwarder: portForwarder,
        fallbackDiscovery: () => throw UnimplementedError(),
        ipv6: false,
        logger: bufferLogger,
      );

      // Start listening on the stream to trigger sending the request.
final Future uriFuture = discovery.uris.first; final Stream broadcastOutput = serverDaemonConnection.incomingCommands .asBroadcastStream(); final DaemonMessage startMessage = await broadcastOutput.first; expect(startMessage.data['id'], isNotNull); expect(startMessage.data['method'], 'device.startVMServiceDiscoveryForAttach'); expect(startMessage.data['params'], { 'deviceId': 'test_device', 'appId': null, 'fuchsiaModule': null, 'filterDevicePort': null, 'ipv6': false, }); serverDaemonConnection.sendResponse(startMessage.data['id']!, 'request_id'); serverDaemonConnection.sendEvent( 'device.VMServiceDiscoveryForAttach.request_id', 'http://127.0.0.1:300/auth_code', ); expect(uriFuture, throwsA(isA())); }); }); } class FakeDaemonStreams implements DaemonStreams { final inputs = StreamController(); final outputs = StreamController(); @override Stream get inputStream { return inputs.stream; } @override void send(Map message, [List? binary]) { outputs.add(DaemonMessage(message, binary != null ? Stream>.value(binary) : null)); } @override Future dispose() async { await inputs.close(); // In some tests, outputs have no listeners. We don't wait for outputs to close. unawaited(outputs.close()); } } class FakeServerSocket extends Fake implements ServerSocket { FakeServerSocket(this.port); @override final int port; bool closeCalled = false; final controller = StreamController(); @override StreamSubscription listen( void Function(Socket event)? onData, { Function? onError, void Function()? onDone, bool? cancelOnError, }) { return controller.stream.listen( onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError, ); } @override Future close() async { closeCalled = true; return this; } } class FakeSocket extends Fake implements Socket { bool closeCalled = false; final controller = StreamController(); final addedData = >[]; final doneCompleter = Completer(); @override StreamSubscription listen( void Function(Uint8List event)? onData, { Function? 
onError, void Function()? onDone, bool? cancelOnError, }) { return controller.stream.listen( onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError, ); } @override void add(List data) { addedData.add(data); } @override Future close() async { closeCalled = true; doneCompleter.complete(true); } @override Future get done => doneCompleter.future; @override void destroy() {} } class FakeDaemonConnection extends Fake implements DaemonConnection { FakeDaemonConnection({ this.handledRequests = const {}, this.daemonEventStreams = const >{}, }); /// Mapping of method name to returned object from the [sendRequest] method. final Map handledRequests; final Map> daemonEventStreams; @override Stream listenToEvent(String eventToListen) { final List? iterable = daemonEventStreams[eventToListen]; if (iterable != null) { return Stream.fromIterable(iterable); } return const Stream.empty(); } @override Future sendRequest(String method, [Object? params, List? binary]) async { final Object? response = handledRequests[method]; if (response != null) { return response; } throw Exception('"$method" request failed'); } } class FakeDeviceDiscoveryFilter extends Fake implements DeviceDiscoveryFilter { List? filteredDevices; List? devices; @override Future> filterDevices(List devices) async { this.devices = devices; return filteredDevices!; } } class FakeProxiedPortForwarder extends Fake implements ProxiedPortForwarder { int? originalRemotePortReturnValue; int? receivedLocalForwardedPort; Exception? forwardThrowException; int? forwardReturnValue; int? forwardedDevicePort; int? forwardedHostPort; bool? forwardedIpv6; @override int? originalRemotePort(int localForwardedPort) { receivedLocalForwardedPort = localForwardedPort; return originalRemotePortReturnValue; } @override Future forward(int devicePort, {int? hostPort, bool? 
ipv6}) async { forwardedDevicePort = devicePort; forwardedHostPort = hostPort; forwardedIpv6 = ipv6; if (forwardThrowException != null) { throw forwardThrowException!; } return forwardReturnValue!; } } class FakeDartDevelopmentService extends Fake implements DartDevelopmentService { bool startCalled = false; Uri? startUri; bool shutdownCalled = false; @override Future get done => _completer.future; final _completer = Completer(); @override Uri? uri; @override Future startDartDevelopmentService( Uri vmServiceUri, { FlutterDevice? device, int? ddsPort, bool? ipv6, bool? disableServiceAuthCodes, bool enableDevTools = false, bool cacheStartupProfile = false, String? google3WorkspaceRoot, Uri? devToolsServerAddress, }) async { startCalled = true; startUri = vmServiceUri; } @override Future shutdown() async => shutdownCalled = true; @override Future invokeServiceExtensions(FlutterDevice? device) async {} } class FakePrebuiltApplicationPackage extends Fake implements PrebuiltApplicationPackage { FakePrebuiltApplicationPackage(this.applicationPackage); @override final FileSystemEntity applicationPackage; } class FakeFileTransfer extends Fake implements FileTransfer { List? delta; Uint8List? binary; @override Future> computeDelta(File file, BlockHashes hashes) async => delta!; @override Future binaryForRebuilding(File file, List delta) async => binary!; } class FakeVMServiceDiscoveryForAttach extends Fake implements VMServiceDiscoveryForAttach { FakeVMServiceDiscoveryForAttach(this.uris); @override Stream uris; } class TestException implements Exception {}