"""
Runtime model classes for the topic-modeling pipeline.
These classes hold *configured* runtime objects (a UMAP factory, an HDBSCAN
factory, a SentenceTransformer instance...) and are instantiated either
directly or from a parsed config via TopicModel.from_config().
Field names mirror the corresponding entries in config.py to keep the
bridge between the two trivial.
"""
import copy
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Union

import pandas as pd
from bertopic.dimensionality import BaseDimensionalityReduction
from bertopic.vectorizers import ClassTfidfTransformer
from hdbscan import HDBSCAN
from sentence_transformers import SentenceTransformer
from umap import UMAP

from pysyrev.core.config import TopicModelConfig
from pysyrev.core.topic import clean_dataset, topic_modeling
# =============================================================================
# Runtime sub-models
# =============================================================================
@dataclass
class UmapModel:
    """Factory that runs UMAP with a fixed set of settings.

    The settings stored on the instance are shared by every call, while
    ``n_neighbors`` and ``n_components`` are chosen per call.
    """

    min_dist: float
    metric: str
    low_memory: bool
    random_state: int

    def __call__(self, n_neighbors, n_components, embeddings):
        """Fit a new UMAP on ``embeddings`` and return its ``fit_transform`` output."""
        reducer = UMAP(
            n_neighbors=n_neighbors,
            n_components=n_components,
            min_dist=self.min_dist,
            metric=self.metric,
            low_memory=self.low_memory,
            random_state=self.random_state,
        )
        return reducer.fit_transform(embeddings)
@dataclass
class HdbscanModel:
    """Factory for HDBSCAN clusterers sharing the same fixed settings.

    Each call builds a new, unfitted ``HDBSCAN``. The size-related
    parameters are supplied per call.
    """

    metric: str
    cluster_selection_method: str
    prediction_data: bool

    def __call__(self, min_topic_size, min_samples):
        """Return a new ``HDBSCAN`` with ``min_cluster_size=min_topic_size``."""
        fixed_settings = {
            'metric': self.metric,
            'cluster_selection_method': self.cluster_selection_method,
            'prediction_data': self.prediction_data,
        }
        return HDBSCAN(
            min_cluster_size=min_topic_size,
            min_samples=min_samples,
            **fixed_settings,
        )
@dataclass
class BertopicModel:
    """Settings used to build the keyword arguments for a BERTopic model.

    Calling an instance returns a ``dict`` of BERTopic constructor kwargs for
    one (``min_topic_size``, ``min_samples``) combination. The
    SentenceTransformer is loaded lazily and reused across calls.
    """

    hdbscan_model: HdbscanModel
    umap_model: UmapModel
    ctfidf_model: ClassTfidfTransformer
    transformer_model: str  # model name/path handed to SentenceTransformer(...)
    calculate_probabilities: bool
    # NOTE(review): forwarded as BERTopic's ``n_gram_range``, which BERTopic
    # documents as a (min_n, max_n) tuple; the ``str`` annotation may be
    # stale -- confirm what the config actually provides.
    n_gram_range: str
    language: str
    nr_topics: Union[None, int] = None
    verbose: bool = False
    # Cache for the loaded SentenceTransformer (filled on first access).
    # Excluded from repr/eq so printing the config does not dump the model,
    # and so equality between two configs does not change once the weights
    # have been loaded on one of them. Still accepted by __init__ for
    # backward compatibility.
    _embedding_model: Union[None, SentenceTransformer] = field(
        default=None, repr=False, compare=False,
    )

    @property
    def embedding_model(self) -> SentenceTransformer:
        """Lazily load the SentenceTransformer on first access.

        Subsequent accesses reuse the same instance, so multiple ``run()``
        calls on the same TopicModel share the loaded weights.
        """
        if self._embedding_model is None:
            self._embedding_model = SentenceTransformer(self.transformer_model)
        return self._embedding_model

    def __call__(self, min_topic_size, min_samples):
        """Return BERTopic constructor kwargs for one HDBSCAN configuration.

        Parameters
        ----------
        min_topic_size : int
            Passed to HDBSCAN as ``min_cluster_size``.
        min_samples : int
            Passed to HDBSCAN as ``min_samples``.

        Returns
        -------
        dict
            Keyword arguments intended for ``BERTopic(**kwargs)``.
        """
        return dict(
            embedding_model=self.embedding_model,
            nr_topics=self.nr_topics,
            n_gram_range=self.n_gram_range,
            verbose=self.verbose,
            language=self.language,
            calculate_probabilities=self.calculate_probabilities,
            # A fresh copy per call, just like the HDBSCAN model below. BERTopic
            # fits its ctfidf_model in place, so one shared instance would let
            # a later fit overwrite the state of earlier models if those are
            # kept around.
            ctfidf_model=copy.deepcopy(self.ctfidf_model),
            # Identity reducer: presumably the embeddings are reduced
            # beforehand with UmapModel (which returns fit_transform output).
            # NOTE(review): confirm in topic_modeling.
            umap_model=BaseDimensionalityReduction(),
            hdbscan_model=self.hdbscan_model(min_topic_size, min_samples),
        )
@dataclass
class TopicDistribution:
    """Plain container for the ``topic_distribution`` settings.

    ``TopicModel.from_config`` fills it from ``topic_model.topic_distribution``,
    and ``TopicModel.run`` passes it unchanged to ``topic_modeling``.

    NOTE(review): the field names match the keyword arguments of BERTopic's
    ``approximate_distribution`` -- confirm in ``topic_modeling``.
    """

    # Positional order is part of the constructor signature; keep it.
    window: int            # window size
    stride: int            # step between successive windows
    min_similarity: float  # similarity threshold
    batch_size: int        # batch size used when computing distributions
# =============================================================================
# Top-level runtime model
# =============================================================================
@dataclass
class TopicModel:
    """Top-level runtime object for one topic-modelling pipeline.

    Bundles the configured BERTopic/UMAP/HDBSCAN settings with the parameter
    lists and export settings that :meth:`run` forwards to
    :func:`pysyrev.core.topic.topic_modeling`. Build it directly or via
    :meth:`from_config`, then call :meth:`run`.
    """

    doc_dataset: str          # CSV path read by run() when no DataFrame is passed
    allow_abbrev: bool        # forwarded to clean_dataset
    distance: str
    bertopic_model: BertopicModel
    topic_distribution: TopicDistribution
    nr_repr_docs: int
    export_dir: str           # parent directory of the per-run directories
    n_neighbors: List[int]
    n_components: List[int]
    min_topic_size_range: List[int]
    min_sample_range: List[int]
    topic_size_step: int
    min_sample_step: int
    keep_n_results: int
    ranking_scorer: str
    purity_scorer: str
    run_name: Union[None, str]  # None -> timestamp-named run directory
    overwrite: bool = False     # allow reusing an existing run directory

    # ---- bridge from configuration --------------------------------------

    @classmethod
    def from_config(cls, config: 'Config', overwrite: bool = False) -> 'TopicModel':
        """Build a TopicModel from a full Config object.

        Parameters
        ----------
        config : Config
            Parsed configuration. ``config.topic_model`` supplies almost every
            field. The optional ``config.report`` and ``config.llm`` sections
            only influence ``nr_repr_docs``.
        overwrite : bool, default False
            Stored in the ``overwrite`` field, which lets :meth:`run` reuse an
            existing run directory. This method does not read it from
            ``config``.

        Returns
        -------
        TopicModel
        """
        # NOTE(review): 'Config' is only a forward-reference string here.
        # Import it under typing.TYPE_CHECKING if a type checker should
        # resolve it.
        tc = config.topic_model

        ctfidf_model = ClassTfidfTransformer(
            bm25_weighting=tc.ctfidf.bm25_weighting,
            reduce_frequent_words=tc.ctfidf.reduce_frequent_words,
        )
        umap_model = UmapModel(
            min_dist=tc.umap.min_dist,
            metric=tc.umap.metric,
            low_memory=tc.umap.low_memory,
            random_state=tc.umap.random_state,
        )
        hdbscan_model = HdbscanModel(
            metric=tc.hdbscan.metric,
            cluster_selection_method=tc.hdbscan.cluster_selection_method,
            prediction_data=tc.hdbscan.prediction_data,
        )
        bertopic_model = BertopicModel(
            hdbscan_model=hdbscan_model,
            umap_model=umap_model,
            ctfidf_model=ctfidf_model,
            transformer_model=tc.bertopic.transformer_model,
            calculate_probabilities=tc.bertopic.calculate_probabilities,
            n_gram_range=tc.bertopic.n_gram_range,
            language=tc.bertopic.language,
        )
        topic_distribution = TopicDistribution(
            window=tc.topic_distribution.window,
            stride=tc.topic_distribution.stride,
            min_similarity=tc.topic_distribution.min_similarity,
            batch_size=tc.topic_distribution.batch_size,
        )

        # Keep enough representative documents for whichever consumer asks
        # for more (report vs. LLM labelling). The defaults 5 and 3 apply
        # when the corresponding section is absent.
        report_nr = (
            config.report.sections.topics.n_repr_docs_per_topic
            if config.report is not None else 5
        )
        llm_nr = config.llm.n_repr_docs_for_labeling if config.llm is not None else 3

        return cls(
            doc_dataset=tc.doc_dataset,
            allow_abbrev=tc.berteley.allow_abbrev,
            distance=tc.distance,
            bertopic_model=bertopic_model,
            topic_distribution=topic_distribution,
            nr_repr_docs=max(report_nr, llm_nr),
            export_dir=tc.export.export_dir,
            n_neighbors=tc.umap.n_neighbors,
            n_components=tc.umap.n_components,
            min_topic_size_range=tc.hdbscan.min_topic_size_range,
            min_sample_range=tc.hdbscan.min_sample_range,
            topic_size_step=tc.hdbscan.topic_size_step,
            min_sample_step=tc.hdbscan.min_sample_step,
            keep_n_results=tc.keep_n_results,
            ranking_scorer=tc.coherence_scorer.ranking,
            purity_scorer=tc.coherence_scorer.purity,
            run_name=tc.export.run_name,
            overwrite=overwrite,
        )

    # ---- runtime --------------------------------------------------------

    def _clean_dataset(self, dataset, show_progress):
        """Delegate to ``clean_dataset`` using this model's ``allow_abbrev``."""
        return clean_dataset(dataset, self.allow_abbrev, show_progress)

    def _make_run_dir(self) -> Path:
        """Create a unique subdirectory under ``export_dir`` for this run.

        The directory name is ``run_name`` or, if that is unset, a local
        timestamp with one-second resolution. Two unnamed runs started
        within the same second therefore map to the same directory.

        Raises
        ------
        FileExistsError
            If the directory already exists and ``overwrite`` is False. With
            ``overwrite=True`` the existing directory is reused; its contents
            are not removed here.
        """
        name = self.run_name or datetime.now().strftime('%Y-%m-%dT%H%M%S')
        run_dir = Path(self.export_dir) / name
        run_dir.mkdir(parents=True, exist_ok=self.overwrite)
        return run_dir

    def run(self, dataset: Union[None, pd.DataFrame] = None, show_progress=True):
        """Run the topic modelling pipeline.

        Parameters
        ----------
        dataset : pd.DataFrame, optional
            Reviewed-included dataset. If None, it is loaded from
            ``doc_dataset`` (set via config or auto-detected by Config.load).
        show_progress : bool, default True
            Forwarded to preprocessing, embedding and ``topic_modeling``.

        Returns
        -------
        Whatever :func:`pysyrev.core.topic.topic_modeling` returns.

        Raises
        ------
        ValueError
            If no dataset is given and ``doc_dataset`` is unset, or if no
            document survives preprocessing.
        FileExistsError
            If the run directory already exists and ``overwrite`` is False.
        """
        if dataset is None:
            if not self.doc_dataset:
                raise ValueError(
                    "No dataset provided: pass a DataFrame to run() or set "
                    "doc_dataset in the topic_model section of your config."
                )
            dataset = pd.read_csv(self.doc_dataset)

        clean_docs, surviving_indices = self._clean_dataset(
            dataset, show_progress=show_progress,
        )
        print(
            f"[TopicModel] {len(dataset)} documents in dataset → "
            f"{len(clean_docs)} survived preprocessing."
        )
        # len() rather than truthiness: clean_docs may be an array-like.
        if len(clean_docs) == 0:
            raise ValueError(
                "No documents survived preprocessing; nothing to model."
            )

        # Create the run directory only once the inputs are known to be
        # usable, so a validation failure does not leave an empty directory
        # that would make a rerun with the same run_name fail. It is still
        # created before the expensive embedding step, so a name clash fails
        # fast.
        run_dir = self._make_run_dir()

        embeddings = self.bertopic_model.embedding_model.encode(
            clean_docs, show_progress_bar=show_progress,
        )

        return topic_modeling(
            dataset,
            clean_docs,
            self.bertopic_model,
            self.topic_distribution,
            embeddings,
            self.n_neighbors,
            self.n_components,
            self.min_topic_size_range,
            self.min_sample_range,
            self.topic_size_step,
            self.min_sample_step,
            str(run_dir),
            self.nr_repr_docs,
            self.distance,
            self.ranking_scorer,
            self.purity_scorer,
            self.keep_n_results,
            show_progress,
            surviving_indices,
        )