Create Temporary Tables¶
This tutorial demonstrates how to create temporary tables in Athena using pydbtools.
Setup¶
Just run this script to create the source database so we can use it for our example.
In [1]:
Copied!
import os
import pandas as pd
import awswrangler as wr
import pydbtools as pydb
import os
import pandas as pd
import awswrangler as wr
import pydbtools as pydb
In [2]:
Copied!
# Set up your own testing area: use your GitHub username (lowercased,
# with "-" mapped to "_") so different users' runs don't collide.
foldername = "mratford".lower().replace("-", "_")  # GH username
# Set up your own testing area: use your GitHub username (lowercased,
# with "-" mapped to "_") so different users' runs don't collide.
foldername = "mratford".lower().replace("-", "_")  # GH username
In [3]:
Copied!
# Name the S3/Glue locations for this example, all derived from foldername.
bucketname = "alpha-everyone"
s3_base_path = f"s3://{bucketname}/{foldername}/"
db_name = f"aws_example_{foldername}"
source_db_base_path = f"s3://{bucketname}/{foldername}/source_db/"

# Start clean: remove any S3 objects left over from a previous run.
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

# Drop the Glue database if a previous run created it.
df_dbs = wr.catalog.databases(None)
if db_name in df_dbs["Database"].to_list():
    print(f"{db_name} found deleting")
    wr.catalog.delete_database(name=db_name)

# Recreate the source database, then load each CSV under data/ into it
# as a parquet-backed Glue table using awswrangler.
wr.catalog.create_database(db_name)
for table_name in ("department", "employees", "sales"):
    source_df = pd.read_csv(f"data/{table_name}.csv")
    wr.s3.to_parquet(
        df=source_df,
        path=os.path.join(source_db_base_path, table_name),
        index=False,
        dataset=True,  # dataset mode enables the database/table/mode options below
        database=db_name,
        table=table_name,
        mode="overwrite",
    )
# Name the S3/Glue locations for this example, all derived from foldername.
bucketname = "alpha-everyone"
s3_base_path = f"s3://{bucketname}/{foldername}/"
db_name = f"aws_example_{foldername}"
source_db_base_path = f"s3://{bucketname}/{foldername}/source_db/"

# Start clean: remove any S3 objects left over from a previous run.
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

# Drop the Glue database if a previous run created it.
df_dbs = wr.catalog.databases(None)
if db_name in df_dbs["Database"].to_list():
    print(f"{db_name} found deleting")
    wr.catalog.delete_database(name=db_name)

# Recreate the source database, then load each CSV under data/ into it
# as a parquet-backed Glue table using awswrangler.
wr.catalog.create_database(db_name)
for table_name in ("department", "employees", "sales"):
    source_df = pd.read_csv(f"data/{table_name}.csv")
    wr.s3.to_parquet(
        df=source_df,
        path=os.path.join(source_db_base_path, table_name),
        index=False,
        dataset=True,  # dataset mode enables the database/table/mode options below
        database=db_name,
        table=table_name,
        mode="overwrite",
    )
Task¶
We are going to create a table that shows total sales per employee using all 3 tables
In [4]:
Copied!
# Preview the first five rows of the employees source table.
employees_sql = f"SELECT * FROM {db_name}.employees LIMIT 5"
pydb.read_sql_query(employees_sql, ctas_approach=False)
# Preview the first five rows of the employees source table.
employees_sql = f"SELECT * FROM {db_name}.employees LIMIT 5"
pydb.read_sql_query(employees_sql, ctas_approach=False)
Out[4]:
employee_id | sex | forename | surname | department_id | manager_id | |
---|---|---|---|---|---|---|
0 | 1 | M | Dexter | Mitchell | 1.0 | 17.0 |
1 | 2 | F | Summer | Bennett | 1.0 | 17.0 |
2 | 3 | M | Pip | Carter | 1.0 | 17.0 |
3 | 4 | F | Bella | Long | 1.0 | 17.0 |
4 | 5 | F | Lexie | Perry | NaN | 17.0 |
In [5]:
Copied!
# Preview the first five rows of the department source table.
department_sql = f"SELECT * FROM {db_name}.department LIMIT 5"
pydb.read_sql_query(department_sql, ctas_approach=False)
# Preview the first five rows of the department source table.
department_sql = f"SELECT * FROM {db_name}.department LIMIT 5"
pydb.read_sql_query(department_sql, ctas_approach=False)
Out[5]:
department_id | department_name | |
---|---|---|
0 | 1 | Sales |
1 | 2 | Admin |
2 | 3 | Management |
3 | 4 | Technical |
4 | 5 | Maintenance |
In [6]:
Copied!
# Preview the first five rows of the sales source table.
sales_sql = f"SELECT * FROM {db_name}.sales LIMIT 5"
pydb.read_sql_query(sales_sql, ctas_approach=False)
# Preview the first five rows of the sales source table.
sales_sql = f"SELECT * FROM {db_name}.sales LIMIT 5"
pydb.read_sql_query(sales_sql, ctas_approach=False)
Out[6]:
employee_id | qtr | sales | |
---|---|---|---|
0 | 1 | 1 | 768.17 |
1 | 2 | 1 | 391.98 |
2 | 3 | 1 | 406.36 |
3 | 4 | 1 | 816.25 |
4 | 5 | 1 | 437.05 |
pydbtools has a create_temp_table function that lets you create tables which you can then refer to via the __temp__
database.
First create a total_sales table:
In [7]:
Copied!
# Aggregate quarterly figures into one total per employee and register the
# result as the temporary table __temp__.total_sales.
query = f"""
SELECT employee_id, sum(sales) as total_sales
FROM {db_name}.sales
GROUP BY employee_id
"""
print(query)
pydb.create_temp_table(query, table_name="total_sales")
# Aggregate quarterly figures into one total per employee and register the
# result as the temporary table __temp__.total_sales.
query = f"""
SELECT employee_id, sum(sales) as total_sales
FROM {db_name}.sales
GROUP BY employee_id
"""
print(query)
pydb.create_temp_table(query, table_name="total_sales")
SELECT employee_id, sum(sales) as total_sales FROM aws_example_mratford.sales GROUP BY employee_id
Then create a table of employee names from the sales department:
In [8]:
Copied!
# Join employees to their department names, keep only the Sales department
# (department_id = 1), and register the result as __temp__.sales_employees.
query = f"""
SELECT e.employee_id, e.forename, e.surname, d.department_name
FROM {db_name}.employees AS e
LEFT JOIN {db_name}.department AS d
ON e.department_id = d.department_id
WHERE e.department_id = 1
"""
print(query)
pydb.create_temp_table(query, table_name="sales_employees")
# Join employees to their department names, keep only the Sales department
# (department_id = 1), and register the result as __temp__.sales_employees.
query = f"""
SELECT e.employee_id, e.forename, e.surname, d.department_name
FROM {db_name}.employees AS e
LEFT JOIN {db_name}.department AS d
ON e.department_id = d.department_id
WHERE e.department_id = 1
"""
print(query)
pydb.create_temp_table(query, table_name="sales_employees")
SELECT e.employee_id, e.forename, e.surname, d.department_name FROM aws_example_mratford.employees AS e LEFT JOIN aws_example_mratford.department AS d ON e.department_id = d.department_id WHERE e.department_id = 1
Finally, join the two temporary tables to return our final table:
In [9]:
Copied!
# Combine the two temporary tables: each sales employee with their total sales.
query = """
SELECT se.*, ts.total_sales
FROM __temp__.sales_employees AS se
INNER JOIN __temp__.total_sales AS ts
ON se.employee_id = ts.employee_id
"""
print(query)
pydb.read_sql_query(query, ctas_approach=False).head(10)
# Combine the two temporary tables: each sales employee with their total sales.
query = """
SELECT se.*, ts.total_sales
FROM __temp__.sales_employees AS se
INNER JOIN __temp__.total_sales AS ts
ON se.employee_id = ts.employee_id
"""
print(query)
pydb.read_sql_query(query, ctas_approach=False).head(10)
SELECT se.*, ts.total_sales FROM __temp__.sales_employees AS se INNER JOIN __temp__.total_sales AS ts ON se.employee_id = ts.employee_id
Out[9]:
employee_id | forename | surname | department_name | total_sales | |
---|---|---|---|---|---|
0 | 1 | Dexter | Mitchell | Sales | 2911.65 |
1 | 2 | Summer | Bennett | Sales | 1785.73 |
2 | 3 | Pip | Carter | Sales | 2590.60 |
3 | 4 | Bella | Long | Sales | 2996.54 |
4 | 6 | Robert | Roberts | Sales | 2207.77 |
5 | 7 | Iris | Alexander | Sales | 2465.13 |
6 | 9 | Evan | Carter | Sales | 2279.84 |
7 | 10 | Lauren | Powell | Sales | 1935.67 |
8 | 11 | Alice | James | Sales | 3092.89 |
9 | 12 | Owen | Scott | Sales | 2286.28 |
Creating a temporary table from a dataframe¶
You can also use an existing dataframe as a table in the temporary database and run SQL queries on it.
In [10]:
Copied!
# Register a local dataframe as __temp__.sales, then query it with SQL.
sales_df = pd.read_csv("data/sales.csv")
pydb.dataframe_to_temp_table(sales_df, "sales")
qtr_totals_sql = "select qtr, sum(sales) as sales from __temp__.sales group by qtr"
pydb.read_sql_query(qtr_totals_sql)
# Register a local dataframe as __temp__.sales, then query it with SQL.
sales_df = pd.read_csv("data/sales.csv")
pydb.dataframe_to_temp_table(sales_df, "sales")
qtr_totals_sql = "select qtr, sum(sales) as sales from __temp__.sales group by qtr"
pydb.read_sql_query(qtr_totals_sql)
Out[10]:
qtr | sales | |
---|---|---|
0 | 4 | 27558.68 |
1 | 2 | 30696.60 |
2 | 1 | 28167.78 |
3 | 3 | 26419.31 |
In [11]:
Copied!
### Clean up
# Remove every S3 object written under the working area.
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)
# Drop the example Glue database if it still exists.
df_dbs = wr.catalog.databases(None)
if db_name in list(df_dbs["Database"]):
    print(f"{db_name} found deleting")
    wr.catalog.delete_database(name=db_name)
### Clean up
# Remove every S3 object written under the working area.
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)
# Drop the example Glue database if it still exists.
df_dbs = wr.catalog.databases(None)
if db_name in list(df_dbs["Database"]):
    print(f"{db_name} found deleting")
    wr.catalog.delete_database(name=db_name)
deleting objs aws_example_mratford found deleting
In [ ]:
Copied!