
Evaluation

MoJ Analytical Platform
Codebase: iceberg-evalution

Contents

  1. Summary
  2. Technical Concepts
  3. Evaluation Methodology
  4. Next Steps
  5. Appendix

1) Summary

Data Pipeline Architecture As-Is

  1. AWS DMS for extracting full loads and change data capture (CDC) data

  2. AWS Glue PySpark for creating curated tables, orchestrated using Step Functions

  3. Amazon Athena for creating derived tables, orchestrated using dbt

  4. Data stored in S3 and metadata in Glue Data Catalog

[Figure: as-is architecture]

Data Pipeline Architecture To-Be

Option 1: Convert curated tables to Iceberg table format

[Figure: architecture_proposed_pyspark]

Option 2: Migrate curation to Athena + dbt in conjunction with Iceberg

[Figure: architecture_proposed]

Outcome

  1. Out-of-the-box, Athena + Iceberg is cheaper and more performant for our use cases than Glue PySpark + Iceberg
  2. Iceberg is compatible with the data engineering tool set, which facilitates adoption
  3. Iceberg simplifies the code, which makes it easier to maintain

Hence, we are proceeding with Option 2.

This also unifies the data engineering tech stack, which facilitates collaboration and minimises duplication.

Lessons learnt

Re-evaluate objectives regularly!

The investigation was initially supposed to compare Glue PySpark against Hudi and Iceberg table formats.

We quickly expanded the investigation to include Athena, but wasted time investigating Hudi further after it had become clear that Iceberg was the better fit for our use cases.

2) Technical Concepts

Data curation processes

  1. Bulk insert full loads
  2. Remove duplicate data
  3. Apply Type 2 Slowly Changing Dimension (SCD2) to track row changes over time:

| id | status  | updated_at | valid_from | valid_to   | is_current |
|----|---------|------------|------------|------------|------------|
| 1  | pending | 2019-01-01 | 2019-01-01 | 2019-01-02 | False      |
| 1  | shipped | 2019-01-02 | 2019-01-02 | null       | True       |

SCD2 is difficult because:

  • Deltas can contain multiple and/or late-arriving updates, so each batch must be deduplicated before it is applied (see the sketch below)
  • Requires updating historic records
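
A minimal sketch of the deduplication step, assuming each delta row carries the business key (id) and an updated_at timestamp; the table and column names are illustrative:

```sql
-- Keep only the latest version of each id within a delta batch,
-- so that multiple and late-arriving updates collapse to one row per key.
WITH ranked AS (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) AS rn
    FROM raw.delta_batch
)
SELECT id, status, updated_at
FROM ranked
WHERE rn = 1
```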

Issues with Glue PySpark job

  1. Performance has degraded over the last few months, with monthly costs quadrupling
  2. Uses a complex process for handling data shuffling, which makes it hard to maintain
  3. Intermittently produces large volumes of missing and duplicated data, but given the complexity of the current job the root cause could not be identified

Could we improve performance and simplify the PySpark job by making use of Iceberg?

Data Lake table formats

  • A way to organise a dataset's files to present them as a table

  • Apache Hive table format defines a table as all the files in one or more particular directories

  • Modern table formats (Apache Hudi, Delta Lake, and Apache Iceberg) store additional metadata and table statistics

  • This allows query engines to better identify relevant data files, minimising data scans and speeding up queries

ACID Transactions

  • Hive does not easily support updates or deletes

  • A common work-around is Write-Audit-Publish (WAP) pattern:

    1. Rewrite sections or all of the data to a staging location
    2. Audit the new data
    3. Replace the existing data or point the data catalogue to the new location
  • This causes huge data duplication and redundant ETL jobs

  • Modern table formats ensure ACID guarantees on inserts, deletes, and updates, with the option to run these operations concurrently (illustrated below)
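
For example, with an Iceberg table a correction becomes a single transactional statement instead of a rewrite-audit-swap cycle. A minimal sketch, with illustrative table and column names:

```sql
-- Row-level changes run as ACID transactions against the Iceberg table,
-- with no staging copy and no re-pointing of the data catalogue.
UPDATE curated.transactions
SET status = 'cancelled'
WHERE transaction_id = 42;

DELETE FROM curated.transactions
WHERE transaction_date < DATE '2015-01-01';
```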

Why Apache Iceberg?

Comparison of table formats:

  1. Performance is very dependent on optimisation
  2. Community support is comparable
  3. Ecosystem support is more varied:

| Ecosystem        | Hudi           | Delta Lake     | Iceberg        |
|------------------|----------------|----------------|----------------|
| AWS Glue PySpark | Read+Write+DDL | Read+Write+DDL | Read+Write+DDL |
| Amazon Athena    | Read           | Read           | Read+Write+DDL |

Of the three formats, Athena only has write and DDL support for Iceberg tables
=> Iceberg makes Athena a viable alternative to AWS Glue PySpark for ETL
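
As a minimal sketch, an Iceberg table can be created directly from Athena with standard DDL; the schema, location, and partitioning below are illustrative:

```sql
-- Create an Iceberg table registered in the Glue Data Catalog via Athena DDL.
CREATE TABLE curated.store_sales_iceberg (
    ss_item_sk      BIGINT,
    ss_customer_sk  BIGINT,
    ss_sold_date_sk BIGINT,
    ss_net_paid     DECIMAL(7, 2)
)
PARTITIONED BY (ss_sold_date_sk)
LOCATION 's3://example-bucket/curated/store_sales_iceberg/'
TBLPROPERTIES ('table_type' = 'ICEBERG');
```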

Why Amazon Athena?

  • Athena runs queries in a distributed query engine using Trino under the hood
  • Athena has many advantages over Glue PySpark:
    • Costs based on amount of data scanned ($5/TB)
    • Determines optimum cluster query settings dynamically
    • Sacrifices mid-query fault-tolerance for faster execution
    • Shallower learning curve
  • Athena engine v3 is better integrated with the Glue Data Catalog and Iceberg
  • Athena has various service quotas, but these can be increased
  • Athena in conjunction with dbt can be used for ETL
  • dbt can manage concurrent workloads to minimise throttling (a model sketch follows this list)
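
A sketch of what a dbt model targeting Athena + Iceberg could look like, assuming the dbt-athena adapter; the config keys, source, and column names are assumptions rather than something validated in this evaluation:

```sql
-- models/curated/orders.sql (illustrative)
-- Incrementally merge new rows into an Iceberg table via the dbt-athena adapter.
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    table_type='iceberg',
    unique_key='id'
) }}

select id, status, updated_at
from {{ source('raw', 'orders') }}
{% if is_incremental() %}
  -- only process rows that arrived since the previous run
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```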

3) Evaluation Methodology

Evaluation criteria

In order of importance:

  1. Compatibility with existing tech stack and tool sets
  2. Minimise running costs
  3. Minimise code complexity / maximise readability
  4. Minimise execution time
  • Time is less important because we use daily batch processes which run overnight
  • Time is still relevant because:
    • there is a direct relationship between time and cost for Glue PySpark
    • Athena has run time quotas

TPC-DS Benchmark

TPC-DS is a data warehousing benchmark consisting of:

  • 25 tables whose total size can vary (1GB to 100TB)
  • 99 SQL queries ranging from simple aggregations to advanced pattern analysis

AWS itself often uses TPC-DS, for example to validate and benchmark its analytics services.

Data curation evaluation architecture

The compute consists of:

  1. Glue PySpark job with Spark SQL queries
  2. Glue Python shell job with Athena SQL queries

To ensure fairness we used:

  • similar SQL statements
  • out-of-the-box configuration with no optimisations

Data curation data generation

  • Used the TPC-DS connector for AWS Glue to generate the TPC-DS store_sales table at scales:

    • 0.1TB (~290 million rows, 21 GB)
    • 3TB (~8 billion rows, 440 GB)
  • Used a PySpark job to simulate updates with an increasing proportion of rows updated: 0.1, 1, 10, 99% (a simplified sketch follows this list)

  • By comparison, our largest table oasys_question:

    • Contains ~3 billion rows, 460 GB
    • Receives up to ~2.5 million daily updates (0.08%)
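
For illustration, a simplified Spark SQL stand-in for the update simulation (the evaluation used a PySpark job; the database, table, and perturbed column below are assumptions):

```sql
-- Sample roughly 1% of rows and perturb a value so each sampled row
-- counts as an update when merged back into the curated table.
CREATE TABLE tpcds_updates.store_sales_1pct AS
SELECT ss_item_sk,
       ss_ticket_number,
       ss_net_paid * 1.1 AS ss_net_paid
FROM tpcds.store_sales TABLESAMPLE (1 PERCENT);
```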

Bulk Insert comparison

[Figure: Bulk Insert cost and execution time, Athena vs PySpark]

  • Athena (blue) is cheaper than PySpark (orange) at both scales
  • PySpark is faster at larger scales (dashed square)

MERGE and SCD2 logic

MERGE (ANSI-SQL2003) combines UPDATE and INSERT:

```sql
MERGE INTO {object_name | subquery} [ [AS] alias ]
USING table_reference [ [AS] alias ]
ON search_condition
WHEN MATCHED
  THEN UPDATE SET column = { expression | DEFAULT }[, ...]
WHEN NOT MATCHED
  THEN INSERT [( column[, ...] )] VALUES ( expression[, ...] )
```
  • Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated

  • This improves performance and simplifies the code (see the SCD2 sketch below)
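
A hedged SCD2 sketch against an Iceberg table, mirroring the example columns above; in practice a second statement (or a source unioned with itself) is needed to insert the new version of each changed row, and all names are illustrative:

```sql
-- Close the current row for each changed id and insert rows for brand-new ids.
MERGE INTO curated.orders AS t
USING staged.deduplicated_delta AS s
  ON t.id = s.id AND t.is_current = true
WHEN MATCHED AND s.updated_at > t.updated_at THEN
  UPDATE SET valid_to = s.updated_at, is_current = false
WHEN NOT MATCHED THEN
  INSERT (id, status, updated_at, valid_from, valid_to, is_current)
  VALUES (s.id, s.status, s.updated_at, s.updated_at, null, true);
```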

SCD2 comparison - 100 GB

[Figure: SCD2 cost and execution time at 100 GB, Athena vs PySpark]

  • Athena is consistently cheaper and faster than PySpark, by a massive margin
  • However, Athena fails at the highest update proportions

SCD2 comparison - 3 TB

[Figure: SCD2 cost and execution time at 3 TB, Athena vs PySpark]

  • PySpark fails at all update proportions
  • Athena passes at the lower update proportions, as per our use cases

Data derivation evaluation

We used the TPC-DS queries as a substitute for data derivation processes.

Stats for TPC-DS queries against Iceberg tables relative to Hive:

| Scale | Partitioned | File Size | Execution Time | Data Scanned |
|-------|-------------|-----------|----------------|--------------|
| 1GB   | No          | 0.7x      | 2.7x           | 1.5x         |
| 3TB   | Yes         | 0.03x     | 1.2x           | 0.9x         |

Note that the 3TB Hive dataset was optimised, unlike the Iceberg dataset, and we still obtained comparable performance.

Compatibility with the AP tool set

The data engineering team has built various tools to support data analysis on the MoJ Analytical Platform.

We verified that these tools were compatible with Iceberg, including:

Considerations and Limitations

Athena support for Iceberg tables has various limitations.

We came across the following limitations during the evaluation:

  • Athena supports only millisecond precision for timestamps, so timestamp columns need to be cast as TIMESTAMP(6) (example below)
  • Athena has a 100-partition limit with INSERT INTO, which applies to Iceberg tables as well. See here for a work-around
  • Iceberg metadata is not fully integrated with the Glue Data Catalogue, for example:
    • dropped columns still appear
    • partition columns are not flagged as partitioned
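
For example, a cast along the following lines when writing into the Iceberg table (table and column names are illustrative):

```sql
-- Cast the source timestamp to the precision expected by the Iceberg table.
INSERT INTO curated.orders
SELECT id,
       status,
       CAST(updated_at AS timestamp(6)) AS updated_at
FROM staged.deduplicated_delta;
```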

4) Next Steps

Risks

  • No time to investigate the impact of:

    • data skew on write-performance
    • table width on write-performance
    • simultaneously updating and querying a table on read-performance
  • Replacing dependency on specialist Spark expertise with specialist Athena and Iceberg expertise

  • Athena might not be able to handle future volumes

Known Unknowns

  1. When is Glue PySpark preferred over Athena?
  2. How best to improve Athena query performance using sorting, partitioning, file compaction, etc.?
  3. What is the maximum volume capacity with these optimisations in place?
  4. How best to scale up for full refreshes in a disaster recovery scenario?
  5. How best to integrate with dbt and create-a-derived-table?
  6. How best to monitor code complexity and flag violations?
  7. How best to publish Iceberg metadata not available in the Data Catalogue?

5) Appendix

Contributors

David Bridgwood
Chris Foufoulides
Gwion Aprhobat
Khristiania Raihan
Siva Bathina
Soumaya Mauthoor
Theodore Manassis
William Orr

Acknowledgements

Alex Vilela, Anis Khan, Calum Barnett

If we had had more time...

  • Run Bulk Insert and SCD2 with Glue PySpark and Athena against Hive tables to estimate the performance gains of Iceberg over Hive

  • Run the TPC-DS queries in Spark SQL to compare performance against Athena

  • Terraform the codebase to allow collaborators to more easily reproduce the results

  • Investigate the SCD2 failures to identify their origin and improve our understanding of Glue PySpark vs Athena

Iceberg Metadata

Apache Iceberg’s approach is to define the table through three categories of metadata. These categories are:

  • “metadata files” that define the table
  • “manifest lists” that define a snapshot of the table
  • “manifests” that define groups of data files that may be part of one or more snapshots

source
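
The metadata described above can also be inspected from Athena itself; a sketch, assuming Athena's `$files` and `$history` metadata tables for Iceberg (the database and table names are illustrative):

```sql
-- List the data files currently referenced by the table ...
SELECT file_path, record_count, file_size_in_bytes
FROM "curated"."orders$files";

-- ... and the table's snapshot history.
SELECT made_current_at, snapshot_id, is_current_ancestor
FROM "curated"."orders$history";
```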

Athena Resource limits

When you submit a query, the Athena engine query planner estimates the compute capacity required to run the query and prepares a cluster of compute nodes accordingly.

Some queries like DDL queries run on only one node. Complex queries over large data sets run on much bigger clusters. The nodes are uniform, with the same memory, CPU, and disk configurations. Athena scales out, not up, to process more demanding queries.

Sometimes the demands of a query exceed the resources available to the cluster running the query. The resource most commonly exhausted is memory, but in rare cases it can also be disk space.

source

Because of its size, a distributed dataset is usually stored in partitions, with each partition holding a group of rows. This also improves parallelism for operations like a map or filter. A shuffle is any operation over a dataset that requires redistributing data across its partitions. Examples include sorting and grouping by key.

Apache Hive is a distributed, fault-tolerant data warehouse system that enables analytics at a massive scale. Hive transforms HiveQL queries into MapReduce jobs that run on Apache Hadoop. It queries data stored in a distributed storage solution, like the Hadoop Distributed File System (HDFS) or Amazon S3. Hive stores its database and table metadata in a metastore, which is a database or file backed store that enables easy data abstraction and discovery.

Advantages of Hive:

  1. De-facto standard
  2. Works with basically every engine
  3. Can make use of partitions to speed up queries
  4. File format agnostic
  5. Can atomically update a partition

Problems with Hive:

  1. Table scans unless you make efficient use of partitions

Metadata structures are used to define:

  • What is the table?
  • What is the table's schema?
  • How is the table partitioned?
  • What data files make up the table?

In the context of SQL, data definition or data description language ([DDL](https://en.wikipedia.org/wiki/Data_definition_language)) is a syntax for creating and modifying database objects such as tables, indices, and users. DDL statements are similar to a computer programming language for defining data structures, especially database schemas. Common examples of DDL statements include CREATE, ALTER, and DROP.
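
For illustration, three generic DDL statements (table and column names are arbitrary):

```sql
-- Create a table, add a column, then remove the table.
CREATE TABLE events (id BIGINT, created_at TIMESTAMP);
ALTER TABLE events ADD COLUMN source VARCHAR(50);
DROP TABLE events;
```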

Trino can push down the processing of queries, or parts of queries, into the connected data source: https://trino.io/docs/current/optimizer/pushdown.html

Why dbt?

  • write custom business logic using SQL
  • automate data quality testing
  • deploy the code
  • publish documentation side-by-side with the code

The relative execution time and data scanned vary depending on the scale and partitioning, but remain within an acceptable range.