Building from Scratch
If you're building agents from the ground up, CodonWorkload provides a flexible foundation for creating single-agent and multi-agent workflows with token-based execution and comprehensive audit trails.
The CodonWorkload Class
CodonWorkload is the foundation for building observable AI agents. Whether you build from scratch or use frameworks like LangGraph, it gives your agents the ability to track their own performance, costs, and behavior over time.
Key capabilities:
- Node registration: Add Python functions as workflow steps
- Graph definition: Connect nodes with directed edges to define execution flow
- Token-based execution: Messages flow between nodes with full provenance tracking
- Audit trails: Automatic recording of every step, decision, and data transformation
- Runtime operations: Nodes can emit messages, record custom events, and share state
For complete method signatures and parameters, see the API Reference.
Key Concepts
Nodes: Individual Python functions that perform specific tasks (e.g., "summarize", "validate", "format"). Each node receives input, processes it, and can emit output to other nodes.
Edges: Directed connections between nodes that define how tokens (messages) flow through your workflow.
Tokens: Immutable messages that carry data between nodes, each with unique provenance tracking. This creates an immutable tape of your agent's actions during invocation that can be used for compliance monitoring, audit, evaluation, debugging, and experimentation.
Runtime: The execution context that provides nodes access to operations like runtime.emit(), runtime.record_event(), and runtime.state.
Node Function Signatures
When building agents from scratch, your node functions should follow a specific signature pattern to access Codon's workflow features:
def your_node_function(payload, *, runtime, context):
    # Your business logic here
    result = process(payload)

    # Use runtime features
    runtime.emit("next_node", result)
    runtime.record_event("processed", metadata={"size": len(result)})
    return result
Parameters for Full Instrumentation:
- payload: The input data for this node (from the initial execution or upstream runtime.emit() calls)
- runtime: Provides access to Codon workflow operations (required for connected workflows)
- context: Execution metadata such as deployment_id, run_id, and workload_name (see the sketch below)
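The context metadata is useful for tagging records with where and when a node ran. The sketch below is illustrative only: it assumes context behaves like a dictionary exposing the keys listed above; adjust the access style if your SDK version exposes context as an object.

def context_aware_node(payload, *, runtime, context):
    # Assumption: context is dict-like and carries the metadata listed above.
    run_id = context.get("run_id")
    deployment = context.get("deployment_id")

    runtime.record_event("context_snapshot", metadata={"run_id": run_id})
    return {"input": payload, "run_id": run_id, "deployment_id": deployment}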
What runtime enables:
- runtime.emit(target_node, data): Send data to downstream nodes to continue the workflow
- runtime.record_event(event_type, metadata={}): Add custom entries to the audit trail
- runtime.state: Shared dictionary for coordination between nodes in the same execution
- runtime.stop(): Halt the entire workflow execution early (runtime.state and runtime.stop() are illustrated in the sketch below)
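The examples later in this page exercise runtime.emit() and runtime.record_event(); the sketch below shows the other two operations. It is a minimal, hypothetical example: the validate and report node names are not part of the SDK, they would still need to be registered with add_node()/add_edge(), and it assumes runtime.state is a plain dict shared across nodes within one execution.

def validate(payload, *, runtime, context):
    # Hypothetical guard node: halt the whole run early if the input is unusable.
    if not payload.get("question"):
        runtime.record_event("validation_failed", metadata={"reason": "empty question"})
        runtime.stop()
        return None

    # Stash a flag that other nodes can read later in this execution.
    runtime.state["validated"] = True
    runtime.emit("report", payload)
    return payload

def report(payload, *, runtime, context):
    # Read the shared flag written by the validate node.
    return {"question": payload["question"], "validated": runtime.state.get("validated", False)}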
Single-Agent Workflow
Here's a simple Q&A agent that processes a user question:
from datetime import datetime
from typing import Any, Dict

from codon_sdk.agents import CodonWorkload
from codon_sdk.instrumentation import initialize_telemetry

# call_openai() below is a stand-in for your own LLM helper; use whichever client you prefer.

# Initialize telemetry for platform integration
initialize_telemetry()
# Define a workload
workload = CodonWorkload(name="QA-Agent", version="0.1.0")
# Define node functions
def call_model(message: Dict[str, Any], *, runtime, context):
    # Use the prompt built upstream when present; fall back to the raw question.
    prompt = message.get("prompt", message["question"])
    answer = call_openai(prompt, system="You are a helpful assistant that answers questions you are asked.")
    runtime.record_event("call_model", metadata={"answer_length": len(answer)})
    runtime.emit("finalize", {"question": message["question"], "answer": answer})
    return answer
def prompt_builder(message: Dict[str, Any], *, runtime, context):
    question = message["question"]
    prompt = (
        "Answer the user's question in clear, friendly prose.\n"
        f"Question: {question}"
    )
    runtime.record_event("prompt_created", metadata={"length": len(prompt)})
    runtime.emit("call_model", {"question": question, "prompt": prompt})
    return prompt
def finalize(message: Dict[str, Any], *, runtime, context):
    result = {
        "question": message["question"],
        "answer": message["answer"],
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    runtime.record_event("finalized", metadata={"question_hash": hash(message["question"])})
    return result
# Add the nodes
workload.add_node(call_model, name="call_model", role="llm")
workload.add_node(finalize, name="finalize", role="postprocess")
workload.add_node(prompt_builder, name="prompt_builder", role="format_prompt")
# Define the edges
workload.add_edge("prompt_builder", "call_model")
workload.add_edge("call_model", "finalize")
# Execute the workload
report = workload.execute({"question": "What is the meaning of life, the universe, and everything?"}, deployment_id="local")
# Check results
final_result = report.node_results("finalize")[-1]
print(final_result)
print(f"Logic ID: {workload.logic_id}")
Multi-Agent Workflow
For more complex scenarios, you can orchestrate multiple agents that collaborate through token passing and shared state:
from typing import Any, Dict

from codon_sdk.agents import CodonWorkload
from codon_sdk.instrumentation import initialize_telemetry

# call_openai() is the same placeholder LLM helper used in the single-agent example.

# Initialize telemetry for platform integration
initialize_telemetry()
def build_multi_agent_workload() -> CodonWorkload:
    workload = CodonWorkload(name="Research-Writer", version="0.1.0")

    def planner(message: Dict[str, Any], *, runtime, context):
        topic = message["topic"]
        prompt = (
            "Design a concise research plan outlining key angles and questions.\n"
            f"Topic: {topic}\n"
            "Return a numbered list with three focus areas."
        )
        plan = call_openai(prompt, system="You are a strategic project planner.")
        runtime.state["plan"] = plan
        runtime.emit("researcher", {"topic": topic, "plan": plan})
        return plan

    def researcher(message: Dict[str, Any], *, runtime, context):
        prompt = (
            "Given this plan, provide bullet insights (max 3 per focus area).\n"
            f"Plan: {message['plan']}"
        )
        insights = call_openai(prompt, system="You are an expert analyst.")
        runtime.state["insights"] = insights
        runtime.emit(
            "writer",
            {
                "topic": message["topic"],
                "plan": message["plan"],
                "insights": insights,
            },
        )
        return insights

    def writer(message: Dict[str, Any], *, runtime, context):
        prompt = (
            "Write a concise executive summary (<=120 words) in a warm, professional tone.\n"
            f"Topic: {message['topic']}\n"
            f"Plan: {message['plan']}\n"
            f"Insights: {message['insights']}"
        )
        summary = call_openai(prompt, system="You are a skilled report writer.")
        runtime.record_event("summary_created", metadata={"length": len(summary)})
        return {
            "topic": message["topic"],
            "plan": message["plan"],
            "insights": message["insights"],
            "summary": summary,
        }

    workload.add_node(planner, name="planner", role="planner")
    workload.add_node(researcher, name="researcher", role="analyst")
    workload.add_node(writer, name="writer", role="author")

    workload.add_edge("planner", "researcher")
    workload.add_edge("researcher", "writer")
    return workload
# Use the multi-agent workload
multi_agent = build_multi_agent_workload()
project = {"topic": "The impact of community gardens on urban wellbeing"}
multi_report = multi_agent.execute(project, deployment_id="demo", max_steps=20)
final_document = multi_report.node_results("writer")[-1]
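Each agent's contribution is recoverable from the report. The lines below are a minimal sketch that assumes node_results() hands back each node's raw return value (so the writer's dict can be indexed directly); check the API Reference for the exact report shape.

# Hypothetical inspection of the run, assuming raw return values are stored per node.
print(final_document["summary"])                  # executive summary from the writer node
print(multi_report.node_results("planner")[-1])   # research plan produced by the planner node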
Platform Integration
The initialize_telemetry() function shown in the examples above connects your CodonWorkload executions to the configured endpoint. Call it once at application startup to unlock comprehensive observability.
Benefits of platform integration:
- Node-level spans: Every node execution becomes an OpenTelemetry span with input/output data
- Workload tracing: Complete execution flow visible in your observability dashboard
- Performance metrics: Track latency, error rates, and resource usage per node
- Cost attribution: Monitor token usage and API costs across different deployment environments
- Debug workflows: Inspect failed executions with full context and provenance
When you execute workloads after initializing telemetry, each node function call, runtime.emit(), and runtime.record_event() creates structured telemetry data that appears in your Codon dashboard. This gives you the same observability benefits that LangGraph users get automatically.
Configuration: See Getting Started - Initializing Telemetry for configuration options.
Telemetry Configuration
Direct span emission: To emit OpenTelemetry spans directly from CodonWorkload, set enable_tracing=True when creating your workload:
workload = CodonWorkload(name="MyAgent", version="1.0.0", enable_tracing=True)
See the CodonWorkload API reference for complete parameter details.
Environment variables and options: See Getting Started - Initializing Telemetry for configuration options.
Key Concepts
Token-Based Execution
- Each node receives a token message with data from previous nodes
- Nodes can emit new tokens to downstream nodes via runtime.emit(target_node, payload)
- Tokens carry immutable provenance with unique IDs, lineage, and timestamps
Runtime Operations
The runtime parameter provides access to workflow operations:
- runtime.emit(node_name, payload) - Send tokens to other nodes
- runtime.record_event(event_type, metadata={}) - Add custom audit entries
- runtime.state - Shared dictionary for coordination between nodes
- runtime.stop() - Halt execution early
Audit Ledger
Every workflow execution generates a comprehensive audit trail:
- Token enqueue/dequeue events
- Node completion events
- Custom events via runtime.record_event()
- Execution context (deployment_id, logic_id, run_id)
Note: Only fired nodes are represented in a workload run, so the complete workload definition may not be present in the workload run summary. Your audit trail shows actual execution paths, not all possible paths defined in your workflow.
Access the audit ledger through the execution report:
for event in report.ledger:
    print(f"{event.timestamp.isoformat()} | {event.event_type} | {event.source_node}")
Logic IDs
Each workload gets a deterministic Logic ID based on:
- Agent class metadata (name, version, description)
- Node definitions and their roles
- Graph topology (edges between nodes)
Same workload structure = same Logic ID, enabling deduplication and version tracking.
Example: Logic ID Generation and Changes
# Based on test_codon_workload.py test patterns
workload = CodonWorkload(name="TestWorkload", version="0.1.0")
def simple_node(message, *, runtime, context):
    return f"Result: {message['input']}"
workload.add_node(simple_node, name="test_node", role="processor")
print(f"Initial Logic ID: {workload.logic_id}")
baseline_logic_id = workload.logic_id
# Adding a node changes the Logic ID
def echo_node(message, *, runtime, context):
    return message
workload.add_node(echo_node, name="echo", role="responder")
workload.add_edge("test_node", "echo")
print(f"After adding node: {workload.logic_id}")
print(f"Logic ID changed? {workload.logic_id != baseline_logic_id}") # True
This deterministic identification enables:
- Deduplication: Skip redundant executions of the same logic
- Version tracking: Compare agent iterations across deployments
- Caching: Store and retrieve results based on stable identifiers
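For instance, a stable Logic ID can key an external cache of execution results. The sketch below is illustrative only: the result_cache dict and the caching policy are hypothetical, not part of the SDK.

# Hypothetical cache keyed by the deterministic Logic ID plus the input payload.
result_cache: dict = {}

def execute_with_cache(workload, payload, deployment_id="local"):
    key = (workload.logic_id, str(payload))
    if key in result_cache:
        return result_cache[key]  # reuse the earlier run for identical logic + input
    report = workload.execute(payload, deployment_id=deployment_id)
    result_cache[key] = report
    return report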