from __future__ import with_statement

import sys

from unittest import TestCase

import mox
import xmlrpclib

from abl.util import Bunch
from abl.vpath.base import URI
from abl.webconnector import (
    CRASHLOG_EXT,
    DirectoryConnector,
    RPCConnector,
    check_and_process_auto_updates,
)
from abl.webconnector.connector import ConnectorBase, create_copy_app_base_path

from .common import FakeTransport, FakeUsageReporter


class ConnectorBaseTests(TestCase):
    """Tests for ConnectorBase and the create_copy_app_base_path helper."""

    def tearDown(self):
        # Clear out every top-level entry of the in-memory filesystem so the
        # next test can create the same directories again.
        root = URI("memory:///")
        for entry in root.listdir():
            (root / entry).remove(recursive=True)

    def _new_connector(self, **extra_config):
        """Create the two in-memory base directories and return a ConnectorBase.

        Any keyword arguments are passed through as additional config values.
        """
        app_dir = URI("memory:///app_base_path")
        app_dir.mkdir()
        updates_dir = URI("memory:///autoupdates_dir_base_path")
        updates_dir.mkdir()

        config = Bunch(
            app_base_path=app_dir,
            autoupdates_dir_base_path=updates_dir,
            **extra_config
        )
        return ConnectorBase(config)

    def _check_copy_path(self, suffix):
        """Assert the copy path is a hidden ``_updated`` sibling of the original.

        ``suffix`` is appended to the install name of the original path and is
        expected to be kept at the end of the copy's name.
        """
        install_name = "Ableton Live"
        root = URI("memory:///")
        original = root / install_name / "a" / "b" / "c" / (install_name + suffix)
        expected_name = "." + install_name + "_updated" + suffix

        duplicate = create_copy_app_base_path(original)

        # Same parent directory, different (dot-prefixed) last component.
        assert duplicate.directory().parse_result == original.directory().parse_result
        assert duplicate.last() == expected_name

    def test_saving_update_info_removes_temp(self):
        """A leftover ``.temp`` info file is gone after save_update_info()."""
        connector = self._new_connector()
        info_path = connector.info_file_name

        # Create an empty stale temp file next to the info file.
        with (info_path + ".temp").open("wb"):
            pass

        connector.save_update_info({})

        assert not (info_path + ".temp").exists()

    def test_running_live_check(self):
        """check_live_is_running() is true when app_exe_path is this interpreter."""
        connector = self._new_connector(app_exe_path=sys.executable)
        is_running = connector.check_live_is_running()
        assert is_running, (
            "if failing when run locally, make sure you use the full path to the sys.executable to run the tests, not something relative: %s"
            % sys.executable
        )

    def test_create_copy_app_path(self):
        """The ``.app`` extension is preserved on the copy path."""
        self._check_copy_path(".app")

    def test_create_copy_path(self):
        """A path without an extension gets a plain ``_updated`` copy name."""
        self._check_copy_path("")


class RPCConnectorTests(TestCase):
    """Tests for the XML-RPC proxies RPCConnector creates from an endpoint prefix."""

    def tearDown(self):
        # Wipe the in-memory filesystem, as the sibling test classes do, so
        # the directories created here do not leak into other tests.
        base = URI("memory:///")
        for fname in base.listdir():
            f = base / fname
            f.remove(recursive=True)

    def test_endpoint_slash_tolerance(self):
        """The endpoint prefix works with and without a trailing slash.

        For each prefix variant the connector must route the "software" and
        "reports" calls to the mocked connection and expose one proxy per
        endpoint whose handler path is ``/<endpoint>``.
        """
        base = URI("memory:///app_base_path")
        base.mkdir()
        autoupdates_dir_base_path = URI("memory:///autoupdates_dir_base_path")
        autoupdates_dir_base_path.mkdir()

        fake_reporter = FakeUsageReporter()

        for endpoint_prefix in ("http://endpoint/", "http://endpoint"):
            # Fresh mock per variant so VerifyAll() only sees this
            # connector's calls.
            mocker = mox.Mox()
            mock_conn = mocker.CreateMockAnything()
            mock_conn.check_for_auto_updates(["win64"], "from", None).AndReturn({})
            mock_conn.upload_event_log(mox.IsA(xmlrpclib.Binary)).AndReturn(1)
            mocker.ReplayAll()

            connector = RPCConnector(
                Bunch(
                    app_base_path=base,
                    autoupdates_dir_base_path=autoupdates_dir_base_path,
                ),
                endpoint_prefix=endpoint_prefix,
                transport=FakeTransport(mock_conn),
            )

            # test a software and reports callback
            connector.check_and_process_auto_updates(
                ["win64"], "from", fake_reporter
            )
            connector.upload_event_log("foobar")

            mocker.VerifyAll()

            # The transport is faked, so the mock calls above succeed for any
            # URL; the handler path is what shows how the prefix was joined.
            proxies = connector.proxies
            for ep in ["software", "reports"]:
                assert ep in proxies, "no %r proxy for prefix %r" % (
                    ep,
                    endpoint_prefix,
                )
                # ServerProxy keeps the URL path in a name-mangled attribute.
                handler = proxies[ep].__dict__["_ServerProxy__handler"]
                assert handler == "/" + ep, "handler %r for prefix %r" % (
                    handler,
                    endpoint_prefix,
                )


class DirectoryConnectorTests(TestCase):
    """Tests for DirectoryConnector running against the in-memory filesystem."""

    def tearDown(self):
        # Remove everything below the in-memory root so each test can create
        # the same directories again.
        base = URI("memory:///")
        for fname in base.listdir():
            f = base / fname
            f.remove(recursive=True)

    def _make_base_dirs(self):
        """Create and return ``(app_base_path, autoupdates_dir_base_path)`` URIs."""
        base = URI("memory:///app_base_path")
        base.mkdir()
        autoupdates_dir_base_path = URI("memory:///autoupdates_dir_base_path")
        autoupdates_dir_base_path.mkdir()
        return base, autoupdates_dir_base_path

    def _make_connector(self, base, autoupdates_dir_base_path):
        """Return a DirectoryConnector that uses ``base`` as app path and source."""
        return DirectoryConnector(
            Bunch(
                app_base_path=str(base),
                autoupdates_dir_base_path=autoupdates_dir_base_path,
            ),
            base,
        )

    def _write_deltas(self, root, names):
        """Create ``root/from/to`` and write one file per name into it.

        Each file's content is its own name.
        """
        (root / "from").mkdir()
        delta_dir = root / "from" / "to"
        delta_dir.mkdir()
        for name in names:
            with (delta_dir / name).open("wb") as outf:
                outf.write(name)

    def _read(self, path):
        """Return the content of ``path``, closing the handle afterwards."""
        with path.open() as inf:
            return inf.read()

    def test_check_for_auto_updates_is_empty(self):
        """An empty source directory yields an empty result."""
        base, autoupdates_dir_base_path = self._make_base_dirs()
        connector = self._make_connector(base, autoupdates_dir_base_path)

        deltas = connector.check_for_auto_updates(["mac-intel"], "from", None)

        self.assertEqual(deltas, {})

    def test_os_tags_work(self):
        """Each tag combination gets exactly one result entry."""
        base, autoupdates_dir_base_path = self._make_base_dirs()

        testcases = [
            ("delta_win_64.asu", ["win64"]),
            ("delta_mac_intel.asu", ["mac-intel"]),
            ("delta_lite_mac_intel.asu", ["mac-intel", "lite"]),
            ("delta_trial_mac_intel.asu", ["mac-intel", "trial"]),
        ]
        self._write_deltas(base, [name for name, _ in testcases])

        connector = self._make_connector(base, autoupdates_dir_base_path)

        for _, tags in testcases:
            deltas = connector.check_for_auto_updates(tags, "from", None)
            self.assertEqual(len(deltas), 1)

    def test_order_is_based_on_filenames(self):
        """Deltas are listed in the reverse of the order they were written."""
        base, autoupdates_dir_base_path = self._make_base_dirs()

        # Written in reverse alphabetical order on purpose.
        testcases = ["b_delta_win_64.asu", "a_delta_win_64.asu"]
        self._write_deltas(base, testcases)

        connector = self._make_connector(base, autoupdates_dir_base_path)

        deltas = connector.check_for_auto_updates(["win64"], "from", None)
        self.assertEqual(len(deltas["to"]), 2)
        self.assertEqual([d["name"] for d in deltas["to"]], list(reversed(testcases)))

    def test_delta_retrieval(self):
        """A listed delta can be retrieved and its checksum matches the listing."""
        base, autoupdates_dir_base_path = self._make_base_dirs()
        self._write_deltas(base, ["delta_win_64.asu"])

        connector = self._make_connector(base, autoupdates_dir_base_path)

        deltas = connector.check_for_auto_updates(["win64"], "from", None)
        self.assertEqual(len(deltas), 1)
        self.assertEqual(len(deltas["to"]), 1)

        d = deltas["to"][0]
        name, md5, version = d["name"], d["checksum"], d["version"]
        self.assertEqual(version, 1)

        delta_dest = base / "delta.asu.part"
        checksum = connector.retrieve_delta_into(name, delta_dest)

        self.assertEqual(checksum, md5)

    def test_event_log_update(self):
        """Uploaded event and crash logs end up under ``eventlogs/<run_id>``."""
        base, autoupdates_dir_base_path = self._make_base_dirs()
        connector = self._make_connector(base, autoupdates_dir_base_path)

        run_id = connector.upload_event_log("event_log")
        connector.upload_log(run_id, CRASHLOG_EXT[1:], "crash_log")

        run_dir = base / "eventlogs" / str(run_id)
        assert (base / "eventlogs").exists()
        assert run_dir.exists()
        assert (run_dir / "event_log").exists()
        assert (run_dir / "crashlog_log").exists()
        # Read through a helper so the file handles are closed again.
        assert self._read(run_dir / "event_log") == "event_log"
        assert self._read(run_dir / "crashlog_log") == "crash_log"

    def test_full_auto_update_integration(self):
        """The tasks from check_and_process_auto_updates run and record a status."""
        app_base_path, autoupdates_dir_base_path = self._make_base_dirs()

        # The deltas are served from a directory separate from the app path.
        dc_base = URI("memory:///dc_base")
        dc_base.mkdir()
        self._write_deltas(dc_base, ["delta_win_64.asu"])

        tags = ["win64"]
        version = "from"

        config = Bunch(
            app_base_path=str(app_base_path),
            autoupdates_dir_base_path=str(autoupdates_dir_base_path),
            auto_updater_exe="auto_updater_exe",
            auto_updater_main="auto_updater_main.py",
        )

        captured_commands = []

        def capture_command(*args):
            # Stand-in for the popen factory: records the arguments and
            # returns an object offering communicate() and poll().
            captured_commands.append(args)

            class MockPopen(object):
                def communicate(self):
                    return None, None

                def poll(self):
                    return None

            return MockPopen()

        connector = DirectoryConnector(config, dc_base, popen_factory=capture_command)

        fake_reporter = FakeUsageReporter()
        res = check_and_process_auto_updates(connector, tags, version, fake_reporter)

        # we expect a DownloadTask
        # and a VerifyUpdateTask
        self.assertEqual(len(res), 2)
        for task in res:
            task.perform(fake_reporter)

        update_info = connector.load_update_info()
        assert update_info["versions"]["from"]["to"]["status"]

    def test_incomplete_dir_works(self):
        """Partially populated source directories yield no deltas.

        The result stays empty until a file named ``delta_win_64.asu`` exists
        in ``from/to``; a plain ``delta.asu`` is not enough for the "win64"
        tag.
        """
        base, autoupdates_dir_base_path = self._make_base_dirs()

        def check():
            # Use a fresh connector each time so every call looks at the
            # current directory state.
            connector = self._make_connector(base, autoupdates_dir_base_path)
            return connector.check_for_auto_updates(["win64"], "from", None)

        assert check() == {}

        (base / "from").mkdir()
        assert check() == {}

        (base / "from" / "to").mkdir()
        assert check() == {}

        with (base / "from" / "to" / "delta.asu").open("wb") as outf:
            outf.write("foobar")
        assert check() == {}

        # finally, we put something downloadable there
        with (base / "from" / "to" / "delta_win_64.asu").open("wb") as outf:
            outf.write("foobar")
        assert check() != {}
