Mini Quack Stack: Simplified Insurance Claims Demo

This streamlined demo shows how to use the Mini Quack Stack (DuckDB, Marimo, DBT, Prefect) to build a data pipeline for insurance claims. We'll focus on a clean, modern approach with minimal code.

Prerequisites

Install the Mini Quack Stack components by following the Getting Started with Mini Quack guide.
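
If you haven't worked through that guide yet, the components can also be installed directly from PyPI (standard package names; shown here as a convenience, not the guide's exact steps):

pip install marimo duckdb dbt-duckdb prefect pandas matplotlib seaborn faker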

Step 1: Create a Modern Marimo Notebook

Create a file called insurance_explorer.py and open it with Marimo:

uvx marimo edit insurance_explorer.py

Then fill the notebook with the following cells:
#!/usr/bin/env python3
import marimo as mo
app = mo.App()
 
@app.cell
def imports():
    import marimo as mo
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    import duckdb
    from faker import Faker
    import random
    from datetime import datetime, timedelta
    import uuid
    
    # Set up plotting
    plt.style.use('ggplot')
    return mo, pd, plt, sns, duckdb, Faker, random, datetime, timedelta, uuid
 
@app.cell
def setup_ui(mo):
    # Create UI controls
    num_claims_slider = mo.ui.slider(500, 5000, value=1000, step=500, label="Number of claims")
    return num_claims_slider
 
@app.cell
def setup_connection(duckdb):
    # Connect to DuckDB
    conn = duckdb.connect("insurance.duckdb")
    
    # Create schemas if needed
    conn.execute("CREATE SCHEMA IF NOT EXISTS raw")
    conn.execute("CREATE SCHEMA IF NOT EXISTS staging")
    return conn
 
@app.cell
def generate_data(num_claims_slider, pd, Faker, random, datetime, timedelta, uuid):
    """Generate synthetic insurance claims data"""
    
    fake = Faker()
    Faker.seed(42)  # For reproducibility
    
    # Define possible values
    claim_types = ['Auto', 'Home', 'Health', 'Life', 'Travel']
    claim_status = ['Open', 'Closed', 'Pending', 'Denied', 'Under Review']
    regions = ['North', 'South', 'East', 'West', 'Central']
    
    # Generate data
    claims = []
    today = datetime.today()
    
    for _ in range(num_claims_slider.value):
        claim_date = fake.date_between(start_date=today - timedelta(days=365), end_date=today)
        has_error = random.random() < 0.15
        
        claim = {
            'claim_id': str(uuid.uuid4()),
            'customer_id': fake.uuid4(),
            'claim_date': claim_date,
            'claim_type': random.choice(claim_types),
            'claim_amount': round(random.uniform(100, 50000), 2) if random.random() < 0.95 else None,
            'claim_status': random.choice(claim_status),
            'region': random.choice(regions),
            'processing_time_days': random.randint(1, 60),
            'has_police_report': random.random() < 0.3,
            'error_code': f"ERR-{random.randint(100, 999)}" if has_error else None
        }
        claims.append(claim)
    
    claims_df = pd.DataFrame(claims)
    return claims_df
 
@app.cell
def load_to_db(claims_df, conn, mo):
    """Load data to DuckDB"""
    
    # Create or replace the raw table
    conn.execute("DROP TABLE IF EXISTS raw.claims")
    conn.execute("""
    CREATE TABLE raw.claims AS 
    SELECT * FROM claims_df
    """)
    
    # Verify data was loaded
    count = conn.execute("SELECT COUNT(*) AS count FROM raw.claims").fetchone()[0]
    
    # Grab a sample to show in the dashboard
    sample = conn.execute("SELECT * FROM raw.claims LIMIT 5").fetchdf()
    
    # The last expression in a cell is rendered as the cell's output
    mo.md(f"**Loaded {count} records to database**")
    return sample
 
@app.cell
def missing_values_analysis(claims_df, pd, plt, sns):
    """Analyze missing values"""
    
    missing_values = claims_df.isnull().sum()
    missing_percent = (missing_values / len(claims_df)) * 100
    
    missing_df = pd.DataFrame({
        'Missing Values': missing_values,
        'Percentage': missing_percent
    }).sort_values('Percentage', ascending=False)
    
    # Only show columns with missing values
    missing_df = missing_df[missing_df['Missing Values'] > 0]
    
    plt.figure(figsize=(10, 5))
    ax = sns.barplot(x=missing_df.index, y='Percentage', data=missing_df)
    plt.xticks(rotation=45, ha='right')
    plt.title('Percentage of Missing Values by Column')
    plt.tight_layout()
    
    missing_fig = ax.figure
    return missing_fig
 
@app.cell
def create_staging_table(conn, sample):
    """Create a cleaned staging table with SQL.
    
    Depends on `sample` so this cell runs after load_to_db.
    """
    
    conn.execute("DROP TABLE IF EXISTS staging.clean_claims")
    conn.execute("""
    CREATE TABLE staging.clean_claims AS
    SELECT
        claim_id,
        customer_id,
        claim_date,
        claim_type,
        -- Handle NULL claim amounts
        COALESCE(claim_amount, 0) AS claim_amount,
        -- Standardize claim status
        CASE 
            WHEN claim_status = 'Closed' THEN 'CLOSED'
            WHEN claim_status = 'Open' THEN 'OPEN'
            WHEN claim_status = 'Pending' THEN 'PENDING'
            WHEN claim_status = 'Denied' THEN 'DENIED'
            WHEN claim_status = 'Under Review' THEN 'UNDER_REVIEW'
            ELSE 'UNKNOWN'
        END AS claim_status,
        -- Standardize region format
        UPPER(region) AS region,
        processing_time_days,
        has_police_report,
        -- Flag records with errors
        CASE WHEN error_code IS NOT NULL THEN TRUE ELSE FALSE END AS has_error,
        error_code,
        -- Calculate claim severity
        CASE
            WHEN claim_amount > 10000 OR processing_time_days > 30 THEN 'HIGH'
            WHEN claim_amount > 5000 OR processing_time_days > 15 OR has_police_report THEN 'MEDIUM'
            ELSE 'LOW'
        END AS claim_severity
    FROM raw.claims
    """)
    
    # Show the results of our transformation
    clean_claims = conn.execute("SELECT * FROM staging.clean_claims LIMIT 5").fetchdf()
    return clean_claims
 
@app.cell
def dashboard_layout(num_claims_slider, sample, missing_fig, clean_claims, mo):
    """Create a dashboard layout"""
    
    dashboard = mo.vstack([
        mo.md("""
        # Insurance Claims Explorer
        
        This interactive dashboard allows you to explore synthetic insurance claims data.
        Adjust the slider below to generate different amounts of data.
        """),
        mo.hstack([
            mo.vstack([
                mo.md("## Data Generation"),
                num_claims_slider,
                mo.md("### Sample Data"),
                sample
            ]),
            mo.vstack([
                mo.md("## Missing Values Analysis"),
                missing_fig
            ])
        ], widths="equal"),
        mo.md("## Cleaned Data (Staging Table)"),
        clean_claims
    ])
    
    # The last expression in a cell is rendered as the cell's output
    dashboard
    return
 
if __name__ == "__main__":
    app.run()
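
Once the notebook is saved, Marimo can also serve it as a read-only app using the standard marimo run command:

uvx marimo run insurance_explorer.py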

Step 2: Create a Simple DBT Project

Create a basic DBT project structure:

mkdir -p mini_quack_demo/dbt_project
cd mini_quack_demo/dbt_project
dbt init insurance_claims --skip-profile-setup
cd insurance_claims

Create a minimal profiles.yml in the project directory:

# profiles.yml
insurance_claims:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: '../../../insurance.duckdb'  # resolved relative to dbt's working directory (the project dir)
      schema: dbt_schema
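
Before adding models, verify the profile with dbt's built-in connection check, run from the project directory:

dbt debug --profiles-dir .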

Create a source definition in models/sources.yml:

# models/sources.yml
version: 2
 
sources:
  - name: raw
    schema: raw
    tables:
      - name: claims
        description: Raw insurance claims data

Create a staging model in models/staging/stg_claims.sql:

{{ config(materialized='table', schema='staging') }}
 
SELECT
    claim_id,
    customer_id,
    claim_date,
    claim_type,
    COALESCE(claim_amount, 0) AS claim_amount,
    CASE 
        WHEN claim_status = 'Closed' THEN 'CLOSED'
        WHEN claim_status = 'Open' THEN 'OPEN'
        WHEN claim_status = 'Pending' THEN 'PENDING'
        WHEN claim_status = 'Denied' THEN 'DENIED'
        WHEN claim_status = 'Under Review' THEN 'UNDER_REVIEW'
        ELSE 'UNKNOWN'
    END AS claim_status,
    UPPER(region) AS region,
    processing_time_days,
    has_police_report,
    CASE WHEN error_code IS NOT NULL THEN TRUE ELSE FALSE END AS has_error,
    error_code
FROM {{ source('raw', 'claims') }}
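
One caveat: by default dbt prefixes a custom schema with the target schema, so schema='staging' would materialize into dbt_schema_staging rather than staging. To get the plain staging and mart schemas that the queries later in this demo assume, override generate_schema_name in macros/generate_schema_name.sql with the standard macro from the dbt documentation:

{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- if custom_schema_name is none -%}
        {{ target.schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}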

Create a macro in macros/calculate_severity.sql:

{% macro calculate_severity(claim_amount, processing_time_days, has_police_report) %}
    CASE
        WHEN {{ claim_amount }} > 10000 OR {{ processing_time_days }} > 30 THEN 'HIGH'
        WHEN {{ claim_amount }} > 5000 OR {{ processing_time_days }} > 15 OR {{ has_police_report }} THEN 'MEDIUM'
        ELSE 'LOW'
    END
{% endmacro %}

Create a final model in models/mart/enriched_claims.sql:

{{ config(materialized='table', schema='mart') }}
 
SELECT
    c.claim_id,
    c.customer_id,
    c.claim_date,
    EXTRACT(YEAR FROM c.claim_date) AS claim_year,
    EXTRACT(MONTH FROM c.claim_date) AS claim_month,
    c.claim_type,
    c.claim_amount,
    c.claim_status,
    c.region,
    c.processing_time_days,
    c.has_police_report,
    c.has_error,
    c.error_code,
    -- Calculate claim severity
    {{ calculate_severity('c.claim_amount', 'c.processing_time_days', 'c.has_police_report') }} AS claim_severity,
    -- Calculate claim age
    DATEDIFF('day', c.claim_date, CURRENT_DATE) AS claim_age_days,
    -- Is this a fast claim?
    CASE WHEN c.processing_time_days <= 7 THEN TRUE ELSE FALSE END AS is_fast_claim
FROM {{ ref('stg_claims') }} c
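
With the source, macro, and models in place, you can run the transformations once by hand before automating them (from the insurance_claims directory):

dbt run --profiles-dir .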

Step 3: Create a Simple Prefect Flow

Create a file called simple_pipeline.py:

import duckdb
import pandas as pd
from faker import Faker
import random
from datetime import datetime, timedelta
import uuid
import os
import subprocess
from prefect import flow, task
 
# Database path
DB_PATH = "insurance.duckdb"
# DBT project path (this script is assumed to run from the directory where you created mini_quack_demo)
DBT_PATH = os.path.join(os.getcwd(), "mini_quack_demo/dbt_project/insurance_claims")
 
@task
def generate_claims(num_claims=1000):
    """Generate synthetic insurance claims data"""
    print(f"Generating {num_claims} synthetic claims...")
    
    fake = Faker()
    claims = []
    today = datetime.today()
    
    claim_types = ['Auto', 'Home', 'Health', 'Life', 'Travel']
    claim_status = ['Open', 'Closed', 'Pending', 'Denied', 'Under Review']
    regions = ['North', 'South', 'East', 'West', 'Central']
    
    for _ in range(num_claims):
        claim_date = fake.date_between(start_date=today - timedelta(days=365), end_date=today)
        
        claim = {
            'claim_id': str(uuid.uuid4()),
            'customer_id': fake.uuid4(),
            'claim_date': claim_date,
            'claim_type': random.choice(claim_types),
            'claim_amount': round(random.uniform(100, 50000), 2) if random.random() < 0.95 else None,
            'claim_status': random.choice(claim_status),
            'region': random.choice(regions),
            'processing_time_days': random.randint(1, 60),
            'has_police_report': random.random() < 0.3,
            'error_code': f"ERR-{random.randint(100, 999)}" if random.random() < 0.15 else None
        }
        claims.append(claim)
    
    return pd.DataFrame(claims)
 
@task
def load_to_duckdb(claims_df):
    """Load claims data to DuckDB"""
    print("Loading data to DuckDB...")
    
    conn = duckdb.connect(DB_PATH)
    conn.execute("CREATE SCHEMA IF NOT EXISTS raw")
    
    # Create or append to table
    if conn.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'claims' AND table_schema = 'raw'").fetchone()[0] == 0:
        conn.execute("CREATE TABLE raw.claims AS SELECT * FROM claims_df")
    else:
        conn.execute("INSERT INTO raw.claims SELECT * FROM claims_df")
    
    count = conn.execute("SELECT COUNT(*) FROM raw.claims").fetchone()[0]
    conn.close()
    
    return count
 
@task
def run_dbt():
    """Run DBT models"""
    print("Running DBT models...")
    
    # Run dbt inside the project directory and surface failures
    result = subprocess.run(["dbt", "run", "--profiles-dir", "."], cwd=DBT_PATH)
    
    return result.returncode == 0
 
@flow(name="Insurance Claims Pipeline")
def insurance_pipeline(num_claims=1000):
    """Main pipeline flow"""
    # Generate claims data
    claims_df = generate_claims(num_claims)
    
    # Load to DuckDB
    record_count = load_to_duckdb(claims_df)
    print(f"Loaded {record_count} total records")
    
    # Transform with DBT
    dbt_success = run_dbt()
    print(f"DBT run completed: {dbt_success}")
    
    return {"records": record_count, "dbt_success": dbt_success}
 
if __name__ == "__main__":
    insurance_pipeline()
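
Since the flow is an ordinary Python callable, you can smoke-test it with custom parameters before scheduling it. The smoke_test.py script below is a hypothetical helper, not one of the demo's required files:

# smoke_test.py
from simple_pipeline import insurance_pipeline

if __name__ == "__main__":
    # Small batch for a quick end-to-end check
    result = insurance_pipeline(num_claims=100)
    print(result)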

Step 4: Schedule with Prefect

Create a scheduling script called schedule_pipeline.py:

from prefect.deployments import Deployment
from prefect.server.schemas.schedules import IntervalSchedule
from datetime import timedelta
from simple_pipeline import insurance_pipeline
 
# Create a deployment with a schedule
deployment = Deployment.build_from_flow(
    flow=insurance_pipeline,
    name="insurance-pipeline-hourly",
    schedule=IntervalSchedule(interval=timedelta(hours=1)),
    parameters={"num_claims": 500},
    work_pool_name="process",  # must match the pool the worker polls
)
 
if __name__ == "__main__":
    deployment.apply()
    print("Pipeline scheduled to run hourly!")
    print("Create the pool once with: prefect work-pool create process --type process")
    print("Then start a worker with: prefect worker start -p process")

Running the Pipeline

  1. Start the Prefect server:
prefect server start
  2. In a new terminal, create a process work pool (first time only):
prefect work-pool create process --type process
  3. Apply the deployment:
python schedule_pipeline.py
  4. Start a worker:
prefect worker start -p process

Checking Results

duckdb insurance.duckdb
 
-- Check raw data
SELECT COUNT(*) FROM raw.claims;
 
-- Check transformed data
SELECT COUNT(*) FROM mart.enriched_claims;
 
-- Check claim severity distribution
SELECT claim_severity, COUNT(*) as count 
FROM mart.enriched_claims 
GROUP BY claim_severity
ORDER BY count DESC;

The Mini Quack Stack Advantages

  • DuckDB: Fast analytics database requiring no setup
  • Marimo: Modern, reactive notebooks for exploration and prototyping
  • DBT: SQL-first transformations with version control
  • Prefect: Simple workflow orchestration

All running locally with zero infrastructure, making it perfect for data teams of any size!

Next Steps

  • Add data quality tests in dbt (see the sketch after this list)
  • Create Evidence dashboards to visualize the results
  • Experiment with different schedules and data volumes
  • Add more complex transformations and business logic
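
For the first item, a minimal test file might look like the sketch below (models/staging/schema.yml, assuming the column values generated earlier):

# models/staging/schema.yml
version: 2

models:
  - name: stg_claims
    columns:
      - name: claim_id
        tests:
          - unique
          - not_null
      - name: claim_status
        tests:
          - accepted_values:
              values: ['OPEN', 'CLOSED', 'PENDING', 'DENIED', 'UNDER_REVIEW', 'UNKNOWN']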

Together, these steps demonstrate how the Mini Quack Stack can handle real-world data engineering challenges with minimal setup and maximum flexibility.