🟣 Developer Track
Tutorial 2 of 16
🟣 DEVELOPER TRACK β€’ INTERMEDIATE

Tutorial D2: Async Patterns & Error Handling for Agents

Making agent workers predictable when the plant, network, and APIs misbehave.

βœ… CORE MISSION OF THIS TUTORIAL

By the end of this tutorial, the reader will be able to:

  • βœ… Design async worker loops that process industrial agent jobs without blocking.
  • βœ… Classify failures into retryable vs fatal paths with clear state transitions.
  • βœ… Apply exponential backoff with jitter to protect APIs and plant networks.
  • βœ… Limit concurrency so multiple workers don’t overload PLCs or OT gateways.
  • βœ… Lay the foundation for queue-based multi-agent coordination in later Developer tutorials.

What You'll Build

This tutorial builds two worker styles to show the evolution from naive scripts to resilient production agents:

  • 1. Naive synchronous loop that stops on the first error
  • 2. Async worker pool with retries, backoff, and concurrency limits

All code cells below are complete and runnable in your own environment.

1️⃣ CONCEPT OVERVIEW β€” WHY ASYNC MATTERS FOR AGENT WORKERS

In Tutorial D1 you focused on clean, testable Python that can talk to industrial data sources and LLMs. D2 is where that code starts to look like a real worker: multiple jobs in a queue, intermittent API failures, rate limits in the middle of a shift change, and operators expecting answers in seconds.

A naive worker script often looks like this:

for job in jobs:
  call_llm(job)
  save_result(job)

It works in a demo. In a plant with unstable Wi-Fi to the edge box, occasional VPN drops, and shared model quotas across teams, it becomes a source of silent failure: jobs are dropped, half-run, or retried so aggressively that rate limits kick in for everyone.

Key Principle: Agent workers must fail loudly on paper, not silently in production.

This tutorial shows how to wrap your agent calls in async, retry, and backoff patterns so an alarm-diagnosis worker behaves like a well-tuned piece of industrial equipment: predictable, bounded, and easy to reason about, even when upstream services misbehave.

2️⃣ FROM SYNC SCRIPTS TO RESILIENT AGENT WORKERS

Before touching code, it helps to picture the architecture you are aiming for. Instead of "one loop that sometimes crashes", think in terms of queues, workers, and backpressure.

graph LR
    Q[Alarm Jobs Queue<br/>DB / message bus]:::cyan --> W1[Agent Worker 1<br/>async + retry]:::purple
    Q --> W2[Agent Worker 2<br/>async + retry]:::purple
    W1 --> L[Diagnosis Log<br/>advisory only]:::green
    W2 --> L
    L --> H[Human Review<br/>controls engineer]:::green

    classDef cyan fill:#04d9ff,stroke:#04d9ff,color:#000;
    classDef purple fill:#6366f1,stroke:#6366f1,color:#fff;
    classDef green fill:#00ff7f,stroke:#00ff7f,color:#000;

In this tutorial, the β€œqueue” is just an in-memory list so you can focus on the worker behavior. Real deployments would swap this for a message bus, database table, or event-driven pipeline in the System track.

  • β–Έ Synchronous script β€” one job at a time, crashes kill the whole batch.
  • β–Έ Async worker pool β€” multiple jobs in flight, controlled concurrency, graceful shutdowns.
  • β–Έ Retry + backoff β€” rate limits and network blips are absorbed instead of amplified.

3️⃣ ERROR CLASSES β€” RETRY, BACKOFF, AND FAIL STATES

When you build agent workers, the important design decision is not β€œhow do I catch every exception?” but β€œwhat should happen to this job after a failure?”

A simple yet effective classification for industrial agent calls:

  • β–Έ Retryable failures β€” rate limits, transient network issues, occasional 5xx responses. Handle with exponential backoff + jitter and a max attempt count.
  • β–Έ Non-retryable failures β€” invalid request payloads, schema mismatches, obvious prompt bugs. Fail the job and surface the error.
  • β–Έ Plant-side failures β€” PLC tag reads failing, timeouts against OT gateways. Often require different retry windows than LLM calls and clear observability.
Retry window example:
  β€’ LLM call retries: 0.5s β†’ 1s β†’ 2s β†’ 4s (max 8s)
  β€’ PLC read retries: 2s β†’ 4s β†’ 8s (max 30s, then alarm + stop)

In the experiments below you will implement async workers that treat OpenAI call issues as retryable up to a limit, then mark jobs as failed in a way that is easy to see in logs and dashboards. This prepares you for queue-based coordination and monitoring in later Developer and System track tutorials.

4️⃣ HANDS-ON EXPERIMENTS β€” BUILDING A RESILIENT WORKER LOOP

You will build two versions of an β€œalarm diagnosis” worker: a naive synchronous loop that drops jobs on failure, and an async worker pool with retry and backoff. Both use the same industrial scenario: diagnosing alarms on packaging and filling lines.

Experiment 1 β€” Naive Synchronous Worker (What Goes Wrong)

1

SETUP CELL

Define an alarm job model and sample workload

setup

Create a small, realistic job list that simulates alarms arriving from multiple lines.

Python
from dataclasses import dataclass
from typing import Literal

from openai import OpenAI

client = OpenAI()


@dataclass
class AlarmJob:
    id: int
    line: str
    alarm_code: str
    description: str
    status: Literal["pending", "completed", "failed"] = "pending"
    attempts: int = 0


jobs: list[AlarmJob] = [
    AlarmJob(
        id=1,
        line="Packaging-1",
        alarm_code="A101",
        description="Photo-eye blocked at infeed",
    ),
    AlarmJob(
        id=2,
        line="Packaging-1",
        alarm_code="A305",
        description="Motor overload detected on discharge conveyor",
    ),
    AlarmJob(
        id=3,
        line="Filler-2",
        alarm_code="F212",
        description="Level sensor unstable during fill cycle",
    ),
]

Explanation

  • - Each AlarmJob represents one diagnosis task for your agent worker.
  • - status and attempts give you minimal state tracking for later retries.
  • - This list stands in for a queue or database table in production.

Why this matters

Real plants care less about individual stack traces and more about which jobs never finished.

Common mistake

Skipping explicit status fields makes it hard to tell which jobs silently disappeared.

Takeaway

Even simple dataclasses give you enough structure to track job state across retries.

2

SETUP CELL

Experiment 1A β€” Define the system prompt for fault analysis

setup

Create a system prompt that keeps LLM outputs focused on practical troubleshooting.

Python
SYSTEM_PROMPT = (
    "You are an industrial fault analysis assistant. "
    "You output short, concrete troubleshooting steps for a controls engineer. "
    "You never guess about safety-rated behavior."
)

Explanation

  • - The system prompt sets clear boundaries: practical troubleshooting only, no safety guesses.
  • - This prompt will be reused across both the naive sync and async implementations.
  • - Industrial prompts should be deterministic and focused on actionable outputs, not creative writing.

Takeaway

System prompts act as the instruction manual for your LLM worker β€” be explicit about constraints.

3

SETUP CELL

Experiment 1A β€” Build investigate_alarm function

setup

Create a simple function that calls OpenAI to diagnose one alarm job.

Python
# Continuing from previous cells: AlarmJob, client, and SYSTEM_PROMPT
def investigate_alarm(job: AlarmJob) -> str:
    """
    Call OpenAI to diagnose a single alarm.
    No retry logic β€” fails immediately if the API throws an error.
    """
    user_content = (
        "You are helping diagnose an alarm on a production line. "
        "Use 3-5 bullet points and keep total output < 120 words.\n\n"
        f"Line: {job.line}\n"
        f"Alarm code: {job.alarm_code}\n"
        f"Description: {job.description}\n"
    )

    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
        temperature=0.1,
    )
    return completion.choices[0].message.content

Explanation

  • - This snippet depends on the AlarmJob model, OpenAI client, and SYSTEM_PROMPT defined in the previous cells.
  • - The user_content template injects job details (line, alarm code, description) into the prompt.
  • - temperature=0.1 keeps outputs consistent and reduces creative variance.
  • - This function has no error handling β€” any API exception will propagate immediately and stop the caller.
  • - Cost: ~$0.002-$0.01 per call with gpt-4o-mini for short prompts.

Takeaway

Without retry logic, one transient API error will crash your entire worker loop.

4

EXPERIMENT CELL

Experiment 1A β€” Running the naive synchronous loop

experiment

See how a single exception can stop the whole batch and leave jobs unprocessed.

Python
from dataclasses import dataclass
from typing import Literal
from openai import OpenAI

client = OpenAI()

@dataclass
class AlarmJob:
    id: int
    line: str
    alarm_code: str
    description: str
    status: Literal["pending", "completed", "failed"] = "pending"
    attempts: int = 0

SYSTEM_PROMPT = (
    "You are an industrial fault analysis assistant. "
    "You output short, concrete troubleshooting steps for a controls engineer. "
    "You never guess about safety-rated behavior."
)

def investigate_alarm(job: AlarmJob) -> str:
    user_content = (
        "You are helping diagnose an alarm on a production line. "
        "Use 3-5 bullet points and keep total output < 120 words.\n\n"
        f"Line: {job.line}\n"
        f"Alarm code: {job.alarm_code}\n"
        f"Description: {job.description}\n"
    )
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
        temperature=0.1,
    )
    return completion.choices[0].message.content

def run_sync(jobs_to_run: list[AlarmJob]) -> None:
    """Process jobs sequentially with no error handling."""
    for job in jobs_to_run:
        print(f"Processing job {job.id}...")
        # ❌ No retry or error handling here on purpose
        advice = investigate_alarm(job)
        job.status = "completed"
        print(f"Completed job {job.id}: {advice[:80]!r}...")

# Example jobs
jobs = [
    AlarmJob(id=1, line="Packaging-1", alarm_code="A101", description="Photo-eye blocked at infeed"),
    AlarmJob(id=2, line="Packaging-1", alarm_code="A305", description="Motor overload detected on discharge conveyor"),
    AlarmJob(id=3, line="Filler-2", alarm_code="F212", description="Level sensor unstable during fill cycle"),
]

run_sync(jobs)
Expected output
Processing job 1...
Completed job 1: '- Check for obstructions in the photo-eye path at the infeed station....'
Processing job 2...
Completed job 2: '- Verify motor load is within rated capacity (check nameplate vs. current....'
Processing job 3...
Completed job 3: '- Inspect level sensor wiring for loose connections or corrosion....'

Explanation

  • - This loop looks fine until one OpenAI call throws (for example, a rate limit or network glitch).
  • - As soon as an exception is raised, run_sync stops and later jobs never run.
  • - There is no per-job error reporting: operators only see that some alarms never got a diagnosis.
  • - If job 2 fails, job 3 never starts and remains stuck in "pending" status forever.

Why this matters

In industrial settings, you must be able to point to every job and say completed, failed, or pending.

Common mistake

Relying on top-level try/except around main() only logs failures and still loses job context.

Takeaway

A single uncaught exception can silently drop an entire tail of jobs in a synchronous worker.

5

CHECKPOINT CELL

Checkpoint β€” Why this pattern is risky on a real line

checkpoint

Summarize the failure modes of the naive loop before introducing async workers.

Explanation

  • - If job 2 hits a transient API error, jobs 3–N never start and remain stuck in pending state.
  • - Operators only see "sometimes the diagnosis tool does nothing," which erodes trust quickly.
  • - Retrying the whole batch from the top risks re-running jobs that already completed successfully.
  • - This pattern is acceptable only for small, ad hoc scripts β€” not for recurring plant workflows.

Why this matters

Async without a clear failure model just makes nondeterministic behavior faster and harder to debug.

Takeaway

Before adding async, you must first design job-level failure behavior: what happens to each job when the model or network fails?

Experiment 2 β€” Async Worker Pool with Retry & Backoff

6

SETUP CELL

Set up AsyncOpenAI client and retryable error classes

setup

Introduce asyncio and classify which OpenAI errors should trigger a retry.

Python
import asyncio
import random
from dataclasses import dataclass
from typing import Literal

from openai import (
    AsyncOpenAI,
    APIConnectionError,
    APIError,
    RateLimitError,
)

client = AsyncOpenAI()


@dataclass
class AlarmJob:
    id: int
    line: str
    alarm_code: str
    description: str
    status: Literal["pending", "completed", "failed"] = "pending"
    attempts: int = 0
    last_error: str | None = None


RETRYABLE_ERRORS = (RateLimitError, APIConnectionError, APIError)


SYSTEM_PROMPT = (
    "You are an industrial fault analysis assistant. "
    "You output short, concrete troubleshooting steps for a controls engineer. "
    "You never guess about safety-rated behavior."
)

Explanation

  • - AsyncOpenAI gives you non-blocking calls that integrate with asyncio worker loops.
  • - RETRYABLE_ERRORS groups the types that are usually safe to retry with backoff.
  • - The job model gains attempts and last_error so you can understand failure history later.

Why this matters

In industrial environments you want tight control over what gets retried and how often.

Common mistake

Catching broad Exception and retrying everything can hide genuine bugs in your prompts or payloads.

Takeaway

Classifying retryable errors up front keeps your retry logic simple, explicit, and maintainable.

7

SETUP CELL

Understand exponential backoff with jitter

setup

Learn how retry delays grow exponentially while avoiding thundering herds.

Python
# Exponential backoff pattern:
# delay = base_delay (0.5s)
# After 1st failure: sleep(0.5s + jitter)
# After 2nd failure: sleep(1.0s + jitter)  β€” delay doubled
# After 3rd failure: sleep(2.0s + jitter)  β€” delay doubled again
# Max delay: 8.0s (cap to prevent infinite waits)

# Jitter: random.uniform(0, 0.25)
# Prevents multiple workers from retrying in perfect sync after an outage

Explanation

  • - Exponential backoff doubles the delay after each failure, giving upstream services time to recover.
  • - Jitter adds randomness so multiple workers do not hammer the API at exactly the same moment.
  • - The max_delay cap prevents waits from growing unbounded in rare edge cases.
  • - This pattern is standard for robust distributed systems and scales from 1 to 1000 workers.

Takeaway

Exponential backoff with jitter is the industrial-grade pattern for retrying transient failures.

8

CORE CELL

Implement investigate_alarm_with_retry with exponential backoff

core

Wrap each LLM call in bounded retries with exponential backoff and jitter.

Python
# Continuing from previous cells:
# AlarmJob, client, SYSTEM_PROMPT, RETRYABLE_ERRORS, asyncio, and random
async def investigate_alarm_with_retry(
    job: AlarmJob,
    *,
    max_retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
) -> str:
    delay = base_delay

    for attempt in range(1, max_retries + 1):
        job.attempts = attempt
        try:
            user_content = (
                "You are helping diagnose an alarm on a production line. "
                "Use 3-5 bullet points and keep total output < 120 words.\n\n"
                f"Line: {job.line}\n"
                f"Alarm code: {job.alarm_code}\n"
                f"Description: {job.description}\n"
            )

            completion = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                temperature=0.1,
            )
            job.last_error = None
            return completion.choices[0].message.content

        except RETRYABLE_ERRORS as exc:  # type: ignore[misc]
            job.last_error = str(exc)
            is_last_attempt = attempt >= max_retries
            print(
                f"[WARN] Retryable error for job {job.id} "
                f"(attempt {attempt}/{max_retries}): {exc}"
            )
            if is_last_attempt:
                raise

            # Exponential backoff with jitter
            jitter = random.uniform(0, 0.25)
            await asyncio.sleep(delay + jitter)
            delay = min(delay * 2, max_delay)

        except Exception as exc:  # Non-retryable
            job.last_error = str(exc)
            print(f"[ERROR] Non-retryable error for job {job.id}: {exc}")
            raise

Explanation

  • - This function builds on the async client, error classes, and AlarmJob dataclass introduced earlier in the sequence.
  • - Each job tracks how many attempts were made and the latest error message.
  • - Retryable errors use exponential backoff so you do not hammer the API or shared network links.
  • - Non-retryable errors fail fast and bubble up so the worker can mark the job as failed.
  • - Cost: With 3 retries, you pay for up to 3 calls per job in the worst case, but most jobs succeed on the first attempt.

Why this matters

Randomized backoff keeps multiple workers from retrying in lock-step when a shared service recovers.

Common mistake

Using fixed sleep durations for retries creates synchronized thundering herds after outages.

Takeaway

Backoff and bounded retries turn spiky external behavior into smoother, predictable worker behavior.

9

SETUP CELL

Build the worker function that processes jobs from the queue

setup

Understand how a single worker pulls jobs, handles retries, and updates status.

Python
# Continuing from previous cells: asyncio, AlarmJob, and investigate_alarm_with_retry
async def worker(
    name: str,
    queue: "asyncio.Queue[AlarmJob]",
    semaphore: asyncio.Semaphore,
) -> None:
    """
    Pull jobs from the queue until a None sentinel is received.
    Use semaphore to limit concurrent API calls.
    """
    while True:
        job = await queue.get()
        if job is None:  # Shutdown signal
            queue.task_done()
            print(f"[{name}] Shutting down")
            break

        try:
            async with semaphore:
                advice = await investigate_alarm_with_retry(job)
            job.status = "completed"
            print(f"[{name}] Completed job {job.id}")
            print(advice[:80].replace("\n", " ") + "...")
        except Exception as exc:
            job.status = "failed"
            print(f"[{name}] Job {job.id} failed after retries: {exc}")
        finally:
            queue.task_done()

Explanation

  • - This worker depends on the retry function and AlarmJob model defined in prior cells.
  • - The worker runs in an infinite loop until it receives a None job (shutdown signal).
  • - The semaphore ensures only a limited number of workers call the LLM at once.
  • - Job status is set to completed or failed based on the outcome, making failures visible.
  • - queue.task_done() signals that the job has been processed, allowing queue.join() to work correctly.

Takeaway

Each worker is an independent loop that processes jobs until told to shut down.

10

SETUP CELL

Understand the main orchestrator function

setup

Learn how to set up the queue, start workers, and coordinate graceful shutdown.

Python
# Continuing from previous cells: asyncio, AlarmJob, and worker
async def main_async_worker(jobs_to_run: list[AlarmJob]) -> None:
    queue: "asyncio.Queue[AlarmJob]" = asyncio.Queue()
    semaphore = asyncio.Semaphore(3)  # Limit concurrent OpenAI calls

    # Enqueue all jobs
    for job in jobs_to_run:
        await queue.put(job)

    # Start workers
    workers = [
        asyncio.create_task(worker(f"worker-{i+1}", queue, semaphore))
        for i in range(3)
    ]

    # Wait for all jobs to be processed
    await queue.join()

    # Signal workers to shut down
    for _ in workers:
        await queue.put(None)

    await asyncio.gather(*workers)

Explanation

  • - This orchestrator depends on the worker function and AlarmJob model defined earlier in the tutorial.
  • - asyncio.Queue acts as the job queue, and Semaphore(3) limits concurrent API calls.
  • - All jobs are enqueued upfront, then 3 workers are started as concurrent tasks.
  • - await queue.join() blocks until all jobs have been marked done via task_done().
  • - After all jobs complete, None sentinels are sent to shut down each worker gracefully.

Takeaway

The orchestrator coordinates job submission, worker lifecycle, and graceful shutdown.

11

EXPERIMENT CELL

Running the async worker pool with all pieces assembled

experiment

Execute the complete async worker pool and observe concurrent job processing with retries.

Python
import asyncio
import random
from dataclasses import dataclass
from typing import Literal

from openai import (
    AsyncOpenAI,
    APIConnectionError,
    APIError,
    RateLimitError,
)

client = AsyncOpenAI()

@dataclass
class AlarmJob:
    id: int
    line: str
    alarm_code: str
    description: str
    status: Literal["pending", "completed", "failed"] = "pending"
    attempts: int = 0
    last_error: str | None = None

RETRYABLE_ERRORS = (RateLimitError, APIConnectionError, APIError)

SYSTEM_PROMPT = (
    "You are an industrial fault analysis assistant. "
    "You output short, concrete troubleshooting steps for a controls engineer. "
    "You never guess about safety-rated behavior."
)

async def investigate_alarm_with_retry(
    job: AlarmJob,
    *,
    max_retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
) -> str:
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        job.attempts = attempt
        try:
            user_content = (
                "You are helping diagnose an alarm on a production line. "
                "Use 3-5 bullet points and keep total output < 120 words.\n\n"
                f"Line: {job.line}\n"
                f"Alarm code: {job.alarm_code}\n"
                f"Description: {job.description}\n"
            )
            completion = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                temperature=0.1,
            )
            job.last_error = None
            return completion.choices[0].message.content
        except RETRYABLE_ERRORS as exc:
            job.last_error = str(exc)
            is_last_attempt = attempt >= max_retries
            print(f"[WARN] Retryable error for job {job.id} (attempt {attempt}/{max_retries}): {exc}")
            if is_last_attempt:
                raise
            jitter = random.uniform(0, 0.25)
            await asyncio.sleep(delay + jitter)
            delay = min(delay * 2, max_delay)
        except Exception as exc:
            job.last_error = str(exc)
            print(f"[ERROR] Non-retryable error for job {job.id}: {exc}")
            raise

async def worker(
    name: str,
    queue: "asyncio.Queue[AlarmJob]",
    semaphore: asyncio.Semaphore,
) -> None:
    while True:
        job = await queue.get()
        if job is None:
            queue.task_done()
            print(f"[{name}] Shutting down")
            break
        try:
            async with semaphore:
                advice = await investigate_alarm_with_retry(job)
            job.status = "completed"
            print(f"[{name}] Completed job {job.id}")
            print(advice[:80].replace("\n", " ") + "...")
        except Exception as exc:
            job.status = "failed"
            print(f"[{name}] Job {job.id} failed after retries: {exc}")
        finally:
            queue.task_done()

async def main_async_worker(jobs_to_run: list[AlarmJob]) -> None:
    queue: "asyncio.Queue[AlarmJob]" = asyncio.Queue()
    semaphore = asyncio.Semaphore(3)
    for job in jobs_to_run:
        await queue.put(job)
    workers = [
        asyncio.create_task(worker(f"worker-{i+1}", queue, semaphore))
        for i in range(3)
    ]
    await queue.join()
    for _ in workers:
        await queue.put(None)
    await asyncio.gather(*workers)

if __name__ == "__main__":
    sample_jobs = [
        AlarmJob(id=1, line="Packaging-1", alarm_code="A101", description="Photo-eye blocked at infeed"),
        AlarmJob(id=2, line="Packaging-1", alarm_code="A305", description="Motor overload detected on discharge conveyor"),
        AlarmJob(id=3, line="Filler-2", alarm_code="F212", description="Level sensor unstable during fill cycle"),
    ]
    asyncio.run(main_async_worker(sample_jobs))
Expected output
[worker-1] Completed job 1
- Check for obstructions in the photo-eye path at the infeed station....
[worker-2] Completed job 2
- Verify motor load is within rated capacity (check nameplate vs. current draw)....
[worker-3] Completed job 3
- Inspect level sensor wiring for loose connections or corrosion....
[worker-1] Shutting down
[worker-2] Shutting down
[worker-3] Shutting down

Explanation

  • - asyncio.Queue models your job queue and lets workers pull jobs independently.
  • - The semaphore caps how many LLM calls can be in flight at once, protecting shared limits.
  • - Each worker cleanly shuts down after a sentinel (None) job is received.
  • - With 3 workers and short prompts, typical runtime is a few seconds and API cost is $0.02–$0.10 for a small batch.

Why this matters

Concurrency limits act like a VFD on your API usage, smoothing peaks when the plant throws many alarms at once.

Common mistake

Letting every worker call the LLM without a concurrency limit can easily trip global rate limits during alarm storms.

Takeaway

An async worker pool with bounded concurrency and retries gives you predictable behavior even under partial failures.

12

CHECKPOINT CELL

Checkpoint β€” Comparing sync vs async worker behavior

checkpoint

Summarize the behavioral differences so you know when to use each pattern.

Explanation

  • - Synchronous workers are simple but brittle: one bad call can kill the batch and lose downstream jobs.
  • - Async workers with retries and backoff keep individual failures contained to each job.
  • - Concurrency limits turn β€œmany simultaneous alarms” into a controlled queue instead of a denial-of-service against your own tools.
  • - Clear job status fields (pending/completed/failed) make it possible to build dashboards and alerts later in the System track.

Why this matters

Once this foundation is solid, adding LangChain, LangGraph, and MCP on top is much safer and easier to reason about.

Takeaway

Treat your agent workers like industrial equipment: bounded, observable, and prepared for upstream failures.

5️⃣ IMPLEMENTATION NOTES β€” COSTS, LIMITS, AND NEXT STEPS

The patterns you implemented here are deliberately conservative: short prompts, low temperature, bounded retries, capped concurrency. That combination keeps token usage low and behavior predictable, which is exactly what you want before integrating with real PLC data or SCADA event streams.

Typical experiments for this tutorial should cost on the order of $2–$4 in API usage, depending on how many times you re-run the async worker pools and how many jobs you simulate. Use this as a baseline when you later wire the worker into richer context like historical alarm logs or documentation RAG.

In D3 you will start introducing LangChain to cleanly manage prompts, tools, and outputs, using the async patterns from this tutorial as the execution backbone.

Further Reading

Official Documentation

Industrial Patterns

βœ… KEY TAKEAWAYS

  • βœ… Synchronous workers are easy to write but can silently drop jobs when a single call fails.
  • βœ… Async workers with bounded retries and exponential backoff absorb transient API and network issues.
  • βœ… Classifying retryable vs non-retryable errors keeps your error handling intentional instead of ad hoc.
  • βœ… Concurrency limits protect shared model quotas and plant networks during alarm storms.
  • βœ… Explicit job state (pending/completed/failed, attempts, last_error) is the foundation for observability.
  • βœ… These patterns are a prerequisite for multi-agent coordination and queue-based architectures later in the track.

πŸ”œ NEXT TUTORIAL

D3 β€” LangChain Essentials for Control Systems

Use LangChain to structure prompts, tools, and outputs on top of your async worker patterns.