Dify

Just want to use the integration? If you only need to plug a Dify-hosted agent into AI GO!, you can use the ready-made integration directly — see the full integration in our registry and GitHub repo: <REGISTRY_URL> / <GITHUB_URL>. This tutorial walks through how that integration is built so you can adapt it to your own stateful agent.

This tutorial demonstrates how to connect a Dify agent-chat app to AI GO! as a custom-inference model, so it can be evaluated like any other model — while preserving conversation state across turns.

The hard part of integrating a stateful agent is not calling the endpoint once; it is making multi-turn conversations work. Dify keeps conversation state server-side, identified by a conversation_id and scoped to a user, and AI GO! drives the conversation one user turn at a time. The integration must keep both sides in sync so that turn 3 remembers what happened in turns 1 and 2.

This pattern applies to any stateful agent runtime — Dify, a custom agent server, or a hosted assistant API — where the conversation lives behind a session/conversation identifier and each call returns a multi-step trace.

What you will build

A complete custom-inference model integration that:

  1. Forwards each user turn to a Dify deployment over HTTP (/v1/chat-messages, streaming SSE)
  2. Maintains conversation continuity across turns by round-tripping Dify's conversation_id and the per-conversation user
  3. Returns the full per-turn agent trace — tool calls, tool outputs, and the final reply — in AI GO!'s Open Responses format, ready for trace-aware scorers

By the end, you will have a model you can register, test, and point any multi-turn evaluation at.


Step 1: Understand the integration

The multi-turn challenge

AI GO! sends a chat-completion request for every turn. The body contains the whole conversation so far; the last entry is the current user turn:

{
  "messages": [
    { "role": "user", "content": "Hi, can you help me see my orders?" },
    { "role": "assistant", "content": "Sure! What's your email and order ID?",
      "conversation_id": "cdb01cc3-3754-4f36-91b9-df643506a982",
      "dify_user": "latticeflow-5fa47d159030" },
    { "role": "user", "content": "[email protected], order ORD-1001" }
  ]
}

Dify, however, does not want the whole history replayed — it already has it stored against the conversation_id. It only wants the new user message, plus the conversation_id that identifies the conversation and the user it is scoped to.

The mechanism for keeping both sides in sync is passing this state through the conversation itself. AI GO! messages allow extra fields, and any custom field we set on an assistant message is echoed back unchanged on the next turn — this is the supported way to carry custom data between requests. So on each response we attach the conversation_id and the user to the assistant message; on the next request we read them back and reuse the same Dify conversation.

Why round-trip the user, not just the conversation_id? Dify scopes per-conversation state (memory, agent thoughts) to the user field. Every turn of the same thread must send the same user. We mint a fresh latticeflow-<hex> id on turn 1 and reuse it afterwards; a hardcoded shared user would pin every conversation to the same Dify-side bucket regardless of which thread it belongs to. The first turn (no prior assistant message) mints a new user and leaves conversation_id empty, which tells Dify to start a fresh conversation.
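
The round trip described above can be simulated without Dify or AI GO! at all. The following is a minimal sketch on plain dictionaries, not the integration's actual code: recover_state and state_for_turn are hypothetical helper names, and the conversation id in the reply stands in for whatever Dify would return.

```python
import uuid


def recover_state(messages: list[dict]) -> tuple[str, str]:
    """Return (conversation_id, dify_user) from the latest assistant message."""
    for msg in reversed(messages):
        if msg.get("role") == "assistant":
            return msg.get("conversation_id") or "", msg.get("dify_user") or ""
    return "", ""  # first turn: nothing to recover


def state_for_turn(messages: list[dict]) -> tuple[str, str]:
    """Recover the state, minting a fresh user id when none was echoed."""
    conversation_id, user = recover_state(messages)
    if not user:
        user = f"latticeflow-{uuid.uuid4().hex[:12]}"
    return conversation_id, user


# Turn 1: no assistant message yet, so a user is minted and the id is empty.
turn1 = [{"role": "user", "content": "Hello!"}]
conv1, user1 = state_for_turn(turn1)

# The reply carries the state as extra fields (example id, standing in for
# the conversation_id Dify would report).
reply = {
    "role": "assistant",
    "content": "Hello! How can I assist you today?",
    "conversation_id": "a12690e1-cda8-4bf2-acfc-b588d6846425",
    "dify_user": user1,
}

# Turn 2: the reply is echoed back, and both values are recovered from it.
turn2 = turn1 + [reply, {"role": "user", "content": "What is my conversation id?"}]
conv2, user2 = state_for_turn(turn2)
```

Here conv1 is empty and user1 is freshly minted, while conv2 and user2 match what the reply carried, which is exactly the property the integration relies on.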

Example request bodies (AI GO! → run_inference)

First turn — just the new user message:

{
  "messages": [
    { "role": "user", "content": "Search the catalog for shoes." }
  ]
}

Subsequent turns echo the prior assistant message with its conversation_id and dify_user, so we keep using the same Dify conversation:

{
  "messages": [
    { "role": "user", "content": "Hello!" },
    { "role": "assistant", "content": "Hello! How can I assist you today?",
      "conversation_id": "a12690e1-cda8-4bf2-acfc-b588d6846425",
      "dify_user": "latticeflow-dc4f1f002667" },
    { "role": "user", "content": "What is my conversation id?" }
  ]
}

The three-stage pipeline

A single inference runs as a three-stage pipeline:

ChatCompletionInput  ->  ModelInput  ->  RawModelOutput  ->  OpenResponsesModelOutput
   (from AI GO!)         convert_        query_model        convert_model_output
                         user_input

run_inference ties the stages together: parse the request, convert it to what Dify accepts, call the endpoint, and convert the raw response back into the Open Responses format AI GO! expects.

The output shape

Dify's /v1/chat-messages endpoint streams Server-Sent Events rather than returning role-tagged messages. We collapse the event stream and translate it into Open Responses trace items:

Type                  Description
message               A text message (here, the assistant's reply)
function_call         A tool the agent invoked (name, arguments)
function_call_output  The result of a tool call, linked by call_id

Emitting these items (rather than just the final text) is what lets downstream trace-aware scorers inspect how the agent reached its answer.


Step 2: Build the inference handler

The integration lives in a single Python file, run_inference.py, that defines a run_inference(body, environment) function. AI GO! calls it once per turn.

The entry point

run_inference is the function AI GO! calls. It wires the three stages together — parse and convert the request, query the Dify endpoint, and convert the raw response back into the Open Responses format AI GO! expects:

def run_inference(body: str, environment: dict[str, Any]) -> str:
    model_input = convert_user_input(
        ChatCompletionInput.model_validate(json.loads(body))
    )
    raw = query_model(model_input, environment)
    model_output = convert_model_output(raw)
    return model_output.model_dump_json()

The rest of this step implements each stage in turn.

Model-side types

We model the two intermediate payloads explicitly so the data flow stays legible:

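import json
import uuid
from typing import Any

import httpx
from pydantic import BaseModel, Field

# ChatCompletionInput, OpenResponsesModelOutput and the trace item types
# (Message, FunctionCall, ...) are AI GO! types; how they are imported or
# injected into the snippet is not shown in this tutorial.


class ModelInput(BaseModel):
    """Request payload the Dify `/v1/chat-messages` endpoint accepts."""
    query: str
    user: str
    conversation_id: str = ""        # empty on the first turn, reused afterwards
    inputs: dict[str, Any] = Field(default_factory=dict)
    response_mode: str = "streaming"


class RawModelOutput(BaseModel):
    """The parsed SSE events, plus the user id we sent (to echo back)."""
    events: list[dict[str, Any]]
    dify_user: str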

Stage 1 — convert_user_input

This is where multi-turn continuity is established. We pull the latest user message and recover the conversation_id and user echoed by the previous assistant turn:

def convert_user_input(data: ChatCompletionInput) -> ModelInput:
    messages = data.messages
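    # Fail with a clear error (instead of a bare StopIteration) when the
    # request carries no user message at all.
    last_user = next((m for m in reversed(messages) if m.role == "user"), None)
    if last_user is None:
        raise ValueError("request contains no user message")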

    conversation_id = ""
    user = ""
    for msg in reversed(messages):
        if msg.role == "assistant":
            conversation_id = getattr(msg, "conversation_id", "") or ""
            user = getattr(msg, "dify_user", "") or ""
            break
    if not user:
        user = f"latticeflow-{uuid.uuid4().hex[:12]}"

    return ModelInput(
        query=_message_text(last_user.content),
        user=user,
        conversation_id=conversation_id,
    )

On the first turn there is no prior assistant message, so a fresh user is minted and conversation_id stays empty — signalling that Dify should start a new conversation.

Stage 2 — query_model

POST to the chat-messages endpoint and consume the SSE stream, collecting each data: event:

def query_model(model_input: ModelInput, environment: dict[str, Any]) -> RawModelOutput:
    url = environment["DIFY_URL"].rstrip("/") + "/v1/chat-messages"
    api_key = environment["DIFY_API_KEY"]

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream",
    }

    events: list[dict[str, Any]] = []
    with httpx.Client(timeout=120) as client:
        with client.stream(
            "POST", url, headers=headers, json=model_input.model_dump()
        ) as response:
            response.raise_for_status()
            for line in response.iter_lines():
                if not line or not line.startswith("data:"):
                    continue
                payload = line[len("data:") :].strip()
                if not payload:
                    continue
                events.append(json.loads(payload))

    return RawModelOutput(events=events, dify_user=model_input.user)

We send only the new query — Dify appends it to the conversation it already holds.

Example raw model output (agent API → query_model)

Dify streams SSE events. query_model returns the parsed event list plus the user we sent, so convert_model_output can echo it back:

{
  "dify_user": "latticeflow-5fa47d159030",
  "events": [
    { "event": "agent_thought", "id": "239b...", "position": 1,
      "tool": "search_products",
      "tool_input": "{\"search_products\": {\"query\": \"shoes\"}}",
      "observation": "{\"search_products\": \"... result=[] ...\"}" },
    { "event": "agent_message", "answer": "It looks " },
    { "event": "agent_message", "answer": "like there are no products..." },
    { "event": "message_end",
      "conversation_id": "cdb01cc3-3754-4f36-91b9-df643506a982",
      "metadata": { "usage": { "prompt_tokens": 407, "completion_tokens": 51 } } }
  ]
}

Note that Dify emits the same agent_thought id across several partial events (tool name, input, then observation arrive separately) and streams the reply as a series of agent_message chunks.

Stage 3 — convert_model_output

The conversion is a one-liner — all of the work lives in the converter, which consumes the raw event stream directly:

def convert_model_output(raw: RawModelOutput) -> OpenResponsesModelOutput:
    return OpenResponsesConverter().build(raw.events, raw.dify_user)

The Open Responses converter

Because Dify streams partial, repeated events rather than role-tagged messages, build first collapses the stream (agent thoughts merged by id, answer chunks joined, usage read from message_end) and then constructs the Open Responses items in one pass: each completed tool thought becomes a function_call + function_call_output pair, and the joined answer becomes a single assistant message:

class OpenResponsesConverter:
    def build(
        self,
        events: list[dict[str, Any]],
        dify_user: str = "",
        **kwargs: Any,
    ) -> OpenResponsesModelOutput:
        thoughts: dict[str, dict[str, Any]] = {}
        thought_order: list[str] = []
        answer_chunks: list[str] = []
        conversation_id = ""
        usage = {"num_prompt_tokens": 0, "num_completion_tokens": 0}

        for event in events:
            event_type = event.get("event")
            if event_type == "agent_thought":
                tid = event["id"]
                if tid not in thoughts:
                    thoughts[tid] = {}
                    thought_order.append(tid)
                thoughts[tid].update(
                    {
                        "tool": event.get("tool", "") or thoughts[tid].get("tool", ""),
                        "tool_input": event.get("tool_input", "")
                        or thoughts[tid].get("tool_input", ""),
                        "observation": event.get("observation", "")
                        or thoughts[tid].get("observation", ""),
                    }
                )
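            elif event_type in ("agent_message", "message"):
                answer_chunks.append(event.get("answer", ""))
            elif event_type == "message_replace":
                # A replacement (e.g. after output moderation) supersedes
                # everything streamed so far, so it must not be appended.
                answer_chunks = [event.get("answer", "")]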
            elif event_type == "message_end":
                conversation_id = event.get("conversation_id", "") or conversation_id
                metadata_usage = (event.get("metadata") or {}).get("usage") or {}
                usage = {
                    "num_prompt_tokens": metadata_usage.get("prompt_tokens", 0),
                    "num_completion_tokens": metadata_usage.get("completion_tokens", 0),
                }

        items: list[TraceItem] = []
        for tid in thought_order:
            t = thoughts[tid]
            if not t.get("tool"):
                continue
            call_id = str(uuid.uuid4())
            items.append(
                self.build_function_call(t["tool"], t.get("tool_input", ""), call_id)
            )
            items.append(
                self.build_function_call_output(t.get("observation", ""), call_id)
            )

        items.append(
            self.build_assistant_message(
                "".join(answer_chunks), conversation_id, dify_user
            )
        )

        return OpenResponsesModelOutput(
            items=items,
            usage=self.build_usage(
                usage["num_prompt_tokens"], usage["num_completion_tokens"]
            ),
        )

The per-item builders construct each trace item. The two values that must round-trip, conversation_id and dify_user, are attached to the assistant message as extra fields:

    def build_function_call(
        self, tool: str, tool_input: str, call_id: str
    ) -> FunctionCall:
        return FunctionCall(
            id=str(uuid.uuid4()),
            call_id=call_id,
            name=tool,
            arguments=self._extract_tool_arguments(tool, tool_input),
            status=FunctionCallStatus.completed,
        )

    def build_function_call_output(
        self, observation: str, call_id: str
    ) -> FunctionCallOutput:
        return FunctionCallOutput(
            id=str(uuid.uuid4()),
            call_id=call_id,
            output=observation,
            status=FunctionCallOutputStatusEnum.completed,
        )

    def build_assistant_message(
        self, text: str, conversation_id: str, dify_user: str
    ) -> Message:
        return Message(
            id=str(uuid.uuid4()),
            status=MessageStatus.completed,
            role=MessageRole.assistant,
            content=[OutputTextContent(text=text, annotations=[])],
            # Carried as extra fields so the next turn can reuse the conversation.
            conversation_id=conversation_id,
            dify_user=dify_user,
        )

Unwrapping Dify tool arguments (_extract_tool_arguments)

Dify wraps tool arguments as {tool_name: {...}}. We unwrap that envelope when present so the emitted function_call.arguments match the tool's actual schema, and pass anything else through unchanged:

    @staticmethod
    def _extract_tool_arguments(tool_name: str, tool_input: str) -> str:
        if not tool_input:
            return "{}"
        try:
            parsed = json.loads(tool_input)
        except json.JSONDecodeError:
            return tool_input
        if (
            isinstance(parsed, dict)
            and tool_name in parsed
            and isinstance(parsed[tool_name], dict)
        ):
            return json.dumps(parsed[tool_name])
        return json.dumps(parsed) if isinstance(parsed, dict | list) else tool_input
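
A few concrete inputs make the unwrapping rule easier to see. The sketch below restates the same logic as a standalone function (unwrap_tool_arguments is a hypothetical name used only for this illustration) so it can be run outside the converter class.

```python
import json


def unwrap_tool_arguments(tool_name: str, tool_input: str) -> str:
    """Standalone copy of the unwrapping rule, for experimentation."""
    if not tool_input:
        return "{}"
    try:
        parsed = json.loads(tool_input)
    except json.JSONDecodeError:
        return tool_input  # not JSON: pass through unchanged
    if isinstance(parsed, dict) and isinstance(parsed.get(tool_name), dict):
        return json.dumps(parsed[tool_name])  # strip the {tool_name: {...}} envelope
    return json.dumps(parsed) if isinstance(parsed, (dict, list)) else tool_input


wrapped = unwrap_tool_arguments(
    "search_products", '{"search_products": {"query": "shoes"}}'
)
bare = unwrap_tool_arguments("search_products", '{"query": "shoes"}')
empty = unwrap_tool_arguments("search_products", "")
not_json = unwrap_tool_arguments("search_products", "shoes")
```

Both wrapped and bare come out as {"query": "shoes"}, empty becomes {}, and not_json is returned untouched, so a scorer always sees arguments in the tool's own schema when Dify supplied valid JSON.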

Example Open Responses output (convert_model_output)

The converted result returned to AI GO! — tool call, tool output, and the final assistant message carrying the conversation_id and dify_user for the next turn:

{
  "items": [
    { "type": "function_call", "id": "5bf9...", "call_id": "97eb...",
      "name": "search_products", "arguments": "{\"query\": \"shoes\"}",
      "status": "completed" },
    { "type": "function_call_output", "id": "9da4...", "call_id": "97eb...",
      "output": "{\"search_products\": \"... result=[] ...\"}",
      "status": "completed" },
    { "type": "message", "id": "44c2...", "status": "completed",
      "role": "assistant",
      "content": [{ "type": "output_text",
                    "text": "It looks like there are no products...",
                    "annotations": [] }],
      "conversation_id": "cdb01cc3-3754-4f36-91b9-df643506a982",
      "dify_user": "latticeflow-5fa47d159030" }
  ],
  "usage": { "num_prompt_tokens": 407, "num_completion_tokens": 51 }
}
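
Since trace-aware scorers rely on each function_call_output being linked to its function_call by call_id, a quick consistency check on the emitted items can catch converter mistakes early. This is a minimal sketch operating on the JSON form of the items; unpaired_call_ids is a hypothetical helper, not part of AI GO!.

```python
from typing import Any


def unpaired_call_ids(items: list[dict[str, Any]]) -> set[str]:
    """Return call_ids that appear on a call or an output, but not both."""
    calls = {i["call_id"] for i in items if i.get("type") == "function_call"}
    outputs = {
        i["call_id"] for i in items if i.get("type") == "function_call_output"
    }
    return calls ^ outputs  # symmetric difference: present on one side only


items = [
    {"type": "function_call", "call_id": "97eb", "name": "search_products"},
    {"type": "function_call_output", "call_id": "97eb", "output": "[]"},
    {"type": "message", "role": "assistant"},
]
leftover = unpaired_call_ids(items)
```

An empty result means every tool call has a matching output; anything else points at a thought that was only partially collapsed.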

Step 3: Wire up the model

A few small files connect the snippet to AI GO!.

models/dify.yaml

The model uses connection_type: custom_inference (the snippet already returns Open Responses, so no adapter transform is needed). The endpoint and credentials are injected via environment:

key: "dify-model"
display_name: "Dify Model"
description: "Dify agent-chat app served via the /v1/chat-messages endpoint."
task: "chat_completion"
rate_limit: 5
config:
  connection_type: "custom_inference"
  run_inference_snippet: !include ../run_inference.py
  environment:
    DIFY_URL: $DIFY_URL
    DIFY_API_KEY: $DIFY_API_KEY
  timeout: 180

  • !include ../run_inference.py inlines the snippet at registration time (the spec lives under models/, the snippet at the integration root).
  • rate_limit: 5 keeps concurrency low, since external streaming agent endpoints rarely tolerate high request volume. Lower it further if you see timeouts.

Tip — keep the API key out of plaintext. Instead of passing DIFY_API_KEY directly, you can store it as a server-side secret and reference it with << secrets.DIFY_API_KEY >>. See Use Secrets.

app.yaml

The model needs an app to live in:

display_name: "Dify App"
key: "dify-app"
tags: ["Agents", "Dify"]
description: >
  Custom-inference integration for a Dify agent-chat app, exposing its
  /v1/chat-messages endpoint as an AI GO! model with Open Responses traces.

run.yaml

A thin run config that references the model so it can be registered (and later reused by evaluations):

models:
  - $ref: "./models/dify.yaml"

.env

DIFY_URL=https://api.dify.ai
DIFY_API_KEY=app-...
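
Because query_model reads both values straight from the environment dictionary, a missing or empty entry surfaces as a bare KeyError or a malformed URL. One option is to validate up front. The sketch below is an optional hardening step under that assumption; chat_messages_url and REQUIRED_KEYS are hypothetical names, not part of the integration.

```python
from typing import Any

REQUIRED_KEYS = ("DIFY_URL", "DIFY_API_KEY")


def chat_messages_url(environment: dict[str, Any]) -> str:
    """Validate the environment and return the chat-messages endpoint URL."""
    missing = [key for key in REQUIRED_KEYS if not environment.get(key)]
    if missing:
        raise ValueError(f"missing environment values: {', '.join(missing)}")
    # Same URL construction as query_model: tolerate a trailing slash.
    return str(environment["DIFY_URL"]).rstrip("/") + "/v1/chat-messages"


url = chat_messages_url({"DIFY_URL": "https://api.dify.ai/", "DIFY_API_KEY": "app-x"})
```

A descriptive error here is easier to diagnose in lf test model output than a failure deep inside the HTTP call.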

Step 4: Register and test

# Create and switch to the app
lf add app -f app.yaml
lf switch dify-app

# Register the model (inlines run_inference.py)
lf add -f run.yaml

# Verify the endpoint is reachable and returns well-formed Open Responses
lf test model dify-model

lf test model sends a single "Hello!" turn and shows each pipeline stage. A successful run ends with the parsed Open Responses output:

3. Running inference.
   Status code: 200
4. Transforming model output.
   {"items":[{"id":"...","status":"completed","role":"assistant",
              "content":[{"text":"Hello! How can I help you today?","annotations":[]}],
              "conversation_id":"cdb01cc3-...","dify_user":"latticeflow-5fa47d159030"}],
    "usage":{"num_completion_tokens":12,"num_prompt_tokens":34}}
   ...
   output="items=[Message(type='message', ..., role=<MessageRole.assistant>,
            content=[OutputTextContent(text='Hello! How can I help you today?', ...)],
            conversation_id='cdb01cc3-...', dify_user='latticeflow-5fa47d159030')] usage=ModelUsage(...)"
Successfully tested configuration of model with key 'dify-model'.

Two things confirm the integration is correct:

  • The assistant reply parses as a Message with role=assistant.
  • A conversation_id and dify_user are present on the message — the values the next turn reads back to continue the same Dify conversation.
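
The two checks above can also be automated against the JSON the snippet returns. The following is a minimal sketch, assuming the output has the items layout shown in this tutorial; continuity_fields is a hypothetical helper and the sample values are copied from the example output.

```python
import json


def continuity_fields(output_json: str) -> tuple[str, str]:
    """Return (conversation_id, dify_user) from the last assistant message."""
    items = json.loads(output_json).get("items", [])
    for item in reversed(items):
        if item.get("role") == "assistant":
            return item.get("conversation_id") or "", item.get("dify_user") or ""
    raise ValueError("output contains no assistant message")


sample_output = json.dumps(
    {
        "items": [
            {
                "type": "message",
                "role": "assistant",
                "status": "completed",
                "content": [{"type": "output_text", "text": "Hello!"}],
                "conversation_id": "cdb01cc3-3754-4f36-91b9-df643506a982",
                "dify_user": "latticeflow-5fa47d159030",
            }
        ],
        "usage": {"num_prompt_tokens": 34, "num_completion_tokens": 12},
    }
)
conversation_id, dify_user = continuity_fields(sample_output)
```

If either returned value is empty, the next turn would start a new Dify conversation instead of continuing the current one.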

Your model is now registered and can be pointed at any multi-turn evaluation.


Adapting this pattern

The integration generalizes to any stateful agent runtime.

Different agent platforms

Swap the HTTP call in query_model for your platform's API and adjust the converter to read its message/event shape. Keep the contract identical: send only the new user turn plus a session identifier, and return the current turn's trace items.

Single-turn agents

If your agent is stateless, drop the continuity machinery entirely: convert_user_input returns just the user message, query_model makes one call, and build_assistant_message omits the conversation_id / dify_user extra fields.

Carrying state other than a conversation id

This echo mechanism works for any opaque state. Whatever you attach to the assistant message (a session token, a cursor, a serialized memory blob) comes back on the next request — read it in convert_user_input and forward it to your endpoint. Here we carry two values, conversation_id and dify_user, because Dify needs both to resume a conversation.
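
When the state is structured rather than a single identifier, one option is to pack it into a single string field. The sketch below assumes that approach and works on plain dictionaries; the field name agent_state and the helpers attach_state and read_state are hypothetical, chosen only for illustration.

```python
import base64
import json
from typing import Any

STATE_FIELD = "agent_state"  # hypothetical extra-field name


def attach_state(message: dict[str, Any], state: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the assistant message carrying the encoded state."""
    raw = json.dumps(state).encode("utf-8")
    blob = base64.urlsafe_b64encode(raw).decode("ascii")
    return {**message, STATE_FIELD: blob}


def read_state(messages: list[dict[str, Any]]) -> dict[str, Any]:
    """Decode the state echoed on the latest assistant message, if any."""
    for msg in reversed(messages):
        if msg.get("role") == "assistant":
            blob = msg.get(STATE_FIELD)
            return json.loads(base64.urlsafe_b64decode(blob)) if blob else {}
    return {}  # first turn: no assistant message yet


reply = attach_state(
    {"role": "assistant", "content": "Done."},
    {"cursor": 42, "session": "abc"},
)
history = [
    {"role": "user", "content": "Next page"},
    reply,
    {"role": "user", "content": "And the next"},
]
state = read_state(history)
```

Encoding the blob keeps it opaque to anything that handles the message in between, at the cost of growing the request with every byte of state carried.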

Plugging into an evaluation

Because the snippet emits full traces (tool calls, outputs, and replies), the model drops straight into trace-aware evaluations — multi-turn solvers, function-call-coverage scorers, or model-as-a-judge scorers over open_responses traces. Point a task_specification at this model key and run lf run -f run.yaml.