
API Reference


Core SDK (codon_sdk)

Agents

CodonWorkload

Bases: Workload

Default Workload implementation for authoring agents from scratch.
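
A minimal usage sketch, assuming the import path implied by the source location below, a hypothetical summarize function, and a placeholder deployment_id; ORG_NAMESPACE must be set because NodeSpec resolves its namespace from it:

import os

from codon_sdk.agents.codon_workload import CodonWorkload

os.environ.setdefault("ORG_NAMESPACE", "acme")  # NodeSpec requires an organization namespace

def summarize(payload, *, runtime, context):
    # Node callables are invoked as fn(token.payload, runtime=handle, context=context).
    return f"summary of {payload!r}"

workload = CodonWorkload(name="doc-pipeline", version="0.1.0")
workload.add_node(summarize, name="summarize", role="processor")

# Nodes with no predecessors become entry nodes and are seeded with the payload.
report = workload.execute("raw document text", deployment_id="dep-local-test")
print(report.node_results("summarize"))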

Source code in sdk/src/codon_sdk/agents/codon_workload.py
class CodonWorkload(Workload):
    """Default Workload implementation for authoring agents from scratch."""

    def __init__(
        self,
        *,
        name: str,
        version: str,
        description: Optional[str] = None,
        tags: Optional[Sequence[str]] = None,
        enable_tracing: bool = False,
    ) -> None:
        """Initialize workload with metadata.

        Args:
            name: Unique identifier for the workload type.
            version: Semantic version (e.g., "1.2.0").
            description: Human-readable purpose description.
            tags: Categorization tags.
            enable_tracing: When True, emit OpenTelemetry spans directly from the
                CodonWorkload runtime. Defaults to False to avoid double
                instrumentation when external wrappers (e.g., LangGraph) are used.
        """
        self._node_specs: Dict[str, NodeSpec] = {}
        self._node_functions: Dict[str, Callable[..., Any]] = {}
        self._edges: Set[Tuple[str, str]] = set()
        self._predecessors: DefaultDict[str, Set[str]] = defaultdict(set)
        self._successors: DefaultDict[str, Set[str]] = defaultdict(set)
        self._agent_class_id: Optional[str] = None
        self._logic_id: Optional[str] = None
        self._entry_nodes: Optional[List[str]] = None
        self._organization_id: Optional[str] = os.getenv("ORG_NAMESPACE")
        self._enable_tracing = enable_tracing
        super().__init__(
            name=name,
            version=version,
            description=description,
            tags=tags,
        )

    @property
    def agent_class_id(self) -> str:
        if self._agent_class_id is None:
            raise WorkloadRegistrationError("Agent class ID has not been computed")
        return self._agent_class_id

    @property
    def logic_id(self) -> str:
        if self._logic_id is None:
            raise WorkloadRegistrationError("Logic ID has not been computed")
        return self._logic_id

    @property
    def organization_id(self) -> Optional[str]:
        if self._organization_id:
            return self._organization_id
        if self._node_specs:
            return next(iter(self._node_specs.values())).org_namespace
        return os.getenv("ORG_NAMESPACE")

    @property
    def nodes(self) -> Sequence[NodeSpec]:
        return tuple(self._node_specs.values())

    @property
    def topology(self) -> Iterable[Tuple[str, str]]:
        return tuple(sorted(self._edges))

    def _register_logic_group(self) -> None:
        self._agent_class_id = self._compute_agent_class_id()
        self._update_logic_identity()

    def add_node(
        self,
        function: Callable[..., Any],
        name: str,
        role: str,
        *,
        org_namespace: Optional[str] = None,
        model_name: Optional[str] = None,
        model_version: Optional[str] = None,
    ) -> NodeSpec:
        """Add a node (computational step) to the workload.

        Args:
            function: The Python function to execute for this node.
            name: Unique identifier for this node within the workload.
            role: The node's role in the workflow (e.g., "processor", "validator").
            org_namespace: Organization namespace for scoping. Defaults to ORG_NAMESPACE env var.
            model_name: Optional model identifier if this node uses an AI model.
            model_version: Optional model version if this node uses an AI model.

        Returns:
            The generated NodeSpec with a unique ID.

        Raises:
            WorkloadRegistrationError: If a node with this name already exists.
        """
        if name in self._node_specs:
            raise WorkloadRegistrationError(f"Node '{name}' already registered")

        nodespec = NodeSpec(
            name=name,
            role=role,
            callable=function,
            org_namespace=org_namespace,
            model_name=model_name,
            model_version=model_version,
        )
        self._node_specs[name] = nodespec
        self._node_functions[name] = function
        self._predecessors.setdefault(name, set())
        self._successors.setdefault(name, set())
        self._organization_id = nodespec.org_namespace
        self._update_logic_identity()
        return nodespec

    def add_edge(self, source_name: str, destination_name: str) -> None:
        """Add an edge between two nodes in the workload.

        Args:
            source_name: Name of the source node.
            destination_name: Name of the destination node.

        Raises:
            WorkloadRegistrationError: If either node name doesn't exist.
        """
        if source_name not in self._node_specs:
            raise WorkloadRegistrationError(f"Unknown source node '{source_name}'")
        if destination_name not in self._node_specs:
            raise WorkloadRegistrationError(
                f"Unknown destination node '{destination_name}'"
            )

        edge = (source_name, destination_name)
        if edge in self._edges:
            return

        self._edges.add(edge)
        self._successors[source_name].add(destination_name)
        self._predecessors[destination_name].add(source_name)
        self._update_logic_identity()

    async def execute_async(
        self,
        payload: Any,
        *,
        deployment_id: str,
        entry_nodes: Optional[Sequence[str]] = None,
        max_steps: int = 1000,
        event_handler: Optional[Callable[[StreamEvent], Awaitable[None]]] = None,
        **kwargs: Any,
    ) -> ExecutionReport:
        if not deployment_id:
            raise ValueError("deployment_id is required when executing a workload")
        if not self._node_specs:
            raise WorkloadRuntimeError("No nodes have been registered")

        if entry_nodes is not None:
            active_entry_nodes = list(entry_nodes)
        elif self._entry_nodes:
            active_entry_nodes = list(self._entry_nodes)
        else:
            entry_candidates = [
                name for name, preds in self._predecessors.items() if not preds
            ]
            active_entry_nodes = entry_candidates or list(self._node_specs.keys())
        for node in active_entry_nodes:
            if node not in self._node_specs:
                raise WorkloadRuntimeError(f"Entry node '{node}' is not registered")

        run_id = str(uuid4())
        context: Dict[str, Any] = {
            "deployment_id": deployment_id,
            "workload_logic_id": self.logic_id,
            "logic_id": self.logic_id,
            "workload_id": self.agent_class_id,
            "workload_run_id": run_id,
            "run_id": run_id,
            "workload_name": self.metadata.name,
            "organization_id": self.organization_id,
            "org_namespace": self.organization_id,
            "workload_version": self.metadata.version,
            **kwargs,
        }

        ledger: List[AuditEvent] = []
        records: DefaultDict[str, List[NodeExecutionRecord]] = defaultdict(list)
        queue_deque: Deque[Tuple[str, Token]] = deque()
        state: Dict[str, Any] = {}

        loop = asyncio.get_running_loop()
        tracer = trace.get_tracer(__name__) if self._enable_tracing else None

        async def publish_event(event_type: str, **data: Any) -> None:
            if event_handler is None:
                return
            await event_handler(StreamEvent(event_type=event_type, data=data))

        def schedule_event(event_type: str, data: Dict[str, Any]) -> None:
            if event_handler is None:
                return
            loop.create_task(event_handler(StreamEvent(event_type=event_type, data=data)))

        def enqueue(
            source_node: Optional[str],
            target_node: str,
            token: Token,
            *,
            audit_metadata: Dict[str, Any],
        ) -> None:
            queue_deque.append((target_node, token))
            ledger.append(
                AuditEvent(
                    event_type="token_enqueued",
                    timestamp=_utcnow(),
                    token_id=token.id,
                    source_node=source_node,
                    target_node=target_node,
                    metadata={
                        "payload_repr": repr(token.payload),
                        **audit_metadata,
                    },
                )
            )
            schedule_event(
                "token_enqueued",
                {
                    "source_node": source_node,
                    "target_node": target_node,
                    "token_id": token.id,
                    "payload": token.payload,
                    "metadata": {"payload_repr": repr(token.payload), **audit_metadata},
                },
            )

        for node in active_entry_nodes:
            seed_token = Token(
                id=str(uuid4()),
                payload=payload,
                origin="__entry__",
                parent_id=None,
                lineage=(),
            )
            enqueue("__entry__", node, seed_token, audit_metadata={"seed": True})

        steps = 0

        while queue_deque:
            node_name, token = queue_deque.popleft()
            ledger.append(
                AuditEvent(
                    event_type="token_dequeued",
                    timestamp=_utcnow(),
                    token_id=token.id,
                    source_node=token.origin,
                    target_node=node_name,
                    metadata={"payload_repr": repr(token.payload)},
                )
            )
            await publish_event(
                "token_dequeued",
                source_node=token.origin,
                target_node=node_name,
                token_id=token.id,
                payload=token.payload,
            )

            if steps >= max_steps:
                raise WorkloadRuntimeError(
                    f"Maximum step count {max_steps} reached; possible infinite loop"
                )
            steps += 1

            nodespec = self._node_specs[node_name]
            telemetry = NodeTelemetryPayload(
                workload_id=self.agent_class_id,
                workload_name=self.metadata.name,
                workload_version=self.metadata.version,
                workload_logic_id=self.logic_id,
                workload_run_id=run_id,
                deployment_id=deployment_id,
                organization_id=self.organization_id,
                org_namespace=self.organization_id,
                nodespec_id=nodespec.id,
                node_name=nodespec.name,
                node_role=nodespec.role,
                model_name=nodespec.model_name,
            )
            telemetry.node_input = _render_payload(token.payload)

            node_callable = self._node_functions[node_name]
            handle = _RuntimeHandle(
                workload=self,
                current_node=node_name,
                token=token,
                enqueue=enqueue,
                ledger=ledger,
                state=state,
                telemetry=telemetry,
                schedule_event=schedule_event,
            )

            started_at = _utcnow()
            span_ctx = (
                tracer.start_as_current_span(f"codon.node.{node_name}")
                if tracer
                else contextlib.nullcontext()
            )
            await publish_event(
                "node_started",
                node=node_name,
                token_id=token.id,
                payload=token.payload,
            )
            with span_ctx as span:
                if span is not None:
                    _apply_nodespec_attributes(span, nodespec)
                    _apply_workload_attributes(span, telemetry=telemetry, nodespec=nodespec, context=context)
                try:
                    result = node_callable(token.payload, runtime=handle, context=context)
                    if inspect.isawaitable(result):
                        result = await result  # type: ignore[assignment]
                except Exception as exc:  # pragma: no cover
                    telemetry.status_code = "ERROR"
                    telemetry.error_message = repr(exc)
                    if span is not None:
                        _apply_workload_attributes(span, telemetry=telemetry, nodespec=nodespec, context=context)
                        span.set_status(Status(StatusCode.ERROR, str(exc)))
                    ledger.append(
                        AuditEvent(
                            event_type="node_failed",
                            timestamp=_utcnow(),
                            token_id=token.id,
                            source_node=node_name,
                            target_node=None,
                            metadata={"exception": repr(exc)},
                        )
                    )
                    schedule_event(
                        "node_failed",
                        {
                            "node": node_name,
                            "token_id": token.id,
                            "exception": repr(exc),
                        },
                    )
                    raise WorkloadRuntimeError(
                        f"Node '{node_name}' execution failed"
                    ) from exc
                finished_at = _utcnow()
                telemetry.duration_ms = int((finished_at - started_at).total_seconds() * 1000)
                telemetry.status_code = "OK"
                telemetry.node_output = _render_payload(result)
                if span is not None:
                    _apply_workload_attributes(span, telemetry=telemetry, nodespec=nodespec, context=context)
                    span.set_status(Status(StatusCode.OK))

            records[node_name].append(
                NodeExecutionRecord(
                    node=node_name,
                    token_id=token.id,
                    result=result,
                    started_at=started_at,
                    finished_at=finished_at,
                )
            )

            ledger.append(
                AuditEvent(
                    event_type="node_completed",
                    timestamp=_utcnow(),
                    token_id=token.id,
                    source_node=node_name,
                    target_node=None,
                    metadata={
                        "result_repr": telemetry.node_output,
                        "emissions": handle.emissions,
                    },
                )
            )
            await publish_event(
                "node_completed",
                node=node_name,
                token_id=token.id,
                result=result,
                emissions=handle.emissions,
            )

            if handle.stop_requested:
                ledger.append(
                    AuditEvent(
                        event_type="runtime_stopped",
                        timestamp=_utcnow(),
                        token_id=token.id,
                        source_node=node_name,
                        target_node=None,
                        metadata={"reason": "stop_requested"},
                    )
                )
                schedule_event(
                    "runtime_stopped",
                    {
                        "node": node_name,
                        "token_id": token.id,
                        "reason": "stop_requested",
                    },
                )
                break

        report = ExecutionReport(
            results=dict(records),
            ledger=ledger,
            run_id=run_id,
            context=context,
        )
        await publish_event("workflow_finished", report=report)
        return report

    def execute(
        self,
        payload: Any,
        *,
        deployment_id: str,
        entry_nodes: Optional[Sequence[str]] = None,
        max_steps: int = 1000,
        **kwargs: Any,
    ) -> ExecutionReport:
        return _run_coroutine_sync(
            lambda: self.execute_async(
                payload,
                deployment_id=deployment_id,
                entry_nodes=entry_nodes,
                max_steps=max_steps,
                **kwargs,
            )
        )

    async def execute_streaming_async(
        self,
        payload: Any,
        *,
        deployment_id: str,
        entry_nodes: Optional[Sequence[str]] = None,
        max_steps: int = 1000,
        **kwargs: Any,
    ) -> AsyncIterator[StreamEvent]:
        queue_async: asyncio.Queue[Any] = asyncio.Queue()
        sentinel = object()
        error: List[BaseException] = []

        async def handler(event: StreamEvent) -> None:
            await queue_async.put(event)

        async def runner() -> None:
            try:
                await self.execute_async(
                    payload,
                    deployment_id=deployment_id,
                    entry_nodes=entry_nodes,
                    max_steps=max_steps,
                    event_handler=handler,
                    **kwargs,
                )
            except Exception as exc:  # pragma: no cover - surfaced to consumer
                error.append(exc)
            finally:
                await queue_async.put(sentinel)

        task = asyncio.create_task(runner())

        try:
            while True:
                item = await queue_async.get()
                if item is sentinel:
                    break
                yield item
            if error:
                raise error[0]
        finally:
            if not task.done():
                task.cancel()
                with contextlib.suppress(Exception):
                    await task

    def execute_streaming(
        self,
        payload: Any,
        *,
        deployment_id: str,
        entry_nodes: Optional[Sequence[str]] = None,
        max_steps: int = 1000,
        **kwargs: Any,
    ) -> Iterator[StreamEvent]:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                "execute_streaming cannot be called from an active event loop; "
                "use execute_streaming_async instead"
            )

        q: "queue.Queue[Any]" = queue.Queue()
        sentinel = object()
        error: List[BaseException] = []

        async def producer() -> None:
            try:
                async for event in self.execute_streaming_async(
                    payload,
                    deployment_id=deployment_id,
                    entry_nodes=entry_nodes,
                    max_steps=max_steps,
                    **kwargs,
                ):
                    q.put(event)
            except Exception as exc:  # pragma: no cover - surfaced to consumer
                error.append(exc)
            finally:
                q.put(sentinel)

        def run_loop() -> None:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(producer())
            finally:
                loop.close()

        thread = threading.Thread(target=run_loop, daemon=True)
        thread.start()
        try:
            while True:
                item = q.get()
                if item is sentinel:
                    break
                yield item
            if error:
                raise error[0]
        finally:
            thread.join()

    def _compute_agent_class_id(self) -> str:
        meta = self.metadata
        slug = meta.name.strip().lower().replace(" ", "-")
        return f"{slug}:{meta.version}"

    def _update_logic_identity(self) -> None:
        agent_class = AgentClass(
            name=self.metadata.name,
            version=self.metadata.version,
            description=self.metadata.description or "",
        )
        topology = Topology(
            edges=[
                NodeEdge(
                    source_nodespec_id=self._node_specs[src].id,
                    target_nodespec_id=self._node_specs[dest].id,
                )
                for src, dest in sorted(self._edges)
            ]
        )
        request = LogicRequest(
            agent_class=agent_class,
            nodes=list(self._node_specs.values()),
            topology=topology,
        )
        self._logic_id = generate_logic_id(request)

__init__(*, name: str, version: str, description: Optional[str] = None, tags: Optional[Sequence[str]] = None, enable_tracing: bool = False) -> None

Initialize workload with metadata.
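
A construction sketch with illustrative metadata (parameters are documented below); enable_tracing=True is appropriate only when no external wrapper is already emitting spans:

workload = CodonWorkload(
    name="doc-pipeline",
    version="1.2.0",
    description="Summarize then review documents",
    tags=("nlp", "demo"),
    enable_tracing=True,  # emit OpenTelemetry spans from the CodonWorkload runtime itself
)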

Parameters:

    name (str, required): Unique identifier for the workload type.
    version (str, required): Semantic version (e.g., "1.2.0").
    description (Optional[str], default None): Human-readable purpose description.
    tags (Optional[Sequence[str]], default None): Categorization tags.
    enable_tracing (bool, default False): When True, emit OpenTelemetry spans directly from the CodonWorkload runtime. Defaults to False to avoid double instrumentation when external wrappers (e.g., LangGraph) are used.

add_edge(source_name: str, destination_name: str) -> None

Add an edge between two nodes in the workload.

Parameters:

    source_name (str, required): Name of the source node.
    destination_name (str, required): Name of the destination node.

Raises:

    WorkloadRegistrationError: If either node name doesn't exist.


add_node(function: Callable[..., Any], name: str, role: str, *, org_namespace: Optional[str] = None, model_name: Optional[str] = None, model_version: Optional[str] = None) -> NodeSpec

Add a node (computational step) to the workload.
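
A registration sketch, continuing the workload from the example above; the model metadata values are illustrative, and parameters are documented below:

def summarize(payload, *, runtime, context):
    return f"summary of {payload!r}"

spec = workload.add_node(
    summarize,
    name="summarize",
    role="processor",
    model_name="gpt-4o",
    model_version="2024-05-13",
)
print(spec.id)  # deterministic NodeSpec ID derived from the callable and metadata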

Parameters:

    function (Callable[..., Any], required): The Python function to execute for this node.
    name (str, required): Unique identifier for this node within the workload.
    role (str, required): The node's role in the workflow (e.g., "processor", "validator").
    org_namespace (Optional[str], default None): Organization namespace for scoping. Defaults to the ORG_NAMESPACE environment variable.
    model_name (Optional[str], default None): Optional model identifier if this node uses an AI model.
    model_version (Optional[str], default None): Optional model version if this node uses an AI model.

Returns:

    NodeSpec: The generated NodeSpec with a unique ID.

Raises:

    WorkloadRegistrationError: If a node with this name already exists.


ExecutionReport dataclass

Execution summary returned by CodonWorkload.execute.

Source code in sdk/src/codon_sdk/agents/codon_workload.py
@dataclass
class ExecutionReport:
    """Execution summary returned by :meth:`CodonWorkload.execute`."""

    results: Dict[str, List[NodeExecutionRecord]]
    ledger: List[AuditEvent]
    run_id: str
    context: Dict[str, Any]

    def node_results(self, node: str) -> List[Any]:
        return [record.result for record in self.results.get(node, [])]
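
A reading sketch, continuing the hypothetical workload above; node_results flattens each node's execution records into a plain list of results:

report = workload.execute("raw document text", deployment_id="dep-local-test")
print(report.run_id)
for event in report.ledger:              # ordered audit trail of the run
    print(event.event_type, event.token_id)
print(report.node_results("summarize"))  # results in execution order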

Instrumentation

initialize_telemetry(api_key: Optional[str] = None, service_name: Optional[str] = None, endpoint: Optional[str] = None, attach_to_existing: Optional[bool] = None) -> None

Initialize OpenTelemetry tracing for Codon.

Endpoint precedence: explicit argument, OTEL_EXPORTER_OTLP_ENDPOINT, then production default. API key precedence: explicit argument, then CODON_API_KEY environment variable. When provided, the API key is sent as x-codon-api-key on OTLP requests.
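
A configuration sketch; the key and endpoint values are placeholders, and the precedence rules are those described above:

import os

from codon_sdk.instrumentation.config import initialize_telemetry

# Option 1: configure via environment variables.
os.environ["CODON_API_KEY"] = "ck-placeholder"
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://otel.example.com"
initialize_telemetry(service_name="doc-pipeline")

# Option 2: explicit arguments take precedence over the environment.
initialize_telemetry(
    api_key="ck-placeholder",
    service_name="doc-pipeline",
    endpoint="https://otel.example.com",
    attach_to_existing=True,  # add a span processor to an existing TracerProvider instead of replacing it
)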

Source code in sdk/src/codon_sdk/instrumentation/config.py
def initialize_telemetry(
    api_key: Optional[str] = None,
    service_name: Optional[str] = None,
    endpoint: Optional[str] = None,
    attach_to_existing: Optional[bool] = None,
) -> None:
    """Initialize OpenTelemetry tracing for Codon.

    Endpoint precedence: explicit argument, ``OTEL_EXPORTER_OTLP_ENDPOINT``, then
    production default. API key precedence: explicit argument, then
    ``CODON_API_KEY`` environment variable. When provided, the API key is sent
    as ``x-codon-api-key`` on OTLP requests.
    """

    attach = (
        attach_to_existing
        if attach_to_existing is not None
        else _coerce_bool(os.getenv("CODON_ATTACH_TO_EXISTING_OTEL_PROVIDER"))
    ) or False

    existing_provider = trace.get_tracer_provider()

    final_api_key = api_key or os.getenv("CODON_API_KEY")
    final_service_name = (
        service_name
        or os.getenv("OTEL_SERVICE_NAME")
        or "unknown_codon_service"
    )
    final_endpoint = (
        endpoint
        or os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
        or DEFAULT_INGEST_ENDPOINT
    )

    headers: Dict[str, str] = {}
    if final_api_key:
        headers["x-codon-api-key"] = final_api_key
    else:
        logger.warning(
            "CODON telemetry initialized without an API key; spans may be rejected by the gateway"
        )

    resource = Resource(attributes={"service.name": final_service_name})
    exporter = OTLPSpanExporter(endpoint=final_endpoint, headers=headers)

    if attach and isinstance(existing_provider, TracerProvider):
        processor = BatchSpanProcessor(exporter)
        # Avoid double-adding an equivalent processor if initialise is called repeatedly.
        if not _has_equivalent_processor(existing_provider, exporter):
            existing_provider.add_span_processor(processor)
        return

    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(exporter))

    trace.set_tracer_provider(provider)

Instrumentation Schemas

NodeSpec

Bases: BaseModel

Immutable specification that introspects Python callables and generates stable SHA-256 identifiers.

NodeSpec inspects Python callables to capture the function signature, type hints, and optional model metadata. It emits a deterministic SHA-256 ID that downstream systems can rely on.

NodeSpec requires type annotations to build JSON schemas for inputs and outputs. If annotations are missing, the generated schemas may be empty.
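
An introspection sketch (the summarize function is hypothetical; the import path follows the source location below). Fully annotated callables yield populated schemas, and identical inputs always hash to the same ID:

from codon_sdk.instrumentation.schemas.nodespec import NodeSpec

def summarize(text: str) -> str:  # annotations feed the input/output JSON schemas
    return text[:100]

spec = NodeSpec(name="summarize", role="processor", callable=summarize, org_namespace="acme")
print(spec.id)                  # stable SHA-256 identifier
print(spec.callable_signature)

again = NodeSpec(name="summarize", role="processor", callable=summarize, org_namespace="acme")
assert spec.id == again.id      # deterministic: same inputs, same ID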

Source code in sdk/src/codon_sdk/instrumentation/schemas/nodespec/__init__.py
class NodeSpec(BaseModel):
    """Immutable specification that introspects Python callables and generates stable SHA-256 identifiers.

    NodeSpec inspects Python callables to capture the function signature, type hints, and optional 
    model metadata. It emits a deterministic SHA-256 ID that downstream systems can rely on.

    NodeSpec requires type annotations to build JSON schemas for inputs and outputs. If annotations 
    are missing, the generated schemas may be empty.
    """
    model_config = ConfigDict(extra="forbid", frozen=True)
    id: str = Field(
        default=None, description="The NodeSpec ID generated from the NodeSpec."
    )
    spec_version: str = Field(
        default=__version__,
        description="The current version of the NodeSpec specification.",
    )
    org_namespace: str = Field(
        default=os.getenv(
            nodespec_env.OrgNamespace,
            nodespec_env.OrgNamespaceDefault
        ),
        description="The namespace of the calling organization.",
    )
    name: str = Field(description="The name of the node.")
    role: str = Field(description="The role of the node.")
    callable_signature: str = Field(description="The callable signature of the node.")
    input_schema: Optional[str] = Field(
        default=None, description="The input schema of the node."
    )
    output_schema: Optional[str] = Field(
        default=None, description="The output schema of the node."
    )
    model_name: Optional[str] = Field(
        default=None, description="The name of the model used in the node."
    )
    model_version: Optional[str] = Field(
        default=None, description="The version of the model currently used."
    )

    @override
    def __init__(
        self,
        name: str,
        role: str,
        callable: Callable[..., Any],
        org_namespace: Optional[str] = None,
        model_name: Optional[str] = None,
        model_version: Optional[str] = None,
        **kwargs,
    ):
        """Create a NodeSpec by introspecting a Python callable.

        Args:
            name: The name of the node.
            role: The role of the node.
            callable: The Python function to introspect.
            org_namespace: The namespace of the calling organization. Defaults to ORG_NAMESPACE env var.
            model_name: The name of the model used in the node.
            model_version: The version of the model currently used.
            **kwargs: Additional fields for the NodeSpec.

        Raises:
            NodeSpecValidationError: If ORG_NAMESPACE environment variable not set.

        Example:
            >>> nodespec = NodeSpec(
            ...     org_namespace="acme",
            ...     name="summarize", 
            ...     role="processor",
            ...     callable=summarize_function,
            ...     model_name="gpt-4o",
            ...     model_version="2024-05-13"
            ... )
            >>> print(nodespec.id)
        """

        callable_attrs = analyze_function(callable)
        namespace = org_namespace or os.getenv(nodespec_env.OrgNamespace)
        if not namespace:
            raise NodeSpecValidationError(
                f"{nodespec_env.OrgNamespace} environment variable not set."
            )

        nodespec_id = self._generate_nodespec_id(
            callable_attrs=callable_attrs,
            org_namespace=namespace,
            name=name,
            role=role,
            model_name=model_name,
            model_version=model_version,
        )

        super().__init__(
            id=nodespec_id,
            org_namespace=namespace,
            name=name,
            role=role,
            callable_signature=callable_attrs.callable_signature,
            input_schema=callable_attrs.input_schema,
            output_schema=callable_attrs.output_schema,
            model_name=model_name,
            model_version=model_version,
            **kwargs,
        )

    @field_validator("spec_version", mode="before")
    @classmethod
    def _enforce_current_spec_version(cls, v: Any, info: Any) -> str:
        """This validator ensures that the spec_version used is the official one and won't be overridden."""
        if "spec_version" in info.data:
            raise NodeSpecValidationError("spec_version cannot be changed.")

        return __version__

    def _generate_nodespec_id(
        self,
        callable_attrs: FunctionAnalysisResult,
        org_namespace: str,
        name: str,
        role: str,
        model_name: Optional[str] = None,
        model_version: Optional[str] = None,
    ) -> str:
        """
        Generates a unique identifier for the node specification.
        """
        callable_attrs: Dict[str, str] = callable_attrs.model_dump(
            mode="json", exclude_none=True
        )
        nodespec_meta_attrs: Dict[str, str] = {
            "org_namespace": org_namespace,
            "name": name,
            "role": role,
        }
        if model_name:
            nodespec_meta_attrs["model_name"] = model_name
        if model_version:
            nodespec_meta_attrs["model_version"] = model_version

        canonical_spec: str = json.dumps(
            {**callable_attrs, **nodespec_meta_attrs},
            sort_keys=True,
            separators=(",", ":"),
        )

        to_hash: str = canonical_spec.strip()
        nodespec_id: str = nodespec_hash_method(hashable_string=to_hash)

        return nodespec_id

__init__(name: str, role: str, callable: Callable[..., Any], org_namespace: Optional[str] = None, model_name: Optional[str] = None, model_version: Optional[str] = None, **kwargs)

Create a NodeSpec by introspecting a Python callable.

Parameters:

    name (str, required): The name of the node.
    role (str, required): The role of the node.
    callable (Callable[..., Any], required): The Python function to introspect.
    org_namespace (Optional[str], default None): The namespace of the calling organization. Defaults to the ORG_NAMESPACE environment variable.
    model_name (Optional[str], default None): The name of the model used in the node.
    model_version (Optional[str], default None): The version of the model currently used.
    **kwargs (default {}): Additional fields for the NodeSpec.

Raises:

    NodeSpecValidationError: If the ORG_NAMESPACE environment variable is not set.

Example:

    >>> nodespec = NodeSpec(
    ...     org_namespace="acme",
    ...     name="summarize",
    ...     role="processor",
    ...     callable=summarize_function,
    ...     model_name="gpt-4o",
    ...     model_version="2024-05-13"
    ... )
    >>> print(nodespec.id)


LogicRequest

Bases: BaseModel
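
A construction sketch mirroring CodonWorkload._update_logic_identity above; the import locations of AgentClass, Topology, and generate_logic_id are assumptions based on the schema package layout:

from codon_sdk.instrumentation.schemas.logic_id import (  # assumed import path
    AgentClass,
    LogicRequest,
    Topology,
    generate_logic_id,
)

request = LogicRequest(
    agent_class=AgentClass(name="doc-pipeline", version="0.1.0", description=""),
    nodes=[spec],  # NodeSpec instances, e.g. from the NodeSpec example above
    topology=Topology(
        edges=[]   # populate with NodeEdge(source_nodespec_id=..., target_nodespec_id=...) pairs
    ),
)
print(generate_logic_id(request))  # deterministic logic identity for this workload shape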

Source code in sdk/src/codon_sdk/instrumentation/schemas/logic_id/__init__.py
class LogicRequest(BaseModel):
    agent_class: AgentClass = Field(description="The Agentic Class of the Logic Workload")
    nodes: List[NodeSpec]
    topology: Topology = Field(default_factory=Topology, description="A list of edges between nodes describing a workload")

Instrumentation Packages

LangGraph Integration (codon-instrumentation-langgraph)

initialize_telemetry(api_key: Optional[str] = None, service_name: Optional[str] = None, endpoint: Optional[str] = None, attach_to_existing: Optional[bool] = None) -> None

Initialize OpenTelemetry tracing for Codon.

Re-exported from codon_sdk.instrumentation.config; see initialize_telemetry under Instrumentation above for the full precedence rules and source listing.

LangGraphWorkloadAdapter

Factory helpers for building Codon workloads from LangGraph graphs.
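
A wiring sketch, assuming the langgraph package and the import path implied by the source location below; the state schema and node function are hypothetical:

from typing import TypedDict

from langgraph.graph import StateGraph, START, END

from codon.instrumentation.langgraph.adapter import LangGraphWorkloadAdapter

class State(TypedDict):
    text: str
    summary: str

def summarize_node(state: State) -> dict:
    return {"summary": state["text"][:100]}

graph = StateGraph(State)
graph.add_node("summarize", summarize_node)
graph.add_edge(START, "summarize")   # virtual START edge marks "summarize" as an entry node
graph.add_edge("summarize", END)

workload = LangGraphWorkloadAdapter.from_langgraph(
    graph,
    name="doc-pipeline",
    version="0.1.0",
    org_namespace="acme",
)
print([spec.name for spec in workload.nodes])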

Source code in instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/adapter.py
class LangGraphWorkloadAdapter:
    """Factory helpers for building Codon workloads from LangGraph graphs."""

    @classmethod
    def from_langgraph(
        cls,
        graph: Any,
        *,
        name: str,
        version: str,
        description: Optional[str] = None,
        tags: Optional[Sequence[str]] = None,
        org_namespace: Optional[str] = None,
        node_overrides: Optional[Mapping[str, Any]] = None,
        entry_nodes: Optional[Sequence[str]] = None,
        max_reviews: Optional[int] = None,
        compile_kwargs: Optional[Mapping[str, Any]] = None,
        runtime_config: Optional[Mapping[str, Any]] = None,
        return_artifacts: bool = False,
    ) -> Union[CodonWorkload, LangGraphAdapterResult]:
        """Create a :class:`CodonWorkload` from a LangGraph ``StateGraph``.

        Args:
            graph: A LangGraph ``StateGraph`` (preferred) or compatible object exposing
                ``nodes``/``edges``.
            name: Unique identifier for the workload type.
            version: Semantic version (e.g., "1.2.0").
            description: Human-readable purpose description.
            tags: Categorization tags.
            org_namespace: Organization namespace for scoping.
            node_overrides: Optional mapping of node names to override configurations.
            entry_nodes: Optional sequence of entry node names.
            max_reviews: Optional maximum number of reviews.
            compile_kwargs: Optional keyword arguments forwarded to ``graph.compile(...)`` so
                you can attach checkpointers, memory stores, or other runtime extras.
            runtime_config: Optional runtime configuration mapping.
            return_artifacts: When ``True`` return a :class:`LangGraphAdapterResult` containing the
                workload, the original state graph, and the compiled graph.

        Returns:
            CodonWorkload or LangGraphAdapterResult depending on return_artifacts parameter.
        """

        compiled, raw_nodes, raw_edges = cls._normalise_graph(
            graph, compile_kwargs=compile_kwargs
        )
        overrides = cls._normalise_overrides(node_overrides)
        node_map = cls._coerce_node_map(raw_nodes)
        raw_edge_list = cls._coerce_edges(raw_edges)

        node_names = set(node_map.keys())
        valid_edges = []
        entry_from_virtual = set()
        for src, dst in raw_edge_list:
            if src not in node_names or dst not in node_names:
                if src not in node_names and dst in node_names:
                    entry_from_virtual.add(dst)
                continue
            valid_edges.append((src, dst))

        workload = CodonWorkload(
            name=name,
            version=version,
            description=description,
            tags=tags,
        )

        successors: Dict[str, Sequence[str]] = cls._build_successor_map(valid_edges)
        predecessors: Dict[str, Sequence[str]] = cls._build_predecessor_map(valid_edges)

        for node_name, runnable in node_map.items():
            override = overrides.get(node_name)
            role = cls._derive_role(node_name, runnable, override.role if override else None)
            model_name = override.model_name if override else None
            model_version = override.model_version if override else None
            nodespec_kwargs: Dict[str, Any] = {}
            if override and override.input_schema is not None:
                nodespec_kwargs["input_schema"] = override.input_schema
            if override and override.output_schema is not None:
                nodespec_kwargs["output_schema"] = override.output_schema

            instrumented_callable = cls._wrap_node(
                node_name=node_name,
                role=role,
                runnable=runnable,
                successors=tuple(successors.get(node_name, ())),
                nodespec_target=override.callable if override else None,
                model_name=model_name,
                model_version=model_version,
                nodespec_kwargs=nodespec_kwargs or None,
            )
            workload.add_node(
                instrumented_callable,
                name=node_name,
                role=role,
                org_namespace=org_namespace,
            )

        for edge in valid_edges:
            workload.add_edge(*edge)

        workload._predecessors.update({k: set(v) for k, v in predecessors.items()})
        workload._successors.update({k: set(v) for k, v in successors.items()})

        if entry_nodes is not None:
            workload._entry_nodes = list(entry_nodes)
        else:
            inferred = [node for node, preds in predecessors.items() if not preds]
            inferred = list({*inferred, *entry_from_virtual})
            workload._entry_nodes = inferred or list(node_map.keys())

        setattr(workload, "langgraph_state_graph", graph)
        setattr(workload, "langgraph_compiled_graph", compiled)
        setattr(workload, "langgraph_compile_kwargs", dict(compile_kwargs or {}))
        setattr(workload, "langgraph_runtime_config", dict(runtime_config or {}))

        if return_artifacts:
            return LangGraphAdapterResult(
                workload=workload,
                state_graph=graph,
                compiled_graph=compiled,
            )

        return workload

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalise_graph(
        graph: Any, *, compile_kwargs: Optional[Mapping[str, Any]] = None
    ) -> Tuple[Any, Any, Any]:
        """Return compiled graph plus raw node/edge structures."""

        raw_nodes, raw_edges = LangGraphWorkloadAdapter._extract_nodes_edges(graph)
        compiled = graph
        if hasattr(graph, "compile"):
            kwargs = dict(compile_kwargs or {})
            compiled = graph.compile(**kwargs)
        comp_nodes, comp_edges = LangGraphWorkloadAdapter._extract_nodes_edges(compiled)

        nodes = raw_nodes or comp_nodes
        edges = raw_edges or comp_edges

        if nodes is None or edges is None:
            raise ValueError(
                "Unable to extract nodes/edges from LangGraph graph. Pass the original StateGraph or ensure the compiled graph exposes config.nodes/config.edges."
            )

        return compiled, nodes, edges

    @staticmethod
    def _extract_nodes_edges(obj: Any) -> Tuple[Optional[Any], Optional[Any]]:
        nodes = None
        edges = None

        graph_attr = getattr(obj, "graph", None)
        if graph_attr is not None:
            nodes = nodes or getattr(graph_attr, "nodes", None)
            edges = edges or getattr(graph_attr, "edges", None)

        nodes = nodes or getattr(obj, "nodes", None)
        edges = edges or getattr(obj, "edges", None)

        config = getattr(obj, "config", None)
        if config is not None:
            nodes = nodes or getattr(config, "nodes", None)
            edges = edges or getattr(config, "edges", None)
            if nodes is None and isinstance(config, Mapping):
                nodes = config.get("nodes")
            if edges is None and isinstance(config, Mapping):
                edges = config.get("edges")

        if nodes is not None and callable(getattr(nodes, "items", None)):
            nodes = dict(nodes)

        return nodes, edges

    @staticmethod
    def _coerce_node_map(nodes: Any) -> Dict[str, Any]:
        if isinstance(nodes, Mapping):
            result: Dict[str, Any] = {}
            for name, data in nodes.items():
                result[name] = LangGraphWorkloadAdapter._select_runnable(name, data)
            return result

        result: Dict[str, Any] = {}
        for item in nodes:
            if isinstance(item, tuple) and len(item) >= 2:
                name = item[0]
                data = item[1]
                result[name] = LangGraphWorkloadAdapter._select_runnable(name, data)
            else:
                raise ValueError(f"Unrecognized LangGraph node entry: {item!r}")

        return result

    @staticmethod
    def _normalise_overrides(overrides: Optional[Mapping[str, Any]]) -> Dict[str, NodeOverride]:
        if not overrides:
            return {}

        result: Dict[str, NodeOverride] = {}
        for name, value in overrides.items():
            if isinstance(value, NodeOverride):
                result[name] = value
                continue
            if isinstance(value, Mapping):
                result[name] = NodeOverride(
                    role=value.get("role"),
                    callable=value.get("callable"),
                    model_name=value.get("model_name"),
                    model_version=value.get("model_version"),
                    input_schema=value.get("input_schema"),
                    output_schema=value.get("output_schema"),
                )
                continue
            raise TypeError(
                "node_overrides values must be NodeOverride instances or mapping objects"
            )

        return result

    @staticmethod
    def _select_runnable(name: str, data: Any) -> Any:
        candidates: list[Any] = []

        if callable(data) or hasattr(data, "ainvoke") or hasattr(data, "invoke"):
            return data

        if isinstance(data, Mapping):
            for key in ("callable", "node", "value", "runnable", "invoke", "ainvoke"):
                if key in data and data[key] is not None:
                    candidates.append(data[key])
        else:
            for attr in ("callable", "node", "value", "runnable", "wrapped", "inner", "invoke", "ainvoke"):
                if hasattr(data, attr):
                    candidate = getattr(data, attr)
                    if candidate is not None and candidate is not data:
                        candidates.append(candidate)

        for candidate in candidates:
            if candidate is None:
                continue
            if callable(candidate) or hasattr(candidate, "ainvoke") or hasattr(candidate, "invoke"):
                return candidate

        raise WorkloadRuntimeError(f"Node '{name}' is not callable")

    @staticmethod
    def _coerce_edges(edges: Any) -> Sequence[Tuple[str, str]]:
        result: list[Tuple[str, str]] = []

        for item in edges:
            source = target = None
            if isinstance(item, tuple):
                if len(item) >= 2:
                    source, target = item[0], item[1]
            else:
                source = getattr(item, "source", None) or getattr(item, "start", None)
                target = getattr(item, "target", None) or getattr(item, "end", None)
                if source is None and isinstance(item, Mapping):
                    source = item.get("source")
                    target = item.get("target")

            if source is None or target is None:
                raise ValueError(f"Cannot determine edge endpoints for entry: {item!r}")

            result.append((source, target))

        return result

    @staticmethod
    def _derive_role(
        node_name: str,
        runnable: Any,
        override_role: Optional[str],
    ) -> str:
        if override_role:
            return override_role

        metadata = getattr(runnable, "metadata", None)
        if isinstance(metadata, Mapping):
            role = metadata.get("role") or metadata.get("tag")
            if isinstance(role, str):
                return role

        if "_" in node_name:
            return node_name.split("_")[0]
        return node_name

    @classmethod
    def _wrap_node(
        cls,
        *,
        node_name: str,
        role: str,
        runnable: Any,
        successors: Sequence[str],
        nodespec_target: Optional[Callable[..., Any]] = None,
        model_name: Optional[str] = None,
        model_version: Optional[str] = None,
        nodespec_kwargs: Optional[Mapping[str, Any]] = None,
    ) -> Callable[..., Any]:
        from codon.instrumentation.langgraph import track_node

        runnable = cls._unwrap_runnable(runnable)

        decorator = track_node(
            node_name=node_name,
            role=role,
            model_name=model_name,
            model_version=model_version,
            introspection_target=nodespec_target,
            nodespec_kwargs=nodespec_kwargs,
        )

        async def invoke_callable(state: Any, config: Optional[Mapping[str, Any]]) -> Any:
            if hasattr(runnable, "ainvoke"):
                try:
                    if config:
                        return await runnable.ainvoke(state, config=config)
                    return await runnable.ainvoke(state)
                except TypeError:
                    return await runnable.ainvoke(state)
            if inspect.iscoroutinefunction(runnable):
                return await runnable(state)
            if hasattr(runnable, "invoke"):
                try:
                    if config:
                        result = runnable.invoke(state, config=config)
                    else:
                        result = runnable.invoke(state)
                except TypeError:
                    result = runnable.invoke(state)
                if inspect.isawaitable(result):
                    return await result
                return result
            if callable(runnable):
                result = runnable(state)
                if inspect.isawaitable(result):
                    return await result
                return result
            raise WorkloadRuntimeError(f"Node '{node_name}' is not callable")

        @decorator
        async def node_callable(message: Any, *, runtime, context):
            if isinstance(message, Mapping) and "state" in message:
                state = message["state"]
            else:
                state = message

            workload = getattr(runtime, "_workload", None)
            base_config = None
            if workload is not None:
                base_config = getattr(workload, "langgraph_runtime_config", None)
            invocation_config = context.get("langgraph_config") if isinstance(context, Mapping) else None
            config = _merge_runtime_configs(base_config, invocation_config)

            result = await invoke_callable(state, config)

            if isinstance(result, Mapping):
                next_state: JsonDict = {**state, **result}
            else:
                next_state = {"value": result}

            for target in successors:
                runtime.emit(target, {"state": next_state})

            return next_state

        return node_callable

    @staticmethod
    def _unwrap_runnable(runnable: Any) -> Any:
        """Attempt to peel wrappers to find the actual callable runnable."""

        seen: set[int] = set()
        current = runnable

        while True:
            if current is None:
                break

            identifier = id(current)
            if identifier in seen:
                break
            seen.add(identifier)

            if hasattr(current, "ainvoke") or hasattr(current, "invoke") or callable(current):
                return current

            candidate = None
            for attr in ("callable", "node", "value", "wrapped", "inner", "runnable"):
                if hasattr(current, attr):
                    candidate = getattr(current, attr)
                    if candidate is not current:
                        break

            if candidate is None and isinstance(current, Mapping):
                for key in ("callable", "node", "value", "runnable"):
                    if key in current:
                        candidate = current[key]
                        if candidate is not current:
                            break

            if candidate is None:
                break

            current = candidate

        return runnable

    @staticmethod
    def _build_successor_map(edges: Sequence[Tuple[str, str]]) -> Dict[str, Sequence[str]]:
        successors: Dict[str, list] = defaultdict(list)
        for src, dst in edges:
            successors[src].append(dst)
        return {k: tuple(v) for k, v in successors.items()}

    @staticmethod
    def _build_predecessor_map(edges: Sequence[Tuple[str, str]]) -> Dict[str, Sequence[str]]:
        predecessors: Dict[str, list] = defaultdict(list)
        for src, dst in edges:
            predecessors[dst].append(src)
        return {k: tuple(v) for k, v in predecessors.items()}

from_langgraph(graph: Any, *, name: str, version: str, description: Optional[str] = None, tags: Optional[Sequence[str]] = None, org_namespace: Optional[str] = None, node_overrides: Optional[Mapping[str, Any]] = None, entry_nodes: Optional[Sequence[str]] = None, max_reviews: Optional[int] = None, compile_kwargs: Optional[Mapping[str, Any]] = None, runtime_config: Optional[Mapping[str, Any]] = None, return_artifacts: bool = False) -> Union[CodonWorkload, LangGraphAdapterResult] classmethod

Create a CodonWorkload from a LangGraph StateGraph.

Parameters:

graph (Any, required): A LangGraph StateGraph (preferred) or compatible object exposing nodes/edges.
name (str, required): Unique identifier for the workload type.
version (str, required): Semantic version (e.g., "1.2.0").
description (Optional[str], default None): Human-readable purpose description.
tags (Optional[Sequence[str]], default None): Categorization tags.
org_namespace (Optional[str], default None): Organization namespace for scoping.
node_overrides (Optional[Mapping[str, Any]], default None): Optional mapping of node names to override configurations.
entry_nodes (Optional[Sequence[str]], default None): Optional sequence of entry node names.
max_reviews (Optional[int], default None): Optional maximum number of reviews.
compile_kwargs (Optional[Mapping[str, Any]], default None): Optional keyword arguments forwarded to graph.compile(...) so you can attach checkpointers, memory stores, or other runtime extras.
runtime_config (Optional[Mapping[str, Any]], default None): Optional runtime configuration mapping.
return_artifacts (bool, default False): When True, return a LangGraphAdapterResult containing the workload, the original state graph, and the compiled graph.

Returns:

Union[CodonWorkload, LangGraphAdapterResult]: A CodonWorkload, or a LangGraphAdapterResult, depending on the return_artifacts parameter.
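
For orientation, a minimal end-to-end sketch. The call signature is taken from the source below; the import path for LangGraphWorkloadAdapter and all node, state, and workload names are assumptions made for the example:

from typing import TypedDict

from langgraph.graph import END, START, StateGraph

# assumed import path; adjust to where the adapter lives in your install
from codon.instrumentation.langgraph import LangGraphWorkloadAdapter


class State(TypedDict):
    value: str


def plan_step(state: State) -> dict:
    # toy node: append a marker to the running state value
    return {"value": state["value"] + " -> planned"}


builder = StateGraph(State)
builder.add_node("plan_step", plan_step)
builder.add_edge(START, "plan_step")
builder.add_edge("plan_step", END)

workload = LangGraphWorkloadAdapter.from_langgraph(
    builder,
    name="demo-workload",
    version="0.1.0",
    description="Minimal adapter demo",
)

Note that START and END are not workload nodes; as the source below shows, edges touching unknown endpoints are filtered out, and an edge from a virtual source such as START marks its target as an entry node.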

Source code in instrumentation-packages/codon-instrumentation-langgraph/codon/instrumentation/langgraph/adapter.py
@classmethod
def from_langgraph(
    cls,
    graph: Any,
    *,
    name: str,
    version: str,
    description: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
    org_namespace: Optional[str] = None,
    node_overrides: Optional[Mapping[str, Any]] = None,
    entry_nodes: Optional[Sequence[str]] = None,
    max_reviews: Optional[int] = None,
    compile_kwargs: Optional[Mapping[str, Any]] = None,
    runtime_config: Optional[Mapping[str, Any]] = None,
    return_artifacts: bool = False,
) -> Union[CodonWorkload, LangGraphAdapterResult]:
    """Create a :class:`CodonWorkload` from a LangGraph ``StateGraph``.

    Args:
        graph: A LangGraph ``StateGraph`` (preferred) or compatible object exposing
            ``nodes``/``edges``.
        name: Unique identifier for the workload type.
        version: Semantic version (e.g., "1.2.0").
        description: Human-readable purpose description.
        tags: Categorization tags.
        org_namespace: Organization namespace for scoping.
        node_overrides: Optional mapping of node names to override configurations.
        entry_nodes: Optional sequence of entry node names.
        max_reviews: Optional maximum number of reviews.
        compile_kwargs: Optional keyword arguments forwarded to ``graph.compile(...)`` so
            you can attach checkpointers, memory stores, or other runtime extras.
        runtime_config: Optional runtime configuration mapping.
        return_artifacts: When ``True`` return a :class:`LangGraphAdapterResult` containing the
            workload, the original state graph, and the compiled graph.

    Returns:
        CodonWorkload or LangGraphAdapterResult depending on return_artifacts parameter.
    """

    compiled, raw_nodes, raw_edges = cls._normalise_graph(
        graph, compile_kwargs=compile_kwargs
    )
    overrides = cls._normalise_overrides(node_overrides)
    node_map = cls._coerce_node_map(raw_nodes)
    raw_edge_list = cls._coerce_edges(raw_edges)

    node_names = set(node_map.keys())
    valid_edges = []
    entry_from_virtual = set()
    for src, dst in raw_edge_list:
        if src not in node_names or dst not in node_names:
            if src not in node_names and dst in node_names:
                entry_from_virtual.add(dst)
            continue
        valid_edges.append((src, dst))

    workload = CodonWorkload(
        name=name,
        version=version,
        description=description,
        tags=tags,
    )

    successors: Dict[str, Sequence[str]] = cls._build_successor_map(valid_edges)
    predecessors: Dict[str, Sequence[str]] = cls._build_predecessor_map(valid_edges)

    for node_name, runnable in node_map.items():
        override = overrides.get(node_name)
        role = cls._derive_role(node_name, runnable, override.role if override else None)
        model_name = override.model_name if override else None
        model_version = override.model_version if override else None
        nodespec_kwargs: Dict[str, Any] = {}
        if override and override.input_schema is not None:
            nodespec_kwargs["input_schema"] = override.input_schema
        if override and override.output_schema is not None:
            nodespec_kwargs["output_schema"] = override.output_schema

        instrumented_callable = cls._wrap_node(
            node_name=node_name,
            role=role,
            runnable=runnable,
            successors=tuple(successors.get(node_name, ())),
            nodespec_target=override.callable if override else None,
            model_name=model_name,
            model_version=model_version,
            nodespec_kwargs=nodespec_kwargs or None,
        )
        workload.add_node(
            instrumented_callable,
            name=node_name,
            role=role,
            org_namespace=org_namespace,
        )

    for edge in valid_edges:
        workload.add_edge(*edge)

    workload._predecessors.update({k: set(v) for k, v in predecessors.items()})
    workload._successors.update({k: set(v) for k, v in successors.items()})

    if entry_nodes is not None:
        workload._entry_nodes = list(entry_nodes)
    else:
        inferred = [node for node, preds in predecessors.items() if not preds]
        inferred = list({*inferred, *entry_from_virtual})
        workload._entry_nodes = inferred or list(node_map.keys())

    setattr(workload, "langgraph_state_graph", graph)
    setattr(workload, "langgraph_compiled_graph", compiled)
    setattr(workload, "langgraph_compile_kwargs", dict(compile_kwargs or {}))
    setattr(workload, "langgraph_runtime_config", dict(runtime_config or {}))

    if return_artifacts:
        return LangGraphAdapterResult(
            workload=workload,
            state_graph=graph,
            compiled_graph=compiled,
        )

    return workload
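
The override and artifact paths accept plain mappings and return a structured result. A hedged sketch reusing the builder from the earlier example (the node name and model name are placeholders; the accepted mapping keys mirror _normalise_overrides, and the result fields mirror the return_artifacts branch above):

result = LangGraphWorkloadAdapter.from_langgraph(
    builder,
    name="demo-workload",
    version="0.1.0",
    node_overrides={
        # plain mappings are coerced to NodeOverride instances
        "plan_step": {"role": "planner", "model_name": "example-model"},
    },
    return_artifacts=True,
)

result.workload        # the instrumented CodonWorkload
result.state_graph     # the original StateGraph
result.compiled_graph  # the compiled graph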