community-feed

🎯Skill

from dadbodgeoff/drift

What it does

Generates an efficient social feed with cursor pagination, trending algorithms, and engagement tracking for infinite-scroll experiences.

Part of

dadbodgeoff/drift (69 items)

Installation

Install the npm package:
npm install -g driftdetect
npm install -g driftdetect@latest
npm install -g driftdetect-mcp

Claude Desktop configuration (add this to your claude_desktop_config.json):
{ "mcpServers": { "drift": { "command": "driftdetect-mcp" } } ...
Extracted from docs: dadbodgeoff/drift
5 installs
Added Feb 4, 2026

Skill Details

SKILL.md

Social feed with batch queries, cursor pagination, trending algorithms, and engagement tracking. Efficient database queries for infinite scroll feeds.

Overview

# Community Feed

Social feed with trending algorithms, cursor pagination, and engagement tracking.

When to Use This Skill

  • Building social feeds with infinite scroll
  • Ranking content with trending/hot algorithms
  • Implementing like/engagement systems
  • Paginating large datasets efficiently

Core Concepts

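Cursor pagination beats offset pagination on large datasets: the database can seek to the cursor position through an index instead of scanning and discarding every skipped row. Batch-load relationships (authors, assets, the viewer's likes) to avoid N+1 queries. Store trending scores in a precomputed, indexed column so that sorting does not recompute them on every query.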
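To make the cursor idea concrete, here is a minimal in-memory sketch of keyset pagination. It is an illustration under assumptions, not the skill's implementation: the helper names (`encode_cursor`, `decode_cursor`, `fetch_page`) are hypothetical, and the cursor carries the post `id` as a tiebreaker in addition to the score, which the listing below does not do.

```python
import base64
import json
from typing import Dict, List, Optional, Tuple


def encode_cursor(score: float, post_id: str) -> str:
    """Pack the sort key of the last row into an opaque, URL-safe token."""
    raw = json.dumps({"score": score, "id": post_id}).encode()
    return base64.urlsafe_b64encode(raw).decode()


def decode_cursor(cursor: str) -> Tuple[float, str]:
    """Recover the (score, id) sort key from a token made by encode_cursor."""
    data = json.loads(base64.urlsafe_b64decode(cursor).decode())
    return data["score"], data["id"]


def fetch_page(
    posts: List[Dict], cursor: Optional[str] = None, limit: int = 2
) -> Tuple[List[Dict], Optional[str]]:
    """Return one page plus the cursor for the next page (None on the last page)."""
    # Sorting on (score, id) gives a total order, so ties cannot be skipped.
    ordered = sorted(posts, key=lambda p: (p["trending_score"], p["id"]), reverse=True)
    if cursor:
        boundary = decode_cursor(cursor)
        # Keep only rows strictly after the last row of the previous page.
        ordered = [p for p in ordered if (p["trending_score"], p["id"]) < boundary]
    page = ordered[:limit]
    has_more = len(ordered) > limit
    next_cursor = None
    if has_more and page:
        next_cursor = encode_cursor(page[-1]["trending_score"], page[-1]["id"])
    return page, next_cursor
```

With three posts tied on the same score and a page size of two, a score-only cursor with a strict `<` comparison would drop the third tied post. The composite key returns it at the start of the next page.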

Implementation

Python

```python
import base64
import json
from dataclasses import dataclass
from datetime import datetime, timezone  # used by the engagement operations
from typing import Dict, List, Optional, Set


@dataclass
class PaginatedPosts:
    posts: List[Dict]
    total_count: int
    has_more: bool
    next_cursor: Optional[str]


class CommunityFeedService:
    """Service for community feed with cursor pagination."""

    def __init__(self, db):
        self.db = db

    async def get_feed(
        self,
        feed_type: str = "trending",
        viewer_id: Optional[str] = None,
        cursor: Optional[str] = None,
        limit: int = 20,
        tags: Optional[List[str]] = None,
    ) -> PaginatedPosts:
        # An invalid cursor decodes to {} and is treated as the first page.
        cursor_data = self._parse_cursor(cursor) if cursor else None

        # Join assets and authors in the same query to avoid N+1 lookups.
        query = self.db.table("community_posts").select(
            "*",
            "assets!inner(url, asset_type)",
            "users!inner(id, display_name, avatar_url)",
        )

        if tags:
            query = query.contains("tags", tags)

        # Feed-specific filtering
        if feed_type == "following" and viewer_id:
            following = await self._get_following_ids(viewer_id)
            if not following:
                return PaginatedPosts(posts=[], total_count=0, has_more=False, next_cursor=None)
            query = query.in_("user_id", following)

        # Feed-specific ordering. A strict "<" on a single column skips rows
        # that tie with the cursor value at a page boundary; add a unique
        # tiebreaker (such as id) if ties are likely.
        if feed_type == "trending":
            query = query.order("trending_score", desc=True)
            if cursor_data and "score" in cursor_data:
                query = query.lt("trending_score", cursor_data["score"])
        else:
            query = query.order("created_at", desc=True)
            if cursor_data and "created_at" in cursor_data:
                query = query.lt("created_at", cursor_data["created_at"])

        # Fetch one extra to check has_more
        query = query.limit(limit + 1)
        result = query.execute()
        posts = result.data or []

        has_more = len(posts) > limit
        if has_more:
            posts = posts[:limit]

        # Batch load viewer's likes
        if viewer_id and posts:
            liked_ids = await self._get_liked_post_ids(viewer_id, [p["id"] for p in posts])
            for post in posts:
                post["is_liked_by_viewer"] = post["id"] in liked_ids

        next_cursor = None
        if has_more and posts:
            next_cursor = self._generate_cursor(posts[-1], feed_type)

        return PaginatedPosts(
            posts=posts,
            # _get_total_count is not shown in this listing; it should return
            # a cached count (see Best Practices).
            total_count=await self._get_total_count(tags),
            has_more=has_more,
            next_cursor=next_cursor,
        )

    async def _get_following_ids(self, user_id: str) -> List[str]:
        result = self.db.table("user_follows").select("following_id").eq("follower_id", user_id).execute()
        return [r["following_id"] for r in (result.data or [])]

    async def _get_liked_post_ids(self, user_id: str, post_ids: List[str]) -> Set[str]:
        result = self.db.table("post_likes").select("post_id").eq("user_id", user_id).in_("post_id", post_ids).execute()
        return {r["post_id"] for r in (result.data or [])}

    def _parse_cursor(self, cursor: str) -> dict:
        try:
            return json.loads(base64.b64decode(cursor).decode())
        except ValueError:
            # Covers bad base64, bad UTF-8, and bad JSON.
            return {}

    def _generate_cursor(self, post: dict, feed_type: str) -> str:
        if feed_type == "trending":
            data = {"score": post.get("trending_score", 0)}
        else:
            data = {"created_at": post["created_at"]}
        return base64.b64encode(json.dumps(data).encode()).decode()
```

```python
from datetime import datetime, timezone


class CommunityFeedService:  # continued from the block above
    # Engagement operations

    async def like_post(self, post_id: str, user_id: str) -> bool:
        # This check-then-insert is not safe on its own under concurrent
        # requests; a unique constraint on (post_id, user_id) is still needed.
        existing = self.db.table("post_likes").select("id").eq("post_id", post_id).eq("user_id", user_id).execute()
        if existing.data:
            return False  # Already liked

        self.db.table("post_likes").insert({
            "post_id": post_id,
            "user_id": user_id,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }).execute()

        # Atomic increment
        self.db.rpc("increment_like_count", {"post_id": post_id}).execute()
        return True

    async def unlike_post(self, post_id: str, user_id: str) -> bool:
        result = self.db.table("post_likes").delete().eq("post_id", post_id).eq("user_id", user_id).execute()
        if not result.data:
            return False  # Nothing to remove

        self.db.rpc("decrement_like_count", {"post_id": post_id}).execute()
        return True


# Trending algorithm
def calculate_trending_score(
    like_count: int,
    comment_count: int,
    view_count: int,
    created_at: datetime,  # must be timezone-aware
    is_featured: bool = False,
) -> float:
    """
    Trending score = engagement / age^decay

    Higher engagement + newer = higher score
    """
    engagement = like_count * 1.0 + comment_count * 2.0 + view_count * 0.1
    # Clamp age to at least 0.1 hours so brand-new posts do not divide by ~0.
    age_hours = max((datetime.now(timezone.utc) - created_at).total_seconds() / 3600, 0.1)
    score = engagement / (age_hours ** 1.5)
    return score * 1.5 if is_featured else score
```
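As a worked example of the decay, the sketch below repeats the same formula with the current time passed in explicitly, so the numbers are reproducible. The function name `trending_score_at` and its `now` and `decay` parameters are hypothetical additions for illustration only.

```python
from datetime import datetime, timedelta, timezone


def trending_score_at(
    like_count: int,
    comment_count: int,
    view_count: int,
    created_at: datetime,
    now: datetime,
    decay: float = 1.5,
) -> float:
    """Same engagement / age^decay formula, with `now` injected for testing."""
    engagement = like_count * 1.0 + comment_count * 2.0 + view_count * 0.1
    age_hours = max((now - created_at).total_seconds() / 3600, 0.1)
    return engagement / (age_hours ** decay)


now = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)

# One hour old, modest engagement: 10 + 2*2 + 100*0.1 = 24, divided by 1^1.5.
fresh = trending_score_at(10, 2, 100, now - timedelta(hours=1), now)

# One day old, ten times the engagement: 240 divided by 24^1.5 (about 117.6).
day_old = trending_score_at(100, 20, 1000, now - timedelta(hours=24), now)
```

Here `fresh` comes out at about 24 and `day_old` at about 2.04, so with a decay exponent of 1.5 the day-old post ranks well below the fresh one despite having ten times the engagement.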

SQL Schema

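```sql
CREATE TABLE community_posts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(200) NOT NULL,
    like_count INTEGER DEFAULT 0,
    comment_count INTEGER DEFAULT 0,
    view_count INTEGER DEFAULT 0,
    is_featured BOOLEAN DEFAULT FALSE,
    tags TEXT[] DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- Precomputed trending score. PostgreSQL rejects NOW() inside a
    -- GENERATED ALWAYS AS (...) STORED expression because the expression
    -- must be immutable, so this is a plain column refreshed by the UPDATE
    -- below (for example from a scheduled job).
    trending_score DOUBLE PRECISION DEFAULT 0
);

CREATE INDEX idx_posts_trending ON community_posts(trending_score DESC);
CREATE INDEX idx_posts_created ON community_posts(created_at DESC);

-- Refresh trending scores: engagement / age_hours^1.5
UPDATE community_posts SET trending_score =
    (like_count + comment_count * 2 + view_count * 0.1) /
    POWER(GREATEST(EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600, 0.1), 1.5);

-- Atomic increment/decrement functions
CREATE FUNCTION increment_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN
    UPDATE community_posts SET like_count = like_count + 1 WHERE id = post_id;
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION decrement_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN
    UPDATE community_posts SET like_count = GREATEST(like_count - 1, 0) WHERE id = post_id;
END;
$$ LANGUAGE plpgsql;
```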

Usage Examples

```python
import asyncio


async def main():
    # `db` is the database client the service wraps (not shown here).
    feed_service = CommunityFeedService(db)

    # Get trending feed
    result = await feed_service.get_feed(feed_type="trending", viewer_id="user_123", limit=20)
    for post in result.posts:
        print(f"{post['title']} - {post['like_count']} likes")

    # Load next page (same viewer_id and limit as the first page)
    if result.has_more:
        next_page = await feed_service.get_feed(
            feed_type="trending", viewer_id="user_123", cursor=result.next_cursor, limit=20
        )

    # Like a post
    await feed_service.like_post("post_456", "user_123")


asyncio.run(main())
```
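For infinite scroll, the page-by-page calls above are usually wrapped in a loop that follows `next_cursor` until `has_more` is false. The sketch below shows one way to do that with an async generator. `iter_feed`, `Page`, and `fake_get_feed` are hypothetical names, and the stub feed uses a plain integer position as its cursor purely as a stand-in for a real cursor.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional


@dataclass
class Page:
    posts: List[Dict]
    has_more: bool
    next_cursor: Optional[str]


async def iter_feed(
    get_feed: Callable[..., Awaitable[Any]], **kwargs: Any
) -> AsyncIterator[Dict]:
    """Yield posts one by one, fetching the next page only when needed."""
    cursor = None
    while True:
        page = await get_feed(cursor=cursor, **kwargs)
        for post in page.posts:
            yield post
        # Stop when the service reports no further pages.
        if not page.has_more or page.next_cursor is None:
            return
        cursor = page.next_cursor


async def fake_get_feed(cursor: Optional[str] = None, limit: int = 2) -> Page:
    """Stand-in for a real feed call: serves five posts in pages of `limit`."""
    items = [{"id": i} for i in range(5)]
    start = int(cursor) if cursor else 0
    end = start + limit
    has_more = end < len(items)
    return Page(items[start:end], has_more, str(end) if has_more else None)


async def collect_ids() -> List[int]:
    return [post["id"] async for post in iter_feed(fake_get_feed, limit=2)]
```

Because `iter_feed` only relies on the `posts`, `has_more`, and `next_cursor` attributes, it should also accept a bound `get_feed` method that returns `PaginatedPosts`.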

Best Practices

  1. Use cursor pagination over offset for large datasets
  2. Batch-load relationships to avoid N+1 queries
  3. Store the trending score in a precomputed, indexed column for efficient sorting
  4. Use atomic database functions for counter updates
  5. Cache total counts (expensive to compute)
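Caching total counts can be as simple as a small time-based cache in front of the count query. The following is a minimal sketch, assuming a 60-second staleness window is acceptable; `TTLCountCache`, its `loader` callback, and the injectable `clock` are hypothetical names introduced for illustration.

```python
import time
from typing import Callable, Dict, Hashable, Tuple


class TTLCountCache:
    """Cache expensive counts per key and reload them after ttl_seconds."""

    def __init__(
        self,
        loader: Callable[[Hashable], int],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[Hashable, Tuple[float, int]] = {}

    def get(self, key: Hashable) -> int:
        now = self._clock()
        entry = self._entries.get(key)
        # Serve the cached value while it is younger than the TTL.
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        # Otherwise run the expensive loader and remember when we did.
        value = self._loader(key)
        self._entries[key] = (now, value)
        return value
```

A feed service could key the cache on something hashable derived from the filter, such as `tuple(sorted(tags))`, and let the loader run the actual `COUNT` query.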

Common Mistakes

  • Using offset pagination (slow for large offsets)
  • N+1 queries for author/like data
  • Computing trending score on every query
  • Non-atomic counter updates (race conditions)
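The N+1 mistake is easiest to see by counting queries. The sketch below uses an in-memory stand-in for the likes table that increments a counter on every simulated query; `FakeLikesTable` and the two helper functions are hypothetical and only illustrate the difference in round trips.

```python
from typing import Iterable, List, Set, Tuple


class FakeLikesTable:
    """In-memory stand-in for post_likes that counts simulated queries."""

    def __init__(self, likes: Iterable[Tuple[str, str]]) -> None:
        self._likes = set(likes)  # (user_id, post_id) pairs
        self.query_count = 0

    def is_liked(self, user_id: str, post_id: str) -> bool:
        self.query_count += 1  # one round trip per post
        return (user_id, post_id) in self._likes

    def liked_among(self, user_id: str, post_ids: List[str]) -> Set[str]:
        self.query_count += 1  # one round trip for the whole page
        return {pid for pid in post_ids if (user_id, pid) in self._likes}


def likes_one_by_one(table: FakeLikesTable, user_id: str, post_ids: List[str]) -> Set[str]:
    # N+1 pattern: the number of queries grows with the page size.
    return {pid for pid in post_ids if table.is_liked(user_id, pid)}


def likes_batched(table: FakeLikesTable, user_id: str, post_ids: List[str]) -> Set[str]:
    # Batched pattern: a single IN-style query regardless of page size.
    return table.liked_among(user_id, post_ids)
```

Both helpers return the same set of liked post IDs, but for a page of 20 posts the first issues 20 simulated queries and the second issues one.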

Related Patterns

  • analytics-pipeline (event tracking)
  • intelligent-cache (caching feeds)

More from this repository (10)

file-uploads (Skill)

Securely validates, scans, and processes file uploads with multi-stage checks, malware detection, and race condition prevention.

design-tokens (Skill)

Generates a comprehensive, type-safe design token system with WCAG AA color compliance and multi-framework support for consistent visual design.

feature-flags (Skill)

Enables controlled feature rollouts, A/B testing, and selective feature access through configurable flags for gradual deployment and user targeting.

ai-coaching (Skill)

Guides users through articulating creative intent by extracting structured parameters and detecting conversation readiness.

environment-config (Skill)

Validates and centralizes environment variables with type safety, fail-fast startup checks, and multi-environment support.

email-service (Skill)

Simplifies email sending, templating, and tracking with robust SMTP integration and support for multiple email providers and transactional workflows.

cloud-storage (Skill)

Enables secure, multi-tenant cloud file storage with signed URLs, direct uploads, and visibility control for user-uploaded assets.

error-sanitization (Skill)

Sanitizes error messages by logging full details server-side while exposing only generic, safe messages to prevent sensitive information leakage.

batch-processing (Skill)

Optimizes database operations by collecting and batching independent records, improving throughput by 30-40% with built-in fallback processing.

rate-limiting (Skill)

Implements tier-based API rate limiting using a sliding window algorithm with Redis or in-memory storage for fair usage control.