
Workflow API

A Workflow is a stateful, code-defined orchestration of agents and logic. You subclass Workflow and implement a run() method; Buddy wraps it to add session state, storage, and memory. Import from buddy.workflow:

from buddy.workflow import Workflow, RunResponse

from buddy import Agent
from buddy.models.openai import OpenAIChat

class ResearchWorkflow(Workflow):
    description = "Research a topic and write a summary."

    researcher = Agent(model=OpenAIChat(id="gpt-4o-mini"))

    def run(self, topic: str) -> RunResponse:
        facts = self.researcher.run(f"Find key facts about {topic}.")
        return RunResponse(content=f"Summary of {topic}:\n{facts.content}")

wf = ResearchWorkflow()
result = wf.run(topic="quantum computing")
print(result.content)

Implement run(), call run()

Buddy intercepts your subclass's run() and routes it through run_workflow(), which assigns a run_id/session_id, persists to storage, and records the run in memory. You still call wf.run(...) as normal. Implement arun() instead (or as well) for async workflows.
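
The interception described above can be pictured with a toy base class. This is a minimal sketch of the general pattern, not Buddy's implementation: `MiniWorkflow`, `_subclass_run`, `_run_workflow`, and the in-memory `runs` list are hypothetical names standing in for the real Workflow, run_workflow(), and memory/storage.

```python
import uuid
from typing import Any, Dict, List, Optional


class MiniWorkflow:
    """Toy base class illustrating the interception pattern (not Buddy's code)."""

    def __init__(self, *, session_id: Optional[str] = None) -> None:
        self.session_id = session_id or str(uuid.uuid4())
        self.runs: List[Dict[str, Any]] = []  # stands in for memory/storage
        # Keep a handle on the subclass's run(), then shadow it on the
        # instance so that wf.run(...) goes through the wrapper.
        self._subclass_run = self.run
        self.run = self._run_workflow  # type: ignore[method-assign]

    def run(self, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError("Subclasses implement run().")

    def _run_workflow(self, *args: Any, **kwargs: Any) -> Any:
        run_id = str(uuid.uuid4())  # a fresh ID for every call
        result = self._subclass_run(*args, **kwargs)
        # Record the run; a real backend would persist this.
        self.runs.append(
            {"run_id": run_id, "session_id": self.session_id, "result": result}
        )
        return result


class Shout(MiniWorkflow):
    """Example subclass: only run() is implemented."""

    def run(self, text: str) -> str:
        return text.upper()
```

Calling `Shout(session_id="s1").run("hi")` returns the subclass's result while the wrapper records the run, which is why the caller never needs to invoke the wrapper directly.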

Constructor parameters

Workflow.__init__ is keyword-only:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | class name | Workflow name. |
| workflow_id | str | autogenerated | Stable workflow identifier. |
| description | str | class attr | Description (shown in the UI). |
| user_id | str | None | ID of the interacting user. |
| session_id | str | autogenerated | Session identifier. |
| session_name | str | None | Human-readable session name. |
| session_state | dict | {} | State persisted across runs. |
| memory | WorkflowMemory \| Memory | None | Memory backend (defaults to Memory()). |
| storage | Storage | None | Session persistence backend. |
| extra_data | dict | None | Arbitrary data stored with the workflow. |
| debug_mode | bool | False | Enable debug logging. |

Return values

Your run() may return either:

  • A single RunResponse — the workflow result.
  • An Iterator[RunResponseEvent] — yield events to stream progress.

For async workflows, implement arun(); it may return a RunResponse or be an async generator yielding events.

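from buddy import Agent
from buddy.models.openai import OpenAIChat
from buddy.workflow import Workflow

class StreamingWorkflow(Workflow):
    # Agent declared as a class attribute, as in ResearchWorkflow above
    agent = Agent(model=OpenAIChat(id="gpt-4o-mini"))

    def run(self, query: str):
        for chunk in self.agent.run(query, stream=True):
            yield chunk  # RunResponseEvent objects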
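
On the calling side, a generator-style run() is consumed with a plain loop and an async-generator arun() with `async for`. The sketch below uses stand-in classes so it runs without Buddy installed; `Event`, `SyncDemo`, `AsyncDemo`, and the two `collect_*` helpers are hypothetical names, and a real workflow would subclass Workflow and yield RunResponseEvent objects instead.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Iterator, List


@dataclass
class Event:
    """Hypothetical stand-in for a RunResponseEvent."""

    content: str


class SyncDemo:
    """Stand-in for a workflow whose run() is a generator."""

    def run(self, query: str) -> Iterator[Event]:
        for word in query.split():
            yield Event(content=word)


class AsyncDemo:
    """Stand-in for a workflow whose arun() is an async generator."""

    async def arun(self, query: str) -> AsyncIterator[Event]:
        for word in query.split():
            await asyncio.sleep(0)  # placeholder for awaiting an agent call
            yield Event(content=word)


def collect_sync(query: str) -> List[str]:
    # A generator-style run() is consumed with a plain for loop.
    return [event.content for event in SyncDemo().run(query)]


async def collect_async(query: str) -> List[str]:
    # An async-generator arun() is consumed with `async for`.
    return [event.content async for event in AsyncDemo().arun(query)]
```

From synchronous code, the async variant would be driven with `asyncio.run(collect_async("..."))`.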

RunResponse

Workflows return the same RunResponse documented in the Agent API (buddy.run.response.RunResponse). When returned from a workflow it additionally carries a workflow_id. The most-used field is content; helpers include to_dict(), to_json(), and from_dict().

WorkflowSession

When storage is set, runs are persisted as a WorkflowSession (buddy.storage.session.workflow.WorkflowSession, re-exported from buddy.workflow):

| Field | Type | Description |
| --- | --- | --- |
| session_id | str | Session UUID. |
| workflow_id | str | Associated workflow. |
| user_id | str | Interacting user. |
| memory | dict | Serialized memory (runs). |
| session_data | dict | session_name, session_state, media. |
| workflow_data | dict | name, workflow_id, description. |
| extra_data | dict | Arbitrary extra data. |
| created_at / updated_at | int | Unix timestamps. |

Persisting workflows

Pass a storage backend (e.g. SqliteStorage from buddy.storage.sqlite) to resume sessions and keep session_state across runs. Use a stable session_id to continue an existing session.
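
Why a stable session_id matters can be seen in a small stand-in built on the standard-library sqlite3 module. This is a concept sketch only, not Buddy's SqliteStorage: `TinySessionStore`, its `load`/`save` methods, the table layout, and `run_once` are all assumptions made for illustration.

```python
import json
import sqlite3
from typing import Any, Dict


class TinySessionStore:
    """Hypothetical stand-in for a storage backend; not Buddy's SqliteStorage."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS sessions ("
            "session_id TEXT PRIMARY KEY, session_state TEXT NOT NULL)"
        )

    def load(self, session_id: str) -> Dict[str, Any]:
        # Unknown session IDs start with empty state.
        row = self.conn.execute(
            "SELECT session_state FROM sessions WHERE session_id = ?",
            (session_id,),
        ).fetchone()
        return json.loads(row[0]) if row else {}

    def save(self, session_id: str, state: Dict[str, Any]) -> None:
        # Overwrite the stored state for this session.
        self.conn.execute(
            "INSERT OR REPLACE INTO sessions (session_id, session_state) "
            "VALUES (?, ?)",
            (session_id, json.dumps(state)),
        )
        self.conn.commit()


def run_once(store: TinySessionStore, session_id: str) -> int:
    """Simulate one workflow run that counts runs in session_state."""
    state = store.load(session_id)
    state["run_count"] = state.get("run_count", 0) + 1
    store.save(session_id, state)
    return state["run_count"]
```

Two calls to `run_once` with the same session_id see the counter grow, while a different session_id starts from empty state; reusing the ID is what ties a new run to the earlier session.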