
Streaming API Implementation Guide: Real-Time AI Responses with SSE
When you use ChatGPT or Claude, you see text appearing word by word instead of waiting for the entire response to load. That's streaming — and implementing it in your own applications makes the difference between a chatbot that feels alive and one that feels broken.
This guide covers how streaming works under the hood, how to implement it with every major AI provider, and the common pitfalls that trip up developers.
Why Streaming Matters#
Without streaming, your application sends a request and waits. For a 500-word response from GPT-4, that's 5-15 seconds of staring at a loading spinner. With streaming, the first token arrives in under a second, and the user sees the response building in real time.
The impact on user experience is dramatic:
| Metric | Without Streaming | With Streaming |
|---|---|---|
| Time to First Token | 5-15 seconds | 0.3-1 second |
| Perceived Latency | High (feels slow) | Low (feels instant) |
| User Engagement | Users leave after 3s | Users stay and read |
| Error Recovery | All or nothing | Partial response visible |
Streaming uses Server-Sent Events (SSE), a simple HTTP-based protocol where the server pushes data to the client over a single long-lived connection.
How AI API Streaming Works#
The flow is straightforward:
- Client sends a POST request with `"stream": true`
- Server responds with `Content-Type: text/event-stream`
- Server sends chunks as `data: {...}\n\n` lines
- Each chunk contains a delta (a partial piece of the response)
- Stream ends with `data: [DONE]\n\n`
Here's what the raw SSE stream looks like:
```
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"},"index":0}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":" world"},"index":0}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"!"},"index":0}]}

data: [DONE]
```
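Reassembling the text from such a stream is just a matter of pulling out each `data:` line and concatenating the deltas. A minimal, self-contained sketch, using the sample chunks above as input:

```python
import json

# The raw SSE lines from the sample stream above
raw_stream = """data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"},"index":0}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":" world"},"index":0}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"!"},"index":0}]}

data: [DONE]"""

def assemble(sse_text: str) -> str:
    """Concatenate the delta contents from an OpenAI-style SSE stream."""
    parts = []
    for line in sse_text.splitlines():
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        chunk = json.loads(payload)
        parts.append(chunk["choices"][0]["delta"].get("content", ""))
    return "".join(parts)

print(assemble(raw_stream))  # Hello world!
```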
Implementation: Python#
Using the OpenAI SDK (Works with Crazyrouter)#
The simplest approach — works with OpenAI, Claude, Gemini, and any provider accessible through Crazyrouter:
```python
from openai import OpenAI

# Use Crazyrouter for access to 300+ models
client = OpenAI(
    api_key="your-crazyrouter-api-key",
    base_url="https://crazyrouter.com/v1"
)

# Streaming chat completion
stream = client.chat.completions.create(
    model="gpt-4.1",  # or "claude-sonnet-4-5", "gemini-2.5-flash", etc.
    messages=[
        {"role": "user", "content": "Explain quantum computing in simple terms"}
    ],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
print()  # newline at the end
```
Using Raw HTTP (requests)#
For more control over the connection:
```python
import requests
import json

API_KEY = "your-crazyrouter-api-key"
BASE_URL = "https://crazyrouter.com/v1"

response = requests.post(
    f"{BASE_URL}/chat/completions",
    headers={
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    },
    json={
        "model": "gpt-4.1",
        "messages": [{"role": "user", "content": "Write a haiku about coding"}],
        "stream": True
    },
    stream=True  # Important: enable response streaming
)

# Parse SSE stream
for line in response.iter_lines():
    if line:
        line = line.decode("utf-8")
        if line.startswith("data: "):
            data = line[6:]  # Remove "data: " prefix
            if data == "[DONE]":
                break
            chunk = json.loads(data)
            content = chunk["choices"][0]["delta"].get("content", "")
            print(content, end="", flush=True)
```
Async Streaming (for web servers)#
If you're building a FastAPI or async application:
```python
import json
from openai import AsyncOpenAI

client = AsyncOpenAI(
    api_key="your-crazyrouter-api-key",
    base_url="https://crazyrouter.com/v1"
)

async def stream_response(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4.1",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            # Frame each chunk as an SSE event, since we serve
            # the response with Content-Type: text/event-stream
            yield f"data: {json.dumps({'content': content})}\n\n"
    yield "data: [DONE]\n\n"

# Usage in FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/chat")
async def chat(prompt: str):
    return StreamingResponse(
        stream_response(prompt),
        media_type="text/event-stream"
    )
```
Implementation: Node.js#
Using the OpenAI SDK#
```javascript
import OpenAI from 'openai';

const client = new OpenAI({
  apiKey: 'your-crazyrouter-api-key',
  baseURL: 'https://crazyrouter.com/v1'
});

async function streamChat(prompt) {
  const stream = await client.chat.completions.create({
    model: 'gpt-4.1',
    messages: [{ role: 'user', content: prompt }],
    stream: true
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    process.stdout.write(content);
  }
  console.log(); // newline
}

await streamChat('Explain REST APIs in 3 sentences');
```
Express.js Server with Streaming#
```javascript
import express from 'express';
import OpenAI from 'openai';

const app = express();
const client = new OpenAI({
  apiKey: 'your-crazyrouter-api-key',
  baseURL: 'https://crazyrouter.com/v1'
});

app.get('/api/chat', async (req, res) => {
  const { prompt } = req.query;

  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const stream = await client.chat.completions.create({
    model: 'gpt-4.1',
    messages: [{ role: 'user', content: prompt }],
    stream: true
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      res.write(`data: ${JSON.stringify({ content })}\n\n`);
    }
  }

  res.write('data: [DONE]\n\n');
  res.end();
});

app.listen(3000);
```
Frontend: Consuming the Stream#
```javascript
// Browser-side code
async function streamFromAPI(prompt) {
  const response = await fetch(`/api/chat?prompt=${encodeURIComponent(prompt)}`);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  let buffer = '';
  let fullText = '';
  const outputElement = document.getElementById('output');

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters intact across reads
    buffer += decoder.decode(value, { stream: true });

    // Events are delimited by a blank line; a read may end mid-event,
    // so keep any incomplete trailing event in the buffer
    const events = buffer.split('\n\n');
    buffer = events.pop();

    for (const line of events) {
      if (line.startsWith('data: ') && line !== 'data: [DONE]') {
        const data = JSON.parse(line.slice(6));
        fullText += data.content;
        outputElement.textContent = fullText;
      }
    }
  }
}
```
Implementation: cURL#
For testing and debugging:
```bash
curl -N -X POST https://crazyrouter.com/v1/chat/completions \
  -H "Authorization: Bearer your-crazyrouter-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "gpt-4.1",
    "messages": [{"role": "user", "content": "Count from 1 to 10"}],
    "stream": true
  }'
```
The `-N` flag disables output buffering so you see chunks as they arrive.
Handling Edge Cases#
1. Connection Drops#
Streams can disconnect. Always implement reconnection logic:
```python
import time

def stream_with_retry(prompt, max_retries=3):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="gpt-4.1",
                messages=[{"role": "user", "content": prompt}],
                stream=True
            )
            for chunk in stream:
                yield chunk.choices[0].delta.content or ""
            return  # Success
        except Exception:
            # Note: a retry restarts the request from scratch, so content
            # already yielded before the failure may be generated again
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
```
2. Token Counting During Streaming#
You don't get token counts until the stream ends. With the OpenAI-compatible API, request them with `stream_options={"include_usage": True}`; the final chunk then carries `usage` and has an empty `choices` list:
```python
stream = client.chat.completions.create(
    model="gpt-4.1",
    messages=[{"role": "user", "content": "Explain SSE briefly"}],
    stream=True,
    stream_options={"include_usage": True}
)

total_tokens = 0
for chunk in stream:
    if chunk.choices:  # The usage-only final chunk has no choices
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="")
    if chunk.usage:  # Final chunk includes usage
        total_tokens = chunk.usage.total_tokens

print(f"\nTotal tokens: {total_tokens}")
```
3. Timeout Handling#
Set appropriate timeouts for long-running streams:
```python
client = OpenAI(
    api_key="your-key",
    base_url="https://crazyrouter.com/v1",
    timeout=120.0  # 2 minutes for long responses
)
```
4. Backpressure#
If your consumer is slower than the producer (e.g., writing to a database), buffer appropriately:
```python
# Accumulate chunks and flush in batches so a slow writer
# doesn't stall the stream on every token
buffer = []

async for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        buffer.append(content)
        if len(buffer) >= 100:
            # Flush buffer to storage
            await flush_to_db("".join(buffer))
            buffer.clear()

# Flush whatever is left after the stream ends
if buffer:
    await flush_to_db("".join(buffer))
```
Provider-Specific Notes#
OpenAI (GPT-4.1, o4-mini)#
Standard SSE format. Supports `stream_options: {"include_usage": true}` to get token counts in the final chunk.
Anthropic (Claude)#
Claude uses a slightly different SSE event format with event types (message_start, content_block_delta, message_stop). When accessed through Crazyrouter, this is normalized to the OpenAI format automatically.
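For reference, Claude's native stream looks roughly like the sample below (abridged and illustrative; the exact fields vary by response):

```
event: message_start
data: {"type":"message_start","message":{"id":"msg_abc","role":"assistant","content":[]}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: message_stop
data: {"type":"message_stop"}
```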
Google (Gemini)#
Gemini's native streaming uses a different protocol. Through Crazyrouter, it's converted to standard OpenAI SSE format.
Using Crazyrouter for Unified Streaming#
The biggest advantage of using Crazyrouter for streaming is format consistency. Regardless of whether you're streaming from GPT-4.1, Claude, Gemini, or DeepSeek, the SSE format is identical — OpenAI-compatible. No need to handle provider-specific quirks.
```python
# Same code works for ANY model
for model in ["gpt-4.1", "claude-sonnet-4-5", "gemini-2.5-flash", "deepseek-v3"]:
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True
    )
    for chunk in stream:
        print(chunk.choices[0].delta.content or "", end="")
    print(f"\n--- {model} done ---")
```
FAQ#
Does streaming cost more than non-streaming?#
No. Streaming and non-streaming requests cost the same number of tokens. The only difference is how the response is delivered.
Can I use streaming with function calling?#
Yes. Function call arguments are streamed as deltas just like content. You'll receive partial JSON that you need to concatenate before parsing.
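A sketch of that accumulation step, using hypothetical argument fragments in place of a live stream (in practice the fragments arrive on the tool-call deltas of each chunk):

```python
import json

# Hypothetical argument deltas as they might arrive, split mid-token
argument_deltas = ['{"loc', 'ation": "Paris', '", "unit": "celsius"}']

# Concatenate all fragments first; parse only once the stream is complete,
# since each individual fragment is not valid JSON on its own
raw_arguments = "".join(argument_deltas)
arguments = json.loads(raw_arguments)

print(arguments["location"])  # Paris
```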
What's the maximum stream duration?#
Most providers allow streams up to 5-10 minutes. For very long responses (>4000 tokens), make sure your client timeout is set appropriately.
Does streaming work with all AI models?#
Most modern LLMs support streaming. Through Crazyrouter, streaming is available for 300+ models including all major providers.
How do I handle streaming in a React frontend?#
Use the fetch API with ReadableStream, or libraries like ai (Vercel AI SDK) which handle SSE parsing automatically.
Summary#
Streaming transforms AI applications from "wait and hope" to "watch it think." The implementation is straightforward: set `stream: true`, parse the SSE chunks, and render incrementally.
For the simplest multi-provider streaming setup, Crazyrouter normalizes all providers to the OpenAI SSE format. One API key, consistent streaming behavior across 300+ models. Get started at crazyrouter.com.


