Skip to content

How Do AI Agents Handle Credentials and API Keys Securely?

Problem

I built an AI agent that calls an external API. The tutorials I followed showed me this:

tutorial-agent.py
# BAD: What tutorials teach
api_key = "sk-proj-xxxxx"
client = OpenAI(api_key=api_key)
async def process(self, input: str):
response = await client.generate(input)
return response

This worked in development. When I deployed to production:

Terminal window
# Production disaster
ERROR: API key exposed in git history
ERROR: Cannot rotate key without redeploying
ERROR: Agent crashed when key expired mid-task
ERROR: Security audit failed - no access control

I discovered that 90% of tutorials hardcode API keys. Zero production agents can do this.

The Real Challenge

A Reddit thread on “Developers who actually built AI agents” revealed the core issue:

“The minimum viable agent that taught me the most: one that needs to call an external API it does not own the key for. Not because of the API call itself, but because it forces you to think about where credentials live, how the agent requests capabilities at runtime, and what happens when access is denied mid-task. Most tutorials skip this because they hardcode keys. Production agents cannot.” - Specialist-Heat-6414

This taught me three critical questions that tutorials ignore:

  1. Where do credentials live? (Not in code)
  2. How does the agent request capabilities at runtime? (Dynamic, not hardcoded)
  3. What happens when access is denied? (Graceful degradation)

Environment

  • Python 3.12
  • HashiCorp Vault for secret management
  • PostgreSQL for audit logs
  • Redis for credential caching
  • LangGraph for agent orchestration

Solution

Production AI agents need a three-layer credential management architecture:

┌─────────────────┐
│ AI Agent │
│ │
│ 1. Request │
│ Capability │
└────────┬────────┘
┌─────────────────┐
│ Auth Layer │
│ │
│ 2. Validate & │
│ Inject Creds │
└────────┬────────┘
┌─────────────────┐
│ Secret Manager │
│ (Vault/AWS SM) │
│ │
│ 3. Store Keys │
│ Securely │
└─────────────────┘

Layer 1: Secure Storage

Never store credentials in code or environment files. I implemented a secret manager abstraction:

secret_manager.py
from abc import ABC, abstractmethod
from typing import Optional
import hvac
from dataclasses import dataclass
@dataclass
class Credential:
"""Represents a secure credential"""
key_id: str
secret: str
expires_at: Optional[datetime] = None
permissions: list[str] = None
class SecretManager(ABC):
"""Abstract interface for secret storage"""
@abstractmethod
async def get_credential(self, key_name: str) -> Credential:
"""Retrieve a credential by name"""
pass
@abstractmethod
async def rotate_credential(self, key_name: str) -> Credential:
"""Rotate a credential"""
pass
class VaultSecretManager(SecretManager):
"""HashiCorp Vault implementation"""
def __init__(self, vault_url: str, role_id: str, secret_id: str):
self.client = hvac.Client(url=vault_url)
self.client.auth.approle.login(role_id, secret_id)
async def get_credential(self, key_name: str) -> Credential:
"""Get credential from Vault"""
response = self.client.secrets.kv.v2.read_secret_version(
path=key_name
)
data = response['data']['data']
return Credential(
key_id=data['key_id'],
secret=data['secret'],
expires_at=datetime.fromisoformat(data.get('expires_at')),
permissions=data.get('permissions', [])
)
async def rotate_credential(self, key_name: str) -> Credential:
"""Rotate credential in Vault"""
# Generate new key
new_key = await self._generate_new_key(key_name)
# Store in Vault
self.client.secrets.kv.v2.create_or_update_secret(
path=key_name,
secret={
'key_id': new_key.key_id,
'secret': new_key.secret,
'expires_at': new_key.expires_at.isoformat(),
'permissions': new_key.permissions
}
)
return new_key

When I retrieve credentials:

get_credential.py
# Initialize secret manager
vault = VaultSecretManager(
vault_url="https://vault.example.com",
role_id=os.environ["VAULT_ROLE_ID"],
secret_id=os.environ["VAULT_SECRET_ID"]
)
# Get credential (never stored in code)
openai_key = await vault.get_credential("openai-api-key")
print(f"Key ID: {openai_key.key_id}") # Safe to log
# Never print openai_key.secret!

Layer 2: Runtime Capability Requests

Agents should request permissions dynamically, not have them hardcoded. I implemented a capability-based access control system:

capability_manager.py
from enum import Enum
from typing import Callable, Awaitable
from dataclasses import dataclass
class Permission(Enum):
READ_EMAILS = "read_emails"
SEND_MESSAGES = "send_messages"
ACCESS_CALENDAR = "access_calendar"
CALL_EXTERNAL_API = "call_external_api"
@dataclass
class CapabilityRequest:
"""Request for a specific capability"""
agent_id: str
permission: Permission
context: dict
justification: str
ttl_seconds: int = 3600 # Time-limited access
class CapabilityManager:
"""Manages runtime capability requests"""
def __init__(self, secret_manager: SecretManager, audit_log):
self.secret_manager = secret_manager
self.audit_log = audit_log
self.capability_cache = {} # Short-lived cache
async def request_capability(
self,
request: CapabilityRequest
) -> tuple[bool, Optional[Credential]]:
"""Request a capability at runtime"""
# 1. Log the request for audit
await self.audit_log.log_capability_request(request)
# 2. Check if agent has permission
if not await self._check_permission(request):
await self.audit_log.log_capability_denied(request)
return False, None
# 3. Get credential with limited scope
credential = await self.secret_manager.get_credential(
f"{request.agent_id}_{request.permission.value}"
)
# 4. Cache with TTL
cache_key = f"{request.agent_id}:{request.permission.value}"
self.capability_cache[cache_key] = {
'credential': credential,
'expires_at': datetime.now() + timedelta(seconds=request.ttl_seconds)
}
await self.audit_log.log_capability_granted(request)
return True, credential
async def _check_permission(self, request: CapabilityRequest) -> bool:
"""Check if agent has permission"""
# Implement your permission logic here
# This could check:
# - Agent role
# - User who started the agent
# - Current context
# - Time of day
# - Rate limits
return True # Simplified for example

When an agent needs to call an API:

agent_capability.py
class SecureAgent:
"""Agent with runtime capability requests"""
def __init__(self, agent_id: str, capability_manager: CapabilityManager):
self.agent_id = agent_id
self.capability_manager = capability_manager
async def call_external_api(self, api_name: str, data: dict) -> dict:
"""Call external API with runtime credential request"""
# Request capability at runtime
granted, credential = await self.capability_manager.request_capability(
CapabilityRequest(
agent_id=self.agent_id,
permission=Permission.CALL_EXTERNAL_API,
context={'api_name': api_name},
justification=f"Processing user request: {data.get('task')}"
)
)
if not granted:
raise PermissionError(
f"Agent {self.agent_id} not authorized to call {api_name}"
)
# Use credential just-in-time
try:
client = APIClient(api_name, credential.secret)
result = await client.call(data)
return result
finally:
# Clear credential from memory
del credential

Layer 3: Access Denial Handling

Agents must handle credential failures gracefully. I implemented comprehensive error handling:

credential_handler.py
from enum import Enum
from typing import Optional
class CredentialError(Enum):
"""Types of credential errors"""
EXPIRED = "expired"
REVOKED = "revoked"
INVALID = "invalid"
RATE_LIMITED = "rate_limited"
PERMISSION_DENIED = "permission_denied"
class CredentialErrorHandler:
"""Handle credential access failures"""
def __init__(
self,
secret_manager: SecretManager,
fallback_strategy: 'FallbackStrategy',
notification_service
):
self.secret_manager = secret_manager
self.fallback = fallback_strategy
self.notifier = notification_service
async def handle_error(
self,
error: CredentialError,
context: dict
) -> tuple[bool, Optional[str]]:
"""Handle credential error with fallback"""
match error:
case CredentialError.EXPIRED:
# Try to refresh
return await self._handle_expired(context)
case CredentialError.REVOKED:
# Escalate to human
return await self._handle_revoked(context)
case CredentialError.RATE_LIMITED:
# Wait and retry
return await self._handle_rate_limited(context)
case CredentialError.INVALID:
# Try rotation
return await self._handle_invalid(context)
case CredentialError.PERMISSION_DENIED:
# Use fallback
return await self._handle_denied(context)
async def _handle_expired(self, context: dict) -> tuple[bool, Optional[str]]:
"""Handle expired credentials"""
try:
# Attempt to get fresh credential
new_cred = await self.secret_manager.get_credential(
context['credential_name']
)
return True, new_cred.secret
except Exception as e:
await self.notifier.alert(
f"Credential refresh failed: {e}",
severity="high"
)
return False, None
async def _handle_revoked(self, context: dict) -> tuple[bool, Optional[str]]:
"""Handle revoked credentials"""
# Alert security team
await self.notifier.alert(
f"Credential revoked for {context['agent_id']}",
severity="critical"
)
# Use fallback strategy
fallback_result = await self.fallback.execute(context)
return True, fallback_result
async def _handle_rate_limited(self, context: dict) -> tuple[bool, Optional[str]]:
"""Handle rate limiting"""
wait_time = context.get('retry_after', 60)
await asyncio.sleep(wait_time)
# Retry
return True, "retry"
async def _handle_invalid(self, context: dict) -> tuple[bool, Optional[str]]:
"""Handle invalid credentials"""
try:
# Attempt rotation
new_cred = await self.secret_manager.rotate_credential(
context['credential_name']
)
return True, new_cred.secret
except Exception as e:
await self.notifier.alert(
f"Credential rotation failed: {e}",
severity="high"
)
return False, None
async def _handle_denied(self, context: dict) -> tuple[bool, Optional[str]]:
"""Handle permission denied"""
# Use fallback
fallback_result = await self.fallback.execute(context)
return True, fallback_result

Complete Secure Agent

Here’s the complete production-ready secure agent:

secure_agent.py
class SecureAgent:
"""Production AI agent with secure credential management"""
def __init__(
self,
agent_id: str,
capability_manager: CapabilityManager,
error_handler: CredentialErrorHandler,
audit_log
):
self.agent_id = agent_id
self.capability_manager = capability_manager
self.error_handler = error_handler
self.audit_log = audit_log
async def execute_task(self, task: dict) -> dict:
"""Execute task with secure credential handling"""
request_id = str(uuid.uuid4())
await self.audit_log.log_task_start(request_id, task)
try:
# Determine what capabilities are needed
required_capabilities = await self._analyze_requirements(task)
results = {}
for capability in required_capabilities:
result = await self._execute_with_capability(
request_id,
capability,
task
)
results[capability.value] = result
await self.audit_log.log_task_complete(request_id, results)
return {'success': True, 'results': results}
except PermissionError as e:
await self.audit_log.log_task_denied(request_id, str(e))
return {'success': False, 'error': str(e)}
except Exception as e:
await self.audit_log.log_task_error(request_id, str(e))
return {'success': False, 'error': str(e)}
async def _execute_with_capability(
self,
request_id: str,
capability: Permission,
task: dict
) -> any:
"""Execute with runtime capability request"""
# Request capability
granted, credential = await self.capability_manager.request_capability(
CapabilityRequest(
agent_id=self.agent_id,
permission=capability,
context={'task': task, 'request_id': request_id},
justification=f"Task: {task.get('description', 'Unknown')}"
)
)
if not granted:
raise PermissionError(f"Capability {capability.value} denied")
try:
# Execute with credential
result = await self._do_work(capability, credential, task)
return result
except CredentialExpiredError:
# Handle expired credential
success, new_cred = await self.error_handler.handle_error(
CredentialError.EXPIRED,
{'credential_name': f"{self.agent_id}_{capability.value}"}
)
if success:
result = await self._do_work(capability, new_cred, task)
return result
raise
except CredentialRevokedError:
# Handle revoked credential
success, fallback = await self.error_handler.handle_error(
CredentialError.REVOKED,
{'agent_id': self.agent_id, 'capability': capability.value}
)
if success:
return fallback
raise
finally:
# Always clear credential from memory
if credential:
del credential
async def _do_work(
self,
capability: Permission,
credential: Credential,
task: dict
) -> any:
"""Perform the actual work with credential"""
# Implement your work logic here
pass
async def _analyze_requirements(self, task: dict) -> list[Permission]:
"""Analyze what capabilities are needed"""
required = []
if 'send_email' in task.get('actions', []):
required.append(Permission.SEND_MESSAGES)
if 'read_calendar' in task.get('actions', []):
required.append(Permission.ACCESS_CALENDAR)
if 'external_api' in task.get('actions', []):
required.append(Permission.CALL_EXTERNAL_API)
return required

Common Mistakes to Avoid

Mistake 1: Environment Variables Are “Secure Enough”

mistake-env.py
# WRONG: Still vulnerable
api_key = os.environ.get('OPENAI_API_KEY')

Environment variables are visible in:

  • Process listings
  • Docker inspect
  • Crash dumps
  • Log files

Fix: Use secret managers with audit trails.

Mistake 2: Caching Credentials for Performance

mistake-cache.py
# WRONG: Credentials live in memory too long
class Agent:
def __init__(self):
self.api_key = get_credentials() # Cached for agent lifetime

Fix: Request credentials just-in-time, clear immediately after use.

Mistake 3: Giving Agents Unrestricted Tool Access

mistake-unrestricted.py
# WRONG: No permission checks
agent.add_tool(external_api, unrestricted=True)

Fix: Implement capability-based access control.

Mistake 4: Ignoring Access Denial Scenarios

mistake-no-handling.py
# WRONG: No error handling
creds = get_credentials()
api_call(creds) # What if creds are expired/revoked?

Fix: Comprehensive error handling with retry and fallback logic.

Mistake 5: Logging Credentials for Debugging

mistake-logging.py
# WRONG: Credentials appear in logs
logger.info(f"API call with key: {api_key}")

Fix: Never log credentials. Only log access events.

Real-World Scenarios

This architecture handles production scenarios tutorials ignore:

Multi-Tenant Agents

multi-tenant.py
class MultiTenantAgent:
"""Agent that handles multiple users' credentials"""
async def process_for_user(self, user_id: str, task: dict):
# Get user-specific credential
credential = await self.secret_manager.get_credential(
f"user_{user_id}_api_key"
)
# User A's agent cannot access User B's APIs
result = await self.execute_with_credential(credential, task)
return result

Temporary Access

temporary-access.py
async def grant_temporary_access(agent_id: str, capability: Permission, duration: int):
"""Grant time-limited access"""
# Create temporary credential
temp_cred = await vault.create_temporary_credential(
path=f"temp/{agent_id}/{capability.value}",
ttl=duration
)
# Agent can only use this for 'duration' seconds
return temp_cred

Audit Compliance

audit.py
class AuditLog:
"""Complete audit trail for compliance"""
async def log_capability_request(self, request: CapabilityRequest):
await self.db.insert('audit_log', {
'timestamp': datetime.now(),
'agent_id': request.agent_id,
'action': 'capability_request',
'permission': request.permission.value,
'justification': request.justification,
'context': json.dumps(request.context)
})
async def log_capability_denied(self, request: CapabilityRequest):
await self.db.insert('audit_log', {
'timestamp': datetime.now(),
'agent_id': request.agent_id,
'action': 'capability_denied',
'permission': request.permission.value,
'reason': 'permission_check_failed'
})

Summary

In this post, I showed how production AI agents handle credentials and API keys securely. The key point is implementing three layers: secure storage (secret managers, not code), runtime capability requests (dynamic, not hardcoded), and access denial handling (graceful degradation, not crashes).

Most tutorials hardcode keys because it’s simpler. This creates a dangerous gap between learning materials and production requirements. Real agents need to handle expired credentials, revoked access, and permission changes mid-task.

The cost of getting this wrong is high: leaked API keys, compliance violations, security breaches, and agent downtime during key rotation. Get it right from the start.

Final Words + More Resources

My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me

Here are also the most important links from this article along with some further resources that will help you in this scope:

Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!

Comments