Python SDK Reference

The DuraGraph Python SDK provides a decorator-based interface for building AI agents and workflows. Build graphs with simple decorators, run them locally, or deploy to the control plane.

Install the SDK with pip:

# Core SDK
pip install duragraph

# With LLM providers
pip install duragraph[openai]     # OpenAI support
pip install duragraph[anthropic]  # Anthropic support

# With vector stores
pip install duragraph[chroma]     # Chroma vector store
pip install duragraph[pinecone]   # Pinecone vector store
pip install duragraph[qdrant]     # Qdrant vector store

# All features
pip install duragraph[all]

A minimal agent, end to end:

from duragraph import Graph, llm_node, entrypoint

@Graph(id="customer_support")
class CustomerSupportAgent:
    """A customer support agent that classifies and responds to queries."""

    @entrypoint
    @llm_node(model="gpt-4o-mini")
    def classify(self, state):
        """Classify the customer intent."""
        return {"intent": "billing"}

    @llm_node(model="gpt-4o-mini")
    def respond(self, state):
        """Generate a response based on intent."""
        return {"response": f"I'll help you with {state['intent']}."}

    # Define the flow with the edge operator
    classify >> respond

# Run locally
agent = CustomerSupportAgent()
result = agent.run({"message": "I have a billing question"})
print(result)

# Or deploy to the control plane
agent.serve("http://localhost:8081")

The @Graph decorator turns a Python class into a workflow graph.

from duragraph import Graph

@Graph(
    id="my_graph",              # Unique graph identifier
    state_schema=MyStateClass,  # Optional state schema
    checkpointer=None,          # Optional checkpointer for persistence
)
class MyGraph:
    pass

Parameters:

  • id (str, required) - Unique identifier for the graph
  • state_schema (Type, optional) - Pydantic model or TypedDict for state validation
  • checkpointer (Checkpointer, optional) - State persistence implementation (see the sketch after this list)
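
A sketch combining both optional parameters. Note that InMemoryCheckpointer and its import path are assumptions for illustration only; this reference does not document a concrete checkpointer class, so substitute whatever your installed version provides.

from pydantic import BaseModel
from duragraph import Graph
# Hypothetical import path -- replace with your version's actual checkpointer.
from duragraph.checkpointers import InMemoryCheckpointer

class SupportState(BaseModel):
    message: str = ""
    intent: str = ""

@Graph(
    id="support_graph",
    state_schema=SupportState,            # used to validate state between nodes
    checkpointer=InMemoryCheckpointer(),  # hypothetical: persists state across steps
)
class SupportGraph:
    pass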

Node decorators define executable steps in your graph.

Marks the starting node of the graph.

@entrypoint
def start(self, state):
    return state

Creates a node that calls an LLM.

@llm_node(
    model="gpt-4o-mini",         # Model name
    temperature=0.7,             # Optional: sampling temperature
    max_tokens=1000,             # Optional: max response tokens
    system_prompt="You are...",  # Optional: system message
)
def generate(self, state):
    # Return messages or a prompt
    return {"messages": [{"role": "user", "content": "Hello"}]}

Supported Providers (v0.2.0):

  • OpenAI (GPT-4, GPT-4o, GPT-3.5-turbo)
  • Anthropic (Claude 3.5 Sonnet, Claude 3 Opus, Claude 3 Haiku)
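
For example, pointing the same node at Anthropic only changes the model string (provider configuration is covered in the provider sections below):

@llm_node(model="claude-3-5-sonnet-20241022", temperature=0.7)
def generate(self, state):
    return {"messages": [{"role": "user", "content": "Hello"}]}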

Defines a tool that can be called by LLMs or other nodes.

from duragraph import tool_node

@tool_node(
    name="web_search",
    description="Search the web for information",
)
def search(self, query: str, max_results: int = 5):
    """Execute a web search."""
    results = perform_search(query, limit=max_results)
    return {"results": results}

Creates a conditional routing node.

from duragraph import router_node

@router_node
def route_by_intent(self, state):
    """Route based on the classified intent."""
    intent = state.get("intent")
    if intent == "billing":
        return "billing_handler"
    elif intent == "support":
        return "support_handler"
    else:
        return "general_handler"

Creates a human-in-the-loop node that pauses for human input.

from duragraph import human_node

@human_node(
    prompt="Please review and approve the following:",
    timeout=3600,  # 1-hour timeout
)
def review(self, state):
    """Wait for human approval."""
    return state

The >> operator defines transitions between nodes.

@Graph(id="my_graph")
class MyGraph:
@entrypoint
def start(self, state):
return state
def process(self, state):
return state
def end(self, state):
return state
# Sequential flow
start >> process >> end

Conditional edges:

from duragraph import Graph, entrypoint, router_node

@Graph(id="conditional_graph")
class ConditionalGraph:
    @entrypoint
    def start(self, state):
        return state

    @router_node
    def decide(self, state):
        return "path_a" if state["condition"] else "path_b"

    def path_a(self, state):
        return state

    def path_b(self, state):
        return state

    # Conditional routing: the router's return value selects the next node
    start >> decide
    decide >> {"path_a": path_a, "path_b": path_b}

Full async/await support for parallel execution.

import asyncio
from duragraph import Graph, llm_node, entrypoint

@Graph(id="async_agent")
class AsyncAgent:
    @entrypoint
    @llm_node(model="gpt-4o-mini")
    async def think(self, state):
        """Async LLM call."""
        return state

    async def custom_async(self, state):
        """Custom async node."""
        result = await some_async_operation()
        return {"result": result}

    think >> custom_async

# Run asynchronously
async def main():
    agent = AsyncAgent()
    result = await agent.arun({"input": "Hello"})
    print(result)

asyncio.run(main())

Deploy your graph as a worker that connects to the control plane.

import asyncio
from duragraph import Graph, Worker

@Graph(id="production_agent")
class ProductionAgent:
    # ... define nodes ...
    pass

# Create a worker
worker = Worker(
    graph=ProductionAgent,
    control_plane_url="http://localhost:8081",
    heartbeat_interval=30,  # Send a heartbeat every 30 seconds
    max_concurrent_tasks=10,
)

# Start the worker (blocking)
worker.start()

# Or run it asynchronously
async def run_worker():
    await worker.astart()

asyncio.run(run_worker())

Graceful Shutdown:

import signal

def shutdown_handler(signum, frame):
    print("Shutting down gracefully...")
    worker.stop()

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
worker.start()

Configure the OpenAI provider explicitly to set credentials or a custom endpoint:

from duragraph.llm import OpenAIProvider

provider = OpenAIProvider(
    api_key="sk-...",                      # Or set the OPENAI_API_KEY env var
    organization="org-...",                # Optional
    base_url="https://api.openai.com/v1",  # Optional custom endpoint
)

@llm_node(
    model="gpt-4o",
    temperature=0.7,
    provider=provider,
)
def generate(self, state):
    return state

Supported Models:

  • gpt-4o
  • gpt-4o-mini
  • gpt-4-turbo
  • gpt-4
  • gpt-3.5-turbo

Configure the Anthropic provider:

from duragraph.llm import AnthropicProvider

provider = AnthropicProvider(
    api_key="sk-ant-...",  # Or set the ANTHROPIC_API_KEY env var
)

@llm_node(
    model="claude-3-5-sonnet-20241022",
    temperature=0.7,
    provider=provider,
)
def generate(self, state):
    return state

Supported Models:

  • claude-3-5-sonnet-20241022
  • claude-3-opus-20240229
  • claude-3-sonnet-20240229
  • claude-3-haiku-20240307

Chroma vector store:

from duragraph.vectorstores import ChromaVectorStore

vector_store = ChromaVectorStore(
    collection_name="my_docs",
    persist_directory="./chroma_db",
    embedding_function=None,  # Uses the default embedding function
)

# Add documents
vector_store.add_documents([
    {"id": "1", "text": "Document 1", "metadata": {"source": "file1.txt"}},
    {"id": "2", "text": "Document 2", "metadata": {"source": "file2.txt"}},
])

# Search
results = vector_store.similarity_search(
    query="search term",
    k=5,
    filter={"source": "file1.txt"},
)

Pinecone vector store (same interface as Chroma):

from duragraph.vectorstores import PineconeVectorStore

vector_store = PineconeVectorStore(
    api_key="your-api-key",
    environment="us-east-1-aws",
    index_name="my-index",
)

Qdrant vector store:

from duragraph.vectorstores import QdrantVectorStore

vector_store = QdrantVectorStore(
    url="http://localhost:6333",
    collection_name="my_collection",
    api_key=None,  # Optional, for Qdrant Cloud
)

OpenAI embeddings:

from duragraph.embeddings import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    api_key="sk-...",
)

# Embed a batch of documents
vectors = embeddings.embed_documents([
    "Document 1",
    "Document 2",
])

# Embed a single query
query_vector = embeddings.embed_query("search term")

Cohere embeddings:

from duragraph.embeddings import CohereEmbeddings

embeddings = CohereEmbeddings(
    model="embed-english-v3.0",
    api_key="your-key",
)

Ollama embeddings (local):

from duragraph.embeddings import OllamaEmbeddings

embeddings = OllamaEmbeddings(
    model="nomic-embed-text",
    base_url="http://localhost:11434",
)

Load plain-text files:

from duragraph.document_loaders import TextLoader

loader = TextLoader("path/to/file.txt")
documents = loader.load()

Load PDFs:

from duragraph.document_loaders import PDFLoader

loader = PDFLoader("path/to/file.pdf")
documents = loader.load()

Load an entire directory:

from duragraph.document_loaders import DirectoryLoader

loader = DirectoryLoader(
    path="./docs",
    glob="**/*.md",
    loader_cls=TextLoader,
)
documents = loader.load()

Split documents into overlapping chunks:

from duragraph.text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", " ", ""],
)
chunks = splitter.split_documents(documents)
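
Putting the loaders, splitter, and a vector store together: a minimal ingestion sketch using only the calls shown above. The chunk shape passed to add_documents (dicts with "text" and "metadata" keys) is an assumption about what split_documents returns; adapt it to your version.

from duragraph.document_loaders import DirectoryLoader, TextLoader
from duragraph.text_splitters import RecursiveCharacterTextSplitter
from duragraph.vectorstores import ChromaVectorStore
from duragraph.embeddings import OpenAIEmbeddings

# Load and chunk the corpus
loader = DirectoryLoader(path="./docs", glob="**/*.md", loader_cls=TextLoader)
documents = loader.load()
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(documents)

# Index the chunks (assumes each chunk is a dict with "text" and "metadata")
vector_store = ChromaVectorStore(
    collection_name="docs",
    persist_directory="./chroma_db",
    embedding_function=OpenAIEmbeddings(model="text-embedding-3-small"),
)
vector_store.add_documents([
    {"id": str(i), "text": chunk["text"], "metadata": chunk.get("metadata", {})}
    for i, chunk in enumerate(chunks)
])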

Register and execute tools dynamically.

from duragraph import Graph, ToolRegistry, entrypoint, llm_node, tool

# Create a registry
registry = ToolRegistry()

# Register tools
@registry.register
@tool(
    name="calculator",
    description="Perform arithmetic operations",
)
def calculator(operation: str, a: float, b: float) -> float:
    """Execute a calculation."""
    if operation == "add":
        return a + b
    elif operation == "multiply":
        return a * b
    # ... more operations
    raise ValueError(f"Unsupported operation: {operation}")

# Use the registry in a graph
@Graph(id="math_agent")
class MathAgent:
    @entrypoint
    @llm_node(model="gpt-4o-mini", tools=registry.get_tools())
    def solve(self, state):
        return state

The duragraph CLI provides local development commands (v0.2.0).

# Initialize a new project
duragraph init my-agent

# Run a graph locally
duragraph dev my_graph.py:MyGraph

# Deploy to the control plane
duragraph deploy my_graph.py:MyGraph --url http://localhost:8081

# Visualize a graph
duragraph visualize my_graph.py:MyGraph --output graph.png

State can be typed with a TypedDict:

from typing import TypedDict
from duragraph import Graph, entrypoint

class MyState(TypedDict):
    messages: list[dict]
    intent: str
    result: str

@Graph(id="typed_graph", state_schema=MyState)
class TypedGraph:
    @entrypoint
    def start(self, state: MyState) -> MyState:
        return state

Or with a Pydantic model:

from pydantic import BaseModel, Field
from duragraph import Graph, entrypoint

class MyState(BaseModel):
    messages: list[dict] = Field(default_factory=list)
    intent: str = ""
    result: str = ""

@Graph(id="pydantic_graph", state_schema=MyState)
class PydanticGraph:
    @entrypoint
    def start(self, state: MyState) -> MyState:
        return state

Catch SDK errors via the exception hierarchy:

from duragraph.exceptions import (
    DuraGraphError,
    NodeExecutionError,
    WorkerConnectionError,
)

try:
    result = agent.run(input_state)
except NodeExecutionError as e:
    print(f"Node {e.node_id} failed: {e.message}")
except WorkerConnectionError as e:
    print(f"Worker connection failed: {e}")
except DuraGraphError as e:
    print(f"Error: {e}")

A retrieval-augmented generation (RAG) agent:

from duragraph import Graph, llm_node, entrypoint
from duragraph.vectorstores import ChromaVectorStore
from duragraph.embeddings import OpenAIEmbeddings

@Graph(id="rag_agent")
class RAGAgent:
    def __init__(self):
        self.vector_store = ChromaVectorStore(
            collection_name="docs",
            embedding_function=OpenAIEmbeddings(),
        )

    @entrypoint
    def retrieve(self, state):
        """Retrieve relevant documents."""
        query = state["query"]
        results = self.vector_store.similarity_search(query, k=5)
        return {"documents": results}

    @llm_node(model="gpt-4o")
    def generate(self, state):
        """Generate an answer from the documents."""
        context = "\n".join([doc["text"] for doc in state["documents"]])
        return {
            "messages": [
                {"role": "system", "content": f"Context:\n{context}"},
                {"role": "user", "content": state["query"]},
            ]
        }

    retrieve >> generate

Compose graphs into a multi-agent workflow:

from duragraph import Graph, llm_node, entrypoint

@Graph(id="researcher")
class Researcher:
    @entrypoint
    @llm_node(model="gpt-4o")
    def research(self, state):
        return {"findings": "research results"}

@Graph(id="writer")
class Writer:
    @entrypoint
    @llm_node(model="gpt-4o")
    def write(self, state):
        return {"article": "written article"}

@Graph(id="multi_agent")
class MultiAgent:
    def __init__(self):
        self.researcher = Researcher()
        self.writer = Writer()

    @entrypoint
    def orchestrate(self, state):
        # Run the researcher
        research_result = self.researcher.run(state)
        # Pass its findings to the writer
        article = self.writer.run({
            "findings": research_result["findings"]
        })
        return article
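
The composed graph deploys like any other graph, reusing the Worker API shown earlier:

from duragraph import Worker

worker = Worker(
    graph=MultiAgent,
    control_plane_url="http://localhost:8081",
)
worker.start()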

For complete API documentation, see the Contributing Guide.