Simple Database Creation and Manipulation
In this tutorial we are going to use awswrangler to create a database of different tables.

Let's create a database out of the test data employees.csv, sales.csv and department.csv (all in the data/ folder). The walkthrough below writes employees.csv; a sketch after the first write shows how the other two files could be added in the same way.

Note: this is largely based on https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/014%20-%20Schema%20Evolution.ipynb
In [1]:
import pandas as pd
import awswrangler as wr
import datetime
import pydbtools as pydb
Set up first
In [2]:
# setup your own testing area (set foldername = GH username)
foldername = "mratford" # GH username
foldername = foldername.lower().replace("-", "_")
In [3]:
bucketname = "alpha-everyone"
db_name = f"aws_example_{foldername}"
db_base_path = f"s3://{bucketname}/{foldername}/database"
s3_base_path = f"s3://{bucketname}/{foldername}/"
# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

# Delete the database if it exists
df_dbs = wr.catalog.databases(None)
if db_name in df_dbs["Database"].to_list():
    wr.catalog.delete_database(name=db_name)
bucketname = "alpha-everyone"
db_name = f"aws_example_{foldername}"
db_base_path = f"s3://{bucketname}/{foldername}/database"
s3_base_path = f"s3://{bucketname}/{foldername}/"
# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
print("deleting objs")
wr.s3.delete_objects(s3_base_path)
# Delete the database if it exists
df_dbs = wr.catalog.databases(None)
if db_name in df_dbs["Database"].to_list():
wr.catalog.delete_database(name=db_name)
deleting objs
Let's get the data into pandas first
In [4]:
df = pd.read_csv("data/employees.csv")
df.head()
Out[4]:
|   | employee_id | sex | forename | surname | department_id | manager_id |
|---|---|---|---|---|---|---|
| 0 | 1 | M | Dexter | Mitchell | 1.0 | 17.0 |
| 1 | 2 | F | Summer | Bennett | 1.0 | 17.0 |
| 2 | 3 | M | Pip | Carter | 1.0 | 17.0 |
| 3 | 4 | F | Bella | Long | 1.0 | 17.0 |
| 4 | 5 | F | Lexie | Perry | NaN | 17.0 |
Let's do some transforms on it
In [5]:
df["creation_date"] = datetime.date(2021, 1, 1)
df.head()
df["creation_date"] = datetime.date(2021, 1, 1)
df.head()
Out[5]:
|   | employee_id | sex | forename | surname | department_id | manager_id | creation_date |
|---|---|---|---|---|---|---|---|
| 0 | 1 | M | Dexter | Mitchell | 1.0 | 17.0 | 2021-01-01 |
| 1 | 2 | F | Summer | Bennett | 1.0 | 17.0 | 2021-01-01 |
| 2 | 3 | M | Pip | Carter | 1.0 | 17.0 | 2021-01-01 |
| 3 | 4 | F | Bella | Long | 1.0 | 17.0 | 2021-01-01 |
| 4 | 5 | F | Lexie | Perry | NaN | 17.0 | 2021-01-01 |
Write the table to a database
Parquet is always your best bet for writing data to a Glue database, especially if you only want to retrieve that data via Athena SQL queries.
In [6]:
# Create the database
wr.catalog.create_database(db_name)
# note table_path is a folder as glue treats all the
# data in a folder as contents of a single table
table_path = f"{db_base_path}/employees/"
# Write your pandas dataframe to S3 and add it as a table in your database
wr.s3.to_parquet(
    df=df,
    path=table_path,
    index=False,
    dataset=True,  # True enables the database/table/mode options below (i.e. writing to db.table)
    database=db_name,
    table="employees",
    mode="overwrite",
)
Out[6]:
{'paths': ['s3://alpha-everyone/mratford/database/employees/bc076712bce54b45b3a6ec7f8def198f.snappy.parquet'], 'partitions_values': {}}
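The intro also mentioned sales.csv and department.csv; the same pattern registers them as further tables in the same database. A minimal sketch, not run in this notebook, assuming those files sit alongside employees.csv in the data/ folder:

# Hypothetical extra tables, written with the same to_parquet pattern as above
for name in ["department", "sales"]:
    other_df = pd.read_csv(f"data/{name}.csv")
    wr.s3.to_parquet(
        df=other_df,
        path=f"{db_base_path}/{name}/",  # one folder per table
        index=False,
        dataset=True,
        database=db_name,
        table=name,
        mode="overwrite",
    )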
Append new data to the table
For fun, let's also add some new columns.
In [7]:
df["creation_date"] = datetime.date(2021, 1, 1)
df["new_col1"] = df["employee_id"] + 100
df["new_col2"] = "some text"
df.head()
df["creation_date"] = datetime.date(2021, 1, 1)
df["new_col1"] = df["employee_id"] + 100
df["new_col2"] = "some text"
df.head()
Out[7]:
|   | employee_id | sex | forename | surname | department_id | manager_id | creation_date | new_col1 | new_col2 |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | M | Dexter | Mitchell | 1.0 | 17.0 | 2021-01-01 | 101 | some text |
| 1 | 2 | F | Summer | Bennett | 1.0 | 17.0 | 2021-01-01 | 102 | some text |
| 2 | 3 | M | Pip | Carter | 1.0 | 17.0 | 2021-01-01 | 103 | some text |
| 3 | 4 | F | Bella | Long | 1.0 | 17.0 | 2021-01-01 | 104 | some text |
| 4 | 5 | F | Lexie | Perry | NaN | 17.0 | 2021-01-01 | 105 | some text |
In [8]:
# Write the new data to S3.
# Note the only thing that has changed is mode="append" (previously it was mode="overwrite").
wr.s3.to_parquet(
    df=df,
    path=table_path,
    index=False,
    dataset=True,
    database=db_name,
    table="employees",
    mode="append",
)
Out[8]:
{'paths': ['s3://alpha-everyone/mratford/database/employees/11667f33cb814d039b588eee1a206d75.snappy.parquet'], 'partitions_values': {}}
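Because the appended data had extra columns, the Glue catalogue entry for the table should now include them as well (the schema evolution covered in the linked tutorial). A minimal sketch of how to check, assuming wr.catalog.table is available in your awswrangler version:

# Inspect the table's schema in the Glue catalogue; the returned DataFrame
# should list new_col1 and new_col2 alongside the original columns
wr.catalog.table(database=db_name, table="employees")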
Now query the data with Athena to look at it
This should use pydbtools rather than awswrangler (if you are an AP user).
In [9]:
# Each uploaded dataset had one employee with employee_id == 1,
# so let's pull that down to demonstrate that both writes ended up in the table
sql = f"SELECT * from {db_name}.employees where employee_id = 1"
db_table = pydb.read_sql_query(sql, ctas_approach=False)
In [10]:
print(sql)
SELECT * from aws_example_mratford.employees where employee_id = 1
In [11]:
db_table.head()
Out[11]:
|   | employee_id | sex | forename | surname | department_id | manager_id | creation_date | new_col1 | new_col2 |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | M | Dexter | Mitchell | 1.0 | 17.0 | 2021-01-01 | 101 | some text |
| 1 | 1 | M | Dexter | Mitchell | 1.0 | 17.0 | 2021-01-01 | <NA> | <NA> |
In [12]:
### Clean up

# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

# Delete the database if it exists
df_dbs = wr.catalog.databases(None)
if db_name in df_dbs["Database"].to_list():
    print(f"deleting {db_name}")
    wr.catalog.delete_database(name=db_name)
deleting objs
deleting aws_example_mratford
In [13]:
# Demonstrate db no longer exists
db_name in wr.catalog.databases()["Database"].to_list()
Out[13]:
False