Building a Durable Agent in Python: Crash Recovery, Idempotent Tool Calls, and Audit Logging for Production

The 2 AM Page That Changed How I Build Agents

Three weeks ago, one of our deployment agents died mid-execution. It had completed steps 1 through 3 of a 5-step rollout: database migration ran, new containers were built and pushed, the load balancer config was updated. Then the process got OOM-killed. Steps 4 and 5 never ran. The old service was drained but the new one never registered itself with the service mesh.

Production was down for 22 minutes while we figured out what had actually completed versus what hadn't. The migration had been applied, but we couldn't tell whether it was the full migration or a partial one. The container tag existed in the registry, but was it the final image or an intermediate layer push? We had logs, sure, but they were scattered across stdout, and the agent's in-memory state was gone.

I've been thinking about Ditto ever since. The Pokémon that transforms into anything, that can perfectly recover any form it's previously taken. That's what I wanted: an agent that could crash, restart, look at its history, transform back into exactly the state it was in, and then resume from the precise point of failure.

Most agent frameworks treat this as somebody else's problem. They give you tool calling, they give you memory, they give you nice abstractions for chaining LLM calls together. But they assume the process stays alive. In production, processes die. Machines restart. Deploys happen. OOM kills happen. And when your agent is halfway through something with real side effects, "just retry from the beginning" isn't an answer.

Why Agent State Is Harder Than You Think

A typical web request is stateless. If it fails, retry it. A database transaction is atomic. If it fails, roll it back. But an agent doing a multi-step deployment occupies this awful middle ground: it's stateful across multiple external side effects that can't be wrapped in a single transaction.

Step 1 mutates a database. Step 2 pushes to a container registry. Step 3 modifies load balancer config. Each of these is a committed, externally visible change. You can't roll back step 1 after step 3 succeeds without building compensating transactions for every single operation. And most of the time, you don't need rollback. You need resume.

The naive approach is to just store "current_step = 3" somewhere and restart from there. But that misses the actual complexity. What if step 3 partially completed? What if the agent made a decision during step 2 that affects how step 4 should execute? What if the LLM produced a plan that's sitting in memory and never got persisted?

You need the full history. Every decision, every tool invocation, every result. You need event sourcing.

Event Sourcing for Agent Steps

The core idea is stolen directly from event-driven architectures: instead of storing current state, store the sequence of events that produced that state. Replay the events to reconstruct where you are.
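
To make the replay idea concrete before getting into the real schema, here's a minimal sketch. It uses plain dicts instead of a proper event class, and the field names (type, step, result) are made up for illustration. The point is only that state is derived by folding over the log, never stored directly.

```python
from functools import reduce

# Hypothetical, simplified events: plain dicts, not a real event class.
events = [
    {"type": "tool_call_initiated", "step": 0},
    {"type": "tool_call_completed", "step": 0, "result": "migrated"},
    {"type": "tool_call_initiated", "step": 1},
    {"type": "tool_call_completed", "step": 1, "result": "pushed"},
    {"type": "tool_call_initiated", "step": 2},  # the crash happened after this
]


def apply(state: dict, event: dict) -> dict:
    """Fold one event into the derived state without mutating the input."""
    completed = dict(state["completed"])
    in_flight = set(state["in_flight"])
    if event["type"] == "tool_call_initiated":
        in_flight.add(event["step"])
    elif event["type"] == "tool_call_completed":
        in_flight.discard(event["step"])
        completed[event["step"]] = event["result"]
    return {"completed": completed, "in_flight": in_flight}


def replay(log: list[dict]) -> dict:
    """Rebuild current state from nothing but the event history."""
    return reduce(apply, log, {"completed": {}, "in_flight": set()})


state = replay(events)
print(state["completed"])  # {0: 'migrated', 1: 'pushed'}
print(state["in_flight"])  # {2}
```

After a crash, the in_flight set is exactly the list of things you have to reason about: calls that started and never reported back.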

For an agent, an "event" is any of these:

  • PlanGenerated: The LLM produced a set of steps to execute
  • ToolCallInitiated: The agent decided to invoke a specific tool with specific arguments
  • ToolCallCompleted: The tool returned a result
  • ToolCallFailed: The tool threw an error
  • DecisionMade: The agent chose between alternatives based on a tool result
  • CheckpointReached: An explicit durability boundary

Here's the event schema I settled on:

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
import hashlib
import json
import uuid


class EventType(Enum):
    PLAN_GENERATED = "plan_generated"
    TOOL_CALL_INITIATED = "tool_call_initiated"
    TOOL_CALL_COMPLETED = "tool_call_completed"
    TOOL_CALL_FAILED = "tool_call_failed"
    DECISION_MADE = "decision_made"
    CHECKPOINT_REACHED = "checkpoint_reached"


@dataclass
class AgentEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    run_id: str = ""
    event_type: EventType = EventType.CHECKPOINT_REACHED
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    step_index: int = 0
    tool_name: str | None = None
    tool_args: dict[str, Any] = field(default_factory=dict)
    result: Any = None
    error: str | None = None
    causality_parent: str | None = None
    idempotency_key: str | None = None

    def to_dict(self) -> dict[str, Any]:
        return {
            "event_id": self.event_id,
            "run_id": self.run_id,
            "event_type": self.event_type.value,
            "timestamp": self.timestamp.isoformat(),
            "step_index": self.step_index,
            "tool_name": self.tool_name,
            "tool_args": self.tool_args,
            "result": self.result,
            "error": self.error,
            "causality_parent": self.causality_parent,
            "idempotency_key": self.idempotency_key,
        }

The causality_parent field is key. It links each event to the event that caused it, giving you a full causal chain. When you're debugging why step 4 did something unexpected, you can trace backwards through the chain and see exactly what information the agent had at each decision point.

The idempotency_key is what makes safe resumption possible. More on that shortly.

The Event Store: SQLite Is Enough

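I know, I know. "Just use Postgres" or "just use Kafka." For a local agent that runs on a single machine, SQLite is a good fit. It's ACID-compliant, it's a single file, committed transactions survive process crashes, and you don't need to run infrastructure. WAL mode adds concurrent readers alongside the writer. One caveat about the settings below: with synchronous=NORMAL in WAL mode, a power loss or OS crash (as opposed to a process crash) can roll back the most recent commits, though it won't corrupt the database. Use synchronous=FULL if you need durability against that too.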

import sqlite3
from contextlib import contextmanager
from pathlib import Path


class EventStore:
    def __init__(self, db_path: Path):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with self._conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    event_id TEXT PRIMARY KEY,
                    run_id TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    step_index INTEGER NOT NULL,
                    tool_name TEXT,
                    tool_args TEXT,
                    result TEXT,
                    error TEXT,
                    causality_parent TEXT,
                    idempotency_key TEXT,
                    UNIQUE(run_id, idempotency_key)
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_run_id
                ON events(run_id, step_index)
            """)

    @contextmanager
    def _conn(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    def append(self, event: AgentEvent) -> None:
        with self._conn() as conn:
            d = event.to_dict()
            conn.execute(
                """INSERT OR IGNORE INTO events
                (event_id, run_id, event_type, timestamp, step_index,
                 tool_name, tool_args, result, error, causality_parent,
                 idempotency_key)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    d["event_id"], d["run_id"], d["event_type"],
                    d["timestamp"], d["step_index"], d["tool_name"],
                    json.dumps(d["tool_args"]), json.dumps(d["result"]),
                    d["error"], d["causality_parent"], d["idempotency_key"],
                ),
            )

    def get_run_events(self, run_id: str) -> list[dict[str, Any]]:
        with self._conn() as conn:
            cursor = conn.execute(
                "SELECT * FROM events WHERE run_id = ? ORDER BY step_index, timestamp",
                (run_id,),
            )
            columns = [desc[0] for desc in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]

    def has_completed_step(self, run_id: str, idempotency_key: str) -> bool:
        with self._conn() as conn:
            cursor = conn.execute(
                """SELECT result FROM events
                WHERE run_id = ? AND idempotency_key = ?
                AND event_type = 'tool_call_completed'""",
                (run_id, idempotency_key),
            )
            row = cursor.fetchone()
            return row is not None

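Notice the INSERT OR IGNORE combined with the unique constraint on (run_id, idempotency_key). If the agent crashes after writing an event but before advancing its internal pointer, replaying will attempt to re-insert an event with the same key. The database silently ignores the duplicate. No corruption. No double-writes. The flip side is that a retried call that fails again writes nothing new: its "_fail" event shares a key with the first failure and is ignored too, so the log keeps only the first failure per call.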
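
If you want to see that behavior in isolation, here's a small sketch against an in-memory database. The table is a cut-down, hypothetical version of the events table (three columns only). It also shows one thing worth knowing: SQLite treats NULLs as distinct in a UNIQUE constraint, so events written without an idempotency key are never deduplicated.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        event_id TEXT PRIMARY KEY,
        run_id TEXT NOT NULL,
        idempotency_key TEXT,
        UNIQUE(run_id, idempotency_key)
    )
""")

insert = "INSERT OR IGNORE INTO events VALUES (?, ?, ?)"

# Same (run_id, idempotency_key) under two different event IDs:
# the second insert is silently dropped.
conn.execute(insert, ("evt-1", "run-a", "key-1"))
conn.execute(insert, ("evt-2", "run-a", "key-1"))

# NULL keys count as distinct values for UNIQUE, so both rows are stored.
conn.execute(insert, ("evt-3", "run-a", None))
conn.execute(insert, ("evt-4", "run-a", None))

keyed = conn.execute(
    "SELECT COUNT(*) FROM events WHERE idempotency_key = 'key-1'"
).fetchone()[0]
unkeyed = conn.execute(
    "SELECT COUNT(*) FROM events WHERE idempotency_key IS NULL"
).fetchone()[0]
print(keyed, unkeyed)  # 1 2
conn.close()
```

So any event type that should be written at most once needs a non-NULL key.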

Idempotent Tool Calls: The Actually Hard Part

Recording events is the easy half. The hard half is making sure that when you replay, you don't re-execute tool calls that already succeeded. If step 2 pushed a container image and the agent crashed during step 3, restarting shouldn't push the image again. Maybe it's fine for that specific operation (container pushes are usually idempotent by tag). But what about sending a Slack notification? Triggering a webhook? Running a database migration?

The pattern I use: every tool call gets an idempotency key derived from the run ID, step index, tool name, and tool arguments. Before executing, check the event store. If there's already a ToolCallCompleted event with that key, skip execution and return the stored result.

def make_idempotency_key(run_id: str, step_index: int, tool_name: str, tool_args: dict) -> str:
    payload = json.dumps(
        {"run_id": run_id, "step": step_index, "tool": tool_name, "args": tool_args},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()[:16]


from collections.abc import Callable  # for the tool registry type hint


class DurableToolExecutor:
    def __init__(self, event_store: EventStore, tools: dict[str, Callable[..., Any]]):
        self.event_store = event_store
        self.tools = tools

    def execute(
        self,
        run_id: str,
        step_index: int,
        tool_name: str,
        tool_args: dict[str, Any],
        causality_parent: str | None = None,
    ) -> Any:
        idem_key = make_idempotency_key(run_id, step_index, tool_name, tool_args)

        # Check if already completed
        if self.event_store.has_completed_step(run_id, idem_key):
            events = self.event_store.get_run_events(run_id)
            for e in events:
                if e["idempotency_key"] == idem_key and e["event_type"] == "tool_call_completed":
                    return json.loads(e["result"])

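        # Record initiation. On a retry the "_init" row already exists and the
        # insert below is ignored, so reuse the stored event_id; otherwise the
        # completion/failure events would point at a parent that was never saved.
        init_key = f"{idem_key}_init"
        prior_init = next(
            (e for e in self.event_store.get_run_events(run_id)
             if e["idempotency_key"] == init_key),
            None,
        )
        init_event = AgentEvent(
            run_id=run_id,
            event_type=EventType.TOOL_CALL_INITIATED,
            step_index=step_index,
            tool_name=tool_name,
            tool_args=tool_args,
            causality_parent=causality_parent,
            idempotency_key=init_key,
        )
        if prior_init is not None:
            init_event.event_id = prior_init["event_id"]
        self.event_store.append(init_event)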

        # Execute
        try:
            result = self.tools[tool_name](**tool_args)
        except Exception as exc:
            fail_event = AgentEvent(
                run_id=run_id,
                event_type=EventType.TOOL_CALL_FAILED,
                step_index=step_index,
                tool_name=tool_name,
                tool_args=tool_args,
                error=str(exc),
                causality_parent=init_event.event_id,
                idempotency_key=f"{idem_key}_fail",
            )
            self.event_store.append(fail_event)
            raise

        # Record completion
        complete_event = AgentEvent(
            run_id=run_id,
            event_type=EventType.TOOL_CALL_COMPLETED,
            step_index=step_index,
            tool_name=tool_name,
            tool_args=tool_args,
            result=result,
            causality_parent=init_event.event_id,
            idempotency_key=idem_key,
        )
        self.event_store.append(complete_event)
        return result

There's a subtlety here. What if the tool call succeeds but the process crashes before writing the ToolCallCompleted event? On resume, the agent won't find the completion event and will re-execute. For truly non-idempotent operations (like charging a credit card), you need the tool itself to accept an idempotency key and check on its end. This is defense in depth: the event store handles the common case, and the tool's own idempotency mechanism handles the edge case.
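
Here's roughly what that tool-side check looks like. FakePaymentService is a hypothetical in-memory stand-in, not any real payment API, and the key string is made up. In practice you'd pass the same key the executor computed. The sketch only shows the shape of the contract: same key in, same outcome out, one side effect.

```python
class FakePaymentService:
    """Hypothetical stand-in for an external API that honors idempotency keys."""

    def __init__(self) -> None:
        self._seen: dict[str, dict] = {}
        self.charges_made = 0

    def charge(self, amount_cents: int, idempotency_key: str) -> dict:
        # A repeated key returns the original outcome instead of charging again.
        if idempotency_key in self._seen:
            return self._seen[idempotency_key]
        self.charges_made += 1
        receipt = {"charge_id": f"ch_{self.charges_made}", "amount_cents": amount_cents}
        self._seen[idempotency_key] = receipt
        return receipt


service = FakePaymentService()
key = "run-a:step-3"  # illustrative; would normally be the executor's idempotency key

first = service.charge(1999, idempotency_key=key)
# Simulate the bad case: the process died before logging completion,
# so on resume the agent calls the tool a second time.
second = service.charge(1999, idempotency_key=key)

print(first == second, service.charges_made)  # True 1
```

The agent still re-executes the call, but the remote side collapses it into the original charge.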

The Durable Agent Runner

Now we put it all together. The runner takes a plan (a list of steps), an event store, and a tool executor. On start, it replays existing events to figure out where it left off, then continues from there.

@dataclass
class Step:
    index: int
    tool_name: str
    tool_args: dict[str, Any]
    description: str


class DurableAgentRunner:
    def __init__(self, event_store: EventStore, executor: DurableToolExecutor):
        self.event_store = event_store
        self.executor = executor

    def run(self, run_id: str, steps: list[Step]) -> dict[str, Any]:
        # Replay: find the last completed step
        events = self.event_store.get_run_events(run_id)
        completed_steps = set()
        results = {}

        for event in events:
            if event["event_type"] == "tool_call_completed":
                completed_steps.add(event["step_index"])
                results[event["step_index"]] = json.loads(event["result"])

        last_completed = max(completed_steps) if completed_steps else -1

        if last_completed >= 0:
            print(f"[resume] Resuming run {run_id[:8]}... from step {last_completed + 1}")
            print(f"[resume] Steps already completed: {sorted(completed_steps)}")

        # Track the most recent event so each step links back to what preceded
        # it. (Reading events[-1] inside the loop would reuse the stale snapshot
        # loaded above, and on a fresh run it would always be None.)
        last_event_id = events[-1]["event_id"] if events else None

        # Execute remaining steps
        for step in steps:
            if step.index in completed_steps:
                continue

            print(f"[step {step.index}] {step.description}")

            # Inject previous results into tool args if referenced
            resolved_args = self._resolve_args(step.tool_args, results)

            result = self.executor.execute(
                run_id=run_id,
                step_index=step.index,
                tool_name=step.tool_name,
                tool_args=resolved_args,
                causality_parent=last_event_id,
            )
            results[step.index] = result

            # Checkpoint after each step, chained to the previous checkpoint
            checkpoint = AgentEvent(
                run_id=run_id,
                event_type=EventType.CHECKPOINT_REACHED,
                step_index=step.index,
                result={"completed_through": step.index},
                causality_parent=last_event_id,
                idempotency_key=f"checkpoint_{step.index}",
            )
            self.event_store.append(checkpoint)
            last_event_id = checkpoint.event_id

        return results

    def _resolve_args(self, args: dict, results: dict) -> dict:
        """Replace $step_N references with actual results from prior steps."""
        resolved = {}
        for key, value in args.items():
            if isinstance(value, str) and value.startswith("$step_"):
                step_ref = int(value.split("_")[1])
                resolved[key] = results.get(step_ref)
            else:
                resolved[key] = value
        return resolved

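The _resolve_args method is a small but important detail. Steps often depend on results from previous steps. When replaying, those results come from the event store rather than from live execution. The agent doesn't care which source provided them. Note that a $step_N reference is replaced with the entire result of step N, so in the deployment example that follows, image_tag and endpoint receive whole result dicts rather than single fields.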
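
If you'd rather pass a single field than a whole result, one option is to extend the reference syntax. The sketch below assumes a hypothetical "$step_N.field" form that is not part of the runner above. It still accepts plain "$step_N", and it raises instead of silently substituting None when the referenced step has no result.

```python
import re
from typing import Any

# Hypothetical reference syntax: "$step_<N>" or "$step_<N>.<field>"
_REF = re.compile(r"^\$step_(\d+)(?:\.(\w+))?$")


def resolve_args(args: dict[str, Any], results: dict[int, Any]) -> dict[str, Any]:
    """Replace step references with prior results, optionally picking one field."""
    resolved = {}
    for key, value in args.items():
        match = _REF.match(value) if isinstance(value, str) else None
        if match is None:
            resolved[key] = value  # literal argument, pass through untouched
            continue
        step_ref, field_name = int(match.group(1)), match.group(2)
        if step_ref not in results:
            raise KeyError(f"{key!r} references step {step_ref}, which has no result yet")
        result = results[step_ref]
        resolved[key] = result[field_name] if field_name else result
    return resolved


results = {1: {"image_tag": "gcr.io/prod/payment-api:a1b2c3d", "size_mb": 142}}
args = {"service_name": "payment-api", "image_tag": "$step_1.image_tag"}
print(resolve_args(args, results))
# {'service_name': 'payment-api', 'image_tag': 'gcr.io/prod/payment-api:a1b2c3d'}
```

Because the idempotency key is computed from the resolved arguments, whichever syntax you pick has to resolve to the same values on replay as it did on the first run.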

A Real Example: 5-Step Deployment

Let me show this in action. Here's a deployment agent that runs database migrations, builds and pushes a container, updates the load balancer, registers with the service mesh, and runs a health check.

from pathlib import Path

# Define the tools
def run_migration(schema_version: str, database_url: str) -> dict:
    """Apply database migration. Checks current version first (idempotent)."""
    # In reality, this calls alembic or a migration runner
    # The migration runner itself should be idempotent (IF NOT EXISTS, etc.)
    print(f"  Applying migration to version {schema_version}")
    return {"applied_version": schema_version, "tables_modified": 3}


def build_and_push_image(service_name: str, git_sha: str, registry: str) -> dict:
    """Build container and push to registry. Tag makes this idempotent."""
    tag = f"{registry}/{service_name}:{git_sha}"
    print(f"  Building and pushing {tag}")
    return {"image_tag": tag, "size_mb": 142}


def update_load_balancer(service_name: str, image_tag: str) -> dict:
    """Update LB target group to new container version."""
    print(f"  Updating LB for {service_name} -> {image_tag}")
    return {"target_group": f"tg-{service_name}", "healthy_targets": 3}


def register_service_mesh(service_name: str, image_tag: str) -> dict:
    """Register new version with service mesh. Re-registration is safe."""
    print(f"  Registering {service_name} in mesh")
    return {"mesh_endpoint": f"{service_name}.internal:8080", "version": image_tag}


def run_health_check(endpoint: str) -> dict:
    """Hit the health endpoint and verify response."""
    print(f"  Health checking {endpoint}")
    return {"status": "healthy", "latency_ms": 12}


# Wire it up
def deploy(run_id: str | None = None):
    store = EventStore(Path("./agent_state.db"))
    tools = {
        "run_migration": run_migration,
        "build_and_push_image": build_and_push_image,
        "update_load_balancer": update_load_balancer,
        "register_service_mesh": register_service_mesh,
        "run_health_check": run_health_check,
    }
    executor = DurableToolExecutor(store, tools)
    runner = DurableAgentRunner(store, executor)

    run_id = run_id or str(uuid.uuid4())
    print(f"Deploy run: {run_id}")

    steps = [
        Step(0, "run_migration", {"schema_version": "v42", "database_url": "postgres://..."}, "Apply database migration"),
        Step(1, "build_and_push_image", {"service_name": "payment-api", "git_sha": "a1b2c3d", "registry": "gcr.io/prod"}, "Build and push container"),
        Step(2, "update_load_balancer", {"service_name": "payment-api", "image_tag": "$step_1"}, "Update load balancer"),
        Step(3, "register_service_mesh", {"service_name": "payment-api", "image_tag": "$step_1"}, "Register with service mesh"),
        Step(4, "run_health_check", {"endpoint": "$step_3"}, "Run health check"),
    ]

    results = runner.run(run_id, steps)
    print(f"\nDeploy complete. Results: {json.dumps(results, indent=2, default=str)}")
    return run_id

First run executes all 5 steps. Kill the process during step 3. Restart with the same run_id. It replays from SQLite, sees steps 0-2 are done, skips them, and picks up at step 3. The migration doesn't run twice. The image doesn't get pushed twice. The load balancer doesn't get reconfigured.
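
You can convince yourself of the skip-on-resume behavior without killing anything. The sketch below is a deliberately compressed stand-in for the classes above: one table, hypothetical tool names, and an exception in place of a real OOM kill (a real kill gives you no chance to run cleanup code, which is the whole reason each step is committed before the next one starts).

```python
import json
import sqlite3
import tempfile
from pathlib import Path

calls: list[str] = []        # every real tool execution gets recorded here
crash_on = {"register"}      # tools that blow up during the first attempt

PLAN = ["migrate", "build", "update_lb", "register", "health_check"]


def make_tool(name: str):
    def tool() -> dict:
        if name in crash_on:
            raise RuntimeError(f"simulated crash in {name}")
        calls.append(name)
        return {"tool": name, "ok": True}
    return tool


def run(db_path: Path, run_id: str) -> list[str]:
    """Execute PLAN, skipping steps whose completion is already in the log."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS done "
        "(run_id TEXT, step INTEGER, result TEXT, PRIMARY KEY (run_id, step))"
    )
    rows = conn.execute("SELECT step FROM done WHERE run_id = ?", (run_id,))
    done = {row[0] for row in rows}
    executed = []
    try:
        for index, name in enumerate(PLAN):
            if index in done:
                continue
            result = make_tool(name)()
            conn.execute(
                "INSERT INTO done VALUES (?, ?, ?)",
                (run_id, index, json.dumps(result)),
            )
            conn.commit()  # make the step durable before starting the next one
            executed.append(name)
    finally:
        conn.close()
    return executed


with tempfile.TemporaryDirectory() as tmp:
    db = Path(tmp) / "state.db"
    try:
        run(db, "run-a")
    except RuntimeError as exc:
        print(f"first attempt died: {exc}")
    crash_on.clear()             # the "restart": same run_id, healthy tools
    resumed = run(db, "run-a")

print(resumed)  # ['register', 'health_check']
print(calls)    # ['migrate', 'build', 'update_lb', 'register', 'health_check']
```

Every tool shows up in calls exactly once, even though run was invoked twice.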

Audit Logging: The Causality Chain

The event store is already your audit log. But for operational use, you want something more queryable. I add a structured logging layer that writes JSON lines with the full causality chain:

import logging
import sys


class AuditLogger:
    def __init__(self, run_id: str):
        self.run_id = run_id
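        self.logger = logging.getLogger(f"agent.audit.{run_id[:8]}")
        # getLogger returns the same object for the same name, so attach the
        # handler only once; otherwise every new AuditLogger duplicates output.
        if not self.logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)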

    def log_event(self, event: AgentEvent, context: dict[str, Any] | None = None):
        entry = {
            "ts": event.timestamp.isoformat(),
            "run_id": self.run_id,
            "event_id": event.event_id,
            "type": event.event_type.value,
            "step": event.step_index,
            "tool": event.tool_name,
            "caused_by": event.causality_parent,
            "idempotency_key": event.idempotency_key,
        }
        if context:
            entry["context"] = context
        if event.error:
            entry["error"] = event.error

        self.logger.info(json.dumps(entry))

    def trace_causality(self, event_store: EventStore, event_id: str) -> list[str]:
        """Walk backwards through the causality chain."""
        chain = []
        events = event_store.get_run_events(self.run_id)
        event_map = {e["event_id"]: e for e in events}

        current = event_id
        while current and current in event_map:
            chain.append(current)
            current = event_map[current].get("causality_parent")

        return list(reversed(chain))

When something goes wrong at 2 AM, you can pull the causality chain for the failed event and see exactly what sequence of decisions led there. No more grepping through unstructured logs trying to reconstruct what happened.
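
Since the events already live in SQLite, you can also walk the chain in SQL instead of loading the whole run into Python. This is a sketch using a recursive CTE against a trimmed-down, hypothetical copy of the events table, with made-up event IDs. It assumes the chain has no cycles, since UNION ALL would loop forever on one.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events "
    "(event_id TEXT PRIMARY KEY, event_type TEXT, causality_parent TEXT)"
)
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [
        ("e1", "plan_generated", None),
        ("e2", "tool_call_initiated", "e1"),
        ("e3", "tool_call_failed", "e2"),
        ("e9", "checkpoint_reached", None),  # unrelated event, should not appear
    ],
)

# Start at the event of interest and repeatedly join to its parent.
TRACE = """
WITH RECURSIVE chain(event_id, event_type, causality_parent, depth) AS (
    SELECT event_id, event_type, causality_parent, 0
    FROM events WHERE event_id = ?
    UNION ALL
    SELECT e.event_id, e.event_type, e.causality_parent, chain.depth + 1
    FROM events AS e JOIN chain ON e.event_id = chain.causality_parent
)
SELECT event_id, event_type FROM chain ORDER BY depth DESC
"""

rows = conn.execute(TRACE, ("e3",)).fetchall()
for event_id, event_type in rows:
    print(event_id, event_type)
# e1 plan_generated
# e2 tool_call_initiated
# e3 tool_call_failed
conn.close()
```

Ordering by depth descending puts the root cause first and the failing event last, which is the order you want to read it in.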

What This Doesn't Solve

I want to be honest about limitations. This pattern works great for deterministic agent workflows where the plan is known upfront. If your agent is doing open-ended ReAct-style reasoning where each step depends on an LLM call that might produce different output on replay, you need additional machinery. You'd need to store the LLM responses as events too, and on replay, feed the stored responses back instead of calling the LLM again.
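
A rough sketch of that extra machinery, under some loud assumptions: call_model is a hypothetical function from prompt to text (swap in whatever client you use), and a plain dict stands in for the event store. The wrapper records each response the first time and replays it afterwards, so a resumed run sees the same "decisions" the crashed run did.

```python
import hashlib
import json
from typing import Callable


class RecordedLLM:
    """Wraps a model call so each response is stored once and replayed afterwards."""

    def __init__(self, call_model: Callable[[str], str], log: dict[str, str]):
        self.call_model = call_model  # hypothetical: any function prompt -> text
        self.log = log                # stand-in for the durable event store

    def complete(self, run_id: str, step_index: int, prompt: str) -> str:
        key = hashlib.sha256(
            json.dumps([run_id, step_index, prompt]).encode()
        ).hexdigest()
        if key in self.log:
            return self.log[key]      # replay: no model call, same answer
        response = self.call_model(prompt)
        self.log[key] = response      # record before anything acts on it
        return response


# A fake model that answers differently on every call, like a real one might.
counter = {"n": 0}


def flaky_model(prompt: str) -> str:
    counter["n"] += 1
    return f"plan v{counter['n']} for: {prompt}"


log: dict[str, str] = {}
first = RecordedLLM(flaky_model, log).complete("run-a", 0, "deploy payment-api")
# "Restart": a fresh wrapper over the same log.
replayed = RecordedLLM(flaky_model, log).complete("run-a", 0, "deploy payment-api")

print(first == replayed, counter["n"])  # True 1
```

The same crash window exists here as with tools: if the process dies between the model call and the write, the response is lost and the next attempt may decide differently.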

There's also the question of timeouts. If a tool call hangs forever and the operator kills the process, the event store shows ToolCallInitiated with no completion. Is it safe to retry? Depends on the tool. You need per-tool retry policies and possibly a "check if the last attempt actually succeeded" hook before retrying.
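
Here's one way such a policy could be shaped. Everything in this sketch is hypothetical: RetryPolicy, resolve_dangling_call, the send_notification tool, the tag argument, and the pretend registry. It's a starting point, not what the executor above does today. One known limitation: the verify hook signals "didn't land" with None, so it can't be used for tools whose legitimate result is None.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RetryPolicy:
    safe_to_retry: bool = False
    # Optional probe: returns the earlier result if the last attempt
    # actually landed, or None if it did not.
    verify: Optional[Callable[[dict[str, Any]], Optional[Any]]] = None


def resolve_dangling_call(policy: RetryPolicy, tool_args: dict[str, Any]) -> tuple[str, Any]:
    """Decide what to do with a ToolCallInitiated that has no completion."""
    if policy.verify is not None:
        found = policy.verify(tool_args)
        if found is not None:
            return ("record_completion", found)  # it landed; just log the result
        return ("retry", None)                   # probe says it never landed
    if policy.safe_to_retry:
        return ("retry", None)
    return ("escalate", None)                    # unknown outcome: ask a human


registry = {"gcr.io/prod/payment-api:a1b2c3d"}  # pretend registry contents

policies = {
    "build_and_push_image": RetryPolicy(
        verify=lambda args: {"image_tag": args["tag"]} if args["tag"] in registry else None
    ),
    "run_health_check": RetryPolicy(safe_to_retry=True),
    "send_notification": RetryPolicy(),  # hypothetical non-idempotent tool
}

pushed = resolve_dangling_call(
    policies["build_and_push_image"], {"tag": "gcr.io/prod/payment-api:a1b2c3d"}
)
checked = resolve_dangling_call(policies["run_health_check"], {})
notified = resolve_dangling_call(policies["send_notification"], {})
print(pushed[0], checked[0], notified[0])  # record_completion retry escalate
```

Defaulting to "escalate" is deliberate: a tool nobody has classified should stop the run rather than be retried blindly.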

And distributed agents -- multiple processes, multiple machines -- that's a different beast. You'd swap SQLite for Postgres with advisory locks, or use something like Temporal which solves this problem at the orchestration layer. But for single-machine agents doing real work, this SQLite approach has been solid for me for months now.

Ditto's Lesson

Ditto doesn't just copy a form. It remembers forms. It can transform back into something it saw hours ago with perfect fidelity. That's the property we want from production agents: perfect recall of what they were doing, the ability to reconstitute their state from history, and the resilience to crash and come back without missing a beat or duplicating work.

The code above is a few hundred lines. That's all it takes to go from "hope the process doesn't die" to "it can die whenever and we'll be fine." I wish I'd written it before that 2 AM page.