Overview

In AI inference systems, accurately recording user prompts and LLM-generated responses becomes increasingly important for analysis and reproducibility. This article documents the Dagster + NATS integration implemented in the agent-gateway project, covering design philosophy, implementation challenges, and production deployment.

Background: Why Event-Driven Pipelines Matter

Limitations of Synchronous Approach

Initially, the Knowledge Service performed direct, synchronous writes to PostgreSQL immediately after generating chat responses. This approach exposed several critical issues:

  1. Response Latency: Post-processing adds to user-facing response time
  2. Scalability Issues: Multiple sequential post-processing steps create bottlenecks
  3. Poor Observability: Difficult to track which operations succeed or fail
  4. Non-Reproducible Workflows: RAG behavior improvements and validation become complex

Architectural Solution

To address these concerns, the system adopted a message queue (NATS) and workflow orchestration tool (Dagster) combination.

Architecture Design

System Overview

  Knowledge Service
    ↓ (NATS pub)
NATS Server (JetStream)
    ↓ (NATS sub)
NATS Consumer Worker
    ↓ (GraphQL)
Dagster GraphQL API
    ↓
Dagster Daemon/WebServer
    ↓ (execute ops)
PostgreSQL (audit logs)
  

Information Flow Between Components

  1. Knowledge Service publishes prompt-response pairs to NATS
  2. NATS Consumer Worker subscribes to messages
  3. Invokes Dagster GraphQL API to trigger job execution
  4. Dagster Operations persist data to PostgreSQL

NATS Message Schema Design

Standard format for chat persistence messages:

  {
    "pipeline_id": "knowledge.chat.persist",
    "correlation_id": "unique-trace-id",
    "idempotency_key": "idempotency-unique-id",
    "model": "llm-model-name",
    "llm_model": "alternative-field-for-compatibility",
    "prompt": "user-provided-prompt-text",
    "response": "llm-generated-response-text",
    "messages": [{"role": "user", "content": "..."}],
    "usage": {"prompt_tokens": 100, "completion_tokens": 50},
    "requested_at": 1697845200000,
    "payload_json": "additional structured data"
  }
  

Design Considerations

  • correlation_id: Enables end-to-end tracing across multiple pipeline stages
  • idempotency_key: Guarantees idempotent processing—same message processed multiple times produces consistent results
  • payload_json: Extensibility mechanism allowing arbitrary metadata and custom fields stored as JSONB
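To make the schema concrete, here is a minimal sketch of how a publisher might assemble such a message before handing it to the NATS client. The content-hash scheme for idempotency_key is an illustrative assumption, not necessarily the one agent-gateway uses:

```python
import hashlib
import json
import time
import uuid

def build_persist_message(prompt: str, response: str, model: str) -> dict:
    """Assemble a chat-persist message matching the schema above."""
    correlation_id = str(uuid.uuid4())
    # Illustrative scheme: hash the trace id and content, so a retransmission
    # of the same message always carries the same idempotency key.
    idempotency_key = hashlib.sha256(
        f"{correlation_id}:{prompt}:{response}".encode()
    ).hexdigest()[:32]
    return {
        "pipeline_id": "knowledge.chat.persist",
        "correlation_id": correlation_id,
        "idempotency_key": idempotency_key,
        "model": model,
        "llm_model": model,  # duplicate field kept for compatibility
        "prompt": prompt,
        "response": response,
        "messages": [{"role": "user", "content": prompt}],
        "requested_at": int(time.time() * 1000),  # epoch milliseconds
    }

msg = build_persist_message("What is NATS?", "A messaging system.", "gpt-4o")
data = json.dumps(msg).encode()  # bytes ready for a NATS publish call
```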

Dagster Pipeline Implementation

Jobs and Operations

  @job(name="knowledge_chat_persist")
def knowledge_chat_persist():
    persist_chat_to_pg(load_payload_from_config())
  

load_payload_from_config Operation

  • Converts NATS JSON message string to dictionary
  • Implements fallback handling for decode failures
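Stripped of the Dagster op decorator, the decode-with-fallback logic might look like this stdlib-only sketch (the fallback shape, a dict carrying the raw string under a _raw key, is a hypothetical choice):

```python
import json

def decode_payload(raw: str) -> dict:
    """Convert a NATS JSON message string into a dict, falling back to a
    wrapper dict instead of failing the run when decoding is impossible."""
    try:
        payload = json.loads(raw)
        # Valid JSON that is not an object (e.g. a bare list) also falls back.
        return payload if isinstance(payload, dict) else {"_raw": raw}
    except json.JSONDecodeError:
        return {"_raw": raw}

decode_payload('{"prompt": "hi"}')  # → {"prompt": "hi"}
decode_payload("not json")          # → {"_raw": "not json"}
```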

persist_chat_to_pg Operation

  CREATE TABLE knowledge_chat_audit (
    id BIGSERIAL PRIMARY KEY,
    pipeline_id TEXT NOT NULL,
    correlation_id TEXT,
    idempotency_key TEXT,
    model TEXT,
    prompt TEXT,
    response TEXT,
    payload_json JSONB NOT NULL DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
  );
  

Processing logic:

  1. Create table if not exists (idempotent)
  2. Extract correlation_id, idempotency_key, prompt, and response from configuration
  3. Insert audit record into PostgreSQL

Resource Design

PostgresResource:

  import os
import psycopg2
from dagster import ConfigurableResource

class PostgresResource(ConfigurableResource):
    dsn: str = "postgresql://postgres:postgres@postgres:5432/agent_gateway"

    def get_connection(self):
        # POSTGRES_DSN env var takes precedence over the baked-in default
        actual_dsn = os.getenv("POSTGRES_DSN", self.dsn)
        return psycopg2.connect(actual_dsn)
  

Environment variable override enables connection string separation between development and production.

NATS Consumer Worker Implementation

Evolution from Simple to Scalable Design

Phase 1: Simple Single-Job Routing

  async def message_handler(msg):
    payload = json.loads(msg.data.decode())
    submit_dagster_run(
        job_name="knowledge_chat_persist",
        payload=payload,
        correlation_id=payload.get("correlation_id", "")
    )
  

This initial implementation mapped one topic to one job.

Phase 2: Multi-Job Fan-Out

Expanding requirements necessitated launching multiple jobs from a single message:

  if subject == "pipeline.knowledge.chat.persist":
    # Audit logging
    submit_dagster_run("knowledge_chat_persist", ...)
    # Dataset materialization
    submit_dagster_run("knowledge_chat_pair_materialize", ...)
  

Phase 3: Table-Driven Design (Final Version)

As the number of topics grew, explicit conditional routing became unmaintainable. A table-driven design keeps the routing scalable:

  TOPIC_JOB_MAP = {
    "pipeline.knowledge.chat.persist": [
        ("knowledge_chat_persist", build_audit_run_config),
        ("knowledge_chat_pair_materialize", build_asset_run_config),
    ],
    "pipeline.knowledge.embedding": [
        ("knowledge_embedding", build_event_run_config),
    ],
    "pipeline.knowledge.retrieve": [
        ("knowledge_retrieve", build_event_run_config),
    ],
    "pipeline.knowledge.compose": [
        ("knowledge_compose", build_event_run_config),
    ],
}
  

Advantages:

  • Routing code stays constant in size as topics increase; only the table grows
  • New topic-job mappings require only table entries
  • Configuration builders for each job remain independent
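The dispatch itself then reduces to a table lookup plus a loop. A self-contained sketch with stubbed config builders and a stubbed submit function:

```python
def dispatch(subject: str, payload: dict, topic_job_map: dict, submit) -> int:
    """Fan a message out to every job registered for its subject.
    Returns the number of runs submitted (0 for unmapped subjects)."""
    routes = topic_job_map.get(subject, [])
    correlation_id = payload.get("correlation_id", "")
    for job_name, build_config in routes:
        submit(job_name, build_config(payload, correlation_id))
    return len(routes)

# Stub wiring for illustration only.
launched = []
demo_map = {
    "pipeline.knowledge.chat.persist": [
        ("knowledge_chat_persist", lambda p, c: {"payload": p}),
        ("knowledge_chat_pair_materialize", lambda p, c: {"payload": p}),
    ],
}
n = dispatch("pipeline.knowledge.chat.persist", {"correlation_id": "abc"},
             demo_map, lambda job, cfg: launched.append(job))
# n == 2; both jobs were submitted for the single message
```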

Async Job Triggering via GraphQL API

Dagster GraphQL Mutation:

  mutation LaunchRun($executionParams: ExecutionParams!) {
    launchRun(executionParams: $executionParams) {
      __typename
      ... on LaunchRunSuccess {
        run { runId }
      }
      ... on RunConfigValidationInvalid {
        errors { message reason }
      }
    }
  }
  

Python implementation:

  def submit_dagster_run(job_name: str, run_config: dict, correlation_id: str):
    variables = {
        "executionParams": {
            "selector": {
                "repositoryLocationName": "dev_repo",
                "repositoryName": "__repository__",
                "jobName": job_name
            },
            "runConfigData": run_config,
            "mode": "default"
        }
    }
    # LAUNCH_RUN_MUTATION holds the GraphQL mutation shown above
    response = requests.post(
        DAGSTER_GRAPHQL_URL,
        json={"query": LAUNCH_RUN_MUTATION, "variables": variables}
    )
    result = response.json()["data"]["launchRun"]
    if result["__typename"] != "LaunchRunSuccess":
        logger.error(f"Failed to launch run: {result}")
        return
    logger.info(f"Launched run {result['run']['runId']} (correlation_id={correlation_id})")
  

Async Message Processing Pattern

  async def message_handler(msg):
    # Decode and validate
    payload = json.loads(msg.data.decode())
    correlation_id = payload.get("correlation_id", "")
    
    # Topic-to-job mapping lookup
    routes = TOPIC_JOB_MAP.get(msg.subject)
    if not routes:
        logger.warning(f"No mapping for {msg.subject}")
        return
    
    # Multi-job fan-out
    for job_name, build_config in routes:
        submit_dagster_run(
            job_name=job_name,
            run_config=build_config(payload, correlation_id),
            correlation_id=correlation_id
        )

async def main():
    nc = NATS()
    await nc.connect(NATS_URL)
    
    # Subscribe to all topics
    for topic in TOPIC_JOB_MAP:
        await nc.subscribe(topic, cb=message_handler)
  

PostgreSQL Schema Design

knowledge_chat_audit Table

Audit log for chat persistence, tracking correlation ID, idempotency key, and model information for comprehensive lineage.

knowledge_events Table (Unified)

Shared event table for multiple knowledge processing steps (embedding, retrieve, compose):

  CREATE TABLE knowledge_events (
    id             BIGSERIAL PRIMARY KEY,
    event_id       TEXT UNIQUE,          -- SHA256(event_type:correlation_id)[:32]
    event_type     TEXT NOT NULL,        -- 'embedding' | 'retrieve' | 'compose'
    correlation_id TEXT NOT NULL,
    pipeline_id    TEXT NOT NULL,
    llm_model      TEXT,
    payload_json   JSONB DEFAULT '{}'::jsonb,
    created_at     TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_knowledge_events_type ON knowledge_events (event_type);
CREATE INDEX idx_knowledge_events_corr ON knowledge_events (correlation_id);
CREATE INDEX idx_knowledge_events_date ON knowledge_events (created_at);  -- range scans on created_at cover date-based filtering
  
  • event_id: UNIQUE constraint prevents duplicate processing
  • event_type: Discriminator column enables multiple job types sharing single table
  • correlation_id: Tracing across multiple tables
  • Indexes: Enable fast filtering by event_type and correlation_id, plus date-based aggregation
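The event_id scheme noted in the schema comment (SHA256 of event_type:correlation_id, truncated to 32 hex characters) can be sketched as:

```python
import hashlib

def make_event_id(event_type: str, correlation_id: str) -> str:
    """SHA256(event_type:correlation_id) truncated to 32 hex characters."""
    digest = hashlib.sha256(f"{event_type}:{correlation_id}".encode()).hexdigest()
    return digest[:32]

# The derivation is deterministic, so a redelivered message regenerates the
# same event_id and the UNIQUE constraint turns the duplicate into a no-op.
event_id = make_event_id("embedding", "trace-123")
assert event_id == make_event_id("embedding", "trace-123")
```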

Docker Compose Integration

  services:
    nats:
      image: nats:2.11-alpine
      command: ["-js"]
      ports: ["4222:4222", "8222:8222"]

    postgres:
      image: agent-gateway/postgres:18-jit-vector
      environment:
        POSTGRES_USER: postgres
        POSTGRES_PASSWORD: postgres
        POSTGRES_DB: agent_gateway

    dagster-user-code:
      environment:
        POSTGRES_DSN: postgresql://postgres:postgres@postgres:5432/agent_gateway
      command: ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000"]

    dagster-webserver:
      environment:
        POSTGRES_DSN: postgresql://postgres:postgres@postgres:5432/agent_gateway
      command: ["dagster-webserver", "-h", "0.0.0.0", "-p", "3000"]

    dagster-daemon:
      command: ["dagster-daemon", "run"]
  

The NATS Consumer Worker runs independently, deployed as a Kubernetes workload or a systemd service.

Implementation Challenges and Solutions

Challenge 1: Ordering Guarantees

Problem: Multiple knowledge processing steps (embedding → retrieve → compose) execute asynchronously with no guaranteed order.

Solution:

  • Use Dagster Asset dependencies to enforce order of asset-materialized chat pairs
  • Correlate related events via correlation_id and validate order in post-processing
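The post-processing validation in the second bullet might look like the following sketch, assuming each event row carries its event_type and created_at timestamp (a hypothetical helper, not code from the project):

```python
from datetime import datetime, timedelta

EXPECTED_ORDER = ["embedding", "retrieve", "compose"]

def events_in_order(events: list[dict]) -> bool:
    """Check that the events for one correlation_id occurred in
    pipeline order (embedding -> retrieve -> compose)."""
    by_time = sorted(events, key=lambda e: e["created_at"])
    observed = [e["event_type"] for e in by_time]
    # Subsequence check: every expected step appears, in order.
    it = iter(observed)
    return all(step in it for step in EXPECTED_ORDER)

t0 = datetime(2024, 1, 1)
events = [
    {"event_type": "embedding", "created_at": t0},
    {"event_type": "retrieve", "created_at": t0 + timedelta(seconds=1)},
    {"event_type": "compose", "created_at": t0 + timedelta(seconds=2)},
]
events_in_order(events)  # True
```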

Challenge 2: Idempotency Assurance

Problem: Network failures causing message retransmission risk duplicate inserts.

Solution:

  • Include idempotency_key in NATS messages
  • Use PostgreSQL UPSERT (ON CONFLICT) or UNIQUE constraint on event_id to eliminate duplicates
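The UNIQUE-plus-ON CONFLICT approach can be demonstrated with an in-memory SQLite stand-in (the same ON CONFLICT ... DO NOTHING clause works in PostgreSQL):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE knowledge_events (
        id             INTEGER PRIMARY KEY AUTOINCREMENT,
        event_id       TEXT UNIQUE,
        event_type     TEXT NOT NULL,
        correlation_id TEXT NOT NULL
    )
""")

def insert_event(event_id: str, event_type: str, correlation_id: str) -> None:
    # ON CONFLICT DO NOTHING makes a redelivered message harmless.
    conn.execute(
        "INSERT INTO knowledge_events (event_id, event_type, correlation_id) "
        "VALUES (?, ?, ?) ON CONFLICT (event_id) DO NOTHING",
        (event_id, event_type, correlation_id),
    )

# Simulate a NATS redelivery: the same event arrives twice.
insert_event("evt-1", "embedding", "trace-123")
insert_event("evt-1", "embedding", "trace-123")
count = conn.execute("SELECT COUNT(*) FROM knowledge_events").fetchone()[0]
# count == 1: the duplicate insert was silently dropped
```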

Challenge 3: Error Handling and Retries

Problem: Failed Dagster GraphQL calls risk losing messages.

Solution:

  • Delay NATS message acknowledgment until after successful Dagster invocation
  • Unacknowledged messages remain in NATS JetStream retry queue
  • Detailed logging of failures provides intervention triggers
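The ack-after-success pattern can be sketched independently of a live NATS server. The FakeMsg class below stands in for a JetStream message; in nats-py the real message exposes ack() and nak() coroutines:

```python
import asyncio

async def handle_with_ack(msg, submit) -> bool:
    """Acknowledge only after Dagster accepted the run; otherwise
    negatively acknowledge so JetStream redelivers the message."""
    try:
        submit(msg.data)          # e.g. submit_dagster_run(...)
    except Exception as exc:
        print(f"submit failed, leaving message for redelivery: {exc}")
        await msg.nak()           # JetStream will redeliver
        return False
    await msg.ack()               # safe: the run was launched
    return True

# Minimal stand-in for a JetStream message, for illustration only.
class FakeMsg:
    def __init__(self, data):
        self.data = data
        self.acked = self.nakked = False
    async def ack(self):
        self.acked = True
    async def nak(self):
        self.nakked = True

msg = FakeMsg(b"{}")
asyncio.run(handle_with_ack(msg, lambda data: None))
# msg.acked is True: the message is acknowledged only after submit succeeds
```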

Production Operations

Monitoring and Alerting

  logger.info(f"Submitted job {job_name} to Dagster (correlation_id={correlation_id})")
logger.error(f"Failed to launch run: {launch_result}")
  

Including correlation_id in logs enables complete tracing from user action through pipeline execution.

Distributed Tracing

correlation_id propagates through:

  1. Knowledge Service → NATS message
  2. NATS → Dagster run_config
  3. Dagster → PostgreSQL (correlation_id column)
  4. PostgreSQL → Analytics queries (“How was this user’s operation processed end-to-end?”)

Key Learnings

1. Event-Driven Architecture Design

Message schema design becomes increasingly important. Early inclusion of tracing information like correlation_id significantly reduces operational burden.

2. Scalability and Maintainability

Evolution from explicit conditional routing to table-driven design demonstrates a practical pattern for adding features while controlling code complexity.

3. Database Unification

Instead of fragmenting events across multiple tables, unifying them in a single table with event_type discriminator enables simpler aggregation and analysis.

4. GraphQL API Benefits

Compared to REST, GraphQL APIs enable selective field retrieval, reducing payload size and improving response time. Mutation return values enable immediate result validation and simplified error handling.

Conclusion

The Dagster + NATS integration achieved:

  1. Asynchronous Processing: User-facing responses decouple from post-processing, significantly reducing response latency
  2. Scalability: Table-driven design accommodates job growth
  3. Observability: correlation_id and logging provide complete flow traceability
  4. Reproducibility: Dagster lineage records RAG processing as reproducible assets

Future enhancements include automated prompt-response curation using Dagster Asset Materialization and synthetic fine-tuning dataset generation.