Access Control
Access control vulnerabilities occur when agents perform sensitive operations without proper authentication or authorization checks.
Missing Authentication Check (CRITICAL)
CVSS 9.1 | CWE-306
Sensitive operations performed without authentication verification.
Vulnerable
No authentication on sensitive endpoint
@app.post("/api/agent/execute")
def execute_agent_task(request):
# No authentication check!
task = request.json.get("task")
return agent.execute(task)Secure
JWT validation before processing
from functools import wraps
import jwt
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get("Authorization", "").replace("Bearer ", "")
if not token:
return {"error": "Missing token"}, 401
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
request.user_id = payload["sub"]
except jwt.InvalidTokenError:
return {"error": "Invalid token"}, 401
return f(*args, **kwargs)
return decorated
@app.post("/api/agent/execute")
@require_auth
def execute_agent_task(request):
task = request.json.get("task")
return agent.execute(task, user_id=request.user_id)Path Traversal (HIGH)
CVSS 7.5 | CWE-22
File paths constructed from untrusted input allowing access outside intended directory.
Vulnerable
User input directly in file path
@tool
def read_document(filename):
"""Read a document from the documents folder."""
# DANGEROUS: User can traverse directories
path = f"./documents/{filename}"
with open(path) as f:
return f.read()
# Attacker: filename = "../../../etc/passwd"Secure
Path validation prevents traversal
import os
from pathlib import Path
DOCS_DIR = Path("./documents").resolve()
@tool
def read_document(filename):
"""Read a document from the documents folder."""
# Resolve the full path
requested_path = (DOCS_DIR / filename).resolve()
# Verify it's within allowed directory
if not str(requested_path).startswith(str(DOCS_DIR)):
raise ValueError("Access denied: path traversal detected")
if not requested_path.exists():
raise FileNotFoundError(f"Document not found: {filename}")
with open(requested_path) as f:
return f.read()Missing Authorization (HIGH)
User can access resources they don’t own.
Vulnerable
No ownership check on resource access
@tool
def get_conversation(conversation_id):
"""Retrieve a conversation by ID."""
# No authorization check!
return db.conversations.find_one({"_id": conversation_id})
# User A can access User B's conversationsSecure
Verify resource ownership before access
@tool
def get_conversation(conversation_id, user_id):
"""Retrieve a conversation by ID."""
conversation = db.conversations.find_one({
"_id": conversation_id,
"user_id": user_id # Must match requesting user
})
if not conversation:
raise PermissionError("Conversation not found or access denied")
return conversationExcessive Agent Permissions
Agents with more permissions than necessary for their task.
Vulnerable
Agent has admin-level database access
# Agent has full database access
db_tool = SQLDatabaseTool(
connection=engine,
# Can SELECT, INSERT, UPDATE, DELETE, DROP...
)
agent = create_agent(
tools=[db_tool],
system_prompt="You are a helpful assistant."
)Secure
Read-only access with query restrictions
from sqlalchemy import create_engine, text
# Read-only connection
readonly_engine = create_engine(
DB_URL,
connect_args={"options": "-c default_transaction_read_only=on"}
)
# Restricted tool with query validation
class SafeDBTool:
def __init__(self, engine):
self.engine = engine
self.allowed_tables = {"products", "categories"}
def query(self, sql):
sql_upper = sql.upper().strip()
if not sql_upper.startswith("SELECT"):
raise ValueError("Only SELECT queries allowed")
with self.engine.connect() as conn:
return conn.execute(text(sql)).fetchall()
agent = create_agent(
tools=[SafeDBTool(readonly_engine)],
system_prompt="You can only read product information."
)Principle of Least Privilege: Agents should only have the minimum permissions required for their specific task.
Defense Strategies
1. Authentication Middleware
from starlette.middleware.base import BaseHTTPMiddleware
class AuthMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
# Skip auth for public endpoints
if request.url.path in ["/health", "/docs"]:
return await call_next(request)
token = request.headers.get("Authorization")
if not token:
return JSONResponse({"error": "Unauthorized"}, status_code=401)
try:
request.state.user = verify_token(token)
except InvalidToken:
return JSONResponse({"error": "Invalid token"}, status_code=401)
return await call_next(request)2. Resource-Based Access Control
class ResourceACL:
def can_read(self, user, resource):
return (
resource.owner_id == user.id or
user.id in resource.shared_with or
user.is_admin
)
def can_write(self, user, resource):
return resource.owner_id == user.id or user.is_admin
def can_delete(self, user, resource):
return resource.owner_id == user.id3. Tool Permission Scoping
from enum import Flag, auto
class ToolPermission(Flag):
READ = auto()
WRITE = auto()
DELETE = auto()
EXECUTE = auto()
def create_scoped_agent(user_permissions: ToolPermission):
tools = []
if ToolPermission.READ in user_permissions:
tools.append(read_tool)
if ToolPermission.WRITE in user_permissions:
tools.append(write_tool)
if ToolPermission.EXECUTE in user_permissions:
tools.append(execute_tool)
return create_agent(tools=tools)Checklist
- All API endpoints require authentication
- Resource access checks ownership/permissions
- File operations validate paths are within allowed directories
- Database connections use minimum required privileges
- Agent tools are scoped to user’s permission level
- Audit logs capture all sensitive operations
Last updated on