FastAPI Server-Sent Events
What are Server-Sent Events (SSE) in FastAPI?
Server-Sent Events (SSE) allow a server to push real-time updates to a client over an HTTP connection. Unlike WebSockets, which allow bidirectional communication, SSE is a one-way communication channel where the server sends updates, and the client listens. SSE is useful for real-time updates like notifications, live data feeds, or stock price updates. In FastAPI, SSE can be implemented using asynchronous generators.
How do you implement SSE in FastAPI?
SSE can be implemented in FastAPI by using asynchronous generators to yield data continuously to the client. The client listens to the server and receives data in real time. The response is formatted as a stream of events, with each event formatted according to the SSE protocol.
Example of implementing SSE in FastAPI:
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse
import time
import asyncio
app = FastAPI()
async def event_stream():
while True:
# Simulate sending events every 2 seconds
await asyncio.sleep(2)
yield f"data: The time is {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
@app.get("/sse")
async def sse_endpoint(request: Request):
return StreamingResponse(event_stream(), media_type="text/event-stream")
In this example, the event_stream function generates a stream of events, sending the current time every 2 seconds to the client. The /sse endpoint returns a StreamingResponse with the text/event-stream content type, allowing the client to listen for updates.
How do you connect to an SSE endpoint from the client?
To connect to an SSE endpoint from the client, you use the JavaScript EventSource API, which listens for server-sent events from the specified endpoint. The client continuously receives events as long as the connection is open.
Example of connecting to an SSE endpoint from the client:
const eventSource = new EventSource("http://localhost:8000/sse");
eventSource.onmessage = function(event) {
console.log("New message from server:", event.data);
};
eventSource.onerror = function() {
console.error("Error occurred in SSE connection");
};
In this example, the client connects to the /sse endpoint using EventSource. The onmessage event handler logs the data received from the server, and the onerror handler logs any errors in the connection.
How do you handle client disconnection in SSE with FastAPI?
In FastAPI, you can handle client disconnection by checking if the client has closed the connection during the SSE stream. You can use request.is_disconnected() to detect when the client has disconnected and stop sending events.
Example of handling client disconnection:
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse
import asyncio
app = FastAPI()
async def event_stream(request: Request):
while True:
# Check if the client has disconnected
if await request.is_disconnected():
break
await asyncio.sleep(2)
yield f"data: The time is {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
@app.get("/sse")
async def sse_endpoint(request: Request):
return StreamingResponse(event_stream(request), media_type="text/event-stream")
In this example, the event_stream function checks whether the client is still connected. If the client disconnects, the generator stops sending events, and the connection is closed.
How do you send custom events with SSE in FastAPI?
In addition to sending plain data with SSE, you can send custom events by specifying the event name. This allows the client to listen for different types of events using the addEventListener method on the client side.
Example of sending custom events:
async def event_stream():
while True:
await asyncio.sleep(2)
yield "event: customEvent\ndata: This is a custom event\n\n"
@app.get("/sse")
async def sse_endpoint():
return StreamingResponse(event_stream(), media_type="text/event-stream")
In this example, a custom event called customEvent is sent from the server. The client can listen for this event using addEventListener.
Example of listening for custom events on the client side:
const eventSource = new EventSource("http://localhost:8000/sse");
eventSource.addEventListener("customEvent", function(event) {
console.log("Custom event received:", event.data);
});
How do you handle reconnection in SSE?
SSE supports automatic reconnection in case the connection is lost. The client automatically tries to reconnect to the server if the connection is interrupted. You can configure the retry interval on the server side by sending the retry field with the desired interval (in milliseconds).
Example of configuring reconnection with SSE:
async def event_stream():
while True:
await asyncio.sleep(2)
yield "retry: 5000\n" # Set reconnection interval to 5 seconds
yield f"data: The time is {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
@app.get("/sse")
async def sse_endpoint():
return StreamingResponse(event_stream(), media_type="text/event-stream")
In this example, the server instructs the client to attempt reconnection after 5 seconds if the connection is lost.
How do you optimize SSE performance in FastAPI?
To optimize SSE performance in FastAPI, you can follow these best practices:
- Limit the frequency of data updates to avoid overwhelming the client and server with too many events.
- Use efficient data structures and background tasks to handle events and avoid blocking the event loop.
- Ensure the server is properly configured to handle multiple concurrent SSE connections, especially for high-traffic applications.
Example of optimizing the event stream frequency:
async def event_stream():
while True:
await asyncio.sleep(5) # Send events every 5 seconds to reduce load
yield f"data: The time is {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
In this example, the frequency of sending events is reduced to every 5 seconds, which helps reduce the load on the server.
How do you secure SSE endpoints in FastAPI?
SSE endpoints can be secured using the same mechanisms you would use for securing regular HTTP routes in FastAPI, such as authentication and authorization. You can add dependencies to check for valid tokens or session information before allowing the client to access the SSE endpoint.
Example of securing an SSE endpoint with an OAuth2 token:
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer
from starlette.responses import StreamingResponse
import asyncio
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def event_stream():
while True:
await asyncio.sleep(2)
yield f"data: The time is {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
@app.get("/sse")
async def sse_endpoint(token: str = Depends(oauth2_scheme)):
return StreamingResponse(event_stream(), media_type="text/event-stream")
In this example, the SSE endpoint is secured with OAuth2, requiring the client to provide a valid token to access the event stream.
What are the advantages and limitations of SSE compared to WebSockets?
SSE is simpler to implement than WebSockets and is well-suited for unidirectional real-time updates (from server to client). However, SSE has limitations in terms of bidirectional communication and scalability for large-scale applications.
Advantages of SSE:
- Simple to implement with built-in browser support via
EventSource. - Uses HTTP, which works well with firewalls and proxies.
- Automatic reconnection is supported.
Limitations of SSE:
- Unidirectional: Can only send data from the server to the client (no client-to-server communication).
- Limited scalability for high-traffic applications compared to WebSockets.
- Not suitable for low-latency, bidirectional communication use cases like real-time multiplayer games or chat applications.