# batch-processing

Skill from lobbi-docs/claude

Processes large document sets in bulk using Claude's Message Batches API, reducing processing costs by 50% for non-time-sensitive workloads.

## Installation

`npx skills add https://github.com/lobbi-docs/claude --skill batch-processing`

## Skill Details

Message Batches API for Claude with 50% cost savings on bulk processing. Activate for batch jobs, JSONL processing, bulk analysis, and cost optimization.

## Overview
# Batch Processing Skill
Leverage Anthropic's Message Batches API for 50% cost savings on bulk processing workloads.
## When to Use This Skill
- Processing 10K+ documents
- Model evaluation and benchmarks
- ETL and data enrichment
- Training data generation
- Bulk content analysis
- Any non-time-sensitive workload
## Cost Savings

| Processing Type | Relative Cost |
|-----------------|---------------|
| Standard API | 100% |
| Batch API | 50% |

Example: 1M input tokens at $3/MTok costs $3.00 on the standard API vs $1.50 on the Batch API.
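The same arithmetic in code; a throwaway sketch (the $3/MTok input price matches the example above and is an assumption about Sonnet pricing):

```python
# Estimate standard vs. batch cost for a given token volume
input_tokens = 1_000_000
price_per_mtok = 3.00              # assumed input price, $/MTok
standard_cost = input_tokens / 1_000_000 * price_per_mtok
batch_cost = standard_cost * 0.5   # Batch API is 50% of standard

print(f"standard=${standard_cost:.2f}, batch=${batch_cost:.2f}")
# standard=$3.00, batch=$1.50
```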
## Batch Lifecycle

```
Created → Processing → Ended (succeeded/errored/expired/canceled)
```

- Processing time: up to 24 hours
- Results retention: 29 days after batch creation
- Full feature support: tools, vision, system prompts
## Core Implementation
### Step 1: Create JSONL File
```python
import json

# Each line of the JSONL file is a complete request
requests = [
    {
        "custom_id": "request-1",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": "Summarize: Document 1..."}]
        }
    },
    {
        "custom_id": "request-2",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": "Summarize: Document 2..."}]
        }
    }
]

# Write the JSONL file
with open("batch_requests.jsonl", "w") as f:
    for request in requests:
        f.write(json.dumps(request) + "\n")
```
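Before submitting, a quick sanity check can save a failed batch: every line should parse as JSON and every `custom_id` should be unique. A minimal sketch, assuming the file written above:

```python
import json

# Verify batch_requests.jsonl: valid JSON per line, unique custom_ids
seen = set()
with open("batch_requests.jsonl") as f:
    for n, line in enumerate(f, 1):
        req = json.loads(line)  # raises ValueError on a malformed line
        cid = req["custom_id"]
        assert cid not in seen, f"duplicate custom_id {cid!r} on line {n}"
        seen.add(cid)

print(f"{len(seen)} requests look well-formed")
```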
### Step 2: Submit Batch
```python
import json
import anthropic

client = anthropic.Anthropic()

# The batches endpoint takes a list of request objects,
# so parse the JSONL file back into a list before submitting
with open("batch_requests.jsonl") as f:
    requests = [json.loads(line) for line in f]

batch = client.beta.messages.batches.create(requests=requests)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
```
### Step 3: Poll for Completion
```python
import time

def wait_for_batch(client, batch_id, poll_interval=60):
    """Poll until the batch finishes processing."""
    while True:
        batch = client.beta.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            print("Batch completed!")
            print(f"  Succeeded: {batch.request_counts.succeeded}")
            print(f"  Errored: {batch.request_counts.errored}")
            print(f"  Expired: {batch.request_counts.expired}")
            return batch
        print(f"Status: {batch.processing_status} - waiting...")
        time.sleep(poll_interval)
```
### Step 4: Stream Results
```python
def process_results(client, batch_id):
    """Stream and process batch results."""
    results = {}
    for result in client.beta.messages.batches.results(batch_id):
        custom_id = result.custom_id
        if result.result.type == "succeeded":
            message = result.result.message
            results[custom_id] = {
                "status": "success",
                "content": message.content[0].text,
                "usage": message.usage
            }
        elif result.result.type == "errored":
            results[custom_id] = {
                "status": "error",
                "error": result.result.error
            }
        elif result.result.type == "expired":
            results[custom_id] = {"status": "expired"}
    return results
```
## Complete Workflow
```python
import anthropic
import json
import time

def run_batch_job(documents):
    """Complete batch processing workflow."""
    client = anthropic.Anthropic()

    # 1. Create requests
    requests = []
    for i, doc in enumerate(documents):
        requests.append({
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": f"Summarize: {doc}"}]
            }
        })

    # 2. Write JSONL (kept as a local record of what was submitted)
    with open("batch.jsonl", "w") as f:
        for req in requests:
            f.write(json.dumps(req) + "\n")

    # 3. Submit batch (the endpoint takes the request list directly)
    batch = client.beta.messages.batches.create(requests=requests)
    print(f"Batch submitted: {batch.id}")

    # 4. Wait for completion
    while True:
        batch = client.beta.messages.batches.retrieve(batch.id)
        if batch.processing_status == "ended":
            break
        time.sleep(60)

    # 5. Collect results
    results = {}
    for result in client.beta.messages.batches.results(batch.id):
        if result.result.type == "succeeded":
            results[result.custom_id] = result.result.message.content[0].text
    return results
```
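A quick usage sketch (the document strings are placeholders):

```python
docs = ["First document text...", "Second document text..."]
summaries = run_batch_job(docs)
print(summaries.get("doc-0"))  # summary of the first document, if it succeeded
```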
## Error Handling

### Retry Failed Requests
```python
def retry_failed(client, original_batch_id):
    """Retry failed and expired requests."""
    retry_requests = []
    for result in client.beta.messages.batches.results(original_batch_id):
        if result.result.type in ["errored", "expired"]:
            # Re-create the request (you'll need to store the original params)
            retry_requests.append({
                "custom_id": result.custom_id,
                "params": get_original_params(result.custom_id)  # your store; see sketch below
            })
    if retry_requests:
        # Submit the retry batch
        return client.beta.messages.batches.create(requests=retry_requests)
    return None
```
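The `get_original_params` hook above is left to your implementation; one minimal approach is to persist each request's params keyed by `custom_id` at submission time. A sketch, assuming a local JSON file (the filename and helper names are illustrative, not part of the API):

```python
import json

PARAMS_FILE = "batch_params.json"  # illustrative local store

def save_params(requests):
    """Call before submitting: index each request's params by custom_id."""
    store = {req["custom_id"]: req["params"] for req in requests}
    with open(PARAMS_FILE, "w") as f:
        json.dump(store, f)

def get_original_params(custom_id):
    """Used by retry_failed to rebuild a request."""
    with open(PARAMS_FILE) as f:
        return json.load(f)[custom_id]
```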
### Error Types
| Result Type | Action |
|-------------|--------|
| succeeded | Process normally |
| errored | Check error, may retry |
| expired | Request took >24h, retry |
| canceled | Batch was canceled |
## API Reference

### Endpoints
| Method | Endpoint | Purpose |
|--------|----------|---------|
| POST | /v1/messages/batches | Create batch |
| GET | /v1/messages/batches/{id} | Get batch status |
| GET | /v1/messages/batches | List batches |
| POST | /v1/messages/batches/{id}/cancel | Cancel batch |
| GET | /v1/messages/batches/{id}/results | Stream results |
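The Python SDK wraps these endpoints, so direct HTTP calls are rarely needed. A brief sketch of the status, list, and cancel operations (the batch IDs are placeholders):

```python
import anthropic

client = anthropic.Anthropic()

# GET /v1/messages/batches/{id} - check a batch's status
batch = client.beta.messages.batches.retrieve("msgbatch_...")
print(batch.processing_status)

# GET /v1/messages/batches - list recent batches (auto-paginates)
for b in client.beta.messages.batches.list(limit=10):
    print(b.id, b.processing_status)

# POST /v1/messages/batches/{id}/cancel - cancel an in-progress batch
client.beta.messages.batches.cancel("msgbatch_...")
```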
### Request Format
```json
{
  "custom_id": "unique-identifier",
  "params": {
    "model": "claude-sonnet-4-20250514",
    "max_tokens": 1024,
    "system": "Optional system prompt",
    "messages": [{"role": "user", "content": "..."}],
    "tools": [],
    "temperature": 0.7
  }
}
```
## Best Practices
DO:
- Use for workloads >100 requests
- Include unique custom_id for tracking
- Store original params for retry logic
- Monitor batch status via polling
- Process results as streaming JSONL
DON'T:
- Use for time-sensitive requests
- Expect immediate results
- Forget to handle expired requests
- Ignore error results
## Cost Optimization Tips

- Batch similar requests: same model, similar prompts
- Combine with caching: cache static context shared across requests (see the sketch below)
- Use the appropriate model: Haiku for simple tasks, Sonnet for complex ones
- Optimize prompts: shorter prompts mean lower cost
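Prompt caching can be combined with batches by adding `cache_control` to the shared context in each request's params. A sketch (the system text is illustrative; cache hits within a batch are not guaranteed, since requests may process in any order):

```python
# A batch request whose shared system prompt is marked for caching
request = {
    "custom_id": "doc-1",
    "params": {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 512,
        "system": [
            {
                "type": "text",
                "text": "You are a document analyst. <long shared instructions>",
                "cache_control": {"type": "ephemeral"}
            }
        ],
        "messages": [{"role": "user", "content": "Summarize: Document 1..."}]
    }
}
```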
## Use Case: Document Processing
```python
# Process 10,000 documents at 50% cost
import anthropic

client = anthropic.Anthropic()
documents = load_documents("data/*.txt")  # 10K files; load_documents is your own loader

# Create batch requests with a consistent format
requests = [{
    "custom_id": doc.id,
    "params": {
        "model": "claude-3-5-haiku-20241022",  # cheapest model for bulk work
        "max_tokens": 512,
        "system": "Extract key information as JSON.",
        "messages": [{"role": "user", "content": doc.content}]
    }
} for doc in documents]

# Submit, wait, and collect using the helpers defined above
batch = client.beta.messages.batches.create(requests=requests)
wait_for_batch(client, batch.id)
results = process_results(client, batch.id)
# Cost: ~50% of the standard API
```
## See Also
- llm-integration - API basics
- prompt-caching - Cache context
- streaming - Real-time output