#* condition_era_with_epochs:
#*   attr:
#*     fillcolor: '2'
#*   desc: Assigned condition eras to epochs, removing any conditions that overlap the
#*     epochs defined as suspicious (prior to 2018-1-1 or after the last reasonable date).
#*   ext: py
#*   inputs:
#*   - person_epochs
#*   - condition_era_full_clean
#*

# Given an OMOP table and an epoch-definitions table, place events (rows)
# into the defined epochs, duplicating events that fall in multiple epochs
# (for example, a condition that began prior to COVID-19 infection and ended
# after it appears in both the "pre-covid" and the "acute" epoch).
import pyspark.sql.functions as F


def condition_era_with_epochs(person_epochs, condition_era_full_clean):
    # give the OMOP input a generic local name
    omop_table = condition_era_full_clean

    # if working with something like observation, where there's a single event
    # date, just set both of these to the same column
    event_start_date = 'condition_era_start_date'
    event_end_date = 'condition_era_end_date'

    # whether to drop events that don't fall within any defined epoch
    # (if kept, their epoch info will be null)
    drop_unallocated = True

    # if a patient has any data in these epochs, drop that patient altogether
    scrub_patients_with_data_in_epochs = []

    # for these epochs we'll scrub the events that fall in them, but not the whole persons
    scrub_epoch_events = ["suspicious-pre-N3C-date", "suspicious-post-last-reasonable-date"]

    ## TODO: count suspicious person_ids instead of suspicious events; break out by data partners

    # tag each event with a unique id so that multi-epoch copies of an event
    # can be traced back to (and scrubbed as) a single source row
    omop_table = omop_table.withColumn('event_id', F.monotonically_increasing_id())

    if drop_unallocated:
        res = person_epochs.join(omop_table, person_epochs.person_id == omop_table.person_id, how="inner").drop(person_epochs.person_id)
    else:
        res = person_epochs.join(omop_table, person_epochs.person_id == omop_table.person_id, how="right").drop(person_epochs.person_id)

    ## TODO: epochs are inclusive/exclusive; what about OMOP date ranges?
    ## This assumes inclusive/inclusive.
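    # (Note on the overlap logic, assuming the inclusive-start/exclusive-end
    # epoch convention the comments below describe: an event [s, e] overlaps
    # an epoch [a, b) exactly when e >= a and s < b. Negating the two
    # non-overlap cases, e < a and s >= b, yields the pair of ~ filters below,
    # each OR'd with epoch-is-null so unallocated rows survive when
    # drop_unallocated is False. For example, with epoch
    # [2020-03-01, 2020-06-01), an event ending 2020-03-01 is kept, while one
    # starting 2020-06-01 is dropped.)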
    # dropping epoch/event combos where the event end is before the epoch start
    # (filter() keeps rows that match the expression, but it's easier to remove
    # rows where the ranges don't overlap; ~ is PySpark for 'not')
    # we also keep null epochs if they are present (via the drop_unallocated option)
    res = res.filter(~(F.col(event_end_date) < F.col('epoch_start')) | F.col('epoch').isNull())
    # this keeps events that touch the epoch start date at the end of the event
    # (since the start of an epoch is inclusive)

    # dropping epoch/event combos where the event start is on or after the epoch end
    res = res.filter(~(F.col(event_start_date) >= F.col('epoch_end')) | F.col('epoch').isNull())
    # this drops events whose start date touches the epoch end date
    # (since the end of an epoch is exclusive)

    suspicious_counts = res.filter(res.epoch.isin(scrub_patients_with_data_in_epochs) |
                                   res.epoch.isin(scrub_epoch_events)) \
                           .select('epoch').groupby('epoch').count().collect()
    print(suspicious_counts)

    # remove whole patients who have any data in the person-scrub epochs
    suspicious_persons = res.filter(res.epoch.isin(scrub_patients_with_data_in_epochs)).select('person_id').distinct()
    print("Scrubbing " + str(suspicious_persons.count()) + " person_ids with epochs in the person-epoch scrub list.")
    res = res.join(suspicious_persons, res.person_id == suspicious_persons.person_id, how="leftanti")

    # remove scrubbed events entirely (every epoch copy of them), not just the
    # rows that fall in the scrub epochs
    suspicious_event_ids = res.filter(res.epoch.isin(scrub_epoch_events)).select('event_id').distinct()
    print("Scrubbing " + str(suspicious_event_ids.count()) + " events with epochs in the event-epoch scrub list.")
    res = res.join(suspicious_event_ids, res.event_id == suspicious_event_ids.event_id, how="leftanti")

    # we also keep null epochs if they are present (via the drop_unallocated option)
    res = res.filter(~res.epoch.isin(scrub_epoch_events) | F.col('epoch').isNull())

    return res
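
# --- Illustrative usage: a minimal local sketch, not part of the pipeline. ---
# The real person_epochs and condition_era_full_clean inputs are supplied
# upstream; the toy frames below are hypothetical and assume only the column
# names the function uses (person_id, epoch, epoch_start, epoch_end, and the
# condition era start/end dates).
if __name__ == "__main__":
    from datetime import date
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    person_epochs = spark.createDataFrame(
        [(1, "pre-covid", date(2018, 1, 1), date(2020, 3, 1)),
         (1, "acute", date(2020, 3, 1), date(2020, 6, 1)),
         (1, "suspicious-pre-N3C-date", date(1900, 1, 1), date(2018, 1, 1))],
        ["person_id", "epoch", "epoch_start", "epoch_end"])

    condition_era_full_clean = spark.createDataFrame(
        [(1, date(2020, 2, 1), date(2020, 4, 1)),   # spans two epochs -> two output rows
         (1, date(2017, 5, 1), date(2017, 6, 1))],  # only in a scrub epoch -> removed
        ["person_id", "condition_era_start_date", "condition_era_end_date"])

    # expect two rows: the first condition duplicated into "pre-covid" and "acute"
    condition_era_with_epochs(person_epochs, condition_era_full_clean).show()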