src/tdoc_crawler/specs/downloads.py  +127 −80

@@ -3,6 +3,7 @@
 import asyncio
 import zipfile
 from pathlib import Path
+from typing import Any
 
 import requests
 from zipinspect import HTTPZipReader
@@ -40,57 +41,17 @@ class SpecDownloads:
     ) -> list[Path]:
         """Download and extract spec documents to the checkout directory."""
         results: list[Path] = []
 
         for spec in specs:
-            url: str | None = None
-            filename: str | None = None
-            try:
-                # check if it exists in the database; if not, attempt crawl if sources provided
-                normalized = normalize_spec_number(spec)
-                series = f"{normalized.split('.', maxsplit=1)[0]}_series"
-                target_dir = checkout_dir / "Specs" / "archive" / series / normalized
-                target_dir.mkdir(parents=True, exist_ok=True)
-
-                # Resolve URL
-                try:
-                    url, filename = self._resolve_spec_url(spec, release)
-                except ValueError:
-                    if sources:
-                        _logger.info("Spec metadata not found for %s, attempting crawl...", normalized)
-                        self._database.crawl_specs([normalized], release, sources)
-                        # Try again after crawl
-                        try:
-                            url, filename = self._resolve_spec_url(normalized, release)
-                        except ValueError:
-                            _logger.warning("Spec metadata still not found for %s after crawl", normalized)
-                            continue
-                    else:
-                        _logger.warning("Spec metadata not found for %s and no sources provided", normalized)
-                        continue
-
-                if url is None or filename is None:
-                    continue
-
-                if sources:
-                    _logger.info("Found URL: %s", url)
-
-                # doc-only logic
-                success = False
-                if doc_only:
-                    success = asyncio.run(self._attempt_doc_only_async(url, normalized, target_dir))
-
-                if not success:
-                    self._download_full_zip(url, target_dir / filename)
-                    self._extract_zip(target_dir / filename, target_dir)
-
+            target_dir = self._checkout_single_spec(
+                spec=spec,
+                doc_only=doc_only,
+                checkout_dir=checkout_dir,
+                release=release,
+                sources=sources,
+            )
+            if target_dir is not None:
                 results.append(target_dir)
-            except Exception as exc:
-                _logger.error("Failed to checkout %s: %s", spec, exc)
-                continue
 
         return results
 
     def open_spec(
@@ -118,6 +79,64 @@ class SpecDownloads:
             return zips[0]
 
         return target_dir
 
+    def _checkout_single_spec(
+        self,
+        spec: str,
+        doc_only: bool,
+        checkout_dir: Path,
+        release: str,
+        sources: list[SpecSource] | None,
+    ) -> Path | None:
+        """Checkout a single spec into the archive and return target directory."""
+        try:
+            normalized = normalize_spec_number(spec)
+            target_dir = self._build_target_dir(checkout_dir, normalized)
+
+            resolved = self._resolve_spec_url_with_fallback(spec, normalized, release, sources)
+            if resolved is None:
+                return None
+            url, filename = resolved
+
+            if sources:
+                _logger.info("Found URL: %s", url)
+
+            doc_only_success = False
+            if doc_only:
+                doc_only_success = asyncio.run(self._attempt_doc_only_async(url, normalized, target_dir))
+
+            if not doc_only_success:
+                self._download_full_zip(url, target_dir / filename)
+                self._extract_zip(target_dir / filename, target_dir)
+
+            return target_dir
+        except Exception as exc:
+            _logger.error("Failed to checkout %s: %s", spec, exc)
+            return None
+
+    def _resolve_spec_url_with_fallback(
+        self,
+        spec: str,
+        normalized: str,
+        release: str,
+        sources: list[SpecSource] | None,
+    ) -> tuple[str, str] | None:
+        """Resolve spec URL, optionally crawling sources when metadata is missing."""
+        try:
+            return self._resolve_spec_url(spec, release)
+        except ValueError:
+            if not sources:
+                _logger.warning("Spec metadata not found for %s and no sources provided", normalized)
+                return None
+
+            _logger.info("Spec metadata not found for %s, attempting crawl...", normalized)
+            self._database.crawl_specs([normalized], release, sources)
+            try:
+                return self._resolve_spec_url(normalized, release)
+            except ValueError:
+                _logger.warning("Spec metadata still not found for %s after crawl", normalized)
+                return None
+
     def _resolve_spec_url(self, spec: str, release: str | None) -> tuple[str, str]:
         """Resolve spec number to download URL and filename."""
         release = release or "latest"
@@ -127,41 +146,18 @@ class SpecDownloads:
         if not versions:
             raise ValueError(f"No versions found for spec {normalized}")
 
-        # Sort versions to find latest. Version strings (e.g. 17.0.0) sort lexicographically okay for major.minor.patch
-        # But 9.0.0 > 10.0.0 is False in string sort ('9' > '1').
-        # We need generic version sort.
-        # Simple tuple conversion:
-        def parse_version(v: str) -> tuple[int, ...]:
-            try:
-                return tuple(map(int, v.split(".")))
-            except ValueError:
-                return (0,)
-
-        versions.sort(key=lambda x: parse_version(x.version), reverse=True)
+        versions.sort(key=lambda version: self._parse_version(version.version), reverse=True)
 
         # If specific release requested, use normalize_release to handle various formats
        release_type, release_value, specificity = normalize_release(release)
-        if release_type in {"latest", "all"}:
-            pass  # Use all versions, already sorted by version desc
-        elif release_type in ("exact", "prefix"):
-            # Filter versions by release prefix
-            filtered_versions: list = []
-            for v in versions:
-                v_parts = parse_version(v.version)
-                if release_type == "exact":
-                    # Exact match: specificity == 3, match full version
-                    if v.version == release_value:
-                        filtered_versions.append(v)
-                # Prefix match: match major (specificity=1) or major.minor (specificity=2)
-                elif len(v_parts) >= specificity:
-                    prefix_parts = release_value.split(".")
-                    if all(v_parts[i] == int(prefix_parts[i]) for i in range(specificity)):
-                        filtered_versions.append(v)
-
-            if filtered_versions:
-                versions = filtered_versions
-            else:
-                msg = f"No versions found for spec {normalized} with release {release}"
-                raise ValueError(msg)
+        versions = self._filter_versions_by_release(
+            versions=versions,
+            release_type=release_type,
+            release_value=release_value,
+            specificity=specificity,
+            normalized=normalized,
+            release=release,
+        )
 
         target = versions[0]
@@ -169,10 +165,61 @@ class SpecDownloads:
         url = SPEC_URL_TEMPLATE.format(series=series, normalized=normalized, file_name=target.file_name)
         return url, target.file_name
 
+    def _filter_versions_by_release(
+        self,
+        versions: list[Any],
+        release_type: str,
+        release_value: str,
+        specificity: int,
+        normalized: str,
+        release: str,
+    ) -> list[Any]:
+        """Filter candidate versions by normalized release selector."""
+        if release_type in {"latest", "all"}:
+            return versions
+        if release_type not in {"exact", "prefix"}:
+            return versions
+
+        filtered_versions: list[Any] = []
+        for version_entry in versions:
+            version_parts = self._parse_version(version_entry.version)
+            if release_type == "exact":
+                if version_entry.version == release_value:
+                    filtered_versions.append(version_entry)
+                continue
+            if len(version_parts) < specificity:
+                continue
+            prefix_parts = release_value.split(".")
+            if all(version_parts[index] == int(prefix_parts[index]) for index in range(specificity)):
+                filtered_versions.append(version_entry)
+
+        if filtered_versions:
+            return filtered_versions
+
+        msg = f"No versions found for spec {normalized} with release {release}"
+        raise ValueError(msg)
+
     def _download_full_zip(self, url: str, target_path: Path) -> None:
         """Download full zip file, re-use session if already created for doc-only attempt."""
         self.session = download_to_file(url, target_path, session=self.session, close_session=False, cache_manager_name=self._cache_manager.name)
 
+    @staticmethod
+    def _build_target_dir(checkout_dir: Path, normalized: str) -> Path:
+        """Create and return target checkout directory for one normalized spec."""
+        series = f"{normalized.split('.', maxsplit=1)[0]}_series"
+        target_dir = checkout_dir / "Specs" / "archive" / series / normalized
+        target_dir.mkdir(parents=True, exist_ok=True)
+        return target_dir
+
+    @staticmethod
+    def _parse_version(version: str) -> tuple[int, ...]:
+        """Parse dotted version string into comparable integer tuple."""
+        try:
+            return tuple(map(int, version.split(".")))
+        except ValueError:
+            return (0,)
+
     @staticmethod
     async def _attempt_doc_only_async(url: str, normalized: str, target_dir: Path) -> bool:
         """Attempt to download only the document file from remote zip."""
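Note on the version sort: the removed inline comment's point ("9.0.0 > 10.0.0 is False in string sort") still motivates the new _parse_version helper. A minimal standalone sketch, not part of this PR, showing the difference:

# Why _parse_version sorts on integer tuples instead of raw version strings.
versions = ["9.0.0", "10.0.0", "17.1.0", "17.0.0"]

print(sorted(versions, reverse=True))
# ['9.0.0', '17.1.0', '17.0.0', '10.0.0']  -- wrong: '9' > '1' lexicographically

def parse_version(version: str) -> tuple[int, ...]:
    """Mirrors SpecDownloads._parse_version: non-numeric versions fall back to (0,)."""
    try:
        return tuple(map(int, version.split(".")))
    except ValueError:
        return (0,)

print(sorted(versions, key=parse_version, reverse=True))
# ['17.1.0', '17.0.0', '10.0.0', '9.0.0']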