
Deduplication using PySpark

Linking in Spark

from splink.spark.jar_location import similarity_jar_location

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import types

conf = SparkConf()
conf.set("spark.driver.memory", "12g")
# This parallelism setting is only suitable for a small toy example
conf.set("spark.default.parallelism", "16")


# Add custom similarity functions, which are bundled with Splink
# documented here: https://github.com/moj-analytical-services/splink_scalaudfs
path = similarity_jar_location()
conf.set("spark.jars", path)

sc = SparkContext.getOrCreate(conf=conf)

spark = SparkSession(sc)
spark.sparkContext.setCheckpointDir("./tmp_checkpoints")

# Register the Jaro-Winkler custom UDF
spark.udf.registerJavaFunction(
    "jaro_winkler", "uk.gov.moj.dash.linkage.JaroWinklerSimilarity", types.DoubleType()
)
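The registered `jaro_winkler` UDF makes Jaro-Winkler string similarity available inside Spark SQL expressions. For intuition about what it computes, here is a minimal pure-Python sketch of the measure (an illustration only, not the JAR's implementation):

```python
def jaro(s1, s2):
    """Jaro similarity: fraction of matching characters, penalising transpositions."""
    if s1 == s2:
        return 1.0
    len1, len2 = len(s1), len(s2)
    if len1 == 0 or len2 == 0:
        return 0.0
    # Characters match if equal and within this window of each other
    window = max(len1, len2) // 2 - 1
    match1 = [False] * len1
    match2 = [False] * len2
    matches = 0
    for i, c in enumerate(s1):
        lo, hi = max(0, i - window), min(len2, i + window + 1)
        for j in range(lo, hi):
            if not match2[j] and s2[j] == c:
                match1[i] = match2[j] = True
                matches += 1
                break
    if matches == 0:
        return 0.0
    # Half the number of matched characters that are out of order
    t, k = 0, 0
    for i in range(len1):
        if match1[i]:
            while not match2[k]:
                k += 1
            if s1[i] != s2[k]:
                t += 1
            k += 1
    t //= 2
    return (matches / len1 + matches / len2 + (matches - t) / matches) / 3

def jaro_winkler(s1, s2, p=0.1):
    """Boost the Jaro score for strings sharing a common prefix (up to 4 chars)."""
    j = jaro(s1, s2)
    prefix = 0
    for a, b in zip(s1[:4], s2[:4]):
        if a != b:
            break
        prefix += 1
    return j + prefix * p * (1 - j)

jaro_winkler("MARTHA", "MARHTA")  # ≈ 0.961
```

The prefix boost is what makes Jaro-Winkler well suited to names, where typos tend to occur later in the string.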

import pandas as pd 
df = spark.read.csv("./data/fake_1000.csv", header=True)
import splink.spark.spark_comparison_library as cl

settings = {
    "link_type": "dedupe_only",
    "comparisons": [
        cl.jaro_winkler_at_thresholds("first_name", 0.8),
        cl.jaro_winkler_at_thresholds("surname", 0.8),
        cl.levenshtein_at_thresholds("dob"),
        cl.exact_match("city", term_frequency_adjustments=True),
        cl.levenshtein_at_thresholds("email"),
    ],
    "blocking_rules_to_generate_predictions": [
        "l.first_name = r.first_name",
        "l.surname = r.surname",
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
    "em_convergence": 0.01
}
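The `blocking_rules_to_generate_predictions` restrict which record pairs are scored: only pairs agreeing on at least one rule are compared, and a pair captured by several rules is counted once. A toy pandas sketch of that union-of-blocks logic (the records are hypothetical, not from `fake_1000.csv`):

```python
import pandas as pd

# Four hypothetical records
df = pd.DataFrame({
    "unique_id": [1, 2, 3, 4],
    "first_name": ["Julia", "Julia", "Noah", "Noah"],
    "surname": ["Taylor", "Taylor", "Watson", "Wilson"],
})

def block_on(df, col):
    # Self-join on the blocking column, keeping each unordered pair once
    pairs = df.merge(df, on=col, suffixes=("_l", "_r"))
    return pairs[pairs.unique_id_l < pairs.unique_id_r][["unique_id_l", "unique_id_r"]]

# Union of the two blocking rules; pairs caught by both rules are deduplicated
candidates = pd.concat(
    [block_on(df, "first_name"), block_on(df, "surname")]
).drop_duplicates()
# 2 candidate pairs -- (1, 2) and (3, 4) -- instead of all 6 possible pairs
```

Splink performs the equivalent joins in Spark SQL, which is what makes scoring tractable on large datasets.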
from splink.spark.spark_linker import SparkLinker
linker = SparkLinker(df, settings)
deterministic_rules = [
    "l.first_name = r.first_name and levenshtein(r.dob, l.dob) <= 1",
    "l.surname = r.surname and levenshtein(r.dob, l.dob) <= 1",
    "l.first_name = r.first_name and levenshtein(r.surname, l.surname) <= 2",
    "l.email = r.email"
]

linker.estimate_probability_two_random_records_match(deterministic_rules, recall=0.6)
Probability two random records match is estimated to be  0.00389.
This means that amongst all possible pairwise record comparisons, one in 257.25 are expected to match.  With 499,500 total possible comparisons, we expect a total of around 1,941.67 matching pairs
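The figures above follow from simple arithmetic on the estimate (the printed numbers use the unrounded probability, so they differ slightly from what the rounded `0.00389` gives):

```python
n = 1000                                  # records in fake_1000.csv
total_comparisons = n * (n - 1) // 2      # 499,500 possible pairwise comparisons
p = 0.00389                               # rounded estimate printed above
expected_matches = total_comparisons * p  # ≈ 1,943 with the rounded p
```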

linker.estimate_u_using_random_sampling(target_rows=5e5)
----- Estimating u probabilities using random sampling -----                    

Estimated u probabilities using random sampling

Your model is not yet fully trained. Missing estimates for:
    - first_name (no m values are trained).
    - surname (no m values are trained).
    - dob (no m values are trained).
    - city (no m values are trained).
    - email (no m values are trained).
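The u probabilities being estimated here are the chances that a comparison level is observed among *non-matching* record pairs; random sampling works because almost all randomly chosen pairs are non-matches. A self-contained sketch of the idea for an exact match on city, using a hypothetical skewed distribution:

```python
import random

# Hypothetical city distribution: one dominant value and two rarer ones
cities = ["London"] * 20 + ["Leeds"] * 5 + ["York"] * 5

random.seed(0)
# Random pairs stand in for non-matching record comparisons
pairs = [(random.choice(cities), random.choice(cities)) for _ in range(100_000)]
u_exact_city = sum(a == b for a, b in pairs) / len(pairs)
# Expected value: (20/30)**2 + (5/30)**2 + (5/30)**2 = 0.5
```

A skewed distribution like this is also why the settings enable `term_frequency_adjustments` for city: agreeing on "London" is far weaker evidence of a match than agreeing on "York".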

training_blocking_rule = "l.first_name = r.first_name and l.surname = r.surname"
training_session_fname_sname = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)

training_blocking_rule = "l.dob = r.dob"
training_session_dob = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)

----- Starting EM training session -----

Estimating the m probabilities of the model by blocking on:
l.first_name = r.first_name and l.surname = r.surname

Parameter estimates will be made for the following comparison(s):
    - dob
    - city
    - email

Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: 
    - first_name
    - surname


Iteration 1: Largest change in params was -0.53 in the m_probability of dob, level `Exact match`
Iteration 2: Largest change in params was 0.0335 in probability_two_random_records_match
Iteration 3: Largest change in params was 0.0129 in probability_two_random_records_match
Iteration 4: Largest change in params was 0.00639 in probability_two_random_records_match

EM converged after 4 iterations

Your model is not yet fully trained. Missing estimates for:
    - first_name (no m values are trained).
    - surname (no m values are trained).

----- Starting EM training session -----

Estimating the m probabilities of the model by blocking on:
l.dob = r.dob

Parameter estimates will be made for the following comparison(s):
    - first_name
    - surname
    - city
    - email

Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: 
    - dob


Iteration 1: Largest change in params was -0.413 in the m_probability of surname, level `Exact match`
Iteration 2: Largest change in params was 0.108 in probability_two_random_records_match
Iteration 3: Largest change in params was 0.0348 in probability_two_random_records_match
Iteration 4: Largest change in params was 0.0133 in probability_two_random_records_match
Iteration 5: Largest change in params was 0.00593 in probability_two_random_records_match

EM converged after 5 iterations

Your model is fully trained. All comparisons have at least one estimate for their m and u values

results = linker.predict(threshold_match_probability=0.9)
results.as_pandas_dataframe(limit=5)
match_weight match_probability unique_id_l unique_id_r first_name_l first_name_r gamma_first_name bf_first_name surname_l surname_r ... gamma_city tf_city_l tf_city_r bf_city bf_tf_adj_city email_l email_r gamma_email bf_email match_key
0 3.371739 0.911904 220 223 Logan Logan 2 86.748396 serguFon Ferguson ... 1 0.212792 0.212792 10.316002 0.259162 l.feruson46@sahh.com None -1 1.000000 0
1 14.743406 0.999964 879 880 Leo Leo 2 86.748396 Webster Webster ... 1 0.008610 0.008610 10.316002 6.404996 leo.webster54@moore.biez None -1 1.000000 0
2 13.140266 0.999889 446 450 Aisha Aisha 2 86.748396 Bryant None ... 0 0.011070 0.001230 0.456259 1.000000 aishab64@obrien-flores.com aishab64@obrien-flores.com 3 257.458944 0
3 8.829126 0.997806 446 448 Aisha Aisha 2 86.748396 Bryant BryBant ... 0 0.011070 0.001230 0.456259 1.000000 aishab64@obrien-flores.com aishab64@obrien-flores.com 3 257.458944 0
4 6.584844 0.989690 790 791 Jackson Jackson 2 86.748396 Fisreh Fishier ... 0 0.009840 0.001230 0.456259 1.000000 j.fisher4@sullivan.com None -1 1.000000 0

5 rows × 28 columns
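In this output, `match_weight` is the log2 of the overall Bayes factor for the pair (the product of the per-comparison `bf_` columns and the prior), and `match_probability` is derived from it. A short sketch of the conversion, checked against the first row above:

```python
def match_probability(match_weight):
    # match_weight is log2 of the overall Bayes factor, so
    # p = 2**w / (1 + 2**w)
    bf = 2 ** match_weight
    return bf / (1 + bf)

match_probability(3.371739)  # ≈ 0.911904, the first row above
```

This is why `threshold_match_probability=0.9` corresponds to a match weight of roughly 3.17: small increases in weight translate to rapidly shrinking doubt about the pair.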