AWS Bedrock
Just want to use the integration? If you only need to plug an AWS Bedrock AgentCore Harness into AI GO!, you can use the ready-made integration directly. It is available in our registry and GitHub repo: <REGISTRY_URL>/<GITHUB_URL>. This tutorial walks through how that integration is built so you can adapt it to your own stateful agent.
This tutorial demonstrates how to connect an AWS Bedrock AgentCore Harness to AI GO! as a custom-inference model, so it can be evaluated like any other model — while preserving conversation state across turns.
The hard part of integrating a stateful agent is not calling the endpoint once; it is making multi-turn conversations work. AgentCore keeps conversation state server-side in a per-session microVM keyed by runtimeSessionId, and AI GO! drives the conversation one user turn at a time. The integration must keep both sides in sync so that turn 3 remembers what happened in turns 1 and 2.
This pattern applies to any stateful agent runtime — AgentCore, a custom agent server, or a hosted assistant API — where the conversation lives behind a session identifier and each call returns a multi-step trace.
What you will build
A complete custom-inference model integration that:
- Forwards each user turn to a Harness via bedrock-agentcore.invoke_harness (boto3)
- Maintains conversation continuity across turns by round-tripping AgentCore's runtimeSessionId
- Returns the full per-turn agent trace (tool calls, tool outputs, and the final reply) in AI GO!'s Open Responses format, ready for trace-aware scorers
By the end, you will have a model you can register, test, and point any multi-turn evaluation at.
Step 1: Understand the integration
The multi-turn challenge
AI GO! sends a chat-completion request for every turn. The body contains the whole conversation so far; the last entry is the current user turn:
{
  "messages": [
    { "role": "user", "content": "Hi, can you help me see my orders?" },
    { "role": "assistant", "content": "Sure! What's your email and order ID?",
      "session_id": "550e8400-e29b-41d4-a716-446655440000" },
    { "role": "user", "content": "[email protected], order ORD-1001" }
  ]
}
AgentCore, however, does not want the whole history replayed: it already has it stored in the session microVM. It only wants the new user message, plus the runtimeSessionId that identifies the session.
The mechanism for keeping both sides in sync is passing the session id through the conversation itself. AI GO! messages allow extra fields, and any custom field we set on an assistant message is echoed back unchanged on the next turn — this is the supported way to carry custom data between requests. So on each response we attach the session_id to the assistant message; on the next request we read it back and reuse the same session. The first turn (no prior assistant message) mints a fresh session id.
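To make the echo mechanism concrete, here is a self-contained simulation in plain Python. It is a sketch under stated assumptions, not the integration itself: the hypothetical FakeStatefulAgent stands in for the Harness (history stored server-side per session id), handle_turn stands in for run_inference, and messages are plain dicts rather than AI GO!'s typed models.

```python
import uuid
from typing import Any


class FakeStatefulAgent:
    """Hypothetical stand-in for a stateful runtime: history lives server-side."""

    def __init__(self) -> None:
        self.sessions: dict[str, list[str]] = {}

    def invoke(self, session_id: str, user_message: str) -> str:
        # Append only the new turn to the history this session already holds.
        history = self.sessions.setdefault(session_id, [])
        history.append(user_message)
        return f"Turn {len(history)} in this session."


def handle_turn(messages: list[dict[str, Any]], agent: FakeStatefulAgent) -> dict[str, Any]:
    """Forward only the newest user turn, reusing the session id echoed on the last assistant message."""
    last_user = next(m for m in reversed(messages) if m["role"] == "user")
    session_id = ""
    for msg in reversed(messages):
        if msg["role"] == "assistant":
            session_id = msg.get("session_id", "")
            break
    session_id = session_id or str(uuid.uuid4())  # first turn: mint a new session
    reply = agent.invoke(session_id, last_user["content"])
    # Attach the session id so the caller echoes it back on the next turn.
    return {"role": "assistant", "content": reply, "session_id": session_id}


agent = FakeStatefulAgent()
conversation: list[dict[str, Any]] = []
for text in ["Hi, can you help me see my orders?", "Order ORD-1001"]:
    conversation.append({"role": "user", "content": text})
    conversation.append(handle_turn(conversation, agent))
```

After the loop, both assistant messages carry the same session_id and the fake agent holds a single session containing both user turns, even though each call forwarded only one message.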
Session id length. AgentCore requires runtimeSessionId to be at least 33 characters. A uuid4 string is 36 characters, so we mint one with str(uuid.uuid4()).
Example request bodies (AI GO! → run_inference)
First turn — just the new user message:
{
  "messages": [
    { "role": "user", "content": "Search the catalog for shoes." }
  ]
}
Subsequent turns echo the prior assistant message with its session_id, so we keep using the same Harness session:
{
  "messages": [
    { "role": "user", "content": "Hello!" },
    { "role": "assistant", "content": "Hello! How can I assist you today?",
      "session_id": "550e8400-e29b-41d4-a716-446655440000" },
    { "role": "user", "content": "What is my session id?" }
  ]
}
The three-stage pipeline
A single inference runs as a three-stage pipeline:
ChatCompletionInput (from AI GO!)
  -> convert_user_input   -> ModelInput
  -> query_model          -> RawModelOutput
  -> convert_model_output -> OpenResponsesModelOutput
run_inference ties the stages together: parse the request, convert it to what the Harness invocation needs, drive the agent, and convert the resulting event stream back into the Open Responses format AI GO! expects.
The output shape
invoke_harness returns a streaming Converse-style event stream — messageStart / contentBlockStart / contentBlockDelta / contentBlockStop / messageStop / metadata. We accumulate it and translate the finalised content into Open Responses trace items:
| Type | Description |
|---|---|
| message | A text message (here, the assistant's reply) |
| function_call | A tool the agent invoked (name, arguments) |
| function_call_output | The result of a tool call, linked by call_id |
Emitting these items (rather than just the final text) is what lets downstream trace-aware scorers inspect how the agent reached its answer.
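The accumulation step is the subtle part, because a tool's arguments arrive as JSON fragments that are only valid once joined. The hypothetical helper below illustrates that on the two toolUse deltas from the example stream shown later. It is a simplification: it matches on contentBlockIndex alone, whereas the real converter keys on (message index, contentBlockIndex) because block indices restart in every message.

```python
import json
from typing import Any


def reassemble_tool_input(events: list[dict[str, Any]], block_index: int) -> dict[str, Any]:
    """Concatenate streamed toolUse.input fragments for one content block, then parse."""
    fragments: list[str] = []
    for event in events:
        delta_event = event.get("contentBlockDelta")
        if not delta_event or delta_event.get("contentBlockIndex") != block_index:
            continue
        tool_use = delta_event.get("delta", {}).get("toolUse")
        if tool_use:
            fragments.append(tool_use.get("input", ""))
    # Individual fragments are not valid JSON on their own; only the joined string is.
    return json.loads("".join(fragments)) if fragments else {}


events = [
    {"contentBlockDelta": {"contentBlockIndex": 0, "delta": {"toolUse": {"input": "{\"query\": "}}}},
    {"contentBlockDelta": {"contentBlockIndex": 0, "delta": {"toolUse": {"input": "\"shoes\"}"}}}},
]
```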
Step 2: Build the inference handler
The integration lives in a single Python file, run_inference.py, that defines a run_inference(body, environment) function. AI GO! calls it once per turn.
The entry point
run_inference is the function AI GO! calls. It wires the three stages together — parse and convert the request, invoke the Harness, and convert the resulting events back into the Open Responses format AI GO! expects:
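def run_inference(body: str, environment: dict[str, Any]) -> str:
    # Stage 1: parse the AI GO! request and extract what the Harness needs.
    model_input = convert_user_input(
        ChatCompletionInput.model_validate(json.loads(body))
    )
    # Stage 2: invoke the Harness and collect its event stream.
    raw = query_model(model_input, environment)
    # Stage 3: translate the events into Open Responses trace items.
    model_output = convert_model_output(raw)
    return model_output.model_dump_json()
The rest of this step implements each stage in turn.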
Model-side types
We model the two intermediate payloads explicitly so the data flow stays legible:
class ModelInput(BaseModel):
    """Request payload the Harness invocation needs."""

    session_id: str = ""  # empty on the first turn, reused afterwards
    user_message: str


class RawModelOutput(BaseModel):
    """The collected Converse event stream, plus the session id we used."""

    session_id: str
    events: list[dict[str, Any]]
Stage 1: convert_user_input
This is where multi-turn continuity is established. We pull the latest user message and recover the session_id echoed by the previous assistant turn:
def convert_user_input(data: ChatCompletionInput) -> ModelInput:
    messages = data.messages
    # The current turn is the last user message in the history.
    last_user = next(m for m in reversed(messages) if m.role == "user")
    # Recover the session id echoed on the most recent assistant message, if any.
    session_id = ""
    for msg in reversed(messages):
        if msg.role == "assistant":
            session_id = getattr(msg, "session_id", "") or ""
            break
    return ModelInput(
        session_id=session_id, user_message=_message_text(last_user.content)
    )
On the first turn there is no prior assistant message, so session_id stays empty and query_model mints a new one.
Stage 2: query_model
Mint a session id if there isn't one, invoke the Harness, and collect the event stream:
def query_model(model_input: ModelInput, environment: dict[str, Any]) -> RawModelOutput:
    # Build the client from explicit credentials only (no ambient AWS config).
    client = boto3.client(
        "bedrock-agentcore",
        region_name=environment.get("AWS_REGION") or "eu-central-1",
        aws_access_key_id=environment["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=environment["AWS_SECRET_ACCESS_KEY"],
        aws_session_token=environment.get("AWS_SESSION_TOKEN") or None,
    )
    # First turn: no echoed session id yet, so mint one.
    session_id = model_input.session_id or _new_session_id()
    response = client.invoke_harness(
        harnessArn=environment["AWS_HARNESS_ARN"],
        runtimeSessionId=session_id,
        messages=[{"role": "user", "content": [{"text": model_input.user_message}]}],
    )
    # Drain the event stream so the converter can process it as a list.
    events: list[dict[str, Any]] = list(response["stream"])
    return RawModelOutput(session_id=session_id, events=events)
We send only the new user message; AgentCore appends it to the session it already holds. Credentials are taken only from environment (the explicit access key), so the adapter never depends on ~/.aws/credentials, aws sso login, or instance metadata being present in the runtime image. AWS_SESSION_TOKEN is optional and only needed for temporary credentials; the model.yaml in Step 3 does not set it.
Example raw model output (Harness → query_model)
A Converse stream interleaves multiple messages per turn (assistant with toolUse → user with toolResult → assistant final), and the toolUse.input / toolResult arrive in deltas that must be reassembled:
{
  "session_id": "550e8400-e29b-41d4-a716-446655440000",
  "events": [
    { "messageStart": { "role": "assistant" } },
    { "contentBlockStart": { "contentBlockIndex": 0,
        "start": { "toolUse": { "toolUseId": "tooluse_01",
                                "name": "search_products" } } } },
    { "contentBlockDelta": { "contentBlockIndex": 0,
        "delta": { "toolUse": { "input": "{\"query\": " } } } },
    { "contentBlockDelta": { "contentBlockIndex": 0,
        "delta": { "toolUse": { "input": "\"shoes\"}" } } } },
    { "contentBlockStop": { "contentBlockIndex": 0 } },
    { "messageStop": { "stopReason": "tool_use" } },
    { "messageStart": { "role": "user" } },
    { "contentBlockStart": { "contentBlockIndex": 0,
        "start": { "toolResult": { "toolUseId": "tooluse_01",
                                   "status": "success" } } } },
    { "contentBlockDelta": { "contentBlockIndex": 0,
        "delta": { "toolResult": [{ "text": "[]" }] } } },
    { "contentBlockStop": { "contentBlockIndex": 0 } },
    { "messageStop": { "stopReason": "end_turn" } },
    { "messageStart": { "role": "assistant" } },
    { "contentBlockDelta": { "contentBlockIndex": 0,
        "delta": { "text": "I couldn't find any shoes..." } } },
    { "contentBlockStop": { "contentBlockIndex": 0 } },
    { "messageStop": { "stopReason": "end_turn" } },
    { "metadata": { "usage": { "inputTokens": 201, "outputTokens": 22 } } }
  ]
}
Stage 3: convert_model_output
The conversion is a one-liner; all of the work lives in the converter, which consumes the event stream directly:
def convert_model_output(raw: RawModelOutput) -> OpenResponsesModelOutput:
    return OpenResponsesConverter().build(raw.events, raw.session_id)
The Open Responses converter
Because the stream is delta-based, build first accumulates the events into finalised blocks (reassembling chunked toolUse.input, joining text, tracking interleaved messages by index), then constructs the Open Responses items: each tool-use block becomes a function_call, each tool-result block a function_call_output, and the assistant text is joined into a single reply that carries the session_id:
class OpenResponsesConverter:
    def build(
        self, events: list[dict[str, Any]], session_id: str = "", **kwargs: Any
    ) -> OpenResponsesModelOutput:
        # Collapse the delta stream into finalised blocks plus token usage.
        blocks, usage = self._accumulate_stream(events)
        items: list[TraceItem] = []
        answer_chunks: list[str] = []
        for block in blocks:
            kind = block["kind"]
            if kind == "tool_use":
                items.append(self.build_function_call(block))
            elif kind == "tool_result":
                items.append(self.build_function_call_output(block))
            elif kind == "text":
                answer_chunks.append(block["text"])
        # All assistant text becomes one final reply that carries the session id.
        items.append(self.build_assistant_message("".join(answer_chunks), session_id))
        return OpenResponsesModelOutput(items=items, usage=self.build_usage(usage))
The per-item builders construct each trace item. The session_id to round-trip is attached to the assistant message as an extra field:
def build_function_call(self, block: dict[str, Any]) -> FunctionCall:
    return FunctionCall(
        id=str(uuid.uuid4()),
        call_id=block["tool_use_id"],  # links the call to its output
        name=block["name"],
        arguments=block.get("input_json") or "{}",  # reassembled JSON string
        status=FunctionCallStatus.completed,
    )

def build_function_call_output(self, block: dict[str, Any]) -> FunctionCallOutput:
    return FunctionCallOutput(
        id=str(uuid.uuid4()),
        call_id=block["tool_use_id"],
        output=block.get("output") or "",
        status=FunctionCallOutputStatusEnum.completed,
    )

def build_assistant_message(self, text: str, session_id: str) -> Message:
    return Message(
        id=str(uuid.uuid4()),
        status=MessageStatus.completed,
        role=MessageRole.assistant,
        content=[OutputTextContent(text=text, annotations=[])],
        # Carried as an extra field so the next turn can reuse the session.
        session_id=session_id,
    )
Accumulating the Converse stream (_accumulate_stream)
The stream interleaves multiple messages per turn, each with content blocks indexed by contentBlockIndex, and the toolUse.input / toolResult / text arrive in deltas. We keep a small state machine keyed by (message_index, content_block_index), finalising each block on contentBlockStop into one of three normalized shapes:
@classmethod
def _accumulate_stream(
    cls, events: list[dict[str, Any]]
) -> tuple[list[dict[str, Any]], dict[str, int]]:
    blocks: list[dict[str, Any]] = []
    usage: dict[str, int] = {"num_prompt_tokens": 0, "num_completion_tokens": 0}
    message_index = -1
    current_role = "assistant"
    # In-progress blocks keyed by (message_index, contentBlockIndex).
    active: dict[tuple[int, int], dict[str, Any]] = {}

    def _finalise(msg_idx: int, block_idx: int) -> None:
        state = active.pop((msg_idx, block_idx), None)
        if state is None:
            return
        kind = state.get("kind")
        if kind == "text":
            # Keep assistant text only; text in other roles is dropped.
            text = state.get("text") or ""
            if text and state.get("role") == "assistant":
                blocks.append({"kind": "text", "role": "assistant", "text": text})
        elif kind == "tool_use":
            blocks.append(
                {
                    "kind": "tool_use",
                    "tool_use_id": state.get("tool_use_id") or "",
                    "name": state.get("name") or "",
                    "input_json": state.get("input_json") or "",
                }
            )
        elif kind == "tool_result":
            blocks.append(
                {
                    "kind": "tool_result",
                    "tool_use_id": state.get("tool_use_id") or "",
                    "output": cls._stringify_tool_result(
                        state.get("output_chunks") or []
                    ),
                    "status": state.get("status") or "success",
                }
            )

    for event in events:
        if "messageStart" in event:
            # Each message gets its own index; block indices restart per message.
            message_index += 1
            current_role = event["messageStart"].get("role") or "assistant"
            continue
        if "contentBlockStart" in event:
            payload = event["contentBlockStart"]
            block_idx = int(payload.get("contentBlockIndex") or 0)
            start = payload.get("start") or {}
            tool_use = start.get("toolUse")
            tool_result = start.get("toolResult")
            if tool_use:
                active[(message_index, block_idx)] = {
                    "kind": "tool_use",
                    "tool_use_id": tool_use.get("toolUseId") or "",
                    "name": tool_use.get("name") or "",
                    "input_json": "",
                }
            elif tool_result:
                active[(message_index, block_idx)] = {
                    "kind": "tool_result",
                    "tool_use_id": tool_result.get("toolUseId") or "",
                    "status": tool_result.get("status") or "success",
                    "output_chunks": [],
                }
            else:
                active[(message_index, block_idx)] = {
                    "kind": None,
                    "role": current_role,
                }
            continue
        if "contentBlockDelta" in event:
            payload = event["contentBlockDelta"]
            block_idx = int(payload.get("contentBlockIndex") or 0)
            delta = payload.get("delta") or {}
            state = active.get((message_index, block_idx))
            if state is None:
                # Text blocks may arrive without a preceding contentBlockStart.
                state = {"kind": None, "role": current_role}
                active[(message_index, block_idx)] = state
            if "text" in delta and delta["text"]:
                if state.get("kind") is None:
                    state["kind"] = "text"
                    state["text"] = ""
                state["text"] = (state.get("text") or "") + delta["text"]
            elif "toolUse" in delta:
                tu = delta["toolUse"] or {}
                state["kind"] = "tool_use"
                state["input_json"] = (state.get("input_json") or "") + (
                    tu.get("input") or ""
                )
            elif "toolResult" in delta:
                state["kind"] = "tool_result"
                state.setdefault("output_chunks", []).extend(
                    delta["toolResult"] or []
                )
            continue
        if "contentBlockStop" in event:
            payload = event["contentBlockStop"]
            block_idx = int(payload.get("contentBlockIndex") or 0)
            _finalise(message_index, block_idx)
            continue
        if "metadata" in event:
            metadata_usage = event["metadata"].get("usage") or {}
            usage["num_prompt_tokens"] += int(
                metadata_usage.get("inputTokens") or 0
            )
            usage["num_completion_tokens"] += int(
                metadata_usage.get("outputTokens") or 0
            )
            continue

    # Flush any blocks left open by truncated streams so we don't lose content.
    for msg_idx, block_idx in list(active.keys()):
        _finalise(msg_idx, block_idx)
    return blocks, usage

@staticmethod
def _stringify_tool_result(chunks: list[dict[str, Any]]) -> str:
    # Assumed sketch of a helper the snippet references but does not show:
    # join text parts and serialise structured "json" parts into one string.
    parts: list[str] = []
    for chunk in chunks:
        if "text" in chunk:
            parts.append(str(chunk["text"]))
        elif "json" in chunk:
            parts.append(json.dumps(chunk["json"]))
    return "".join(parts)
Step 3: Wire up the model
Three small files connect the snippet to AI GO!.
model.yaml
The model uses connection_type: custom_inference with the identity chat-completion adapter (the snippet already returns Open Responses, so no adapter transform is needed). The region and Harness ARN are plain config; the AWS credentials are stored as server-side secrets:
display_name: "Customer Support Agent (AWS Bedrock AgentCore)"
key: "customer-support-agent-aws-bedrock"
description: >-
  A customer-support agent deployed as an AWS Bedrock AgentCore Harness.
rate_limit: 15
task: "chat_completion"
config:
  connection_type: "custom_inference"
  adapter:
    key: "latticeflow$identity_chat_completion"
  run_inference_snippet: !include "./run_inference.py"
  environment:
    AWS_REGION: $AWS_REGION
    AWS_HARNESS_ARN: $AWS_HARNESS_ARN
    AWS_ACCESS_KEY_ID: "<< secrets.AWS_ACCESS_KEY_ID >>"
    AWS_SECRET_ACCESS_KEY: "<< secrets.AWS_SECRET_ACCESS_KEY >>"
  timeout: 120
secrets:
  AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
  AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY

- !include "./run_inference.py" inlines the snippet at registration time.
- << secrets.* >> references server-side secrets; the secrets block uploads them from your .env.
- rate_limit: 15 keeps concurrency moderate; lower it if you see throttling or timeouts.
app.yaml
The model needs an app to live in:
display_name: "AWS Bedrock App"
key: "aws-bedrock-app"
tags: ["Agents", "AWS"]
description: >
  Custom-inference integration for an AWS Bedrock AgentCore Harness, exposing
  it as an AI GO! model with Open Responses traces.
.env
AWS_REGION=eu-central-1
AWS_HARNESS_ARN=arn:aws:bedrock-agentcore:<region>:<account>:harness/<name>
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=...
Step 4: Register and test
# Create and switch to the app
lf add app -f app.yaml
lf switch aws-bedrock-app
# Register the model (uploads the secrets and inlines run_inference.py)
lf add model -f model.yaml
# Verify the Harness is reachable and returns well-formed Open Responses
lf test model customer-support-agent-aws-bedrock
lf test model sends a single "Hello!" turn and shows each pipeline stage. A successful run ends with the parsed Open Responses output:
3. Running inference.
Status code: 200
4. Transforming model output.
{"items":[{"id":"...","status":"completed","role":"assistant",
"content":[{"text":"Hello! How can I assist you today?","annotations":[]}],
"session_id":"62c80e9a-..."}],
"usage":{"num_completion_tokens":11,"num_prompt_tokens":446}}
...
output="items=[Message(type='message', ..., role=<MessageRole.assistant>,
content=[OutputTextContent(text='Hello! How can I assist you today?', ...)],
session_id='62c80e9a-...')] usage=ModelUsage(...)"
Successfully tested configuration of model with key 'customer-support-agent-aws-bedrock'.
Two things confirm the integration is correct:
- The assistant reply parses as a Message with role=assistant.
- A session_id is present on the message. This is the value the next turn reads back to continue the same Harness session.
Your model is now registered and can be pointed at any multi-turn evaluation.
Adapting this pattern
The integration generalizes to any stateful agent runtime.
Different agent platforms
Swap the boto3 call in query_model for your platform's API and adjust the converter to read its output shape. Keep the contract identical: send only the new user turn plus a session id, and return the current turn's trace items.
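One way to pin that contract down is a typing.Protocol that every backend satisfies. The sketch below is illustrative only: StatefulAgentBackend, EchoBackend, and run_turn are hypothetical names, and trace items are plain dicts rather than AI GO!'s models.

```python
from dataclasses import dataclass, field
from typing import Any, Protocol


class StatefulAgentBackend(Protocol):
    """Hypothetical contract: one new user turn in, that turn's trace items out."""

    def send(self, session_id: str, user_message: str) -> list[dict[str, Any]]: ...


@dataclass
class EchoBackend:
    """Toy backend that satisfies the contract; swap in your platform's client."""

    calls: list[tuple[str, str]] = field(default_factory=list)

    def send(self, session_id: str, user_message: str) -> list[dict[str, Any]]:
        self.calls.append((session_id, user_message))
        return [{"type": "message", "role": "assistant",
                 "content": [{"text": f"echo: {user_message}"}]}]


def run_turn(backend: StatefulAgentBackend, session_id: str, user_message: str) -> list[dict[str, Any]]:
    # Only the newest turn and the session id cross the boundary.
    return backend.send(session_id, user_message)
```

Because Protocol matching is structural, a new platform only needs a send method with this shape; nothing has to inherit from a base class.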
Single-turn agents
If your agent is stateless, drop the session_id machinery entirely: convert_user_input returns just the user message, query_model makes one call, and build_assistant_message omits the session_id extra field.
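For illustration, a stateless input converter reduces to picking the latest user message. This hypothetical variant uses plain dicts and never reads or forwards a session id.

```python
from typing import Any


def convert_user_input_stateless(messages: list[dict[str, Any]]) -> dict[str, str]:
    """Hypothetical stateless variant: forward only the latest user message."""
    last_user = next(m for m in reversed(messages) if m["role"] == "user")
    return {"user_message": last_user["content"]}
```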
Carrying state other than a session id
This echo mechanism works for any opaque state. Whatever you attach to the assistant message (a session token, a cursor, a serialized memory blob) comes back on the next request — read it in convert_user_input and forward it to your endpoint.
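As a sketch of carrying richer state, you could serialise a dict into an opaque string, attach it to the assistant message, and decode it on the next turn. The field name agent_state and the base64-over-JSON encoding are our own choices, not something AI GO! prescribes.

```python
import base64
import json
from typing import Any


def encode_state(state: dict[str, Any]) -> str:
    """Serialise arbitrary agent state into an opaque string for the assistant message."""
    return base64.urlsafe_b64encode(json.dumps(state).encode("utf-8")).decode("ascii")


def decode_state(blob: str) -> dict[str, Any]:
    """Inverse of encode_state; an empty blob means no state yet (first turn)."""
    if not blob:
        return {}
    return json.loads(base64.urlsafe_b64decode(blob.encode("ascii")))


def latest_state(messages: list[dict[str, Any]], field: str = "agent_state") -> dict[str, Any]:
    # Read the blob echoed on the most recent assistant message, if any.
    for msg in reversed(messages):
        if msg["role"] == "assistant":
            return decode_state(msg.get(field, ""))
    return {}
```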
Plugging into an evaluation
Because the snippet emits full traces (tool calls, outputs, and replies), the model drops straight into trace-aware evaluations — multi-turn solvers, function-call-coverage scorers, or model-as-a-judge scorers over open_responses traces. Point a task_specification at this model key and run lf run -f run.yaml.
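To show what "trace-aware" means in practice, here is a hypothetical function-call-coverage score computed over trace items represented as plain dicts. It is not AI GO!'s built-in scorer, just an illustration of the kind of check the emitted function_call items make possible.

```python
from typing import Any


def function_call_coverage(items: list[dict[str, Any]], expected_tools: set[str]) -> float:
    """Hypothetical scorer: fraction of expected tool names that appear as function_call items."""
    if not expected_tools:
        return 1.0
    called = {item["name"] for item in items if item.get("type") == "function_call"}
    return len(called & expected_tools) / len(expected_tools)
```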
