# chatkit-python

Skill from naimalarain13/hackathon-ii_the-evolution-of-todo
## What it does

Implements custom chat API backends with FastAPI, supporting SSE streaming, conversation persistence, and multi-model integration via LiteLLM.

## Installation

Install the skill:

```bash
npx skills add https://github.com/naimalarain13/hackathon-ii_the-evolution-of-todo --skill chatkit-python
```

Last updated: Jan 22, 2026

## Skill Details (SKILL.md)

---
name: chatkit-python
description: Build custom chat API backends for the OpenAI ChatKit frontend using FastAPI. Covers chat endpoint implementation, SSE streaming, conversation persistence, and integration with the OpenAI Agents SDK + Gemini via LiteLLM.
---

# ChatKit Python Backend Skill

Build custom chat API backends that work with the OpenAI ChatKit frontend.

## Architecture

```
┌─────────────────────────────────────────────────────────────┐
│ Frontend (ChatKit-JS)                                       │
│ useChatKit({ api: { url, domainKey } })                     │
└─────────────────────────────────────────────────────────────┘
                            │
                            │ HTTP POST /api/chat
                            │ (SSE streaming response)
                            ▼
┌─────────────────────────────────────────────────────────────┐
│ FastAPI Backend                                             │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ /api/chat Endpoint                                      │ │
│ │ • Parse ChatKit request                                 │ │
│ │ • Run Agent with MCP tools                              │ │
│ │ • Stream response via SSE                               │ │
│ │ • Persist conversation to DB                            │ │
│ └─────────────────────────────────────────────────────────┘ │
│                             │                               │
│                             ▼                               │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ OpenAI Agents   │ │ MCP Server      │ │ Neon Database   │ │
│ │ SDK + Gemini    │ │ (Todo Tools)    │ │ (Conversations) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```

## Quick Start

### Installation

```bash
pip install fastapi uvicorn sse-starlette "openai-agents[litellm]"

# Or with uv
uv add fastapi uvicorn sse-starlette "openai-agents[litellm]"
```

### Environment Variables

```env
# Database
DATABASE_URL=postgresql://user:password@host/database

# Gemini API
GOOGLE_API_KEY=your-gemini-api-key

# CORS (frontend URL)
CORS_ORIGINS=http://localhost:3000,https://your-app.vercel.app
```
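
A small sketch of loading these at startup; `python-dotenv` is an assumption here, and any loader works:

```python
import os

from dotenv import load_dotenv  # assumption: python-dotenv is installed

load_dotenv()  # read a local .env file, if present

DATABASE_URL = os.environ["DATABASE_URL"]      # required; raises KeyError if unset
GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]  # required
CORS_ORIGINS = os.getenv("CORS_ORIGINS", "http://localhost:3000").split(",")
```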

## Reference

| Pattern | Guide |
|---------|-------|
| Chat Endpoint | [reference/chat-endpoint.md](reference/chat-endpoint.md) |
| SSE Streaming | [reference/sse-streaming.md](reference/sse-streaming.md) |
| Conversation Persistence | [reference/conversation-persistence.md](reference/conversation-persistence.md) |

## Examples

| Example | Description |
|---------|-------------|
| [examples/chat-server.md](examples/chat-server.md) | Complete chat server implementation |

## Templates

| Template | Purpose |
|----------|---------|
| [templates/chat_router.py](templates/chat_router.py) | FastAPI chat endpoint router |
| [templates/models.py](templates/models.py) | Database models for conversations |

## Basic Chat Endpoint

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
from typing import Optional, List, AsyncGenerator
import json

app = FastAPI()

# CORS for the ChatKit frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class ChatMessage(BaseModel):
    role: str  # "user" or "assistant"
    content: str

class ChatRequest(BaseModel):
    messages: List[ChatMessage]
    thread_id: Optional[str] = None
    user_id: Optional[str] = None

async def generate_response(messages: List[ChatMessage]) -> AsyncGenerator[str, None]:
    """Generate a streaming response from the agent."""
    # Your agent logic here
    response = "Hello! How can I help you?"
    # Yield only the JSON payload: EventSourceResponse adds the
    # "data: ...\n\n" SSE framing itself.
    for word in response.split():
        yield json.dumps({"content": word + " "})
    yield json.dumps({"done": True})

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    """Chat endpoint with SSE streaming."""
    return EventSourceResponse(generate_response(request.messages))
```
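
To run it locally, a minimal launcher sketch (binding to port 8001 is an assumption, chosen to leave 8000 free for the MCP server used in the next example):

```python
import uvicorn

if __name__ == "__main__":
    # Serve the chat API; the ChatKit frontend points its api.url here
    uvicorn.run(app, host="0.0.0.0", port=8001)
```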

## Chat Endpoint with Agent

```python
import json
import os
from contextlib import asynccontextmanager
from typing import Optional, List, AsyncGenerator

from fastapi import FastAPI, HTTPException
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
from openai.types.responses import ResponseTextDeltaEvent

from agents import Agent, Runner, set_tracing_disabled
from agents.mcp import MCPServerStreamableHttp
from agents.extensions.models.litellm_model import LitellmModel

set_tracing_disabled(disabled=True)

# Global MCP server connection, opened and closed via the app lifespan
mcp_server: Optional[MCPServerStreamableHttp] = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Connect to the MCP server on startup, clean up on shutdown."""
    global mcp_server
    mcp_server = MCPServerStreamableHttp(
        name="Todo MCP",
        params={"url": "http://localhost:8000/api/mcp", "timeout": 30},
        cache_tools_list=True,
    )
    await mcp_server.connect()
    yield
    await mcp_server.cleanup()

app = FastAPI(lifespan=lifespan)

class ChatMessage(BaseModel):
    role: str
    content: str

class ChatRequest(BaseModel):
    messages: List[ChatMessage]
    thread_id: Optional[str] = None
    user_id: str

def create_agent(user_id: str) -> Agent:
    """Create an agent with user context."""
    return Agent(
        name="Todo Assistant",
        instructions=f"""You are a task management assistant for user {user_id}.

Use the MCP tools to help manage tasks:
  • add_task: Create new tasks
  • list_tasks: View tasks
  • complete_task: Mark done
  • delete_task: Remove tasks

Always pass user_id="{user_id}" to tools.""",
        model=LitellmModel(
            model="gemini/gemini-2.0-flash",
            api_key=os.getenv("GOOGLE_API_KEY"),
        ),
        mcp_servers=[mcp_server] if mcp_server else [],
    )

async def stream_agent_response(
    agent: Agent,
    user_message: str,
) -> AsyncGenerator[str, None]:
    """Stream the agent's response as SSE payloads."""
    try:
        result = Runner.run_streamed(agent, user_message)
        async for event in result.stream_events():
            # Forward incremental text deltas as they arrive
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                yield json.dumps({"content": event.data.delta})
        # Final response
        yield json.dumps({"content": result.final_output, "done": True})
    except Exception as e:
        yield json.dumps({"error": str(e)})

@app.post("/api/chat")
async def chat(request: ChatRequest):
    """Chat endpoint with streaming response."""
    if not request.messages:
        raise HTTPException(400, "No messages provided")
    # Run the agent on the last user message
    user_message = request.messages[-1].content
    agent = create_agent(request.user_id)
    return EventSourceResponse(stream_agent_response(agent, user_message))
```

Note that the MCP server here listens on port 8000, so run this API on a different port (e.g. `uvicorn main:app --port 8001`).

## SSE Event Format

On the wire, ChatKit expects Server-Sent Events of the form `data: <json>\n\n`. With `sse-starlette`, yield only the JSON payload; `EventSourceResponse` adds the `data:` prefix and the trailing blank line itself:

```python
# Content chunk  ->  data: {"content": "partial response"}
yield json.dumps({"content": "partial response"})

# Done signal  ->  data: {"done": true}
yield json.dumps({"done": True})

# Error  ->  data: {"error": "error message"}
yield json.dumps({"error": "error message"})

# Tool call (optional; result payload elided)
yield json.dumps({"tool_call": {"name": "add_task", "result": {}}})
```
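
For a quick end-to-end check, a sketch that consumes the stream with httpx (the URL, port, and payload are assumptions matching the examples above):

```python
import asyncio
import httpx

async def main() -> None:
    payload = {
        "messages": [{"role": "user", "content": "Add a task: buy milk"}],
        "user_id": "demo-user",
    }
    async with httpx.AsyncClient(timeout=None) as client:
        # Stream the response instead of buffering the whole body
        async with client.stream(
            "POST", "http://localhost:8001/api/chat", json=payload
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):])

asyncio.run(main())
```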

## Conversation Persistence

```python
import os
from datetime import datetime
from typing import List, Optional

from sqlmodel import SQLModel, Field, Session, create_engine, select

# One engine for the whole app, built from the DATABASE_URL env var
engine = create_engine(os.environ["DATABASE_URL"])

class Conversation(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    thread_id: str = Field(index=True, unique=True)
    user_id: str = Field(index=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

class Message(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    thread_id: str = Field(index=True)
    role: str  # "user" or "assistant"
    content: str
    created_at: datetime = Field(default_factory=datetime.utcnow)

async def save_message(thread_id: str, role: str, content: str):
    """Save a message to the database."""
    with Session(engine) as session:
        session.add(Message(thread_id=thread_id, role=role, content=content))
        session.commit()

async def load_conversation(thread_id: str) -> List[dict]:
    """Load conversation history."""
    with Session(engine) as session:
        messages = session.exec(
            select(Message)
            .where(Message.thread_id == thread_id)
            .order_by(Message.created_at)
        ).all()
        return [{"role": m.role, "content": m.content} for m in messages]
```
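
A sketch of wiring persistence into the streaming endpoint; it reuses `ChatRequest`, `create_agent`, and `stream_agent_response` from the agent example above, and both the UUID fallback for `thread_id` and the done-event convention are assumptions:

```python
import uuid

@app.post("/api/chat")
async def chat(request: ChatRequest):
    """Chat endpoint that persists both sides of the exchange."""
    user_message = request.messages[-1].content
    thread_id = request.thread_id or str(uuid.uuid4())
    await save_message(thread_id, "user", user_message)

    async def stream_and_persist():
        # Capture the assistant's full reply while forwarding chunks
        final_text = ""
        agent = create_agent(request.user_id)
        async for payload in stream_agent_response(agent, user_message):
            data = json.loads(payload)
            if data.get("done") and "content" in data:
                final_text = data["content"]  # full text arrives on the done event
            yield payload
        await save_message(thread_id, "assistant", final_text)

    return EventSourceResponse(stream_and_persist())
```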

## Authentication Integration

```python
from fastapi import Depends, HTTPException, Header

async def get_current_user(authorization: str = Header(...)) -> dict:
    """Verify the JWT and extract the user."""
    if not authorization.startswith("Bearer "):
        raise HTTPException(401, "Invalid authorization header")
    token = authorization[7:]  # strip the "Bearer " prefix
    user = await verify_jwt_token(token)  # Your JWT verification
    if not user:
        raise HTTPException(401, "Invalid token")
    return user

@app.post("/api/chat")
async def chat(
    request: ChatRequest,
    user: dict = Depends(get_current_user),
):
    """Authenticated chat endpoint."""
    # Use user["id"] as user_id
    agent = create_agent(user["id"])
    # ...
```
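
`verify_jwt_token` is left to your auth setup. A minimal sketch with PyJWT, assuming HS256, a `JWT_SECRET` environment variable, and the user id in the `sub` claim (all assumptions — match them to whatever issues your tokens):

```python
import os
from typing import Optional

import jwt  # PyJWT: pip install pyjwt

async def verify_jwt_token(token: str) -> Optional[dict]:
    """Decode the token and return its claims, or None if invalid."""
    try:
        claims = jwt.decode(
            token,
            os.environ["JWT_SECRET"],
            algorithms=["HS256"],
        )
        return {"id": claims["sub"]}
    except (jwt.InvalidTokenError, KeyError):
        return None
```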

## CORS Configuration

```python
import os
from fastapi.middleware.cors import CORSMiddleware

# Read allowed origins from the environment
CORS_ORIGINS = os.getenv("CORS_ORIGINS", "http://localhost:3000").split(",")

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
```

## Error Handling

```python
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=500,
        content={"error": str(exc)},
    )

@app.post("/api/chat")
async def chat(request: ChatRequest):
    try:
        # ... chat logic
        pass
    except ConnectionError:
        raise HTTPException(503, "MCP server unavailable")
    except TimeoutError:
        raise HTTPException(504, "Request timed out")
    except Exception as e:
        raise HTTPException(500, f"Internal error: {e}")
```

HTTP status codes only help before streaming begins; once the SSE response is open, report failures as `{"error": ...}` events instead (see SSE Event Format above).

## Testing

```python
import pytest
from httpx import ASGITransport, AsyncClient

@pytest.mark.asyncio
async def test_chat_endpoint():
    # httpx 0.27+ takes the ASGI app via an explicit transport
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.post(
            "/api/chat",
            json={
                "messages": [{"role": "user", "content": "Hello"}],
                "user_id": "test-user",
            },
        )
        assert response.status_code == 200
        assert "text/event-stream" in response.headers["content-type"]
```
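
To also exercise the stream itself, a sketch that reads the SSE frames through the same test transport (the single assertion at the end is illustrative only):

```python
@pytest.mark.asyncio
async def test_chat_endpoint_streams_events():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        async with client.stream(
            "POST",
            "/api/chat",
            json={
                "messages": [{"role": "user", "content": "Hello"}],
                "user_id": "test-user",
            },
        ) as response:
            # Collect the "data: ..." frames emitted by EventSourceResponse
            frames = [
                line async for line in response.aiter_lines()
                if line.startswith("data: ")
            ]
    assert frames, "expected at least one SSE event"
```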

## Best Practices

1. Stream responses - Better UX with SSE streaming
2. Persist conversations - Enable conversation continuity
3. Authenticate requests - Verify the user before processing
4. Handle errors gracefully - Return structured error responses
5. Configure CORS - Allow the frontend domain
6. Use connection pooling - For database and MCP connections (see the sketch below)
7. Log conversations - For debugging and analytics
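
For item 6, a minimal database-pooling sketch via `create_engine` (the pool numbers are illustrative, not tuned recommendations):

```python
import os
from sqlmodel import create_engine

# Reuse one engine for the whole app; it maintains the connection pool.
engine = create_engine(
    os.environ["DATABASE_URL"],
    pool_size=5,         # steady-state connections kept open
    max_overflow=10,     # extra connections allowed under burst load
    pool_pre_ping=True,  # drop dead connections (useful with serverless Postgres)
    pool_recycle=300,    # recycle connections after 5 minutes
)
```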