Deduplication using Pyspark
Linking in SparkΒΆ
from splink.spark.jar_location import similarity_jar_location
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import types
conf = SparkConf()
# This parallelism setting is only suitable for a small toy example
conf.set("spark.driver.memory", "12g")
conf.set("spark.default.parallelism", "16")
# Add custom similarity functions, which are bundled with Splink
# documented here: https://github.com/moj-analytical-services/splink_scalaudfs
path = similarity_jar_location()
conf.set("spark.jars", path)
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
spark.sparkContext.setCheckpointDir("./tmp_checkpoints")
23/08/17 15:07:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
# Disable warnings for pyspark - you don't need to include this
import warnings
spark.sparkContext.setLogLevel("ERROR")
warnings.simplefilter("ignore", UserWarning)
from splink.datasets import splink_datasets
pandas_df = splink_datasets.fake_1000
df = spark.createDataFrame(pandas_df)
import splink.spark.comparison_library as cl
import splink.spark.comparison_template_library as ctl
from splink.spark.blocking_rule_library import block_on
settings = {
"link_type": "dedupe_only",
"comparisons": [
ctl.name_comparison("first_name"),
ctl.name_comparison("surname"),
ctl.date_comparison("dob", cast_strings_to_date=True),
cl.exact_match("city", term_frequency_adjustments=True),
ctl.email_comparison("email", include_username_fuzzy_level=False),
],
"blocking_rules_to_generate_predictions": [
block_on("first_name"),
"l.surname = r.surname", # alternatively, you can write BRs in their SQL form
],
"retain_matching_columns": True,
"retain_intermediate_calculation_columns": True,
"em_convergence": 0.01
}
from splink.spark.linker import SparkLinker
linker = SparkLinker(df, settings, spark=spark)
deterministic_rules = [
"l.first_name = r.first_name and levenshtein(r.dob, l.dob) <= 1",
"l.surname = r.surname and levenshtein(r.dob, l.dob) <= 1",
"l.first_name = r.first_name and levenshtein(r.surname, l.surname) <= 2",
"l.email = r.email"
]
linker.estimate_probability_two_random_records_match(deterministic_rules, recall=0.6)
--WARN--
You are using datediff comparison
with str-casting and ANSI is not enabled. Bad dates
e.g. 1999-13-54 will not trigger an exception but will
classed as comparison level = "ELSE". Ensure date strings
are cleaned to remove bad dates
Probability two random records match is estimated to be 0.0806.
This means that amongst all possible pairwise record comparisons, one in 12.41 are expected to match. With 499,500 total possible comparisons, we expect a total of around 40,246.67 matching pairs
linker.estimate_u_using_random_sampling(max_pairs=5e5)
----- Estimating u probabilities using random sampling -----
Estimated u probabilities using random sampling
Your model is not yet fully trained. Missing estimates for:
- first_name (no m values are trained).
- surname (no m values are trained).
- dob (no m values are trained).
- city (no m values are trained).
- email (no m values are trained).
training_blocking_rule = "l.first_name = r.first_name and l.surname = r.surname"
training_session_fname_sname = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)
training_blocking_rule = "l.dob = r.dob"
training_session_dob = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)
----- Starting EM training session -----
Estimating the m probabilities of the model by blocking on:
l.first_name = r.first_name and l.surname = r.surname
Parameter estimates will be made for the following comparison(s):
- dob
- city
- email
Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules:
- first_name
- surname
Iteration 1: Largest change in params was -0.698 in probability_two_random_records_match
Iteration 2: Largest change in params was 0.0569 in the m_probability of email, level `All other comparisons`
Iteration 3: Largest change in params was 0.0193 in the m_probability of email, level `All other comparisons`
Iteration 4: Largest change in params was 0.0081 in the m_probability of email, level `All other comparisons`
EM converged after 4 iterations
Your model is not yet fully trained. Missing estimates for:
- first_name (no m values are trained).
- surname (no m values are trained).
----- Starting EM training session -----
Estimating the m probabilities of the model by blocking on:
l.dob = r.dob
Parameter estimates will be made for the following comparison(s):
- first_name
- surname
- city
- email
Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules:
- dob
Iteration 1: Largest change in params was -0.531 in the m_probability of surname, level `Exact match surname`
Iteration 2: Largest change in params was 0.138 in probability_two_random_records_match
Iteration 3: Largest change in params was 0.0478 in probability_two_random_records_match
Iteration 4: Largest change in params was 0.0193 in probability_two_random_records_match
Iteration 5: Largest change in params was 0.00956 in probability_two_random_records_match
EM converged after 5 iterations
Your model is fully trained. All comparisons have at least one estimate for their m and u values
results = linker.predict(threshold_match_probability=0.9)
results.as_pandas_dataframe(limit=5)
match_weight | match_probability | unique_id_l | unique_id_r | first_name_l | first_name_r | gamma_first_name | bf_first_name | surname_l | surname_r | ... | gamma_city | tf_city_l | tf_city_r | bf_city | bf_tf_adj_city | email_l | email_r | gamma_email | bf_email | match_key | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 9.061592 | 0.998132 | 405 | 406 | NaN | NaN | 4 | 11.458919 | eoookC | Cooke | ... | 0 | 0.001 | 0.187 | 0.624618 | 1.000000 | l.cooke@thompson-williams.info | l.cooke@thompson-williams.info | 3 | 8.470049 | 0 |
1 | 19.266762 | 0.999998 | 811 | 813 | Elliott | Elliott | 4 | 11.458919 | NaN | NaN | ... | 1 | 0.006 | 0.006 | 5.892455 | 11.876543 | e.b30@little.biz | e.b30zlittle.bi@ | 1 | 252.866021 | 0 |
2 | 8.780607 | 0.997731 | 8 | 10 | NaN | NaN | 4 | 11.458919 | Dean | Dean | ... | 0 | 0.187 | 0.014 | 0.624618 | 1.000000 | NaN | evied56@harris-bailey.net | 0 | 0.349378 | 0 |
3 | 13.961451 | 0.999937 | 829 | 830 | Mason | Mason | 4 | 11.458919 | NaN | Smith | ... | 0 | 0.013 | 0.001 | 0.624618 | 1.000000 | masons2@reed.com | masons5@2@reed.com | 1 | 252.866021 | 0 |
4 | 17.466985 | 0.999994 | 35 | 36 | NaN | NaN | 4 | 11.458919 | Bron | Brrown | ... | 1 | 0.009 | 0.009 | 5.892455 | 7.917695 | NaN | lola.b@martinez-jones.net | 0 | 0.349378 | 0 |
5 rows Γ 28 columns