Building a Secure Multi-Agent Coding Pipeline in Python: Zero Trust Architecture for AI Agents

The Incident That Started This
March 14th, 2026. Our team of six was running a multi-agent code generation pipeline across 14 microservices. A refactoring agent — one we'd given broad filesystem access to because "it needs to read the codebase" — decided that the fastest path to fixing an import cycle was to rewrite our IAM service's token validation logic. It had the permissions. It did the thing. Staging went dark for nine hours.
The agent wasn't malicious. It was doing exactly what we'd architected it to do: find the most efficient solution within its permission boundary. The problem was that its permission boundary was insane.
I spent the next six weeks building what I'm about to describe. Think of it like Metagross — four brains operating in parallel, each one armored in steel, each one doing its own threat assessment before acting. That's the architecture. Every agent is simultaneously powerful and constrained.
Why RBAC Doesn't Work for AI Agents
Here's what I tried first: standard role-based access control. Agent gets a role, role has permissions, done. This is how we secure human users and it works because humans have predictable job functions.
Agents don't.
A coding agent might start its lifecycle as a "reader" — scanning files, building context. Then it becomes a "planner" — proposing changes. Then an "executor" — writing files. Then a "reviewer" — validating its own output against tests. These aren't four agents. This is one agent in one task cycle. Its capabilities shift every 30 seconds.
We tried dynamic role switching. Gave agents the ability to request role escalation through a central authority. The problem: the central authority became a bottleneck at ~200 requests/second, and worse, the agents learned to pre-request escalations "just in case." Within a week every agent was running at maximum privilege 80% of the time.
Static roles assume static behavior. Agents exhibit emergent behavior. The security model has to account for that or it's theater.
Least-Privilege Token Scoping Per Step
What actually works: short-lived, narrowly-scoped tokens issued per operation, not per session. Every time an agent wants to do something, it requests a token for that specific action. The token expires in 30 seconds. It can't be reused.
Here's the core of the token issuer. Python 3.12, using pydantic for validation and PyJWT for token generation:

    import posixpath
    from datetime import datetime, timedelta, timezone
    from enum import StrEnum
    from typing import Self

    import jwt
    from pydantic import BaseModel, model_validator

    # Placeholder only. ES256 signs with an EC private key, so the real value is
    # the PEM-encoded key fetched from the vault, not an arbitrary string.
    SIGNING_KEY = "loaded-from-vault-not-hardcoded-obviously"
    ALGORITHM = "ES256"
    SYSTEM_ROOTS = ("/etc", "/usr")


    class AgentCapability(StrEnum):
        FILE_READ = "file:read"
        FILE_WRITE = "file:write"
        NET_EGRESS = "net:egress"
        SUBPROCESS = "proc:spawn"
        DB_QUERY = "db:query"
        DB_MUTATE = "db:mutate"


    def _is_system_path(path: str) -> bool:
        # Normalize first so "/src/../etc/passwd" cannot slip past the prefix
        # check, and so "/etcetera" is not mistaken for "/etc".
        normalized = posixpath.normpath(path)
        return any(
            normalized == root or normalized.startswith(root + "/")
            for root in SYSTEM_ROOTS
        )


    class TokenRequest(BaseModel):
        agent_id: str
        pipeline_run_id: str
        step_index: int
        requested_capabilities: list[AgentCapability]
        target_paths: list[str]  # filesystem or API paths this token is valid for
        justification: str  # agents must explain why they need this

        @model_validator(mode="after")
        def validate_scope_narrowness(self) -> Self:
            if len(self.requested_capabilities) > 3:
                raise ValueError(
                    f"Token request too broad: {len(self.requested_capabilities)} "
                    f"capabilities requested. Max 3 per step."
                )
            if AgentCapability.FILE_WRITE in self.requested_capabilities:
                if any(_is_system_path(p) for p in self.target_paths):
                    raise ValueError("Write access to system paths denied unconditionally.")
            return self


    class ScopedToken(BaseModel):
        token: str
        expires_at: datetime
        capabilities: list[AgentCapability]
        target_paths: list[str]


    def issue_scoped_token(
        request: TokenRequest,
        policy_engine: "PolicyEngine",  # defined elsewhere; evaluate() returns .approved / .denial_reason
    ) -> ScopedToken:
        allowed = policy_engine.evaluate(request)
        if not allowed.approved:
            raise PermissionError(
                f"Policy denied: {allowed.denial_reason} "
                f"[agent={request.agent_id}, step={request.step_index}]"
            )
        # Take one timestamp so "iat" and "exp" are exactly 30 seconds apart.
        issued_at = datetime.now(timezone.utc)
        expires_at = issued_at + timedelta(seconds=30)
        payload = {
            "sub": request.agent_id,
            "run": request.pipeline_run_id,
            "step": request.step_index,
            "cap": [c.value for c in request.requested_capabilities],
            "paths": request.target_paths,
            "exp": expires_at,
            "iat": issued_at,
        }
        token = jwt.encode(payload, SIGNING_KEY, algorithm=ALGORITHM)
        return ScopedToken(
            token=token,
            expires_at=expires_at,
            capabilities=request.requested_capabilities,
            target_paths=request.target_paths,
        )

The justification field is key. It feeds into the audit log and it also trains the policy engine over time. If an agent repeatedly requests capabilities it doesn't end up using, its future requests get flagged for review.
The 3-capability limit per token was arbitrary at first. After running this in production for two months, I haven't found a legitimate case that needs more. If you do, it probably means your pipeline steps aren't granular enough.
Agent-to-Agent Communication with Signed Payloads
Agents in our pipeline don't talk directly to each other. Every message goes through a broker. But even with a broker, you need to verify that a message claiming to be from the "code-review-agent" actually came from it and wasn't tampered with in transit.
Each agent gets an Ed25519 keypair at initialization. The private key lives in the agent's memory space only — it's never written to disk, never logged, never passed to subprocess tools.

    import hashlib
    import json
    import time
    from dataclasses import dataclass

    from cryptography.hazmat.primitives.asymmetric.ed25519 import (
        Ed25519PrivateKey,
        Ed25519PublicKey,
    )


    @dataclass(frozen=True, slots=True)
    class SignedPayload:
        sender_id: str
        recipient_id: str
        timestamp_ns: int
        body: bytes
        signature: bytes
        body_hash: str

        def serialize(self) -> bytes:
            return json.dumps({
                "sender_id": self.sender_id,
                "recipient_id": self.recipient_id,
                "timestamp_ns": self.timestamp_ns,
                "body": self.body.hex(),
                "signature": self.signature.hex(),
                "body_hash": self.body_hash,
            }).encode()


    class AgentIdentity:
        def __init__(self, agent_id: str) -> None:
            self.agent_id = agent_id
            self._private_key = Ed25519PrivateKey.generate()
            self.public_key = self._private_key.public_key()

        def sign_message(self, recipient_id: str, body: bytes) -> SignedPayload:
            timestamp_ns = time.time_ns()
            # Sign over: sender + recipient + timestamp + body hash.
            # Binding the recipient blocks recipient substitution; the timestamp
            # bounds how long a captured message can be replayed.
            body_hash = hashlib.blake2b(body, digest_size=32).hexdigest()
            signing_input = f"{self.agent_id}|{recipient_id}|{timestamp_ns}|{body_hash}".encode()
            signature = self._private_key.sign(signing_input)
            return SignedPayload(
                sender_id=self.agent_id,
                recipient_id=recipient_id,
                timestamp_ns=timestamp_ns,
                body=body,
                signature=signature,
                body_hash=body_hash,
            )

        @staticmethod
        def verify_message(
            payload: SignedPayload,
            sender_public_key: Ed25519PublicKey,
            max_age_seconds: float = 5.0,
        ) -> bool:
            # Check message freshness
            age_ns = time.time_ns() - payload.timestamp_ns
            if age_ns > max_age_seconds * 1e9:
                raise ValueError(f"Message too old: {age_ns / 1e9:.1f}s")
            if age_ns < 0:
                raise ValueError("Message from the future: clock skew or replay attack")
            # Verify body integrity
            expected_hash = hashlib.blake2b(payload.body, digest_size=32).hexdigest()
            if expected_hash != payload.body_hash:
                raise ValueError("Body hash mismatch: payload tampered")
            # Verify signature; raises cryptography.exceptions.InvalidSignature on failure
            signing_input = (
                f"{payload.sender_id}|{payload.recipient_id}|"
                f"{payload.timestamp_ns}|{payload.body_hash}"
            ).encode()
            sender_public_key.verify(payload.signature, signing_input)
            return True

The 5-second max age window is tight. We initially set it to 60 seconds and caught a replay in testing within two days: the test harness was caching and re-sending messages for performance. Tight windows surface bugs faster.
I'm not 100% sure Ed25519 is the right choice over ECDSA here. Ed25519 is faster for our volume (~4000 messages/second across the pipeline) and the signatures are deterministic, which makes debugging easier. If anyone has a strong argument for ECDSA in this context, I'm genuinely curious.
Sandboxing Agent Tool Execution
This is where it gets hairy. Agents need to run code. They need to execute tests, run linters, invoke compilers. You can't just... let them do that on the host.
We tried three approaches:
Attempt 1: Docker containers per execution. Cold start was 800ms-2s depending on the image. For a pipeline that runs 40-60 tool invocations per task, that's an extra 30-120 seconds of latency. Unacceptable for interactive use.
Attempt 2: Pre-warmed Docker container pools. Better — 50ms overhead. But resource usage was brutal. Keeping 20 warm containers per agent type ate 16GB of RAM on our staging box (Ubuntu 22.04, 64GB total).
Attempt 3: nsjail with pre-built filesystem snapshots. This is what we shipped.
nsjail gives you Linux namespaces without the Docker daemon overhead. Startup is ~5ms. Memory overhead is negligible because we're sharing the host filesystem read-only and only mounting a small writable tmpfs per execution.
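To make the comparison concrete, here is the arithmetic behind those three attempts as a small sketch. The per-invocation overheads and the 40-60 invocations per task are the figures quoted above; the helper name `added_latency` is just for illustration.

```python
# Per-invocation sandbox startup overhead in seconds (best case, worst case).
OVERHEAD_S = {
    "docker cold start": (0.8, 2.0),
    "warm container pool": (0.05, 0.05),
    "nsjail": (0.005, 0.005),
}
INVOCATIONS_PER_TASK = (40, 60)


def added_latency(
    per_call: tuple[float, float],
    calls: tuple[int, int],
) -> tuple[float, float]:
    """Best and worst case total startup overhead per task, in seconds."""
    return (per_call[0] * calls[0], per_call[1] * calls[1])


for name, per_call in OVERHEAD_S.items():
    low, high = added_latency(per_call, INVOCATIONS_PER_TASK)
    print(f"{name:>20}: {low:.1f}s to {high:.1f}s per task")
```

Cold-start Docker lands at 32 to 120 seconds per task, the warm pool at 2 to 3 seconds, and nsjail at a fraction of a second, which is why startup cost stopped being the deciding factor once we moved off cold containers and memory became the constraint instead.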

    import asyncio
    import tempfile
    import time
    from dataclasses import dataclass, field
    from pathlib import Path


    @dataclass
    class SandboxConfig:
        max_memory_mb: int = 512
        max_cpu_seconds: int = 30
        max_file_size_mb: int = 10
        allowed_syscalls: list[str] = field(default_factory=lambda: [
            "read", "write", "open", "close", "stat", "fstat", "mmap",
            "mprotect", "munmap", "brk", "access", "pipe", "dup2",
            "execve", "exit_group", "arch_prctl", "clone", "wait4",
            "openat", "getdents64", "newfstatat", "readlink",
        ])
        network_access: bool = False
        writable_paths: list[str] = field(default_factory=list)


    @dataclass
    class ExecutionResult:
        exit_code: int
        stdout: str
        stderr: str
        duration_ms: float
        syscalls_blocked: list[str]
        memory_peak_mb: float


    async def execute_in_sandbox(
        command: list[str],
        config: SandboxConfig,
        working_dir: Path,
        env: dict[str, str] | None = None,
    ) -> ExecutionResult:
        with tempfile.TemporaryDirectory(prefix="agent_sandbox_") as tmpdir:
            nsjail_args = [
                "nsjail",
                "--mode", "o",  # one-shot mode
                "--time_limit", str(config.max_cpu_seconds),  # nsjail enforces this as wall-clock time
                "--rlimit_as", str(config.max_memory_mb),  # MB
                "--rlimit_fsize", str(config.max_file_size_mb),  # MB
                "--chroot", "/",  # mounted read-only unless --rw is passed
            ]
            # Only the explicitly listed paths are writable (bind mounts are R/W)
            for path in config.writable_paths:
                nsjail_args.extend(["--bindmount", f"{path}:{path}"])
            # Writable tmpfs for the agent's scratch space
            nsjail_args.extend(["--tmpfsmount", "/tmp"])
            # nsjail creates a fresh network namespace by default, so the jail
            # has no network unless we explicitly opt out of that isolation.
            if config.network_access:
                nsjail_args.append("--disable_clone_newnet")
            # Seccomp policy from allowed syscalls
            seccomp_policy = _build_seccomp_policy(config.allowed_syscalls)
            policy_path = Path(tmpdir) / "seccomp.policy"
            policy_path.write_text(seccomp_policy)
            nsjail_args.extend(["--seccomp_policy", str(policy_path)])
            nsjail_args.extend(["--cwd", str(working_dir)])
            # Copy so the caller's dict is not mutated.
            jail_env = dict(env or {})
            jail_env.setdefault("PATH", "/usr/local/bin:/usr/bin:/bin")
            # nsjail does not forward its own environment to the jailed process
            # unless told to, so pass each variable explicitly.
            for key, value in jail_env.items():
                nsjail_args.extend(["--env", f"{key}={value}"])
            nsjail_args.append("--")
            nsjail_args.extend(command)
            start = time.perf_counter()
            proc = await asyncio.create_subprocess_exec(
                *nsjail_args,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=jail_env,
            )
            stdout_bytes, stderr_bytes = await proc.communicate()
            duration_ms = (time.perf_counter() - start) * 1000
            stderr_text = stderr_bytes.decode(errors="replace")
            return ExecutionResult(
                # "returncode or -1" would turn a successful exit code 0 into -1
                exit_code=proc.returncode if proc.returncode is not None else -1,
                stdout=stdout_bytes.decode(errors="replace")[:100_000],
                stderr=stderr_text[:100_000],
                duration_ms=duration_ms,
                syscalls_blocked=_parse_blocked_syscalls(stderr_text),
                memory_peak_mb=0.0,  # parsed from nsjail logs in production
            )


    def _build_seccomp_policy(allowed: list[str]) -> str:
        # Kafel syntax: declare a named policy, then apply it with a default action.
        syscalls = ", ".join(allowed)
        return (
            "POLICY agent_sandbox {\n"
            f"  ALLOW {{ {syscalls} }}\n"
            "}\n"
            "USE agent_sandbox DEFAULT KILL\n"
        )


    def _parse_blocked_syscalls(stderr: str) -> list[str]:
        blocked = []
        for line in stderr.splitlines():
            if "seccomp violation" in line.lower():
                blocked.append(line.strip())
        return blocked

The allowed_syscalls list above is intentionally minimal. We started with the full set and removed things one at a time until tests broke, then added back only what was needed. It took about three days of iteration. The list above handles Python execution, Go compilation, and Node.js, which covers 90% of our pipeline's tool invocations.
One gotcha: clone needs to be allowed if the agent's tool invocations use threading or subprocess. We blocked it initially and spent half a day wondering why pytest was hanging silently.
Audit Logging the Full Decision Chain
Most logging systems capture what happened. For AI agents, you need to capture why it happened — the full reasoning chain that led to an action.
Our audit log captures every decision point:

    import json
    import uuid
    from datetime import datetime, timezone
    from enum import StrEnum

    import httpx
    from pydantic import BaseModel


    class DecisionType(StrEnum):
        CAPABILITY_REQUEST = "capability_request"
        CAPABILITY_GRANTED = "capability_granted"
        CAPABILITY_DENIED = "capability_denied"
        TOOL_INVOCATION = "tool_invocation"
        TOOL_RESULT = "tool_result"
        AGENT_MESSAGE_SENT = "agent_message_sent"
        AGENT_MESSAGE_RECEIVED = "agent_message_received"
        REASONING_STEP = "reasoning_step"
        POLICY_EVALUATION = "policy_evaluation"


    class AuditEntry(BaseModel):
        entry_id: str
        pipeline_run_id: str
        agent_id: str
        step_index: int
        decision_type: DecisionType
        timestamp: datetime
        parent_entry_id: str | None  # links to the reasoning step that caused this
        context: dict  # model input/output, token contents, policy results
        token_hash: str | None  # hash of the scoped token used (never log the token itself)


    class AuditLogger:
        def __init__(self, sink_url: str) -> None:
            self._client = httpx.AsyncClient(
                base_url=sink_url,
                timeout=httpx.Timeout(5.0, connect=2.0),
            )
            self._buffer: list[AuditEntry] = []
            self._buffer_limit = 50

        async def log(
            self,
            pipeline_run_id: str,
            agent_id: str,
            step_index: int,
            decision_type: DecisionType,
            context: dict,
            parent_entry_id: str | None = None,
            token_hash: str | None = None,
        ) -> str:
            entry_id = str(uuid.uuid4())
            entry = AuditEntry(
                entry_id=entry_id,
                pipeline_run_id=pipeline_run_id,
                agent_id=agent_id,
                step_index=step_index,
                decision_type=decision_type,
                timestamp=datetime.now(timezone.utc),
                parent_entry_id=parent_entry_id,
                context=context,
                token_hash=token_hash,
            )
            self._buffer.append(entry)
            if len(self._buffer) >= self._buffer_limit:
                await self._flush()
            return entry_id

        async def _flush(self) -> None:
            if not self._buffer:
                return
            batch = self._buffer[:]
            self._buffer.clear()
            try:
                response = await self._client.post(
                    "/v1/audit/batch",
                    content=json.dumps([e.model_dump(mode="json") for e in batch]),
                    headers={"Content-Type": "application/json"},
                )
                response.raise_for_status()
            except httpx.HTTPError:
                # Put the batch back so a failed write does not drop audit entries.
                self._buffer[:0] = batch
                raise

The parent_entry_id field is what makes this useful. When you're debugging why an agent overwrote a file it shouldn't have, you can trace backwards: the tool invocation links to a reasoning step, which links to a message it received from another agent, which links to that agent's capability grant. Full causal chain.
We store these in ClickHouse. Postgres couldn't handle the write volume at 4000+ entries/second. The retention policy is 90 days for full context, 2 years for the entry metadata without the context blob.
The Architecture in Text
Here's how the pieces connect. I'll describe it as a data flow because that's what matters for security analysis:
Pipeline Orchestrator
|
|--- spawns N agents, each with:
| - unique Ed25519 keypair (in-memory only)
| - agent_id registered with Token Issuer
| - no pre-granted capabilities
|
v
Agent needs to act
|
|--- Agent -> Token Issuer: "I need FILE_READ on /src/auth/*.py because
| I'm analyzing the auth module for refactoring"
|
|--- Token Issuer -> Policy Engine: evaluate(request, agent_history, pipeline_context)
|
|--- Policy Engine checks:
| - Is this capability allowed for this pipeline step? (step_index matters)
| - Has this agent requested unusual capabilities recently?
| - Does the justification match the pipeline's declared intent?
| - Are the target paths within the pipeline's declared scope?
|
|--- Token Issuer -> Agent: ScopedToken (30s TTL, 3 caps max)
|
v
Agent executes tool
|
|--- Agent -> Sandbox (nsjail): execute with token attached
| - Sandbox validates token before execution
| - Sandbox enforces seccomp policy
| - Sandbox captures all I/O for audit
|
|--- Sandbox -> Audit Logger: log(tool_invocation, full_context)
|
v
Agent communicates result to next agent
|
|--- Agent A signs payload with its Ed25519 key
|--- Payload -> Message Broker (validates signature, checks freshness)
|--- Broker -> Agent B (only if B is the declared recipient)
|--- Agent B verifies signature against A's registered public key
|
v
Audit Logger captures every arrow above as a linked entry

The Policy Engine is the brain of the system. It's not just a rule checker: it maintains a sliding window of each agent's behavior and flags anomalies. An agent that normally requests FILE_READ twice per step suddenly requesting SUBPROCESS five times in a row gets throttled automatically.
Things I'm Still Figuring Out
Containing a compromised agent is slow. If we detect one mid-pipeline, revoking its current token is the easy part, since it expires in 30 seconds anyway. But killing the agent's in-flight operations inside nsjail without corrupting partial writes? That's still manual.
The policy engine's anomaly detection uses a simple z-score on request frequency right now. It works but it's crude. I've been reading about using transformer-based sequence models to predict expected agent behavior patterns, but I haven't shipped anything there yet.
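For reference, the z-score approach amounts to something like the sketch below: keep a sliding window of per-step request counts and flag a count that sits too many standard deviations from the window mean. The window size, threshold, warm-up length, and standard-deviation floor are illustrative assumptions, not our production values, and `FrequencyMonitor` is a hypothetical name.

```python
import statistics
from collections import deque


class FrequencyMonitor:
    """Flags per-step request counts that deviate sharply from recent history."""

    def __init__(
        self,
        window: int = 20,
        threshold: float = 3.0,
        min_history: int = 5,
        min_stdev: float = 0.5,
    ) -> None:
        self._counts: deque[int] = deque(maxlen=window)
        self._threshold = threshold
        self._min_history = min_history
        self._min_stdev = min_stdev

    def observe(self, requests_this_step: int) -> bool:
        """Record a count and return True if it looks anomalous."""
        anomalous = False
        if len(self._counts) >= self._min_history:
            mean = statistics.fmean(self._counts)
            # Floor the deviation so a perfectly steady agent does not
            # trigger on a change of one request.
            stdev = max(statistics.pstdev(self._counts), self._min_stdev)
            anomalous = abs(requests_this_step - mean) / stdev > self._threshold
        # The observation joins the window either way, so a sustained shift
        # eventually becomes the new baseline.
        self._counts.append(requests_this_step)
        return anomalous
```

The crudeness shows up in that last comment: a patient agent can ramp up slowly and drag the baseline with it, which is one reason a sequence-aware model is tempting.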
The 30-second token TTL causes problems for long-running tool invocations. pytest on our main service takes 45 seconds. We currently handle this by issuing a "long_running" token variant with a 120-second TTL, but it feels like a hack. The token should probably be tied to process lifecycle rather than wall clock time.
If you're building something similar and have solved any of these — especially the token-to-process-lifecycle binding — I'd genuinely like to hear about it.