Python
```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List, Dict
import base64
import json
@dataclass
class PaginatedPosts:
posts: List[Dict]
total_count: int
has_more: bool
next_cursor: Optional[str]
class CommunityFeedService:
"""Service for community feed with cursor pagination."""
def __init__(self, db):
self.db = db
async def get_feed(
self,
feed_type: str = "trending",
viewer_id: Optional[str] = None,
cursor: Optional[str] = None,
limit: int = 20,
tags: Optional[List[str]] = None,
) -> PaginatedPosts:
cursor_data = self._parse_cursor(cursor) if cursor else None
query = self.db.table("community_posts").select(
"*",
"assets!inner(url, asset_type)",
"users!inner(id, display_name, avatar_url)",
)
if tags:
query = query.contains("tags", tags)
# Feed-specific ordering
if feed_type == "following" and viewer_id:
following = await self._get_following_ids(viewer_id)
if not following:
return PaginatedPosts(posts=[], total_count=0, has_more=False, next_cursor=None)
query = query.in_("user_id", following)
if feed_type == "trending":
query = query.order("trending_score", desc=True)
if cursor_data:
query = query.lt("trending_score", cursor_data["score"])
else:
query = query.order("created_at", desc=True)
if cursor_data:
query = query.lt("created_at", cursor_data["created_at"])
# Fetch one extra to check has_more
query = query.limit(limit + 1)
result = query.execute()
posts = result.data or []
has_more = len(posts) > limit
if has_more:
posts = posts[:limit]
# Batch load viewer's likes
if viewer_id and posts:
liked_ids = await self._get_liked_post_ids(viewer_id, [p["id"] for p in posts])
for post in posts:
post["is_liked_by_viewer"] = post["id"] in liked_ids
next_cursor = None
if has_more and posts:
next_cursor = self._generate_cursor(posts[-1], feed_type)
return PaginatedPosts(
posts=posts,
total_count=await self._get_total_count(tags),
has_more=has_more,
next_cursor=next_cursor,
)
async def _get_following_ids(self, user_id: str) -> List[str]:
result = self.db.table("user_follows").select("following_id").eq("follower_id", user_id).execute()
return [r["following_id"] for r in (result.data or [])]
async def _get_liked_post_ids(self, user_id: str, post_ids: List[str]) -> set:
result = self.db.table("post_likes").select("post_id").eq("user_id", user_id).in_("post_id", post_ids).execute()
return {r["post_id"] for r in (result.data or [])}
def _parse_cursor(self, cursor: str) -> dict:
try:
return json.loads(base64.b64decode(cursor).decode())
except:
return {}
def _generate_cursor(self, post: dict, feed_type: str) -> str:
if feed_type == "trending":
data = {"score": post.get("trending_score", 0)}
else:
data = {"created_at": post["created_at"]}
return base64.b64encode(json.dumps(data).encode()).decode()
```
```python
# Engagement operations
async def like_post(self, post_id: str, user_id: str) -> bool:
existing = self.db.table("post_likes").select("id").eq("post_id", post_id).eq("user_id", user_id).execute()
if existing.data:
return False # Already liked
self.db.table("post_likes").insert({
"post_id": post_id,
"user_id": user_id,
"created_at": datetime.now(timezone.utc).isoformat(),
}).execute()
# Atomic increment
self.db.rpc("increment_like_count", {"post_id": post_id}).execute()
return True
async def unlike_post(self, post_id: str, user_id: str) -> bool:
result = self.db.table("post_likes").delete().eq("post_id", post_id).eq("user_id", user_id).execute()
if not result.data:
return False
self.db.rpc("decrement_like_count", {"post_id": post_id}).execute()
return True
# Trending algorithm
def calculate_trending_score(
like_count: int,
comment_count: int,
view_count: int,
created_at: datetime,
is_featured: bool = False,
) -> float:
"""
Trending score = engagement / age^decay
Higher engagement + newer = higher score
"""
engagement = like_count 1.0 + comment_count 2.0 + view_count * 0.1
age_hours = max((datetime.now(timezone.utc) - created_at).total_seconds() / 3600, 0.1)
score = engagement / (age_hours ** 1.5)
return score * 1.5 if is_featured else score
```
SQL Schema
```sql
CREATE TABLE community_posts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
title VARCHAR(200) NOT NULL,
like_count INTEGER DEFAULT 0,
comment_count INTEGER DEFAULT 0,
view_count INTEGER DEFAULT 0,
is_featured BOOLEAN DEFAULT FALSE,
tags TEXT[] DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
-- Computed trending score
trending_score FLOAT GENERATED ALWAYS AS (
(like_count + comment_count 2 + view_count 0.1) /
POWER(GREATEST(EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600, 0.1), 1.5)
) STORED
);
CREATE INDEX idx_posts_trending ON community_posts(trending_score DESC);
CREATE INDEX idx_posts_created ON community_posts(created_at DESC);
-- Atomic increment functions
CREATE FUNCTION increment_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN UPDATE community_posts SET like_count = like_count + 1 WHERE id = post_id; END;
$$ LANGUAGE plpgsql;
```