"""
Trace To Database
=================
Demonstrates Agno's two-table trace design and how to inspect traces and spans.
"""
import time # noqa
from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.tools.hackernews import HackerNewsTools
from agno.tracing import setup_tracing
from agno.utils.pprint import pprint_run_response
# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------
# Set up database
db = SqliteDb(db_file="tmp/traces.db")
# Set up tracing - this instruments ALL agents automatically
setup_tracing(db=db)
# ---------------------------------------------------------------------------
# Create Agent
# ---------------------------------------------------------------------------
agent = Agent(
    name="HackerNews Agent",
    model=OpenAIChat(id="gpt-5.2"),
    tools=[HackerNewsTools()],
    instructions="You are a hacker news agent. Answer questions concisely.",
    markdown=True,
)
# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------
def run_trace_demo() -> None:
    # Run the agent - traces will be captured automatically
    print("=" * 60)
    print("Running agent with automatic tracing...")
    print("=" * 60)
    response = agent.run("What is the latest news on AI?")
    pprint_run_response(response)

    # Query traces and spans from the database
    print("\n" + "=" * 60)
    print("Traces and Spans in Database:")
    print("=" * 60)

    # If using BatchSpanProcessor, wait for traces to be flushed before querying.
    # time.sleep(5)  # Uncomment this if using BatchSpanProcessor

    try:
        # Get the trace for this run
        trace = db.get_trace(run_id=response.run_id)
        if not trace:
            print(
                "\n[ERROR] No trace found. Make sure openinference-instrumentation-agno is installed."
            )
        else:
            print("\n Found trace for run")
            print(f"\n Trace ID: {trace.trace_id[:16]}...")
            print(f" Name: {trace.name}")
            print(f" Status: {trace.status}")
            print(f" Duration: {trace.duration_ms}ms")
            print(f" Total Spans: {trace.total_spans}")
            if trace.error_count > 0:
                print(f" Errors: {trace.error_count}")
            if trace.agent_id:
                print(f" Agent ID: {trace.agent_id}")
            if trace.run_id:
                print(f" Run ID: {trace.run_id[:16]}...")
            if trace.session_id:
                print(f" Session ID: {trace.session_id[:16]}...")

            # Get all spans for this trace
            spans = db.get_spans(trace_id=trace.trace_id)
            print(f"\n All spans in this trace ({len(spans)} spans):")
            for span in sorted(spans, key=lambda s: s.start_time):
                indent = " " if span.parent_span_id else ""
                duration = (
                    f"{span.duration_ms}ms"
                    if span.duration_ms < 1000
                    else f"{span.duration_ms / 1000:.1f}s"
                )
                print(f" {indent}- {span.name} ({duration}) [{span.status_code}]")

                # Show span kind and key attributes
                span_kind = span.attributes.get("openinference.span.kind")
                if span_kind:
                    print(f" {indent} Kind: {span_kind}")

                # Show detailed attributes based on span kind
                if span_kind == "AGENT":
                    # Agent-specific attributes
                    if span.attributes.get("input.value"):
                        input_val = span.attributes["input.value"]
                        if len(str(input_val)) < 80:
                            print(f" {indent} Input: {input_val}")
                    if span.attributes.get("output.value"):
                        output_val = span.attributes["output.value"]
                        if len(str(output_val)) < 80:
                            print(f" {indent} Output: {output_val}")
                elif span_kind == "TOOL":
                    # Tool-specific attributes
                    tool_name = span.attributes.get("tool.name")
                    if tool_name:
                        print(f" {indent} Tool: {tool_name}")
                    params = span.attributes.get("tool.parameters")
                    if params:
                        print(f" {indent} Input: {params}")
                    output = span.attributes.get("output.value")
                    if output:
                        output_str = str(output)[:100]
                        print(
                            f" {indent} Output: {output_str}{'...' if len(str(output)) > 100 else ''}"
                        )
                elif span_kind == "LLM":
                    # LLM-specific attributes
                    model_name = span.attributes.get(
                        "llm.model_name"
                    ) or span.attributes.get("gen_ai.request.model")
                    if model_name:
                        print(f" {indent} Model: {model_name}")

                    # Token usage
                    input_tokens = span.attributes.get(
                        "llm.token_count.prompt"
                    ) or span.attributes.get("gen_ai.usage.prompt_tokens")
                    output_tokens = span.attributes.get(
                        "llm.token_count.completion"
                    ) or span.attributes.get("gen_ai.usage.completion_tokens")
                    if input_tokens or output_tokens:
                        print(
                            f" {indent} Tokens: {input_tokens or 0} in, {output_tokens or 0} out"
                        )

                    # Show the content of the last input message, if it is short
                    input_messages = span.attributes.get("llm.input_messages")
                    if (
                        input_messages
                        and isinstance(input_messages, list)
                        and len(input_messages) > 0
                    ):
                        last_msg = input_messages[-1]
                        if isinstance(last_msg, dict) and "message.content" in last_msg:
                            content = last_msg["message.content"]
                            if len(str(content)) < 80:
                                print(f" {indent} Prompt: {content}")

                # Show any error messages
                if span.status_code == "ERROR" and span.status_message:
                    print(f" {indent} [ERROR] Error: {span.status_message}")

                # Show important generic attributes (excluding the ones we already showed)
                important_attrs = {
                    "session.id": "Session",
                    "user.id": "User",
                    "agno.agent.id": "Agent",
                    "agno.run.id": "Run",
                }
                for attr_key, label in important_attrs.items():
                    if attr_key in span.attributes and span.attributes[attr_key]:
                        val = span.attributes[attr_key]
                        # Truncate long IDs
                        if len(str(val)) > 16:
                            val = f"{str(val)[:16]}..."
                        print(f" {indent} {label}: {val}")

                # Show all other attributes (for debugging - can be commented out)
                shown_keys = {
                    "openinference.span.kind",
                    "input.value",
                    "output.value",
                    "tool.name",
                    "tool.parameters",
                    "llm.model_name",
                    "gen_ai.request.model",
                    "llm.token_count.prompt",
                    "llm.token_count.completion",
                    "gen_ai.usage.prompt_tokens",
                    "gen_ai.usage.completion_tokens",
                    "llm.input_messages",
                    "session.id",
                    "user.id",
                    "agno.agent.id",
                    "agno.run.id",
                }
                other_attrs = {
                    k: v for k, v in span.attributes.items() if k not in shown_keys
                }
                if other_attrs:
                    print(f" {indent} Other attributes ({len(other_attrs)}):")
                    for key, value in list(other_attrs.items())[:8]:  # Show first 8
                        value_str = str(value)
                        if len(value_str) > 60:
                            value_str = value_str[:60] + "..."
                        print(f" {indent} • {key}: {value_str}")

            print("\n" + "=" * 60)
            print("\n Summary:")
            print(f" • Trace: {trace.trace_id[:16]}...")
            print(f" • Total Spans: {len(spans)}")
            print(f" • Errors: {trace.error_count}")
    except Exception as e:
        print(f"\n[ERROR] Error querying traces: {e}")
        import traceback

        traceback.print_exc()
if __name__ == "__main__":
    run_trace_demo()
Run the Example
# Clone and set up the repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/92_integrations/observability
# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate
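# Export your model API key (assumed: the example's OpenAIChat model reads OPENAI_API_KEY from the environment)
export OPENAI_API_KEY=your-api-key
# If the script reports that no trace was found, install the instrumentation
# package it checks for (your demo environment may already include it)
pip install openinference-instrumentation-agno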
python trace_to_database.py
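Because setup_tracing(db=db) persists traces and spans to the SQLite file, you can also peek at the database directly after a run. The sketch below is a minimal, Agno-independent check using only Python's built-in sqlite3 module: it makes no assumptions about table names, simply listing whatever tables exist in tmp/traces.db and printing a row count for each, which is enough to watch the trace table and the span table fill up.
import sqlite3

# Open the same SQLite file that SqliteDb writes to
conn = sqlite3.connect("tmp/traces.db")

# List every table in the database; with tracing enabled this includes
# the trace and span tables from the two-table design
tables = [
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
]

for table in tables:
    # Count rows per table (the identifier is quoted, so any table name works)
    (count,) = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()
    print(f"{table}: {count} rows")

conn.close()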