Dagster + NATS イベント駆動パイプライン設計と実装
UIプロンプトとLLMレスポンスのペアデータセット保存を実現するDagster + NATS統合、非同期パイプラインオーケストレーション、イベントルーティング、PostgreSQL監査ログ設計
概要
AI推論システムにおいて、ユーザーのプロンプトとLLMの生成レスポンスを正確に記録し、後続の分析や再現に活用することは重要になってくる。本稿では、agent-gatewayプロジェクトで実装されたDagster + NATS統合システムについて、設計思想、実装上の課題、そして本番環境への展開を記述する。
背景:なぜイベント駆動パイプラインが必要か
従来のアプローチの限界
初期段階では、知識サービス(Knowledge Service)がチャットレスポンス直後にPostgreSQLへ同期的にデータを書き込んでいた。しかし、以下の課題が浮上した:
- レスポンス遅延:後処理がレスポンス時間に含まれる
- スケーラビリティの欠如:複数の後処理ステップが逐次実行される
- 可観測性の低さ:どの処理がいつ失敗したのか把握が困難
- 再現不可能な処理フロー:RAGロジックの改善や検証が難しい
解決策の方向性
これらの課題を解決するため、メッセージキュー(NATS)とワークフロー管理ツール(Dagster)の組み合わせを採用することにした。
アーキテクチャ設計
システム全体図
Knowledge Service
↓ (NATS pub)
NATS Server (JetStream)
↓ (NATS sub)
NATS Consumer Worker
↓ (GraphQL)
Dagster GraphQL API
↓
Dagster Daemon/WebServer
↓ (execute ops)
PostgreSQL (audit logs)
コンポーネント間の通信フロー
- Knowledge ServiceがUIプロンプトとLLMレスポンスをペアでNATSに発行
- NATS Consumer Workerがメッセージを購読
- Dagster GraphQL APIを呼び出してジョブを実行
- Dagster OpsがデータをPostgreSQLに永続化
NATS メッセージフォーマット設計
チャット永続化メッセージの標準スキーマ:
{
"pipeline_id": "knowledge.chat.persist",
"correlation_id": "unique-trace-id",
"idempotency_key": "idempotency-unique-id",
"model": "llm-model-name",
"llm_model": "alternative-field-for-compatibility",
"prompt": "user-provided-prompt-text",
"response": "llm-generated-response-text",
"messages": [{"role": "user", "content": "..."}],
"usage": {"prompt_tokens": 100, "completion_tokens": 50},
"requested_at": 1697845200000,
"payload_json": "additional structured data"
}
設計上の考慮事項
- correlation_id:エンドツーエンドのトレーシング用に、複数のパイプライン段階を通して伝播
- idempotency_key:同じメッセージが複数回処理されてもべき等性を保証
- payload_json:拡張性。追加メタデータやカスタムフィールドをJSONBで保存
Dagster パイプラインの実装
ジョブとオペレーション
@job(name="knowledge_chat_persist")
def knowledge_chat_persist():
persist_chat_to_pg(load_payload_from_config())
load_payload_from_config Op
- NATSメッセージのJSON文字列を辞書に変換
- デコード失敗時はフォールバック処理
persist_chat_to_pg Op
CREATE TABLE knowledge_chat_audit (
id BIGSERIAL PRIMARY KEY,
pipeline_id TEXT NOT NULL,
correlation_id TEXT,
idempotency_key TEXT,
model TEXT,
prompt TEXT,
response TEXT,
payload_json JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
このテーブルに以下の処理で書き込み:
- テーブルが存在しない場合は作成(冪等)
- 設定値からcorrelation_id、idempotency_key、プロンプト、レスポンスを抽出
- PostgreSQLに監査レコードを挿入
リソース設計
PostgresResource:
class PostgresResource(ConfigurableResource):
dsn: str = "postgresql://postgres:postgres@postgres:5432/agent_gateway"
def get_connection(self):
actual_dsn = os.getenv("POSTGRES_DSN", self.dsn)
return psycopg2.connect(actual_dsn)
環境変数による上書き可能な設計で、開発環境と本番環境での接続文字列を切り分け。
NATS Consumer Worker の実装
初期実装から拡張への進化
Phase 1: 単純な単一ジョブ路由
async def message_handler(msg):
payload = json.loads(msg.data.decode())
submit_dagster_run(
job_name="knowledge_chat_persist",
payload=payload,
correlation_id=payload.get("correlation_id", "")
)
この単純な実装では、1つのトピックに1つのジョブが対応していた。
Phase 2: マルチジョブ・ファンアウト
処理要件の拡張により、同じメッセージから複数のジョブを起動する必要が生じた:
if subject == "pipeline.knowledge.chat.persist":
# 監査ログ
submit_dagster_run("knowledge_chat_persist", ...)
# ペアデータセット資産化
submit_dagster_run("knowledge_chat_pair_materialize", ...)
Phase 3: テーブル駆動設計(最終版)
トピック数が増加するにつれ、スケーラビリティの問題が明白に。テーブル駆動アプローチに移行:
TOPIC_JOB_MAP = {
"pipeline.knowledge.chat.persist": [
("knowledge_chat_persist", build_audit_run_config),
("knowledge_chat_pair_materialize", build_asset_run_config),
],
"pipeline.knowledge.embedding": [
("knowledge_embedding", build_event_run_config),
],
"pipeline.knowledge.retrieve": [
("knowledge_retrieve", build_event_run_config),
],
"pipeline.knowledge.compose": [
("knowledge_compose", build_event_run_config),
],
}
この設計により:
- トピック数が増えても、コードの複雑度はO(1)に保持
- 新しいトピック・ジョブマッピングは、テーブルへのエントリ追加のみ
- 各ジョブの設定ビルダ関数を独立して管理可能
GraphQL APIを使用した非同期ジョブ起動
Dagster GraphQL Mutation:
mutation LaunchRun($executionParams: LaunchRunExecutionParam!) {
launchRun(executionParams: $executionParams) {
__typename
... on LaunchRunSuccess {
run { runId }
}
... on RunConfigValidationInvalid {
errors { message reason }
}
}
}
Python実装:
def submit_dagster_run(job_name: str, run_config: dict, correlation_id: str):
variables = {
"executionParams": {
"selector": {
"repositoryLocationName": "dev_repo",
"repositoryName": "__repository__",
"jobName": job_name
},
"runConfigData": run_config,
"mode": "default"
}
}
response = requests.post(DAGSTER_GRAPHQL_URL, json={...})
run_id = response.json()["data"]["launchRun"]["run"]["runId"]
logger.info(f"Launched run {run_id}")
非同期メッセージ処理の要点
async def message_handler(msg):
# デコード、バリデーション
payload = json.loads(msg.data.decode())
correlation_id = payload.get("correlation_id", "")
# トピック→ジョブマッピング
routes = TOPIC_JOB_MAP.get(msg.subject)
if not routes:
logger.warning(f"No mapping for {msg.subject}")
return
# 複数ジョブへのファンアウト
for job_name, build_config in routes:
submit_dagster_run(
job_name=job_name,
run_config=build_config(payload, correlation_id),
correlation_id=correlation_id
)
async def main():
nc = NATS()
await nc.connect(NATS_URL)
# 全トピックをサブスクライブ
for topic in TOPIC_JOB_MAP:
await nc.subscribe(topic, cb=message_handler)
PostgreSQL スキーマ設計
knowledge_chat_audit テーブル
チャット永続化の監査ログ。相関ID、べき等性キー、モデル情報をトラッキング。
knowledge_events テーブル(拡張版)
複数の知識処理ステップ(embedding、retrieve、compose)が共有する統一イベントテーブル:
CREATE TABLE knowledge_events (
id BIGSERIAL PRIMARY KEY,
event_id TEXT UNIQUE, -- SHA256(event_type:correlation_id)[:32]
event_type TEXT NOT NULL, -- 'embedding' | 'retrieve' | 'compose'
correlation_id TEXT NOT NULL,
pipeline_id TEXT NOT NULL,
llm_model TEXT,
payload_json JSONB DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_knowledge_events_type ON knowledge_events (event_type);
CREATE INDEX idx_knowledge_events_corr ON knowledge_events (correlation_id);
CREATE INDEX idx_knowledge_events_date ON knowledge_events (created_at::date);
- event_id:UNIQUEコンストレイントにより重複処理を防止
- event_type:判別カラムにより、複数のジョブタイプが同一テーブルに書き込み可能
- correlation_id:複数テーブル間でのトレーシング
- インデックス:event_typeとcorrelation_idで高速検索、created_atで日付単位の集計を加速
Docker Compose での統合
services:
nats:
image: nats:2.11-alpine
command: ["-js"]
ports: ["4222:4222", "8222:8222"]
postgres:
image: agent-gateway/postgres:18-jit-vector
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: agent_gateway
dagster-user-code:
environment:
POSTGRES_DSN: postgresql://postgres:postgres@postgres:5432/agent_gateway
command: [dagster, api, grpc, -h, 0.0.0.0, -p, 4000]
dagster-webserver:
environment:
POSTGRES_DSN: postgresql://postgres:postgres@postgres:5432/agent_gateway
command: [dagster-webserver, -h, 0.0.0.0, -p, 3000]
dagster-daemon:
command: [dagster-daemon, run]
NATS Consumer WorkerはKubernetesまたはSystemdサービスとして独立して実行。
実装上の課題と解決策
課題1: 順序保証の欠如
問題:複数の知識処理ステップ(embedding → retrieve → compose)が非同期で実行される場合、実行順序が保証されない。
解決策:
- Dagster Asset依存関係を活用して、asset化したチャットペアデータに依存する後処理を定義
- correlation_idで関連イベントを結合し、事後的に順序を検証
課題2: べき等性の保証
問題:ネットワーク障害によるメッセージの再送で、同じデータが複数回挿入される可能性。
解決策:
- idempotency_keyをNATSメッセージに含める
- PostgreSQL UPSERT(
ON CONFLICT)またはevent_idのUNIQUEコンストレイントで重複を排除
課題3: エラーハンドリングと再試行
問題:Dagster GraphQL呼び出しが失敗した場合、メッセージが喪失される可能性。
解決策:
- NATS メッセージを明示的にACKするタイミングを遅延(Dagster起動成功後)
- 失敗したメッセージはNATS JetStreamの再試行キューに保持
- Dagster失敗時のログ出力で、手動介入の契機を提供
本番運用の工夫
監視とアラート
logger.info(f"Submitted job {job_name} to Dagster (correlation_id={correlation_id})")
logger.error(f"Failed to launch run: {launch_result}")
correlation_idをログに含めることで、ユーザー操作からパイプライン実行まで全て追跡可能。
トレーシング
correlation_idは以下の箇所で伝播:
- Knowledge Service → NATS メッセージ
- NATS → Dagster run_config
- Dagster → PostgreSQL (correlation_id カラム)
- PostgreSQL → 分析クエリ(「このユーザーの操作は全体どのように処理されたか」を検索)
得られた知見
1. 非同期パイプラインの設計
イベント駆動アーキテクチャでは、メッセージスキーマの設計が極めて重要。correlation_idのような追跡情報を早期に盛り込むことで、後の運用負荷を大幅に削減できる。
2. スケーラビリティと保守性
初期的な条件分岐ロジックから、テーブル駆動設計への進化は、コード複雑度の増加を抑制しつつ、機能を拡張するための実践的なパターンを示唆する。
3. データベース設計での統一性
複数のイベント種別が異なるテーブルに分散していると、統合分析が困難。共有イベントテーブルに統合し、event_type判別カラムで多態性を実現する設計が有効。
4. GraphQL APIの活用
REST APIと異なり、GraphQL APIは必要なフィールドのみ取得できるため、ペイロード削減とレスポンス高速化に貢献。また、mutation の戻り値で実行結果を即座に検証でき、エラーハンドリングが容易。
まとめ
Dagster + NATS統合により、以下を実現した:
- 非同期処理:ユーザー向けレスポンスと後処理を分離し、レスポンス遅延を大幅に削減
- スケーラビリティ:テーブル駆動設計で、ジョブ数の増加に対応
- 可観測性:correlation_idとloggingにより、全処理フロー追跡可能
- 再現性:Dagster lineageにより、RAG処理が再現可能なassetとして記録
次のステップとしては、Dagster Asset Materializationを活用した、チャットペアの自動キュレーション、さらには微調整データセットの自動生成が考えられる。

