541 lines
22 KiB
Python
541 lines
22 KiB
Python
"""Extracción de grafo ontológico desde un documento.
|
|
|
|
Uso: python extract.py <archivo>
|
|
python extract.py data/condiciones-generales-bizum.pdf
|
|
|
|
Optimizaciones vs extraction_pipeline:
|
|
- 1 sola llamada LLM por chunk (entities + relations + tipos sugeridos)
|
|
- Chunks de 2000 chars
|
|
- Paralelizado con ThreadPoolExecutor
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "lib"))
|
|
|
|
from extract_text_from_file import extract_text_from_file
|
|
from core_functions import preprocess_text, extract_json_from_llm
|
|
from split_text_into_chunks import split_text_into_chunks
|
|
from deduplicate_entities import deduplicate_entities
|
|
from deduplicate_relations import deduplicate_relations
|
|
from entity_candidate import EntityCandidate
|
|
from relation_candidate import RelationCandidate
|
|
from render_sigma_html import render_sigma_html
|
|
|
|
# ── Presets ────────────────────────────────────────────────────────────────────
|
|
|
|
OSINT_PRESETS = [
|
|
{"type_ref": "person", "label": "Person",
|
|
"metadata_fields": ["full_name", "alias", "nationality", "dob", "gender", "risk_score"]},
|
|
{"type_ref": "organization", "label": "Organization",
|
|
"metadata_fields": ["legal_name", "country", "sector", "founded", "risk_score"]},
|
|
{"type_ref": "location", "label": "Location",
|
|
"metadata_fields": ["lat", "lon", "address", "country", "city"]},
|
|
{"type_ref": "event", "label": "Event",
|
|
"metadata_fields": ["event_type", "date", "location", "description", "severity"]},
|
|
{"type_ref": "email", "label": "Email",
|
|
"metadata_fields": ["address", "provider", "verified", "breached"]},
|
|
{"type_ref": "domain", "label": "Domain",
|
|
"metadata_fields": ["fqdn", "registrar", "created_date", "expires_date"]},
|
|
{"type_ref": "ip_address", "label": "IP Address",
|
|
"metadata_fields": ["ip", "asn", "country", "isp", "geolocation"]},
|
|
{"type_ref": "phone", "label": "Phone",
|
|
"metadata_fields": ["number", "country_code", "carrier", "phone_type"]},
|
|
{"type_ref": "social_media", "label": "Social Media Account",
|
|
"metadata_fields": ["platform", "username", "url", "followers", "verified"]},
|
|
{"type_ref": "document", "label": "Document",
|
|
"metadata_fields": ["title", "format", "classification", "source"]},
|
|
{"type_ref": "crypto_wallet", "label": "Crypto Wallet",
|
|
"metadata_fields": ["address", "blockchain", "balance"]},
|
|
{"type_ref": "malware", "label": "Malware",
|
|
"metadata_fields": ["family", "hash_sha256", "threat_level"]},
|
|
{"type_ref": "vulnerability", "label": "Vulnerability",
|
|
"metadata_fields": ["cve_id", "cvss", "affected_product", "exploited"]},
|
|
]
|
|
|
|
GENERIC_PRESETS = [
|
|
{"type_ref": "concept", "label": "Concept",
|
|
"metadata_fields": ["name", "category", "definition"]},
|
|
{"type_ref": "url", "label": "URL/Link",
|
|
"metadata_fields": ["url", "domain", "context"]},
|
|
{"type_ref": "date_reference", "label": "Date/Time",
|
|
"metadata_fields": ["date", "precision", "context"]},
|
|
{"type_ref": "quantity", "label": "Quantity/Amount",
|
|
"metadata_fields": ["value", "unit", "context"]},
|
|
{"type_ref": "coordinates", "label": "Coordinates",
|
|
"metadata_fields": ["lat", "lon", "label"]},
|
|
{"type_ref": "text_fragment", "label": "Key Text Fragment",
|
|
"metadata_fields": ["text", "category", "relevance"]},
|
|
]
|
|
|
|
# ── Custom presets (acumulativo, pensado para promoción al registry) ───────────
|
|
|
|
CUSTOM_PRESETS_PATH = os.path.join(os.path.dirname(__file__), "data", "custom_presets.json")
|
|
|
|
|
|
def load_custom_presets() -> list[dict]:
|
|
"""Carga presets custom desde data/custom_presets.json si existe."""
|
|
if not os.path.exists(CUSTOM_PRESETS_PATH):
|
|
return []
|
|
with open(CUSTOM_PRESETS_PATH) as f:
|
|
data = json.load(f)
|
|
return data.get("presets", [])
|
|
|
|
|
|
def save_custom_presets(presets: list[dict]) -> None:
|
|
"""Guarda presets custom en data/custom_presets.json.
|
|
|
|
Formato pensado para promoción al registry:
|
|
{
|
|
"presets": [
|
|
{
|
|
"type_ref": "snake_case_id",
|
|
"label": "Human Label",
|
|
"metadata_fields": ["field1", "field2"],
|
|
"reason": "why this type exists",
|
|
"source_doc": "document where it was first discovered",
|
|
"promoted": false // true cuando se registre en el registry
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
os.makedirs(os.path.dirname(CUSTOM_PRESETS_PATH), exist_ok=True)
|
|
with open(CUSTOM_PRESETS_PATH, "w") as f:
|
|
json.dump({"presets": presets}, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def merge_suggested_into_custom(suggested: list[dict], source_doc: str) -> list[dict]:
|
|
"""Mergea tipos sugeridos con custom existentes. Dedup por type_ref."""
|
|
existing = load_custom_presets()
|
|
existing_refs = {p["type_ref"] for p in existing}
|
|
|
|
added = []
|
|
for s in suggested:
|
|
ref = s.get("type_ref", "")
|
|
if not ref or ref in existing_refs:
|
|
continue
|
|
existing_refs.add(ref)
|
|
preset = {
|
|
"type_ref": ref,
|
|
"label": s.get("label", ref),
|
|
"metadata_fields": s.get("metadata_fields", []),
|
|
"reason": s.get("reason", ""),
|
|
"source_doc": source_doc,
|
|
"promoted": False,
|
|
}
|
|
existing.append(preset)
|
|
added.append(preset)
|
|
|
|
if added:
|
|
save_custom_presets(existing)
|
|
|
|
return added
|
|
|
|
|
|
RELATION_TYPES = [
|
|
"employs", "works_for", "founded", "owns", "controls",
|
|
"member_of", "affiliated_with", "collaborates_with",
|
|
"communicates_with", "sent_to", "received_from",
|
|
"located_in", "headquartered_in", "traveled_to", "operates_in",
|
|
"participated_in", "caused", "occurred_at", "occurred_on",
|
|
"mentions", "references", "describes", "authored", "published",
|
|
"funds", "transacted_with", "invested_in",
|
|
"hosts", "resolves_to", "exploits", "targets",
|
|
"related_to", "part_of", "instance_of", "has_attribute",
|
|
]
|
|
|
|
# ── LLM wrapper ───────────────────────────────────────────────────────────────
|
|
|
|
def claude_haiku_json(messages: list[dict]) -> dict:
|
|
parts = []
|
|
for msg in messages:
|
|
if msg["role"] == "system":
|
|
parts.append(f"[SYSTEM]\n{msg['content']}")
|
|
elif msg["role"] == "user":
|
|
parts.append(f"[USER]\n{msg['content']}")
|
|
prompt = "\n\n".join(parts)
|
|
|
|
result = subprocess.run(
|
|
["claude", "-p", "--model", "haiku", "--output-format", "json", prompt],
|
|
capture_output=True, text=True, timeout=120,
|
|
)
|
|
if result.returncode != 0:
|
|
raise RuntimeError(f"claude -p failed: {result.stderr[:200]}")
|
|
|
|
envelope = json.loads(result.stdout)
|
|
return extract_json_from_llm(envelope.get("result", ""))
|
|
|
|
# ── Unified prompt ─────────────────────────────────────────────────────────────
|
|
|
|
def build_unified_prompt(presets, rel_types):
|
|
type_lines = []
|
|
for p in presets:
|
|
fields = ", ".join(p.get("metadata_fields", []))
|
|
type_lines.append(f"- {p['label']} (type_ref: {p['type_ref']}): [{fields}]")
|
|
|
|
return (
|
|
"You are an entity and relation extraction expert. "
|
|
"Given text, extract ALL entities and relations in a single pass.\n\n"
|
|
"ENTITY TYPES:\n" + "\n".join(type_lines) + "\n\n"
|
|
"RELATION TYPES: " + ", ".join(rel_types) + "\n\n"
|
|
'OUTPUT FORMAT (strict JSON):\n'
|
|
'{\n'
|
|
' "entities": [{"name": "...", "type_ref": "...", "attributes": {...}, "confidence": 0.9}],\n'
|
|
' "relations": [{"from_name": "...", "to_name": "...", "relation_type": "...", "confidence": 0.8, "description": "..."}],\n'
|
|
' "suggested_types": [{"type_ref": "snake_case_id", "label": "Human Label", "metadata_fields": ["f1","f2"], "reason": "..."}]\n'
|
|
'}\n\n'
|
|
"RULES:\n"
|
|
"- Extract ALL entities explicitly mentioned\n"
|
|
"- Use exact type_ref from schema. Unknown attributes = null\n"
|
|
"- Confidence: 1.0=explicit, 0.7=strongly implied, 0.5=weakly implied\n"
|
|
"- Relations: from_name/to_name MUST match entity names exactly\n"
|
|
"- suggested_types: for important entities that do NOT fit any type, suggest a new type. "
|
|
"Use those suggested type_refs for those entities in the entities array.\n"
|
|
'- If no new types needed: "suggested_types": []\n'
|
|
"- Respond in the same language as the text for descriptions"
|
|
)
|
|
|
|
# ── Process one chunk ──────────────────────────────────────────────────────────
|
|
|
|
def process_chunk(chunk_idx: int, chunk_text: str, system_prompt: str):
|
|
"""Procesa un chunk: extrae entities + relations + suggested_types."""
|
|
try:
|
|
resp = claude_haiku_json([
|
|
{"role": "system", "content": system_prompt},
|
|
{"role": "user", "content": chunk_text},
|
|
])
|
|
except Exception as e:
|
|
print(f" [WARN] chunk {chunk_idx}: {e}")
|
|
return [], [], []
|
|
|
|
raw_entities = resp.get("entities", [])
|
|
raw_relations = resp.get("relations", [])
|
|
suggested = resp.get("suggested_types", [])
|
|
|
|
entities = []
|
|
for ent in raw_entities:
|
|
name = ent.get("name", "").strip()
|
|
if not name:
|
|
continue
|
|
entities.append(EntityCandidate(
|
|
name=name,
|
|
type_ref=ent.get("type_ref", "concept"),
|
|
attributes=ent.get("attributes", {}),
|
|
confidence=float(ent.get("confidence", 0.5)),
|
|
source_chunk_indices=[chunk_idx],
|
|
))
|
|
|
|
relations = []
|
|
for rel in raw_relations:
|
|
fn = rel.get("from_name", "").strip()
|
|
tn = rel.get("to_name", "").strip()
|
|
if not fn or not tn:
|
|
continue
|
|
relations.append(RelationCandidate(
|
|
from_name=fn,
|
|
to_name=tn,
|
|
relation_type=rel.get("relation_type", "related_to"),
|
|
confidence=float(rel.get("confidence", 0.5)),
|
|
description=rel.get("description", ""),
|
|
source_chunk_index=chunk_idx,
|
|
))
|
|
|
|
return entities, relations, suggested
|
|
|
|
# ── Sigma conversion ───────────────────────────────────────────────────────────
|
|
|
|
TYPE_COLORS = {
|
|
"person": "#e74c3c",
|
|
"organization": "#3498db",
|
|
"location": "#2ecc71",
|
|
"event": "#f39c12",
|
|
"email": "#9b59b6",
|
|
"domain": "#1abc9c",
|
|
"ip_address": "#e67e22",
|
|
"phone": "#95a5a6",
|
|
"social_media": "#e91e63",
|
|
"document": "#607d8b",
|
|
"crypto_wallet": "#ff9800",
|
|
"malware": "#f44336",
|
|
"vulnerability": "#ff5722",
|
|
"concept": "#00bcd4",
|
|
"url": "#8bc34a",
|
|
"date_reference": "#cddc39",
|
|
"quantity": "#ffc107",
|
|
"coordinates": "#4caf50",
|
|
"text_fragment": "#78909c",
|
|
}
|
|
|
|
def to_sigma(entities, relations, entity_id_map):
|
|
# Build name→UUID lookup from dedup map
|
|
# entity_id_map: {name_variant -> uuid, ...}
|
|
# Invert to uuid→canonical_name using entities list
|
|
uuid_to_name = {}
|
|
name_to_uuid = {}
|
|
for e in entities:
|
|
# Find this entity's UUID in the map
|
|
uuid = entity_id_map.get(e.name, entity_id_map.get(e.name.lower().strip(), e.name))
|
|
uuid_to_name[uuid] = e.name
|
|
name_to_uuid[e.name] = uuid
|
|
|
|
degree = {}
|
|
for r in relations:
|
|
fid = r.from_id or r.from_name
|
|
tid = r.to_id or r.to_name
|
|
degree[fid] = degree.get(fid, 0) + 1
|
|
degree[tid] = degree.get(tid, 0) + 1
|
|
|
|
nodes = []
|
|
seen_uuids = set()
|
|
for e in entities:
|
|
uuid = name_to_uuid.get(e.name, e.name)
|
|
if uuid in seen_uuids:
|
|
continue
|
|
seen_uuids.add(uuid)
|
|
# Filter out 'type' — sigma.js reserves it for node render program
|
|
reserved = {"type", "hidden", "x", "y"}
|
|
attrs = {k: str(v) for k, v in (e.attributes or {}).items() if v is not None and k not in reserved}
|
|
nodes.append({
|
|
"key": uuid,
|
|
"attributes": {
|
|
"label": e.name,
|
|
"color": TYPE_COLORS.get(e.type_ref, "#aaaaaa"),
|
|
"size": 4 + min(degree.get(uuid, 0) * 2, 20),
|
|
"entity_type": e.type_ref,
|
|
**attrs,
|
|
},
|
|
})
|
|
|
|
node_keys = {n["key"] for n in nodes}
|
|
edges = []
|
|
seen_edges = set()
|
|
for i, r in enumerate(relations):
|
|
fid = r.from_id or r.from_name
|
|
tid = r.to_id or r.to_name
|
|
if fid in node_keys and tid in node_keys and fid != tid:
|
|
edge_key = (fid, tid, r.relation_type)
|
|
if edge_key in seen_edges:
|
|
continue
|
|
seen_edges.add(edge_key)
|
|
edges.append({
|
|
"key": f"e{i}",
|
|
"source": fid,
|
|
"target": tid,
|
|
"attributes": {"label": r.relation_type},
|
|
})
|
|
|
|
return {"nodes": nodes, "edges": edges}
|
|
|
|
# ── Reclasificación de entidades genéricas ─────────────────────────────────────
|
|
|
|
GENERIC_TYPE_REFS = {"concept", "text_fragment", "url", "date_reference", "quantity", "coordinates"}
|
|
|
|
|
|
def reclassify_generic_entities(entities, new_presets, workers=4):
|
|
"""Reclasifica entidades genéricas usando los tipos recién descubiertos.
|
|
|
|
En vez de re-procesar chunks, hace 1 llamada batch a haiku con las entidades
|
|
genéricas y los nuevos presets para reclasificarlas in-place.
|
|
"""
|
|
generic = [(i, e) for i, e in enumerate(entities) if e.type_ref in GENERIC_TYPE_REFS]
|
|
if not generic or not new_presets:
|
|
return 0
|
|
|
|
# Construir prompt de reclasificación
|
|
type_lines = []
|
|
for p in new_presets:
|
|
fields = ", ".join(p.get("metadata_fields", []))
|
|
type_lines.append(f"- {p['label']} (type_ref: {p['type_ref']}): [{fields}]")
|
|
|
|
system = (
|
|
"You reclassify entities into more specific types. "
|
|
"For each entity, decide if it fits one of the NEW types below better than its current generic type. "
|
|
"If it fits, return the new type_ref and updated attributes. If not, return null.\n\n"
|
|
"NEW TYPES:\n" + "\n".join(type_lines) + "\n\n"
|
|
'OUTPUT: {"reclassified": [{"index": 0, "type_ref": "new_type", "attributes": {...}}, ...]}\n'
|
|
"Only include entities that should change. Omit those that should stay as-is."
|
|
)
|
|
|
|
# Procesar en batches de 30 entidades para no exceder contexto
|
|
batch_size = 30
|
|
total_changed = 0
|
|
|
|
def _reclassify_batch(batch):
|
|
items = [{"index": idx, "name": e.name, "current_type": e.type_ref,
|
|
"attributes": e.attributes} for idx, e in batch]
|
|
try:
|
|
resp = claude_haiku_json([
|
|
{"role": "system", "content": system},
|
|
{"role": "user", "content": json.dumps(items, ensure_ascii=False)},
|
|
])
|
|
return resp.get("reclassified", [])
|
|
except Exception:
|
|
return []
|
|
|
|
batches = [generic[i:i+batch_size] for i in range(0, len(generic), batch_size)]
|
|
|
|
with ThreadPoolExecutor(max_workers=workers) as pool:
|
|
futures = {pool.submit(_reclassify_batch, b): b for b in batches}
|
|
for future in as_completed(futures):
|
|
for item in future.result():
|
|
idx = item.get("index")
|
|
new_ref = item.get("type_ref", "")
|
|
if idx is not None and new_ref and 0 <= idx < len(entities):
|
|
entities[idx].type_ref = new_ref
|
|
if item.get("attributes"):
|
|
entities[idx].attributes.update(item["attributes"])
|
|
total_changed += 1
|
|
|
|
return total_changed
|
|
|
|
|
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
if len(sys.argv) < 2:
|
|
print("Uso: python extract.py <archivo>")
|
|
sys.exit(1)
|
|
|
|
file_path = sys.argv[1]
|
|
if not os.path.isabs(file_path):
|
|
file_path = os.path.join(os.path.dirname(__file__), file_path)
|
|
|
|
workers = int(sys.argv[2]) if len(sys.argv) > 2 else 4
|
|
|
|
print(f"=== Ontology Graph Extraction ===")
|
|
print(f"File: {file_path}")
|
|
print(f"Workers: {workers}")
|
|
start = time.monotonic()
|
|
|
|
# 1. Extraer y preprocesar texto
|
|
print("\n[1/5] Extracting text...")
|
|
raw = extract_text_from_file(file_path)
|
|
text = preprocess_text(raw)
|
|
print(f" {len(text)} chars")
|
|
|
|
# 2. Chunking
|
|
print("[2/5] Chunking...")
|
|
chunks = split_text_into_chunks(text, chunk_size=2000, overlap=200)
|
|
print(f" {len(chunks)} chunks")
|
|
|
|
# 3. Extracción paralela
|
|
custom = load_custom_presets()
|
|
# Solo usar custom no promovidos (los promovidos ya estarán en el registry)
|
|
active_custom = [p for p in custom if not p.get("promoted", False)]
|
|
all_presets = OSINT_PRESETS + GENERIC_PRESETS + active_custom
|
|
print(f" Presets: {len(OSINT_PRESETS)} OSINT + {len(GENERIC_PRESETS)} generic + {len(active_custom)} custom")
|
|
system_prompt = build_unified_prompt(all_presets, RELATION_TYPES)
|
|
|
|
print(f"[3/5] Extracting entities + relations ({workers} workers)...")
|
|
all_entities = []
|
|
all_relations = []
|
|
all_suggested = []
|
|
|
|
with ThreadPoolExecutor(max_workers=workers) as pool:
|
|
futures = {
|
|
pool.submit(process_chunk, i, chunk, system_prompt): i
|
|
for i, chunk in enumerate(chunks)
|
|
}
|
|
for future in as_completed(futures):
|
|
idx = futures[future]
|
|
ents, rels, sugg = future.result()
|
|
all_entities.extend(ents)
|
|
all_relations.extend(rels)
|
|
all_suggested.extend(sugg)
|
|
print(f" chunk {idx+1}/{len(chunks)}: {len(ents)} entities, {len(rels)} relations" +
|
|
(f", {len(sugg)} new types" if sugg else ""))
|
|
|
|
# 4. Deduplicación
|
|
print(f"\n[4/5] Deduplicating...")
|
|
print(f" Raw: {len(all_entities)} entities, {len(all_relations)} relations")
|
|
|
|
dedup = deduplicate_entities(all_entities, name_threshold=0.85)
|
|
final_entities = dedup.entities
|
|
entity_id_map = dedup.name_to_id
|
|
|
|
final_relations = deduplicate_relations(all_relations, entity_id_map)
|
|
|
|
print(f" Final: {len(final_entities)} entities, {len(final_relations)} relations")
|
|
print(f" Merged: {dedup.total_before - dedup.total_after} entities, "
|
|
f"{len(all_relations) - len(final_relations)} relations")
|
|
|
|
# Registrar tipos sugeridos en custom_presets.json
|
|
unique_suggested = []
|
|
if all_suggested:
|
|
seen = set()
|
|
for s in all_suggested:
|
|
key = s.get("type_ref", "")
|
|
if key and key not in seen:
|
|
seen.add(key)
|
|
unique_suggested.append(s)
|
|
|
|
source_doc = os.path.basename(file_path)
|
|
added = merge_suggested_into_custom(unique_suggested, source_doc)
|
|
total_custom = len(load_custom_presets())
|
|
|
|
if added:
|
|
print(f"\n New types registered ({len(added)}):")
|
|
for p in added:
|
|
print(f" + {p['label']} ({p['type_ref']}): {p['metadata_fields']}")
|
|
print(f" Reason: {p['reason']}")
|
|
print(f" Total custom presets: {total_custom} (in {CUSTOM_PRESETS_PATH})")
|
|
|
|
# Reclasificar entidades genéricas con los tipos recién descubiertos
|
|
n_generic = sum(1 for e in final_entities if e.type_ref in GENERIC_TYPE_REFS)
|
|
if n_generic > 0:
|
|
print(f"\n Reclassifying {n_generic} generic entities with new types...")
|
|
changed = reclassify_generic_entities(final_entities, added, workers=workers)
|
|
print(f" Reclassified: {changed}/{n_generic}")
|
|
else:
|
|
print(f"\n {len(unique_suggested)} suggested types already registered ({total_custom} total custom)")
|
|
|
|
# Stats por tipo
|
|
type_counts = {}
|
|
for e in final_entities:
|
|
type_counts[e.type_ref] = type_counts.get(e.type_ref, 0) + 1
|
|
print(f"\n Entity types:")
|
|
for t, c in sorted(type_counts.items(), key=lambda x: -x[1]):
|
|
print(f" {t}: {c}")
|
|
|
|
rel_counts = {}
|
|
for r in final_relations:
|
|
rel_counts[r.relation_type] = rel_counts.get(r.relation_type, 0) + 1
|
|
print(f" Relation types:")
|
|
for t, c in sorted(rel_counts.items(), key=lambda x: -x[1]):
|
|
print(f" {t}: {c}")
|
|
|
|
# 5. Visualización
|
|
print(f"\n[5/5] Generating graph...")
|
|
graph = to_sigma(final_entities, final_relations, entity_id_map)
|
|
out_dir = os.path.join(os.path.dirname(__file__), "data")
|
|
html_path = render_sigma_html(graph, os.path.join(out_dir, "ontology_graph.html"), "Ontology Graph")
|
|
print(f" {len(graph['nodes'])} nodes, {len(graph['edges'])} edges")
|
|
print(f" HTML: file://{html_path}")
|
|
|
|
# Guardar JSON intermedio
|
|
json_path = os.path.join(out_dir, "extraction_result.json")
|
|
with open(json_path, "w") as f:
|
|
json.dump({
|
|
"entities": [{"name": e.name, "type_ref": e.type_ref,
|
|
"confidence": e.confidence, "attributes": e.attributes}
|
|
for e in final_entities],
|
|
"relations": [{"from": r.from_name, "to": r.to_name,
|
|
"type": r.relation_type, "confidence": r.confidence,
|
|
"description": r.description}
|
|
for r in final_relations],
|
|
"suggested_types": [dict(s) for s in (unique_suggested if all_suggested else [])],
|
|
}, f, ensure_ascii=False, indent=2)
|
|
print(f" JSON: {json_path}")
|
|
|
|
elapsed = time.monotonic() - start
|
|
print(f"\nDone in {elapsed:.1f}s")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|