Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.agno.com/llms.txt

Use this file to discover all available pages before exploring further.

A WorkflowFactory produces a fresh Workflow for each request. Register it in AgentOS(workflows=[...]) alongside any prototype workflows.
basic_workflow_factory.py
from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.factory import RequestContext
from agno.models.openai import OpenAIResponses
from agno.os import AgentOS
from agno.workflow import Step, Workflow, WorkflowFactory

db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")


def build_content_pipeline(ctx: RequestContext) -> Workflow:
    user_id = ctx.user_id or "anonymous"

    drafter = Agent(
        name="Drafter",
        model=OpenAIResponses(id="gpt-5.4"),
        instructions=(
            f"You are a content drafter for tenant {user_id}. "
            "Write a first draft based on the topic. Keep it focused and concise."
        ),
    )
    editor = Agent(
        name="Editor",
        model=OpenAIResponses(id="gpt-5.4"),
        instructions=(
            f"You are an editor for tenant {user_id}. "
            "Review the draft for clarity, grammar, and structure. Output the final version."
        ),
    )

    return Workflow(
        name="Content Pipeline",
        description="Draft then edit content",
        db=db,
        steps=[
            Step(name="draft", description="Write the first draft", agent=drafter),
            Step(name="edit", description="Edit and finalize", agent=editor),
        ],
    )


content_pipeline_factory = WorkflowFactory(
    id="content-pipeline",
    db=db,
    factory=build_content_pipeline,
    name="Content Pipeline",
    description="Builds a draft-then-edit content workflow per tenant.",
)

agent_os = AgentOS(workflows=[content_pipeline_factory])
app = agent_os.get_app()

if __name__ == "__main__":
    agent_os.serve(app="basic_workflow_factory:app", port=7777, reload=True)
Run the workflow like any other workflow:
curl -X POST http://localhost:7777/workflows/content-pipeline/runs \
    -F 'message=Write a blog post about sustainable energy' \
    -F 'user_id=tenant_42' \
    -F 'stream=false'
See the Factories reference for the full constructor signature and parameter list.

Step Shape That Varies by Tier

The step graph can change per request. A common pattern: add or drop steps based on a tier read from ctx.trusted.claims.
def build_article_pipeline(ctx: RequestContext) -> Workflow:
    tier = ctx.trusted.claims.get("tier", "free")
    model_id = TIER_MODELS.get(tier, "gpt-4.1-mini")

    steps = []
    if tier == "enterprise":
        steps.append(Step(name="research", agent=researcher(model_id)))
    steps.append(Step(name="draft", agent=drafter(model_id)))
    steps.append(Step(name="edit", agent=editor(model_id)))

    return Workflow(name="Article Pipeline", db=db, steps=steps)
The tier claim is read from verified middleware, so the tier cannot be changed by the request body. See Authorization From Verified Context for the trust model and JWT Role Factory for an end-to-end JWT example.

Async Factories

Use an async callable when you need to fetch context from a database, an HTTP service, or any other awaitable resource.
async_workflow_factory.py
async def build_pipeline(ctx: RequestContext) -> Workflow:
    config = await fetch_pipeline_config(ctx.user_id)
    steps = [Step(name=s.name, agent=build_agent(s)) for s in config.steps]
    return Workflow(name="Pipeline", db=db, steps=steps)


pipeline_factory = WorkflowFactory(id="pipeline", db=db, factory=build_pipeline)

With an Input Schema

Declare a Pydantic model on the factory to validate client-supplied factory_input before the factory runs.
workflow_input_schema.py
from pydantic import BaseModel


class PipelineConfig(BaseModel):
    include_research: bool = False


def build_pipeline(ctx: RequestContext) -> Workflow:
    cfg: PipelineConfig = ctx.input
    steps = []
    if cfg.include_research:
        steps.append(Step(name="research", agent=researcher))
    steps.extend([
        Step(name="draft", agent=drafter),
        Step(name="edit", agent=editor),
    ])
    return Workflow(name="Pipeline", db=db, steps=steps)


pipeline_factory = WorkflowFactory(
    id="pipeline",
    db=db,
    factory=build_pipeline,
    input_schema=PipelineConfig,
)

Error Handling

Raise FactoryPermissionError from inside the factory to reject unauthorized callers with HTTP 403. AgentOS raises FactoryValidationError (400) automatically when factory_input fails input_schema validation.
from agno.factory import FactoryPermissionError


def build_pipeline(ctx: RequestContext) -> Workflow:
    if "workflows:run" not in ctx.trusted.scopes:
        raise FactoryPermissionError("Missing 'workflows:run' scope")
    ...
See the Factories reference for the full exception hierarchy and the post-resolve behavior.

Factory Re-invocation on Read Endpoints

GET /workflows/{id}/runs/{run_id} and GET /workflows/{id}/runs re-invoke the factory because the workflow has to be reconstructed to read its session state. Pass factory_input as a query parameter to drive the rebuild:
curl "http://localhost:7777/workflows/content-pipeline/runs/RUN_ID?session_id=SESSION_ID&factory_input=%7B%22tier%22%3A%22enterprise%22%7D"

Developer Resources