Automated Reporting Pipelines
This guide shows how to set up automated pipelines for uploading emissions data, validating quality, and generating reports on a schedule.
Architecture Overview
A typical automated pipeline follows this flow:
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Source    │────▶│  Transform   │────▶│    Upload    │────▶│    Report    │
│   Systems    │     │  & Validate  │     │  to Dcycle   │     │   & Alert    │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘
  ERP                  Data prep            CLI/API              Dashboards
  TMS                  Quality checks       Bulk upload          Notifications
  Fleet mgmt           Format conversion    Error handling       Stakeholders
Quick Start: Monthly Upload Pipeline
Here's a minimal pipeline that uploads monthly logistics data:
#!/bin/bash
# monthly_upload.sh
set -e # Exit on error
# Configuration
MONTH=$(date -d "last month" +%Y-%m)
DATA_DIR="/data/exports"
echo "π Starting monthly upload for $MONTH"
# Upload transport requests
echo "Uploading transport requests..."
dc logistics upload "$DATA_DIR/viajes_$MONTH.csv" --type requests --yes
# Upload fuel recharges
echo "Uploading fuel recharges..."
dc logistics upload "$DATA_DIR/consumos_$MONTH.csv" --type recharges --yes
echo "β
Monthly upload complete"
Schedule it with cron:
# Run on the 2nd of each month at 6 AM
0 6 2 * * /scripts/monthly_upload.sh >> /var/log/dcycle_upload.log 2>&1
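Cron runs with a minimal environment, so the script also needs credentials. A minimal sketch, assuming the CLI reads DCYCLE_API_KEY and DCYCLE_ORG_ID from the environment (as in the GitHub Actions workflow below) and that they live in an env file of your choosing:
# At the top of monthly_upload.sh, after `set -e`
# (the path /etc/dcycle/credentials.env is an assumption; use any restricted file)
source /etc/dcycle/credentials.env   # exports DCYCLE_API_KEY and DCYCLE_ORG_ID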
GitHub Actions Pipeline
For teams using GitHub, here's a complete CI/CD workflow:
# .github/workflows/sustainability-pipeline.yml
name: Sustainability Data Pipeline
on:
  schedule:
    # Run every Monday at 6 AM UTC
    - cron: '0 6 * * 1'
  workflow_dispatch: # Allow manual trigger
env:
  DCYCLE_API_KEY: ${{ secrets.DCYCLE_API_KEY }}
  DCYCLE_ORG_ID: ${{ secrets.DCYCLE_ORG_ID }}
jobs:
  upload-emissions-data:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install Dcycle CLI
        run: pip install dcycle-cli
      - name: Validate data files
        run: |
          # Check required files exist
          for file in data/viajes.csv data/consumos.csv; do
            if [ ! -f "$file" ]; then
              echo "❌ Missing required file: $file"
              exit 1
            fi
          done
          # Validate CSV structure
          python scripts/validate_csv.py data/viajes.csv
          python scripts/validate_csv.py data/consumos.csv
      - name: Upload transport requests
        run: |
          dc logistics upload data/viajes.csv --type requests --yes
      - name: Upload fuel recharges
        run: |
          dc logistics upload data/consumos.csv --type recharges --yes
      - name: Verify upload
        run: |
          # Check recent data appears
          dc logistics requests list --from $(date +%Y-%m-01) --format json > /tmp/requests.json
          COUNT=$(jq length /tmp/requests.json)
          echo "✅ Verified $COUNT requests uploaded"
      - name: Notify on failure
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "❌ Sustainability data upload failed",
              "blocks": [
                {
                  "type": "section",
                  "text": {
                    "type": "mrkdwn",
                    "text": "Pipeline failed: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
                  }
                }
              ]
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
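The workflow_dispatch trigger also lets you run the pipeline on demand, for example from the GitHub CLI:
# Trigger a manual run of the workflow defined above
gh workflow run sustainability-pipeline.yml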
Data Validation
Always validate data before uploading to catch errors early:
# scripts/validate_csv.py
import csv
import sys
from datetime import datetime

def validate_viajes(filepath):
    """Validate transport requests CSV"""
    errors = []
    required_columns = ['date', 'vehicle_plate', 'origin', 'destination', 'distance_km']
    with open(filepath, 'r') as f:
        reader = csv.DictReader(f)
        # Check columns
        missing = set(required_columns) - set(reader.fieldnames)
        if missing:
            errors.append(f"Missing columns: {missing}")
            return errors
        # Validate rows
        for i, row in enumerate(reader, start=2):
            # Check date format
            try:
                datetime.strptime(row['date'], '%Y-%m-%d')
            except ValueError:
                errors.append(f"Row {i}: Invalid date format '{row['date']}'")
            # Check distance is positive
            try:
                if float(row['distance_km']) <= 0:
                    errors.append(f"Row {i}: Distance must be positive")
            except ValueError:
                errors.append(f"Row {i}: Invalid distance '{row['distance_km']}'")
            # Check required fields not empty
            for col in ['origin', 'destination', 'vehicle_plate']:
                if not row.get(col, '').strip():
                    errors.append(f"Row {i}: Missing {col}")
    return errors

if __name__ == '__main__':
    filepath = sys.argv[1]
    errors = validate_viajes(filepath)
    if errors:
        print(f"❌ Validation failed with {len(errors)} errors:")
        for error in errors[:10]:  # Show first 10
            print(f"  - {error}")
        if len(errors) > 10:
            print(f"  ... and {len(errors) - 10} more")
        sys.exit(1)
    print(f"✅ Validation passed: {filepath}")
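The same validator can gate an ad-hoc upload outside of CI, for example:
# Only upload if validation passes (exit code 0)
python scripts/validate_csv.py data/viajes.csv \
  && dc logistics upload data/viajes.csv --type requests --yes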
Multi-Source Pipeline
For organizations with multiple data sources:
#!/bin/bash
# comprehensive_pipeline.sh
set -e
MONTH=${1:-$(date -d "last month" +%Y-%m)}
LOG_FILE="/var/log/dcycle/pipeline_$MONTH.log"
log() {
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# 1. Export from source systems
log "π₯ Exporting from source systems..."
# Fleet data from TMS
curl -s "$TMS_API/exports/fleet?month=$MONTH" > /tmp/fleet_$MONTH.csv
# Facility invoices from ERP
python scripts/export_erp_invoices.py --month $MONTH > /tmp/invoices_$MONTH.csv
# Purchase data
python scripts/export_purchases.py --month $MONTH > /tmp/purchases_$MONTH.csv
# 2. Transform and validate
log "π Transforming and validating..."
python scripts/transform_fleet.py /tmp/fleet_$MONTH.csv > /tmp/vehicles_$MONTH.csv
python scripts/validate_all.py /tmp/*.csv
# 3. Upload to Dcycle
log "π€ Uploading to Dcycle..."
dc vehicle upload /tmp/vehicles_$MONTH.csv --yes
log " β Vehicles uploaded"
dc invoice upload /tmp/invoices_$MONTH.csv --yes
log " β Invoices uploaded"
dc purchase upload /tmp/purchases_$MONTH.csv --yes
log " β Purchases uploaded"
# 4. Verify and report
log "β
Verifying uploads..."
VEHICLE_COUNT=$(dc vehicle list --format json | jq length)
INVOICE_COUNT=$(dc invoice list --from ${MONTH}-01 --format json | jq length)
log "Summary:"
log " - Vehicles: $VEHICLE_COUNT"
log " - Invoices: $INVOICE_COUNT"
# 5. Send notification
log "π§ Sending notification..."
curl -X POST "$SLACK_WEBHOOK" \
-H 'Content-Type: application/json' \
-d "{
\"text\": \"β
Monthly sustainability data uploaded for $MONTH\",
\"blocks\": [
{
\"type\": \"section\",
\"fields\": [
{\"type\": \"mrkdwn\", \"text\": \"*Vehicles:* $VEHICLE_COUNT\"},
{\"type\": \"mrkdwn\", \"text\": \"*Invoices:* $INVOICE_COUNT\"}
]
}
]
}"
log "π Pipeline complete!"
Error Handling Patterns
Retry with Backoff
Wrap uploads in a retry helper so a transient failure doesn't abort the whole run:
upload_with_retry() {
  local file=$1
  local type=$2
  local max_attempts=3
  local delay=30
  for attempt in $(seq 1 $max_attempts); do
    echo "Attempt $attempt/$max_attempts: Uploading $file..."
    if dc logistics upload "$file" --type "$type" --yes 2>&1; then
      echo "✅ Upload successful"
      return 0
    fi
    if [ $attempt -lt $max_attempts ]; then
      echo "⚠️ Failed, retrying in ${delay}s..."
      sleep $delay
      delay=$((delay * 2)) # Exponential backoff
    fi
  done
  echo "❌ Upload failed after $max_attempts attempts"
  return 1
}
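Usage mirrors the direct upload calls in the monthly script:
upload_with_retry "$DATA_DIR/viajes_$MONTH.csv" requests
upload_with_retry "$DATA_DIR/consumos_$MONTH.csv" recharges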
Partial Failure Handling
When uploading many files, record failures but keep going, then report them at the end:
# Track failures but continue processing
FAILED_FILES=()
for file in data/*.csv; do
  if ! dc logistics upload "$file" --type requests --yes; then
    FAILED_FILES+=("$file")
    echo "⚠️ Failed: $file (will retry later)"
  fi
done
# Report failures
if [ ${#FAILED_FILES[@]} -gt 0 ]; then
  echo "❌ ${#FAILED_FILES[@]} files failed:"
  printf ' - %s\n' "${FAILED_FILES[@]}"
  # Alert team
  send_alert "Upload failures" "${FAILED_FILES[*]}"
  exit 1
fi
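The send_alert helper used here (and in the health check below) is not part of the CLI; a minimal sketch that posts to a Slack incoming webhook, assuming SLACK_WEBHOOK is set as in the multi-source pipeline:
send_alert() {
  local title=$1
  local details=$2
  # Post a simple text message to the configured Slack webhook
  curl -s -X POST "$SLACK_WEBHOOK" \
    -H 'Content-Type: application/json' \
    -d "{\"text\": \"⚠️ $title: $details\"}"
}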
Monitoring & Alerts
Data Quality Dashboard
A daily health check script watches for missing data and failed processing jobs:
# daily_health_check.sh
echo "π Running daily health check..."
# Check for missing data
FACILITIES=$(dc facility list --format json | jq length)
VEHICLES=$(dc vehicle list --format json | jq length)
RECENT_UPLOADS=$(dc logistics requests list --from $(date -d "7 days ago" +%Y-%m-%d) --format json | jq length)
# Alert if counts drop significantly
if [ $RECENT_UPLOADS -lt 10 ]; then
  send_alert "Low upload volume" "Only $RECENT_UPLOADS requests in last 7 days"
fi
# Check for failed jobs
FAILED_JOBS=$(dc logistics jobs list --status failed --format json | jq length)
if [ $FAILED_JOBS -gt 0 ]; then
  send_alert "Failed processing jobs" "$FAILED_JOBS jobs need attention"
fi
echo "Health check complete"
Best Practices
Idempotent Operations
Design pipelines to be safely re-runnable. Use the --yes flag and handle duplicates gracefully (see the sketch after this list).
Audit Trail
Log all operations with timestamps. Store input files for debugging and compliance.
Incremental Updates
Upload only new/changed data when possible. Use date filters to avoid reprocessing.
Alerting
Set up notifications for failures and anomalies. Don't let issues go unnoticed.
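As an example of the idempotency guard mentioned above, the monthly script could skip work when data for the month is already present; a minimal sketch, assuming that any existing request for the month means the upload already ran:
# Near the top of monthly_upload.sh: skip if $MONTH already has data
EXISTING=$(dc logistics requests list --from "${MONTH}-01" --format json | jq length)
if [ "$EXISTING" -gt 0 ]; then
  echo "Data for $MONTH already uploaded ($EXISTING requests), skipping"
  exit 0
fi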

