AWS Bedrock

Just want to use the integration? If you only need to plug an AWS Bedrock AgentCore Harness into AI GO!, you can use the ready-made integration directly — see the full integration in our registry and GitHub repo: <REGISTRY_URL> / <GITHUB_URL>. This tutorial walks through how that integration is built so you can adapt it to your own stateful agent.

This tutorial demonstrates how to connect an AWS Bedrock AgentCore Harness to AI GO! as a custom-inference model, so it can be evaluated like any other model — while preserving conversation state across turns.

The hard part of integrating a stateful agent is not calling the endpoint once; it is making multi-turn conversations work. AgentCore keeps conversation state server-side in a per-session microVM keyed by runtimeSessionId, and AI GO! drives the conversation one user turn at a time. The integration must keep both sides in sync so that turn 3 remembers what happened in turns 1 and 2.

This pattern applies to any stateful agent runtime — AgentCore, a custom agent server, or a hosted assistant API — where the conversation lives behind a session identifier and each call returns a multi-step trace.

What you will build

A complete custom-inference model integration that:

  1. Forwards each user turn to a Harness via bedrock-agentcore.invoke_harness (boto3)
  2. Maintains conversation continuity across turns by round-tripping AgentCore's runtimeSessionId
  3. Returns the full per-turn agent trace — tool calls, tool outputs, and the final reply — in AI GO!'s Open Responses format, ready for trace-aware scorers

By the end, you will have a model you can register, test, and point any multi-turn evaluation at.


Step 1: Understand the integration

The multi-turn challenge

AI GO! sends a chat-completion request for every turn. The body contains the whole conversation so far; the last entry is the current user turn:

{
  "messages": [
    { "role": "user", "content": "Hi, can you help me see my orders?" },
    { "role": "assistant", "content": "Sure! What's your email and order ID?",
      "session_id": "550e8400-e29b-41d4-a716-446655440000" },
    { "role": "user", "content": "user@example.com, order ORD-1001" }
  ]
}

AgentCore, however, does not want the whole history replayed — it already has it stored in the session microVM. It only wants the new user message, plus the runtimeSessionId that identifies the session.

The mechanism for keeping both sides in sync is passing the session id through the conversation itself. AI GO! messages allow extra fields, and any custom field we set on an assistant message is echoed back unchanged on the next turn — this is the supported way to carry custom data between requests. So on each response we attach the session_id to the assistant message; on the next request we read it back and reuse the same session. The first turn (no prior assistant message) mints a fresh session id.

Session id length. AgentCore requires runtimeSessionId to be at least 33 characters. A uuid4 string is 36, so we mint one with str(uuid.uuid4()).

Example request bodies (AI GO! → run_inference)

First turn — just the new user message:

{
  "messages": [
    { "role": "user", "content": "Search the catalog for shoes." }
  ]
}

Subsequent turns echo the prior assistant message with its session_id, so we keep using the same Harness session:

{
  "messages": [
    { "role": "user", "content": "Hello!" },
    { "role": "assistant", "content": "Hello! How can I assist you today?",
      "session_id": "550e8400-e29b-41d4-a716-446655440000" },
    { "role": "user", "content": "What is my session id?" }
  ]
}
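
To see the echo mechanism end to end without calling AWS, the two request bodies above can be reproduced with a small simulation. This is only a sketch: fake_adapter_turn stands in for run_inference and drive_conversation stands in for AI GO!, and both names are hypothetical.

```python
import uuid
from typing import Any


def fake_adapter_turn(messages: list[dict[str, Any]]) -> dict[str, Any]:
    """Stand-in for run_inference: reuse the echoed session id or mint one."""
    session_id = ""
    for msg in reversed(messages):
        if msg["role"] == "assistant":
            session_id = msg.get("session_id", "")
            break
    session_id = session_id or str(uuid.uuid4())
    return {"role": "assistant", "content": "ok", "session_id": session_id}


def drive_conversation(user_turns: list[str]) -> list[dict[str, Any]]:
    """Stand-in for AI GO!: resend the full history, echoing extra fields."""
    messages: list[dict[str, Any]] = []
    for text in user_turns:
        messages.append({"role": "user", "content": text})
        messages.append(fake_adapter_turn(messages))
    return messages


history = drive_conversation(["Hello!", "What is my session id?"])
session_ids = {m["session_id"] for m in history if m["role"] == "assistant"}
```

After two turns, session_ids holds a single value: the id minted on the first turn was read back and reused on the second.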

The three-stage pipeline

A single inference runs as a three-stage pipeline:

ChatCompletionInput  ->  ModelInput  ->  RawModelOutput  ->  OpenResponsesModelOutput
   (from AI GO!)         convert_        query_model        convert_model_output
                         user_input

run_inference ties the stages together: parse the request, convert it to what the Harness invocation needs, drive the agent, and convert the resulting event stream back into the Open Responses format AI GO! expects.

The output shape

invoke_harness returns a Converse-style event stream: messageStart / contentBlockStart / contentBlockDelta / contentBlockStop / messageStop / metadata. We accumulate it and translate the finalised content into Open Responses trace items:

Type                  Description
message               A text message (here, the assistant's reply)
function_call         A tool the agent invoked (name, arguments)
function_call_output  The result of a tool call, linked by call_id

Emitting these items (rather than just the final text) is what lets downstream trace-aware scorers inspect how the agent reached its answer.
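
As an illustration of how call_id links the items, here is one tool-calling turn written as plain dicts, together with a small consistency check. It is a sketch, not the AI GO! schema: the field names follow the builders used later in this tutorial, id and status are omitted for brevity, and unmatched_call_ids is a hypothetical helper.

```python
from typing import Any

# One turn as plain dicts with illustrative values.
items: list[dict[str, Any]] = [
    {"type": "function_call", "call_id": "tooluse_01",
     "name": "search_products", "arguments": '{"query": "shoes"}'},
    {"type": "function_call_output", "call_id": "tooluse_01", "output": "[]"},
    {"type": "message", "role": "assistant",
     "content": [{"text": "I couldn't find any shoes..."}]},
]


def unmatched_call_ids(trace: list[dict[str, Any]]) -> set[str]:
    """Return call_ids that appear on an output but on no function_call."""
    calls = {i["call_id"] for i in trace if i["type"] == "function_call"}
    outputs = {i["call_id"] for i in trace if i["type"] == "function_call_output"}
    return outputs - calls
```

A trace-aware scorer can rely on this pairing to match each tool result to the call that produced it.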


Step 2: Build the inference handler

The integration lives in a single Python file, run_inference.py, that defines a run_inference(body, environment) function. AI GO! calls it once per turn.

The entry point

run_inference is the function AI GO! calls. It wires the three stages together — parse and convert the request, invoke the Harness, and convert the resulting events back into the Open Responses format AI GO! expects:

import json
from typing import Any


def run_inference(body: str, environment: dict[str, Any]) -> str:
    model_input = convert_user_input(
        ChatCompletionInput.model_validate(json.loads(body))
    )
    raw = query_model(model_input, environment)
    model_output = convert_model_output(raw)
    return model_output.model_dump_json()

The rest of this step implements each stage in turn.

Model-side types

We model the two intermediate payloads explicitly so the data flow stays legible:

class ModelInput(BaseModel):
    """Request payload the Harness invocation needs."""
    session_id: str = ""        # empty on the first turn, reused afterwards
    user_message: str


class RawModelOutput(BaseModel):
    """The collected Converse event stream, plus the session id we used."""
    session_id: str
    events: list

Stage 1 — convert_user_input

This is where multi-turn continuity is established. We pull the latest user message and recover the session_id echoed by the previous assistant turn:

def convert_user_input(data: ChatCompletionInput) -> ModelInput:
    messages = data.messages
    last_user = next((m for m in reversed(messages) if m.role == "user"), None)
    if last_user is None:
        raise ValueError("request contains no user message")

    session_id = ""
    for msg in reversed(messages):
        if msg.role == "assistant":
            session_id = getattr(msg, "session_id", "") or ""
            break

    return ModelInput(
        session_id=session_id, user_message=_message_text(last_user.content)
    )

On the first turn there is no prior assistant message, so session_id stays empty — query_model mints a new one.

Stage 2 — query_model

Mint a session id if there isn't one, invoke the Harness, and collect the event stream:

def query_model(model_input: ModelInput, environment: dict[str, Any]) -> RawModelOutput:
    client = boto3.client(
        "bedrock-agentcore",
        region_name=environment.get("AWS_REGION") or "eu-central-1",
        aws_access_key_id=environment["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=environment["AWS_SECRET_ACCESS_KEY"],
        aws_session_token=environment.get("AWS_SESSION_TOKEN") or None,
    )

    session_id = model_input.session_id or _new_session_id()

    response = client.invoke_harness(
        harnessArn=environment["AWS_HARNESS_ARN"],
        runtimeSessionId=session_id,
        messages=[{"role": "user", "content": [{"text": model_input.user_message}]}],
    )

    events: list[dict[str, Any]] = list(response["stream"])
    return RawModelOutput(session_id=session_id, events=events)

We send only the new user message — AgentCore appends it to the session it already holds. Credentials are taken only from environment (the explicit access key), so the adapter never depends on ~/.aws/credentials, aws sso login, or instance metadata being present in the runtime image.
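
Because query_model reads the required values with environment[...], a missing secret surfaces as a bare KeyError. One optional refinement is to validate the environment up front. The sketch below is a suggestion rather than part of the integration, and check_environment and REQUIRED_KEYS are hypothetical names.

```python
from typing import Any

# Keys that query_model reads with environment[...]; the optional ones use .get().
REQUIRED_KEYS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_HARNESS_ARN")


def check_environment(environment: dict[str, Any]) -> None:
    """Raise one readable error listing every missing or empty required key."""
    missing = [key for key in REQUIRED_KEYS if not environment.get(key)]
    if missing:
        raise ValueError("missing environment values: " + ", ".join(missing))
```

Calling check_environment(environment) as the first line of query_model would report all missing keys at once.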

Example raw model output (Harness → query_model)

A Converse stream interleaves multiple messages per turn (assistant with toolUse → user with toolResult → assistant final), and the toolUse.input / toolResult arrive in deltas that must be reassembled:

{
  "session_id": "550e8400-e29b-41d4-a716-446655440000",
  "events": [
    { "messageStart": { "role": "assistant" } },
    { "contentBlockStart": { "contentBlockIndex": 0,
        "start": { "toolUse": { "toolUseId": "tooluse_01",
                                "name": "search_products" } } } },
    { "contentBlockDelta": { "contentBlockIndex": 0,
        "delta": { "toolUse": { "input": "{\"query\": " } } } },
    { "contentBlockDelta": { "contentBlockIndex": 0,
        "delta": { "toolUse": { "input": "\"shoes\"}" } } } },
    { "contentBlockStop": { "contentBlockIndex": 0 } },
    { "messageStop": { "stopReason": "tool_use" } },
    { "messageStart": { "role": "user" } },
    { "contentBlockStart": { "contentBlockIndex": 0,
        "start": { "toolResult": { "toolUseId": "tooluse_01",
                                   "status": "success" } } } },
    { "contentBlockDelta": { "contentBlockIndex": 0,
        "delta": { "toolResult": [{ "text": "[]" }] } } },
    { "contentBlockStop": { "contentBlockIndex": 0 } },
    { "messageStop": { "stopReason": "end_turn" } },
    { "messageStart": { "role": "assistant" } },
    { "contentBlockDelta": { "contentBlockIndex": 0,
        "delta": { "text": "I couldn't find any shoes..." } } },
    { "contentBlockStop": { "contentBlockIndex": 0 } },
    { "messageStop": { "stopReason": "end_turn" } },
    { "metadata": { "usage": { "inputTokens": 201, "outputTokens": 22 } } }
  ]
}
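
The reassembly of toolUse.input can be tried in isolation. The sketch below takes the four tool-use events from the example above, concatenates the input fragments, and parses the result; join_tool_input is a hypothetical helper that handles a single content block, not the full accumulator.

```python
import json
from typing import Any

events: list[dict[str, Any]] = [
    {"contentBlockStart": {"contentBlockIndex": 0,
        "start": {"toolUse": {"toolUseId": "tooluse_01", "name": "search_products"}}}},
    {"contentBlockDelta": {"contentBlockIndex": 0,
        "delta": {"toolUse": {"input": "{\"query\": "}}}},
    {"contentBlockDelta": {"contentBlockIndex": 0,
        "delta": {"toolUse": {"input": "\"shoes\"}"}}}},
    {"contentBlockStop": {"contentBlockIndex": 0}},
]


def join_tool_input(stream: list[dict[str, Any]]) -> str:
    """Concatenate the toolUse.input fragments of a single content block."""
    fragments = [
        e["contentBlockDelta"]["delta"]["toolUse"].get("input") or ""
        for e in stream
        if "toolUse" in e.get("contentBlockDelta", {}).get("delta", {})
    ]
    return "".join(fragments)


# Neither fragment is valid JSON on its own; only the joined string parses.
arguments = json.loads(join_tool_input(events))
```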

Stage 3 — convert_model_output

The conversion is a one-liner — all of the work lives in the converter, which consumes the event stream directly:

def convert_model_output(raw: RawModelOutput) -> OpenResponsesModelOutput:
    return OpenResponsesConverter().build(raw.events, raw.session_id)

The Open Responses converter

Because the stream is delta-based, build first accumulates the events into finalised blocks (reassembling chunked toolUse.input, joining text, tracking interleaved messages by index), then constructs the Open Responses items: each tool-use block becomes a function_call, each tool-result block a function_call_output, and the assistant text is joined into a single reply that carries the session_id:

class OpenResponsesConverter:
    def build(
        self, events: list[dict[str, Any]], session_id: str = "", **kwargs: Any
    ) -> OpenResponsesModelOutput:
        blocks, usage = self._accumulate_stream(events)

        items: list[TraceItem] = []
        answer_chunks: list[str] = []
        for block in blocks:
            kind = block["kind"]
            if kind == "tool_use":
                items.append(self.build_function_call(block))
            elif kind == "tool_result":
                items.append(self.build_function_call_output(block))
            elif kind == "text":
                answer_chunks.append(block["text"])

        items.append(self.build_assistant_message("".join(answer_chunks), session_id))

        return OpenResponsesModelOutput(items=items, usage=self.build_usage(usage))

The per-item builders construct each trace item. The session_id to round-trip is attached to the assistant message as an extra field:

    def build_function_call(self, block: dict[str, Any]) -> FunctionCall:
        return FunctionCall(
            id=str(uuid.uuid4()),
            call_id=block["tool_use_id"],
            name=block["name"],
            arguments=block.get("input_json") or "{}",
            status=FunctionCallStatus.completed,
        )

    def build_function_call_output(self, block: dict[str, Any]) -> FunctionCallOutput:
        return FunctionCallOutput(
            id=str(uuid.uuid4()),
            call_id=block["tool_use_id"],
            output=block.get("output") or "",
            status=FunctionCallOutputStatusEnum.completed,
        )

    def build_assistant_message(self, text: str, session_id: str) -> Message:
        return Message(
            id=str(uuid.uuid4()),
            status=MessageStatus.completed,
            role=MessageRole.assistant,
            content=[OutputTextContent(text=text, annotations=[])],
            # Carried as an extra field so the next turn can reuse the session.
            session_id=session_id,
        )

Accumulating the Converse stream (_accumulate_stream)

The stream interleaves multiple messages per turn, each with content blocks indexed by contentBlockIndex, and the toolUse.input / toolResult / text arrive in deltas. We keep a small state machine keyed by (message_index, content_block_index), finalising each block on contentBlockStop into one of three normalized shapes:

  @classmethod
  def _accumulate_stream(
      cls, events: list[dict[str, Any]]
  ) -> tuple[list[dict[str, Any]], dict[str, int]]:
      blocks: list[dict[str, Any]] = []
      usage: dict[str, int] = {"num_prompt_tokens": 0, "num_completion_tokens": 0}

      message_index = -1
      current_role = "assistant"
      active: dict[tuple[int, int], dict[str, Any]] = {}

      def _finalise(msg_idx: int, block_idx: int) -> None:
          state = active.pop((msg_idx, block_idx), None)
          if state is None:
              return
          kind = state.get("kind")
          if kind == "text":
              text = state.get("text") or ""
              if text and state.get("role") == "assistant":
                  blocks.append({"kind": "text", "role": "assistant", "text": text})
          elif kind == "tool_use":
              blocks.append(
                  {
                      "kind": "tool_use",
                      "tool_use_id": state.get("tool_use_id") or "",
                      "name": state.get("name") or "",
                      "input_json": state.get("input_json") or "",
                  }
              )
          elif kind == "tool_result":
              blocks.append(
                  {
                      "kind": "tool_result",
                      "tool_use_id": state.get("tool_use_id") or "",
                      "output": cls._stringify_tool_result(
                          state.get("output_chunks") or []
                      ),
                      "status": state.get("status") or "success",
                  }
              )

      for event in events:
          if "messageStart" in event:
              message_index += 1
              current_role = event["messageStart"].get("role") or "assistant"
              continue

          if "contentBlockStart" in event:
              payload = event["contentBlockStart"]
              block_idx = int(payload.get("contentBlockIndex") or 0)
              start = payload.get("start") or {}
              tool_use = start.get("toolUse")
              tool_result = start.get("toolResult")
              if tool_use:
                  active[(message_index, block_idx)] = {
                      "kind": "tool_use",
                      "tool_use_id": tool_use.get("toolUseId") or "",
                      "name": tool_use.get("name") or "",
                      "input_json": "",
                  }
              elif tool_result:
                  active[(message_index, block_idx)] = {
                      "kind": "tool_result",
                      "tool_use_id": tool_result.get("toolUseId") or "",
                      "status": tool_result.get("status") or "success",
                      "output_chunks": [],
                  }
              else:
                  active[(message_index, block_idx)] = {
                      "kind": None,
                      "role": current_role,
                  }
              continue

          if "contentBlockDelta" in event:
              payload = event["contentBlockDelta"]
              block_idx = int(payload.get("contentBlockIndex") or 0)
              delta = payload.get("delta") or {}
              state = active.get((message_index, block_idx))
              if state is None:
                  state = {"kind": None, "role": current_role}
                  active[(message_index, block_idx)] = state

              if "text" in delta and delta["text"]:
                  if state.get("kind") is None:
                      state["kind"] = "text"
                      state["text"] = ""
                  state["text"] = (state.get("text") or "") + delta["text"]
              elif "toolUse" in delta:
                  tu = delta["toolUse"] or {}
                  state["kind"] = "tool_use"
                  state["input_json"] = (state.get("input_json") or "") + (
                      tu.get("input") or ""
                  )
              elif "toolResult" in delta:
                  state["kind"] = "tool_result"
                  state.setdefault("output_chunks", []).extend(
                      delta["toolResult"] or []
                  )
              continue

          if "contentBlockStop" in event:
              payload = event["contentBlockStop"]
              block_idx = int(payload.get("contentBlockIndex") or 0)
              _finalise(message_index, block_idx)
              continue

          if "metadata" in event:
              metadata_usage = event["metadata"].get("usage") or {}
              usage["num_prompt_tokens"] += int(
                  metadata_usage.get("inputTokens") or 0
              )
              usage["num_completion_tokens"] += int(
                  metadata_usage.get("outputTokens") or 0
              )
              continue

      # Flush any blocks left open by truncated streams so we don't lose content.
      for msg_idx, block_idx in list(active.keys()):
          _finalise(msg_idx, block_idx)

      return blocks, usage

Step 3: Wire up the model

Three small files connect the snippet to AI GO!.

model.yaml

The model uses connection_type: custom_inference with the identity chat-completion adapter (the snippet already returns Open Responses, so no adapter transform is needed). The region and Harness ARN are plain config; the AWS credentials are stored as server-side secrets:

display_name: "Customer Support Agent (AWS Bedrock AgentCore)"
key: "customer-support-agent-aws-bedrock"
description: >-
  A customer-support agent deployed as an AWS Bedrock AgentCore Harness.
rate_limit: 15
task: "chat_completion"
config:
  connection_type: "custom_inference"
  adapter:
    key: "latticeflow$identity_chat_completion"
  run_inference_snippet: !include "./run_inference.py"
  environment:
    AWS_REGION: $AWS_REGION
    AWS_HARNESS_ARN: $AWS_HARNESS_ARN
    AWS_ACCESS_KEY_ID: "<< secrets.AWS_ACCESS_KEY_ID >>"
    AWS_SECRET_ACCESS_KEY: "<< secrets.AWS_SECRET_ACCESS_KEY >>"
  timeout: 120
secrets:
  AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
  AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY

  • !include "./run_inference.py" inlines the snippet at registration time.
  • << secrets.* >> references server-side secrets; the secrets block uploads them from your .env.
  • rate_limit: 15 keeps concurrency moderate — lower it if you see throttling or timeouts.

app.yaml

The model needs an app to live in:

display_name: "AWS Bedrock App"
key: "aws-bedrock-app"
tags: ["Agents", "AWS"]
description: >
  Custom-inference integration for an AWS Bedrock AgentCore Harness, exposing
  it as an AI GO! model with Open Responses traces.

.env

AWS_REGION=eu-central-1
AWS_HARNESS_ARN=arn:aws:bedrock-agentcore:<region>:<account>:harness/<name>
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=...

Step 4: Register and test

# Create and switch to the app
lf add app -f app.yaml
lf switch aws-bedrock-app

# Register the model (uploads the secrets and inlines run_inference.py)
lf add model -f model.yaml

# Verify the Harness is reachable and returns well-formed Open Responses
lf test model customer-support-agent-aws-bedrock

lf test model sends a single "Hello!" turn and shows each pipeline stage. A successful run ends with the parsed Open Responses output:

3. Running inference.
   Status code: 200
4. Transforming model output.
   {"items":[{"id":"...","status":"completed","role":"assistant",
              "content":[{"text":"Hello! How can I assist you today?","annotations":[]}],
              "session_id":"62c80e9a-..."}],
    "usage":{"num_completion_tokens":11,"num_prompt_tokens":446}}
   ...
   output="items=[Message(type='message', ..., role=<MessageRole.assistant>,
            content=[OutputTextContent(text='Hello! How can I assist you today?', ...)],
            session_id='62c80e9a-...')] usage=ModelUsage(...)"
Successfully tested configuration of model with key 'customer-support-agent-aws-bedrock'.

Two things confirm the integration is correct:

  • The assistant reply parses as a Message with role=assistant.
  • A session_id is present on the message — the value the next turn reads back to continue the same Harness session.
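
If you capture the JSON printed under "Transforming model output", both checks can be automated. The sketch below assumes the output shape shown in the log above; check_model_output is a hypothetical helper and the sample values are illustrative.

```python
import json
from typing import Any


def check_model_output(raw_json: str) -> str:
    """Return the session_id if the output passes both checks, else raise."""
    output: dict[str, Any] = json.loads(raw_json)
    assistant = [i for i in output.get("items", []) if i.get("role") == "assistant"]
    if not assistant:
        raise ValueError("no assistant message in output")
    session_id = assistant[-1].get("session_id") or ""
    if not session_id:
        raise ValueError("assistant message carries no session_id")
    return session_id


sample = json.dumps({
    "items": [{"id": "msg-1", "status": "completed", "role": "assistant",
               "content": [{"text": "Hello! How can I assist you today?",
                            "annotations": []}],
               "session_id": "550e8400-e29b-41d4-a716-446655440000"}],
    "usage": {"num_completion_tokens": 11, "num_prompt_tokens": 446},
})
```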

Your model is now registered and can be pointed at any multi-turn evaluation.


Adapting this pattern

The integration generalizes to any stateful agent runtime.

Different agent platforms

Swap the boto3 call in query_model for your platform's API and adjust the converter to read its output shape. Keep the contract identical: send only the new user turn + a session id, return the current turn's trace items.
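
One way to keep that contract explicit is to hide the platform behind a small interface. The sketch below is hypothetical: StatefulAgentBackend, InMemoryEchoBackend, and run_turn are illustrative names, and the toy backend exists only to exercise the contract.

```python
from dataclasses import dataclass, field
from typing import Any, Protocol


class StatefulAgentBackend(Protocol):
    """Hypothetical interface: one call per turn, state kept behind session_id."""

    def send_turn(self, session_id: str, user_message: str) -> list[dict[str, Any]]:
        ...


@dataclass
class InMemoryEchoBackend:
    """Toy backend that remembers each session's user messages."""

    sessions: dict[str, list[str]] = field(default_factory=dict)

    def send_turn(self, session_id: str, user_message: str) -> list[dict[str, Any]]:
        history = self.sessions.setdefault(session_id, [])
        history.append(user_message)
        reply = f"turn {len(history)}: {user_message}"
        return [{"type": "message", "role": "assistant", "text": reply}]


def run_turn(
    backend: StatefulAgentBackend, session_id: str, text: str
) -> list[dict[str, Any]]:
    # Only the new user turn and the session id cross the boundary.
    return backend.send_turn(session_id, text)
```

A real backend would wrap your platform's client in send_turn and return that turn's trace for the converter to translate.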

Single-turn agents

If your agent is stateless, drop the session_id machinery entirely: convert_user_input returns just the user message, query_model makes one call, and build_assistant_message omits the session_id extra field.

Carrying state other than a session id

This echo mechanism works for any opaque state. Whatever you attach to the assistant message (a session token, a cursor, a serialized memory blob) comes back on the next request — read it in convert_user_input and forward it to your endpoint.
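
For structured state, it can help to pack the value into a single opaque string before attaching it. A minimal sketch using base64-encoded JSON follows; encode_state and decode_state are hypothetical helpers, and the encoding is a design choice, not something AI GO! requires.

```python
import base64
import json
from typing import Any


def encode_state(state: dict[str, Any]) -> str:
    """Serialize state to a URL-safe string suitable for an extra field."""
    raw = json.dumps(state, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def decode_state(blob: str) -> dict[str, Any]:
    """Inverse of encode_state; an empty blob means no prior state."""
    if not blob:
        return {}
    return json.loads(base64.urlsafe_b64decode(blob.encode("ascii")))
```

Attach encode_state(...) to the assistant message on the way out and call decode_state on the echoed field in convert_user_input. Keep the blob small, since it travels with every request.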

Plugging into an evaluation

Because the snippet emits full traces (tool calls, outputs, and replies), the model drops straight into trace-aware evaluations — multi-turn solvers, function-call-coverage scorers, or model-as-a-judge scorers over open_responses traces. Point a task_specification at this model key and run lf run -f run.yaml.