Commit c1b036e8 authored by Jan Reimes
Browse files

feat: embed DB metadata in Markdown YAML frontmatter

Fetch TDoc/Spec metadata from database during workspace processing
and include it in the YAML frontmatter under a 'metadata' key.
Uses pyyaml for proper serialization of the frontmatter dict.

TDoc fields: tdoc_id, meeting_id, title, source, contact, tdoc_type,
for_purpose, agenda_item_nbr, agenda_item_text, status, date_created.
Spec fields: spec_number, title, spec_type, status, working_group,
series, latest_version.

Extract `_run_processing_loop` from `workspace_process` to stay under
the PLR0915 (too-many-statements) limit.
parent eb9ec824
Loading
Loading
Loading
Loading
+175 −49
Original line number Diff line number Diff line
@@ -3,7 +3,9 @@
from __future__ import annotations

import json
import re
import time
from datetime import datetime
from pathlib import Path

import typer
@@ -22,13 +24,17 @@ from tdoc_crawler.cli.args import (
    WorkspaceProcessForceOption,
)
from tdoc_crawler.config import resolve_cache_manager
from tdoc_crawler.config.settings import PathConfig
from tdoc_crawler.config.workspace_registry import WorkspaceMember
from tdoc_crawler.database.base import DocDatabase
from tdoc_crawler.database.oxyde_models import Specification, TDocMetadata
from tdoc_crawler.extraction.convert import ConversionError, DoclingConfig, convert_for_wiki
from tdoc_crawler.extraction.profiles import DEFAULT_EXTRACTION_PROFILE, DeviceType, ExtractionProfile, FiguresMode, TablesMode
from tdoc_crawler.logging import DEFAULT_LEVEL as DEFAULT_VERBOSITY
from tdoc_crawler.logging import get_logger, set_verbosity
from tdoc_crawler.models.workspaces import SourceKind
from tdoc_crawler.utils.async_helpers import run_async
from tdoc_crawler.utils.normalization import normalize_spec_number, normalize_tdoc_id
from tdoc_crawler.workspaces import (
    get_active_workspace,
    get_workspace,
@@ -37,6 +43,30 @@ from tdoc_crawler.workspaces import (
)
from tdoc_crawler.workspaces.utils import resolve_spec_release_from_db

# Fields from DB models to include in YAML frontmatter.
# These tuples act as allow-lists: the fetch helpers in this module dump a DB
# record to a dict and keep only the keys named here.

# Keys kept from a ``TDocMetadata`` record (see ``_fetch_tdoc_metadata``).
_TDOC_FRONTMATTER_FIELDS = (
    "tdoc_id",
    "meeting_id",
    "title",
    "source",
    "contact",
    "tdoc_type",
    "for_purpose",
    "agenda_item_nbr",
    "agenda_item_text",
    "status",
    "date_created",
)
# Keys kept from a ``Specification`` record (see ``_fetch_spec_metadata``).
_SPEC_FRONTMATTER_FIELDS = (
    "spec_number",
    "title",
    "spec_type",
    "status",
    "working_group",
    "series",
    "latest_version",
)

logger = get_logger(__name__)

_PROFILE_LEVELS = {
@@ -78,6 +108,71 @@ def _resolve_spec_source_id(source_id: str) -> str:
    return source_id


def _fetch_db_metadata(document_id: str, source_kind: SourceKind) -> dict[str, object]:
    """Look up the DB record for a document and return its frontmatter fields.

    This is a best-effort helper: any failure while querying or sanitizing is
    logged at debug level and swallowed so that document processing can go on
    without metadata.

    Args:
        document_id: Identifier of the TDoc or spec to look up.
        source_kind: Selects the TDoc or the spec lookup; any other kind
            yields no metadata.

    Returns:
        The sanitized metadata dict, or an empty dict when the source kind is
        unsupported or the lookup raised.
    """
    try:
        # Pick the lookup routine first, then run it through the sanitizer.
        if source_kind == SourceKind.TDOC:
            fetcher = _fetch_tdoc_metadata
        elif source_kind == SourceKind.SPEC:
            fetcher = _fetch_spec_metadata
        else:
            return {}
        return _sanitize_metadata(fetcher(document_id))
    except Exception:
        # Metadata is optional enrichment; never let it abort the conversion.
        logger.debug("Could not fetch DB metadata for %s", document_id, exc_info=True)
        return {}


def _sanitize_metadata(data: dict[str, object]) -> dict[str, object]:
    """Convert complex types to YAML-safe primitives."""
    result: dict[str, object] = {}
    for k, v in data.items():
        if v is None:
            continue
        if isinstance(v, datetime):
            result[k] = v.isoformat()
        else:
            result[k] = str(v)
    return result


def _fetch_tdoc_metadata(tdoc_id: str) -> dict[str, object]:
    """Load the frontmatter-relevant fields of one TDoc from the database.

    Args:
        tdoc_id: TDoc identifier; it is normalized before the lookup.

    Returns:
        The record's fields restricted to ``_TDOC_FRONTMATTER_FIELDS``, or an
        empty dict when no record matches.
    """
    lookup_id = normalize_tdoc_id(tdoc_id)
    database_file = PathConfig().db_file

    async def _load() -> dict[str, object]:
        # The DB context is only held open for the duration of the query.
        async with DocDatabase(database_file):
            row = await TDocMetadata.objects.filter(tdoc_id=lookup_id).first()
        if row is None:
            return {}
        selected: dict[str, object] = {}
        for name, value in row.model_dump(mode="python").items():
            if name in _TDOC_FRONTMATTER_FIELDS:
                selected[name] = value
        return selected

    return run_async(_load())


def _fetch_spec_metadata(source_id: str) -> dict[str, object]:
    """Load the frontmatter-relevant fields of one specification from the database.

    Args:
        source_id: Spec identifier, optionally carrying a ``-REL...`` suffix.

    Returns:
        The record's fields restricted to ``_SPEC_FRONTMATTER_FIELDS``, or an
        empty dict when no record matches.
    """
    # Drop an optional -REL suffix; if the ID does not fit the pattern, the
    # whole string is handed to the normalizer unchanged.
    parsed = re.match(r"^(\d+(?:\.\d+)?)(?:-REL(.+))?$", source_id)
    bare_number = parsed.group(1) if parsed else source_id
    canonical_number = normalize_spec_number(bare_number)
    database_file = PathConfig().db_file

    async def _load() -> dict[str, object]:
        # The DB context is only held open for the duration of the query.
        async with DocDatabase(database_file):
            row = await Specification.objects.filter(spec_number=canonical_number).first()
        if row is None:
            return {}
        selected: dict[str, object] = {}
        for name, value in row.model_dump(mode="python").items():
            if name in _SPEC_FRONTMATTER_FIELDS:
                selected[name] = value
        return selected

    return run_async(_load())


def _effective_source_id(member: WorkspaceMember) -> str:
    """Return the effective source ID for directory naming.

@@ -169,6 +264,9 @@ def _process_member(
    wiki_source_dir = wiki_source_dir_base / source_id
    wiki_source_dir.mkdir(parents=True, exist_ok=True)

    # Fetch DB metadata for YAML frontmatter enrichment
    db_metadata = _fetch_db_metadata(source_id, member.source_kind)

    try:
        result_path = convert_for_wiki(
            document_id=source_id,
@@ -179,6 +277,7 @@ def _process_member(
            docling_config=docling_config,
            docx_direct=docx_direct,
            md_yaml_frontmatter=md_yaml_frontmatter,
            db_metadata=db_metadata,
        )
        suffix = result_path.suffix.lstrip(".")
        logger.debug("%s [%s] → %s", source_id, extraction_profile.value, suffix)
@@ -193,6 +292,75 @@ def _process_member(
        return source_id, False, True, 0


def _run_processing_loop(
    members: list[WorkspaceMember],
    wiki_source_dir_base: Path,
    extraction_profile: ExtractionProfile,
    force: bool,
    skip_existing: bool,
    md_yaml_frontmatter: bool,
    docling_config: DoclingConfig,
    docx_direct: bool,
) -> tuple[int, int, int, int]:
    """Process every workspace member while driving a progress bar.

    Members rejected by ``_should_skip_member`` are only counted; all others
    are handed to ``_process_member`` and the bar description is refreshed
    with a running pages-per-second figure once any pages have been counted.

    Returns:
        Tuple of ``(processed, failed, skipped, total_pages)``.
    """
    ok_count = 0
    error_count = 0
    skip_count = 0
    total_pages = 0
    started_at = time.monotonic()

    progress, task = create_progress_bar(
        f"Processing [{extraction_profile.value}]",
        total=len(members),
    )

    with progress:
        for member in members:
            source_id = _effective_source_id(member)

            if _should_skip_member(source_id, wiki_source_dir_base, extraction_profile, force, skip_existing):
                skip_count += 1
            else:
                # Show which member is being worked on before the slow part.
                description = f"Processing [{extraction_profile.value}] {source_id}"
                progress.update(task, description=description)

                _, succeeded, errored, pages = _process_member(
                    member,
                    wiki_source_dir_base,
                    extraction_profile,
                    force,
                    md_yaml_frontmatter,
                    docling_config,
                    docx_direct,
                )
                if succeeded:
                    ok_count += 1
                    total_pages += pages
                if errored:
                    error_count += 1

                # Append throughput only when there is something to report.
                elapsed = time.monotonic() - started_at
                if elapsed > 0 and total_pages > 0:
                    pps = total_pages / elapsed
                    description = f"Processing [{extraction_profile.value}] {source_id}  ({total_pages}p, {pps:.1f} p/s)"
                progress.update(task, description=description)

            progress.advance(task)

    return ok_count, error_count, skip_count, total_pages


def workspace_process(
    workspace_name: str = typer.Argument(None, help="Workspace name (default: active workspace)"),
    force: WorkspaceProcessForceOption = False,
@@ -239,60 +407,18 @@ def workspace_process(
    else:
        wiki_source_dir_base = cache_manager.llm_wiki_dir / normalized / "sources"

    processed = 0
    failed = 0
    skipped = 0
    total_pages = 0
    start_time = time.monotonic()

    progress, task = create_progress_bar(
        f"Processing [{extraction_profile.value}]",
        total=len(members),
    )

    with progress:
        for member in members:
            source_id = _effective_source_id(member)

            if _should_skip_member(source_id, wiki_source_dir_base, extraction_profile, force, skip_existing):
                skipped += 1
                progress.advance(task)
                continue

            progress.update(task, description=f"Processing [{extraction_profile.value}] {source_id}")
            _, succeeded, errored, pages = _process_member(
                member,
    processed, failed, skipped, total_pages = _run_processing_loop(
        members,
        wiki_source_dir_base,
        extraction_profile,
        force,
        skip_existing,
        md_yaml_frontmatter,
        docling_config,
        docx_direct,
    )
            if succeeded:
                processed += 1
                total_pages += pages
            if errored:
                failed += 1

            elapsed = time.monotonic() - start_time
            if elapsed > 0 and total_pages > 0:
                pps = total_pages / elapsed
                progress.update(
                    task,
                    description=f"Processing [{extraction_profile.value}] {source_id}  ({total_pages}p, {pps:.1f} p/s)",
                )
            else:
                progress.update(
                    task,
                    description=f"Processing [{extraction_profile.value}] {source_id}",
                )

            progress.advance(task)

    elapsed_total = time.monotonic() - start_time
    summary = f"Processing complete: {processed} succeeded, {failed} failed, {skipped} skipped"
    if total_pages > 0 and elapsed_total > 0:
        avg_pps = total_pages / elapsed_total
        summary += f"  ({total_pages} pages, {avg_pps:.1f} p/s avg)"
    if total_pages > 0:
        summary += f"  ({total_pages} pages)"
    console.print(f"[yellow]{summary}[/yellow]")
+18 −11
Original line number Diff line number Diff line
@@ -279,18 +279,23 @@ def _add_yaml_frontmatter(
    document_id: str,
    source_kind: SourceKind,
    profile: ExtractionProfile,
    db_metadata: dict[str, object] | None = None,
) -> None:
    """Prepend YAML frontmatter to a Markdown file."""
    frontmatter = (
        "---\n"
        f"document_id: {document_id}\n"
        f"source_kind: {source_kind.value}\n"
        f"profile: {profile.value}\n"
        f"extraction_date: \"{datetime.now(UTC).isoformat()}\"\n"
        "---\n\n"
    )
    import yaml  # noqa: PLC0415

    frontmatter_data: dict[str, object] = {
        "document_id": document_id,
        "source_kind": source_kind.value,
        "profile": profile.value,
        "extraction_date": datetime.now(UTC).isoformat(),
    }
    if db_metadata:
        frontmatter_data["metadata"] = db_metadata

    header = f"---\n{yaml.dump(frontmatter_data, default_flow_style=False, allow_unicode=True).strip()}\n---\n\n"
    content = md_path.read_text(encoding="utf-8")
    md_path.write_text(frontmatter + content, encoding="utf-8")
    md_path.write_text(header + content, encoding="utf-8")


def _run_markdown_only(
@@ -391,6 +396,7 @@ def convert_for_wiki(
    docling_config: DoclingConfig | None = None,
    docx_direct: bool = False,
    md_yaml_frontmatter: bool = True,
    db_metadata: dict[str, object] | None = None,
) -> Path:
    """Convert a document for wiki ingestion using the specified profile.

@@ -415,6 +421,7 @@ def convert_for_wiki(
        docling_config: Optional Docling-specific configuration (figure/table modes).
        docx_direct: Feed .docx directly to backend, skip LibreOffice PDF step.
        md_yaml_frontmatter: Prepend YAML frontmatter to generated Markdown.
        db_metadata: Optional DB metadata dict to embed in YAML frontmatter.

    Returns:
        Path to the primary output file (PDF for pdf-only, MD for others).
@@ -451,7 +458,7 @@ def convert_for_wiki(
        with timed_operation(get_metrics_tracker(), document_id, MetricType.CONVERSION):
            result = _run_markdown_only(input_for_md, wiki_source_dir, figures_mode=figures_mode)
        if md_yaml_frontmatter:
            _add_yaml_frontmatter(result, document_id=document_id, source_kind=source_kind, profile=profile)
            _add_yaml_frontmatter(result, document_id=document_id, source_kind=source_kind, profile=profile, db_metadata=db_metadata)
        return result

    # Step 2c: default/advanced → check existing output before running Docling
@@ -475,7 +482,7 @@ def convert_for_wiki(
        )

    if md_yaml_frontmatter:
        _add_yaml_frontmatter(md_file, document_id=document_id, source_kind=source_kind, profile=profile)
        _add_yaml_frontmatter(md_file, document_id=document_id, source_kind=source_kind, profile=profile, db_metadata=db_metadata)
    return md_file