# Source code for pysyrev.core.config

"""
Configuration loader.

Reads a YAML file and produces typed configuration dataclasses. These
classes know nothing about the business model — they only mirror the YAML
structure. The bridge from config to the runtime model lives in
topic_model.py via TopicModel.from_config().

Naming convention: every config dataclass ends with `Config` so it is
unambiguous when imported alongside the runtime classes (e.g. UMAPConfig
vs UmapModel).

Environment variables: any string in the YAML may contain ${VAR} references.
They are resolved at load time using the environment, augmented with the
contents of the .env file pointed to by the root-level `env:` key (if any).
"""
import dataclasses
import datetime
import os
import re
from dataclasses import dataclass, fields, field
from typing import List, Union

import yaml
from dotenv import load_dotenv


# Matches ``${NAME}`` placeholders inside YAML string values. Group 1 captures
# the text between the braces (one or more characters other than ``}``).
_ENV_VAR_PATTERN = re.compile(r'\$\{([^}]+)\}')


def _make_run_dir(export_dir: str, run_name: Union[str, None]) -> tuple:
    """Return ``(run_name, run_dir)``, creating the directory.

    If *run_name* is blank a timestamp (``YYYY-MM-DDTHHMMSS``) is generated.
    ``exist_ok=True`` so that re-opening an existing run is allowed.
    """
    if not run_name:
        run_name = datetime.datetime.now().strftime('%Y-%m-%dT%H%M%S')
    run_dir = os.path.join(export_dir, run_name)
    os.makedirs(run_dir, exist_ok=True)
    return run_name, run_dir


def _find_latest_dir(export_dir: str) -> Union[str, None]:
    """Return the most recently modified subdirectory of *export_dir*."""
    if not os.path.isdir(export_dir):
        return None
    candidates = [
        (entry.stat().st_mtime, entry.path)
        for entry in os.scandir(export_dir)
        if entry.is_dir()
    ]
    return max(candidates, default=(None, None))[1]


def _find_latest_file(export_dir: str, filename: str) -> Union[str, None]:
    """Return *filename* in the most recently modified subdirectory of *export_dir*.

    Handles two layouts:
    - flat:              export_dir/filename
    - timestamped dirs:  export_dir/<run_name>/filename
    """
    if not os.path.isdir(export_dir):
        return None
    flat = os.path.join(export_dir, filename)
    if os.path.isfile(flat):
        return flat
    latest_dir = _find_latest_dir(export_dir)
    if latest_dir is None:
        return None
    candidate = os.path.join(latest_dir, filename)
    return candidate if os.path.isfile(candidate) else None



def _resolve_env_vars(node) -> dict:
    """Recursively walk a YAML structure (dict / list / scalar) and replace
    ${VAR} occurrences in string values with os.environ[VAR]. Raises a
    ValueError if a referenced variable is missing."""
    if isinstance(node, dict):
        return {k: _resolve_env_vars(v) for k, v in node.items()}
    if isinstance(node, list):
        return [_resolve_env_vars(v) for v in node]
    if isinstance(node, str):
        def replace(match):
            var_name = match.group(1)
            if var_name not in os.environ:
                raise ValueError(
                    f"Environment variable {var_name!r} is referenced in the "
                    f"config but is not set (check your .env file)"
                )
            return os.environ[var_name]
        return _ENV_VAR_PATTERN.sub(replace, node)
    return node


@dataclass
class ConfigField:
    """Base for config dataclasses.

    If a YAML entry sets a field to None explicitly, fall back to the
    dataclass default when one is defined — either a plain ``default`` or a
    ``default_factory``. Fields without any default keep their ``None``.
    """

    def __post_init__(self):
        for field_ in fields(self):
            if getattr(self, field_.name) is not None:
                continue
            if field_.default is not dataclasses.MISSING:
                setattr(self, field_.name, field_.default)
            elif field_.default_factory is not dataclasses.MISSING:
                # Call the factory so each instance gets its own fresh value.
                setattr(self, field_.name, field_.default_factory())


@dataclass
class WosApiConfig(ConfigField):
    """Settings for retrieving WoS records via the Expanded API."""

    # Typically written as `${WOS_API_KEY}` in the YAML and resolved from .env.
    api_key: str
    # WoS Query Language, e.g. 'TS=("agent-based") AND PY=2015-2024'.
    query: str
    # Local cache directory; None means no caching.
    cache_dir: Union[None, str] = None
@dataclass
class WosSourceConfig(ConfigField):
    """One WoS source: either a file path, or an API config.

    ``source`` selects which of `file` / `api` is used and must be ``'file'``
    or ``'api'``; the selected one has to be set, otherwise ``ValueError`` is
    raised at construction time.
    """

    source: str = 'file'  # 'file' or 'api'
    file: Union[None, str] = None
    api: Union[None, WosApiConfig] = None

    def __post_init__(self):
        super().__post_init__()
        if self.source == 'file':
            if not self.file:
                raise ValueError("WoS source is 'file' but no `file:` path is set")
        elif self.source == 'api':
            # A raw YAML mapping is promoted to its typed config first.
            if isinstance(self.api, dict):
                self.api = WosApiConfig(**self.api)
            if self.api is None:
                raise ValueError("WoS source is 'api' but no `api:` block is set")
        else:
            raise ValueError(f"Unknown WoS source {self.source!r}; expected 'file' or 'api'")
@dataclass
class OpenAlexApiConfig(ConfigField):
    """Settings for retrieving works via the OpenAlex API.

    Either `query` (free-text BM25) or `filters` (structured) must be set;
    both can also be combined. This class itself does not enforce that rule.
    `email` enables the polite pool — it is optional but strongly recommended
    for non-trivial usage.
    """

    api_key: str
    # Optional, for the polite pool.
    email: Union[None, str] = None
    # Full-text search (BM25 on title+abstract).
    query: Union[None, str] = None
    # Structured filters; e.g. {'publication_year': '2015-2024'}.
    filters: Union[None, dict] = None
    # Local cache directory; None means no caching.
    cache_dir: Union[None, str] = None
@dataclass
class OpenAlexSourceConfig(ConfigField):
    """One OpenAlex source: either a file path, or an API config.

    ``source`` selects which of `file` / `api` is used and must be ``'file'``
    or ``'api'``; the selected one has to be set, otherwise ``ValueError`` is
    raised at construction time.
    """

    source: str = 'file'  # 'file' or 'api'
    file: Union[None, str] = None
    api: Union[None, OpenAlexApiConfig] = None

    def __post_init__(self):
        super().__post_init__()
        if self.source == 'file':
            if not self.file:
                raise ValueError("OpenAlex source is 'file' but no `file:` path is set")
        elif self.source == 'api':
            # A raw YAML mapping is promoted to its typed config first.
            if isinstance(self.api, dict):
                self.api = OpenAlexApiConfig(**self.api)
            if self.api is None:
                raise ValueError("OpenAlex source is 'api' but no `api:` block is set")
        else:
            raise ValueError(f"Unknown OpenAlex source {self.source!r}; expected 'file' or 'api'")
@dataclass
class CleanConfig(ConfigField):
    """Mirror of the `clean` block of the bib section.

    The meaning of each option is defined by the consumer of this config,
    which is not part of this module.
    """

    min_signals_to_reject: int = 2
    extra_garbage_phrases: Union[None, List[str]] = None
    use_langdetect: bool = False
@dataclass
class ExtractConfig(ConfigField):
    """Mirror of the `extract` block of the bib section.

    The meaning of each option is defined by the consumer of this config,
    which is not part of this module.
    """

    include_doc_type: Union[None, List[str]] = None
    exclude_doc_type: Union[None, List[str]] = None
    year: int = 1900
    nb_citations: int = 0
    # Accepts a single value or a list of values.
    language: Union[None, str, List[str]] = None
    scorer: str = "partial_token_sort_ratio"
    score_cutoff: int = 90
@dataclass
class ResolveReferencesConfig(ConfigField):
    """Mirror of the `resolve_references` block of the bib section.

    The meaning of each option is defined by the consumer of this config,
    which is not part of this module.
    """

    enabled: bool = False
    flag_unresolved: bool = False
    fuzzy_score_cutoff: int = 90
    ngram_size: int = 3
    max_candidates: int = 50
    scorer: str = "token_set_ratio"
@dataclass
class MergeConfig(ConfigField):
    """Mirror of the `merge` block of the bib section.

    The meaning of each option is defined by the consumer of this config,
    which is not part of this module.
    """

    title_similarity: int = 98
    ngram_size: int = 3
    max_candidates_per_row: int = 200
    scorer: str = "token_set_ratio"
@dataclass
class BibExportConfig(ConfigField):
    """Output configuration for the bib stage.

    Each run is stored in ``<export_dir>/<run_name>/bib_dataset.csv``.
    Call :meth:`resolve` (done automatically by ``BibDataset.from_config``)
    to finalise the run directory and set ``dataset`` on the instance.
    Leave ``run_name`` blank to auto-generate a timestamp.
    """

    export_dir: str
    run_name: Union[None, str] = None
    # Stays None until resolve() fills it in.
    dataset: str = None

    def resolve(self):
        """Create the run directory, set the output CSV path, return ``self``."""
        final_name, run_dir = _make_run_dir(self.export_dir, self.run_name)
        self.run_name = final_name
        self.dataset = os.path.join(run_dir, 'bib_dataset.csv')
        return self
@dataclass
class BibConfig(ConfigField):
    """Mirror of the `bib` section of the YAML.

    Raw YAML values are promoted to typed configs in ``__post_init__``:

    - `wos` / `open_alex`: a string is treated as a file path (backwards
      compatibility); a dict is parsed as a structured source config; a dict
      whose `source` is present but blank disables the source (``None``).
    - `export`: a dict becomes a ``BibExportConfig``; ``None`` is kept.
    - `clean`, `extract`, `resolve_references`, `merge`: a dict is parsed,
      ``None`` is replaced by the section's defaults.
    """

    wos: Union[None, str, WosSourceConfig] = None
    open_alex: Union[None, str, OpenAlexSourceConfig] = None
    scopus: Union[None, str] = None
    pubmed: Union[None, str] = None
    export: Union[None, BibExportConfig] = None
    clean: CleanConfig = None
    extract: ExtractConfig = None
    resolve_references: ResolveReferencesConfig = None
    merge: MergeConfig = None

    def __post_init__(self):
        super().__post_init__()
        self.wos = self._coerce_source(self.wos, WosSourceConfig)
        self.open_alex = self._coerce_source(self.open_alex, OpenAlexSourceConfig)
        if isinstance(self.export, dict):
            self.export = BibExportConfig(**self.export)
        self.clean = self._coerce_section(self.clean, CleanConfig)
        self.extract = self._coerce_section(self.extract, ExtractConfig)
        self.resolve_references = self._coerce_section(
            self.resolve_references, ResolveReferencesConfig
        )
        self.merge = self._coerce_section(self.merge, MergeConfig)

    @staticmethod
    def _coerce_source(value, source_cls):
        """Turn a raw `wos:` / `open_alex:` value into *source_cls* (or None)."""
        if isinstance(value, str):
            return source_cls(source='file', file=value)
        if isinstance(value, dict):
            # A blank `source:` disables the source. A missing `source` key is
            # not an error: the dataclass default ('file') applies instead of
            # raising KeyError.
            if 'source' in value and not value['source']:
                return None
            return source_cls(**value)
        return value

    @staticmethod
    def _coerce_section(value, section_cls):
        """Parse a dict into *section_cls*; replace None by its defaults."""
        if value is None:
            return section_cls()
        if isinstance(value, dict):
            return section_cls(**value)
        return value
@dataclass
class HDBSCANConfig(ConfigField):
    """Mirror of the `hdbscan` block of the topic_model section.

    The `*_range` / `*_step` fields presumably describe a parameter sweep —
    confirm against the runtime model that consumes them.
    """

    min_topic_size_range: List[int] = field(default_factory=lambda: [2, 2])
    min_sample_range: List[int] = field(default_factory=lambda: [2, 2])
    topic_size_step: int = 1
    min_sample_step: int = 1
    cluster_selection_method: str = 'leaf'
    metric: str = 'euclidean'
    prediction_data: bool = True
@dataclass
class UMAPConfig(ConfigField):
    """Mirror of the `umap` block of the topic_model section.

    `n_neighbors` and `n_components` are lists; the other options are scalars.
    """

    n_neighbors: List[int] = field(default_factory=lambda: [5])
    n_components: List[int] = field(default_factory=lambda: [5])
    metric: str = 'cosine'
    min_dist: float = 0.0
    low_memory: bool = False
    random_state: int = 42
@dataclass
class BerteleyConfig(ConfigField):
    """Mirror of the `berteley` block of the topic_model section."""

    allow_abbrev: bool = False


@dataclass
class CoherenceScorerConfig(ConfigField):
    """Mirror of the `coherence_scorer` block of the topic_model section."""

    ranking: str = "u_mass"
    purity: str = "c_v"


@dataclass
class CTFIDFConfig(ConfigField):
    """Mirror of the `ctfidf` block of the topic_model section."""

    bm25_weighting: bool = True
    reduce_frequent_words: bool = True


@dataclass
class TopicDistributionConfig(ConfigField):
    """Mirror of the `topic_distribution` block of the topic_model section."""

    window: int = 8
    stride: int = 1
    min_similarity: float = 0.1
    batch_size: int = 1000
@dataclass
class BertopicConfig(ConfigField):
    """Mirror of the `bertopic` block of the topic_model section."""

    transformer_model: str = 'allenai/specter2_base'
    n_gram_range: str = 'bigram'
    language: str = 'english'
    calculate_probabilities: bool = True
@dataclass
class ReviewExportConfig(ConfigField):
    """Output configuration for the review stage.

    Declare the parent directory (``export_dir``) and an optional run label
    (``run_name``). If ``run_name`` is left blank, :meth:`resolve` generates a
    timestamp name (``YYYY-MM-DDTHHMMSS``) at run time so that successive test
    runs never overwrite each other.

    ``resolve()`` must be called before the review runs (done automatically by
    ``LLMReview.from_config``). It creates the run directory, sets
    ``included_docs`` / ``total_docs`` on the instance, and defaults
    ``cache_dir`` to ``<run_dir>/cache/`` when not explicitly provided.

    Downstream sections (``bib_network``, ``topic_model``) can reference the
    output via ``config.review.export.included_docs`` after ``resolve()``, or
    leave ``doc_dataset`` blank to have ``Config.load`` auto-detect the most
    recent run.
    """

    export_dir: str
    # None → auto-timestamp at resolve() time.
    run_name: str = None
    # None → <run_dir>/cache/.
    cache_dir: str = None
    # Both paths stay None until resolve() fills them in.
    included_docs: str = None
    total_docs: str = None

    def resolve(self):
        """Finalise run_name, create output directories, and set file paths."""
        final_name, run_dir = _make_run_dir(self.export_dir, self.run_name)
        self.run_name = final_name
        self.included_docs = os.path.join(run_dir, 'reviewed_included.csv')
        self.total_docs = os.path.join(run_dir, 'reviewed_total.csv')
        if self.cache_dir is None:
            self.cache_dir = os.path.join(run_dir, 'cache')
        # NOTE(review): assumed to run for an explicitly configured cache_dir
        # as well — confirm against the upstream source layout.
        os.makedirs(self.cache_dir, exist_ok=True)
        return self
@dataclass
class ReviewerConfig(ConfigField):
    """Mirror of one entry under `review.reviewers` in the YAML.

    Cross-section fields like inclusion_criteria are NOT here — they live at
    the ReviewConfig level and are wired together by the runtime layer.
    """

    model_id: str
    host: str
    provider: str
    name: str
    max_tokens: int
    temperature: float
    reasoning_effort: str
    backstory: str
    additional_context: str
    reasoning: str = 'brief'
    # For the three fields below, None → falls back to the ReviewConfig value.
    max_retries: Union[None, int] = None
    max_concurrent_requests: Union[None, int] = None
    items_per_call: Union[None, int] = None
@dataclass
class ReviewConfig(ConfigField):
    """Mirror of the `review` section of the YAML.

    ``__post_init__`` promotes a dict `export` to ``ReviewExportConfig`` and
    every entry of `reviewers` to ``ReviewerConfig``. Entries that already are
    ``ReviewerConfig`` instances are kept as they are, so the class can also
    be built programmatically.
    """

    # --- Required ---
    export: ReviewExportConfig
    text_inputs: List[str]
    inclusion_criteria: str
    exclusion_criteria: str
    reviewers: List[ReviewerConfig]
    workflow: List[dict]

    # --- Optional (section-level defaults) ---
    doc_dataset: Union[None, str] = None  # None = auto-detect latest bib run
    batch_size: int = 100
    api_pause: float = 30.0
    decision_rule: str = 'majority'  # majority | mean
    sample_size: Union[None, int] = None  # None = process full dataset
    # None → module default (2); overridable per reviewer
    max_retries: Union[None, int] = None
    # None → module default (10); overridable per reviewer
    max_concurrent_requests: Union[None, int] = None
    # None → module default (1); overridable per reviewer
    items_per_call: Union[None, int] = None

    def __post_init__(self):
        super().__post_init__()
        if isinstance(self.export, dict):
            self.export = ReviewExportConfig(**self.export)
        # Only raw mappings are unpacked; typed entries pass through untouched.
        self.reviewers = [
            reviewer if isinstance(reviewer, ReviewerConfig) else ReviewerConfig(**reviewer)
            for reviewer in self.reviewers
        ]
@dataclass
class CouplingNetworkConfig(ConfigField):
    """Mirror of the `coupling_network` block of the bib_network section."""

    use_resolved: bool = False
    use_unresolved: bool = False
    min_shared: int = 1
@dataclass
class CocitationNetworkConfig(ConfigField):
    """Mirror of the `cocitation_network` block of the bib_network section."""

    use_resolved: bool = False
    use_unresolved: bool = False
    min_cocitations: int = 1
@dataclass
class BibNetworkExportConfig(ConfigField):
    """Output configuration for the bib_network stage.

    Each run is stored in ``<export_dir>/<run_name>/``.
    Leave ``run_name`` blank to auto-generate a timestamp.
    Call :meth:`resolve` to finalise the run directory and set file paths.
    """

    export_dir: str
    run_name: Union[None, str] = None
    # Both graph paths are set by resolve().
    coupling_graph: Union[None, str] = None
    cocitation_graph: Union[None, str] = None

    def resolve(self):
        """Create the run directory, set output file paths, return ``self``."""
        final_name, run_dir = _make_run_dir(self.export_dir, self.run_name)
        self.run_name = final_name
        self.coupling_graph = os.path.join(run_dir, 'coupling_network.graphml')
        self.cocitation_graph = os.path.join(run_dir, 'cocitation_network.graphml')
        return self
@dataclass
class BibNetworkConfig(ConfigField):
    """Mirror of the `bib_network` section of the YAML.

    In ``__post_init__`` the two network blocks are parsed from dicts or, when
    absent, replaced by their defaults; a dict `export` becomes a
    ``BibNetworkExportConfig`` while ``None`` is kept.
    """

    doc_dataset: str = None
    coupling_network: CouplingNetworkConfig = None
    cocitation_network: CocitationNetworkConfig = None
    export: Union[None, BibNetworkExportConfig] = None

    def __post_init__(self):
        super().__post_init__()
        network_sections = (
            ('coupling_network', CouplingNetworkConfig),
            ('cocitation_network', CocitationNetworkConfig),
        )
        for attr, section_cls in network_sections:
            current = getattr(self, attr)
            if current is None:
                setattr(self, attr, section_cls())
            elif isinstance(current, dict):
                setattr(self, attr, section_cls(**current))
        if isinstance(self.export, dict):
            self.export = BibNetworkExportConfig(**self.export)
@dataclass
class TopicExportConfig(ConfigField):
    """Output configuration for the topic-model stage.

    Each run is stored in its own sub-directory: ``<export_dir>/<run_name>/``.
    Leave ``run_name`` blank to auto-generate a timestamp at run time
    (directory creation is deferred to ``TopicModel.run()``).
    """

    export_dir: str
    run_name: Union[None, str] = None
@dataclass
class TopicModelConfig(ConfigField):
    """Mirror of the `topic_model` section of the YAML.

    In ``__post_init__`` a dict `export` becomes a ``TopicExportConfig``, and
    every nested block is parsed from its dict or, when absent, replaced by
    that block's defaults.
    """

    export: TopicExportConfig
    doc_dataset: Union[None, str] = None
    distance: str = "euclidean"
    keep_n_results: int = 10
    coherence_scorer: CoherenceScorerConfig = None
    hdbscan: HDBSCANConfig = None
    umap: UMAPConfig = None
    bertopic: BertopicConfig = None
    berteley: BerteleyConfig = None
    ctfidf: CTFIDFConfig = None
    topic_distribution: TopicDistributionConfig = None

    def __post_init__(self):
        super().__post_init__()
        if isinstance(self.export, dict):
            self.export = TopicExportConfig(**self.export)
        nested_sections = (
            ('hdbscan', HDBSCANConfig),
            ('umap', UMAPConfig),
            ('bertopic', BertopicConfig),
            ('berteley', BerteleyConfig),
            ('ctfidf', CTFIDFConfig),
            ('topic_distribution', TopicDistributionConfig),
            ('coherence_scorer', CoherenceScorerConfig),
        )
        for attr, section_cls in nested_sections:
            current = getattr(self, attr)
            if current is None:
                setattr(self, attr, section_cls())
            elif isinstance(current, dict):
                setattr(self, attr, section_cls(**current))
@dataclass
class TopicsSectionConfig(ConfigField):
    """Mirror of the `topics` block under the report sections."""

    n_repr_docs_per_topic: int = 5


@dataclass
class BibNetworkSectionConfig(ConfigField):
    """Mirror of the `bib_network` block under the report sections."""

    # "auto" | "true" | "false"
    enabled: str = "auto"


@dataclass
class TemporalSectionConfig(ConfigField):
    """Mirror of the `temporal` block under the report sections."""

    variants: List[str] = field(default_factory=lambda: [
        "absolute", "cumulative", "normalized", "weighted"
    ])


@dataclass
class TopicCharacteristicsConfig(ConfigField):
    """Mirror of the `topic_characteristics` block under the report sections."""

    n_top_cited_per_topic: int = 5
    n_top_cited_global: int = 50


@dataclass
class TopicSimilarityConfig(ConfigField):
    """Mirror of the `topic_similarity` block under the report sections."""

    clustering: bool = True
    dendrogram: bool = True


@dataclass
class PaperSelectionConfig(ConfigField):
    """Mirror of the `paper_selection` block under the report sections."""

    min_year: int = 2000
    proportion_per_topic: float = 0.15
    # "citations" | "random"
    selection_by: str = "citations"
    export_annex: bool = True
    # "csv" | "txt"
    annex_format: str = "csv"
@dataclass
class ReportSectionsConfig(ConfigField):
    """Mirror of the `sections` block of the report configuration.

    Every typed sub-section is parsed from its dict or, when absent, replaced
    by that sub-section's defaults. `extra` is kept exactly as given.
    """

    topics: TopicsSectionConfig = None
    bib_network: BibNetworkSectionConfig = None
    temporal: TemporalSectionConfig = None
    topic_characteristics: TopicCharacteristicsConfig = None
    topic_similarity: TopicSimilarityConfig = None
    paper_selection: PaperSelectionConfig = None
    extra: Union[None, List[dict]] = None

    def __post_init__(self):
        super().__post_init__()
        typed_sections = (
            ('topics', TopicsSectionConfig),
            ('bib_network', BibNetworkSectionConfig),
            ('temporal', TemporalSectionConfig),
            ('topic_characteristics', TopicCharacteristicsConfig),
            ('topic_similarity', TopicSimilarityConfig),
            ('paper_selection', PaperSelectionConfig),
        )
        for attr, section_cls in typed_sections:
            current = getattr(self, attr)
            if current is None:
                setattr(self, attr, section_cls())
            elif isinstance(current, dict):
                setattr(self, attr, section_cls(**current))
@dataclass
class ReportMetaConfig(ConfigField):
    """Mirror of the `meta` block of the report configuration."""

    title: str = "Bibliographic report — Pysyrev"
    subtitle: Union[None, str] = None
    author: str = "Report generated with the pysyrev engine (v0.1)"
    date_format: str = "%d/%m/%Y"
    version: str = "1.0.0"
    summary: Union[None, str] = None
@dataclass
class ReportConfig(ConfigField):
    """Mirror of the `report` section of the YAML.

    `meta` and `sections` are parsed from dicts or, when absent, replaced by
    their defaults, so both are always typed objects after construction.
    """

    meta: Union[None, ReportMetaConfig] = None
    sections: Union[None, ReportSectionsConfig] = None

    def __post_init__(self):
        super().__post_init__()
        for attr, section_cls in (
            ('meta', ReportMetaConfig),
            ('sections', ReportSectionsConfig),
        ):
            current = getattr(self, attr)
            if current is None:
                setattr(self, attr, section_cls())
            elif isinstance(current, dict):
                setattr(self, attr, section_cls(**current))
@dataclass
class TopicLabelerConfig(ConfigField):
    """LLM configuration for generating human-readable topic labels.

    Built by ``Config.load`` from the root-level `llm` key.
    """

    provider: str
    model_id: str
    host: Union[None, str] = None
    max_tokens: int = 200
    temperature: float = 0.3
    max_retries: int = 2
    max_concurrent_requests: int = 5
    n_repr_docs_for_labeling: int = 3
    system_prompt: Union[None, str] = None
@dataclass
class TopicReportConfig(ConfigField):
    """Model-selection parameters for the topic-report stage."""

    # Auto-detected by Config.load() from topic_model.export.export_dir when
    # blank.
    run_dir: str = None
    model_index: int = 0
    export_to: str = None
@dataclass
class BibNetworkReportConfig(ConfigField):
    """Paths to the exported bib_network graphs for inclusion in the report.

    Leave both paths blank and set 'config' at the root of the report YAML so
    that the latest bib_network run is detected automatically from
    bib_network.export.export_dir in the main pipeline config.
    """

    coupling_graph: Union[None, str] = None
    cocitation_graph: Union[None, str] = None
@dataclass
class Config:
    """Root configuration object.

    All sections are optional — only the sections present in the YAML are
    executed. The canonical stage order is:
    ``bib → review → bib-network → topic-model → topic-report``.

    ``Config.load()`` propagates outputs between stages automatically when
    ``doc_dataset`` / ``run_dir`` are left blank, so a full-pipeline YAML
    requires no explicit cross-section paths.
    """

    env: Union[None, str] = None
    bib: Union[None, BibConfig] = None
    review: Union[None, ReviewConfig] = None
    bib_network: Union[None, BibNetworkConfig] = None
    topic_model: Union[None, TopicModelConfig] = None
    topic_report: Union[None, TopicReportConfig] = None
    report: Union[None, ReportConfig] = None
    llm: Union[None, TopicLabelerConfig] = None
    # Auto-populated during load() — not a user-facing YAML key.
    bib_network_graphs: Union[None, BibNetworkReportConfig] = None

    @staticmethod
    def _export_dir(section):
        """Return ``section['export']['export_dir']``, or None.

        None is returned when the section is missing, or when its `export`
        key is absent, blank (``export:`` with no value) or not a mapping.
        """
        export = (section or {}).get('export')
        if not isinstance(export, dict):
            return None
        return export.get('export_dir')

    @classmethod
    def load(cls, config_file):
        """Load a YAML config file.

        Steps:
        1. Read the YAML (decoded as UTF-8).
        2. Load the .env file referenced by the root-level ``env:`` key (if any).
        3. Resolve all ``${VAR}`` references.
        4. Propagate outputs between stages when ``doc_dataset`` / ``run_dir``
           are left blank (auto-detection of the latest run in each export_dir).
        5. Build typed dataclasses for every section present.

        Raises:
            ValueError: if the YAML root is not a mapping, or if a ``${VAR}``
                reference points to an unset environment variable.
        """
        # Decode explicitly as UTF-8 instead of the platform default encoding.
        with open(config_file, 'r', encoding='utf-8') as file:
            raw = yaml.safe_load(file) or {}
        if not isinstance(raw, dict):
            raise ValueError(
                f"Top-level YAML in {config_file!r} must be a mapping, "
                f"got {type(raw).__name__}"
            )

        env_path = raw.get('env')
        if env_path:
            load_dotenv(env_path)

        resolved = _resolve_env_vars(raw)

        # Shallow copies, so the auto-detected paths never leak into `resolved`.
        review_data = dict(resolved.get('review') or {})
        bib_network_data = dict(resolved.get('bib_network') or {})
        topic_model_data = dict(resolved.get('topic_model') or {})
        topic_report_data = dict(resolved.get('topic_report') or {})

        # ── Propagate outputs between stages when doc_dataset is blank ────────
        #   bib.export.export_dir     → review.doc_dataset
        #   review.export.export_dir  → bib_network.doc_dataset
        #                             → topic_model.doc_dataset
        bib_export_dir = cls._export_dir(resolved.get('bib'))
        review_export_dir = cls._export_dir(review_data)

        if bib_export_dir and not review_data.get('doc_dataset'):
            latest = _find_latest_file(bib_export_dir, 'bib_dataset.csv')
            if latest:
                review_data['doc_dataset'] = latest

        if review_export_dir:
            latest = _find_latest_file(review_export_dir, 'reviewed_included.csv')
            if latest:
                if not bib_network_data.get('doc_dataset'):
                    bib_network_data['doc_dataset'] = latest
                if not topic_model_data.get('doc_dataset'):
                    topic_model_data['doc_dataset'] = latest

        # ── Auto-detect topic_report.run_dir from latest topic_model run ─────
        if topic_report_data and not topic_report_data.get('run_dir'):
            tm_export_dir = cls._export_dir(topic_model_data)
            if tm_export_dir:
                latest = _find_latest_dir(tm_export_dir)
                if latest:
                    topic_report_data['run_dir'] = latest

        # ── Auto-detect bib_network graph paths for the report ────────────────
        bib_network_graphs = None
        bn_export_dir = cls._export_dir(bib_network_data)
        if bn_export_dir:
            latest_dir = _find_latest_dir(bn_export_dir)
            if latest_dir:
                coupling = os.path.join(latest_dir, 'coupling_network.graphml')
                cocitation = os.path.join(latest_dir, 'cocitation_network.graphml')
                # A graph path is only recorded when the file actually exists.
                bib_network_graphs = BibNetworkReportConfig(
                    coupling_graph=coupling if os.path.isfile(coupling) else None,
                    cocitation_graph=cocitation if os.path.isfile(cocitation) else None,
                )

        return cls(
            env=resolved.get('env'),
            bib=BibConfig(**resolved['bib']) if resolved.get('bib') else None,
            review=ReviewConfig(**review_data) if review_data else None,
            bib_network=BibNetworkConfig(**bib_network_data) if bib_network_data else None,
            topic_model=TopicModelConfig(**topic_model_data) if topic_model_data else None,
            topic_report=TopicReportConfig(**topic_report_data) if topic_report_data else None,
            report=ReportConfig(**resolved['report']) if resolved.get('report') else None,
            llm=TopicLabelerConfig(**resolved['llm']) if resolved.get('llm') else None,
            bib_network_graphs=bib_network_graphs,
        )