Data Pipeline Architecture To-Be
Option 1: Convert curated tables to Iceberg table format
Option 2: Migrate curation to Athena + dbt in conjunction with Iceberg
Outcome
Out-of-the-box, Athena + Iceberg is cheaper and more performant for our use cases than Glue PySpark + Iceberg
Iceberg is compatible with the data engineering tool set, which facilitates adoption
Iceberg simplifies the code, which makes it easier to maintain
Hence we are proceeding with option 2
This also unifies the data engineering tech stack which facilitates collaboration and minimizes duplication
Lessons learnt
Re-evaluate objectives regularly!
The investigation was initially supposed to compare Glue PySpark against Hudi and Iceberg table formats.
We quickly expanded the investigation to include Athena, but wasted time investigating Hudi further after Iceberg had emerged as the clear winner for our use cases.
Data curation processes
Bulk insert full loads
Remove duplicate data
Apply Type 2 Slowly Changing Dimension (SCD2) to track row changes over time:
| id | status | updated_at | valid_from | valid_to | is_current |
| --- | --- | --- | --- | --- | --- |
| 1 | pending | 2019-01-01 | 2019-01-01 | 2019-01-02 | False |
| 1 | shipped | 2019-01-02 | 2019-01-02 | null | True |
SCD2 is difficult because:
Deltas can contain multiple and/or late-arriving updates
Requires updating historic records
Performance has degraded over the last few months, with monthly costs quadrupling
Uses a complex process for handling data shuffling, which makes it hard to maintain
Produces large volumes of intermittent missing data and duplicates, but given the complexity of the current job the root cause could not be identified
Could we improve performance and simplify the PySpark job by making use of Iceberg?
A way to organise a dataset's files to present them as a table
Apache Hive table format defines a table as all the files in one or more particular directories
Modern table formats (Apache Hudi, Delta Lake, and Apache Iceberg) store additional metadata and table statistics
This allows query engines to better identify relevant data files, minimising data scans and speeding up queries
ACID Transactions
Hive does not easily support updates or deletes
A common work-around is the Write-Audit-Publish (WAP) pattern:
Rewrite sections or all of the data to a staging location
Audit the new data
Replace the existing data or point the data catalogue to the new location
This causes huge data duplication and redundant ETL jobs
Modern table formats ensure ACID guarantees on inserts, deletes, and updates, with the option to run these operations concurrently
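For illustration, a minimal sketch of the row-level operations this enables in Athena (engine version 3) against an Iceberg table; the table and column names are hypothetical:

-- Hypothetical Iceberg table: row-level changes are applied in place, without a staging copy
DELETE FROM curated.orders
WHERE ingestion_date < DATE '2019-01-01';

UPDATE curated.orders
SET status = 'shipped'
WHERE id = 1;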
Why Apache Iceberg?
Comparison of table formats:
Performance is very dependent on optimisation
Community support is comparable
Ecosystem support is more varied:
| Ecosystem | Hudi | Delta Lake | Iceberg |
| --- | --- | --- | --- |
| AWS Glue PySpark | Read+Write+DDL | Read+Write+DDL | Read+Write+DDL |
| Amazon Athena | Read | Read | Read+Write+DDL |
Athena has write and DDL support only for Iceberg tables
=> Iceberg makes Athena a viable alternative to AWS Glue PySpark for ETL
Why Amazon Athena?
Athena runs queries in a distributed query engine using Trino under the hood
Athena has many advantages over Glue PySpark:
Costs based on amount of data scanned ($5/TB; see the worked example after this list)
Determines optimum cluster query settings dynamically
Sacrifices mid-query fault-tolerance for faster execution
Shallower learning curve
Athena engine version 3 is better integrated with the Data Catalog and Iceberg
Athena has various service quotas but these can be increased
Athena in conjunction with dbt can be used for ETL
dbt can manage concurrent workloads to minimise throttling
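As a worked example of the $5/TB cost model above (assuming a query scans the whole table): a full scan of the 21 GB table generated at the 0.1TB TPC-DS scale below would cost roughly 0.021 TB × $5/TB ≈ $0.10.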
3) Evaluation Methodology
Evaluation criteria
In order of importance:
Compatibility with existing tech stack and tool sets
Minimise running costs
Minimise code complexity / maximise readability
Minimise execution time
Time is less important because we use daily batch processes which run overnight
Time is still relevant because:
there is a direct relationship between time and cost for Glue PySpark
Athena has run time quotas
TPC-DS Benchmark
TPC-DS is a data warehousing benchmark consisting of:
25 tables whose total size can vary (1GB to 100TB)
99 SQL queries ranging from simple aggregations to advanced pattern analysis
AWS itself often uses TPC-DS, for example for validation and benchmarking
Data curation data generation
Used the TPC-DS connector for AWS Glue to generate the TPC-DS stores_sales table at scales:
0.1TB (~290 million rows, 21 GB)
3TB (~8 billion rows, 440 GB)
Used a PySpark job to simulate updates with an increasing proportion of rows updated: 0.1%, 1%, 10%, and 99%
By comparison, our largest table oasys_question:
Contains ~3 billion rows, 460 GB
Receives up to ~2.5 million daily updates (0.08%)
Bulk Insert comparison
Athena (blue) is cheaper than PySpark (orange) at both scales
PySpark is faster at larger scales (dashed square)
MERGE and SCD2 logic
MERGE (ANSI SQL:2003) combines UPDATE and INSERT:
MERGE INTO {object_name | subquery} [ [AS] alias ]
USING table_reference [ [AS] alias ]
ON search_condition
WHEN MATCHED
THEN UPDATE SET column = { expression | DEFAULT }[, ...]
WHEN NOT MATCHED
THEN INSERT [( column[, ...] )] VALUES ( expression[, ...] )
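As a sketch of how the SCD2 logic above maps onto MERGE, assuming a hypothetical Iceberg target curated.orders_scd2 and staged delta table staged.orders_delta; this simplification deliberately ignores the late-arriving and intra-batch duplicate updates that make the real job hard:

-- 1. Close off the current version of any row that has a newer delta
MERGE INTO curated.orders_scd2 AS target
USING staged.orders_delta AS delta
ON target.id = delta.id AND target.is_current = true
WHEN MATCHED AND delta.updated_at > target.updated_at
THEN UPDATE SET valid_to = delta.updated_at, is_current = false;

-- 2. Insert each delta row as the new current version
INSERT INTO curated.orders_scd2 (id, status, updated_at, valid_from, valid_to, is_current)
SELECT id, status, updated_at, updated_at, CAST(NULL AS TIMESTAMP), true
FROM staged.orders_delta;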
SCD2 comparison - 100 GB
Athena is consistently cheaper and faster than PySpark, by a massive margin
However, Athena fails at the highest update proportions
SCD2 comparison - 3 TB
PySpark fails at all update proportions
Athena passes at the lower update proportions, as per our use cases
Data derivation evaluation
We used the TPC-DS queries as a substitute for data derivation processes
Stats for TPC-DS queries against Iceberg tables relative to Hive:
| Scale | Partitioned | File Size | Execution Time | Data Scanned |
| --- | --- | --- | --- | --- |
| 1GB | No | 0.7x | 2.7x | 1.5x |
| 3TB | Yes | 0.03x | 1.2x | 0.9x |
Note that the 3TB Hive dataset was optimised, unlike the Iceberg dataset, and we still obtained comparable performance.
Data engineering have built various tools to support data analysis on the MoJ Analytical Platform.
We verified that these tools were compatible with Iceberg.
Considerations and Limitations
Athena support for Iceberg tables has various limitations.
We came across the following limitations during the evaluation:
Athena supports only millisecond precision for timestamps so timestamp columns need to be cast as TIMESTAMP(6)
Athena has a 100 partitions limit with INSERT INTO, which applies to Iceberg tables as well. See here for a work-around
Iceberg metadata is not fully integrated with the Glue Data Catalogue; for example:
dropped columns still appear
partitioned columns are not flagged as partitioned
Risks
No time to investigate the impact of:
data skew on write-performance
table width on write-performance
simultaneously updating and querying a table on read-performance
Replacing dependency on specialist Spark expertise with specialist Athena and Iceberg expertise
Athena might not be able to handle future volumes
Known Unknowns
When is Glue PySpark preferred over Athena?
How to best improve Athena query performance using sorting, partitions, file compaction, etc. (see the compaction sketch after this list)
What is the maximum volume capacity with these optimisations in place?
How to best scale up for full refreshes in a disaster recovery scenario
How to best integrate with dbt and create-a-derived-table
How to best monitor code complexity and flag violations
How to best publish Iceberg metadata not available in the Data Catalogue
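On the file compaction point, one likely starting option (a sketch only; the table name is hypothetical and the right maintenance schedule is still an open question) is Athena's built-in Iceberg table maintenance statements:

-- Compact small data files into larger ones using bin packing
OPTIMIZE curated.orders_scd2 REWRITE DATA USING BIN_PACK
WHERE valid_from > DATE '2019-01-01';

-- Expire old snapshots and remove data files they no longer reference
VACUUM curated.orders_scd2;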
Contributors
David Bridgwood
Chris Foufoulides
Gwion Aprhobat
Khristiania Raihan
Siva Bathina
Soumaya Mauthoor
Theodore Manassis
William Orr
Acknowledgements
Alex Vilela, Anis Khan, Calum Barnett
If we had had more time...
Run Bulk Insert and SCD2 with Glue PySpark and Athena against Hive tables to estimate performance gains against Iceberg
Run the TPC-DS queries in Spark SQL to compare performance against Athena
Terraform the codebase to allow collaborators to more easily reproduce the results
Investigate SCD2 failures to identify origin and improve understanding of Glue PySpark vs Athena
Apache Iceberg’s approach is to define the table through three categories of metadata. These categories are:
“metadata files” that define the table
“manifest lists” that define a snapshot of the table
“manifests” that define groups of data files that may be part of one or more snapshots
source
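These layers can be inspected directly from Athena through Iceberg's metadata tables; a minimal sketch, assuming a hypothetical table curated.orders_scd2:

-- Commit history: one row per snapshot that became current
SELECT * FROM "curated"."orders_scd2$history";

-- Data files behind the current snapshot, with row counts and sizes
SELECT file_path, record_count, file_size_in_bytes
FROM "curated"."orders_scd2$files";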
Athena Resource limits
When you submit a query, the Athena engine query planner estimates the compute capacity required to run the query and prepares a cluster of compute nodes accordingly.
Some queries like DDL queries run on only one node. Complex queries over large data sets run on much bigger clusters. The nodes are uniform, with the same memory, CPU, and disk configurations. Athena scales out, not up, to process more demanding queries.
Sometimes the demands of a query exceed the resources available to the cluster running the query. The resource most commonly exhausted is memory, but in rare cases it can also be disk space.
source
Because of its size, a distributed dataset is usually stored in partitions, with each partition holding a group of rows.
This also improves parallelism for operations like a map or filter.
A shuffle is any operation over a dataset that requires redistributing data across its partitions.
Examples include sorting and grouping by key.
Apache Hive is a distributed, fault-tolerant data warehouse system that enables analytics at a massive scale.
Hive transforms HiveQL queries into MapReduce jobs that run on Apache Hadoop.
It queries data stored in a distributed storage solution, like the Hadoop Distributed File System (HDFS) or Amazon S3.
Hive stores its database and table metadata in a metastore,
which is a database or file backed store that enables easy data abstraction and discovery.
Advantages of Hive:
1. De-facto standard
2. Works with basically every engine
3. Can make use of partitions to speed up queries
4. File format agnostic
5. Can atomically update a partition
Problems with Hive:
1. Table scans unless you make efficient use of partitions
Metadata structures are used to define:
- What is the table?
- What is the table’s schema?
- How is the table partitioned?
- What data files make up the table?
Data Definition Language [DDL](https://en.wikipedia.org/wiki/Data_definition_language) queries.
In the context of SQL, data definition or data description language (DDL) is a syntax for creating and modifying database objects such as tables, indices, and users.
DDL statements are similar to a computer programming language for defining data structures, especially database schemas.
Common examples of DDL statements include CREATE, ALTER, and DROP.
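For example, a minimal Athena DDL sketch creating an Iceberg version of the SCD2 table used earlier (the database name, location, and partitioning are hypothetical):

CREATE TABLE curated.orders_scd2 (
  id bigint,
  status string,
  updated_at timestamp,
  valid_from timestamp,
  valid_to timestamp,
  is_current boolean)
PARTITIONED BY (month(valid_from))
LOCATION 's3://example-curated-bucket/orders_scd2/'
TBLPROPERTIES ('table_type' = 'ICEBERG', 'format' = 'parquet');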
Trino can push down the processing of queries, or parts of queries, into the connected data source: https://trino.io/docs/current/optimizer/pushdown.html
Why dbt?
write custom business logic using SQL
automate data quality testing
deploy the code
publish documentation side-by-side with the code
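As a sketch of what a model could look like with the dbt-athena adapter (the model, source, and column names are hypothetical, and this is an incremental upsert keeping only the latest row per id; full SCD2 history would need a dbt snapshot or custom logic):

-- models/orders_current.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id',
    table_type='iceberg'
) }}

select id, status, updated_at
from {{ source('staged', 'orders_delta') }}
{% if is_incremental() %}
  -- only process rows newer than those already loaded
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}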
The relative execution time and data scanned vary depending on the scale and partitioning, but within an acceptable range.