from __future__ import with_statement

import datetime
import platform
import zipfile

from json import dumps
from unittest import TestCase

from abl.installer import DeltaBuilder, relpath
from abl.installer.installer import Updater
from abl.installer.xdelta import create_xdelta
from abl.vpath.base import URI

from .common import create_file


# -----------------------------------------------------------------------------------------


def make_delete_request(delta_path, rel_path):
    """Record a deletion request for ``rel_path`` inside the delta.

    A marker file is created at ``<delta_path>/deleted/<rel_path>``; its
    parent directories are created first.  Nothing is written to the marker.
    """
    marker = delta_path / "deleted" / rel_path
    marker_parent = marker.directory()
    marker_parent.makedirs()

    # Open for writing and leave the block right away: no content is written.
    with marker.open("w") as _unused_handle:
        pass


def make_create_requests(delta_path, rel_paths):
    """Record creation requests for every path in ``rel_paths``.

    The ``<delta_path>/created`` directory is made once, then one file per
    relative path is written below it.  Each file's content is the last
    component of its own path (``ep.last()``).
    """
    created_root = delta_path / "created"
    created_root.mkdir()

    for rel_path in rel_paths:
        target = created_root / rel_path
        target.directory().makedirs()
        create_file(target, content=target.last())


def make_create_request(delta_path, rel_path):
    """Record a creation request for the single path ``rel_path``.

    Convenience wrapper around ``make_create_requests`` with a one-element list.
    """
    single_path = [rel_path]
    make_create_requests(delta_path, single_path)


def make_update_request(delta_path, rel_path, content="foo"):
    """Record an update request for ``rel_path`` inside the delta.

    A file holding ``content`` is written to
    ``<delta_path>/updated/<rel_path>``; its parent directories are created
    first.
    """
    target = delta_path / "updated" / rel_path
    target_parent = target.directory()
    target_parent.makedirs()

    create_file(target, content=content)


# -----------------------------------------------------------------------------------------


class DeltaApplicationTests(TestCase):
    def tearDown(self):
        base = URI("memory:///")
        for fname in base.listdir():
            f = base / fname
            if f.isdir():
                for subname in f.listdir():
                    try:
                        (f / subname)._manipulate(unlock=True)
                    except:
                        pass
                    (f / subname).remove(recursive=True)
            f.remove()

    def test_file_creation(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        make_create_request(delta, "foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "foo").exists()
        assert (dest / "foo").open().read() == "foo"

    def test_file_creation_in_subdir(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        make_create_request(delta, "sub/foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "sub" / "foo").exists()
        assert (dest / "sub" / "foo").open().read() == "foo"

    def test_file_creation_with_unexpected_file(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        create_file(orig / "foo", content="old")

        make_create_request(delta, "foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "foo").exists()
        assert (dest / "foo").open().read() == "foo"
        assert (dest / "foo.orig").open().read() == "old"

    def test_file_creation_with_unexpected_file2(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        create_file(orig / "foo", content="old")
        create_file(orig / "foo.orig", content="evenolder")

        make_create_request(delta, "foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "foo").exists()
        assert (dest / "foo").open().read() == "foo"
        assert (dest / "foo.orig").open().read() == "evenolder"
        assert (dest / "foo.orig-1").open().read() == "old"

    def test_file_creation_in_subdir_with_unexpected_file(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        create_file(orig / "sub" / "foo", content="old")

        make_create_request(delta, "sub/foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "sub" / "foo").exists()
        assert (dest / "sub" / "foo").open().read() == "foo"
        assert (dest / "sub" / "foo.orig").open().read() == "old"

    # -------------------------------------------------------------------------------------

    def test_file_update(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        create_file(orig / "foo", content="old")

        make_update_request(delta, "foo", content="new")

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "foo").exists()
        assert (dest / "foo").open().read() == "new"

    def test_file_update_in_subdir(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        (orig / "sub").mkdir()
        create_file(orig / "sub" / "foo", content="old")

        make_update_request(delta, "sub/foo", content="new")

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "sub" / "foo").exists()
        assert (dest / "sub" / "foo").open().read() == "new"

    def test_file_update_with_missing_expected_file(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        # the original file expected is missing.  Continue nevertheless
        # create_file(orig / "foo", content="old")

        make_update_request(delta, "foo", content="new")

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "foo").exists()
        assert (dest / "foo").open().read() == "new"

    def test_file_update_in_subdir_with_missing_expected_folder(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        # the original expected file (and subfolder) is missing.  Continue nevertheless
        # (orig / "sub").mkdir()
        # create_file(orig / "sub" / "foo", content="old")

        make_update_request(delta, "sub/foo", content="new")

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "sub" / "foo").exists()
        assert (dest / "sub" / "foo").open().read() == "new"

    # -------------------------------------------------------------------------------------

    def test_file_deletion(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        create_file(orig / "foo", content="old")

        make_delete_request(delta, "foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert not (dest / "foo").exists()

    def test_directory_deletion(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        (orig / "foo").mkdir()

        make_delete_request(delta, "foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert not (dest / "foo").exists()

    def test_directory_with_content_deletion(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        (orig / "foo").mkdir()
        create_file(orig / "foo" / "bar", content="foobarbaz")

        make_delete_request(delta, "foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert not (dest / "foo").exists()

    def test_file_deletion_with_missing_expected_file(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        # the expected file foo in orig has vanished.  The update should
        # work nevertheless
        # create_file(orig / "foo")

        make_delete_request(delta, "foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert not (dest / "foo").exists()

    def test_directory_deletion_with_missing_expected_dir(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()

        # the expected file foo in orig has vanished.  The update should
        # work nevertheless
        # (orig / "foo").mkdir()

        make_delete_request(delta, "foo")

        Updater(orig, dest, None).apply_delta(delta)

        assert not (dest / "foo").exists()

    def test_files_replaced_by_directories(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        recent = base / "MoreRecentLive"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()
        recent.mkdir()

        create_file(orig / "foo")

        (recent / "foo").mkdir()
        with (recent / "foo" / "created").open("w") as outf:
            outf.write("created")

        db = DeltaBuilder(orig, recent)
        db.produce_delta(delta)

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "foo").isdir()
        assert (dest / "foo" / "created").isfile()

    def test_directories_replaced_by_files(self):

        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        recent = base / "MoreRecentLive"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()
        recent.mkdir()

        (orig / "foo").mkdir()
        create_file(orig / "foo" / "created")

        with (recent / "foo").open("w") as outf:
            outf.write("foo")

        db = DeltaBuilder(orig, recent)
        db.produce_delta(delta)

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "foo").isfile()

    def test_symlink_created(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        recent = base / "MoreRecentLive"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()
        recent.mkdir()

        create_file(recent / "nothing")
        (recent / "nothing").symlink(recent / "foo")
        (recent / "somedir").mkdir()
        (recent / "somedir").symlink(recent / "bar")

        db = DeltaBuilder(orig, recent)
        db.produce_delta(delta)

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "foo").islink()
        assert (dest / "bar").islink()

    def test_symlink_deleted(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        recent = base / "MoreRecentLive"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()
        recent.mkdir()

        create_file(orig / "nothing")
        create_file(recent / "nothing")
        (orig / "nothing").symlink(orig / "foo")
        (orig / "somedir").mkdir()
        (recent / "somedir").mkdir()
        (orig / "somedir").symlink(orig / "bar")

        db = DeltaBuilder(orig, recent)
        db.produce_delta(delta)

        Updater(orig, dest, None).apply_delta(delta)

        assert not (dest / "foo").islink()
        assert not (dest / "bar").islink()
        assert (dest / "somedir").isdir()
        assert (dest / "nothing").exists()

    def test_file_replaced_by_symlink(self):
        base = URI("memory:///")
        delta = base / "delta"
        orig = base / "Live"
        recent = base / "MoreRecentLive"
        dest = base / "copy"

        delta.mkdir()
        orig.mkdir()
        recent.mkdir()

        create_file(orig / "foo")
        create_file(recent / "nothing")
        (recent / "nothing").symlink(recent / "foo")

        db = DeltaBuilder(orig, recent)
        db.produce_delta(delta)

        Updater(orig, dest, None).apply_delta(delta)

        assert (dest / "foo").islink()

    # -------------------------------------------------------------------------------------

    def test_several_deltas_are_applied_correct(self):
        base = URI("memory:///")
        orig = base / "Live"
        dest = base / "copy"

        orig.mkdir()

        (base / "d0").mkdir()
        d0 = base / "d0" / "delta"
        d0.mkdir()

        (base / "d1").mkdir()
        d1 = base / "d1" / "delta"
        d1.mkdir()

        create_file(orig / "to_delete_and_recreate")

        make_delete_request(d0, "to_delete_and_recreate")

        make_create_requests(d0, ["to_create_and_delete", "to_create_and_update"])
        make_create_request(d1, "to_delete_and_recreate")

        make_delete_request(d1, "to_create_and_delete")

        make_update_request(d1, "to_create_and_update", content="foobar")

        updater = Updater(orig, dest, None)
        updater.apply_delta(d0)
        updater.apply_delta(d1)

    # -------------------------------------------------------------------------------------

    def test_copy(self):
        base = URI("memory:///")
        orig = base / "Live"
        dest = base / "copy"

        orig.mkdir()
        contents = orig / "Contents"
        contents.mkdir()
        subcontents = contents / "Subfolder"
        subcontents.mkdir()
        with (subcontents / "foofile").open("w") as outf:
            outf.write("foo")

        updater = Updater(orig, dest, None)

        assert (dest / "Contents").exists()
        assert (dest / "Contents" / "Subfolder").exists()
        assert (dest / "Contents" / "Subfolder" / "foofile").exists()

    def test_copy_to_existing_target(self):
        base = URI("memory:///")
        orig = base / "Live"
        dest = base / "copy"

        orig.mkdir()
        contents = orig / "Contents"
        contents.mkdir()
        dest.mkdir()

        updater = Updater(orig, dest, None)

        assert (dest / "Contents").exists()

    # -------------------------------------------------------------------------------------

    def test_auto_update_preparation(self):

        base = URI("memory:///")
        orig = base / "Live"
        dest = base / "copy"
        autoupdates_dir_base_path = base / "autoupdates_dir_base_path"

        orig.mkdir()
        autoupdates_dir_base_path.mkdir()

        autoupdates_str = "_autoupdates"
        auto_updates_base = autoupdates_dir_base_path / autoupdates_str
        auto_updates_base.mkdir()

        with (orig / "to_be_removed").open("w") as outf:
            outf.write("foo")

        updater = Updater(orig, dest, autoupdates_dir_base_path)

        # _autoupdates should have been excluded from the copy
        assert not (dest / autoupdates_str).exists()

        # create some trash in the extration_base
        # to verify it's cleaned up
        extraction_base = auto_updates_base / "pending_update"
        extraction_base.mkdir()
        with (extraction_base / "rubbish").open("w") as outf:
            outf.write("foo")

        deltas = auto_updates_base / "deltas"
        deltas.mkdir()

        # create a simple test delta file
        (base / "tmp").mkdir()
        delta = base / "tmp" / "delta"
        delta.mkdir()

        make_create_request(delta, "foo")

        make_delete_request(delta, "to_be_removed")

        delta_file = deltas / "delta.asu"

        with delta_file.open("w") as outf:
            zf = zipfile.ZipFile(outf, "w")
            for root, dirs, filenames in delta.walk():
                for filename in filenames:
                    zf.writestr(
                        relpath(delta, root / filename), (root / filename).open().read()
                    )
            zf.close()

        assert delta_file.exists()

        with (auto_updates_base / "info.json").open("w") as outf:
            outf.write(
                dumps(
                    {
                        # a mapping of version from/to
                        # to the list of delta-names
                        "versions": {
                            "from": {
                                "to": {"status": "downloaded", "deltas": ["delta.asu"]}
                            }
                        }
                    }
                )
            )

        updater.prepare_auto_update("from", "to")

        assert not (extraction_base / "rubbish").exists()

        # test application

        assert (dest / "foo").exists()

        # ensure we clean up properly
        assert not (dest / "to_be_removed").exists()

    def test_auto_update_preparation_for_alps(self):
        # LinuxTODO: re-enable this test once xdelta is built for Linux
        if platform.system() == "Linux":
            return

        base = URI("memory:///")
        orig = base / "Live"
        dest = base / "copy"
        autoupdates_dir_base_path = base / "autoupdates_dir_base_path"

        orig.mkdir()
        autoupdates_dir_base_path.mkdir()

        old_alp = orig / "foo.alp"

        with old_alp.open("wb") as outf:
            outf.write("old" * 100)

        new_alp = base / "foo.alp"
        with new_alp.open("wb") as outf:
            outf.write("new" * 100)

        old_alp.info(dict(mode=12345))
        new_alp.info(dict(mode=54321))

        auto_updates_base = autoupdates_dir_base_path / "_autoupdates"
        auto_updates_base.mkdir()

        extraction_base = auto_updates_base / "pending_update"

        deltas = auto_updates_base / "deltas"
        deltas.mkdir()

        # create a simple test delta file
        (base / "tmp").mkdir()
        delta = base / "tmp" / "delta"
        delta.mkdir()

        updated = delta / "updated"
        updated.mkdir()

        alp_delta = updated / "foo.alp.xdelta"
        create_xdelta(old_alp, new_alp, alp_delta)

        alp_delta.info(dict(mode=54321))

        delta_file = deltas / "delta.asu"

        now = datetime.datetime.now()
        date_time = (now.year, now.month, now.day, now.hour, now.minute, now.second)

        with delta_file.open("w") as outf:
            zf = zipfile.ZipFile(outf, "w")
            for root, dirs, filenames in delta.walk():
                for filename in filenames:
                    source = root / filename
                    zi = zipfile.ZipInfo(
                        filename=relpath(delta, source), date_time=date_time
                    )
                    zi.compress_type = zipfile.ZIP_DEFLATED
                    zi.external_attr = source.info().mode << 16

                    with source.open("rb") as inf:
                        zf.writestr(zi, inf.read())

            zf.close()

        assert delta_file.exists()

        with (auto_updates_base / "info.json").open("w") as outf:
            outf.write(
                dumps(
                    {
                        # a mapping of version from/to
                        # to the list of delta-names
                        "versions": {
                            "from": {
                                "to": {"status": "downloaded", "deltas": ["delta.asu"]}
                            }
                        }
                    }
                )
            )

        updater = Updater(orig, dest, autoupdates_dir_base_path)
        updater.prepare_auto_update("from", "to")
