This example demonstrates how to poll the result of a workflow that is running in the background
import asyncio
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.storage.sqlite import SqliteStorage
from agno.team import Team
from agno.tools.googlesearch import GoogleSearchTools
from agno.tools.hackernews import HackerNewsTools
from agno.utils.pprint import pprint_run_response
from agno.workflow.v2.step import Step
from agno.workflow.v2.workflow import Workflow
# Define agents
hackernews_agent = Agent(
name="Hackernews Agent",
model=OpenAIChat(id="gpt-4o-mini"),
tools=[HackerNewsTools()],
role="Extract key insights and content from Hackernews posts",
)
web_agent = Agent(
name="Web Agent",
model=OpenAIChat(id="gpt-4o-mini"),
tools=[GoogleSearchTools()],
role="Search the web for the latest news and trends",
)
# Define research team for complex analysis
research_team = Team(
name="Research Team",
mode="coordinate",
members=[hackernews_agent, web_agent],
instructions="Research tech topics from Hackernews and the web",
)
content_planner = Agent(
name="Content Planner",
model=OpenAIChat(id="gpt-4o"),
instructions=[
"Plan a content schedule over 4 weeks for the provided topic and research content",
"Ensure that I have posts for 3 posts per week",
],
)
# Define steps
research_step = Step(
name="Research Step",
team=research_team,
)
content_planning_step = Step(
name="Content Planning Step",
agent=content_planner,
)
content_creation_workflow = Workflow(
name="Content Creation Workflow",
description="Automated content creation from blog posts to social media",
storage=SqliteStorage(
table_name="workflow_v2_bg",
db_file="tmp/workflow_v2_bg.db",
mode="workflow_v2",
),
steps=[research_step, content_planning_step],
)
async def main():
print(" Starting Async Background Workflow Test")
# Start background execution (async)
bg_response = await content_creation_workflow.arun(
message="AI trends in 2024", background=True
)
print(f" Initial Response: {bg_response.status} - {bg_response.content}")
print(f" Run ID: {bg_response.run_id}")
# Poll every 5 seconds until completion
poll_count = 0
final_result = None
while True:
poll_count += 1
print(f"\n Poll #{poll_count} (every 5s)")
result = content_creation_workflow.get_run(bg_response.run_id)
if result is None:
print(" Workflow not found yet, still waiting...")
if poll_count > 50:
print(f"⏰ Timeout after {poll_count} attempts")
break
await asyncio.sleep(5)
continue
final_result = result
if result.has_completed():
print(f" Workflow completed with status: {result.status}")
break
print(f" Status: {result.status}")
if poll_count > 200:
print(f"⏰ Timeout after {poll_count} attempts")
break
await asyncio.sleep(5)
if final_result:
print(f"\n Final Result:")
print("=" * 50)
pprint_run_response(final_result, markdown=True)
else:
print("❌ No final result available")
if __name__ == "__main__":
asyncio.run(main())
Was this page helpful?