MoJ AP tools demo
This notebook demonstrates the use of some of the Python tools developed by data engineers to make creating analytical pipelines simpler for data scientists and analysts.
It focuses on taking a large dataset which is too big for memory, converting it to another format while applying metadata to ensure consistent data types, and creating a database with tables from files or dataframes.
First import the necessary libraries. pydbtools, arrow_pd_parser and mojap_metadata are libraries created and maintained by the Data Modelling and Engineering Team.
import pydbtools as pydb
from arrow_pd_parser import reader, writer
from mojap_metadata import Metadata
import pandas as pd
import awswrangler as wr
import itertools
pd.options.display.max_columns = None
Create a new database, cleaning up any tables and data beforehand in case it already exists.
db = "dmet_example"
pydb.delete_database_and_data(db)
pydb.create_database(db)
True
We have a dataset that consists of a number of very large CSV files. How can we load this without running out of memory and crashing our session?
big_path = "s3://alpha-everyone/s3_data_packer_test/land/big/"
# Don't run this!
# df = wr.s3.read_csv(big_path)
arrow_pd_parser has the ability to read files in chunks, returning an iterator of dataframes. Specify a number of lines to load with chunksize to preview the table.
df = next(reader.read(big_path, file_format="csv", chunksize=10, index_col=0))
df
 | name | email | address | city | state | date_time | price |
---|---|---|---|---|---|---|---|
0 | Kip Love | gar1812@yahoo.com | 315 Fairfield Pike | Germantown | Maine | 2016-05-07 03:50:50.991892 | 426 |
1 | Joel Clements | intendancies1853@outlook.com | 211 Garces Gate | Canton | South Carolina | 2007-03-20 17:29:35.856517 | 1780 |
2 | Tambra Bowman | badju1819@gmail.com | 604 Paulding Brae | Roanoke Rapids | Missouri | 2011-05-11 13:23:57.802563 | 1919 |
3 | Claude Jackson | clannishly1990@live.com | 1084 Lyndhurst Crescent | Porterville | Arkansas | 2019-05-25 20:22:14.927012 | 1404 |
4 | Percy Sullivan | simoniac1930@live.com | 34 Beaver Row | Lake Jackson | Hawaii | 2013-09-21 00:26:37.106366 | 126 |
5 | Nathan Frank | unstocking1800@yahoo.com | 151 Music Concourse Gardens | Jackson | Maryland | 2004-11-18 20:36:42.006247 | 417 |
6 | Leonel Buck | amazing1959@outlook.com | 258 Higuera Line | Gurnee | Illinois | 2016-03-27 12:33:38.784997 | 938 |
7 | Lawanna Hess | rovers1805@gmail.com | 1065 Freeman Townline | Inglewood | Kansas | 2006-04-11 19:15:28.331296 | 782 |
8 | Carmelo Morgan | piperazine1959@protonmail.com | 412 San Buenaventura Turnpike | Rolling Meadows | Ohio | 2004-05-22 14:03:06.479138 | 185 |
9 | Shiloh Silva | bibliomane1826@yahoo.com | 917 Brookdale Path | Bainbridge Island | Oklahoma | 2000-12-08 08:47:10.598186 | 478 |
Checking the data types, we can see that date_time is a string, but we would like it to be a timestamp.
df.dtypes
name         string
email        string
address      string
city         string
state        string
date_time    string
price         Int64
dtype: object
Create metadata to fix this using mojap_metadata.Metadata. Note that arrow rather than pandas types are used, and these will be enforced across formats.
metadata = Metadata.from_dict(
{
"name": "big_table",
"columns": [
{"name": n, "type": t}
for n, t in [
("name", "string"),
("email", "string"),
("address", "string"),
("city", "string"),
("state", "string"),
("date_time", "timestamp(ms)"),
("price", "int64"),
]
],
}
)
metadata.columns
[{'name': 'name', 'type': 'string'}, {'name': 'email', 'type': 'string'}, {'name': 'address', 'type': 'string'}, {'name': 'city', 'type': 'string'}, {'name': 'state', 'type': 'string'}, {'name': 'date_time', 'type': 'timestamp(ms)'}, {'name': 'price', 'type': 'int64'}]
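The schema does not have to be defined inline; it can also live alongside the data as a JSON file. The sketch below assumes a hypothetical local file big_table_metadata.json, and that the to_json and from_json helpers are available in your installed version of mojap_metadata.
# Hypothetical file name; round-trip the schema through JSON
metadata.to_json("big_table_metadata.json")
metadata = Metadata.from_json("big_table_metadata.json")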
Now try previewing the data again with metadata enforced.
df = next(
reader.read(
big_path,
file_format="csv",
chunksize=10,
index_col=0,
metadata=metadata,
)
)
df
 | name | email | address | city | state | date_time | price |
---|---|---|---|---|---|---|---|
0 | Kip Love | gar1812@yahoo.com | 315 Fairfield Pike | Germantown | Maine | 2016-05-07 03:50:50.991892 | 426 |
1 | Joel Clements | intendancies1853@outlook.com | 211 Garces Gate | Canton | South Carolina | 2007-03-20 17:29:35.856517 | 1780 |
2 | Tambra Bowman | badju1819@gmail.com | 604 Paulding Brae | Roanoke Rapids | Missouri | 2011-05-11 13:23:57.802563 | 1919 |
3 | Claude Jackson | clannishly1990@live.com | 1084 Lyndhurst Crescent | Porterville | Arkansas | 2019-05-25 20:22:14.927012 | 1404 |
4 | Percy Sullivan | simoniac1930@live.com | 34 Beaver Row | Lake Jackson | Hawaii | 2013-09-21 00:26:37.106366 | 126 |
5 | Nathan Frank | unstocking1800@yahoo.com | 151 Music Concourse Gardens | Jackson | Maryland | 2004-11-18 20:36:42.006247 | 417 |
6 | Leonel Buck | amazing1959@outlook.com | 258 Higuera Line | Gurnee | Illinois | 2016-03-27 12:33:38.784997 | 938 |
7 | Lawanna Hess | rovers1805@gmail.com | 1065 Freeman Townline | Inglewood | Kansas | 2006-04-11 19:15:28.331296 | 782 |
8 | Carmelo Morgan | piperazine1959@protonmail.com | 412 San Buenaventura Turnpike | Rolling Meadows | Ohio | 2004-05-22 14:03:06.479138 | 185 |
9 | Shiloh Silva | bibliomane1826@yahoo.com | 917 Brookdale Path | Bainbridge Island | Oklahoma | 2000-12-08 08:47:10.598186 | 478 |
Note that date_time is now an object of datetime.datetime type, as the pandas date/time types have too narrow a range.
df.dtypes
name         string
email        string
address      string
city         string
state        string
date_time    object
price         Int64
dtype: object
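To confirm this, check the type of a single value; a minimal sketch, assuming the metadata-enforced preview chunk is still in df:
# dtype is reported as object, but each value is a datetime.datetime
type(df["date_time"].iloc[0])
This should report datetime.datetime, in line with the note above.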
For the sake of this demo, take a small slice (the first five chunks) of an iterator that reads the whole dataset.
r = itertools.islice(
reader.read(
big_path,
file_format="csv",
chunksize="100MB",
index_col=0,
metadata=metadata,
),
5,
)
We can then convert between formats, in this case to parquet, while preserving the metadata.
new_path = "s3://alpha-everyone/dmet_st/big_table.parquet"
wr.s3.delete_objects(new_path)
writer.write(r, new_path, metadata=metadata)
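To check that the conversion worked, list what now exists under the new path; a quick sketch using awswrangler, which was imported at the top of the notebook:
# List the parquet object(s) written by writer.write
wr.s3.list_objects(new_path)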
Big datasets in S3 can then be used to create a queryable table in the database, again processed in chunks to stay within memory.
pydb.file_to_table(
new_path,
database=db,
table="big_table",
location="s3://alpha-everyone/dmet_st/dmet_example/big_table",
chunksize="100MB",
metadata=metadata,
)
pydb.read_sql_query(f"select * from {db}.big_table limit 5")
 | name | email | address | city | state | date_time | price |
---|---|---|---|---|---|---|---|
0 | Werner Jenkins | congruism1828@gmail.com | 497 Sydney Path | Lodi | Delaware | 2012-03-18 08:15:24.496 | 1485 |
1 | Jacques Garner | embrothelled1973@yahoo.com | 1244 West Point High Street | Elmira | Michigan | 2002-09-05 00:39:12.638 | 463 |
2 | Freeman Callahan | wonders1955@protonmail.com | 869 Neptune Turnpike | Burbank | Michigan | 2006-05-24 06:33:02.054 | 161 |
3 | Reuben Ewing | charmian2048@yahoo.com | 823 Minnesota Freeway | Anacortes | Minnesota | 2011-04-17 19:49:33.711 | 777 |
4 | Dwain Maynard | vulturelike1889@yahoo.com | 935 North Hughes Terrace | Mamaroneck | Oregon | 2011-06-25 06:16:58.032 | 1105 |
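Any other SQL runs against the new table in the same way. For example, a quick row count gives a sense of how much data was registered (the exact number depends on how many 100MB chunks were written above):
pydb.read_sql_query(f"select count(*) as n from {db}.big_table")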
Create a new table in the database from an SQL statement.
pydb.delete_table_and_data(database=db, table="state_revenues")
pydb.create_table(
f"""
select state, sum(price) as revenue
from {db}.big_table
group by state
""",
database=db,
table="state_revenues",
location="s3://alpha-everyone/dmet_st/dmet_example/state_revenues",
)
sr = pydb.read_sql_query(f"select * from {db}.state_revenues")
sr
 | state | revenue |
---|---|---|
0 | North Dakota | 19936459 |
1 | Pennsylvania | 19876171 |
2 | Montana | 20228848 |
3 | Maine | 19783670 |
4 | Washington | 20056159 |
5 | Tennessee | 20081268 |
6 | Ohio | 20222361 |
7 | South Carolina | 20161898 |
8 | Texas | 20095812 |
9 | Mississippi | 19881812 |
10 | Connecticut | 20104647 |
11 | Oregon | 20184460 |
12 | Florida | 20011580 |
13 | Iowa | 20227238 |
14 | Nevada | 20112272 |
15 | Alaska | 19822735 |
16 | Illinois | 20175380 |
17 | Colorado | 20197961 |
18 | West Virginia | 19938810 |
19 | Oklahoma | 19899153 |
20 | Missouri | 20086612 |
21 | Rhode Island | 20075477 |
22 | Wyoming | 20129612 |
23 | New Jersey | 19833567 |
24 | New York | 20216151 |
25 | Delaware | 19825190 |
26 | Vermont | 20570793 |
27 | Idaho | 20328920 |
28 | Virginia | 20243585 |
29 | Nebraska | 19912395 |
30 | Maryland | 20206547 |
31 | Kentucky | 20248912 |
32 | New Mexico | 19974839 |
33 | Louisiana | 20076739 |
34 | Arizona | 20051478 |
35 | North Carolina | 20144047 |
36 | South Dakota | 20024755 |
37 | Arkansas | 20429413 |
38 | Michigan | 20237562 |
39 | Hawaii | 19992112 |
40 | California | 20028102 |
41 | Georgia | 20152913 |
42 | Massachusetts | 20141289 |
43 | Utah | 20167422 |
44 | Kansas | 20375149 |
45 | Wisconsin | 19983479 |
46 | Alabama | 20138405 |
47 | Minnesota | 19840186 |
48 | Indiana | 20425105 |
49 | New Hampshire | 20156172 |
What if we want to do some manipulation with pandas?
starts_with_n = sr[sr["state"].str.startswith("N")]
starts_with_n
 | state | revenue |
---|---|---|
0 | North Dakota | 19936459 |
14 | Nevada | 20112272 |
23 | New Jersey | 19833567 |
24 | New York | 20216151 |
29 | Nebraska | 19912395 |
32 | New Mexico | 19974839 |
35 | North Carolina | 20144047 |
49 | New Hampshire | 20156172 |
We can then create another table in the database from the manipulated dataframe. This allows us to create a hybrid pipeline of SQL and pandas operations.
pydb.dataframe_to_table(
starts_with_n,
database=db,
table="starts_with_n",
location="s3://alpha-everyone/dmet_st/dmet_example/starts_with_n",
)
pydb.read_sql_query(f"select * from {db}.starts_with_n")
 | state | revenue |
---|---|---|
0 | North Dakota | 19936459 |
1 | Nevada | 20112272 |
2 | New Jersey | 19833567 |
3 | New York | 20216151 |
4 | Nebraska | 19912395 |
5 | New Mexico | 19974839 |
6 | North Carolina | 20144047 |
7 | New Hampshire | 20156172 |
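Finally, tidy up by removing the demo database and the objects written to S3, so the example leaves nothing behind. This reuses the same cleanup calls as at the start of the notebook; the prefix below is assumed to cover everything written during the demo.
# Drop the demo database, its tables and underlying data
pydb.delete_database_and_data(db)
# Remove the converted parquet file and table data written under this prefix
wr.s3.delete_objects("s3://alpha-everyone/dmet_st/")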