⚙️ 5.2 Workflow Automation

Automate common nutrition data tasks so they’re repeatable, robust, and shareable. We’ll turn a manual analysis on large_food_log.csv into a small pipeline: clean → validate → transform → summarise → export.

You will:

  • Build reusable functions with clear inputs/outputs.
  • Parameterise runs (date ranges, filters, output paths).
  • Add sanity checks and lightweight logging.
  • Export tidy artefacts (CSV/Parquet) for downstream notebooks.

When to automate? When you repeat the same steps across files, days, or projects; when colleagues need to run it; or when you want reliable, version-controlled outputs.
# Setup for Google Colab: clone repo and locate data
import os
from google.colab import files

MODULE = '05_advanced'
DATASET = 'large_food_log.csv'
BASE_PATH = '/content/data-analysis-projects'
MODULE_PATH = os.path.join(BASE_PATH, 'notebooks', MODULE)
DATASET_PATH = os.path.join('data', DATASET)

try:
    print('Attempting to clone repository...')
    if not os.path.exists(BASE_PATH):
        !git clone https://github.com/ggkuhnle/data-analysis-projects.git
    print('Setting working directory...')
    os.chdir(MODULE_PATH)
    if os.path.exists(DATASET_PATH):
        print(f'Dataset found: {DATASET_PATH} ✅')
    else:
        raise FileNotFoundError('Dataset missing after clone.')
except Exception as e:
    print(f'Cloning failed: {e}')
    print('Falling back to manual upload...')
    os.makedirs('data', exist_ok=True)
    uploaded = files.upload()
    if DATASET in uploaded:
        with open(DATASET_PATH, 'wb') as f:
            f.write(uploaded[DATASET])
        print(f'Successfully uploaded {DATASET} ✅')
    else:
        raise FileNotFoundError(f'Upload failed. Please upload {DATASET}.')
%pip install -q pandas numpy pyarrow tqdm
import pandas as pd, numpy as np, datetime as dt, os, sys, json, textwrap, pathlib
from tqdm.auto import tqdm
import difflib
from typing import Union, Sequence, Dict, Any
pd.set_option('display.max_columns', 40)
print('Automation environment ready.')

🔧 Run Parameters

Central place to tweak what the pipeline does without changing code.

PARAMS = {
    "input_csv": "data/large_food_log.csv",
    "out_dir": "artifacts/5_2_automation",
    "date_col": "Date",
    "filters": {  # optional filters
        "Nutrient_in": ["Iron", "Calcium", "Vitamin_D", "Protein"]
    },
    "date_range": {  # optional date range
        "start": None,   # e.g., "2024-01-01"
        "end": None      # e.g., "2024-12-31"
    },
    "export": {
        "formats": ["csv", "parquet"],
        "summaries": [
            {"by": ["Date", "Nutrient"], "value": "Amount", "agg": "sum", "name": "sum_by_date_nutrient"},
            {"by": ["Meal", "Nutrient"], "value": "Amount", "agg": "mean", "name": "mean_by_meal_nutrient"}
        ]
    }
}
PARAMS

🧰 Reusable Helpers

Small, single-purpose functions make the pipeline readable and testable.

def log(msg: str):
    now = pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{now}] {msg}")

def ensure_dir(path: str):
    pathlib.Path(path).mkdir(parents=True, exist_ok=True)

def read_csv_safe(path: str) -> pd.DataFrame:
    log(f"Reading: {path}")
    df = pd.read_csv(path)
    if df.empty:
        raise ValueError("Input CSV is empty.")
    return df

def clean_columns(df: pd.DataFrame) -> pd.DataFrame:
    # lower snake_case
    df = df.copy()
    df.columns = [c.strip().replace(' ', '_') for c in df.columns]
    df.columns = [c.lower() for c in df.columns]
    return df

def parse_dates(df: pd.DataFrame, date_col: str) -> pd.DataFrame:
    df = df.copy()
    if date_col.lower() not in df.columns:
        raise KeyError(f"Date column '{date_col}' not found after cleaning.")
    df[date_col.lower()] = pd.to_datetime(df[date_col.lower()], errors='coerce')
    return df

def standardise_units(df: pd.DataFrame) -> pd.DataFrame:
    # Placeholder: Amount is assumed to already be in consistent units.
    # Add a conversion hook here if your real data mixes units (e.g., mg vs g).
    return df

def apply_filters(df: pd.DataFrame, params: dict) -> pd.DataFrame:
    df = df.copy()
    f = params.get("filters", {})
    if f.get("Nutrient_in") and "nutrient" in df.columns:
        df = df[df["nutrient"].isin(f["Nutrient_in"])].copy()
    # Date range
    dr = params.get("date_range", {})
    date_col = params.get("date_col", "Date").lower()
    start = dr.get("start"); end = dr.get("end")
    if start:
        df = df[df[date_col] >= pd.to_datetime(start)]
    if end:
        df = df[df[date_col] <= pd.to_datetime(end)]
    return df

def validate(df: pd.DataFrame):
    # Minimal sanity checks; extend as needed
    required = {"id", "meal", "nutrient", "amount", "date"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")
    if df["amount"].isna().any():
        raise ValueError("Amount contains missing values.")
    if (df["amount"] < 0).any():
        raise ValueError("Amount contains negative values.")
    if df["date"].isna().any():
        raise ValueError("Date contains unparseable values.")
    return True

def summarise(df: pd.DataFrame, *, by, value: str, agg: str) -> pd.DataFrame:
    # Simple version; superseded by the more robust summarise() defined further below,
    # which adds case-insensitive column resolution and takes a cfg argument.
    if agg not in {"sum", "mean", "median", "count"}:
        raise ValueError("Unsupported aggregation; choose from sum, mean, median, count")
    grp = df.groupby(by, observed=True)[value]
    res = getattr(grp, agg)().reset_index(name=f"{value}_{agg}")
    return res

def export(df: pd.DataFrame, out_dir: str, name: str, formats=("csv", "parquet")):
    ensure_dir(out_dir)
    if "csv" in formats:
        path_csv = os.path.join(out_dir, f"{name}.csv")
        df.to_csv(path_csv, index=False)
        log(f"Wrote: {path_csv}")
    if "parquet" in formats:
        path_pq = os.path.join(out_dir, f"{name}.parquet")
        df.to_parquet(path_pq, index=False)
        log(f"Wrote: {path_pq}")

def resolve_cols(df: pd.DataFrame, keys: Union[str, Sequence[str]], cfg: Dict[str, Any]) -> Union[str, list]:
    """
    Map requested column name(s) to actual DataFrame columns.
    - Case-insensitive match
    - Optional alias map via cfg['col_aliases']
    - Helpful error with close-name suggestions
    """
    if isinstance(keys, (list, tuple)):
        requested = list(keys)
    else:
        requested = [keys]

    cols = df.columns.tolist()
    lower_map = {c.lower(): c for c in cols}

    # Build alias map (cfg-specified + common date aliases)
    aliases = dict(cfg.get("col_aliases", {}))
    if "date_col" in cfg and cfg["date_col"] in cols:
        for k in ("Date", "DATE", "date"):
            aliases.setdefault(k, cfg["date_col"])

    resolved = []
    for k in requested:
        # direct hit
        if k in cols:
            resolved.append(k)
            continue
        # case-insensitive
        if k.lower() in lower_map:
            resolved.append(lower_map[k.lower()])
            continue
        # alias map
        if k in aliases and aliases[k] in cols:
            resolved.append(aliases[k])
            continue
        # suggestions
        sugg = difflib.get_close_matches(k, cols, n=3)
        raise KeyError(
            f"Column '{k}' not found. Available: {cols[:8]}... "
            + (f"Suggestions: {', '.join(sugg)}" if sugg else "")
        )
    return resolved if len(resolved) > 1 else resolved[0]

def summarise(df: pd.DataFrame, *, by: Union[str, Sequence[str], None], value: str, agg, cfg: Dict[str, Any]):
    """
    Grouped summary with robust column resolution.
    - by: str | list[str] | None
    - value: str
    - agg: any pandas-valid aggregator (e.g., 'sum', 'mean', ['mean','std'], dict, etc.)
    """
    value_col = resolve_cols(df, value, cfg)
    if by is None or (isinstance(by, (list, tuple)) and len(by) == 0):
        # Ungrouped aggregation; wrap scalar results so a DataFrame is always returned
        agg_res = df[value_col].agg(agg)
        if np.isscalar(agg_res):
            return pd.DataFrame({value_col: [agg_res]})
        return agg_res.rename_axis("statistic").reset_index(name=value_col)
    by_cols = resolve_cols(df, by, cfg)
    return (
        df.groupby(by_cols, dropna=False, observed=False)[value_col]
          .agg(agg)
          .reset_index()
    )
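
A quick illustrative check of the robust version on a tiny hand-made frame (column names mirror the cleaned food log; the point is to show the case-insensitive resolution at work):

_toy = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-01", "2024-01-01", "2024-01-02"]),
    "meal": ["Breakfast", "Lunch", "Breakfast"],
    "nutrient": ["Iron", "Iron", "Calcium"],
    "amount": [2.0, 3.5, 120.0],
})
# 'Meal', 'Nutrient' and 'Amount' resolve to the lower-case columns automatically
summarise(_toy, by=["Meal", "Nutrient"], value="Amount", agg="sum", cfg=PARAMS)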

🏗️ The Pipeline

All steps connected with logging and idempotent artefact writes.

def run_pipeline(params: dict):
    log("Starting pipeline…")
    cfg = params.copy()
    ensure_dir(cfg["out_dir"])

    # 1) Load
    raw = read_csv_safe(cfg["input_csv"])

    # 2) Clean/standardise
    df = clean_columns(raw)
    df = parse_dates(df, cfg["date_col"])      # note: columns were lowercased, so the parsed column is e.g. 'date'
    df = standardise_units(df)

    # 3) Validate
    validate(df)

    # 4) Filter (optional)
    df_f = apply_filters(df, cfg)
    log(f"Columns after cleaning/filtering: {list(df_f.columns)}")
    log(f"Rows after filtering: {len(df_f):,}")

    # 5) Export a cleaned/filtered snapshot for reproducibility
    export(df_f, cfg["out_dir"], name="clean_filtered", formats=cfg["export"]["formats"])

    # 6) Summaries
    outputs = {}
    for spec in cfg["export"]["summaries"]:
        res = summarise(df_f, by=spec.get("by"), value=spec["value"], agg=spec["agg"], cfg=cfg)
        export(res, cfg["out_dir"], name=spec["name"], formats=cfg["export"]["formats"])
        outputs[spec["name"]] = res

    log("Pipeline complete.")
    return {"data": df_f, "outputs": outputs}
# Run the pipeline if artifacts aren't present yet
def _ensure_artifacts():
    global artifacts
    if "artifacts" in globals() and isinstance(artifacts, dict):
        return artifacts
    if "PARAMS" not in globals():
        raise NameError(
            "PARAMS is not defined. Define a PARAMS dict first with keys like: "
            "input_csv, out_dir, date_col, export={'formats': [...], 'summaries': [...]}, etc."
        )
    artifacts = run_pipeline(PARAMS)
    return artifacts

arts = _ensure_artifacts()

# Show summaries (if any)
outs = arts.get("outputs", {})
if not outs:
    print("No summaries were produced. Check PARAMS['export']['summaries'].")
else:
    for name, df_out in outs.items():
        print(f"\n=== {name} ===")
        try:
            display(df_out.head(8))
        except Exception:
            print(df_out.head(8))

👀 Quick Peek at Outputs

for name, df_out in artifacts["outputs"].items():
    print(f"\n=== {name} ===")
    display(df_out.head(8))

✅ Lightweight Tests

A few assertions catch common regressions. Add more for your project’s rules.

df_clean = artifacts["data"]
assert (df_clean["amount"] >= 0).all(), "Found negative amounts after cleaning!"
assert df_clean["date"].dtype.kind == 'M', "Date column must be datetime."
for name, df_out in artifacts["outputs"].items():
    assert not df_out.empty, f"Summary {name} is empty."
print("All sanity checks passed.")

🧪 Parameterised Runs (Examples)

Change filters/date ranges without touching pipeline internals.

import copy

params_alt = copy.deepcopy(PARAMS)  # deep copy so nested dicts (filters, export) stay independent of PARAMS
params_alt["out_dir"] = "artifacts/5_2_automation_subset"
params_alt["filters"] = {"Nutrient_in": ["Iron", "Vitamin_D"]}
params_alt["date_range"] = {"start": None, "end": None}

artifacts_alt = run_pipeline(params_alt)
print("Done with parameterised run.")

🖥️ (Optional) CLI Script Template

Copy this to a file (e.g., run_pipeline.py) if you want to run it outside notebooks.

import json, argparse
from your_module import run_pipeline  # import functions from a module/package

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument('--params', type=str, required=True, help='Path to params JSON file')
    args = ap.parse_args()
    with open(args.params) as f:
        params = json.load(f)
    run_pipeline(params)

if __name__ == '__main__':
    main()
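
To feed the script, the notebook's PARAMS can be written out as JSON (a minimal sketch; the file name params.json and the module name your_module are placeholders):

import json

with open("params.json", "w") as f:
    json.dump(PARAMS, f, indent=2)

# Then, from a terminal:
#   python run_pipeline.py --params params.json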

🧩 Exercises

  1. New Summary: Add a third summary: total Amount by ID × Nutrient and export it.
  2. Unit Standardisation: Assume some Amount values are grams (g) and others mg; add a conversion step so everything is mg.
  3. Robustness: Add a validation rule that flags days where total Amount exceeds a threshold per nutrient.
  4. Performance: If your input grows to millions of rows, refactor read_csv_safe to stream in chunks and aggregate incrementally (a starting sketch follows this list).
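
For Exercise 4, one possible starting point is a chunked reader that computes partial sums per chunk and combines them at the end (a sketch only, assuming a sum-style aggregation; means would need separate sum and count columns):

def sum_in_chunks(path: str, by: list, value: str, chunksize: int = 250_000) -> pd.DataFrame:
    """Sum `value` by the `by` columns without loading the whole CSV into memory."""
    partials = []
    for chunk in pd.read_csv(path, chunksize=chunksize):
        chunk.columns = [c.strip().replace(' ', '_').lower() for c in chunk.columns]
        partials.append(chunk.groupby(by, observed=True)[value].sum())
    # Partial sums are themselves summable, so concatenate and aggregate once more
    return pd.concat(partials).groupby(level=list(range(len(by)))).sum().reset_index()

# Example call (columns are the cleaned, lower-case names):
# sum_in_chunks("data/large_food_log.csv", by=["date", "nutrient"], value="amount")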

✅ Wrap-up

You now have a small but robust automation pipeline: parameterised, validated, and exporting tidy artefacts other notebooks can consume.
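
For example, a downstream notebook can pick up one of the Parquet artefacts directly (the path follows from the PARAMS above):

daily_totals = pd.read_parquet("artifacts/5_2_automation/sum_by_date_nutrient.parquet")
daily_totals.head()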

Further reading
  • Pandas GroupBy: https://pandas.pydata.org/docs/user_guide/groupby.html
  • Arrow/Parquet: https://arrow.apache.org/
  • tqdm (progress bars): https://tqdm.github.io/