Updating DB with Deltas¶
In this tutorial we are going to demonstrate how to build a database from deltas received from an external source. We will build a database containing a table of all the raw deltas, and then create derived tables that show us the state of the data as it was at a particular date.
We are going to pretend that we receive a CSV file containing changes to a table. We are going to concatenate those deltas into a single table, then generate a derived table based on the "raw" deltas. This latter part we will do twice: once using Pandas and once using Athena.
import os
import pandas as pd
import awswrangler as wr
import datetime
import pydbtools as pydb
from scripts.create_dummy_deltas import get_dummy_deltas
Setup first¶
# setup your own testing area (set foldername = GH username)
foldername = "mratford" # GH username
foldername = foldername.lower().replace("-", "_")
region = "eu-west-1"
bucketname = "alpha-everyone"
db_name = f"aws_example_{foldername}"
db_base_path = f"s3://{bucketname}/{foldername}/database"
s3_base_path = f"s3://{bucketname}/{foldername}/"
# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)
# Delete the database if it exists
df_dbs = wr.catalog.databases(limit=1000)
if db_name in df_dbs["Database"].to_list():
    print(f"deleting database {db_name}")
    wr.catalog.delete_database(name=db_name)
Get the deltas¶
We are going to create deltas from the "data/employees.csv" table, using code from a script in this repo, scripts/create_dummy_deltas.py. What it does isn't important for this tutorial, but you can take a look if you want to see how the dummy deltas are generated.
deltas = get_dummy_deltas("data/employees.csv")
Day 1: the first extract of deltas from our source database
deltas["day1"]
| | employee_id | sex | forename | surname | department_id | manager_id | record_deleted |
---|---|---|---|---|---|---|---|
0 | 1 | M | Dexter | Mitchell | 1 | 17 | False |
1 | 2 | F | Summer | Bennett | 1 | 17 | False |
2 | 3 | M | Pip | Carter | 1 | 17 | False |
3 | 4 | F | Bella | Long | 1 | 17 | False |
4 | 5 | F | Lexie | Perry | <NA> | 17 | False |
Day 2: The next day's deltas show that Lexie has had their department_id and manager_id corrected, and that there are two new employees.
deltas["day2"]
| | employee_id | sex | forename | surname | department_id | manager_id | record_deleted |
---|---|---|---|---|---|---|---|
0 | 5 | F | Lexie | Perry | 2 | 18 | False |
1 | 6 | M | Robert | Roberts | 1 | 17 | False |
2 | 7 | F | Iris | Alexander | 1 | 17 | False |
Day 3: The next day's deltas show that:
- Dexter has left the department
- Robert and Iris have moved departments and are working for Lexie
- 3 new employees are also now working for Lexie
deltas["day3"]
| | employee_id | sex | forename | surname | department_id | manager_id | record_deleted |
---|---|---|---|---|---|---|---|
0 | 1 | M | Dexter | Mitchell | 1 | 17 | True |
1 | 7 | F | Iris | Alexander | 2 | 5 | False |
2 | 9 | M | Evan | Carter | 2 | 5 | False |
3 | 10 | F | Lauren | Powell | 2 | 5 | False |
4 | 11 | F | Alice | James | 2 | 5 | False |
Create a database and tables¶
There are many ways you can create a database and tables (see other tutorials). For this example I am going to do this in the simplest way - using awswrangler (which infers the table schema from the data).
If you want to explicitly specify the schema, look at the data_conformance_and_dbs tutorial.
# Init database and delta table
wr.catalog.create_database(name=db_name)
# Add a date_received column that will be useful for managing our deltas
df = deltas["day1"]
df["date_received"] = datetime.date(2021, 1, 1)
# We are going to name the folder the same as our table;
# this makes things less complex and is advised
table_name = "raw_deltas"
raw_delta_path = os.path.join(db_base_path, table_name)
_ = wr.s3.to_parquet(
    df,
    path=raw_delta_path,
    dataset=True,
    database=db_name,
    table=table_name,
    mode="append",
)
sql = f"SELECT * FROM {db_name}.{table_name}"
print(sql)
pydb.read_sql_query(sql, ctas_approach=False)
SELECT * FROM aws_example_mratford.raw_deltas
| | employee_id | sex | forename | surname | department_id | manager_id | record_deleted | date_received |
---|---|---|---|---|---|---|---|---|
0 | 1 | M | Dexter | Mitchell | 1 | 17 | False | 2021-01-01 |
1 | 2 | F | Summer | Bennett | 1 | 17 | False | 2021-01-01 |
2 | 3 | M | Pip | Carter | 1 | 17 | False | 2021-01-01 |
3 | 4 | F | Bella | Long | 1 | 17 | False | 2021-01-01 |
4 | 5 | F | Lexie | Perry | <NA> | 17 | False | 2021-01-01 |
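As an aside, the write above lets awswrangler infer the table schema from the dataframe. If you would rather pin the Athena column types yourself, wr.s3.to_parquet also accepts a dtype mapping. Below is a minimal sketch of how the same write could look with explicit types; the type choices are my assumptions for this dummy data, and it isn't meant to be run on top of the write above (it would append the same rows again).

# Sketch only: the same write as above, but with the Athena types pinned
# explicitly via `dtype` rather than inferred from the dataframe.
# The type choices here are assumptions for this dummy data.
_ = wr.s3.to_parquet(
    df,
    path=raw_delta_path,
    dataset=True,
    database=db_name,
    table=table_name,
    mode="append",
    dtype={
        "employee_id": "bigint",
        "sex": "string",
        "forename": "string",
        "surname": "string",
        "department_id": "bigint",
        "manager_id": "bigint",
        "record_deleted": "boolean",
        "date_received": "date",
    },
)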
Take stock¶
We have now created our database and initialised the raw_deltas table within it.
Now we are going to create two tables (both containing the same data). Each will show what the employees data looks like as of each day we apply an update; the first table will be created using pandas, the other using Athena.
We are also going to wrap these code chunks into functions, which we will reuse later to show how you can run a delta update and then refresh the downstream tables.
Pandas derived table¶
We are going to read in all the data from our S3 path, derive a new dataframe from it, and then write that to a new table.
def create_report_pandas(report_date):
    # Read in data with pandas
    df = wr.s3.read_parquet(raw_delta_path)

    # Turn input into a date
    d = datetime.datetime.strptime(report_date, "%Y-%m-%d").date()

    # Keep only records received on or before the report date
    df = df[df.date_received <= d].reset_index(drop=True)

    # Get the latest record per employee_id
    latest_df = (
        df.sort_values(
            ["employee_id", "date_received"], ascending=[True, False]
        )
        .groupby("employee_id")
        .head(1)
    )

    # Remove deleted records
    latest_df = latest_df[~latest_df["record_deleted"]].reset_index(drop=True)
    latest_df["report_date"] = d

    # Remove the date_received and record_deleted cols
    latest_df = latest_df.drop(columns=["date_received", "record_deleted"])

    # Write dataframe to new table partitioned by report_date
    table_name = "employee_pandas"
    table_path = os.path.join(db_base_path, table_name)

    # Write the data to S3 but only overwrite partitions
    _ = wr.s3.to_parquet(
        latest_df,
        path=table_path,
        dataset=True,
        database=db_name,
        table=table_name,
        partition_cols=["report_date"],
        mode="overwrite_partitions",
    )
# Run code to create report for 2021-01-01 data
create_report_pandas("2021-01-01")
We have created a report based on all the deltas up to and including 2021-01-01 (which at the moment is only one delta).
sql = f"SELECT * FROM {db_name}.employee_pandas"
print(sql)
pydb.read_sql_query(sql, ctas_approach=False)
SELECT * FROM aws_example_mratford.employee_pandas
| | employee_id | sex | forename | surname | department_id | manager_id | report_date |
---|---|---|---|---|---|---|---|
0 | 1 | M | Dexter | Mitchell | 1 | 17 | 2021-01-01 |
1 | 2 | F | Summer | Bennett | 1 | 17 | 2021-01-01 |
2 | 3 | M | Pip | Carter | 1 | 17 | 2021-01-01 |
3 | 4 | F | Bella | Long | 1 | 17 | 2021-01-01 |
4 | 5 | F | Lexie | Perry | <NA> | 17 | 2021-01-01 |
Athena derived table¶
We are going to do the same thing we did with pandas, but this time using Athena. You cannot delete partitions with Athena, because Athena just queries your data in S3 using SQL; it doesn't actually alter anything that already exists in S3. However, we can predetermine where our tables will sit, so we can simply delete the relevant S3 path before writing a partition there each time.
def create_report_athena(report_date, ctas):
    table_path = os.path.join(db_base_path, "employee_athena")

    # Clear out the partition we are going to write to
    s3_partition_path = os.path.join(table_path, f"report_date={report_date}")
    wr.s3.delete_objects(s3_partition_path)

    # Actual logic in SQL to create the report
    sql = f"""
    SELECT employee_id,
        sex,
        forename,
        surname,
        department_id,
        manager_id,
        date '{report_date}' AS report_date
    FROM
    (
        SELECT *,
            row_number() OVER (PARTITION BY employee_id ORDER BY date_received DESC) AS rn
        FROM {db_name}.raw_deltas
        WHERE date_received <= date '{report_date}'
    )
    WHERE NOT record_deleted AND rn = 1
    """

    # If ctas is True then create the table for the first time,
    # otherwise use an INSERT INTO query; either way the SQL above
    # is embedded in the full query below
    if ctas:
        # Creating table for the first time
        full_sql = f"""
        CREATE TABLE {db_name}.employee_athena
        WITH (
            external_location = '{table_path}',
            partitioned_by = ARRAY['report_date']
        ) AS
        {sql}
        """
    else:
        full_sql = f"""
        INSERT INTO {db_name}.employee_athena
        {sql}
        """

    # Run the query and wait for it to finish
    pydb.start_query_execution_and_wait(full_sql)
# Run code to create report for 2021-01-01 data
create_report_athena("2021-01-01", ctas=True)
sql = f"SELECT * FROM {db_name}.employee_athena"
print(sql)
pydb.read_sql_query(sql, ctas_approach=False)
SELECT * FROM aws_example_mratford.employee_athena
| | employee_id | sex | forename | surname | department_id | manager_id | report_date |
---|---|---|---|---|---|---|---|
0 | 1 | M | Dexter | Mitchell | 1 | 17 | 2021-01-01 |
1 | 4 | F | Bella | Long | 1 | 17 | 2021-01-01 |
2 | 3 | M | Pip | Carter | 1 | 17 | 2021-01-01 |
3 | 2 | F | Summer | Bennett | 1 | 17 | 2021-01-01 |
4 | 5 | F | Lexie | Perry | <NA> | 17 | 2021-01-01 |
Final bit¶
Now we have 3 tables:

- raw_deltas: a table of all the raw data concatenated
- employee_athena: a report showing what the employees table looked like at a given report_date (remember, in this example the raw_deltas come from an external employees table for which we are given daily deltas of changes)
- employee_pandas: the same report as employee_athena, but created using pandas instead of Athena
Now we want to update each of these tables with the data from day 2, then do it again with day 3's data. Let's do that now, starting with day 2.
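Before walking through it step by step, here is a rough sketch of how you might wrap a whole daily update into a single helper that reuses the functions defined above. The apply_daily_update name is hypothetical and not used elsewhere in this tutorial.

# Hypothetical helper: apply one day's delta and refresh both report tables.
# Assumes raw_delta_path, db_name, create_report_pandas and
# create_report_athena are already defined as above.
def apply_daily_update(delta_df, received_date):
    # 1. Append the new delta to the raw_deltas table
    delta_df["date_received"] = received_date
    _ = wr.s3.to_parquet(
        delta_df,
        path=raw_delta_path,
        dataset=True,
        database=db_name,
        table="raw_deltas",
        mode="append",
    )
    # 2. Rebuild the report tables for that date
    report_date = received_date.isoformat()
    create_report_pandas(report_date)
    create_report_athena(report_date, ctas=False)

# e.g. apply_daily_update(deltas["day2"], datetime.date(2021, 1, 2))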
Day 2¶
Add day2 data to the deltas table
df = deltas["day2"]
df["date_received"] = datetime.date(2021, 1, 2)
_ = wr.s3.to_parquet(
    df,
    path=raw_delta_path,
    dataset=True,
    database=db_name,
    table=table_name,
    mode="append",
)
Then run the reports for the new date (now that the deltas table has been updated).
create_report_pandas("2021-01-02")
create_report_athena("2021-01-02", ctas=False) # note we use insert to now
sql = f"""
SELECT *
FROM {db_name}.employee_athena
WHERE report_date = date '2021-01-02'
"""
print(sql)
pydb.read_sql_query(sql, ctas_approach=False)
SELECT * FROM aws_example_mratford.employee_athena WHERE report_date = date '2021-01-02'
| | employee_id | sex | forename | surname | department_id | manager_id | report_date |
---|---|---|---|---|---|---|---|
0 | 5 | F | Lexie | Perry | 2 | 18 | 2021-01-02 |
1 | 7 | F | Iris | Alexander | 1 | 17 | 2021-01-02 |
2 | 2 | F | Summer | Bennett | 1 | 17 | 2021-01-02 |
3 | 4 | F | Bella | Long | 1 | 17 | 2021-01-02 |
4 | 6 | M | Robert | Roberts | 1 | 17 | 2021-01-02 |
5 | 1 | M | Dexter | Mitchell | 1 | 17 | 2021-01-02 |
6 | 3 | M | Pip | Carter | 1 | 17 | 2021-01-02 |
As we can see, new employees have been added and Lexie's department and manager records have been updated as expected.
It is also worth noting that the previous reports have been left untouched (using the pandas table as an example):
sql = f"""
SELECT *
FROM {db_name}.employee_pandas
"""
print(sql)
pydb.read_sql_query(sql, ctas_approach=False)
SELECT * FROM aws_example_mratford.employee_pandas
| | employee_id | sex | forename | surname | department_id | manager_id | report_date |
---|---|---|---|---|---|---|---|
0 | 1 | M | Dexter | Mitchell | 1 | 17 | 2021-01-02 |
1 | 2 | F | Summer | Bennett | 1 | 17 | 2021-01-02 |
2 | 3 | M | Pip | Carter | 1 | 17 | 2021-01-02 |
3 | 4 | F | Bella | Long | 1 | 17 | 2021-01-02 |
4 | 5 | F | Lexie | Perry | 2 | 18 | 2021-01-02 |
5 | 6 | M | Robert | Roberts | 1 | 17 | 2021-01-02 |
6 | 7 | F | Iris | Alexander | 1 | 17 | 2021-01-02 |
7 | 1 | M | Dexter | Mitchell | 1 | 17 | 2021-01-01 |
8 | 2 | F | Summer | Bennett | 1 | 17 | 2021-01-01 |
9 | 3 | M | Pip | Carter | 1 | 17 | 2021-01-01 |
10 | 4 | F | Bella | Long | 1 | 17 | 2021-01-01 |
11 | 5 | F | Lexie | Perry | <NA> | 17 | 2021-01-01 |
Day 3¶
Let's run the same again for day 3. The code is exactly the same as it was for day 2, but now with a new date.
# update raw deltas first
df = deltas["day3"]
df["date_received"] = datetime.date(2021, 1, 3)
_ = wr.s3.to_parquet(
    df,
    path=raw_delta_path,
    dataset=True,
    database=db_name,
    table=table_name,
    mode="append",
)
# Then run reports
create_report_pandas("2021-01-03")
create_report_athena("2021-01-03", ctas=False)
sql = f"""
SELECT *
FROM {db_name}.employee_pandas
WHERE report_date = date '2021-01-03'
"""
print(sql)
pydb.read_sql_query(sql, ctas_approach=False)
SELECT * FROM aws_example_mratford.employee_pandas WHERE report_date = date '2021-01-03'
| | employee_id | sex | forename | surname | department_id | manager_id | report_date |
---|---|---|---|---|---|---|---|
0 | 2 | F | Summer | Bennett | 1 | 17 | 2021-01-03 |
1 | 3 | M | Pip | Carter | 1 | 17 | 2021-01-03 |
2 | 4 | F | Bella | Long | 1 | 17 | 2021-01-03 |
3 | 5 | F | Lexie | Perry | 2 | 18 | 2021-01-03 |
4 | 6 | M | Robert | Roberts | 1 | 17 | 2021-01-03 |
5 | 7 | F | Iris | Alexander | 2 | 5 | 2021-01-03 |
6 | 9 | M | Evan | Carter | 2 | 5 | 2021-01-03 |
7 | 10 | F | Lauren | Powell | 2 | 5 | 2021-01-03 |
8 | 11 | F | Alice | James | 2 | 5 | 2021-01-03 |
From the above we can see that Dexter has been removed from the report (as he left) and new staff have been added, again as expected given our original deltas.
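As a quick sanity check across all of the snapshots, you could count rows per report_date. This query is an extra illustration and isn't part of the tutorial's outputs above.

# Illustrative sanity check: headcount per report_date snapshot
sql = f"""
SELECT report_date, count(*) AS n_employees
FROM {db_name}.employee_athena
GROUP BY report_date
ORDER BY report_date
"""
pydb.read_sql_query(sql, ctas_approach=False)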
Wrapping Up¶
Hopefully that is useful. Some further notes and thoughts:
Using pandas vs Athena for reports¶
Use whichever you feel is most comfortable for your team; they each have pros and cons.
Partitioning¶
Normally you want to partition your data to reduce the amount each query has to scan; with the report above, the SQL query will only look in a specific S3 path when you filter the table on a partition column. However, partitioning small chunks of data (like we have above) can actually reduce the performance of your Athena queries. The upside is that it makes it easier to know where your data sits in S3 (i.e. look at how we predetermine the S3 path for a table partition in the create_report_athena function).
The above example didn't actually need partitioning; you could just append the data each time (like what is done for the raw_deltas table). The questions to ask are how much you think it will affect performance, and how difficult it will be to return the data to a previous state if you make a mistake. For example, I can delete a partition very easily and know I am not going to delete any other data, but that is less clear when appending data to the same folder. On the flip side, it is trivial to delete everything in AWS for this tutorial and run it from scratch, as the data and transforms are minimal, so rolling back to a previous state wouldn't be hard either way. Ultimately the decision is up to you.
Clean up¶
# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)
# Delete the database if it exists
df_dbs = wr.catalog.databases(limit=1000)
if db_name in df_dbs["Database"].to_list():
    print("Deleting database")
    wr.catalog.delete_database(name=db_name)
deleting objs
Deleting database