#!/usr/bin/env python3
"""migrate_issues_frontmatter — migrate dev/issues/*.md from inline
**Key:** value metadata to canonical YAML frontmatter (issue 0100).

Idempotent: files that already have id + domain + scope in frontmatter are
skipped. Files with partial frontmatter get missing keys merged in without
overwriting existing ones.
"""
from __future__ import annotations

import json
import os
import re
import shutil
import sys
from datetime import date
from pathlib import Path
from typing import Any

# ---------------------------------------------------------------------------
# Registry path setup
# ---------------------------------------------------------------------------


def _find_registry_root() -> Path:
    """Locate the registry root by walking up from this file.

    Returns the first path (this file's resolved path, then each of its
    parents) that contains a ``registry.db`` entry; falls back to the current
    working directory when none does.
    """
    here = Path(__file__).resolve()
    for parent in (here, *here.parents):
        if (parent / "registry.db").exists():
            return parent
    return Path.cwd()


_REGISTRY_ROOT = _find_registry_root()
# Make the registry's Python functions importable before the project import.
sys.path.insert(0, str(_REGISTRY_ROOT / "python" / "functions"))

from core.core import extract_frontmatter  # noqa: E402

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Evaluated once at import time; used for ``created`` defaults and ``updated``.
TODAY = date.today().isoformat()

# File names that are never treated as issues.
_SKIP_NAMES = {"README.md", "template.md", "README", "template"}

# Lower-cased raw value -> canonical status.
_STATUS_ALIASES = {
    "pendiente": "pendiente",
    "pending": "pendiente",
    "in-progress": "in-progress",
    "en-progreso": "in-progress",
    "en_progreso": "in-progress",
    "bloqueado": "bloqueado",
    "blocked": "bloqueado",
    "completado": "completado",
    "done": "completado",
    "completed": "completado",
    "deferred": "deferred",
    "diferido": "deferred",
    "closed": "completado",
}

# Lower-cased raw value -> canonical issue type.
_TYPE_ALIASES = {
    "app": "app",
    "feature": "feature",
    "bugfix": "bugfix",
    "bug": "bugfix",
    "refactor": "refactor",
    "chore": "chore",
    "docs": "docs",
    "doc": "docs",
    "spike": "spike",
    "epic": "epic",
    "infra": "infra",
    "planning": "planning",
}

# Lower-cased raw value -> canonical priority.
_PRIORITY_ALIASES = {
    "alta": "alta",
    "high": "alta",
    "media": "media",
    "medium": "media",
    "baja": "baja",
    "low": "baja",
}

# ---------------------------------------------------------------------------
# Heuristics
# ---------------------------------------------------------------------------


def _infer_domain(filename: str) -> list[str]:
    """Return list of canonical domain tags based on filename heuristics.

    Rules are evaluated in a fixed order against the lower-cased filename and
    each matching rule contributes one tag. A tag is listed only once even
    when several rules produce it (e.g. an ``imgui`` file that also matches
    the dag-engine rule).
    """
    f = filename.lower()
    domains: list[str] = []

    def add(tag: str) -> None:
        # Keep first-seen order but never emit the same tag twice.
        if tag not in domains:
            domains.append(tag)

    if re.search(r"^cpp-|(-cpp-)|imgui|glfw|glsl|altsnap|sizemove", f):
        add("cpp-stack")
    if re.search(r"^kanban-|kanban", f):
        add("kanban")
    if re.search(r"^trading-|0088[a-z]?-trading", f):
        add("trading")
    if re.search(r"^gamedev-|0072[a-z]?-gamedev", f):
        add("gamedev")
    if re.search(r"osint|odr-", f):
        add("osint")
    if re.search(r"metabase|bigquery|datafactory|data-factory|navegator|cdp-", f):
        add("data-ingest")
    if re.search(r"notify|telegram|matrix", f):
        add("notify")
    if re.search(r"imagegen|sd-cpp|stable-diffusion", f):
        add("imagegen")
    if re.search(r"dag-engine|dagu", f):
        # DAG work belongs to the C++ stack when the name says so,
        # otherwise it is treated as data ingestion.
        if "cpp" in f or "imgui" in f:
            add("cpp-stack")
        else:
            add("data-ingest")
    if re.search(r"audit-|registry-first|uses.functions|nested-app-md", f):
        add("registry-quality")
    if re.search(
        r"autonomous|e2e-validation|registry-call|delegation|capability|call-monitor|mcp-",
        f,
    ):
        add("meta")
    if re.search(r"deploy|vps", f):
        add("deploy")

    # NOTE(review): the source this block was restored from is truncated at
    # this point. One further rule is only partially visible — its pattern
    # begins r"fn-run|gradle.run|(?" — and neither the rest of that pattern,
    # the tag it appends, nor any rules that followed it can be recovered
    # here. Restore them from version control before relying on this function.
    return domains


def _infer_scope(filename: str, inline_type: str) -> str:
    """Guess the issue scope from its filename and (normalized) type.

    NOTE(review): this signature was reconstructed from the function body and
    its two call sites (``_infer_scope(filename, type_)``); the original
    header line was not visible — confirm against version control.

    Returns one of ``"cross-stack"``, ``"registry-only"``, ``"app-scoped"``
    or ``"multi-app"`` (the default when no rule matches).
    """
    f = filename.lower()
    if re.search(r"roadmap", f):
        return "cross-stack"
    if re.search(r"extract-|migrate-|audit-", f):
        return "registry-only"
    if inline_type == "app" or re.search(r"-app[-.]|app-", f):
        return "app-scoped"
    return "multi-app"


def _infer_type(filename: str) -> str:
    """Guess the issue type from its filename; defaults to ``"feature"``."""
    f = filename.lower()
    if re.search(r"roadmap", f):
        return "epic"
    if re.search(r"audit-|cleanup-", f):
        return "chore"
    if re.search(r"fix-|bugfix-|bug-", f):
        return "bugfix"
    return "feature"


def _infer_priority_from_mtime(path: Path) -> str:
    """Return ``"alta"`` for files modified within the last 14 days, else ``"media"``."""
    mtime_date = date.fromtimestamp(path.stat().st_mtime)
    age_days = (date.today() - mtime_date).days
    return "alta" if age_days <= 14 else "media"


# ---------------------------------------------------------------------------
# Inline parser
# ---------------------------------------------------------------------------

_H1_RE = re.compile(r"^#\s+(.+)$", re.MULTILINE)
# Accepts both "**Key:** value" (colon inside the bold markers, the form the
# module docstring describes) and "**Key**: value". Underscores are allowed so
# that keys such as "depends_on" can be captured.
_BOLD_KEY_RE = re.compile(r"^\*\*([A-Za-z_]+)(?::\*\*|\*\*:)\s*(.*)$")
_TABLE_META_RE = re.compile(r"^\|\s*\*\*([A-Za-z_]+)\*\*\s*\|\s*(.+?)\s*\|")
# Match "NNNN <separator> rest" where the separator is a dash variant or colon.
_ID_FROM_H1_RE = re.compile(r"^(\d{4}[a-z]?)\s*[—–:-]\s*(.+)$")
_ISSUE_REF_RE = re.compile(r"\b(\d{4}[a-z]?)\b")
_ISO_DATE_RE = re.compile(r"\b\d{4}-\d{2}-\d{2}\b")

# Raw values that mean "no issue references".
_NO_IDS = frozenset({"—", "-", "", "ninguna", "none"})


def _extract_h1_title(content: str) -> tuple[str, str]:
    """Return ``(issue_id, clean_title)`` taken from the first H1 heading.

    Returns ``("", "")`` when there is no H1, and ``("", <heading text>)``
    when the heading does not start with an issue id.
    """
    m = _H1_RE.search(content)
    if not m:
        return "", ""
    h1 = m.group(1).strip()
    id_m = _ID_FROM_H1_RE.match(h1)
    if id_m:
        return id_m.group(1), id_m.group(2).strip()
    return "", h1


def _parse_inline_meta(content: str) -> dict[str, str]:
    """Parse bold ``**Key:** value`` lines and bold-key table rows.

    Only the first 40 lines of *content* are scanned. Keys are lower-cased;
    when a key occurs more than once the last occurrence wins.
    """
    meta: dict[str, str] = {}
    for line in content.splitlines()[:40]:
        stripped = line.strip()

        # Bold inline: **Status:** value  (or **Status**: value)
        bold = _BOLD_KEY_RE.match(stripped)
        if bold:
            meta[bold.group(1).lower()] = bold.group(2).strip()
            continue

        # Table row: | **Estado** | value |
        row = _TABLE_META_RE.match(stripped)
        if row:
            # Strip markdown bold from the value cell.
            value = re.sub(r"\*\*(.+?)\*\*", r"\1", row.group(2).strip())
            meta[row.group(1).lower()] = value
    return meta


def _parse_issue_ids(raw: str) -> list[str]:
    """Extract issue IDs (NNNN or NNNNa) from text like ``'0096, 0097 — DONE'``.

    ISO dates are removed first so that their year is not mistaken for an
    issue id, and each id is returned once, in order of first appearance.
    """
    if not raw or raw.strip().lower() in _NO_IDS:
        return []
    without_dates = _ISO_DATE_RE.sub(" ", raw)
    return list(dict.fromkeys(_ISSUE_REF_RE.findall(without_dates)))


def _strip_markup(raw: str) -> str:
    """Lower-case *raw* and drop surrounding whitespace and markdown emphasis."""
    return raw.lower().strip().strip("*_`").strip()


def _normalize_status(raw: str) -> str:
    """Map a raw status string to its canonical value (default ``"pendiente"``).

    The whole value is tried first, then its first token. Hyphens are only
    used as a token separator as a last resort, so hyphenated statuses such
    as ``"in-progress — note"`` are still recognized.
    """
    cleaned = _strip_markup(raw)
    candidates = (
        cleaned,
        re.split(r"[\s,;—–]", cleaned)[0],
        re.split(r"[\s,;—–-]", cleaned)[0],
    )
    for candidate in candidates:
        if candidate in _STATUS_ALIASES:
            return _STATUS_ALIASES[candidate]
    return "pendiente"


def _normalize_type(raw: str) -> str:
    """Map a raw type string to its canonical value, or ``""`` if unknown."""
    # Keep only the first token: strips annotations like "feature — apps/kanban/".
    token = re.split(r"[\s,;—–-]", _strip_markup(raw))[0]
    return _TYPE_ALIASES.get(token, "")


def _normalize_priority(raw: str) -> str:
    """Map a raw priority string to its canonical value, or ``""`` if unknown."""
    token = re.split(r"[\s,;—–]", _strip_markup(raw))[0]
    return _PRIORITY_ALIASES.get(token, "")


# ---------------------------------------------------------------------------
# Frontmatter builder
# ---------------------------------------------------------------------------

# Values matching this pattern are safe to emit as unquoted YAML scalars.
_YAML_PLAIN_RE = re.compile(r"[A-Za-z0-9_][A-Za-z0-9_./-]*(?: [A-Za-z0-9_./-]+)*")
# Plain words that YAML parsers may read as booleans or null.
_YAML_RESERVED = frozenset({"true", "false", "yes", "no", "on", "off", "null", "~"})


def _yaml_quote(value: str) -> str:
    """Return *value* as a YAML double-quoted scalar with escapes applied."""
    escaped = (
        value.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
    )
    return f'"{escaped}"'


def _yaml_scalar(value: str) -> str:
    """Return *value* unquoted when that is safe, double-quoted otherwise."""
    if _YAML_PLAIN_RE.fullmatch(value) and value.lower() not in _YAML_RESERVED:
        return value
    return _yaml_quote(value)


def _build_frontmatter(
    issue_id: str,
    title: str,
    status: str,
    type_: str,
    domain: list[str],
    scope: str,
    priority: str,
    depends: list[str],
    blocks: list[str],
    related: list[str],
    created: str,
    updated: str,
    tags: list[str],
    extra: dict[str, Any] | None = None,
) -> str:
    """Render the canonical YAML frontmatter block as a string.

    ``id``, ``title`` and the issue-reference lists are always double-quoted;
    other scalars are quoted only when they contain characters that are not
    safe in a plain YAML scalar. Empty lists are rendered as ``[]``.

    Args:
        extra: Optional additional keys appended after ``tags``. Each value
            is written as JSON (which is valid YAML flow syntax), so scalars,
            lists and mappings survive; values JSON cannot encode are
            stringified.

    Returns:
        The block including both ``---`` fences and a trailing newline.
    """
    lines: list[str] = ["---"]

    def emit_list(key: str, items: list[str], always_quote: bool) -> None:
        if not items:
            lines.append(f"{key}: []")
            return
        lines.append(f"{key}:")
        for item in items:
            text = str(item)
            rendered = _yaml_quote(text) if always_quote else _yaml_scalar(text)
            lines.append(f"  - {rendered}")

    lines.append(f"id: {_yaml_quote(issue_id)}")
    lines.append(f"title: {_yaml_quote(title)}")
    lines.append(f"status: {_yaml_scalar(status)}")
    lines.append(f"type: {_yaml_scalar(type_)}")
    emit_list("domain", domain, always_quote=False)
    lines.append(f"scope: {_yaml_scalar(scope)}")
    lines.append(f"priority: {_yaml_scalar(priority)}")
    # Issue ids are quoted so that values like "0100" stay strings.
    emit_list("depends", depends, always_quote=True)
    emit_list("blocks", blocks, always_quote=True)
    emit_list("related", related, always_quote=True)
    lines.append(f"created: {_yaml_scalar(created)}")
    lines.append(f"updated: {_yaml_scalar(updated)}")
    emit_list("tags", tags, always_quote=False)
    for key, value in (extra or {}).items():
        encoded = json.dumps(value, ensure_ascii=False, default=str)
        lines.append(f"{_yaml_scalar(str(key))}: {encoded}")
    lines.append("---")
    return "\n".join(lines) + "\n"


# ---------------------------------------------------------------------------
# Per-file processing
# ---------------------------------------------------------------------------

FileResult = dict[str, Any]

# Frontmatter keys that the canonical block already covers (including the
# Spanish/legacy aliases read below); everything else is carried over as-is.
_CANONICAL_FM_KEYS = frozenset({
    "id", "title", "status", "estado", "type", "tipo", "domain", "dominio",
    "scope", "alcance", "priority", "prioridad", "depends", "depends_on",
    "blocks", "related", "created", "updated", "tags",
})


def _process_file(
    path: Path,
    backup_dir: Path | None,
    dry_run: bool,
) -> FileResult:
    """Classify one issue file and migrate it if needed.

    The returned dict carries ``path``, ``action`` (``"skipped"``,
    ``"merged"`` or ``"migrated"``), ``domain_inferred``, ``scope_inferred``
    and ``warnings``.
    """
    result: FileResult = {
        "path": str(path),
        "action": "skipped",
        "domain_inferred": [],
        "scope_inferred": "",
        "warnings": [],
    }

    content = path.read_text(encoding="utf-8")
    # extract_frontmatter is a project helper; it is used here as returning
    # (body without frontmatter, parsed frontmatter).
    body_without_fm, existing_fm = extract_frontmatter(content)

    # Determine issue_id from filename, e.g. "0099-datahub-app-launcher".
    stem = path.stem
    id_from_file_m = re.match(r"^(\d{4}[a-z]?)", stem)
    file_issue_id = id_from_file_m.group(1) if id_from_file_m else stem

    if existing_fm and isinstance(existing_fm, dict):
        fm = existing_fm
        # Already fully migrated: nothing to do.
        if "id" in fm and "domain" in fm and "scope" in fm:
            return result
        # Partial frontmatter — merge missing keys.
        result["action"] = "merged"
        _fill_missing_keys(
            path, fm, body_without_fm, file_issue_id, result, backup_dir, dry_run
        )
        return result

    # No frontmatter: parse inline metadata and build the canonical block.
    result["action"] = "migrated"
    _migrate_from_inline(path, content, file_issue_id, result, backup_dir, dry_run)
    return result


def _fm_first(fm: dict, *keys: str) -> Any:
    """Return the first value among *keys* that is neither missing, None nor ``""``."""
    for key in keys:
        value = fm.get(key)
        if value is not None and value != "":
            return value
    return None


def _fill_missing_keys(
    path: Path,
    fm: dict,
    body: str,
    file_issue_id: str,
    result: FileResult,
    backup_dir: Path | None,
    dry_run: bool,
) -> None:
    """Merge missing canonical keys into an existing partial frontmatter.

    Existing values are kept (status, type and priority are normalized);
    missing ones are inferred from the filename, the H1 heading or the file's
    mtime. Keys outside the canonical set are carried over unchanged. Records
    inferred values and warnings on *result* and, unless *dry_run*, backs the
    file up and rewrites it.
    """
    warnings = result["warnings"]
    filename = path.name

    raw_id = _fm_first(fm, "id")
    issue_id = str(raw_id) if raw_id is not None else file_issue_id

    raw_title = _fm_first(fm, "title")
    if raw_title is not None:
        title = str(raw_title)
    else:
        title = _extract_h1_title(body)[1] or filename

    raw_status = _fm_first(fm, "status", "estado")
    status = _normalize_status(str(raw_status)) if raw_status is not None else "pendiente"

    raw_type = _fm_first(fm, "type", "tipo")
    type_text = "" if raw_type is None else str(raw_type)
    type_ = _normalize_type(type_text) if type_text else ""
    if not type_:
        type_ = _infer_type(filename)
        warnings.append(f"type missing/unknown ({type_text!r}), inferred: {type_}")

    domain_raw = _fm_first(fm, "domain", "dominio")
    if isinstance(domain_raw, str):
        domain = [domain_raw]
    elif isinstance(domain_raw, list):
        domain = [str(d) for d in domain_raw if d is not None and d != ""]
    else:
        domain = []
    if not domain:
        domain = _infer_domain(filename)
        result["domain_inferred"] = domain
        if not domain:
            warnings.append("domain could not be inferred")

    raw_scope = _fm_first(fm, "scope", "alcance")
    scope = "" if raw_scope is None else str(raw_scope)
    if not scope:
        scope = _infer_scope(filename, type_)
        result["scope_inferred"] = scope

    raw_priority = _fm_first(fm, "priority", "prioridad")
    priority = _normalize_priority(str(raw_priority)) if raw_priority is not None else ""
    if not priority:
        priority = _infer_priority_from_mtime(path)
        warnings.append(f"priority missing, inferred from mtime: {priority}")

    depends = _coerce_id_list(_fm_first(fm, "depends", "depends_on"))
    blocks = _coerce_id_list(fm.get("blocks", []))
    related = _coerce_id_list(fm.get("related", []))

    raw_created = _fm_first(fm, "created")
    created = str(raw_created) if raw_created is not None else TODAY

    tags_raw = fm.get("tags", [])
    if isinstance(tags_raw, list):
        tags = [str(t) for t in tags_raw if t is not None and t != ""]
    elif isinstance(tags_raw, str):
        tags = [t.strip() for t in tags_raw.split(",") if t.strip()]
    else:
        tags = []

    # Preserve any non-canonical keys instead of silently dropping them.
    extra = {k: v for k, v in fm.items() if k not in _CANONICAL_FM_KEYS}

    new_fm = _build_frontmatter(
        issue_id=issue_id,
        title=title,
        status=status,
        type_=type_,
        domain=domain,
        scope=scope,
        priority=priority,
        depends=depends,
        blocks=blocks,
        related=related,
        created=created,
        updated=TODAY,
        tags=tags,
        extra=extra,
    )
    new_content = new_fm + body

    if not dry_run:
        _backup_file(path, backup_dir)
        path.write_text(new_content, encoding="utf-8")


def _coerce_id_list(val: Any) -> list[str]:
    """Turn a frontmatter value into a list of issue-id strings.

    Lists are stringified element-wise, strings are scanned for issue ids,
    anything else yields an empty list.
    """
    if isinstance(val, list):
        return [str(v) for v in val]
    if isinstance(val, str):
        return _parse_issue_ids(val)
    return []


def _migrate_from_inline(
    path: Path,
    content: str,
    file_issue_id: str,
    result: FileResult,
    backup_dir: Path | None,
    dry_run: bool,
) -> None:
    """Parse inline metadata and write YAML frontmatter.

    The frontmatter is prepended to the untouched original content. Inferred
    values and warnings are recorded on *result*; unless *dry_run*, the file
    is backed up and rewritten.
    """
    warnings = result["warnings"]
    filename = path.name

    inline = _parse_inline_meta(content)
    h1_id, h1_title = _extract_h1_title(content)

    # The id in the heading takes precedence over the one in the filename.
    issue_id = h1_id or file_issue_id
    title = h1_title or filename

    # Status
    status_raw = inline.get("status", inline.get("estado", ""))
    status = _normalize_status(status_raw) if status_raw else "pendiente"

    # Type
    type_raw = inline.get("type", inline.get("tipo", ""))
    type_ = _normalize_type(type_raw) if type_raw else ""
    if not type_:
        type_ = _infer_type(filename)
        warnings.append(f"type missing/unknown ({type_raw!r}), inferred: {type_}")

    # Domain
    domain_raw = inline.get("domain", inline.get("dominio", ""))
    if domain_raw and domain_raw not in ("—", "-"):
        domain = [d.strip() for d in re.split(r"[,;]", domain_raw) if d.strip()]
    else:
        domain = _infer_domain(filename)
        result["domain_inferred"] = domain
        if not domain:
            warnings.append("domain could not be inferred from filename")

    # Scope
    scope_raw = inline.get("scope", inline.get("alcance", ""))
    if scope_raw and scope_raw not in ("—", "-"):
        scope = scope_raw
    else:
        scope = _infer_scope(filename, type_)
        result["scope_inferred"] = scope

    # Priority
    priority_raw = inline.get("priority", inline.get("prioridad", ""))
    priority = _normalize_priority(priority_raw) if priority_raw else ""
    if not priority:
        priority = _infer_priority_from_mtime(path)
        warnings.append(f"priority missing, inferred from mtime: {priority}")

    # Depends / Blocks / Related
    depends = _parse_issue_ids(
        inline.get("depends", inline.get("depends_on", inline.get("depende", "")))
    )
    blocks = _parse_issue_ids(inline.get("blocks", inline.get("bloquea", "")))
    related = _parse_issue_ids(
        inline.get("related", inline.get("relacionado", inline.get("relacionados", "")))
    )

    # Created date: keep only the ISO date itself, wherever it sits in the value.
    created_raw = inline.get("created", inline.get("fecha", ""))
    created_m = _ISO_DATE_RE.search(created_raw) if created_raw else None
    created = created_m.group(0) if created_m else TODAY

    new_fm = _build_frontmatter(
        issue_id=issue_id,
        title=title,
        status=status,
        type_=type_,
        domain=domain,
        scope=scope,
        priority=priority,
        depends=depends,
        blocks=blocks,
        related=related,
        created=created,
        updated=TODAY,
        tags=[],
    )
    new_content = new_fm + content

    if not dry_run:
        _backup_file(path, backup_dir)
        path.write_text(new_content, encoding="utf-8")


def _backup_file(path: Path, backup_dir: Path | None) -> None:
    """Copy *path* into *backup_dir* unless a backup of that name already exists.

    Does nothing when *backup_dir* is ``None``. An existing backup is never
    overwritten, so re-runs keep the first (pre-migration) copy.
    """
    if backup_dir is None:
        return
    backup_dir.mkdir(parents=True, exist_ok=True)
    dest = backup_dir / path.name
    if not dest.exists():
        shutil.copy2(path, dest)


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def migrate_issues_frontmatter(
    issues_dir: str,
    backup_dir: str = "",
    dry_run: bool = False,
) -> dict[str, Any]:
    """Migrate dev/issues/*.md from inline metadata to canonical YAML frontmatter.

    Processes ``*.md`` files directly inside *issues_dir* and inside its
    ``completed/`` subdirectory (if present), then prints a summary.

    Args:
        issues_dir: Path to the issues directory (e.g. "dev/issues").
        backup_dir: Where to copy originals before writing. Defaults to
            <issues_dir>/.backup_pre_0100/. Pass "" for default.
        dry_run: If True, compute changes but do not write files
            (and do not create backups).

    Returns:
        {
            "migrated": N, "merged": N, "skipped": N,
            "warnings": [...],
            "files": [{"path": ..., "action": ..., "domain_inferred": ...,
                       "scope_inferred": ..., "warnings": [...]}, ...]
        }

    Raises:
        FileNotFoundError: If *issues_dir* is not an existing directory.
    """
    issues_path = Path(issues_dir).expanduser()
    if not issues_path.is_dir():
        raise FileNotFoundError(f"issues_dir not found: {issues_path}")

    # Determine backup directory
    if dry_run:
        bk_path = None
    elif backup_dir:
        bk_path = Path(backup_dir).expanduser()
    else:
        bk_path = issues_path / ".backup_pre_0100"

    # Collect all .md files from issues_dir and issues_dir/completed/
    search_dirs = [issues_path]
    completed_dir = issues_path / "completed"
    if completed_dir.is_dir():
        search_dirs.append(completed_dir)
    all_files: list[Path] = [
        md
        for directory in search_dirs
        for md in sorted(directory.glob("*.md"))
        if md.name not in _SKIP_NAMES
    ]

    results: list[FileResult] = []
    all_warnings: list[str] = []

    for path in all_files:
        try:
            r = _process_file(path, bk_path, dry_run)
        except Exception as e:
            # Per-file boundary: one broken file must not abort the whole run.
            r = {
                "path": str(path),
                "action": "warning",
                "domain_inferred": [],
                "scope_inferred": "",
                "warnings": [f"ERROR: {e}"],
            }
        all_warnings.extend(f"{path.name}: {w}" for w in r["warnings"])
        results.append(r)

    migrated = sum(1 for r in results if r["action"] == "migrated")
    merged = sum(1 for r in results if r["action"] == "merged")
    skipped = sum(1 for r in results if r["action"] == "skipped")
    errors = sum(1 for r in results if r["action"] == "warning")

    # Summary table
    prefix = "[DRY RUN] " if dry_run else ""
    print(f"\n{prefix}migrate_issues_frontmatter summary")
    print(f"  issues_dir : {issues_path}")
    print(f"  files found: {len(results)}")
    print(f"  migrated   : {migrated}")
    print(f"  merged     : {merged}")
    print(f"  skipped    : {skipped}")
    print(f"  errors     : {errors}")
    print(f"  warnings   : {len(all_warnings)}")
    if all_warnings:
        print("\nWarnings:")
        for w in all_warnings:
            print(f"  - {w}")

    return {
        "migrated": migrated,
        "merged": merged,
        "skipped": skipped,
        "warnings": all_warnings,
        "files": results,
    }


# ---------------------------------------------------------------------------
# CLI entry
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    cli_args = sys.argv[1:]
    # The directory is the first non-flag argument, so "--dry-run" may come first.
    positional = [a for a in cli_args if not a.startswith("--")]
    issues_dir_arg = positional[0] if positional else "dev/issues"
    dry = "--dry-run" in cli_args
    result = migrate_issues_frontmatter(issues_dir_arg, dry_run=dry)
    print(json.dumps(result, ensure_ascii=False, indent=2))