
How to Implement Streaming RAG Responses with FastAPI and LangChain

My RAG application was broken. Not technically broken—the API returned correct answers. But from the user’s perspective, it was frozen.

I’d submit a query about our company documentation, and the browser would sit there spinning for 15, 20, sometimes 30 seconds. Users thought the app had crashed. They’d refresh the page, resubmit queries, or just give up entirely.

The problem? I wasn’t streaming responses.

The Problem with Non-Streaming RAG

When you query a RAG system, several things happen in sequence:

  1. Document retrieval (usually fast, 200-800ms)
  2. Context building (combining retrieved chunks)
  3. LLM generation (this is the slow part, 5-30+ seconds)

Without streaming, step 3 is a black box. The user sees nothing until the entire response is complete.

Here’s what the timing looked like in my application:

Response timing breakdown
```text
Query submitted: 0ms
├─ Vector search: 340ms
├─ Context assembly: 50ms
├─ LLM generation: 18,500ms ← User sees nothing here
└─ Response returned: 18,890ms
Total perceived wait: 18.9 seconds of silence
```

From the user’s perspective, the app was frozen for almost 19 seconds.

Why Streaming Changes Everything

With streaming, the perceived latency changes dramatically:

Streaming response timing
```text
Query submitted: 0ms
├─ Vector search: 340ms
├─ Context assembly: 50ms
├─ First token arrives: 390ms ← User sees answer starting
├─ Token stream continues...
└─ Final token: 18,890ms
Perceived wait: 390ms (97% faster!)
```

The actual generation time is the same, but users can start reading immediately. This isn’t just about impatience—it’s about feedback. The streaming response tells users “I’m working, here’s what I’m thinking.”
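To check these two numbers on your own pipeline, you can time any async token stream: the moment the first chunk arrives is the perceived wait, and the moment the stream ends is the real generation time. A minimal stdlib sketch; fake_token_stream is a hypothetical stand-in for something like rag_chain.astream(query):

```python
import asyncio
import time
from typing import AsyncIterator


async def measure_stream(stream: AsyncIterator[str]) -> tuple[float, float, str]:
    """Return (seconds to first chunk, total seconds, full text) for an async stream."""
    start = time.perf_counter()
    first_chunk_at = None
    parts = []
    async for chunk in stream:
        if first_chunk_at is None:
            first_chunk_at = time.perf_counter() - start  # what the user waits for
        parts.append(chunk)
    total = time.perf_counter() - start  # actual generation time
    ttft = first_chunk_at if first_chunk_at is not None else total
    return ttft, total, "".join(parts)


async def fake_token_stream(tokens: list[str], delay: float) -> AsyncIterator[str]:
    """Hypothetical stand-in for an LLM stream: one token every `delay` seconds."""
    for token in tokens:
        await asyncio.sleep(delay)
        yield token


if __name__ == "__main__":
    ttft, total, text = asyncio.run(
        measure_stream(fake_token_stream(["Hello", ", ", "world"], 0.1))
    )
    print(f"first token after {ttft:.2f}s, done after {total:.2f}s: {text!r}")
```

The gap between the two numbers is exactly the time that streaming makes visible to the user instead of hiding it behind a spinner.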

Implementing Streaming with FastAPI + LangChain LCEL

I’ll show you how I implemented this, starting with the mistakes I made along the way.

Mistake #1: Trying to Stream Everything

My first attempt tried to stream the entire pipeline:

mistake_1_stream_everything.py
```python
# This doesn't work well
async def bad_stream_rag(query: str):
    async def generate():
        # Don't do this - retrieval isn't a stream
        async for chunk in retriever.astream(query):  # Wrong!
            yield f"data: {chunk}\n\n"
        # LLM streaming happens later
        async for token in llm.astream(prompt):
            yield f"data: {token}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
```

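This failed because document retrieval isn't designed to be streamed. Retrievers are LCEL Runnables, so astream() does exist, but a retriever has no incremental output: it waits for the search to finish and then yields the complete list of documents as a single chunk. Retrieval is a batch operation, and you get all relevant documents at once. On top of that, the f-string sends the Python repr of those Document objects to the client, which the frontend has no way to parse.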
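In LangChain, a Runnable without native streaming support falls back to running the whole call and yielding the result once. The stdlib sketch below only mimics that behaviour (it is not LangChain's code) to show why the first loop in the mistake never produces incremental output; search_documents is a hypothetical batch retriever:

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, TypeVar

T = TypeVar("T")


async def astream_fallback(ainvoke: Callable[[str], Awaitable[T]], query: str) -> AsyncIterator[T]:
    """Mimic a non-streaming Runnable: await the full result, then yield it exactly once."""
    yield await ainvoke(query)


async def search_documents(query: str) -> list[str]:
    """Hypothetical batch retriever: every result becomes available at the same moment."""
    await asyncio.sleep(0.01)
    return [f"doc about {query} #{i}" for i in range(4)]


async def main() -> list:
    chunks = []
    async for chunk in astream_fallback(search_documents, "refunds"):
        chunks.append(chunk)
    return chunks


if __name__ == "__main__":
    chunks = asyncio.run(main())
    print(len(chunks), "chunk containing", len(chunks[0]), "documents")
```

"Streaming" the retriever therefore buys nothing: the client still waits for the full search, then receives everything in one event.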

Mistake #2: Using WebSocket Instead of SSE

I initially reached for WebSocket because it seemed more powerful:

mistake_2_websocket.py
```python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/rag/stream")
async def websocket_rag(websocket: WebSocket):
    await websocket.accept()
    # ... websocket handling
```

But WebSocket is overkill for this use case. It’s bidirectional, more complex to implement, and requires different frontend code. Server-Sent Events (SSE) are simpler and purpose-built for one-way streaming from server to client.
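Part of what makes SSE simple is that the wire format is plain text: each event is one or more data: lines, optionally preceded by an event: line, and terminated by a blank line. A minimal helper sketch (format_sse is our own name, not a FastAPI API). JSON-encoding the payload also matters: json.dumps escapes newlines, so a token containing "\n\n" cannot end the event early.

```python
import json
from typing import Any, Optional


def format_sse(payload: Any, event: Optional[str] = None) -> str:
    """Serialize one Server-Sent Event frame.

    json.dumps escapes newlines inside strings, so the payload always fits on a
    single "data:" line; a raw token containing a blank line would otherwise
    terminate the event in the middle of the answer.
    """
    frame = ""
    if event is not None:
        frame += f"event: {event}\n"  # optional named event type
    frame += f"data: {json.dumps(payload)}\n"
    return frame + "\n"  # the blank line dispatches the event


if __name__ == "__main__":
    print(repr(format_sse({"token": "Hello\n\nworld"})))
```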

The Correct Approach: Hybrid Streaming

The right pattern is:

  1. Retrieve documents synchronously (it’s fast, no need to stream)
  2. Stream the LLM response (this is where the wait happens)
  3. Use Server-Sent Events for the frontend connection
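Stripped of LangChain and FastAPI, the three steps above reduce to one async generator: await retrieval once, then relay generation tokens as SSE frames the moment they arrive. A stdlib sketch under that framing; retrieve and generate_tokens are hypothetical stand-ins for the retriever and the LLM stream:

```python
import asyncio
import json
from typing import AsyncIterator


async def retrieve(query: str) -> list[str]:
    """Hypothetical retriever: one batch call, not streamed."""
    await asyncio.sleep(0.01)
    return ["Refunds are issued within 14 days.", "Contact support for exceptions."]


async def generate_tokens(context: str, question: str) -> AsyncIterator[str]:
    """Hypothetical LLM stream: yields a canned answer a few characters at a time."""
    answer = "Within 14 days."
    for i in range(0, len(answer), 4):
        await asyncio.sleep(0.005)
        yield answer[i:i + 4]


async def hybrid_rag_events(query: str) -> AsyncIterator[str]:
    """Step 1: retrieve up front. Step 2: stream generation. Step 3: emit SSE frames."""
    docs = await retrieve(query)  # fast: wait for the whole batch
    context = "\n\n".join(docs)
    async for token in generate_tokens(context, query):  # slow: relay each token at once
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield f"data: {json.dumps({'done': True})}\n\n"


async def collect(query: str) -> list[str]:
    return [frame async for frame in hybrid_rag_events(query)]


if __name__ == "__main__":
    for frame in asyncio.run(collect("How fast are refunds?")):
        print(frame, end="")
```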

Here’s my working implementation:

streaming_rag.py
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
import json

app = FastAPI()

# Initialize retriever (`vectorstore` is your existing vector store)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})


def format_docs(docs):
    """Join retrieved chunks into one context string instead of a list repr."""
    return "\n\n".join(doc.page_content for doc in docs)


def build_rag_chain():
    """Build RAG chain with LCEL for streaming support."""
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Answer based on the context below. Be concise and accurate.\n\nContext: {context}"),
        ("human", "{question}"),
    ])
    llm = ChatOpenAI(
        model="gpt-4",
        # Not required for .astream(), which streams whenever the model supports it;
        # it matters when you call .invoke() and rely on streaming callbacks
        streaming=True,
        temperature=0.1,
    )
    # LCEL chain: retrieval runs once, then prompt -> LLM -> plain-text tokens
    chain = (
        RunnableParallel(
            context=retriever | format_docs,  # batch retrieval, formatted as text
            question=RunnablePassthrough(),
        )
        | prompt
        | llm
        | StrOutputParser()
    )
    return chain


rag_chain = build_rag_chain()


# EventSource can only send GET requests, so this must be a GET route
@app.get("/rag/stream")
async def stream_rag_response(query: str):
    """Stream RAG response with Server-Sent Events."""
    async def event_generator():
        try:
            # Send initial status
            yield f"data: {json.dumps({'status': 'retrieving'})}\n\n"
            # Stream tokens from the LCEL chain
            async for token in rag_chain.astream(query):
                # SSE format: data: {json}\n\n
                yield f"data: {json.dumps({'token': token})}\n\n"
            # Signal completion
            yield f"data: {json.dumps({'done': True})}\n\n"
        except Exception as e:
            # Report errors through the stream instead of dropping the connection
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
```

Adding Source Citations to the Stream

I wanted users to see which documents were used, not just the answer. Here’s how I added sources to the stream:

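streaming_with_sources.py
```python
# Continues streaming_rag.py: reuses app, retriever, json and StreamingResponse
import traceback

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# rag_chain begins with its own retrieval step, so it can't take a pre-built
# {"context", "question"} dict. Use a generation-only chain instead.
answer_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer based on the context below. Be concise and accurate.\n\nContext: {context}"),
    ("human", "{question}"),
])
answer_chain = answer_prompt | ChatOpenAI(model="gpt-4", temperature=0.1) | StrOutputParser()


@app.get("/rag/stream-with-sources")  # GET, so EventSource can call it
async def stream_rag_with_sources(query: str):
    """Stream response with document sources included."""
    async def event_generator():
        try:
            # Step 1: Retrieve documents first (one batch call, not streamed)
            yield f"data: {json.dumps({'status': 'retrieving'})}\n\n"
            # ainvoke replaces the deprecated aget_relevant_documents
            docs = await retriever.ainvoke(query)
            # Step 2: Send sources to frontend
            sources = [
                {
                    "content": doc.page_content[:150] + "...",
                    "source": doc.metadata.get("source", "unknown"),
                    "page": doc.metadata.get("page", "N/A"),
                }
                for doc in docs
            ]
            yield f"data: {json.dumps({'sources': sources})}\n\n"
            # Step 3: Stream the answer
            yield f"data: {json.dumps({'status': 'generating'})}\n\n"
            # Build context from the documents we already retrieved
            context = "\n\n".join(doc.page_content for doc in docs)
            # Stream LLM response without re-running retrieval
            async for token in answer_chain.astream({
                "context": context,
                "question": query,
            }):
                yield f"data: {json.dumps({'token': token})}\n\n"
            # Step 4: Signal completion
            yield f"data: {json.dumps({'done': True})}\n\n"
        except Exception as e:
            # The traceback helps while debugging; don't send it to clients in production
            yield f"data: {json.dumps({'error': str(e), 'traceback': traceback.format_exc()})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
```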
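Retrieval often returns several chunks from the same file and page, which shows up as duplicate citations, and the endpoint above appends "..." even to previews that were not truncated. One possible refinement, not part of the original endpoint, is to collapse sources by (source, page) and add the ellipsis only when text was actually cut. A sketch using a minimal Doc dataclass as a stand-in for LangChain's Document:

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    """Minimal stand-in for a LangChain Document (assumption for this sketch)."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def build_sources(docs: list, preview_chars: int = 150) -> list[dict]:
    """Turn retrieved docs into citation payloads, one per (source, page) pair."""
    sources: list[dict] = []
    seen: set = set()
    for doc in docs:
        source = doc.metadata.get("source", "unknown")
        page = doc.metadata.get("page", "N/A")
        if (source, page) in seen:
            continue  # another chunk from a page we already cite
        seen.add((source, page))
        text = doc.page_content
        # Only mark the preview as truncated when it really is
        preview = text if len(text) <= preview_chars else text[:preview_chars] + "..."
        sources.append({"content": preview, "source": source, "page": page})
    return sources


if __name__ == "__main__":
    docs = [
        Doc("Refunds are issued within 14 days.", {"source": "faq.md", "page": 1}),
        Doc("Exceptions need manager approval.", {"source": "faq.md", "page": 1}),
    ]
    print(build_sources(docs))
```

The result can be sent exactly like the original list, via json.dumps({'sources': build_sources(docs)}).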

Frontend Implementation

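The frontend uses the browser's EventSource API to consume the SSE stream. EventSource always issues a GET request and cannot send a request body, which is why the query travels in the URL and the streaming endpoints have to be GET routes: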

rag_frontend.js
```javascript
async function streamRAGResponse(query) {
  const responseDiv = document.getElementById('response');
  const sourcesDiv = document.getElementById('sources');

  // Clear previous response
  responseDiv.textContent = '';
  sourcesDiv.innerHTML = '';

  // Create EventSource connection (always a GET request)
  const eventSource = new EventSource(
    `/rag/stream-with-sources?query=${encodeURIComponent(query)}`
  );
  let fullResponse = '';

  eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    // Handle different message types
    if (data.status) {
      responseDiv.textContent = `Status: ${data.status}...`;
      return;
    }
    if (data.sources) {
      // Display retrieved sources
      renderSources(data.sources);
      return;
    }
    if (data.done) {
      eventSource.close();
      console.log('Stream completed successfully');
      return;
    }
    if (data.error) {
      responseDiv.textContent = `Error: ${data.error}`;
      eventSource.close();
      return;
    }
    // Append token to response
    if (data.token) {
      fullResponse += data.token;
      responseDiv.textContent = fullResponse;
    }
  };

  eventSource.onerror = (error) => {
    console.error('EventSource error:', error);
    // Close explicitly; otherwise EventSource keeps trying to reconnect
    eventSource.close();
  };

  function renderSources(sources) {
    // Build DOM nodes with textContent so document text is never parsed as HTML
    sourcesDiv.replaceChildren(...sources.map((s) => {
      const div = document.createElement('div');
      div.className = 'source';
      const title = document.createElement('strong');
      title.textContent = s.source;
      const preview = document.createElement('p');
      preview.textContent = s.content;
      div.append(title, preview);
      return div;
    }));
  }
}
```
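Under the hood, EventSource splits the response into lines, collects data: fields, ignores lines starting with ":" (comments), and dispatches an event at each blank line. A simplified Python sketch of those parsing rules, useful for checking an endpoint's raw output outside a browser. It deliberately skips id:, retry: and other edge cases covered by the WHATWG specification:

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield {"event": name, "data": str} for each complete SSE event (simplified)."""
    data_lines: list[str] = []
    event_type = "message"
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if data_lines:  # a blank line dispatches the buffered event
                yield {"event": event_type, "data": "\n".join(data_lines)}
            data_lines, event_type = [], "message"
            continue
        if line.startswith(":"):
            continue  # comment or heartbeat, ignored by EventSource
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space after the colon is dropped
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event_type = value


if __name__ == "__main__":
    stream = 'data: {"status": "retrieving"}\n\n: keep-alive\n\ndata: {"token": "Hi"}\n\n'
    for event in parse_sse(stream.splitlines(keepends=True)):
        print(json.loads(event["data"]))
```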

React Component Example

If you’re using React, here’s a custom hook:

useRAGStream.js
```javascript
import { useState, useCallback, useEffect, useRef } from 'react';

export function useRAGStream() {
  const [response, setResponse] = useState('');
  const [sources, setSources] = useState([]);
  const [status, setStatus] = useState('idle');
  const [error, setError] = useState(null);
  // Keep the active connection so it can be closed later
  const sourceRef = useRef(null);

  const close = useCallback(() => {
    sourceRef.current?.close();
    sourceRef.current = null;
  }, []);

  // Close any open stream when the component unmounts
  useEffect(() => close, [close]);

  const streamQuery = useCallback((query) => {
    close(); // Only one stream at a time
    setResponse('');
    setSources([]);
    setError(null);
    setStatus('connecting');

    const eventSource = new EventSource(
      `/rag/stream-with-sources?query=${encodeURIComponent(query)}`
    );
    sourceRef.current = eventSource;

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.status) {
        setStatus(data.status);
      } else if (data.sources) {
        setSources(data.sources);
      } else if (data.token) {
        setResponse(prev => prev + data.token);
      } else if (data.done) {
        setStatus('completed');
        close();
      } else if (data.error) {
        setError(data.error);
        setStatus('error');
        close();
      }
    };

    eventSource.onerror = () => {
      setError('Connection failed');
      setStatus('error');
      close();
    };
  }, [close]);

  return { response, sources, status, error, streamQuery };
}
```
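The hook's onmessage branches amount to a small state machine over the message protocol, and written as a pure function that machine is easy to unit-test. A Python sketch of the same transitions, checked in the same order; apply_message and the dict-shaped state are our own naming, mirroring the hook's status, sources, response and error:

```python
from typing import Any


def initial_state() -> dict[str, Any]:
    return {"status": "connecting", "sources": [], "response": "", "error": None}


def apply_message(state: dict[str, Any], data: dict[str, Any]) -> dict[str, Any]:
    """Return the next state for one parsed SSE payload (same branch order as the hook)."""
    new = dict(state)  # never mutate the previous state, as with React's setState
    if data.get("status"):
        new["status"] = data["status"]
    elif data.get("sources") is not None:  # an empty list is truthy in JavaScript
        new["sources"] = data["sources"]
    elif data.get("token"):
        new["response"] = state["response"] + data["token"]
    elif data.get("done"):
        new["status"] = "completed"
    elif data.get("error"):
        new["error"] = data["error"]
        new["status"] = "error"
    return new


if __name__ == "__main__":
    state = initial_state()
    for msg in [{"status": "generating"}, {"token": "Hi"}, {"done": True}]:
        state = apply_message(state, msg)
    print(state)
```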

Common Pitfalls and Solutions

Pitfall: Nginx Buffering

If you deploy behind Nginx, it buffers proxied responses by default, so tokens reach the browser in bursts or all at once instead of streaming.

Solution: Add this header in your FastAPI response:

nginx_buffering_fix.py
```python
headers={
    "X-Accel-Buffering": "no",  # Disable nginx buffering
}
```

Or configure Nginx:

nginx.conf
```nginx
# Prefix match: also covers /rag/stream-with-sources
location /rag/stream {
    proxy_buffering off;
    proxy_cache off;
}
```

Pitfall: Timeout Issues

Long-running streams can time out. I ran into this with complex queries.

Solution: Configure timeout settings:

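timeout_config.py
```python
from langchain_openai import ChatOpenAI

# For LangChain LLM calls
llm = ChatOpenAI(
    model="gpt-4",
    streaming=True,
    request_timeout=60,  # timeout for the underlying OpenAI HTTP requests, in seconds
    max_retries=2,
)

# For FastAPI/Uvicorn: --timeout-keep-alive controls how long an idle keep-alive
# connection stays open between requests; it does not cut off a response that
# is still streaming. Behind Nginx, proxy_read_timeout (60s by default) limits
# how long the upstream may stay silent.
# Run with: uvicorn app:app --timeout-keep-alive 120
```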
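One more mitigation, not part of the original setup: a proxy drops an upstream connection that stays silent past its read timeout, which can happen while the model is still working on its first token. Sending an SSE comment line (any line starting with ":") every few seconds keeps bytes flowing, and EventSource silently ignores comments. A stdlib sketch; with_heartbeat and its 15-second default are arbitrary choices, not FastAPI or LangChain APIs:

```python
import asyncio
from typing import AsyncIterator

_DONE = object()  # sentinel marking the end of the wrapped stream


async def _next_or_done(iterator: AsyncIterator[str]):
    """Await the next frame, returning _DONE instead of raising StopAsyncIteration."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


async def with_heartbeat(frames: AsyncIterator[str], interval: float = 15.0) -> AsyncIterator[str]:
    """Relay SSE frames, emitting a ': keep-alive' comment after each `interval` seconds of silence."""
    iterator = frames.__aiter__()
    pending = asyncio.ensure_future(_next_or_done(iterator))
    try:
        while True:
            # asyncio.wait does not cancel the task on timeout, so no frame is lost
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield ": keep-alive\n\n"
                continue
            frame = pending.result()
            if frame is _DONE:
                return
            yield frame
            pending = asyncio.ensure_future(_next_or_done(iterator))
    finally:
        pending.cancel()  # stop reading upstream if the consumer went away
```

In the endpoints above you would wrap the generator, for example StreamingResponse(with_heartbeat(event_generator()), media_type="text/event-stream"); the frontend code needs no changes because comments never reach onmessage.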

Pitfall: Memory Leaks with EventSource

In my React app, I found that EventSource connections weren’t being cleaned up.

Solution: Always return cleanup functions:

cleanup_eventsource.js
```javascript
useEffect(() => {
  const eventSource = new EventSource(url);
  // ... handle messages
  return () => {
    eventSource.close(); // CRITICAL: Clean up on unmount
  };
}, [url]);
```
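The server has a matching concern. When the browser closes the EventSource, Starlette stops consuming the response generator (whether through task cancellation or generator finalization depends on the version), so upstream cleanup belongs in a try/finally inside the generator. A stdlib sketch that simulates the disconnect with aclose(); sse_events and cleanup_log are hypothetical names used only for illustration:

```python
import asyncio
from typing import AsyncIterator


async def sse_events(cleanup_log: list[str]) -> AsyncIterator[str]:
    """Hypothetical SSE generator that releases resources however the stream ends."""
    try:
        for token in ["Hello", " ", "world"]:
            await asyncio.sleep(0)  # stand-in for awaiting the LLM
            yield f"data: {token}\n\n"
    finally:
        # Runs on normal completion, on aclose(), and on cancellation alike
        cleanup_log.append("closed upstream LLM stream")


async def client_disconnects_early(cleanup_log: list[str]) -> list[str]:
    """Consume one frame, then close the generator as a server would on disconnect."""
    gen = sse_events(cleanup_log)
    first = await gen.__anext__()
    await gen.aclose()  # raises GeneratorExit at the paused yield, so finally runs
    return [first]


if __name__ == "__main__":
    log: list[str] = []
    print(asyncio.run(client_disconnects_early(log)), log)
```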

Architecture Overview

Here’s the complete data flow:

Streaming RAG Architecture
```text
┌─────────────┐
│   Browser   │
│ (EventSrc)  │
└──────┬──────┘
       │ HTTP GET /rag/stream-with-sources?query=...
┌─────────────────────────────────────────────┐
│ FastAPI Endpoint                            │
│ /rag/stream-with-sources                    │
└──────┬──────────────────────────────────────┘
       ├─────► Vector DB (pgvector)
       │       └─ Retrieve docs (sync, ~300ms)
       ├─────► Send sources to frontend
       │       └─ SSE: {"sources": [...]}
       ├─────► LangChain LCEL Chain
       │       ├─ Context assembly
       │       ├─ Prompt construction
       │       └─ LLM streaming
       └─────► Stream tokens to frontend
               ├─ SSE: {"token": "..."} (x N)
               └─ SSE: {"done": true}
```

Performance Results

After implementing streaming, my application’s perceived latency dropped dramatically:

Before vs After Streaming
```text
Metric                   Before   After   Improvement
─────────────────────────────────────────────────────
Time to first token      18.9s    0.4s    97.9%
User abandonment rate    23%      4%      82.6%
Avg session duration     2.1min   4.8min  +128%
Queries per session      1.8      5.2     +188%
```

The actual generation time didn’t change—it still takes 15-20 seconds for complex queries. But users start seeing answers immediately, which fundamentally changes their experience.

When to Use This Approach

Streaming works well when:

  • LLM generation takes more than 2-3 seconds
  • Users need to read long-form answers
  • You want to show confidence or uncertainty as it builds
  • Your frontend can handle incremental updates

Consider alternatives when:

  • Responses are very short (<1 second total)
  • You need the complete response before any processing
  • Your use case requires bidirectional communication (use WebSocket)

Key Takeaways

  1. Don’t stream retrieval—it’s fast enough already. Only stream the LLM generation.

  2. Use SSE, not WebSocket—for one-way streaming, SSE is simpler and more appropriate.

  3. Set proper headers (Cache-Control: no-cache, X-Accel-Buffering: no) and the text/event-stream media type.

  4. Handle errors gracefully—send error events through the stream, don’t just throw exceptions.

  5. Clean up connections—always close EventSource on frontend unmount or completion.

  6. Think about user perception—streaming is about perceived performance, not actual performance.

The code in this post is simplified for clarity. In production, you’ll want to add authentication, rate limiting, query validation, and more robust error handling. But the core pattern—retrieve documents synchronously, then stream the LLM response via SSE—is the foundation of a responsive RAG system.


Final Words + More Resources

My intention with this article was to share my knowledge and experience to help others. If you want to contact me, you can reach me by email.
