Dagster + NATS Event-Driven Pipeline Design and Implementation
Implementation of asynchronous data persistence for UI prompts and LLM responses using Dagster orchestration and NATS pub/sub messaging, including event routing, audit logging, and distributed system observability
Overview
In AI inference systems, accurately recording user prompts and LLM-generated responses becomes increasingly important for analysis and reproducibility. This article documents the Dagster + NATS integration implemented in the agent-gateway project, covering design philosophy, implementation challenges, and production deployment.
Background: Why Event-Driven Pipelines Matter
Limitations of Synchronous Approach
Initially, the Knowledge Service performed direct, synchronous writes to PostgreSQL immediately after generating chat responses. This approach exposed several critical issues:
- Response Latency: Post-processing adds to user-facing response time
- Scalability Issues: Multiple sequential post-processing steps create bottlenecks
- Poor Observability: Difficult to track which operations succeed or fail
- Non-Reproducible Workflows: RAG behavior improvements and validation become complex
Architectural Solution
To address these concerns, the system adopted a message queue (NATS) and workflow orchestration tool (Dagster) combination.
Architecture Design
System Overview
Knowledge Service
↓ (NATS pub)
NATS Server (JetStream)
↓ (NATS sub)
NATS Consumer Worker
↓ (GraphQL)
Dagster GraphQL API
↓
Dagster Daemon/WebServer
↓ (execute ops)
PostgreSQL (audit logs)
Information Flow Between Components
- Knowledge Service publishes prompt-response pairs to NATS
- NATS Consumer Worker subscribes to messages
- Consumer Worker invokes the Dagster GraphQL API to trigger job execution
- Dagster Operations persist data to PostgreSQL
NATS Message Schema Design
Standard format for chat persistence messages:
{
  "pipeline_id": "knowledge.chat.persist",
  "correlation_id": "unique-trace-id",
  "idempotency_key": "idempotency-unique-id",
  "model": "llm-model-name",
  "llm_model": "alternative-field-for-compatibility",
  "prompt": "user-provided-prompt-text",
  "response": "llm-generated-response-text",
  "messages": [{"role": "user", "content": "..."}],
  "usage": {"prompt_tokens": 100, "completion_tokens": 50},
  "requested_at": 1697845200000,
  "payload_json": "additional structured data"
}
Design Considerations
- correlation_id: Enables end-to-end tracing across multiple pipeline stages
- idempotency_key: Guarantees idempotent processing—same message processed multiple times produces consistent results
- payload_json: Extensibility mechanism allowing arbitrary metadata and custom fields stored as JSONB
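On the publisher side, a small helper can assemble messages in this format. The sketch below is illustrative (the function name and the `chat-` key prefix are assumptions, not the project's actual publisher code); the point is that `correlation_id` is minted once per request and `idempotency_key` is derived from it, so a retransmitted message carries the same key:

```python
import json
import time
import uuid


def build_chat_persist_message(model, prompt, response, usage=None):
    """Assemble a NATS payload for pipeline.knowledge.chat.persist."""
    correlation_id = str(uuid.uuid4())
    message = {
        "pipeline_id": "knowledge.chat.persist",
        "correlation_id": correlation_id,
        # Deterministic per logical request: retries reuse the same key.
        "idempotency_key": f"chat-{correlation_id}",
        "model": model,
        "llm_model": model,  # duplicate field kept for consumer compatibility
        "prompt": prompt,
        "response": response,
        "usage": usage or {},
        "requested_at": int(time.time() * 1000),  # epoch milliseconds
        "payload_json": "{}",  # extensibility slot, stored as JSONB downstream
    }
    return json.dumps(message).encode("utf-8")
```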
Dagster Pipeline Implementation
Jobs and Operations
from dagster import job

@job(name="knowledge_chat_persist")
def knowledge_chat_persist():
    persist_chat_to_pg(load_payload_from_config())
load_payload_from_config Operation
- Converts NATS JSON message string to dictionary
- Implements fallback handling for decode failures
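The core decode logic might look like the following sketch (function name assumed for illustration): parse failures return a wrapper dict instead of raising, so downstream ops always receive a well-formed dictionary and the bad payload is preserved for inspection:

```python
import json


def safe_load_payload(raw):
    """Decode a NATS message body into a dict, with fallback on failure."""
    try:
        payload = json.loads(raw)
        if isinstance(payload, dict):
            return payload
        # Valid JSON but not an object: wrap it so callers can rely on dict access.
        return {"payload_json": payload}
    except json.JSONDecodeError as exc:
        # Preserve the raw body so the failure can be inspected later.
        return {"decode_error": str(exc), "raw": raw}
```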
persist_chat_to_pg Operation
CREATE TABLE IF NOT EXISTS knowledge_chat_audit (
    id BIGSERIAL PRIMARY KEY,
    pipeline_id TEXT NOT NULL,
    correlation_id TEXT,
    idempotency_key TEXT,
    model TEXT,
    prompt TEXT,
    response TEXT,
    payload_json JSONB NOT NULL DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Processing logic:
- Create table if not exists (idempotent)
- Extract correlation_id, idempotency_key, prompt, and response from configuration
- Insert audit record into PostgreSQL
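Separating SQL construction from execution keeps the mapping logic testable without a live database. A sketch of that mapping (the helper name is an assumption; the column order mirrors the DDL above):

```python
import json

# Parameterized insert matching the knowledge_chat_audit columns above.
AUDIT_INSERT_SQL = """
INSERT INTO knowledge_chat_audit
    (pipeline_id, correlation_id, idempotency_key, model, prompt, response, payload_json)
VALUES (%s, %s, %s, %s, %s, %s, %s::jsonb)
"""


def build_audit_insert_params(payload):
    """Map a decoded NATS payload onto the knowledge_chat_audit columns."""
    raw = payload.get("payload_json") or {}
    # payload_json may arrive pre-serialized (string) or as a dict.
    payload_json = raw if isinstance(raw, str) else json.dumps(raw)
    return (
        payload.get("pipeline_id", "knowledge.chat.persist"),
        payload.get("correlation_id"),
        payload.get("idempotency_key"),
        payload.get("model") or payload.get("llm_model"),
        payload.get("prompt"),
        payload.get("response"),
        payload_json,
    )
```

In the op itself, the tuple is passed straight to `cursor.execute(AUDIT_INSERT_SQL, params)` on a connection obtained from the Postgres resource.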
Resource Design
PostgresResource:
import os

import psycopg2
from dagster import ConfigurableResource


class PostgresResource(ConfigurableResource):
    dsn: str = "postgresql://postgres:postgres@postgres:5432/agent_gateway"

    def get_connection(self):
        # Environment variable takes precedence over the configured default.
        actual_dsn = os.getenv("POSTGRES_DSN", self.dsn)
        return psycopg2.connect(actual_dsn)
Environment variable override enables connection string separation between development and production.
NATS Consumer Worker Implementation
Evolution from Simple to Scalable Design
Phase 1: Simple Single-Job Routing
async def message_handler(msg):
    payload = json.loads(msg.data.decode())
    submit_dagster_run(
        job_name="knowledge_chat_persist",
        payload=payload,
        correlation_id=payload.get("correlation_id", ""),
    )
This initial implementation mapped one topic to one job.
Phase 2: Multi-Job Fan-Out
Expanding requirements necessitated launching multiple jobs from a single message:
if subject == "pipeline.knowledge.chat.persist":
    # Audit logging
    submit_dagster_run("knowledge_chat_persist", ...)
    # Dataset materialization
    submit_dagster_run("knowledge_chat_pair_materialize", ...)
Phase 3: Table-Driven Design (Final Version)
As topic count grew, explicit conditional routing became unmaintainable. Table-driven design solved scalability:
TOPIC_JOB_MAP = {
    "pipeline.knowledge.chat.persist": [
        ("knowledge_chat_persist", build_audit_run_config),
        ("knowledge_chat_pair_materialize", build_asset_run_config),
    ],
    "pipeline.knowledge.embedding": [
        ("knowledge_embedding", build_event_run_config),
    ],
    "pipeline.knowledge.retrieve": [
        ("knowledge_retrieve", build_event_run_config),
    ],
    "pipeline.knowledge.compose": [
        ("knowledge_compose", build_event_run_config),
    ],
}
Advantages:
- Routing logic stays constant-size as topic count grows; only the table gains entries
- New topic-job mappings require only table entries
- Configuration builders for each job remain independent
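One of the referenced config builders might look like the sketch below. This is an illustrative assumption about the run_config shape (the op name and config keys mirror the job shown earlier; the real schema depends on how the ops declare their config):

```python
def build_audit_run_config(payload, correlation_id):
    """Wrap a decoded NATS payload into Dagster run_config for the
    knowledge_chat_persist job, keyed by the op that consumes it."""
    return {
        "ops": {
            "load_payload_from_config": {
                "config": {
                    "payload": payload,
                    "correlation_id": correlation_id,
                }
            }
        }
    }
```

Because each topic's builders are plain functions in the table, a new topic only needs a new builder plus a table entry; no routing code changes.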
Async Job Triggering via GraphQL API
Dagster GraphQL Mutation:
mutation LaunchRun($executionParams: ExecutionParams!) {
  launchRun(executionParams: $executionParams) {
    __typename
    ... on LaunchRunSuccess {
      run { runId }
    }
    ... on RunConfigValidationInvalid {
      errors { message reason }
    }
  }
}
Python implementation:
def submit_dagster_run(job_name: str, run_config: dict, correlation_id: str):
    variables = {
        "executionParams": {
            "selector": {
                "repositoryLocationName": "dev_repo",
                "repositoryName": "__repository__",
                "jobName": job_name,
            },
            "runConfigData": run_config,
            "mode": "default",
        }
    }
    # LAUNCH_RUN_MUTATION holds the GraphQL document shown above
    response = requests.post(
        DAGSTER_GRAPHQL_URL,
        json={"query": LAUNCH_RUN_MUTATION, "variables": variables},
        timeout=30,
    )
    response.raise_for_status()
    result = response.json()["data"]["launchRun"]
    if result["__typename"] != "LaunchRunSuccess":
        raise RuntimeError(f"Failed to launch run: {result}")
    run_id = result["run"]["runId"]
    logger.info(f"Launched run {run_id} (correlation_id={correlation_id})")
Async Message Processing Pattern
async def message_handler(msg):
    # Decode and validate
    payload = json.loads(msg.data.decode())
    correlation_id = payload.get("correlation_id", "")

    # Topic-to-job mapping lookup
    routes = TOPIC_JOB_MAP.get(msg.subject)
    if not routes:
        logger.warning(f"No mapping for {msg.subject}")
        return

    # Multi-job fan-out
    for job_name, build_config in routes:
        submit_dagster_run(
            job_name=job_name,
            run_config=build_config(payload, correlation_id),
            correlation_id=correlation_id,
        )

async def main():
    nc = NATS()
    await nc.connect(NATS_URL)
    # Subscribe to all topics
    for topic in TOPIC_JOB_MAP:
        await nc.subscribe(topic, cb=message_handler)
PostgreSQL Schema Design
knowledge_chat_audit Table
Audit log for chat persistence, tracking correlation ID, idempotency key, and model information for comprehensive lineage.
knowledge_events Table (Unified)
Shared event table for multiple knowledge processing steps (embedding, retrieve, compose):
CREATE TABLE knowledge_events (
    id BIGSERIAL PRIMARY KEY,
    event_id TEXT UNIQUE,        -- SHA256(event_type:correlation_id)[:32]
    event_type TEXT NOT NULL,    -- 'embedding' | 'retrieve' | 'compose'
    correlation_id TEXT NOT NULL,
    pipeline_id TEXT NOT NULL,
    llm_model TEXT,
    payload_json JSONB DEFAULT '{}'::jsonb,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_knowledge_events_type ON knowledge_events (event_type);
CREATE INDEX idx_knowledge_events_corr ON knowledge_events (correlation_id);
-- Expression index for date-based aggregation; the timestamptz is pinned to
-- UTC first because a bare timestamptz::date cast is not IMMUTABLE.
CREATE INDEX idx_knowledge_events_date
    ON knowledge_events (((created_at AT TIME ZONE 'UTC')::date));
- event_id: UNIQUE constraint prevents duplicate processing
- event_type: Discriminator column enables multiple job types sharing single table
- correlation_id: Tracing across multiple tables
- Indexes: Enable fast filtering by event_type and correlation_id, plus date-based aggregation
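The event_id derivation described in the schema comment can be sketched as a small pure function (the function name is an assumption; the hashing scheme is the one stated above). The same (event_type, correlation_id) pair always yields the same id, so the UNIQUE constraint turns replays into no-ops:

```python
import hashlib


def make_event_id(event_type, correlation_id):
    """Deterministic event_id: SHA256("event_type:correlation_id")[:32]."""
    digest = hashlib.sha256(f"{event_type}:{correlation_id}".encode("utf-8"))
    return digest.hexdigest()[:32]
```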
Docker Compose Integration
services:
  nats:
    image: nats:2.11-alpine
    command: ["-js"]
    ports: ["4222:4222", "8222:8222"]
  postgres:
    image: agent-gateway/postgres:18-jit-vector
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: agent_gateway
  dagster-user-code:
    environment:
      POSTGRES_DSN: postgresql://postgres:postgres@postgres:5432/agent_gateway
    command: [dagster, api, grpc, -h, 0.0.0.0, -p, 4000]
  dagster-webserver:
    environment:
      POSTGRES_DSN: postgresql://postgres:postgres@postgres:5432/agent_gateway
    command: [dagster-webserver, -h, 0.0.0.0, -p, 3000]
  dagster-daemon:
    command: [dagster-daemon, run]
NATS Consumer Worker runs independently via Kubernetes or Systemd service.
Implementation Challenges and Solutions
Challenge 1: Ordering Guarantees
Problem: Multiple knowledge processing steps (embedding → retrieve → compose) execute asynchronously with no guaranteed order.
Solution:
- Use Dagster Asset dependencies to enforce order of asset-materialized chat pairs
- Correlate related events via correlation_id and validate order in post-processing
Challenge 2: Idempotency Assurance
Problem: Network failures causing message retransmission risk duplicate inserts.
Solution:
- Include idempotency_key in NATS messages
- Use PostgreSQL UPSERT (INSERT ... ON CONFLICT) or the UNIQUE constraint on event_id to eliminate duplicates
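Combined with the deterministic event_id, the insert becomes idempotent: a retransmitted message maps to the same event_id, and ON CONFLICT turns the duplicate into a no-op instead of a second row. A sketch of the statement (held here as a Python constant, name assumed):

```python
# Six placeholders, one per column; ON CONFLICT relies on the
# UNIQUE constraint on event_id defined in the schema above.
EVENT_UPSERT_SQL = """
INSERT INTO knowledge_events
    (event_id, event_type, correlation_id, pipeline_id, llm_model, payload_json)
VALUES (%s, %s, %s, %s, %s, %s::jsonb)
ON CONFLICT (event_id) DO NOTHING
"""
```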
Challenge 3: Error Handling and Retries
Problem: Failed Dagster GraphQL calls risk losing messages.
Solution:
- Delay NATS message acknowledgment until after successful Dagster invocation
- Unacknowledged messages remain in NATS JetStream retry queue
- Detailed logging of failures provides intervention triggers
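The ack-after-success control flow can be isolated from NATS and Dagster by injecting the side effects as callables. This is a synchronous sketch of the pattern, not the project's worker code; in production the injected functions would be submit_dagster_run, msg.ack, and msg.nak (JetStream's negative acknowledgment, which triggers redelivery):

```python
import logging

logger = logging.getLogger(__name__)


def process_with_ack(routes, payload, submit, ack, nak):
    """At-least-once processing: acknowledge only after every fan-out
    submission succeeds; otherwise nak so the message is redelivered."""
    try:
        for job_name, build_config in routes:
            submit(job_name, build_config(payload))
    except Exception:
        logger.exception("Dagster submission failed; message will be redelivered")
        nak()
        return False
    ack()
    return True
```

Testing the happy path and the failure path then needs only stub callables, no broker or orchestrator.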
Production Operations
Monitoring and Alerting
logger.info(f"Submitted job {job_name} to Dagster (correlation_id={correlation_id})")
logger.error(f"Failed to launch run: {launch_result}")
Including correlation_id in logs enables complete tracing from user action through pipeline execution.
Distributed Tracing
correlation_id propagates through:
- Knowledge Service → NATS message
- NATS → Dagster run_config
- Dagster → PostgreSQL (correlation_id column)
- PostgreSQL → Analytics queries (“How was this user’s operation processed end-to-end?”)
Key Learnings
1. Event-Driven Architecture Design
Message schema design becomes increasingly important. Early inclusion of tracing information like correlation_id significantly reduces operational burden.
2. Scalability and Maintainability
Evolution from explicit conditional routing to table-driven design demonstrates a practical pattern for adding features while controlling code complexity.
3. Database Unification
Instead of fragmenting events across multiple tables, unifying them in a single table with event_type discriminator enables simpler aggregation and analysis.
4. GraphQL API Benefits
Compared to REST, GraphQL APIs enable selective field retrieval, reducing payload size and improving response time. Mutation return values enable immediate result validation and simplified error handling.
Conclusion
The Dagster + NATS integration achieved:
- Asynchronous Processing: User-facing responses decouple from post-processing, significantly reducing response latency
- Scalability: Table-driven design accommodates job growth
- Observability: correlation_id and logging provide complete flow traceability
- Reproducibility: Dagster lineage records RAG processing as reproducible assets
Future enhancements include automated prompt-response curation using Dagster Asset Materialization and synthetic fine-tuning dataset generation.

