mirror of
https://github.com/uprightbass360/AzerothCore-RealmMaster.git
synced 2026-01-13 09:07:20 +00:00
547 lines
19 KiB
Python
Executable File
547 lines
19 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Module manifest helper.
|
|
|
|
Reads config/modules.json and .env to produce canonical module state that
|
|
downstream shell scripts can consume for staging, rebuild detection, and
|
|
dependency validation.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import textwrap
|
|
from dataclasses import dataclass, asdict, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, List, Optional, Tuple
|
|
import shlex
|
|
|
|
|
|
STRICT_TRUE = {"1", "true", "yes", "on"}
|
|
|
|
|
|
def parse_bool(value: str) -> bool:
|
|
if value is None:
|
|
return False
|
|
return str(value).strip().lower() in STRICT_TRUE
|
|
|
|
|
|
def load_env_file(env_path: Path) -> Dict[str, str]:
|
|
if not env_path.exists():
|
|
return {}
|
|
env: Dict[str, str] = {}
|
|
for raw_line in env_path.read_text(encoding="utf-8").splitlines():
|
|
line = raw_line.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
if line.startswith("export "):
|
|
line = line[len("export ") :].strip()
|
|
if "=" not in line:
|
|
continue
|
|
key, value = line.split("=", 1)
|
|
key = key.strip()
|
|
value = value.strip()
|
|
if value.startswith('"') and value.endswith('"'):
|
|
value = value[1:-1]
|
|
elif value.startswith("'") and value.endswith("'"):
|
|
value = value[1:-1]
|
|
env[key] = value
|
|
return env
|
|
|
|
|
|
def load_manifest(manifest_path: Path) -> List[Dict[str, object]]:
|
|
if not manifest_path.exists():
|
|
raise FileNotFoundError(f"Manifest file not found: {manifest_path}")
|
|
with manifest_path.open("r", encoding="utf-8") as fh:
|
|
manifest = json.load(fh)
|
|
modules = manifest.get("modules")
|
|
if not isinstance(modules, list):
|
|
raise ValueError("Manifest must define a top-level 'modules' array")
|
|
validated: List[Dict[str, object]] = []
|
|
seen_keys: set[str] = set()
|
|
for entry in modules:
|
|
if not isinstance(entry, dict):
|
|
raise ValueError("Each manifest entry must be an object")
|
|
key = entry.get("key")
|
|
name = entry.get("name")
|
|
repo = entry.get("repo")
|
|
if not key or not isinstance(key, str):
|
|
raise ValueError("Manifest entry missing 'key'")
|
|
if key in seen_keys:
|
|
raise ValueError(f"Duplicate manifest key detected: {key}")
|
|
seen_keys.add(key)
|
|
if not name or not isinstance(name, str):
|
|
raise ValueError(f"Manifest entry {key} missing 'name'")
|
|
if not repo or not isinstance(repo, str):
|
|
raise ValueError(f"Manifest entry {key} missing 'repo'")
|
|
validated.append(entry)
|
|
return validated
|
|
|
|
|
|
@dataclass
|
|
class ModuleState:
|
|
key: str
|
|
name: str
|
|
repo: str
|
|
needs_build: bool
|
|
module_type: str
|
|
requires: List[str] = field(default_factory=list)
|
|
ref: Optional[str] = None
|
|
status: str = "active"
|
|
block_reason: Optional[str] = None
|
|
post_install_hooks: List[str] = field(default_factory=list)
|
|
config_cleanup: List[str] = field(default_factory=list)
|
|
sql: Optional[object] = None
|
|
notes: Optional[str] = None
|
|
enabled_raw: bool = False
|
|
enabled_effective: bool = False
|
|
value: str = "0"
|
|
dependency_issues: List[str] = field(default_factory=list)
|
|
warnings: List[str] = field(default_factory=list)
|
|
errors: List[str] = field(default_factory=list)
|
|
|
|
@property
|
|
def blocked(self) -> bool:
|
|
return self.status.lower() == "blocked"
|
|
|
|
|
|
@dataclass
|
|
class ModuleCollectionState:
|
|
manifest_path: Path
|
|
env_path: Path
|
|
modules: List[ModuleState]
|
|
generated_at: datetime
|
|
warnings: List[str]
|
|
errors: List[str]
|
|
|
|
def enabled_modules(self) -> List[ModuleState]:
|
|
return [module for module in self.modules if module.enabled_effective]
|
|
|
|
def compile_modules(self) -> List[ModuleState]:
|
|
return [
|
|
module
|
|
for module in self.modules
|
|
if module.enabled_effective and module.needs_build
|
|
]
|
|
|
|
def requires_playerbot_source(self) -> bool:
|
|
module_map = {m.key: m for m in self.modules}
|
|
playerbots_enabled = module_map.get("MODULE_PLAYERBOTS")
|
|
return bool(playerbots_enabled and playerbots_enabled.enabled_effective)
|
|
|
|
def requires_custom_build(self) -> bool:
|
|
return any(module.needs_build and module.enabled_effective for module in self.modules)
|
|
|
|
|
|
def build_state(env_path: Path, manifest_path: Path) -> ModuleCollectionState:
|
|
env_map = load_env_file(env_path)
|
|
manifest_entries = load_manifest(manifest_path)
|
|
modules: List[ModuleState] = []
|
|
errors: List[str] = []
|
|
warnings: List[str] = []
|
|
|
|
# Track which manifest keys appear in .env for coverage validation
|
|
env_keys_in_manifest: set[str] = set()
|
|
|
|
for entry in manifest_entries:
|
|
key = entry["key"]
|
|
name = entry["name"]
|
|
repo = entry["repo"]
|
|
module_type = str(entry.get("type", "cpp"))
|
|
needs_build_flag = entry.get("needs_build")
|
|
if needs_build_flag is None:
|
|
needs_build = module_type.lower() == "cpp"
|
|
else:
|
|
needs_build = bool(needs_build_flag)
|
|
requires = entry.get("requires") or []
|
|
if not isinstance(requires, list):
|
|
raise ValueError(f"Manifest entry {key} has non-list 'requires'")
|
|
requires = [str(dep) for dep in requires]
|
|
|
|
status = entry.get("status", "active")
|
|
block_reason = entry.get("block_reason")
|
|
post_install_hooks = entry.get("post_install_hooks") or []
|
|
if not isinstance(post_install_hooks, list):
|
|
raise ValueError(f"Manifest entry {key} has non-list 'post_install_hooks'")
|
|
post_install_hooks = [str(hook) for hook in post_install_hooks]
|
|
config_cleanup = entry.get("config_cleanup") or []
|
|
if not isinstance(config_cleanup, list):
|
|
raise ValueError(f"Manifest entry {key} has non-list 'config_cleanup'")
|
|
config_cleanup = [str(pattern) for pattern in config_cleanup]
|
|
sql = entry.get("sql")
|
|
ref = entry.get("ref")
|
|
notes = entry.get("notes")
|
|
|
|
raw_value = env_map.get(key, os.environ.get(key, "0"))
|
|
env_keys_in_manifest.add(key)
|
|
enabled_raw = parse_bool(raw_value)
|
|
|
|
module = ModuleState(
|
|
key=key,
|
|
name=name,
|
|
repo=repo,
|
|
needs_build=needs_build,
|
|
module_type=module_type,
|
|
requires=requires,
|
|
ref=ref,
|
|
status=status,
|
|
block_reason=block_reason,
|
|
post_install_hooks=post_install_hooks,
|
|
config_cleanup=config_cleanup,
|
|
sql=sql,
|
|
notes=notes,
|
|
enabled_raw=enabled_raw,
|
|
)
|
|
|
|
if module.blocked and enabled_raw:
|
|
module.errors.append(
|
|
f"{module.key} is blocked: {module.block_reason or 'blocked in manifest'}"
|
|
)
|
|
|
|
# Effective enablement respects block status
|
|
module.enabled_effective = enabled_raw and not module.blocked
|
|
module.value = "1" if module.enabled_effective else "0"
|
|
|
|
modules.append(module)
|
|
|
|
module_map: Dict[str, ModuleState] = {module.key: module for module in modules}
|
|
|
|
# Dependency validation
|
|
for module in modules:
|
|
if not module.enabled_effective:
|
|
continue
|
|
missing: List[str] = []
|
|
for dependency in module.requires:
|
|
dep_state = module_map.get(dependency)
|
|
if not dep_state or not dep_state.enabled_effective:
|
|
missing.append(dependency)
|
|
if missing:
|
|
plural = "modules" if len(missing) > 1 else "module"
|
|
list_str = ", ".join(missing)
|
|
message = f"{module.key} requires {plural}: {list_str}"
|
|
module.errors.append(message)
|
|
|
|
# Collect warnings/errors
|
|
for module in modules:
|
|
if module.errors:
|
|
errors.extend(module.errors)
|
|
if module.warnings:
|
|
warnings.extend(module.warnings)
|
|
|
|
# Warn if .env defines modules not in manifest
|
|
extra_env_modules = [
|
|
key for key in env_map.keys() if key.startswith("MODULE_") and key not in module_map
|
|
]
|
|
for unknown_key in extra_env_modules:
|
|
warnings.append(f".env defines {unknown_key} but it is missing from the manifest")
|
|
|
|
# Warn if manifest entry lacks .env toggle
|
|
for module in modules:
|
|
if module.key not in env_map and module.key not in os.environ:
|
|
warnings.append(
|
|
f"Manifest includes {module.key} but .env does not define it (defaulting to 0)"
|
|
)
|
|
|
|
return ModuleCollectionState(
|
|
manifest_path=manifest_path,
|
|
env_path=env_path,
|
|
modules=modules,
|
|
generated_at=datetime.now(timezone.utc),
|
|
warnings=warnings,
|
|
errors=errors,
|
|
)
|
|
|
|
|
|
def write_outputs(state: ModuleCollectionState, output_dir: Path) -> None:
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
env_lines: List[str] = [
|
|
"# Autogenerated by scripts/modules.py",
|
|
f"# Generated at {state.generated_at.isoformat()}",
|
|
f'export MODULES_MANIFEST="{state.manifest_path}"',
|
|
f'export MODULES_ENV_PATH="{state.env_path}"',
|
|
]
|
|
|
|
enabled_names: List[str] = []
|
|
compile_names: List[str] = []
|
|
enabled_keys: List[str] = []
|
|
compile_keys: List[str] = []
|
|
|
|
for module in state.modules:
|
|
env_lines.append(f"export {module.key}={module.value}")
|
|
if module.enabled_effective:
|
|
enabled_names.append(module.name)
|
|
enabled_keys.append(module.key)
|
|
if module.enabled_effective and module.needs_build:
|
|
compile_names.append(module.name)
|
|
compile_keys.append(module.key)
|
|
|
|
env_lines.append(f'export MODULES_ENABLED="{ " ".join(enabled_names) }"'.rstrip())
|
|
env_lines.append(f'export MODULES_COMPILE="{ " ".join(compile_names) }"'.rstrip())
|
|
env_lines.append(f'export MODULES_ENABLED_LIST="{",".join(enabled_keys)}"')
|
|
env_lines.append(f'export MODULES_CPP_LIST="{",".join(compile_keys)}"')
|
|
env_lines.append(
|
|
f"export MODULES_REQUIRES_PLAYERBOT_SOURCE="
|
|
f'{"1" if state.requires_playerbot_source() else "0"}'
|
|
)
|
|
env_lines.append(
|
|
f"export MODULES_REQUIRES_CUSTOM_BUILD="
|
|
f'{"1" if state.requires_custom_build() else "0"}'
|
|
)
|
|
env_lines.append(f"export MODULES_WARNING_COUNT={len(state.warnings)}")
|
|
env_lines.append(f"export MODULES_ERROR_COUNT={len(state.errors)}")
|
|
|
|
modules_env_path = output_dir / "modules.env"
|
|
modules_env_path.write_text("\n".join(env_lines) + "\n", encoding="utf-8")
|
|
|
|
state_payload = {
|
|
"generated_at": state.generated_at.isoformat(),
|
|
"manifest_path": str(state.manifest_path),
|
|
"env_path": str(state.env_path),
|
|
"warnings": state.warnings,
|
|
"errors": state.errors,
|
|
"modules": [
|
|
{
|
|
**asdict(module),
|
|
"enabled_raw": module.enabled_raw,
|
|
"enabled_effective": module.enabled_effective,
|
|
"blocked": module.blocked,
|
|
}
|
|
for module in state.modules
|
|
],
|
|
"enabled_modules": [module.name for module in state.enabled_modules()],
|
|
"compile_modules": [module.name for module in state.compile_modules()],
|
|
"requires_playerbot_source": state.requires_playerbot_source(),
|
|
"requires_custom_build": state.requires_custom_build(),
|
|
}
|
|
|
|
modules_state_path = output_dir / "modules-state.json"
|
|
modules_state_path.write_text(
|
|
json.dumps(state_payload, indent=2, sort_keys=True) + "\n",
|
|
encoding="utf-8",
|
|
)
|
|
|
|
meta_dir = output_dir / ".modules-meta"
|
|
meta_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
compile_list_path = meta_dir / "modules-compile.txt"
|
|
compile_list_path.write_text(
|
|
"\n".join(state_payload["compile_modules"]) + ("\n" if compile_names else ""),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
enabled_list_path = meta_dir / "modules-enabled.txt"
|
|
enabled_list_path.write_text(
|
|
"\n".join(state_payload["enabled_modules"]) + ("\n" if enabled_names else ""),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
|
|
def print_list(state: ModuleCollectionState, selector: str) -> None:
|
|
if selector == "compile":
|
|
items = [module.name for module in state.compile_modules()]
|
|
elif selector == "enabled":
|
|
items = [module.name for module in state.enabled_modules()]
|
|
elif selector == "keys":
|
|
items = [module.key for module in state.enabled_modules()]
|
|
else:
|
|
raise ValueError(f"Unknown list selector: {selector}")
|
|
for item in items:
|
|
print(item)
|
|
|
|
|
|
def print_requires_playerbot(state: ModuleCollectionState) -> None:
|
|
print("1" if state.requires_playerbot_source() else "0")
|
|
|
|
|
|
|
|
def print_requires_custom_build(state: ModuleCollectionState) -> None:
|
|
print("1" if state.requires_custom_build() else "0")
|
|
|
|
|
|
def print_state(state: ModuleCollectionState, fmt: str) -> None:
|
|
payload = {
|
|
"generated_at": state.generated_at.isoformat(),
|
|
"warnings": state.warnings,
|
|
"errors": state.errors,
|
|
"modules": [
|
|
{
|
|
"key": module.key,
|
|
"name": module.name,
|
|
"enabled": module.enabled_effective,
|
|
"needs_build": module.needs_build,
|
|
"requires": module.requires,
|
|
"blocked": module.blocked,
|
|
"dependency_issues": module.dependency_issues,
|
|
"post_install_hooks": module.post_install_hooks,
|
|
"config_cleanup": module.config_cleanup,
|
|
}
|
|
for module in state.modules
|
|
],
|
|
"enabled_modules": [module.name for module in state.enabled_modules()],
|
|
"compile_modules": [module.name for module in state.compile_modules()],
|
|
"requires_playerbot_source": state.requires_playerbot_source(),
|
|
}
|
|
if fmt == "json":
|
|
json.dump(payload, sys.stdout, indent=2, sort_keys=True)
|
|
sys.stdout.write("\n")
|
|
elif fmt == "shell":
|
|
keys = [module.key for module in state.modules]
|
|
quoted_keys = " ".join(shlex.quote(key) for key in keys)
|
|
print(f"MODULE_KEYS=({quoted_keys})")
|
|
print(
|
|
"declare -A MODULE_NAME MODULE_REPO MODULE_REF MODULE_TYPE MODULE_ENABLED "
|
|
"MODULE_NEEDS_BUILD MODULE_BLOCKED MODULE_POST_INSTALL MODULE_REQUIRES "
|
|
"MODULE_CONFIG_CLEANUP "
|
|
"MODULE_NOTES MODULE_STATUS MODULE_BLOCK_REASON"
|
|
)
|
|
for module in state.modules:
|
|
key = module.key
|
|
post_install = ",".join(module.post_install_hooks)
|
|
dependencies = ",".join(module.requires)
|
|
block_reason = module.block_reason or ""
|
|
ref = module.ref or ""
|
|
notes = module.notes or ""
|
|
config_cleanup = ",".join(module.config_cleanup)
|
|
print(f"MODULE_NAME[{key}]={shlex.quote(module.name)}")
|
|
print(f"MODULE_REPO[{key}]={shlex.quote(module.repo)}")
|
|
print(f"MODULE_REF[{key}]={shlex.quote(ref)}")
|
|
print(f"MODULE_TYPE[{key}]={shlex.quote(module.module_type)}")
|
|
print(f"MODULE_ENABLED[{key}]={1 if module.enabled_effective else 0}")
|
|
print(f"MODULE_NEEDS_BUILD[{key}]={1 if module.needs_build else 0}")
|
|
print(f"MODULE_BLOCKED[{key}]={1 if module.blocked else 0}")
|
|
print(f"MODULE_POST_INSTALL[{key}]={shlex.quote(post_install)}")
|
|
print(f"MODULE_REQUIRES[{key}]={shlex.quote(dependencies)}")
|
|
print(f"MODULE_CONFIG_CLEANUP[{key}]={shlex.quote(config_cleanup)}")
|
|
print(f"MODULE_NOTES[{key}]={shlex.quote(notes)}")
|
|
print(f"MODULE_STATUS[{key}]={shlex.quote(module.status)}")
|
|
print(f"MODULE_BLOCK_REASON[{key}]={shlex.quote(block_reason)}")
|
|
else:
|
|
raise ValueError(f"Unsupported format: {fmt}")
|
|
|
|
|
|
def handle_generate(args: argparse.Namespace) -> int:
|
|
env_path = Path(args.env_path).resolve()
|
|
manifest_path = Path(args.manifest).resolve()
|
|
output_dir = Path(args.output_dir).resolve()
|
|
state = build_state(env_path, manifest_path)
|
|
write_outputs(state, output_dir)
|
|
|
|
if state.warnings:
|
|
warning_block = "\n".join(f"- {warning}" for warning in state.warnings)
|
|
print(
|
|
textwrap.dedent(
|
|
f"""\
|
|
⚠️ Module manifest warnings detected:
|
|
{warning_block}
|
|
"""
|
|
),
|
|
file=sys.stderr,
|
|
)
|
|
if state.errors:
|
|
error_block = "\n".join(f"- {error}" for error in state.errors)
|
|
print(
|
|
textwrap.dedent(
|
|
f"""\
|
|
❌ Module manifest errors detected:
|
|
{error_block}
|
|
"""
|
|
),
|
|
file=sys.stderr,
|
|
)
|
|
return 1
|
|
return 0
|
|
|
|
|
|
def configure_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(description="Module manifest helper")
|
|
parser.add_argument(
|
|
"--env-path",
|
|
default=".env",
|
|
help="Path to .env file (default: .env)",
|
|
)
|
|
parser.add_argument(
|
|
"--manifest",
|
|
default="config/modules.json",
|
|
help="Path to module manifest (default: config/modules.json)",
|
|
)
|
|
|
|
subparsers = parser.add_subparsers(dest="command", required=True)
|
|
|
|
generate_parser = subparsers.add_parser("generate", help="Generate module state files")
|
|
generate_parser.add_argument(
|
|
"--output-dir",
|
|
default="local-storage/modules",
|
|
help="Directory for generated module artifacts (default: local-storage/modules)",
|
|
)
|
|
generate_parser.set_defaults(func=handle_generate)
|
|
|
|
list_parser = subparsers.add_parser("list", help="Print module lists")
|
|
list_parser.add_argument(
|
|
"--type",
|
|
choices=["compile", "enabled", "keys"],
|
|
default="compile",
|
|
help="List selector (default: compile)",
|
|
)
|
|
|
|
def handle_list(args: argparse.Namespace) -> int:
|
|
state = build_state(Path(args.env_path).resolve(), Path(args.manifest).resolve())
|
|
print_list(state, args.type)
|
|
return 1 if state.errors else 0
|
|
|
|
list_parser.set_defaults(func=handle_list)
|
|
|
|
rps_parser = subparsers.add_parser(
|
|
"requires-playerbot", help="Print 1 if playerbot source is required else 0"
|
|
)
|
|
|
|
def handle_requires_playerbot(args: argparse.Namespace) -> int:
|
|
state = build_state(Path(args.env_path).resolve(), Path(args.manifest).resolve())
|
|
print_requires_playerbot(state)
|
|
return 1 if state.errors else 0
|
|
|
|
rps_parser.set_defaults(func=handle_requires_playerbot)
|
|
|
|
rcb_parser = subparsers.add_parser(
|
|
"requires-custom-build",
|
|
help="Print 1 if a custom source build is required else 0",
|
|
)
|
|
|
|
def handle_requires_custom_build(args: argparse.Namespace) -> int:
|
|
state = build_state(Path(args.env_path).resolve(), Path(args.manifest).resolve())
|
|
print_requires_custom_build(state)
|
|
return 1 if state.errors else 0
|
|
|
|
rcb_parser.set_defaults(func=handle_requires_custom_build)
|
|
|
|
dump_parser = subparsers.add_parser("dump", help="Dump module state (JSON format)")
|
|
dump_parser.add_argument(
|
|
"--format",
|
|
choices=["json", "shell"],
|
|
default="json",
|
|
help="Output format (default: json)",
|
|
)
|
|
|
|
def handle_dump(args: argparse.Namespace) -> int:
|
|
state = build_state(Path(args.env_path).resolve(), Path(args.manifest).resolve())
|
|
print_state(state, args.format)
|
|
return 1 if state.errors else 0
|
|
|
|
dump_parser.set_defaults(func=handle_dump)
|
|
|
|
return parser
|
|
|
|
|
|
def main(argv: Optional[Iterable[str]] = None) -> int:
|
|
parser = configure_parser()
|
|
args = parser.parse_args(argv)
|
|
return args.func(args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|