Creating and Maintaining Database Tables in Athena from SQL files¶
In this tutorial we are going to use Athena SQL queries (via pydbtools) to create a new database from an existing database in Athena.
First we need to create a database of tables to act as our existing (source) database. We will then create a new database that holds tables derived from the source.
Our source database will hold the test data employees.csv, sales.csv and department.csv (all in the data/ folder).
Setup¶
Just run this script to create the source database so we can use it for our example.
import os
import pandas as pd
import awswrangler as wr
import pydbtools as pydb
# setup your own testing area (set foldername = GH username)
foldername = "mratford" # GH username
foldername = foldername.lower().replace("-", "_")
bucketname = "alpha-everyone"
s3_base_path = f"s3://{bucketname}/{foldername}/"
source_db_name = f"source_db_{foldername}"
source_db_base_path = f"s3://{bucketname}/{foldername}/source_db/"
pydb.delete_database_and_data(source_db_name)
# Setup source database
# Create the database
pydb.create_database(source_db_name)
# Iterate through the tables in data/ and write them to our db using file_to_table
for table_name in ["department", "employees", "sales"]:
    table_path = pydb.s3_path_join(source_db_base_path, f"{table_name}/")
    pydb.file_to_table(
        path=f"data/{table_name}.csv",
        database=source_db_name,
        table=table_name,
        location=table_path,
    )
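The `foldername` normalisation in the script matters because Athena database and table names are expected to contain only lowercase letters, digits and underscores. A minimal sketch of a reusable version, assuming a hypothetical helper name `to_db_safe_name` (it is not part of pydbtools) and an extra character filter that goes beyond what the script does:

```python
import re


def to_db_safe_name(username: str) -> str:
    """Turn a username into something usable inside an Athena database name.

    Hypothetical helper for illustration; it is not part of pydbtools.
    """
    # Same two steps as the setup script: lower-case, then hyphens to underscores.
    name = username.lower().replace("-", "_")
    # Extra guard (an assumption beyond the original script): replace anything
    # that is not a lowercase letter, digit or underscore.
    return re.sub(r"[^a-z0-9_]", "_", name)


print(to_db_safe_name("My-GH-User"))  # my_gh_user
```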
Now for the actual tutorial¶
We are going to run all of our queries using SQL. You may have noticed that the setup above used Python helper functions (from pydbtools) to create the source database, which is fine to do. However, this part of the tutorial is entirely SQL, so you can run it from anything that can send queries to Athena (e.g. R or the Athena workbench).
Step 1. Create the new database¶
We are going to create a new database which will generate derived tables from our source database. For ease we are going to create the database location in the same parent folder as our source database. However, in reality you probably want to create your own bucket for the database and tables to sit in so that you can control who has access to your database.
Note: We use a lot of f-strings here to parameterise our SQL queries, so for clarity we print the resulting SQL so you can see exactly what is being run on Athena.
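Because f-strings paste values straight into the SQL text, a mistyped or unexpected name goes into the query unchecked. A minimal sketch of a guard, assuming hypothetical helpers `check_identifier` and `create_database_sql` (neither is part of pydbtools) and a placeholder bucket path:

```python
import re


def check_identifier(name: str) -> str:
    """Return name unchanged if it looks like a plain SQL identifier, else raise."""
    # fullmatch ensures the whole string is lowercase letters, digits or underscores.
    if not re.fullmatch(r"[a-z_][a-z0-9_]*", name):
        raise ValueError(f"unsafe identifier for SQL interpolation: {name!r}")
    return name


def create_database_sql(db_name: str, location: str) -> str:
    """Build a CREATE DATABASE statement like the one used in this step.

    Only the database name is checked; the location is interpolated as given.
    """
    return (
        f"CREATE DATABASE IF NOT EXISTS {check_identifier(db_name)}\n"
        f"LOCATION '{location}';"
    )


# Placeholder names, printed so the final SQL can be inspected before running it.
print(create_database_sql("new_db_example", "s3://example-bucket/example_user/new_db/"))
```

Printing the built statement before sending it keeps the same habit as the rest of the tutorial: you always see the final SQL.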
new_db_name = f"new_db_{foldername}"
new_db_base_path = f"s3://{bucketname}/{foldername}/new_db/"
sql = f"""
CREATE DATABASE IF NOT EXISTS {new_db_name}
COMMENT 'example or running queries and insert to'
LOCATION '{new_db_base_path}';
"""
Step 2. Run a CTAS query to create your new derived table in your new database¶
We use a CTAS (CREATE TABLE AS SELECT) query because it both writes the output data to S3 and creates the schema of the new table.
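The CTAS output is written to the S3 location passed as `external_location`, which this tutorial keeps in the form `s3://<bucket>/<path to database folder>/<table_name>/`. A small sketch of a helper for building that location follows. It uses `posixpath.join` so the separator is always `/` (on Windows, `os.path.join` would insert backslashes); `table_location` is a hypothetical name and the bucket and folder are placeholders.

```python
import posixpath


def table_location(db_base_path: str, table_name: str) -> str:
    """Return '<db_base_path>/<table_name>/' with exactly one trailing slash."""
    # posixpath always joins with "/", which is what S3 URIs need.
    return posixpath.join(db_base_path, table_name.strip("/")) + "/"


print(table_location("s3://example-bucket/example_user/new_db/", "sales_report"))
# s3://example-bucket/example_user/new_db/sales_report/
```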
# Note our table s3 path is saved in the following format:
# s3://<bucket>/<path to database folder>/<table_name>/
# You don't have to do this but it is strongly recommended to make it easier
# to map your schemas to your data.
sales_report_s3_path = os.path.join(new_db_base_path, "sales_report/")
sql += f"""
CREATE TABLE {new_db_name}.sales_report WITH
(
external_location='{sales_report_s3_path}'
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM {source_db_name}.sales
WHERE qtr IN (1,2)
GROUP BY qtr;
"""
Step 3. Use an INSERT INTO query to add new data into that table¶
We use an INSERT INTO query here because the schema was already created by the previous CTAS query.
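The CTAS-then-INSERT pattern can be tried locally before running it on Athena. The sketch below uses an in-memory SQLite database purely as a stand-in: SQLite has no `external_location`, and the sales figures are made up for illustration rather than taken from sales.csv.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE sales (qtr INTEGER, sales REAL)")
# Made-up rows purely for illustration.
con.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [(1, 10.0), (1, 5.0), (2, 7.0), (3, 1.0), (4, 2.0), (4, 3.0)],
)

# CTAS: creates the table schema and its first rows in one statement.
con.execute(
    """
    CREATE TABLE sales_report AS
    SELECT qtr AS sales_quarter, sum(sales) AS total_sales
    FROM sales
    WHERE qtr IN (1, 2)
    GROUP BY qtr
    """
)

# INSERT INTO: the schema already exists, so only new rows are appended.
con.execute(
    """
    INSERT INTO sales_report
    SELECT qtr AS sales_quarter, sum(sales) AS total_sales
    FROM sales
    WHERE qtr IN (3, 4)
    GROUP BY qtr
    """
)

rows = con.execute(
    "SELECT sales_quarter, total_sales FROM sales_report ORDER BY sales_quarter"
).fetchall()
print(rows)  # [(1, 15.0), (2, 7.0), (3, 1.0), (4, 5.0)]
```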
# Let's say we want to add more data into our table
sql += f"""
INSERT INTO {new_db_name}.sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM {source_db_name}.sales
WHERE qtr IN (3,4)
GROUP BY qtr;
"""
Add a SELECT statement to read back the quarterly sales report.
sql += f"""
SELECT * FROM {new_db_name}.sales_report;
"""
Step 4. Create another new table and insert new data into it, this time using partitions¶
We are going to do the same again, but this time partition the data and write new data into a new partition. Let's partition the data based on something like the date the report was run.
Note: partition columns should always be the last columns in your table.
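Since partition columns need to come last in the SELECT list, one way to avoid ordering mistakes is to build the column order programmatically. A minimal sketch, with `order_for_partitioning` as a hypothetical helper name:

```python
def order_for_partitioning(columns, partition_columns):
    """Return columns reordered so that partition columns come last.

    Non-partition columns keep their relative order; partition columns follow
    in the order given in partition_columns.
    """
    missing = [p for p in partition_columns if p not in columns]
    if missing:
        raise ValueError(f"partition columns not in table: {missing}")
    regular = [c for c in columns if c not in partition_columns]
    return regular + list(partition_columns)


print(order_for_partitioning(
    ["report_date", "sales_quarter", "total_sales"], ["report_date"]
))
# ['sales_quarter', 'total_sales', 'report_date']
```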
sales_report_s3_path = os.path.join(new_db_base_path, "daily_sales_report/")
sql += f"""
CREATE TABLE {new_db_name}.daily_sales_report WITH
(
external_location='{sales_report_s3_path}',
partitioned_by = ARRAY['report_date']
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-01' AS report_date
FROM {source_db_name}.sales
GROUP BY qtr, date '2021-01-01';
"""
# Then assume we run the report again the next day, pretending our source database is updated daily
sales_report_s3_path = os.path.join(new_db_base_path, "daily_sales_report/")
sql += f"""
INSERT INTO {new_db_name}.daily_sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-02' AS report_date
FROM {source_db_name}.sales
GROUP BY qtr, date '2021-01-02';
"""
Let's look at the entire SQL statement.
print(sql)
CREATE DATABASE IF NOT EXISTS new_db_mratford
COMMENT 'example or running queries and insert to'
LOCATION 's3://alpha-everyone/mratford/new_db/';

CREATE TABLE new_db_mratford.sales_report WITH
(
external_location='s3://alpha-everyone/mratford/new_db/sales_report/'
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM source_db_mratford.sales
WHERE qtr IN (1,2)
GROUP BY qtr;

INSERT INTO new_db_mratford.sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM source_db_mratford.sales
WHERE qtr IN (3,4)
GROUP BY qtr;

SELECT * FROM new_db_mratford.sales_report;

CREATE TABLE new_db_mratford.daily_sales_report WITH
(
external_location='s3://alpha-everyone/mratford/new_db/daily_sales_report/',
partitioned_by = ARRAY['report_date']
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-01' AS report_date
FROM source_db_mratford.sales
GROUP BY qtr, date '2021-01-01';

INSERT INTO new_db_mratford.daily_sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-02' AS report_date
FROM source_db_mratford.sales
GROUP BY qtr, date '2021-01-02';
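The printed string holds several statements separated by semicolons. As a rough illustration of that structure (not a description of how pydbtools parses SQL), a naive splitter can list the individual statements. It assumes no semicolons appear inside string literals or comments; `split_statements` is a hypothetical helper and the SQL is a shortened placeholder.

```python
def split_statements(sql: str) -> list:
    """Naively split a SQL script on semicolons and drop empty pieces."""
    return [part.strip() for part in sql.split(";") if part.strip()]


example_sql = """
CREATE DATABASE IF NOT EXISTS new_db_example;
CREATE TABLE new_db_example.sales_report AS SELECT 1 AS sales_quarter;
SELECT * FROM new_db_example.sales_report;
"""

# Print each statement's position and leading keyword.
for number, statement in enumerate(split_statements(example_sql), start=1):
    print(number, statement.split()[0])
```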
sales_report = pydb.read_sql_queries(sql)
sales_report
| | sales_quarter | total_sales |
|---|---|---|
| 0 | 3 | 26419.31 |
| 1 | 4 | 27558.68 |
| 2 | 2 | 30696.60 |
| 3 | 1 | 28167.78 |
Use read_sql_queries_gen, which returns an iterator of pandas dataframes, when the SQL contains more than one SELECT query.
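Because the result is an ordinary Python iterator, it can be consumed with `next()` or with a `for` loop. The sketch below uses a stand-in generator (`fake_query_results`, hypothetical, yielding small dictionaries rather than dataframes) so that it runs without Athena; with pydbtools the loop would iterate over the iterator returned by `read_sql_queries_gen` instead.

```python
def fake_query_results():
    """Stand-in for an iterator that yields one result per SELECT statement."""
    yield {"query": "sales_report", "rows": 4}
    yield {"query": "daily_sales_report", "rows": 8}


# A for loop walks through every result in order.
seen = []
for result in fake_query_results():
    seen.append(result["query"])
print(seen)  # ['sales_report', 'daily_sales_report']

# next() pulls one result at a time; a default avoids StopIteration at the end.
results = fake_query_results()
first = next(results)
second = next(results)
print(next(results, None))  # None
```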
sql += f"""
SELECT * FROM {new_db_name}.daily_sales_report;
"""
print(sql)
CREATE DATABASE IF NOT EXISTS new_db_mratford
COMMENT 'example or running queries and insert to'
LOCATION 's3://alpha-everyone/mratford/new_db/';

CREATE TABLE new_db_mratford.sales_report WITH
(
external_location='s3://alpha-everyone/mratford/new_db/sales_report/'
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM source_db_mratford.sales
WHERE qtr IN (1,2)
GROUP BY qtr;

INSERT INTO new_db_mratford.sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM source_db_mratford.sales
WHERE qtr IN (3,4)
GROUP BY qtr;

SELECT * FROM new_db_mratford.sales_report;

CREATE TABLE new_db_mratford.daily_sales_report WITH
(
external_location='s3://alpha-everyone/mratford/new_db/daily_sales_report/',
partitioned_by = ARRAY['report_date']
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-01' AS report_date
FROM source_db_mratford.sales
GROUP BY qtr, date '2021-01-01';

INSERT INTO new_db_mratford.daily_sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-02' AS report_date
FROM source_db_mratford.sales
GROUP BY qtr, date '2021-01-02';

SELECT * FROM new_db_mratford.daily_sales_report;
# Delete the database created by the earlier run so the CREATE TABLE statements can run again
pydb.delete_database_and_data(new_db_name)
df_g = pydb.read_sql_queries_gen(sql)
sales_report = next(df_g)
sales_report
| | sales_quarter | total_sales |
|---|---|---|
| 0 | 3 | 26419.31 |
| 1 | 4 | 27558.68 |
| 2 | 2 | 30696.60 |
| 3 | 1 | 28167.78 |
daily_sales_report = next(df_g)
daily_sales_report
| | sales_quarter | total_sales | report_date |
|---|---|---|---|
| 0 | 1 | 28167.78 | 2021-01-02 |
| 1 | 3 | 26419.31 | 2021-01-02 |
| 2 | 4 | 27558.68 | 2021-01-02 |
| 3 | 2 | 30696.60 | 2021-01-02 |
| 4 | 2 | 30696.60 | 2021-01-01 |
| 5 | 1 | 28167.78 | 2021-01-01 |
| 6 | 3 | 26419.31 | 2021-01-01 |
| 7 | 4 | 27558.68 | 2021-01-01 |
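Each distinct report_date in this result corresponds to one partition. With `partitioned_by`, Athena is expected to write each partition under a Hive-style `column=value/` prefix inside the table's location. The sketch below only builds those expected prefixes as strings and does not touch S3; `partition_prefix` is a hypothetical helper, and the bucket and folder are placeholders.

```python
def partition_prefix(table_location: str, **partition_values) -> str:
    """Build the Hive-style prefix for one partition of a table."""
    prefix = table_location.rstrip("/") + "/"
    # One "column=value/" segment per partition column, in the order given.
    for column, value in partition_values.items():
        prefix += f"{column}={value}/"
    return prefix


base = "s3://example-bucket/example_user/new_db/daily_sales_report/"
for day in ["2021-01-01", "2021-01-02"]:
    print(partition_prefix(base, report_date=day))
# s3://example-bucket/example_user/new_db/daily_sales_report/report_date=2021-01-01/
# s3://example-bucket/example_user/new_db/daily_sales_report/report_date=2021-01-02/
```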
# Clean up
pydb.delete_database_and_data(new_db_name)
True