つれづれなる Agent OPS
Flue

FlueのobserveイベントをLangfuseへ流し、IssueトリアージAgentを観測する

Flue 1.0 BetaのobserveイベントをredactionしたうえでLangfuseへ送信し、IssueトリアージWorkflowのrunId、モデル、結果を追う実験ログ。

前回はFlue 1.0 Betaで、GitHub Issueのタイトルと本文を受け取り、severity、再現可否、ラベル候補、要約を返すIssueトリアージAgentを作りました。

つれづれなる Agent OPS Flue 1.0 BetaでGitHub Issueトリアージエージェントを動かしてみた Flue 1.0 BetaのAgent・Skill・Workflowを使い、GitHub Issueのseverity、再現可否、ラベル候補を構造化して返すトリアージエージェントを作った検証ログ。 https://llm-lab.dev/posts/flue-1-0-beta-issue-triage-agent/

同じWorkflowをGitHub Actionsから呼ぶ構成も、先に別記事で試しています。

つれづれなる Agent OPS FlueのWorkflowをGitHub Actionsから呼び、IssueトリアージをCIに寄せる GitHub Issue作成イベントからFlue Workflowをdry-run実行し、常駐サーバーを作る前にCI上で境界を確認した検証ログ。 https://llm-lab.dev/posts/flue-github-actions-issue-triage-workflow/

今回はその続きとして、Flueのobserve(...)で流れてくる実行イベントを拾い、Issue本文をredactionしたうえでLangfuseへ送信します。

やりたいことは、単に「ログを外部サービスへ送る」ことではありません。 IssueトリアージAgentを運用するなら、あとから次の問いに答えられる必要があります。

  • どのIssue入力に対して、どのWorkflow runが動いたのか
  • どのモデルで、structured outputまで完了したのか
  • finishに失敗したケースと成功したケースで、どこに差があったのか
  • 後から評価・改善に使うために、何を保存し、何を保存しないべきか

Flueのobserve(...)は、これらを追うためのアプリ内イベントの入口です。run_startturn_requestoperationrun_endのような粒度で、Workflowの入力、モデル呼び出し、structured output、usageを拾えます。

ただし、observe(...)で得られるイベントは、Workflow payloadやモデル入力を含みます。Issue本文には内部URLやメールアドレスが混ざることがあるため、Langfuseへ送る前に、後から比較したい項目だけを選び直します。今回は本文全文ではなく、redaction済みのbodyPreview、文字数、runId、model、成功/失敗、triage結果を送る方針にしました。

Flueのrun IDとLangfuse payloadのtraceIdが対応しているターミナル

IssueトリアージWorkflowに観測層を足す

今回の検証では、前回作ったIssueトリアージWorkflowの構造は大きく変えず、観測用の薄い層だけを追加しました。見るべきポイントは、Flueの実行イベントをどの単位で拾い、Langfuseへ送る前にどこでredactionするかです。

追加した主なファイルは次の通りです。

src/
├─ observability/
│  ├─ langfuse-adapter.ts  # FlueEventをLangfuse向けpayloadへ変換
│  ├─ redact.ts            # Issue本文のURL、メール、キーらしき文字列を伏せる
│  └─ register.ts          # observe(...)の登録
├─ app.ts                  # observability/registerを読み込む
└─ workflows/
   └─ triage-issue.ts      # log.infoに安全なIssue概要を追加

observeをapp.tsに登録する

Flueのドキュメントでは、observe(...)はアプリケーションのentrypointで登録します。今回の実験場ではsrc/app.tsflue()をrouteする前に、観測登録ファイルをimportしました。

import './providers';
import './observability/register';
import { flue } from '@flue/runtime/routing';
import { Hono } from 'hono';

const app = new Hono();

app.get('/health', (c) =>
	c.json({
		ok: true,
		model: process.env.FLUE_MODEL ?? 'sakura/gpt-oss-120b',
	}),
);

app.route('/', flue());

export default app;

register.tsでは、FLUE_OBSERVE_JSONLが指定されていればrawイベントをJSONLへ保存し、同時にLangfuse向けpayloadへ変換して送信します。

import { observe } from '@flue/runtime';
import { sendToLangfuse, toLangfuseEnvelope, writeJsonl } from './langfuse-adapter';

const observeJsonl = process.env.FLUE_OBSERVE_JSONL;

if (observeJsonl || process.env.FLUE_LANGFUSE_DRY_RUN || process.env.LANGFUSE_PUBLIC_KEY) {
	observe((event) => {
		if (observeJsonl) {
			writeJsonl(observeJsonl, event);
		}

		const envelope = toLangfuseEnvelope(event);
		if (!envelope) return;

		void sendToLangfuse(envelope).catch((error) => {
			console.warn('[observability] failed to export event', event.type, error);
		});
	});
}

ここで重要なのは、rawイベントと外部送信用payloadを分けたことです。 rawイベントはFlueの挙動確認には便利ですが、外部サービスへそのまま送るものではありません。今回はdry-runでも実送信でも、外部送信用payloadをlogs/langfuse-payloads.jsonlへ残し、npm run triage:langfuseでは同じpayloadをLangfuseへ投げます。

何をLangfuseへ送るかを先に決める

最初に決めたのは、FlueのrunIdとLangfuse側のtraceを対応させることです。 Issueトリアージでは、同じ入力をモデル違いで再実行したり、失敗ケースだけ後から見返したりします。そのときに、Flue側のrun_...とLangfuse側のtraceが対応していないと、CLIログ、アプリログ、Langfuse画面が別々の記録になってしまいます。

今回のadapterでは、run_...を使ってtraceIdを作りました。

const runId = event.runId ?? event.instanceId ?? 'no-run-id';
const traceId = `flue-${runId}`;

対象イベントは、いったん次だけに絞りました。

if (!['run_start', 'run_end', 'operation', 'turn_request', 'turn', 'log'].includes(event.type)) {
	return null;
}

message_startmessage_endthinking_deltaまで全部送ると情報量は増えますが、Issue本文や推論途中の内容も入りやすくなります。今回は「後から比較したい単位」に寄せて、run、operation、turn、logだけを見ることにしました。

この絞り込みで見たいのは、主に次の対応関係です。

見たいものFlue側のイベントLangfuse側で見たい情報
Workflowの開始run_starttraceの開始、redaction済み入力概要
モデル呼び出しturn_requestturnmodel、provider、入力概要、成功/失敗
structured outputoperationtriage結果、usage、duration
Workflowの終了run_end最終結果、status

Issue本文はそのまま送らない

検証用payloadには、わざと内部URLとメールアドレスを混ぜました。

実行確認では、同じサンプルIssueを繰り返しWorkflowへ渡せるように、検証用のnpm scriptを用意しました。これはFlueの標準コマンドではなく、中ではflue run triage-issue --target node --payload ...を呼び出しています。

dry-runでpayloadだけ確認する場合は、次を実行します。

npm run triage:observe

Langfuseへ実送信する場合は、.envLANGFUSE_PUBLIC_KEYLANGFUSE_SECRET_KEYLANGFUSE_BASE_URLを入れたうえで、こちらを実行します。

npm run triage:langfuse

標準CLIだけで同じことを確認するなら、上記scriptの代わりにflue run triage-issue --target node --payload ...へ同じJSON payloadを渡せばよいです。

実行すると、Flue側ではrun IDが出ます。

run       run_01KVNW45DM0EGDHVA92D86V935

Langfuseへ送るpayloadの先頭は、次のようになりました。

{
  "traceId": "flue-run_01KVNW45DM0EGDHVA92D86V935",
  "eventType": "run_start",
  "runId": "run_01KVNW45DM0EGDHVA92D86V935",
  "model": "openai/preview/Kimi-K2.6",
  "status": "running",
  "input": {
    "title": "Dashboard is blank after login",
    "bodyPreview": "Steps: log in, open /dashboard. Expected widgets. Actual blank white screen in Chrome 126. See [redacted] and contact [redacted].",
    "bodyCharacters": 166
  }
}

ここでは本文そのものではなく、redaction済みのbodyPreviewと文字数だけを送っています。 この粒度でも、どのIssue入力がどのrunIdに対応したかは追えます。

伏せ字化したLangfuse payload JSONL

raw observeログには生データが残る

一方、FLUE_OBSERVE_JSONL=logs/flue-observe-events.jsonlで保存したrawイベントには、元のIssue本文が入ります。 これはFlueが悪いというより、observe(...)がアプリ内の実行イベントをそのまま見る仕組みだからです。

たとえばrun_startには、Workflow payloadがそのまま入ります。

{
  "type": "run_start",
  "runId": "run_01KVNW45DM0EGDHVA92D86V935",
  "workflowName": "triage-issue",
  "payload": {
    "title": "Dashboard is blank after login",
    "body": "Steps: ... See https://internal.example.test/ticket/123 and contact alice@example.com."
  }
}

さらにturn_requestmessage_endには、モデルに渡した入力文も入ります。 つまり、外部送信用のredactionを入れても、rawイベントのローカル保存先には機微情報が残りえます。

今回の教訓は単純で、観測ログは「後から見られる便利な記録」ではなく、入力データのコピーです。 Issue本文を扱うなら、rawログの保存先、保持期間、git管理対象外にすることまで含めて決める必要があります。

raw observeログには元のIssue本文が残ることを確認したターミナル

成功ケースはoperationとrun_endで追える

今回の成功ケースでは、operationイベントにstructured outputとusageが入りました。

{
  "eventType": "operation",
  "runId": "run_01KVNW45DM0EGDHVA92D86V935",
  "status": "success",
  "output": {
    "data": {
      "severity": "high",
      "reproducible": true,
      "labels": ["bug", "dashboard", "frontend"],
      "summary": "ログイン後に `/dashboard` を開くと..."
    },
    "usage": {
      "input": 258,
      "output": 958,
      "cacheRead": 2368,
      "totalTokens": 3584
    }
  }
}

run_endにも最終結果が残ります。 Langfuse側では、同じtraceIdの中にrun_startturn_requestoperationrun_endを並べれば、入力の概要、モデル、usage、最終結果を後から見直せます。

実際にLangfuse側で見ると、traceの中にモデル名、イベント種別、triage結果がまとまります。ターミナルの一時ログだけを見る場合と違い、後から「このrunではどのモデルが使われ、どのstructured outputが返ったか」を画面上で確認できます。

Langfuse上でモデル名とtriage結果が確認できる成功trace

Langfuseへ実送信する

npm run triage:langfuseは、dry-runではなくFLUE_LANGFUSE_DRY_RUN=0で同じWorkflowを実行します。 送信に成功すると、ターミナルには次のようなログが出ます。

[langfuse] ingested run_start flue-run_...
[langfuse] ingested operation flue-run_...
[langfuse] ingested run_end flue-run_...

失敗した場合は、[langfuse] ingestion failedとしてHTTP statusとレスポンス本文を出すようにしました。 ここが出た場合は、LangfuseのAPIキー、base URL、ingestion payloadの形式を疑います。

Langfuseへingestedログが出ているターミナル

finishに到達しない失敗もtraceで追う

もう一つ見たかったのは、成功ケースだけではありません。 前回のIssueトリアージ検証では、モデルによってはFlueが期待するfinish tool callに到達できず、Agentが同じような思考を繰り返すケースがありました。

今回も失敗ケースでは、turn_requestturnはLangfuseへ送られ続けている一方で、最終的にはWorkflowが次のエラーで止まりました。

Workflow failed
The agent gave up: Agent did not call `finish` or `give_up` after 33 attempts.

ここで重要なのは、モデルが「何もしていない」わけではないことです。 traceを見ると、モデルは何度もactivate_skillfinishに言及していますが、Flueの実行プロトコルとして必要なtool callには到達できていません。 自然文としてはそれらしいことを言っているのに、Agentフレームワークの完了条件を満たせない。 これは、単発のエラーログだけでは見落としやすい差です。

finish tool callに到達できずWorkflowが失敗したケースのtrace

Flueだけか、Langfuseを足すかではない

ここまで触ると、Flueのobserve(...)だけでもかなり見えることが分かります。 runId、event type、operation、turn、usage、structured outputはFlue側で拾えます。

ただ、LLMOpsとして後から失敗ケースを比較したい場合、外部の観測基盤に載せる意味があります。 たとえば、同じIssue payloadをgpt-ossと別モデルで流し、片方はfinishに失敗し、片方はstructured outputまで完了したとします。 この差を単発ログで読むより、traceとして並べた方が、どこで崩れたかを追いやすくなります。

なので、「Flue標準で見えるから外部Observabilityは不要」でも、「Langfuseを使うからFlueのobserveは不要」でもありません。 Flueのobserve(...)は実行イベントを見る土台で、Langfuseは失敗ケースを比較・評価・改善サイクルへ載せる場所。 この役割分担で考えるのが、IssueトリアージAgentのような実務寄りのAgentには合っていそうです。

DUOps

Author

DUOps(デュオプス)

LLMOps、Agent、MCP、Langfuse、Cloudflare 周辺の実装と運用を、個人で試しながら記録しています。

Xを見る

Related