Creating and Maintaining Database Tables in Athena¶
In this tutorial we are going to use Athena SQL queries (via pydbtools) to create a new database derived from an existing database in Athena.
First we need to create a database of tables to act as our existing (source) database. We will then create a new database that holds tables derived from the original.
Our source database will have the test data employees.csv, sales.csv and department.csv (all in the data/ folder).
Setup¶
Just run this script to create the source database so we can use it for our example.
import os
import pandas as pd
import awswrangler as wr
import pydbtools as pydb

# setup your own testing area (set foldername = GH username)
foldername = "mratford"  # GH username
foldername = foldername.lower().replace("-", "_")

bucketname = "alpha-everyone"
s3_base_path = f"s3://{bucketname}/{foldername}/"

source_db_name = f"source_db_{foldername}"
source_db_base_path = f"s3://{bucketname}/{foldername}/source_db/"

# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

# Delete the database if it exists
df_dbs = wr.catalog.databases(None)
if source_db_name in df_dbs["Database"].to_list():
    print(f"{source_db_name} found deleting")
    wr.catalog.delete_database(name=source_db_name)

# Setup source database
# Create the database
wr.catalog.create_database(source_db_name)

# Iterate through the tables in data/ and write them to our db using awswrangler
for table_name in ["department", "employees", "sales"]:
    df = pd.read_csv(f"data/{table_name}.csv")
    table_path = os.path.join(source_db_base_path, f"{table_name}/")
    wr.s3.to_parquet(
        df=df,
        path=table_path,
        index=False,
        dataset=True,  # True allows the other params below i.e. writing to db.table
        database=source_db_name,
        table=table_name,
        mode="overwrite",
    )
deleting objs
source_db_mratford found deleting
Now for the actual tutorial¶
We are going to run all of our queries using SQL. You may have seen that I've used awswrangler to create the database above (which is fine to do). However, this part of the tutorial will be entirely in SQL, so you can run it from anything that can send queries to Athena (e.g. R, the Athena workbench, etc.).
Step 1. Create the new database¶
We are going to create a new database that will hold tables derived from our source database. For ease we are going to put the database location in the same parent folder as our source database. In reality, however, you will probably want to create your own bucket for the database and tables to sit in, so that you can control who has access to your database.
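For illustration only, a minimal sketch of what that could look like with a dedicated bucket; the bucket and database names below are made up for this example and the statement is not run as part of this tutorial.
# Hypothetical sketch: give the derived database its own bucket so access to it
# can be controlled independently of the source data.
dedicated_db_base_path = "s3://alpha-my-team-bucket/my_derived_db/"
sql = f"""
CREATE DATABASE IF NOT EXISTS my_derived_db
LOCATION '{dedicated_db_base_path}'
"""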
Note: we use a lot of f-strings here to parameterise our SQL queries, so for ease of understanding we print out each SQL query before running it so you can see exactly what is being run on Athena.
new_db_name = f"new_db_{foldername}"
new_db_base_path = f"s3://{bucketname}/{foldername}/new_db/"
sql = f"""
CREATE DATABASE IF NOT EXISTS {new_db_name}
COMMENT 'example of running queries and inserting into tables'
LOCATION '{new_db_base_path}'
"""
print(sql)
_ = pydb.start_query_execution_and_wait(sql)
CREATE DATABASE IF NOT EXISTS new_db_mratford
COMMENT 'example of running queries and inserting into tables'
LOCATION 's3://alpha-everyone/mratford/new_db/'
Step 2. Run a CTAS query to create your new derived table in your new database¶
We use a CTAS (CREATE TABLE AS SELECT) query as it both writes the output data to S3 and creates the schema of the new table.
# Note our table s3 path is saved in the following format:
# s3://<bucket>/<path to database folder>/<table_name>/
# You don't have to do this but it is strongly recommended to make it easier
# to map your schemas to your data.
sales_report_s3_path = os.path.join(new_db_base_path, "sales_report/")
sql = f"""
CREATE TABLE {new_db_name}.sales_report WITH
(
external_location='{sales_report_s3_path}'
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM {source_db_name}.sales
WHERE qtr IN (1,2)
GROUP BY qtr
"""
print(sql)
_ = pydb.start_query_execution_and_wait(sql)
CREATE TABLE new_db_mratford.sales_report WITH
(
external_location='s3://alpha-everyone/mratford/new_db/sales_report/'
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM source_db_mratford.sales
WHERE qtr IN (1,2)
GROUP BY qtr
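If you want to confirm that the CTAS query registered the table schema as well as writing the data, a quick sketch of a check (using awswrangler, which was already imported in the setup) is to list the tables the Glue catalogue now holds for the new database; sales_report should appear in the result.
# Optional check: list the tables registered against the new database.
wr.catalog.tables(database=new_db_name)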
Step 3. Use an INSERT INTO query to add new data to the table¶
We use an INSERT INTO query here as we already created the schema in the previous CTAS query.
# Let's say we want to add more data into our table
sql = f"""
INSERT INTO {new_db_name}.sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM {source_db_name}.sales
WHERE qtr IN (3,4)
GROUP BY qtr
"""
print(sql)
_ = pydb.start_query_execution_and_wait(sql)
INSERT INTO new_db_mratford.sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM source_db_mratford.sales
WHERE qtr IN (3,4)
GROUP BY qtr
# Now let's see what our sales_report table looks like
pydb.read_sql_query(
f"SELECT * FROM {new_db_name}.sales_report",
database=None,
ctas_approach=False,
)
|   | sales_quarter | total_sales |
|---|---|---|
| 0 | 1 | 28167.78 |
| 1 | 2 | 30696.60 |
| 2 | 3 | 26419.31 |
| 3 | 4 | 27558.68 |
Step 4. Create another new table and insert new data into it, this time using partitions¶
We are going to do the same but this time partition the data and write new data into a new partition. Let's partition the data based on something like when the report was run.
Note: columns that are partitions should always be the last columns in your table.
sales_report_s3_path = os.path.join(new_db_base_path, "daily_sales_report/")
sql = f"""
CREATE TABLE {new_db_name}.daily_sales_report WITH
(
external_location='{sales_report_s3_path}',
partitioned_by = ARRAY['report_date']
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-01' AS report_date
FROM {source_db_name}.sales
GROUP BY qtr, date '2021-01-01'
"""
print(sql)
_ = pydb.start_query_execution_and_wait(sql)
CREATE TABLE new_db_mratford.daily_sales_report WITH
(
external_location='s3://alpha-everyone/mratford/new_db/daily_sales_report/',
partitioned_by = ARRAY['report_date']
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-01' AS report_date
FROM source_db_mratford.sales
GROUP BY qtr, date '2021-01-01'
# Now assume we run the report the next day, pretending our source database is updated every day
sql = f"""
INSERT INTO {new_db_name}.daily_sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-02' AS report_date
FROM {source_db_name}.sales
GROUP BY qtr, date '2021-01-02'
"""
print(sql)
_ = pydb.start_query_execution_and_wait(sql)
INSERT INTO new_db_mratford.daily_sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-02' AS report_date
FROM source_db_mratford.sales
GROUP BY qtr, date '2021-01-02'
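At this point the table should have one partition per report date. As a quick sketch of a check (assuming the partitions were registered by the CTAS and INSERT queries above), you can list them from the Glue catalogue with awswrangler:
# Optional check: list the partitions registered for the new table;
# there should be one per report_date value.
wr.catalog.get_partitions(database=new_db_name, table="daily_sales_report")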
pydb.read_sql_query(
f"SELECT * FROM {new_db_name}.daily_sales_report",
database=None,
ctas_approach=False,
)
|   | sales_quarter | total_sales | report_date |
|---|---|---|---|
| 0 | 4 | 27558.68 | 2021-01-01 |
| 1 | 2 | 30696.60 | 2021-01-01 |
| 2 | 1 | 28167.78 | 2021-01-01 |
| 3 | 3 | 26419.31 | 2021-01-01 |
| 4 | 2 | 30696.60 | 2021-01-02 |
| 5 | 3 | 26419.31 | 2021-01-02 |
| 6 | 1 | 28167.78 | 2021-01-02 |
| 7 | 4 | 27558.68 | 2021-01-02 |
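Because report_date is a partition column, filtering on it means Athena only reads the files under that partition's prefix in S3. A minimal sketch of pulling back just the latest day's report:
# Read a single day's report; filtering on the partition column limits the
# scan to that partition's files.
pydb.read_sql_query(
    f"""
    SELECT sales_quarter, total_sales
    FROM {new_db_name}.daily_sales_report
    WHERE report_date = date '2021-01-02'
    """,
    database=None,
    ctas_approach=False,
)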
# Clean up

# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

# Delete the databases if they exist
df_dbs = wr.catalog.databases(None)
for db_name in [source_db_name, new_db_name]:
    if db_name in df_dbs["Database"].to_list():
        print(f"{db_name} found deleting")
        wr.catalog.delete_database(name=db_name)
deleting objs
source_db_mratford found deleting
new_db_mratford found deleting