When building intelligent systems that need to make complex decisions, simple if-else logic often falls short. This article explores how I used LangGraph to build a multi-stage decision pipeline for geological hazard risk assessment—integrating meteorological data, geological factors, and LLM-powered refinement.
The Problem
Geological hazard assessment requires:
- Multi-source data fusion — Weather, terrain, historical incidents
- Uncertainty handling — Data may be incomplete or conflicting
- Interpretable outputs — Users need to understand why a risk level was assigned
- Dynamic refinement — Low-confidence regions need additional analysis
A simple linear pipeline couldn't handle these requirements. LangGraph's graph-based state machine offered a better approach.
LangGraph Overview
LangGraph is a framework for building stateful, multi-agent applications. It allows defining:
- Nodes — Processing steps that transform state
- Edges — Transitions between nodes, either fixed or chosen conditionally from the current state
- State — Shared data that flows through the graph
Basic Structure
from langgraph.graph import StateGraph, END
from typing import TypedDict
class AssessmentState(TypedDict, total=False):
    """Shared state that flows through the assessment graph.

    Declared with ``total=False`` because the graph is started with only
    ``region_id`` populated; every other key is filled in by a later node,
    so none of them can be treated as required up front.
    """

    region_id: str  # identifier of the region being assessed
    weather_data: dict  # set by fetch_data_node
    geological_data: dict  # set by fetch_data_node
    historical_data: dict  # set by fetch_data_node
    neighbor_data: list  # set by neighbor_fusion_node
    initial_risk: str  # set by initial_assessment_node, may be raised by neighbor_fusion_node
    confidence: float  # set by initial_assessment_node
    reasons: list  # human-readable explanations accumulated by the nodes
    final_risk: str  # set by llm_refinement_node
    needs_refinement: bool  # set by neighbor_fusion_node, read by should_refine
def build_assessment_graph():
    """Assemble and compile the region risk-assessment state graph.

    Flow: ``fetch_data`` -> ``initial_assessment`` -> ``neighbor_fusion``,
    then either ``llm_refinement`` -> ``finalize`` or straight to
    ``finalize``, depending on what ``should_refine`` returns.

    Returns:
        The object produced by ``StateGraph.compile()``.
    """
    workflow = StateGraph(AssessmentState)

    # Register each processing step under its node name.
    node_handlers = (
        ("fetch_data", fetch_data_node),
        ("initial_assessment", initial_assessment_node),
        ("neighbor_fusion", neighbor_fusion_node),
        ("llm_refinement", llm_refinement_node),
        ("finalize", finalize_node),
    )
    for node_name, handler in node_handlers:
        workflow.add_node(node_name, handler)

    # Fixed part of the path.
    workflow.set_entry_point("fetch_data")
    workflow.add_edge("fetch_data", "initial_assessment")
    workflow.add_edge("initial_assessment", "neighbor_fusion")

    # Branch after fusion: should_refine answers "refine" or "skip".
    refinement_routes = {"refine": "llm_refinement", "skip": "finalize"}
    workflow.add_conditional_edges(
        "neighbor_fusion", should_refine, refinement_routes
    )

    workflow.add_edge("llm_refinement", "finalize")
    workflow.add_edge("finalize", END)
    return workflow.compile()


# Node Implementations
1. Data Fetch Node
async def fetch_data_node(state: AssessmentState) -> AssessmentState:
    """Gather weather, geological and historical data for the region.

    The three fetches do not depend on each other, so they are awaited
    together with ``asyncio.gather`` rather than one after another.

    Args:
        state: Current graph state; only ``region_id`` is read.

    Returns:
        A copy of ``state`` with ``weather_data``, ``geological_data`` and
        ``historical_data`` set to the fetched values.

    Raises:
        The first exception raised by any of the three fetches, as
        propagated by ``asyncio.gather``.
    """
    import asyncio  # local import keeps this snippet self-contained

    region_id = state["region_id"]

    # Start all three requests before waiting on any of them.
    weather, geological, historical = await asyncio.gather(
        fetch_weather_data(region_id),
        fetch_geological_data(region_id),
        fetch_historical_incidents(region_id),
    )

    return {
        **state,
        "weather_data": weather,
        "geological_data": geological,
        "historical_data": historical,
    }


# 2. Initial Assessment Node
def initial_assessment_node(state: AssessmentState) -> AssessmentState:
    """Derive the first-pass risk level from four weighted factor scores.

    Rainfall counts for 35%, average slope for 25%, soil type for 20% and
    incident history for 20% of the combined score.

    Returns:
        A copy of ``state`` with ``initial_risk``, ``confidence`` and a
        fresh ``reasons`` list describing each factor.
    """
    weather = state["weather_data"]
    geo = state["geological_data"]
    history = state["historical_data"]

    # Score each factor, in the same order they are weighted below.
    rain, slope, soil, past = (
        calculate_rain_risk(weather["precipitation_24h"]),
        calculate_slope_risk(geo["avg_slope"]),
        calculate_soil_risk(geo["soil_type"]),
        calculate_history_risk(history["incident_count"]),
    )
    weighted_total = rain * 0.35 + slope * 0.25 + soil * 0.20 + past * 0.20

    level = map_score_to_level(weighted_total)
    certainty = calculate_confidence(weather, geo, history)

    # One user-facing line per factor, stating its value and weight.
    explanations = [
        f"降雨量: {weather['precipitation_24h']}mm (权重35%)",
        f"平均坡度: {geo['avg_slope']}° (权重25%)",
        f"土壤类型: {geo['soil_type']} (权重20%)",
        f"历史事故: {history['incident_count']}次 (权重20%)",
    ]

    updated = dict(state)
    updated.update(
        initial_risk=level,
        confidence=certainty,
        reasons=explanations,
    )
    return updated


# 3. Neighbor Fusion Node
def neighbor_fusion_node(state: AssessmentState) -> AssessmentState:
    """Adjust the initial risk using the neighboring regions' risk levels.

    Returns a new state dict (and, when a reason is added, a new
    ``reasons`` list) instead of mutating the incoming ``state`` in place,
    matching the ``{**state, ...}`` convention of the other nodes.

    Args:
        state: Current graph state; reads ``region_id``, ``initial_risk``,
            ``confidence`` and, when the risk is raised, ``reasons``.

    Returns:
        A copy of ``state`` with ``neighbor_data`` and ``needs_refinement``
        set, plus updated ``initial_risk`` and ``reasons`` when the
        neighbor average is higher than this region's level.
    """
    region_id = state["region_id"]
    risk = state["initial_risk"]

    neighbors = fetch_neighbor_risks(region_id)
    updates = {"neighbor_data": neighbors}

    if neighbors:
        neighbor_avg = average_risk_level(neighbors)
        # NOTE(review): risk levels elsewhere are colour strings
        # (green/yellow/orange/red). If average_risk_level() also returns a
        # string, ">" compares alphabetically, which is not severity order
        # -- confirm what it returns.
        if neighbor_avg > risk:
            risk = bump_risk_level(risk)
            updates["initial_risk"] = risk
            # Build a new list so the caller's list is left untouched.
            updates["reasons"] = state["reasons"] + [
                "邻近区域平均风险较高,已上调风险等级"
            ]

    # Refine when confidence is low or the (possibly raised) level is severe.
    updates["needs_refinement"] = (
        state["confidence"] < 0.7 or risk in ["orange", "red"]
    )
    return {**state, **updates}


# 4. LLM Refinement Node
async def llm_refinement_node(state: AssessmentState) -> AssessmentState:
    """Ask the LLM for a final risk judgement on regions flagged for refinement.

    Uses the SDK's asynchronous client so the request is awaited instead of
    blocking the event loop that also serves the API and the scheduler.

    Args:
        state: Current graph state; the collected data, ``initial_risk``,
            ``confidence`` and ``reasons`` are read.

    Returns:
        A copy of ``state`` with ``final_risk`` set from the parsed reply
        and the parsed factors appended to ``reasons``.
    """
    from anthropic import AsyncAnthropic

    client = AsyncAnthropic()
    # The prompt body is flush-left on purpose: it is sent to the model verbatim.
    prompt = f"""
作为地质灾害风险评估专家,请分析以下数据并给出最终判断:
区域ID: {state['region_id']}
降雨数据: {state['weather_data']}
地质数据: {state['geological_data']}
历史数据: {state['historical_data']}
邻近区域风险: {state['neighbor_data']}
初步评估: {state['initial_risk']} (置信度: {state['confidence']})
请输出:
1. 最终风险等级 (green/yellow/orange/red)
2. 主要风险因素
3. 可能的灾害类型
4. 建议
"""
    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    analysis = parse_llm_response(response.content[0].text)
    return {
        **state,
        "final_risk": analysis["risk_level"],
        "reasons": state["reasons"] + analysis["factors"],
    }


# 5. Conditional Edge
def should_refine(state: AssessmentState) -> str:
    """Pick the branch to follow after neighbor fusion.

    Returns:
        ``"refine"`` when ``state["needs_refinement"]`` is truthy,
        otherwise ``"skip"``.
    """
    return "refine" if state["needs_refinement"] else "skip"


# Integration with FastAPI
from fastapi import FastAPI, BackgroundTasks
import asyncio
# FastAPI application instance; the route and startup decorators in this file attach to it.
app = FastAPI()
# Built and compiled once at import time; run_assessment and
# scheduled_assessment both invoke this shared instance.
assessment_graph = build_assessment_graph()
# Store for async results: an in-process dict keyed by task id, written by
# assess_region/run_assessment and read by get_result.
results_store = {}
@app.post("/api/assess/{region_id}")
async def assess_region(region_id: str, background_tasks: BackgroundTasks):
    """Queue a background assessment for ``region_id`` and return at once.

    Returns:
        A dict holding the new ``task_id`` and the status ``"pending"``;
        the task id is what ``/api/result/{task_id}`` looks up.
    """
    new_task_id = generate_task_id()

    # Record the task as pending before the work is handed off.
    pending_entry = dict(status="pending", region_id=region_id)
    results_store[new_task_id] = pending_entry

    # The graph itself runs after the response has been sent.
    background_tasks.add_task(run_assessment, new_task_id, region_id)
    return dict(task_id=new_task_id, status="pending")
async def run_assessment(task_id: str, region_id: str):
    """Execute the assessment graph and record the outcome under ``task_id``.

    On success the stored entry becomes ``"completed"`` with the risk level,
    confidence and reasons. On any failure the entry becomes ``"failed"``
    with an ``error`` message, so a polling client does not see
    ``"pending"`` forever; the exception is then re-raised so the caller
    still sees it.

    Args:
        task_id: Key in ``results_store`` to update.
        region_id: Region to assess; the graph's only initial input.

    Raises:
        Whatever the graph raises, or ``KeyError`` if the final state lacks
        one of the expected keys.
    """
    try:
        result = await assessment_graph.ainvoke({"region_id": region_id})
        completed = {
            "status": "completed",
            "region_id": region_id,
            "risk_level": result["final_risk"],
            "confidence": result["confidence"],
            "reasons": result["reasons"],
        }
    except Exception as exc:
        results_store[task_id] = {
            "status": "failed",
            "region_id": region_id,
            "error": f"{type(exc).__name__}: {exc}",
        }
        raise
    results_store[task_id] = completed
@app.get("/api/result/{task_id}")
async def get_result(task_id: str):
    """Return the stored entry for ``task_id``.

    Returns:
        The entry from ``results_store``, or ``{"error": "Task not found"}``
        when no entry exists for that id.
    """
    try:
        return results_store[task_id]
    except KeyError:
        return {"error": "Task not found"}


# Visualization Layer
The frontend displays results on an interactive map:
// Vue component for risk map
// One region's assessment result as consumed by renderRiskMap.
interface RiskResult {
// Identifier passed to map.getRegion() to find the region on the map.
regionId: string;
// One of the four risk levels; also the lookup key into riskColors.
riskLevel: 'green' | 'yellow' | 'orange' | 'red';
// Multiplied by 100 for display and scaled into the region's fill opacity.
confidence: number;
// Explanations rendered as list items in the region popup.
reasons: string[];
}
// Fill colour (hex) for each risk level; the keys mirror RiskResult.riskLevel.
const riskColors = {
green: '#22c55e',
yellow: '#eab308',
orange: '#f97316',
red: '#ef4444',
};
/** Escape text so it can be interpolated into popup HTML as plain content. */
function escapePopupHtml(value: string): string {
  return String(value)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;');
}

/**
 * Style each region on the map by its risk level and attach a popup with
 * the level, confidence and reasons.
 *
 * All text placed in the popup is HTML-escaped: the region id originates
 * from a request path and the reasons can contain model-generated text, so
 * neither may be trusted as markup.
 */
function renderRiskMap(results: RiskResult[]) {
  results.forEach(result => {
    const region = map.getRegion(result.regionId);
    region.setStyle({
      fillColor: riskColors[result.riskLevel],
      // Opacity ranges from 0.6 (confidence 0) to 0.9 (confidence 1).
      fillOpacity: 0.6 + result.confidence * 0.3,
    });
    const reasonItems = result.reasons
      .map(r => `<li>${escapePopupHtml(r)}</li>`)
      .join('');
    region.bindPopup(`
<strong>${escapePopupHtml(result.regionId)}</strong><br>
风险等级: ${escapePopupHtml(result.riskLevel)}<br>
置信度: ${(result.confidence * 100).toFixed(0)}%<br>
<ul>
${reasonItems}
</ul>
`);
  });
}
// Scheduled Dispatch
For periodic updates, we use a scheduler:
import asyncio
from datetime import datetime
async def scheduled_assessment():
    """Assess every known region once, one region after another.

    A failure for one region is passed to ``log_error`` and does not stop
    the remaining regions from being processed.
    """
    for region_id in get_all_region_ids():
        try:
            outcome = await assessment_graph.ainvoke({"region_id": region_id})
            store_result(region_id, outcome)
            notify_if_high_risk(outcome)
        except Exception as err:
            # Per-region boundary: record the failure and move on.
            log_error(region_id, err)
async def scheduler_loop(interval_seconds: float = 30 * 60):
    """Run ``scheduled_assessment`` repeatedly, pausing between runs.

    A cycle that raises is logged and the loop carries on, so one failure
    (for example while listing regions) does not permanently stop periodic
    assessment. Cancellation is not intercepted: on Python 3.8+
    ``asyncio.CancelledError`` is not an ``Exception`` subclass, so
    cancelling the task still ends the loop.

    Args:
        interval_seconds: Pause after each run completes, in seconds.
            Defaults to 30 minutes.
    """
    import logging

    logger = logging.getLogger(__name__)
    while True:
        try:
            await scheduled_assessment()
        except Exception:
            logger.exception("Scheduled assessment cycle failed")
        await asyncio.sleep(interval_seconds)
# Start scheduler on app startup
@app.on_event("startup")
async def startup():
    """Start the periodic scheduler when the application starts.

    The task is stored on ``app.state`` because the event loop keeps only a
    weak reference to tasks; without a strong reference the scheduler task
    could be garbage-collected while still running.
    """
    app.state.scheduler_task = asyncio.create_task(scheduler_loop())


# Results
| Feature | Implementation |
|---|---|
| Risk levels | 4 levels with clear criteria |
| Confidence | Per-region calculation with evidence |
| Neighbor fusion | Risk raised one level when the neighbor average is higher |
| LLM refinement | Only for low-confidence/high-risk |
| Manual refresh | Async task + polling for result |
| Scheduled dispatch | 30-minute periodic auto-run |
Lessons Learned
- Graph vs Pipeline — Graphs handle branching logic more elegantly
- Async by default — LangGraph nodes should be async for I/O operations
- State design — TypedDict helps catch errors early
- LLM sparingly — Use LLM only when necessary to reduce cost and latency
- Human-readable — Store reasons at each step for debugging and user display
References
This system was built for educational purposes. Production deployments should use authorized data sources (CMA/CGS).