
Automated Reporting Pipelines

This guide shows how to set up automated pipelines for uploading emissions data, validating quality, and generating reports on a schedule.

Architecture Overview

A typical automated pipeline follows this flow:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Source    │────▶│  Transform  │────▶│   Upload    │────▶│   Report    │
│   Systems   │     │  & Validate │     │  to Dcycle  │     │  & Alert    │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
     ERP              Data prep           CLI/API           Dashboards
     TMS              Quality checks      Bulk upload       Notifications
     Fleet mgmt       Format conversion   Error handling    Stakeholders

Quick Start: Monthly Upload Pipeline

Here's a minimal pipeline that uploads monthly logistics data:
#!/bin/bash
# monthly_upload.sh

set -e  # Exit on error

# Configuration
MONTH=$(date -d "last month" +%Y-%m)
DATA_DIR="/data/exports"

echo "📊 Starting monthly upload for $MONTH"

# Upload transport requests
echo "Uploading transport requests..."
dc logistics upload "$DATA_DIR/viajes_$MONTH.csv" --type requests --yes

# Upload fuel recharges
echo "Uploading fuel recharges..."
dc logistics upload "$DATA_DIR/consumos_$MONTH.csv" --type recharges --yes

echo "✅ Monthly upload complete"
Schedule with cron:
# Run on the 2nd of each month at 6 AM
0 6 2 * * /scripts/monthly_upload.sh >> /var/log/dcycle_upload.log 2>&1
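
Cron runs with a minimal environment, so the CLI credentials must be exported explicitly. A hedged sketch, assuming the CLI reads the same DCYCLE_API_KEY and DCYCLE_ORG_ID variables used by the CI workflow below (the secrets file path is hypothetical):
# Near the top of monthly_upload.sh; /etc/dcycle/api_key is a hypothetical secrets file
export DCYCLE_API_KEY="$(cat /etc/dcycle/api_key)"
export DCYCLE_ORG_ID="your-org-id"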

GitHub Actions Pipeline

For teams using GitHub, here's a complete CI/CD workflow:
# .github/workflows/sustainability-pipeline.yml
name: Sustainability Data Pipeline

on:
  schedule:
    # Run every Monday at 6 AM UTC
    - cron: '0 6 * * 1'
  workflow_dispatch:  # Allow manual trigger

env:
  DCYCLE_API_KEY: ${{ secrets.DCYCLE_API_KEY }}
  DCYCLE_ORG_ID: ${{ secrets.DCYCLE_ORG_ID }}

jobs:
  upload-emissions-data:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install Dcycle CLI
        run: pip install dcycle-cli

      - name: Validate data files
        run: |
          # Check required files exist
          for file in data/viajes.csv data/consumos.csv; do
            if [ ! -f "$file" ]; then
              echo "❌ Missing required file: $file"
              exit 1
            fi
          done

          # Validate CSV structure
          python scripts/validate_csv.py data/viajes.csv
          python scripts/validate_csv.py data/consumos.csv

      - name: Upload transport requests
        run: |
          dc logistics upload data/viajes.csv --type requests --yes

      - name: Upload fuel recharges
        run: |
          dc logistics upload data/consumos.csv --type recharges --yes

      - name: Verify upload
        run: |
          # Check recent data appears
          dc logistics requests list --from $(date +%Y-%m-01) --format json > /tmp/requests.json
          COUNT=$(jq length /tmp/requests.json)
          echo "✅ Verified $COUNT requests uploaded"

      - name: Notify on failure
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "❌ Sustainability data upload failed",
              "blocks": [
                {
                  "type": "section",
                  "text": {
                    "type": "mrkdwn",
                    "text": "Pipeline failed: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
                  }
                }
              ]
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}

Data Validation

Always validate data before uploading to catch errors early:
# scripts/validate_csv.py
import csv
import sys
from datetime import datetime

def validate_viajes(filepath):
    """Validate transport requests CSV"""
    errors = []
    required_columns = ['date', 'vehicle_plate', 'origin', 'destination', 'distance_km']

    with open(filepath, 'r', newline='') as f:
        reader = csv.DictReader(f)

        # Check columns
        missing = set(required_columns) - set(reader.fieldnames or [])
        if missing:
            errors.append(f"Missing columns: {missing}")
            return errors

        # Validate rows
        for i, row in enumerate(reader, start=2):
            # Check date format
            try:
                datetime.strptime(row['date'], '%Y-%m-%d')
            except ValueError:
                errors.append(f"Row {i}: Invalid date format '{row['date']}'")

            # Check distance is positive
            try:
                if float(row['distance_km']) <= 0:
                    errors.append(f"Row {i}: Distance must be positive")
            except ValueError:
                errors.append(f"Row {i}: Invalid distance '{row['distance_km']}'")

            # Check required fields not empty
            for col in ['origin', 'destination', 'vehicle_plate']:
                if not row.get(col, '').strip():
                    errors.append(f"Row {i}: Missing {col}")

    return errors

if __name__ == '__main__':
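    # Note: these rules cover transport requests (viajes); a recharges
    # file (consumos) would need its own column checks.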
    filepath = sys.argv[1]
    errors = validate_viajes(filepath)

    if errors:
        print(f"❌ Validation failed with {len(errors)} errors:")
        for error in errors[:10]:  # Show first 10
            print(f"  - {error}")
        if len(errors) > 10:
            print(f"  ... and {len(errors) - 10} more")
        sys.exit(1)

    print(f"✅ Validation passed: {filepath}")

Multi-Source Pipeline

For organizations with multiple data sources:
#!/bin/bash
# comprehensive_pipeline.sh

set -e

MONTH=${1:-$(date -d "last month" +%Y-%m)}
LOG_FILE="/var/log/dcycle/pipeline_$MONTH.log"

log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# 1. Export from source systems
log "📥 Exporting from source systems..."

# Fleet data from TMS
curl -sf "$TMS_API/exports/fleet?month=$MONTH" > /tmp/fleet_$MONTH.csv

# Facility invoices from ERP
python scripts/export_erp_invoices.py --month $MONTH > /tmp/invoices_$MONTH.csv

# Purchase data
python scripts/export_purchases.py --month $MONTH > /tmp/purchases_$MONTH.csv

# 2. Transform and validate
log "🔄 Transforming and validating..."

python scripts/transform_fleet.py /tmp/fleet_$MONTH.csv > /tmp/vehicles_$MONTH.csv
python scripts/validate_all.py /tmp/*_$MONTH.csv

# 3. Upload to Dcycle
log "📤 Uploading to Dcycle..."

dc vehicle upload /tmp/vehicles_$MONTH.csv --yes
log "  ✓ Vehicles uploaded"

dc invoice upload /tmp/invoices_$MONTH.csv --yes
log "  ✓ Invoices uploaded"

dc purchase upload /tmp/purchases_$MONTH.csv --yes
log "  ✓ Purchases uploaded"

# 4. Verify and report
log "✅ Verifying uploads..."

VEHICLE_COUNT=$(dc vehicle list --format json | jq length)
INVOICE_COUNT=$(dc invoice list --from ${MONTH}-01 --format json | jq length)

log "Summary:"
log "  - Vehicles: $VEHICLE_COUNT"
log "  - Invoices: $INVOICE_COUNT"

# 5. Send notification
log "📧 Sending notification..."

curl -X POST "$SLACK_WEBHOOK" \
  -H 'Content-Type: application/json' \
  -d "{
    \"text\": \"βœ… Monthly sustainability data uploaded for $MONTH\",
    \"blocks\": [
      {
        \"type\": \"section\",
        \"fields\": [
          {\"type\": \"mrkdwn\", \"text\": \"*Vehicles:* $VEHICLE_COUNT\"},
          {\"type\": \"mrkdwn\", \"text\": \"*Invoices:* $INVOICE_COUNT\"}
        ]
      }
    ]
  }"

log "🎉 Pipeline complete!"

Error Handling Patterns

Retry with Backoff

upload_with_retry() {
    local file=$1
    local type=$2
    local max_attempts=3
    local delay=30

    for attempt in $(seq 1 $max_attempts); do
        echo "Attempt $attempt/$max_attempts: Uploading $file..."

        if dc logistics upload "$file" --type "$type" --yes 2>&1; then
            echo "✅ Upload successful"
            return 0
        fi

        if [ $attempt -lt $max_attempts ]; then
            echo "⚠️ Failed, retrying in ${delay}s..."
            sleep $delay
            delay=$((delay * 2))  # Exponential backoff
        fi
    done

    echo "❌ Upload failed after $max_attempts attempts"
    return 1
}
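
Usage mirrors a direct upload call:
upload_with_retry "$DATA_DIR/viajes_$MONTH.csv" requests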

Partial Failure Handling

# Track failures but continue processing
FAILED_FILES=()

for file in data/*.csv; do
    if ! dc logistics upload "$file" --type requests --yes; then
        FAILED_FILES+=("$file")
        echo "⚠️ Failed: $file (will retry later)"
    fi
done

# Report failures
if [ ${#FAILED_FILES[@]} -gt 0 ]; then
    echo "❌ ${#FAILED_FILES[@]} files failed:"
    printf '  - %s\n' "${FAILED_FILES[@]}"

    # Alert team
    send_alert "Upload failures" "${FAILED_FILES[*]}"

    exit 1
fi
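
The two patterns combine naturally: before alerting and exiting, a second pass can reuse upload_with_retry on the collected failures.
# Optional second pass over the collected failures
STILL_FAILING=()
for file in "${FAILED_FILES[@]}"; do
    upload_with_retry "$file" requests || STILL_FAILING+=("$file")
done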

Monitoring & Alerts

Data Quality Dashboard

#!/bin/bash
# daily_health_check.sh

echo "🔍 Running daily health check..."

# Check for missing data
FACILITIES=$(dc facility list --format json | jq length)
VEHICLES=$(dc vehicle list --format json | jq length)
RECENT_UPLOADS=$(dc logistics requests list --from $(date -d "7 days ago" +%Y-%m-%d) --format json | jq length)

# Alert if counts drop significantly
if [ $RECENT_UPLOADS -lt 10 ]; then
    send_alert "Low upload volume" "Only $RECENT_UPLOADS requests in last 7 days"
fi

# Check for failed jobs
FAILED_JOBS=$(dc logistics jobs list --status failed --format json | jq length)
if [ $FAILED_JOBS -gt 0 ]; then
    send_alert "Failed processing jobs" "$FAILED_JOBS jobs need attention"
fi

echo "Health check complete"
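
As with the monthly upload, schedule the check with cron:
# Run every day at 7 AM
0 7 * * * /scripts/daily_health_check.sh >> /var/log/dcycle_health.log 2>&1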

Best Practices

Idempotent Operations

Design pipelines to be safely re-runnable: use the --yes flag to skip confirmation prompts, and handle duplicates gracefully, as in the sketch below.
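
A minimal sketch using only commands shown earlier: check what is already in Dcycle for the period, and skip the upload if data is present.
# Skip the upload if this month's requests are already in Dcycle
EXISTING=$(dc logistics requests list --from "${MONTH}-01" --format json | jq length)
if [ "$EXISTING" -gt 0 ]; then
    echo "⏭️ $EXISTING requests already uploaded for $MONTH, skipping"
else
    dc logistics upload "$DATA_DIR/viajes_$MONTH.csv" --type requests --yes
fi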

Audit Trail

Log all operations with timestamps. Store input files for debugging and compliance.
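
For example, archive each input file with a checksum before uploading (the archive location is a hypothetical choice):
# Keep a timestamped, checksummed copy of every input file
ARCHIVE_DIR="/var/lib/dcycle/archive/$MONTH"  # hypothetical archive location
mkdir -p "$ARCHIVE_DIR"

for file in "$DATA_DIR"/*_"$MONTH".csv; do
    cp "$file" "$ARCHIVE_DIR/"
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] archived $file sha256=$(sha256sum "$file" | cut -d' ' -f1)" >> "$ARCHIVE_DIR/manifest.log"
done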

Incremental Updates

Upload only new/changed data when possible. Use date filters to avoid reprocessing.
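
One way to do this, assuming the date is the first CSV column in ISO format (as in the validated files above), is to keep a marker file recording the last successful run and upload only newer rows; the marker path is hypothetical.
# Upload only rows newer than the last successful run
LAST_RUN_FILE="/var/lib/dcycle/last_run"  # hypothetical marker file
LAST_RUN=$(cat "$LAST_RUN_FILE" 2>/dev/null || echo "1970-01-01")

# Keep the header plus rows whose date (column 1) is newer; ISO dates compare lexically
awk -F',' -v since="$LAST_RUN" 'NR == 1 || $1 > since' data/viajes.csv > /tmp/viajes_new.csv

dc logistics upload /tmp/viajes_new.csv --type requests --yes
date +%Y-%m-%d > "$LAST_RUN_FILE"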

Alerting

Set up notifications for failures and anomalies. Don't let issues go unnoticed.
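
The send_alert helper used in the snippets above can be a thin wrapper around the same Slack webhook shown in the multi-source pipeline:
# Minimal send_alert implementation posting to the Slack webhook from earlier
send_alert() {
    local title=$1
    local detail=$2
    curl -sf -X POST "$SLACK_WEBHOOK" \
      -H 'Content-Type: application/json' \
      -d "{\"text\": \"🚨 $title: $detail\"}"
}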

Next Steps