Files
fn_registry/python/functions/pipelines/migrate_issues_frontmatter.py

647 lines
20 KiB
Python

#!/usr/bin/env python3
"""migrate_issues_frontmatter — migrate dev/issues/*.md from inline **Key:** value
metadata to canonical YAML frontmatter (issue 0100).
Idempotent: files that already have id + domain + scope in frontmatter are skipped.
Files with partial frontmatter get missing keys merged in without overwriting existing ones.
"""
from __future__ import annotations
import json
import os
import re
import shutil
import sys
from datetime import date
from pathlib import Path
from typing import Any
# ---------------------------------------------------------------------------
# Registry path setup
# ---------------------------------------------------------------------------
def _find_registry_root() -> Path:
here = Path(__file__).resolve()
for parent in (here, *here.parents):
if (parent / "registry.db").exists():
return parent
return Path.cwd()
# Resolve the registry root once at import time and put its python/functions
# directory at the front of sys.path. The import that follows must stay after
# the sys.path insert (hence the E402 suppression); core.core is presumably
# resolved through that entry -- confirm against the repository layout.
_REGISTRY_ROOT = _find_registry_root()
sys.path.insert(0, str(_REGISTRY_ROOT / "python" / "functions"))
from core.core import extract_frontmatter # noqa: E402
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
TODAY = date.today().isoformat()
_SKIP_NAMES = {"README.md", "template.md", "README", "template"}
_STATUS_ALIASES = {
"pendiente": "pendiente",
"pending": "pendiente",
"in-progress": "in-progress",
"en-progreso": "in-progress",
"en_progreso": "in-progress",
"bloqueado": "bloqueado",
"blocked": "bloqueado",
"completado": "completado",
"done": "completado",
"completed": "completado",
"deferred": "deferred",
"diferido": "deferred",
"closed": "completado",
}
_TYPE_ALIASES = {
"app": "app",
"feature": "feature",
"bugfix": "bugfix",
"bug": "bugfix",
"refactor": "refactor",
"chore": "chore",
"docs": "docs",
"doc": "docs",
"spike": "spike",
"epic": "epic",
"infra": "infra",
"planning": "planning",
}
_PRIORITY_ALIASES = {
"alta": "alta",
"high": "alta",
"media": "media",
"medium": "media",
"baja": "baja",
"low": "baja",
}
# ---------------------------------------------------------------------------
# Heuristics
# ---------------------------------------------------------------------------
def _infer_domain(filename: str) -> list[str]:
"""Return list of canonical domain tags based on filename heuristics."""
f = filename.lower()
domains: list[str] = []
if re.search(r"^cpp-|(-cpp-)|imgui|glfw|glsl|altsnap|sizemove", f):
domains.append("cpp-stack")
if re.search(r"^kanban-|kanban", f):
domains.append("kanban")
if re.search(r"^trading-|0088[a-z]?-trading", f):
domains.append("trading")
if re.search(r"^gamedev-|0072[a-z]?-gamedev", f):
domains.append("gamedev")
if re.search(r"osint|odr-", f):
domains.append("osint")
if re.search(r"metabase|bigquery|datafactory|data-factory|navegator|cdp-", f):
domains.append("data-ingest")
if re.search(r"notify|telegram|matrix", f):
domains.append("notify")
if re.search(r"imagegen|sd-cpp|stable-diffusion", f):
domains.append("imagegen")
if re.search(r"dag-engine|dagu", f):
if "cpp" in f or "imgui" in f:
domains.append("cpp-stack")
else:
domains.append("data-ingest")
if re.search(r"audit-|registry-first|uses.functions|nested-app-md", f):
domains.append("registry-quality")
if re.search(r"autonomous|e2e-validation|registry-call|delegation|capability|call-monitor|mcp-", f):
domains.append("meta")
if re.search(r"deploy|vps", f):
domains.append("deploy")
if re.search(r"fn-run|gradle.run|(?<![a-z])dev-(?!_console)", f):
domains.append("dev-ux")
if re.search(r"browser|chrome|cdp-", f):
domains.append("browser")
if re.search(r"datahub|app-hub|launcher|app-locations", f):
domains.append("apps-infra")
if re.search(r"frontend|react", f):
domains.append("frontend")
if re.search(r"0100|frontmatter|migrate-issues|extract-|audit-", f):
if "registry-quality" not in domains:
domains.append("registry-quality")
# deduplicate while preserving order
seen: set[str] = set()
result: list[str] = []
for d in domains:
if d not in seen:
seen.add(d)
result.append(d)
return result
def _infer_scope(filename: str, inline_type: str) -> str:
f = filename.lower()
if re.search(r"roadmap", f):
return "cross-stack"
if re.search(r"extract-|migrate-|audit-", f):
return "registry-only"
if inline_type == "app" or re.search(r"-app[-.]|app-", f):
return "app-scoped"
return "multi-app"
def _infer_type(filename: str) -> str:
f = filename.lower()
if re.search(r"roadmap", f):
return "epic"
if re.search(r"audit-|cleanup-", f):
return "chore"
if re.search(r"fix-|bugfix-|bug-", f):
return "bugfix"
return "feature"
def _infer_priority_from_mtime(path: Path) -> str:
mtime = path.stat().st_mtime
mtime_date = date.fromtimestamp(mtime)
delta = (date.today() - mtime_date).days
if delta <= 14:
return "alta"
return "media"
# ---------------------------------------------------------------------------
# Inline parser
# ---------------------------------------------------------------------------
_H1_RE = re.compile(r"^#\s+(.+)$", re.MULTILINE)
_BOLD_KEY_RE = re.compile(r"^\*\*([A-Za-z]+)\*\*:\s*(.*)$")
_TABLE_META_RE = re.compile(r"^\|\s*\*\*([A-Za-z]+)\*\*\s*\|\s*(.+?)\s*\|")
# Match "NNNN" or "NNNN — rest"
_ID_FROM_H1_RE = re.compile(r"^(\d{4}[a-z]?)\s*[—-]\s*(.+)$")
_ISSUE_REF_RE = re.compile(r"\b(\d{4}[a-z]?)\b")
def _extract_h1_title(content: str) -> tuple[str, str]:
    """Split the first H1 heading into ``(issue_id, title)``.

    Returns ``("", "")`` when the text has no H1 heading, and
    ``("", heading)`` when the heading does not start with an issue id
    followed by a dash separator.
    """
    heading = _H1_RE.search(content)
    if heading is None:
        return "", ""
    heading_text = heading.group(1).strip()
    parts = _ID_FROM_H1_RE.match(heading_text)
    if parts is None:
        return "", heading_text
    issue_id, remainder = parts.groups()
    return issue_id, remainder.strip()
def _parse_inline_meta(content: str) -> dict[str, str]:
    """Collect inline metadata from the first 40 lines of an issue file.

    Two layouts are recognised per line: a bold key line and a table row
    whose first cell is a bold key. Keys are lowercased; when a key occurs
    more than once, the last occurrence wins.
    """
    found: dict[str, str] = {}
    for raw_line in content.splitlines()[:40]:
        stripped = raw_line.strip()
        bold = _BOLD_KEY_RE.match(stripped)
        if bold is not None:
            found[bold.group(1).lower()] = bold.group(2).strip()
            continue
        row = _TABLE_META_RE.match(stripped)
        if row is None:
            continue
        cell = row.group(2).strip()
        # Table values may themselves be bold; keep only the inner text.
        found[row.group(1).lower()] = re.sub(r"\*\*(.+?)\*\*", r"\1", cell)
    return found
def _parse_issue_ids(raw: str) -> list[str]:
    """Extract issue IDs (NNNN or NNNNa) from text like '0096, 0097 — DONE'.

    Returns the ids in order of first appearance, without duplicates.
    Empty input and the placeholders "-", "ninguna" and "none" (in any
    letter case) yield an empty list. A four-digit group that is the year
    of an ISO date (e.g. the "2025" in "2025-01-10") is not treated as an
    issue id.
    """
    if not raw:
        return []
    text = raw.strip()
    if text.lower() in ("", "-", "ninguna", "none"):
        return []
    ids: list[str] = []
    for ref in _ISSUE_REF_RE.finditer(text):
        # Skip the year component of a date such as 2025-01-10.
        if re.match(r"-\d{2}-\d{2}(?!\d)", text[ref.end():]):
            continue
        issue_id = ref.group(1)
        if issue_id not in ids:
            ids.append(issue_id)
    return ids
def _normalize_status(raw: str) -> str:
    """Map a free-form status string to its canonical value.

    Lookup order: the whole lowercased string, then its first token split on
    whitespace, comma, semicolon or em dash, then that token cut at its
    first hyphen. Hyphens are not separators in the second step because
    aliases such as "in-progress" contain them; this lets annotated values
    like "in-progress — started" resolve correctly. Unknown values fall
    back to "pendiente".
    """
    text = raw.lower().strip()
    if text in _STATUS_ALIASES:
        return _STATUS_ALIASES[text]
    head = re.split(r"[\s,;—]", text)[0]
    if head in _STATUS_ALIASES:
        return _STATUS_ALIASES[head]
    # Last resort: keep only what precedes the first hyphen ("done-ish").
    return _STATUS_ALIASES.get(head.split("-")[0], "pendiente")
def _normalize_type(raw: str) -> str:
    """Return the canonical issue type for ``raw``, or "" when unknown.

    Only the leading token counts, so a trailing annotation such as
    "feature — apps/kanban/" is ignored.
    """
    cleaned = raw.lower().strip()
    head, *_ = re.split(r"[\s,;—-]", cleaned)
    return _TYPE_ALIASES.get(head, "")
def _normalize_priority(raw: str) -> str:
    """Return the canonical priority for ``raw``, or "" when unknown.

    Only the text before the first whitespace, comma or semicolon is used.
    """
    cleaned = raw.lower().strip()
    leading = re.split(r"[\s,;]", cleaned, maxsplit=1)[0]
    return _PRIORITY_ALIASES.get(leading, "")
# ---------------------------------------------------------------------------
# Frontmatter builder
# ---------------------------------------------------------------------------
def _build_frontmatter(
issue_id: str,
title: str,
status: str,
type_: str,
domain: list[str],
scope: str,
priority: str,
depends: list[str],
blocks: list[str],
related: list[str],
created: str,
updated: str,
tags: list[str],
) -> str:
"""Render canonical YAML frontmatter block as a string."""
lines: list[str] = ["---"]
lines.append(f"id: \"{issue_id}\"")
lines.append(f"title: \"{title}\"")
lines.append(f"status: {status}")
lines.append(f"type: {type_}")
if domain:
lines.append("domain:")
for d in domain:
lines.append(f" - {d}")
else:
lines.append("domain: []")
lines.append(f"scope: {scope}")
lines.append(f"priority: {priority}")
if depends:
lines.append("depends:")
for d in depends:
lines.append(f" - \"{d}\"")
else:
lines.append("depends: []")
if blocks:
lines.append("blocks:")
for b in blocks:
lines.append(f" - \"{b}\"")
else:
lines.append("blocks: []")
if related:
lines.append("related:")
for r in related:
lines.append(f" - \"{r}\"")
else:
lines.append("related: []")
lines.append(f"created: {created}")
lines.append(f"updated: {updated}")
if tags:
lines.append("tags:")
for t in tags:
lines.append(f" - {t}")
else:
lines.append("tags: []")
lines.append("---")
return "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# Per-file processing
# ---------------------------------------------------------------------------
# Per-file report built by _process_file, with the keys "path", "action",
# "domain_inferred", "scope_inferred" and "warnings".
FileResult = dict[str, Any]
def _process_file(
    path: Path,
    backup_dir: Path | None,
    dry_run: bool,
) -> FileResult:
    """Classify one issue file and run the matching migration step.

    The returned report's "action" is "skipped" when the parsed frontmatter
    already has id, domain and scope, "merged" when frontmatter exists but
    lacks one of them, and "migrated" when no frontmatter dict was found.
    """
    report: FileResult = {
        "path": str(path),
        "action": "skipped",
        "domain_inferred": [],
        "scope_inferred": "",
        "warnings": [],
    }
    text = path.read_text(encoding="utf-8")
    body_without_fm, existing_fm = extract_frontmatter(text)

    # Issue id taken from the file name, e.g. "0099-datahub-app-launcher";
    # the whole stem is used when it has no numeric prefix.
    stem = path.stem
    prefix = re.match(r"^(\d{4}[a-z]?)", stem)
    file_issue_id = stem if prefix is None else prefix.group(1)

    if not (existing_fm and isinstance(existing_fm, dict)):
        # No usable frontmatter: derive everything from inline metadata.
        report["action"] = "migrated"
        _migrate_from_inline(path, text, file_issue_id, report, backup_dir, dry_run)
        return report

    if all(key in existing_fm for key in ("id", "domain", "scope")):
        # Already canonical; leave the file untouched.
        report["action"] = "skipped"
        return report

    report["action"] = "merged"
    _fill_missing_keys(
        path, existing_fm, body_without_fm, file_issue_id, report, backup_dir, dry_run
    )
    return report
def _fill_missing_keys(
    path: Path,
    fm: dict,
    body: str,
    file_issue_id: str,
    result: FileResult,
    backup_dir: Path | None,
    dry_run: bool,
) -> None:
    """Merge missing canonical keys into an existing partial frontmatter.

    Values already present in ``fm`` are kept (status, type and priority are
    normalised to their canonical spelling). Missing values are inferred
    from the file name, the body's H1 heading or the file's mtime. Keys
    that are not part of the canonical schema are carried over after the
    canonical ones instead of being dropped. ``updated`` is always set to
    today's date.

    Args:
        path: Issue file being rewritten.
        fm: Parsed frontmatter of the file.
        body: File content without the frontmatter block.
        file_issue_id: Issue id derived from the file name.
        result: Per-file report; "warnings", "domain_inferred" and
            "scope_inferred" are updated in place.
        backup_dir: Where the original is copied before writing, or None.
        dry_run: When True nothing is written to disk.
    """
    warnings = result["warnings"]
    filename = path.name

    def first_value(*keys: str, default: Any = "") -> Any:
        """Return the first non-null, non-empty value among ``keys``."""
        for key in keys:
            value = fm.get(key)
            if value is not None and value != "":
                return value
        return default

    issue_id = str(first_value("id", default=file_issue_id))
    title = str(first_value("title") or _extract_h1_title(body)[1] or filename)
    status = _normalize_status(str(first_value("status", "estado", default="pendiente")))

    type_raw = str(first_value("type", "tipo"))
    type_ = _normalize_type(type_raw)
    if not type_:
        type_ = _infer_type(filename)
        warnings.append(f"type missing/unknown ({type_raw!r}), inferred: {type_}")

    domain_raw = first_value("domain", "dominio", default=[])
    if isinstance(domain_raw, str):
        # Same splitting rule as the inline parser: "a, b" -> ["a", "b"].
        domain = [d.strip() for d in re.split(r"[,;]", domain_raw) if d.strip()]
    elif isinstance(domain_raw, list):
        domain = domain_raw
    else:
        domain = []
    if not domain:
        domain = _infer_domain(filename)
        result["domain_inferred"] = domain
        if not domain:
            warnings.append("domain could not be inferred")

    scope = str(first_value("scope"))
    if not scope:
        scope = _infer_scope(filename, type_)
        result["scope_inferred"] = scope

    priority = _normalize_priority(str(first_value("priority", "prioridad")))
    if not priority:
        priority = _infer_priority_from_mtime(path)
        warnings.append(f"priority missing, inferred from mtime: {priority}")

    depends = _coerce_id_list(first_value("depends", "depends_on", default=[]))
    blocks = _coerce_id_list(first_value("blocks", default=[]))
    related = _coerce_id_list(first_value("related", default=[]))
    created = str(first_value("created", default=TODAY))

    tags_raw = first_value("tags", default=[])
    if isinstance(tags_raw, list):
        tags = tags_raw
    elif isinstance(tags_raw, str):
        tags = [t.strip() for t in re.split(r"[,;]", tags_raw) if t.strip()]
    else:
        tags = []

    new_fm = _build_frontmatter(
        issue_id=issue_id,
        title=title,
        status=status,
        type_=type_,
        domain=domain,
        scope=scope,
        priority=priority,
        depends=depends,
        blocks=blocks,
        related=related,
        created=created,
        updated=TODAY,
        tags=tags,
    )

    # Carry over keys outside the canonical schema so merging never loses
    # information. Values are written as JSON, which is valid YAML flow syntax.
    handled = {
        "id", "title", "status", "estado", "type", "tipo", "domain", "dominio",
        "scope", "priority", "prioridad", "depends", "depends_on", "blocks",
        "related", "created", "updated", "tags",
    }
    extra_lines: list[str] = []
    for key, value in fm.items():
        if key in handled:
            continue
        key_text = str(key)
        if not re.fullmatch(r"[A-Za-z_][\w-]*", key_text):
            key_text = json.dumps(key_text, ensure_ascii=False)
        rendered = json.dumps(value, ensure_ascii=False, default=str)
        extra_lines.append(f"{key_text}: {rendered}")
    closing = "---\n"
    if extra_lines and new_fm.endswith(closing):
        new_fm = new_fm[: -len(closing)] + "\n".join(extra_lines) + "\n" + closing

    new_content = new_fm + body
    if not dry_run:
        _backup_file(path, backup_dir)
        path.write_text(new_content, encoding="utf-8")
def _coerce_id_list(val: Any) -> list[str]:
if isinstance(val, list):
return [str(v) for v in val]
if isinstance(val, str):
return _parse_issue_ids(val)
return []
def _migrate_from_inline(
    path: Path,
    content: str,
    file_issue_id: str,
    result: FileResult,
    backup_dir: Path | None,
    dry_run: bool,
) -> None:
    """Parse inline metadata and prepend canonical YAML frontmatter.

    Values come from the inline ``**Key**`` lines / metadata table of
    ``content``; anything missing is inferred from the file name or the
    file's mtime. The original content is kept unchanged after the new
    frontmatter block.

    Args:
        path: Issue file being rewritten.
        content: Full current text of the file.
        file_issue_id: Issue id derived from the file name, used when the
            H1 heading carries none.
        result: Per-file report; "warnings", "domain_inferred" and
            "scope_inferred" are updated in place.
        backup_dir: Where the original is copied before writing, or None.
        dry_run: When True nothing is written to disk.
    """
    warnings = result["warnings"]
    filename = path.name
    inline = _parse_inline_meta(content)
    h1_id, h1_title = _extract_h1_title(content)
    issue_id = h1_id or file_issue_id
    title = h1_title or filename

    # Status
    status_raw = inline.get("status", inline.get("estado", ""))
    status = _normalize_status(status_raw) if status_raw else "pendiente"

    # Type
    type_raw = inline.get("type", inline.get("tipo", ""))
    type_ = _normalize_type(type_raw) if type_raw else ""
    if not type_:
        type_ = _infer_type(filename)
        warnings.append(f"type missing/unknown ({type_raw!r}), inferred: {type_}")

    # Domain
    domain_raw = inline.get("domain", inline.get("dominio", ""))
    if domain_raw and domain_raw != "-":
        domain = [d.strip() for d in re.split(r"[,;]", domain_raw) if d.strip()]
    else:
        domain = _infer_domain(filename)
        result["domain_inferred"] = domain
        if not domain:
            warnings.append("domain could not be inferred from filename")

    # Scope
    scope_raw = inline.get("scope", inline.get("alcance", ""))
    if scope_raw and scope_raw != "-":
        scope = scope_raw
    else:
        scope = _infer_scope(filename, type_)
        result["scope_inferred"] = scope

    # Priority
    priority_raw = inline.get("priority", inline.get("prioridad", ""))
    priority = _normalize_priority(priority_raw) if priority_raw else ""
    if not priority:
        priority = _infer_priority_from_mtime(path)
        warnings.append(f"priority missing, inferred from mtime: {priority}")

    # Depends / Blocks / Related
    depends = _parse_issue_ids(
        inline.get("depends", inline.get("depends_on", inline.get("depende", "")))
    )
    blocks = _parse_issue_ids(inline.get("blocks", inline.get("bloquea", "")))
    related = _parse_issue_ids(
        inline.get("related", inline.get("relacionado", inline.get("relacionados", "")))
    )

    # Created date: keep only the leading YYYY-MM-DD so trailing annotations
    # (e.g. "2025-01-15 (revised)") never end up in the frontmatter value.
    created_raw = inline.get("created", inline.get("fecha", ""))
    created_match = re.match(r"\d{4}-\d{2}-\d{2}", created_raw.strip())
    created = created_match.group(0) if created_match else TODAY

    new_fm = _build_frontmatter(
        issue_id=issue_id,
        title=title,
        status=status,
        type_=type_,
        domain=domain,
        scope=scope,
        priority=priority,
        depends=depends,
        blocks=blocks,
        related=related,
        created=created,
        updated=TODAY,
        tags=[],
    )
    new_content = new_fm + content
    if not dry_run:
        _backup_file(path, backup_dir)
        path.write_text(new_content, encoding="utf-8")
def _backup_file(path: Path, backup_dir: Path | None) -> None:
if backup_dir is None:
return
backup_dir.mkdir(parents=True, exist_ok=True)
dest = backup_dir / path.name
if not dest.exists():
shutil.copy2(path, dest)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def migrate_issues_frontmatter(
    issues_dir: str,
    backup_dir: str = "",
    dry_run: bool = False,
) -> dict[str, Any]:
    """Migrate dev/issues/*.md from inline metadata to canonical YAML frontmatter.

    Processes ``*.md`` files directly inside ``issues_dir`` and inside its
    ``completed/`` subdirectory (when present), skipping README/template
    files. A summary is printed to stdout.

    Args:
        issues_dir: Path to the issues directory (e.g. "dev/issues").
        backup_dir: Where to copy originals before writing. Defaults to
            <issues_dir>/.backup_pre_0100/. Pass "" for default. Originals
            from ``completed/`` are stored in a ``completed/`` subdirectory
            of the backup directory, so a file name that exists in both
            places cannot lose its backup.
        dry_run: If True, compute changes but do not write files or backups.

    Returns:
        {
            "migrated": N,
            "merged": N,
            "skipped": N,
            "errors": N,
            "warnings": [...],
            "files": [{"path": ..., "action": ..., "domain_inferred": ...,
                       "scope_inferred": ..., "warnings": [...]}, ...]
        }
        A file whose processing raised is reported with action "warning" and
        counted under "errors".

    Raises:
        FileNotFoundError: ``issues_dir`` is not an existing directory.
    """
    issues_path = Path(issues_dir).expanduser()
    if not issues_path.is_dir():
        raise FileNotFoundError(f"issues_dir not found: {issues_path}")

    # Determine backup directory
    if dry_run:
        bk_path = None
    elif backup_dir:
        bk_path = Path(backup_dir).expanduser()
    else:
        bk_path = issues_path / ".backup_pre_0100"

    # Collect (file, backup target) pairs from issues_dir and issues_dir/completed/
    jobs: list[tuple[Path, Path | None]] = [
        (md, bk_path)
        for md in sorted(issues_path.glob("*.md"))
        if md.name not in _SKIP_NAMES
    ]
    completed_dir = issues_path / "completed"
    if completed_dir.is_dir():
        completed_bk = None if bk_path is None else bk_path / "completed"
        jobs.extend(
            (md, completed_bk)
            for md in sorted(completed_dir.glob("*.md"))
            if md.name not in _SKIP_NAMES
        )

    results: list[FileResult] = []
    all_warnings: list[str] = []
    for path, file_backup in jobs:
        try:
            r = _process_file(path, file_backup, dry_run)
        except Exception as e:
            # Per-file boundary: one broken file must not abort the whole run.
            r = {
                "path": str(path),
                "action": "warning",
                "domain_inferred": [],
                "scope_inferred": "",
                "warnings": [f"ERROR: {e}"],
            }
        all_warnings.extend(f"{path.name}: {w}" for w in r["warnings"])
        results.append(r)

    migrated = sum(1 for r in results if r["action"] == "migrated")
    merged = sum(1 for r in results if r["action"] == "merged")
    skipped = sum(1 for r in results if r["action"] == "skipped")
    errors = sum(1 for r in results if r["action"] == "warning")

    # Summary table
    prefix = "[DRY RUN] " if dry_run else ""
    print(f"\n{prefix}migrate_issues_frontmatter summary")
    print(f" issues_dir : {issues_path}")
    print(f" files found: {len(results)}")
    print(f" migrated : {migrated}")
    print(f" merged : {merged}")
    print(f" skipped : {skipped}")
    print(f" errors : {errors}")
    print(f" warnings : {len(all_warnings)}")
    if all_warnings:
        print("\nWarnings:")
        for w in all_warnings:
            print(f" - {w}")

    return {
        "migrated": migrated,
        "merged": merged,
        "skipped": skipped,
        "errors": errors,
        "warnings": all_warnings,
        "files": results,
    }
# ---------------------------------------------------------------------------
# CLI entry
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Usage: migrate_issues_frontmatter.py [ISSUES_DIR] [--dry-run]
    # Options are separated from positionals so that running the script with
    # only "--dry-run" does not treat the flag as the issues directory.
    _cli_args = sys.argv[1:]
    _positionals = [arg for arg in _cli_args if not arg.startswith("--")]
    issues_dir_arg = _positionals[0] if _positionals else "dev/issues"
    dry = "--dry-run" in _cli_args
    result = migrate_issues_frontmatter(issues_dir_arg, dry_run=dry)
    print(json.dumps(result, ensure_ascii=False, indent=2))