import os
import pandas as pd
import awswrangler as wr
import pydbtools as pydb
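Set up your testing area.
Important: substitute your own GitHub username below. The setup code deletes everything under `s3://alpha-everyone/<username>/` and drops the `aws_example_<username>` database, so never run it with someone else's username.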
# Set up your own testing area: set foldername to your GitHub username.
foldername = "mratford"  # GH username
# Normalise the name (lowercase, hyphens -> underscores) because it is embedded
# in db_name below (presumably the catalog rejects hyphens in database names —
# confirm).
foldername = foldername.lower().replace("-", "_")

bucketname = "alpha-everyone"
# Everything this example writes lives under s3://<bucket>/<foldername>/.
s3_base_path = f"s3://{bucketname}/{foldername}/"
source_db_base_path = f"{s3_base_path}source_db/"
db_name = f"aws_example_{foldername}"
# Delete all the S3 files under this user's base path so the example starts
# clean. NOTE: this removes *every* object under s3_base_path, not only the
# files created by this example.
existing_objs = wr.s3.list_objects(s3_base_path)
if existing_objs:
    print("deleting objs")
    # Pass the listing we already have rather than making delete_objects list
    # the same prefix a second time.
    wr.s3.delete_objects(existing_objs)
# Drop this user's example database if it is already in the Glue catalog.
# limit=None asks for every database rather than a truncated first page.
df_dbs = wr.catalog.databases(limit=None)
if (df_dbs["Database"] == db_name).any():
    print(f"{db_name} found deleting")
    wr.catalog.delete_database(name=db_name)
# Set up the source database: register it in the Glue catalog, then load the
# unpartitioned tables from the local data/ folder.
# exist_ok is left at its default here, unlike the later re-creation cell.
wr.catalog.create_database(db_name)

for table_name in ("department", "employees"):
    df = pd.read_csv(f"data/{table_name}.csv")
    # Build the S3 URI with plain formatting (source_db_base_path already ends
    # with "/"); os.path.join is meant for local filesystem paths.
    table_path = f"{source_db_base_path}{table_name}/"
    # dataset=True is what enables database/table/mode: the parquet files are
    # registered as db_name.<table_name>, overwriting any previous contents.
    wr.s3.to_parquet(
        df=df,
        path=table_path,
        index=False,
        dataset=True,
        database=db_name,
        table=table_name,
        mode="overwrite",
    )
# Write the sales table partitioned by employee_id and qtr. Only rows with
# employee_id < 5 are kept so the partition listings below stay legible.
df = pd.read_csv("data/sales.csv").query("employee_id < 5")
# Use the same "<base>/<table>/" form as the other tables: the trailing "/"
# makes the path an unambiguous folder prefix (a bare ".../sales" prefix would
# also match e.g. ".../sales_old/" when overwrite mode clears the path).
table_path = f"{source_db_base_path}sales/"
# Bind the result so a notebook cell does not echo it; presumably it describes
# the written file paths/partition values (see awswrangler docs).
partition_info = wr.s3.to_parquet(
    df=df,
    path=table_path,
    index=False,
    dataset=True,
    partition_cols=["employee_id", "qtr"],
    database=db_name,
    table="sales",
    mode="overwrite",
)
deleting objs aws_example_mratford found deleting
Deleting a table
Show the tables in the database.
# Fetch the catalog metadata for every table in the database; show their names.
table_info = list(wr.catalog.get_tables(database=db_name))
[tbl["Name"] for tbl in table_info]
['department', 'employees', 'sales']
Show the data for the `department` table.
# Pick out the department table's metadata and list the files at its S3 location.
dept_info = next(tbl for tbl in table_info if tbl["Name"] == "department")
dept_location = dept_info["StorageDescriptor"]["Location"]
wr.s3.list_objects(path=dept_location)
['s3://alpha-everyone/mratford/source_db/department/be5d0e3a417a4c7bbb0bf0ff5996bba4.snappy.parquet']
Now delete the `department` table.
# Remove the department table from the database together with its S3 data.
pydb.delete_table_and_data(table="department", database=db_name)
True
Check that it's no longer in the database.
# Re-read the catalog; department should no longer be listed.
table_info = list(wr.catalog.get_tables(database=db_name))
[tbl["Name"] for tbl in table_info]
['employees', 'sales']
Check that the data no longer exists.
# Expect an empty list now that the table's files have been deleted.
wr.s3.list_objects(path=dept_location)
[]
Deleting a partition
Show the partitions of the `sales` table.
# Mapping of partition S3 location -> partition values (employee_id, qtr).
wr.catalog.get_partitions(table="sales", database=db_name)
{'s3://alpha-everyone/mratford/source_db/sales/employee_id=3/qtr=2/': ['3', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=3/': ['1', '3'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=4/': ['2', '4'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=4/': ['1', '4'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=3/': ['2', '3'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=4/': ['4', '4'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=1/': ['2', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=2/': ['2', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=3/': ['4', '3'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=1/': ['4', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=2/': ['4', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=2/': ['1', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=1/': ['1', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=3/qtr=1/': ['3', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=3/qtr=4/': ['3', '4'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=3/qtr=3/': ['3', '3']}
Check the data for one of the partitions.
# Files stored in the employee_id=1 / qtr=4 partition.
wr.s3.list_objects(path=source_db_base_path + "sales/employee_id=1/qtr=4/")
['s3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=4/a59b17a35bbf47d1bb2def454fb3bbc6.snappy.parquet']
Use an SQL-like expression to delete the partitions (one per employee) and their data for quarter 4.
# Drop every partition with qtr = 4 (for all employees) along with its files.
pydb.delete_partitions_and_data(
    expression="qtr = 4", table="sales", database=db_name
)
Check that the partition no longer exists.
# The qtr=4 entries should be gone from the partition mapping.
wr.catalog.get_partitions(table="sales", database=db_name)
{'s3://alpha-everyone/mratford/source_db/sales/employee_id=3/qtr=2/': ['3', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=3/': ['1', '3'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=3/': ['2', '3'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=1/': ['2', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=2/': ['2', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=3/': ['4', '3'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=1/': ['4', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=2/': ['4', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=2/': ['1', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=1/': ['1', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=3/qtr=1/': ['3', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=3/qtr=3/': ['3', '3']}
Check that the data no longer exists.
# Expect an empty list: the partition's files were deleted too.
wr.s3.list_objects(path=source_db_base_path + "sales/employee_id=1/qtr=4/")
[]
Using a more complex expression, delete quarters 1 and 2 for employee 3.
# Combine conditions: drop only employee 3's partitions for quarters 1 and 2.
pydb.delete_partitions_and_data(
    expression="employee_id = 3 and qtr < 3", table="sales", database=db_name
)
# Show the partitions that remain.
wr.catalog.get_partitions(table="sales", database=db_name)
{'s3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=3/': ['1', '3'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=3/': ['2', '3'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=1/': ['2', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=2/qtr=2/': ['2', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=3/': ['4', '3'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=1/': ['4', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=4/qtr=2/': ['4', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=2/': ['1', '2'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=1/qtr=1/': ['1', '1'], 's3://alpha-everyone/mratford/source_db/sales/employee_id=3/qtr=3/': ['3', '3']}
See the `pydbtools` documentation for `delete_partitions_and_data` for details on the expression syntax.
Deleting a database
# Confirm the database exists before deleting it. limit=None lists every
# database, as the setup cell does; the default limit can truncate the list in
# an account with many databases and produce a false negative.
db_name in list(wr.catalog.databases(limit=None)["Database"])
True
# Drop the whole example database and delete its S3 data.
pydb.delete_database_and_data(db_name)
True
# Should now be False. limit=None lists every database so that a truncated
# listing cannot masquerade as a successful deletion.
db_name in list(wr.catalog.databases(limit=None)["Database"])
False
# Expect an empty list: the database's S3 data was removed as well.
wr.s3.list_objects(path=source_db_base_path)
[]
# Display the S3 prefix that was just checked.
source_db_base_path
's3://alpha-everyone/mratford/source_db/'
Deleting temporary database tables
During development it can be useful to get rid of the temporary database, or one of its tables, if something has gone wrong. To do this, pass `__temp__` as the database name to one of the functions above.
# Recreate the source database (exist_ok=True: don't fail if it already
# exists) and reload all three tables, this time with sales unpartitioned.
wr.catalog.create_database(db_name, exist_ok=True)

for table_name in ("department", "employees", "sales"):
    df = pd.read_csv(f"data/{table_name}.csv")
    # Build the S3 URI with plain formatting (source_db_base_path already ends
    # with "/"); os.path.join is meant for local filesystem paths.
    table_path = f"{source_db_base_path}{table_name}/"
    # dataset=True enables database/table/mode: the files are registered as
    # db_name.<table_name>, overwriting any previous contents.
    wr.s3.to_parquet(
        df=df,
        path=table_path,
        index=False,
        dataset=True,
        database=db_name,
        table=table_name,
        mode="overwrite",
    )
# Total sales per employee, computed from the source database's sales table.
sql = (
    "\n"
    "SELECT employee_id, sum(sales) as total_sales\n"
    f"FROM {db_name}.sales\n"
    "GROUP BY employee_id\n"
)
print(sql)
# Materialise the query result as the temporary table total_sales.
pydb.create_temp_table(sql, table_name="total_sales")
SELECT employee_id, sum(sales) as total_sales FROM aws_example_mratford.sales GROUP BY employee_id
# Read back the temporary total_sales table, which lives in the special __temp__ database.
pydb.read_sql_query("select * from __temp__.total_sales")
| | employee_id | total_sales |
---|---|---|
0 | 21 | 1643.14 |
1 | 93 | 688.05 |
2 | 101 | 817.45 |
3 | 60 | 1331.55 |
4 | 17 | 2302.02 |
5 | 5 | 2480.50 |
6 | 28 | 2071.77 |
7 | 6 | 2207.77 |
8 | 26 | 1994.65 |
9 | 3 | 2590.60 |
10 | 44 | 2184.14 |
11 | 12 | 2286.28 |
12 | 42 | 1688.76 |
13 | 20 | 2851.36 |
14 | 10 | 1935.67 |
15 | 32 | 2693.30 |
16 | 7 | 2465.13 |
17 | 19 | 2442.86 |
18 | 24 | 2248.35 |
19 | 220 | 1377.37 |
20 | 1 | 2911.65 |
21 | 23 | 3036.47 |
22 | 38 | 2158.55 |
23 | 11 | 3092.89 |
24 | 59 | 927.30 |
25 | 43 | 2073.25 |
26 | 45 | 2778.84 |
27 | 37 | 1984.24 |
28 | 39 | 1931.27 |
29 | 46 | 2547.63 |
30 | 4 | 2996.54 |
31 | 63 | 721.91 |
32 | 9 | 2279.84 |
33 | 35 | 2624.64 |
34 | 16 | 2373.06 |
35 | 47 | 1489.52 |
36 | 200 | 1108.89 |
37 | 57 | 953.09 |
38 | 13 | 2711.01 |
39 | 25 | 3099.01 |
40 | 31 | 2461.53 |
41 | 33 | 2685.07 |
42 | 80 | 611.18 |
43 | 34 | 1387.50 |
44 | 30 | 2251.47 |
45 | 29 | 2478.25 |
46 | 27 | 3050.02 |
47 | 2 | 1785.73 |
48 | 8 | 2155.77 |
49 | 15 | 2613.67 |
50 | 41 | 1326.88 |
51 | 40 | 2595.53 |
52 | 36 | 1580.06 |
53 | 18 | 1759.39 |
# Passing "__temp__" as the database name deletes the temporary database and its data.
pydb.delete_database_and_data("__temp__")
True
# The __temp__ database was just deleted, so querying its table should fail.
# Only the query sits in the try body; the unexpected-success message goes in
# `else`, so nothing but the query itself can reach the except branch.
try:
    pydb.read_sql_query("select * from __temp__.total_sales")
except wr.exceptions.QueryFailed:
    print("Query failed correctly.")
else:
    print("Error, temporary database not deleted correctly.")
Query failed correctly.