# Source code for matmmextract.shared.downloader

"""
matmmextract.shared.downloader
==================================
Threaded image download engine shared by the Elsevier and Springer
download scripts.

The two original scripts (``elsevier_download_image.py`` and
``download_springer_img.py``) had largely parallel structures.
This module provides:

* ``next_name_allocator``  — resume-safe sequential image naming
                             (ported from the Springer script; replaces the
                             hardcoded ``img_counter = 13101`` in Elsevier)
* ``load_download_log``    — read a previous log to skip already-done URLs
* ``append_log``           — append rows to the CSV log incrementally
* ``extension_from_url``   — infer an image file extension from a URL
* ``DownloadResult``       — typed result from a single download attempt
* ``download_one``         — download one URL with retries
* ``run_downloads``        — orchestrate a full threaded batch

Each publisher supplies its own ``build_candidate_urls`` callable and
``request_headers`` dict; everything else is shared.
"""

from __future__ import annotations

import csv
import re
import time
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

import pandas as pd
import requests



@dataclass
class DownloadResult:
    """Outcome of a single image download attempt.

    Produced by ``download_one`` and consumed by ``run_downloads``, which
    copies the fields into the figure DataFrame and the CSV log.
    """

    # DataFrame row index the attempt belongs to (passed through unchanged).
    row_index: Any
    # Original URL from the figure CSV; used as the lookup key in logs.
    image_url: str
    # One of: "success" | "failed" | "empty_url" | "skipped".
    status: str
    # Bare filename of the saved image ("" when the attempt failed).
    downloaded_image_name: str
    # Path the image was written to, as a string ("" when the attempt failed).
    local_path: str
    # URL that succeeded, or every candidate joined by " || " on failure.
    attempted_url: str
    # Last error seen; empty on success.
    error: str = ""
def next_name_allocator(
    output_dir: str | Path,
    df: pd.DataFrame,
    name_prefix: str,
) -> Callable[[str], str]:
    """Return a closure that allocates the next unique image filename.

    Scans *output_dir* for existing files matching ``<name_prefix><N>.*``
    and the ``downloaded_image_name`` column of *df* so that resumed runs
    never overwrite previous downloads.

    Parameters
    ----------
    output_dir:
        Directory where images will be saved.
    df:
        DataFrame that may already have a ``downloaded_image_name`` column
        from a previous run.
    name_prefix:
        Prefix string, e.g. ``"alloy_img"`` or ``"img"``.  It is matched
        literally, even if it contains glob metacharacters such as ``[``.

    Returns
    -------
    Callable[[str], str]
        ``allocate(ext)`` → unique filename string (e.g. ``"alloy_img42.jpg"``).
        *ext* is appended verbatim, so it should include the leading dot.
    """
    # Local import: the module does not import ``glob`` at file level.
    from glob import escape as glob_escape

    # Compiled once; the same pattern is applied to disk names and df names.
    numbered = re.compile(rf"^{re.escape(name_prefix)}(\d+)\.")
    used: set[str] = set()
    max_num = 0

    def register(name: str) -> None:
        """Mark *name* as taken and raise the high-water mark if numbered."""
        nonlocal max_num
        used.add(name)
        m = numbered.match(name)
        if m:
            max_num = max(max_num, int(m.group(1)))

    # Escape the prefix so characters like "[" or "*" are not interpreted as
    # glob syntax (otherwise files carrying such a prefix are never found and
    # numbering would restart at 1).
    for path in Path(output_dir).glob(f"{glob_escape(name_prefix)}*.*"):
        register(path.name)

    if "downloaded_image_name" in df.columns:
        for val in df["downloaded_image_name"].dropna().astype(str):
            name = val.strip()
            if name:
                register(name)

    counter = max_num + 1

    def allocate(ext: str) -> str:
        nonlocal counter
        while True:
            name = f"{name_prefix}{counter}{ext}"
            counter += 1
            if name not in used:
                used.add(name)
                return name

    return allocate
def load_download_log(log_file: str | Path) -> dict[str, dict]:
    """Read a previous download log and index its usable rows by URL.

    A row is kept only when its status is ``"success"`` and the recorded
    local file still exists on disk.  The first line of the file is always
    treated as a header and skipped.

    Parameters
    ----------
    log_file:
        Path to the CSV log written by previous runs.

    Returns
    -------
    dict
        ``{image_url: info}`` where *info* has the keys ``download_status``,
        ``downloaded_image_name``, ``downloaded_image_path`` and
        ``attempted_url``.  Empty when the log file does not exist.
    """
    source = Path(log_file)
    if not source.exists():
        return {}

    done: dict[str, dict] = {}
    with source.open(newline="", encoding="utf-8") as handle:
        records = csv.reader(handle)
        next(records, None)  # header line
        for record in records:
            if len(record) < 4:
                continue

            cells = [cell.strip() for cell in record]
            url, state = cells[0], cells[1]

            # Two layouts are accepted: rows with six or more columns carry
            # the image name in column 2; shorter rows go straight to the path.
            if len(cells) >= 6:
                name, saved_to, tried = cells[2], cells[3], cells[4]
            else:
                saved_to, tried = cells[2], cells[3]
                name = Path(saved_to).name if saved_to else ""

            usable = bool(url) and state == "success" and bool(saved_to)
            if not (usable and Path(saved_to).exists()):
                continue

            done[url] = {
                "download_status": state,
                "downloaded_image_name": name or Path(saved_to).name,
                "downloaded_image_path": saved_to,
                "attempted_url": tried,
            }
    return done
def append_log(log_file: str | Path, rows: list[dict]) -> None:
    """Append *rows* to the CSV log, writing a header if the file is new.

    Parameters
    ----------
    log_file:
        Destination CSV path.  Missing parent directories are created.
    rows:
        List of dicts; all must share the same keys.  Column order follows
        the dict key order, so it must stay the same across calls for the
        appended rows to line up with the existing header.  An empty list
        is a no-op (no file is created).
    """
    if not rows:
        return

    log_path = Path(log_file)
    # ``DataFrame.to_csv`` does not create directories, so make sure the
    # target folder exists before the first append.
    log_path.parent.mkdir(parents=True, exist_ok=True)

    # Header only for a brand-new or empty file; later appends add data rows.
    write_header = not log_path.exists() or log_path.stat().st_size == 0
    pd.DataFrame(rows).to_csv(
        log_path,
        mode="a",
        header=write_header,
        index=False,
        encoding="utf-8",  # same encoding the log reader opens the file with
    )
def extension_from_url(url: str) -> str:
    """Pick a file extension for *url*, falling back to ``.jpg``.

    Only the path component of the URL is inspected (query string and
    fragment are ignored).  The lower-cased suffix is returned when it is
    one of the recognised image extensions; anything else yields ``.jpg``.
    """
    recognised = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg", ".tif", ".tiff")
    url_path = urlparse(str(url)).path
    ext = Path(url_path).suffix.lower()
    return ext if ext in recognised else ".jpg"
def download_one(
    index: Any,
    image_url: str,
    candidate_urls: list[str],
    local_path: Path,
    local_name: str,
    headers: dict,
    retries: int = 3,
    timeout: int = 90,
) -> DownloadResult:
    """Attempt to download one image, trying each candidate URL in order.

    Parameters
    ----------
    index:
        DataFrame row index (passed through to the result for later join).
    image_url:
        Original URL from the figure CSV (used as the lookup key in logs).
    candidate_urls:
        Ordered list of URLs to try (built by the caller's
        ``build_candidate_urls`` function).
    local_path:
        Full path where the file should be written on success.
    local_name:
        Bare filename (used in the result for the CSV column).
    headers:
        HTTP request headers (auth tokens, User-Agent, Accept, etc.).
    retries:
        Attempts per candidate URL before moving to the next.
    timeout:
        Per-request timeout in seconds.

    Returns
    -------
    DownloadResult
        ``status="success"`` with the URL that worked, or ``status="failed"``
        with all candidates joined by ``" || "`` and the last error seen.
        The error is ``"NO_URL"`` when no request was ever made (empty
        candidate list or ``retries <= 0``).
    """
    last_error = "NO_URL"

    for url in candidate_urls:
        for attempt in range(retries):
            try:
                response = requests.get(url, headers=headers, timeout=timeout)

                if response.status_code == 429:
                    # Rate limited.  Record it so a download that fails purely
                    # because of throttling is not reported as "NO_URL" or
                    # with a stale error from an earlier attempt.
                    last_error = "HTTP_429"
                    time.sleep(60 + attempt * 30)
                    continue

                if response.status_code != 200:
                    last_error = f"HTTP_{response.status_code}"
                    time.sleep(1 + attempt)
                    continue

                content_type = response.headers.get("content-type", "").lower()
                content = response.content
                # Accept an inline SVG body even when the content-type header
                # does not mention "image".
                is_svg = content.lstrip().startswith(b"<svg")
                if "image" not in content_type and not is_svg:
                    last_error = f"NON_IMAGE:{content_type}"
                    break  # wrong content-type: skip remaining attempts for this URL

                local_path.write_bytes(content)
                return DownloadResult(
                    row_index=index,
                    image_url=image_url,
                    status="success",
                    downloaded_image_name=local_name,
                    local_path=str(local_path),
                    attempted_url=url,
                )
            except Exception as exc:
                # Deliberately broad: any network or filesystem error counts
                # as a failed attempt and is retried after a growing delay.
                last_error = str(exc)
                time.sleep(2 + attempt * 2)

    return DownloadResult(
        row_index=index,
        image_url=image_url,
        status="failed",
        downloaded_image_name="",
        local_path="",
        attempted_url=" || ".join(candidate_urls),
        error=last_error,
    )
def _cell_text(value: Any, *, strip: bool = True) -> str:
    """Return a DataFrame cell as text, mapping ``None``/NaN/NA to ``""``."""
    if value is None:
        return ""
    try:
        if pd.isna(value):
            return ""
    except (TypeError, ValueError):
        # Non-scalar cell (e.g. a list): not "missing", fall through to str().
        pass
    text = str(value)
    return text.strip() if strip else text


def run_downloads(
    df: pd.DataFrame,
    output_dir: str | Path,
    output_csv: str | Path,
    log_file: str | Path,
    build_candidate_urls: Callable[[str, str], list[str]],
    request_headers: dict,
    name_prefix: str = "img",
    max_workers: int = 4,
    retries: int = 3,
    timeout: int = 90,
    cool_every: int = 100,
    cool_seconds: float = 60.0,
    long_cool_every: int = 1500,
    long_cool_seconds: float = 300.0,
    xml_file_col: str = "xml_file",
    verbose: bool = True,
) -> pd.DataFrame:
    """Download all images referenced in *df*, with resume support.

    Parameters
    ----------
    df:
        Figure DataFrame with at least an ``image_url`` column.
        Modified in-place with ``download_status``,
        ``downloaded_image_name``, ``downloaded_image_path`` columns
        (stored with ``object`` dtype).  Missing/NaN URLs are treated as
        empty and marked ``"empty_url"``.
    output_dir:
        Directory where image files are saved.
    output_csv:
        CSV written after each batch (safe to resume from).
    log_file:
        Per-URL download log CSV.
    build_candidate_urls:
        ``(image_url, xml_file) -> [url, ...]``
        Publisher-specific URL builder.
    request_headers:
        HTTP headers for every request (auth keys, User-Agent, etc.).
    name_prefix:
        Image filename prefix (e.g. ``"alloy_img"`` or ``"img"``).
    max_workers:
        Thread pool size.
    retries:
        Attempts per candidate URL inside :func:`download_one`.
    timeout:
        Per-request timeout seconds.
    cool_every / cool_seconds:
        Pause after this many *real* downloads.  ``cool_every <= 0``
        disables the pause.
    long_cool_every / long_cool_seconds:
        Longer pause after this many real downloads.
        ``long_cool_every <= 0`` disables the pause.
    xml_file_col:
        Column in *df* holding the source XML filename (used by
        ``build_candidate_urls`` for relative-URL resolution).
    verbose:
        Print progress lines.

    Returns
    -------
    pd.DataFrame
        Updated *df* (also written to *output_csv*).
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    for col in ("download_status", "downloaded_image_name", "downloaded_image_path"):
        if col not in df.columns:
            df[col] = ""
        # A column re-read from an earlier CSV can be all-NaN float64; writing
        # strings into it warns or raises depending on the pandas version, so
        # keep these bookkeeping columns as object dtype.
        df[col] = df[col].astype(object)

    already_done = load_download_log(log_file)
    allocate_name = next_name_allocator(output_dir, df, name_prefix)

    # Build task list
    tasks: list[tuple] = []
    for i, row in df.iterrows():
        # NaN/None URLs must count as empty rather than becoming the text "nan".
        image_url = _cell_text(row.get("image_url", ""))
        if not image_url:
            df.at[i, "download_status"] = "empty_url"
            continue

        # Resume: already succeeded in a previous run
        if image_url in already_done:
            prev = already_done[image_url]
            df.at[i, "download_status"] = prev["download_status"]
            df.at[i, "downloaded_image_name"] = prev["downloaded_image_name"]
            df.at[i, "downloaded_image_path"] = prev["downloaded_image_path"]
            continue

        # Resume: success recorded in the df itself and file exists
        existing_status = _cell_text(row.get("download_status", ""))
        existing_path = _cell_text(row.get("downloaded_image_path", ""))
        if existing_status == "success" and existing_path and Path(existing_path).exists():
            if Path(existing_path).parent.resolve() == output_dir.resolve():
                continue
            # Stale path pointing to a different directory — retry
            df.at[i, "downloaded_image_path"] = ""
            df.at[i, "download_status"] = ""

        xml_file = _cell_text(row.get(xml_file_col, ""), strip=False)
        candidates = build_candidate_urls(image_url, xml_file)
        ext = extension_from_url(candidates[0] if candidates else image_url)
        local_name = allocate_name(ext)
        local_path = output_dir / local_name
        tasks.append((i, image_url, candidates, local_path, local_name))

    total = len(tasks)
    if verbose:
        print(f"\nPending downloads: {total} | output: {output_dir} | workers: {max_workers}\n")

    if not tasks:
        df.to_csv(output_csv, index=False)
        return df

    completed = 0
    downloaded_count = 0
    batch_log: list[dict] = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                download_one,
                idx,
                url,
                candidates,
                lpath,
                lname,
                request_headers,
                retries,
                timeout,
            )
            for idx, url, candidates, lpath, lname in tasks
        ]

        for future in as_completed(futures):
            result: DownloadResult = future.result()
            i = result.row_index

            df.at[i, "download_status"] = result.status
            df.at[i, "downloaded_image_name"] = result.downloaded_image_name
            df.at[i, "downloaded_image_path"] = result.local_path

            batch_log.append({
                "image_url": result.image_url,
                "status": result.status,
                "downloaded_image_name": result.downloaded_image_name,
                "local_path": result.local_path,
                "attempted_url": result.attempted_url,
                "error": result.error,
            })

            succeeded = result.status == "success"
            if succeeded:
                downloaded_count += 1
            completed += 1

            # Persist progress every 25 results and at the very end.
            if completed % 25 == 0 or completed == total:
                append_log(log_file, batch_log)
                batch_log = []
                df.to_csv(output_csv, index=False)
                if verbose:
                    print(f"  {completed}/{total} | {downloaded_count} downloaded")

            # Cooling pauses (only triggered by real downloads).  The check
            # runs only when the success counter has just advanced; otherwise
            # every failed result arriving while the counter sits on a
            # multiple of ``cool_every`` would sleep again.  Nothing is left
            # to throttle after the final result, so no pause there either.
            # NOTE(review): all tasks are submitted up front, so this sleep
            # delays result handling in this thread but does not stop worker
            # threads from issuing further requests.
            if succeeded and completed < total:
                if long_cool_every > 0 and downloaded_count % long_cool_every == 0:
                    if verbose:
                        print(f"\nLong cool-down after {long_cool_every} downloads → {long_cool_seconds}s\n")
                    time.sleep(long_cool_seconds)
                elif cool_every > 0 and downloaded_count % cool_every == 0:
                    if verbose:
                        print(f"\nCool-down after {cool_every} downloads → {cool_seconds}s\n")
                    time.sleep(cool_seconds)

    # Final flush
    if batch_log:
        append_log(log_file, batch_log)
    df.to_csv(output_csv, index=False)

    if verbose:
        success = int((df["download_status"] == "success").sum())
        failed = int((df["download_status"] == "failed").sum())
        print(f"\nDone. Success: {success} | Failed: {failed} | CSV: {output_csv}")

    return df