Under the hood: Modelling Data within SQL
# Import modules
import duckdb
import pandas as pd
from datetime import datetime
import uuid
# Establish a connection to the warehouse database file "warehouse.duckdb".
# Every statement below runs through this single `con` handle.
con = duckdb.connect("warehouse.duckdb")
Add metadata columns
Metadata columns are added to support later debugging and governance: they record where each row came from, which load batch it belonged to, and when it was ingested and last updated.
def add_metadata(df: pd.DataFrame, source_name: str) -> pd.DataFrame:
    """Return ``df`` with load-metadata columns added.

    Four columns are set to the same value on every row:

    * ``_ingested_at`` / ``_updated_at`` -- the current UTC time as a naive
      (tz-unaware) timestamp; both receive the identical value.
    * ``_batch_id`` -- a fresh random UUID4 string, one per call.
    * ``_source`` -- the ``source_name`` argument.

    The input frame is not modified; a new DataFrame is returned. If ``df``
    already has any of these columns, their values are overwritten in the
    result.

    Args:
        df: Frame to annotate.
        source_name: Label identifying where the data came from.

    Returns:
        A new DataFrame containing the original columns plus the four
        metadata columns.
    """
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    batch_id = str(uuid.uuid4())
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # UTC "now" and strip the tzinfo so the stored value remains a naive UTC
    # timestamp, as it was before.
    ingested_at = datetime.now(timezone.utc).replace(tzinfo=None)

    # ``assign`` builds a new frame, so the caller's DataFrame is left as is.
    return df.assign(
        _ingested_at=ingested_at,
        _updated_at=ingested_at,
        _batch_id=batch_id,
        _source=source_name,
    )
# Stamp both frames with load metadata. Each call generates its own batch id,
# so the two frames carry different `_batch_id` values.
# NOTE(review): both DataFrames are expected to exist already; they are not
# defined in the visible part of this file -- confirm where they are built.
racing_schedule_df = add_metadata(racing_schedule_df, "f1_api_schedule")
racingquali_race_comparison_df = add_metadata(racingquali_race_comparison_df, "f1_api_quali_race")
Create tables if they don't already exist
Tables are created to store the data. If a table already exists, the command does nothing; it acts as a safeguard so that the merge step below always has a target table to write to.
# Create the racing_schedule target table when it is missing. RoundNumber is
# the primary key, so the table holds one row per round. The trailing
# underscore-prefixed columns are load metadata rather than event attributes;
# the last four match the columns written by add_metadata.
con.execute("""
CREATE TABLE IF NOT EXISTS racing_schedule (
RoundNumber INTEGER,
Country VARCHAR,
Location VARCHAR,
OfficialEventName VARCHAR,
EventDate DATE,
EventName VARCHAR,
EventFormat VARCHAR,
Session1 VARCHAR,
Session1Date DATE,
Session1DateUtc TIMESTAMP,
Session2 VARCHAR,
Session2Date DATE,
Session2DateUtc TIMESTAMP,
Session3 VARCHAR,
Session3Date DATE,
Session3DateUtc TIMESTAMP,
Session4 VARCHAR,
Session4Date DATE,
Session4DateUtc TIMESTAMP,
Session5 VARCHAR,
Session5Date DATE,
Session5DateUtc TIMESTAMP,
F1ApiSupport BOOLEAN,
_meta_extract_time TIMESTAMP,
_ingested_at TIMESTAMP,
_updated_at TIMESTAMP,
_batch_id VARCHAR,
_source VARCHAR,
PRIMARY KEY (RoundNumber)
);
""")
# Create the qualifying-vs-race comparison table when it is missing. The
# primary key is the pair (race_name, round_number).
# NOTE(review): the LIST columns are declared without an element type. DuckDB
# list columns are normally written with one (e.g. VARCHAR[] or INTEGER[]) --
# confirm this DDL is accepted and choose element types that match the
# DataFrame's actual contents.
con.execute("""
CREATE TABLE IF NOT EXISTS racingquali_race_comparison_schedule (
race_name VARCHAR,
round_number INTEGER,
race_type VARCHAR,
drivers LIST,
quali_position LIST,
race_position LIST,
ClassifiedPosition LIST,
positions_gained LIST,
is_DNF BOOLEAN,
_ingested_at TIMESTAMP,
_updated_at TIMESTAMP,
_batch_id VARCHAR,
_source VARCHAR,
PRIMARY KEY (race_name, round_number)
);
""")
Register dfs and Merge into tables
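This uses a Type 1 slowly changing dimension (SCD): matched rows are overwritten in place and no history is kept. If this were to go to production, a Type 2 or Type 3 SCD could be used instead. For example, if the driver line-ups changed more frequently, this could be a Type 2 SCD so that earlier versions of each row are preserved.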
# Register the two DataFrames on the connection under staging names so the
# MERGE statements below can refer to them as `src` tables in SQL.
con.register("staging_racing_schedule", racing_schedule_df)
con.register("staging_racingquali_race_comparison", racingquali_race_comparison_df)
# Merge racing_schedule
# Upsert the staged rows into racing_schedule, matching on RoundNumber.
# - Matched rows: every non-key column is overwritten from staging EXCEPT
#   _ingested_at, which is absent from the SET list and so keeps the value
#   written when the row was first inserted.
# - Unmatched rows: inserted in full, including _ingested_at.
con.execute("""
MERGE INTO racing_schedule AS tgt
USING staging_racing_schedule AS src
ON tgt.RoundNumber = src.RoundNumber
WHEN MATCHED THEN UPDATE SET
Country = src.Country,
Location = src.Location,
OfficialEventName = src.OfficialEventName,
EventDate = src.EventDate,
EventName = src.EventName,
EventFormat = src.EventFormat,
Session1 = src.Session1,
Session1Date = src.Session1Date,
Session1DateUtc = src.Session1DateUtc,
Session2 = src.Session2,
Session2Date = src.Session2Date,
Session2DateUtc = src.Session2DateUtc,
Session3 = src.Session3,
Session3Date = src.Session3Date,
Session3DateUtc = src.Session3DateUtc,
Session4 = src.Session4,
Session4Date = src.Session4Date,
Session4DateUtc = src.Session4DateUtc,
Session5 = src.Session5,
Session5Date = src.Session5Date,
Session5DateUtc = src.Session5DateUtc,
F1ApiSupport = src.F1ApiSupport,
_meta_extract_time = src._meta_extract_time,
_updated_at = src._updated_at,
_batch_id = src._batch_id,
_source = src._source
WHEN NOT MATCHED THEN INSERT (
RoundNumber, Country, Location, OfficialEventName, EventDate, EventName,
EventFormat, Session1, Session1Date, Session1DateUtc,
Session2, Session2Date, Session2DateUtc,
Session3, Session3Date, Session3DateUtc,
Session4, Session4Date, Session4DateUtc,
Session5, Session5Date, Session5DateUtc,
F1ApiSupport, _meta_extract_time, _ingested_at, _updated_at, _batch_id, _source
)
VALUES (
src.RoundNumber, src.Country, src.Location, src.OfficialEventName, src.EventDate, src.EventName,
src.EventFormat, src.Session1, src.Session1Date, src.Session1DateUtc,
src.Session2, src.Session2Date, src.Session2DateUtc,
src.Session3, src.Session3Date, src.Session3DateUtc,
src.Session4, src.Session4Date, src.Session4DateUtc,
src.Session5, src.Session5Date, src.Session5DateUtc,
src.F1ApiSupport, src._meta_extract_time, src._ingested_at, src._updated_at, src._batch_id, src._source
);
""")
# Merge racingquali_race_comparison_schedule
# Upsert the staged rows, matching on the composite key
# (race_name, round_number).
# - Matched rows: all non-key columns are overwritten from staging EXCEPT
#   _ingested_at, which is absent from the SET list and so keeps the value
#   written when the row was first inserted.
# - Unmatched rows: inserted in full, including _ingested_at.
con.execute("""
MERGE INTO racingquali_race_comparison_schedule AS tgt
USING staging_racingquali_race_comparison AS src
ON tgt.race_name = src.race_name AND tgt.round_number = src.round_number
WHEN MATCHED THEN UPDATE SET
race_type = src.race_type,
drivers = src.drivers,
quali_position = src.quali_position,
race_position = src.race_position,
ClassifiedPosition = src.ClassifiedPosition,
positions_gained = src.positions_gained,
is_DNF = src.is_DNF,
_updated_at = src._updated_at,
_batch_id = src._batch_id,
_source = src._source
WHEN NOT MATCHED THEN INSERT (
race_name, round_number, race_type, drivers, quali_position, race_position, ClassifiedPosition, positions_gained, is_DNF,
_ingested_at, _updated_at, _batch_id, _source
)
VALUES (
src.race_name, src.round_number, src.race_type, src.drivers, src.quali_position, src.race_position, src.ClassifiedPosition, src.positions_gained, src.is_DNF,
src._ingested_at, src._updated_at, src._batch_id, src._source
);
""")
# Unregister the staging names now that both merges have run, so they no
# longer resolve on this connection.
con.unregister("staging_racing_schedule")
con.unregister("staging_racingquali_race_comparison")