Files
fn_registry/python/functions/infra/llm_propose_scraping_schema.py
T
egutierrez a03675113a chore: auto-commit (286 archivos)
- .claude/agents/fn-orquestador/SKILL.md
- .claude/commands/fn_claude.md
- .claude/rules/INDEX.md
- .claude/rules/cpp_apps.md
- .claude/rules/ids_naming.md
- CHANGELOG.md
- apps/dag_engine/README.md
- apps/dag_engine/api.go
- apps/dag_engine/dags_migrated/example.yaml
- apps/dag_engine/dags_migrated/example_lineage_tracking.yaml
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:33:22 +02:00

103 lines
3.3 KiB
Python

"""Orquesta trim_ax_tree -> chunk_ax_tree -> N llamadas claude_cli_prompt -> merge schema."""
import json
import re
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from core.trim_ax_tree import trim_ax_tree
from core.chunk_ax_tree import chunk_ax_tree
from infra.claude_cli_prompt import claude_cli_prompt
def _parse_json_response(text: str) -> dict:
"""Extrae JSON de la respuesta de Claude, tolerante a fenced code blocks."""
# Intentar fenced ```json ... ```
m = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if m:
return json.loads(m.group(1))
# Intentar JSON directo
m = re.search(r"\{.*\}", text, re.DOTALL)
if m:
return json.loads(m.group(0))
raise ValueError(f"No se encontro JSON valido en respuesta: {text[:200]}")
def _build_prompt(url: str, chunk_json: str) -> str:
return (
f"Analiza este accessibility tree de la pagina {url}.\n"
"Identifica campos de datos extraibles (tablas, listas, valores estructurados).\n"
"Para cada campo propone:\n"
" - field (snake_case)\n"
" - selector CSS robusto que se pueda usar con document.querySelector\n"
" - sample_value (valor visible representativo)\n"
" - type (string|int|float|bool|date)\n"
" - source_role (role del AXNode origen)\n"
"\n"
'Devuelve JSON valido SIN explicacion:\n'
'{"schema": [...], "notes": "..."}\n'
"\n"
"AX tree:\n"
f"{chunk_json}"
)
def _valid_schema_items(parsed: object) -> list:
    """Return the well-formed field proposals contained in one parsed answer.

    LLM output is untrusted: "schema" may be missing, null or not a list, and
    its entries may not be objects. Only dict entries carrying a non-empty
    string "field" are kept; everything else is dropped silently.
    """
    if not isinstance(parsed, dict):
        return []
    schema = parsed.get("schema")
    if not isinstance(schema, list):
        return []
    return [
        item
        for item in schema
        if isinstance(item, dict)
        and isinstance(item.get("field"), str)
        and item["field"]
    ]


def llm_propose_scraping_schema(
    url: str,
    ax_tree: list,
    max_chunks: int = 5,
    max_chars_per_chunk: int = 25000,
    timeout_s: int = 60,
) -> dict:
    """Propose a scraping schema for a page from its accessibility tree.

    Pipeline: trim_ax_tree -> chunk_ax_tree -> one claude_cli_prompt call per
    chunk -> merge of the per-chunk answers. Processing is best-effort: a
    chunk whose LLM call or JSON parsing fails is recorded in ``notes`` and
    skipped, and malformed schema entries are ignored instead of aborting the
    whole run.

    Args:
        url: Page URL, included in every prompt for context.
        ax_tree: AX tree as a list of dicts (passed to trim_ax_tree).
        max_chunks: Maximum number of chunks sent to the LLM; the rest are
            dropped. Negative values are treated as 0.
        max_chars_per_chunk: Forwarded to chunk_ax_tree as ``max_chars``.
        timeout_s: Forwarded to claude_cli_prompt as ``timeout_s`` for each
            call (defaults to the previously hard-coded 60).

    Returns:
        A dict with:
            schema: merged list of field proposals, de-duplicated by
                "field" (the first occurrence wins).
            notes: per-chunk notes and error markers joined with " | ".
            chunks_processed: number of chunks attempted, including chunks
                that ended in an error.
            truncated: True when chunks were dropped because of max_chunks.
    """
    trimmed = trim_ax_tree(ax_tree)
    chunks = chunk_ax_tree(trimmed, max_chars=max_chars_per_chunk)

    # Clamp so a negative limit cannot turn the slice into "drop from the end".
    limit = max(max_chunks, 0)
    truncated = len(chunks) > limit
    chunks = chunks[:limit]

    merged_schema: list = []
    seen_fields: set = set()
    notes_parts: list = []

    for chunk in chunks:
        chunk_json = json.dumps(chunk, ensure_ascii=False)
        prompt = _build_prompt(url, chunk_json)
        try:
            response = claude_cli_prompt(prompt, timeout_s=timeout_s)
            parsed = _parse_json_response(response)
        except Exception as e:
            # Deliberate best-effort boundary: one failing chunk must not
            # discard the results of the others.
            notes_parts.append(f"[chunk error: {e}]")
            continue

        for item in _valid_schema_items(parsed):
            field = item["field"]
            if field not in seen_fields:
                seen_fields.add(field)
                merged_schema.append(item)

        note = parsed.get("notes", "") if isinstance(parsed, dict) else ""
        if note:
            # Coerce so a non-string note cannot break the final join.
            notes_parts.append(str(note))

    return {
        "schema": merged_schema,
        "notes": " | ".join(notes_parts),
        "chunks_processed": len(chunks),
        "truncated": truncated,
    }