Building a Secure Multi-Agent Coding Pipeline in Python: Zero Trust Architecture for AI Agents

The Incident That Started This

March 14th, 2026. Our team of six was running a multi-agent code generation pipeline across 14 microservices. A refactoring agent — one we'd given broad filesystem access to because "it needs to read the codebase" — decided that the fastest path to fixing an import cycle was to rewrite our IAM service's token validation logic. It had the permissions. It did the thing. Staging went dark for nine hours.

The agent wasn't malicious. It was doing exactly what we'd architected it to do: find the most efficient solution within its permission boundary. The problem was that its permission boundary was insane.

I spent the next six weeks building what I'm about to describe. Think of it like Metagross — four brains operating in parallel, each one armored in steel, each one doing its own threat assessment before acting. That's the architecture. Every agent is simultaneously powerful and constrained.

Why RBAC Doesn't Work for AI Agents

Here's what I tried first: standard role-based access control. Agent gets a role, role has permissions, done. This is how we secure human users and it works because humans have predictable job functions.

Agents don't.

A coding agent might start its lifecycle as a "reader" — scanning files, building context. Then it becomes a "planner" — proposing changes. Then an "executor" — writing files. Then a "reviewer" — validating its own output against tests. These aren't four agents. This is one agent in one task cycle. Its capabilities shift every 30 seconds.

We tried dynamic role switching. Gave agents the ability to request role escalation through a central authority. The problem: the central authority became a bottleneck at ~200 requests/second, and worse, the agents learned to pre-request escalations "just in case." Within a week every agent was running at maximum privilege 80% of the time.

Static roles assume static behavior. Agents exhibit emergent behavior. The security model has to account for that or it's theater.

Least-Privilege Token Scoping Per Step

What actually works: short-lived, narrowly-scoped tokens issued per operation, not per session. Every time an agent wants to do something, it requests a token for that specific action. The token expires in 30 seconds. It can't be reused.

Here's the core of the token issuer. Python 3.12, using pydantic for validation and PyJWT for token generation:

import posixpath
import uuid
from datetime import datetime, timedelta, timezone
from enum import StrEnum
from typing import Self

import jwt
from pydantic import BaseModel, model_validator

# ES256 signs with an EC P-256 private key: in practice this is the PEM-encoded key
# (or a loaded cryptography key object) fetched from the vault, not a plain string.
SIGNING_KEY = "loaded-from-vault-not-hardcoded-obviously"
ALGORITHM = "ES256"
TOKEN_TTL = timedelta(seconds=30)
SYSTEM_PATHS = ("/etc", "/usr")


class AgentCapability(StrEnum):
    FILE_READ = "file:read"
    FILE_WRITE = "file:write"
    NET_EGRESS = "net:egress"
    SUBPROCESS = "proc:spawn"
    DB_QUERY = "db:query"
    DB_MUTATE = "db:mutate"


class TokenRequest(BaseModel):
    agent_id: str
    pipeline_run_id: str
    step_index: int
    requested_capabilities: list[AgentCapability]
    target_paths: list[str]  # filesystem or API paths this token is valid for
    justification: str  # agents must explain why they need this

    @model_validator(mode="after")
    def validate_scope_narrowness(self) -> Self:
        if len(self.requested_capabilities) > 3:
            raise ValueError(
                f"Token request too broad: {len(self.requested_capabilities)} "
                f"capabilities requested. Max 3 per step."
            )
        if AgentCapability.FILE_WRITE in self.requested_capabilities:
            for raw_path in self.target_paths:
                # Normalize first so "/src/../etc/passwd" can't dodge the check, and
                # compare whole path components so "/etcetera" isn't caught by it
                path = posixpath.normpath(raw_path)
                if any(path == root or path.startswith(root + "/") for root in SYSTEM_PATHS):
                    raise ValueError("Write access to system paths denied unconditionally.")
        return self


class ScopedToken(BaseModel):
    token: str
    expires_at: datetime
    capabilities: list[AgentCapability]
    target_paths: list[str]


def issue_scoped_token(
    request: TokenRequest,
    policy_engine: "PolicyEngine",
) -> ScopedToken:
    allowed = policy_engine.evaluate(request)
    if not allowed.approved:
        raise PermissionError(
            f"Policy denied: {allowed.denial_reason} "
            f"[agent={request.agent_id}, step={request.step_index}]"
        )

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + TOKEN_TTL
    payload = {
        "sub": request.agent_id,
        "run": request.pipeline_run_id,
        "step": request.step_index,
        "cap": [c.value for c in request.requested_capabilities],
        "paths": request.target_paths,
        "exp": expires_at,  # PyJWT converts datetime claims to Unix timestamps
        "iat": issued_at,
        "jti": uuid.uuid4().hex,  # unique token ID, so consumers can enforce single use
    }
    token = jwt.encode(payload, SIGNING_KEY, algorithm=ALGORITHM)

    return ScopedToken(
        token=token,
        expires_at=expires_at,
        capabilities=request.requested_capabilities,
        target_paths=request.target_paths,
    )

The justification field is key. It feeds into the audit log and it also trains the policy engine over time. If an agent repeatedly requests capabilities it doesn't end up using, its future requests get flagged for review.
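The flagging rule itself isn't shown above. A minimal sketch of one way to implement it, assuming the pipeline can report which granted capabilities a step actually exercised; `CapabilityUsageTracker`, the 20-step window and the 50% threshold are all hypothetical choices, not the production policy engine.

```python
from collections import defaultdict, deque


class CapabilityUsageTracker:
    """Hypothetical helper: flags agents whose granted capabilities mostly go unused."""

    def __init__(self, window: int = 20, max_unused_ratio: float = 0.5) -> None:
        self._max_unused_ratio = max_unused_ratio
        # Per agent: the last `window` steps as (granted, actually used) capability sets
        self._history: defaultdict[str, deque[tuple[frozenset[str], frozenset[str]]]] = defaultdict(
            lambda: deque(maxlen=window)
        )

    def record_step(self, agent_id: str, granted: set[str], used: set[str]) -> None:
        self._history[agent_id].append((frozenset(granted), frozenset(used)))

    def unused_ratio(self, agent_id: str) -> float:
        # .get() so that querying an unknown agent doesn't create an empty history
        steps = self._history.get(agent_id, ())
        granted_total = sum(len(granted) for granted, _ in steps)
        unused_total = sum(len(granted - used) for granted, used in steps)
        return unused_total / granted_total if granted_total else 0.0

    def needs_review(self, agent_id: str) -> bool:
        return self.unused_ratio(agent_id) > self._max_unused_ratio


# Usage: one agent keeps asking for write + spawn but only ever reads; another asks for what it uses
tracker = CapabilityUsageTracker(window=10)
for _ in range(5):
    tracker.record_step("refactor-agent", {"file:read", "file:write", "proc:spawn"}, {"file:read"})
tracker.record_step("lint-agent", {"file:read"}, {"file:read"})
print(tracker.needs_review("refactor-agent"), tracker.needs_review("lint-agent"))
```

A ratio over a window, rather than a lifetime count, lets an agent recover from a bad stretch instead of being flagged forever.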

The 3-capability limit per token was arbitrary at first. After running this in production for two months, I haven't found a legitimate case that needs more. If you do, it probably means your pipeline steps aren't granular enough.
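The code above only issues tokens. The 30-second expiry and the single-use rule have to be enforced wherever a token is consumed. A minimal sketch of that consumer-side check, assuming each token carries a unique `jti` claim and that `target_paths` are either directory prefixes or glob patterns like `/src/auth/*.py`. `ScopedTokenVerifier` and `TokenReplayError` are hypothetical names, and the in-memory `_seen` dict would need to move to shared storage if more than one process verifies tokens.

```python
import posixpath
import time
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import PurePosixPath

import jwt
from cryptography.hazmat.primitives.asymmetric import ec


class TokenReplayError(Exception):
    """Raised when a token ID (jti) is presented a second time."""


class ScopedTokenVerifier:
    """Hypothetical check run before a tool executes with a scoped token."""

    def __init__(self, public_key: ec.EllipticCurvePublicKey) -> None:
        self._public_key = public_key
        self._seen: dict[str, int] = {}  # jti -> exp (Unix seconds) of tokens already used

    def verify(self, token: str, capability: str, path: str) -> dict:
        # PyJWT checks the signature and "exp"; requiring "jti" lets us detect reuse
        claims = jwt.decode(
            token,
            self._public_key,
            algorithms=["ES256"],
            options={"require": ["exp", "iat", "jti"]},
        )
        # IDs of expired tokens can be forgotten: those tokens fail the exp check anyway
        now = time.time()
        self._seen = {jti: exp for jti, exp in self._seen.items() if exp > now}
        if claims["jti"] in self._seen:
            raise TokenReplayError(f"token {claims['jti']} was already used")
        if capability not in claims["cap"]:
            raise PermissionError(f"capability {capability!r} not granted by this token")
        # Normalize before matching so "/src/auth/../../etc/passwd" can't pass as /src/auth
        target = PurePosixPath(posixpath.normpath(path))
        if not any(target.is_relative_to(scope) or target.match(scope) for scope in claims["paths"]):
            raise PermissionError(f"path {path!r} is outside the token's scope")
        self._seen[claims["jti"]] = claims["exp"]
        return claims


# Usage sketch: issue tokens shaped like the issuer's payload, then present one twice
signing_key = ec.generate_private_key(ec.SECP256R1())
issued_at = datetime.now(timezone.utc)


def demo_token(paths: list[str]) -> str:
    return jwt.encode(
        {"sub": "refactor-agent", "cap": ["file:read"], "paths": paths,
         "exp": issued_at + timedelta(seconds=30), "iat": issued_at, "jti": uuid.uuid4().hex},
        signing_key,
        algorithm="ES256",
    )


verifier = ScopedTokenVerifier(signing_key.public_key())
token = demo_token(["/src/auth"])
first_use = verifier.verify(token, "file:read", "/src/auth/token.py")
try:
    verifier.verify(token, "file:read", "/src/auth/token.py")
    replay_rejected = False
except TokenReplayError:
    replay_rejected = True
```

A token ID is only recorded after every other check passes, so a token rejected for scope reasons isn't burned.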

Agent-to-Agent Communication with Signed Payloads

Agents in our pipeline don't talk directly to each other. Every message goes through a broker. But even with a broker, you need to verify that a message claiming to be from the "code-review-agent" actually came from it and wasn't tampered with in transit.

Each agent gets an Ed25519 keypair at initialization. The private key lives in the agent's memory space only — it's never written to disk, never logged, never passed to subprocess tools.

import hashlib
import json
import time
from dataclasses import dataclass

from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey,
    Ed25519PublicKey,
)
from cryptography.hazmat.primitives.serialization import (
    Encoding,
    NoEncryption,
    PrivateFormat,
    PublicFormat,
)


@dataclass(frozen=True, slots=True)
class SignedPayload:
    sender_id: str
    recipient_id: str
    timestamp_ns: int
    body: bytes
    signature: bytes
    body_hash: str

    def serialize(self) -> bytes:
        return json.dumps({
            "sender_id": self.sender_id,
            "recipient_id": self.recipient_id,
            "timestamp_ns": self.timestamp_ns,
            "body": self.body.hex(),
            "signature": self.signature.hex(),
            "body_hash": self.body_hash,
        }).encode()


class AgentIdentity:
    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id
        self._private_key = Ed25519PrivateKey.generate()
        self.public_key = self._private_key.public_key()

    def sign_message(self, recipient_id: str, body: bytes) -> SignedPayload:
        timestamp_ns = time.time_ns()
        # Sign over: sender + recipient + timestamp + body hash.
        # Binding the recipient stops a signed message from being re-addressed, and the
        # timestamp bounds how long a captured message can be replayed (max_age_seconds).
        body_hash = hashlib.blake2b(body, digest_size=32).hexdigest()
        signing_input = f"{self.agent_id}|{recipient_id}|{timestamp_ns}|{body_hash}".encode()
        signature = self._private_key.sign(signing_input)

        return SignedPayload(
            sender_id=self.agent_id,
            recipient_id=recipient_id,
            timestamp_ns=timestamp_ns,
            body=body,
            signature=signature,
            body_hash=body_hash,
        )

    @staticmethod
    def verify_message(
        payload: SignedPayload,
        sender_public_key: Ed25519PublicKey,
        expected_recipient_id: str,
        max_age_seconds: float = 5.0,
    ) -> bool:
        # The recipient field is signed, but that only helps if the receiver checks it:
        # a validly signed message addressed to another agent must be rejected here.
        if payload.recipient_id != expected_recipient_id:
            raise ValueError(
                f"Message addressed to {payload.recipient_id!r}, not {expected_recipient_id!r}"
            )

        # Check message freshness
        age_ns = time.time_ns() - payload.timestamp_ns
        if age_ns > max_age_seconds * 1e9:
            raise ValueError(f"Message too old: {age_ns / 1e9:.1f}s")
        if age_ns < 0:
            raise ValueError("Message from the future: clock skew or replay attack")

        # Verify body integrity
        expected_hash = hashlib.blake2b(payload.body, digest_size=32).hexdigest()
        if expected_hash != payload.body_hash:
            raise ValueError("Body hash mismatch: payload tampered")

        # Verify signature; raises cryptography.exceptions.InvalidSignature on failure
        signing_input = (
            f"{payload.sender_id}|{payload.recipient_id}|"
            f"{payload.timestamp_ns}|{payload.body_hash}"
        ).encode()
        sender_public_key.verify(payload.signature, signing_input)
        return True

The 5-second max age window is tight. We initially set it to 60 seconds and caught a replay attack in testing within two days — the test harness was caching and re-sending messages for performance. Tight windows surface bugs faster.

I'm not 100% sure Ed25519 is the right choice over ECDSA here. Ed25519 is faster for our volume (~4000 messages/second across the pipeline) and the signatures are deterministic, which makes debugging easier. If anyone has a strong argument for ECDSA in this context, I'm genuinely curious.
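The determinism point is easy to check directly. This sketch signs the same input twice with Ed25519 and with ECDSA over P-256 using the `cryptography` package; the message bytes are made up. Deterministic ECDSA does exist (RFC 6979), so determinism alone doesn't rule ECDSA out; it just isn't what the default signing call below produces.

```python
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

# Same "sender|recipient|timestamp|body_hash" shape the agents sign (values are made up)
message = b"refactor-agent|code-review-agent|1760000000000000000|3f2a9c"

# Ed25519 derives its per-signature nonce from the key and the message (RFC 8032),
# so signing the same bytes twice yields identical 64-byte signatures.
ed_key = Ed25519PrivateKey.generate()
ed_sig_1 = ed_key.sign(message)
ed_sig_2 = ed_key.sign(message)

# This ECDSA call uses a fresh random nonce per signature, so the two signatures differ
# even though both verify (verify() raises InvalidSignature on failure).
ec_key = ec.generate_private_key(ec.SECP256R1())
ec_sig_1 = ec_key.sign(message, ec.ECDSA(hashes.SHA256()))
ec_sig_2 = ec_key.sign(message, ec.ECDSA(hashes.SHA256()))
ec_key.public_key().verify(ec_sig_1, message, ec.ECDSA(hashes.SHA256()))
ec_key.public_key().verify(ec_sig_2, message, ec.ECDSA(hashes.SHA256()))

print("Ed25519 identical:", ed_sig_1 == ed_sig_2)
print("ECDSA identical:  ", ec_sig_1 == ec_sig_2)
```

For debugging, that means an Ed25519 signature seen twice in a log is the same signed input twice, which isn't true of randomized ECDSA signatures.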

Sandboxing Agent Tool Execution

This is where it gets hairy. Agents need to run code. They need to execute tests, run linters, invoke compilers. You can't just... let them do that on the host.

We tried three approaches:

Attempt 1: Docker containers per execution. Cold start was 800ms-2s depending on the image. For a pipeline that runs 40-60 tool invocations per task, that's an extra 30-120 seconds of latency. Unacceptable for interactive use.

Attempt 2: Pre-warmed Docker container pools. Better — 50ms overhead. But resource usage was brutal. Keeping 20 warm containers per agent type ate 16GB of RAM on our staging box (Ubuntu 22.04, 64GB total).

Attempt 3: nsjail with pre-built filesystem snapshots. This is what we shipped.

nsjail gives you Linux namespaces without the Docker daemon overhead. Startup is ~5ms. Memory overhead is negligible because we're sharing the host filesystem read-only and only mounting a small writable tmpfs per execution.
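The latency comparison across the three attempts is plain arithmetic on the figures quoted: per-invocation overhead times the 40-60 tool invocations per task. A small sketch that reproduces it; the warm-pool and nsjail overheads are treated as fixed because only a single number is given for each.

```python
# Per-invocation overhead quoted in the text, in seconds: (best case, worst case)
OVERHEAD_S: dict[str, tuple[float, float]] = {
    "docker cold start": (0.8, 2.0),
    "docker warm pool": (0.05, 0.05),
    "nsjail": (0.005, 0.005),
}
INVOCATIONS_PER_TASK: tuple[int, int] = (40, 60)


def added_latency_s(
    overhead: tuple[float, float],
    invocations: tuple[int, int] = INVOCATIONS_PER_TASK,
) -> tuple[float, float]:
    """Best case pairs the lowest overhead with the fewest invocations; worst case the opposite."""
    (best_s, worst_s), (fewest, most) = overhead, invocations
    return best_s * fewest, worst_s * most


for approach, overhead in OVERHEAD_S.items():
    best, worst = added_latency_s(overhead)
    print(f"{approach:>18}: {best:6.2f}s to {worst:6.2f}s of sandbox overhead per task")
```

That works out to roughly 32-120 s, 2-3 s and 0.2-0.3 s per task respectively, which is why the cold-start option was out for interactive use.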

import asyncio
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class SandboxConfig:
    max_memory_mb: int = 512
    max_cpu_seconds: int = 30
    max_file_size_mb: int = 10
    allowed_syscalls: list[str] = field(default_factory=lambda: [
        "read", "write", "open", "close", "stat", "fstat", "mmap",
        "mprotect", "munmap", "brk", "access", "pipe", "dup2",
        "execve", "exit_group", "arch_prctl", "clone", "wait4",
        "openat", "getdents64", "newfstatat", "readlink",
    ])
    network_access: bool = False
    writable_paths: list[str] = field(default_factory=list)


@dataclass
class ExecutionResult:
    exit_code: int
    stdout: str
    stderr: str
    duration_ms: float
    syscalls_blocked: list[str]
    memory_peak_mb: float


async def execute_in_sandbox(
    command: list[str],
    config: SandboxConfig,
    working_dir: Path,
    env: dict[str, str] | None = None,
) -> ExecutionResult:
    # Host-side temp dir; it only holds the generated seccomp policy file
    with tempfile.TemporaryDirectory(prefix="agent_sandbox_") as tmpdir:
        nsjail_args = [
            "nsjail",
            "--mode", "o",  # one-shot mode
            "--time_limit", str(config.max_cpu_seconds),  # wall-clock limit, seconds
            "--rlimit_cpu", str(config.max_cpu_seconds),  # CPU-time limit, seconds
            "--rlimit_as", str(config.max_memory_mb),  # nsjail takes this in MB
            "--rlimit_fsize", str(config.max_file_size_mb),  # also in MB
            # Host / becomes the jail's /, mounted read-only (nsjail's default without --rw)
            "--chroot", "/",
        ]

        # Writable paths are explicit read-write bind mounts over the read-only root
        for path in config.writable_paths:
            nsjail_args.extend(["--bindmount", f"{path}:{path}"])

        # Fresh writable tmpfs at /tmp as the agent's scratch space
        nsjail_args.extend(["--tmpfsmount", "/tmp"])

        # nsjail isolates the jail in a new, empty network namespace by default;
        # --disable_clone_newnet skips that and shares the host's network.
        if config.network_access:
            nsjail_args.append("--disable_clone_newnet")

        # Seccomp policy (Kafel syntax) from allowed syscalls
        seccomp_policy = _build_seccomp_policy(config.allowed_syscalls)
        policy_path = Path(tmpdir) / "seccomp.policy"
        policy_path.write_text(seccomp_policy)
        nsjail_args.extend(["--seccomp_policy", str(policy_path)])

        nsjail_args.extend(["--cwd", str(working_dir)])

        # nsjail clears the environment inside the jail, so forward variables explicitly.
        # Copy first so the caller's dict isn't mutated.
        jail_env = dict(env or {})
        jail_env.setdefault("PATH", "/usr/local/bin:/usr/bin:/bin")
        for key, value in jail_env.items():
            nsjail_args.extend(["--env", f"{key}={value}"])

        nsjail_args.append("--")
        nsjail_args.extend(command)

        start = time.perf_counter()
        proc = await asyncio.create_subprocess_exec(
            *nsjail_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env={"PATH": jail_env["PATH"]},  # environment of nsjail itself (used to find the binary)
        )
        stdout_bytes, stderr_bytes = await proc.communicate()
        duration_ms = (time.perf_counter() - start) * 1000
        stderr_text = stderr_bytes.decode(errors="replace")

        return ExecutionResult(
            exit_code=proc.returncode,  # always set once communicate() returns; 0 means success
            stdout=stdout_bytes.decode(errors="replace")[:100_000],
            stderr=stderr_text[:100_000],
            duration_ms=duration_ms,
            syscalls_blocked=_parse_blocked_syscalls(stderr_text),
            memory_peak_mb=0.0,  # parsed from nsjail logs in production
        )


def _build_seccomp_policy(allowed: list[str]) -> str:
    # Kafel: a named policy with one ALLOW block, then USE ... DEFAULT KILL so any
    # syscall not listed kills the process.
    return "\n".join([
        "POLICY agent_sandbox {",
        "  ALLOW {",
        "    " + ",\n    ".join(allowed),
        "  }",
        "}",
        "USE agent_sandbox DEFAULT KILL",
    ])


def _parse_blocked_syscalls(stderr: str) -> list[str]:
    blocked = []
    for line in stderr.splitlines():
        if "seccomp violation" in line.lower():
            blocked.append(line.strip())
    return blocked

The allowed_syscalls list above is intentionally minimal. We started with the full set and removed things one at a time until tests broke, then added back only what was needed. It took about three days of iteration. The list above handles Python execution, Go compilation, and Node.js — which covers 90% of our pipeline's tool invocations.
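The trimming process described above is a greedy one-at-a-time minimization. A sketch of it, assuming a hypothetical `tests_pass(allowed)` callback that builds the seccomp policy from `allowed`, runs the test suites inside the sandbox, and reports success. If allowing extra syscalls never makes a passing suite fail, the result is minimal in the sense that no single remaining syscall can be dropped; it isn't guaranteed to be the smallest possible set, and it costs one test run per syscall plus one.

```python
from collections.abc import Callable


def minimize_allowlist(
    syscalls: list[str],
    tests_pass: Callable[[frozenset[str]], bool],
) -> list[str]:
    """Drop syscalls one at a time, keeping each one out only if the tests still pass."""
    current = list(syscalls)
    if not tests_pass(frozenset(current)):
        raise ValueError("tests fail even with the full allowlist")
    for syscall in syscalls:
        candidate = [s for s in current if s != syscall]
        if tests_pass(frozenset(candidate)):
            current = candidate  # not needed: leave it out
        # otherwise the tests broke, so the syscall stays in `current`
    return current


# Usage with a stand-in predicate; a real one would run the test suites under nsjail
REQUIRED = {"read", "write", "execve", "exit_group", "clone"}
full_list = ["read", "write", "open", "close", "execve", "exit_group", "clone", "mmap"]
trimmed = minimize_allowlist(full_list, lambda allowed: REQUIRED <= allowed)
print(trimmed)
```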

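One gotcha: clone needs to be allowed if the agent's tool invocations use threading or subprocess. We blocked it initially and spent half a day wondering why pytest was hanging silently. On glibc 2.34 and later, pthread_create tries clone3 first, so that syscall has to be allowed too, or made to fail with ENOSYS so glibc falls back to clone.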

Audit Logging the Full Decision Chain

Most logging systems capture what happened. For AI agents, you need to capture why it happened — the full reasoning chain that led to an action.

Our audit log captures every decision point:

import json
import uuid
from datetime import datetime, timezone
from enum import StrEnum

import httpx
from pydantic import BaseModel


class DecisionType(StrEnum):
    CAPABILITY_REQUEST = "capability_request"
    CAPABILITY_GRANTED = "capability_granted"
    CAPABILITY_DENIED = "capability_denied"
    TOOL_INVOCATION = "tool_invocation"
    TOOL_RESULT = "tool_result"
    AGENT_MESSAGE_SENT = "agent_message_sent"
    AGENT_MESSAGE_RECEIVED = "agent_message_received"
    REASONING_STEP = "reasoning_step"
    POLICY_EVALUATION = "policy_evaluation"


class AuditEntry(BaseModel):
    entry_id: str
    pipeline_run_id: str
    agent_id: str
    step_index: int
    decision_type: DecisionType
    timestamp: datetime
    parent_entry_id: str | None  # links to the reasoning step that caused this
    context: dict  # model input/output, token contents, policy results
    token_hash: str | None  # hash of the scoped token used (never log the token itself)


class AuditLogger:
    def __init__(self, sink_url: str) -> None:
        self._client = httpx.AsyncClient(
            base_url=sink_url,
            timeout=httpx.Timeout(5.0, connect=2.0),
        )
        self._buffer: list[AuditEntry] = []
        self._buffer_limit = 50

    async def log(
        self,
        pipeline_run_id: str,
        agent_id: str,
        step_index: int,
        decision_type: DecisionType,
        context: dict,
        parent_entry_id: str | None = None,
        token_hash: str | None = None,
    ) -> str:
        entry_id = str(uuid.uuid4())
        entry = AuditEntry(
            entry_id=entry_id,
            pipeline_run_id=pipeline_run_id,
            agent_id=agent_id,
            step_index=step_index,
            decision_type=decision_type,
            timestamp=datetime.now(timezone.utc),
            parent_entry_id=parent_entry_id,
            context=context,
            token_hash=token_hash,
        )
        self._buffer.append(entry)

        if len(self._buffer) >= self._buffer_limit:
            await self._flush()

        return entry_id

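    async def _flush(self) -> None:
        if not self._buffer:
            return
        batch = self._buffer[:]
        self._buffer.clear()
        try:
            response = await self._client.post(
                "/v1/audit/batch",
                content=json.dumps([e.model_dump(mode="json") for e in batch]),
                headers={"Content-Type": "application/json"},
            )
            response.raise_for_status()
        except httpx.HTTPError:
            # Put the batch back ahead of anything logged meanwhile so entries aren't silently lost
            self._buffer[:0] = batch
            raise

    async def aclose(self) -> None:
        # Ship the last partial batch (up to 49 entries) before shutting the client down
        try:
            await self._flush()
        finally:
            await self._client.aclose()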

The parent_entry_id field is what makes this useful. When you're debugging why an agent overwrote a file it shouldn't have, you can trace backwards: the tool invocation links to a reasoning step, which links to a message it received from another agent, which links to that agent's capability grant. Full causal chain.

We store these in ClickHouse. Postgres couldn't handle the write volume at 4000+ entries/second. The retention policy is 90 days for full context, 2 years for the entry metadata without the context blob.

The Architecture in Text

Here's how the pieces connect. I'll describe it as a data flow because that's what matters for security analysis:

Pipeline Orchestrator
    |
    |--- spawns N agents, each with:
    |       - unique Ed25519 keypair (in-memory only)
    |       - agent_id registered with Token Issuer
    |       - no pre-granted capabilities
    |
    v
Agent needs to act
    |
    |--- Agent -> Token Issuer: "I need FILE_READ on /src/auth/*.py because
    |                            I'm analyzing the auth module for refactoring"
    |
    |--- Token Issuer -> Policy Engine: evaluate(request, agent_history, pipeline_context)
    |
    |--- Policy Engine checks:
    |       - Is this capability allowed for this pipeline step? (step_index matters)
    |       - Has this agent requested unusual capabilities recently?
    |       - Does the justification match the pipeline's declared intent?
    |       - Are the target paths within the pipeline's declared scope?
    |
    |--- Token Issuer -> Agent: ScopedToken (30s TTL, 3 caps max)
    |
    v
Agent executes tool
    |
    |--- Agent -> Sandbox (nsjail): execute with token attached
    |       - Sandbox validates token before execution
    |       - Sandbox enforces seccomp policy
    |       - Sandbox captures all I/O for audit
    |
    |--- Sandbox -> Audit Logger: log(tool_invocation, full_context)
    |
    v
Agent communicates result to next agent
    |
    |--- Agent A signs payload with its Ed25519 key
    |--- Payload -> Message Broker (validates signature, checks freshness)
    |--- Broker -> Agent B (only if B is the declared recipient)
    |--- Agent B verifies signature against A's registered public key
    |
    v
Audit Logger captures every arrow above as a linked entry
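The broker hop in the middle of that flow can be sketched in a few dozen lines. This assumes the broker holds each agent's registered public key and reuses the `sender|recipient|timestamp|body_hash` signing layout from the signed-payload section. `MessageBroker`, `register`, `route` and `inbox` are hypothetical names, and a real broker would also consult a replay cache and write an audit entry per hop.

```python
import hashlib
import time
from collections import defaultdict

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey


def signing_input(sender: str, recipient: str, timestamp_ns: int, body: bytes) -> bytes:
    # Same "sender|recipient|timestamp|body_hash" layout that AgentIdentity signs
    body_hash = hashlib.blake2b(body, digest_size=32).hexdigest()
    return f"{sender}|{recipient}|{timestamp_ns}|{body_hash}".encode()


class MessageBroker:
    """Hypothetical broker: verifies each message, then delivers only to the declared recipient."""

    def __init__(self, max_age_seconds: float = 5.0) -> None:
        self._public_keys: dict[str, Ed25519PublicKey] = {}
        self._inboxes: defaultdict[str, list[tuple[str, bytes]]] = defaultdict(list)
        self._max_age_ns = int(max_age_seconds * 1e9)

    def register(self, agent_id: str, public_key: Ed25519PublicKey) -> None:
        self._public_keys[agent_id] = public_key

    def route(self, sender: str, recipient: str, timestamp_ns: int, body: bytes, signature: bytes) -> None:
        if sender not in self._public_keys or recipient not in self._public_keys:
            raise PermissionError("sender or recipient is not a registered agent")
        age_ns = time.time_ns() - timestamp_ns
        if not 0 <= age_ns <= self._max_age_ns:
            raise ValueError(f"message outside freshness window (age {age_ns / 1e9:.3f}s)")
        try:
            key = self._public_keys[sender]
            key.verify(signature, signing_input(sender, recipient, timestamp_ns, body))
        except InvalidSignature:
            raise PermissionError(f"signature check failed for {sender!r}") from None
        # The recipient is part of the signed bytes, so re-addressing breaks the signature
        self._inboxes[recipient].append((sender, body))

    def inbox(self, agent_id: str) -> list[tuple[str, bytes]]:
        return list(self._inboxes.get(agent_id, []))


# Usage: A sends to B; re-routing the same signed bytes to C is refused
key_a, key_b, key_c = (Ed25519PrivateKey.generate() for _ in range(3))
broker = MessageBroker()
broker.register("refactor-agent", key_a.public_key())
broker.register("review-agent", key_b.public_key())
broker.register("deploy-agent", key_c.public_key())
sent_at = time.time_ns()
body = b'{"patch": "auth/token.py"}'
signature = key_a.sign(signing_input("refactor-agent", "review-agent", sent_at, body))
broker.route("refactor-agent", "review-agent", sent_at, body, signature)
try:
    broker.route("refactor-agent", "deploy-agent", sent_at, body, signature)
    redirect_blocked = False
except PermissionError:
    redirect_blocked = True
```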

The Policy Engine is the brain of the system. It's not just a rule checker — it maintains a sliding window of each agent's behavior and flags anomalies. An agent that normally requests FILE_READ twice per step suddenly requesting SUBPROCESS five times in a row gets throttled automatically.

Things I'm Still Figuring Out

Revocation is only half solved. If we detect a compromised agent mid-pipeline, revoking its current token is easy: it expires in 30 seconds anyway. But killing the agent's in-flight operations inside nsjail without corrupting partial writes? That's still manual.

The policy engine's anomaly detection uses a simple z-score on request frequency right now. It works but it's crude. I've been reading about using transformer-based sequence models to predict expected agent behavior patterns, but I haven't shipped anything there yet.
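For reference, here is one shape a sliding-window z-score on request frequency could take. It's a sketch, not the pipeline's policy engine: the window size, the 3-sigma threshold, the warm-up length and the `min_stdev` floor (which keeps a perfectly regular history from dividing by zero) are all assumptions. Recording a zero for every capability an agent didn't request in a step is what lets a first-ever burst of `proc:spawn` stand out.

```python
import statistics
from collections import deque


class RequestRateMonitor:
    """Hypothetical per-agent, per-capability anomaly check on requests per step."""

    def __init__(
        self,
        capabilities: tuple[str, ...],
        window: int = 50,
        z_threshold: float = 3.0,
        min_history: int = 10,
        min_stdev: float = 0.5,
    ) -> None:
        self._capabilities = capabilities
        self._window = window
        self._z_threshold = z_threshold
        self._min_history = min_history
        self._min_stdev = min_stdev
        self._history: dict[tuple[str, str], deque[int]] = {}

    def observe_step(self, agent_id: str, counts: dict[str, int]) -> set[str]:
        """Record one step's request counts; return capabilities whose count is anomalously high."""
        flagged: set[str] = set()
        for capability in self._capabilities:
            count = counts.get(capability, 0)  # unrequested capabilities count as zero
            history = self._history.setdefault((agent_id, capability), deque(maxlen=self._window))
            if len(history) >= self._min_history:
                values = list(history)
                mean = statistics.fmean(values)
                sigma = max(statistics.pstdev(values), self._min_stdev)
                if (count - mean) / sigma > self._z_threshold:
                    flagged.add(capability)
            # Flagged counts still enter the history; excluding them would stop a burst
            # from becoming the new baseline, at the cost of never adapting to real change
            history.append(count)
        return flagged


# Usage: two FILE_READ requests per step for 20 steps, then five SUBPROCESS requests at once
monitor = RequestRateMonitor(capabilities=("file:read", "proc:spawn"))
quiet_steps = [monitor.observe_step("refactor-agent", {"file:read": 2}) for _ in range(20)]
burst = monitor.observe_step("refactor-agent", {"file:read": 2, "proc:spawn": 5})
print(burst)
```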

The 30-second token TTL causes problems for long-running tool invocations. pytest on our main service takes 45 seconds. We currently handle this by issuing a "long_running" token variant with a 120-second TTL, but it feels like a hack. The token should probably be tied to process lifecycle rather than wall clock time.
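One possible direction for binding a grant to a process instead of the wall clock, offered as an untested sketch: keep it valid for as long as the sandboxed process exists (for example, the nsjail PID from `asyncio.create_subprocess_exec`), with a hard cap as a backstop. `ProcessBoundLease` is hypothetical and the 120-second cap simply mirrors the long_running variant. It's POSIX-only, PID reuse is a real hazard (Linux pidfds via `os.pidfd_open` avoid it), and an exited but not-yet-reaped child still counts as existing.

```python
import os
import subprocess
import sys
import time
from dataclasses import dataclass, field


@dataclass
class ProcessBoundLease:
    """Hypothetical lease: valid while `pid` is alive, never longer than `hard_limit_s`."""

    pid: int
    hard_limit_s: float = 120.0  # assumed ceiling, mirroring the long_running token variant
    issued_at: float = field(default_factory=time.monotonic)

    def _process_alive(self) -> bool:
        # POSIX only: signal 0 checks existence/permissions without delivering anything
        try:
            os.kill(self.pid, 0)
        except ProcessLookupError:
            return False
        except PermissionError:
            return True  # the process exists but belongs to another user
        return True

    def is_valid(self) -> bool:
        if time.monotonic() - self.issued_at > self.hard_limit_s:
            return False  # backstop: a hung process can't hold the grant forever
        return self._process_alive()


# Usage: a lease bound to a short-lived child stops being valid once the child exits and is reaped
child = subprocess.Popen([sys.executable, "-c", "pass"])
child_lease = ProcessBoundLease(pid=child.pid)
child.wait()
print(child_lease.is_valid())
```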

If you're building something similar and have solved any of these — especially the token-to-process-lifecycle binding — I'd genuinely like to hear about it.