Commit 02cbd593 authored by Jan Reimes
Browse files

Fix empty checkout dirs and Docs/Docs/ URL duplication

Four fixes for checkout reliability:

1. checkout_tdoc: 'already checked out' guard now verifies the directory
   contains actual files, not just that it exists. Empty dirs from failed
   downloads trigger re-download.

2. fetch_tdoc_files: uses _checkout_has_files() instead of bare
   Path.exists() to decide whether download is needed.

3. _resolve_corrected_url: walks up parent directories when direct listing
   fails, handling duplicated path segments (e.g. Docs/Docs/). Also
   triggered on HTTPError (403), not just BadZipFile.

4. checkout_tdoc: HTTPError from download triggers URL correction fallback,
   not just BadZipFile.
parent 1e43ed37
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
@@ -78,7 +78,8 @@ def fetch_tdoc_files(document_id: str, force_download: bool = False) -> TDocFile
    checkout_dir = PathConfig().checkout_dir
    checkout_path = get_checkout_path(metadata, checkout_dir)

    if not checkout_path.exists() or force_download:
    needs_download = force_download or not _checkout_has_files(checkout_path)
    if needs_download:
        with create_cached_session() as session:
            checkout_tdoc(metadata, checkout_dir, force=force_download, session=session)

@@ -97,6 +98,13 @@ def fetch_spec_files(checkout_path: Path) -> TDocFiles:
    return _find_files_in_checkout(checkout_path)


def _checkout_has_files(checkout_path: Path) -> bool:
    """Report whether ``checkout_path`` is a directory holding a visible file.

    The directory tree is searched recursively. Entries whose own name
    starts with a dot are ignored, and sub-directories themselves never
    count, so a missing path, a path that is not a directory, or a tree
    made up only of dot-files and empty folders all yield ``False``.
    """
    if checkout_path.is_dir():
        for entry in checkout_path.rglob("*"):
            # Skip dot-named entries before touching the filesystem again.
            if entry.name.startswith("."):
                continue
            if entry.is_file():
                return True
    return False


def _find_files_in_checkout(checkout_path: Path) -> TDocFiles:
    """Find available file types in checkout directory."""
    files = TDocFiles(checkout_dir=checkout_path)
+48 −18
Original line number Diff line number Diff line
@@ -86,34 +86,52 @@ def get_checkout_path(metadata: TDocMetadata, checkout_dir: Path) -> Path:


def _resolve_corrected_url(metadata: TDocMetadata, session: requests.Session | None = None) -> str | None:
    """Fetch directory listing and find case-correct URL for the TDoc zip.
    """Fetch directory listing and find the correct URL for the TDoc zip.

    When the server hosts a file with different casing than expected,
    this parses the directory listing HTML to find the actual filename.
    Tries the direct parent directory first. If that fails (e.g. the
    directory does not exist due to path duplication like ``Docs/Docs/``),
    progressively tries ancestor directories until a match is found.

    Handles case-mismatched filenames and duplicated path segments.
    """
    if not metadata.url:
        return None

    directory_url = metadata.url.rsplit("/", 1)[0] + "/"
    expected_filename = posixpath.basename(metadata.url)
    expected_lower = expected_filename.lower()

    # Build candidate directory URLs: direct parent, then grandparents
    url_parts = metadata.url.rsplit("/", 1)
    directory_url = url_parts[0] + "/"
    candidate_dirs = [directory_url]

    # Walk up the path to handle duplicated segments (e.g. Docs/Docs/)
    parsed = urlparse(directory_url)
    path_segments = [s for s in parsed.path.split("/") if s]
    # Remove the last segment and try again (up to 3 levels)
    for i in range(1, min(4, len(path_segments))):
        shorter = "/" + "/".join(path_segments[:-i]) + "/"
        candidate = f"{parsed.scheme}://{parsed.netloc}{shorter}"
        candidate_dirs.append(candidate)

    try:
    with requests.Session() as plain_session:
            response = plain_session.get(directory_url, timeout=60)
        for dir_url in candidate_dirs:
            try:
                response = plain_session.get(dir_url, timeout=60)
                response.raise_for_status()
            soup = BeautifulSoup(response.content or b"", "html.parser")
            expected_lower = expected_filename.lower()
            except (requests.RequestException, OSError):
                logger.debug("Directory listing failed for %s", dir_url)
                continue

            soup = BeautifulSoup(response.content or b"", "html.parser")
            for link in soup.find_all("a", href=True):
                href = link.get("href")
                if not isinstance(href, str):
                    continue
                full_url = urljoin(directory_url, href)
                full_url = urljoin(dir_url, href)
                actual_filename = posixpath.basename(full_url)
                if actual_filename.lower() == expected_lower:
                    return full_url
    except (requests.RequestException, OSError):
        logger.debug(f"Failed to fetch directory listing from {directory_url}")

    return None

@@ -148,8 +166,11 @@ def checkout_tdoc(
    checkout_path = get_checkout_path(metadata, checkout_dir)

    if checkout_path.exists() and not force:
        has_files = any(f.is_file() for f in checkout_path.rglob("*") if not f.name.startswith("."))
        if has_files:
            logger.debug(f"TDoc {metadata.tdoc_id} already checked out at {checkout_path}")
            return checkout_path
        logger.debug(f"TDoc {metadata.tdoc_id} checkout dir exists but is empty, re-downloading")

    checkout_path.mkdir(parents=True, exist_ok=True)
    temp_zip_file = checkout_path / f"{metadata.tdoc_id}.zip"
@@ -158,21 +179,30 @@ def checkout_tdoc(
        msg = f"TDoc {metadata.tdoc_id} has no URL"
        raise ValueError(msg)

    try:
        try:
            download_to_file(metadata.url, temp_zip_file, session=session)
        except requests.HTTPError:
            corrected_url = _resolve_corrected_url(metadata)
            if corrected_url and corrected_url != metadata.url:
                logger.info("Retrying %s with corrected URL: %s", metadata.tdoc_id, corrected_url)
                with requests.Session() as plain_session:
                    download_to_file(corrected_url, temp_zip_file, session=plain_session)
            else:
                raise
        try:
            with zipfile.ZipFile(temp_zip_file) as _zf:
                pass
        except zipfile.BadZipFile:
            corrected_url = _resolve_corrected_url(metadata)
            if corrected_url and corrected_url != metadata.url:
                logger.info(f"Retrying checkout for {metadata.tdoc_id} with corrected URL: {corrected_url}")
                logger.info("Retrying %s with corrected URL: %s", metadata.tdoc_id, corrected_url)
                with requests.Session() as plain_session:
                    download_to_file(corrected_url, temp_zip_file, session=plain_session)
            else:
                raise
        safe_extract_zip(temp_zip_file, checkout_path)
        logger.info(f"Checked out {metadata.tdoc_id} to {checkout_path}")
        logger.info("Checked out %s to %s", metadata.tdoc_id, checkout_path)
    finally:
        if temp_zip_file.exists():
            temp_zip_file.unlink()