
Week 3: Stable Topics with BERTopic

In Week 1 (extraction) and Week 2 (embeddings + KMeans in BigQuery ML) we laid the groundwork. This week I built a Python BERTopic stage whose IDs stay stable across runs by mapping BERTopic’s internal clusters to stable topic IDs in BigQuery. I use Google Gemini again to generate nice labels for the extracted topic clusters.

This week we explore BERTopic + stable topic IDs (via an ID registry):

  • Train a BERTopic model in Python (UMAP + HDBSCAN).
  • Map BERTopic’s internal clusters (model_version, internal_topic_id) to stable topic IDs via a registry.
  • Ensure topic IDs remain consistent across retraining (no more ID jumps).
  • Join human-readable labels and persist results into video_topics for analysis.
  • Inspect results in Looker Studio and reflect on limitations.

Table of Contents

  1. Model storage in GCS (+ generous upload timeout)
  2. Backfill missing embeddings in BigQuery
  3. Build & train BERTopic (UMAP + HDBSCAN)
  4. CLI modes: TRAIN vs INFERENCE
  5. Daily inference: transform embeddings → topics
  6. Stable IDs via a registry + dim_topics bootstrap
  7. Write daily video topics idempotently
  8. Better cluster labels with LLM
  9. Early Results & Charts
  10. Limitations & Next Steps

Below I show how main.py grew from a tiny scaffold to the final stable-ID pipeline — one step at a time.

1) Model storage in GCS (+ generous upload timeout)

For storing the model I chose Google Cloud Storage; the model is downloaded again before every inference run.

import os
import pathlib
import tempfile
from typing import Optional

from google.cloud import storage

def _parse_gs_uri(gcs_uri: str) -> tuple[str, str]:
    if not gcs_uri.startswith("gs://"):
        raise ValueError(f"Expected gs:// URI, got: {gcs_uri}")
    without = gcs_uri[len("gs://"):]
    bucket, _, prefix = without.partition("/")
    return bucket, prefix.strip("/")

def download_model_from_gcs(gcs_uri: str, project_id: Optional[str]) -> str:
    bucket, prefix = _parse_gs_uri(gcs_uri)
    client_kwargs = {"project": project_id} if project_id else {}
    gcs = storage.Client(**client_kwargs)
    bkt = gcs.bucket(bucket)

    tmpdir = pathlib.Path(tempfile.mkdtemp(prefix="bertopic_model_"))

    local_file = tmpdir / os.path.basename(prefix)
    bkt.blob(prefix).download_to_filename(str(local_file))
    return str(local_file)

def upload_model_to_gcs(local_model_file: str, gcs_uri: str, project_id: Optional[str]):
    bucket, prefix = _parse_gs_uri(gcs_uri)
    client_kwargs = {"project": project_id} if project_id else {}
    gcs = storage.Client(**client_kwargs)
    bkt = gcs.bucket(bucket)
    b = bkt.blob(prefix)
    b.upload_from_filename(local_model_file, timeout=600)

This will be used later by train_model() and load_model().
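
load_model() is not shown separately in this post. A minimal sketch of how it could mirror the download helper, assuming the model was saved with BERTopic's default pickle serialization:

from bertopic import BERTopic

def load_model(project_id, model_gcs_path):
    # Download the serialized model from GCS and load it back into BERTopic.
    local_file = download_model_from_gcs(model_gcs_path, project_id)
    return BERTopic.load(local_file)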

2) Backfill missing embeddings in BigQuery

Since I could not guarantee that every transcript already had embeddings, I use the following helper to make sure that everything for the batch date has one. It is executed right before inference.

def generate_missing_embeddings_for_data(bq: bigquery.Client, project: str, dataset: str,
                                         emb_table: str, batch_date: str, embedding_model: str,
                                         transcripts_table: str, transcripts_cleaned_table: str):
    sql = f"""
    INSERT INTO `{project}.{dataset}.{emb_table}` (
        video_youtube_id, text, text_clean, emb, batch_date
    )
    SELECT
        ge.video_youtube_id,
        t.text AS text,
        ge.content AS text_clean,
        ge.ml_generate_embedding_result AS emb,
        DATE(t.published_at) AS batch_date
    FROM ML.GENERATE_EMBEDDING(
        MODEL `{project}.{embedding_model}`,  
        (
            SELECT
                t.video_youtube_id,
                s.text_clean AS content               
            FROM `{project}.{dataset}.{transcripts_table}` t
            JOIN `{project}.{dataset}.{transcripts_cleaned_table}` s
                USING (video_youtube_id)
            LEFT JOIN `{project}.{dataset}.{emb_table}` ve
                ON ve.video_youtube_id = t.video_youtube_id
            WHERE DATE(s.published_at) = DATE(@d)
                AND ve.video_youtube_id IS NULL
        ),
        STRUCT(TRUE AS flatten_json_output)      
    ) AS ge
    LEFT JOIN `{project}.{dataset}.{transcripts_table}` t
        ON t.video_youtube_id = ge.video_youtube_id
    ;
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("d", "DATE", batch_date)]
    ))

    return job.result()

This writes embeddings only for videos that are still missing them for batch_date. The batch_date (UTC, DATE-typed) equals DATE(published_at) of the video processed on inference day.
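
To double-check coverage after the backfill, a small (hypothetical) helper can count transcripts for the batch date that still lack an embedding, reusing the same tables and parameter style:

def count_missing_embeddings(bq: bigquery.Client, project: str, dataset: str,
                             transcripts_cleaned_table: str, emb_table: str,
                             batch_date: str) -> int:
    # Hypothetical sanity check: cleaned transcripts for batch_date without an embedding row.
    sql = f"""
    SELECT COUNT(*) AS missing
    FROM `{project}.{dataset}.{transcripts_cleaned_table}` s
    LEFT JOIN `{project}.{dataset}.{emb_table}` ve USING (video_youtube_id)
    WHERE DATE(s.published_at) = DATE(@d) AND ve.video_youtube_id IS NULL
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("d", "DATE", batch_date)]
    ))
    return int(next(iter(job.result()))["missing"])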

3) Build & train BERTopic (UMAP + HDBSCAN)

train_model() uses build_bertopic() to create the BERTopic instance, fits it on the training data, and finally uploads the saved model via upload_model_to_gcs() for later use.

import umap, hdbscan
import numpy as np
from bertopic import BERTopic
import pathlib, tempfile

def build_bertopic(umap_neighbors, umap_components, min_cluster_size, min_samples, top_n_words) -> BERTopic:
    umap_model = umap.UMAP(
        n_neighbors=umap_neighbors, n_components=umap_components,
        min_dist=0.0, metric="cosine", random_state=42
    )
    hdbscan_model = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size, min_samples=min_samples,
        metric="euclidean", cluster_selection_method="eom", prediction_data=True
    )
    return BERTopic(
        umap_model=umap_model, hdbscan_model=hdbscan_model,
        top_n_words=top_n_words, calculate_probabilities=True,
        language="multilingual", verbose=True, nr_topics=None
    )

def train_model(project_id, model_gcs_path, bq, dataset, emb_table,
                lookback_days, min_docs, umap_neighbors, umap_components, min_cluster_size, min_samples, top_n_words):
    train_df = read_embeddings_lookback(bq, project_id, dataset, emb_table, lookback_days)
    if train_df.empty or len(train_df) < min_docs:
        raise RuntimeError(f"Not enough docs to train initial model (have {len(train_df)}, need >= {min_docs}).")
    texts = train_df["text"].tolist()
    embs = np.vstack(train_df["emb"].apply(lambda a: np.array(a, dtype=np.float32)).to_numpy())
    topic_model = build_bertopic(umap_neighbors, umap_components, min_cluster_size, min_samples, top_n_words)
    topic_model.fit(texts, embeddings=embs)

    tmp = pathlib.Path(tempfile.mkdtemp(prefix="bertopic_save_")) / "model.pkl"
    topic_model.save(str(tmp))
    upload_model_to_gcs(str(tmp), model_gcs_path, project_id)
    return topic_model
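
read_embeddings_lookback() is not shown above. It simply pulls the last lookback_days days of embeddings from BigQuery; a sketch, assuming the same columns as the inference query in step 5:

import pandas as pd
from google.cloud import bigquery

def read_embeddings_lookback(bq: bigquery.Client, project: str, dataset: str,
                             emb_table: str, lookback_days: int) -> pd.DataFrame:
    # All embeddings from the last `lookback_days` days as training input.
    sql = f"""
    SELECT video_youtube_id, text, emb, batch_date
    FROM `{project}.{dataset}.{emb_table}`
    WHERE batch_date >= DATE_SUB(CURRENT_DATE(), INTERVAL @days DAY)
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("days", "INT64", lookback_days)]
    ))
    return job.result().to_dataframe(progress_bar_type=None)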

4) CLI modes: TRAIN vs INFERENCE

To separate responsibilities, the job supports two modes: TRAIN and INFERENCE. The mode is controlled via the environment variable JOB_MODE, and train_model() runs only if it is set to TRAIN.

    PROJECT_ID = required("PROJECT_ID")
    BQ_DATASET = os.getenv("BQ_DATASET", "topicwatchdog")
    EMB_TABLE = os.getenv("EMB_TABLE", "video_embeddings")
    VIDEO_TOPICS_TABLE = os.getenv("VIDEO_TOPICS_TABLE", "video_topics")
    DIM_TOPICS_TABLE = os.getenv("DIM_TOPICS_TABLE", "dim_topics")
    TOPIC_REGISTRY_TABLE = os.getenv("TOPIC_REGISTRY_TABLE", "topic_registry")
    MODEL_VERSION = required("MODEL_VERSION")
    MODEL_GCS_BASE_PATH = required("MODEL_GCS_BASE_PATH")
    MODEL_GCS_PATH = f"{MODEL_GCS_BASE_PATH}/model_{MODEL_VERSION}.pkl"
    BATCH_DATE = os.getenv("BATCH_DATE")
    JOB_MODE = os.getenv("JOB_MODE", "INFERENCE")

    EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text_embedding")
    TRANSCRIPTS_TABLE = os.getenv("TRANSCRIPTS_TABLE", "transcripts")
    TRANSCRIPTS_CLEANED_TABLE = os.getenv("TRANSCRIPTS_CLEANED_TABLE", "transcripts_cleaned")

    # Initial-train knobs
    LOOKBACK_DAYS = as_int("INIT_TRAIN_LOOKBACK_DAYS", 30)
    MIN_DOCS = as_int("INIT_MIN_DOCS", 200)
    UMAP_N_NEIGHBORS = as_int("UMAP_N_NEIGHBORS", 15)
    UMAP_N_COMPONENTS = as_int("UMAP_N_COMPONENTS", 5)
    MIN_CLUSTER_SIZE = as_int("MIN_CLUSTER_SIZE", 15)  
    MIN_SAMPLES = as_int("MIN_SAMPLES", 5) 
    TOP_N_WORDS = as_int("TOP_N_WORDS", 10)

    bq = bq_client(PROJECT_ID)

    if JOB_MODE.upper() == "TRAIN":
        # Load or initial-train the model
        topic_model = train_model(
            project_id=PROJECT_ID,
            model_gcs_path=MODEL_GCS_PATH,
            bq=bq,
            dataset=BQ_DATASET,
            emb_table=EMB_TABLE,
            lookback_days=LOOKBACK_DAYS,
            min_docs=MIN_DOCS,
            umap_neighbors=UMAP_N_NEIGHBORS,
            umap_components=UMAP_N_COMPONENTS,
            min_cluster_size=MIN_CLUSTER_SIZE,
            min_samples=MIN_SAMPLES,
            top_n_words=TOP_N_WORDS,
        )
        return 

And if it's set to INFERENCE, I load the model instead and call generate_missing_embeddings_for_data():

    topic_model = load_model(
        project_id=PROJECT_ID,
        model_gcs_path=MODEL_GCS_PATH,
    )

    topic_info = topic_model.get_topic_info()

    track("calculateTopics", {
        "batchDate": BATCH_DATE,
        "modelVersion": MODEL_VERSION,
        "count": len(topic_info)
    })

    print(f"[INFO] Model {MODEL_VERSION} knows {len(topic_info)} topics")

    generate_missing_embeddings_for_data(bq, PROJECT_ID, BQ_DATASET, EMB_TABLE, BATCH_DATE,
                                         EMBEDDING_MODEL, TRANSCRIPTS_TABLE, TRANSCRIPTS_CLEANED_TABLE)
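
The small helpers required(), as_int() and bq_client() used in the config block are not part of this post; I assume they are thin wrappers like this:

import os
from google.cloud import bigquery

def required(name: str) -> str:
    # Fail fast if a mandatory environment variable is missing.
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

def as_int(name: str, default: int) -> int:
    # Read an integer environment variable, falling back to a default.
    return int(os.getenv(name, str(default)))

def bq_client(project_id: str) -> bigquery.Client:
    # BigQuery client bound to the project.
    return bigquery.Client(project=project_id)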

5) Daily inference: transform embeddings → topics

Now let's fetch all the embeddings for the batch date from BigQuery with the new function read_video_embeddings_for_date():

def read_video_embeddings_for_date(bq: bigquery.Client, project: str, dataset: str, emb_table: str, batch_date: str) -> pd.DataFrame:
    sql = f"""
    SELECT video_youtube_id, text, emb, batch_date
    FROM `{project}.{dataset}.{emb_table}`
    WHERE batch_date = DATE(@d)
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("d", "DATE", batch_date)]
    ))
    return job.result().to_dataframe(progress_bar_type=None)

We call it and transform the embeddings into topics using topic_model.transform():

    df = read_video_embeddings_for_date(bq, PROJECT_ID, BQ_DATASET, EMB_TABLE, BATCH_DATE)
    if df.empty:
        print(f"[INFO] No embeddings for {BATCH_DATE}. Exiting.")
        return

    texts = df["text"].tolist()
    embs = np.vstack(df["emb"].apply(lambda a: np.array(a, dtype=np.float32)).to_numpy())
    topics, probs = topic_model.transform(texts, embeddings=embs)

    outlier_rate = (np.array(topics) == -1).mean()
    print(f"[INFO] Outlier-Rate for {BATCH_DATE}: {outlier_rate:.2%}")

    df["internal_topic_id"] = [int(t) for t in topics]
    df["confidence"] = [float(p) if p is not None else None for p in probs.max(axis=1)]
    df["batch_date"] = pd.to_datetime(BATCH_DATE).date()

It was helpful to log the outlier rate; a rising value indicates that I should retrain the model.
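
Since the outlier rate is my main retraining signal, a simple guard could follow the transform step. The threshold here is an arbitrary assumption, not a tuned value:

    # Hypothetical threshold: above this, the model no longer fits the current data well.
    MAX_OUTLIER_RATE = 0.5

    if outlier_rate > MAX_OUTLIER_RATE:
        print(f"[WARN] Outlier rate {outlier_rate:.2%} exceeds {MAX_OUTLIER_RATE:.0%}; "
              f"consider re-running the job with JOB_MODE=TRAIN")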

6) Stable IDs via a registry + dim_topics bootstrap

    # Registry & dim_topics bootstrap/mapping
    registry = load_topic_registry(bq, PROJECT_ID, BQ_DATASET, TOPIC_REGISTRY_TABLE, MODEL_VERSION)
    known = set(registry.keys())
    seen = set(df["internal_topic_id"].unique().tolist())
    new_internal = sorted(list(seen - known))
    next_id_start = get_max_topic_id(bq, PROJECT_ID, BQ_DATASET, DIM_TOPICS_TABLE) + 1

    # Labels/keywords and centroids for new internal topics
    topic_labels: Dict[int, str] = {}
    topic_keywords: Dict[int, List[str]] = {}
    for internal_id in new_internal:
        if internal_id == -1:
            topic_labels[internal_id] = "OUTLIER"
            topic_keywords[internal_id] = []
        else:
            top_words = topic_model.get_topic(internal_id) or []
            kws = [w for (w, _ctfidf) in top_words][:TOP_N_WORDS]
            topic_keywords[internal_id] = kws
            topic_labels[internal_id] = ", ".join(kws[:4]) if kws else f"topic_{internal_id}"

    # Centroids from this batch (good enough for bootstrap)
    centroids: Dict[int, List[float]] = {}
    for internal_id in new_internal:
        mask = df["internal_topic_id"] == internal_id
        if mask.any():
            c = embs[mask].mean(axis=0)
            centroids[internal_id] = [float(x) for x in c.tolist()]
        else:
            centroids[internal_id] = []

    new_map_rows = []
    for i, internal_id in enumerate(new_internal):
        stable_id = next_id_start + i if internal_id != -1 else -1
        confs = df.loc[df["internal_topic_id"] == internal_id, "confidence"]
        new_map_rows.append({
            "internal_topic_id": internal_id,
            "topic_id": stable_id,
            "confidence_min": float(confs.min()) if len(confs) else None,
            "confidence_mean": float(confs.mean()) if len(confs) else None
        })

    if new_map_rows:
        upsert_new_registry_and_dim_topics(
            bq=bq,
            project=PROJECT_ID,
            dataset=BQ_DATASET,
            registry_table=TOPIC_REGISTRY_TABLE,
            dim_topics_table=DIM_TOPICS_TABLE,
            model_version=MODEL_VERSION,
            new_mappings=new_map_rows,
            topic_labels=topic_labels,
            topic_keywords=topic_keywords,
            batch_centroids=centroids
        )
        registry = load_topic_registry(bq, PROJECT_ID, BQ_DATASET, TOPIC_REGISTRY_TABLE, MODEL_VERSION)

    df["topic_id"] = df["internal_topic_id"].apply(lambda t: registry.get(int(t), -1))

7) Write daily video topics idempotently

To give video_topics somewhat "friendly" values, I pull label and keywords from the dim_topics table and use them to fill the topic and description columns.

    # Join readable labels/keywords
    label_map, keywords_map = {}, {}
    for r in bq.query(f"""
        SELECT topic_id, label, keywords
        FROM `{PROJECT_ID}.{BQ_DATASET}.{DIM_TOPICS_TABLE}`
        WHERE active_to IS NULL
    """).result():
        label_map[int(r["topic_id"])] = r["label"]
        keywords_map[int(r["topic_id"])] = r["keywords"]

    df["topic"] = df["topic_id"].map(lambda t: label_map.get(int(t), "UNKNOWN"))
    df["description"] = df["topic_id"].map(lambda t: ", ".join(keywords_map.get(int(t), []) or []))

    # video_topics (snippet_id = video id; no timestamp on whole-video embedding)
    df["topic_timestamp"] = None

    out = df[["video_youtube_id", "topic_id", "topic", "description", "confidence", "batch_date"]].copy()
    upsert_new_video_topics(bq, PROJECT_ID, BQ_DATASET, VIDEO_TOPICS_TABLE, out)
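
upsert_new_video_topics() is not shown here; one way to keep the write idempotent is delete-then-append per batch_date. A sketch under that assumption:

import pandas as pd
from google.cloud import bigquery

def upsert_new_video_topics(bq: bigquery.Client, project: str, dataset: str,
                            table: str, out: pd.DataFrame) -> None:
    # Idempotent write: remove existing rows for the batch date(s), then append the fresh ones.
    batch_dates = sorted({d.isoformat() for d in out["batch_date"].unique()})
    bq.query(
        f"DELETE FROM `{project}.{dataset}.{table}` WHERE batch_date IN UNNEST(@dates)",
        job_config=bigquery.QueryJobConfig(
            query_parameters=[bigquery.ArrayQueryParameter("dates", "DATE", batch_dates)]
        ),
    ).result()
    bq.load_table_from_dataframe(out, f"{project}.{dataset}.{table}").result()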

BERTopic’s internal cluster IDs can drift across retrains. The registry (model_version, internal_topic_id → topic_id) assigns monotonically increasing stable IDs whenever a new internal ID appears. Old IDs are never reused. Labels/keywords live in dim_topics and can evolve without breaking historical facts.

8) Better cluster labels with LLM

Like last week, I use the Gemini model again to generate a nice label for each topic cluster, based on a few example transcripts. (The prompt is in German because the transcripts and the resulting labels are German.)

-- make sure all labels are empty
UPDATE topicwatchdog.topic_registry SET topic_label = NULL WHERE true;

-- fetch some clusters examples as input to gemini model
CREATE TEMP TABLE _cluster_examples AS
SELECT
  vt.topic_id,
  ARRAY_AGG(text_clean ORDER BY dv.views_count DESC LIMIT 50) AS examples,
  COUNT(*) AS n
FROM `topicwatchdog.video_topics` vt
LEFT JOIN topicwatchdog.topic_registry tr ON vt.topic_id = tr.topic_id
LEFT JOIN topicwatchdog.transcripts_cleaned tc ON vt.video_youtube_id = tc.video_youtube_id
LEFT JOIN topicwatchdog.dim_videos dv ON dv.video_youtube_id = vt.video_youtube_id
WHERE topic_label IS NULL OR topic_label = ''
GROUP BY vt.topic_id;

-- store the prompts by using these examples
CREATE TEMP TABLE _prompts AS
SELECT
  topic_id,
  (
    'Du bist politischer Analyst. Vergib einen prägnanten, verständlichen ' ||
    'Titel (max. 120 Zeichen) für diesen **politischen Anspruch/Behauptung**-Cluster. ' ||
    'Regeln: deutsch, neutral, keine neuen Themen/Namen erfinden, ' ||
    'nur EIN kurzer Titel. Antworte strikt Text ohne Markdown oder json. ' ||
    '\n\nBeispiele (Original-Canonicals):\n' ||
    STRING_AGG('- ' || ex, '\n')
  ) AS prompt
FROM _cluster_examples, UNNEST(examples) AS ex
GROUP BY topic_id;

-- get all the topic_label entries for topic_id by executing the prompts
CREATE TEMP TABLE _raw_llm AS
SELECT
  topic_id,
  ml_generate_text_llm_result AS topic_label
FROM ML.GENERATE_TEXT(
  MODEL `topicwatchdog.gemini_remote`,
  TABLE _prompts,
  STRUCT(
    TRUE AS flatten_json_output,
    0.2 AS temperature,
    1024 AS max_output_tokens
  )
) AS gen;

-- store them into the topic_registry as new topic_label
MERGE `topicwatchdog.topic_registry` dst
USING (
  SELECT topic_id, topic_label
  FROM _raw_llm
  WHERE topic_label IS NOT NULL
) src
ON dst.topic_id = src.topic_id
WHEN MATCHED THEN
  UPDATE SET dst.topic_label = src.topic_label

Result: We now have a persistent table topic_registry mapping stable topic IDs → human-readable labels like:

| topic_id | topic_label                                                 |
| -------- | ----------------------------------------------------------- |
| 1        | Politische Abgrenzung und Ablehnung                          |
| 2        | Abgrenzung und Ablehnung politischer Strömungen              |
| …        | …                                                            |

This makes Looker Studio dashboards far easier to interpret.

9) Early Results & Charts

With BERTopic wired into the pipeline, I can finally see stable topics across multiple days.

In Looker Studio I built one new view only for the BERTopic data:

  1. BERTopic: a daily histogram of processed videos, a topic list with top channels per topic, and a topic-share pie chart.

    Finally, there is a list of all videos matching the selection and the option to choose the time range.

    (Screenshot: topics-week3)

As you can see, "Kritik an Politikern und Parteien" (criticism of politicians and parties) has topic_id -1, which means those videos are not properly assigned to a topic. Even though 2472 of the 3802 videos are mapped to -1, the results for the rest still look good.

10) Limitations & Next Steps

While BERTopic + registry mapping solves the ID-stability problem, some challenges remain:

  • Registry growth
    Every new internal BERTopic cluster gets a new stable ID. Over time, the registry may bloat with near-duplicates.

  • Outliers
    BERTopic assigns many snippets to -1. These are tracked but need strategies, e.g., (a) filter -1, (b) lower min_cluster_size, (c) periodic retrain on outliers only.

  • Compute cost
    Running BERTopic in Python means the clustering happens outside BigQuery ML. I need to benchmark cost and latency vs. the pure in-BigQuery approach from Week 2.

Next steps:

  • Build a review UI in PayloadCMS to merge/split/rename stable topics.
  • Add execution & ops tracking to run it more regularly.
  • Start including claims and define how to model them (IDs, relations, stability).

This closes Week 3: we now have stable, versioned topic IDs, a foundation for true longitudinal analysis.