How Do AI Agents Handle Credentials and API Keys Securely?
Problem
I built an AI agent that calls an external API. The tutorials I followed showed me this:

    # BAD: What tutorials teach
    api_key = "sk-proj-xxxxx"
    client = OpenAI(api_key=api_key)

    async def process(self, input: str):
        response = await client.generate(input)
        return response

This worked in development. When I deployed to production:

    # Production disaster
    ERROR: API key exposed in git history
    ERROR: Cannot rotate key without redeploying
    ERROR: Agent crashed when key expired mid-task
    ERROR: Security audit failed - no access control

I discovered that the large majority of tutorials hardcode API keys. No production agent can get away with that.
The Real Challenge
A Reddit thread on “Developers who actually built AI agents” revealed the core issue:
“The minimum viable agent that taught me the most: one that needs to call an external API it does not own the key for. Not because of the API call itself, but because it forces you to think about where credentials live, how the agent requests capabilities at runtime, and what happens when access is denied mid-task. Most tutorials skip this because they hardcode keys. Production agents cannot.” - Specialist-Heat-6414
This taught me three critical questions that tutorials ignore:
- Where do credentials live? (Not in code)
- How does the agent request capabilities at runtime? (Dynamic, not hardcoded)
- What happens when access is denied? (Graceful degradation)
Environment
- Python 3.12
- HashiCorp Vault for secret management
- PostgreSQL for audit logs
- Redis for credential caching
- LangGraph for agent orchestration
Solution
Production AI agents need a three-layer credential management architecture:

    ┌─────────────────┐
    │    AI Agent     │
    │                 │
    │  1. Request     │
    │     Capability  │
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │   Auth Layer    │
    │                 │
    │  2. Validate &  │
    │     Inject Creds│
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │ Secret Manager  │
    │ (Vault/AWS SM)  │
    │                 │
    │  3. Store Keys  │
    │     Securely    │
    └─────────────────┘

Layer 1: Secure Storage
Never store credentials in code or environment files. I implemented a secret manager abstraction:

    from abc import ABC, abstractmethod
    from dataclasses import dataclass, field
    from datetime import datetime
    from typing import Optional

    import hvac


    @dataclass
    class Credential:
        """Represents a secure credential"""
        key_id: str
        secret: str = field(repr=False)  # keep the secret out of repr() and logs
        expires_at: Optional[datetime] = None
        # default_factory gives each instance its own list instead of None
        permissions: list[str] = field(default_factory=list)


    class SecretManager(ABC):
        """Abstract interface for secret storage"""

        @abstractmethod
        async def get_credential(self, key_name: str) -> Credential:
            """Retrieve a credential by name"""

        @abstractmethod
        async def rotate_credential(self, key_name: str) -> Credential:
            """Rotate a credential"""


    class VaultSecretManager(SecretManager):
        """HashiCorp Vault implementation"""

        def __init__(self, vault_url: str, role_id: str, secret_id: str):
            self.client = hvac.Client(url=vault_url)
            self.client.auth.approle.login(role_id, secret_id)

        async def get_credential(self, key_name: str) -> Credential:
            """Get credential from Vault"""
            # Note: hvac calls are synchronous, so this blocks the event loop
            response = self.client.secrets.kv.v2.read_secret_version(
                path=key_name
            )

            data = response['data']['data']
            expires_at = data.get('expires_at')  # may be missing
            return Credential(
                key_id=data['key_id'],
                secret=data['secret'],
                expires_at=datetime.fromisoformat(expires_at) if expires_at else None,
                permissions=data.get('permissions', [])
            )

        async def rotate_credential(self, key_name: str) -> Credential:
            """Rotate credential in Vault"""
            # Generate new key (_generate_new_key is provider-specific and not shown)
            new_key = await self._generate_new_key(key_name)

            # Store in Vault
            self.client.secrets.kv.v2.create_or_update_secret(
                path=key_name,
                secret={
                    'key_id': new_key.key_id,
                    'secret': new_key.secret,
                    'expires_at': new_key.expires_at.isoformat() if new_key.expires_at else None,
                    'permissions': new_key.permissions
                }
            )

            return new_key

When I retrieve credentials:

    import os

    # Initialize secret manager
    vault = VaultSecretManager(
        vault_url="https://vault.example.com",
        role_id=os.environ["VAULT_ROLE_ID"],
        secret_id=os.environ["VAULT_SECRET_ID"]
    )

    # Get credential (never stored in code); this must run inside an async function
    openai_key = await vault.get_credential("openai-api-key")
    print(f"Key ID: {openai_key.key_id}")  # Safe to log
    # Never print openai_key.secret!

Layer 2: Runtime Capability Requests
Agents should request permissions dynamically, not have them hardcoded. I implemented a capability-based access control system:

    from dataclasses import dataclass
    from datetime import datetime, timedelta
    from enum import Enum
    from typing import Optional


    class Permission(Enum):
        READ_EMAILS = "read_emails"
        SEND_MESSAGES = "send_messages"
        ACCESS_CALENDAR = "access_calendar"
        CALL_EXTERNAL_API = "call_external_api"


    @dataclass
    class CapabilityRequest:
        """Request for a specific capability"""
        agent_id: str
        permission: Permission
        context: dict
        justification: str
        ttl_seconds: int = 3600  # Time-limited access


    class CapabilityManager:
        """Manages runtime capability requests"""

        def __init__(self, secret_manager: SecretManager, audit_log):
            self.secret_manager = secret_manager
            self.audit_log = audit_log
            self.capability_cache = {}  # Short-lived cache

        async def request_capability(
            self,
            request: CapabilityRequest
        ) -> tuple[bool, Optional[Credential]]:
            """Request a capability at runtime"""

            # 1. Log the request for audit
            await self.audit_log.log_capability_request(request)

            # 2. Check if agent has permission
            if not await self._check_permission(request):
                await self.audit_log.log_capability_denied(request)
                return False, None

            # 3. Get credential with limited scope
            credential = await self.secret_manager.get_credential(
                f"{request.agent_id}_{request.permission.value}"
            )

            # 4. Cache with TTL (reading from the cache on later requests is not shown)
            cache_key = f"{request.agent_id}:{request.permission.value}"
            self.capability_cache[cache_key] = {
                'credential': credential,
                'expires_at': datetime.now() + timedelta(seconds=request.ttl_seconds)
            }

            await self.audit_log.log_capability_granted(request)
            return True, credential

        async def _check_permission(self, request: CapabilityRequest) -> bool:
            """Check if agent has permission"""
            # Implement your permission logic here
            # This could check:
            # - Agent role
            # - User who started the agent
            # - Current context
            # - Time of day
            # - Rate limits
            return True  # Simplified for example: this grants every request

When an agent needs to call an API:

    class SecureAgent:
        """Agent with runtime capability requests"""

        def __init__(self, agent_id: str, capability_manager: CapabilityManager):
            self.agent_id = agent_id
            self.capability_manager = capability_manager

        async def call_external_api(self, api_name: str, data: dict) -> dict:
            """Call external API with runtime credential request"""

            # Request capability at runtime
            granted, credential = await self.capability_manager.request_capability(
                CapabilityRequest(
                    agent_id=self.agent_id,
                    permission=Permission.CALL_EXTERNAL_API,
                    context={'api_name': api_name},
                    justification=f"Processing user request: {data.get('task')}"
                )
            )

            if not granted:
                raise PermissionError(
                    f"Agent {self.agent_id} not authorized to call {api_name}"
                )

            # Use credential just-in-time
            try:
                # APIClient stands in for your own API wrapper (not shown)
                client = APIClient(api_name, credential.secret)
                result = await client.call(data)
                return result
            finally:
                # Drop the local reference. This does not scrub the secret
                # string from memory; it only stops this frame from holding it.
                del credential

Layer 3: Access Denial Handling
Agents must handle credential failures gracefully. I implemented comprehensive error handling:

    import asyncio
    from enum import Enum
    from typing import Optional


    class CredentialError(Enum):
        """Types of credential errors"""
        EXPIRED = "expired"
        REVOKED = "revoked"
        INVALID = "invalid"
        RATE_LIMITED = "rate_limited"
        PERMISSION_DENIED = "permission_denied"


    class CredentialErrorHandler:
        """Handle credential access failures"""

        def __init__(
            self,
            secret_manager: SecretManager,
            fallback_strategy: 'FallbackStrategy',
            notification_service
        ):
            self.secret_manager = secret_manager
            self.fallback = fallback_strategy
            self.notifier = notification_service

        async def handle_error(
            self,
            error: CredentialError,
            context: dict
        ) -> tuple[bool, Optional[str]]:
            """Handle credential error with fallback"""

            match error:
                case CredentialError.EXPIRED:
                    # Try to refresh
                    return await self._handle_expired(context)

                case CredentialError.REVOKED:
                    # Escalate to human
                    return await self._handle_revoked(context)

                case CredentialError.RATE_LIMITED:
                    # Wait and retry
                    return await self._handle_rate_limited(context)

                case CredentialError.INVALID:
                    # Try rotation
                    return await self._handle_invalid(context)

                case CredentialError.PERMISSION_DENIED:
                    # Use fallback
                    return await self._handle_denied(context)

        async def _handle_expired(self, context: dict) -> tuple[bool, Optional[str]]:
            """Handle expired credentials"""
            try:
                # Attempt to get fresh credential
                new_cred = await self.secret_manager.get_credential(
                    context['credential_name']
                )
                return True, new_cred.secret
            except Exception as e:
                await self.notifier.alert(
                    f"Credential refresh failed: {e}",
                    severity="high"
                )
                return False, None

        async def _handle_revoked(self, context: dict) -> tuple[bool, Optional[str]]:
            """Handle revoked credentials"""
            # Alert security team
            await self.notifier.alert(
                f"Credential revoked for {context['agent_id']}",
                severity="critical"
            )

            # Use fallback strategy
            fallback_result = await self.fallback.execute(context)
            return True, fallback_result

        async def _handle_rate_limited(self, context: dict) -> tuple[bool, Optional[str]]:
            """Handle rate limiting"""
            wait_time = context.get('retry_after', 60)

            await asyncio.sleep(wait_time)

            # Retry ("retry" is a sentinel for the caller, not a secret)
            return True, "retry"

        async def _handle_invalid(self, context: dict) -> tuple[bool, Optional[str]]:
            """Handle invalid credentials"""
            try:
                # Attempt rotation
                new_cred = await self.secret_manager.rotate_credential(
                    context['credential_name']
                )
                return True, new_cred.secret
            except Exception as e:
                await self.notifier.alert(
                    f"Credential rotation failed: {e}",
                    severity="high"
                )
                return False, None

        async def _handle_denied(self, context: dict) -> tuple[bool, Optional[str]]:
            """Handle permission denied"""
            # Use fallback
            fallback_result = await self.fallback.execute(context)
            return True, fallback_result

Complete Secure Agent
Here’s the complete secure agent with the three layers wired together. The `_do_work` method and the two exception classes are placeholders you fill in for your own API:

    import uuid
    from dataclasses import replace
    from typing import Any


    class CredentialExpiredError(Exception):
        """Raised by _do_work when the API reports an expired credential"""


    class CredentialRevokedError(Exception):
        """Raised by _do_work when the API reports a revoked credential"""


    class SecureAgent:
        """Production AI agent with secure credential management"""

        def __init__(
            self,
            agent_id: str,
            capability_manager: CapabilityManager,
            error_handler: CredentialErrorHandler,
            audit_log
        ):
            self.agent_id = agent_id
            self.capability_manager = capability_manager
            self.error_handler = error_handler
            self.audit_log = audit_log

        async def execute_task(self, task: dict) -> dict:
            """Execute task with secure credential handling"""

            request_id = str(uuid.uuid4())
            await self.audit_log.log_task_start(request_id, task)

            try:
                # Determine what capabilities are needed
                required_capabilities = await self._analyze_requirements(task)

                results = {}
                for capability in required_capabilities:
                    result = await self._execute_with_capability(
                        request_id,
                        capability,
                        task
                    )
                    results[capability.value] = result

                await self.audit_log.log_task_complete(request_id, results)
                return {'success': True, 'results': results}

            except PermissionError as e:
                await self.audit_log.log_task_denied(request_id, str(e))
                return {'success': False, 'error': str(e)}

            except Exception as e:
                await self.audit_log.log_task_error(request_id, str(e))
                return {'success': False, 'error': str(e)}

        async def _execute_with_capability(
            self,
            request_id: str,
            capability: Permission,
            task: dict
        ) -> Any:
            """Execute with runtime capability request"""

            # Request capability
            granted, credential = await self.capability_manager.request_capability(
                CapabilityRequest(
                    agent_id=self.agent_id,
                    permission=capability,
                    context={'task': task, 'request_id': request_id},
                    justification=f"Task: {task.get('description', 'Unknown')}"
                )
            )

            if not granted:
                raise PermissionError(f"Capability {capability.value} denied")

            try:
                # Execute with credential
                result = await self._do_work(capability, credential, task)
                return result

            except CredentialExpiredError:
                # Handle expired credential
                success, new_secret = await self.error_handler.handle_error(
                    CredentialError.EXPIRED,
                    {'credential_name': f"{self.agent_id}_{capability.value}"}
                )
                if success:
                    # handle_error returns the refreshed secret string, so
                    # rebuild the Credential before retrying
                    refreshed = replace(credential, secret=new_secret)
                    return await self._do_work(capability, refreshed, task)
                raise

            except CredentialRevokedError:
                # Handle revoked credential
                success, fallback = await self.error_handler.handle_error(
                    CredentialError.REVOKED,
                    {'agent_id': self.agent_id, 'capability': capability.value}
                )
                if success:
                    return fallback
                raise

            finally:
                # Always drop the local reference (this does not scrub memory)
                if credential:
                    del credential

        async def _do_work(
            self,
            capability: Permission,
            credential: Credential,
            task: dict
        ) -> Any:
            """Perform the actual work with credential"""
            # Implement your work logic here
            pass

        async def _analyze_requirements(self, task: dict) -> list[Permission]:
            """Analyze what capabilities are needed"""
            required = []

            if 'send_email' in task.get('actions', []):
                required.append(Permission.SEND_MESSAGES)

            if 'read_calendar' in task.get('actions', []):
                required.append(Permission.ACCESS_CALENDAR)

            if 'external_api' in task.get('actions', []):
                required.append(Permission.CALL_EXTERNAL_API)

            return required

Common Mistakes to Avoid
Mistake 1: Environment Variables Are “Secure Enough”

    # WRONG: Still vulnerable
    api_key = os.environ.get('OPENAI_API_KEY')

Environment variables are visible in:
- Process listings
- Docker inspect
- Crash dumps
- Log files
Fix: Use secret managers with audit trails.
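To make "audit trail" concrete, here is a minimal in-memory sketch of the idea: every read is recorded with the caller and key name, and the secret value itself never enters the record. `AuditedSecretStore` and `AccessEvent` are hypothetical names I made up for illustration; a real deployment would get this behavior from Vault or AWS Secrets Manager audit logging rather than from code like this.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class AccessEvent:
    """One audit record: who asked for which secret, and when (never the value)."""
    timestamp: datetime
    caller: str
    key_name: str
    found: bool


@dataclass
class AuditedSecretStore:
    """Hypothetical in-memory stand-in for a secret manager with an audit trail."""
    # repr=False keeps secret values out of repr() output and therefore out of logs
    _secrets: dict[str, str] = field(default_factory=dict, repr=False)
    events: list[AccessEvent] = field(default_factory=list)

    def put(self, key_name: str, value: str) -> None:
        self._secrets[key_name] = value

    def get(self, caller: str, key_name: str) -> str:
        found = key_name in self._secrets
        # Record the attempt before returning, including failed lookups
        self.events.append(
            AccessEvent(datetime.now(timezone.utc), caller, key_name, found)
        )
        if not found:
            raise KeyError(f"no secret named {key_name!r}")
        return self._secrets[key_name]


store = AuditedSecretStore()
store.put("openai-api-key", "sk-example-not-a-real-key")
secret = store.get(caller="agent-42", key_name="openai-api-key")
```

Unlike an environment variable, each read here leaves a trace that can be reviewed later, and failed lookups are recorded too.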
Mistake 2: Caching Credentials for Performance

    # WRONG: Credentials live in memory too long
    class Agent:
        def __init__(self):
            self.api_key = get_credentials()  # Cached for agent lifetime

Fix: Request credentials just-in-time, clear immediately after use.
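One way to approximate "clear immediately after use" in Python is a context manager that holds the secret in a mutable `bytearray` and overwrites it on exit. This is a best-effort sketch, not a guarantee: Python strings and `bytes` are immutable, so `del` only drops a reference, and any copies made by the fetch function or an HTTP library are not scrubbed. `just_in_time_secret` and `fake_fetch` are hypothetical names.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def just_in_time_secret(fetch: Callable[[], bytes]) -> Iterator[bytearray]:
    """Fetch a secret only for the duration of the with-block, then zero it."""
    buf = bytearray(fetch())
    try:
        yield buf
    finally:
        # Overwrite the buffer in place so this copy no longer holds the secret
        for i in range(len(buf)):
            buf[i] = 0


def fake_fetch() -> bytes:
    """Stand-in for a call to the secret manager."""
    return b"sk-example-not-a-real-key"


with just_in_time_secret(fake_fetch) as secret:
    # Pass `secret` to the API client here; keep the block as short as possible
    used_length = len(secret)

# After the block, the buffer still exists but contains only zero bytes
```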
Mistake 3: Giving Agents Unrestricted Tool Access

    # WRONG: No permission checks
    agent.add_tool(external_api, unrestricted=True)

Fix: Implement capability-based access control.
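As a rough illustration of what a permission check could look like, here is a deny-by-default allow-list keyed by agent role. The `Permission` enum mirrors the one used earlier in this post; `ROLE_GRANTS`, `AGENT_ROLES`, and `is_allowed` are hypothetical names, and a real system would load the grants from a policy store rather than a module-level dict.

```python
from enum import Enum


class Permission(Enum):
    READ_EMAILS = "read_emails"
    SEND_MESSAGES = "send_messages"
    ACCESS_CALENDAR = "access_calendar"
    CALL_EXTERNAL_API = "call_external_api"


# Hypothetical grants: each role lists exactly what it may do
ROLE_GRANTS: dict[str, frozenset[Permission]] = {
    "scheduler": frozenset({Permission.ACCESS_CALENDAR}),
    "support": frozenset({Permission.READ_EMAILS, Permission.SEND_MESSAGES}),
}

# Hypothetical mapping from agent id to role
AGENT_ROLES: dict[str, str] = {"agent-1": "scheduler", "agent-2": "support"}


def is_allowed(agent_id: str, permission: Permission) -> bool:
    """Deny by default: unknown agents and unlisted permissions are refused."""
    role = AGENT_ROLES.get(agent_id, "")
    return permission in ROLE_GRANTS.get(role, frozenset())


print(is_allowed("agent-1", Permission.ACCESS_CALENDAR))
print(is_allowed("agent-1", Permission.SEND_MESSAGES))
```

A function like this could replace the `return True` placeholder in `_check_permission`, so that a capability request is granted only when the role explicitly lists it.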
Mistake 4: Ignoring Access Denial Scenarios

    # WRONG: No error handling
    creds = get_credentials()
    api_call(creds)  # What if creds are expired/revoked?

Fix: Comprehensive error handling with retry and fallback logic.
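A small sketch of retry-then-fallback, assuming the operation fetches a fresh credential on each attempt and raises a `CredentialExpiredError` when the API rejects it. The exception class, `call_with_retry`, and the demo coroutines are all hypothetical; the delays are tiny so the example finishes quickly.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class CredentialExpiredError(Exception):
    """Hypothetical error raised when the API rejects an expired credential."""


async def call_with_retry(
    operation: Callable[[], Awaitable[T]],
    fallback: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.01,
) -> T:
    """Retry with exponential backoff, then degrade to the fallback."""
    for attempt in range(max_attempts):
        try:
            return await operation()
        except CredentialExpiredError:
            # Back off before the next attempt; skip the wait after the last one
            if attempt < max_attempts - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    return await fallback()


attempts = 0


async def flaky() -> str:
    """Fails twice with an expired credential, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise CredentialExpiredError("expired")
    return "live result"


async def cached() -> str:
    """Degraded answer used when every attempt fails."""
    return "cached result"


result = asyncio.run(call_with_retry(flaky, cached))
```

Only the credential error is retried here; any other exception propagates to the caller, which keeps unrelated bugs from being hidden behind the fallback.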
Mistake 5: Logging Credentials for Debugging

    # WRONG: Credentials appear in logs
    logger.info(f"API call with key: {api_key}")

Fix: Never log credentials. Only log access events.
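As a safety net for the cases where a secret slips into a log call anyway, a `logging.Filter` can redact anything that looks like a key before it is written. This is a sketch under one assumption: that keys follow an `sk-...` shape, which is what `SECRET_PATTERN` matches. It is not a substitute for keeping secrets out of log calls in the first place. `RedactSecrets` and `ListHandler` are hypothetical names.

```python
import logging
import re

# Assumption: secrets look like "sk-" followed by 8+ key characters
SECRET_PATTERN = re.compile(r"sk-[A-Za-z0-9_-]{8,}")


class RedactSecrets(logging.Filter):
    """Replace anything matching SECRET_PATTERN before the record is emitted."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Render the final message first so secrets passed as args are caught too
        message = record.getMessage()
        record.msg = SECRET_PATTERN.sub("[REDACTED]", message)
        record.args = ()
        return True


class ListHandler(logging.Handler):
    """Collects formatted messages in memory so the demo can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(record.getMessage())


logger = logging.getLogger("agent.demo")
logger.setLevel(logging.INFO)
logger.propagate = False

handler = ListHandler()
handler.addFilter(RedactSecrets())
logger.addHandler(handler)

logger.info("API call with key: %s", "sk-example1234567890")
logger.info("Capability granted to %s", "agent-42")
```

Attaching the filter to the handler rather than the logger means it also covers records that reach the handler from child loggers.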
Real-World Scenarios
This architecture handles production scenarios tutorials ignore:
Multi-Tenant Agents

    class MultiTenantAgent:
        """Agent that handles multiple users' credentials"""

        def __init__(self, secret_manager: SecretManager):
            self.secret_manager = secret_manager

        async def process_for_user(self, user_id: str, task: dict):
            # Get user-specific credential
            credential = await self.secret_manager.get_credential(
                f"user_{user_id}_api_key"
            )

            # User A's agent cannot access User B's APIs
            # (execute_with_credential is your own task runner, not shown)
            result = await self.execute_with_credential(credential, task)

            return result

Temporary Access

    async def grant_temporary_access(agent_id: str, capability: Permission, duration: int):
        """Grant time-limited access"""

        # Create temporary credential.
        # create_temporary_credential is a helper you would add to
        # VaultSecretManager; it is not part of hvac.
        temp_cred = await vault.create_temporary_credential(
            path=f"temp/{agent_id}/{capability.value}",
            ttl=duration
        )

        # Agent can only use this for 'duration' seconds
        return temp_cred

Audit Compliance

    import json
    from datetime import datetime


    class AuditLog:
        """Complete audit trail for compliance"""

        def __init__(self, db):
            # db is any async client exposing insert(table, row), e.g. a
            # thin wrapper around PostgreSQL
            self.db = db

        async def log_capability_request(self, request: CapabilityRequest):
            await self.db.insert('audit_log', {
                'timestamp': datetime.now(),
                'agent_id': request.agent_id,
                'action': 'capability_request',
                'permission': request.permission.value,
                'justification': request.justification,
                'context': json.dumps(request.context)
            })

        async def log_capability_denied(self, request: CapabilityRequest):
            await self.db.insert('audit_log', {
                'timestamp': datetime.now(),
                'agent_id': request.agent_id,
                'action': 'capability_denied',
                'permission': request.permission.value,
                'reason': 'permission_check_failed'
            })

Summary
In this post, I showed how production AI agents handle credentials and API keys securely. The key point is implementing three layers: secure storage (secret managers, not code), runtime capability requests (dynamic, not hardcoded), and access denial handling (graceful degradation, not crashes).
Most tutorials hardcode keys because it’s simpler. This creates a dangerous gap between learning materials and production requirements. Real agents need to handle expired credentials, revoked access, and permission changes mid-task.
The cost of getting this wrong is high: leaked API keys, compliance violations, security breaches, and agent downtime during key rotation. Get it right from the start.
Final Words + More Resources
My intention with this article was to share my knowledge and experience in a way that helps others. If you want to contact me, you can reach me by email.
Here are also the most important links from this article along with some further resources that will help you in this scope:
- Reddit: Developers who actually built AI agents
- HashiCorp Vault
- AWS Secrets Manager
- Circuit Breaker Pattern
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!