In Week 1 (extraction), Week 2 (embeddings + KMeans), and Week 3 (stable topics with BERTopic) I built the foundations. This week applies the same idea to claims: using BERTopic to cluster claim snippets and keeping stable claim_ids via a registry and a dim table.
This week we explore BERTopic + stable claim IDs:
- Use pre-computed embeddings from BigQuery (same pipeline as before).
- Fit/Load a BERTopic model (UMAP + HDBSCAN) in Python.
- Assign internal cluster IDs per batch, then map them to stable claim_ids.
- Persist to video_claims, claim_registry, and dim_claims tables for analysis.
- Inspect behavior in Looker Studio and reflect on limitations.
Table of Contents
- Setup & Dependencies
- Configuration & Environment
- Building the BERTopic model
- Initial Training (bootstrap)
- Loading / Ensuring the Model
- Batch Inference → Registry Mapping → Upserts
- What the Final main.py now does
- Better cluster labels with LLM
- Results & Charts
- Limitations & Next Steps
1) Setup & Dependencies
I keep the Python environment minimal; the relevant parts of requirements.txt:
bertopic==0.16.3
hdbscan==0.8.38.post1
umap-learn==0.5.6
scikit-learn==1.4.2
numpy==1.26.4
pandas==2.2.2
scipy==1.11.4
pyarrow==16.1.0
db_dtypes==1.4.3
python_dotenv==1.1.1
google-cloud-bigquery==3.25.0
google-cloud-logging==3.10.0
google-cloud-storage==2.17.0
In main.py the BERTopic stack is imported once:
from bertopic import BERTopic
import umap
import hdbscan
2) Configuration & Environment
The script reads all runtime settings from environment variables (defaults shown are from the file):
PROJECT_ID = required("PROJECT_ID")
BQ_DATASET = os.getenv("BQ_DATASET", "topicwatchdog")
EMB_TABLE = os.getenv("EMB_TABLE", "claim_embeddings")
VIDEO_CLAIMS_TABLE = os.getenv("VIDEO_CLAIMS_TABLE", "video_claims")
CLAIM_REGISTRY_TABLE = os.getenv("CLAIM_REGISTRY_TABLE", "claim_registry")
DIM_CLAIMS_TABLE = os.getenv("DIM_CLAIMS_TABLE", "dim_claims")
MODEL_VERSION = required("MODEL_VERSION")
MODEL_GCS_BASE_PATH = required("MODEL_GCS_BASE_PATH")
MODEL_GCS_PATH = f"{MODEL_GCS_BASE_PATH}/model_{MODEL_VERSION}.pkl"
BATCH_DATE = os.getenv("BATCH_DATE")
JOB_MODE = os.getenv("JOB_MODE", "INFERENCE")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text_embedding")
FACT_CLAIMS_TABLE = os.getenv("FACT_CLAIMS_TABLE", "fact_claims")
# Initial-train knobs
LOOKBACK_DAYS = as_int("INIT_TRAIN_LOOKBACK_DAYS", 10)
MIN_DOCS = as_int("INIT_MIN_DOCS", 200)
UMAP_N_NEIGHBORS = as_int("UMAP_N_NEIGHBORS", 5)
UMAP_N_COMPONENTS = as_int("UMAP_N_COMPONENTS", 5)
MIN_CLUSTER_SIZE = as_int("MIN_CLUSTER_SIZE", 3)
MIN_SAMPLES = as_int("MIN_SAMPLES", 3)
TOP_N_WORDS = as_int("TOP_N_WORDS", 10)
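The required(...) and as_int(...) helpers are small wrappers around os.getenv. A minimal sketch of what they do (the exact bodies in main.py may differ slightly):
import os

def required(name: str) -> str:
    # Fail fast if a mandatory environment variable is missing.
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

def as_int(name: str, default: int) -> int:
    # Optional integer env var with a fallback default.
    raw = os.getenv(name)
    return int(raw) if raw not in (None, "") else default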
Note: embeddings are already stored in BigQuery from earlier weeks. The script does include a safety-net call to backfill missing embeddings when needed (see Section 6a), but the standard run uses the existing embeddings.
The model is stored at MODEL_GCS_PATH as a .pkl in GCS. There are helpers to upload/download it; see Week 3 for details.
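For reference, a minimal sketch of what upload_model_to_gcs / download_model_from_gcs boil down to, assuming MODEL_GCS_PATH is a full gs:// URI (the real Week-3 helpers may differ in details):
import pathlib
import tempfile
from google.cloud import storage

def _split_gcs_uri(gcs_path: str):
    # "gs://bucket/prefix/model_v1.pkl" -> ("bucket", "prefix/model_v1.pkl")
    path = gcs_path[len("gs://"):] if gcs_path.startswith("gs://") else gcs_path
    bucket, _, blob = path.partition("/")
    return bucket, blob

def upload_model_to_gcs(local_file: str, gcs_path: str, project_id: str) -> None:
    bucket_name, blob_name = _split_gcs_uri(gcs_path)
    client = storage.Client(project=project_id)
    client.bucket(bucket_name).blob(blob_name).upload_from_filename(local_file)

def download_model_from_gcs(gcs_path: str, project_id: str) -> str:
    bucket_name, blob_name = _split_gcs_uri(gcs_path)
    client = storage.Client(project=project_id)
    local_file = pathlib.Path(tempfile.mkdtemp(prefix="claims_model_")) / "model.pkl"
    client.bucket(bucket_name).blob(blob_name).download_to_filename(str(local_file))
    return str(local_file)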
If inference runs without a trained model, the script fails fast with a clear hint:
raise RuntimeError(f"Model is not trained yet, run with JOB_MODE=train first")
3) Building the BERTopic model
build_bertopic(...) wires together UMAP and HDBSCAN exactly like this:
def build_bertopic(umap_neighbors: int, umap_components: int, min_cluster_size: int, min_samples: int, top_n_words: int) -> BERTopic:
    umap_model = umap.UMAP(
        n_neighbors=umap_neighbors,
        n_components=umap_components,
        min_dist=0.0,
        metric="cosine",
        random_state=42,
    )
    hdbscan_model = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,
        min_samples=min_samples,
        metric="euclidean",
        cluster_selection_method="eom",
        prediction_data=True,
    )
    model = BERTopic(
        umap_model=umap_model,
        hdbscan_model=hdbscan_model,
        top_n_words=top_n_words,
        calculate_probabilities=True,
        verbose=True,
        language="multilingual",
        nr_topics=None,
    )
    return model
This is the consistent core across training and inference.
4) Initial Training (bootstrap)
When run with JOB_MODE=TRAIN, the script fetches a lookback window of embedded texts (via read_embeddings_lookback).
def read_embeddings_lookback(bq: bigquery.Client, project: str, dataset: str, emb_table: str, lookback_days: int) -> pd.DataFrame:
    sql = f"""
    SELECT video_youtube_id, text, emb, batch_date
    FROM `{project}.{dataset}.{emb_table}`
    WHERE batch_date >= DATE_SUB(CURRENT_DATE(), INTERVAL @n DAY)
      AND emb IS NOT NULL
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("n", "INT64", lookback_days)]
    ))
    return job.result().to_dataframe(progress_bar_type=None)
It then verifies MIN_DOCS, fits the model, and uploads the .pkl to GCS. Training is only used to bootstrap an initial model; the daily job runs inference.
def train_model(
    project_id: str,
    model_gcs_path: str,
    bq: bigquery.Client,
    dataset: str,
    emb_table: str,
    lookback_days: int,
    min_docs: int,
    umap_neighbors: int,
    umap_components: int,
    min_cluster_size: int,
    min_samples: int,
    top_n_words: int
) -> BERTopic:
    print(f"[INFO] Fetch docs for training")
    train_df = read_embeddings_lookback(bq, project_id, dataset, emb_table, lookback_days)
    if train_df.empty or len(train_df) < min_docs:
        raise RuntimeError(f"Not enough docs to train initial model (have {len(train_df)}, need >= {min_docs}).")
    print(f"[INFO] Fetched {len(train_df)} docs for training")
    texts = train_df["text"].tolist()
    embs = np.vstack(train_df["emb"].apply(lambda a: np.array(a, dtype=np.float32)).to_numpy())
    print(f"[INFO] Building bertopic")
    topic_model = build_bertopic(umap_neighbors, umap_components, min_cluster_size, min_samples, top_n_words)
    print(f"[INFO] Fit embeddings")
    topic_model.fit(texts, embeddings=embs)
    print(f"[INFO] Storing topic model locally")
    tmpdir = pathlib.Path(tempfile.mkdtemp(prefix="claims_save_"))
    local_file = tmpdir / "model.pkl"
    topic_model.save(str(local_file))
    print(f"[INFO] Upload locally saved model to gcs")
    upload_model_to_gcs(str(local_file), model_gcs_path, project_id)
    return topic_model
5) Loading / Ensuring the Model
On inference runs, I first ensure a model is available:
local_pkl = download_model_from_gcs(model_gcs_path, project_id)
return BERTopic.load(local_pkl)
If the model is missing, the script instructs you to run the training mode first (see Section 4).
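Put together, the ensure step is roughly this (a hypothetical wrapper; the real main.py structures it slightly differently):
def ensure_model(model_gcs_path: str, project_id: str) -> BERTopic:
    # Inference-only sketch: fetch the trained model from GCS, otherwise
    # point the operator at the training mode.
    try:
        local_pkl = download_model_from_gcs(model_gcs_path, project_id)
    except Exception as exc:
        raise RuntimeError("Model is not trained yet, run with JOB_MODE=TRAIN first") from exc
    return BERTopic.load(local_pkl)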
6) Batch Inference → Registry Mapping → Upserts
a) (Safety-net) Generate missing embeddings if needed
Although embeddings are produced earlier in the pipeline, main.py keeps a safety-net query to backfill any that are missing:
def generate_missing_embeddings_for_data(bq: bigquery.Client, project: str, dataset: str, emb_table: str, batch_date: str, embedding_model: str, fact_claims_table: str):
    sql = f"""
    INSERT INTO `{project}.{dataset}.{emb_table}` (
      video_youtube_id, text, emb, batch_date, claim_timestamp
    )
    SELECT
      ge.video_youtube_id,
      fc.claim AS text,
      ge.ml_generate_embedding_result AS emb,
      DATE(fc.claim_timestamp) AS batch_date,
      fc.claim_timestamp AS claim_timestamp
    FROM ML.GENERATE_EMBEDDING(
      MODEL `{project}.{embedding_model}`,
      (
        SELECT
          fc.video_youtube_id,
          fc.claim AS content
        FROM `{project}.{dataset}.{fact_claims_table}` fc
        LEFT JOIN `{project}.{dataset}.{emb_table}` ve
          ON ve.video_youtube_id = fc.video_youtube_id
        WHERE DATE(fc.claim_timestamp) = DATE(@d)
          AND ve.video_youtube_id IS NULL
      ),
      STRUCT(TRUE AS flatten_json_output)
    ) AS ge
    LEFT JOIN `{project}.{dataset}.{fact_claims_table}` fc
      ON fc.video_youtube_id = ge.video_youtube_id
    ;
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("d", "DATE", batch_date)]
    ))
    return job.result()
b) Load embeddings for BATCH_DATE and transform
# Load embeddings for the target day
df = read_claim_embeddings_for_date(bq, PROJECT_ID, BQ_DATASET, EMB_TABLE, BATCH_DATE)
if df.empty:
    print(f"[INFO] No embeddings for {BATCH_DATE}. Exiting.")
    track("failedCalculateClaims", {
        "batchDate": BATCH_DATE,
        "modelVersion": MODEL_VERSION,
        "reason": "No embeddings for batch date"
    })
    return
texts = df["text"].tolist()
embs = np.vstack(df["emb"].apply(lambda a: np.array(a, dtype=np.float32)).to_numpy())
topics, probs = topic_model.transform(texts, embeddings=embs)
outlier_rate = (np.array(topics) == -1).mean()
print(f"[INFO] Outlier-Rate for {BATCH_DATE}: {outlier_rate:.2%}")
df["internal_claim_id"] = [int(t) for t in topics]
df["confidence"] = [float(p) if p is not None else None for p in probs.max(axis=1)]
df["batch_date"] = pd.to_datetime(BATCH_DATE).date()
c) Bootstrap or extend the registry and dim tables
Discover new internal clusters not in the registry (per MODEL_VERSION):
registry = load_claim_registry(bq, PROJECT_ID, BQ_DATASET, CLAIM_REGISTRY_TABLE, MODEL_VERSION)
known = set(registry.keys())
seen = set(df["internal_claim_id"].unique().tolist())
new_internal = sorted(list(seen - known))
next_id_start = get_max_claim_id(bq, PROJECT_ID, BQ_DATASET, DIM_CLAIMS_TABLE) + 1
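Both helpers are plain BigQuery lookups. A minimal sketch, assuming the registry stores model_version, internal_claim_id, and claim_id columns (the column names are inferred from how the mapping is used below):
def load_claim_registry(bq, project, dataset, registry_table, model_version):
    # Returns {internal_claim_id: claim_id} for the given model version.
    sql = f"""
    SELECT internal_claim_id, claim_id
    FROM `{project}.{dataset}.{registry_table}`
    WHERE model_version = @mv
    """
    job = bq.query(sql, job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("mv", "STRING", model_version)]
    ))
    return {int(row["internal_claim_id"]): int(row["claim_id"]) for row in job.result()}

def get_max_claim_id(bq, project, dataset, dim_claims_table):
    # Highest stable claim_id assigned so far; 0 when the table is still empty.
    sql = f"SELECT IFNULL(MAX(claim_id), 0) AS max_id FROM `{project}.{dataset}.{dim_claims_table}`"
    rows = list(bq.query(sql).result())
    return int(rows[0]["max_id"])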
Label/keywords for new clusters (from BERTopic’s top words):
topic_labels: Dict[int, str] = {}
topic_keywords: Dict[int, List[str]] = {}
for internal_id in new_internal:
    if internal_id == -1:
        topic_labels[internal_id] = "OUTLIER"
        topic_keywords[internal_id] = []
    else:
        top_words = topic_model.get_topic(internal_id) or []
        kws = [w for (w, _ctfidf) in top_words][:TOP_N_WORDS]
        topic_keywords[internal_id] = kws
        topic_labels[internal_id] = ", ".join(kws[:4]) if kws else f"claim_{internal_id}"
Centroids (mean embedding per internal cluster for this batch):
centroids: Dict[int, List[float]] = {}
for internal_id in new_internal:
    mask = df["internal_claim_id"] == internal_id
    if mask.any():
        c = embs[mask].mean(axis=0)
        centroids[internal_id] = [float(x) for x in c.tolist()]
    else:
        centroids[internal_id] = []
Create new stable IDs and stage rows:
new_map_rows = []
for i, internal_id in enumerate(new_internal):
    stable_id = next_id_start + i if internal_id != -1 else -1
    confs = df.loc[df["internal_claim_id"] == internal_id, "confidence"]
    new_map_rows.append({
        "internal_claim_id": internal_id,
        "claim_id": stable_id,
        "confidence_min": float(confs.min()) if len(confs) else None,
        "confidence_mean": float(confs.mean()) if len(confs) else None
    })
Upsert into claim_registry and dim_claims (the staged MERGEs happen inside the helper):
if new_map_rows:
    upsert_new_registry_and_dim_claims(
        bq=bq,
        project=PROJECT_ID,
        dataset=BQ_DATASET,
        registry_table=CLAIM_REGISTRY_TABLE,
        dim_claims_table=DIM_CLAIMS_TABLE,
        model_version=MODEL_VERSION,
        new_mappings=new_map_rows,
        topic_labels=topic_labels,
        topic_keywords=topic_keywords,
        batch_centroids=centroids
    )
    registry = load_claim_registry(bq, PROJECT_ID, BQ_DATASET, CLAIM_REGISTRY_TABLE, MODEL_VERSION)
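Inside the helper the pattern is: load the new rows into a staging table, then MERGE so re-running the same batch stays idempotent. A condensed sketch of the registry half (the staging table name and exact columns are my assumption):
def upsert_new_registry_rows(bq, project, dataset, registry_table, model_version, new_mappings):
    # Stage the new internal -> stable mappings, then MERGE on
    # (model_version, internal_claim_id) so duplicates are never inserted.
    staging = f"{project}.{dataset}.{registry_table}_staging"
    stage_df = pd.DataFrame(new_mappings)
    stage_df["model_version"] = model_version
    bq.load_table_from_dataframe(
        stage_df,
        staging,
        job_config=bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE"),
    ).result()
    bq.query(f"""
        MERGE `{project}.{dataset}.{registry_table}` dst
        USING `{staging}` src
        ON dst.model_version = src.model_version
           AND dst.internal_claim_id = src.internal_claim_id
        WHEN NOT MATCHED THEN
          INSERT (model_version, internal_claim_id, claim_id, confidence_min, confidence_mean)
          VALUES (src.model_version, src.internal_claim_id, src.claim_id,
                  src.confidence_min, src.confidence_mean)
    """).result()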
d) Map internal → stable claim_id and write video_claims
df["claim_id"] = df["internal_claim_id"].apply(lambda t: registry.get(int(t), -1))
df["claim_timestamp"] = None
out = df[["video_youtube_id", "claim_id", "text", "confidence", "batch_date"]].copy()
upsert_new_video_claims(bq, PROJECT_ID, BQ_DATASET, VIDEO_CLAIMS_TABLE, out)
print(f"[INFO] Wrote {len(out)} rows to {VIDEO_CLAIMS_TABLE} for {BATCH_DATE}")
The upsert_new_video_claims helper keeps only the highest-confidence row per (video_youtube_id, text) pair before merging, so duplicates don't proliferate.
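In pandas terms, that dedup looks roughly like this before the rows are staged for the MERGE (a sketch, not the literal helper code):
# Keep only the highest-confidence row per (video_youtube_id, text) pair.
out = (
    out.sort_values("confidence", ascending=False)
       .drop_duplicates(subset=["video_youtube_id", "text"], keep="first")
)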
8) Better cluster labels with LLM
As in the previous two weeks, I used the Gemini model again to generate readable cluster names based on a few example claims per cluster.
DECLARE ds STRING DEFAULT 'topicwatchdog';
DECLARE conn STRING DEFAULT 'eu.vertex-ai-eu-conn';
DECLARE gemini_endpoint STRING DEFAULT 'gemini-2.5-flash-lite';
DECLARE gemini_model STRING DEFAULT FORMAT('%s.gemini_remote', ds);
DECLARE min_members INT64 DEFAULT 3;
EXECUTE IMMEDIATE FORMAT("""
CREATE OR REPLACE MODEL `%s`
REMOTE WITH CONNECTION `%s`
OPTIONS (ENDPOINT = '%s')
""", gemini_model, conn, gemini_endpoint);
UPDATE topicwatchdog.claim_registry SET claim_label = NULL WHERE true;
CREATE TEMP TABLE _cluster_examples AS
SELECT
vc.claim_id,
ARRAY_AGG(vc.text ORDER BY dv.views_count DESC LIMIT 50) AS examples,
COUNT(*) AS n
FROM topicwatchdog.video_claims vc
LEFT JOIN topicwatchdog.dim_claims dc ON dc.claim_id = vc.claim_id
LEFT JOIN topicwatchdog.claim_registry tr ON vc.claim_id = tr.claim_id
LEFT JOIN topicwatchdog.dim_videos dv ON dv.video_youtube_id = vc.video_youtube_id
WHERE claim_label IS NULL OR claim_label = ''
GROUP BY vc.claim_id;
select * from _cluster_examples;
CREATE TEMP TABLE _prompts AS
SELECT
claim_id,
(
'Du bist politischer Analyst. Vergib eine prägnante, verständlichen ' ||
'Behauptung (max. 120 Zeichen) für diesen **politische Behauptung**-Cluster. ' ||
'Regeln: deutsch, neutral, keine neuen Behauptung/Namen erfinden, ' ||
'nur die häufigste Behauptung, EINE kurze Behauptung. Antworte strikt Text ohne Markdown oder json. ' ||
'\n\nOriginal-Behauptungen:\n' ||
STRING_AGG('- ' || ex, '\n')
) AS prompt
FROM _cluster_examples, UNNEST(examples) AS ex
GROUP BY claim_id;
select * from _prompts;
CREATE TEMP TABLE _raw_llm AS
SELECT
claim_id,
ml_generate_text_llm_result AS claim_label
FROM ML.GENERATE_TEXT(
MODEL `topicwatchdog.gemini_remote`,
TABLE _prompts,
STRUCT(
TRUE AS flatten_json_output,
0.2 AS temperature,
256 AS max_output_tokens
)
) AS gen;
select * from _raw_llm;
MERGE `topicwatchdog.claim_registry` dst
USING (
SELECT claim_id, claim_label
FROM _raw_llm
WHERE claim_label IS NOT NULL
) src
ON dst.claim_id = src.claim_id
WHEN MATCHED THEN
UPDATE SET dst.claim_label = src.claim_label;
Result:
We now have a persistent claim_registry table mapping numeric cluster IDs → stable, human-readable names like:
| claim_id | claim_label |
| -------- | ---------------------------------------------------------- |
| 1 | Politik versagt bei Sicherheit, Wirtschaft und Sozialem. |
| 2 | Beamte sollen in Rentenkasse einzahlen. |
| … | … |
This makes Looker Studio dashboards far easier to interpret.
9) Results & Charts
Now we have a setup which:
- Reads env config & sets up logging/tracking.
- Ensures a BERTopic model exists (train once, then reuse via GCS).
- Loads pre-computed embeddings for BATCH_DATE.
- Runs **transform** to get internal_claim_id + confidence.
- Bootstraps registry entries for new internal clusters, including labels, keywords, and centroids (per model version).
- Maps internal → stable **claim_id** and upserts into video_claims.
- Emits basic metrics (count, outlier rate) for ops visibility.
I added a Looker Studio view for claims similar to Week-3’s topics dashboard:
- Claims Overview — daily histogram of processed claim snippets with mean confidence.
- Top Claim Clusters — largest clusters by count; quick inspection of labels/keywords.
- Claims in this Claim Cluster - shows which claims fall into the selected claim cluster
- Videos matching these Claims - shows which videos contain claims from the selected cluster or claim
Observation: Even with stable IDs, many semantically similar claims still receive different claim_ids after retraining or as new data arrives. This shows how sensitive claim clustering remains to embedding and boundary changes.
A sample from one cluster:
- The Earth is flat
- The Earth is not flat
- The Earth is round
- The Earth is a sphere
These are mixed with other claims that do not belong to this topic at all. That is disappointing: even though the claim clusters stay stable, many unrelated claims still end up in the same cluster.
10) Limitations & Next Steps
Even with a registry, this approach is sensitive to cluster drift:
- **ID instability**: Small changes in embeddings or HDBSCAN boundaries create new internal IDs, which then get new stable IDs, fragmenting continuity (one way to detect this is sketched after this list).
- **Over-/Under-grouping**: Near-duplicate claims can split; broader claims sometimes lump together.
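One way to probe the ID instability is to compare the centroid of a freshly minted cluster against the centroids already stored in claim_registry; a very high cosine similarity suggests the "new" cluster is really a fragment of an existing claim_id. A rough sketch (the threshold and the registry_centroids shape are placeholders):
import numpy as np

def find_near_duplicate(new_centroid, registry_centroids, threshold=0.95):
    # registry_centroids: {claim_id: centroid list} loaded from claim_registry.
    # Returns the existing claim_id whose centroid is most similar to the new
    # cluster's centroid, if the similarity exceeds the placeholder threshold.
    new_vec = np.asarray(new_centroid, dtype=np.float32)
    best_id, best_sim = None, threshold
    for claim_id, centroid in registry_centroids.items():
        vec = np.asarray(centroid, dtype=np.float32)
        if vec.size == 0:
            continue
        sim = float(np.dot(new_vec, vec) / (np.linalg.norm(new_vec) * np.linalg.norm(vec)))
        if sim > best_sim:
            best_id, best_sim = claim_id, sim
    return best_id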
Next steps:
This closes Week 4. Next I will start from a simpler example (like the flat-earth claims above) and evaluate a multi-stage approach to harmonize the claims within such a cluster. BERTopic itself works well technically, but the claims fed into it need better canonicalization and deduplication to yield meaningful clusters.