#* person_cohort_epoch_definitions:
#*   attr:
#*     fillcolor: '2'
#*   desc: Uses dates to define patient epochs (aka phases) of their history. Lots of
#*     important logic here!
#*   ext: py
#*   inputs:
#*   - index_dates_fix_yr10k
#*

# creates a definition of patient "epochs"
# date ranges over patient histories defined by epoch_name, start_date, and end_date
# note that these can overlap, and be entirely different for different patients,
# and may not even cover any actual data in a given OMOP table
# Epochs are coded as cohortname:epochname, so that epoch existence also defines patient cohorts

from pyspark.sql.functions import udf
from pyspark.sql.types import *
import pyspark.sql.functions as F
from datetime import date, timedelta
import datetime
import random


def person_cohort_epoch_definitions(index_dates_fix_yr10k):
    """Turn each patient's index dates into one row per named epoch.

    Args:
        index_dates_fix_yr10k: DataFrame with a ``person_id`` column plus the
            date columns handed to the UDF below (``first_covid_diagnosis``
            ... ``observation_period_end_date``).

    Returns:
        DataFrame with columns ``person_id``, ``epoch_name``, ``epoch_start``
        and ``epoch_end``. Epoch names are coded ``cohortname:epochname``.
        Patients for which ``build_epochs`` returns an empty list produce no
        rows, because ``F.explode`` drops empty arrays.
    """
    index_dates = index_dates_fix_yr10k

    #####
    ## epoch definitions
    #####

    ## This function is called for each row/patient; using the dates of interest given we will compute a list of
    ## "epochs" defined by a name, start date (inclusive), and end date (exclusive).
    ## NB: This code doesn't use the antibody test information anywhere, but it's available for modifications.
    def build_epochs(first_covid_diagnosis, first_ip_ed_covid_diagnosis, first_longcovid_diagnosis,
                     first_positive_pcr_or_ag_test, first_longcovid_clinic_visit, first_positive_antibody_test,
                     first_misc_diagnosis, first_b94_etc, last_reasonable_date,
                     observation_period_start_date, observation_period_end_date):
        """Build the epoch list for a single patient.

        Every argument is a ``datetime.date`` or ``None``.
        ``first_ip_ed_covid_diagnosis`` is accepted but not read.

        Returns:
            A list of dicts with keys ``epoch_name``, ``epoch_start`` and
            ``epoch_end``; an empty list when the patient is dropped.
        """
        epochs = []

        # like this: [ pre_history ][ acute_pre_offset ]*[ acute_post_offset ][ post_history ]
        pre_history = 365
        acute_pre_offset = 15
        acute_post_offset = 45
        post_history = 180

        n3c_start = date(2018, 1, 1)

        # what do NULLs indicate in these? to be safe we might just use the
        # max(start of the first condition, 2018-1-1) to the min(end of the last, last_reasonable_date)
        # we'll also pretend we don't really have observations outside the N3C range - ie, set them to 2018-1-1
        if observation_period_end_date is None or observation_period_end_date > last_reasonable_date:
            observation_period_end_date = last_reasonable_date
        # FIX: this clamp used to test observation_period_end_date, so a start
        # date before 2018-01-01 was never pulled forward to the N3C start.
        if observation_period_start_date is None or observation_period_start_date < n3c_start:
            observation_period_start_date = n3c_start

        # we're going to require an acute period -15 to +45 days from index, and 1 year pre and 6 month post epochs
        # so we at least need enough days, we can drop patients that don't
        required_days = pre_history + acute_pre_offset + post_history + acute_post_offset
        if (observation_period_end_date - observation_period_start_date).days < required_days:
            return []

        ## Let's define an epoch prior to 2018-01-01 and flag it as suspicious
        ## Patients might not have any data in this epoch, but if they do we'll be able to use the epoch to identify it
        ## Same for the epoch after the last_reasonable_date
        epochs.append({'epoch_name': 'all:suspicious-pre-N3C-date',
                       'epoch_start': date(1, 1, 1),  # yup, that's year 1 AD
                       'epoch_end': n3c_start})
        epochs.append({'epoch_name': 'all:suspicious-post-last-reasonable-date',
                       'epoch_start': last_reasonable_date,
                       'epoch_end': date(9999, 12, 31)})  # yup, that's year 9999

        ## It's easiest to handle this case first - if there are no covid or long covid events, we can define control cohorts
        if first_covid_diagnosis is None and first_longcovid_diagnosis is None and \
                first_positive_pcr_or_ag_test is None and first_longcovid_clinic_visit is None and \
                first_positive_antibody_test is None and first_misc_diagnosis is None and \
                first_b94_etc is None:
            min_available_index_date = observation_period_start_date + timedelta(days=pre_history + acute_pre_offset)
            min_available_index_date = max(min_available_index_date, date(2020, 3, 1))
            max_available_index_date = observation_period_end_date - timedelta(days=post_history + acute_post_offset)

            # can't define an index, no go
            if max_available_index_date < min_available_index_date:
                return []

            # pick a pseudo index date uniformly at random from the available window
            # (both ends inclusive), so results differ from run to run
            num_available_days = (max_available_index_date - min_available_index_date).days
            chosen_available_day = random.randint(0, num_available_days)
            index_date = min_available_index_date + timedelta(days=chosen_available_day)

            begin_acute = index_date - timedelta(days=acute_pre_offset)
            end_acute = index_date + timedelta(days=acute_post_offset)

            epochs.append({'epoch_name': 'control:pre',
                           'epoch_start': begin_acute - timedelta(days=pre_history),
                           'epoch_end': begin_acute})
            epochs.append({'epoch_name': 'control:acute',
                           'epoch_start': begin_acute,
                           'epoch_end': end_acute})
            epochs.append({'epoch_name': 'control:post',
                           'epoch_start': end_acute,
                           'epoch_end': end_acute + timedelta(days=post_history)})
            return epochs

        ## Define a covid_index as the minimum of first covid diagnosis (any now!) or first positive PCR/Ag test
        ## (if one is null use the other)
        covid_index = None
        if first_covid_diagnosis is not None or first_positive_pcr_or_ag_test is not None:
            covid_index = min(drop_nones(first_covid_diagnosis, first_positive_pcr_or_ag_test))

        ## Define longcovid index date, minimum of diagnosis (b94.8 concept_id before Oct. 1, U09.9 after),
        ## same rules as above
        ## Note that this logic allows the definition of a long covid index even without a covid_index
        longcovid_index = None
        if first_longcovid_diagnosis is not None:
            if first_longcovid_diagnosis >= date(2021, 10, 1):
                longcovid_index = first_longcovid_diagnosis
        if first_b94_etc is not None:
            if first_b94_etc < date(2021, 10, 1):
                # we can override a post-Oct2021 diagnosis with an earlier one
                longcovid_index = min(drop_nones(first_b94_etc, longcovid_index))

        ## basic covid-only case (no long covid indicator): define pre-covid, acute, and post-acute epochs
        if covid_index is not None and longcovid_index is None:
            ## (Note: it may be better to use an epoch start as something fancier (patient's first record date?))
            begin_acute = covid_index - timedelta(days=acute_pre_offset)
            end_acute = covid_index + timedelta(days=acute_post_offset)
            begin_history = covid_index - timedelta(days=acute_pre_offset + pre_history)
            end_history = covid_index + timedelta(days=acute_post_offset + post_history)

            # we have to make sure we have enough pre_history and post_history around the index
            # NOTE(review): both differences equal pre_history / post_history by
            # construction, so this check never fires; it probably was meant to
            # compare against the observation period - confirm before changing.
            if (begin_acute - begin_history).days < pre_history or (end_history - end_acute).days < post_history:
                return []

            epochs.append({'epoch_name': 'covid-no-pasc:pre',
                           'epoch_start': begin_history,
                           'epoch_end': begin_acute})
            epochs.append({'epoch_name': 'covid-no-pasc:acute',
                           'epoch_start': begin_acute,
                           'epoch_end': end_acute})
            epochs.append({'epoch_name': 'covid-no-pasc:post',
                           'epoch_start': end_acute,
                           'epoch_end': end_history})

        ## long-covid only case: define pre-long and post-long epochs
        elif covid_index is None and longcovid_index is not None:
            begin_history = longcovid_index - timedelta(days=pre_history)
            end_history = longcovid_index + timedelta(days=post_history)

            # we have to make sure we have enough pre_history and post_history around the index
            # NOTE(review): tautological as written (see the covid-only branch).
            if (longcovid_index - begin_history).days < pre_history or (end_history - longcovid_index).days < post_history:
                return []

            epochs.append({'epoch_name': 'pasc-no-covid:pre',
                           'epoch_start': begin_history,
                           'epoch_end': longcovid_index})
            epochs.append({'epoch_name': 'pasc-no-covid:post',
                           'epoch_start': longcovid_index,
                           'epoch_end': end_history})

        ## now those who have both; let's break them down into two groups: those with longcovid indicator
        ## greater than 45 days after end of acute, and those without
        elif covid_index is not None and longcovid_index is not None:
            ## rationale case: appearance of distinct pre-covid, acute, pre-long, and post-long epochs
            begin_acute = covid_index - timedelta(days=acute_pre_offset)
            end_acute = covid_index + timedelta(days=acute_post_offset)

            # normal case: COVID index is at least 45 days before longcovid index
            if end_acute < longcovid_index:
                begin_history = covid_index - timedelta(days=acute_pre_offset + pre_history)
                end_history = covid_index + timedelta(days=acute_post_offset + post_history)

                # we have to make sure we have enough pre_history and post_history around the index
                # NOTE(review): tautological as written (see the covid-only branch).
                if (begin_acute - begin_history).days < pre_history or (end_history - end_acute).days < post_history:
                    return []

                epochs.append({'epoch_name': 'covid-and-pasc:pre',
                               'epoch_start': begin_history,
                               'epoch_end': begin_acute})
                epochs.append({'epoch_name': 'covid-and-pasc:acute',
                               'epoch_start': begin_acute,
                               'epoch_end': end_acute})
                # let's also define a single epoch that doesn't break these out
                epochs.append({'epoch_name': 'covid-and-pasc:post',
                               'epoch_start': end_acute,
                               'epoch_end': end_history})

            ## here are the tricky ones - there's a long covid indicator but it's nearby a diagnosis or PCR test
            ## these are possibly patients who didn't get tested/treated for their initial infection,
            ## but went back later when symptoms didn't go away, got tested and diagnosed with COVID + longCOVID then?
            ## This case also picks up those who had long covid indicator *before* covid indicator
            ## I think it makes sense to ignore the covid index altogether, we could call them pre-long-unknown acute
            ## and post-long-unknown acute (trusting that long indicator is used appropriately...),
            ## let's instead give these a special epoch type for breaking out later if desired
            else:
                begin_history = longcovid_index - timedelta(days=pre_history)
                end_history = longcovid_index + timedelta(days=post_history)

                # we have to make sure we have enough pre_history and post_history around the index
                # NOTE(review): tautological as written (see the covid-only branch).
                if (longcovid_index - begin_history).days < pre_history or (end_history - longcovid_index).days < post_history:
                    return []

                epochs.append({'epoch_name': 'pasc-suspicious-covid:pre',
                               'epoch_start': begin_history,
                               'epoch_end': longcovid_index})
                epochs.append({'epoch_name': 'pasc-suspicious-covid:post',
                               'epoch_start': longcovid_index,
                               'epoch_end': end_history})

        # Patients with only "other" indicators (e.g. just a clinic visit or an
        # antibody test) reach this point with only the two all:suspicious-* epochs.
        return epochs

    #####
    ## transform junk
    #####
    schema = ArrayType(StructType([
        StructField("epoch_name", StringType(), False),
        StructField("epoch_start", DateType(), True),
        StructField("epoch_end", DateType(), True)
    ]))

    # build_epochs draws a random control index date, so tell Spark the UDF is
    # nondeterministic; otherwise the optimizer may assume repeated evaluations
    # return the same value.
    build_epochs_udf = udf(build_epochs, schema).asNondeterministic()

    res = index_dates.withColumn('epochs_struct', build_epochs_udf("first_covid_diagnosis",
                                                                   "first_ip_ed_covid_diagnosis",
                                                                   "first_longcovid_diagnosis",
                                                                   "first_positive_pcr_or_ag_test",
                                                                   "first_longcovid_clinic_visit",
                                                                   "first_positive_antibody_test",
                                                                   "first_misc_diagnosis",
                                                                   "first_b94_etc",
                                                                   "last_reasonable_date",
                                                                   "observation_period_start_date",
                                                                   "observation_period_end_date"))

    # one output row per epoch; rows whose epoch list is empty disappear here
    res = res.withColumn('epochs_array', F.explode('epochs_struct'))
    res = res.withColumn('epoch_name', F.col('epochs_array.epoch_name'))
    res = res.withColumn('epoch_start', F.col('epochs_array.epoch_start'))
    res = res.withColumn('epoch_end', F.col('epochs_array.epoch_end'))
    res = res.select('person_id', 'epoch_name', 'epoch_start', 'epoch_end')

    return res


# returns the inputs as a list with no None entries
# drop_nones(1,
#     None, 2, 4) returns [1, 2, 4]
# drop_nones(some_date, None) returns [some_date]
# drop_nones(None, some_date) returns [some_date]
def drop_nones(*args):
    """Return the positional arguments as a list with every ``None`` removed.

    Order is preserved, and falsy values other than ``None`` (``0``, ``''``,
    ``False``) are kept. Calling with no arguments, or only ``None``
    arguments, returns an empty list.
    """
    # identity test: only the None singleton is dropped, regardless of how an
    # argument's type implements != comparison
    return [x for x in args if x is not None]