"""comfyui_replicate_civitai_oneshot — replica una imagen de Civitai en una llamada. "Te paso un link de Civitai: entro, observo cómo lo hicieron, y construyo un workflow que lo replique." Dado el id/URL de una imagen de Civitai (o un `modelVersionId`, o directamente una URL/dict de workflow ComfyUI), el pipeline: 1. OBSERVA los detalles de generación con `comfyui_fetch_civitai_image_meta` (prompt, negativo, modelo, sampler, steps, cfg, seed, recursos) vía los endpoints tRPC que usa la web de Civitai. 2. TRADUCE la receta a parámetros de ComfyUI con `comfyui_map_a1111_params` (sampler/scheduler, dims, familia del modelo, LoRAs). 3. CONSTRUYE el workflow que la replica: - si la imagen trae un workflow ComfyUI embebido -> se usa TAL CUAL; - si no -> se reconstruye con `comfyui_build_txt2img_workflow` + `comfyui_inject_lora`, sustituyendo el checkpoint original por el más parecido INSTALADO (misma familia) cuando el exacto no está, y descartando los LoRAs ausentes. 4. RESUELVE dependencias y GENERA con `comfyui_run_foreign_workflow_oneshot` (resolve_deps -> submit -> wait -> fetch). 5. JUZGA la réplica con `comfyui_judge_image` contra el prompt extraído. NO baja modelos a ciegas: lo que la receta pide y no tenemos se reporta en `missing_models` con la sustitución aplicada (el modelo más parecido instalado), nunca se descarga. Respeta la política SFW: si la imagen es NSFW, devuelve `ok=False` sin generar. El parecido será aproximado cuando falte el checkpoint/LoRA exacto (se reconstruye con el más parecido) — eso es esperado y queda documentado en `missing_models`. Pipeline impuro: red (HTTP a Civitai + ComfyUI) + escritura en disco + subprocess (jueces). Solo stdlib salvo las funciones del registry que compone. """ from __future__ import annotations import os import re import sys _FUNCTIONS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _FUNCTIONS_ROOT not in sys.path: sys.path.insert(0, _FUNCTIONS_ROOT) from ml.comfyui_build_txt2img_workflow import comfyui_build_txt2img_workflow from ml.comfyui_fetch_civitai_image_meta import comfyui_fetch_civitai_image_meta from ml.comfyui_inject_lora import comfyui_inject_lora from ml.comfyui_judge_image import comfyui_judge_image from ml.comfyui_map_a1111_params import comfyui_map_a1111_params from ml.comfyui_object_info import comfyui_object_info from ml.comfyui_search_civitai_images import comfyui_search_civitai_images from pipelines.comfyui_run_foreign_workflow_oneshot import comfyui_run_foreign_workflow_oneshot # Defaults de generación por familia cuando la meta no los aporta. _FAMILY_DEFAULTS = { "sd15": {"width": 512, "height": 768, "steps": 25, "cfg": 7.0}, "sdxl": {"width": 1024, "height": 1024, "steps": 30, "cfg": 7.0}, "flux": {"width": 1024, "height": 1024, "steps": 30, "cfg": 7.0}, "unknown": {"width": 768, "height": 768, "steps": 25, "cfg": 7.0}, } # Checkpoints que NO sirven para txt2img de imagen (video / 3D / mallas). _NON_IMAGE_CKPT = ("video", "svd", "zero123", "hunyuan", "ltx", "3d") _CIVITAI_IMAGES_RE = re.compile(r"civitai\.com/images/(\d+)|^/?images/(\d+)$") _MODEL_VERSION_RE = re.compile(r"modelVersionId[=/](\d+)|model-versions/(\d+)|modelVersions/(\d+)") _WORKFLOW_EXTS = (".png", ".json", ".webp") def _classify_input(url_or_id): """Clasifica la entrada -> ('image'|'model_version'|'workflow_source', ref).""" if isinstance(url_or_id, dict): return "workflow_source", url_or_id if isinstance(url_or_id, bool): return "error", "entrada booleana no válida" if isinstance(url_or_id, int): return "image", url_or_id if not isinstance(url_or_id, str): return "error", f"entrada no soportada: {type(url_or_id).__name__}" s = url_or_id.strip() if s.isdigit(): return "image", int(s) m = _CIVITAI_IMAGES_RE.search(s) if m: return "image", int(next(g for g in m.groups() if g)) mv = _MODEL_VERSION_RE.search(s) if mv: return "model_version", int(next(g for g in mv.groups() if g)) # URL/archivo de workflow (no es una página de imagen Civitai). low = s.lower().split("?")[0] if low.endswith(_WORKFLOW_EXTS) or os.path.exists(s): return "workflow_source", s if "civitai.com/models/" in low: return "error", ("una URL de página de modelo no apunta a una imagen " "concreta; pásame un link civitai.com/images/ o un " "modelVersionId") # Cualquier otra URL: dejar que el ejecutor de workflows foráneos lo intente. if s.startswith(("http://", "https://")): return "workflow_source", s return "error", f"no se reconoce la entrada {url_or_id!r}" def _server_models(server): """(checkpoints, loras) que ve el servidor; ([], []) si no responde.""" ckpts, loras = [], [] try: ci = comfyui_object_info(server, "CheckpointLoaderSimple") ckpts = ci["CheckpointLoaderSimple"]["input"]["required"]["ckpt_name"][0] except Exception: # noqa: BLE001 — server caído / nodo ausente ckpts = [] try: li = comfyui_object_info(server, "LoraLoader") loras = li["LoraLoader"]["input"]["required"]["lora_name"][0] except Exception: # noqa: BLE001 loras = [] return list(ckpts), list(loras) def _norm(name): """Normaliza un nombre de modelo para comparar (sin ext, sin separadores).""" base = os.path.splitext(str(name))[0].lower() return re.sub(r"[^a-z0-9]", "", base) def _find_installed(name, installed): """Devuelve el filename instalado que casa con `name` (exacto/normalizado), o None.""" if not name: return None target = _norm(name) for cand in installed: if _norm(cand) == target: return cand # Match laxo: el nombre normalizado de la receta contenido en el del archivo. for cand in installed: nc = _norm(cand) if target and (target in nc or nc in target): return cand return None def _pick_checkpoint(installed, family, hint): """Elige el checkpoint instalado más parecido. Devuelve (filename, exact:bool).""" candidates = [c for c in installed if not any(k in c.lower() for k in _NON_IMAGE_CKPT)] if not candidates: candidates = list(installed) if not candidates: return None, False exact = _find_installed(hint, candidates) if exact: return exact, True if family in ("sdxl", "flux"): xl = [c for c in candidates if "xl" in c.lower()] if xl: return xl[0], False if family == "sd15": non_xl = [c for c in candidates if "xl" not in c.lower()] if non_xl: return non_xl[0], False # Familia desconocida o sin candidato de la familia: preferir un SD1.5 versátil. non_xl = [c for c in candidates if "xl" not in c.lower()] return (non_xl[0] if non_xl else candidates[0]), False def _reconstruct_workflow(params, server): """Reconstruye un workflow txt2img desde la receta. Devuelve (workflow, missing).""" family = params["family"] defaults = _FAMILY_DEFAULTS.get(family, _FAMILY_DEFAULTS["unknown"]) installed_ckpts, installed_loras = _server_models(server) if not installed_ckpts: raise _ReplicateError( "el servidor ComfyUI no devolvió checkpoints (¿vivo?); no se puede " "elegir un modelo para la réplica") ckpt, exact = _pick_checkpoint(installed_ckpts, family, params["checkpoint_hint"]) if not ckpt: raise _ReplicateError("no hay ningún checkpoint de imagen instalado en el servidor") missing = [] if not exact and params["checkpoint_hint"]: missing.append({ "kind": "checkpoint", "name": params["checkpoint_hint"], "base_model_family": family, "substituted_with": ckpt, "note": "checkpoint exacto no instalado; réplica con el más parecido", }) width = params["width"] or defaults["width"] height = params["height"] or defaults["height"] steps = params["steps"] or defaults["steps"] cfg = params["cfg"] if params["cfg"] is not None else defaults["cfg"] seed = params["seed"] if params["seed"] is not None else 0 workflow = comfyui_build_txt2img_workflow( ckpt, params["positive"], params["negative"], steps=steps, cfg=cfg, width=width, height=height, seed=seed, sampler_name=params["sampler_name"], scheduler=params["scheduler"], filename_prefix="civitai_replica", ) for lora in params["loras"]: match = _find_installed(lora["name"], installed_loras) if match: workflow = comfyui_inject_lora( workflow, match, strength_model=lora["weight"], strength_clip=lora["weight"], ) else: missing.append({ "kind": "lora", "name": lora["name"], "substituted_with": None, "note": "LoRA no instalado; omitido de la réplica (no se baja a ciegas)", }) return workflow, missing def _extract_positive_from_workflow(workflow): """Saca el texto positivo más largo de los CLIPTextEncode (para juzgar embebidos).""" texts = [] for node in (workflow or {}).values(): if isinstance(node, dict) and "CLIPTextEncode" in str(node.get("class_type", "")): t = node.get("inputs", {}).get("text") if isinstance(t, str) and t.strip(): texts.append(t.strip()) return max(texts, key=len) if texts else "" def comfyui_replicate_civitai_oneshot( url_or_id, *, server: str = "127.0.0.1:8188", dest: str | None = None, judge: bool = True, token: str | None = None, wait_timeout: float = 600.0, ): """Replica una imagen de Civitai (o un workflow ajeno) en una sola llamada. Args: url_or_id: link/URL de una imagen Civitai (`civitai.com/images/`), su id numérico (int o str), un `modelVersionId` (se replica su primera imagen SFW), o directamente una URL/ruta/dict de un workflow ComfyUI (PNG con workflow embebido, .json, o dict en API format). server: host:port del servidor ComfyUI (sin esquema). keyword-only. dest: directorio donde guardar la réplica. None = `~/ComfyUI/civitai_replicas`. keyword-only. judge: si True, juzga la réplica con el panel `comfyui_judge_image` contra el prompt extraído. keyword-only. token: token Civitai (Bearer). None lo resuelve de `pass civitai/api-token`. keyword-only. wait_timeout: segundos máximos esperando a que ComfyUI termine. keyword-only. Returns: dict {ok, source, replica_image_path, prompt_id, judge, missing_models, has_workflow, error}: - source: receta observada {image_id, page_url, prompt, negative, model, family, sampler_name, scheduler, steps, cfg, width, height, seed, loras, process, has_workflow_embedded}. - replica_image_path: ruta local de la imagen réplica generada. - missing_models: modelos que la receta pedía y no teníamos, con la sustitución aplicada (NUNCA se descargan a ciegas). - judge: dict del panel de jueces (None si judge=False o no se generó). - has_workflow: True si se replicó un workflow embebido tal cual. ok=False con error claro si: el link es inválido/privado/sin meta, la imagen es NSFW (se respeta SFW), el server no responde, o la generación falla. """ kind, ref = _classify_input(url_or_id) if kind == "error": return _err(ref) # Caso A: la entrada YA es un workflow (PNG embebido / .json / dict / URL). if kind == "workflow_source": return _replicate_workflow_source(ref, server, dest, judge, token, wait_timeout) # Caso B: modelVersionId -> resolver a la primera imagen SFW de esa versión. if kind == "model_version": sr = comfyui_search_civitai_images(model_version_id=ref, nsfw="None", limit=10, token=token) if not sr.get("ok") or not sr.get("items"): return _err(f"no se hallaron imágenes SFW para modelVersionId {ref}: " f"{sr.get('error') or '0 resultados'}") ref = sr["items"][0]["id"] # Caso C (principal): id/URL de imagen Civitai. src = comfyui_fetch_civitai_image_meta(ref, token=token) if not src.get("ok"): return _err(f"no se pudieron observar los detalles de la imagen: {src.get('error')}") if src.get("nsfw"): return _err( f"la imagen {src.get('image_id')} es NSFW (nivel {src.get('nsfw_level')!r}); " "se respeta la política SFW y NO se replica.", source=_source_from_meta(src, params=None)) params = comfyui_map_a1111_params(src["meta"], src.get("resources")) source = _source_from_meta(src, params) # Construcción del workflow: embebido tal cual, o reconstruido desde la receta. has_workflow = bool(src.get("comfy_workflow")) try: if has_workflow: workflow = src["comfy_workflow"] missing = [] else: workflow, missing = _reconstruct_workflow(params, server) except _ReplicateError as exc: return _err(str(exc), source=source) out_dir = os.path.expanduser(dest or "~/ComfyUI/civitai_replicas") run = comfyui_run_foreign_workflow_oneshot( workflow, server=server, dest=out_dir, output_kind="image", wait_timeout=wait_timeout, civitai_token=token, ) if not run.get("ok"): # run_foreign puede reportar deps faltantes propias (p.ej. un nodo custom). extra_missing = run.get("missing") or [] return _err(f"la generación de la réplica falló: {run.get('error')}", source=source, missing_models=missing + extra_missing, has_workflow=has_workflow) replica = run["outputs"][0] judge_res = None if judge: try: judge_res = comfyui_judge_image(replica, params["positive"], server=server) except Exception as exc: # noqa: BLE001 — el juez no debe tumbar la réplica judge_res = {"ok": False, "error": f"juez no disponible: {exc}"} return { "ok": True, "source": source, "replica_image_path": replica, "prompt_id": run.get("prompt_id", ""), "judge": judge_res, "missing_models": missing, "has_workflow": has_workflow, "error": "", } def _replicate_workflow_source(source_ref, server, dest, judge, token, wait_timeout): """Replica un workflow ya embebido (PNG/.json/dict/URL) ejecutándolo tal cual.""" out_dir = os.path.expanduser(dest or "~/ComfyUI/civitai_replicas") run = comfyui_run_foreign_workflow_oneshot( source_ref, server=server, dest=out_dir, output_kind="image", wait_timeout=wait_timeout, civitai_token=token, ) if not run.get("ok"): return _err(f"no se pudo ejecutar el workflow embebido: {run.get('error')}", missing_models=run.get("missing") or [], has_workflow=True) replica = run["outputs"][0] positive = "" if isinstance(source_ref, dict): positive = _extract_positive_from_workflow(source_ref) judge_res = None if judge: try: judge_res = comfyui_judge_image(replica, positive, server=server) except Exception as exc: # noqa: BLE001 judge_res = {"ok": False, "error": f"juez no disponible: {exc}"} return { "ok": True, "source": {"prompt": positive, "has_workflow_embedded": True, "source_type": run.get("source_type", "")}, "replica_image_path": replica, "prompt_id": run.get("prompt_id", ""), "judge": judge_res, "missing_models": [], "has_workflow": True, "error": "", } def _source_from_meta(src, params): """Construye el sub-dict `source` legible de la salida.""" meta = src.get("meta") or {} base = { "image_id": src.get("image_id"), "page_url": src.get("page_url", ""), "process": src.get("process", ""), "has_workflow_embedded": bool(src.get("comfy_workflow")), "model": meta.get("Model") or meta.get("model"), } if params is not None: base.update({ "prompt": params["positive"], "negative": params["negative"], "family": params["family"], "sampler_name": params["sampler_name"], "scheduler": params["scheduler"], "steps": params["steps"], "cfg": params["cfg"], "width": params["width"], "height": params["height"], "seed": params["seed"], "loras": [lo["name"] for lo in params["loras"]], }) return base def _err(msg, **extra): base = {"ok": False, "source": {}, "replica_image_path": "", "prompt_id": "", "judge": None, "missing_models": [], "has_workflow": False, "error": msg} base.update(extra) return base class _ReplicateError(Exception): """Error interno de reconstrucción, traducido a {ok: False, error}.""" if __name__ == "__main__": import json ref = sys.argv[1] if len(sys.argv) > 1 else "https://civitai.com/images/23526611" out = comfyui_replicate_civitai_oneshot(ref, judge=False) print(json.dumps({k: v for k, v in out.items() if k != "judge"}, ensure_ascii=False, indent=2))