import dedupe
import os
import itertools
import time
import logging
import optparse
import locale
import pickle
import multiprocessing
import sqlite3 as lite
import pandas as pd
import unicodedata
import csv
import sys
pd.options.display.max_columns = 999
Original data and cleaning¶
data = pd.read_csv('../data/companies-with-controlling-entities v6.csv', encoding='utf-8')
data.head(2)
# TODO - trim left and right of name for spaces?
# TODO - check that the index is unique, or will cause problems at a later date!
Select just the columns that are useful for entity resolution. Then:
- Remove rows with nulls in for family name and given name columns.
- Fill NaNs in other columns with None (required for dedupe package to work properly on String types).
- Convert numbers to strings - maybe the best way of dealing with human-input text? Open question!
to_dedupe = data[['family_name', 'given_name', 'middle_name', 'dob_year', 'dob_month']].copy()
# Some utility functions for getting dataframes ready for deduping
def remove_nulls_from_these_columns(df, columns, verbose=False):
""" Utility function to remove nulls from the specified columns of a dataframe.
Inputs: - df -> dataframe to remove nulls from
- columns -> list of columns which should have no nulls in
Outputs: - new dataframe without nulls
"""
if verbose:
print("Number of rows before removing nulls: {}".format(len(df)))
to_remove = df
for col in columns:
no_null = to_remove[to_remove[col].notnull()]
if verbose:
print("Number of rows after removing " + col + ": {}".format(len(no_null)))
to_remove = no_null
return no_null
def fill_nulls_with_none(df):
""" Fills nulls in a dataframe with None.
This is required for the Dedupe package to work properly.
Input: - dataframe with nulls as NaN
Output: - new dataframe with nulls as None
"""
new_df = df.copy()
for col in df.columns:
new_df[col] = new_df[col].where(new_df[col].notnull(), None)
return new_df
def convert_numbers_to_strings(df, cols_to_convert, remove_point_zero=True):
""" Convert number types to strings in a dataframe.
This is convoluted, as we need to keep NoneTypes as NoneTypes for what comes next!
Inputs: - df -> dataframe to convert number types
- cols_to_convert -> list of columns to convert
- remove_point_zero -> bool to say whether you want '.0' removed from number
Outputs: - dataframe with converted number types
"""
new_df = df.copy()
for col in cols_to_convert:
if remove_point_zero:
new_df[col] = new_df[col].apply(lambda x: str(x).replace('.0','')\
if not isinstance(x, type(None)) else x)
else:
new_df[col] = new_df[col].apply(lambda x: str(x)\
if not isinstance(x, type(None)) else x)
return new_df
to_dedupe = remove_nulls_from_these_columns(to_dedupe, ['family_name','given_name'], verbose=True)
to_dedupe = fill_nulls_with_none(to_dedupe)
to_dedupe = convert_numbers_to_strings(to_dedupe, ['dob_year','dob_month'])
to_dedupe.head()
Dedupe requires that the input data be a dictionary of records, where each key is a record id and each value is a dictionary of field values:
to_dedupe_dict = to_dedupe.to_dict(orient = 'index')
This is what an entry of the dictionary looks like:
to_dedupe_dict[0]
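To make the expected structure concrete, this is a minimal hand-written illustration of the same shape (hypothetical values, not taken from the data):
example_records = {
    0: {'family_name': 'Smith', 'given_name': 'John', 'middle_name': None,
        'dob_year': '1970', 'dob_month': '1'},
    1: {'family_name': 'Smyth', 'given_name': 'Jon', 'middle_name': None,
        'dob_year': '1970', 'dob_month': '1'},
}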
For convenience while testing, store the to_dedupe_dict object as a pickle file:
with open('to_dedupe_dict.pkl', 'wb') as f:
pickle.dump(to_dedupe_dict, f)
with open('to_dedupe_dict.pkl', 'rb') as f:
to_dedupe_dict = pickle.load(f)
# the dedupe documentation describes how to define these field variables
fields = [{'field' : 'family_name', 'type' : 'String'},
{'field' : 'given_name', 'type': 'String'},
{'field' : 'middle_name', 'type': 'String', 'has_missing' : True},
{'field' : 'dob_year', 'type': 'String', 'has_missing' : True},
{'field' : 'dob_month', 'type': 'String', 'has_missing' : True}]
# There is a bug later on that requires num_cores to be 1, but we can make use of
# multi-threaded processes in the meantime
deduper = dedupe.Dedupe(fields, num_cores=4)
We then sample the data to get something to train on. More information about all of the deduper object methods can be found here. Note that training is done by a human: pairs of records are presented and you're asked whether they refer to the same thing or not (coming to this bit shortly!).
deduper.sample(to_dedupe_dict, sample_size=1000000)
Training dedupe with human-matched records¶
If you've already done the training, load the file that contains the record pairs marked as being duplicates or not. (It gets saved as a json file).
training_file = 'controlling_entities_dedupe_training.json'
if os.path.exists(training_file):
print('reading labeled examples from ', training_file)
with open(training_file, 'rb') as tf:
deduper.readTraining(tf)
else:
print('no training file found - will create new one: ' + training_file)
dedupe.convenience.consoleLabel(deduper)
Once you've provided the training data, you can train the model.
# learn both the classifier and blocking rules
# maximum_comparisons seems to be SUPER important in finding good predicates
deduper.train(recall=0.9, maximum_comparisons=1000000000)
The blocking rules have now been found, and the classifier has also been trained. The classifier (which determines whether two records are the same or not) is an L2-regularized logistic regression. Any model in scikit-learn with fit and predict_proba methods could be used in its place.
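Since any estimator with those two methods will do, you could in principle swap in a different classifier before calling train - a sketch only, not something done in this notebook:
# Sketch (assumes scikit-learn is installed): replace the default classifier
# with a random forest. This must be assigned before deduper.train() is called.
# from sklearn.ensemble import RandomForestClassifier
# deduper.classifier = RandomForestClassifier(n_estimators=100)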
Just to demonstrate the inputs to the model, we select 2 records from the inputs:
record_pairs = ((to_dedupe_dict[0], to_dedupe_dict[1]),)
The model features are given by:
print(deduper.data_model._field_comparators)
And the input values for this particular pair of records by:
print(deduper.data_model.distances(record_pairs=record_pairs))
The coefficients in the model itself are given by:
print(deduper.classifier.weights)
Anyway, after training we can save the data we labelled, as well as the settings for this particular run:
settings_file = 'dedupe_settings'
with open(settings_file, 'wb') as f:
deduper.writeSettings(f)
with open(training_file, 'w') as f:
deduper.writeTraining(f)
The training finds both the Predicates for creating 'blocks' (or 'canopies' - for a good explanation, see this excellent paper) and the classifier for actually determining whether two records are referring to the same entity.
The documentation provides a good overview on why one might want to use blocks/canopies.
My understanding is that blocks/canopies are created for every single predicate - in other words, blocks/canopies are created on the entire dataset as many times as there are predicates.
deduper.blocker.predicates
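As a purely illustrative sketch of what one of these predicates does (this is not dedupe's internal code), imagine a toy predicate that keys every record on the first character of family_name; dedupe builds a mapping like this once per learned predicate:
# Illustration only: a toy 'predicate' and the blocks it induces over the data.
from collections import defaultdict
toy_blocks = defaultdict(list)
for rec_id, rec in to_dedupe_dict.items():
    toy_blocks[(rec['family_name'] or '')[:1].lower()].append(rec_id)
# Only blocks containing more than one record can generate candidate pairs.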
Some predicates rely on an 'inverted index'; these are the CanopyPredicates and the SearchPredicates. You can avoid predicates that rely on inverted indexes by setting index_predicates to False in the train method.
Inverted indexes are used to "find the documents where the word X occurs". Unfortunately it seems quite challenging to print out the inverted index itself to understand exactly what it's doing. Nevermind!
Note that canopies are used by predicates that require an inverted index, and blocks are used by simple predicates. The same row can appear in multiple canopies (i.e. canopies can overlap), while the same row only ever appears in one block.
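As a minimal sketch of that index_predicates option (not run here, since training has already been done above):
# Sketch only: disabling index predicates would mean no canopies / inverted
# indexes are built, leaving just the simple blocking predicates.
# deduper.train(recall=0.9, maximum_comparisons=1000000000, index_predicates=False)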
for field in deduper.blocker.index_fields:
print(field)
With large data sets, the data_sample, training_pairs, training_data, and activeLearner objects can take up a lot of memory. The command below deletes them to free that memory.
deduper.cleanupTraining()
Storing the raw data in a database¶
For portability, it's convenient to use SQLite rather than anything bigger and grander requiring a database server. You probably wouldn't want to deal with hundreds of millions of rows like this though!
database_name = 'controlling_entities_dedupe_database.db'
if os.path.exists(database_name):
print("database '{}' already exists. deleting it and starting from scratch!".format(database_name))
os.remove(database_name)
conn = lite.connect(database_name)
As a reminder, the raw, un-deduplicated data looks like this:
to_dedupe.head()
It's convenient to use pandas to create a table in the database. We store the raw data in a table called actual_data:
c1 = conn.cursor()
c1.execute("DROP TABLE IF EXISTS actual_data")
to_dedupe.to_sql('actual_data', conn, if_exists='fail', index=True, index_label = 'id')
c1.close()
Creating inverted indexes for the fields that require it¶
i.e. those which have a CanopyPredicate or SearchPredicate associated with them
# A dictionary of the Index Predicates that will be used for blocking.
# The keys are the fields the predicates will operate on.
deduper.blocker.index_fields
# this does not add an index if there are no canopy predicates in the index_fields
c3 = conn.cursor()
for field in deduper.blocker.index_fields:
print('creating inverted index on: ' + field)
c3.execute("SELECT DISTINCT {field} FROM actual_data "
"WHERE {field} IS NOT NULL".format(field = field))
field_data = (row[0] for row in c3)
# Indexes the data from a field for use in an index predicate/canopy
# This goes through row-by-row
deduper.blocker.index(field_data, field)
c3.close()
Creating the blocks/canopies¶
Start by creating the (empty) table in the database that will contain the mapping between the blocks and the row_ids.
c2 = conn.cursor()
c2.execute("DROP TABLE IF EXISTS blocking_map")
c2.execute("CREATE TABLE blocking_map "
"(block_key VARCHAR(200), id INTEGER) ")
c2.close()
SELECT_STATEMENT = "SELECT id, family_name, given_name, middle_name, " \
"dob_year, dob_month FROM actual_data"
c4 = conn.cursor()
print('writing blocking map')
c4.execute(SELECT_STATEMENT)
# the data should be fed to the blocker as an iterable of the form:
# data = [(1, {'name' : 'bob'}), (2, {'name' : 'suzanne'}), ... ]
full_data = ((row[0], {'id': row[0],
'family_name' : row[1],
'given_name' : row[2],
'middle_name' : row[3],
'dob_year' : row[4],
'dob_month' : row[5]}) for row in c4)
# Generate the predicates for records. Yields tuples of (predicate, record_id)
b_data = deduper.blocker(full_data)
# b_data has the form:
# [('foo:1', 1), ..., ('bar:1', 100)]
# b_data is a generator so will yield output as looped over
c5 = conn.cursor()
for row in b_data:
c5.execute("INSERT INTO blocking_map (block_key, id) VALUES ('{block_key}',{id})".\
format(block_key=row[0],id=row[1]))
c5.close()
c4.close()
conn.commit()
The blocking_map actually looks like the following:
# How well has the blocking worked? check that there are not loads of blocks with single counts
blocking_map_df = pd.read_sql("SELECT * FROM blocking_map", conn)
blocking_map_df.head()
Conceptually, you can think of the blocks/canopies as looking something like this (note that you end up with this situation for each Predicate dedupe finds):
from IPython.display import Image
Image(filename="./diagrams/blocking_map.png",width=400,height=400)
And here is a list of the ten largest blocks:
blocking_map_df.groupby('block_key')['block_key'].agg('count').sort_values(ascending = False).head(10)
Note - when rows do not fall into any block they are excluded (because they can only be singletons), so the length of the blocking_map might be shorter than the overall data. It may, of course, also be longer, as the same row can appear in multiple blocks/canopies.
print(len(blocking_map_df))
print(len(to_dedupe))
If we merge the blocking map back onto the original data, we can see what has been lumped together:
merged = pd.merge(blocking_map_df, to_dedupe, left_on = 'id', right_index = True, how = 'right')
merged.sort_values(by='block_key').head()
merged[merged.family_name == 'Loch']
Finally, we create an index on the block_key for faster queries, and free up the memory used by the inverted indexes:
c = conn.cursor()
logging.info("indexing block_key")
c.execute("CREATE INDEX blocking_map_key_idx ON blocking_map (block_key)")
deduper.blocker.resetIndices()
SQL manipulations to prepare blocks for matching¶
The next steps require some data manipulation to prepare the blocks in the format required by the matchBlocks method. See the matchBlocks section in the documentation for exactly what that format is. A MySQL version of these steps can be found here.
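To make that format concrete before building it in SQL: each block fed to matchBlocks is a list of (record_id, record_dict, smaller_block_ids) tuples. A hand-written sketch with made-up ids and values:
# Sketch only (hypothetical ids/values): one block of candidate records.
# The final element is the set of 'smaller' blocks that already contain the
# record, so the same pair is never compared twice.
example_block = [
    (12, {'id': 12, 'family_name': 'Loch', 'given_name': 'Anna'}, set()),
    (57, {'id': 57, 'family_name': 'Loch', 'given_name': 'Anne'}, {3}),
]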
These are the tables that will be created shortly:
c.execute("DROP TABLE IF EXISTS plural_key")
c.execute("DROP TABLE IF EXISTS plural_block")
c.execute("DROP TABLE IF EXISTS covered_blocks")
c.execute("DROP TABLE IF EXISTS smaller_coverage")
Plural Key table¶
Here, the block_keys (which are fairly long varchar types) are reassigned an integer id, imaginatively called block_id. Further, if two block_keys form identical blocks, we keep only one of them. Further still, we keep only blocks that contain more than one record (a singleton record obviously cannot be matched to anything else within its block) - this is why the word 'plural' is used!
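A tiny worked illustration with made-up keys and ids:
# block_key 'foo:1' -> block {4, 9}
# block_key 'bar:2' -> block {4, 9}   (identical block -> only one key is kept)
# block_key 'baz:3' -> block {7}      (singleton block -> dropped entirely)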
logging.info("calculating plural_key")
c.execute(" DROP TABLE IF EXISTS plural_key")
c.execute("CREATE TABLE plural_key "
"(block_key VARCHAR(200), "
" block_id INTEGER PRIMARY KEY AUTOINCREMENT) ")
c.execute(" INSERT INTO plural_key (block_key) "
" SELECT MIN(block_key) FROM "
" (SELECT block_key, GROUP_CONCAT(id) AS block FROM "
" (SELECT block_key, id FROM blocking_map ORDER BY block_key, id) AS a "
" GROUP BY block_key "
" HAVING COUNT(*) > 1) AS b "
" GROUP BY block ")
... and this is what plural_key actually looks like:
plural_key_df = pd.read_sql("SELECT * FROM plural_key", conn)
plural_key_df.head()
The length of plural_key is the number of blocks within which records will be matched later:
print(len(plural_key_df))
As a sanity check, there should be no duplicated rows in plural_key:
try:
assert len(plural_key_df[plural_key_df.duplicated(keep = False)]) == 0
print("no duplicated rows in plural_key")
except AssertionError:
print("somehow duplicated rows have crept into plural_key - fix before continuing!!")
And finally we index the table appropriately:
logging.info("creating block_key index")
c.execute("CREATE UNIQUE INDEX block_key_idx ON plural_key (block_key)")
Plural Block table¶
This simply links the block_id calculated in plural_key above back to the records that each block contains (as represented by id).
logging.info("calculating plural_block")
c.execute(" DROP TABLE IF EXISTS plural_block ")
c.execute(" CREATE TABLE plural_block AS "
" SELECT block_id, id "
" FROM blocking_map AS a "
" INNER JOIN plural_key AS b "
" ON a.block_key = b.block_key "
" ORDER BY block_id, id")
plural_block_df = pd.read_sql("SELECT * FROM plural_block", conn)
plural_block_df.head()
print(len(plural_block_df))
try:
assert len(plural_block_df[plural_block_df.duplicated(keep = False)]) == 0
print("no duplicated rows in plural_block")
except AssertionError:
print("somehow duplicated rows have crept into plural_block - fix before continuing!!")
logging.info("adding id index and sorting index")
c.execute("CREATE INDEX plural_block_id_idx ON plural_block (id)")
c.execute("CREATE UNIQUE INDEX plural_block_block_id_id_uniq "
" ON plural_block (block_id, id)")
Covered Blocks table¶
This table maps records to the blocks they appear in (if more than one, all blocks are concatenated with a comma separator). Remember that, at this stage, blocks with just one record in ("singleton" blocks) have been excluded.
logging.info("creating covered_blocks")
c.execute(" CREATE TABLE covered_blocks AS "
" SELECT id, GROUP_CONCAT(block_id) AS sorted_ids"
" FROM (SELECT id, block_id FROM plural_block ORDER BY id, block_id) AS a "
" GROUP BY id")
covered_blocks_df = pd.read_sql("SELECT * FROM covered_blocks", conn)
covered_blocks_df.head()
Number of rows in covered_blocks:
print(len(covered_blocks_df))
Examples of rows which appear in more than one block:
covered_blocks_df[covered_blocks_df.sorted_ids.str.contains(',')].head()
c.execute("CREATE UNIQUE INDEX covered_blocks_id_idx "
"ON covered_blocks (id)")
conn.commit()
Smaller Coverage table¶
This table is a PITA to think about. Essentially, it says: for each record, which block_id is it in; is it in any other block_ids; and if so, which of those block_ids are smaller than this one? The purpose is to avoid comparing the same pair of records more than once: if two records both fall into the same two blocks, for example, we only want to compare them once.
logging.info("creating smaller_coverage")
c.execute(" DROP TABLE IF EXISTS smaller_coverage ")
c.execute(" CREATE TABLE smaller_coverage "
" AS SELECT a.id, block_id, sorted_ids, "
" RTRIM(SUBSTR(sorted_ids, 1, INSTR(sorted_ids, block_id) - 1), ',') as smaller_ids "
" FROM plural_block AS a "
" INNER JOIN covered_blocks AS b "
" ON a.id = b.id")
smaller_coverage_df = pd.read_sql("SELECT * FROM smaller_coverage", conn)
smaller_coverage_df.head()
print(len(smaller_coverage_df))
Examples of rows whose smaller_ids contains more than one block id:
smaller_coverage_df[smaller_coverage_df.smaller_ids.str.contains(',')].head()
Clustering within the blocks/canopies¶
First create the final table that is fed to matchBlocks:
logging.info("creating final table")
c.execute("CREATE TABLE final AS "
"SELECT a.id, family_name,given_name, middle_name, dob_year, dob_month, "
"block_id, smaller_ids "
"FROM smaller_coverage AS a "
"INNER JOIN actual_data AS b "
"ON a.id = b.id "
"ORDER BY (block_id)")
conn.commit()
to_match = pd.read_sql("SELECT * FROM final", conn)
to_match.head()
Write a generator function that feeds the blocks to matchBlocks in the correct format:
start_time = time.time()
# Clustering function
def candidates_gen(result_set) :
lset = set
block_id = None
records = []
i = 0
for row in result_set :
# the sqlite cursor yields plain tuples - convert each row to a dict keyed by field name
row = {'id': row[0],
'family_name' : row[1],
'given_name' : row[2],
'middle_name' : row[3],
'dob_year' : row[4],
'dob_month' : row[5],
'block_id' : row[6],
'smaller_ids' : row[7]}
# if there's a new block id then yield the old block
if row['block_id'] != block_id :
#print("done with block id: {}".format(row['block_id']))
if records :
#print(len(records))
yield records
block_id = row['block_id']
records = []
i += 1
if i % 10000 == 0 :
print(i, "blocks")
print(time.time() - start_time, "seconds")
smaller_ids = row['smaller_ids']
if smaller_ids :
smaller_ids = lset(smaller_ids.split(','))
else :
smaller_ids = lset([])
records.append((row['id'], row, smaller_ids))
# once gone through the loop need to yield the last block
if records :
yield records
Deal with the bug where matchBlocks hangs if it is using more than one core:
# this has to be set to 1 or matchBlocks fails
deduper.num_cores = 1
c.execute("SELECT * FROM final")
# cands is the iterable
cands = candidates_gen(c)
Finally, we're at the stage where we can compare all the records within each block:
print('clustering...')
clustered_dupes = deduper.matchBlocks(cands)
Saving the results¶
We store the results of matchBlocks in a table called entity_map:
c.execute("DROP TABLE IF EXISTS entity_map")
print('creating entity_map database')
c.execute("CREATE TABLE entity_map "
"(id INTEGER, canon_id INTEGER, "
" cluster_score FLOAT, PRIMARY KEY(id))")
for cluster, scores in clustered_dupes :
cluster_id = cluster[0]
for id, score in zip(cluster, scores) :
c.execute('INSERT INTO entity_map VALUES ({}, {}, {})'.format(id, cluster_id, score))
c.execute("CREATE INDEX head_index ON entity_map (canon_id)")
conn.commit()
entity_map_df = pd.read_sql("SELECT * FROM entity_map", conn)
entity_map_df.head()
Print the number of duplicates found:
print('# duplicate sets')
print(len(clustered_dupes))
Combining the results with the original data¶
output = pd.merge(to_dedupe, entity_map_df, left_index = True, right_on = 'id', how = 'left')
output.sort_values(by = 'canon_id').head(10)
An example of success - for this person, the dob_year is different, but each row fairly surely refers to the same person, so they have the same canon_id:
output[output.family_name == 'Meath Baker']
Rows with the same canon_id are the rows which dedupe believes refer to the same entity. The cluster_score is a measure of how confident dedupe is about that assignment. If a row has no canon_id at all, dedupe doesn't believe that row has a pair.
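As a usage sketch (the cut-off value here is arbitrary), you might keep only the matches dedupe is fairly confident about:
# Sketch only: filter to clusters whose score exceeds an arbitrary cut-off.
confident = output[output.cluster_score.notnull() & (output.cluster_score > 0.9)]
confident.sort_values(by = 'canon_id').head()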
By playing with the precision/recall trade-off in the train (and other?) methods, one can move the metaphorical slider between capturing all of the duplicated records (at the expense of including some pairs which aren't referring to the same person - high recall), and being certain that the records you say are pairs really are pairs (at the expense of missing some of the pairs that should be lumped together - high precision).
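One concrete lever (a sketch, not run above) is the threshold argument to matchBlocks, which defaults to 0.5 - raising it favours precision, lowering it favours recall:
# Sketch only: a higher threshold keeps only higher-confidence matches.
# clustered_dupes = deduper.matchBlocks(cands, threshold=0.75)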
Much more information about entity resolution can be found in this PhD thesis. Indeed the dedupe package itself is based on this work!
# save output to a csv
output.to_csv('entity_resolved_controlling_entities.csv')