How to Implement Streaming RAG Responses with FastAPI and LangChain
My RAG application was broken. Not technically broken—the API returned correct answers. But from the user’s perspective, it was frozen.
I’d submit a query about our company documentation, and the browser would sit there spinning for 15, 20, sometimes 30 seconds. Users thought the app had crashed. They’d refresh the page, resubmit queries, or just give up entirely.
The problem? I wasn’t streaming responses.
The Problem with Non-Streaming RAG
When you query a RAG system, several things happen in sequence:
- Document retrieval (usually fast, 200-800ms)
- Context building (combining retrieved chunks)
- LLM generation (this is the slow part, 5-30+ seconds)
Without streaming, the generation step is a black box: the user sees nothing until the entire response is complete.
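To make the black box concrete, here is a small framework-free simulation of the three steps. The function names and sleep durations are hypothetical, scaled-down stand-ins, not measurements from my app. The only point is when the first visible output appears in each mode.

```python
import asyncio
import time
from typing import AsyncIterator, List, Tuple

# Hypothetical stand-ins for the three RAG steps; sleeps are scaled down
# (retrieval ~30 ms, generation ~200 ms in total) and are not real measurements.
async def retrieve(query: str) -> List[str]:
    await asyncio.sleep(0.03)
    return ["chunk about streaming", "chunk about SSE"]

def build_context(chunks: List[str]) -> str:
    return "\n\n".join(chunks)

async def generate_tokens(context: str, n_tokens: int = 20) -> AsyncIterator[str]:
    # Emits tokens one at a time, like a streaming LLM API would
    for i in range(n_tokens):
        await asyncio.sleep(0.01)
        yield f"tok{i} "

async def non_streaming(query: str) -> Tuple[float, str]:
    """Seconds until the user sees anything: the full answer must exist first."""
    start = time.perf_counter()
    context = build_context(await retrieve(query))
    answer = "".join([tok async for tok in generate_tokens(context)])
    return time.perf_counter() - start, answer

async def streaming(query: str) -> float:
    """Seconds until the first token could be sent to the user."""
    start = time.perf_counter()
    context = build_context(await retrieve(query))
    tokens = generate_tokens(context)
    await tokens.__anext__()   # first token is ready to forward
    elapsed = time.perf_counter() - start
    await tokens.aclose()      # end the simulation after the first token
    return elapsed

if __name__ == "__main__":
    batch_wait, _ = asyncio.run(non_streaming("q"))
    stream_wait = asyncio.run(streaming("q"))
    print(f"non-streaming: first output after {batch_wait:.2f}s")
    print(f"streaming:     first output after {stream_wait:.2f}s")
```

Total work is identical in both modes. Only the moment the first output can be shown changes.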
Here’s what the timing looked like in my application:
```
Query submitted: 0ms
├─ Vector search: 340ms
├─ Context assembly: 50ms
├─ LLM generation: 18,500ms  ← User sees nothing here
└─ Response returned: 18,890ms

Total perceived wait: 18.9 seconds of silence
```

From the user's perspective, the app was frozen for almost 19 seconds.
Why Streaming Changes Everything
With streaming, the perceived latency changes dramatically:
```
Query submitted: 0ms
├─ Vector search: 340ms
├─ Context assembly: 50ms
├─ First token arrives: 390ms  ← User sees answer starting
├─ Token stream continues...
└─ Final token: 18,890ms

Perceived wait: 390ms (about 98% shorter)
```

The actual generation time is the same, but users can start reading immediately. This isn't just about impatience; it's about feedback. The streaming response tells users "I'm working, here's what I'm thinking."
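The percentage follows directly from the stage timings above. This sketch just redoes the arithmetic. Like the breakdown, it treats the first token as arriving the moment context assembly finishes, which ignores the model's own time-to-first-token.

```python
# Stage timings (ms) taken from the breakdowns above
vector_search_ms = 340
context_assembly_ms = 50
llm_generation_ms = 18_500

total_ms = vector_search_ms + context_assembly_ms + llm_generation_ms  # 18,890 ms

# Without streaming, the user waits for the whole pipeline
perceived_without_ms = total_ms
# With streaming, the wait ends when the first token arrives
perceived_with_ms = vector_search_ms + context_assembly_ms            # 390 ms

reduction = 1 - perceived_with_ms / perceived_without_ms
print(f"total: {total_ms} ms, first token: {perceived_with_ms} ms, "
      f"perceived wait reduced by {reduction:.1%}")
# prints: total: 18890 ms, first token: 390 ms, perceived wait reduced by 97.9%
```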
Implementing Streaming with FastAPI + LangChain LCEL
I’ll show you how I implemented this, starting with the mistakes I made along the way.
Mistake #1: Trying to Stream Everything
My first attempt tried to stream the entire pipeline:
```python
# This doesn't work well
async def bad_stream_rag(query: str):
    async def generate():
        # Don't do this - retrieval isn't a stream
        async for chunk in retriever.astream(query):  # Wrong!
            yield f"data: {chunk}\n\n"

        # LLM streaming happens later
        async for token in llm.astream(prompt):
            yield f"data: {token}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```

This failed because document retrieval isn't designed to be streamed. It's a batch operation: you get all relevant documents at once.
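The reason shows up in how a non-streaming runnable implements astream. As far as I know, LangChain's default Runnable.astream simply awaits ainvoke and yields the result once. Iterating a retriever therefore gives you one chunk that contains every document. The class below is a hypothetical mimic of that fallback built with the standard library. It is not LangChain's code.

```python
import asyncio

class BatchOnlyRetriever:
    """Hypothetical retriever whose astream falls back to 'await the whole
    result, then yield it once', like a runnable without native streaming."""

    async def ainvoke(self, query: str) -> list:
        await asyncio.sleep(0)  # stand-in for a vector search
        return [f"doc-{i} for {query!r}" for i in range(4)]

    async def astream(self, query: str):
        # No incremental output: one chunk holds the entire result
        yield await self.ainvoke(query)

async def collect_chunks(query: str) -> list:
    return [chunk async for chunk in BatchOnlyRetriever().astream(query)]

chunks = asyncio.run(collect_chunks("refund policy"))
print(len(chunks), len(chunks[0]))  # prints: 1 4
```

"Streaming" the retriever adds an SSE frame but no incremental feedback, so you may as well await it directly.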
Mistake #2: Using WebSocket Instead of SSE
I initially reached for WebSocket because it seemed more powerful:
```python
from fastapi import WebSocket

@app.websocket("/rag/stream")
async def websocket_rag(websocket: WebSocket):
    await websocket.accept()
    # ... websocket handling
```

But WebSocket is overkill for this use case. It's bidirectional, more complex to implement, and requires different frontend code. Server-Sent Events (SSE) are simpler and purpose-built for one-way streaming from server to client.
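Part of what makes SSE simpler is that the wire format is plain text over an ordinary HTTP response. Each message is one or more data: lines followed by a blank line, and lines that start with a colon are comments. The helpers format_sse and parse_sse below are hypothetical names for a minimal sketch. The parser handles only the data and event fields plus comments. It does not handle id, retry, or bare CR line endings. That is enough to check the events an endpoint like the ones in this post emits, from Python instead of a browser.

```python
import json
from typing import Dict, List, Optional

def format_sse(data: str, event: Optional[str] = None) -> str:
    """Encode one SSE message; multi-line data becomes several 'data:' lines."""
    lines = [f"event: {event}"] if event else []
    lines += [f"data: {line}" for line in data.split("\n")]
    return "\n".join(lines) + "\n\n"  # the blank line dispatches the message

def parse_sse(stream: str) -> List[Dict[str, str]]:
    """Minimal decoder: returns one {'event', 'data'} dict per dispatched message."""
    messages, data_lines, event = [], [], "message"
    for line in stream.replace("\r\n", "\n").split("\n"):
        if line == "":                      # blank line: dispatch if there is data
            if data_lines:
                messages.append({"event": event, "data": "\n".join(data_lines)})
            data_lines, event = [], "message"
        elif line.startswith(":"):          # comment (often used as keep-alive)
            continue
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):       # a single leading space is dropped
                value = value[1:]
            if field == "data":
                data_lines.append(value)
            elif field == "event":
                event = value
    return messages

wire = format_sse(json.dumps({"token": "Hello"})) + format_sse(json.dumps({"done": True}))
events = [json.loads(m["data"]) for m in parse_sse(wire)]
print(events)  # prints: [{'token': 'Hello'}, {'done': True}]
```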
The Correct Approach: Hybrid Streaming
The right pattern is:
- Retrieve documents up front in a single call (it's fast, so there's no need to stream it)
- Stream the LLM response (this is where the wait happens)
- Use Server-Sent Events for the frontend connection
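The three steps can be sketched with the standard library alone before LangChain comes in. fake_retrieve and fake_llm_stream are hypothetical stand-ins for a vector store and a streaming chat model. The shape to notice is one awaited retrieval, followed by an async loop that forwards each token as an SSE frame.

```python
import asyncio
import json

# Hypothetical stand-ins for a vector store and a streaming chat model
async def fake_retrieve(query: str) -> list:
    await asyncio.sleep(0.01)
    return [{"source": "handbook.pdf", "content": "Refunds are issued within 14 days."}]

async def fake_llm_stream(context: str, question: str):
    # A real model would condition on context and question; this one replays tokens
    for token in ["Refunds ", "take ", "14 ", "days."]:
        await asyncio.sleep(0)
        yield token

def sse(payload: dict) -> str:
    return f"data: {json.dumps(payload)}\n\n"

async def hybrid_rag_events(query: str):
    # 1. Retrieve once: awaited as a batch, not streamed
    docs = await fake_retrieve(query)
    yield sse({"sources": [d["source"] for d in docs]})
    # 2. Build the context a single time
    context = "\n\n".join(d["content"] for d in docs)
    # 3. Stream only the generation step
    async for token in fake_llm_stream(context, query):
        yield sse({"token": token})
    yield sse({"done": True})

async def collect(query: str) -> list:
    return [frame async for frame in hybrid_rag_events(query)]

frames = asyncio.run(collect("How long do refunds take?"))
print(len(frames))  # prints: 6 (sources, four tokens, done)
```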
Here’s my working implementation:
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI

app = FastAPI()

# Initialize retriever (`vectorstore` is your own vector store, created elsewhere)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})


def format_docs(docs):
    """Join retrieved Documents into a single context string for the prompt."""
    return "\n\n".join(doc.page_content for doc in docs)


def build_rag_chain():
    """Build RAG chain with LCEL for streaming support."""
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Answer based on the context below. Be concise and accurate.\n\nContext: {context}"),
        ("human", "{question}"),
    ])

    # .astream() streams tokens from ChatOpenAI on its own; streaming=True
    # additionally makes invoke() stream tokens to callbacks
    llm = ChatOpenAI(
        model="gpt-4",
        streaming=True,
        temperature=0.1,
    )

    # Retrieval runs once (not streamed); only the LLM output is streamed
    chain = (
        RunnableParallel(
            context=retriever | format_docs,  # docs -> plain text, not a list of Documents
            question=RunnablePassthrough(),
        )
        | prompt
        | llm
        | StrOutputParser()
    )
    return chain


rag_chain = build_rag_chain()


# GET, because the browser's EventSource API can only send GET requests
@app.get("/rag/stream")
async def stream_rag_response(query: str):
    """Stream RAG response with Server-Sent Events."""

    async def event_generator():
        try:
            # Send initial status
            yield f"data: {json.dumps({'status': 'retrieving'})}\n\n"

            # Stream tokens from the LCEL chain (input is the raw query string)
            async for token in rag_chain.astream(query):
                # SSE format: data: {json}\n\n
                yield f"data: {json.dumps({'token': token})}\n\n"

            # Signal completion
            yield f"data: {json.dumps({'done': True})}\n\n"

        except Exception as e:
            # Report the error as a stream event instead of dropping the connection
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
```

Adding Source Citations to the Stream
I wanted users to see which documents were used, not just the answer. Here’s how I added sources to the stream:
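```python
import logging

logger = logging.getLogger(__name__)

# Generation-only chain. rag_chain can't be reused here: it expects the raw
# query string and would run retrieval again. This one takes
# {"context": ..., "question": ...} and skips retrieval.
answer_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer based on the context below. Be concise and accurate.\n\nContext: {context}"),
    ("human", "{question}"),
])
answer_chain = answer_prompt | ChatOpenAI(model="gpt-4", temperature=0.1) | StrOutputParser()


@app.get("/rag/stream-with-sources")
async def stream_rag_with_sources(query: str):
    """Stream response with document sources included."""

    async def event_generator():
        try:
            # Step 1: Retrieve documents up front, in one call (not streamed)
            yield f"data: {json.dumps({'status': 'retrieving'})}\n\n"
            docs = await retriever.ainvoke(query)

            # Step 2: Send sources to frontend
            sources = [
                {
                    "content": doc.page_content[:150] + "...",
                    "source": doc.metadata.get("source", "unknown"),
                    "page": doc.metadata.get("page", "N/A"),
                }
                for doc in docs
            ]
            yield f"data: {json.dumps({'sources': sources})}\n\n"

            # Step 3: Stream the answer
            yield f"data: {json.dumps({'status': 'generating'})}\n\n"

            # Build context from retrieved docs
            context = "\n\n".join(doc.page_content for doc in docs)

            # Stream LLM response
            async for token in answer_chain.astream({"context": context, "question": query}):
                yield f"data: {json.dumps({'token': token})}\n\n"

            # Step 4: Signal completion
            yield f"data: {json.dumps({'done': True})}\n\n"

        except Exception as e:
            # Log the full traceback server-side; send only the message to the client
            logger.exception("RAG stream failed")
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
```

Frontend Implementation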
The frontend uses the EventSource API to consume the SSE stream:
```javascript
function streamRAGResponse(query) {
  const responseDiv = document.getElementById('response');
  const sourcesDiv = document.getElementById('sources');

  // Clear previous response
  responseDiv.textContent = '';
  sourcesDiv.replaceChildren();

  // Create EventSource connection (EventSource always sends a GET request)
  const eventSource = new EventSource(
    `/rag/stream-with-sources?query=${encodeURIComponent(query)}`
  );

  let fullResponse = '';

  eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);

    // Handle different message types
    if (data.status) {
      responseDiv.textContent = `Status: ${data.status}...`;
      return;
    }

    if (data.sources) {
      // Display retrieved sources
      renderSources(data.sources);
      return;
    }

    if (data.done) {
      eventSource.close();
      console.log('Stream completed successfully');
      return;
    }

    if (data.error) {
      responseDiv.textContent = `Error: ${data.error}`;
      eventSource.close();
      return;
    }

    // Append token to response
    if (data.token) {
      fullResponse += data.token;
      responseDiv.textContent = fullResponse;
    }
  };

  eventSource.onerror = (error) => {
    // Close instead of letting EventSource auto-reconnect and re-run the query
    console.error('EventSource error:', error);
    eventSource.close();
  };

  // Build nodes with textContent so document text can't inject HTML
  function renderSources(sources) {
    sourcesDiv.replaceChildren(...sources.map((s) => {
      const div = document.createElement('div');
      div.className = 'source';
      const title = document.createElement('strong');
      title.textContent = s.source;
      const body = document.createElement('p');
      body.textContent = s.content;
      div.append(title, body);
      return div;
    }));
  }
}
```

React Component Example
If you’re using React, here’s a custom hook:
```javascript
import { useState, useCallback, useEffect, useRef } from 'react';

export function useRAGStream() {
  const [response, setResponse] = useState('');
  const [sources, setSources] = useState([]);
  const [status, setStatus] = useState('idle');
  const [error, setError] = useState(null);
  // Track the active connection so it can be closed on a new query or unmount
  const sourceRef = useRef(null);

  const streamQuery = useCallback((query) => {
    // Close any stream still running from a previous query
    sourceRef.current?.close();

    setResponse('');
    setSources([]);
    setError(null);
    setStatus('connecting');

    const eventSource = new EventSource(
      `/rag/stream-with-sources?query=${encodeURIComponent(query)}`
    );
    sourceRef.current = eventSource;

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);

      if (data.status) {
        setStatus(data.status);
      } else if (data.sources) {
        setSources(data.sources);
      } else if (data.token) {
        setResponse((prev) => prev + data.token);
      } else if (data.done) {
        setStatus('completed');
        eventSource.close();
      } else if (data.error) {
        setError(data.error);
        setStatus('error');
        eventSource.close();
      }
    };

    eventSource.onerror = () => {
      setError('Connection failed');
      setStatus('error');
      eventSource.close();
    };

    return () => eventSource.close();
  }, []);

  // Close the connection when the component using the hook unmounts
  useEffect(() => () => sourceRef.current?.close(), []);

  return { response, sources, status, error, streamQuery };
}
```

Common Pitfalls and Solutions
Pitfall: Nginx Buffering
If you deploy behind Nginx, it buffers proxied responses by default, SSE included, so users won't see streaming.
Solution: Add this header in your FastAPI response:
```python
headers={
    "X-Accel-Buffering": "no",  # Disable nginx buffering
}
```

Or configure Nginx:

```nginx
location /rag/stream {
    # ... your existing proxy_pass settings
    proxy_buffering off;
    proxy_cache off;
}
```

Pitfall: Timeout Issues
Long-running streams can time out. I ran into this with complex queries.
Solution: Configure timeout settings:
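```python
from langchain_openai import ChatOpenAI

# For LangChain LLM calls
llm = ChatOpenAI(
    model="gpt-4",
    streaming=True,
    request_timeout=60,  # Timeout for OpenAI API requests, in seconds
    max_retries=2,
)
```

```bash
# For FastAPI/Uvicorn
uvicorn app:app --timeout-keep-alive 120
```

Note that --timeout-keep-alive only controls how long Uvicorn holds an idle keep-alive connection open between requests. It doesn't cut off a response that is still streaming. If streams get closed behind Nginx, the proxy's proxy_read_timeout (60 seconds by default) is the more likely culprit.

Pitfall: Memory Leaks with EventSource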
In my React app, I found that EventSource connections weren’t being cleaned up.
Solution: Always return cleanup functions:
```javascript
useEffect(() => {
  const eventSource = new EventSource(url);

  // ... handle messages

  return () => {
    eventSource.close(); // CRITICAL: Clean up on unmount or when url changes
  };
}, [url]);
```

Architecture Overview
Here’s the complete data flow:
```
┌─────────────┐
│   Browser   │
│ (EventSrc)  │
└──────┬──────┘
       │ HTTP GET /rag/stream?query=...
       ▼
┌─────────────────────────────────────────────┐
│              FastAPI Endpoint               │
│          /rag/stream-with-sources           │
└──────┬──────────────────────────────────────┘
       │
       ├─────► Vector DB (pgvector)
       │        └─ Retrieve docs (sync, ~300ms)
       │
       ├─────► Send sources to frontend
       │        └─ SSE: {"sources": [...]}
       │
       ├─────► LangChain LCEL Chain
       │        ├─ Context assembly
       │        ├─ Prompt construction
       │        └─ LLM streaming
       │
       └─────► Stream tokens to frontend
                └─ SSE: {"token": "..."} (x N)
                   SSE: {"done": true}
```

Performance Results
After implementing streaming, my application’s perceived latency dropped dramatically:
```
Metric                  Before   After    Improvement
──────────────────────────────────────────────────────
Time to first token     18.9s    0.4s     97.9%
User abandonment rate   23%      4%       82.6%
Avg session duration    2.1min   4.8min   +128%
Queries per session     1.8      5.2      +188%
```

The actual generation time didn't change: it still takes 15-20 seconds for complex queries. But users start seeing answers immediately, which fundamentally changes their experience.
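The Improvement column mixes relative decreases (first-token time, abandonment) with relative increases (session length, queries per session). This sketch recomputes each one as a signed percent change from the table's own numbers. The metric keys are just labels chosen for illustration.

```python
# (before, after) pairs copied from the table above
metrics = {
    "time_to_first_token_s": (18.9, 0.4),
    "abandonment_rate_pct": (23, 4),
    "session_minutes": (2.1, 4.8),
    "queries_per_session": (1.8, 5.2),
}

def pct_change(before: float, after: float) -> float:
    """Relative change in percent; negative values are decreases."""
    return (after - before) / before * 100

for name, (before, after) in metrics.items():
    print(f"{name}: {pct_change(before, after):+.1f}%")
```

It prints -97.9%, -82.6%, +128.6%, and +188.9%. The table's +128% and +188% are the last two values truncated rather than rounded.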
When to Use This Approach
Streaming works well when:
- LLM generation takes more than 2-3 seconds
- Users need to read long-form answers
- You want to show confidence or uncertainty as it builds
- Your frontend can handle incremental updates
Consider alternatives when:
- Responses are very short (<1 second total)
- You need the complete response before any processing
- Your use case requires bidirectional communication (use WebSocket)
Key Takeaways
- Don't stream retrieval: it's fast enough already. Only stream the LLM generation.
- Use SSE, not WebSocket: for one-way streaming, SSE is simpler and more appropriate.
- Set proper headers: Cache-Control: no-cache, X-Accel-Buffering: no, and the correct media type (text/event-stream).
- Handle errors gracefully: send error events through the stream; don't just throw exceptions.
- Clean up connections: always close the EventSource on frontend unmount or when the stream completes.
- Think about user perception: streaming is about perceived performance, not actual performance.
The code in this post is simplified for clarity. In production, you'll want to add authentication, rate limiting, query validation, and more robust error handling. But the core pattern (retrieve documents up front in a single call, then stream the LLM response via SSE) is the foundation of a responsive RAG system.
Final Words + More Resources
My intention with this article was to share my knowledge and experience to help others. If you'd like to get in touch, you can reach me by email: Email me
Here are the most important links from this article, along with some further resources on this topic:
- 👨💻 LangChain LCEL Documentation
- 👨💻 FastAPI StreamingResponse
- 👨💻 Server-Sent Events MDN
- 👨💻 Reddit RAG Production Discussion
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!