#!/usr/bin/env python3

from __future__ import annotations

import argparse
import json
import os
import platform
import re
import shutil
import subprocess
import sys
from dataclasses import dataclass
from hashlib import sha1
from pathlib import Path


DEFAULT_TYPES = ("prod", "optional")
NODE_GYP_SPECS = ("node-gyp/bin/node-gyp.js", "@electron/node-gyp/bin/node-gyp.js")
OWL_BUILD_STAMP = ".owl-native-module-build.json"
OWL_CHROMIUM_SRC_ENV = "OWL_CHROMIUM_SRC"
OPENAI_CREDENTIAL_ENV_NAMES = ("OPENAI_API_KEY", "OPENAI_KEY")
SUPPORTED_VISUAL_STUDIO_MAJOR_VERSIONS = (18, 17)
VISUAL_STUDIO_VC_TOOLS_COMPONENT = "Microsoft.VisualStudio.Component.VC.Tools.x86.x64"

RED_TEXT = "\033[31m"
YELLOW_TEXT = "\033[33m"
NORMAL_TEXT = "\033[0m"


@dataclass(frozen=True)
class NativeModuleTarget:
    source_dir: Path
    install_dir: Path
    package_json: dict

    @property
    def name(self) -> str:
        return self.package_json.get("name", self.source_dir.name)


@dataclass(frozen=True)
class NapiPrebuildTarget(NativeModuleTarget):
    prebuilds: tuple[Path, ...]


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description=(
            "Rebuild native Node modules for a local Electron app against Owl's "
            "embedded Node headers."
        )
    )
    parser.add_argument(
        "--app-dir",
        type=Path,
        required=True,
        help="Path to the Electron app package directory or .asar archive.",
    )
    parser.add_argument(
        "--source-app-dir",
        type=Path,
        default=None,
        help=(
            "Optional source Electron app workspace directory to use when --app-dir "
            "points at a packaged .asar. Defaults to inferring a nearby workspace."
        ),
    )
    parser.add_argument(
        "--out-dir",
        type=Path,
        default=None,
        help="Owl output directory. Defaults to <detected Chromium src>/out/owl.",
    )
    parser.add_argument(
        "--types",
        default="prod,optional",
        help='Comma-separated dependency types to rebuild: "prod", "optional", "dev".',
    )
    parser.add_argument(
        "--module",
        action="append",
        default=[],
        help="Specific package name to rebuild. May be passed multiple times.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print the discovered rebuild targets and whether they would rebuild.",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help=(
            "Rebuild matching native modules even if existing artifacts already "
            "match Owl's ABI. N-API prebuilds are still used as-is."
        ),
    )
    return parser.parse_args()


def run(
    cmd: list[str], *, cwd: Path | None = None, env: dict[str, str] | None = None
) -> None:
    print("+", " ".join(cmd))
    subprocess.run(cmd, cwd=cwd, env=env, check=True)


def run_capture(cmd: list[str], *, cwd: Path | None = None) -> str:
    return subprocess.check_output(cmd, cwd=cwd, text=True).strip()


# Native build tools may log their environment in verbose mode, so keep
# application credentials out of the toolchain environment.
def native_build_environment(env: dict[str, str]) -> dict[str, str]:
    return {
        name: value
        for name, value in env.items()
        if name.upper() not in OPENAI_CREDENTIAL_ENV_NAMES
    }


def read_json(path: Path) -> dict:
    return json.loads(path.read_text(encoding="utf-8"))


def parse_types(raw: str) -> tuple[str, ...]:
    requested = tuple(part.strip() for part in raw.split(",") if part.strip())
    if not requested:
        raise SystemExit("Expected at least one dependency type in --types.")
    allowed = {"prod", "optional", "dev"}
    invalid = [value for value in requested if value not in allowed]
    if invalid:
        raise SystemExit(f"Unsupported dependency types: {', '.join(invalid)}")
    return requested


def strip_sanitizer_flag(value: str, flag_pattern: str) -> str:
    stripped = re.sub(flag_pattern, " ", value)
    return " ".join(stripped.split())


def strip_address_sanitizer_flags(env: dict[str, str]) -> None:
    # Native Node addons loaded with dlopen() must not inherit ASan flags from
    # the parent Owl build environment on macOS.
    patterns = {
        "CFLAGS": r"(^|\s)-fsanitize=address(?=\s|$)",
        "CXXFLAGS": r"(^|\s)-fsanitize=address(?=\s|$)",
        "CPPFLAGS": r"(^|\s)-fsanitize=address(?=\s|$)",
        "LDFLAGS": r"(^|\s)-fsanitize=address(?=\s|$)",
        "CL": r"(^|\s)/fsanitize=address(?=\s|$)",
        "LINK": r"(^|\s)/fsanitize=address(?=\s|$)",
        "_CL_": r"(^|\s)/fsanitize=address(?=\s|$)",
    }
    for key, pattern in patterns.items():
        value = env.get(key)
        if not value:
            continue
        stripped = strip_sanitizer_flag(value, pattern)
        if stripped:
            env[key] = stripped
        else:
            env.pop(key, None)


def is_asar_path(path: Path) -> bool:
    return path.suffix == ".asar"


def validate_app_path(app_path: Path) -> None:
    if app_path.is_dir():
        if not (app_path / "package.json").is_file():
            raise SystemExit(f"Missing package.json in app dir: {app_path}")
        return
    if is_asar_path(app_path) and app_path.is_file():
        return
    raise SystemExit(f"Missing app dir or .asar archive: {app_path}")


def unpacked_app_dir_for_asar(app_path: Path) -> Path:
    return app_path.parent / f"{app_path.name}.unpacked"


def find_source_app_dir_for_asar(app_path: Path) -> Path | None:
    for candidate in app_path.parents:
        if (candidate / "package.json").is_file() and (candidate / "node_modules").is_dir():
            return candidate.resolve()
    return None


def parse_windows_installation_version(raw: object) -> tuple[int, ...]:
    if not isinstance(raw, str):
        return ()
    match = re.match(r"(\d+)\.(\d+)\.(\d+)\.(\d+)", raw)
    if match is None:
        return ()
    return tuple(int(part) for part in match.groups())


def query_windows_vswhere(vswhere: Path, *, requires: str | None = None) -> list[dict]:
    # Include all product types so VS 2022 Build Tools are considered even
    # when a newer full Visual Studio install is the default vswhere result.
    cmd = [str(vswhere), "-all", "-products", "*"]
    if requires is not None:
        cmd.extend(["-requires", requires])
    cmd.extend(["-format", "json"])

    try:
        raw = run_capture(cmd)
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(
            f"Failed to query Visual Studio Installer with {vswhere}"
        ) from exc

    if not raw:
        return []

    try:
        instances = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise RuntimeError(
            f"Failed to parse Visual Studio Installer output from {vswhere}"
        ) from exc

    if not isinstance(instances, list):
        instances = [instances]
    return [instance for instance in instances if isinstance(instance, dict)]


def windows_supported_vs_instances(
    vswhere: Path, *, requires: str | None = None
) -> list[dict]:
    instances = []
    for instance in query_windows_vswhere(vswhere, requires=requires):
        install_path = instance.get("installationPath")
        version = parse_windows_installation_version(instance.get("installationVersion"))
        if (
            install_path
            and version
            and version[0] in SUPPORTED_VISUAL_STUDIO_MAJOR_VERSIONS
        ):
            instances.append(instance)
    return sorted(
        instances,
        key=lambda instance: parse_windows_installation_version(
            instance.get("installationVersion")
        ),
        reverse=True,
    )


def detected_instance_paths(instances: list[dict]) -> str:
    paths = [str(instance.get("installationPath")) for instance in instances]
    paths = [path for path in paths if path]
    if not paths:
        return "  - <no installation path reported>"
    return "\n".join(f"  - {path}" for path in paths)


def env_value(env: dict[str, str], name: str) -> str | None:
    value = env.get(name)
    if value is not None:
        return value

    lower_name = name.lower()
    for key, value in env.items():
        if key.lower() == lower_name:
            return value
    return None


def windows_gyp_msvs_version(env: dict[str, str]) -> str:
    visual_studio_version = env_value(env, "VisualStudioVersion") or ""
    major_version = visual_studio_version.partition(".")[0]
    return {"17": "2022", "18": "2026"}.get(major_version, "2022")


def find_windows_vcvarsall_candidates() -> list[Path]:
    if sys.platform != "win32":
        return []

    program_files_x86 = os.environ.get("ProgramFiles(x86)")
    if not program_files_x86:
        raise RuntimeError(
            f"{RED_TEXT}Native module rebuilds require Visual Studio 2022 or 2026.{NORMAL_TEXT}\n"
            "ProgramFiles(x86) is not set, so vswhere.exe could not be located."
        )

    vswhere = (
        Path(program_files_x86)
        / "Microsoft Visual Studio"
        / "Installer"
        / "vswhere.exe"
    )
    if not vswhere.is_file():
        raise RuntimeError(
            f"{RED_TEXT}Native module rebuilds require Visual Studio 2022 or 2026.{NORMAL_TEXT}\n"
            f"Could not find Visual Studio Installer query tool at {vswhere}."
        )

    supported_instances = windows_supported_vs_instances(vswhere)
    if not supported_instances:
        raise RuntimeError(
            f"{RED_TEXT}Native module rebuilds require Visual Studio 2022 or 2026.{NORMAL_TEXT}\n"
            "👉 Install Visual Studio or Build Tools with Desktop Development With C++.\n"
            "\n"
            'Run "Visual Studio Installer" from the Start menu to install or modify it.\n'
        )

    vc_tool_instances = windows_supported_vs_instances(
        vswhere, requires=VISUAL_STUDIO_VC_TOOLS_COMPONENT
    )
    if not vc_tool_instances:
        raise RuntimeError(
            f"{RED_TEXT}Native module rebuilds require Visual Studio 2022 or 2026 "
            f"C++ build tools.{NORMAL_TEXT}\n"
            "Visual Studio is installed, but I did not find any C++ tools installed.\n"
            "\n"
            '👉 Manually run "Visual Studio Installer" from the Start menu, "Modify"\n'
            'the installation, and install "Desktop Development With C++".\n'
        )

    if len(vc_tool_instances) > 1:
        print(
            f"{YELLOW_TEXT}⚠️  Warning: Multiple supported Visual Studio instances with C++ build tools were found.{NORMAL_TEXT}\n"
            "Trying them newest-first until a complete native build environment is found:\n"
            f"  {detected_instance_paths(vc_tool_instances)}",
            file=sys.stderr,
        )

    candidates = []
    missing_vcvarsall = []
    for instance in vc_tool_instances:
        install_path = instance.get("installationPath")
        vcvarsall = (
            Path(install_path)
            / "VC"
            / "Auxiliary"
            / "Build"
            / "vcvarsall.bat"
        )
        if vcvarsall.is_file():
            candidates.append(vcvarsall.resolve())
        else:
            missing_vcvarsall.append(vcvarsall)

    if not candidates:
        searched = "\n".join(f"  - {path}" for path in missing_vcvarsall)
        raise RuntimeError(
            f"{RED_TEXT}Could not find vcvarsall.bat{NORMAL_TEXT} in any supported Visual Studio installation:\n"
            f"{searched}",
        )

    return candidates


def capture_windows_vcvars_env(vcvarsall: Path) -> dict[str, str]:
    cmd = f'call "{vcvarsall}" x64 >nul && set'
    # Use cmd's native parsing for the vcvarsall.bat + call chain; Python's
    # argv-to-command-line quoting is brittle for this batch-file shape.
    output = subprocess.check_output(cmd, text=True, errors="replace", shell=True)

    captured: dict[str, str] = {}
    for line in output.splitlines():
        if "=" not in line:
            continue
        name, value = line.split("=", 1)
        if not name:
            continue
        captured[name] = value.rstrip()

    gyp_msvs_version = windows_gyp_msvs_version(captured)
    captured["GYP_MSVS_VERSION"] = gyp_msvs_version
    captured["npm_config_msvs_version"] = gyp_msvs_version
    return captured


def missing_windows_native_build_tools(env: dict[str, str]) -> list[str]:
    path_value = env_value(env, "Path") or ""
    return [
        tool
        for tool in ("cl.exe", "rc.exe")
        if shutil.which(tool, path=path_value) is None
    ]


def windows_spectre_mitigation_libs_issue(env: dict[str, str]) -> str | None:
    vc_tools_install_dir = env_value(env, "VCToolsInstallDir")
    if not vc_tools_install_dir:
        return "VCToolsInstallDir is not set"

    target_arch = (env_value(env, "VSCMD_ARG_TGT_ARCH") or "x64").lower()
    target_arch = {"amd64": "x64"}.get(target_arch, target_arch)
    spectre_lib_dir = Path(vc_tools_install_dir) / "lib" / "spectre" / target_arch
    required_libs = ["libcmt.lib", "msvcprt.lib", "vcruntime.lib"]
    missing_libs = [name for name in required_libs if not (spectre_lib_dir / name).is_file()]
    if not missing_libs:
        return None

    missing = ", ".join(missing_libs)
    return f"missing {missing} under {spectre_lib_dir}"


def windows_node_gyp_visual_studio_issue(
    env: dict[str, str], node_gyp: str
) -> str | None:
    visual_studio_finder = Path(node_gyp).resolve().parent.parent / "lib/find-visualstudio.js"
    if not visual_studio_finder.is_file():
        return f"node-gyp Visual Studio finder is missing at {visual_studio_finder}"

    script = (
        "const finder = require(process.argv[1]);"
        "const nodeSemver = { major: Number(process.versions.node.split('.')[0]) };"
        "finder.findVisualStudio(nodeSemver, process.env.npm_config_msvs_version)"
        ".then(() => process.exit(0))"
        ".catch((error) => { console.error(error.message); process.exit(1); });"
    )
    node = shutil.which("node", path=env_value(env, "Path")) or "node"
    result = subprocess.run(
        [node, "-e", script, str(visual_studio_finder)],
        env=env,
        text=True,
        capture_output=True,
        check=False,
    )
    if result.returncode == 0:
        return None

    detail = (result.stderr or result.stdout).strip().splitlines()
    reason = detail[-1] if detail else f"probe exited with code {result.returncode}"
    return f"node-gyp cannot use this Visual Studio installation: {reason}"


def ensure_windows_native_build_env(
    env: dict[str, str], *, node_gyp: str
) -> dict[str, str]:
    if sys.platform != "win32":
        return env

    if (
        env_value(env, "VCINSTALLDIR")
        and env_value(env, "WindowsSdkDir")
        and env_value(env, "WindowsSDKVersion")
    ):
        gyp_msvs_version = windows_gyp_msvs_version(env)
        env.setdefault("GYP_MSVS_VERSION", gyp_msvs_version)
        env.setdefault("npm_config_msvs_version", gyp_msvs_version)
        node_gyp_issue = windows_node_gyp_visual_studio_issue(env, node_gyp)
        if node_gyp_issue is not None:
            raise RuntimeError(node_gyp_issue)
        return env

    failures = []
    for vcvarsall in find_windows_vcvarsall_candidates():
        merged = env.copy()
        try:
            merged.update(capture_windows_vcvars_env(vcvarsall))
        except subprocess.CalledProcessError as exc:
            failures.append(
                f"  - {vcvarsall}: vcvarsall.bat failed with exit code {exc.returncode}"
            )
            continue

        missing_tools = missing_windows_native_build_tools(merged)
        if missing_tools:
            failures.append(
                f"  - {vcvarsall}: missing required tools {', '.join(missing_tools)}"
            )
            continue

        spectre_issue = windows_spectre_mitigation_libs_issue(merged)
        if spectre_issue is not None:
            failures.append(f"  - {vcvarsall}: {spectre_issue}")
            continue

        node_gyp_issue = windows_node_gyp_visual_studio_issue(merged, node_gyp)
        if node_gyp_issue is not None:
            failures.append(f"  - {vcvarsall}: {node_gyp_issue}")
            continue

        print(f"bootstrapped Visual Studio build environment from {vcvarsall}")
        return merged

    details = "\n".join(failures)
    raise RuntimeError(
        f"{RED_TEXT}No supported Visual Studio installation has a complete native build environment.{NORMAL_TEXT}\n"
        f"{details}"
    )


def require_windows_spectre_mitigation_libs(env: dict[str, str]) -> None:
    if sys.platform != "win32":
        return

    issue = windows_spectre_mitigation_libs_issue(env)
    if issue is not None:
        raise RuntimeError(
            f"{RED_TEXT}Native module rebuilds require Visual Studio's Spectre-mitigated C++ libraries.{NORMAL_TEXT}\n"
            f"The selected build environment is {issue}.\n"
            "\n"
            "Install the VS Installer individual component named "
            "'MSVC v143 - VS 2022 C++ x64/x86 Spectre-mitigated libs', then retry.\n"
            "\n"
            "From an admin PowerShell, run:\n"
            '  & "${env:ProgramFiles(x86)}\\Microsoft Visual Studio\\Installer\\setup.exe" modify `\n'
            '  --installPath "C:\\Program Files\\Microsoft Visual Studio\\2022\\Community" `\n'
            "  --add Microsoft.VisualStudio.Component.VC.14.44.17.14.x86.x64.Spectre `\n"
            "  --passive `\n"
            "  --norestart"
        )


def read_node_module_version(config_gypi: Path) -> str:
    match = re.search(
        r"['\"]node_module_version['\"]:\s*([0-9]+)",
        config_gypi.read_text(encoding="utf-8"),
    )
    if match is None:
        raise RuntimeError(f"Unable to read node_module_version from {config_gypi}")
    return match.group(1)


def owl_config_gypi(out_dir: Path) -> Path:
    return out_dir / "gen/third_party/owl-node/config.gypi"


def staged_owl_config_gypi(headers_dir: Path) -> Path:
    return headers_dir / "owl-config.gypi"


def staged_node_config_gypi(headers_dir: Path) -> Path:
    return headers_dir / "include/node/config.gypi"


def write_unsanitized_owl_config_gypi(source: Path, destination: Path) -> None:
    config = read_json(source)
    variables = config.setdefault("variables", {})
    if isinstance(variables, dict):
        variables["asan"] = 0
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(
        json.dumps(config, sort_keys=True) + "\n",
        encoding="utf-8",
    )


def windows_native_module_v8_import_libs(out_dir: Path) -> list[Path]:
    libs = [out_dir / "v8.dll.lib"]
    return [path for path in libs if path.is_file()]


def windows_native_module_signature_inputs(headers_dir: Path, out_dir: Path) -> list[Path]:
    return [
        headers_dir / "Release" / "node.lib",
        *windows_native_module_v8_import_libs(out_dir),
    ]


def append_unique_path(paths: list[Path], path: Path) -> None:
    resolved = path.expanduser().resolve()
    if resolved not in paths:
        paths.append(resolved)


def chromium_src_from_out_dir(out_dir: Path | None) -> Path | None:
    if out_dir is None:
        return None
    resolved = out_dir.expanduser().resolve()
    if resolved.parent.name != "out":
        return None
    return resolved.parent.parent


def git_worktree_roots(owl_root: Path) -> list[Path]:
    try:
        output = subprocess.check_output(
            ["git", "-C", str(owl_root), "worktree", "list", "--porcelain"],
            text=True,
            stderr=subprocess.DEVNULL,
        )
    except (FileNotFoundError, subprocess.CalledProcessError):
        return []

    roots = []
    for line in output.splitlines():
        if line.startswith("worktree "):
            roots.append(Path(line.removeprefix("worktree ")))
    return roots


def resolve_chromium_src(owl_root: Path, out_dir: Path | None = None) -> Path:
    candidates: list[Path] = []
    if env_chromium_src := os.environ.get(OWL_CHROMIUM_SRC_ENV):
        append_unique_path(candidates, Path(env_chromium_src))

    out_dir_chromium_src = chromium_src_from_out_dir(out_dir)
    if out_dir_chromium_src is not None:
        append_unique_path(candidates, out_dir_chromium_src)

    append_unique_path(candidates, owl_root / "chromium" / "src")
    append_unique_path(candidates, owl_root.parent)
    for worktree_root in git_worktree_roots(owl_root):
        append_unique_path(candidates, worktree_root / "chromium" / "src")

    for candidate in candidates:
        if (candidate / "third_party/owl-node").is_dir():
            return candidate
    raise RuntimeError(
        "Unable to locate Chromium src root. Expected third_party/owl-node under "
        + ", ".join(str(candidate) for candidate in candidates)
        + f". Set {OWL_CHROMIUM_SRC_ENV} to an existing Chromium src checkout "
        "when running from a source-only Owl worktree."
    )


def update_signature_with_file_contents(hasher, path: Path, label: str) -> None:
    hasher.update(label.replace("\\", "/").encode("utf-8"))
    hasher.update(b"\0")
    with path.open("rb") as file:
        for chunk in iter(lambda: file.read(1024 * 1024), b""):
            hasher.update(chunk)
    hasher.update(b"\0")


def update_signature_with_file_metadata(hasher, path: Path, label: str) -> None:
    stat = path.stat()
    hasher.update(label.replace("\\", "/").encode("utf-8"))
    hasher.update(b"\0")
    hasher.update(str(stat.st_size).encode("ascii"))
    hasher.update(b"\0")
    hasher.update(str(stat.st_mtime_ns).encode("ascii"))
    hasher.update(b"\0")


def update_signature_with_tree_metadata(hasher, root: Path, label: str) -> None:
    if not root.is_dir():
        raise RuntimeError(f"Missing signature input directory: {root}")
    for path in sorted(candidate for candidate in root.rglob("*") if candidate.is_file()):
        relative = path.relative_to(root).as_posix()
        update_signature_with_file_metadata(hasher, path, f"{label}/{relative}")


def update_signature_with_tree_contents(hasher, root: Path, label: str) -> None:
    if not root.is_dir():
        raise RuntimeError(f"Missing signature input directory: {root}")
    for path in sorted(candidate for candidate in root.rglob("*") if candidate.is_file()):
        relative = path.relative_to(root).as_posix()
        update_signature_with_file_contents(hasher, path, f"{label}/{relative}")


def owl_build_signature(config_gypi: Path, headers_dir: Path, out_dir: Path) -> str:
    hasher = sha1()
    update_signature_with_file_contents(hasher, config_gypi, "config.gypi")
    update_signature_with_tree_contents(hasher, headers_dir / "include", "headers/include")

    if sys.platform == "win32":
        # Windows native modules also depend on build-output import libraries.
        for path in windows_native_module_signature_inputs(headers_dir, out_dir):
            if not path.is_file():
                raise RuntimeError(f"Missing native module signature input: {path}")
            update_signature_with_file_contents(
                hasher,
                path,
                path.relative_to(out_dir).as_posix()
                if path.is_relative_to(out_dir)
                else path.name,
            )

    return hasher.hexdigest()


def install_owl_node_headers(src_root: Path, out_dir: Path, headers_dir: Path) -> str:
    source_config_gypi = owl_config_gypi(out_dir)
    if not source_config_gypi.exists():
        raise RuntimeError(f"Missing {source_config_gypi}. Run ./owl first.")
    config_gypi = staged_owl_config_gypi(headers_dir)
    write_unsanitized_owl_config_gypi(source_config_gypi, config_gypi)

    cmd = [
        sys.executable,
        str(src_root / "third_party/owl-node/tools/install.py"),
        "install",
        "--headers-only",
        "--root-dir",
        str(src_root / "third_party/owl-node"),
        "--v8-dir",
        str(src_root / "v8"),
        "--config-gypi-path",
        str(config_gypi),
        "--dest-dir",
        str(headers_dir),
        "--prefix",
        "/",
    ]
    print("+", " ".join(cmd))
    result = subprocess.run(cmd, text=True, capture_output=True)
    if result.returncode != 0:
        if result.stdout:
            print(result.stdout, end="", file=sys.stderr)
        if result.stderr:
            print(result.stderr, end="", file=sys.stderr)
        raise subprocess.CalledProcessError(result.returncode, cmd)
    write_unsanitized_owl_config_gypi(config_gypi, staged_node_config_gypi(headers_dir))
    if sys.platform == "win32":
        # node-gyp's Windows generator links addons against
        # <nodedir>/$(Configuration)/node.lib. Owl exports Node symbols from
        # chrome.dll, so stage Chromium's import library at the path node-gyp
        # expects for a Node/Electron header tree.
        node_lib_dir = headers_dir / "Release"
        node_lib_dir.mkdir(parents=True, exist_ok=True)
        chrome_import_lib = out_dir / "chrome.dll.lib"
        if not chrome_import_lib.exists():
            raise RuntimeError(f"Missing {chrome_import_lib}. Run ./owl first.")
        shutil.copy2(chrome_import_lib, node_lib_dir / "node.lib")
    print(f"installed Owl Node headers in {headers_dir}")
    return read_node_module_version(config_gypi)


def node_eval(script: str, *args: str, cwd: Path | None = None) -> str:
    node = shutil.which("node") or "node"
    return run_capture([node, "-e", script, *args], cwd=cwd)


def node_modules_dirs(from_dir: Path) -> list[Path]:
    current = from_dir.resolve()
    candidates: list[Path] = []
    seen: set[Path] = set()
    for candidate_root in (current, *current.parents):
        candidate = candidate_root / "node_modules"
        resolved = candidate.resolve(strict=False)
        if resolved in seen or not candidate.is_dir():
            continue
        seen.add(resolved)
        candidates.append(resolved)
    return candidates


def resolve_package_dir(module_name: str, from_dir: Path) -> Path:
    for modules_dir in node_modules_dirs(from_dir):
        candidate = modules_dir / Path(*module_name.split("/"))
        if (candidate / "package.json").is_file():
            return candidate.resolve()
    raise FileNotFoundError(f"Unable to resolve {module_name!r} from {from_dir}")


def resolve_node_gyp(from_dir: Path) -> str:
    script = (
        "const [fromDir, ...specs] = process.argv.slice(1);"
        "for (const spec of specs) {"
        "  try { process.stdout.write(require.resolve(spec, { paths: [fromDir] })); process.exit(0); }"
        "  catch {}"
        "}"
        "process.exit(1);"
    )
    try:
        return node_eval(script, str(from_dir), *NODE_GYP_SPECS)
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(
            f"Unable to resolve node-gyp from {from_dir}. Install node-gyp in the app workspace."
        ) from exc


def parse_electron_version(app_package: dict) -> str | None:
    raw = (
        app_package.get("devDependencies", {}).get("electron")
        or app_package.get("dependencies", {}).get("electron")
    )
    if not isinstance(raw, str):
        return None
    match = re.search(r"(\d+\.\d+\.\d+)", raw)
    return match.group(1) if match else None


def direct_dependency_names(package_json: dict, types: tuple[str, ...]) -> list[str]:
    names: list[str] = []
    if "prod" in types:
        names.extend(package_json.get("dependencies", {}).keys())
    if "optional" in types:
        names.extend(package_json.get("optionalDependencies", {}).keys())
    if "dev" in types:
        names.extend(package_json.get("devDependencies", {}).keys())
    return names


def child_dependency_names(package_json: dict) -> list[str]:
    names = list(package_json.get("dependencies", {}).keys())
    names.extend(package_json.get("optionalDependencies", {}).keys())
    return names


def npm_current_cpu() -> str:
    machine = platform.machine().lower()
    return {
        "amd64": "x64",
        "x86_64": "x64",
        "i386": "ia32",
        "i686": "ia32",
        "aarch64": "arm64",
    }.get(machine, machine)


def npm_platform_field_matches(raw: object, current: str) -> bool:
    if raw is None:
        return True
    values = [raw] if isinstance(raw, str) else raw
    if not isinstance(values, list):
        return True

    entries = [value for value in values if isinstance(value, str)]
    if not entries:
        return True

    if f"!{current}" in entries:
        return False

    allowed = [value for value in entries if not value.startswith("!")]
    return not allowed or current in allowed


def package_matches_current_platform(package_json: dict) -> bool:
    if not npm_platform_field_matches(package_json.get("os"), sys.platform):
        return False
    if not npm_platform_field_matches(package_json.get("cpu"), npm_current_cpu()):
        return False
    if sys.platform == "linux":
        libc, _ = platform.libc_ver()
        current_libc = "glibc" if libc == "glibc" else libc
        if current_libc and not npm_platform_field_matches(
            package_json.get("libc"), current_libc
        ):
            return False
    return True


def matches_module_filter(package_json: dict, package_dir: Path, filters: set[str]) -> bool:
    if not filters:
        return True
    name = package_json.get("name", "")
    return name in filters or package_dir.name in filters


def discover_native_modules(
    app_dir: Path, types: tuple[str, ...], only_modules: set[str]
) -> list[tuple[Path, dict]]:
    root_package = read_json(app_dir / "package.json")
    seen_dirs: set[Path] = set()
    discovered: list[tuple[Path, dict]] = []
    pending = [(module_name, app_dir) for module_name in direct_dependency_names(root_package, types)]

    while pending:
        module_name, from_dir = pending.pop()
        try:
            package_dir = resolve_package_dir(module_name, from_dir)
        except FileNotFoundError:
            print(f"warning: could not resolve {module_name!r} from {from_dir}", file=sys.stderr)
            continue

        real_dir = package_dir.resolve()
        if real_dir in seen_dirs:
            continue
        seen_dirs.add(real_dir)

        package_json = read_json(real_dir / "package.json")
        if not package_matches_current_platform(package_json):
            continue

        if (real_dir / "binding.gyp").is_file() and matches_module_filter(
            package_json, real_dir, only_modules
        ):
            discovered.append((real_dir, package_json))

        for child_name in child_dependency_names(package_json):
            pending.append((child_name, real_dir))

    if (app_dir / "binding.gyp").is_file():
        app_package = root_package
        if package_matches_current_platform(app_package) and matches_module_filter(
            app_package, app_dir, only_modules
        ):
            discovered.append((app_dir.resolve(), app_package))

    discovered.sort(key=lambda item: item[1].get("name", item[0].name))
    return discovered


def sanitize_name(value: str) -> str:
    return re.sub(r"[^A-Za-z0-9._-]+", "_", value)


def build_work_dir(out_dir: Path, package_dir: Path, package_json: dict) -> Path:
    package_name = package_json.get("name", package_dir.name)
    version = package_json.get("version", "0")
    identity = sha1(str(package_dir.resolve()).encode()).hexdigest()[:8]
    dirname = f"{sanitize_name(package_name)}-{sanitize_name(version)}-{identity}"
    return out_dir / "native-module-builds" / dirname


def built_node_artifacts(work_dir: Path) -> list[Path]:
    return sorted(
        path
        for path in work_dir.rglob("*.node")
        if "build" in path.parts
    )


def existing_node_artifacts(package_dir: Path) -> list[Path]:
    return built_node_artifacts(package_dir)


def binary_symbols(binary: Path) -> str:
    if sys.platform == "win32":
        # Native Windows toolchains are not guaranteed to have nm/dumpbin on
        # PATH. The exported Node registration symbol is present as a byte
        # string in PE artifacts, which is enough for the ABI/stamp check.
        return binary.read_bytes().decode("latin-1", errors="ignore")
    return subprocess.check_output(["nm", "-gU", str(binary)], text=True)


def is_node_module_artifact(binary: Path) -> bool:
    symbols = binary_symbols(binary)
    return (
        "_node_register_module_v" in symbols
        or "node_register_module_v" in symbols
        or "_napi_register_module_v" in symbols
        or "napi_register_module_v" in symbols
    )


def addon_artifacts(artifacts: list[Path]) -> list[Path]:
    return [path for path in artifacts if is_node_module_artifact(path)]


def binary_matches_owl(binary: Path, expected_version: str) -> bool:
    symbols = binary_symbols(binary)
    return (
        f"_node_register_module_v{expected_version}" in symbols
        or f"node_register_module_v{expected_version}" in symbols
        or "_napi_register_module_v" in symbols
        or "napi_register_module_v" in symbols
        or " napi_register_module_v" in symbols
    )


def binary_is_napi(binary: Path) -> bool:
    symbols = binary_symbols(binary)
    return (
        "_napi_register_module_v" in symbols
        or "napi_register_module_v" in symbols
        or " napi_register_module_v" in symbols
    )


def parse_prebuild_tuple(name: str) -> tuple[str, tuple[str, ...]] | None:
    parts = name.split("-")
    if len(parts) != 2:
        return None
    platform_name, arch_names = parts
    arches = tuple(arch for arch in arch_names.split("+") if arch)
    if not platform_name or not arches:
        return None
    return platform_name, arches


def current_prebuild_dir(package_dir: Path) -> Path | None:
    prebuilds_dir = package_dir / "prebuilds"
    if not prebuilds_dir.is_dir():
        return None

    platform_name = sys.platform
    arch_name = npm_current_cpu()
    exact = prebuilds_dir / f"{platform_name}-{arch_name}"
    if exact.is_dir():
        return exact

    candidates: list[tuple[int, Path]] = []
    for entry in prebuilds_dir.iterdir():
        if not entry.is_dir():
            continue
        parsed = parse_prebuild_tuple(entry.name)
        if parsed is None:
            continue
        entry_platform, entry_arches = parsed
        if entry_platform == platform_name and arch_name in entry_arches:
            candidates.append((len(entry_arches), entry))

    if not candidates:
        return None
    return sorted(candidates)[0][1]


def relative_napi_prebuilds(package_dir: Path) -> tuple[Path, ...]:
    prebuild_dir = current_prebuild_dir(package_dir)
    if prebuild_dir is None:
        return ()

    node_files = tuple(sorted(prebuild_dir.glob("*.node")))
    if not node_files or not all(binary_is_napi(path) for path in node_files):
        return ()
    return tuple(path.relative_to(package_dir) for path in node_files)


def install_has_napi_prebuilds(
    install_package_dir: Path, prebuilds: tuple[Path, ...]
) -> bool:
    for relative_path in prebuilds:
        installed = install_package_dir / relative_path
        if not installed.is_file() or not binary_is_napi(installed):
            return False
    return True


def owl_build_stamp_path(package_dir: Path) -> Path:
    return package_dir / OWL_BUILD_STAMP


def read_owl_build_stamp(package_dir: Path) -> dict | None:
    path = owl_build_stamp_path(package_dir)
    if not path.is_file():
        return None
    try:
        return read_json(path)
    except json.JSONDecodeError:
        return None


def write_owl_build_stamp(package_dir: Path, *, expected_version: str, build_signature: str) -> None:
    owl_build_stamp_path(package_dir).write_text(
        json.dumps(
            {
                "tool": "owl/tools/rebuild_native_modules.py",
                "node_module_version": expected_version,
                "build_signature": build_signature,
            },
            indent=2,
            sort_keys=True,
        )
        + "\n",
        encoding="utf-8",
    )


def copy_artifacts(package_dir: Path, work_dir: Path, artifacts: list[Path]) -> None:
    for artifact in artifacts:
        relative = artifact.relative_to(work_dir)
        destination = package_dir / relative
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(artifact, destination)


def install_package_dir_for_name(install_app_dir: Path, package_name: str) -> Path:
    return install_app_dir / "node_modules" / Path(*package_name.split("/"))


def install_path_for_package(
    source_app_dir: Path,
    install_app_dir: Path,
    source_package_dir: Path,
    package_json: dict,
) -> Path:
    if source_package_dir.resolve() == source_app_dir.resolve():
        return install_app_dir
    package_name = package_json.get("name", source_package_dir.name)
    return install_package_dir_for_name(install_app_dir, package_name)


def print_prebuilt_module_statuses(prebuilt_modules: list[NapiPrebuildTarget]) -> None:
    for module in prebuilt_modules:
        source_display = f"{module.source_dir} -> {module.install_dir}"
        prebuild_display = ", ".join(str(path) for path in module.prebuilds)
        label = "prebuild" if len(module.prebuilds) == 1 else "prebuilds"
        print(
            f"- {module.name} [{source_display}] "
            f"(ok: N-API {label} {prebuild_display})"
        )


def remove_shadowing_build_artifacts(package_dir: Path) -> None:
    for artifact in existing_node_artifacts(package_dir):
        artifact.unlink()


def prepare_napi_prebuilt_modules(prebuilt_modules: list[NapiPrebuildTarget]) -> None:
    for module in prebuilt_modules:
        remove_shadowing_build_artifacts(module.install_dir)


def rebuild_reason(
    package_dir: Path, expected_version: str, build_signature: str, *, force: bool
) -> str | None:
    if force:
        return "forced"
    artifacts = existing_node_artifacts(package_dir)
    if not artifacts:
        return "missing build artifacts"
    addons = addon_artifacts(artifacts)
    if not addons:
        return "missing Node addon artifacts"
    if not all(binary_matches_owl(path, expected_version) for path in addons):
        return f"artifacts do not match Owl ABI {expected_version}"
    stamp = read_owl_build_stamp(package_dir)
    if stamp is None:
        return "missing Owl build stamp"
    if stamp.get("node_module_version") != expected_version:
        return f"stamp node_module_version does not match Owl ABI {expected_version}"
    if stamp.get("build_signature") != build_signature:
        return "stamp build signature does not match current Owl headers"
    return None


def rebuild_module(
    source_package_dir: Path,
    install_package_dir: Path,
    package_json: dict,
    *,
    base_env: dict[str, str],
    out_dir: Path,
    headers_dir: Path,
    expected_version: str,
    build_signature: str,
    node_gyp: str,
    electron_version: str | None,
) -> None:
    work_dir = build_work_dir(out_dir, source_package_dir, package_json)
    if work_dir.exists():
        shutil.rmtree(work_dir)
    shutil.copytree(source_package_dir, work_dir, symlinks=False)

    package_name = package_json.get("name", source_package_dir.name)

    env = base_env.copy()
    env["npm_config_nodedir"] = str(headers_dir)
    env["npm_config_build_from_source"] = "true"
    node_path_entries = [str(path) for path in node_modules_dirs(source_package_dir)]
    if env.get("NODE_PATH"):
        node_path_entries.append(env["NODE_PATH"])
    if node_path_entries:
        env["NODE_PATH"] = os.pathsep.join(node_path_entries)
    if electron_version is not None:
        env["npm_config_runtime"] = "electron"
        env["npm_config_target"] = electron_version
    defines = (
        f" -DUSING_ELECTRON_CONFIG_GYPI=1"
        f" -DNODE_EMBEDDER_MODULE_VERSION={expected_version}"
    )
    strip_address_sanitizer_flags(env)
    env["CXXFLAGS"] = (env.get("CXXFLAGS", "") + defines).strip()
    if sys.platform == "win32":
        v8_import_libs = windows_native_module_v8_import_libs(out_dir)
        if v8_import_libs:
            link_options = [env.get("LINK", "").strip()]
            link_options.extend(f'"{path}"' for path in v8_import_libs)
            env["LINK"] = " ".join(option for option in link_options if option)
            # Component builds ship V8 as v8.dll, whose import library does
            # not export every inline V8 API symbol emitted by the headers.
            env["_CL_"] = (
                env.get("_CL_", "")
                + " /DOWL_NATIVE_MODULE_NO_USING_V8_SHARED=1 /UUSING_V8_SHARED /DUSING_V8_SHARED=0"
            ).strip()

        msvc_defines = (
            f" /DUSING_ELECTRON_CONFIG_GYPI=1"
            f" /DNODE_EMBEDDER_MODULE_VERSION={expected_version}"
        )
        env["CL"] = (env.get("CL", "") + msvc_defines).strip()

    node = shutil.which("node") or "node"
    run([node, node_gyp, "rebuild", "--release"], cwd=work_dir, env=env)

    artifacts = built_node_artifacts(work_dir)
    if not artifacts:
        raise RuntimeError(
            f"No built .node artifacts found for {package_name}"
        )

    addons = addon_artifacts(artifacts)
    if not addons:
        names = ", ".join(str(path.relative_to(work_dir)) for path in artifacts)
        raise RuntimeError(
            f"No Node addon artifacts found for {package_name}: {names}"
        )

    if not all(binary_matches_owl(path, expected_version) for path in addons):
        names = ", ".join(str(path.relative_to(work_dir)) for path in addons)
        raise RuntimeError(
            f"Built artifacts for {package_name} did not match Owl ABI {expected_version}: {names}"
        )

    copy_artifacts(install_package_dir, work_dir, artifacts)
    write_owl_build_stamp(
        install_package_dir,
        expected_version=expected_version,
        build_signature=build_signature,
    )
    print(
        f"installed {package_name} "
        f"({len(artifacts)} artifact{'s' if len(artifacts) != 1 else ''})"
    )


def main() -> int:
    args = parse_args()
    owl_root = Path(__file__).resolve().parents[1]
    raw_out_dir = args.out_dir.expanduser().resolve() if args.out_dir is not None else None
    src_root = resolve_chromium_src(owl_root, raw_out_dir)
    out_dir = raw_out_dir or (src_root / "out/owl").resolve()
    app_dir = args.app_dir.expanduser().resolve()
    validate_app_path(app_dir)

    source_app_dir = app_dir
    install_app_dir = app_dir
    if is_asar_path(app_dir):
        install_app_dir = unpacked_app_dir_for_asar(app_dir)
        if not install_app_dir.is_dir():
            raise RuntimeError(
                f"Missing unpacked app directory for packaged ASAR app: {install_app_dir}"
            )
        if args.source_app_dir is not None:
            source_app_dir = args.source_app_dir.expanduser().resolve()
        else:
            inferred = find_source_app_dir_for_asar(app_dir)
            if inferred is None:
                raise RuntimeError(
                    "Unable to infer source app workspace for packaged ASAR app. "
                    "Pass --source-app-dir."
                )
            source_app_dir = inferred
        validate_app_path(source_app_dir)
        if is_asar_path(source_app_dir):
            raise RuntimeError(
                f"--source-app-dir must be a workspace directory, not an ASAR archive: {source_app_dir}"
            )

    types = parse_types(args.types)
    only_modules = set(args.module)
    native_modules = discover_native_modules(source_app_dir, types, only_modules)
    rebuildable_modules: list[NativeModuleTarget] = []
    prebuilt_modules: list[NapiPrebuildTarget] = []
    for source_package_dir, package_json in native_modules:
        package_name = package_json.get("name", source_package_dir.name)
        install_package = install_path_for_package(
            source_app_dir,
            install_app_dir,
            source_package_dir,
            package_json,
        )
        if not install_package.is_dir():
            if is_asar_path(app_dir):
                print(
                    f"warning: packaged app is missing unpacked module {package_name!r} at {install_package}",
                    file=sys.stderr,
                )
            continue
        target = NativeModuleTarget(source_package_dir, install_package, package_json)
        prebuilds = relative_napi_prebuilds(source_package_dir)
        if prebuilds and install_has_napi_prebuilds(install_package, prebuilds):
            prebuilt_modules.append(
                NapiPrebuildTarget(
                    source_package_dir, install_package, package_json, prebuilds
                )
            )
            continue
        rebuildable_modules.append(target)

    if not rebuildable_modules and not prebuilt_modules:
        print(f"No rebuildable native modules found for install target {install_app_dir}")
        return 0
    if not rebuildable_modules:
        print("Native modules:")
        print_prebuilt_module_statuses(prebuilt_modules)
        if not args.dry_run:
            prepare_napi_prebuilt_modules(prebuilt_modules)
        print("All native modules are satisfied by N-API prebuilds")
        return 0

    config_gypi = owl_config_gypi(out_dir)
    if not config_gypi.exists():
        raise RuntimeError(f"Missing {config_gypi}. Run ./owl first.")
    headers_dir = out_dir / "owl-node-headers"
    expected_version = install_owl_node_headers(src_root, out_dir, headers_dir)
    build_signature = owl_build_signature(staged_owl_config_gypi(headers_dir), headers_dir, out_dir)

    print("Native modules:")
    print_prebuilt_module_statuses(prebuilt_modules)
    for module in rebuildable_modules:
        reason = rebuild_reason(
            module.install_dir, expected_version, build_signature, force=args.force
        )
        status = f"rebuild: {reason}" if reason else f"ok: Owl ABI {expected_version}"
        source_display = f"{module.source_dir} -> {module.install_dir}"
        print(f"- {module.name} [{source_display}] ({status})")

    if args.dry_run:
        return 0
    prepare_napi_prebuilt_modules(prebuilt_modules)

    rebuild_targets = [
        module
        for module in rebuildable_modules
        if rebuild_reason(module.install_dir, expected_version, build_signature, force=args.force)
        is not None
    ]
    if not rebuild_targets:
        print(f"All native modules already match Owl ABI {expected_version}")
        return 0

    node_gyp = resolve_node_gyp(source_app_dir)
    base_env = os.environ.copy()
    if sys.platform == "win32":
        base_env = ensure_windows_native_build_env(base_env, node_gyp=node_gyp)
        require_windows_spectre_mitigation_libs(base_env)
    base_env = native_build_environment(base_env)

    electron_version = parse_electron_version(read_json(source_app_dir / "package.json"))

    for module in rebuild_targets:
        rebuild_module(
            module.source_dir,
            module.install_dir,
            module.package_json,
            base_env=base_env,
            out_dir=out_dir,
            headers_dir=headers_dir,
            expected_version=expected_version,
            build_signature=build_signature,
            node_gyp=node_gyp,
            electron_version=electron_version,
        )

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
