Strands Agents integration
Temporal's integration with Strands Agents gives your Strands agents durable execution, automatic retries, and timeouts via the Temporal platform. The plugin routes Strands model invocations, tool calls, MCP tool calls, and hooks through Temporal Activities, so every step the agent takes is recorded in workflow history.
The Temporal Python SDK integration with Strands Agents is currently at an experimental release stage. The API may change in future versions.
Code snippets in this guide are taken from the Strands Agents plugin samples. Refer to the samples for the complete code.
Prerequisites
- This guide assumes you are already familiar with Strands Agents. If you aren't, refer to the Strands Agents documentation for more details.
- If you are new to Temporal, we recommend reading Understanding Temporal or taking the Temporal 101 course.
- Ensure you have set up your local development environment by following the Set up your local development environment guide. When you're done, leave the Temporal development server running if you want to test your code locally.
Install the plugin
Install the Temporal Python SDK with Strands Agents support (requires temporalio 1.28.0 or later):
uv add "temporalio[strands-agents]"
or with pip:
pip install "temporalio[strands-agents]"
Quickstart
Define a Workflow that holds a TemporalAgent, then register StrandsPlugin on the Worker:
import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin, TemporalAgent
from temporalio.worker import Worker

@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(start_to_close_timeout=timedelta(seconds=60))

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)

async def main() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="strands",
        workflows=[MyWorkflow],
        plugins=[StrandsPlugin()],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Start the Workflow from a client:
import asyncio

from temporalio.client import Client

from workflow import MyWorkflow

async def main() -> None:
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        MyWorkflow.run,
        "Hello",
        id="strands-quickstart",
        task_queue="strands",
    )
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
Inside a Workflow, always call agent.invoke_async(message) — not agent(message). The synchronous form spawns a
worker thread, which the Workflow sandbox blocks.
Models
StrandsPlugin(models=...) takes a mapping of name → factory. Each factory is called lazily on first use (on the
Worker, outside the Workflow sandbox) and the constructed model is cached for the Worker's lifetime. If models is
omitted, the plugin registers a single BedrockModel() factory under the name "bedrock", matching Strands' own
implicit default. Select a model per agent with TemporalAgent(model="name", ...):
from strands.models.anthropic import AnthropicModel
from strands.models.bedrock import BedrockModel

# Workflow
@workflow.defn
class MultiModelWorkflow:
    def __init__(self) -> None:
        self.agent_a = TemporalAgent(
            model="claude",
            start_to_close_timeout=timedelta(seconds=60),
        )
        self.agent_b = TemporalAgent(
            model="bedrock",
            start_to_close_timeout=timedelta(seconds=60),
        )

# Worker
Worker(..., plugins=[StrandsPlugin(models={
    "claude": lambda: AnthropicModel(client_args={"api_key": "..."}),
    "bedrock": lambda: BedrockModel(),
})])
Each TemporalAgent carries its own Activity options (timeouts, retry policy, task queue, streaming topic) and
dispatches to the shared model Activity, which resolves the model name against the registered factories at runtime. A
name not present in models raises ValueError inside the Activity.
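The lazy, cached lookup described above can be pictured with a small stand-alone sketch. This is not the plugin's implementation: ModelRegistry and resolve are hypothetical names, and a plain string stands in for a Strands model object.

```python
from typing import Any, Callable, Dict


class ModelRegistry:
    """Hypothetical name -> factory lookup that builds each model once."""

    def __init__(self, factories: Dict[str, Callable[[], Any]]) -> None:
        self._factories = dict(factories)
        self._cache: Dict[str, Any] = {}

    def resolve(self, name: str) -> Any:
        # Unknown names fail loudly, mirroring the ValueError described above.
        if name not in self._factories:
            raise ValueError(f"no model registered under {name!r}")
        # Call the factory on first use only, then reuse the cached instance.
        if name not in self._cache:
            self._cache[name] = self._factories[name]()
        return self._cache[name]


calls = []


def make_claude() -> str:
    calls.append("claude")
    return "claude-model"  # a real factory would return a Strands model


registry = ModelRegistry({"claude": make_claude})
first = registry.resolve("claude")
second = registry.resolve("claude")
```

Because the factory runs only when a name is first resolved, expensive setup such as credential loading is deferred until an agent actually uses that model.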
Tools
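Wrap non-deterministic tools as Temporal Activities, register them with the Worker, and pass them to the agent through
activity_as_tool from temporalio.contrib.strands.workflow (imported below as strands_workflow so it does not shadow
temporalio.workflow):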
from strands_tools import shell
from temporalio import activity
from temporalio.contrib.strands import StrandsPlugin, TemporalAgent
from temporalio.contrib.strands import workflow as strands_workflow

@activity.defn
async def fetch_user(user_id: str) -> dict:
    ...

@activity.defn(name="shell")
async def shell_activity(command: str) -> dict:
    return shell.shell(command=command, non_interactive=True)

# Workflow
agent = TemporalAgent(
    start_to_close_timeout=timedelta(seconds=60),
    tools=[
        strands_workflow.activity_as_tool(fetch_user, start_to_close_timeout=timedelta(seconds=30)),
        strands_workflow.activity_as_tool(shell_activity, start_to_close_timeout=timedelta(seconds=15)),
    ],
)

# Worker
Worker(
    ...,
    activities=[fetch_user, shell_activity],
    plugins=[StrandsPlugin()],
)
If you're using built-in strands_tools, wrap them in a thin async function decorated with @activity.defn so they
run as Temporal Activities.
Hooks
Strands' hook system lets you subscribe callbacks to events in the agent lifecycle —
invocation start and end, model call before and after, tool call before and after, and message added. Pass
hooks=[MyHookProvider()] to TemporalAgent; single-agent hook events fire in Workflow context, so deterministic
callbacks just work:
from strands.hooks import HookProvider, HookRegistry
from strands.hooks.events import AfterToolCallEvent
from temporalio import workflow

class AuditHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(AfterToolCallEvent, self._on_tool_call)

    def _on_tool_call(self, event: AfterToolCallEvent) -> None:
        workflow.logger.info(f"tool {event.tool_use['name']} finished")

agent = TemporalAgent(start_to_close_timeout=timedelta(seconds=60), hooks=[AuditHook()])
Hook callbacks run in Workflow context, so they must be deterministic: no time.time(), uuid.uuid4(), or I/O. For
callbacks that need I/O (audit logging, metrics, alerting), use activity_as_hook() from
temporalio.contrib.strands.workflow to dispatch the work as a Temporal Activity:
from temporalio import activity
from temporalio.contrib.strands.workflow import activity_as_hook

@activity.defn
async def persist_tool_call(tool_name: str) -> None:
    # I/O safely in an activity.
    ...

class AuditHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(
            AfterToolCallEvent,
            activity_as_hook(
                persist_tool_call,
                activity_input=lambda event: event.tool_use["name"],
                start_to_close_timeout=timedelta(seconds=10),
            ),
        )
activity_input extracts serializable values from the event to pass as the Activity's input. Use a dataclass or
Pydantic model for multiple values. This is needed because hook events hold references to the Agent, AgentTool
instances, and other objects that don't cross the Activity boundary.
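As a rough illustration of the multiple-values case, the extractor can copy plain fields out of the event into a dataclass. ToolCallRecord and to_record are hypothetical names, the toolUseId key is an assumption about the shape of tool_use, and a SimpleNamespace stands in for a real hook event.

```python
from dataclasses import asdict, dataclass
from types import SimpleNamespace


@dataclass
class ToolCallRecord:
    """Hypothetical Activity input holding only serializable fields."""

    tool_name: str
    tool_use_id: str


def to_record(event) -> ToolCallRecord:
    # Copy plain values out of the event; never pass the event itself,
    # since it references the Agent and other non-serializable objects.
    return ToolCallRecord(
        tool_name=event.tool_use["name"],
        tool_use_id=event.tool_use["toolUseId"],  # assumed key
    )


# Stand-in for an AfterToolCallEvent, for illustration only.
fake_event = SimpleNamespace(tool_use={"name": "shell", "toolUseId": "t-1"})
record = to_record(fake_event)
serialized = asdict(record)
```

A function like to_record would be passed as activity_input=to_record, with the Activity declared to accept a single ToolCallRecord argument.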
Human-in-the-loop interrupts
Strands offers two human-in-the-loop surfaces; both work with the plugin. In each case, agent.invoke_async() returns
AgentResult(stop_reason="interrupt", interrupts=[...]) instead of raising. Pair this with a Signal handler that
supplies responses, then resume by calling agent.invoke_async(responses).
Hook-based interrupts
A hook on an interruptible event (for example, BeforeToolCallEvent) can pause the agent by calling
event.interrupt(name, reason=...). The hook runs in Workflow context, so it must be deterministic — no I/O.
from strands.hooks import HookProvider, HookRegistry
from strands.hooks.events import BeforeToolCallEvent
from temporalio import workflow

class ApprovalHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeToolCallEvent, self._gate)

    def _gate(self, event: BeforeToolCallEvent) -> None:
        if event.interrupt("approval", reason="confirm delete") != "approve":
            event.cancel_tool = "denied"

@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[delete_thing],
            hooks=[ApprovalHook()],
        )
        self._approval: str | None = None

    @workflow.signal
    def approve(self, response: str) -> None:
        self._approval = response

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        if result.stop_reason == "interrupt":
            await workflow.wait_condition(lambda: self._approval is not None)
            result = await self.agent.invoke_async([
                {"interruptResponse": {"interruptId": result.interrupts[0].id, "response": self._approval}}
            ])
        return str(result)
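The Workflow above answers only the first pending interrupt. When several may be pending, a small pure helper could build the resume payload for all of them. The sketch below assumes each interrupt exposes id and name attributes, as in the Interrupt(...) constructor used in this guide; build_interrupt_responses is a hypothetical helper, not part of the plugin.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def build_interrupt_responses(
    interrupts: Iterable[Any], answers: Dict[str, Any]
) -> List[dict]:
    """Pair each pending interrupt with the answer supplied for its name."""
    responses = []
    for interrupt in interrupts:
        if interrupt.name not in answers:
            raise KeyError(f"no answer for interrupt {interrupt.name!r}")
        responses.append(
            {
                "interruptResponse": {
                    "interruptId": interrupt.id,
                    "response": answers[interrupt.name],
                }
            }
        )
    return responses


# Stand-in for result.interrupts, for illustration only.
pending = [SimpleNamespace(id="delete:a", name="approval")]
payload = build_interrupt_responses(pending, {"approval": "approve"})
```

The helper performs no I/O and reads no clocks, so it is safe to call from Workflow code before passing its result to agent.invoke_async.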
Tool-body interrupts
A @strands.tool function can raise InterruptException(Interrupt(...)) directly. The agent stops with the interrupt,
and the Workflow handles the resume the same way as for hooks:
from strands import tool
from strands.interrupt import Interrupt, InterruptException

@tool
def delete_thing(name: str) -> str:
    raise InterruptException(
        Interrupt(id=f"delete:{name}", name="approval", reason=f"delete {name}?")
    )
The same works from an activity_as_tool-wrapped Activity. The plugin's failure converter preserves the Interrupt
payload across the Activity boundary, so AgentResult.interrupts is populated just like the in-Workflow case:
from strands.interrupt import Interrupt, InterruptException
from temporalio import activity
from temporalio.contrib.strands.workflow import activity_as_tool

@activity.defn
async def delete_thing(name: str) -> str:
    if not await policy.is_authorized(name):
        raise InterruptException(
            Interrupt(id=f"delete:{name}", name="approval", reason=f"delete {name}?")
        )
    await storage.delete(name)
    return f"deleted {name}"

@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[activity_as_tool(delete_thing, start_to_close_timeout=timedelta(seconds=10))],
        )
Activity-tool interrupts rely on the plugin's failure converter, which is installed via the client's data converter.
Attach StrandsPlugin to the client (not just the Worker) for them to work — Workers built from that client pick
up the plugin automatically:
client = await Client.connect("localhost:7233", plugins=[StrandsPlugin()])
Worker(client, task_queue="strands", workflows=[MyWorkflow], activities=[delete_thing])
Structured output
Like Strands' Agent, TemporalAgent supports structured output with structured_output_model. The plugin defaults
to the pydantic_data_converter, so Pydantic types
serialize cleanly across the Activity and Workflow boundary:
from pydantic import BaseModel

class PersonInfo(BaseModel):
    name: str
    age: int

@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            structured_output_model=PersonInfo,
        )

    @workflow.run
    async def run(self, prompt: str) -> PersonInfo:
        result = await self.agent.invoke_async(prompt)
        return result.structured_output
Streaming
To forward model chunks to external consumers, pass streaming_topic="..." to TemporalAgent and host a
WorkflowStream on the Workflow. Each StreamEvent is published on the named topic from inside the model Activity;
subscribers read via WorkflowStreamClient. Chunks are batched on streaming_batch_interval (default 100ms):
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamClient

# Workflow
@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.stream = WorkflowStream()
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            streaming_topic="events",
        )

# Client
async for item in WorkflowStreamClient.create(client, workflow_id).subscribe(
    ["events"], result_type=StreamEvent,
):
    print(item.data)
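To get a feel for what interval batching means for subscribers, the following stand-alone sketch groups timestamped chunks into 100 ms windows. It only illustrates the idea; the plugin's actual batching logic may differ, and batch_by_interval is a hypothetical function.

```python
from typing import List, Optional, Sequence, Tuple


def batch_by_interval(
    chunks: Sequence[Tuple[float, str]], interval: float = 0.1
) -> List[List[str]]:
    """Group (timestamp_seconds, chunk) pairs into windows of `interval` seconds."""
    batches: List[List[str]] = []
    window_start: Optional[float] = None
    for timestamp, chunk in chunks:
        # Open a new batch once the current window has elapsed.
        if window_start is None or timestamp - window_start >= interval:
            batches.append([])
            window_start = timestamp
        batches[-1].append(chunk)
    return batches


chunks = [(0.00, "Hel"), (0.04, "lo"), (0.12, ", "), (0.15, "world")]
batches = batch_by_interval(chunks)
```

With these sample timestamps the four chunks arrive as two batches, which is why a subscriber may see several chunks delivered together rather than one at a time.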
MCP
StrandsPlugin(mcp_clients=...) takes a mapping of name → MCPClient factory, mirroring the models= pattern. The
plugin registers a per-server {name}-call-tool Activity and connects at Worker startup to enumerate tools.
Workflow-side, TemporalMCPClient(server="name") is a pure handle: it references the server by name and carries the
per-call Activity options.
from datetime import timedelta

from mcp import StdioServerParameters, stdio_client
from strands.tools.mcp.mcp_client import MCPClient
from temporalio import workflow
from temporalio.contrib.strands import StrandsPlugin, TemporalAgent, TemporalMCPClient

# Workflow
@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        echo = TemporalMCPClient(server="echo", start_to_close_timeout=timedelta(seconds=30))
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[echo],
        )

# Worker
Worker(
    ...,
    plugins=[StrandsPlugin(
        mcp_clients={
            "echo": lambda: MCPClient(
                lambda: stdio_client(
                    StdioServerParameters(command="...", args=[...]),
                ),
            ),
        },
    )],
)
Each factory returns a fully configured MCPClient, so you can pass options like tool_filters, prefix,
elicitation_callback, or tasks_config to it.
The plugin connects to each MCP server once at Worker startup to enumerate tools. The schema is frozen for the Worker's lifetime; restart Workers to pick up MCP-server changes. If a server is unavailable at startup, the Worker fails to start.
Retries
TemporalAgent disables Strands' built-in ModelRetryStrategy so retries are handled exclusively by Temporal.
Configure retries via retry_policy on TemporalAgent, and on the Activity options accepted by
workflow.activity_as_tool, workflow.activity_as_hook, and TemporalMCPClient:
from temporalio.common import RetryPolicy

TemporalAgent(
    start_to_close_timeout=timedelta(seconds=60),
    retry_policy=RetryPolicy(maximum_attempts=3),
)
Passing retry_strategy=... to TemporalAgent(...) raises ValueError; remove the argument (or pass
retry_strategy=None) and put the retry config on the Activity options instead.
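To see what a policy such as RetryPolicy(maximum_attempts=3) implies in practice, the sketch below computes the wait before each retry. It assumes Temporal's documented defaults (1 second initial interval, backoff coefficient 2.0, maximum interval of 100 times the initial interval) and ignores any jitter; retry_delays is a hypothetical helper, not an SDK function.

```python
from typing import List


def retry_delays(
    maximum_attempts: int,
    initial_interval: float = 1.0,
    backoff_coefficient: float = 2.0,
    maximum_interval: float = 100.0,
) -> List[float]:
    """Seconds waited before each retry; attempt 1 is the initial try."""
    delays = []
    for retry in range(maximum_attempts - 1):
        # Exponential backoff, capped at the maximum interval.
        delay = initial_interval * backoff_coefficient**retry
        delays.append(min(delay, maximum_interval))
    return delays


delays = retry_delays(maximum_attempts=3)
```

Under these assumptions, three attempts mean two retries, so a failing model call gives up a few seconds after the first failure plus however long each attempt itself runs, bounded by start_to_close_timeout.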
Continue-as-new
A chat-style Workflow accumulates message history with every turn and eventually hits Temporal's per-Workflow history
limit. Use continue-as-new to start a fresh execution while carrying
agent.messages forward as input:
from dataclasses import dataclass, field

from strands.types.content import Messages
from temporalio import workflow

@dataclass
class ChatInput:
    messages: Messages = field(default_factory=list)

@workflow.defn
class ChatWorkflow:
    def __init__(self) -> None:
        self._pending: list[str] = []
        self._done = False

    @workflow.signal
    def user_says(self, prompt: str) -> None:
        self._pending.append(prompt)

    @workflow.signal
    def end_chat(self) -> None:
        self._done = True

    @workflow.run
    async def run(self, input: ChatInput) -> None:
        agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            messages=list(input.messages),
        )
        while True:
            await workflow.wait_condition(lambda: self._pending or self._done)
            if self._done:
                return
            await agent.invoke_async(self._pending.pop(0))
            if workflow.info().is_continue_as_new_suggested():
                workflow.continue_as_new(ChatInput(messages=agent.messages))
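The hand-off can be pictured without Temporal at all. In the simulation below, each call to run_once stands in for one Workflow execution, and returning a new ChatInput stands in for workflow.continue_as_new. The turns_before_rollover parameter is a hypothetical substitute for is_continue_as_new_suggested(), and messages are simplified to plain dicts.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ChatInput:
    messages: List[dict] = field(default_factory=list)


def run_once(
    chat_input: ChatInput, prompts: List[str], turns_before_rollover: int
) -> Tuple[ChatInput, List[str]]:
    """Simulate one execution: handle prompts until a rollover is due."""
    messages = list(chat_input.messages)  # copy, as in the Workflow above
    for turn, prompt in enumerate(prompts, start=1):
        messages.append({"role": "user", "content": prompt})
        if turn >= turns_before_rollover:
            # Stand-in for workflow.continue_as_new(ChatInput(messages=...)).
            return ChatInput(messages=messages), prompts[turn:]
    return ChatInput(messages=messages), []


first_run, remaining = run_once(ChatInput(), ["a", "b", "c"], turns_before_rollover=2)
second_run, remaining = run_once(first_run, remaining, turns_before_rollover=2)
```

The second execution starts with the two messages carried over from the first, so the conversation continues while the event history starts again from empty.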
Observability
StrandsPlugin composes cleanly with the OpenTelemetry plugin. Register
OpenTelemetryPlugin on the client (Workers built from that client pick it up automatically) and StrandsPlugin on
the Worker. You'll get OTel spans around the model, tool, and MCP Activities the plugin schedules, plus any spans
Strands itself emits inside invoke_async:
import opentelemetry.trace
from temporalio.client import Client
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker

opentelemetry.trace.set_tracer_provider(create_tracer_provider())
client = await Client.connect("localhost:7233", plugins=[OpenTelemetryPlugin()])
Worker(
    client,
    task_queue="strands",
    workflows=[MyWorkflow],
    plugins=[StrandsPlugin()],
)
Set the tracer provider before connecting the client.
Snapshots
TemporalAgent.take_snapshot() and TemporalAgent.load_snapshot() raise NotImplementedError. Temporal's event
history already persists Workflow state durably at a finer granularity than Strands snapshots, so calling either
inside a Workflow is redundant.
Samples
The Strands Agents plugin samples demonstrate all supported patterns end-to-end.