data-engineering

Skill from pluginagentmarketplace/custom-plugin-ai-data-scientist
What it does

Builds scalable data pipelines and infrastructure using Apache Spark, Airflow, and big data processing techniques for efficient ETL workflows.

Part of

pluginagentmarketplace/custom-plugin-ai-data-scientist (12 items)

Installation

Add marketplace to Claude Code:
/plugin marketplace add pluginagentmarketplace/custom-plugin-ai-data-scientist

Install plugin from marketplace:
/plugin install ai-data-scientist-plugin@pluginagentmarketplace-ai-data-scientist

Or clone the repository:
git clone https://github.com/pluginagentmarketplace/custom-plugin-ai-data-scientist.git

Then add the plugin in Claude Code:
/plugin load .
6 installs · Added Feb 4, 2026

Skill Details

SKILL.md

ETL pipelines, Apache Spark, data warehousing, and big data processing. Use for building data pipelines, processing large datasets, or setting up data infrastructure.

Overview

# Data Engineering

Build scalable data pipelines and infrastructure for big data processing.

Quick Start with Apache Spark

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count

# Initialize Spark
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Read data
df = spark.read.parquet("s3://bucket/data/")

# Transformations (lazy evaluation)
df_clean = df \
    .filter(col("value") > 0) \
    .groupBy("category") \
    .agg(
        sum("sales").alias("total_sales"),
        avg("price").alias("avg_price"),
        count("*").alias("count")
    ) \
    .orderBy(col("total_sales").desc())

# Write results, partitioned by the grouping column that survives the aggregation
df_clean.write \
    .mode("overwrite") \
    .partitionBy("category") \
    .parquet("s3://bucket/output/")
```

ETL Pipeline with Apache Airflow

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

def extract(**context):
    # Extract data from the source (fetch_api_data is a project-specific helper)
    data = fetch_api_data()
    context['task_instance'].xcom_push(key='raw_data', value=data)

def transform(**context):
    # Transform the extracted data
    data = context['task_instance'].xcom_pull(task_ids='extract', key='raw_data')
    cleaned = clean_and_transform(data)
    context['task_instance'].xcom_push(key='clean_data', value=cleaned)

def load(**context):
    # Load the cleaned data into the warehouse
    data = context['task_instance'].xcom_pull(task_ids='transform', key='clean_data')
    load_to_warehouse(data)

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag
)

extract_task >> transform_task >> load_task
```

Data Warehousing

Star Schema Design

```sql
-- Fact Table
CREATE TABLE fact_sales (
    sale_id SERIAL PRIMARY KEY,
    date_key INT REFERENCES dim_date(date_key),
    product_key INT REFERENCES dim_product(product_key),
    customer_key INT REFERENCES dim_customer(customer_key),
    quantity INT,
    revenue DECIMAL(10,2),
    cost DECIMAL(10,2)
);

-- Dimension Table
CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    category VARCHAR(100),
    brand VARCHAR(100)
);
```
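
As a usage sketch, a typical star-schema query joins the fact table to a dimension on its surrogate key and aggregates the measures. The example below is hypothetical: it assumes fact_sales and dim_product are also registered as Spark temp views, and the table and column names simply follow the DDL above.

```python
# Hypothetical usage sketch: assumes fact_sales and dim_product are registered
# as Spark temp views (names follow the star-schema DDL above).
revenue_by_category = spark.sql("""
    SELECT
        p.category,
        SUM(f.revenue) AS total_revenue,
        SUM(f.revenue - f.cost) AS total_margin
    FROM fact_sales f
    JOIN dim_product p
        ON f.product_key = p.product_key
    GROUP BY p.category
    ORDER BY total_revenue DESC
""")
revenue_by_category.show()
```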

Snowflake Data Warehouse

```sql
-- Create warehouse
CREATE WAREHOUSE compute_wh
    WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE;

-- Load data from S3
COPY INTO sales
    FROM 's3://bucket/data/'
    FILE_FORMAT = (TYPE = 'PARQUET')
    ON_ERROR = 'CONTINUE';

-- Clustering
ALTER TABLE sales CLUSTER BY (date, region);

-- Time travel: query the table as it was 1 hour ago
SELECT * FROM sales AT (OFFSET => -3600);
```

Big Data Processing

Spark SQL

```python
# Register as temp view
df.createOrReplaceTempView("sales")

# SQL queries
result = spark.sql("""
    SELECT
        category,
        SUM(sales) AS total_sales,
        AVG(price) AS avg_price
    FROM sales
    WHERE date >= '2024-01-01'
    GROUP BY category
    HAVING SUM(sales) > 10000
    ORDER BY total_sales DESC
""")

result.show()
```

Spark Optimization

```python
# Cache in memory (materialized on the next action)
df.cache()

# Repartition returns a new DataFrame, so reassign the result
df = df.repartition(200)

# Broadcast small tables to avoid a shuffle join
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")

# Persist with an explicit storage level (alternative to cache())
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
```

Stream Processing with Kafka

```python
from kafka import KafkaProducer, KafkaConsumer
import json

# Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('topic-name', {'key': 'value'})
producer.flush()  # send() is asynchronous; flush to ensure delivery

# Consumer
consumer = KafkaConsumer(
    'topic-name',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='my-group',
    auto_offset_reset='earliest'
)

for message in consumer:
    process_message(message.value)  # process_message is a project-specific handler
```

Data Quality Validation

```python
import great_expectations as ge

# Load data
df = ge.read_csv('data.csv')

# Define expectations
df.expect_column_values_to_not_be_null('user_id')
df.expect_column_values_to_be_unique('email')
df.expect_column_values_to_be_between('age', 0, 120)
df.expect_column_values_to_match_regex(
    'email',
    r'^[\w\.-]+@[\w\.-]+\.\w+$'
)

# Validate
results = df.validate()
print(results)
```

Delta Lake (Data Lakehouse)

```python
from delta.tables import DeltaTable

# Write to Delta
df.write.format("delta") \
    .mode("overwrite") \
    .save("/path/to/delta-table")

# Read from Delta
df = spark.read.format("delta").load("/path/to/delta-table")

# ACID transactions
deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

# Upsert (merge): `updates` is a DataFrame of new and changed rows
deltaTable.alias("target") \
    .merge(
        updates.alias("source"),
        "target.id = source.id"
    ) \
    .whenMatchedUpdate(set={"value": "source.value"}) \
    .whenNotMatchedInsert(
        values={"id": "source.id", "value": "source.value"}
    ) \
    .execute()

# Time travel: read an earlier version of the table
df = spark.read.format("delta") \
    .option("versionAsOf", 10) \
    .load("/path/to/delta-table")
```

Best Practices

  1. Incremental processing: Process only new data (see the sketch after this list)
  2. Idempotency: Same input produces same output
  3. Data validation: Check quality at every stage
  4. Monitoring: Track pipeline health and performance
  5. Error handling: Retry logic, dead letter queues
  6. Partitioning: Partition large datasets by date/category
  7. Compression: Use Parquet, ORC for storage efficiency
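
A minimal sketch of practices 1, 2, 6, and 7 combined, assuming a daily batch job over a hypothetical events dataset with an event_date column (paths, names, and the run date are illustrative, not part of this skill):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (
    SparkSession.builder
    .appName("IncrementalLoad")
    # Overwrite only the partitions present in the output, not the whole table
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .getOrCreate()
)

run_date = "2024-01-15"  # normally injected by the scheduler (e.g. Airflow's ds)

# Incremental: read only the current day's data instead of the full history
daily = (
    spark.read.parquet("s3://bucket/events/")   # hypothetical source path
    .filter(col("event_date") == run_date)
)

# Idempotent, partitioned, compressed: rerunning the job rewrites the same
# partition with the same result, stored as Parquet
(
    daily.write
    .mode("overwrite")
    .partitionBy("event_date")
    .parquet("s3://bucket/events_clean/")        # hypothetical output path
)
```

Because the write overwrites only the partition for run_date, re-running the job for the same day replaces that partition instead of appending duplicates.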

More from this repository (10)

reinforcement-learning (Skill)
Trains intelligent agents to learn optimal behaviors through interaction with environments using reinforcement learning techniques.

computer-vision (Skill)
Processes and analyzes images using deep learning models for classification, detection, and visual understanding tasks.

machine-learning (Skill)
Builds, trains, and evaluates machine learning models for classification, regression, and clustering using scikit-learn's powerful algorithms and techniques.

data-visualization (Skill)
Generates interactive data visualizations and performs exploratory data analysis using Matplotlib, Seaborn, Plotly, and other visualization tools.

time-series (Skill)
Performs time series analysis using ARIMA, SARIMA, and Prophet, detecting trends, seasonality, and anomalies for precise temporal predictions.

statistical-analysis (Skill)
Performs rigorous statistical analysis using Python's SciPy, enabling hypothesis testing, A/B testing, and data validation across various statistical methods.

python-programming (Skill)
Enables efficient Python programming for data science, covering fundamentals, data manipulation, and advanced library usage with NumPy and Pandas.

model-optimization (Skill)
Optimizes machine learning models through techniques like quantization, pruning, hyperparameter tuning, and AutoML for improved performance and efficiency.

deep-learning (Skill)
Develops neural network models using PyTorch and TensorFlow for advanced machine learning tasks like image classification, NLP, and pattern recognition.

nlp-processing (Skill)
nlp-processing skill from pluginagentmarketplace/custom-plugin-ai-data-scientist