Mini Quack Stack: Simplified Insurance Claims Demo
This streamlined demo shows how to use the Mini Quack Stack (DuckDB, Marimo, DBT, Prefect) to build a data pipeline for insurance claims. We’ll focus on creating a clean, modern approach with less code.
Prerequisites
Install the Mini Quack Stack components using Getting Started with Mini Quack
Step 1: Create a Modern Marimo Notebook
Create a file called insurance_explorer.py with Marimo:
uvx marimo edit insurance_explorer.py#!/usr/bin/env python3
import marimo as mo
app = mo.App()
@app.cell
def imports():
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import duckdb
from faker import Faker
import random
from datetime import datetime, timedelta
import uuid
# Set up plotting
plt.style.use('ggplot')
return pandas, pd, plt, sns, duckdb, Faker, random, datetime, timedelta, uuid
@app.cell
def setup_ui():
# Create UI controls
num_claims_slider = mo.ui.slider(500, 5000, value=1000, step=500, label="Number of claims")
return num_claims_slider
@app.cell
def setup_connection():
# Connect to DuckDB
conn = duckdb.connect("insurance.duckdb")
# Create schemas if needed
conn.execute("CREATE SCHEMA IF NOT EXISTS raw")
conn.execute("CREATE SCHEMA IF NOT EXISTS staging")
return conn
@app.cell
def generate_data(num_claims_slider, Faker, random, datetime, timedelta, uuid):
"""Generate synthetic insurance claims data"""
fake = Faker()
Faker.seed(42) # For reproducibility
# Define possible values
claim_types = ['Auto', 'Home', 'Health', 'Life', 'Travel']
claim_status = ['Open', 'Closed', 'Pending', 'Denied', 'Under Review']
regions = ['North', 'South', 'East', 'West', 'Central']
# Generate data
claims = []
today = datetime.today()
for _ in range(num_claims_slider.value):
claim_date = fake.date_between(start_date=today - timedelta(days=365), end_date=today)
has_error = random.random() < 0.15
claim = {
'claim_id': str(uuid.uuid4()),
'customer_id': fake.uuid4(),
'claim_date': claim_date,
'claim_type': random.choice(claim_types),
'claim_amount': round(random.uniform(100, 50000), 2) if random.random() < 0.95 else None,
'claim_status': random.choice(claim_status),
'region': random.choice(regions),
'processing_time_days': random.randint(1, 60),
'has_police_report': random.random() < 0.3,
'error_code': f"ERR-{random.randint(100, 999)}" if has_error else None
}
claims.append(claim)
claims_df = pd.DataFrame(claims)
return claims_df
@app.cell
def load_to_db(claims_df, conn):
"""Load data to DuckDB"""
# Create or replace the raw table
conn.execute("DROP TABLE IF EXISTS raw.claims")
conn.execute("""
CREATE TABLE raw.claims AS
SELECT * FROM claims_df
""")
# Verify data was loaded
count = conn.execute("SELECT COUNT(*) AS count FROM raw.claims").fetchone()[0]
mo.md(f"**Loaded {count} records to database**")
# Return sample
sample = conn.execute("SELECT * FROM raw.claims LIMIT 5").fetchdf()
return sample
@app.cell
def missing_values_analysis(claims_df, pd, plt, sns):
"""Analyze missing values"""
missing_values = claims_df.isnull().sum()
missing_percent = (missing_values / len(claims_df)) * 100
missing_df = pd.DataFrame({
'Missing Values': missing_values,
'Percentage': missing_percent
}).sort_values('Percentage', ascending=False)
# Only show columns with missing values
missing_df = missing_df[missing_df['Missing Values'] > 0]
plt.figure(figsize=(10, 5))
ax = sns.barplot(x=missing_df.index, y='Percentage', data=missing_df)
plt.xticks(rotation=45, ha='right')
plt.title('Percentage of Missing Values by Column')
plt.tight_layout()
return ax.figure
@app.cell
def create_staging_table(conn):
"""Create a cleaned staging table with SQL"""
conn.execute("""
DROP TABLE IF EXISTS staging.clean_claims;
CREATE TABLE staging.clean_claims AS
SELECT
claim_id,
customer_id,
claim_date,
claim_type,
-- Handle NULL claim amounts
COALESCE(claim_amount, 0) AS claim_amount,
-- Standardize claim status
CASE
WHEN claim_status = 'Closed' THEN 'CLOSED'
WHEN claim_status = 'Open' THEN 'OPEN'
WHEN claim_status = 'Pending' THEN 'PENDING'
WHEN claim_status = 'Denied' THEN 'DENIED'
WHEN claim_status = 'Under Review' THEN 'UNDER_REVIEW'
ELSE 'UNKNOWN'
END AS claim_status,
-- Standardize region format
UPPER(region) AS region,
processing_time_days,
has_police_report,
-- Flag records with errors
CASE WHEN error_code IS NOT NULL THEN TRUE ELSE FALSE END AS has_error,
error_code,
-- Calculate claim severity
CASE
WHEN claim_amount > 10000 OR processing_time_days > 30 THEN 'HIGH'
WHEN claim_amount > 5000 OR processing_time_days > 15 OR has_police_report THEN 'MEDIUM'
ELSE 'LOW'
END AS claim_severity
FROM raw.claims
""")
# Show the results of our transformation
clean_claims = conn.execute("SELECT * FROM staging.clean_claims LIMIT 5").fetchdf()
return clean_claims
@app.cell
def dashboard_layout(num_claims_slider, sample, missing_values_analysis, clean_claims):
"""Create a dashboard layout"""
return mo.md("""
# Insurance Claims Explorer
This interactive dashboard allows you to explore synthetic insurance claims data.
Adjust the slider below to generate different amounts of data.
"""), \
mo.hstack([
mo.vstack([
mo.md("## Data Generation"),
num_claims_slider,
mo.md("### Sample Data"),
sample
], widths="400px"),
mo.vstack([
mo.md("## Missing Values Analysis"),
missing_values_analysis
])
]), \
mo.md("## Cleaned Data (Staging Table)"), \
clean_claims
if __name__ == "__main__":
app.run()Step 2: Create a Simple DBT Project
Create a basic DBT project structure:
mkdir -p mini_quack_demo/dbt_project
cd mini_quack_demo/dbt_project
dbt init insurance_claims --skip-profile-setup
cd insurance_claimsCreate a minimal profiles.yml in the project directory:
# profiles.yml
insurance_claims:
target: dev
outputs:
dev:
type: duckdb
path: '../../../insurance.duckdb'
schema: dbt_schemaCreate a source definition in models/sources.yml:
# models/sources.yml
version: 2
sources:
- name: raw
database: main
schema: raw
tables:
- name: claims
description: Raw insurance claims dataCreate a staging model in models/staging/stg_claims.sql:
{{ config(materialized='table', schema='staging') }}
SELECT
claim_id,
customer_id,
claim_date,
claim_type,
COALESCE(claim_amount, 0) AS claim_amount,
CASE
WHEN claim_status = 'Closed' THEN 'CLOSED'
WHEN claim_status = 'Open' THEN 'OPEN'
WHEN claim_status = 'Pending' THEN 'PENDING'
WHEN claim_status = 'Denied' THEN 'DENIED'
WHEN claim_status = 'Under Review' THEN 'UNDER_REVIEW'
ELSE 'UNKNOWN'
END AS claim_status,
UPPER(region) AS region,
processing_time_days,
has_police_report,
CASE WHEN error_code IS NOT NULL THEN TRUE ELSE FALSE END AS has_error,
error_code
FROM {{ source('raw', 'claims') }}Create a macro in macros/calculate_severity.sql:
{% macro calculate_severity(claim_amount, processing_time_days, has_police_report) %}
CASE
WHEN {{ claim_amount }} > 10000 OR {{ processing_time_days }} > 30 THEN 'HIGH'
WHEN {{ claim_amount }} > 5000 OR {{ processing_time_days }} > 15 OR {{ has_police_report }} THEN 'MEDIUM'
ELSE 'LOW'
END
{% endmacro %}Create a final model in models/mart/enriched_claims.sql:
{{ config(materialized='table', schema='mart') }}
SELECT
c.claim_id,
c.customer_id,
c.claim_date,
EXTRACT(YEAR FROM c.claim_date) AS claim_year,
EXTRACT(MONTH FROM c.claim_date) AS claim_month,
c.claim_type,
c.claim_amount,
c.claim_status,
c.region,
c.processing_time_days,
c.has_police_report,
c.has_error,
c.error_code,
-- Calculate claim severity
{{ calculate_severity('c.claim_amount', 'c.processing_time_days', 'c.has_police_report') }} AS claim_severity,
-- Calculate claim age
DATEDIFF('day', c.claim_date, CURRENT_DATE) AS claim_age_days,
-- Is this a fast claim?
CASE WHEN c.processing_time_days <= 7 THEN TRUE ELSE FALSE END AS is_fast_claim
FROM {{ ref('stg_claims') }} cStep 3: Create a Simple Prefect Flow
Create a file called simple_pipeline.py:
import duckdb
import pandas as pd
from faker import Faker
import random
from datetime import datetime, timedelta
import uuid
import os
from prefect import flow, task
# Database path
DB_PATH = "insurance.duckdb"
# DBT project path
DBT_PATH = os.path.join(os.getcwd(), "dbt_project/insurance_claims")
@task
def generate_claims(num_claims=1000):
"""Generate synthetic insurance claims data"""
print(f"Generating {num_claims} synthetic claims...")
fake = Faker()
claims = []
today = datetime.today()
claim_types = ['Auto', 'Home', 'Health', 'Life', 'Travel']
claim_status = ['Open', 'Closed', 'Pending', 'Denied', 'Under Review']
regions = ['North', 'South', 'East', 'West', 'Central']
for _ in range(num_claims):
claim_date = fake.date_between(start_date=today - timedelta(days=365), end_date=today)
claim = {
'claim_id': str(uuid.uuid4()),
'customer_id': fake.uuid4(),
'claim_date': claim_date,
'claim_type': random.choice(claim_types),
'claim_amount': round(random.uniform(100, 50000), 2) if random.random() < 0.95 else None,
'claim_status': random.choice(claim_status),
'region': random.choice(regions),
'processing_time_days': random.randint(1, 60),
'has_police_report': random.random() < 0.3,
'error_code': f"ERR-{random.randint(100, 999)}" if random.random() < 0.15 else None
}
claims.append(claim)
return pd.DataFrame(claims)
@task
def load_to_duckdb(claims_df):
"""Load claims data to DuckDB"""
print("Loading data to DuckDB...")
conn = duckdb.connect(DB_PATH)
conn.execute("CREATE SCHEMA IF NOT EXISTS raw")
# Create or append to table
if conn.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'claims' AND table_schema = 'raw'").fetchone()[0] == 0:
conn.execute("CREATE TABLE raw.claims AS SELECT * FROM claims_df")
else:
conn.execute("INSERT INTO raw.claims SELECT * FROM claims_df")
count = conn.execute("SELECT COUNT(*) FROM raw.claims").fetchone()[0]
conn.close()
return count
@task
def run_dbt():
"""Run DBT models"""
print("Running DBT models...")
os.chdir(DBT_PATH)
os.system("dbt run --profiles-dir .")
return True
@flow(name="Insurance Claims Pipeline")
def insurance_pipeline(num_claims=1000):
"""Main pipeline flow"""
# Generate claims data
claims_df = generate_claims(num_claims)
# Load to DuckDB
record_count = load_to_duckdb(claims_df)
print(f"Loaded {record_count} total records")
# Transform with DBT
dbt_success = run_dbt()
print(f"DBT run completed: {dbt_success}")
return {"records": record_count, "dbt_success": dbt_success}
if __name__ == "__main__":
insurance_pipeline()Step 4: Schedule with Prefect
Create a scheduling script called schedule_pipeline.py:
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import IntervalSchedule
from datetime import timedelta
from simple_pipeline import insurance_pipeline
# Create a deployment with a schedule
deployment = Deployment.build_from_flow(
flow=insurance_pipeline,
name="insurance-pipeline-hourly",
schedule=IntervalSchedule(interval=timedelta(hours=1)),
parameters={"num_claims": 500}
)
if __name__ == "__main__":
deployment.apply()
print("Pipeline scheduled to run hourly!")
print("Start the Prefect worker with: prefect worker start -p process")Running the Pipeline
- Start the Prefect server:
prefect server start- In a new terminal, apply the deployment:
python schedule_pipeline.py- Start a worker:
prefect worker start -p processChecking Results
duckdb insurance.duckdb
-- Check raw data
SELECT COUNT(*) FROM raw.claims;
-- Check transformed data
SELECT COUNT(*) FROM mart.enriched_claims;
-- Check claim severity distribution
SELECT claim_severity, COUNT(*) as count
FROM mart.enriched_claims
GROUP BY claim_severity
ORDER BY count DESC;The Mini Quack Stack Advantages
- DuckDB: Fast analytics database requiring no setup
- Marimo: Modern, reactive notebooks for exploration and prototyping
- DBT: SQL-first transformations with version control
- Prefect: Simple workflow orchestration
All running locally with zero infrastructure - perfect for data teams of any size!
Next Steps
- Add data quality tests in dbt
- Create Evidence dashboards to visualize the results
- Experiment with different schedules and data volumes
- Add more complex transformations and business logic
All this demonstrates how the Mini Quack Stack can handle real-world data engineering challenges with minimal setup and maximum flexibility.