diff --git a/docs/capabilities/comfyui.md b/docs/capabilities/comfyui.md index 0cba92dd..9cf90c16 100644 --- a/docs/capabilities/comfyui.md +++ b/docs/capabilities/comfyui.md @@ -99,6 +99,23 @@ de la meta inline. | [comfyui_extract_recipe_from_png_py_ml](../../python/functions/ml/comfyui_extract_recipe_from_png.md) | `extract_recipe_from_png(png_path, *, slug=None, civitai_meta=None, image_url='', nsfw=False) -> dict` | Destila un PNG cosechado en receta de skill candidata (schema `comfyui-skill`, `source='civitai'`, `score_n=0`). Compone `import_workflow_png` + `read_png_metadata` + fallback de prompts/ckpt para flux. Sin workflow → usa `civitai_meta` (degradación honesta). Impura. | | [comfyui_harvest_civitai_skill_oneshot_py_pipelines](../../python/functions/pipelines/comfyui_harvest_civitai_skill_oneshot.md) | `harvest_civitai_skill_oneshot(*, query=None, model_version_id=None, nsfw='None', dest_dir, library_dir='~/ComfyUI/skills_library', comfyui_dir='~/ComfyUI', token=None, ...) -> dict` | **Pipeline** Civitai→skill candidata: search → fetch (segrega NSFW) → extract → save_skill. Itera items hasta uno con receta destilable (2º pase al feed global si filtró por modelo). **NO baja modelos a ciegas**: checkpoint/LoRA ausente → `missing_models`. Impuro. | +### Replicación desde un link de Civitai — dominio `ml` + `pipelines` (issue C5, report 0127) + +"Te paso un link de Civitai: entra, observa cómo lo hicieron, y construye un workflow que lo +replique." Dado el id/URL de una imagen de Civitai → extrae la receta (prompt, modelo, sampler, +LoRAs) → reconstruye el workflow → lo genera y lo juzga. **Gotcha clave**: la API v1 `/images` +devuelve `meta=null`; la receta por id sale de los endpoints **tRPC** `image.getGenerationData` + +`image.get` (los que usa la web). Como casi nunca tendrás el checkpoint/LoRA exacto, se sustituye +por el más parecido **instalado** (misma familia) y lo ausente se reporta en `missing_models` (NUNCA +se descarga a ciegas). El parecido es aproximado cuando falta el modelo exacto — esperado. SFW +estricto: una imagen NSFW devuelve `ok=False` sin generar. + +| ID | Firma corta | Qué hace | +|---|---|---| +| [comfyui_fetch_civitai_image_meta_py_ml](../../python/functions/ml/comfyui_fetch_civitai_image_meta.md) | `fetch_civitai_image_meta(image_ref, *, token=None, timeout=15.0) -> dict` | "Entra al link y observa": resuelve UNA imagen Civitai por id/URL vía tRPC `image.getGenerationData` + `image.get` → `{meta, resources, comfy_workflow, nsfw, ...}`. Donde `search_civitai_images` da `meta=null`, esta sí trae prompt/modelo/sampler. Impura. | +| [comfyui_map_a1111_params_py_ml](../../python/functions/ml/comfyui_map_a1111_params.md) | `map_a1111_params(meta, resources=None) -> dict` | **Pura**: traduce meta A1111/Civitai a params ComfyUI (sampler `DPM++ 2M Karras`→`dpmpp_2m`/`karras`, dims, seed), infiere familia (`sd15`/`sdxl`/`flux`) y extrae LoRAs (de resources y tags `` del prompt). | +| [comfyui_replicate_civitai_oneshot_py_pipelines](../../python/functions/pipelines/comfyui_replicate_civitai_oneshot.md) | `replicate_civitai_oneshot(url_or_id, *, server, dest=None, judge=True, token=None, wait_timeout=600) -> dict` | **Pipeline** link Civitai→réplica: fetch_meta → map_a1111_params → workflow embebido tal cual O reconstruido (build_txt2img + inject_lora, **sustituye checkpoint ausente por el más parecido instalado**, omite LoRAs ausentes) → run_foreign_workflow_oneshot → judge_image. Acepta también `modelVersionId` o un workflow ajeno (PNG/.json/dict). Impuro. | + ### Retoque pro y oneshot — dominio `ml` + `pipelines` (P0, lote report 0093) Builders que envuelven custom-nodes "pro" ya instalados (Impact-Pack, UltimateSDUpscale) y la diff --git a/python/functions/ml/comfyui_fetch_civitai_image_meta.md b/python/functions/ml/comfyui_fetch_civitai_image_meta.md new file mode 100644 index 00000000..87f13b47 --- /dev/null +++ b/python/functions/ml/comfyui_fetch_civitai_image_meta.md @@ -0,0 +1,68 @@ +--- +name: comfyui_fetch_civitai_image_meta +kind: function +lang: py +domain: ml +version: "1.0.0" +purity: impure +signature: "def comfyui_fetch_civitai_image_meta(image_ref, *, token: str | None = None, timeout: float = 15.0) -> dict" +description: "Recupera los detalles de generacion de una imagen de Civitai por su id o URL (civitai.com/images/): prompt, prompt negativo, modelo, sampler, steps, cfg, seed, dimensiones, recursos (checkpoint + LoRAs) y nivel NSFW. Es el paso 'entrar al link y observar como lo hicieron'. Usa los endpoints tRPC image.getGenerationData + image.get que consume la propia web de civitai.com, porque la API v1 publica (GET /api/v1/images) hoy devuelve meta=null y un JPEG recomprimido sin workflow embebido. Si la meta trae un workflow ComfyUI embebido (campo comfy) lo devuelve en API format. NO descarga la imagen ni reconstruye workflow: solo lee. Impura: HTTP a civitai.com + subprocess (pass para el token)." +tags: [comfyui, civitai, replicate, ml, metadata, http, stable-diffusion] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: ["json", "os", "re", "subprocess", "urllib.error", "urllib.parse", "urllib.request"] +params: + - name: image_ref + desc: "Id numerico de la imagen (int o str), o su URL 'https://civitai.com/images/' (con o sin query string). Tambien acepta el path '/images/'." + - name: token + desc: "API token de Civitai (header Authorization Bearer). Si None se resuelve de 'pass civitai/api-token'. No hardcodear. keyword-only." + - name: timeout + desc: "Timeout HTTP en segundos por peticion tRPC. keyword-only." +output: "dict {ok, image_id, meta, resources, process, comfy_workflow, width, height, nsfw, nsfw_level, post_id, url_uuid, page_url, error}. meta = dict de generacion estilo A1111/Civitai (prompt, negativePrompt, Model, sampler, steps, cfgScale, seed, Size, ...); {} si la imagen no expone datos. resources = lista de {modelType, modelName, ...}. comfy_workflow = workflow ComfyUI en API format si la meta lo trae embebido, {} si no. nsfw = bool derivado de nsfw_level (politica SFW del caller). ok=False con error si el id no se parsea, la red falla, o la imagen no tiene ni meta ni workflow embebido." +tested: true +tests: + - "test_parse_image_id_acepta_int_str_url" + - "test_parse_image_id_rechaza_invalidos" + - "test_is_nsfw_niveles" + - "test_extract_comfy_workflow_embebido_y_ausente" +test_file_path: "python/functions/ml/comfyui_fetch_civitai_image_meta_test.py" +file_path: "python/functions/ml/comfyui_fetch_civitai_image_meta.py" +--- + +## Ejemplo + +```python +import sys, os +sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions")) +from ml.comfyui_fetch_civitai_image_meta import comfyui_fetch_civitai_image_meta + +# "Entra al link y observa los detalles de cómo lo hicieron": +src = comfyui_fetch_civitai_image_meta("https://civitai.com/images/23526611") +print(src["ok"], src["nsfw"]) # True False +print(src["meta"].get("sampler"), src["meta"].get("steps")) # Euler 20 +print([r["modelName"] for r in src["resources"]]) # ['FLUX'] +``` + +## Cuando usarla + +Cuando te pasen un link o id de una imagen de Civitai y necesites saber **cómo se generó** +(prompt, modelo, sampler, LoRAs) antes de replicarla, comparar recetas, o alimentar la +librería de skills. Es el primer paso del pipeline `comfyui_replicate_civitai_oneshot`. Para +buscar imágenes por modelo/feed usa `comfyui_search_civitai_images`; esta función resuelve UNA +imagen concreta por id. + +## Gotchas + +- Usa endpoints tRPC **no documentados** de Civitai (`image.getGenerationData`, `image.get`). + Son los que usa la web y hoy funcionan, pero pueden cambiar sin aviso (frágil por diseño; + no hay alternativa pública que dé la meta por id). +- Civitai responde **HTTP 500** para un id inexistente o privado → la función devuelve + `ok=False` con el mensaje. Imágenes antiguas o subidas sin receta pueden devolver `meta={}`. +- El token sale de `pass civitai/api-token`. Sin token algunas imágenes gated no resuelven. +- `comfy_workflow` (workflow embebido) hoy rara vez viene por esta vía; lo normal es + reconstruir desde `meta` con `comfyui_map_a1111_params`. +- El `meta` está en formato A1111 (claves `Model`, `cfgScale`, `Size`), no ComfyUI. Tradúcelo + con `comfyui_map_a1111_params`. diff --git a/python/functions/ml/comfyui_fetch_civitai_image_meta.py b/python/functions/ml/comfyui_fetch_civitai_image_meta.py new file mode 100644 index 00000000..dc3998ff --- /dev/null +++ b/python/functions/ml/comfyui_fetch_civitai_image_meta.py @@ -0,0 +1,226 @@ +"""Recupera los detalles de generación de una imagen de Civitai por su id/URL. + +Es el paso "entrar al link y observar cómo lo hicieron": dado el id de una imagen +de Civitai (o su URL `civitai.com/images/`), consulta los endpoints tRPC que +usa la propia web de Civitai para mostrar el panel *Generation data* y devuelve la +receta cruda de generación: prompt, prompt negativo, modelo, sampler, steps, cfg, +seed, dimensiones, recursos (checkpoint + LoRAs) y el nivel NSFW. + +Por qué tRPC y no la API v1 pública: el endpoint documentado `GET /api/v1/images` +hoy devuelve `meta: null` para casi todas las imágenes y sirve un JPEG recomprimido +sin el workflow embebido (ver Gotchas de `comfyui_search_civitai_images`). Los +endpoints tRPC `image.getGenerationData` e `image.get` —los mismos que consume el +frontend de civitai.com— sí exponen la metadata de generación completa por id. + +Esta función NO descarga la imagen ni reconstruye un workflow: solo lee los datos. +La reconstrucción del workflow y la réplica las hace el pipeline +`comfyui_replicate_civitai_oneshot`, que compone esta función. + +Impura: red (HTTP GET a civitai.com) + subprocess (`pass` para el token). Solo +stdlib. +""" +import json +import os +import re +import subprocess +import urllib.error +import urllib.parse +import urllib.request + +_TRPC = "https://civitai.com/api/trpc" +_TIMEOUT = 15.0 +_UA = "Mozilla/5.0 (fn_registry comfyui_fetch_civitai_image_meta)" +_IMAGE_ID_RE = re.compile(r"/images/(\d+)") + + +def _resolve_token(token): + """Devuelve el token explícito o lo lee de `pass civitai/api-token` (best-effort).""" + if token: + return token + try: + out = subprocess.run( + ["pass", "civitai/api-token"], + capture_output=True, text=True, timeout=10, + ) + if out.returncode == 0: + lines = out.stdout.strip().splitlines() + return lines[0].strip() if lines and lines[0].strip() else None + except Exception: # noqa: BLE001 — pass no instalado / entrada inexistente + pass + return None + + +def _parse_image_id(image_ref): + """Extrae el id de imagen de un int, una cadena numérica o una URL de Civitai. + + Devuelve (image_id:int, error:str). error no vacío si no se pudo determinar. + """ + if isinstance(image_ref, bool): # bool es subclase de int: rechazar explícito + return 0, f"image_ref no válido: {image_ref!r}" + if isinstance(image_ref, int): + return (image_ref, "") if image_ref > 0 else (0, f"id no válido: {image_ref}") + if isinstance(image_ref, str): + s = image_ref.strip() + if s.isdigit(): + return int(s), "" + m = _IMAGE_ID_RE.search(s) + if m: + return int(m.group(1)), "" + return 0, (f"no se pudo extraer un image id de {image_ref!r}; pasa un id " + "numérico o una URL del tipo civitai.com/images/") + return 0, f"image_ref debe ser int o str, no {type(image_ref).__name__}" + + +def _trpc_get(endpoint, image_id, token, timeout): + """GET a un endpoint tRPC de Civitai por id. Devuelve (data:dict, error:str).""" + payload = urllib.parse.quote(json.dumps({"json": {"id": image_id}})) + url = f"{_TRPC}/{endpoint}?input={payload}" + headers = {"User-Agent": _UA} + if token: + headers["Authorization"] = f"Bearer {token}" + try: + req = urllib.request.Request(url, headers=headers) + with urllib.request.urlopen(req, timeout=timeout) as resp: + raw = json.loads(resp.read()) + except urllib.error.HTTPError as exc: + body = exc.read().decode(errors="replace")[:200] + return {}, f"HTTP {exc.code} en {endpoint}: {body}" + except urllib.error.URLError as exc: + return {}, f"no se pudo conectar a civitai.com: {exc.reason}" + except json.JSONDecodeError as exc: + return {}, f"respuesta de {endpoint} no es JSON válido: {exc}" + return raw.get("result", {}).get("data", {}).get("json", {}) or {}, "" + + +def _is_nsfw(nsfw_level): + """Interpreta el nsfwLevel de Civitai (None/'None'/1 = SFW; el resto = NSFW).""" + if nsfw_level in (None, "None", "", 0, 1): + return False + if isinstance(nsfw_level, str): + return nsfw_level not in ("None", "Safe") + if isinstance(nsfw_level, (int, float)) and not isinstance(nsfw_level, bool): + return nsfw_level > 1 + return bool(nsfw_level) + + +def _extract_comfy_workflow(meta): + """Devuelve el workflow ComfyUI en API format si la meta lo trae embebido. + + Civitai guarda en las generaciones onSite hechas con ComfyUI un campo `comfy` + con un JSON serializado `{"prompt": {...}, "workflow": {...}}`. Hoy es raro por + la vía tRPC, pero si aparece se devuelve el chunk `prompt` (API format) para + replicar el workflow EXACTO sin reconstruirlo. + """ + raw = (meta or {}).get("comfy") + if not raw: + return {} + try: + obj = json.loads(raw) if isinstance(raw, str) else raw + except (json.JSONDecodeError, TypeError): + return {} + if isinstance(obj, dict): + prompt = obj.get("prompt") + if isinstance(prompt, dict) and prompt: + return prompt + # A veces el propio dict ya es el API format ({node_id: {class_type,...}}). + if obj and all(isinstance(v, dict) and "class_type" in v for v in obj.values()): + return obj + return {} + + +def comfyui_fetch_civitai_image_meta(image_ref, *, token=None, timeout=15.0): + """Recupera los detalles de generación de una imagen de Civitai por id/URL. + + Args: + image_ref: id numérico de la imagen (int o str), o su URL + `https://civitai.com/images/` (con o sin query string). + token: API token de Civitai (header Authorization Bearer). Si None se + resuelve de `pass civitai/api-token`. No hardcodear. keyword-only. + timeout: timeout HTTP en segundos por petición. keyword-only. + + Returns: + dict {ok, image_id, meta, resources, process, comfy_workflow, width, + height, nsfw, nsfw_level, post_id, url_uuid, page_url, error}: + - meta: dict de generación estilo A1111/Civitai (prompt, negativePrompt, + Model, sampler, steps, cfgScale, seed, Size, ...). {} si la imagen no + expone datos de generación. + - resources: lista de {modelType, modelName, ...} (checkpoint + LoRAs). + - comfy_workflow: workflow ComfyUI en API format si la meta lo trae + embebido (campo `comfy`); {} si no. + - nsfw: bool derivado de nsfw_level (política SFW del caller). + - url_uuid / post_id: identificadores para localizar el archivo original. + ok=False con error si el id no se puede parsear, la red falla, o la imagen + no tiene ni meta de generación ni workflow embebido (nada que replicar). + """ + image_id, perr = _parse_image_id(image_ref) + if perr: + return _err(perr) + + tok = _resolve_token(token) + + gen, gerr = _trpc_get("image.getGenerationData", image_id, tok, timeout) + if gerr: + return _err(f"getGenerationData falló: {gerr}", image_id=image_id) + + info, ierr = _trpc_get("image.get", image_id, tok, timeout) + # image.get es complementario (url/dims/nsfw): si falla no abortamos, seguimos + # con lo que dé getGenerationData. + info = info or {} + + meta = gen.get("meta") or {} + resources = gen.get("resources") or [] + comfy_workflow = _extract_comfy_workflow(meta) + nsfw_level = info.get("nsfwLevel", gen.get("nsfwLevel")) + + has_meta = bool(meta.get("prompt") or meta.get("Model") or resources) + if not has_meta and not comfy_workflow: + detail = "" if not ierr else f" (image.get: {ierr})" + return _err( + f"la imagen {image_id} no expone datos de generación ni workflow " + f"embebido (puede ser privada, sin metadata, o subida sin receta){detail}", + image_id=image_id, nsfw_level=nsfw_level, + ) + + return { + "ok": True, + "image_id": image_id, + "meta": meta, + "resources": resources, + "process": gen.get("process") or "", + "comfy_workflow": comfy_workflow, + "width": info.get("width"), + "height": info.get("height"), + "nsfw": _is_nsfw(nsfw_level), + "nsfw_level": nsfw_level, + "post_id": info.get("postId"), + "url_uuid": info.get("url") or "", + "page_url": f"https://civitai.com/images/{image_id}", + "error": "", + } + + +def _err(msg, **extra): + base = { + "ok": False, "image_id": 0, "meta": {}, "resources": [], "process": "", + "comfy_workflow": {}, "width": None, "height": None, "nsfw": False, + "nsfw_level": None, "post_id": None, "url_uuid": "", "page_url": "", + "error": msg, + } + base.update(extra) + return base + + +if __name__ == "__main__": + import sys + + ref = sys.argv[1] if len(sys.argv) > 1 else "https://civitai.com/images/23526611" + out = comfyui_fetch_civitai_image_meta(ref) + m = out.get("meta") or {} + print(json.dumps({ + "ok": out["ok"], "image_id": out["image_id"], "process": out["process"], + "nsfw": out["nsfw"], "has_comfy_workflow": bool(out["comfy_workflow"]), + "model": m.get("Model"), "sampler": m.get("sampler"), + "steps": m.get("steps"), "cfg": m.get("cfgScale"), + "prompt_head": (m.get("prompt") or "")[:80], + "n_resources": len(out["resources"]), "error": out["error"], + }, ensure_ascii=False, indent=2)) diff --git a/python/functions/ml/comfyui_fetch_civitai_image_meta_test.py b/python/functions/ml/comfyui_fetch_civitai_image_meta_test.py new file mode 100644 index 00000000..1e7bc4ed --- /dev/null +++ b/python/functions/ml/comfyui_fetch_civitai_image_meta_test.py @@ -0,0 +1,56 @@ +"""Tests de los helpers puros de comfyui_fetch_civitai_image_meta. + +No tocan la red: validan el parseo de id, la interpretación de NSFW y la +extracción del workflow embebido. La función completa (impura) se valida en vivo +contra Civitai en el report 0127. +""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from comfyui_fetch_civitai_image_meta import ( # noqa: E402 + _extract_comfy_workflow, + _is_nsfw, + _parse_image_id, +) + + +def test_parse_image_id_acepta_int_str_url(): + assert _parse_image_id(23526611) == (23526611, "") + assert _parse_image_id("23526611") == (23526611, "") + assert _parse_image_id("https://civitai.com/images/9078035?x=1") == (9078035, "") + assert _parse_image_id("/images/777") == (777, "") + + +def test_parse_image_id_rechaza_invalidos(): + for bad in ("not_an_id", -5, 0, True, 3.5, None): + iid, err = _parse_image_id(bad) + assert iid == 0 and err, f"deberia rechazar {bad!r}" + + +def test_is_nsfw_niveles(): + for sfw in (None, "None", "", 0, 1, "Safe"): + assert _is_nsfw(sfw) is False, f"{sfw!r} deberia ser SFW" + for nsfw in (2, 4, 16, "Mature", "X", "Soft"): + assert _is_nsfw(nsfw) is True, f"{nsfw!r} deberia ser NSFW" + + +def test_extract_comfy_workflow_embebido_y_ausente(): + # meta con un comfy embebido (string JSON {prompt: API format}) + api = {"3": {"class_type": "KSampler", "inputs": {}}} + import json + meta_ok = {"comfy": json.dumps({"prompt": api})} + assert _extract_comfy_workflow(meta_ok) == api + # meta sin comfy -> {} + assert _extract_comfy_workflow({"prompt": "hola"}) == {} + # comfy ilegible -> {} (no crashea) + assert _extract_comfy_workflow({"comfy": "no es json {"}) == {} + + +if __name__ == "__main__": + test_parse_image_id_acepta_int_str_url() + test_parse_image_id_rechaza_invalidos() + test_is_nsfw_niveles() + test_extract_comfy_workflow_embebido_y_ausente() + print("OK") diff --git a/python/functions/ml/comfyui_map_a1111_params.md b/python/functions/ml/comfyui_map_a1111_params.md new file mode 100644 index 00000000..439fd5a4 --- /dev/null +++ b/python/functions/ml/comfyui_map_a1111_params.md @@ -0,0 +1,66 @@ +--- +name: comfyui_map_a1111_params +kind: function +lang: py +domain: ml +version: "1.0.0" +purity: pure +signature: "def comfyui_map_a1111_params(meta: dict, resources: list | None = None) -> dict" +description: "Traduce la metadata de generacion de Civitai/A1111 a parametros de ComfyUI. Mapea el sampler A1111 ('DPM++ 2M Karras') a (sampler_name='dpmpp_2m', scheduler='karras') de ComfyUI, normaliza steps/cfg/dims/seed/positive/negative a las claves que consumen los builders del registry, infiere la familia del modelo (sd15|sdxl|flux|unknown) por nombre/baseModel/recursos/dimensiones, y extrae los LoRAs tanto de los resources de Civitai como de las etiquetas del prompt (sintaxis A1111, que ademas limpia del prompt). Funcion pura: sin red ni I/O." +tags: [comfyui, civitai, replicate, ml, a1111, sampler, pure, stable-diffusion] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "" +imports: ["re"] +params: + - name: meta + desc: "Dict de generacion estilo A1111/Civitai. Claves reconocidas: prompt, negativePrompt, Model/model, baseModel, sampler, steps, cfgScale, seed, Size ('WxH'), clipSkip." + - name: resources + desc: "Lista de recursos de Civitai ({modelType, modelName, weight, baseModel, ...}) para detectar checkpoint, LoRAs y familia. Opcional (None = lista vacia)." +output: "dict {sampler_name, scheduler, steps, cfg, width, height, seed, positive, negative, family, checkpoint_hint, loras, clip_skip}. Los numericos son None cuando la meta no los aporta (el caller pone defaults por familia). family in {sd15, sdxl, flux, unknown}. loras = [{name, weight, source}]. positive viene SIN las etiquetas (que pasan a loras)." +tested: true +tests: + - "test_map_sampler_karras_y_ancestral" + - "test_infer_family_sdxl_sd15_flux" + - "test_loras_de_resources_y_tags_del_prompt" + - "test_dims_desde_size_y_numericos" +test_file_path: "python/functions/ml/comfyui_map_a1111_params_test.py" +file_path: "python/functions/ml/comfyui_map_a1111_params.py" +--- + +## Ejemplo + +```python +import sys, os +sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions")) +from ml.comfyui_map_a1111_params import comfyui_map_a1111_params + +meta = {"prompt": "knight ", "negativePrompt": "blurry", + "Model": "juggernautXL_v11", "sampler": "DPM++ 2M Karras", + "steps": 30, "cfgScale": 5.5, "seed": 12345, "Size": "832x1216"} +p = comfyui_map_a1111_params(meta) +print(p["sampler_name"], p["scheduler"]) # dpmpp_2m karras +print(p["family"]) # sdxl +print(p["loras"]) # [{'name': 'detail', 'weight': 0.6, 'source': 'prompt_tag'}] +``` + +## Cuando usarla + +Cuando tengas la `meta` de una imagen de Civitai/A1111 (p.ej. de +`comfyui_fetch_civitai_image_meta`) y necesites construir un workflow ComfyUI que la +reproduzca: traduce sampler/scheduler/dims/loras al vocabulario de ComfyUI y te dice la familia +del modelo para elegir un checkpoint compatible. Es el puente entre observar una receta y +construir el workflow en `comfyui_replicate_civitai_oneshot`. + +## Gotchas + +- El mapeo de sampler cubre los comunes; un sampler raro o mal escrito cae a + `('euler','normal')` (fallback seguro, no falla). +- `family` por dimensiones es heurística: si no hay pistas en nombres, dimensión mayor >=900 + → `sdxl`, si no `sd15`. Puede equivocarse con modelos atípicos. +- Los nombres de LoRA salen como los nombra Civitai (modelName o tag del prompt), NO como el + filename `.safetensors` instalado — el caller debe casarlos contra los modelos del servidor. +- Pura: no consulta el servidor ComfyUI ni valida que el checkpoint exista; eso lo hace el + pipeline. diff --git a/python/functions/ml/comfyui_map_a1111_params.py b/python/functions/ml/comfyui_map_a1111_params.py new file mode 100644 index 00000000..a15a20bc --- /dev/null +++ b/python/functions/ml/comfyui_map_a1111_params.py @@ -0,0 +1,226 @@ +"""Traduce la metadata de generación de Civitai/A1111 a parámetros de ComfyUI. + +La metadata que expone Civitai (y Automatic1111) nombra el sampler y el scheduler +de forma distinta a ComfyUI: "DPM++ 2M Karras" en A1111 es +`sampler_name="dpmpp_2m"` + `scheduler="karras"` en ComfyUI. Esta función pura hace +ese mapeo y normaliza el resto de la receta a las claves que consumen los builders +del registry (`comfyui_build_txt2img_workflow`, etc.): steps, cfg, width, height, +seed, positive, negative. + +Además infiere la *familia* del modelo (`sd15` / `sdxl` / `flux` / `unknown`) a +partir del nombre del modelo, el `baseModel`, los recursos y las dimensiones, para +que el pipeline de réplica pueda sustituir el checkpoint original por uno instalado +de la misma familia cuando el exacto no esté disponible. Y extrae los LoRAs tanto +de los `resources` de Civitai como de las etiquetas `` embebidas +en el propio prompt (sintaxis A1111). + +Función pura: sin red, sin I/O. Solo stdlib (re). +""" +import re + +# Mapeo sampler A1111 -> (sampler_name ComfyUI, scheduler por defecto). El scheduler +# real puede venir como sufijo del nombre A1111 ("... Karras") y se detecta aparte. +_SAMPLER_MAP = { + "euler": ("euler", "normal"), + "euler a": ("euler_ancestral", "normal"), + "euler ancestral": ("euler_ancestral", "normal"), + "lms": ("lms", "normal"), + "heun": ("heun", "normal"), + "dpm2": ("dpm_2", "normal"), + "dpm2 a": ("dpm_2_ancestral", "normal"), + "dpm fast": ("dpm_fast", "normal"), + "dpm adaptive": ("dpm_adaptive", "normal"), + "dpm++ 2s a": ("dpmpp_2s_ancestral", "normal"), + "dpm++ 2m": ("dpmpp_2m", "normal"), + "dpm++ sde": ("dpmpp_sde", "normal"), + "dpm++ 2m sde": ("dpmpp_2m_sde", "normal"), + "dpm++ 2m sde heun": ("dpmpp_2m_sde", "normal"), + "dpm++ 3m sde": ("dpmpp_3m_sde", "normal"), + "ddim": ("ddim", "ddim_uniform"), + "ddpm": ("ddpm", "normal"), + "plms": ("euler", "normal"), # PLMS no existe en ComfyUI -> fallback euler + "unipc": ("uni_pc", "normal"), + "lcm": ("lcm", "normal"), + "restart": ("restart", "normal"), +} + +# Sufijos de scheduler que A1111 concatena al nombre del sampler. +_SCHEDULER_SUFFIXES = [ + ("karras", "karras"), + ("exponential", "exponential"), + ("sgm uniform", "sgm_uniform"), + ("simple", "simple"), + ("ddim uniform", "ddim_uniform"), + ("beta", "beta"), +] + +_DEFAULT_SAMPLER = ("euler", "normal") + +# Etiqueta A1111 de LoRA embebida en el prompt: . +_LORA_TAG_RE = re.compile(r"]+)(?::([0-9.]+))?[^>]*>", re.IGNORECASE) + +_SDXL_HINTS = ("xl", "pony", "sdxl", "illustrious", "noob", "animagine", "playground") +_SD15_HINTS = ("sd 1.5", "sd1.5", "sd15", "v1-5", "v1.5", "1.5") +_FLUX_HINTS = ("flux",) + + +def _map_sampler(raw): + """Traduce un nombre de sampler A1111 a (sampler_name, scheduler) de ComfyUI.""" + if not raw or not isinstance(raw, str): + return _DEFAULT_SAMPLER + name = raw.strip().lower() + scheduler = None + for suffix, sched in _SCHEDULER_SUFFIXES: + if name.endswith(" " + suffix): + scheduler = sched + name = name[: -len(suffix)].strip() + break + sampler_name, default_sched = _SAMPLER_MAP.get(name, _DEFAULT_SAMPLER) + return sampler_name, (scheduler or default_sched) + + +def _num(value, cast): + """Castea best-effort un valor que puede venir como str/num; None si no se puede.""" + if value is None or isinstance(value, bool): + return None + try: + return cast(value) + except (TypeError, ValueError): + try: + return cast(float(value)) if cast is int else cast(value) + except (TypeError, ValueError): + return None + + +def _dims_from_size(size): + """Parsea 'WxH' (ej. '832x1216') a (width, height); (None, None) si no procede.""" + if not isinstance(size, str) or "x" not in size.lower(): + return None, None + try: + w, h = size.lower().split("x", 1) + return int(w.strip()), int(h.strip()) + except (ValueError, AttributeError): + return None, None + + +def _infer_family(meta, resources, width, height): + """Infiere 'sd15' | 'sdxl' | 'flux' | 'unknown' de la receta.""" + blob_parts = [ + str(meta.get("Model") or ""), + str(meta.get("model") or ""), + str(meta.get("baseModel") or ""), + ] + for res in resources or []: + if isinstance(res, dict): + blob_parts.append(str(res.get("modelName") or "")) + blob_parts.append(str(res.get("baseModel") or "")) + blob = " ".join(blob_parts).lower() + + if any(h in blob for h in _FLUX_HINTS): + return "flux" + if any(h in blob for h in _SDXL_HINTS): + return "sdxl" + if any(h in blob for h in _SD15_HINTS): + return "sd15" + # Sin pistas en los nombres: deducir por la dimensión mayor. + longest = max(width or 0, height or 0) + if longest >= 900: + return "sdxl" + if longest > 0: + return "sd15" + return "unknown" + + +def _checkpoint_hint(meta, resources): + """Nombre del checkpoint original (Model de la meta o primer resource checkpoint).""" + model = meta.get("Model") or meta.get("model") + if isinstance(model, str) and model.strip(): + return model.strip() + for res in resources or []: + if isinstance(res, dict) and str(res.get("modelType", "")).lower() in ( + "checkpoint", "model" + ): + nm = res.get("modelName") or res.get("name") + if nm: + return str(nm) + return "" + + +def _loras(meta, resources): + """Extrae LoRAs de los resources Civitai y de las etiquetas del prompt.""" + out = [] + seen = set() + for res in resources or []: + if isinstance(res, dict) and str(res.get("modelType", "")).lower() == "lora": + nm = res.get("modelName") or res.get("name") + if nm and str(nm) not in seen: + seen.add(str(nm)) + w = res.get("weight") + weight = w if isinstance(w, (int, float)) and not isinstance(w, bool) else 1.0 + out.append({"name": str(nm), "weight": float(weight), "source": "resource"}) + for m in _LORA_TAG_RE.finditer(str(meta.get("prompt") or "")): + nm = m.group(1).strip() + if nm and nm not in seen: + seen.add(nm) + weight = float(m.group(2)) if m.group(2) else 1.0 + out.append({"name": nm, "weight": weight, "source": "prompt_tag"}) + return out + + +def _clean_prompt(prompt): + """Quita las etiquetas del prompt (ComfyUI las maneja como nodos).""" + return _LORA_TAG_RE.sub("", str(prompt or "")).strip().strip(",").strip() + + +def comfyui_map_a1111_params(meta, resources=None): + """Traduce metadata de generación Civitai/A1111 a parámetros de ComfyUI. + + Args: + meta: dict de generación estilo A1111/Civitai. Claves reconocidas: `prompt`, + `negativePrompt`, `Model`/`model`, `baseModel`, `sampler`, `steps`, + `cfgScale`, `seed`, `Size` ('WxH'), `clipSkip`. + resources: lista de recursos de Civitai ({modelType, modelName, weight, + baseModel, ...}) para detectar checkpoint, LoRAs y familia. Opcional. + + Returns: + dict {sampler_name, scheduler, steps, cfg, width, height, seed, positive, + negative, family, checkpoint_hint, loras, clip_skip}. Los valores numéricos + son None cuando la meta no los aporta (el caller pone defaults por familia). + `family` ∈ {sd15, sdxl, flux, unknown}. `loras` = [{name, weight, source}]. + `positive` viene sin las etiquetas (que pasan a `loras`). + """ + meta = meta or {} + resources = resources or [] + + sampler_name, scheduler = _map_sampler(meta.get("sampler")) + width, height = _dims_from_size(meta.get("Size")) + family = _infer_family(meta, resources, width, height) + + return { + "sampler_name": sampler_name, + "scheduler": scheduler, + "steps": _num(meta.get("steps"), int), + "cfg": _num(meta.get("cfgScale"), float), + "width": width, + "height": height, + "seed": _num(meta.get("seed"), int), + "positive": _clean_prompt(meta.get("prompt")), + "negative": str(meta.get("negativePrompt") or "").strip(), + "family": family, + "checkpoint_hint": _checkpoint_hint(meta, resources), + "loras": _loras(meta, resources), + "clip_skip": _num(meta.get("clipSkip"), int), + } + + +if __name__ == "__main__": + import json + + demo_meta = { + "prompt": "cinematic portrait of a knight , sharp focus", + "negativePrompt": "blurry, lowres", + "Model": "juggernautXL_v11", + "sampler": "DPM++ 2M Karras", + "steps": 30, "cfgScale": 5.5, "seed": 12345, "Size": "832x1216", + } + print(json.dumps(comfyui_map_a1111_params(demo_meta), ensure_ascii=False, indent=2)) diff --git a/python/functions/ml/comfyui_map_a1111_params_test.py b/python/functions/ml/comfyui_map_a1111_params_test.py new file mode 100644 index 00000000..4c41646f --- /dev/null +++ b/python/functions/ml/comfyui_map_a1111_params_test.py @@ -0,0 +1,56 @@ +"""Tests de comfyui_map_a1111_params (función pura): mapeo sampler, familia, loras, dims.""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from comfyui_map_a1111_params import comfyui_map_a1111_params # noqa: E402 + + +def test_map_sampler_karras_y_ancestral(): + p = comfyui_map_a1111_params({"sampler": "DPM++ 2M Karras"}) + assert (p["sampler_name"], p["scheduler"]) == ("dpmpp_2m", "karras") + p = comfyui_map_a1111_params({"sampler": "Euler a"}) + assert (p["sampler_name"], p["scheduler"]) == ("euler_ancestral", "normal") + p = comfyui_map_a1111_params({"sampler": "DDIM"}) + assert (p["sampler_name"], p["scheduler"]) == ("ddim", "ddim_uniform") + # sampler desconocido -> fallback seguro + p = comfyui_map_a1111_params({"sampler": "sampler_inventado"}) + assert (p["sampler_name"], p["scheduler"]) == ("euler", "normal") + + +def test_infer_family_sdxl_sd15_flux(): + assert comfyui_map_a1111_params({"Model": "juggernautXL_v11"})["family"] == "sdxl" + assert comfyui_map_a1111_params({"Model": "dreamshaper", "Size": "512x768"})["family"] == "sd15" + assert comfyui_map_a1111_params({}, [{"modelType": "Checkpoint", "modelName": "FLUX"}])["family"] == "flux" + # sin pistas en nombres -> por dimensión + assert comfyui_map_a1111_params({"Size": "1024x1024"})["family"] == "sdxl" + assert comfyui_map_a1111_params({})["family"] == "unknown" + + +def test_loras_de_resources_y_tags_del_prompt(): + meta = {"prompt": "knight , sharp focus"} + resources = [{"modelType": "LORA", "modelName": "Add Detail XL", "weight": 0.8}] + p = comfyui_map_a1111_params(meta, resources) + names = {lo["name"] for lo in p["loras"]} + assert "Add Detail XL" in names and "detail_tweaker" in names + # el tag se quita del positive + assert " None (el caller pone defaults) + p = comfyui_map_a1111_params({}) + assert p["width"] is None and p["steps"] is None and p["cfg"] is None + + +if __name__ == "__main__": + test_map_sampler_karras_y_ancestral() + test_infer_family_sdxl_sd15_flux() + test_loras_de_resources_y_tags_del_prompt() + test_dims_desde_size_y_numericos() + print("OK") diff --git a/python/functions/pipelines/comfyui_replicate_civitai_oneshot.md b/python/functions/pipelines/comfyui_replicate_civitai_oneshot.md new file mode 100644 index 00000000..9af1b0b6 --- /dev/null +++ b/python/functions/pipelines/comfyui_replicate_civitai_oneshot.md @@ -0,0 +1,91 @@ +--- +name: comfyui_replicate_civitai_oneshot +kind: pipeline +lang: py +domain: pipelines +version: "1.0.0" +purity: impure +signature: "def comfyui_replicate_civitai_oneshot(url_or_id, *, server: str = \"127.0.0.1:8188\", dest: str | None = None, judge: bool = True, token: str | None = None, wait_timeout: float = 600.0) -> dict" +description: "Replica una imagen de Civitai en una sola llamada: te paso un link y entra, observa como lo hicieron, construye un workflow que lo replique, lo genera y lo juzga. Acepta civitai.com/images/, su id numerico, un modelVersionId (replica su primera imagen SFW) o directamente una URL/ruta/dict de un workflow ComfyUI (PNG embebido, .json, API format). Pasos: fetch_civitai_image_meta (observa receta) -> map_a1111_params (traduce a ComfyUI) -> workflow embebido tal cual O reconstruido con build_txt2img + inject_lora sustituyendo el checkpoint original por el mas parecido INSTALADO de la misma familia y descartando los LoRAs ausentes -> run_foreign_workflow_oneshot (resolve+submit+wait+fetch) -> judge_image. NO baja modelos a ciegas (los reporta en missing_models con la sustitucion). Respeta SFW: una imagen NSFW devuelve ok=False sin generar. Pipeline impuro: HTTP + disco + subprocess." +tags: [comfyui, civitai, replicate, pipeline, oneshot, ml, image, stable-diffusion] +uses_functions: + - comfyui_fetch_civitai_image_meta_py_ml + - comfyui_map_a1111_params_py_ml + - comfyui_object_info_py_ml + - comfyui_build_txt2img_workflow_py_ml + - comfyui_inject_lora_py_ml + - comfyui_search_civitai_images_py_ml + - comfyui_run_foreign_workflow_oneshot_py_pipelines + - comfyui_judge_image_py_ml +uses_types: [] +returns: [] +returns_optional: false +error_type: error_go_core +imports: [] +params: + - name: url_or_id + desc: "Link/URL de una imagen Civitai (civitai.com/images/), su id numerico (int o str), un modelVersionId (se replica su primera imagen SFW), o directamente una URL/ruta/dict de un workflow ComfyUI (PNG con workflow embebido, .json, o dict en API format)." + - name: server + desc: "host:port del servidor ComfyUI (sin esquema). keyword-only." + - name: dest + desc: "Directorio donde guardar la replica. None = ~/ComfyUI/civitai_replicas. keyword-only." + - name: judge + desc: "Si True, juzga la replica con el panel comfyui_judge_image contra el prompt extraido. keyword-only." + - name: token + desc: "Token Civitai (Bearer). None lo resuelve de 'pass civitai/api-token'. No hardcodear. keyword-only." + - name: wait_timeout + desc: "Segundos maximos esperando a que ComfyUI termine. keyword-only." +output: "dict {ok, source, replica_image_path, prompt_id, judge, missing_models, has_workflow, error}. source = receta observada {image_id, page_url, prompt, negative, model, family, sampler_name, scheduler, steps, cfg, width, height, seed, loras, process, has_workflow_embedded}. replica_image_path = ruta local de la imagen replica. missing_models = modelos que la receta pedia y no teniamos, con la sustitucion aplicada (NUNCA descargados). judge = dict del panel (None si judge=False o no se genero). has_workflow = True si se replico un workflow embebido tal cual. ok=False con error si el link es invalido/privado/sin meta, la imagen es NSFW (se respeta SFW), el server no responde, o la generacion falla." +tested: true +tests: + - "test_classify_input_image_modelversion_workflow_error" + - "test_pick_checkpoint_familia_y_exacto" + - "test_find_installed_match_normalizado" + - "test_extract_positive_from_workflow" +test_file_path: "python/functions/pipelines/comfyui_replicate_civitai_oneshot_test.py" +file_path: "python/functions/pipelines/comfyui_replicate_civitai_oneshot.py" +--- + +## Ejemplo + +```python +import sys, os +sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions")) +from pipelines.comfyui_replicate_civitai_oneshot import comfyui_replicate_civitai_oneshot + +# Te paso un link de Civitai SFW -> entra, observa la receta, la replica y la juzga: +out = comfyui_replicate_civitai_oneshot("https://civitai.com/images/23526611") +print(out["ok"], out["replica_image_path"]) +print(out["source"]["family"], out["source"]["sampler_name"]) # flux euler +print(out["missing_models"]) # [{'kind':'checkpoint','name':'FLUX','substituted_with':'juggernaut_xl_v11.safetensors',...}] +print(out["judge"]["verdict"], round(out["judge"]["score"], 2)) # bad 4.74 (parecido aproximado sin el modelo exacto) +``` + +## Cuando usarla + +Cuando te pasen el link de una imagen de Civitai que te gusta y quieras **reproducirla en tu +ComfyUI**: extrae cómo se hizo y genera una versión equivalente con lo que tienes instalado, +sin reconstruir el workflow a mano. También sirve para ejecutar un workflow ComfyUI ajeno +(PNG/JSON/dict) tal cual. Es la doctrina del issue 0087 aplicada a "replicar desde un link": +observar una receta pública y reproducirla en un solo paso. + +## Gotchas + +- **El parecido es aproximado cuando falta el modelo exacto.** Casi nunca tendrás el mismo + checkpoint/LoRA que Civitai: se sustituye por el más parecido instalado (misma familia) y se + reporta en `missing_models`. Replicar un FLUX con un SDXL clava la composición pero no el + texto/estilo fino — es esperado, no un fallo. +- **NO baja modelos a ciegas.** Lo que no tienes se reporta, no se descarga. Bajarlo es una + decisión aparte (`comfyui_search_civitai_models` + `comfyui_download_model`). +- **SFW estricto.** Una imagen con `nsfw_level>1` devuelve `ok=False` sin generar ni descargar. +- Requiere el servidor ComfyUI vivo en `server` y al menos un checkpoint de imagen instalado. + Los checkpoints se detectan vía `/object_info` (reflejan `extra_model_paths.yaml`), no por + `listdir` — funciona aunque los modelos vivan fuera de `~/ComfyUI/models/`. +- El juez LLM puede caer por rate-limit (HTTP 429) si compite con otros agentes; el panel + degrada y vota con los jueces restantes (estético + CLIP), no crashea. +- Los LoRAs de Civitai se nombran por su nombre de modelo, no por el filename instalado: el + match es best-effort (normalizado); si no casa, el LoRA se omite y se reporta. + +## Capability growth log + +(v1.0.0 — versión inicial; aún no ha crecido.) diff --git a/python/functions/pipelines/comfyui_replicate_civitai_oneshot.py b/python/functions/pipelines/comfyui_replicate_civitai_oneshot.py new file mode 100644 index 00000000..ba6aa4a6 --- /dev/null +++ b/python/functions/pipelines/comfyui_replicate_civitai_oneshot.py @@ -0,0 +1,417 @@ +"""comfyui_replicate_civitai_oneshot — replica una imagen de Civitai en una llamada. + +"Te paso un link de Civitai: entro, observo cómo lo hicieron, y construyo un +workflow que lo replique." Dado el id/URL de una imagen de Civitai (o un +`modelVersionId`, o directamente una URL/dict de workflow ComfyUI), el pipeline: + + 1. OBSERVA los detalles de generación con `comfyui_fetch_civitai_image_meta` + (prompt, negativo, modelo, sampler, steps, cfg, seed, recursos) vía los + endpoints tRPC que usa la web de Civitai. + 2. TRADUCE la receta a parámetros de ComfyUI con `comfyui_map_a1111_params` + (sampler/scheduler, dims, familia del modelo, LoRAs). + 3. CONSTRUYE el workflow que la replica: + - si la imagen trae un workflow ComfyUI embebido -> se usa TAL CUAL; + - si no -> se reconstruye con `comfyui_build_txt2img_workflow` + + `comfyui_inject_lora`, sustituyendo el checkpoint original por el más + parecido INSTALADO (misma familia) cuando el exacto no está, y + descartando los LoRAs ausentes. + 4. RESUELVE dependencias y GENERA con `comfyui_run_foreign_workflow_oneshot` + (resolve_deps -> submit -> wait -> fetch). + 5. JUZGA la réplica con `comfyui_judge_image` contra el prompt extraído. + +NO baja modelos a ciegas: lo que la receta pide y no tenemos se reporta en +`missing_models` con la sustitución aplicada (el modelo más parecido instalado), +nunca se descarga. Respeta la política SFW: si la imagen es NSFW, devuelve +`ok=False` sin generar. + +El parecido será aproximado cuando falte el checkpoint/LoRA exacto (se reconstruye +con el más parecido) — eso es esperado y queda documentado en `missing_models`. + +Pipeline impuro: red (HTTP a Civitai + ComfyUI) + escritura en disco + subprocess +(jueces). Solo stdlib salvo las funciones del registry que compone. +""" +from __future__ import annotations + +import os +import re +import sys + +_FUNCTIONS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _FUNCTIONS_ROOT not in sys.path: + sys.path.insert(0, _FUNCTIONS_ROOT) + +from ml.comfyui_build_txt2img_workflow import comfyui_build_txt2img_workflow +from ml.comfyui_fetch_civitai_image_meta import comfyui_fetch_civitai_image_meta +from ml.comfyui_inject_lora import comfyui_inject_lora +from ml.comfyui_judge_image import comfyui_judge_image +from ml.comfyui_map_a1111_params import comfyui_map_a1111_params +from ml.comfyui_object_info import comfyui_object_info +from ml.comfyui_search_civitai_images import comfyui_search_civitai_images +from pipelines.comfyui_run_foreign_workflow_oneshot import comfyui_run_foreign_workflow_oneshot + +# Defaults de generación por familia cuando la meta no los aporta. +_FAMILY_DEFAULTS = { + "sd15": {"width": 512, "height": 768, "steps": 25, "cfg": 7.0}, + "sdxl": {"width": 1024, "height": 1024, "steps": 30, "cfg": 7.0}, + "flux": {"width": 1024, "height": 1024, "steps": 30, "cfg": 7.0}, + "unknown": {"width": 768, "height": 768, "steps": 25, "cfg": 7.0}, +} +# Checkpoints que NO sirven para txt2img de imagen (video / 3D / mallas). +_NON_IMAGE_CKPT = ("video", "svd", "zero123", "hunyuan", "ltx", "3d") +_CIVITAI_IMAGES_RE = re.compile(r"civitai\.com/images/(\d+)|^/?images/(\d+)$") +_MODEL_VERSION_RE = re.compile(r"modelVersionId[=/](\d+)|model-versions/(\d+)|modelVersions/(\d+)") +_WORKFLOW_EXTS = (".png", ".json", ".webp") + + +def _classify_input(url_or_id): + """Clasifica la entrada -> ('image'|'model_version'|'workflow_source', ref).""" + if isinstance(url_or_id, dict): + return "workflow_source", url_or_id + if isinstance(url_or_id, bool): + return "error", "entrada booleana no válida" + if isinstance(url_or_id, int): + return "image", url_or_id + if not isinstance(url_or_id, str): + return "error", f"entrada no soportada: {type(url_or_id).__name__}" + + s = url_or_id.strip() + if s.isdigit(): + return "image", int(s) + m = _CIVITAI_IMAGES_RE.search(s) + if m: + return "image", int(next(g for g in m.groups() if g)) + mv = _MODEL_VERSION_RE.search(s) + if mv: + return "model_version", int(next(g for g in mv.groups() if g)) + # URL/archivo de workflow (no es una página de imagen Civitai). + low = s.lower().split("?")[0] + if low.endswith(_WORKFLOW_EXTS) or os.path.exists(s): + return "workflow_source", s + if "civitai.com/models/" in low: + return "error", ("una URL de página de modelo no apunta a una imagen " + "concreta; pásame un link civitai.com/images/ o un " + "modelVersionId") + # Cualquier otra URL: dejar que el ejecutor de workflows foráneos lo intente. + if s.startswith(("http://", "https://")): + return "workflow_source", s + return "error", f"no se reconoce la entrada {url_or_id!r}" + + +def _server_models(server): + """(checkpoints, loras) que ve el servidor; ([], []) si no responde.""" + ckpts, loras = [], [] + try: + ci = comfyui_object_info(server, "CheckpointLoaderSimple") + ckpts = ci["CheckpointLoaderSimple"]["input"]["required"]["ckpt_name"][0] + except Exception: # noqa: BLE001 — server caído / nodo ausente + ckpts = [] + try: + li = comfyui_object_info(server, "LoraLoader") + loras = li["LoraLoader"]["input"]["required"]["lora_name"][0] + except Exception: # noqa: BLE001 + loras = [] + return list(ckpts), list(loras) + + +def _norm(name): + """Normaliza un nombre de modelo para comparar (sin ext, sin separadores).""" + base = os.path.splitext(str(name))[0].lower() + return re.sub(r"[^a-z0-9]", "", base) + + +def _find_installed(name, installed): + """Devuelve el filename instalado que casa con `name` (exacto/normalizado), o None.""" + if not name: + return None + target = _norm(name) + for cand in installed: + if _norm(cand) == target: + return cand + # Match laxo: el nombre normalizado de la receta contenido en el del archivo. + for cand in installed: + nc = _norm(cand) + if target and (target in nc or nc in target): + return cand + return None + + +def _pick_checkpoint(installed, family, hint): + """Elige el checkpoint instalado más parecido. Devuelve (filename, exact:bool).""" + candidates = [c for c in installed if not any(k in c.lower() for k in _NON_IMAGE_CKPT)] + if not candidates: + candidates = list(installed) + if not candidates: + return None, False + + exact = _find_installed(hint, candidates) + if exact: + return exact, True + + if family in ("sdxl", "flux"): + xl = [c for c in candidates if "xl" in c.lower()] + if xl: + return xl[0], False + if family == "sd15": + non_xl = [c for c in candidates if "xl" not in c.lower()] + if non_xl: + return non_xl[0], False + # Familia desconocida o sin candidato de la familia: preferir un SD1.5 versátil. + non_xl = [c for c in candidates if "xl" not in c.lower()] + return (non_xl[0] if non_xl else candidates[0]), False + + +def _reconstruct_workflow(params, server): + """Reconstruye un workflow txt2img desde la receta. Devuelve (workflow, missing).""" + family = params["family"] + defaults = _FAMILY_DEFAULTS.get(family, _FAMILY_DEFAULTS["unknown"]) + installed_ckpts, installed_loras = _server_models(server) + + if not installed_ckpts: + raise _ReplicateError( + "el servidor ComfyUI no devolvió checkpoints (¿vivo?); no se puede " + "elegir un modelo para la réplica") + + ckpt, exact = _pick_checkpoint(installed_ckpts, family, params["checkpoint_hint"]) + if not ckpt: + raise _ReplicateError("no hay ningún checkpoint de imagen instalado en el servidor") + + missing = [] + if not exact and params["checkpoint_hint"]: + missing.append({ + "kind": "checkpoint", + "name": params["checkpoint_hint"], + "base_model_family": family, + "substituted_with": ckpt, + "note": "checkpoint exacto no instalado; réplica con el más parecido", + }) + + width = params["width"] or defaults["width"] + height = params["height"] or defaults["height"] + steps = params["steps"] or defaults["steps"] + cfg = params["cfg"] if params["cfg"] is not None else defaults["cfg"] + seed = params["seed"] if params["seed"] is not None else 0 + + workflow = comfyui_build_txt2img_workflow( + ckpt, params["positive"], params["negative"], + steps=steps, cfg=cfg, width=width, height=height, seed=seed, + sampler_name=params["sampler_name"], scheduler=params["scheduler"], + filename_prefix="civitai_replica", + ) + + for lora in params["loras"]: + match = _find_installed(lora["name"], installed_loras) + if match: + workflow = comfyui_inject_lora( + workflow, match, + strength_model=lora["weight"], strength_clip=lora["weight"], + ) + else: + missing.append({ + "kind": "lora", "name": lora["name"], + "substituted_with": None, + "note": "LoRA no instalado; omitido de la réplica (no se baja a ciegas)", + }) + return workflow, missing + + +def _extract_positive_from_workflow(workflow): + """Saca el texto positivo más largo de los CLIPTextEncode (para juzgar embebidos).""" + texts = [] + for node in (workflow or {}).values(): + if isinstance(node, dict) and "CLIPTextEncode" in str(node.get("class_type", "")): + t = node.get("inputs", {}).get("text") + if isinstance(t, str) and t.strip(): + texts.append(t.strip()) + return max(texts, key=len) if texts else "" + + +def comfyui_replicate_civitai_oneshot( + url_or_id, + *, + server: str = "127.0.0.1:8188", + dest: str | None = None, + judge: bool = True, + token: str | None = None, + wait_timeout: float = 600.0, +): + """Replica una imagen de Civitai (o un workflow ajeno) en una sola llamada. + + Args: + url_or_id: link/URL de una imagen Civitai (`civitai.com/images/`), su id + numérico (int o str), un `modelVersionId` (se replica su primera imagen + SFW), o directamente una URL/ruta/dict de un workflow ComfyUI (PNG con + workflow embebido, .json, o dict en API format). + server: host:port del servidor ComfyUI (sin esquema). keyword-only. + dest: directorio donde guardar la réplica. None = `~/ComfyUI/civitai_replicas`. + keyword-only. + judge: si True, juzga la réplica con el panel `comfyui_judge_image` contra el + prompt extraído. keyword-only. + token: token Civitai (Bearer). None lo resuelve de `pass civitai/api-token`. + keyword-only. + wait_timeout: segundos máximos esperando a que ComfyUI termine. keyword-only. + + Returns: + dict {ok, source, replica_image_path, prompt_id, judge, missing_models, + has_workflow, error}: + - source: receta observada {image_id, page_url, prompt, negative, model, + family, sampler_name, scheduler, steps, cfg, width, height, seed, loras, + process, has_workflow_embedded}. + - replica_image_path: ruta local de la imagen réplica generada. + - missing_models: modelos que la receta pedía y no teníamos, con la + sustitución aplicada (NUNCA se descargan a ciegas). + - judge: dict del panel de jueces (None si judge=False o no se generó). + - has_workflow: True si se replicó un workflow embebido tal cual. + ok=False con error claro si: el link es inválido/privado/sin meta, la imagen + es NSFW (se respeta SFW), el server no responde, o la generación falla. + """ + kind, ref = _classify_input(url_or_id) + if kind == "error": + return _err(ref) + + # Caso A: la entrada YA es un workflow (PNG embebido / .json / dict / URL). + if kind == "workflow_source": + return _replicate_workflow_source(ref, server, dest, judge, token, wait_timeout) + + # Caso B: modelVersionId -> resolver a la primera imagen SFW de esa versión. + if kind == "model_version": + sr = comfyui_search_civitai_images(model_version_id=ref, nsfw="None", + limit=10, token=token) + if not sr.get("ok") or not sr.get("items"): + return _err(f"no se hallaron imágenes SFW para modelVersionId {ref}: " + f"{sr.get('error') or '0 resultados'}") + ref = sr["items"][0]["id"] + + # Caso C (principal): id/URL de imagen Civitai. + src = comfyui_fetch_civitai_image_meta(ref, token=token) + if not src.get("ok"): + return _err(f"no se pudieron observar los detalles de la imagen: {src.get('error')}") + if src.get("nsfw"): + return _err( + f"la imagen {src.get('image_id')} es NSFW (nivel {src.get('nsfw_level')!r}); " + "se respeta la política SFW y NO se replica.", + source=_source_from_meta(src, params=None)) + + params = comfyui_map_a1111_params(src["meta"], src.get("resources")) + source = _source_from_meta(src, params) + + # Construcción del workflow: embebido tal cual, o reconstruido desde la receta. + has_workflow = bool(src.get("comfy_workflow")) + try: + if has_workflow: + workflow = src["comfy_workflow"] + missing = [] + else: + workflow, missing = _reconstruct_workflow(params, server) + except _ReplicateError as exc: + return _err(str(exc), source=source) + + out_dir = os.path.expanduser(dest or "~/ComfyUI/civitai_replicas") + run = comfyui_run_foreign_workflow_oneshot( + workflow, server=server, dest=out_dir, output_kind="image", + wait_timeout=wait_timeout, civitai_token=token, + ) + if not run.get("ok"): + # run_foreign puede reportar deps faltantes propias (p.ej. un nodo custom). + extra_missing = run.get("missing") or [] + return _err(f"la generación de la réplica falló: {run.get('error')}", + source=source, missing_models=missing + extra_missing, + has_workflow=has_workflow) + + replica = run["outputs"][0] + judge_res = None + if judge: + try: + judge_res = comfyui_judge_image(replica, params["positive"], server=server) + except Exception as exc: # noqa: BLE001 — el juez no debe tumbar la réplica + judge_res = {"ok": False, "error": f"juez no disponible: {exc}"} + + return { + "ok": True, + "source": source, + "replica_image_path": replica, + "prompt_id": run.get("prompt_id", ""), + "judge": judge_res, + "missing_models": missing, + "has_workflow": has_workflow, + "error": "", + } + + +def _replicate_workflow_source(source_ref, server, dest, judge, token, wait_timeout): + """Replica un workflow ya embebido (PNG/.json/dict/URL) ejecutándolo tal cual.""" + out_dir = os.path.expanduser(dest or "~/ComfyUI/civitai_replicas") + run = comfyui_run_foreign_workflow_oneshot( + source_ref, server=server, dest=out_dir, output_kind="image", + wait_timeout=wait_timeout, civitai_token=token, + ) + if not run.get("ok"): + return _err(f"no se pudo ejecutar el workflow embebido: {run.get('error')}", + missing_models=run.get("missing") or [], has_workflow=True) + replica = run["outputs"][0] + positive = "" + if isinstance(source_ref, dict): + positive = _extract_positive_from_workflow(source_ref) + judge_res = None + if judge: + try: + judge_res = comfyui_judge_image(replica, positive, server=server) + except Exception as exc: # noqa: BLE001 + judge_res = {"ok": False, "error": f"juez no disponible: {exc}"} + return { + "ok": True, + "source": {"prompt": positive, "has_workflow_embedded": True, + "source_type": run.get("source_type", "")}, + "replica_image_path": replica, + "prompt_id": run.get("prompt_id", ""), + "judge": judge_res, + "missing_models": [], + "has_workflow": True, + "error": "", + } + + +def _source_from_meta(src, params): + """Construye el sub-dict `source` legible de la salida.""" + meta = src.get("meta") or {} + base = { + "image_id": src.get("image_id"), + "page_url": src.get("page_url", ""), + "process": src.get("process", ""), + "has_workflow_embedded": bool(src.get("comfy_workflow")), + "model": meta.get("Model") or meta.get("model"), + } + if params is not None: + base.update({ + "prompt": params["positive"], + "negative": params["negative"], + "family": params["family"], + "sampler_name": params["sampler_name"], + "scheduler": params["scheduler"], + "steps": params["steps"], + "cfg": params["cfg"], + "width": params["width"], + "height": params["height"], + "seed": params["seed"], + "loras": [lo["name"] for lo in params["loras"]], + }) + return base + + +def _err(msg, **extra): + base = {"ok": False, "source": {}, "replica_image_path": "", "prompt_id": "", + "judge": None, "missing_models": [], "has_workflow": False, "error": msg} + base.update(extra) + return base + + +class _ReplicateError(Exception): + """Error interno de reconstrucción, traducido a {ok: False, error}.""" + + +if __name__ == "__main__": + import json + + ref = sys.argv[1] if len(sys.argv) > 1 else "https://civitai.com/images/23526611" + out = comfyui_replicate_civitai_oneshot(ref, judge=False) + print(json.dumps({k: v for k, v in out.items() if k != "judge"}, + ensure_ascii=False, indent=2)) diff --git a/python/functions/pipelines/comfyui_replicate_civitai_oneshot_test.py b/python/functions/pipelines/comfyui_replicate_civitai_oneshot_test.py new file mode 100644 index 00000000..0fecf290 --- /dev/null +++ b/python/functions/pipelines/comfyui_replicate_civitai_oneshot_test.py @@ -0,0 +1,70 @@ +"""Tests de los helpers puros de comfyui_replicate_civitai_oneshot. + +No tocan red ni GPU: validan la clasificación de la entrada, la elección de +checkpoint sustituto, el match de modelos instalados y la extracción de prompt de +un workflow. El pipeline completo se valida en vivo en el report 0127. +""" +import os +import sys + +_FUNCTIONS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _FUNCTIONS_ROOT not in sys.path: + sys.path.insert(0, _FUNCTIONS_ROOT) + +from pipelines.comfyui_replicate_civitai_oneshot import ( # noqa: E402 + _classify_input, + _extract_positive_from_workflow, + _find_installed, + _pick_checkpoint, +) + + +def test_classify_input_image_modelversion_workflow_error(): + assert _classify_input(23526611) == ("image", 23526611) + assert _classify_input("https://civitai.com/images/777?x=1") == ("image", 777) + assert _classify_input("https://civitai.com/models/1?modelVersionId=42")[0] == "model_version" + assert _classify_input({"3": {"class_type": "KSampler"}})[0] == "workflow_source" + assert _classify_input("/tmp/foo.json")[0] == "workflow_source" + assert _classify_input("https://civitai.com/models/123")[0] == "error" + assert _classify_input("texto_suelto")[0] == "error" + + +def test_pick_checkpoint_familia_y_exacto(): + installed = ["dreamshaper_8.safetensors", "juggernaut_xl_v11.safetensors", + "v1-5-pruned-emaonly-fp16.safetensors", "svd.safetensors", + "hunyuan3d-dit-v2-mini.safetensors"] + # familia sdxl -> elige el que tiene 'xl' + ck, exact = _pick_checkpoint(installed, "sdxl", "algun_modelo_no_instalado") + assert ck == "juggernaut_xl_v11.safetensors" and exact is False + # familia sd15 -> evita los xl y los de video/3d + ck, exact = _pick_checkpoint(installed, "sd15", "otro") + assert "xl" not in ck.lower() and "svd" not in ck and exact is False + # hint exacto instalado -> exact True + ck, exact = _pick_checkpoint(installed, "sd15", "dreamshaper_8") + assert ck == "dreamshaper_8.safetensors" and exact is True + + +def test_find_installed_match_normalizado(): + installed = ["detail_tweaker_xl.safetensors", "watercolor_style_sd15.safetensors"] + # match normalizado (ignora separadores/ext/case) + assert _find_installed("Detail-Tweaker XL", installed) == "detail_tweaker_xl.safetensors" + # no instalado -> None + assert _find_installed("LoRA Inexistente 9000", installed) is None + + +def test_extract_positive_from_workflow(): + wf = { + "6": {"class_type": "CLIPTextEncode", "inputs": {"text": "a long positive prompt about a knight"}}, + "7": {"class_type": "CLIPTextEncode", "inputs": {"text": "blurry"}}, + "3": {"class_type": "KSampler", "inputs": {}}, + } + assert _extract_positive_from_workflow(wf) == "a long positive prompt about a knight" + assert _extract_positive_from_workflow({}) == "" + + +if __name__ == "__main__": + test_classify_input_image_modelversion_workflow_error() + test_pick_checkpoint_familia_y_exacto() + test_find_installed_match_normalizado() + test_extract_positive_from_workflow() + print("OK")