from __future__ import with_statement

import operator
import tempfile
import time

from unittest import TestCase
from zipfile import ZIP_STORED, ZipFile

from abl.installer import DeltaBuilder
from abl.installer.delta import CreateAction, CreateDirAction, DeleteAction, UpdateAction
from abl.installer.delta_creator import delta_creator
from abl.vpath.base import URI


class DeltaCreationTests(TestCase):
    def tearDown(self):
        base = URI("memory:///")
        for fname in base.listdir():
            f = base / fname
            if f.isdir():
                for subname in f.listdir():
                    try:
                        (f / subname)._manipulate(unlock=True)
                    except:
                        pass
                    (f / subname).remove(recursive=True)
            f.remove()

    def test_file_creation(self):

        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        with (new / "foo").open("w") as outf:
            outf.write("foo")

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)
        self.assertEqual(len(actions), 1)

        action = actions[0]
        assert isinstance(action, CreateAction)

        # now perform the action on the delta
        # directory that eventually gets zipped
        # and comprises the delta-file

        dest = base / "dest"
        dest.mkdir()

        action.perform(dest)

        assert (dest / "created").exists()
        assert (dest / "created" / "foo").exists()
        assert (dest / "created" / "foo").open().read() == "foo"

    def test_file_mode_preservation(self):

        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()
        subdir = new / "subdir"
        subdir.mkdir()

        with (new / "foo").open("w") as outf:
            outf.write("foo")
        with (subdir / "bar").open("w") as outf:
            outf.write("bar")

        mode = 11111

        (new / "foo").info(dict(mode=mode))
        (subdir / "bar").info(dict(mode=mode))

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)

        dest = base / "dest"
        dest.mkdir()

        for action in actions:
            action.perform(dest)

        assert (dest / "created").exists()
        assert (dest / "created" / "foo").exists()
        assert (dest / "created" / "foo").open().read() == "foo"
        self.assertEqual((dest / "created" / "foo").info().mode, mode)
        assert (dest / "created" / "subdir" / "bar").exists()
        assert (dest / "created" / "subdir" / "bar").open().read() == "bar"
        self.assertEqual((dest / "created" / "subdir" / "bar").info().mode, mode)

    def test_file_update(self):

        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        with (old / "foo").open("w") as outf:
            outf.write("old")

        with (new / "foo").open("w") as outf:
            outf.write("new")

        with (old / "bar").open("w") as outf:
            outf.write("bar")

        with (new / "bar").open("w") as outf:
            outf.write("bar")

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)
        self.assertEqual(len(actions), 1)

        action = actions[0]
        assert isinstance(action, UpdateAction)

        # now perform the action on the delta
        # directory that eventually gets zipped
        # and comprises the delta-file

        dest = base / "dest"
        dest.mkdir()

        action.perform(dest)

        assert (dest / "updated").exists()
        assert (dest / "updated" / "foo").exists()
        assert (dest / "updated" / "foo").open().read() == "new"

    def test_file_deletion(self):

        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        with (old / "foo").open("w") as outf:
            outf.write("old")

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)
        self.assertEqual(len(actions), 1)

        action = actions[0]
        assert isinstance(action, DeleteAction)

        # now perform the action on the delta
        # directory that eventually gets zipped
        # and comprises the delta-file

        dest = base / "dest"
        dest.mkdir()

        action.perform(dest)

        assert (dest / "deleted").exists()
        assert (dest / "deleted" / "foo").exists()
        assert (dest / "deleted" / "foo").info().size == 0

    def test_two_file_deletion(self):
        # this was an error because
        # memory:// behaving different to file://
        # so I created this test to ensure it doesn't
        # fail after I fixed the behavior of memory://

        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        (old / "sub").mkdir()
        # we just need an empty dir to provoke
        # two delete actions
        (new / "sub").mkdir()

        with (old / "sub" / "foo").open("w") as outf:
            outf.write("foo")

        with (old / "sub" / "bar").open("w") as outf:
            outf.write("bar")

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)
        self.assertEqual(len(actions), 2)

        dest = base / "dest"
        dest.mkdir()

        for action in actions:
            action.perform(dest)

        assert (dest / "deleted").exists()
        assert (dest / "deleted" / "sub" / "foo").exists()
        assert (dest / "deleted" / "sub" / "foo").isfile()
        assert (dest / "deleted" / "sub" / "bar").exists()
        assert (dest / "deleted" / "sub" / "bar").isfile()

    def test_directory_creation(self):

        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        (new / "foo").mkdir()
        with (new / "foo" / "bar").open("w") as outf:
            outf.write("bar")

        (new / "foo" / "baz").mkdir()

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)

        dest = base / "dest"
        dest.mkdir()

        for action in actions:
            action.perform(dest)

        assert (dest / "created").exists()
        assert (dest / "created" / "foo").exists()
        assert (dest / "created" / "foo").isdir()

        assert (dest / "created" / "foo" / "bar").exists()
        assert (dest / "created" / "foo" / "bar").open().read() == "bar"

        assert (dest / "created" / "foo" / "baz").exists()
        assert (dest / "created" / "foo" / "baz").isdir()

    def test_nested_directory_creation(self):
        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        (old / "foo").mkdir()

        new = base / "new"
        new.mkdir()

        (new / "foo").mkdir()
        (new / "foo" / "bar").mkdir()
        with (new / "foo" / "bar" / "baz").open("w") as outf:
            outf.write("bar")

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)

        dest = base / "dest"
        dest.mkdir()

        for action in actions:
            action.perform(dest)

        assert (dest / "created").exists()
        assert (dest / "created" / "foo").exists()
        assert (dest / "created" / "foo").isdir()

        assert (dest / "created" / "foo" / "bar").exists()
        assert (dest / "created" / "foo" / "bar").isdir()

        assert (dest / "created" / "foo" / "bar" / "baz").exists()
        assert (dest / "created" / "foo" / "bar" / "baz").open().read() == "bar"

    def test_directory_deletion(self):

        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        (old / "foo").mkdir()

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)
        self.assertEqual(len(actions), 1)

        action = actions[0]
        assert isinstance(action, DeleteAction)

        # now perform the action on the delta
        # directory that eventually gets zipped
        # and comprises the delta-file

        dest = base / "dest"
        dest.mkdir()

        action.perform(dest)

        assert (dest / "deleted").exists()
        assert (dest / "deleted" / "foo").exists()
        assert (dest / "deleted" / "foo").isfile()
        assert (dest / "deleted" / "foo").info().size == 0

    def test_recursion_into_directories(self):

        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        (old / "sub").mkdir()
        (new / "sub").mkdir()

        with (old / "sub" / "deleted").open("w"):
            pass

        with (new / "sub" / "created").open("w") as outf:
            outf.write("created")

        with (old / "sub" / "updated").open("w") as outf:
            outf.write("old")

        with (new / "sub" / "updated").open("w") as outf:
            outf.write("new")

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)
        self.assertEqual(len(actions), 3)

        dest = base / "dest"
        dest.mkdir()

        for action in actions:
            action.perform(dest)

        assert (dest / "created" / "sub").exists()
        assert (dest / "created" / "sub" / "created").exists()
        assert (dest / "created" / "sub" / "created").open().read() == "created"

        assert (dest / "updated" / "sub").exists()
        assert (dest / "updated" / "sub" / "updated").exists()
        assert (dest / "updated" / "sub" / "updated").open().read() == "new"

        assert (dest / "deleted" / "sub").exists()
        assert (dest / "deleted" / "sub" / "deleted").exists()

    # the next two cases show that they are split into
    # two parts - deletion of a directory/file,
    # and creation of a directory/file.
    #
    # This means that for this to work eventually,
    # the order of application of a delta is important - first,
    # deletion needs to happen. Then creation, then updating

    def test_files_replaced_by_diretories(self):
        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        with (old / "foo").open("w") as outf:
            outf.write("foo")

        (new / "foo").mkdir()
        with (new / "foo" / "file").open("w") as outf:
            outf.write("file")

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)

        dest = base / "dest"
        dest.mkdir()

        for action in actions:
            action.perform(dest)

        assert (dest / "deleted" / "foo").exists()
        assert (dest / "created" / "foo").exists()
        assert (dest / "created" / "foo").isdir()

    def test_directories_replaced_by_files(self):
        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        (old / "foo").mkdir()
        with (old / "foo" / "file").open("w") as outf:
            outf.write("file")

        with (new / "foo").open("w") as outf:
            outf.write("foo")

        db = DeltaBuilder(old, new)
        actions = db.compute_action_list(True)

        dest = base / "dest"
        dest.mkdir()

        for action in actions:
            action.perform(dest)

        assert (dest / "deleted" / "foo").exists()
        assert (dest / "created" / "foo").exists()
        assert (dest / "created" / "foo").isfile()
        assert (dest / "created" / "foo").open().read() == "foo"

    def test_full_creation_process(self):

        base = URI("memory://")

        old = base / "old"
        old.mkdir()
        new = base / "new"
        new.mkdir()

        (old / "sub").mkdir()
        (new / "sub").mkdir()

        with (old / "sub" / "deleted").open("w"):
            pass

        with (new / "sub" / "created").open("w") as outf:
            outf.write("created")

        with (old / "sub" / "updated").open("w") as outf:
            outf.write("old")

        with (new / "sub" / "updated").open("w") as outf:
            outf.write("new")

        (old / "sub").symlink(old / "deleted_dir_symlink")
        (old / "sub" / "deleted").symlink(old / "deleted_file_symlink")
        (new / "sub").symlink(new / "dir_symlink")
        (new / "sub" / "created").symlink(new / "file_symlink")

        dest = base / "dest"
        dest.mkdir()

        db = DeltaBuilder(old, new)
        db.produce_delta(dest)

        assert (dest / "created" / "sub").exists()
        assert (dest / "created" / "sub" / "created").exists()
        assert (dest / "created" / "sub" / "created").open().read() == "created"

        assert (dest / "updated" / "sub").exists()
        assert (dest / "updated" / "sub" / "updated").exists()
        assert (dest / "updated" / "sub" / "updated").open().read() == "new"

        assert (dest / "deleted" / "sub").exists()
        assert (dest / "deleted" / "sub" / "deleted").exists()

        assert (dest / "deleted" / "deleted_dir_symlink").exists()
        assert (dest / "deleted" / "deleted_file_symlink").exists()
        assert (dest / "created" / "dir_symlink").islink()
        assert (dest / "created" / "file_symlink").islink()
        # Note: it may seem weird that these are in the "new" dir, but in reality
        # symlinks are relative and will point to the right place:
        assert (dest / "created" / "dir_symlink").readlink() == (new / "sub")
        assert (dest / "created" / "file_symlink").readlink() == (new / "sub" / "created")

    def test_delta_creator(self):

        delta_filename = tempfile.mktemp()
        df = URI(delta_filename)
        base = URI(tempfile.mkdtemp())
        try:

            old = base / "old"
            old.mkdir()
            new = base / "new"
            new.mkdir()

            (old / "sub").mkdir()
            (new / "sub").mkdir()

            with (old / "sub" / "updated.alp").open("w") as outf:
                outf.write("old")

            with (new / "sub" / "updated.alp").open("w") as outf:
                outf.write("new")

            args = ["-f", str(old), "-t", str(new), "--name=%s" % delta_filename]
            delta_creator(args)
            assert df.exists()
            zipfile = ZipFile(delta_filename, "r")
            assert zipfile.infolist()
            for info in zipfile.infolist():
                self.assertEqual(info.compress_type, ZIP_STORED)
            zipfile.close()

        finally:
            base.remove(recursive=True)

            if df.exists():
                df.remove()
