在构建需要做出复杂决策的智能系统时,简单的 if-else 逻辑往往力不从心。本文探讨我如何使用 LangGraph 为地质灾害风险评估构建多阶段决策流水线——整合气象数据、地质因素和 LLM 辅助判断。
问题背景
地质灾害风险评估需要:
- 多源数据融合 — 天气、地形、历史事故
- 不确定性处理 — 数据可能不完整或冲突
- 可解释输出 — 用户需要理解风险等级的依据
- 动态修正 — 低置信度区域需要额外分析
简单的线性流水线无法处理这些需求。LangGraph 的图状态机提供了更好的解决方案。
LangGraph 概述
LangGraph 是一个用于构建有状态多代理应用的框架。它允许定义:
- 节点(Nodes) — 转换状态的处理步骤
- 边(Edges) — 节点之间的条件转换
- 状态(State) — 在图中流转的共享数据
基本结构
from langgraph.graph import StateGraph, END
from typing import TypedDict
class AssessmentState(TypedDict):
region_id: str
weather_data: dict
geological_data: dict
historical_data: dict
neighbor_data: list
initial_risk: str
confidence: float
reasons: list
final_risk: str
needs_refinement: bool
def build_assessment_graph():
graph = StateGraph(AssessmentState)
# 添加节点
graph.add_node("fetch_data", fetch_data_node)
graph.add_node("initial_assessment", initial_assessment_node)
graph.add_node("neighbor_fusion", neighbor_fusion_node)
graph.add_node("llm_refinement", llm_refinement_node)
graph.add_node("finalize", finalize_node)
# 定义边
graph.set_entry_point("fetch_data")
graph.add_edge("fetch_data", "initial_assessment")
graph.add_edge("initial_assessment", "neighbor_fusion")
# 条件边:置信度高时跳过 LLM
graph.add_conditional_edges(
"neighbor_fusion",
should_refine,
{
"refine": "llm_refinement",
"skip": "finalize"
}
)
graph.add_edge("llm_refinement", "finalize")
graph.add_edge("finalize", END)
return graph.compile()节点实现
1. 数据获取节点
async def fetch_data_node(state: AssessmentState) -> AssessmentState:
"""收集区域的所有相关数据。"""
region_id = state["region_id"]
# 并行从多个源获取数据
weather = await fetch_weather_data(region_id)
geological = await fetch_geological_data(region_id)
historical = await fetch_historical_incidents(region_id)
return {
**state,
"weather_data": weather,
"geological_data": geological,
"historical_data": historical,
}2. 初步评估节点
def initial_assessment_node(state: AssessmentState) -> AssessmentState:
"""基于加权因素计算初步风险。"""
weather = state["weather_data"]
geo = state["geological_data"]
history = state["historical_data"]
# 风险评分
rain_score = calculate_rain_risk(weather["precipitation_24h"])
slope_score = calculate_slope_risk(geo["avg_slope"])
soil_score = calculate_soil_risk(geo["soil_type"])
history_score = calculate_history_risk(history["incident_count"])
# 加权组合
total_score = (
rain_score * 0.35 +
slope_score * 0.25 +
soil_score * 0.20 +
history_score * 0.20
)
# 映射评分到风险等级
risk_level = map_score_to_level(total_score)
confidence = calculate_confidence(weather, geo, history)
return {
**state,
"initial_risk": risk_level,
"confidence": confidence,
"reasons": [
f"降雨量: {weather['precipitation_24h']}mm (权重35%)",
f"平均坡度: {geo['avg_slope']}° (权重25%)",
f"土壤类型: {geo['soil_type']} (权重20%)",
f"历史事故: {history['incident_count']}次 (权重20%)",
],
}3. 邻近区域融合节点
def neighbor_fusion_node(state: AssessmentState) -> AssessmentState:
"""根据邻近区域状态调整风险。"""
region_id = state["region_id"]
initial_risk = state["initial_risk"]
# 获取邻近区域的风险等级
neighbors = fetch_neighbor_risks(region_id)
state["neighbor_data"] = neighbors
# 融合逻辑
if neighbors:
neighbor_avg = average_risk_level(neighbors)
# 如果邻近区域风险更高,上调评估
if neighbor_avg > initial_risk:
adjusted_risk = bump_risk_level(initial_risk)
state["initial_risk"] = adjusted_risk
state["reasons"].append(
f"邻近区域平均风险较高,已上调风险等级"
)
# 判断是否需要 LLM 修正
state["needs_refinement"] = (
state["confidence"] < 0.7 or
state["initial_risk"] in ["orange", "red"]
)
return state4. LLM 修正节点
async def llm_refinement_node(state: AssessmentState) -> AssessmentState:
"""对低置信度区域使用 LLM 进行详细分析。"""
from anthropic import Anthropic
client = Anthropic()
prompt = f"""
作为地质灾害风险评估专家,请分析以下数据并给出最终判断:
区域ID: {state['region_id']}
降雨数据: {state['weather_data']}
地质数据: {state['geological_data']}
历史数据: {state['historical_data']}
邻近区域风险: {state['neighbor_data']}
初步评估: {state['initial_risk']} (置信度: {state['confidence']})
请输出:
1. 最终风险等级 (green/yellow/orange/red)
2. 主要风险因素
3. 可能的灾害类型
4. 建议
"""
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
)
analysis = parse_llm_response(response.content[0].text)
return {
**state,
"final_risk": analysis["risk_level"],
"reasons": state["reasons"] + analysis["factors"],
}5. 条件边
def should_refine(state: AssessmentState) -> str:
"""判断是否需要 LLM 修正。"""
if state["needs_refinement"]:
return "refine"
return "skip"与 FastAPI 集成
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
assessment_graph = build_assessment_graph()
# 异步结果存储
results_store = {}
@app.post("/api/assess/{region_id}")
async def assess_region(region_id: str, background_tasks: BackgroundTasks):
"""触发区域的异步评估。"""
task_id = generate_task_id()
# 存储初始状态
results_store[task_id] = {"status": "pending", "region_id": region_id}
# 后台运行
background_tasks.add_task(run_assessment, task_id, region_id)
return {"task_id": task_id, "status": "pending"}
async def run_assessment(task_id: str, region_id: str):
"""执行评估图。"""
initial_state = {"region_id": region_id}
result = await assessment_graph.ainvoke(initial_state)
results_store[task_id] = {
"status": "completed",
"region_id": region_id,
"risk_level": result["final_risk"],
"confidence": result["confidence"],
"reasons": result["reasons"],
}
@app.get("/api/result/{task_id}")
async def get_result(task_id: str):
"""获取评估结果。"""
return results_store.get(task_id, {"error": "Task not found"})可视化层
前端在交互式地图上展示结果:
// Vue 风险地图组件
interface RiskResult {
regionId: string;
riskLevel: 'green' | 'yellow' | 'orange' | 'red';
confidence: number;
reasons: string[];
}
const riskColors = {
green: '#22c55e',
yellow: '#eab308',
orange: '#f97316',
red: '#ef4444',
};
function renderRiskMap(results: RiskResult[]) {
results.forEach(result => {
const region = map.getRegion(result.regionId);
region.setStyle({
fillColor: riskColors[result.riskLevel],
fillOpacity: 0.6 + result.confidence * 0.3,
});
region.bindPopup(`
<strong>${result.regionId}</strong><br>
风险等级: ${result.riskLevel}<br>
置信度: ${(result.confidence * 100).toFixed(0)}%<br>
<ul>
${result.reasons.map(r => `<li>${r}</li>`).join('')}
</ul>
`);
});
}定时调度
为周期性更新,我们使用调度器:
import asyncio
from datetime import datetime
async def scheduled_assessment():
"""每30分钟对所有区域运行评估。"""
regions = get_all_region_ids()
for region_id in regions:
try:
result = await assessment_graph.ainvoke({
"region_id": region_id
})
store_result(region_id, result)
notify_if_high_risk(result)
except Exception as e:
log_error(region_id, e)
async def scheduler_loop():
while True:
await scheduled_assessment()
await asyncio.sleep(30 * 60) # 30分钟
# 应用启动时启动调度器
@app.on_event("startup")
async def startup():
asyncio.create_task(scheduler_loop())实现结果
| 功能 | 实现方式 |
|---|---|
| 风险等级 | 4级标准,有明确判定条件 |
| 置信度 | 每区域独立计算,附带证据 |
| 邻近融合 | 本地 + 邻近平均加权 |
| LLM 修正 | 仅对低置信度/高风险区域 |
| 手动刷新 | 异步任务 + 轮询获取结果 |
| 定时调度 | 30分钟周期自动运行 |
经验总结
- 图 vs 流水线 — 图结构处理分支逻辑更优雅
- 默认异步 — LangGraph 节点应为 async 以处理 I/O
- 状态设计 — TypedDict 有助于及早发现错误
- 节制使用 LLM — 仅在必要时使用,降低成本和延迟
- 人类可读 — 每步存储原因便于调试和用户展示
参考
本系统用于教育目的。生产环境部署应使用授权数据源(气象局/地质局等)。