"""Descarga el PNG de una imagen de Civitai, segregando el NSFW a una subcarpeta. Baja el binario de la imagen a `/` (o, si `nsfw=True`, a `//`), aplicando la misma validación no-HTML que `comfyui_download_model` para no dejar páginas de error de Cloudflare/login disfrazadas de imagen. Las URLs de Civitai suelen apuntar a una variante redimensionada (`/width=N/`) que pierde los chunks de texto; por defecto se reescribe a la original (`/original=true/`) para conservar el workflow ComfyUI embebido que luego destila `comfyui_extract_recipe_from_png`. **Segregación NSFW**: la política del sistema permite NSFW pero SIEMPRE separado en su propia carpeta marcada. El caller pasa `nsfw=True` (tomado del item de `comfyui_search_civitai_images`) y la función lo enruta a `nsfw_subdir`. Impura: red (HTTP GET) + escritura en disco. Solo stdlib. """ import os import re import urllib.error import urllib.parse import urllib.request _HTML_SNIFF = (b" str: """Reescribe una URL de Civitai redimensionada a su original (best-effort).""" if _WIDTH_RE.search(url): return _WIDTH_RE.sub("/original=true/", url) return url def _derive_filename(url: str) -> str: """Nombre único: ., o el último segmento con extensión. Las URLs de Civitai llevan el UUID de la imagen como segmento de ruta; usarlo como nombre garantiza unicidad y evita que dos cosechas colisionen en un genérico tipo "original.png". """ path = urllib.parse.urlparse(url).path segs = [s for s in path.split("/") if s and "=" not in s] ext = ".png" for seg in reversed(segs): if "." in seg and not seg.endswith("."): cand_ext = os.path.splitext(seg)[1].lower() if cand_ext in (".png", ".jpeg", ".jpg", ".webp"): ext = cand_ext break uuid = _UUID_RE.search(path) if uuid: return uuid.group(0) + ext for seg in reversed(segs): if "." in seg and not seg.endswith("."): return seg return (segs[-1] if segs else "civitai_image") + ext def comfyui_fetch_civitai_image( image_url: str, *, dest_dir: str, nsfw: bool = False, nsfw_subdir: str = "nsfw", token: str | None = None, prefer_original: bool = True, timeout_s: float = 120.0, ) -> dict: """Descarga el PNG de una imagen de Civitai a disco, segregando el NSFW. Args: image_url: URL de la imagen (campo `url` de `comfyui_search_civitai_images`). dest_dir: carpeta destino (se expande ~). Se crea si no existe. keyword-only. nsfw: si True, la imagen se guarda en `//` en vez de directamente en `dest_dir`. keyword-only. nsfw_subdir: nombre de la subcarpeta para NSFW. Default "nsfw". keyword-only. token: token Civitai (header Authorization Bearer). Algunas imágenes lo exigen para servir el original. None lo omite. No hardcodear. keyword-only. prefer_original: si True (default) reescribe la URL `/width=N/` a `/original=true/` para conservar el workflow embebido. keyword-only. timeout_s: timeout HTTP en segundos. keyword-only. Returns: dict {ok, path, size_bytes, nsfw, error}. ok=False si la respuesta era HTML de error, demasiado pequeña, o falló la red/escritura (sin dejar basura en disco). `nsfw` refleja la carpeta usada. """ base = os.path.expanduser(dest_dir) target_dir = os.path.join(base, nsfw_subdir) if nsfw else base req_url = _to_original_url(image_url) if prefer_original else image_url headers = {"User-Agent": "fn-registry/comfyui_fetch_civitai_image"} if token: headers["Authorization"] = f"Bearer {token}" tmp_path = None try: req = urllib.request.Request(req_url, headers=headers) with urllib.request.urlopen(req, timeout=timeout_s) as resp: content_type = resp.headers.get("Content-Type", "") name = _derive_filename(resp.geturl()) or _derive_filename(image_url) if "text/html" in content_type.lower(): return {"ok": False, "path": "", "size_bytes": 0, "nsfw": nsfw, "error": (f"la respuesta es HTML (Content-Type: {content_type}), " "no una imagen. Revisa la URL/token.")} os.makedirs(target_dir, exist_ok=True) final_path = os.path.join(target_dir, name) tmp_path = final_path + ".part" first = resp.read(512) low = first.lower().lstrip() if any(low.startswith(sig) for sig in _HTML_SNIFF): return {"ok": False, "path": "", "size_bytes": 0, "nsfw": nsfw, "error": "la respuesta empieza con HTML (página de error/login), no una imagen."} size = 0 with open(tmp_path, "wb") as fh: fh.write(first) size += len(first) while True: chunk = resp.read(1024 * 256) if not chunk: break fh.write(chunk) size += len(chunk) except urllib.error.HTTPError as exc: body = exc.read().decode(errors="replace")[:300] _cleanup(tmp_path) return {"ok": False, "path": "", "size_bytes": 0, "nsfw": nsfw, "error": f"HTTP {exc.code} en {image_url}: {body}"} except Exception as exc: # noqa: BLE001 — red/DNS/escritura _cleanup(tmp_path) return {"ok": False, "path": "", "size_bytes": 0, "nsfw": nsfw, "error": f"fallo descargando {image_url}: {exc}"} if size < 1024: _cleanup(tmp_path) return {"ok": False, "path": "", "size_bytes": size, "nsfw": nsfw, "error": f"descarga sospechosamente pequeña ({size} bytes); probable error, no una imagen."} os.replace(tmp_path, final_path) return {"ok": True, "path": final_path, "size_bytes": size, "nsfw": nsfw, "error": ""} def _cleanup(path: str | None) -> None: if path and os.path.exists(path): try: os.remove(path) except OSError: pass if __name__ == "__main__": import json import sys out = comfyui_fetch_civitai_image( sys.argv[1] if len(sys.argv) > 1 else "http://127.0.0.1:8188/", dest_dir="/tmp/civitai_harvest_smoke", ) print(json.dumps(out, ensure_ascii=False, indent=2))