
How to Implement Webhooks and Real-Time Triggers for AI Agents

Purpose

This post shows how to implement webhooks and real-time triggers for AI agents.

Problem

When I built my first AI agent with a Telegram integration, I made a mistake: I handled webhooks synchronously, calling the LLM inside the request handler:

blocking-webhook.py
```python
from flask import Flask, request
from openai import OpenAI

app = Flask(__name__)
client = OpenAI()  # reads OPENAI_API_KEY from the environment

@app.route('/webhook/telegram', methods=['POST'])
def telegram_webhook():
    data = request.json
    # BAD: the handler blocks until the LLM call finishes
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": data['message']['text']}],
    )
    reply = response.choices[0].message.content
    # Send the reply back to Telegram (send_telegram_message is a helper defined elsewhere)
    send_telegram_message(data['message']['chat']['id'], reply)
    return {'status': 'ok'}
```

When I tested this:

Terminal window
```bash
# Send a message
curl -X POST http://localhost:5000/webhook/telegram \
  -H "Content-Type: application/json" \
  -d '{"message": {"text": "Hello", "chat": {"id": 123}}}'
# The request hangs for ~10 seconds (the LLM call)
# In production, Telegram gets no timely response and retries the update
```

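The webhook request timed out because the handler waited for the LLM call to finish. Telegram treated the slow delivery as a failure and retried it, so the same message was processed twice and the user received duplicate replies.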

Environment

  • Python 3.12
  • FastAPI for webhooks
  • Redis for queue
  • Telegram Bot API
  • OpenAI API

Solution

I implemented an event-driven architecture with four layers:

  1. Webhook Receiver - HTTP endpoint, validation, auth
  2. Event Queue - Buffer for bursts (Redis)
  3. Agent Processor - Workers that process events
  4. Response Handler - Send responses back to platform
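To see how the four layers hand work to each other, here is a minimal in-process sketch. It is not the production setup: an `asyncio.Queue` stands in for Redis, and a stub coroutine stands in for the LLM call. The function names (`receive`, `fake_llm`, `agent_process`, `handle_response`) are hypothetical.

```python
import asyncio

async def receive(payload: dict, queue: asyncio.Queue) -> dict:
    """Layer 1: validate and enqueue, then return right away."""
    if "message" not in payload:
        return {"status": "ignored"}
    await queue.put(payload["message"])  # Layer 2: buffer the event
    return {"status": "queued"}

async def fake_llm(text: str) -> str:
    """Hypothetical stand-in for a slow model call."""
    await asyncio.sleep(0.01)
    return f"echo: {text}"

async def agent_process(queue: asyncio.Queue, outbox: list) -> None:
    """Layer 3: a worker that pulls events and runs the 'agent'."""
    while True:
        message = await queue.get()
        try:
            reply = await fake_llm(message["text"])
            handle_response(message["chat"]["id"], reply, outbox)
        finally:
            queue.task_done()

def handle_response(chat_id: int, reply: str, outbox: list) -> None:
    """Layer 4: deliver the reply (here it is only recorded)."""
    outbox.append((chat_id, reply))

async def demo() -> tuple[list, list]:
    queue: asyncio.Queue = asyncio.Queue()
    outbox: list = []
    worker = asyncio.create_task(agent_process(queue, outbox))
    acks = [
        await receive({"message": {"text": f"hi {i}", "chat": {"id": i}}}, queue)
        for i in range(3)
    ]
    await queue.join()  # wait until the worker has drained the queue
    worker.cancel()
    return acks, outbox

acks, outbox = asyncio.run(demo())
```

The receiver returns `{"status": "queued"}` before any model call runs; only the worker waits on the slow step.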

Webhook Receiver

Here’s the non-blocking webhook handler:

webhook-receiver.py
```python
import hmac
import os

from fastapi import FastAPI, HTTPException, Request

app = FastAPI()

# The secret_token passed to setWebhook; Telegram echoes it back in a header
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]

@app.post('/webhook/telegram')
async def telegram_webhook(request: Request):
    # 1. Verify the secret token (Telegram sends the token itself, not an HMAC of the body)
    token = request.headers.get('X-Telegram-Bot-API-Secret-Token')
    if not verify_secret_token(token):
        raise HTTPException(status_code=401, detail="Invalid secret token")
    # 2. Parse and validate (see TelegramClient and main.py below)
    data = await request.json()
    event = telegram_client.parse_webhook_event(data)
    # 3. Queue for async processing (non-blocking)
    await event_queue.publish(event)
    # 4. Return immediately (200 OK)
    return {'status': 'queued'}

def verify_secret_token(token: str | None) -> bool:
    """Constant-time comparison of the header value with the configured secret."""
    if token is None:
        return False
    return hmac.compare_digest(token.encode(), WEBHOOK_SECRET.encode())
```

Now when I test:

Terminal window
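```bash
curl -X POST http://localhost:8000/webhook/telegram \
  -H "Content-Type: application/json" \
  -H "X-Telegram-Bot-API-Secret-Token: $WEBHOOK_SECRET" \
  -d '{"message": {"message_id": 1, "text": "Hello", "from": {"id": 42}, "chat": {"id": 123}}}'
# Immediate response
{"status":"queued"}
```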
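Telegram only sends the `X-Telegram-Bot-API-Secret-Token` header if the webhook was registered with a `secret_token`, which `setWebhook` accepts as 1 to 256 characters from `A-Z`, `a-z`, `0-9`, `_` and `-`. The sketch below generates such a token with Python's `secrets.token_urlsafe` (its alphabet fits that rule) and builds the `setWebhook` request without sending it. The helper names and the placeholder bot token and URL are assumptions for illustration.

```python
import json
import secrets
import urllib.request

def make_secret_token() -> str:
    # token_urlsafe only uses A-Z, a-z, 0-9, '-' and '_'
    return secrets.token_urlsafe(32)

def build_set_webhook_request(bot_token: str, webhook_url: str, secret: str) -> urllib.request.Request:
    """Build (but do not send) a setWebhook call that registers a secret token."""
    body = json.dumps({"url": webhook_url, "secret_token": secret}).encode()
    return urllib.request.Request(
        f"https://api.telegram.org/bot{bot_token}/setWebhook",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Placeholder values; replace with a real bot token and public HTTPS URL
secret = make_secret_token()
req = build_set_webhook_request("123:ABC", "https://example.com/webhook/telegram", secret)
```

Sending it is then a matter of `urllib.request.urlopen(req)` with a real bot token; the same secret must be configured on the server that verifies the header.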

Event Queue

I use Redis as a message queue:

event-queue.py
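```python
import json
import redis.asyncio as redis

class EventQueue:  # TelegramEvent is the app's Pydantic model (defined elsewhere)
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def publish(self, event: TelegramEvent):
        await self.redis.lpush('webhook_events', json.dumps(event.dict()))

    async def dequeue(self) -> TelegramEvent:
        # BRPOP returns None when the 1 s timeout expires, so keep waiting
        while True:
            item = await self.redis.brpop('webhook_events', timeout=1)
            if item is not None:
                _, data = item
                return TelegramEvent(**json.loads(data))

    async def retry_later(self, event: TelegramEvent):
        # Re-queue a failed event (called by the worker); unbounded retries can loop forever
        await self.publish(event)

    async def size(self) -> int:
        return await self.redis.llen('webhook_events')
```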
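For local tests it can help to have an in-memory stand-in with the same `publish`/`dequeue`/`retry_later`/`size` interface. The sketch below uses `asyncio.Queue` and also shows one way to bound retries: failures are counted per event, and after `max_attempts` the event goes to a dead-letter list instead of being re-queued. `InMemoryEventQueue`, `max_attempts`, and the `key` function are illustrative names, not part of the original code.

```python
import asyncio
from collections import Counter
from collections.abc import Callable, Hashable
from typing import Any

class InMemoryEventQueue:
    """Test double for EventQueue with bounded retries and a dead-letter list."""

    def __init__(self, max_attempts: int = 3, key: Callable[[Any], Hashable] = id):
        self._queue: asyncio.Queue = asyncio.Queue()
        self.max_attempts = max_attempts
        self.key = key                      # identifies an event across retries
        self.attempts: Counter = Counter()  # failures seen per event key
        self.dead_letters: list = []

    async def publish(self, event: Any) -> None:
        await self._queue.put(event)

    async def dequeue(self) -> Any:
        return await self._queue.get()

    async def retry_later(self, event: Any) -> None:
        k = self.key(event)
        self.attempts[k] += 1
        if self.attempts[k] >= self.max_attempts:
            self.dead_letters.append(event)  # give up: park it for inspection
        else:
            await self._queue.put(event)     # try again later

    async def size(self) -> int:
        return self._queue.qsize()

async def demo() -> InMemoryEventQueue:
    q = InMemoryEventQueue(max_attempts=3, key=lambda e: e["update_id"])
    await q.publish({"update_id": 1})
    # Simulate a handler that always fails
    for _ in range(5):
        if await q.size() == 0:
            break
        event = await q.dequeue()
        await q.retry_later(event)
    return q

q = asyncio.run(demo())
```

After three failed attempts the event ends up in `q.dead_letters` instead of cycling through the queue forever.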

Agent Worker

The worker processes events from the queue:

agent-worker.py
```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class AgentWorker:
    # self.memory, self.tools, get_user_context(), execute_tool() and
    # send_response() are app-specific and not shown here.
    def __init__(self, agent: AIAgent, queue: EventQueue):
        self.agent = agent
        self.queue = queue
        self.concurrency = 10  # Process up to 10 events concurrently

    async def start(self):
        """Start the worker pool: `concurrency` loops sharing one queue."""
        tasks = [
            asyncio.create_task(self.process_events())
            for _ in range(self.concurrency)
        ]
        await asyncio.gather(*tasks)

    async def process_events(self):
        """Worker loop: dequeue, handle, re-queue on failure."""
        while True:
            event = await self.queue.dequeue()
            try:
                await self.handle_event(event)
            except Exception:
                logger.exception("Error processing event")
                # Retry later (or move to a dead-letter queue)
                await self.queue.retry_later(event)

    async def handle_event(self, event: TelegramEvent):
        """Process one event through the AI agent."""
        # 1. Extract context
        conversation_history = await self.memory.get_history(event.chat_id)
        user_context = await self.get_user_context(event.user_id)
        # 2. Agent reasoning (not just routing)
        agent_response = await self.agent.reason(
            message=event.message_text,
            history=conversation_history,
            context=user_context,
            available_tools=self.tools,
        )
        # 3. Execute the agent's decision
        if agent_response.type == 'message':
            await self.send_response(event.chat_id, agent_response.content)
        elif agent_response.type == 'tool_use':
            result = await self.execute_tool(agent_response.tool, agent_response.params)
            followup = await self.agent.reason_about_result(result)
            await self.send_response(event.chat_id, followup)
        # 4. Store the exchange in memory
        await self.memory.add(event.chat_id, event.message_text, agent_response)
```
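The worker never calls `TelegramClient.send_typing_indicator`. Telegram clears a chat action after about five seconds (or when the bot's message arrives), so a long agent call needs the action refreshed while it runs. A minimal sketch, with `with_typing` and the 4-second default interval as illustrative choices:

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")

async def with_typing(
    send_action: Callable[[], Awaitable[None]],
    work: Awaitable[T],
    interval: float = 4.0,
) -> T:
    """Run `work` while re-sending a typing action every `interval` seconds."""

    async def keep_typing() -> None:
        while True:
            await send_action()
            await asyncio.sleep(interval)

    typing_task = asyncio.create_task(keep_typing())
    try:
        return await work
    finally:
        typing_task.cancel()  # stop refreshing once the reply is ready

async def demo() -> tuple[str, int]:
    calls = 0

    async def send_action() -> None:
        nonlocal calls
        calls += 1

    async def slow_agent() -> str:
        await asyncio.sleep(0.5)  # stands in for agent.reason(...)
        return "done"

    result = await with_typing(send_action, slow_agent(), interval=0.1)
    return result, calls

result, calls = asyncio.run(demo())
```

In `handle_event`, the reasoning step could then become `await with_typing(lambda: telegram.send_typing_indicator(event.chat_id), self.agent.reason(...))`, where `telegram` is a `TelegramClient` instance the worker would need access to (an assumption; the worker above does not hold one).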

Telegram Client

Here’s the Telegram integration:

telegram-client.py
```python
import httpx

class TelegramClient:
    def __init__(self, bot_token: str):
        self.base_url = f"https://api.telegram.org/bot{bot_token}"
        self.client = httpx.AsyncClient()

    async def send_message(self, chat_id: int, text: str, parse_mode: str = 'Markdown'):
        """Send a message to a Telegram chat."""
        url = f"{self.base_url}/sendMessage"
        payload = {
            'chat_id': chat_id,
            'text': text,
            # Telegram returns 400 if the Markdown cannot be parsed,
            # which can happen with unescaped LLM output
            'parse_mode': parse_mode,
        }
        response = await self.client.post(url, json=payload)
        response.raise_for_status()
        return response.json()

    async def send_typing_indicator(self, chat_id: int):
        """Show the typing indicator while the agent thinks."""
        url = f"{self.base_url}/sendChatAction"
        await self.client.post(url, json={
            'chat_id': chat_id,
            'action': 'typing',
        })

    def parse_webhook_event(self, data: dict) -> TelegramEvent:
        """Parse a Telegram webhook payload (assumes a `message` update)."""
        message = data.get('message', {})
        return TelegramEvent(
            chat_id=message['chat']['id'],
            user_id=message['from']['id'],
            username=message['from'].get('username'),
            message_text=message.get('text', ''),
            message_id=message['message_id'],
            raw=data,
        )
```
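Telegram rejects `sendMessage` text longer than 4096 characters, and agent replies can exceed that. One way to handle it is to split the reply before sending, preferring line breaks as split points, and send each chunk with `send_message`. `split_message` is a hypothetical helper; it counts Python characters, which may not match Telegram's own counting for every emoji, so a small safety margin on `limit` can be reasonable. Splitting can also break Markdown entities that span a chunk boundary, so it is safest with plain text.

```python
TELEGRAM_MAX_LEN = 4096  # sendMessage limit for the text parameter

def split_message(text: str, limit: int = TELEGRAM_MAX_LEN) -> list[str]:
    """Split text into chunks of at most `limit` characters, preferring newlines."""
    chunks: list[str] = []
    while len(text) > limit:
        # Cut at the last newline inside the window, or hard-cut if there is none
        cut = text.rfind("\n", 0, limit)
        if cut <= 0:
            cut = limit
        chunks.append(text[:cut])
        text = text[cut:].lstrip("\n")  # drop the newline we split on
    if text:
        chunks.append(text)
    return chunks

long_reply = "line\n" * 2000  # 10,000 characters
chunks = split_message(long_reply)
```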

Rate Limiting

I added rate limiting to prevent abuse:

rate-limiter.py
```python
import asyncio
import time
from collections import defaultdict

class RateLimiter:
    """Sliding window: at most max_requests per `period` seconds per key."""

    def __init__(self, max_requests: int, period: float):
        self.max_requests = max_requests
        self.period = period
        self.requests = defaultdict(list)  # key -> timestamps of recent requests

    async def acquire(self, key):
        """Rate limit by key (e.g., chat_id); waits if the window is full."""
        now = time.monotonic()
        # Drop timestamps that have left the window
        self.requests[key] = [t for t in self.requests[key] if now - t < self.period]
        if len(self.requests[key]) >= self.max_requests:
            # Wait until the oldest request expires
            wait_time = self.period - (now - self.requests[key][0])
            await asyncio.sleep(wait_time)
            now = time.monotonic()  # record the time after waiting, not before
        self.requests[key].append(now)
        return True

# Usage in the webhook handler
@app.post('/webhook/telegram')
async def telegram_webhook(request: Request):
    event = telegram_client.parse_webhook_event(await request.json())
    # Rate limit per chat (note: sleeping here delays the HTTP response)
    await rate_limiter.acquire(event.chat_id)
    # Queue for processing
    await event_queue.publish(event)
    return {'status': 'queued'}
```
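Because `acquire` sleeps inside the webhook handler, an over-limit chat delays the HTTP response, which is the kind of blocking the queue was meant to remove. A non-blocking alternative is to check the window, return a boolean immediately, and decide separately what to do with over-limit events. The sketch below is one way to do that; `SlidingWindowLimiter`, `try_acquire`, and the injectable `clock` (used here to make the example deterministic) are illustrative names.

```python
import time
from collections import defaultdict, deque
from collections.abc import Callable, Hashable

class SlidingWindowLimiter:
    """Allow at most max_requests per `period` seconds per key, without waiting."""

    def __init__(self, max_requests: int, period: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.period = period
        self.clock = clock
        self.hits: dict[Hashable, deque] = defaultdict(deque)

    def try_acquire(self, key: Hashable) -> bool:
        """Record a hit and return True if `key` is under its limit, else False."""
        now = self.clock()
        window = self.hits[key]
        # Evict timestamps that have left the window (oldest first)
        while window and now - window[0] >= self.period:
            window.popleft()
        if len(window) >= self.max_requests:
            return False
        window.append(now)
        return True

fake_now = [0.0]
limiter = SlidingWindowLimiter(max_requests=2, period=60, clock=lambda: fake_now[0])
results = [limiter.try_acquire(123) for _ in range(3)]  # third call is over the limit
fake_now[0] = 61.0
after_window = limiter.try_acquire(123)  # the window has moved past both hits
```

In the handler, an over-limit event could be dropped while still answering with a 2xx status such as `{'status': 'rate_limited'}`: Telegram repeats webhook requests that get a non-2xx response, so returning an error here would cause more traffic, not less.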

Complete Architecture

Here’s the complete system:

main.py
```python
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request

# Components (settings and ai_agent are app-specific and defined elsewhere)
telegram_client = TelegramClient(bot_token=settings.TELEGRAM_TOKEN)
event_queue = EventQueue(settings.REDIS_URL)
agent_worker = AgentWorker(agent=ai_agent, queue=event_queue)
rate_limiter = RateLimiter(max_requests=20, period=60)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the worker pool on startup and keep a reference to the task
    worker_task = asyncio.create_task(agent_worker.start())
    yield
    # Stop the workers on shutdown
    worker_task.cancel()

app = FastAPI(lifespan=lifespan)

# Webhook endpoint (secret-token verification omitted here for brevity)
@app.post('/webhook/telegram')
async def telegram_webhook(request: Request):
    event = telegram_client.parse_webhook_event(await request.json())
    await rate_limiter.acquire(event.chat_id)
    await event_queue.publish(event)
    return {'status': 'queued'}
```

When I run this:

Terminal window
```bash
# Start the server
uvicorn main:app --host 0.0.0.0 --port 8000

# In a second terminal, send 100 messages rapidly.
# Each uses a different chat id, so the per-chat limit (20 per 60 s) is not hit.
for i in {1..100}; do
  curl -X POST http://localhost:8000/webhook/telegram \
    -H "Content-Type: application/json" \
    -d "{\"message\": {\"message_id\": $i, \"text\": \"Message $i\", \"from\": {\"id\": $i}, \"chat\": {\"id\": $i}}}"
done

# All requests return immediately
{"status":"queued"}
{"status":"queued"}
...
# Workers process in the background
# No timeouts, no duplicate messages
```
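Answering quickly makes Telegram retries rare, but duplicates are still possible, for example when a response is lost or the server restarts before acknowledging an update. Every Telegram update carries a unique `update_id`, so one way to make the pipeline idempotent is to remember recently seen IDs and skip repeats. This in-memory sketch with a TTL is illustrative (`UpdateDeduplicator` is a hypothetical name); with several server processes, the seen-set would live in Redis instead, for example via `SET key value NX EX ttl`.

```python
import time
from collections import OrderedDict
from collections.abc import Callable

class UpdateDeduplicator:
    """Remember update_ids for `ttl` seconds and report repeats."""

    def __init__(self, ttl: float = 3600, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self.seen: OrderedDict[int, float] = OrderedDict()  # update_id -> first-seen time

    def is_duplicate(self, update_id: int) -> bool:
        now = self.clock()
        # Insertion order is time order, so expired ids are always at the front
        while self.seen and now - next(iter(self.seen.values())) >= self.ttl:
            self.seen.popitem(last=False)
        if update_id in self.seen:
            return True
        self.seen[update_id] = now
        return False

fake_now = [0.0]
dedup = UpdateDeduplicator(ttl=60, clock=lambda: fake_now[0])
first = dedup.is_duplicate(1001)    # new update
retry = dedup.is_duplicate(1001)    # redelivery of the same update
fake_now[0] = 120.0
expired = dedup.is_duplicate(1001)  # forgotten after the TTL
```

In the webhook handler, the check would sit before `event_queue.publish(...)`, reading `update_id` from the raw payload (kept as `event.raw` by `parse_webhook_event`).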

Why This Matters

The async architecture solves several problems:

  1. No timeouts - Webhook returns immediately
  2. Scalability - Workers handle bursts of traffic
  3. Reliability - Queue buffers incoming events
  4. Monitoring - Can track queue depth and processing time

Summary

In this post, I showed how to implement webhooks for AI agents using an async architecture with queues and workers. The key point is separating webhook reception from agent processing - validate quickly, queue immediately, process asynchronously. This gives you real-time responsiveness without blocking on LLM calls.

Final Words + More Resources

My intention with this article was to share my knowledge and experience to help others. If you want to contact me, you can reach me by email: Email me

Here are the most important links from this article, along with some further resources that will help you in this area:

Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!
