Skip to Content
FrameworksOpenAI Agents

OpenAI Agents SDK

Static analysis for OpenAI Agents SDK applications to detect unsafe tools, swarm handoff issues, and missing iteration limits.

Quick Start

inkog scan ./my-openai-agents-app

What Inkog Detects

FindingSeverityDescription
No Iteration LimitHIGHAgent runner without max_turns
Swarm DepthHIGHUnbounded handoff chains in Swarm patterns
Unsafe ToolCRITICALFunction tools with shell/file access
No Schema ValidationHIGHFunction calls without input validation
Parallel Tool RiskMEDIUMParallel tool calls without rate limits

Agent Runner Without Limits

The agent runner can loop indefinitely without turn limits.

Vulnerable
No token limits or timeout - runs indefinitely
from openai import OpenAI
from openai.types.beta import Assistant

client = OpenAI()

# Create run without limits
run = client.beta.threads.runs.create(
  thread_id=thread.id,
  assistant_id=assistant.id
  # No max completion tokens, no timeout
)

# Poll forever
while run.status != "completed":
  run = client.beta.threads.runs.retrieve(
      thread_id=thread.id,
      run_id=run.id
  )
Secure
Token limits, iteration cap, and timeout
from openai import OpenAI
import time

client = OpenAI()

run = client.beta.threads.runs.create(
  thread_id=thread.id,
  assistant_id=assistant.id,
  max_completion_tokens=4000,  # Limit tokens
  max_prompt_tokens=2000
)

# Poll with timeout
MAX_ITERATIONS = 50
TIMEOUT = 120  # seconds
start = time.time()

for i in range(MAX_ITERATIONS):
  if time.time() - start > TIMEOUT:
      client.beta.threads.runs.cancel(
          thread_id=thread.id, run_id=run.id
      )
      raise TimeoutError("Run exceeded timeout")

  run = client.beta.threads.runs.retrieve(
      thread_id=thread.id, run_id=run.id
  )
  if run.status in ["completed", "failed", "cancelled"]:
      break
  time.sleep(1)

Swarm Handoff Depth

Swarm patterns with unlimited handoffs can create infinite loops.

Vulnerable
Mutual handoffs create infinite loops
from swarm import Swarm, Agent

def transfer_to_analyst():
  return analyst_agent

def transfer_to_writer():
  return writer_agent

analyst_agent = Agent(
  name="Analyst",
  functions=[transfer_to_writer]  # Can transfer to writer
)

writer_agent = Agent(
  name="Writer",
  functions=[transfer_to_analyst]  # Can transfer back!
)

# Analyst → Writer → Analyst → Writer → ...
Secure
Handoff counter with maximum limit
from swarm import Swarm, Agent

class HandoffTracker:
  def __init__(self, max_handoffs=5):
      self.count = 0
      self.max = max_handoffs

  def can_handoff(self):
      return self.count < self.max

  def increment(self):
      self.count += 1

tracker = HandoffTracker()

def transfer_to_analyst():
  if not tracker.can_handoff():
      return None  # No more handoffs
  tracker.increment()
  return analyst_agent

def transfer_to_writer():
  if not tracker.can_handoff():
      return None
  tracker.increment()
  return writer_agent

# Or use terminating agent
def transfer_to_final():
  return final_agent  # No outgoing transfers

final_agent = Agent(
  name="Final",
  functions=[]  # Terminal agent
)

Unsafe Function Tools

Function tools with system access create security vulnerabilities.

Vulnerable
Arbitrary shell command execution
import subprocess

def run_command(command: str) -> str:
  """Run any shell command."""
  result = subprocess.run(
      command,
      shell=True,  # Shell injection risk!
      capture_output=True
  )
  return result.stdout.decode()

tools = [
  {
      "type": "function",
      "function": {
          "name": "run_command",
          "description": "Run a shell command",
          "parameters": {
              "type": "object",
              "properties": {
                  "command": {"type": "string"}
              }
          }
      }
  }
]
Secure
Allowlist commands and validated paths
import subprocess
from pathlib import Path

ALLOWED_COMMANDS = {"ls", "cat", "echo", "date"}
ALLOWED_PATHS = [Path("./workspace")]

def safe_command(command: str, path: str = ".") -> str:
  """Run only allowed commands in allowed paths."""
  parts = command.split()
  if not parts:
      return "Error: Empty command"

  cmd = parts[0]
  if cmd not in ALLOWED_COMMANDS:
      return f"Error: Command '{cmd}' not allowed"

  # Validate path
  target = Path(path).resolve()
  if not any(target.is_relative_to(p) for p in ALLOWED_PATHS):
      return "Error: Path not allowed"

  result = subprocess.run(
      parts,
      cwd=str(target),
      capture_output=True,
      timeout=10,
      shell=False  # No shell interpretation
  )
  return result.stdout.decode()[:1000]  # Limit output

Missing Schema Validation

Function parameters without validation can be exploited.

Vulnerable
No input validation - path traversal possible
def process_file(filename: str) -> str:
  """Process a file - no validation!"""
  with open(filename, "r") as f:  # Path traversal!
      return f.read()

tools = [{
  "type": "function",
  "function": {
      "name": "process_file",
      "parameters": {
          "type": "object",
          "properties": {
              "filename": {"type": "string"}  # No constraints
          }
      }
  }
}]
Secure
Schema constraints and runtime validation
from pathlib import Path
import re

SAFE_DIR = Path("./data")

def process_file(filename: str) -> str:
  """Process a file with validation."""
  # Validate filename format
  if not re.match(r'^[a-zA-Z0-9_-]+\.txt$', filename):
      return "Error: Invalid filename format"

  # Resolve and check path
  filepath = (SAFE_DIR / filename).resolve()
  if not filepath.is_relative_to(SAFE_DIR):
      return "Error: Path traversal detected"

  if not filepath.exists():
      return "Error: File not found"

  return filepath.read_text()[:5000]

tools = [{
  "type": "function",
  "function": {
      "name": "process_file",
      "parameters": {
          "type": "object",
          "properties": {
              "filename": {
                  "type": "string",
                  "pattern": "^[a-zA-Z0-9_-]+\\.txt$",  # Schema validation
                  "maxLength": 50
              }
          },
          "required": ["filename"]
      }
  }
}]

Parallel Tool Calls

Multiple parallel tool calls can overwhelm resources.

Vulnerable
Unlimited parallel calls can exhaust resources
# Handle parallel tool calls without limits
if run.required_action:
  tool_calls = run.required_action.submit_tool_outputs.tool_calls

  # Execute all in parallel - no rate limiting
  outputs = []
  for call in tool_calls:
      result = execute_tool(call)
      outputs.append({"tool_call_id": call.id, "output": result})
Secure
Concurrency limits and rate limiting
import asyncio
from asyncio import Semaphore

MAX_CONCURRENT = 3
RATE_LIMIT = 10  # per second

semaphore = Semaphore(MAX_CONCURRENT)
last_calls = []

async def rate_limited_execute(call):
  async with semaphore:
      # Simple rate limiting
      now = time.time()
      last_calls[:] = [t for t in last_calls if now - t < 1]
      if len(last_calls) >= RATE_LIMIT:
          await asyncio.sleep(1)
      last_calls.append(now)

      return await execute_tool_async(call)

if run.required_action:
  tool_calls = run.required_action.submit_tool_outputs.tool_calls

  # Rate-limited parallel execution
  outputs = await asyncio.gather(*[
      rate_limited_execute(call)
      for call in tool_calls[:10]  # Max 10 parallel
  ])

Run Polling

Inefficient polling wastes resources and can miss failures.

Vulnerable
Tight loop, no timeout, ignores failures
# Tight polling loop
while True:
  run = client.beta.threads.runs.retrieve(
      thread_id=thread.id,
      run_id=run.id
  )
  if run.status == "completed":
      break
  # No sleep, no timeout, no error handling
Secure
Exponential backoff with timeout and error handling
import time

def wait_for_run(client, thread_id, run_id, timeout=120):
  """Wait for run with exponential backoff."""
  start = time.time()
  delay = 1

  while time.time() - start < timeout:
      run = client.beta.threads.runs.retrieve(
          thread_id=thread_id,
          run_id=run_id
      )

      if run.status == "completed":
          return run
      if run.status in ["failed", "cancelled", "expired"]:
          raise RuntimeError(f"Run {run.status}: {run.last_error}")
      if run.status == "requires_action":
          return run  # Handle tool calls

      time.sleep(min(delay, 10))  # Max 10s delay
      delay *= 1.5  # Exponential backoff

  # Cancel on timeout
  client.beta.threads.runs.cancel(thread_id=thread_id, run_id=run_id)
  raise TimeoutError("Run timed out")

Best Practices

  1. Set max_completion_tokens and max_prompt_tokens on runs
  2. Track handoff depth in Swarm patterns with counters
  3. Validate all function inputs with schema and runtime checks
  4. Use allowlists for commands, paths, and operations
  5. Rate limit parallel tool calls with semaphores
  6. Implement polling timeouts with exponential backoff

CLI Examples

# Scan OpenAI Agents project inkog scan ./my-openai-agents-app # Focus on tool-related issues inkog scan . -severity high # Generate report inkog scan . -output html > report.html
Last updated on