How to Implement Webhooks and Real-Time Triggers for AI Agents
Purpose
This post shows how to implement webhooks and real-time triggers for AI agents.
Problem
When I built my first AI agent with Telegram integration, I made a mistake. I handled webhooks synchronously:
from flask import Flask, request
import openai

app = Flask(__name__)

@app.route('/webhook/telegram', methods=['POST'])
def telegram_webhook():
    data = request.json

    # BAD: Blocks webhook handler on LLM call
    # (openai.ChatCompletion is the pre-1.0 openai SDK interface)
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{"role": "user", "content": data['message']['text']}]
    )
    reply = response['choices'][0]['message']['content']

    # Send response back to Telegram
    send_telegram_message(data['message']['chat']['id'], reply)

    return {'status': 'ok'}

When I tested this:
# Send a message
curl -X POST http://localhost:5000/webhook/telegram \
  -H "Content-Type: application/json" \
  -d '{"message": {"text": "Hello", "chat": {"id": 123}}}'

# Request hangs for 10 seconds (LLM call)
# Telegram retries because no immediate response

The webhook timed out because the LLM call took too long. Telegram retried the webhook, causing duplicate messages.
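Since retries are what produced the duplicate messages, one extra safeguard is to drop updates that have already been seen. Every Telegram update carries an `update_id`, so a small in-memory check is enough to illustrate the idea. This is a minimal sketch, not part of the original code: `UpdateDeduplicator` and `max_size` are hypothetical names, and a multi-process deployment would need shared storage (for example Redis) instead of a local dict.

```python
from collections import OrderedDict


class UpdateDeduplicator:
    """Remember recently seen Telegram update_id values (hypothetical helper)."""

    def __init__(self, max_size: int = 10_000):
        self.max_size = max_size
        self._seen: "OrderedDict[int, None]" = OrderedDict()

    def is_duplicate(self, update_id: int) -> bool:
        """Return True if update_id was seen before; otherwise record it."""
        if update_id in self._seen:
            return True
        self._seen[update_id] = None
        # Evict the oldest entry so memory use stays bounded.
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)
        return False


dedup = UpdateDeduplicator(max_size=3)
first = dedup.is_duplicate(100)   # new update
retry = dedup.is_duplicate(100)   # same update delivered again
print(first, retry)
```

A webhook handler could call `is_duplicate(data['update_id'])` before doing any work and return early for repeats.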
Environment
- Python 3.12
- FastAPI for webhooks
- Redis for queue
- Telegram Bot API
- OpenAI API
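The code in this post uses a `TelegramEvent` type without showing its definition. The following is a minimal sketch of what it might look like, assuming the field names used in the Telegram client's `parse_webhook_event` further down. It uses a standard-library dataclass with a `dict()` method so that it round-trips through JSON the way the queue code expects; the original may well use a Pydantic model instead.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


@dataclass
class TelegramEvent:
    """Assumed shape of a parsed Telegram message event."""

    chat_id: int
    user_id: int
    message_text: str
    message_id: int
    username: Optional[str] = None
    raw: Dict[str, Any] = field(default_factory=dict)

    def dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all fields."""
        return asdict(self)


event = TelegramEvent(chat_id=123, user_id=456, message_text="Hello", message_id=1)
# Serialize and rebuild, as a queue would do between publish and dequeue.
restored = TelegramEvent(**json.loads(json.dumps(event.dict())))
print(restored == event)
```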
Solution
I implemented an event-driven architecture with four layers:
- Webhook Receiver - HTTP endpoint, validation, auth
- Event Queue - Buffer for bursts (Redis)
- Agent Processor - Workers that process events
- Response Handler - Send responses back to platform
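To see how the four layers fit together before bringing in FastAPI and Redis, here is a minimal in-process sketch that uses `asyncio.Queue` as a stand-in for the Redis queue. The names `receive_webhook`, `fake_agent` and `worker` are illustrative only, and the "agent" is a short sleep rather than a real LLM call.

```python
import asyncio


async def receive_webhook(queue: asyncio.Queue, payload: dict) -> dict:
    """Layer 1: validate, enqueue, and return without waiting for the agent."""
    if "message" not in payload:
        return {"status": "ignored"}
    await queue.put(payload)  # Layer 2: the queue buffers the event
    return {"status": "queued"}


async def fake_agent(text: str) -> str:
    """Stand-in for a slow LLM call (hypothetical)."""
    await asyncio.sleep(0.01)
    return f"echo: {text}"


async def worker(queue: asyncio.Queue, sent: list) -> None:
    """Layer 3: pull events off the queue and run the agent."""
    while True:
        payload = await queue.get()
        reply = await fake_agent(payload["message"]["text"])
        # Layer 4: the response handler; here it only records the reply.
        sent.append((payload["message"]["chat"]["id"], reply))
        queue.task_done()


async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    sent: list = []
    task = asyncio.create_task(worker(queue, sent))
    ack = await receive_webhook(
        queue, {"message": {"text": "Hello", "chat": {"id": 123}}}
    )
    await queue.join()  # wait until the worker has handled the event
    task.cancel()
    return ack, sent


ack, sent = asyncio.run(main())
print(ack, sent)
```

The acknowledgement is produced before the agent runs, which is the property the rest of this post builds on.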
Webhook Receiver
Here’s the non-blocking webhook handler:
import hmac

from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

@app.post('/webhook/telegram')
async def telegram_webhook(request: Request):
    # 1. Verify the secret token Telegram sends in this header
    signature = request.headers.get('X-Telegram-Bot-Api-Secret-Token')
    if not verify_signature(signature):
        raise HTTPException(401, "Invalid signature")

    # 2. Parse and validate
    data = await request.json()
    event = TelegramEvent(**data)

    # 3. Queue for async processing (non-blocking)
    await event_queue.publish(event)

    # 4. Return immediately (200 OK)
    return {'status': 'queued'}

def verify_signature(signature: str | None) -> bool:
    """Verify the webhook secret token.

    Telegram does not HMAC-sign the payload. It sends the secret_token
    passed to setWebhook verbatim, so a constant-time comparison is enough.
    """
    if signature is None:
        return False
    return hmac.compare_digest(WEBHOOK_SECRET.encode(), signature.encode())

Now when I test:
curl -X POST http://localhost:8000/webhook/telegram \
  -H "Content-Type: application/json" \
  -H "X-Telegram-Bot-Api-Secret-Token: $WEBHOOK_SECRET" \
  -d '{"message": {"text": "Hello", "chat": {"id": 123}}}'

# Immediate response
{"status":"queued"}

Event Queue
I use Redis as a message queue:
import json

import redis.asyncio as redis

class EventQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def publish(self, event: TelegramEvent):
        # LPUSH on one end and BRPOP on the other gives FIFO ordering
        await self.redis.lpush('webhook_events', json.dumps(event.dict()))

    async def dequeue(self) -> TelegramEvent:
        # BRPOP returns None when the timeout expires, so keep polling
        while True:
            item = await self.redis.brpop('webhook_events', timeout=1)
            if item is not None:
                _, data = item
                return TelegramEvent(**json.loads(data))

    async def size(self) -> int:
        return await self.redis.llen('webhook_events')

Agent Worker
The worker processes events from the queue:
import asyncio
import logging

logger = logging.getLogger(__name__)

class AgentWorker:
    # self.memory, self.tools, send_response, execute_tool and
    # get_user_context are assumed to be defined elsewhere (not shown here)
    def __init__(self, agent: AIAgent, queue: EventQueue):
        self.agent = agent
        self.queue = queue
        self.concurrency = 10  # Process 10 events concurrently

    async def start(self):
        """Start worker pool"""
        tasks = [
            asyncio.create_task(self.process_events())
            for _ in range(self.concurrency)
        ]
        await asyncio.gather(*tasks)

    async def process_events(self):
        """Worker loop"""
        while True:
            event = await self.queue.dequeue()
            try:
                await self.handle_event(event)
            except Exception as e:
                logger.error(f"Error processing event: {e}")
                # Retry or dead-letter queue
                # (retry_later is not part of EventQueue above and must be added)
                await self.queue.retry_later(event)

    async def handle_event(self, event: TelegramEvent):
        """Process event through AI agent"""

        # 1. Extract context
        conversation_history = await self.memory.get_history(event.chat_id)
        user_context = await self.get_user_context(event.user_id)

        # 2. Agent reasoning (not just routing)
        agent_response = await self.agent.reason(
            message=event.message_text,
            history=conversation_history,
            context=user_context,
            available_tools=self.tools
        )

        # 3. Execute agent decision
        if agent_response.type == 'message':
            await self.send_response(event.chat_id, agent_response.content)
        elif agent_response.type == 'tool_use':
            result = await self.execute_tool(agent_response.tool, agent_response.params)
            followup = await self.agent.reason_about_result(result)
            await self.send_response(event.chat_id, followup)

        # 4. Store in memory
        await self.memory.add(event.chat_id, event.message_text, agent_response)

Telegram Client
Here’s the Telegram integration:
import httpx

class TelegramClient:
    def __init__(self, bot_token: str):
        self.base_url = f"https://api.telegram.org/bot{bot_token}"
        self.client = httpx.AsyncClient()

    async def send_message(self, chat_id: int, text: str, parse_mode: str = 'Markdown'):
        """Send message to Telegram chat"""
        url = f"{self.base_url}/sendMessage"
        payload = {
            'chat_id': chat_id,
            'text': text,
            'parse_mode': parse_mode
        }
        response = await self.client.post(url, json=payload)
        response.raise_for_status()
        return response.json()

    async def send_typing_indicator(self, chat_id: int):
        """Show typing indicator while agent thinks"""
        url = f"{self.base_url}/sendChatAction"
        await self.client.post(url, json={
            'chat_id': chat_id,
            'action': 'typing'
        })

    def parse_webhook_event(self, data: dict) -> TelegramEvent:
        """Parse Telegram webhook payload"""
        message = data.get('message', {})
        return TelegramEvent(
            chat_id=message['chat']['id'],
            user_id=message['from']['id'],
            username=message['from'].get('username'),
            message_text=message.get('text', ''),
            message_id=message['message_id'],
            raw=data
        )

Rate Limiting
I added rate limiting to prevent abuse:
import asyncio
from collections import defaultdict

class RateLimiter:
    def __init__(self, max_requests: int, period: int):
        self.max_requests = max_requests
        self.period = period
        self.requests = defaultdict(list)

    async def acquire(self, key: str):
        """Rate limit by key (e.g., chat_id)"""
        loop = asyncio.get_running_loop()
        now = loop.time()
        # Keep only timestamps inside the current window
        self.requests[key] = [
            req_time for req_time in self.requests[key]
            if now - req_time < self.period
        ]

        if len(self.requests[key]) >= self.max_requests:
            # Wait until oldest request expires
            # (note: this delays the webhook response for that caller)
            wait_time = self.period - (now - self.requests[key][0])
            await asyncio.sleep(wait_time)
            now = loop.time()  # refresh so the stored timestamp is not stale

        self.requests[key].append(now)
        return True

# Usage in webhook handler
@app.post('/webhook/telegram')
async def telegram_webhook(request: Request):
    event = TelegramEvent(**await request.json())

    # Rate limit per chat
    await rate_limiter.acquire(event.chat_id)

    # Queue for processing
    await event_queue.publish(event)
    return {'status': 'queued'}

Complete Architecture
Here’s the complete system:
import asyncio

from fastapi import FastAPI, Request

app = FastAPI()

# Components
telegram_client = TelegramClient(bot_token=settings.TELEGRAM_TOKEN)
event_queue = EventQueue(settings.REDIS_URL)
agent_worker = AgentWorker(agent=ai_agent, queue=event_queue)
rate_limiter = RateLimiter(max_requests=20, period=60)

# Webhook endpoints
@app.post('/webhook/telegram')
async def telegram_webhook(request: Request):
    event = telegram_client.parse_webhook_event(await request.json())
    await rate_limiter.acquire(event.chat_id)
    await event_queue.publish(event)
    return {'status': 'queued'}

# Start worker on startup
@app.on_event('startup')
async def startup():
    # Keep a reference so the background task is not garbage-collected
    app.state.worker_task = asyncio.create_task(agent_worker.start())

When I run this:
# Start the server
uvicorn main:app --host 0.0.0.0 --port 8000

# Send multiple messages rapidly
# (message_id and from are included because parse_webhook_event requires them)
for i in {1..100}; do
  curl -X POST http://localhost:8000/webhook/telegram \
    -H "Content-Type: application/json" \
    -d "{\"message\": {\"message_id\": $i, \"text\": \"Message $i\", \"from\": {\"id\": 456}, \"chat\": {\"id\": 123}}}"
done

# All requests return immediately
{"status":"queued"}
{"status":"queued"}
...

# Workers process in background
# No timeouts, no duplicate messages

Why This Matters
The async architecture solves several problems:
- No timeouts - Webhook returns immediately
- Scalability - Workers handle bursts of traffic
- Reliability - Queue buffers incoming events
- Monitoring - Can track queue depth and processing time
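The reliability and monitoring points can be made concrete with a small worker loop that times each event, retries failures a bounded number of times, and sets aside events that keep failing. This is a sketch under stated assumptions: `WorkerStats`, `drain`, `max_attempts` and the dead-letter list are hypothetical names, and `asyncio.Queue` stands in for the Redis list so the example runs on its own.

```python
import asyncio
import time


class WorkerStats:
    """In-memory counters (hypothetical); real setups would export metrics."""

    def __init__(self) -> None:
        self.processed = 0
        self.failed = 0
        self.total_seconds = 0.0

    def avg_seconds(self) -> float:
        """Average processing time of successfully handled events."""
        return self.total_seconds / self.processed if self.processed else 0.0


async def drain(queue, handler, stats, dead_letters, max_attempts=3):
    """Process queued (event, attempt) pairs until the queue is empty."""
    while not queue.empty():
        event, attempt = queue.get_nowait()
        started = time.perf_counter()
        try:
            await handler(event)
            stats.processed += 1
            stats.total_seconds += time.perf_counter() - started
        except Exception:
            stats.failed += 1
            if attempt + 1 < max_attempts:
                queue.put_nowait((event, attempt + 1))  # retry later
            else:
                dead_letters.append(event)  # give up after max_attempts


async def flaky_handler(event: str) -> None:
    """Stand-in for agent processing that always fails on one input."""
    if event == "bad":
        raise ValueError("cannot process")


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    for event in ("ok", "bad"):
        queue.put_nowait((event, 0))
    depth_before = queue.qsize()  # queue depth is the main backlog signal
    stats, dead_letters = WorkerStats(), []
    await drain(queue, flaky_handler, stats, dead_letters)
    return stats, dead_letters, depth_before, queue.qsize()


stats, dead_letters, depth_before, depth_after = asyncio.run(main())
print(stats.processed, stats.failed, dead_letters, depth_before, depth_after)
```

With the Redis-backed queue from this post, the same depth signal would come from its `size()` method, which wraps `LLEN`.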
Summary
In this post, I showed how to implement webhooks for AI agents using an async architecture with queues and workers. The key point is separating webhook reception from agent processing - validate quickly, queue immediately, process asynchronously. This gives you real-time responsiveness without blocking on LLM calls.
Final Words + More Resources
My intention with this article was to share my knowledge and experience so that it can help others. If you want to get in touch, you can contact me by email.