Semantic Kernel
Static analysis for Microsoft Semantic Kernel applications to detect planner loops, unsafe plugins, and unbounded function execution.
Quick Start
inkog scan ./my-semantic-kernel-appWhat Inkog Detects
| Finding | Severity | Description |
|---|---|---|
| Planner Loop | CRITICAL | Stepwise/Sequential planner without limits |
| Unsafe Plugin | CRITICAL | Plugin with shell or file system access |
| Kernel Function Risk | HIGH | Functions calling external APIs without validation |
| No Termination | HIGH | ChatCompletionAgent without stop strategy |
| Memory Exposure | MEDIUM | Unencrypted sensitive data in memory |
Planner Infinite Loops
Planners can loop indefinitely when trying to achieve a goal.
Vulnerable
Planner runs indefinitely seeking goal
from semantic_kernel.planners import SequentialPlanner
kernel = Kernel()
planner = SequentialPlanner(kernel)
# Planner runs until goal achieved - may never finish
plan = await planner.create_plan(goal)
result = await plan.invoke(kernel) # No iteration limitSecure
Iteration limits and timeout
from semantic_kernel.planners import SequentialPlanner
from semantic_kernel.planners.sequential_planner import SequentialPlannerConfig
config = SequentialPlannerConfig(
max_iterations=10, # Limit planning iterations
max_tokens=4000, # Limit token usage
allow_missing_functions=False
)
planner = SequentialPlanner(kernel, config=config)
try:
plan = await planner.create_plan(goal)
result = await asyncio.wait_for(
plan.invoke(kernel),
timeout=60 # 60 second timeout
)
except asyncio.TimeoutError:
print("Plan execution timed out")Unsafe Plugin Functions
Plugins with system access can be exploited through prompt injection.
Vulnerable
Unrestricted shell access via plugin
from semantic_kernel.functions import kernel_function
import subprocess
class ShellPlugin:
@kernel_function(name="run_shell")
def run_shell(self, command: str) -> str:
"""Execute any shell command."""
return subprocess.run(
command,
shell=True,
capture_output=True
).stdout.decode()
kernel.add_plugin(ShellPlugin(), "shell")Secure
Allowlist with restricted commands
from semantic_kernel.functions import kernel_function
import subprocess
from typing import Annotated
class SafeShellPlugin:
ALLOWED_COMMANDS = {"ls", "cat", "date", "whoami"}
@kernel_function(
name="run_shell",
description="Run only allowed shell commands"
)
def run_shell(
self,
command: Annotated[str, "Command name (ls, cat, date, whoami)"]
) -> str:
"""Execute only allowed commands."""
parts = command.split()
if not parts or parts[0] not in self.ALLOWED_COMMANDS:
return "Error: Command not allowed"
return subprocess.run(
parts,
shell=False,
capture_output=True,
timeout=10
).stdout.decode()[:1000]
kernel.add_plugin(SafeShellPlugin(), "safe_shell")ChatCompletionAgent Without Termination
Agents without termination strategies run indefinitely.
Vulnerable
No termination condition
from semantic_kernel.agents import ChatCompletionAgent
agent = ChatCompletionAgent(
kernel=kernel,
name="Assistant",
instructions="Help the user"
# No termination strategy
)
# Agent runs forever
async for message in agent.invoke_stream(chat_history):
print(message.content)Secure
Custom termination strategy with turn limit
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.agents.strategies import TerminationStrategy
class MaxTurnsTermination(TerminationStrategy):
def __init__(self, max_turns: int = 10):
self.max_turns = max_turns
self.turn_count = 0
async def should_terminate(self, agent, history) -> bool:
self.turn_count += 1
if self.turn_count >= self.max_turns:
return True
# Check for completion keywords
last_msg = history[-1].content if history else ""
return any(w in last_msg.lower() for w in ["done", "complete", "finished"])
agent = ChatCompletionAgent(
kernel=kernel,
name="Assistant",
instructions="Help the user. Say 'DONE' when finished.",
termination_strategy=MaxTurnsTermination(max_turns=15)
)Kernel Function Validation
Functions that call external APIs without validation are risky.
Vulnerable
SSRF risk - can call any URL
import requests
class APIPlugin:
@kernel_function(name="call_api")
def call_api(self, url: str, data: str) -> str:
"""Call any API endpoint."""
response = requests.post(url, json={"data": data})
return response.textSecure
Allowlist hosts and validate URLs
import requests
from urllib.parse import urlparse
class SafeAPIPlugin:
ALLOWED_HOSTS = {"api.example.com", "internal.company.com"}
@kernel_function(name="call_api")
def call_api(self, endpoint: str, data: str) -> str:
"""Call only allowed API endpoints."""
parsed = urlparse(endpoint)
if parsed.hostname not in self.ALLOWED_HOSTS:
return "Error: Host not allowed"
if parsed.scheme != "https":
return "Error: HTTPS required"
try:
response = requests.post(
endpoint,
json={"data": data[:1000]}, # Limit data size
timeout=10,
verify=True
)
return response.text[:5000] # Limit response
except requests.RequestException as e:
return f"Error: {str(e)}"Memory Plugin Security
Memory plugins can store and leak sensitive information.
Vulnerable
Secrets stored in plain text
from semantic_kernel.memory import SemanticTextMemory
memory = SemanticTextMemory(
storage=VolatileMemoryStore(),
embeddings_generator=embeddings
)
# Store sensitive data unencrypted
await memory.save_information(
collection="user_data",
text=f"API Key: {api_key}, SSN: {ssn}",
id="user_secrets"
)Secure
Sanitization and encryption
from semantic_kernel.memory import SemanticTextMemory
from cryptography.fernet import Fernet
import re
class SecureMemoryStore:
def __init__(self, encryption_key: bytes):
self.fernet = Fernet(encryption_key)
self.store = {}
def sanitize(self, text: str) -> str:
"""Remove sensitive patterns before storage."""
# Remove API keys, SSNs, etc.
text = re.sub(r'sk-[a-zA-Z0-9]+', '[REDACTED_KEY]', text)
text = re.sub(r'd{3}-d{2}-d{4}', '[REDACTED_SSN]', text)
return text
async def save(self, collection: str, text: str, id: str):
sanitized = self.sanitize(text)
encrypted = self.fernet.encrypt(sanitized.encode())
self.store[f"{collection}:{id}"] = encrypted
memory = SemanticTextMemory(
storage=SecureMemoryStore(encryption_key),
embeddings_generator=embeddings
)Handlebars Prompt Injection
Handlebars templates can be exploited through user input.
Vulnerable
Template injection via user input
from semantic_kernel.prompt_template import PromptTemplateConfig
template = """
{{$system_message}}
User: {{$user_input}}
Assistant:
"""
# user_input could contain: "}} {{$secret_data}}"
# Which would access other variablesSecure
Escape template syntax before rendering
from semantic_kernel.prompt_template import PromptTemplateConfig
import html
def sanitize_for_template(user_input: str) -> str:
"""Escape template syntax in user input."""
# Escape Handlebars syntax
user_input = user_input.replace("{{", "{{")
user_input = user_input.replace("}}", "}}")
return html.escape(user_input)
template = """
{{$system_message}}
User: {{$sanitized_input}}
Assistant:
"""
# Always sanitize before template rendering
arguments = KernelArguments(
sanitized_input=sanitize_for_template(raw_user_input)
)Best Practices
- Set
max_iterationson all planners (recommended: 5-15) - Add execution timeouts with
asyncio.wait_for() - Use allowlists for plugin commands and API hosts
- Implement termination strategies for agents
- Sanitize user input before template rendering
- Encrypt sensitive data in memory stores
CLI Examples
# Scan Semantic Kernel project
inkog scan ./my-sk-app
# Focus on plugin security
inkog scan . -severity critical
# Python and C# file scanning
inkog scan ./src -verboseRelated
- LangChain - Similar agent patterns
- Code Injection
- Prompt Injection
Last updated on