 
 
 
 
Data Pipeline Architecture To-Be 
Option 1: Convert curated tables to Iceberg  table format
Option 2: Migrate curation to Athena  + dbt  in conjunction with Iceberg
 
Outcome 
Out-of-the-box, Athena + Iceberg is cheaper  and more performant  for our use cases than Glue PySpark + Iceberg 
Iceberg is compatible with the data engineering tool set which facilitates adoption  
Iceberg simplifies  the code which makes it easier to maintain 
 
Hence we are proceeding with option 2 
This also unifies the data engineering tech stack which facilitates collaboration  and minimizes duplication
 
Lessons learnt 
Re-evaluate objectives regularly!
The investigation was initially supposed to compare Glue PySpark against Hudi and Iceberg table formats.
We quickly expanded the investigation to include Athena, but wasted time investigating Hudi further after it was already clear that Iceberg was the winner for our use cases.
 
 
Data curation processes 
Bulk insert full loads 
Remove duplicate data 
Apply Type 2 Slowly Changing Dimension (SCD2)  to track row changes over time: 
 
| id | status  | updated_at | valid_from | valid_to   | is_current |
|----|---------|------------|------------|------------|------------|
| 1  | pending | 2019-01-01 | 2019-01-01 | 2019-01-02 | False      |
| 1  | shipped | 2019-01-02 | 2019-01-02 | null       | True       |
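For illustration, a point-in-time lookup against such a table might look like the sketch below (hypothetical table name orders):

-- State of id 1 as at 2019-01-01 (hypothetical SCD2 table "orders")
SELECT id, status
FROM orders
WHERE id = 1
  AND DATE '2019-01-01' >= valid_from
  AND (valid_to IS NULL OR DATE '2019-01-01' < valid_to)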
 
 
SCD2 is difficult because:
Deltas can contain multiple and/or late-arriving updates 
Requires updating historic records 
 
 
Performance has degraded over the last few months, with monthly costs quadrupling 
Uses a complex process for handling data shuffling, which makes it hard to maintain 
Produces large volumes of intermittent missing data and duplicates, but given the complexity of the current job the root cause could not be identified 
 
Could we improve performance and simplify the PySpark job by making use of Iceberg?
 
A way to organise a dataset's files to present them as a table 
 
The Apache Hive table format defines a table as all the files in one or more particular directories
 
Modern table formats (Apache Hudi, Delta Lake, and Apache Iceberg) store additional metadata and table statistics
 
This allows query engines to better identify relevant data files, minimising data scans and speeding up queries
 
 
 
ACID Transactions 
Hive does not easily support updates or deletes
 
A common work-around is the Write-Audit-Publish (WAP) pattern:
Rewrite some or all of the data to a staging location 
Audit the new data 
Replace the existing data or point the data catalogue to the new location 
 
 
This causes huge data duplication and redundant ETL jobs
 
Modern table formats provide ACID guarantees on inserts, deletes, and updates, with the option to run these operations concurrently
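For example, on an Iceberg table Athena (engine version 3) can apply row-level changes directly, without the WAP rewrite; the table and predicates below are hypothetical:

-- Row-level DML on a hypothetical Iceberg table, run directly from Athena
DELETE FROM curated.orders WHERE status = 'cancelled';
UPDATE curated.orders SET is_current = false WHERE id = 1 AND valid_to IS NOT NULL;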
 
 
 
Why Apache Iceberg? 
Comparison of table formats:
Performance  is very dependent on optimisation 
Community support  is comparable 
Ecosystem support  is more varied: 
 
| Ecosystem        | Hudi           | Delta Lake     | Iceberg        |
|------------------|----------------|----------------|----------------|
| AWS Glue PySpark | Read+Write+DDL | Read+Write+DDL | Read+Write+DDL |
| Amazon Athena    | Read           | Read           | Read+Write+DDL |
 
 
Athena has write and DDL support only for Iceberg tables 
=> Iceberg makes Athena  a viable alternative to AWS Glue PySpark for ETL 
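As a sketch, creating a writable Iceberg table from Athena looks roughly like this (database, columns and S3 location are hypothetical):

-- Hypothetical Iceberg table created and managed entirely from Athena
CREATE TABLE curated.orders (
  id bigint,
  status string,
  updated_at timestamp)
PARTITIONED BY (day(updated_at))
LOCATION 's3://example-curated-bucket/orders/'
TBLPROPERTIES ('table_type' = 'ICEBERG');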
 
Why Amazon Athena? 
Athena runs queries in a distributed query engine using Trino  under the hood 
Athena has many advantages over Glue PySpark:
Costs based on amount of data scanned ($5/TB) 
Determines optimum cluster query settings dynamically 
Sacrifices  mid-query fault-tolerance for faster execution 
Shallower learning curve 
 
 
Athena engine version 3 is better integrated with the Glue Data Catalog and Iceberg 
Athena has various service quotas but these can be increased 
Athena in conjunction with dbt can be used for ETL (see the model sketch below) 
dbt can manage concurrent workloads to minimise throttling 
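As an illustration, an incremental dbt model targeting an Iceberg table might look like the sketch below; the model and source names are hypothetical and the config keys assume the dbt-athena adapter:

-- models/curated/orders_history.sql (hypothetical dbt model, dbt-athena adapter assumed)
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id',
    table_type='iceberg'
) }}

select id, status, updated_at
from {{ source('staging', 'orders_delta') }}
{% if is_incremental() %}
  -- only process rows newer than those already in the target table
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}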
 
 
3) Evaluation Methodology 
 
Evaluation criteria 
In order of importance :
Compatibility with existing tech stack and tool sets 
Minimise running costs 
Minimise code complexity / maximise readability 
Minimise execution time 
 
Time is less important because we use daily batch processes which run overnight 
Time is still relevant because:
there is a direct relationship between time and cost for Glue PySpark 
Athena has runtime quotas 
 
 
 
 
TPC-DS Benchmark 
TPC-DS  is a data warehousing benchmark consisting of:
25 tables  whose total size can vary (1GB to 100TB) 
99 SQL queries  ranging from simple aggregations to advanced pattern analysis 
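For a flavour of the workload, the sketch below is a simple aggregation in the spirit of the benchmark (not a verbatim TPC-DS query) using the standard store_sales, date_dim and item tables:

-- Yearly store sales per brand (TPC-DS-style aggregation, not an official query)
SELECT d.d_year,
       i.i_brand,
       SUM(ss.ss_ext_sales_price) AS total_sales
FROM store_sales ss
JOIN date_dim d ON ss.ss_sold_date_sk = d.d_date_sk
JOIN item i ON ss.ss_item_sk = i.i_item_sk
GROUP BY d.d_year, i.i_brand
ORDER BY d.d_year, total_sales DESC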
 
AWS often uses TPC-DS for example to validate:
 
 
Data curation data generation 
Used the TPC-DS connector for AWS Glue to generate the TPC-DS store_sales table at scales:
0.1TB (~290 million rows, 21 GB) 
3TB (~8 billion rows, 440 GB) 
 
 
Used a PySpark job to simulate updates with an increasing proportion of rows updated: 0.1%, 1%, 10%, 99%
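The evaluation did this in PySpark; purely for illustration, an equivalent delta could be sketched in SQL along these lines (table names hypothetical, sampling ~1% of rows and modifying a value column):

-- Sketch: build a ~1% delta by sampling store_sales and changing a value column
CREATE TABLE staging.store_sales_delta AS
SELECT ss_item_sk,
       ss_ticket_number,
       ss_quantity + 1 AS ss_quantity  -- simulated change
FROM tpcds.store_sales
TABLESAMPLE BERNOULLI (1)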
 
By comparison, our largest table oasys_question:
Contains ~3 billion rows, 460 GB 
Receives up to ~2.5 million daily updates (0.08%) 
 
 
 
 
Bulk Insert comparison 
Athena (blue) is cheaper than PySpark (orange) at both scales 
PySpark is faster at larger scales (dashed square) 
 
 
MERGE and SCD2 logic 
MERGE (ANSI SQL:2003) combines UPDATE and INSERT:
MERGE INTO {object_name | subquery} [ [AS] alias ]
USING table_reference [ [AS] alias ]
ON search_condition
WHEN MATCHED
   THEN UPDATE SET column = { expression | DEFAULT }[, ...]
 WHEN NOT MATCHED
   THEN INSERT [( column[, ...] )] VALUES ( expression[, ...] )
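For SCD2 the MERGE source can be constructed so that each changed row both closes the current record and inserts a new one. A minimal sketch, assuming hypothetical curated.orders (target) and staging.orders_delta (deduplicated delta) tables with the columns from the earlier example:

MERGE INTO curated.orders AS t
USING (
    -- keyed rows: close an existing current record, or insert the first record for a new id
    SELECT d.id AS merge_key, d.id, d.status, d.updated_at
    FROM staging.orders_delta AS d
    UNION ALL
    -- NULL-keyed rows never match, so they insert the new current version for changed ids
    SELECT NULL AS merge_key, d.id, d.status, d.updated_at
    FROM staging.orders_delta AS d
    JOIN curated.orders AS c
      ON d.id = c.id AND c.is_current = true AND d.updated_at > c.updated_at
) AS s
ON t.id = s.merge_key AND t.is_current = true
WHEN MATCHED AND s.updated_at > t.updated_at
  THEN UPDATE SET valid_to = s.updated_at, is_current = false
WHEN NOT MATCHED
  THEN INSERT (id, status, updated_at, valid_from, valid_to, is_current)
       VALUES (s.id, s.status, s.updated_at, s.updated_at, NULL, true)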
 
 
SCD2 comparison - 100 GB 
Athena is consistently cheaper and faster than PySpark, by a massive margin 
However, Athena fails at the highest update proportions 
 
 
SCD2 comparison - 3 TB 
PySpark fails at all update proportions 
Athena passes at the lower update proportions, as per our use cases 
 
 
Data derivation evaluation 
We used the TPC-DS queries as a substitute for data derivation processes
Stats for TPC-DS queries against Iceberg tables relative to Hive: 
| Scale | Partitioned | File Size | Execution Time | Data Scanned |
|-------|-------------|-----------|----------------|--------------|
| 1GB   | No          | 0.7x      | 2.7x           | 1.5x         |
| 3TB   | Yes         | 0.03x     | 1.2x           | 0.9x         |
 
Note that the 3TB Hive dataset was optimised, unlike the Iceberg dataset, and we still obtained comparable performance.
 
The data engineering team has built various tools to support data analysis on the MoJ Analytical Platform.
We verified that these tools are compatible with Iceberg, including:
 
Considerations and Limitations 
Athena support for Iceberg tables has various limitations .
We came across the following limitations during the evaluation:
Athena supports only millisecond precision for timestamps so timestamp columns need to be cast as TIMESTAMP(6) 
Athena has a 100-partition limit with INSERT INTO, which applies to Iceberg tables as well. See here for a work-around 
Iceberg metadata is not fully integrated with the Glue Data Catalogue, for example:
dropped columns still appear 
partitioned columns are not flagged as partitioned 
 
 
 
 
 
Risks 
No time to investigate the impact of:
data skew on write-performance 
table width on write-performance 
simultaneously updating and querying a table on read-performance 
 
 
Replacing dependency on specialist Spark expertise with specialist Athena and Iceberg expertise
 
Athena might not be able to handle future volumes
 
 
 
Known Unknowns 
When is Glue PySpark preferred over Athena? 
How to best improve Athena query performance using sorting, partitions, file compaction, etc. 
What is the maximum volume capacity with these optimisations in place? 
How to best scale up for full refreshes in a disaster recovery scenario 
How to best integrate with dbt and create-a-derived-table  
How to best monitor code complexity and flag violations 
How to best publish Iceberg metadata not available in the Data Catalogue 
 
 
 
Contributors 
David Bridgwood 
Chris Foufoulides 
Gwion Aprhobat 
Khristiania Raihan 
Siva Bathina 
Soumaya Mauthoor 
Theodore Manassis 
William Orr
Acknowledgements 
Alex Vilela, Anis Khan, Calum Barnett
 
If we had had more time... 
Run Bulk Insert and SCD2 with Glue PySpark and Athena against Hive tables to estimate the performance gains from Iceberg
 
Run the TPC-DS queries in Spark SQL to compare performance against Athena
 
Terraform the codebase to allow collaborators to more easily reproduce the results
 
Investigate SCD2 failures to identify origin and improve understanding of Glue PySpark vs Athena
 
 
 
Apache Iceberg’s approach is to define the table through three categories of metadata. These categories are:
“metadata files” that define the table 
“manifest lists” that define a snapshot of the table 
“manifests” that define groups of data files that may be part of one or more snapshots 
 
source 
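A hedged example of inspecting this metadata from Athena (engine version 3), assuming a hypothetical table curated.orders:

-- Iceberg metadata tables exposed through Athena
SELECT * FROM "curated"."orders$files";      -- data files behind the current snapshot
SELECT * FROM "curated"."orders$manifests";  -- manifests referenced by the current snapshot
SELECT * FROM "curated"."orders$history";    -- snapshot history of the table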
 
Athena Resource limits 
When you submit a query, the Athena engine query planner estimates the compute capacity required to run the query and prepares a cluster of compute nodes accordingly.
Some queries like DDL queries run on only one node. Complex queries over large data sets run on much bigger clusters. The nodes are uniform, with the same memory, CPU, and disk configurations. Athena scales out, not up, to process more demanding queries.
Sometimes the demands of a query exceed the resources available to the cluster running the query. The resource most commonly exhausted is memory, but in rare cases it can also be disk space.
source 
 
Because of its size, a distributed dataset is usually stored in partitions, with each partition holding a group of rows. 
This also improves parallelism for operations like a map or filter. 
A shuffle is any operation over a dataset that requires redistributing data across its partitions. 
Examples include sorting and grouping by key.
Apache Hive is a distributed, fault-tolerant data warehouse system that enables analytics at a massive scale.
Hive transforms HiveQL queries into MapReduce jobs that run on Apache Hadoop. 
It queries data stored in a distributed storage solution, like the Hadoop Distributed File System (HDFS) or Amazon S3. 
Hive stores its database and table metadata in a metastore, 
which is a database or file backed store that enables easy data abstraction and discovery.
Advantages of Hive:
1. De-facto standard
2. Works with basically every engine
3. Can make use of partitions to speed up queries
4. File format agnostic
5. Can atomically update a partition
Problems with Hive:
1. Table scans unless you make efficient use of partitions
Metadata structures are used to define:
- What is the table?
- What is the table’s schema?
- How is the table partitioned?
- What data files make up the table?
Data Definition Language [DDL](https://en.wikipedia.org/wiki/Data_definition_language) queries. 
In the context of SQL, data definition or data description language (DDL) is a syntax for creating and modifying database objects such as tables, indices, and users. 
DDL statements are similar to a computer programming language for defining data structures, especially database schemas. 
Common examples of DDL statements include CREATE, ALTER, and DROP.
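For instance (generic SQL, hypothetical table name):

-- Common DDL statements: create, modify, and remove a table definition
CREATE TABLE reporting.daily_orders (order_day DATE, n_orders BIGINT);
ALTER TABLE reporting.daily_orders ADD COLUMN total_value DOUBLE;
DROP TABLE reporting.daily_orders;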
Trino can push down the processing of queries, or parts of queries, into the connected data source: https://trino.io/docs/current/optimizer/pushdown.html
Why dbt?
write custom business logic using SQL
automate data quality testing
deploy the code
publish documentation side-by-side with the code
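For example, data quality checks are plain SQL; a minimal sketch of a singular dbt test, assuming a hypothetical orders_history model, is:

-- tests/assert_one_current_row_per_id.sql (hypothetical singular dbt test)
-- the test fails if this query returns any rows,
-- i.e. if any id has more than one is_current = true record
select id
from {{ ref('orders_history') }}
where is_current = true
group by id
having count(*) > 1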
The relative execution time and data scanned vary depending on the scale and partitioning, but are within an acceptable range.