
Week 4: Stable Claims with BERTopic

In Week 1 (extraction), Week 2 (embeddings + KMeans), and Week 3 (stable topics with BERTopic) I built the foundations. This week applies the same idea to claims — using BERTopic to cluster claim snippets and keep stable claim_ids via a registry + dim table.

This week we explore BERTopic + stable claim IDs:

  • Use pre-computed embeddings from BigQuery (same pipeline as before).
  • Fit/Load a BERTopic model (UMAP + HDBSCAN) in Python.
  • Assign internal cluster IDs per batch, then map them to stable claim_ids.
  • Persist to video_claims, claim_registry, and dim_claims tables for analysis.
  • Inspect behavior in Looker Studio and reflect on limitations.

Table of Contents

  1. Setup & Dependencies
  2. Configuration & Environment
  3. Building the BERTopic model
  4. Initial Training (bootstrap)
  5. Loading / Ensuring the Model
  6. Batch Inference → Registry Mapping → Upserts
  7. Better cluster labels with LLM
  8. Results & Charts
  9. Limitations & Next Steps

1) Setup & Dependencies

I keep the Python environment minimal; relevant parts in requirements.txt:

bertopic==0.16.3
hdbscan==0.8.38.post1
umap-learn==0.5.6
scikit-learn==1.4.2
numpy==1.26.4
pandas==2.2.2
scipy==1.11.4
pyarrow==16.1.0
db_dtypes==1.4.3
python_dotenv==1.1.1
google-cloud-bigquery==3.25.0
google-cloud-logging==3.10.0
google-cloud-storage==2.17.0

In main.py the BERTopic stack is imported once:

from bertopic import BERTopic
import umap
import hdbscan

2) Configuration & Environment

The script reads all runtime settings from environment variables (defaults shown are from the file):

PROJECT_ID = required("PROJECT_ID")
BQ_DATASET = os.getenv("BQ_DATASET", "topicwatchdog")
EMB_TABLE = os.getenv("EMB_TABLE", "claim_embeddings")
VIDEO_CLAIMS_TABLE = os.getenv("VIDEO_CLAIMS_TABLE", "video_claims")
CLAIM_REGISTRY_TABLE = os.getenv("CLAIM_REGISTRY_TABLE", "claim_registry")
DIM_CLAIMS_TABLE = os.getenv("DIM_CLAIMS_TABLE", "dim_claims")
MODEL_VERSION = required("MODEL_VERSION")
MODEL_GCS_BASE_PATH = required("MODEL_GCS_BASE_PATH")
MODEL_GCS_PATH = f"{MODEL_GCS_BASE_PATH}/model_{MODEL_VERSION}.pkl"
BATCH_DATE = os.getenv("BATCH_DATE")
JOB_MODE = os.getenv("JOB_MODE", "INFERENCE")

EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text_embedding")
FACT_CLAIMS_TABLE = os.getenv("FACT_CLAIMS_TABLE", "fact_claims")

# Initial-train knobs
LOOKBACK_DAYS = as_int("INIT_TRAIN_LOOKBACK_DAYS", 10)
MIN_DOCS = as_int("INIT_MIN_DOCS", 200)
UMAP_N_NEIGHBORS = as_int("UMAP_N_NEIGHBORS", 5)
UMAP_N_COMPONENTS = as_int("UMAP_N_COMPONENTS", 5)
MIN_CLUSTER_SIZE = as_int("MIN_CLUSTER_SIZE", 3)  
MIN_SAMPLES = as_int("MIN_SAMPLES", 3) 
TOP_N_WORDS = as_int("TOP_N_WORDS", 10)
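
For reference, required(...) and as_int(...) are two tiny helpers around os.getenv that are not shown above; a minimal sketch of what they could look like:

# Sketch only: the real helpers in main.py may differ slightly.
import os

def required(name: str) -> str:
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

def as_int(name: str, default: int) -> int:
    value = os.getenv(name)
    return int(value) if value not in (None, "") else default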

Note: embeddings are already stored in BigQuery from earlier weeks. The script does include a safety-net call to backfill missing embeddings when needed (see Section 6), but the standard run uses the existing embeddings.

The model is stored at MODEL_GCS_PATH as a .pkl in GCS. There are helpers to upload/download. See Week 3 for details.
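
For completeness, a rough sketch of what those two helpers look like with google-cloud-storage (the real Week-3 code may differ in details such as path parsing):

import os
import tempfile
from google.cloud import storage

def upload_model_to_gcs(local_path: str, gcs_path: str, project_id: str) -> None:
    # Sketch: expects gcs_path in the form gs://bucket/prefix/model_vX.pkl
    bucket_name, blob_name = gcs_path.replace("gs://", "").split("/", 1)
    client = storage.Client(project=project_id)
    client.bucket(bucket_name).blob(blob_name).upload_from_filename(local_path)

def download_model_from_gcs(gcs_path: str, project_id: str) -> str:
    # Sketch: downloads the pickle into the temp dir and returns the local path
    bucket_name, blob_name = gcs_path.replace("gs://", "").split("/", 1)
    client = storage.Client(project=project_id)
    local_path = os.path.join(tempfile.gettempdir(), blob_name.split("/")[-1])
    client.bucket(bucket_name).blob(blob_name).download_to_filename(local_path)
    return local_path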

If inference runs without a trained model, the script fails fast with a clear hint:

raise RuntimeError("Model is not trained yet, run with JOB_MODE=TRAIN first")

3) Building the BERTopic model

build_bertopic(...) wires together UMAP and HDBSCAN exactly like this:

def build_bertopic(umap_neighbors: int, umap_components: int, min_cluster_size: int, min_samples: int, top_n_words: int) -> BERTopic:
    umap_model = umap.UMAP(
        n_neighbors=umap_neighbors,
        n_components=umap_components,
        min_dist=0.0,
        metric="cosine",
        random_state=42,
    )
    hdbscan_model = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,
        min_samples=min_samples,
        metric="euclidean",
        cluster_selection_method="eom",
        prediction_data=True,
    )
    model = BERTopic(
        umap_model=umap_model,
        hdbscan_model=hdbscan_model,
        top_n_words=top_n_words,
        calculate_probabilities=True,
        verbose=True,
        language="multilingual",
        nr_topics=None,
    )
    return model

This is the consistent core across training and inference.
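
Two details worth calling out: UMAP reduces the pre-computed embeddings with cosine distance (random_state=42 keeps the projection reproducible), while HDBSCAN clusters the reduced 5-dimensional space with euclidean distance. prediction_data=True is what later allows transform() on new daily batches without refitting, and calculate_probabilities=True is needed so that probs in Section 6 is a full document-by-topic matrix that can be reduced with max(axis=1).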

4) Initial Training (bootstrap)

When run with JOB_MODE=TRAIN, the script fetches a lookback window of embedded texts (via read_embeddings_lookback).

def read_embeddings_lookback(bq: bigquery.Client, project: str, dataset: str, emb_table: str, lookback_days: int) -> pd.DataFrame:
    sql = f"""
    SELECT video_youtube_id, text, emb, batch_date
    FROM `{project}.{dataset}.{emb_table}`
    WHERE batch_date >= DATE_SUB(CURRENT_DATE(), INTERVAL @n DAY)
      AND emb IS NOT NULL
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("n", "INT64", lookback_days)]
    ))
    return job.result().to_dataframe(progress_bar_type=None)

It then checks MIN_DOCS, fits the model, and uploads the .pkl to GCS. Training is only used to bootstrap an initial model; the daily job runs inference.

def train_model(
    project_id: str,
    model_gcs_path: str,
    bq: bigquery.Client,
    dataset: str,
    emb_table: str,
    lookback_days: int,
    min_docs: int,
    umap_neighbors: int,
    umap_components: int,
    min_cluster_size: int,
    min_samples: int,
    top_n_words: int
) -> BERTopic:
    print(f"[INFO] Fetch docs for training")

    train_df = read_embeddings_lookback(bq, project_id, dataset, emb_table, lookback_days)
    if train_df.empty or len(train_df) < min_docs:
        raise RuntimeError(f"Not enough docs to train initial model (have {len(train_df)}, need >= {min_docs}).")

    print(f"[INFO] Fetched {len(train_df)} docs for training")

    texts = train_df["text"].tolist()
    embs = np.vstack(train_df["emb"].apply(lambda a: np.array(a, dtype=np.float32)).to_numpy())

    print(f"[INFO] Building bertopic")

    topic_model = build_bertopic(umap_neighbors, umap_components, min_cluster_size, min_samples, top_n_words)

    print(f"[INFO] Fit embeddings")

    topic_model.fit(texts, embeddings=embs)

    print(f"[INFO] Storing topic model locally")

    tmpdir = pathlib.Path(tempfile.mkdtemp(prefix="claims_save_"))
    local_file = tmpdir / "model.pkl"
    topic_model.save(str(local_file))

    print(f"[INFO] Upload locally saved model to gcs")

    upload_model_to_gcs(str(local_file), model_gcs_path, project_id)

    return topic_model

5) Loading / Ensuring the Model

On inference runs, I first ensure a model is available:

local_pkl = download_model_from_gcs(model_gcs_path, project_id)
return BERTopic.load(local_pkl)

If the model is missing, the script instructs you to run the training mode first (see Section 4).
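
Put together, the "ensure" step is roughly the following sketch (the real main.py wraps it with logging and tracking; the helper names are the ones introduced above):

def ensure_model(bq: bigquery.Client, job_mode: str) -> BERTopic:
    # Sketch: train once when JOB_MODE=TRAIN, otherwise load the pickle from GCS.
    if job_mode == "TRAIN":
        return train_model(
            PROJECT_ID, MODEL_GCS_PATH, bq, BQ_DATASET, EMB_TABLE,
            LOOKBACK_DAYS, MIN_DOCS, UMAP_N_NEIGHBORS, UMAP_N_COMPONENTS,
            MIN_CLUSTER_SIZE, MIN_SAMPLES, TOP_N_WORDS,
        )
    try:
        local_pkl = download_model_from_gcs(MODEL_GCS_PATH, PROJECT_ID)
    except Exception:
        raise RuntimeError("Model is not trained yet, run with JOB_MODE=TRAIN first")
    return BERTopic.load(local_pkl)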

6) Batch Inference → Registry Mapping → Upserts

a) (Safety-net) Generate missing embeddings if needed

Although embeddings are produced earlier in the pipeline, main.py keeps a safety-net call:

def generate_missing_embeddings_for_data(bq: bigquery.Client, project: str, dataset: str, emb_table: str, batch_date: str, embedding_model: str, fact_claims_table: str):
    sql = f"""

    INSERT INTO `{project}.{dataset}.{emb_table}` (
        video_youtube_id, text, emb, batch_date, claim_timestamp
    )
    SELECT
        ge.video_youtube_id,
        fc.claim AS text,
        ge.ml_generate_embedding_result AS emb,
        DATE(fc.claim_timestamp) AS batch_date,
        fc.claim_timestamp AS claim_timestamp
    FROM ML.GENERATE_EMBEDDING(
        MODEL `{project}.{embedding_model}`,  
        (
            SELECT
                fc.video_youtube_id,
                fc.claim AS content               
            FROM `{project}.{dataset}.{fact_claims_table}` fc
            LEFT JOIN `{project}.{dataset}.{emb_table}` ve
                ON ve.video_youtube_id = fc.video_youtube_id
            WHERE DATE(fc.claim_timestamp) = DATE(@d)
                AND ve.video_youtube_id IS NULL
        ),
        STRUCT(TRUE AS flatten_json_output)      
    ) AS ge
    LEFT JOIN `{project}.{dataset}.{fact_claims_table}` fc
        ON fc.video_youtube_id = ge.video_youtube_id
    ;
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("d", "DATE", batch_date)]
    ))

    return job.result()

b) Load embeddings for BATCH_DATE and transform

# Load embeddings for the target day
df = read_claim_embeddings_for_date(bq, PROJECT_ID, BQ_DATASET, EMB_TABLE, BATCH_DATE)
if df.empty:
    print(f"[INFO] No embeddings for {BATCH_DATE}. Exiting.")
    track("failedCalculateClaims", {
        "batchDate": BATCH_DATE,
        "modelVersion": MODEL_VERSION,
        "reason": "No embeddings for batch date"
    })
    return

texts = df["text"].tolist()
embs = np.vstack(df["emb"].apply(lambda a: np.array(a, dtype=np.float32)).to_numpy())
topics, probs = topic_model.transform(texts, embeddings=embs)

outlier_rate = (np.array(topics) == -1).mean()
print(f"[INFO] Outlier-Rate for {BATCH_DATE}: {outlier_rate:.2%}")

df["internal_claim_id"] = [int(t) for t in topics]
df["confidence"] = [float(p) if p is not None else None for p in probs.max(axis=1)]
df["batch_date"] = pd.to_datetime(BATCH_DATE).date()

c) Bootstrap or extend the registry and dim tables

Discover new internal clusters not in the registry (per MODEL_VERSION):

registry = load_claim_registry(bq, PROJECT_ID, BQ_DATASET, CLAIM_REGISTRY_TABLE, MODEL_VERSION)
known = set(registry.keys())
seen = set(df["internal_claim_id"].unique().tolist())
new_internal = sorted(list(seen - known))
next_id_start = get_max_claim_id(bq, PROJECT_ID, BQ_DATASET, DIM_CLAIMS_TABLE) + 1
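
Both helpers are simple lookups; sketched below under the assumption that claim_registry keeps one row per (model_version, internal_claim_id) and dim_claims holds the stable claim_id values:

from typing import Dict

def load_claim_registry(bq: bigquery.Client, project: str, dataset: str, registry_table: str, model_version: str) -> Dict[int, int]:
    # Sketch: map internal_claim_id -> stable claim_id for the given model version.
    sql = f"""
    SELECT internal_claim_id, claim_id
    FROM `{project}.{dataset}.{registry_table}`
    WHERE model_version = @v
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("v", "STRING", model_version)]
    ))
    rows = job.result().to_dataframe(progress_bar_type=None)
    return {int(r.internal_claim_id): int(r.claim_id) for r in rows.itertuples()}

def get_max_claim_id(bq: bigquery.Client, project: str, dataset: str, dim_claims_table: str) -> int:
    # Sketch: highest stable id so far, 0 if the table is still empty.
    sql = f"SELECT IFNULL(MAX(claim_id), 0) AS max_id FROM `{project}.{dataset}.{dim_claims_table}`"
    return int(list(bq.query(sql).result())[0]["max_id"])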

Label/keywords for new clusters (from BERTopic’s top words):

topic_labels: Dict[int, str] = {}
topic_keywords: Dict[int, List[str]] = {}
for internal_id in new_internal:
    if internal_id == -1:
        topic_labels[internal_id] = "OUTLIER"
        topic_keywords[internal_id] = []
    else:
        top_words = topic_model.get_topic(internal_id) or []
        kws = [w for (w, _ctfidf) in top_words][:TOP_N_WORDS]
        topic_keywords[internal_id] = kws
        topic_labels[internal_id] = ", ".join(kws[:4]) if kws else f"claim_{internal_id}"

Centroids (mean embedding per internal cluster for this batch):

centroids: Dict[int, List[float]] = {}
for internal_id in new_internal:
    mask = df["internal_claim_id"] == internal_id
    if mask.any():
        c = embs[mask].mean(axis=0)
        centroids[internal_id] = [float(x) for x in c.tolist()]
    else:
        centroids[internal_id] = []

Create new stable IDs and stage rows:

new_map_rows = []
for i, internal_id in enumerate(new_internal):
    stable_id = next_id_start + i if internal_id != -1 else -1
    confs = df.loc[df["internal_claim_id"] == internal_id, "confidence"]
    new_map_rows.append({
        "internal_claim_id": internal_id,
        "claim_id": stable_id,
        "confidence_min": float(confs.min()) if len(confs) else None,
        "confidence_mean": float(confs.mean()) if len(confs) else None
    })

Upsert into claim_registry and dim_claims (staged MERGEs happen inside the helper):

if new_map_rows:
    upsert_new_registry_and_dim_claims(
        bq=bq,
        project=PROJECT_ID,
        dataset=BQ_DATASET,
        registry_table=CLAIM_REGISTRY_TABLE,
        dim_claims_table=DIM_CLAIMS_TABLE,
        model_version=MODEL_VERSION,
        new_mappings=new_map_rows,
        topic_labels=topic_labels,
        topic_keywords=topic_keywords,
        batch_centroids=centroids
    )
    registry = load_claim_registry(bq, PROJECT_ID, BQ_DATASET, CLAIM_REGISTRY_TABLE, MODEL_VERSION)
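
The helper itself is mostly BigQuery plumbing: it stages the new rows and MERGEs them into both tables. A simplified sketch (only the registry MERGE is shown; dim_claims gets an analogous statement that also carries claim_label, claim_keywords and the centroid):

def upsert_new_registry_and_dim_claims(bq, project, dataset, registry_table, dim_claims_table,
                                       model_version, new_mappings, topic_labels,
                                       topic_keywords, batch_centroids):
    # Simplified sketch of the staged MERGE; the real helper writes dim_claims as well.
    staging = f"{project}.{dataset}._staging_claim_registry"
    rows = [{
        "model_version": model_version,
        "internal_claim_id": m["internal_claim_id"],
        "claim_id": m["claim_id"],
    } for m in new_mappings]
    load_cfg = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    bq.load_table_from_json(rows, staging, job_config=load_cfg).result()

    merge_sql = f"""
    MERGE `{project}.{dataset}.{registry_table}` dst
    USING `{staging}` src
    ON dst.model_version = src.model_version
       AND dst.internal_claim_id = src.internal_claim_id
    WHEN NOT MATCHED THEN
      INSERT (model_version, internal_claim_id, claim_id)
      VALUES (src.model_version, src.internal_claim_id, src.claim_id)
    """
    bq.query(merge_sql).result()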

d) Map internal → stable claim_id and write video_claims

df["claim_id"] = df["internal_claim_id"].apply(lambda t: registry.get(int(t), -1))

df["claim_timestamp"] = None

out = df[["video_youtube_id", "claim_id", "text", "confidence", "batch_date"]].copy()
upsert_new_video_claims(bq, PROJECT_ID, BQ_DATASET, VIDEO_CLAIMS_TABLE, out)

print(f"[INFO] Wrote {len(out)} rows to {VIDEO_CLAIMS_TABLE} for {BATCH_DATE}")

The upsert_new_video_claims helper keeps only the highest-confidence row per (video_youtube_id, text) pair before merging, so duplicates don’t proliferate.
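
For reference, a sketch of that helper, staging the batch and deduplicating with a window function before the MERGE (column names are taken from the snippet above, the rest is assumed):

def upsert_new_video_claims(bq: bigquery.Client, project: str, dataset: str, table: str, out: pd.DataFrame) -> None:
    # Sketch: stage the batch, keep only the highest-confidence row per
    # (video_youtube_id, text), then MERGE into video_claims.
    staging = f"{project}.{dataset}._staging_video_claims"
    load_cfg = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    bq.load_table_from_dataframe(out, staging, job_config=load_cfg).result()

    merge_sql = f"""
    MERGE `{project}.{dataset}.{table}` dst
    USING (
      SELECT * EXCEPT(rn)
      FROM (
        SELECT *, ROW_NUMBER() OVER (
          PARTITION BY video_youtube_id, text
          ORDER BY confidence DESC
        ) AS rn
        FROM `{staging}`
      )
      WHERE rn = 1
    ) src
    ON dst.video_youtube_id = src.video_youtube_id AND dst.text = src.text
    WHEN MATCHED THEN
      UPDATE SET dst.claim_id = src.claim_id,
                 dst.confidence = src.confidence,
                 dst.batch_date = src.batch_date
    WHEN NOT MATCHED THEN
      INSERT (video_youtube_id, claim_id, text, confidence, batch_date)
      VALUES (src.video_youtube_id, src.claim_id, src.text, src.confidence, src.batch_date)
    """
    bq.query(merge_sql).result()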

7) Better cluster labels with LLM

As in the last two weeks, I used the Gemini model again to generate nice cluster names based on a few example claims per cluster.

DECLARE ds STRING       DEFAULT 'topicwatchdog';
DECLARE conn STRING     DEFAULT 'eu.vertex-ai-eu-conn';
DECLARE gemini_endpoint STRING DEFAULT 'gemini-2.5-flash-lite';
DECLARE gemini_model  STRING DEFAULT FORMAT('%s.gemini_remote', ds);
DECLARE min_members INT64 DEFAULT 3;

EXECUTE IMMEDIATE FORMAT("""
CREATE OR REPLACE MODEL `%s`
REMOTE WITH CONNECTION `%s`
OPTIONS (ENDPOINT = '%s')
""", gemini_model, conn, gemini_endpoint);

UPDATE topicwatchdog.claim_registry SET claim_label = NULL WHERE true;

CREATE TEMP TABLE _cluster_examples AS
SELECT
  vc.claim_id,
  ARRAY_AGG(vc.text ORDER BY dv.views_count DESC LIMIT 50) AS examples,
  COUNT(*) AS n
FROM topicwatchdog.video_claims vc
LEFT JOIN topicwatchdog.dim_claims dc ON dc.claim_id = vc.claim_id
LEFT JOIN topicwatchdog.claim_registry tr ON vc.claim_id = tr.claim_id
LEFT JOIN topicwatchdog.dim_videos dv ON dv.video_youtube_id = vc.video_youtube_id
WHERE tr.claim_label IS NULL OR tr.claim_label = ''
GROUP BY vc.claim_id;

select * from _cluster_examples;

CREATE TEMP TABLE _prompts AS
SELECT
  claim_id,
  (
    'Du bist politischer Analyst. Vergib eine prägnante, verständlichen ' ||
    'Behauptung (max. 120 Zeichen) für diesen **politische Behauptung**-Cluster. ' ||
    'Regeln: deutsch, neutral, keine neuen Behauptung/Namen erfinden, ' ||
    'nur die häufigste Behauptung, EINE kurze Behauptung. Antworte strikt Text ohne Markdown oder json. ' ||
    '\n\nOriginal-Behauptungen:\n' ||
    STRING_AGG('- ' || ex, '\n')
  ) AS prompt
FROM _cluster_examples, UNNEST(examples) AS ex
GROUP BY claim_id;


select * from _prompts;

CREATE TEMP TABLE _raw_llm AS
SELECT
  claim_id,
  ml_generate_text_llm_result AS claim_label
FROM ML.GENERATE_TEXT(
  MODEL `topicwatchdog.gemini_remote`,
  TABLE _prompts,
  STRUCT(
    TRUE AS flatten_json_output,
    0.2 AS temperature,
    256 AS max_output_tokens
  )
) AS gen;


select * from _raw_llm;

MERGE `topicwatchdog.claim_registry` dst
USING (
  SELECT claim_id, claim_label
  FROM _raw_llm
  WHERE claim_label IS NOT NULL
) src
ON dst.claim_id = src.claim_id
WHEN MATCHED THEN
  UPDATE SET dst.claim_label = src.claim_label

Result: We now have a persistent table claim_registry mapping numeric cluster IDs → stable human-readable names like:

| claim_id | claim_label                                                |
| -------- | ---------------------------------------------------------- |
| 1        | Politik versagt bei Sicherheit, Wirtschaft und Sozialem.   |
| 2        | Beamte sollen in Rentenkasse einzahlen.                    |
| …        | …                                                          |

This makes Looker Studio dashboards far easier to interpret.

9) Results & Charts

Now we have a setup which:

  • Reads env config & sets up logging/tracking.
  • Ensures a BERTopic model exists (train once, then reuse via GCS).
  • Loads pre-computed embeddings for BATCH_DATE.
  • Runs **transform** to get internal_claim_id + confidence.
  • Bootstraps registry entries for new internal clusters, including labels, keywords, and centroids (per model version).
  • Maps internal → stable **claim_id** and upserts into video_claims.
  • Emits basic metrics (count, outlier rate) for ops visibility.

I added a Looker Studio view for claims similar to Week-3’s topics dashboard:

  1. Claims Overview — daily histogram of processed claim snippets with mean confidence.
  2. Top Claim Clusters — largest clusters by count; quick inspection of labels/keywords.
  3. Claims of this Claim Cluster — shows which claims end up in which claim cluster.
  4. Videos matching these Claims — shows which videos belong to a given cluster or claim.

(Screenshot: topics-week4, the claims dashboards in Looker Studio)

Observation: Even with stable IDs, many semantically similar claims still receive different claim_ids after retraining or as new data arrives. This shows how sensitive claim clustering remains to embedding and boundary changes.

Example cluster sample:

  1. The Earth is flat
  2. The Earth is not flat
  3. The Earth is round
  4. The Earth is a sphere

These are mixed with other claims that do not belong to the group at all. So even though the claim clusters now stay stable, it is still disappointing how many unrelated claims end up in a single cluster.

9) Limitations & Next Steps

Even with a registry, this approach is sensitive to cluster drift:

  • ID instability: Small changes in embeddings or HDBSCAN boundaries create new internal IDs, which then get new stable IDs and fragment continuity.

  • Over-/under-grouping: Near-duplicate claims can split across clusters, while broader claims sometimes get lumped together.

Next steps:

This closes Week 4. Next, I will look into a more basic example (flat-earth-style claims and similar topics) and evaluate a multi-stage way to cluster and harmonize the claims in it. BERTopic itself works well technically, but the claims fed into it need better canonicalization and deduplication to yield meaningful clusters.