Workflow API
A Workflow is a stateful, code-defined orchestration of agents and logic. You
subclass Workflow and implement a run() method; Buddy wraps it to add
session state, storage, and memory. Import from buddy.workflow:
from buddy.workflow import Workflow, RunResponse
from buddy import Agent
from buddy.models.openai import OpenAIChat
class ResearchWorkflow(Workflow):
description = "Research a topic and write a summary."
researcher = Agent(model=OpenAIChat(id="gpt-4o-mini"))
def run(self, topic: str) -> RunResponse:
facts = self.researcher.run(f"Find key facts about {topic}.")
return RunResponse(content=f"Summary of {topic}:\n{facts.content}")
wf = ResearchWorkflow()
result = wf.run(topic="quantum computing")
print(result.content)
Implement run(), call run()
Buddy intercepts your subclass's run() and routes it through
run_workflow(), which assigns a run_id/session_id, persists to storage,
and records the run in memory. You still call wf.run(...) as normal.
Implement arun() instead (or as well) for async workflows.
Constructor parameters
Workflow.__init__ is keyword-only:
| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
class name | Workflow name. |
workflow_id |
str |
autogenerated | Stable workflow identifier. |
description |
str |
class attr | Description (shown in the UI). |
user_id |
str |
None |
ID of the interacting user. |
session_id |
str |
autogenerated | Session identifier. |
session_name |
str |
None |
Human-readable session name. |
session_state |
dict |
{} |
State persisted across runs. |
memory |
WorkflowMemory \| Memory |
None |
Memory backend (defaults to Memory()). |
storage |
Storage |
None |
Session persistence backend. |
extra_data |
dict |
None |
Arbitrary data stored with the workflow. |
debug_mode |
bool |
False |
Enable debug logging. |
Return values
Your run() may return either:
- A single
RunResponse— the workflow result. - An
Iterator[RunResponseEvent]— yield events to stream progress.
For async workflows, implement arun(); it may return a RunResponse or be an
async generator yielding events.
from buddy.workflow import Workflow, RunResponse
class StreamingWorkflow(Workflow):
def run(self, query: str):
for chunk in self.agent.run(query, stream=True):
yield chunk # RunResponseEvent objects
RunResponse
Workflows return the same RunResponse documented in the
Agent API (buddy.run.response.RunResponse). When
returned from a workflow it additionally carries a workflow_id. The most-used
field is content; helpers include to_dict(), to_json(), and
from_dict().
WorkflowSession
When storage is set, runs are persisted as a WorkflowSession
(buddy.storage.session.workflow.WorkflowSession, re-exported from
buddy.workflow):
| Field | Type | Description |
|---|---|---|
session_id |
str |
Session UUID. |
workflow_id |
str |
Associated workflow. |
user_id |
str |
Interacting user. |
memory |
dict |
Serialized memory (runs). |
session_data |
dict |
session_name, session_state, media. |
workflow_data |
dict |
name, workflow_id, description. |
extra_data |
dict |
Arbitrary extra data. |
created_at / updated_at |
int |
Unix timestamps. |
Persisting workflows
Pass a storage backend (e.g. SqliteStorage from buddy.storage.sqlite)
to resume sessions and keep session_state across runs. Use a stable
session_id to continue an existing session.