Streaming & Incremental
Match new records against existing data in real time. GoldenMatch supports single-record matching, micro-batch streaming, and CLI-based incremental matching.
match_one
The core primitive for streaming. Matches a single record against an existing DataFrame.
import goldenmatch as gm
matches = gm.match_one(
    {"name": "John Smith", "zip": "10001"},
    existing_df,
    matchkey,
)
# Returns list of (row_id, score) tuples
match_one works with fuzzy (weighted) matchkeys. For exact matchkeys (threshold=None), use find_exact_matches with a Polars join instead.
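To make the idea of brute-force weighted fuzzy matching concrete, here is a minimal standard-library sketch. It is not GoldenMatch's implementation: the function names, the field weights, the threshold, and the use of difflib as the similarity measure are all assumptions chosen for illustration. It only mirrors the documented return shape of (row_id, score) tuples.

```python
from difflib import SequenceMatcher


def similarity(a, b):
    """Case-insensitive similarity ratio in [0, 1] (stand-in scorer, an assumption)."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def brute_force_match(record, rows, weights, threshold):
    """Score `record` against every row and keep pairs at or above `threshold`.

    `weights` maps field name -> weight; the score is the weighted average
    of per-field similarities. Returns (row_id, score) tuples, best first.
    """
    total_weight = sum(weights.values())
    matches = []
    for row_id, row in enumerate(rows):
        score = sum(
            weight * similarity(str(record.get(field, "")), str(row.get(field, "")))
            for field, weight in weights.items()
        ) / total_weight
        if score >= threshold:
            matches.append((row_id, score))
    return sorted(matches, key=lambda m: m[1], reverse=True)


# Illustrative data; a list of dicts stands in for the existing DataFrame.
rows = [
    {"name": "Jon Smith", "zip": "10001"},
    {"name": "Alice Jones", "zip": "94105"},
]
matches = brute_force_match(
    {"name": "John Smith", "zip": "10001"},
    rows,
    weights={"name": 0.7, "zip": 0.3},
    threshold=0.8,
)
```

Every row is compared, so the cost grows linearly with the size of the existing data for each incoming record.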
StreamProcessor
Incremental record matching with immediate or micro-batch processing. Wraps match_one and add_to_cluster for continuous operation.
import goldenmatch as gm
processor = gm.StreamProcessor(existing_df, config)
# Process a single record immediately
matches = processor.process_record({"name": "John Smith", "zip": "10001"})
# Micro-batch mode: buffer records and process in batches
for record in incoming_records:
    processor.add_record(record)
# Flush the buffer
results = processor.flush()
Immediate mode
Each record is matched and clustered as it arrives:
processor = gm.StreamProcessor(df, config)
result = processor.process_record(new_record)
# result includes matches and updated cluster assignments
Micro-batch mode
Buffer records and process them together for better throughput:
processor = gm.StreamProcessor(df, config)
for record in batch:
    processor.add_record(record)
batch_results = processor.flush()
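The buffering pattern behind micro-batching can be sketched without the library. The class below is hypothetical and is not how StreamProcessor is implemented; `MicroBatchBuffer`, `max_size`, and `fake_matcher` are names invented for this example. It shows records accumulating until a size limit triggers processing, with a final flush for the remainder.

```python
class MicroBatchBuffer:
    """Hypothetical helper (not part of GoldenMatch) that batches records."""

    def __init__(self, process_batch, max_size=100):
        self._process_batch = process_batch  # callable: list of records -> list of results
        self._max_size = max_size
        self._buffer = []

    def add_record(self, record):
        """Buffer a record; return batch results if the buffer filled up, else []."""
        self._buffer.append(record)
        if len(self._buffer) >= self._max_size:
            return self.flush()
        return []

    def flush(self):
        """Process whatever is buffered and empty the buffer."""
        if not self._buffer:
            return []
        batch, self._buffer = self._buffer, []
        return self._process_batch(batch)


batch_sizes = []


def fake_matcher(batch):
    """Stand-in for real matching: records the batch size, returns no matches."""
    batch_sizes.append(len(batch))
    return [(record["id"], []) for record in batch]


buf = MicroBatchBuffer(fake_matcher, max_size=3)
results = []
for i in range(7):
    results.extend(buf.add_record({"id": i}))
results.extend(buf.flush())  # pick up the final partial batch
```

A larger `max_size` amortizes per-batch overhead at the cost of higher latency for the first record in each batch.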
Incremental cluster updates
When a new record matches existing records, update the cluster structure:
import goldenmatch as gm
# Add a record to a cluster (join or merge clusters)
gm.add_to_cluster(record_id, matches, clusters)
# Remove a record from its cluster and re-cluster
gm.unmerge_record(record_id, clusters)
# Shatter a cluster into singletons
gm.unmerge_cluster(cluster_id, clusters)
add_to_cluster handles three cases:
- New record matches records in one cluster – joins that cluster
- New record matches records in multiple clusters – merges those clusters
- New record has no matches – creates a singleton cluster
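The three cases above can be sketched with plain Python data structures. This is an illustration only: the cluster representation (a dict mapping cluster id to a set of record ids), the id allocation, and the function name `add_to_cluster_sketch` are assumptions, not the behaviour of gm.add_to_cluster.

```python
def add_to_cluster_sketch(record_id, matches, clusters):
    """Place `record_id` into `clusters` based on its matches.

    `matches` is a list of (row_id, score) tuples.
    `clusters` maps cluster_id -> set of record ids and is updated in place.
    Returns the id of the cluster that now holds the record.
    """
    matched_ids = {row_id for row_id, _ in matches}
    # Clusters that contain at least one matched record.
    hit = sorted(cid for cid, members in clusters.items() if members & matched_ids)

    if not hit:
        # No matches: create a singleton cluster.
        new_id = max(clusters, default=-1) + 1
        clusters[new_id] = {record_id}
        return new_id

    # One cluster: join it. Several clusters: merge them into the first.
    target = hit[0]
    for cid in hit[1:]:
        clusters[target] |= clusters.pop(cid)
    clusters[target].add(record_id)
    return target


clusters = {0: {1, 2}, 1: {3}, 2: {4}}
joined = add_to_cluster_sketch(10, [(1, 0.92)], clusters)             # joins cluster 0
merged = add_to_cluster_sketch(11, [(2, 0.90), (3, 0.85)], clusters)  # merges 0 and 1
single = add_to_cluster_sketch(12, [], clusters)                      # new singleton
```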
Incremental CLI
Match new CSV records against an existing base dataset:
goldenmatch incremental base.csv --new new_records.csv --config config.yaml
The incremental CLI handles exact and fuzzy matchkeys separately:
- Exact matchkeys: Polars join between new and base records (fast)
- Fuzzy matchkeys: match_one brute-force against the base (thorough)
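The exact half of that split is fast because it reduces to a keyed lookup. The sketch below uses a standard-library hash index in place of the Polars join mentioned above; `exact_join`, the field names, and the sample rows are assumptions made for this example. Records left unmatched would be the ones handed to the slower fuzzy pass.

```python
from collections import defaultdict


def exact_join(base_rows, new_rows, key_fields):
    """Return (new_id, base_id) pairs whose key fields are all equal."""
    index = defaultdict(list)
    for base_id, row in enumerate(base_rows):
        index[tuple(row[f] for f in key_fields)].append(base_id)

    pairs = []
    for new_id, row in enumerate(new_rows):
        key = tuple(row[f] for f in key_fields)
        for base_id in index.get(key, []):
            pairs.append((new_id, base_id))
    return pairs


base = [
    {"email": "a@x.com", "zip": "10001"},
    {"email": "b@x.com", "zip": "94105"},
]
new = [
    {"email": "b@x.com", "zip": "94105"},
    {"email": "c@x.com", "zip": "10001"},
]
pairs = exact_join(base, new, ["email", "zip"])

# New records without an exact hit are candidates for fuzzy matching.
matched_new = {new_id for new_id, _ in pairs}
unmatched = [i for i in range(len(new)) if i not in matched_new]
```

Building the index is one pass over the base and each lookup is constant time on average, whereas the brute-force fuzzy pass compares each new record with every base row.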
ANN incremental operations
For embedding-based matching, the ANN index supports incremental updates:
from goldenmatch.core.blocker import ANNBlocker
blocker = ANNBlocker(model="all-MiniLM-L6-v2")
blocker.add_to_index(embedding) # Add a single embedding
neighbors = blocker.query_one(embedding) # Find nearest neighbors
The ANN index is persistent when using database sync mode. Embeddings are computed progressively across runs (100K per run).
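The add-then-query pattern can be illustrated with a tiny in-memory index. This sketch does an exact brute-force cosine search, not an approximate one, and it is not ANNBlocker: the class name `TinyVectorIndex`, the `k` parameter, and the (id, similarity) return format are assumptions. Only the method names mirror the snippet above.

```python
import math


def _normalize(vector):
    """Scale a vector to unit length so dot product equals cosine similarity."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0:
        raise ValueError("cannot index a zero vector")
    return [x / norm for x in vector]


class TinyVectorIndex:
    """Hypothetical exact-search index that accepts vectors one at a time."""

    def __init__(self):
        self._vectors = []

    def add_to_index(self, embedding):
        """Append one embedding; returns its position in the index."""
        self._vectors.append(_normalize(embedding))
        return len(self._vectors) - 1

    def query_one(self, embedding, k=3):
        """Return the k most similar entries as (id, cosine_similarity)."""
        q = _normalize(embedding)
        scored = [
            (i, sum(a * b for a, b in zip(q, v)))
            for i, v in enumerate(self._vectors)
        ]
        scored.sort(key=lambda s: s[1], reverse=True)
        return scored[:k]


index = TinyVectorIndex()
for vec in ([1.0, 0.0], [0.0, 1.0], [0.9, 0.1]):
    index.add_to_index(vec)
before = [i for i, _ in index.query_one([1.0, 0.0], k=2)]

# Incremental update: the new vector is searchable without a rebuild.
new_id = index.add_to_index([1.0, 0.01])
after = [i for i, _ in index.query_one([1.0, 0.0], k=2)]
```

A real ANN index trades this exact scan for an approximate structure so that queries stay fast as the number of embeddings grows.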
Database watch mode
Continuously monitor a database table for new records and match them incrementally:
goldenmatch watch --table customers --connection-string "$DATABASE_URL" --interval 30
Daemon mode
Run watch as a background service with health endpoint and PID file:
goldenmatch watch --table customers --connection-string "$DATABASE_URL" --daemon
Daemon mode adds:
- HTTP health endpoint at /health
- PID file for process management
- SIGTERM handling for graceful shutdown
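Two of these pieces, the PID file and graceful SIGTERM shutdown, follow a common pattern that can be sketched with the standard library. The code below is a generic illustration and not GoldenMatch's daemon: the function names, the PID file location, and the polling loop are assumptions, and the health endpoint is left out. The demo simulates the signal by calling the handler directly rather than sending a real SIGTERM.

```python
import os
import signal
import tempfile
import threading
from pathlib import Path

stop_requested = threading.Event()


def handle_sigterm(signum, frame):
    """Signal handler: ask the loop to stop after the current poll."""
    stop_requested.set()


def write_pid_file(path):
    """Record this process's PID so a supervisor can signal it later."""
    path.write_text(str(os.getpid()))


def run_loop(poll, interval):
    """Call `poll` repeatedly until a stop is requested; returns the poll count."""
    polls = 0
    while not stop_requested.is_set():
        poll()
        polls += 1
        stop_requested.wait(interval)  # sleeps, but wakes early on shutdown
    return polls


def main(pid_path, poll, interval=30):
    """Wire the pieces together the way a long-running service might."""
    signal.signal(signal.SIGTERM, handle_sigterm)
    write_pid_file(pid_path)
    try:
        run_loop(poll, interval)
    finally:
        pid_path.unlink(missing_ok=True)  # clean up on exit


# Demo without real signals: the third poll simulates SIGTERM arriving.
pid_path = Path(tempfile.gettempdir()) / "watch_sketch.pid"
write_pid_file(pid_path)
written = pid_path.read_text()
pid_path.unlink()

calls = []


def fake_poll():
    calls.append(1)
    if len(calls) == 3:
        handle_sigterm(signal.SIGTERM, None)


polls = run_loop(fake_poll, interval=0.01)
```

Waiting on the event instead of calling time.sleep lets the loop exit promptly when shutdown is requested mid-interval.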
Database sync
Full incremental matching against a live Postgres database:
# First run: full scan
goldenmatch sync --table customers --connection-string "$DATABASE_URL" --config config.yaml
# Subsequent runs: incremental (only new records)
goldenmatch sync --table customers --connection-string "$DATABASE_URL"
Features:
- Incremental sync – only processes records added since last run
- Hybrid blocking – SQL WHERE clauses for exact fields + FAISS ANN for semantic fields
- Persistent ANN index – disk cache + DB source of truth
- Golden record versioning – append-only with is_current flag
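Append-only versioning with an is_current flag can be sketched as follows. Only the is_current name comes from the feature list above; the `cluster_id` and `version` columns, the function names, and the in-memory list standing in for a database table are assumptions. Older rows are never deleted: a new version is appended and the flag on earlier versions is cleared.

```python
def append_golden_version(history, cluster_id, fields):
    """Append a new golden record version and retire the previous current one."""
    previous = [row for row in history if row["cluster_id"] == cluster_id]
    for row in previous:
        row["is_current"] = False  # earlier versions stay in the history
    new_row = {
        "cluster_id": cluster_id,
        "version": len(previous) + 1,
        "is_current": True,
        **fields,
    }
    history.append(new_row)
    return new_row


def current_golden(history, cluster_id):
    """Return the current golden record for a cluster, or None."""
    for row in history:
        if row["cluster_id"] == cluster_id and row["is_current"]:
            return row
    return None


history = []
append_golden_version(history, 7, {"name": "Jon Smith"})
append_golden_version(history, 7, {"name": "John Smith"})
latest = current_golden(history, 7)
```

Keeping every version makes it possible to audit how a golden record changed across runs, while the flag keeps lookups of the latest version simple.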
run_stream
Run a streaming pipeline programmatically:
import goldenmatch as gm
result = gm.run_stream(existing_df, config, new_records)
Architecture
New Record
|
v
match_one() --> (row_id, score) pairs
|
v
add_to_cluster() --> updated cluster dict
|
v
build_golden_record() --> updated canonical record
For database mode:
Postgres table (new rows)
|
v
watch/sync --> load new records
|
v
Hybrid blocking (SQL + ANN)
|
v
Score + cluster + golden
|
v
Write back to gm_clusters, gm_golden_records