🟣 Developer Track
Tutorial 5 of 16
🟣 DEVELOPER TRACK • FOUNDATIONS • INTERMEDIATE

Tutorial D5: LangGraph Fundamentals (StateGraph)

Build industrial workflows as explicit graphs with typed state, failure routing, and checkpoints.

✅ CORE MISSION OF THIS TUTORIAL

By the end of this tutorial, the reader will be able to:

  • ✅ Understand why graphs beat sequential chains for industrial workflows
  • ✅ Build a LangGraph StateGraph with typed state (Pydantic)
  • ✅ Add failure routing so tool errors route to fallback nodes
  • ✅ Use checkpoints to resume workflows after transient failures
  • ✅ Prepare for multi-agent coordination (D7+) and MCP tooling (D6)

This tutorial teaches you to USE LangGraph effectively. An optional deep dive at the end shows how graphs work internally.

🌍 VENDOR-AGNOSTIC ENGINEERING NOTE

This tutorial uses:

  • ▸ Generic IEC 61131-3 Structured Text (ST) concepts and alarm patterns
  • ▸ Simulated tag fetch tool only. No live OPC UA or PLC connections.
  • ▸ Advisory recommendations with human review

You are building a workflow engine for diagnostics, not an autonomous controller.

1️⃣ THE PROBLEM: WHY CHAINS FAIL IN PRODUCTION

If you have built Python scripts for automation, you have probably written workflows as a sequence of function calls: parse input → fetch data → process → output result. This "chain" pattern works well for simple tasks, but it has a critical weakness in industrial environments: when one step fails, the entire workflow crashes.

Imagine an alarm diagnosis workflow with three steps: (1) parse the alarm code, (2) call a simulated tag fetch tool that stands in for a backend such as OPC UA, and (3) generate a troubleshooting recommendation. If step 2 times out due to a network blip, the traditional chain approach crashes and returns nothing, even though step 1 succeeded and could have provided partial guidance. In industrial troubleshooting, partial information is better than no information.

This is where graphs come in. A graph-based workflow treats failures as expected events that can be routed to fallback nodes, logged for review, or retried with backoff. Instead of crashing, the workflow continues along an alternative path.

💡 Key Principle: A graph makes failure a first-class path.
Tool errors, fallbacks, retries, and human review gates should be explicit edges, not random exceptions that kill your workflow.
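To make "retried with backoff" concrete, here is a minimal plain-Python sketch, not LangGraph code. The helper name `fetch_with_retry`, its parameters, and the `flaky_fetch` backend are all hypothetical and exist only for illustration. The point is that the outcome comes back as data (`tool_status`, `error_message`) that a later routing step can inspect, instead of as an exception.

```python
import time
from typing import Callable


def fetch_with_retry(
    fetch: Callable[[], dict],
    max_attempts: int = 3,
    base_delay_s: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call `fetch` up to `max_attempts` times and report the outcome as data."""
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        try:
            tags = fetch()
            return {"tool_status": "ok", "tags": tags,
                    "attempts": attempt, "error_message": ""}
        except TimeoutError as exc:
            last_error = str(exc)
            if attempt < max_attempts:
                # Exponential backoff: base, 2 * base, 4 * base, ...
                sleep(base_delay_s * 2 ** (attempt - 1))
    return {"tool_status": "timeout", "tags": {},
            "attempts": max_attempts, "error_message": last_error}


# Hypothetical flaky backend: times out twice, then succeeds.
calls = {"count": 0}


def flaky_fetch() -> dict:
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("simulated gateway timeout")
    return {"PE_14_Status": False}


delays: list[float] = []
# Record the requested delays instead of actually sleeping.
outcome = fetch_with_retry(flaky_fetch, sleep=delays.append)
print(outcome["tool_status"], outcome["attempts"], delays)
# prints: ok 3 [0.5, 1.0]
```

Passing `sleep` as a parameter is a design choice for testability: the backoff schedule can be checked without waiting for real time to pass.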

What Happens When a Chain Fails

Let's see the problem in action. The code below shows a typical chain: three functions that pass a dictionary between steps. This pattern is simple and readable, but watch what happens when the second step fails.

We will simulate a common plant scenario: a tag fetch backend times out while retrieving data. You can think of it as an OPC UA-style gateway timeout, but the code below is only a simulation. This is a transient failure. The network might recover in 5 seconds, but the chain treats it as fatal and crashes the entire workflow.

1

EXPERIMENT CELL

The Sequential Chain Problem

experiment

See what happens when a tool call fails mid-pipeline with no recovery path.

Python
# Naive approach: chain of functions over a dict
from typing import TypedDict

class DiagState(TypedDict):
    line_id: str
    alarm_code: str
    observations: list[str]
    tool_status: str
    tags: dict
    recommendation: str

def step1_parse_alarm(state: DiagState) -> DiagState:
    state["observations"].append(f"Alarm {state['alarm_code']} on {state['line_id']}")
    return state

def step2_fetch_tags(state: DiagState) -> DiagState:
    # PROBLEM: If this fails, the whole pipeline crashes
    raise TimeoutError("OPC UA timeout to Edge Gateway")
    state["tags"] = {"PE_14_Status": False, "VFD_Speed": 0}
    state["tool_status"] = "ok"
    return state

def step3_recommend(state: DiagState) -> DiagState:
    state["recommendation"] = "Check PE-14 for jam, clean lens, verify alignment."
    return state

# Try to run it
state: DiagState = {
    "line_id": "Packaging-1",
    "alarm_code": "A305",
    "observations": [],
    "tool_status": "not_called",
    "tags": {},
    "recommendation": "",
}

try:
    state = step1_parse_alarm(state)
    state = step2_fetch_tags(state)   # ❌ Crashes here
    state = step3_recommend(state)    # ❌ Never reached
    print(state["recommendation"])
except TimeoutError as e:
    print(f"Pipeline crashed: {e}")
    print(f"Current state: {state}")  # Step 1's work is visible, but no recommendation was produced
Expected output
Pipeline crashed: OPC UA timeout to Edge Gateway
Current state: {'line_id': 'Packaging-1', 'alarm_code': 'A305', 'observations': ['Alarm A305 on Packaging-1'], 'tool_status': 'not_called', 'tags': {}, 'recommendation': ''}

Explanation

  • The pipeline crashes at step 2. The except block can still print the state, but nothing records that the fetch was attempted or why it failed (tool_status is still 'not_called').
  • In a production environment, this means the operator gets NOTHING, not even partial guidance.
  • Retrying means restarting from scratch, even though step 1 succeeded.

Common mistake

Treating tool calls as synchronous, happy-path steps instead of fallible operations that need routing.

Takeaway

Sequential chains have no concept of alternative paths. One failure kills the entire workflow.

What You Actually Need: The Four Graph Primitives

To fix the fragility you just saw, you need to move from sequential chains to graph-based workflows. Graphs have four core primitives that chains lack:

Nodes represent individual units of work. Each node can succeed or fail independently without crashing the entire workflow. Think of a node as a self-contained function that takes the current state, does one thing, and returns updated state.

Edges define the transitions between nodes. Critically, edges can be conditional: "if tool succeeded, go to recommend; if tool timed out, go to error_handler." This is how you route failures to fallback nodes instead of crashing.

Typed state is the shared memory that flows through the graph. Using Pydantic for validation, you catch schema errors (wrong types, missing fields) at the moment state is created, not during a midnight alarm storm.

Checkpoints are snapshots of state after each node executes. If a transient failure occurs (network blip, temporary API overload), you can resume from the last checkpoint instead of restarting from scratch. This is especially valuable for workflows that call expensive LLM APIs.
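The checkpoint idea can be sketched in a few lines of plain Python before any LangGraph code appears. This is an illustration under stated assumptions, not how LangGraph stores checkpoints: `run_with_checkpoints`, the step functions, and the checkpoint dictionary layout are all hypothetical names chosen for this example.

```python
import copy
from typing import Callable


def run_with_checkpoints(
    steps: list[tuple[str, Callable[[dict], dict]]],
    state: dict,
    checkpoints: list[dict],
    start_at: int = 0,
) -> dict:
    """Run steps in order, appending a snapshot after each step that completes."""
    for index in range(start_at, len(steps)):
        name, fn = steps[index]
        state = fn(state)
        checkpoints.append(
            {"after": name, "next_index": index + 1, "state": copy.deepcopy(state)}
        )
    return state


parse_calls = {"count": 0}
fetch_calls = {"count": 0}


def parse(state: dict) -> dict:
    parse_calls["count"] += 1
    return {**state, "observations": state["observations"] + ["alarm parsed"]}


def fetch(state: dict) -> dict:
    fetch_calls["count"] += 1
    if fetch_calls["count"] == 1:  # first call simulates a transient blip
        raise TimeoutError("simulated network blip")
    return {**state, "tags": {"PE_14_Status": False}}


def recommend(state: dict) -> dict:
    return {**state, "recommendation": "Check PE-14"}


steps = [("parse", parse), ("fetch", fetch), ("recommend", recommend)]
checkpoints: list[dict] = []

try:
    run_with_checkpoints(
        steps, {"observations": [], "tags": {}, "recommendation": ""}, checkpoints
    )
except TimeoutError:
    pass  # the first run dies at fetch, but the snapshot taken after parse remains

# Resume from the last snapshot instead of starting over.
last = checkpoints[-1]
final = run_with_checkpoints(
    steps, copy.deepcopy(last["state"]), checkpoints, start_at=last["next_index"]
)
print(parse_calls["count"], final["recommendation"])
# prints: 1 Check PE-14
```

The parse step runs exactly once across both runs, which is the property that matters when a step is an expensive LLM call.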

2

CHECKPOINT CELL

Mental Model Check: Chains vs Graphs

checkpoint

Understand what separates a graph from a chain before writing any LangGraph code.

Explanation

  • Chains: Sequential function calls. One failure kills the workflow. No conditional routing. No resume capability.
  • Graphs: Nodes + edges + state + checkpoints. Failures route to fallback nodes. State is validated. Workflows can resume after interruptions.

Takeaway

Graphs let you model the REAL workflow: normal paths + failure paths + recovery paths + human-in-the-loop gates.

2️⃣ BUILDING WITH LANGGRAPH: STEP-BY-STEP

Now that you understand WHY graphs matter, let's build the same diagnostic workflow using LangGraph. LangGraph is a Python library specifically designed for building stateful, multi-step workflows with explicit failure handling.

We will rebuild the alarm diagnosis workflow step-by-step, adding each primitive (nodes, edges, state, checkpoints) one at a time. By the end of this section, you will have a resilient workflow that routes failures to a fallback node and can resume after transient errors.

What makes LangGraph different from plain Python? LangGraph provides built-in support for conditional routing, state persistence, and checkpointing. You could build these features yourself, but LangGraph handles the plumbing so you can focus on workflow logic.

Step 1: Install LangGraph and Define Typed State

First, install the LangGraph library. LangGraph is developed by the LangChain team, so it integrates with LLM chains, tools, and agents you might already be using, although it can also be used without them.

3

SETUP CELL

Install LangGraph via pip

setup

Set up the environment and understand the core library.

SHELL
pip install langgraph

Explanation

  • LangGraph is a stateful graph library from the LangChain team.
  • It is designed for multi-agent workflows, tool orchestration, and checkpointing.
  • If you are using a virtual environment (recommended for plant-side code), activate it first.

Next, define your workflow state using Pydantic. State is the shared "memory" that flows through your graph. Every node receives the current state, updates it, and returns the modified state.

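Pydantic provides runtime validation: if a state object is constructed with an invalid field type or without a required field, Pydantic raises a validation error at that point instead of several nodes later. This prevents midnight debugging sessions where you discover a typo during an alarm storm. Note that plain attribute assignment inside a node is not re-validated unless the model enables Pydantic's validate_assignment option.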

4

SETUP CELL

Define typed state with Pydantic

setup

Create a validated state schema that all nodes will share.

Python
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph

class DiagnosticState(BaseModel):
    line_id: str
    alarm_code: str
    observations: list[str] = Field(default_factory=list)
    tool_status: str = "not_called"
    tags: dict[str, object] = Field(default_factory=dict)
    recommendation: str = ""
    error_message: str = ""  # NEW: track failures explicitly

Explanation

  • Pydantic validates your state at runtime, catching schema errors early.
  • Field(default_factory=...) gives mutable fields safe defaults (e.g., empty lists).
  • error_message is NEW: it gives us a place to store failure info without crashing.

Takeaway

Typed state prevents accidental typos and makes your workflow easier to understand.
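To see what that validation looks like in practice, here is a small sketch using a cut-down model. `DemoState` is a hypothetical stand-in for DiagnosticState, defined here only so the example runs on its own; it assumes Pydantic is installed.

```python
from pydantic import BaseModel, Field, ValidationError


class DemoState(BaseModel):
    """Hypothetical cut-down state, for illustration only."""
    line_id: str
    alarm_code: str
    observations: list[str] = Field(default_factory=list)


# Valid construction: the default factory fills in an empty list.
ok = DemoState(line_id="Packaging-1", alarm_code="A305")
print(ok.observations)

# Missing required field: rejected when the object is constructed.
missing_error = None
try:
    DemoState(line_id="Packaging-1")  # alarm_code omitted on purpose
except ValidationError as exc:
    missing_error = exc
    print(f"{len(exc.errors())} validation error(s)")

# Each instance gets its own list, so observations do not leak between runs.
a = DemoState(line_id="L1", alarm_code="A1")
b = DemoState(line_id="L2", alarm_code="A2")
a.observations.append("only on a")
print(b.observations)
```

The last part is why the tutorial uses Field(default_factory=list) rather than a shared mutable default.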

Step 2: Build the Graph with Nodes and Edges

Now comes the core of LangGraph: defining node functions. Each node is a Python function with a simple contract: it receives the current state (DiagnosticState), does ONE thing, and returns the updated state.

Critical principle: Nodes should NOT raise exceptions for expected failures. Instead, they should update fields in the state (like tool_status or error_message) to signal what happened. This allows conditional edges to route the workflow to fallback nodes instead of crashing.

In the code below, we define four nodes: (1) parse_alarm (always succeeds), (2) fetch_tags (simulates a backend tag read and may timeout), (3) recommend (uses fetched data), and (4) error_handler (fallback when tools fail). Notice how fetch_tags catches TimeoutError and updates state instead of crashing.

5

CORE CELL

Define node functions with graceful failure handling

core

Create the individual workflow steps as plain functions that handle failures gracefully.

Python
def parse_alarm_node(state: DiagnosticState) -> DiagnosticState:
    """Parse the alarm code and add to observations."""
    state.observations.append(f"Alarm {state.alarm_code} detected on {state.line_id}")
    return state

def fetch_tags_node(state: DiagnosticState) -> DiagnosticState:
    """Simulate a tag-fetch tool call. May fail."""
    try:
        # Simulate a backend tag read (e.g. OPC UA in a real system)
        import random
        if random.random() < 0.3:  # 30% chance of timeout
            raise TimeoutError("OPC UA timeout to Edge Gateway")
        state.tags = {"PE_14_Status": False, "VFD_Speed": 0}
        state.tool_status = "ok"
    except TimeoutError as e:
        state.tool_status = "timeout"
        state.error_message = str(e)
    return state

def recommend_node(state: DiagnosticState) -> DiagnosticState:
    """Generate recommendation from available data."""
    if state.tool_status == "ok":
        state.recommendation = f"Check PE-14: status={state.tags.get('PE_14_Status')}"
    else:
        state.recommendation = "Unable to fetch live tags. Review recent change logs manually."
    return state

def error_handler_node(state: DiagnosticState) -> DiagnosticState:
    """Handle tool failures gracefully."""
    state.observations.append(f"Tool failure detected: {state.error_message}")
    state.recommendation = "Fallback: Review recent alarms and manual override history."
    return state

Explanation

  • Each node has the same signature: DiagnosticState → DiagnosticState. (These examples update the state object in place and return it, so they are not strictly pure.)
  • Nodes do not raise exceptions for expected failures. They update state.tool_status instead.
  • error_handler_node provides fallback logic when tools fail.

Why this matters

By keeping failures inside state, we avoid crashing the graph and enable conditional routing.

Now that you have defined the node functions, it's time to assemble the graph. This is where you specify how nodes connect to each other: which nodes always flow to the next step, and which nodes need conditional routing based on state.

LangGraph provides three core methods for wiring nodes: add_node() registers a function as a node, add_edge() creates an unconditional transition ("always go here next"), and add_conditional_edges() creates a routing function that inspects state and decides where to go.

The routing function (route_after_fetch_tags) is just a Python function that takes state and returns the name of the next node. In our case, it checks tool_status: if "ok", route to recommend; if "timeout", route to error_handler. This is how failures become first-class paths instead of crashes.

6

CORE CELL

Assemble the graph with conditional routing

core

Wire nodes together with normal edges and failure edges.

Python
from langgraph.graph import END

graph = StateGraph(DiagnosticState)

# Add nodes
graph.add_node("parse_alarm", parse_alarm_node)
graph.add_node("fetch_tags", fetch_tags_node)
graph.add_node("recommend", recommend_node)
graph.add_node("error_handler", error_handler_node)

# Define conditional routing function
def route_after_fetch_tags(state: DiagnosticState) -> str:
    """Route to recommend if ok, error_handler if timeout."""
    if state.tool_status == "ok":
        return "recommend"
    elif state.tool_status == "timeout":
        return "error_handler"
    else:
        return END

# Add edges
graph.set_entry_point("parse_alarm")
graph.add_edge("parse_alarm", "fetch_tags")
graph.add_conditional_edges("fetch_tags", route_after_fetch_tags)
graph.add_edge("recommend", END)
graph.add_edge("error_handler", END)

workflow = graph.compile()

Explanation

  • add_node() registers functions as workflow steps.
  • add_edge() creates unconditional transitions (parse_alarm → fetch_tags).
  • add_conditional_edges() routes based on state (fetch_tags → recommend OR error_handler).
  • END is a special marker for terminal nodes.

Takeaway

Conditional edges let tool failures route to fallback nodes instead of crashing the workflow.
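Because a routing function is ordinary Python, it can be checked in isolation before it is wired into a graph. The sketch below is stdlib-only and uses hypothetical stand-ins: `FakeState` replaces DiagnosticState, and `END_MARKER` is a placeholder string used where the real routing function would return LangGraph's END.

```python
from dataclasses import dataclass

END_MARKER = "end"  # placeholder; the real routing function would return langgraph's END


@dataclass
class FakeState:
    """Hypothetical minimal state: only the field the router reads."""
    tool_status: str = "not_called"


def route_after_fetch(state: FakeState) -> str:
    """Same branching as the tutorial's routing function, on the stand-in state."""
    if state.tool_status == "ok":
        return "recommend"
    if state.tool_status == "timeout":
        return "error_handler"
    return END_MARKER


# Table of status values and the node each one should route to.
cases = {
    "ok": "recommend",
    "timeout": "error_handler",
    "not_called": END_MARKER,
    "typo_status": END_MARKER,  # an unrecognized status ends the run
}

results = {status: route_after_fetch(FakeState(tool_status=status)) for status in cases}
for status, expected in cases.items():
    assert results[status] == expected, (status, results[status])
print(results)
```

The last two cases are worth a second look: any status the router does not recognize ends the workflow with an empty recommendation. Depending on your requirements, routing unknown statuses to error_handler may be the safer default.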

Step 3: Run the Graph and Observe Routing

Your graph is now assembled. To execute it, you call workflow.invoke(initial_state) and let LangGraph handle the node execution and routing logic. You don't manually call each node; the graph engine does that for you based on the edges you defined.

Watch what happens when fetch_tags_node times out. Instead of crashing, the workflow routes to error_handler_node, which produces a fallback recommendation. The operator gets actionable guidance even when the simulated tag backend is unreachable.

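The cell below forces the timeout branch by temporarily patching random.random, so its output is reproducible. Remove the patch and run the cell multiple times to see the simulated 30% timeout rate: you will sometimes see a successful fetch (routes to recommend) and sometimes a timeout (routes to error_handler). This randomness mimics real plant networks.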

7

EXPERIMENT CELL

Execute the workflow with failure routing

experiment

Run the graph and see it route to error_handler on timeout.

Python
import random

original_random = random.random
random.random = lambda: 0.0  # Force the timeout branch for a reproducible example

initial_state = DiagnosticState(
    line_id="Packaging-1",
    alarm_code="A305"
)

try:
    result = workflow.invoke(initial_state)
finally:
    random.random = original_random

print("=== Workflow Result ===")
print(f"Tool status: {result['tool_status']}")
print(f"Recommendation: {result['recommendation']}")
print(f"Observations: {result['observations']}")
Expected output
=== Workflow Result ===
Tool status: timeout
Recommendation: Fallback: Review recent alarms and manual override history.
Observations: ['Alarm A305 detected on Packaging-1', 'Tool failure detected: OPC UA timeout to Edge Gateway']

Explanation

  • Even though fetch_tags failed, the workflow DID NOT crash.
  • The graph routed to error_handler, which produced a fallback recommendation.
  • Operators get actionable guidance even when tools fail.

Why this matters

This is the power of graphs: failures are first-class paths, not exceptional crashes.
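Patching random.random globally works for a demo, but it affects every caller in the process. One alternative, sketched here with stdlib-only stand-ins, is to pass the source of randomness into the node when it is built. `make_fetch_node`, `SimState`, and the `draw` parameter are hypothetical names for this illustration and are not part of LangGraph.

```python
import random
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class SimState:
    """Hypothetical stand-in for DiagnosticState, kept stdlib-only."""
    tool_status: str = "not_called"
    tags: dict = field(default_factory=dict)
    error_message: str = ""


def make_fetch_node(
    draw: Callable[[], float], timeout_rate: float = 0.3
) -> Callable[[SimState], SimState]:
    """Build a fetch node whose randomness comes from `draw`, not the global module."""
    def fetch_node(state: SimState) -> SimState:
        if draw() < timeout_rate:
            state.tool_status = "timeout"
            state.error_message = "simulated gateway timeout"
        else:
            state.tags = {"PE_14_Status": False, "VFD_Speed": 0}
            state.tool_status = "ok"
        return state
    return fetch_node


# Force each branch without touching the global random module.
forced_timeout = make_fetch_node(lambda: 0.0)(SimState())
forced_success = make_fetch_node(lambda: 0.99)(SimState())
print(forced_timeout.tool_status, forced_success.tool_status)
# prints: timeout ok


def statuses(seed: int, runs: int = 10) -> list[str]:
    """Run the node several times with a seeded generator."""
    node = make_fetch_node(random.Random(seed).random)
    return [node(SimState()).tool_status for _ in range(runs)]


# The same seed reproduces the same sequence of outcomes.
print(statuses(7) == statuses(7))
# prints: True
```

A node built this way can be registered with add_node() like any other function, while tests supply a fixed or seeded `draw`.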

Step 4: Add Checkpointing for Resumability

So far, our graph handles failures by routing to fallback nodes. But what happens if the failure is transient, like a 5-second network blip that recovers on its own? Rerunning the entire workflow from the start wastes API calls (and money) for nodes that already succeeded.

Checkpointing solves this problem. When a graph is compiled with a checkpointer, LangGraph snapshots your state after each node executes. If a workflow fails partway through, you can retrieve the last checkpoint and resume from where it left off without re-executing expensive nodes like LLM calls.

Checkpointing also enables human-in-the-loop workflows: you can pause a workflow before a risky action (e.g., "approve this PLC code change"), store the checkpoint, and resume after a human reviews and approves.

In the code below, we use MemorySaver() for checkpointing. This stores checkpoints in memory (fine for tutorials). In production, you would use SqliteSaver or a database-backed checkpointer so workflows survive restarts.

8

CORE CELL

Enable checkpointing with MemorySaver

core

Add state persistence so you can resume workflows after interruptions.

Python
import random
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
workflow_with_checkpoints = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["recommend"],
)

config = {"configurable": {"thread_id": "diag-session-1"}}
original_random = random.random
random.random = lambda: 0.99  # Force the success branch so the graph pauses before recommend
try:
    workflow_with_checkpoints.invoke(initial_state, config=config)
finally:
    random.random = original_random
snapshot = workflow_with_checkpoints.get_state(config)

print(f"Paused before node(s): {snapshot.next}")
print(f"Current tool status: {snapshot.values['tool_status']}")

Explanation

  • MemorySaver() stores checkpoints in memory (use SqliteSaver for production).
  • thread_id groups checkpoints for a specific workflow run.
  • interrupt_before=["recommend"] pauses the graph at a known checkpoint so resumption is easy to see.
  • After each node, state is snapshotted automatically.

Takeaway

Checkpointing makes workflows resilient to transient failures. You do not lose progress.

Now let's see checkpointing in action with a clean pause/resume example. We deliberately pause the graph right before the recommend node, inspect the saved checkpoint, then resume without re-running the earlier nodes.

LangGraph handles resumption via thread_id. When you call invoke() again with the same config after an interrupt, LangGraph loads the last checkpoint and continues from state.next. This same mechanism supports human approvals, retries, and long-running workflows.

9

EXPERIMENT CELL

Inspect checkpoint state and resume

experiment

Demonstrate how to inspect a saved checkpoint and resume from the paused node.

Python
# Inspect the latest checkpoint for this thread
state = workflow_with_checkpoints.get_state(config)
print(f"Last checkpoint values: {state.values}")
print(f"Next node(s) to run: {state.next}")

# Resume from the interrupt point.
# LangGraph loads the checkpoint and continues with the paused node.
resumed_result = workflow_with_checkpoints.invoke(None, config=config)

print(f"Resumed recommendation: {resumed_result['recommendation']}")
Expected output
Last checkpoint values: {'line_id': 'Packaging-1', 'alarm_code': 'A305', 'observations': ['Alarm A305 detected on Packaging-1'], 'tool_status': 'ok', 'tags': {'PE_14_Status': False, 'VFD_Speed': 0}, 'recommendation': '', 'error_message': ''}
Next node(s) to run: ('recommend',)
Resumed recommendation: Check PE-14: status=False

Explanation

  • get_state(config) returns the latest checkpoint for the given thread_id.
  • state.next tells you which node(s) will run next when you resume.
  • invoke(None, config) resumes from the interrupt checkpoint. There is no need to rebuild prior state manually.

Why this matters

In production, the same pause/resume mechanism is what lets you recover from interruptions without replaying the whole workflow.
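To illustrate why a database-backed checkpointer survives restarts while an in-memory one does not, here is a hand-rolled sketch using only the standard library's sqlite3 module. It is not SqliteSaver and does not reflect its schema or API. The table layout and the `save_checkpoint` and `load_latest` helpers are assumptions made for this example.

```python
import json
import os
import sqlite3
import tempfile
from typing import Optional

SCHEMA = """
CREATE TABLE IF NOT EXISTS checkpoints (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    thread_id TEXT NOT NULL,
    next_node TEXT NOT NULL,
    state_json TEXT NOT NULL
)
"""


def _connect(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute(SCHEMA)
    return conn


def save_checkpoint(db_path: str, thread_id: str, next_node: str, state: dict) -> None:
    """Append one snapshot for a thread."""
    conn = _connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO checkpoints (thread_id, next_node, state_json) VALUES (?, ?, ?)",
                (thread_id, next_node, json.dumps(state)),
            )
    finally:
        conn.close()


def load_latest(db_path: str, thread_id: str) -> Optional[tuple]:
    """Return (next_node, state) for the newest snapshot of a thread, or None."""
    conn = _connect(db_path)
    try:
        row = conn.execute(
            "SELECT next_node, state_json FROM checkpoints "
            "WHERE thread_id = ? ORDER BY id DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
    finally:
        conn.close()
    return None if row is None else (row[0], json.loads(row[1]))


with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "checkpoints.db")
    save_checkpoint(db_path, "diag-session-1", "fetch_tags",
                    {"alarm_code": "A305", "tool_status": "not_called"})
    save_checkpoint(db_path, "diag-session-1", "recommend",
                    {"alarm_code": "A305", "tool_status": "ok"})
    save_checkpoint(db_path, "diag-session-2", "fetch_tags",
                    {"alarm_code": "B112", "tool_status": "not_called"})
    # Every call opens a fresh connection, which mimics a process restart.
    latest = load_latest(db_path, "diag-session-1")
    unknown = load_latest(db_path, "no-such-thread")

print(latest)
# prints: ('recommend', {'alarm_code': 'A305', 'tool_status': 'ok'})
```

Keying rows by thread_id plays the same role as the thread_id in the config above: each workflow run has its own history, and loading the newest row tells you which node to run next.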

3️⃣ KEY CONCEPTS: WHAT YOU JUST BUILT

Nodes vs Edges

Nodes are units of work. They receive state, do something, and return updated state. Edges define transitions. They can be unconditional ("always go here") or conditional ("go here if X, else go there").

NODE FLOW: NORMAL AND FAILURE ROUTES

graph LR
    P[parse_alarm<br/>adds observation]:::cyan
    F[fetch_tags<br/>simulated tag read, may timeout]:::pink
    R[recommend<br/>build recommendation]:::green
    E[error_handler<br/>fallback guidance]:::amber

    P -->|always| F
    F -->|status ok| R
    F -->|status timeout| E

    classDef cyan fill:#1a1a1e,stroke:#04d9ff,stroke-width:2px,color:#04d9ff;
    classDef pink fill:#1a1a1e,stroke:#ff4fd8,stroke-width:2px,color:#ff4fd8;
    classDef green fill:#1a1a1e,stroke:#00ff7f,stroke-width:2px,color:#00ff7f;
    classDef amber fill:#1a1a1e,stroke:#fec20b,stroke-width:2px,color:#fec20b;

Typed State with Pydantic

State is the "memory" of the workflow. Pydantic ensures every node sees a consistent schema. If state is constructed with an invalid or missing field, Pydantic raises a validation error immediately instead of letting bad data flow through the graph.

Checkpointing and Resumability

Checkpoints = snapshots of state after each node. This enables:

  • Resuming workflows after transient failures (network blips)
  • Human-in-the-loop gates (pause for approval, then resume)
  • Debugging (inspect state at any step)
  • Audit trails (replay exactly what the agent saw)
10

CHECKPOINT CELL

Mental Model Check

checkpoint

Confirm you understand the core LangGraph primitives.

Explanation

  • Nodes = functions that update state.
  • Edges = transitions (unconditional or conditional).
  • State = Pydantic-validated shared memory.
  • Checkpoints = snapshots enabling resume + audit.

Takeaway

These four primitives turn brittle chains into resilient, auditable workflows.

4️⃣ OPTIONAL: HOW GRAPHS WORK INTERNALLY

⚠️ This section is optional.
If you just want to USE LangGraph, skip this and move to D6 (MCP Tooling).
This section shows how a minimal graph engine works under the hood.

Understanding the internals helps you debug complex workflows and appreciate why LangGraph is designed the way it is. We'll build a 50-line mini-StateGraph to see the core logic.

11

SETUP CELL

Build a minimal graph engine (50 lines)

setup

See how nodes, edges, and routing work under the hood.

Python
from typing import Callable, Dict, Any

class MiniStateGraph:
    """Minimal graph engine to understand LangGraph internals."""
    def __init__(self):
        self.nodes: Dict[str, Callable] = {}
        self.edges: Dict[str, str | Callable] = {}
        self.entry_point: str = ""

    def add_node(self, name: str, fn: Callable):
        self.nodes[name] = fn

    def add_edge(self, from_node: str, to_node: str):
        self.edges[from_node] = to_node

    def add_conditional_edges(self, from_node: str, routing_fn: Callable):
        self.edges[from_node] = routing_fn

    def set_entry_point(self, node_name: str):
        self.entry_point = node_name

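    def invoke(self, state: Any) -> Any:
        """Execute the graph from entry_point until END."""
        current = self.entry_point
        # Stop on our "END" string, on LangGraph's END constant (which
        # route_after_fetch_tags can return), or when a node has no outgoing edge.
        while current not in (None, "END", END):
            node_fn = self.nodes[current]
            state = node_fn(state)
            next_edge = self.edges.get(current)
            if callable(next_edge):
                current = next_edge(state)  # conditional edge: ask the router
            else:
                current = next_edge  # static edge: fixed next node
        return state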

# Test it with our diagnostic workflow
mini_graph = MiniStateGraph()
mini_graph.add_node("parse_alarm", parse_alarm_node)
mini_graph.add_node("fetch_tags", fetch_tags_node)
mini_graph.add_node("recommend", recommend_node)
mini_graph.add_node("error_handler", error_handler_node)
mini_graph.set_entry_point("parse_alarm")
mini_graph.add_edge("parse_alarm", "fetch_tags")
mini_graph.add_conditional_edges("fetch_tags", route_after_fetch_tags)
mini_graph.add_edge("recommend", "END")
mini_graph.add_edge("error_handler", "END")

result = mini_graph.invoke(DiagnosticState(line_id="Test-1", alarm_code="T100"))
print(f"Mini-graph result: {result.recommendation}")

Explanation

  • add_node() stores functions in a dict.
  • add_edge() stores transitions (static or dynamic).
  • invoke() loops: execute node → determine next → repeat until END.
  • Conditional edges are just functions that return the next node name.

Takeaway

At its core, a graph is just: run node → check routing → run next node. LangGraph adds validation, checkpointing, and distributed execution.
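One thing a loop-based engine needs as soon as edges can point backwards (for example, a retry edge) is a guard against running forever. The sketch below adds a step limit to the same run-node, pick-next-node loop. It is a standalone illustration: `run_graph`, `END_NODE`, and the dict-based state are hypothetical, and LangGraph's own recursion limit is configured differently.

```python
from typing import Callable, Union

END_NODE = "END"  # sentinel string for this sketch only

Edge = Union[str, Callable[[dict], str]]


def run_graph(
    nodes: dict[str, Callable[[dict], dict]],
    edges: dict[str, Edge],
    entry: str,
    state: dict,
    max_steps: int = 25,
) -> dict:
    """Run node -> pick next node, failing loudly if the step budget runs out."""
    current, steps = entry, 0
    while current != END_NODE:
        if steps >= max_steps:
            raise RuntimeError(f"step limit {max_steps} reached at node {current!r}")
        state = nodes[current](state)
        steps += 1
        edge = edges[current]
        current = edge(state) if callable(edge) else edge
    return state


def fetch(state: dict) -> dict:
    attempts = state["attempts"] + 1
    status = "ok" if attempts >= state["succeed_on"] else "timeout"
    return {**state, "attempts": attempts, "tool_status": status}


def route(state: dict) -> str:
    if state["tool_status"] == "ok":
        return "recommend"
    if state["attempts"] < 3:
        return "fetch"  # retry edge: loop back to the same node
    return "error_handler"


nodes = {
    "fetch": fetch,
    "recommend": lambda s: {**s, "recommendation": "Check PE-14"},
    "error_handler": lambda s: {**s, "recommendation": "Fallback: manual review"},
}
edges = {"fetch": route, "recommend": END_NODE, "error_handler": END_NODE}
start = {"attempts": 0, "tool_status": "", "recommendation": ""}

recovered = run_graph(nodes, edges, "fetch", {**start, "succeed_on": 2})
exhausted = run_graph(nodes, edges, "fetch", {**start, "succeed_on": 99})
print(recovered["attempts"], recovered["recommendation"])
# prints: 2 Check PE-14
print(exhausted["attempts"], exhausted["recommendation"])
# prints: 3 Fallback: manual review

# A graph that loops unconditionally hits the step limit instead of hanging.
try:
    run_graph({"loop": lambda s: s}, {"loop": "loop"}, "loop", {}, max_steps=5)
    limit_hit = False
except RuntimeError:
    limit_hit = True
print(limit_hit)
# prints: True
```

The retry count lives in state, so the router can decide between another attempt and the fallback node without any hidden counters.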

✅ KEY TAKEAWAYS

  • ✅ Graphs > Chains for industrial workflows because failures are first-class paths, not crashes.
  • ✅ Nodes are functions from state to state (State → State). They do NOT raise exceptions for expected failures.
  • ✅ Edges define routing: unconditional ("always") or conditional ("if tool_status == timeout, go to error_handler").
  • ✅ Typed state (Pydantic) prevents schema bugs and makes workflows easier to understand.
  • ✅ Checkpoints enable resume after transient failures and human-in-the-loop gates.
  • ✅ LangGraph is production-grade: it adds distributed execution, persistence, and observability on top of the core graph primitive.

🔜 NEXT TUTORIAL

D6: MCP Tooling & Architecture Basics

Wrap plant data sources behind read-only tool servers with clear security boundaries.