# Your First Workflow
Learn to build a complete AI workflow from scratch using DuraGraph.
## What You’ll Build
A customer support chatbot that:
- Receives customer queries
- Analyzes sentiment
- Routes to appropriate response strategy
- Generates personalized responses
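At a glance, here is the pipeline you’ll assemble (the step names match the code in Step 1):

```
customer_message --> analyze_sentiment --> route_by_sentiment
                                            |-- negative  --> escalation_response
                                            `-- otherwise --> standard_response
```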
## Prerequisites
- DuraGraph running locally (Quick Start)
- Python 3.8+ installed
- Basic understanding of AI/LLM concepts
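Before continuing, you can confirm the server from the Quick Start is reachable. A minimal sketch; the port matches the client configuration used later in this guide, but the `/health` path is an assumption, so substitute whatever the Quick Start documents:

```python
import urllib.request

# Port 8081 is used by the client throughout this guide; the /health
# path is a guess -- check the Quick Start for the actual endpoint.
with urllib.request.urlopen("http://localhost:8081/health", timeout=5) as resp:
    print(resp.status)  # expect 200 if the server is up
```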
## Step 1: Define the Workflow
Create `customer_support.py`:
```python
from duragraph import Workflow, Step

# Create workflow
workflow = Workflow(name="Customer Support Agent")

# Step 1: Analyze sentiment
workflow.add_step("analyze_sentiment", Step.llm_call(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "Analyze the sentiment of this customer message. Respond with just: positive, negative, or neutral."},
        {"role": "user", "content": "{customer_message}"}
    ],
    output_key="sentiment"
))

# Step 2: Route based on sentiment
workflow.add_conditional(
    "route_by_sentiment",
    condition=lambda state: state["sentiment"].lower() == "negative",
    if_true="escalation_response",
    if_false="standard_response"
)

# Step 3a: Handle negative sentiment (escalation)
workflow.add_step("escalation_response", Step.llm_call(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "You are an empathetic customer service manager. Address this concern with care and offer to escalate."},
        {"role": "user", "content": "{customer_message}"}
    ],
    output_key="response"
))

# Step 3b: Handle positive/neutral sentiment
workflow.add_step("standard_response", Step.llm_call(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "You are a helpful customer service agent. Provide a friendly, helpful response."},
        {"role": "user", "content": "{customer_message}"}
    ],
    output_key="response"
))
```
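Note that the routing condition is ordinary Python over the accumulated workflow state, so you can sanity-check it in isolation before running anything. A quick sketch (the state dicts here are hypothetical):

```python
def is_negative(state):
    # The same predicate passed to add_conditional above.
    return state["sentiment"].lower() == "negative"

assert is_negative({"sentiment": "Negative"})       # matching is case-insensitive
assert not is_negative({"sentiment": "positive"})
assert not is_negative({"sentiment": "neutral"})
```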
## Step 2: Execute the Workflow

```python
from duragraph import DuragraphClient

# Initialize client
client = DuragraphClient(base_url="http://localhost:8081")

# Create assistant
assistant = client.create_assistant(
    name="Customer Support Bot",
    workflow=workflow
)

# Create conversation thread
thread = client.create_thread(
    metadata={"customer_id": "12345", "channel": "email"}
)

# Test with different sentiments
test_messages = [
    "Hi, I love your product! How can I upgrade my plan?",  # Positive
    "Your service is terrible and I want a refund NOW!",    # Negative
    "Can you help me reset my password please?"             # Neutral
]

for message in test_messages:
    print(f"\n🎯 Testing: {message}")

    # Create run
    run = client.create_run(
        assistant_id=assistant.id,
        thread_id=thread.id,
        inputs={"customer_message": message}
    )

    # Stream results
    for event in client.stream_events(run.id):
        if event.type == "step_complete":
            print(f"✅ {event.step_name}: {event.output}")
        elif event.type == "workflow_complete":
            print(f"🎉 Final response: {event.outputs['response']}")
```
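Exact model output varies run to run, but for the negative test message the event stream should look roughly like this (responses abbreviated; illustrative only):

```
🎯 Testing: Your service is terrible and I want a refund NOW!
✅ analyze_sentiment: negative
✅ escalation_response: I'm so sorry to hear about your experience...
🎉 Final response: I'm so sorry to hear about your experience...
```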
## Step 3: Add Human-in-the-Loop

Enhance the workflow with human oversight for escalations:
```python
# Add human approval step for escalations
workflow.add_step("human_review", Step.human_input(
    prompt="Customer escalation requires review:",
    input_schema={
        "type": "object",
        "properties": {
            "approved": {"type": "boolean"},
            "override_response": {"type": "string", "optional": True}
        }
    },
    depends_on=["escalation_response"]
))

# Final response step
workflow.add_step("final_response", Step.function_call(
    function=lambda state: {
        "final_response": (
            state["human_review"]["override_response"]
            # human_review only runs on the escalation branch, so guard the lookup
            if state.get("human_review", {}).get("override_response")
            else state["response"]
        )
    },
    depends_on=["human_review", "standard_response"]
))
```
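The merge logic in `final_response` is again plain Python, so you can verify the override behavior with hypothetical state dicts before wiring it in:

```python
def merge(state):
    # Mirrors the final_response lambda above.
    override = state.get("human_review", {}).get("override_response")
    return {"final_response": override or state["response"]}

# Escalation branch: a reviewer-supplied override wins.
state = {"human_review": {"approved": True, "override_response": "We've escalated your case."},
         "response": "Drafted reply"}
assert merge(state)["final_response"] == "We've escalated your case."

# Standard branch: no human_review in state, the model response passes through.
assert merge({"response": "Happy to help!"})["final_response"] == "Happy to help!"
```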
## Step 4: Add Observability

Monitor your workflow performance:
```python
import time

# Add timing and logging
start_time = time.time()

run = client.create_run(
    assistant_id=assistant.id,
    thread_id=thread.id,
    inputs={"customer_message": message},
    metadata={
        "channel": "email",
        "priority": "normal",
        "timestamp": start_time
    }
)

# Track metrics
metrics = {
    "total_time": 0,
    "sentiment_analysis_time": 0,
    "response_generation_time": 0,
    "tokens_used": 0
}

for event in client.stream_events(run.id):
    if event.type == "step_complete":
        step_time = event.metadata.get("duration", 0)
        tokens = event.metadata.get("tokens_used", 0)

        metrics["total_time"] += step_time
        metrics["tokens_used"] += tokens

        if event.step_name == "analyze_sentiment":
            metrics["sentiment_analysis_time"] = step_time
        elif "response" in event.step_name:
            metrics["response_generation_time"] = step_time

        print(f"📊 {event.step_name}: {step_time:.2f}s, {tokens} tokens")

print(f"\n📈 Total metrics: {metrics}")
```
## Step 5: Error Handling

Add robust error handling:
```python
from duragraph import WorkflowError, RetryPolicy

# Configure retry policy
workflow.set_retry_policy(RetryPolicy(
    max_attempts=3,
    backoff_multiplier=2.0,
    retry_on_errors=["LLMTimeoutError", "LLMRateLimitError"]
))

# Add error handling step
workflow.add_step("error_fallback", Step.llm_call(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "Generate a polite fallback response for when the system encounters an error."},
        {"role": "user", "content": "Original message: {customer_message}"}
    ],
    output_key="fallback_response",
    triggers_on_error=["analyze_sentiment", "escalation_response", "standard_response"]
))

# Execute with error handling
try:
    run = client.create_run(
        assistant_id=assistant.id,
        thread_id=thread.id,
        inputs={"customer_message": message}
    )

    result = run.wait_for_completion(timeout=60)

except WorkflowError as e:
    print(f"Workflow failed: {e.message}")
    # Use fallback response
    fallback = client.get_step_output(run.id, "error_fallback")
    print(f"Fallback response: {fallback}")
```
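With `max_attempts=3` and `backoff_multiplier=2.0`, each retry waits twice as long as the one before. Assuming a 1-second initial delay (the actual default isn’t shown here), the schedule works out like this:

```python
# Hypothetical 1s base delay; DuraGraph's real default may differ.
base_delay, multiplier, max_attempts = 1.0, 2.0, 3

for retry in range(1, max_attempts):            # max_attempts=3 allows 2 retries
    delay = base_delay * multiplier ** (retry - 1)
    print(f"retry {retry} after {delay:.0f}s")  # retry 1 after 1s, retry 2 after 2s
```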
## Complete Example

Here’s the full working example:
```python
from duragraph import DuragraphClient, Workflow, Step, RetryPolicy
import time


def create_customer_support_workflow():
    workflow = Workflow(name="Customer Support Agent v1.0")

    # Sentiment analysis
    workflow.add_step("analyze_sentiment", Step.llm_call(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "Analyze sentiment: positive, negative, or neutral."},
            {"role": "user", "content": "{customer_message}"}
        ],
        output_key="sentiment"
    ))

    # Conditional routing
    workflow.add_conditional(
        "route_by_sentiment",
        condition=lambda state: state["sentiment"].lower() == "negative",
        if_true="escalation_response",
        if_false="standard_response"
    )

    # Response generation
    workflow.add_step("escalation_response", Step.llm_call(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Empathetic customer service manager response."},
            {"role": "user", "content": "{customer_message}"}
        ],
        output_key="response"
    ))

    workflow.add_step("standard_response", Step.llm_call(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "Helpful customer service agent."},
            {"role": "user", "content": "{customer_message}"}
        ],
        output_key="response"
    ))

    # Error handling
    workflow.set_retry_policy(RetryPolicy(max_attempts=3))

    return workflow


def main():
    # Initialize
    client = DuragraphClient(base_url="http://localhost:8081")
    workflow = create_customer_support_workflow()

    # Create assistant
    assistant = client.create_assistant(
        name="Customer Support Bot v1.0",
        workflow=workflow
    )

    # Test the workflow
    test_message = "Your app keeps crashing and I'm losing my work!"

    thread = client.create_thread()
    run = client.create_run(
        assistant_id=assistant.id,
        thread_id=thread.id,
        inputs={"customer_message": test_message}
    )

    print(f"🚀 Processing: {test_message}")

    for event in client.stream_events(run.id):
        if event.type == "step_complete":
            print(f"✅ {event.step_name}: {event.output}")
        elif event.type == "workflow_complete":
            print(f"🎉 Final response: {event.outputs['response']}")


if __name__ == "__main__":
    main()
```
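Save this as `customer_support.py` (replacing the Step 1 version) and run it against your local server:

```
python customer_support.py
```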
## Next Steps

- Enhance your workflow
- Explore advanced features
- Production readiness
Need help? Join the community.