snapshot-aggregation

Skill from dadbodgeoff/drift

What it does

Compresses daily time-series data with intelligent merge logic, enabling efficient historical dashboards and storage capacity planning.

Part of dadbodgeoff/drift (69 items)


Installation

Install the npm package:

npm install -g driftdetect

Install the MCP server:

npm install -g driftdetect-mcp

Claude Desktop Configuration - add this to your claude_desktop_config.json:

{ "mcpServers": { "drift": { "command": "driftdetect-mcp" } } }

Skill Details

SKILL.md

Daily compression of time-series data with merge logic for multiple pipeline runs, structured aggregation for dashboards, and storage estimation for capacity planning.

Overview

# Snapshot Aggregation

Daily compression with merge logic and storage estimation for time-series data.

When to Use This Skill

  • Raw event data grows too fast for direct queries
  • Need daily snapshots for historical dashboards
  • Pipeline runs multiple times per day and needs merge logic
  • Want storage estimation for capacity planning

Core Concepts

Raw event data grows fast. Daily snapshots provide:

  • Historical dashboards without querying millions of rows
  • 90-day retention with minimal storage (~2-3KB/day)
  • Multiple pipeline runs per day that merge correctly
  • Storage estimation for capacity planning

Key insight: Multiple runs per day must merge correctly, not overwrite.

Implementation

TypeScript

```typescript
// PipelineResult, RawEvent, and getCountryName come from the surrounding
// pipeline code; only the snapshot-specific types are defined here.

interface DailySnapshot {
  snapshotDate: string;   // YYYY-MM-DD
  totalArticles: number;
  totalEvents: number;
  totalClusters: number;
  avgRiskScore: number;
  maxRiskScore: number;
  categoryTotals: Record<string, number>;
  topHotspots: Hotspot[]; // Top 10 by risk
  keyEvents: KeyEvent[];  // Top 20 by risk
  countryStats: Record<string, CountryStats>;
  riskTrend: number;      // vs previous day
  pipelineRuns: number;   // How many runs merged
  createdAt: string;
  updatedAt: string;
}

interface Hotspot {
  country: string;
  countryCode: string;
  lat: number;
  lon: number;
  riskScore: number;
  eventCount: number;
  summary: string;
}

interface KeyEvent {
  id: string;
  title: string;
  country: string;
  riskScore: number;
  timestamp: string;
}

interface CountryStats {
  events: number;
  avgRisk: number;
  maxRisk: number;
  topCategory: string;
}

/**
 * Aggregate pipeline results into a daily snapshot
 */
function aggregateToSnapshot(
  result: PipelineResult,
  rawEvents: RawEvent[] = [],
  date: string = new Date().toISOString().split('T')[0]
): DailySnapshot {
  const { predictions, stats } = result;

  // Calculate category totals
  const categoryTotals: Record<string, number> = {};
  for (const pred of predictions) {
    if (pred.categoryCounts) {
      for (const [cat, count] of Object.entries(pred.categoryCounts)) {
        categoryTotals[cat] = (categoryTotals[cat] || 0) + count;
      }
    }
  }

  // Extract top 10 hotspots
  const topHotspots: Hotspot[] = predictions
    .slice(0, 10)
    .map(p => ({
      country: getCountryName(p.countryCode),
      countryCode: p.countryCode,
      lat: p.lat,
      lon: p.lon,
      riskScore: p.riskScore,
      eventCount: p.eventCount,
      summary: p.summary,
    }));

  // Extract key events (top 20 by risk); copy before sorting so the
  // caller's array isn't mutated
  const keyEvents: KeyEvent[] = [...rawEvents]
    .sort((a, b) => b.riskScore - a.riskScore)
    .slice(0, 20)
    .map(e => ({
      id: e.id,
      title: truncate(e.title, 120),
      country: e.country,
      riskScore: e.riskScore,
      timestamp: new Date().toISOString(), // stamped at aggregation time
    }));

  // Build country stats, tracking per-country sums so avgRisk is a true mean
  const countryStats: Record<string, CountryStats> = {};
  const riskSums: Record<string, { sum: number; count: number }> = {};
  for (const pred of predictions) {
    const country = pred.countryCode;
    if (!countryStats[country]) {
      countryStats[country] = {
        events: 0,
        avgRisk: 0,
        maxRisk: 0,
        topCategory: pred.category,
      };
      riskSums[country] = { sum: 0, count: 0 };
    }
    countryStats[country].events += pred.eventCount;
    riskSums[country].sum += pred.riskScore;
    riskSums[country].count += 1;
    // topCategory follows the highest-risk prediction, matching the merge logic below
    if (pred.riskScore > countryStats[country].maxRisk) {
      countryStats[country].maxRisk = pred.riskScore;
      countryStats[country].topCategory = pred.category;
    }
  }
  for (const [country, { sum, count }] of Object.entries(riskSums)) {
    countryStats[country].avgRisk = Math.round(sum / count);
  }

  // Aggregate stats
  const totalEvents = predictions.reduce((sum, p) => sum + p.eventCount, 0);
  const avgRiskScore = predictions.length > 0
    ? Math.round(predictions.reduce((sum, p) => sum + p.riskScore, 0) / predictions.length)
    : 0;
  const maxRiskScore = predictions.length > 0
    ? Math.max(...predictions.map(p => p.riskScore))
    : 0;

  return {
    snapshotDate: date,
    totalArticles: stats.totalFetched,
    totalEvents,
    totalClusters: stats.clustersFormed,
    avgRiskScore,
    maxRiskScore,
    categoryTotals,
    topHotspots,
    keyEvents,
    countryStats,
    riskTrend: 0, // filled in later by comparing against the previous day
    pipelineRuns: 1,
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  };
}

/**
 * Merge multiple snapshots from the same day
 */
function mergeSnapshots(
  existing: DailySnapshot,
  incoming: DailySnapshot
): DailySnapshot {
  // Merge category totals (take max - later run has more complete data)
  const categoryTotals = { ...existing.categoryTotals };
  for (const [cat, count] of Object.entries(incoming.categoryTotals)) {
    categoryTotals[cat] = Math.max(categoryTotals[cat] || 0, count);
  }

  // Merge hotspots (keep top 10 by risk, dedupe by location)
  const allHotspots = [...existing.topHotspots, ...incoming.topHotspots];
  const uniqueHotspots = dedupeByKey(
    allHotspots,
    h => `${h.lat.toFixed(1)},${h.lon.toFixed(1)}`
  );
  const topHotspots = uniqueHotspots
    .sort((a, b) => b.riskScore - a.riskScore)
    .slice(0, 10);

  // Merge key events (keep top 20 by risk, dedupe by ID)
  const allEvents = [...existing.keyEvents, ...incoming.keyEvents];
  const uniqueEvents = dedupeByKey(allEvents, e => e.id);
  const keyEvents = uniqueEvents
    .sort((a, b) => b.riskScore - a.riskScore)
    .slice(0, 20);

  // Merge country stats (take max values)
  const countryStats = { ...existing.countryStats };
  for (const [country, stats] of Object.entries(incoming.countryStats)) {
    if (!countryStats[country]) {
      countryStats[country] = stats;
    } else {
      countryStats[country] = {
        events: Math.max(countryStats[country].events, stats.events),
        avgRisk: Math.round(
          (countryStats[country].avgRisk + stats.avgRisk) / 2
        ),
        maxRisk: Math.max(countryStats[country].maxRisk, stats.maxRisk),
        topCategory: stats.maxRisk > countryStats[country].maxRisk
          ? stats.topCategory
          : countryStats[country].topCategory,
      };
    }
  }

  return {
    ...existing,
    totalArticles: existing.totalArticles + incoming.totalArticles,
    totalEvents: Math.max(existing.totalEvents, incoming.totalEvents),
    totalClusters: Math.max(existing.totalClusters, incoming.totalClusters),
    avgRiskScore: Math.round(
      (existing.avgRiskScore + incoming.avgRiskScore) / 2
    ),
    maxRiskScore: Math.max(existing.maxRiskScore, incoming.maxRiskScore),
    categoryTotals,
    topHotspots,
    keyEvents,
    countryStats,
    pipelineRuns: existing.pipelineRuns + 1,
    updatedAt: new Date().toISOString(),
  };
}

/**
 * Estimate storage size of a snapshot
 * (approximate bytes; exact for ASCII-only JSON)
 */
function estimateSnapshotSize(snapshot: DailySnapshot): number {
  return JSON.stringify(snapshot).length;
}

// Helpers
function truncate(str: string, maxLen: number): string {
  return str.length > maxLen ? str.slice(0, maxLen - 3) + '...' : str;
}

function dedupeByKey<T>(arr: T[], keyFn: (item: T) => string): T[] {
  const seen = new Set<string>();
  return arr.filter(item => {
    const key = keyFn(item);
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}
```
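
The estimator above makes the retention math from Core Concepts concrete. As a sanity check, here is a small projection helper (a sketch, not part of the skill; `projectRetentionBytes` is a hypothetical name):

```typescript
// Rough capacity projection: bytes needed to retain `days` of snapshots,
// based on the serialized size of one representative snapshot.
function projectRetentionBytes(snapshot: DailySnapshot, days = 90): number {
  return estimateSnapshotSize(snapshot) * days;
}

// At the ~2-3KB/day cited above, 90 days lands around 180-270KB.
```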

Usage Examples

Save Daily Snapshot

```typescript
async function saveDailySnapshot(result: PipelineResult) {
  const today = new Date().toISOString().split('T')[0];
  const newSnapshot = aggregateToSnapshot(result, result.rawEvents, today);
  // NOTE: the table columns below are snake_case; this assumes the client
  // maps the camelCase snapshot fields accordingly.

  // Check for existing snapshot today
  const { data: existing } = await supabase
    .from('daily_snapshots')
    .select('*')
    .eq('snapshot_date', today)
    .single();

  if (existing) {
    // Merge with existing
    const merged = mergeSnapshots(existing, newSnapshot);
    await supabase
      .from('daily_snapshots')
      .update(merged)
      .eq('snapshot_date', today);
  } else {
    // Insert new
    await supabase
      .from('daily_snapshots')
      .insert(newSnapshot);
  }

  console.log(`Snapshot size: ${estimateSnapshotSize(newSnapshot)} bytes`);
}
```
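
Note that aggregateToSnapshot leaves `riskTrend` at 0. One way to fill it in at save time is to compare against the previous day's stored snapshot; the helper below is a sketch (`computeRiskTrend` is not part of the skill), reusing the same supabase client and the `avg_risk_score` column from the schema below:

```typescript
// Hypothetical helper: riskTrend as the change in average risk vs the
// previous day's snapshot (0 when no previous snapshot exists).
async function computeRiskTrend(snapshot: DailySnapshot): Promise<number> {
  const prev = new Date(snapshot.snapshotDate);
  prev.setUTCDate(prev.getUTCDate() - 1);
  const prevDate = prev.toISOString().split('T')[0];

  const { data } = await supabase
    .from('daily_snapshots')
    .select('avg_risk_score')
    .eq('snapshot_date', prevDate)
    .single();

  return data ? snapshot.avgRiskScore - data.avg_risk_score : 0;
}
```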

Database Schema

```sql
CREATE TABLE daily_snapshots (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  snapshot_date DATE UNIQUE NOT NULL,
  total_articles INTEGER DEFAULT 0,
  total_events INTEGER DEFAULT 0,
  avg_risk_score INTEGER DEFAULT 0,
  max_risk_score INTEGER DEFAULT 0,
  category_totals JSONB DEFAULT '{}',
  top_hotspots JSONB DEFAULT '[]',
  key_events JSONB DEFAULT '[]',
  country_stats JSONB DEFAULT '{}',
  pipeline_runs INTEGER DEFAULT 1,
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_snapshots_date ON daily_snapshots(snapshot_date DESC);
```
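
The descending date index exists to serve the dashboard read path. Here is a minimal read sketch, assuming the same supabase client as above (`fetchDashboardHistory` is a hypothetical name):

```typescript
// Fetch the last `days` snapshots for a historical dashboard, newest
// first (served by idx_snapshots_date), then reverse into chronological
// order for charting.
async function fetchDashboardHistory(days = 30) {
  const { data, error } = await supabase
    .from('daily_snapshots')
    .select('snapshot_date, avg_risk_score, max_risk_score, total_events')
    .order('snapshot_date', { ascending: false })
    .limit(days);

  if (error) throw error;
  return (data ?? []).reverse();
}
```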

Best Practices

  1. Structured aggregation - Not just counts, but top-N lists and breakdowns
  2. Merge logic - Multiple runs per day combine correctly
  3. Deduplication - Hotspots/events dedupe by key before merging
  4. Storage estimation - Track size for capacity planning
  5. Retention policy - Auto-cleanup old snapshots (90 days; see the sketch after this list)
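
A minimal sketch of that cleanup, assuming the same supabase client and table (`pruneOldSnapshots` is a hypothetical name; it would typically run on a daily schedule):

```typescript
// Delete snapshots older than the retention window (default 90 days).
async function pruneOldSnapshots(retentionDays = 90) {
  const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000)
    .toISOString()
    .split('T')[0];

  await supabase
    .from('daily_snapshots')
    .delete()
    .lt('snapshot_date', cutoff);
}
```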

Common Mistakes

  • Simple daily counts (lose detail for dashboards)
  • Overwriting on multiple runs (lose data)
  • No deduplication in merge (duplicate hotspots)
  • Unbounded arrays (memory issues)
  • No retention policy (storage grows forever)

Related Patterns

  • batch-processing - Process events before aggregation
  • geographic-clustering - Cluster events for hotspots
  • checkpoint-resume - Track which data has been aggregated
