Skip to Content
Vulnerabilities › Access Control

Access Control

Access control vulnerabilities occur when agents perform sensitive operations without proper authentication or authorization checks.

Missing Authentication Check (CRITICAL)

CVSS 9.1 | CWE-306

Sensitive operations are performed without first verifying the caller's identity.

Vulnerable
No authentication on sensitive endpoint
@app.post("/api/agent/execute")
def execute_agent_task(request):
  """Run the task named in the request body (intentionally vulnerable example).

  Nothing here identifies the caller, so anyone who can reach the endpoint
  can make the agent execute a task.
  """
  # No authentication check!
  requested_task = request.json.get("task")
  outcome = agent.execute(requested_task)
  return outcome
Secure
JWT validation before processing
from functools import wraps
import jwt

def require_auth(f):
  """Decorator that rejects the call unless the request carries a valid JWT.

  The token is read from the ``Authorization`` header (a leading
  ``Bearer `` scheme is stripped when present), verified against
  ``SECRET_KEY`` with HS256 as the only accepted algorithm, and its ``sub``
  claim is stored on ``request.user_id`` before the wrapped function runs.

  ``request``, ``jwt`` and ``SECRET_KEY`` are resolved from the enclosing
  module scope, which this snippet does not define.

  Returns:
      The wrapped function's result on success, otherwise an
      ``({"error": ...}, 401)`` tuple.
  """
  @wraps(f)
  def decorated(*args, **kwargs):
      header = request.headers.get("Authorization", "")
      # Strip the scheme only when it is a prefix; str.replace() would also
      # rewrite "Bearer " occurring anywhere else in the header value.
      if header.startswith("Bearer "):
          token = header[len("Bearer "):].strip()
      else:
          token = header.strip()
      if not token:
          return {"error": "Missing token"}, 401
      try:
          payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
          user_id = payload["sub"]
      except (jwt.InvalidTokenError, KeyError):
          # KeyError: the signature verified but the token has no "sub"
          # claim. Treat it as invalid instead of letting the lookup
          # escape as an unhandled server error.
          return {"error": "Invalid token"}, 401
      request.user_id = user_id
      return f(*args, **kwargs)
  return decorated

@app.post("/api/agent/execute")
@require_auth
def execute_agent_task(request):
  """Run the requested task on behalf of the authenticated caller."""
  # request.user_id is expected to have been set by require_auth before
  # this body runs; the agent call is attributed to that user.
  requested_task = request.json.get("task")
  outcome = agent.execute(requested_task, user_id=request.user_id)
  return outcome

Path Traversal (HIGH)

CVSS 7.5 | CWE-22

File paths are constructed from untrusted input, allowing access to files outside the intended directory.

Vulnerable
User input directly in file path
@tool
def read_document(filename):
  """Read a document from the documents folder."""
  # DANGEROUS: filename is interpolated unchecked, so "../" segments let the
  # caller walk out of ./documents
  target = f"./documents/{filename}"
  with open(target) as handle:
      contents = handle.read()
  return contents

# Attacker: filename = "../../../etc/passwd"
Secure
Path validation prevents traversal
import os
from pathlib import Path

DOCS_DIR = Path("./documents").resolve()

@tool
def read_document(filename):
  """Read a document from the documents folder."""
  # Returns the file's text. Raises ValueError when the resolved path falls
  # outside DOCS_DIR and FileNotFoundError when it is not an existing file.

  # Resolve ".." segments and symlinks before validating anything.
  requested_path = (DOCS_DIR / filename).resolve()

  # Compare path components rather than string prefixes: a sibling such as
  # "<DOCS_DIR>_private/secret.txt" passes str.startswith() even though it
  # lives outside DOCS_DIR. relative_to() raises ValueError unless
  # requested_path is DOCS_DIR itself or located beneath it.
  try:
      requested_path.relative_to(DOCS_DIR)
  except ValueError:
      raise ValueError("Access denied: path traversal detected") from None

  # is_file() also rejects directories (including DOCS_DIR itself), which
  # exists() would let through to open().
  if not requested_path.is_file():
      raise FileNotFoundError(f"Document not found: {filename}")

  with open(requested_path) as f:
      return f.read()

Missing Authorization (HIGH)

Users can access resources they do not own because ownership is never checked.

Vulnerable
No ownership check on resource access
@tool
def get_conversation(conversation_id):
  """Retrieve a conversation by ID."""
  # No authorization check! The lookup filters on the ID alone, so any
  # caller who knows an ID gets that conversation back.
  lookup = {"_id": conversation_id}
  record = db.conversations.find_one(lookup)
  return record

# User A can access User B's conversations
Secure
Verify resource ownership before access
@tool
def get_conversation(conversation_id, user_id):
  """Retrieve a conversation by ID."""
  # Scope the lookup to the requesting user: a conversation owned by someone
  # else simply does not match the filter.
  ownership_filter = {"_id": conversation_id, "user_id": user_id}
  record = db.conversations.find_one(ownership_filter)

  if record:
      return record

  # One error for both "missing" and "not yours".
  raise PermissionError("Conversation not found or access denied")

Excessive Agent Permissions

Agents are granted more permissions than their task requires.

Vulnerable
Agent has admin-level database access
# Agent has full database access
# (vulnerable example: no restriction on statement type or table is
# configured anywhere in this snippet)
db_tool = SQLDatabaseTool(
  connection=engine,
  # Can SELECT, INSERT, UPDATE, DELETE, DROP...
)

# The unrestricted tool is the agent's only tool, and the system prompt
# places no limit on how it may be used.
agent = create_agent(
  tools=[db_tool],
  system_prompt="You are a helpful assistant."
)
Secure
Read-only access with query restrictions
from sqlalchemy import create_engine, text

# Read-only connection
# NOTE(review): the "options" connect argument and the
# "-c default_transaction_read_only=on" flag look PostgreSQL-specific;
# confirm they are honoured by the driver behind DB_URL.
readonly_engine = create_engine(
  DB_URL,
  connect_args={"options": "-c default_transaction_read_only=on"}
)

# Restricted tool with query validation
class SafeDBTool:
  """Run single SELECT statements against an allowlist of tables.

  Validation is a best-effort textual check layered on top of whatever
  restrictions the supplied engine already enforces; it is deliberately
  conservative and may reject some legitimate queries (for example
  ``EXTRACT(YEAR FROM col)``, or a ``;`` inside a string literal). It does
  not parse SQL, so database-level grants should remain the real boundary.
  """

  def __init__(self, engine):
      self.engine = engine
      self.allowed_tables = {"products", "categories"}

  def _validated(self, sql):
      """Return ``sql`` without surrounding whitespace or trailing ';'.

      Raises:
          ValueError: if the text is not a single SELECT statement, contains
              SQL comments, or names a table after FROM/JOIN that is not in
              ``self.allowed_tables``.
      """
      import re  # local import keeps this snippet self-contained

      statement = sql.strip().rstrip(";").strip()
      if not statement.upper().startswith("SELECT"):
          raise ValueError("Only SELECT queries allowed")
      # A remaining ';' means stacked statements ("SELECT 1; DROP ...").
      if ";" in statement:
          raise ValueError("Only a single statement is allowed")
      # Comments can be used to hide table references from the check below.
      if "--" in statement or "/*" in statement:
          raise ValueError("SQL comments are not allowed")

      references = re.findall(
          r"\b(?:FROM|JOIN)\b\s*([^\s,();]+)", statement, flags=re.IGNORECASE
      )
      for reference in references:
          # Drop any schema qualifier and identifier quoting before comparing.
          table = reference.split(".")[-1].strip('"`[]').lower()
          if table not in self.allowed_tables:
              raise ValueError(f"Table not allowed: {table}")
      return statement

  def query(self, sql):
      """Validate ``sql`` and return all rows produced by executing it.

      Raises:
          ValueError: if validation fails; nothing is sent to the database.
      """
      statement = self._validated(sql)

      with self.engine.connect() as conn:
          return conn.execute(text(statement)).fetchall()

# The agent's only tool is a SafeDBTool bound to the read-only engine, and
# the system prompt states the same read-only scope.
agent = create_agent(
  tools=[SafeDBTool(readonly_engine)],
  system_prompt="You can only read product information."
)

Principle of Least Privilege: Agents should only have the minimum permissions required for their specific task.

Defense Strategies

1. Authentication Middleware

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse


class AuthMiddleware(BaseHTTPMiddleware):
    """Reject requests that lack a verifiable ``Authorization`` header.

    ``verify_token`` and ``InvalidToken`` are not defined in this snippet and
    must be supplied by the application.
    """

    async def dispatch(self, request, call_next):
        """Authenticate the request, then pass it down the middleware chain.

        Returns:
            The downstream response, or a 401 JSON response when the header
            is missing or ``verify_token`` raises ``InvalidToken``.
        """
        # Skip auth for public endpoints
        if request.url.path in ["/health", "/docs"]:
            return await call_next(request)

        # The raw header value (including any scheme prefix) is handed to
        # verify_token unchanged.
        token = request.headers.get("Authorization")
        if not token:
            return JSONResponse({"error": "Unauthorized"}, status_code=401)

        try:
            request.state.user = verify_token(token)
        except InvalidToken:
            return JSONResponse({"error": "Invalid token"}, status_code=401)

        return await call_next(request)

2. Resource-Based Access Control

class ResourceACL:
    """Permission checks based on resource ownership, sharing and admin status.

    ``user`` objects must expose ``id`` and ``is_admin``; ``resource`` objects
    must expose ``owner_id`` and, for reads, ``shared_with``.
    """

    def can_read(self, user, resource):
        """Allow the owner, anyone the resource is shared with, or an admin."""
        return (
            resource.owner_id == user.id
            or user.id in resource.shared_with
            or user.is_admin
        )

    def can_write(self, user, resource):
        """Allow the owner or an admin."""
        return resource.owner_id == user.id or user.is_admin

    def can_delete(self, user, resource):
        """Allow only the owner; admin status is not consulted here."""
        return resource.owner_id == user.id

3. Tool Permission Scoping

from enum import Flag, auto


class ToolPermission(Flag):
    """Combinable permission bits that decide which tools an agent receives."""

    READ = auto()
    WRITE = auto()
    DELETE = auto()
    EXECUTE = auto()


def create_scoped_agent(user_permissions: ToolPermission):
    """Create an agent holding only the tools ``user_permissions`` allows.

    ``read_tool``, ``write_tool``, ``execute_tool`` and ``create_agent`` are
    not defined in this snippet and must be supplied by the application.

    Args:
        user_permissions: One ``ToolPermission`` member or several combined
            with ``|``.

    Returns:
        Whatever ``create_agent`` returns for the selected tool list.
    """
    tools = []
    if ToolPermission.READ in user_permissions:
        tools.append(read_tool)
    if ToolPermission.WRITE in user_permissions:
        tools.append(write_tool)
    # NOTE(review): ToolPermission.DELETE is declared but no tool is mapped
    # to it here; confirm whether a delete tool is intentionally omitted.
    if ToolPermission.EXECUTE in user_permissions:
        tools.append(execute_tool)
    return create_agent(tools=tools)

Checklist

  • All API endpoints require authentication
  • Resource access checks ownership/permissions
  • File operations validate paths are within allowed directories
  • Database connections use minimum required privileges
  • Agent tools are scoped to user’s permission level
  • Audit logs capture all sensitive operations
Last updated on