d6e-docker-stf-development
π―Skillfrom d6e-ai/d6e-docker-stf-skills
Develops containerized State Transition Functions (STFs) for D6E platform workflows, enabling custom data processing and business logic in Docker containers.
Installation
docker run --rm -it --entrypoint /bin/bash my-stf:latestSkill Details
Creates custom Docker-based State Transition Functions (STFs) for D6E platform workflows. Use when building containerized business logic for D6E, implementing data processing steps, or creating workflow functions that need database access. Handles JSON input/output, SQL API integration, and multi-language implementations (Python, Node.js, Go).
Overview
# D6E Docker STF Development
Overview
Docker STFs are containerized applications that execute as workflow steps in D6E. They read JSON from stdin, process data with custom logic, access workspace databases via internal API, and output JSON to stdout.
When to Use
Apply this skill when users request:
- "Create a D6E Docker STF that..."
- "Build a custom STF for D6E that..."
- "I need a Docker-based workflow step..."
- "Help me create a data processing function for D6E"
Core Concepts
Input Format
Docker STFs receive this JSON via stdin:
```json
{
"workspace_id": "UUID",
"stf_id": "UUID",
"caller": "UUID | null",
"api_url": "http://api:8080",
"api_token": "internal_token",
"input": {
"operation": "...",
...user-defined parameters
},
"sources": {
"step_name": {
"output": {...previous step data}
}
}
}
```
Output Format
Success:
```json
{
"output": {
"status": "success",
...custom result data
}
}
```
Error:
```json
{
"error": "Error message",
"type": "ErrorType"
}
```
SQL API Access
Execute SQL via internal API:
Endpoint: POST /api/v1/workspaces/{workspace_id}/sql
Headers:
```
Authorization: Bearer {api_token}
X-Internal-Bypass: true
X-Workspace-ID: {workspace_id}
X-STF-ID: {stf_id}
```
Request:
```json
{ "sql": "SELECT * FROM my_table LIMIT 10" }
```
Restrictions:
- No DDL (CREATE, DROP, ALTER)
- Policy-controlled access
- Workspace scope only
Quick Start
Python Implementation
main.py:
```python
#!/usr/bin/env python3
import sys
import json
import requests
import logging
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
def execute_sql(api_url, api_token, workspace_id, stf_id, sql):
"""Execute SQL via D6E internal API"""
url = f"{api_url}/api/v1/workspaces/{workspace_id}/sql"
headers = {
"Authorization": f"Bearer {api_token}",
"X-Internal-Bypass": "true",
"X-Workspace-ID": workspace_id,
"X-STF-ID": stf_id,
"Content-Type": "application/json"
}
response = requests.post(url, json={"sql": sql}, headers=headers)
response.raise_for_status()
return response.json()
def main():
try:
input_data = json.load(sys.stdin)
user_input = input_data["input"]
# Your business logic here
result = {"status": "success", "message": "Processed"}
print(json.dumps({"output": result}))
except Exception as e:
logging.error(f"Error: {str(e)}", exc_info=True)
print(json.dumps({"error": str(e), "type": type(e).__name__}))
sys.exit(1)
if __name__ == "__main__":
main()
```
Dockerfile:
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
RUN chmod +x main.py
ENTRYPOINT ["python3", "main.py"]
```
requirements.txt:
```
requests>=2.31.0
```
Node.js Implementation
index.js:
```javascript
const axios = require("axios");
async function executeSql(apiUrl, apiToken, workspaceId, stfId, sql) {
const response = await axios.post(
${apiUrl}/api/v1/workspaces/${workspaceId}/sql,
{ sql },
{
headers: {
Authorization: Bearer ${apiToken},
"X-Internal-Bypass": "true",
"X-Workspace-ID": workspaceId,
"X-STF-ID": stfId,
"Content-Type": "application/json",
},
}
);
return response.data;
}
async function main() {
try {
const input = await readStdin();
const data = JSON.parse(input);
// Your business logic here
const result = { status: "success", message: "Processed" };
console.log(JSON.stringify({ output: result }));
} catch (error) {
console.error("Error:", error.message);
console.log(
JSON.stringify({
error: error.message,
type: error.name,
})
);
process.exit(1);
}
}
function readStdin() {
return new Promise((resolve) => {
let data = "";
process.stdin.on("data", (chunk) => (data += chunk));
process.stdin.on("end", () => resolve(data));
});
}
main();
```
Dockerfile:
```dockerfile
FROM node:18-slim
WORKDIR /app
COPY package*.json ./
RUN npm ci --omit=dev
COPY index.js .
ENTRYPOINT ["node", "index.js"]
```
Implementation Checklist
When creating a Docker STF, ensure:
- [ ] Reads JSON from stdin
- [ ] Outputs JSON to stdout (
{"output": {...}}) - [ ] Logs to stderr (stdout is for results only)
- [ ] Handles errors gracefully
- [ ] Uses small base images (e.g.,
python:3.11-slim) - [ ] Includes error type in error responses
- [ ] Validates input parameters
- [ ] Uses environment variables for configuration
Best Practices
Security
- Never log sensitive data (tokens, passwords)
- Validate all user inputs
- Use parameterized SQL queries
- Keep dependencies up-to-date
Performance
- Use multi-stage builds to reduce image size
- Minimize dependencies
- Add
.dockerignoreto exclude unnecessary files - Cache pip/npm installations
Error Handling
```python
try:
# Your logic
result = process_data(input_data)
print(json.dumps({"output": result}))
except ValueError as e:
# Validation errors
logging.error(f"Validation error: {str(e)}")
print(json.dumps({"error": str(e), "type": "ValidationError"}))
sys.exit(1)
except Exception as e:
# Unexpected errors
logging.error(f"Unexpected error: {str(e)}", exc_info=True)
print(json.dumps({"error": str(e), "type": type(e).__name__}))
sys.exit(1)
```
Logging
```python
import logging
# Log to stderr
logging.basicConfig(
stream=sys.stderr,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logging.info("Processing started")
logging.debug(f"Input: {input_data}") # Detailed logs
logging.warning("Deprecated operation used")
logging.error("Failed to process", exc_info=True)
```
Common Patterns
Data Validation Pattern
```python
def validate_input(user_input):
required_fields = ["operation", "table_name"]
for field in required_fields:
if field not in user_input:
raise ValueError(f"Missing required field: {field}")
if user_input["operation"] not in ["query", "insert", "update"]:
raise ValueError(f"Invalid operation: {user_input['operation']}")
return True
# Usage
try:
validate_input(input_data["input"])
except ValueError as e:
print(json.dumps({"error": str(e), "type": "ValidationError"}))
sys.exit(1)
```
Database Query Pattern
```python
def safe_query(api_context, table_name, filters):
"""Execute a safe parameterized query"""
# Build WHERE clause safely
where_conditions = []
for key, value in filters.items():
# Simple validation
if not key.isidentifier():
raise ValueError(f"Invalid column name: {key}")
where_conditions.append(f"{key} = '{value}'")
where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
sql = f"SELECT * FROM {table_name} WHERE {where_clause} LIMIT 100"
return execute_sql(
api_context["api_url"],
api_context["api_token"],
api_context["workspace_id"],
api_context["stf_id"],
sql
)
```
External API Pattern
```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session():
"""Create session with retry logic"""
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
def call_external_api(url, params):
"""Call external API with error handling"""
session = create_session()
try:
response = session.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.Timeout:
raise Exception("External API timeout")
except requests.RequestException as e:
raise Exception(f"External API error: {str(e)}")
```
Testing Locally
Build and Test
```bash
# Build image
docker build -t my-stf:latest .
# Test with sample input
echo '{
"workspace_id": "test-id",
"stf_id": "test-stf-id",
"caller": null,
"api_url": "http://localhost:8080",
"api_token": "test-token",
"input": {
"operation": "test"
},
"sources": {}
}' | docker run --rm -i my-stf:latest
```
Debug Mode
```bash
# Run with interactive shell
docker run --rm -it --entrypoint /bin/bash my-stf:latest
# Check image size
docker images my-stf:latest
# Inspect logs
docker run --rm -i my-stf:latest < input.json 2>&1 | tee output.log
```
Troubleshooting
Issue: "Policy violation" error
Cause: STF doesn't have permission to access the table.
Solution: Create policies:
```javascript
// Create policy group
d6e_create_policy_group({ name: "my-stf-group" });
// Add STF to group
d6e_add_member_to_policy_group({
policy_group_id: "{group_id}",
member_type: "stf",
member_id: "{stf_id}",
});
// Grant access
d6e_create_policy({
policy_group_id: "{group_id}",
table_name: "my_table",
operation: "select",
mode: "allow",
});
```
Issue: Output not appearing in D6E
Cause: Output not in correct JSON format.
Solution: Always use {"output": {...}} format:
```python
# β Correct
print(json.dumps({"output": {"status": "success"}}))
# β Wrong
print(json.dumps({"status": "success"}))
```
Issue: "Image not found" in D6E
Cause: Image not accessible from D6E API server.
Solution:
- Publish to container registry (GitHub, Docker Hub)
- Or ensure same Docker daemon as D6E API server
Issue: Large image size
Solution: Use multi-stage builds:
```dockerfile
# Build stage
FROM python:3.11 AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt
# Runtime stage
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY main.py .
ENV PATH=/root/.local/bin:$PATH
ENTRYPOINT ["python3", "main.py"]
```
File Requirements
Every Docker STF should include:
```
my-stf/
βββ main.py (or index.js, main.go) # Entry point
βββ Dockerfile # Container definition
βββ requirements.txt (or package.json, go.mod) # Dependencies
βββ .dockerignore # Exclude files
βββ README.md # Documentation
```
.dockerignore:
```
.git
.gitignore
*.md
tests/
__pycache__/
*.pyc
node_modules/
.env
```
Additional Resources
For detailed information:
- Complete API reference: [reference.md](reference.md)
- More implementation examples: [examples.md](examples.md)
- Quick start guide: [../docs/QUICKSTART.md](../../docs/QUICKSTART.md)
- Testing guide: [../docs/TESTING.md](../../docs/TESTING.md)
- Publishing guide: [../docs/PUBLISHING.md](../../docs/PUBLISHING.md)