返回博客列表

使用 LangGraph 构建多阶段决策系统

我如何使用 LangGraph 的多阶段决策架构设计地质灾害风险评估系统,实现智能风险判断。

#AI#LangGraph#FastAPI#Python

在构建需要做出复杂决策的智能系统时,简单的 if-else 逻辑往往力不从心。本文探讨我如何使用 LangGraph 为地质灾害风险评估构建多阶段决策流水线——整合气象数据、地质因素和 LLM 辅助判断。

问题背景

地质灾害风险评估需要:

  1. 多源数据融合 — 天气、地形、历史事故
  2. 不确定性处理 — 数据可能不完整或冲突
  3. 可解释输出 — 用户需要理解风险等级的依据
  4. 动态修正 — 低置信度区域需要额外分析

简单的线性流水线无法处理这些需求。LangGraph 的图状态机提供了更好的解决方案。

LangGraph 概述

LangGraph 是一个用于构建有状态多代理应用的框架。它允许定义:

  • 节点(Nodes) — 转换状态的处理步骤
  • 边(Edges) — 节点之间的条件转换
  • 状态(State) — 在图中流转的共享数据

基本结构

from langgraph.graph import StateGraph, END
from typing import TypedDict
 
class AssessmentState(TypedDict):
    region_id: str
    weather_data: dict
    geological_data: dict
    historical_data: dict
    neighbor_data: list
    initial_risk: str
    confidence: float
    reasons: list
    final_risk: str
    needs_refinement: bool
 
def build_assessment_graph():
    graph = StateGraph(AssessmentState)
 
    # 添加节点
    graph.add_node("fetch_data", fetch_data_node)
    graph.add_node("initial_assessment", initial_assessment_node)
    graph.add_node("neighbor_fusion", neighbor_fusion_node)
    graph.add_node("llm_refinement", llm_refinement_node)
    graph.add_node("finalize", finalize_node)
 
    # 定义边
    graph.set_entry_point("fetch_data")
    graph.add_edge("fetch_data", "initial_assessment")
    graph.add_edge("initial_assessment", "neighbor_fusion")
 
    # 条件边:置信度高时跳过 LLM
    graph.add_conditional_edges(
        "neighbor_fusion",
        should_refine,
        {
            "refine": "llm_refinement",
            "skip": "finalize"
        }
    )
 
    graph.add_edge("llm_refinement", "finalize")
    graph.add_edge("finalize", END)
 
    return graph.compile()

节点实现

1. 数据获取节点

async def fetch_data_node(state: AssessmentState) -> AssessmentState:
    """收集区域的所有相关数据。"""
    region_id = state["region_id"]
 
    # 并行从多个源获取数据
    weather = await fetch_weather_data(region_id)
    geological = await fetch_geological_data(region_id)
    historical = await fetch_historical_incidents(region_id)
 
    return {
        **state,
        "weather_data": weather,
        "geological_data": geological,
        "historical_data": historical,
    }

2. 初步评估节点

def initial_assessment_node(state: AssessmentState) -> AssessmentState:
    """基于加权因素计算初步风险。"""
    weather = state["weather_data"]
    geo = state["geological_data"]
    history = state["historical_data"]
 
    # 风险评分
    rain_score = calculate_rain_risk(weather["precipitation_24h"])
    slope_score = calculate_slope_risk(geo["avg_slope"])
    soil_score = calculate_soil_risk(geo["soil_type"])
    history_score = calculate_history_risk(history["incident_count"])
 
    # 加权组合
    total_score = (
        rain_score * 0.35 +
        slope_score * 0.25 +
        soil_score * 0.20 +
        history_score * 0.20
    )
 
    # 映射评分到风险等级
    risk_level = map_score_to_level(total_score)
    confidence = calculate_confidence(weather, geo, history)
 
    return {
        **state,
        "initial_risk": risk_level,
        "confidence": confidence,
        "reasons": [
            f"降雨量: {weather['precipitation_24h']}mm (权重35%)",
            f"平均坡度: {geo['avg_slope']}° (权重25%)",
            f"土壤类型: {geo['soil_type']} (权重20%)",
            f"历史事故: {history['incident_count']}次 (权重20%)",
        ],
    }

3. 邻近区域融合节点

def neighbor_fusion_node(state: AssessmentState) -> AssessmentState:
    """根据邻近区域状态调整风险。"""
    region_id = state["region_id"]
    initial_risk = state["initial_risk"]
 
    # 获取邻近区域的风险等级
    neighbors = fetch_neighbor_risks(region_id)
    state["neighbor_data"] = neighbors
 
    # 融合逻辑
    if neighbors:
        neighbor_avg = average_risk_level(neighbors)
 
        # 如果邻近区域风险更高,上调评估
        if neighbor_avg > initial_risk:
            adjusted_risk = bump_risk_level(initial_risk)
            state["initial_risk"] = adjusted_risk
            state["reasons"].append(
                f"邻近区域平均风险较高,已上调风险等级"
            )
 
    # 判断是否需要 LLM 修正
    state["needs_refinement"] = (
        state["confidence"] < 0.7 or
        state["initial_risk"] in ["orange", "red"]
    )
 
    return state

4. LLM 修正节点

async def llm_refinement_node(state: AssessmentState) -> AssessmentState:
    """对低置信度区域使用 LLM 进行详细分析。"""
    from anthropic import Anthropic
 
    client = Anthropic()
 
    prompt = f"""
    作为地质灾害风险评估专家,请分析以下数据并给出最终判断:
 
    区域ID: {state['region_id']}
    降雨数据: {state['weather_data']}
    地质数据: {state['geological_data']}
    历史数据: {state['historical_data']}
    邻近区域风险: {state['neighbor_data']}
    初步评估: {state['initial_risk']} (置信度: {state['confidence']})
 
    请输出:
    1. 最终风险等级 (green/yellow/orange/red)
    2. 主要风险因素
    3. 可能的灾害类型
    4. 建议
    """
 
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
 
    analysis = parse_llm_response(response.content[0].text)
 
    return {
        **state,
        "final_risk": analysis["risk_level"],
        "reasons": state["reasons"] + analysis["factors"],
    }

5. 条件边

def should_refine(state: AssessmentState) -> str:
    """判断是否需要 LLM 修正。"""
    if state["needs_refinement"]:
        return "refine"
    return "skip"

与 FastAPI 集成

from fastapi import FastAPI, BackgroundTasks
import asyncio
 
app = FastAPI()
assessment_graph = build_assessment_graph()
 
# 异步结果存储
results_store = {}
 
@app.post("/api/assess/{region_id}")
async def assess_region(region_id: str, background_tasks: BackgroundTasks):
    """触发区域的异步评估。"""
    task_id = generate_task_id()
 
    # 存储初始状态
    results_store[task_id] = {"status": "pending", "region_id": region_id}
 
    # 后台运行
    background_tasks.add_task(run_assessment, task_id, region_id)
 
    return {"task_id": task_id, "status": "pending"}
 
async def run_assessment(task_id: str, region_id: str):
    """执行评估图。"""
    initial_state = {"region_id": region_id}
 
    result = await assessment_graph.ainvoke(initial_state)
 
    results_store[task_id] = {
        "status": "completed",
        "region_id": region_id,
        "risk_level": result["final_risk"],
        "confidence": result["confidence"],
        "reasons": result["reasons"],
    }
 
@app.get("/api/result/{task_id}")
async def get_result(task_id: str):
    """获取评估结果。"""
    return results_store.get(task_id, {"error": "Task not found"})

可视化层

前端在交互式地图上展示结果:

// Vue 风险地图组件
interface RiskResult {
  regionId: string;
  riskLevel: 'green' | 'yellow' | 'orange' | 'red';
  confidence: number;
  reasons: string[];
}
 
const riskColors = {
  green: '#22c55e',
  yellow: '#eab308',
  orange: '#f97316',
  red: '#ef4444',
};
 
function renderRiskMap(results: RiskResult[]) {
  results.forEach(result => {
    const region = map.getRegion(result.regionId);
    region.setStyle({
      fillColor: riskColors[result.riskLevel],
      fillOpacity: 0.6 + result.confidence * 0.3,
    });
 
    region.bindPopup(`
      <strong>${result.regionId}</strong><br>
      风险等级: ${result.riskLevel}<br>
      置信度: ${(result.confidence * 100).toFixed(0)}%<br>
      <ul>
        ${result.reasons.map(r => `<li>${r}</li>`).join('')}
      </ul>
    `);
  });
}

定时调度

为周期性更新,我们使用调度器:

import asyncio
from datetime import datetime
 
async def scheduled_assessment():
    """每30分钟对所有区域运行评估。"""
    regions = get_all_region_ids()
 
    for region_id in regions:
        try:
            result = await assessment_graph.ainvoke({
                "region_id": region_id
            })
            store_result(region_id, result)
            notify_if_high_risk(result)
        except Exception as e:
            log_error(region_id, e)
 
async def scheduler_loop():
    while True:
        await scheduled_assessment()
        await asyncio.sleep(30 * 60)  # 30分钟
 
# 应用启动时启动调度器
@app.on_event("startup")
async def startup():
    asyncio.create_task(scheduler_loop())

实现结果

功能实现方式
风险等级4级标准,有明确判定条件
置信度每区域独立计算,附带证据
邻近融合本地 + 邻近平均加权
LLM 修正仅对低置信度/高风险区域
手动刷新异步任务 + 轮询获取结果
定时调度30分钟周期自动运行

经验总结

  1. 图 vs 流水线 — 图结构处理分支逻辑更优雅
  2. 默认异步 — LangGraph 节点应为 async 以处理 I/O
  3. 状态设计 — TypedDict 有助于及早发现错误
  4. 节制使用 LLM — 仅在必要时使用,降低成本和延迟
  5. 人类可读 — 每步存储原因便于调试和用户展示

参考


本系统用于教育目的。生产环境部署应使用授权数据源(气象局/地质局等)。