On this page
architecture
Rust + NATS + Dagster AI知能工場:OpenAI互換プロキシ、冪等設計、SSEストリーミング、Go移行の全記録
Rust(axum)によるOpenAI互換プロキシ、NATS Core/JetStreamイベント中継、Dagster oneshotジョブ実行、PG冪等設計、Qdrantセマンティックキャッシュ、SSEストリーミング、Quadlet/systemd統合。そしてGoへの全面移行。AI知能工場のアーキテクチャ全貌。
技術メモとして残していた ObsidianノートにAIの要約を利用しています。
背景
LLMを業務パイプラインに組み込むには、単純なRequest-Responseでは足りないことが多い。複数LLMの並列推論、SSEによる即応フィードバック、冪等なリトライ、トレーサビリティが必要。
Rust(axum)で薄型OpenAI互換プロキシを作り、NATS Coreでイベント中継し、Dagster oneshotジョブで重い処理を実行する「AI知能工場」を設計した。最終的にGoへ全面移行した経緯も含めて記録する。
コンポーネント責務
| レイヤー | 技術 | 責務 |
|---|---|---|
| エントリポイント | Rust/Go + axum | OpenAI互換EP、trace_id採番、SSE保持 |
| メッセージング | NATS Core | Pub/Subイベント中継(evt.chat.{trace_id}) |
| 実行エンジン | Dagster + systemd | oneshot型ジョブ実行(常駐しない) |
| オブザーバビリティ | Loki + Grafana + Dagit | trace_id付きログ、Lineage追跡 |
| キャッシュ/保存 | PostgreSQL + Qdrant | 冪等ログ + セマンティックキャッシュ |
通信フロー
- クライアント → Rust API:
/v1/chat/completions(stream=true)。trace_id発行しSSE開始 - Rust → Dagster: systemd/Quadletでoneshot起動(
dagster-<job>@{trace_id}) - Dagster (op/pipeline): ローカルLLM並列実行。トークンや進捗を
evt.chat.{trace_id}でNATSにpublish。成果物コミット後にfinishedを一度だけ送信 - Rust API:
subscribe("evt.chat.{trace_id}")でNATS購読、OpenAI互換chunkに整形してSSEに流す - 保存: ストリーム完了時にPG/QdrantへUPSERT
イベントスキーマ
{"type":"role","role":"assistant"}
{"type":"token","text":"...","task":"A"}
{"type":"tool_call","name":"search","arguments":"{...}"}
{"type":"usage","usage":{...}}
{"type":"finished","reason":"stop","winner":"llama3.1-8b"}
Rust APIがこれをOpenAI互換のdata: {...}\n\n形式に変換してSSEで送信。
冪等・リトライ設計
- req_id = sha256(model + messages + params)。同一リクエストの重複実行を吸収
- PGスキーマ:
idempotency_log(key PK, status, result, updated_at)+completions_cache(req_id PK, model, content, usage, created_at) - 全副作用はUPSERT/unique制約で重複吸収
- ネットワーク系は指数バックオフ + 最大試行回数
- finishedは成果物コミット後に一度だけ
OpenAI互換プロキシ拡張
標準OpenAI APIにx_フィールドで独自機能を注入:
x_route: direct(素通し)/ rag(知識検索)/ workflow(非同期ジョブ)x_adapter: LoRAアダプタ指定x_project: プロジェクト識別子- 透過的RAG: Qdrant/PGから文脈を自動挿入(クライアントは意識しない)
NATS Core → JetStream移行パス
初期はNATS Coreで最薄構成。喪失容認 + 冪等で堅牢化。
JetStream移行時の差分:
- REQストリーム追加: Rust→
js.publish("req.chat", {...})、Dagsterはpull consumerでfetch→ack。確実受付・再試行 - EVTはpush consumer: deliver_subjectを割当て、push配信をSSEへ即中継
Quadlet/systemd統合
# debate@.container (Type=oneshot)
ContainerEnv=TRACE_ID=%i
AutoRemove=yes
Restart=on-failure
After=nats.service
Rust側からsystemd-run --user -u dagster-debate@{trace_id}で起動。loginctl enable-linger ksh3でユーザーsystemdを常駐。
ディレクトリ構成
/dagster
/systemd # Quadletテンプレート
/entrypoints # run_xxx.py(TRACE_IDを受け取って実行)
/repo # ops/jobs/assets/pipelines + definitions.py
/instance # DAGSTER_HOME共有
/fn # 共通関数(NATS publish, PG/Qdrant I/O等)
RustからGoへの移行
移行の理由
Rustで設計した構成は精密だが、大量の非同期処理(SSE中継、NATS Pub/Sub、並列検索)の記述コストが高く、柔軟性とのバランスが悪かった。
- SSEの中継 + NATSの購読 + PG/Qdrantへの書き込みが同時に走る非同期コンテキストの管理がRustでは冗長
- goroutine + channelの方がこのパターンに素直にフィットする
- 実行時オーバーヘッドの差はこのユースケースでは無視できるレベル
Goで保持した設計原則
- OpenAI互換エンドポイント
- NATS Pub/Subによるイベント駆動
- Dagster oneshotジョブの起動
- trace_id付きの全ログ
- 冪等設計(PG UPSERT)
Rust版の設計ドキュメントはそのまま「仕様書」として価値がある。Go実装はRust設計の「簡潔な翻訳」。
運用ノブ
- 並列度: Dagster Dynamic Mapping + run tags、またはsystemd同時Start制限
- タイムアウト: EP全体/ジョブ個別のTimeout設定
- キャッシュ閾値: Qdrant類似度 >= 0.92で「暫定応答」制御
- キャンセル: クライアント切断で
cancel.{trace_id}をNATSにpublish → op側で早期終了
観測
- ログ: JSONでtrace_id必須。Promtail→Loki→Grafanaで
{trace_id="..."}|json検索 - メトリクス: SSEレイテンシ、ドロップ率、oneshot成功率をダッシュボード化
- Dagit: 共通DAGSTER_HOME/Run Storage(Postgres推奨)で過去Run/Lineage閲覧

