Building a Multi-Stage Decision System with LangGraph

How I designed a geological hazard assessment system using LangGraph's multi-stage decision architecture for intelligent risk evaluation.

#AI #LangGraph #FastAPI #Python

When building intelligent systems that need to make complex decisions, simple if-else logic often falls short. This article explores how I used LangGraph to build a multi-stage decision pipeline for geological hazard risk assessment—integrating meteorological data, geological factors, and LLM-powered refinement.

The Problem

Geological hazard assessment requires:

  1. Multi-source data fusion — Weather, terrain, historical incidents
  2. Uncertainty handling — Data may be incomplete or conflicting
  3. Interpretable outputs — Users need to understand why a risk level was assigned
  4. Dynamic refinement — Low-confidence regions need additional analysis

A simple linear pipeline couldn't handle these requirements. LangGraph's graph-based state machine offered a better approach.

LangGraph Overview

LangGraph is a framework for building stateful, multi-agent applications. It allows defining:

  • Nodes — Processing steps that transform state
  • Edges — Conditional transitions between nodes
  • State — Shared data that flows through the graph

Basic Structure

from langgraph.graph import StateGraph, END
from typing import TypedDict
 
# total=False: callers supply only region_id; the nodes fill in the other keys
class AssessmentState(TypedDict, total=False):
    region_id: str
    weather_data: dict
    geological_data: dict
    historical_data: dict
    neighbor_data: list
    initial_risk: str
    confidence: float
    reasons: list
    final_risk: str
    needs_refinement: bool
 
def build_assessment_graph():
    graph = StateGraph(AssessmentState)
 
    # Add nodes
    graph.add_node("fetch_data", fetch_data_node)
    graph.add_node("initial_assessment", initial_assessment_node)
    graph.add_node("neighbor_fusion", neighbor_fusion_node)
    graph.add_node("llm_refinement", llm_refinement_node)
    graph.add_node("finalize", finalize_node)
 
    # Define edges
    graph.set_entry_point("fetch_data")
    graph.add_edge("fetch_data", "initial_assessment")
    graph.add_edge("initial_assessment", "neighbor_fusion")
 
    # Conditional edge: call the LLM only when confidence is low or risk is high
    graph.add_conditional_edges(
        "neighbor_fusion",
        should_refine,
        {
            "refine": "llm_refinement",
            "skip": "finalize"
        }
    )
 
    graph.add_edge("llm_refinement", "finalize")
    graph.add_edge("finalize", END)
 
    return graph.compile()

Node Implementations

1. Data Fetch Node

import asyncio

async def fetch_data_node(state: AssessmentState) -> AssessmentState:
    """Gather all relevant data for the region."""
    region_id = state["region_id"]

    # Fetch from all three sources concurrently; gather returns results
    # in the same order as its arguments
    weather, geological, historical = await asyncio.gather(
        fetch_weather_data(region_id),
        fetch_geological_data(region_id),
        fetch_historical_incidents(region_id),
    )
 
    return {
        **state,
        "weather_data": weather,
        "geological_data": geological,
        "historical_data": historical,
    }

2. Initial Assessment Node

def initial_assessment_node(state: AssessmentState) -> AssessmentState:
    """Calculate initial risk based on weighted factors."""
    weather = state["weather_data"]
    geo = state["geological_data"]
    history = state["historical_data"]
 
    # Risk scoring
    rain_score = calculate_rain_risk(weather["precipitation_24h"])
    slope_score = calculate_slope_risk(geo["avg_slope"])
    soil_score = calculate_soil_risk(geo["soil_type"])
    history_score = calculate_history_risk(history["incident_count"])
 
    # Weighted combination
    total_score = (
        rain_score * 0.35 +
        slope_score * 0.25 +
        soil_score * 0.20 +
        history_score * 0.20
    )
 
    # Map score to risk level
    risk_level = map_score_to_level(total_score)
    confidence = calculate_confidence(weather, geo, history)
 
    return {
        **state,
        "initial_risk": risk_level,
        "confidence": confidence,
        "reasons": [
            f"Rainfall: {weather['precipitation_24h']} mm (weight 35%)",
            f"Average slope: {geo['avg_slope']}° (weight 25%)",
            f"Soil type: {geo['soil_type']} (weight 20%)",
            f"Historical incidents: {history['incident_count']} (weight 20%)",
        ],
    }
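
The scoring helpers called above are not shown in this post. As a rough sketch of how they could look, assuming every factor score is normalized to the 0-1 range: the thresholds, the 100 mm rainfall cap, the `WEIGHTS` table, and `weighted_total` are illustrative placeholders rather than calibrated values, and the confidence measure (share of input fields present) is one possible choice among many. The scoring functions would still need their own handling for missing values.

```python
# Hypothetical helper sketches; all thresholds are illustrative, not calibrated.
WEIGHTS = {"rain": 0.35, "slope": 0.25, "soil": 0.20, "history": 0.20}


def calculate_rain_risk(precipitation_24h: float) -> float:
    """Map 24-hour rainfall in mm to a 0-1 score (linear, capped at 100 mm)."""
    return max(0.0, min(precipitation_24h / 100.0, 1.0))


def weighted_total(scores: dict) -> float:
    """Combine per-factor 0-1 scores using the weights from the node above."""
    return sum(scores[name] * weight for name, weight in WEIGHTS.items())


def map_score_to_level(total_score: float) -> str:
    """Bucket a 0-1 weighted score into one of the four risk levels."""
    if total_score >= 0.75:
        return "red"
    if total_score >= 0.5:
        return "orange"
    if total_score >= 0.25:
        return "yellow"
    return "green"


def calculate_confidence(weather: dict, geo: dict, history: dict) -> float:
    """Share of the four expected input fields that are present and not None."""
    expected = [
        (weather, "precipitation_24h"),
        (geo, "avg_slope"),
        (geo, "soil_type"),
        (history, "incident_count"),
    ]
    present = sum(1 for source, key in expected if source.get(key) is not None)
    return present / len(expected)


# Worked example: 80 mm of rain plus made-up slope, soil and history scores.
scores = {"rain": calculate_rain_risk(80.0), "slope": 0.6, "soil": 0.5, "history": 0.2}
total = weighted_total(scores)      # 0.28 + 0.15 + 0.10 + 0.04, about 0.57
level = map_score_to_level(total)   # falls in the 0.5-0.75 bucket: "orange"
```

With these placeholder thresholds, heavy rain alone contributes at most 0.35, so it cannot push a region past yellow unless another factor also scores high.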

3. Neighbor Fusion Node

# Ordinal ranking of levels; comparing the names as strings would sort alphabetically
RISK_ORDER = ["green", "yellow", "orange", "red"]

def neighbor_fusion_node(state: AssessmentState) -> AssessmentState:
    """Adjust risk based on neighbor region status."""
    risk = state["initial_risk"]
    reasons = list(state["reasons"])  # copy so the incoming state is not mutated

    # Fetch neighbor regions' risk levels
    neighbors = fetch_neighbor_risks(state["region_id"])

    # Fusion logic: if neighbors average a higher level, raise ours by one step.
    # Assumes average_risk_level returns a level name such as "orange".
    if neighbors:
        neighbor_avg = average_risk_level(neighbors)
        if RISK_ORDER.index(neighbor_avg) > RISK_ORDER.index(risk):
            risk = bump_risk_level(risk)
            reasons.append(
                "Neighboring regions average a higher risk; risk level raised"
            )

    return {
        **state,
        "neighbor_data": neighbors,
        "initial_risk": risk,
        "reasons": reasons,
        # Refine when confidence is low or the (possibly raised) risk is high
        "needs_refinement": state["confidence"] < 0.7 or risk in ("orange", "red"),
    }
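
The fusion step depends on treating risk levels as an ordered scale. A minimal sketch of the two helpers, assuming `fetch_neighbor_risks` returns a plain list of level names (that return shape, the `RISK_ORDER` list, and the `is_higher` helper are assumptions for illustration):

```python
from typing import List

# Ordinal scale, lowest to highest. Hypothetical constant for this sketch.
RISK_ORDER = ["green", "yellow", "orange", "red"]


def average_risk_level(neighbor_levels: List[str]) -> str:
    """Average neighbor levels by ordinal rank, rounding half up."""
    ranks = [RISK_ORDER.index(level) for level in neighbor_levels]
    mean_rank = sum(ranks) / len(ranks)
    return RISK_ORDER[int(mean_rank + 0.5)]


def bump_risk_level(level: str) -> str:
    """Raise a level by one step, saturating at red."""
    next_rank = min(RISK_ORDER.index(level) + 1, len(RISK_ORDER) - 1)
    return RISK_ORDER[next_rank]


def is_higher(a: str, b: str) -> bool:
    """True when level a ranks above level b on the ordinal scale."""
    return RISK_ORDER.index(a) > RISK_ORDER.index(b)
```

Comparing by rank matters because plain string comparison orders the names alphabetically: `"orange" > "yellow"` is `False` in Python, while `is_higher("orange", "yellow")` is `True`.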

4. LLM Refinement Node

async def llm_refinement_node(state: AssessmentState) -> AssessmentState:
    """Use LLM for detailed analysis of low-confidence or high-risk regions."""
    from anthropic import AsyncAnthropic

    # Async client, so the API call does not block the event loop
    client = AsyncAnthropic()

    prompt = f"""
    As a geological hazard risk assessment expert, analyze the following data and give a final judgment:

    Region ID: {state['region_id']}
    Rainfall data: {state['weather_data']}
    Geological data: {state['geological_data']}
    Historical data: {state['historical_data']}
    Neighboring region risk: {state['neighbor_data']}
    Initial assessment: {state['initial_risk']} (confidence: {state['confidence']})

    Please output:
    1. Final risk level (green/yellow/orange/red)
    2. Main risk factors
    3. Likely hazard types
    4. Recommendations
    """

    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
 
    analysis = parse_llm_response(response.content[0].text)
 
    return {
        **state,
        "final_risk": analysis["risk_level"],
        "reasons": state["reasons"] + analysis["factors"],
    }
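
`parse_llm_response` is not shown in this post. Free-form replies are hard to parse reliably, so one option is to ask the model to end its answer with a JSON object and extract that. The sketch below assumes the prompt is extended to request a JSON object with `risk_level` and `factors` keys; that reply format and the `VALID_LEVELS` set are assumptions, not part of the prompt above.

```python
import json
import re

# Hypothetical: the four level names the prompt allows.
VALID_LEVELS = {"green", "yellow", "orange", "red"}


def parse_llm_response(text: str) -> dict:
    """Extract risk_level and factors from a reply containing a JSON object.

    Raises ValueError when the reply has no usable JSON or an unknown level,
    so the caller can decide whether to fall back to the initial assessment.
    """
    # Grab everything from the first "{" to the last "}" in the reply
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in LLM reply")
    try:
        data = json.loads(match.group(0))
    except json.JSONDecodeError as exc:
        raise ValueError("LLM reply contained malformed JSON") from exc

    level = str(data.get("risk_level", "")).strip().lower()
    if level not in VALID_LEVELS:
        raise ValueError(f"unexpected risk level: {level!r}")

    factors = data.get("factors", [])
    if not isinstance(factors, list):
        raise ValueError("factors must be a list")

    return {"risk_level": level, "factors": [str(f) for f in factors]}
```

Validating the level before it reaches the state keeps a malformed reply from ending up as `final_risk` on the map.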

5. Conditional Edge

def should_refine(state: AssessmentState) -> str:
    """Determine if LLM refinement is needed."""
    if state["needs_refinement"]:
        return "refine"
    return "skip"
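
On the "skip" branch the LLM node never runs, so nothing has set `final_risk` by the time the graph reaches `finalize`. The `finalize_node` body is not shown in this post; a minimal sketch, assuming its job is to guarantee that `final_risk` exists whichever branch ran (the wording of the added reason is made up for illustration):

```python
def finalize_node(state: dict) -> dict:
    """Make sure final_risk is set regardless of which branch ran."""
    if state.get("final_risk"):
        # The LLM refinement step already decided the final level
        return state

    # Refinement was skipped: promote the (neighbor-adjusted) initial level
    return {
        **state,
        "final_risk": state["initial_risk"],
        "reasons": state.get("reasons", [])
        + ["LLM refinement skipped; initial assessment kept"],
    }
```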

Integration with FastAPI

from fastapi import FastAPI, BackgroundTasks, HTTPException

app = FastAPI()
assessment_graph = build_assessment_graph()

# In-memory store for async results; lost on restart and not shared across workers
results_store = {}

@app.post("/api/assess/{region_id}")
async def assess_region(region_id: str, background_tasks: BackgroundTasks):
    """Trigger async assessment for a region."""
    task_id = generate_task_id()

    # Store initial state
    results_store[task_id] = {"status": "pending", "region_id": region_id}

    # Run in background
    background_tasks.add_task(run_assessment, task_id, region_id)

    return {"task_id": task_id, "status": "pending"}

async def run_assessment(task_id: str, region_id: str):
    """Execute the assessment graph and record the outcome."""
    try:
        result = await assessment_graph.ainvoke({"region_id": region_id})
    except Exception as exc:
        # Record the failure so pollers do not wait on "pending" forever
        results_store[task_id] = {
            "status": "failed",
            "region_id": region_id,
            "error": str(exc),
        }
        return

    results_store[task_id] = {
        "status": "completed",
        "region_id": region_id,
        "risk_level": result["final_risk"],
        "confidence": result["confidence"],
        "reasons": result["reasons"],
    }

@app.get("/api/result/{task_id}")
async def get_result(task_id: str):
    """Get assessment result."""
    if task_id not in results_store:
        raise HTTPException(status_code=404, detail="Task not found")
    return results_store[task_id]
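
On the client side, the flow is to trigger an assessment and then poll the result endpoint until the task leaves the `pending` state. A minimal polling sketch follows; `poll_result` and its `fetch` parameter are hypothetical names, and `fetch` stands in for whatever HTTP call retrieves `/api/result/{task_id}` as a dict.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def poll_result(
    fetch: Callable[[str], Awaitable[dict]],
    task_id: str,
    interval: float = 1.0,
    timeout: float = 60.0,
) -> dict:
    """Call fetch(task_id) until the status is no longer "pending".

    Returns the first non-pending payload; raises TimeoutError if the task
    is still pending once `timeout` seconds have passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = await fetch(task_id)
        if result.get("status") != "pending":
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still pending after {timeout}s")
        await asyncio.sleep(interval)
```

Passing the fetch function in keeps the loop independent of any particular HTTP client and makes it easy to exercise with a fake in tests.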

Visualization Layer

The frontend displays results on an interactive map:

// Vue component for risk map
interface RiskResult {
  regionId: string;
  riskLevel: 'green' | 'yellow' | 'orange' | 'red';
  confidence: number;
  reasons: string[];
}
 
const riskColors = {
  green: '#22c55e',
  yellow: '#eab308',
  orange: '#f97316',
  red: '#ef4444',
};
 
function renderRiskMap(results: RiskResult[]) {
  results.forEach(result => {
    const region = map.getRegion(result.regionId);
    region.setStyle({
      fillColor: riskColors[result.riskLevel],
      fillOpacity: 0.6 + result.confidence * 0.3,
    });
 
    region.bindPopup(`
      <strong>${result.regionId}</strong><br>
      Risk level: ${result.riskLevel}<br>
      Confidence: ${(result.confidence * 100).toFixed(0)}%<br>
      <ul>
        ${result.reasons.map(r => `<li>${r}</li>`).join('')}
      </ul>
    `);
  });
}

Scheduled Dispatch

For periodic updates, we use a scheduler:

import asyncio

async def scheduled_assessment():
    """Run one assessment pass over all regions."""
    regions = get_all_region_ids()
 
    for region_id in regions:
        try:
            result = await assessment_graph.ainvoke({
                "region_id": region_id
            })
            store_result(region_id, result)
            notify_if_high_risk(result)
        except Exception as e:
            log_error(region_id, e)
 
async def scheduler_loop():
    while True:
        await scheduled_assessment()
        await asyncio.sleep(30 * 60)  # 30 minutes
 
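# Start scheduler on app startup. Newer FastAPI releases deprecate on_event
# in favor of lifespan handlers.
@app.on_event("startup")
async def startup():
    # Keep a reference so the task is not garbage-collected while running
    app.state.scheduler_task = asyncio.create_task(scheduler_loop())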
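
The loop above assesses regions one at a time, so a full pass takes as long as the sum of all regions. If that becomes too slow, one option is to run regions concurrently with a cap on how many are in flight. The sketch below is an alternative under assumptions, not what this system does: `assess_all`, `assess_one`, and `max_concurrent` are hypothetical names, with `assess_one` standing in for a wrapper around `assessment_graph.ainvoke`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable, Union


async def assess_all(
    region_ids: Iterable[str],
    assess_one: Callable[[str], Awaitable[dict]],
    max_concurrent: int = 5,
) -> Dict[str, Union[dict, Exception]]:
    """Assess regions concurrently, at most max_concurrent at a time.

    Returns a mapping of region_id to its result, or to the exception it
    raised, so one failing region does not abort the whole pass.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    results: Dict[str, Union[dict, Exception]] = {}

    async def worker(region_id: str) -> None:
        async with semaphore:  # blocks while max_concurrent workers are active
            try:
                results[region_id] = await assess_one(region_id)
            except Exception as exc:
                results[region_id] = exc

    await asyncio.gather(*(worker(region_id) for region_id in region_ids))
    return results
```

The cap matters because each assessment may call external data sources and the LLM API, which typically enforce rate limits.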

Results

Feature             Implementation
Risk levels         4 levels with clear criteria
Confidence          Per-region calculation with evidence
Neighbor fusion     Local + neighbor average weighted
LLM refinement      Only for low-confidence/high-risk
Manual refresh      Async task + polling for result
Scheduled dispatch  30-minute periodic auto-run

Lessons Learned

  1. Graph vs Pipeline — Graphs handle branching logic more elegantly
  2. Async by default — LangGraph nodes should be async for I/O operations
  3. State design — TypedDict helps catch errors early
  4. LLM sparingly — Use LLM only when necessary to reduce cost and latency
  5. Human-readable — Store reasons at each step for debugging and user display

This system was built for educational purposes. Production deployments should use authorized data sources (CMA/CGS).