from __future__ import with_statement

import platform
import tempfile

from unittest import TestCase

from abl.installer import Differ
from abl.installer.diffing import ALPDiffer, MD5Differ, SymlinkDiffer
from abl.installer.xdelta import XDeltaApplicationError, apply_xdelta, create_xdelta
from abl.vpath.base import URI


class DiffingTests(TestCase):
    def setUp(self):
        super(DiffingTests, self).setUp()

        URI("memory:///").remove(recursive=True)

        base = URI(__file__).directory()

        db = {}
        for f in (base / "data").glob("*.pyc"):
            db[f.basename()] = f

        self.pyc_db = db

        base = URI("memory:///")
        base.remove(recursive=True)

        foo = base / "foo.bin"
        bar = base / "bar.bin"

        with foo.open("wb") as outf:
            outf.write("foo")

        with bar.open("wb") as outf:
            outf.write("bar")

        self.foo, self.bar = foo, bar

    def test_md5_diffing(self):

        foo, bar = self.foo, self.bar

        for old, new, result in [
            (foo, foo, False),
            (bar, bar, False),
            (foo, bar, True),
            (bar, foo, True),
        ]:
            assert MD5Differ(old, new).are_different() == result

    def test_symlink_diffing(self):

        base = URI("memory:///")
        old = base / "old"
        old.mkdir()

        new = base / "new"
        new.mkdir()

        delta = base / "delta"
        delta.mkdir()

        old_link = old / "foo"
        new_link = new / "foo"

        (base / "zot").symlink(old_link)
        (base / "bepis").symlink(new_link)

        differ = SymlinkDiffer(old_link, new_link)
        assert differ.are_different()
        outp = delta / "foo"
        differ.delta_file_into(outp)
        assert outp.islink()
        assert outp.readlink() == base / "bepis"

    def test_alp_diffing(self):
        # LinuxTODO: re-enable this test once xdelta is built for Linux
        if platform.system() == "Linux":
            return

        base = URI("memory:///")
        old = base / "old"
        old.mkdir()

        new = base / "new"
        new.mkdir()

        delta = base / "delta"
        delta.mkdir()

        old_alp = old / "foo.alp"
        new_alp = new / "foo.alp"

        with old_alp.open("wb") as outf:
            outf.write("oldfoobarbaz" * 1000)

        with new_alp.open("wb") as outf:
            outf.write("newfoobarbaz" * 1000)

        differ = ALPDiffer(old_alp, new_alp)
        assert differ.are_different()
        differ.delta_file_into(delta / "foo.alp")
        # we don't create what we ask for, but
        # instead an xdelta
        assert not (delta / "foo.alp").exists()
        assert (delta / "foo.alp.xdelta").exists()

    def test_correct_differ_is_chosen(self):
        base = URI("memory:///")

        old = base / "old"
        old.mkdir()

        new = base / "new"
        new.mkdir()

        pyc_content = self.pyc_db["foo_py25_mac.pyc"].open("rb").read()

        for d in (old, new):
            with (d / "foo.bin").open("wb") as outf:
                outf.write("foo")
            with (d / "bar.pyc").open("wb") as outf:
                outf.write(pyc_content)
            foo = d / "foo.bin"
            foo.symlink(d / "link1")

        differ = Differ.select_differ(old / "foo.bin", new / "foo.bin", True)

        assert isinstance(differ, MD5Differ)

        differ = Differ.select_differ(old / "bar.pyc", new / "bar.pyc", True)

        assert isinstance(differ, MD5Differ)

        differ = Differ.select_differ(old / "bar.alp", new / "bar.alp", True)

        assert isinstance(differ, ALPDiffer)

        self.failUnlessRaises(
            Differ.BasenameMismatch,
            Differ.select_differ,
            old / "foo.bin",
            new / "bar.pyc",
            True,
        )

        # when we don't allow
        # xdelta, MD5Differ must be used
        differ = Differ.select_differ(old / "bar.alp", new / "bar.alp", False)

        assert isinstance(differ, MD5Differ)

        differ = Differ.select_differ(old / "link1", new / "link1", True)

        assert isinstance(differ, SymlinkDiffer)

    def test_xdelta(self):
        # LinuxTODO: re-enable this test once xdelta is built for Linux
        if platform.system() == "Linux":
            return

        old_content = "old" * 100
        new_content = "new" * 100

        def test(old, new, xdelta, patched):
            with old.open("wb") as outf:
                outf.write(old_content)

            with new.open("wb") as outf:
                outf.write(new_content)

            try:
                create_xdelta(old, new, xdelta)
                apply_xdelta(old, xdelta, patched)
                with patched.open("rb") as inf:
                    patched_content = inf.read()
                self.assertEqual(patched_content, new_content)
            finally:
                if old.exists():
                    old.remove()
                if new.exists():
                    new.remove()
                if xdelta.exists():
                    xdelta.remove()
                if patched.exists():
                    patched.remove()

        # for testing-purposes, the xdelta
        # stuff must work with memory-files
        # as well. It does so by creating temporary
        # files. Which sucks, but it's better than
        # having to make all tests with the FS.
        old = URI("memory:///old")
        new = URI("memory:///new")
        xdelta = URI("memory:///xdelta")
        patched = URI("memory:///patched")

        test(old, new, xdelta, patched)

        old = URI(tempfile.mktemp())
        new = URI(tempfile.mktemp())
        xdelta = URI(tempfile.mktemp())
        patched = URI(tempfile.mktemp())
        test(old, new, xdelta, patched)

    def test_xdelta_only_applied_when_old_is_matching(self):
        # LinuxTODO: re-enable this test once xdelta is built for Linux
        if platform.system() == "Linux":
            return

        old_content = "old" * 100
        new_content = old_content[:200] + "new" + old_content[200:]

        old = URI("memory:///old")
        new = URI("memory:///new")
        xdelta = URI("memory:///xdelta")
        patched = URI("memory:///patched")
        wrong_old = URI("memory:///wrong_old")

        with old.open("wb") as outf:
            outf.write(old_content)

        with new.open("wb") as outf:
            outf.write(new_content)

        with wrong_old.open("wb") as outf:
            outf.write("foo")

        create_xdelta(old, new, xdelta)

        self.failUnlessRaises(
            XDeltaApplicationError, apply_xdelta, wrong_old, xdelta, patched
        )

    def test_flagdiff(self):
        # flagdiffing is base-class behaviour

        # works for md5

        base = URI("memory:///")

        old = base / "old"
        old.mkdir()

        new = base / "new"
        new.mkdir()

        foo_old = old / "foo"
        foo_new = new / "foo"
        with foo_old.open("wb") as outf:
            outf.write("foo")

        with foo_new.open("wb") as outf:
            outf.write("foo")

        differ = Differ.select_differ(foo_old, foo_new, False)

        assert not differ.are_different()
        foo_new.info(dict(mode=foo_old.info().mode + 1))
        assert differ.are_different()

        # check that alp-differ delegates to super class for
        # flag diffs

        alp_old = old / "foo.alp"
        alp_new = new / "foo.alp"
        with alp_old.open("wb") as outf:
            outf.write("alp")

        with alp_new.open("wb") as outf:
            outf.write("alp")

        differ = Differ.select_differ(alp_old, alp_new, True)

        assert isinstance(differ, ALPDiffer)

        assert not differ.are_different()
        alp_new.info(dict(mode=alp_old.info().mode + 1))
        assert differ.are_different()
