# Setup for Google Colab: clone repo and locate data
import os
from google.colab import files
MODULE = '05_advanced'
DATASET = 'large_food_log.csv'
BASE_PATH = '/content/data-analysis-projects'
MODULE_PATH = os.path.join(BASE_PATH, 'notebooks', MODULE)
DATASET_PATH = os.path.join('data', DATASET)

try:
    print('Attempting to clone repository...')
    if not os.path.exists(BASE_PATH):
        !git clone https://github.com/ggkuhnle/data-analysis-projects.git
    print('Setting working directory...')
    os.chdir(MODULE_PATH)
    if os.path.exists(DATASET_PATH):
        print(f'Dataset found: {DATASET_PATH} ✅')
    else:
        raise FileNotFoundError('Dataset missing after clone.')
except Exception as e:
    print(f'Cloning failed: {e}')
    print('Falling back to manual upload...')
    os.makedirs('data', exist_ok=True)
    uploaded = files.upload()
    if DATASET in uploaded:
        with open(DATASET_PATH, 'wb') as f:
            f.write(uploaded[DATASET])
        print(f'Successfully uploaded {DATASET} ✅')
    else:
        raise FileNotFoundError(f'Upload failed. Please upload {DATASET}.')
⚙️ 5.2 Workflow Automation
Automate common nutrition data tasks so they’re repeatable, robust, and shareable. We’ll turn a manual analysis on large_food_log.csv
into a small pipeline: clean → validate → transform → summarise → export.
You will:

- Build reusable functions with clear inputs/outputs.
- Parameterise runs (date ranges, filters, output paths).
- Add sanity checks and lightweight logging.
- Export tidy artefacts (CSV/Parquet) for downstream notebooks.
When to automate?
When you repeat the same steps across files, days, or projects; when colleagues need to run it; or when you want reliable, version-controlled outputs.

%pip install -q pandas numpy pyarrow tqdm

import pandas as pd, numpy as np, datetime as dt, os, sys, json, textwrap, pathlib
from tqdm.auto import tqdm
import difflib
from typing import Union, Sequence, Dict, Any

pd.set_option('display.max_columns', 40)
print('Automation environment ready.')
🔧 Run Parameters
Central place to tweak what the pipeline does without changing code.
PARAMS = {
    "input_csv": "data/large_food_log.csv",
    "out_dir": "artifacts/5_2_automation",
    "date_col": "Date",
    "filters": {  # optional filters
        "Nutrient_in": ["Iron", "Calcium", "Vitamin_D", "Protein"]
    },
    "date_range": {  # optional date range
        "start": None,  # e.g., "2024-01-01"
        "end": None     # e.g., "2024-12-31"
    },
    "export": {
        "formats": ["csv", "parquet"],
        "summaries": [
            {"by": ["Date", "Nutrient"], "value": "Amount", "agg": "sum", "name": "sum_by_date_nutrient"},
            {"by": ["Meal", "Nutrient"], "value": "Amount", "agg": "mean", "name": "mean_by_meal_nutrient"}
        ]
    }
}
PARAMS
🧰 Reusable Helpers
Small, single-purpose functions make the pipeline readable and testable.
def log(msg: str):
    now = pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{now}] {msg}")

def ensure_dir(path: str):
    pathlib.Path(path).mkdir(parents=True, exist_ok=True)

def read_csv_safe(path: str) -> pd.DataFrame:
    log(f"Reading: {path}")
    df = pd.read_csv(path)
    if df.empty:
        raise ValueError("Input CSV is empty.")
    return df

def clean_columns(df: pd.DataFrame) -> pd.DataFrame:
    # lower snake_case
    df = df.copy()
    df.columns = [c.strip().replace(' ', '_') for c in df.columns]
    df.columns = [c.lower() for c in df.columns]
    return df

def parse_dates(df: pd.DataFrame, date_col: str) -> pd.DataFrame:
    df = df.copy()
    if date_col.lower() not in df.columns:
        raise KeyError(f"Date column '{date_col}' not found after cleaning.")
    df[date_col.lower()] = pd.to_datetime(df[date_col.lower()], errors='coerce')
    return df

def standardise_units(df: pd.DataFrame) -> pd.DataFrame:
    # Example: if Amount is sometimes given in grams vs milligrams; here we assume it's already coherent
    # Add hooks here if your real data needs scaling (e.g., mg→g)
    return df
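If your copy of the food log does mix units, one possible shape for that hook is sketched below. It assumes a hypothetical unit column (which the bundled large_food_log.csv may not have), so treat it as a template rather than a drop-in step.

# Hypothetical sketch: scale gram rows to milligrams when a 'unit' column is present.
def standardise_units_example(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    if "unit" in df.columns:
        grams = df["unit"].str.lower().eq("g")
        df.loc[grams, "amount"] = df.loc[grams, "amount"] * 1000  # g -> mg
        df.loc[grams, "unit"] = "mg"
    return df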
def apply_filters(df: pd.DataFrame, params: dict) -> pd.DataFrame:
    df = df.copy()
    f = params.get("filters", {})
    if f.get("Nutrient_in") and "nutrient" in df.columns:
        df = df[df["nutrient"].isin(f["Nutrient_in"])].copy()
    # Date range
    dr = params.get("date_range", {})
    date_col = params.get("date_col", "Date").lower()
    start = dr.get("start"); end = dr.get("end")
    if start:
        df = df[df[date_col] >= pd.to_datetime(start)]
    if end:
        df = df[df[date_col] <= pd.to_datetime(end)]
    return df

def validate(df: pd.DataFrame):
    # Minimal sanity checks; extend as needed
    required = {"id", "meal", "nutrient", "amount", "date"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")
    if df["amount"].isna().any():
        raise ValueError("Amount contains missing values.")
    if (df["amount"] < 0).any():
        raise ValueError("Amount contains negative values.")
    if df["date"].isna().any():
        raise ValueError("Date contains unparseable values.")
    return True

def summarise(df: pd.DataFrame, *, by, value: str, agg: str) -> pd.DataFrame:
    if agg not in {"sum", "mean", "median", "count"}:
        raise ValueError("Unsupported aggregation; choose from sum, mean, median, count")
    grp = df.groupby(by, observed=True)[value]
    res = getattr(grp, agg)().reset_index(name=f"{value}_{agg}")
    return res
def export(df: pd.DataFrame, out_dir: str, name: str, formats=("csv", "parquet")):
    ensure_dir(out_dir)
    if "csv" in formats:
        path_csv = os.path.join(out_dir, f"{name}.csv")
        df.to_csv(path_csv, index=False)
        log(f"Wrote: {path_csv}")
    if "parquet" in formats:
        path_pq = os.path.join(out_dir, f"{name}.parquet")
        df.to_parquet(path_pq, index=False)
        log(f"Wrote: {path_pq}")
def resolve_cols(df: pd.DataFrame, keys: Union[str, Sequence[str]], cfg: Dict[str, Any]) -> Union[str, list]:
    """
    Map requested column name(s) to actual DataFrame columns.
    - Case-insensitive match
    - Optional alias map via cfg['col_aliases']
    - Helpful error with close-name suggestions
    """
    if isinstance(keys, (list, tuple)):
        requested = list(keys)
    else:
        requested = [keys]

    cols = df.columns.tolist()
    lower_map = {c.lower(): c for c in cols}

    # Build alias map (cfg-specified + common date aliases)
    aliases = dict(cfg.get("col_aliases", {}))
    if "date_col" in cfg and cfg["date_col"] in cols:
        for k in ("Date", "DATE", "date"):
            aliases.setdefault(k, cfg["date_col"])

    resolved = []
    for k in requested:
        # direct hit
        if k in cols:
            resolved.append(k)
            continue
        # case-insensitive
        if k.lower() in lower_map:
            resolved.append(lower_map[k.lower()])
            continue
        # alias map
        if k in aliases and aliases[k] in cols:
            resolved.append(aliases[k])
            continue
        # suggestions
        sugg = difflib.get_close_matches(k, cols, n=3)
        raise KeyError(
            f"Column '{k}' not found. Available: {cols[:8]}... "
            + (f"Suggestions: {', '.join(sugg)}" if sugg else "")
        )
    return resolved if len(resolved) > 1 else resolved[0]
# Note: this more robust version replaces the simpler summarise() defined above.
def summarise(df: pd.DataFrame, *, by: Union[str, Sequence[str], None], value: str, agg, cfg: Dict[str, Any]):
    """
    Grouped summary with robust column resolution.
    - by: str | list[str] | None
    - value: str
    - agg: any pandas-valid aggregator (e.g., 'sum', 'mean', ['mean','std'], dict, etc.)
    """
    value_col = resolve_cols(df, value, cfg)
    if by is None or (isinstance(by, (list, tuple)) and len(by) == 0):
        # Ungrouped aggregation
        out = pd.DataFrame({value_col: df[value_col].agg(agg)})
        out = out.T.reset_index().rename(columns={"index": value_col})
        return out
    by_cols = resolve_cols(df, by, cfg)
    return (
        df.groupby(by_cols, dropna=False, observed=False)[value_col]
        .agg(agg)
        .reset_index()
    )
🏗️ The Pipeline
All steps connected with logging and idempotent artefact writes.
def run_pipeline(params: dict):
    log("Starting pipeline…")
    cfg = params.copy()
    ensure_dir(cfg["out_dir"])

    # 1) Load
    raw = read_csv_safe(cfg["input_csv"])

    # 2) Clean/standardise
    df = clean_columns(raw)
    df = parse_dates(df, cfg["date_col"])  # ensure this creates cfg["date_col"]
    df = standardise_units(df)

    # 3) Validate
    validate(df)

    # 4) Filter (optional)
    df_f = apply_filters(df, cfg)
    log(f"Columns after cleaning/filtering: {list(df_f.columns)}")
    log(f"Rows after filtering: {len(df_f):,}")

    # 5) Export a cleaned/filtered snapshot for reproducibility
    export(df_f, cfg["out_dir"], name="clean_filtered", formats=cfg["export"]["formats"])

    # 6) Summaries
    outputs = {}
    for spec in cfg["export"]["summaries"]:
        res = summarise(df_f, by=spec.get("by"), value=spec["value"], agg=spec["agg"], cfg=cfg)
        export(res, cfg["out_dir"], name=spec["name"], formats=cfg["export"]["formats"])
        outputs[spec["name"]] = res

    log("Pipeline complete.")
    return {"data": df_f, "outputs": outputs}
# Run the pipeline if artifacts aren't present yet
def _ensure_artifacts():
    global artifacts
    if "artifacts" in globals() and isinstance(artifacts, dict):
        return artifacts
    if "PARAMS" not in globals():
        raise NameError(
            "PARAMS is not defined. Define a PARAMS dict first with keys like: "
            "input_csv, out_dir, date_col, export={'formats': [...], 'summaries': [...]}, etc."
        )
    artifacts = run_pipeline(PARAMS)
    return artifacts

arts = _ensure_artifacts()

# Show summaries (if any)
outs = arts.get("outputs", {})
if not outs:
    print("No summaries were produced. Check PARAMS['export']['summaries'].")
else:
    for name, df_out in outs.items():
        print(f"\n=== {name} ===")
        try:
            display(df_out.head(8))
        except Exception:
            print(df_out.head(8))
👀 Quick Peek at Outputs
for name, df_out in artifacts["outputs"].items():
    print(f"\n=== {name} ===")
    display(df_out.head(8))
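You can also call summarise() directly for one-off questions. A small sketch (run it after the pipeline cell above so that artifacts exists); the capitalised names are mapped to the lower-cased columns by resolve_cols:

# Ad-hoc summary outside the pipeline: mean Amount per Nutrient.
adhoc = summarise(artifacts["data"], by="Nutrient", value="Amount", agg="mean", cfg=PARAMS)
display(adhoc.head(8))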
✅ Lightweight Tests
A few assertions catch common regressions. Add more for your project’s rules.
= artifacts["data"]
df_clean assert (df_clean["amount"] >= 0).all(), "Found negative amounts after cleaning!"
assert df_clean["date"].dtype.kind == 'M', "Date column must be datetime."
for name, df_out in artifacts["outputs"].items():
assert not df_out.empty, f"Summary {name} is empty."
print("All sanity checks passed.")
🧪 Parameterised Runs (Examples)
Change filters/date ranges without touching pipeline internals.
params_alt = PARAMS.copy()
params_alt["out_dir"] = "artifacts/5_2_automation_subset"
params_alt["filters"] = {"Nutrient_in": ["Iron", "Vitamin_D"]}
params_alt["date_range"] = {"start": None, "end": None}

artifacts_alt = run_pipeline(params_alt)
print("Done with parameterised run.")
🖥️ (Optional) CLI Script Template
Copy this to a file (e.g., run_pipeline.py) if you want to run it outside notebooks.
import json, argparse
from your_module import run_pipeline  # import functions from a module/package

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument('--params', type=str, required=True, help='Path to params JSON file')
    args = ap.parse_args()
    with open(args.params) as f:
        params = json.load(f)

    run_pipeline(params)

if __name__ == '__main__':
    main()
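A minimal way to try it, assuming you keep the PARAMS dict from this notebook: dump it to JSON (None becomes null and is read back as None), then call the script from a terminal.

# Write the current parameters to a JSON file the script can read.
import json

with open("params.json", "w") as f:
    json.dump(PARAMS, f, indent=2)

# From a terminal:
#   python run_pipeline.py --params params.json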
🧩 Exercises
- New Summary: Add a third summary: total Amount by ID × Nutrient, and export it.
- Unit Standardisation: Assume some Amount values are grams (g) and others mg; add a conversion step so everything is mg.
- Robustness: Add a validation rule that flags days where total Amount exceeds a threshold per nutrient.
- Performance: If your input grows to millions of rows, refactor read_csv_safe to stream in chunks and aggregate incrementally (a sketch follows this list).
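One way to approach the Performance exercise is sketched below. It assumes sum-like aggregations that can be combined across chunks (means or medians would need a different merge strategy), and the helper name read_and_sum_chunked is hypothetical.

# Sketch: stream the CSV in chunks and accumulate partial sums per group.
def read_and_sum_chunked(path: str, by: list, value: str, chunksize: int = 100_000) -> pd.DataFrame:
    partials = []
    for chunk in pd.read_csv(path, chunksize=chunksize):
        chunk = clean_columns(chunk)  # reuse the helper defined above
        partials.append(chunk.groupby(by, observed=True)[value].sum())
    # Each element of partials is a per-chunk Series of sums; summing again over
    # the same group keys gives the exact full-data result.
    return pd.concat(partials).groupby(level=by).sum().reset_index()

# e.g. read_and_sum_chunked(PARAMS["input_csv"], by=["date", "nutrient"], value="amount")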
✅ Wrap-up
You now have a small but robust automation pipeline: parameterised, validated, and exporting tidy artefacts other notebooks can consume.
Further reading
- Pandas GroupBy: https://pandas.pydata.org/docs/user_guide/groupby.html
- Arrow/Parquet: https://arrow.apache.org/
- tqdm (progress bars): https://tqdm.github.io/