Loading src/tdoc_crawler/clients/portal.py +22 −16 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ from __future__ import annotations import re from pathlib import Path from typing import Any from typing import Self import niquests as requests Loading Loading @@ -99,11 +99,11 @@ class PortalClient: self._session.close() self._session = None def __enter__(self) -> PortalClient: def __enter__(self) -> Self: """Enter context manager.""" return self def __exit__(self, *args: Any) -> None: def __exit__(self, *args: object) -> None: """Exit context manager and close session.""" self.close() Loading @@ -118,7 +118,8 @@ class PortalClient: return if self.credentials is None: raise PortalCredentialsError("Portal credentials required for targeted fetch. Set TDC_EOL_USERNAME and TDC_EOL_PASSWORD.") msg = "Portal credentials required for targeted fetch. Set TDC_EOL_USERNAME and TDC_EOL_PASSWORD." raise PortalCredentialsError(msg) logger.info("Authenticating with 3GPP portal...") Loading @@ -138,7 +139,7 @@ class PortalClient: "password": self.credentials.password, } logger.debug(f"Calling login API at {login_api_url}") logger.debug("Calling login API at %s", login_api_url) login_response = session.post( login_api_url, Loading @@ -159,7 +160,8 @@ class PortalClient: self._authenticated = True if response_text.lower() == "failed": raise PortalAuthenticationError("Authentication failed - check credentials") msg = "Authentication failed - check credentials" raise PortalAuthenticationError(msg) # Step 3: Store the authenticated session self._session = session Loading @@ -179,7 +181,7 @@ class PortalClient: PortalParsingError: If TDoc ID is invalid or URL extraction fails requests.RequestException: For network errors """ logger.debug(f"Extracting TDoc URL from DownloadTDoc endpoint for {tdoc_id}") logger.debug("Extracting TDoc URL from DownloadTDoc endpoint for %s", tdoc_id) download_url = f"{TDOC_DOWNLOAD_URL}?contributionUid={tdoc_id}" session = self._get_session() Loading @@ -189,7 +191,8 @@ class PortalClient: response.raise_for_status() if "cannot be found" in response.text.lower() or "not found" in response.text.lower(): raise PortalParsingError(f"TDoc {tdoc_id} not found on portal") msg = f"TDoc {tdoc_id} not found on portal" raise PortalParsingError(msg) # Extract URL from JavaScript redirect pattern pattern = r"window\.location\.href\s*=\s*['\"]([^'\"]+)['\"]" Loading @@ -206,14 +209,16 @@ class PortalClient: break if not match: raise PortalParsingError(f"Failed to extract URL for TDoc {tdoc_id}: JavaScript redirect not found") msg = f"Failed to extract URL for TDoc {tdoc_id}: JavaScript redirect not found" raise PortalParsingError(msg) extracted_url = match.group(1).strip() if not extracted_url.startswith(("http://", "https://", "ftp://")): raise PortalParsingError(f"Invalid URL format for TDoc {tdoc_id}: {extracted_url}") msg = f"Invalid URL format for TDoc {tdoc_id}: {extracted_url}" raise PortalParsingError(msg) logger.debug(f"Successfully extracted TDoc URL for {tdoc_id}: {extracted_url}") logger.debug("Successfully extracted TDoc URL for %s: %s", tdoc_id, extracted_url) return extracted_url except requests.RequestException: Loading @@ -222,7 +227,7 @@ class PortalClient: raise except Exception as exc: error_msg = f"Failed to extract URL for TDoc {tdoc_id}: {exc}" logger.error(error_msg) logger.exception(error_msg) raise PortalParsingError(error_msg) from exc def fetch_tdoc_metadata(self, tdoc_id: str, url: str | None = None) -> TDocMetadata: Loading @@ -244,9 +249,9 @@ class PortalClient: if url is None: try: url = self.extract_tdoc_url(tdoc_id) logger.debug(f"Using extracted URL for {tdoc_id}") logger.debug("Using extracted URL for %s", tdoc_id) except Exception as e: logger.debug(f"URL extraction failed for {tdoc_id}: {e}") logger.warning("URL extraction failed for %s: %s", tdoc_id, e) # Continue - authenticated method can still work without pre-extracted URL # Ensure authenticated Loading @@ -254,7 +259,7 @@ class PortalClient: # Fetch TDoc page view_url = f"{TDOC_VIEW_URL}?mode=view&contributionUid={tdoc_id}" logger.debug(f"Fetching TDoc metadata from {view_url}") logger.debug("Fetching TDoc metadata from %s", view_url) session = self._get_session() response = session.get(view_url, timeout=self.timeout) Loading @@ -263,7 +268,8 @@ class PortalClient: # Check if redirected to login (session expired) if "login.aspx" in response.url.lower(): self._authenticated = False raise PortalAuthenticationError("Session expired - re-authentication required") msg = "Session expired - re-authentication required" raise PortalAuthenticationError(msg) # Parse the page using the parser module return self.parse_tdoc_page(response.text, tdoc_id, url) Loading src/tdoc_crawler/config/cache_manager.py +17 −10 Original line number Diff line number Diff line Loading @@ -28,14 +28,13 @@ from typing import ClassVar DEFAULT_DATABASE_FILENAME = "3gpp_crawler.db" DEFAULT_HTTP_CACHE_FILENAME = "http-cache.sqlite3" DEFAULT_CHECKOUT_DIRNAME = "checkout" DEFAULT_LLM_WIKI_DIRNAME = "wiki" WORKSPACE_REGISTRY_FILENAME = "workspaces.json" class CacheManagerNotRegisteredError(RuntimeError): """Raised when trying to resolve CacheManager before registration.""" pass class CacheManager: """Centralized manager for cache directory paths. Loading Loading @@ -73,7 +72,8 @@ class CacheManager: RuntimeError: If a manager is already registered """ if CacheManager._instance is not None: raise RuntimeError("CacheManager already registered. Call only once at startup.") msg = "CacheManager already registered. Call only once at startup." raise RuntimeError(msg) CacheManager._instance = self return self Loading Loading @@ -102,10 +102,18 @@ class CacheManager: """Path to workspace registry JSON file.""" return self._cache_dir / WORKSPACE_REGISTRY_FILENAME @classmethod def is_registered(cls) -> bool: """Check if a CacheManager instance is registered.""" return cls._instance is not None @property def llm_wiki_dir(self) -> Path: """Path to LLM wiki workspace directory.""" return self._cache_dir / DEFAULT_LLM_WIKI_DIRNAME def workspace_llm_wiki_dir(self, workspace_name: str) -> Path: """Path to a specific workspace's LLM wiki directory.""" return self.llm_wiki_dir / workspace_name def workspace_sources_dir(self, workspace_name: str) -> Path: """Path to a workspace's sources subdirectory.""" return self.workspace_llm_wiki_dir(workspace_name) / "sources" def resolve_cache_manager() -> CacheManager: Loading @@ -118,7 +126,6 @@ def resolve_cache_manager() -> CacheManager: CacheManagerNotRegisteredError: If no manager is registered """ if CacheManager._instance is None: raise CacheManagerNotRegisteredError( "CacheManager not registered. Call CacheManager(cache_dir).register() at application startup." ) msg = "CacheManager not registered. Call CacheManager(cache_dir).register() at application startup." raise CacheManagerNotRegisteredError(msg) return CacheManager._instance src/tdoc_crawler/config/export.py +8 −7 Original line number Diff line number Diff line Loading @@ -33,12 +33,12 @@ class ConfigExporter: """Export config to string in specified format with comments.""" if format == "toml": return self._export_toml() elif format == "yaml": if format == "yaml": return self._export_yaml() elif format == "json": if format == "json": return self._export_json() else: raise ValueError(f"Unsupported format: {format}") msg = f"Unsupported format: {format}" raise ValueError(msg) def save(self, path: Path, format: FormatType = "toml", force: bool = False) -> None: """Save config to file. Loading @@ -52,7 +52,8 @@ class ConfigExporter: FileExistsError: If file exists and force is False. """ if path.exists() and not force: raise FileExistsError(f"File exists: {path}. Use --force to overwrite.") msg = f"File exists: {path}. Use --force to overwrite." raise FileExistsError(msg) content = self.export(format) path.write_text(content, encoding="utf-8") Loading Loading @@ -188,14 +189,14 @@ class ConfigExporter: elif isinstance(value, (int, float)): result = str(value) elif isinstance(value, Path): result = f'"{str(value)}"' result = f'"{value!s}"' elif isinstance(value, str): escaped = value.replace("\\", "\\\\").replace('"', '\\"') result = f'"{escaped}"' elif isinstance(value, list): result = "[" + ", ".join(self._toml_value_to_string(item) for item in value) + "]" else: result = f'"{str(value)}"' result = f'"{value!s}"' return result Loading src/tdoc_crawler/config/settings.py +1 −22 Original line number Diff line number Diff line Loading @@ -66,7 +66,6 @@ class PathConfig(BaseSettings): """Path to the checkout directory for documents.""" return self.cache_dir / _DEFAULT_CHECKOUT_DIRNAME @field_validator("cache_dir", mode="before") @classmethod def _resolve_cache_dir(cls, value: str | Path | None) -> Path: Loading Loading @@ -98,11 +97,6 @@ class HttpConfig(BaseSettings): validation_alias=AliasChoices(ConfigEnvVar.HTTP_CACHE_ENABLED.name, "cache_enabled"), description="Enable HTTP response caching", ) cache_refresh_on_access: bool = Field( default=True, validation_alias=AliasChoices(ConfigEnvVar.HTTP_CACHE_REFRESH_ON_ACCESS.name, "cache_refresh_on_access"), description="Refresh cache TTL on each access", ) verify_ssl: bool = Field( default=True, validation_alias=AliasChoices(ConfigEnvVar.TDC_VERIFY_SSL.name, "verify_ssl"), Loading Loading @@ -132,7 +126,7 @@ class HttpConfig(BaseSettings): return int(value) if value else 0 return int(value) @field_validator("cache_enabled", "cache_refresh_on_access", "verify_ssl", mode="before") @field_validator("cache_enabled", "verify_ssl", mode="before") @classmethod def _parse_bool(cls, value: bool | str | None) -> bool: """Parse boolean values from environment strings.""" Loading Loading @@ -205,21 +199,6 @@ class CrawlConfig(BaseSettings): validation_alias=AliasChoices(ConfigEnvVar.TDC_END_DATE.name, "date_end"), description="End date filter (YYYY-MM-DD, YYYY-MM, or YYYY format)", ) source_like: str | None = Field( default=None, validation_alias=AliasChoices(ConfigEnvVar.TDC_SOURCE_LIKE.name, "source_like"), description="SQL LIKE pattern to match document source", ) agenda_like: str | None = Field( default=None, validation_alias=AliasChoices(ConfigEnvVar.TDC_AGENDA_LIKE.name, "agenda_like"), description="SQL LIKE pattern to match agenda item", ) title_like: str | None = Field( default=None, validation_alias=AliasChoices(ConfigEnvVar.TDC_TITLE_LIKE.name, "title_like"), description="SQL LIKE pattern to match document title", ) limit: int = Field( default=1000, ge=1, Loading src/tdoc_crawler/config/sources.py +61 −66 Original line number Diff line number Diff line Loading @@ -77,6 +77,19 @@ def _interpolate_env_vars(value: Any) -> Any: return value def _collect_conf_d(directory: Path, files: list[Path]) -> None: """Append alphabetically sorted config files from a conf.d directory.""" if not directory.is_dir(): return files.extend(conf_file for conf_file in sorted(directory.iterdir()) if conf_file.suffix.lower() in {".toml", ".yaml", ".yml", ".json"}) def _add_if_exists(path: Path, files: list[Path]) -> None: """Append path to files list if it exists as a regular file.""" if path.is_file(): files.append(path) def discover_config_files(cwd: Path | None = None) -> list[Path]: """Discover configuration files in precedence order (lowest first). Loading @@ -93,58 +106,53 @@ def discover_config_files(cwd: Path | None = None) -> list[Path]: if cwd is None: cwd = Path.cwd() home = Path.home() global_dir = home / ".config" / "3gpp-crawler" files: list[Path] = [] # Global configs (lowest precedence) global_config_dir = home / ".config" / "3gpp-crawler" global_config = global_config_dir / "config.toml" if global_config.is_file(): files.append(global_config) # Global conf.d/*.toml (alphabetical) global_conf_d = global_config_dir / "conf.d" if global_conf_d.is_dir(): for conf_file in sorted(global_conf_d.iterdir()): if conf_file.suffix.lower() in {".toml", ".yaml", ".yml", ".json"}: files.append(conf_file) _add_if_exists(global_dir / "config.toml", files) _collect_conf_d(global_dir / "conf.d", files) # Project-level configs (CWD relative, higher precedence) # .config/.3gpp-crawler/conf.d/*.toml project_conf_d_dot = cwd / ".config" / ".3gpp-crawler" / "conf.d" if project_conf_d_dot.is_dir(): for conf_file in sorted(project_conf_d_dot.iterdir()): if conf_file.suffix.lower() in {".toml", ".yaml", ".yml", ".json"}: files.append(conf_file) # .config/.3gpp-crawler/config.toml project_dot_config = cwd / ".config" / ".3gpp-crawler" / "config.toml" if project_dot_config.is_file(): files.append(project_dot_config) # .config/3gpp-crawler.toml project_config_short = cwd / ".config" / "3gpp-crawler.toml" if project_config_short.is_file(): files.append(project_config_short) # .3gpp-crawler/config.toml project_config_dir = cwd / ".3gpp-crawler" / "config.toml" if project_config_dir.is_file(): files.append(project_config_dir) # .3gpp-crawler.toml project_config = cwd / ".3gpp-crawler.toml" if project_config.is_file(): files.append(project_config) # 3gpp-crawler.toml (highest project precedence) project_root_config = cwd / "3gpp-crawler.toml" if project_root_config.is_file(): files.append(project_root_config) _collect_conf_d(cwd / ".config" / ".3gpp-crawler" / "conf.d", files) _add_if_exists(cwd / ".config" / ".3gpp-crawler" / "config.toml", files) _add_if_exists(cwd / ".config" / "3gpp-crawler.toml", files) _add_if_exists(cwd / ".3gpp-crawler" / "config.toml", files) _add_if_exists(cwd / ".3gpp-crawler.toml", files) _add_if_exists(cwd / "3gpp-crawler.toml", files) return files def _load_toml(config_file: Path) -> dict[str, Any]: """Load and interpolate a TOML config file.""" with config_file.open("r", encoding="utf-8") as f: content = f.read() content = _interpolate_env_vars(content) return tomllib.loads(content) def _load_yaml(config_file: Path) -> dict[str, Any]: """Load and interpolate a YAML config file.""" yaml = import_module("yaml") with config_file.open("r", encoding="utf-8") as f: try: data = yaml.safe_load(f) except yaml.scanner.ScannerError as e: raise ConfigLoadError(config_file, f"Parse error: {e}") from e if data is None: data = {} return _interpolate_env_vars(data) def _load_json(config_file: Path) -> dict[str, Any]: """Load and interpolate a JSON config file.""" with config_file.open("r", encoding="utf-8") as f: data = json.load(f) return _interpolate_env_vars(data) def load_config_file(config_file: Path) -> dict[str, Any]: """Load a single config file and return its contents as a dict. Loading @@ -164,32 +172,19 @@ def load_config_file(config_file: Path) -> dict[str, Any]: return {} suffix = config_file.suffix.lower() _loaders = { ".toml": _load_toml, ".yaml": _load_yaml, ".yml": _load_yaml, ".json": _load_json, } loader = _loaders.get(suffix) if loader is None: raise ConfigLoadError(config_file, f"Unsupported file format: {suffix}") try: if suffix == ".toml": # For TOML, we need to interpolate BEFORE parsing since TOML # doesn't support ${VAR} syntax natively with config_file.open("r", encoding="utf-8") as f: content = f.read() content = _interpolate_env_vars(content) data = tomllib.loads(content) elif suffix in {".yaml", ".yml"}: yaml = import_module("yaml") with config_file.open("r", encoding="utf-8") as f: try: data = yaml.safe_load(f) except yaml.scanner.ScannerError as e: raise ConfigLoadError(config_file, f"Parse error: {e}") from e if data is None: data = {} data = _interpolate_env_vars(data) elif suffix == ".json": with config_file.open("r", encoding="utf-8") as f: data = json.load(f) data = _interpolate_env_vars(data) else: raise ConfigLoadError(config_file, f"Unsupported file format: {suffix}") data = loader(config_file) except FileNotFoundError: return {} except PermissionError as e: Loading Loading
src/tdoc_crawler/clients/portal.py +22 −16 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ from __future__ import annotations import re from pathlib import Path from typing import Any from typing import Self import niquests as requests Loading Loading @@ -99,11 +99,11 @@ class PortalClient: self._session.close() self._session = None def __enter__(self) -> PortalClient: def __enter__(self) -> Self: """Enter context manager.""" return self def __exit__(self, *args: Any) -> None: def __exit__(self, *args: object) -> None: """Exit context manager and close session.""" self.close() Loading @@ -118,7 +118,8 @@ class PortalClient: return if self.credentials is None: raise PortalCredentialsError("Portal credentials required for targeted fetch. Set TDC_EOL_USERNAME and TDC_EOL_PASSWORD.") msg = "Portal credentials required for targeted fetch. Set TDC_EOL_USERNAME and TDC_EOL_PASSWORD." raise PortalCredentialsError(msg) logger.info("Authenticating with 3GPP portal...") Loading @@ -138,7 +139,7 @@ class PortalClient: "password": self.credentials.password, } logger.debug(f"Calling login API at {login_api_url}") logger.debug("Calling login API at %s", login_api_url) login_response = session.post( login_api_url, Loading @@ -159,7 +160,8 @@ class PortalClient: self._authenticated = True if response_text.lower() == "failed": raise PortalAuthenticationError("Authentication failed - check credentials") msg = "Authentication failed - check credentials" raise PortalAuthenticationError(msg) # Step 3: Store the authenticated session self._session = session Loading @@ -179,7 +181,7 @@ class PortalClient: PortalParsingError: If TDoc ID is invalid or URL extraction fails requests.RequestException: For network errors """ logger.debug(f"Extracting TDoc URL from DownloadTDoc endpoint for {tdoc_id}") logger.debug("Extracting TDoc URL from DownloadTDoc endpoint for %s", tdoc_id) download_url = f"{TDOC_DOWNLOAD_URL}?contributionUid={tdoc_id}" session = self._get_session() Loading @@ -189,7 +191,8 @@ class PortalClient: response.raise_for_status() if "cannot be found" in response.text.lower() or "not found" in response.text.lower(): raise PortalParsingError(f"TDoc {tdoc_id} not found on portal") msg = f"TDoc {tdoc_id} not found on portal" raise PortalParsingError(msg) # Extract URL from JavaScript redirect pattern pattern = r"window\.location\.href\s*=\s*['\"]([^'\"]+)['\"]" Loading @@ -206,14 +209,16 @@ class PortalClient: break if not match: raise PortalParsingError(f"Failed to extract URL for TDoc {tdoc_id}: JavaScript redirect not found") msg = f"Failed to extract URL for TDoc {tdoc_id}: JavaScript redirect not found" raise PortalParsingError(msg) extracted_url = match.group(1).strip() if not extracted_url.startswith(("http://", "https://", "ftp://")): raise PortalParsingError(f"Invalid URL format for TDoc {tdoc_id}: {extracted_url}") msg = f"Invalid URL format for TDoc {tdoc_id}: {extracted_url}" raise PortalParsingError(msg) logger.debug(f"Successfully extracted TDoc URL for {tdoc_id}: {extracted_url}") logger.debug("Successfully extracted TDoc URL for %s: %s", tdoc_id, extracted_url) return extracted_url except requests.RequestException: Loading @@ -222,7 +227,7 @@ class PortalClient: raise except Exception as exc: error_msg = f"Failed to extract URL for TDoc {tdoc_id}: {exc}" logger.error(error_msg) logger.exception(error_msg) raise PortalParsingError(error_msg) from exc def fetch_tdoc_metadata(self, tdoc_id: str, url: str | None = None) -> TDocMetadata: Loading @@ -244,9 +249,9 @@ class PortalClient: if url is None: try: url = self.extract_tdoc_url(tdoc_id) logger.debug(f"Using extracted URL for {tdoc_id}") logger.debug("Using extracted URL for %s", tdoc_id) except Exception as e: logger.debug(f"URL extraction failed for {tdoc_id}: {e}") logger.warning("URL extraction failed for %s: %s", tdoc_id, e) # Continue - authenticated method can still work without pre-extracted URL # Ensure authenticated Loading @@ -254,7 +259,7 @@ class PortalClient: # Fetch TDoc page view_url = f"{TDOC_VIEW_URL}?mode=view&contributionUid={tdoc_id}" logger.debug(f"Fetching TDoc metadata from {view_url}") logger.debug("Fetching TDoc metadata from %s", view_url) session = self._get_session() response = session.get(view_url, timeout=self.timeout) Loading @@ -263,7 +268,8 @@ class PortalClient: # Check if redirected to login (session expired) if "login.aspx" in response.url.lower(): self._authenticated = False raise PortalAuthenticationError("Session expired - re-authentication required") msg = "Session expired - re-authentication required" raise PortalAuthenticationError(msg) # Parse the page using the parser module return self.parse_tdoc_page(response.text, tdoc_id, url) Loading
src/tdoc_crawler/config/cache_manager.py +17 −10 Original line number Diff line number Diff line Loading @@ -28,14 +28,13 @@ from typing import ClassVar DEFAULT_DATABASE_FILENAME = "3gpp_crawler.db" DEFAULT_HTTP_CACHE_FILENAME = "http-cache.sqlite3" DEFAULT_CHECKOUT_DIRNAME = "checkout" DEFAULT_LLM_WIKI_DIRNAME = "wiki" WORKSPACE_REGISTRY_FILENAME = "workspaces.json" class CacheManagerNotRegisteredError(RuntimeError): """Raised when trying to resolve CacheManager before registration.""" pass class CacheManager: """Centralized manager for cache directory paths. Loading Loading @@ -73,7 +72,8 @@ class CacheManager: RuntimeError: If a manager is already registered """ if CacheManager._instance is not None: raise RuntimeError("CacheManager already registered. Call only once at startup.") msg = "CacheManager already registered. Call only once at startup." raise RuntimeError(msg) CacheManager._instance = self return self Loading Loading @@ -102,10 +102,18 @@ class CacheManager: """Path to workspace registry JSON file.""" return self._cache_dir / WORKSPACE_REGISTRY_FILENAME @classmethod def is_registered(cls) -> bool: """Check if a CacheManager instance is registered.""" return cls._instance is not None @property def llm_wiki_dir(self) -> Path: """Path to LLM wiki workspace directory.""" return self._cache_dir / DEFAULT_LLM_WIKI_DIRNAME def workspace_llm_wiki_dir(self, workspace_name: str) -> Path: """Path to a specific workspace's LLM wiki directory.""" return self.llm_wiki_dir / workspace_name def workspace_sources_dir(self, workspace_name: str) -> Path: """Path to a workspace's sources subdirectory.""" return self.workspace_llm_wiki_dir(workspace_name) / "sources" def resolve_cache_manager() -> CacheManager: Loading @@ -118,7 +126,6 @@ def resolve_cache_manager() -> CacheManager: CacheManagerNotRegisteredError: If no manager is registered """ if CacheManager._instance is None: raise CacheManagerNotRegisteredError( "CacheManager not registered. Call CacheManager(cache_dir).register() at application startup." ) msg = "CacheManager not registered. Call CacheManager(cache_dir).register() at application startup." raise CacheManagerNotRegisteredError(msg) return CacheManager._instance
src/tdoc_crawler/config/export.py +8 −7 Original line number Diff line number Diff line Loading @@ -33,12 +33,12 @@ class ConfigExporter: """Export config to string in specified format with comments.""" if format == "toml": return self._export_toml() elif format == "yaml": if format == "yaml": return self._export_yaml() elif format == "json": if format == "json": return self._export_json() else: raise ValueError(f"Unsupported format: {format}") msg = f"Unsupported format: {format}" raise ValueError(msg) def save(self, path: Path, format: FormatType = "toml", force: bool = False) -> None: """Save config to file. Loading @@ -52,7 +52,8 @@ class ConfigExporter: FileExistsError: If file exists and force is False. """ if path.exists() and not force: raise FileExistsError(f"File exists: {path}. Use --force to overwrite.") msg = f"File exists: {path}. Use --force to overwrite." raise FileExistsError(msg) content = self.export(format) path.write_text(content, encoding="utf-8") Loading Loading @@ -188,14 +189,14 @@ class ConfigExporter: elif isinstance(value, (int, float)): result = str(value) elif isinstance(value, Path): result = f'"{str(value)}"' result = f'"{value!s}"' elif isinstance(value, str): escaped = value.replace("\\", "\\\\").replace('"', '\\"') result = f'"{escaped}"' elif isinstance(value, list): result = "[" + ", ".join(self._toml_value_to_string(item) for item in value) + "]" else: result = f'"{str(value)}"' result = f'"{value!s}"' return result Loading
src/tdoc_crawler/config/settings.py +1 −22 Original line number Diff line number Diff line Loading @@ -66,7 +66,6 @@ class PathConfig(BaseSettings): """Path to the checkout directory for documents.""" return self.cache_dir / _DEFAULT_CHECKOUT_DIRNAME @field_validator("cache_dir", mode="before") @classmethod def _resolve_cache_dir(cls, value: str | Path | None) -> Path: Loading Loading @@ -98,11 +97,6 @@ class HttpConfig(BaseSettings): validation_alias=AliasChoices(ConfigEnvVar.HTTP_CACHE_ENABLED.name, "cache_enabled"), description="Enable HTTP response caching", ) cache_refresh_on_access: bool = Field( default=True, validation_alias=AliasChoices(ConfigEnvVar.HTTP_CACHE_REFRESH_ON_ACCESS.name, "cache_refresh_on_access"), description="Refresh cache TTL on each access", ) verify_ssl: bool = Field( default=True, validation_alias=AliasChoices(ConfigEnvVar.TDC_VERIFY_SSL.name, "verify_ssl"), Loading Loading @@ -132,7 +126,7 @@ class HttpConfig(BaseSettings): return int(value) if value else 0 return int(value) @field_validator("cache_enabled", "cache_refresh_on_access", "verify_ssl", mode="before") @field_validator("cache_enabled", "verify_ssl", mode="before") @classmethod def _parse_bool(cls, value: bool | str | None) -> bool: """Parse boolean values from environment strings.""" Loading Loading @@ -205,21 +199,6 @@ class CrawlConfig(BaseSettings): validation_alias=AliasChoices(ConfigEnvVar.TDC_END_DATE.name, "date_end"), description="End date filter (YYYY-MM-DD, YYYY-MM, or YYYY format)", ) source_like: str | None = Field( default=None, validation_alias=AliasChoices(ConfigEnvVar.TDC_SOURCE_LIKE.name, "source_like"), description="SQL LIKE pattern to match document source", ) agenda_like: str | None = Field( default=None, validation_alias=AliasChoices(ConfigEnvVar.TDC_AGENDA_LIKE.name, "agenda_like"), description="SQL LIKE pattern to match agenda item", ) title_like: str | None = Field( default=None, validation_alias=AliasChoices(ConfigEnvVar.TDC_TITLE_LIKE.name, "title_like"), description="SQL LIKE pattern to match document title", ) limit: int = Field( default=1000, ge=1, Loading
src/tdoc_crawler/config/sources.py +61 −66 Original line number Diff line number Diff line Loading @@ -77,6 +77,19 @@ def _interpolate_env_vars(value: Any) -> Any: return value def _collect_conf_d(directory: Path, files: list[Path]) -> None: """Append alphabetically sorted config files from a conf.d directory.""" if not directory.is_dir(): return files.extend(conf_file for conf_file in sorted(directory.iterdir()) if conf_file.suffix.lower() in {".toml", ".yaml", ".yml", ".json"}) def _add_if_exists(path: Path, files: list[Path]) -> None: """Append path to files list if it exists as a regular file.""" if path.is_file(): files.append(path) def discover_config_files(cwd: Path | None = None) -> list[Path]: """Discover configuration files in precedence order (lowest first). Loading @@ -93,58 +106,53 @@ def discover_config_files(cwd: Path | None = None) -> list[Path]: if cwd is None: cwd = Path.cwd() home = Path.home() global_dir = home / ".config" / "3gpp-crawler" files: list[Path] = [] # Global configs (lowest precedence) global_config_dir = home / ".config" / "3gpp-crawler" global_config = global_config_dir / "config.toml" if global_config.is_file(): files.append(global_config) # Global conf.d/*.toml (alphabetical) global_conf_d = global_config_dir / "conf.d" if global_conf_d.is_dir(): for conf_file in sorted(global_conf_d.iterdir()): if conf_file.suffix.lower() in {".toml", ".yaml", ".yml", ".json"}: files.append(conf_file) _add_if_exists(global_dir / "config.toml", files) _collect_conf_d(global_dir / "conf.d", files) # Project-level configs (CWD relative, higher precedence) # .config/.3gpp-crawler/conf.d/*.toml project_conf_d_dot = cwd / ".config" / ".3gpp-crawler" / "conf.d" if project_conf_d_dot.is_dir(): for conf_file in sorted(project_conf_d_dot.iterdir()): if conf_file.suffix.lower() in {".toml", ".yaml", ".yml", ".json"}: files.append(conf_file) # .config/.3gpp-crawler/config.toml project_dot_config = cwd / ".config" / ".3gpp-crawler" / "config.toml" if project_dot_config.is_file(): files.append(project_dot_config) # .config/3gpp-crawler.toml project_config_short = cwd / ".config" / "3gpp-crawler.toml" if project_config_short.is_file(): files.append(project_config_short) # .3gpp-crawler/config.toml project_config_dir = cwd / ".3gpp-crawler" / "config.toml" if project_config_dir.is_file(): files.append(project_config_dir) # .3gpp-crawler.toml project_config = cwd / ".3gpp-crawler.toml" if project_config.is_file(): files.append(project_config) # 3gpp-crawler.toml (highest project precedence) project_root_config = cwd / "3gpp-crawler.toml" if project_root_config.is_file(): files.append(project_root_config) _collect_conf_d(cwd / ".config" / ".3gpp-crawler" / "conf.d", files) _add_if_exists(cwd / ".config" / ".3gpp-crawler" / "config.toml", files) _add_if_exists(cwd / ".config" / "3gpp-crawler.toml", files) _add_if_exists(cwd / ".3gpp-crawler" / "config.toml", files) _add_if_exists(cwd / ".3gpp-crawler.toml", files) _add_if_exists(cwd / "3gpp-crawler.toml", files) return files def _load_toml(config_file: Path) -> dict[str, Any]: """Load and interpolate a TOML config file.""" with config_file.open("r", encoding="utf-8") as f: content = f.read() content = _interpolate_env_vars(content) return tomllib.loads(content) def _load_yaml(config_file: Path) -> dict[str, Any]: """Load and interpolate a YAML config file.""" yaml = import_module("yaml") with config_file.open("r", encoding="utf-8") as f: try: data = yaml.safe_load(f) except yaml.scanner.ScannerError as e: raise ConfigLoadError(config_file, f"Parse error: {e}") from e if data is None: data = {} return _interpolate_env_vars(data) def _load_json(config_file: Path) -> dict[str, Any]: """Load and interpolate a JSON config file.""" with config_file.open("r", encoding="utf-8") as f: data = json.load(f) return _interpolate_env_vars(data) def load_config_file(config_file: Path) -> dict[str, Any]: """Load a single config file and return its contents as a dict. Loading @@ -164,32 +172,19 @@ def load_config_file(config_file: Path) -> dict[str, Any]: return {} suffix = config_file.suffix.lower() _loaders = { ".toml": _load_toml, ".yaml": _load_yaml, ".yml": _load_yaml, ".json": _load_json, } loader = _loaders.get(suffix) if loader is None: raise ConfigLoadError(config_file, f"Unsupported file format: {suffix}") try: if suffix == ".toml": # For TOML, we need to interpolate BEFORE parsing since TOML # doesn't support ${VAR} syntax natively with config_file.open("r", encoding="utf-8") as f: content = f.read() content = _interpolate_env_vars(content) data = tomllib.loads(content) elif suffix in {".yaml", ".yml"}: yaml = import_module("yaml") with config_file.open("r", encoding="utf-8") as f: try: data = yaml.safe_load(f) except yaml.scanner.ScannerError as e: raise ConfigLoadError(config_file, f"Parse error: {e}") from e if data is None: data = {} data = _interpolate_env_vars(data) elif suffix == ".json": with config_file.open("r", encoding="utf-8") as f: data = json.load(f) data = _interpolate_env_vars(data) else: raise ConfigLoadError(config_file, f"Unsupported file format: {suffix}") data = loader(config_file) except FileNotFoundError: return {} except PermissionError as e: Loading