#* topic_description_decoded:
#*   attr:
#*     fillcolor: '3'
#*   desc: Decode the topic descriptions from the internal representation to the concept_id
#*     representation.
#*   ext: py
#*   inputs:
#*   - topic_description_raw
#*   - gen_vocabulary
#*

# Ok, so now we need to decode the term indices back into the actual terms from the vocabulary
# list. We can do that by loading the vocabulary list itself (unpacking it with from_pickle())
# and then using a Python UDF that translates a vector of indices into a vector of vocabulary terms.
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def topic_description_decoded(topic_description_raw, gen_vocabulary):
    vocabulary = from_pickle(gen_vocabulary)

    # Maps an array of indices to an array of terms, e.g. [4, 81, 8] -> ["Fever", "COVID19", "Broken Leg"],
    # where those strings are entries in the vocabulary list.
    # Note that this function is defined inside the parent function, so it closes over the parent's
    # variables, i.e. the vocabulary list we're using. That gives the UDF access to that data even
    # though, when we call it below, we pass only the column name; Spark handles passing each array
    # in the column to the function as a parameter.
    def indices_to_vocab_terms(indices_array):
        terms = []
        for index in indices_array:
            term = vocabulary[index]
            terms.append(term)
        return terms

    map_udf = udf(indices_to_vocab_terms, ArrayType(StringType()))

    result = topic_description_raw.withColumn("termTerms", map_udf("termIndices"))
    return result

#################################################
## Global imports and functions included below ##
#################################################

import pickle

def to_pickle(data):
    # Serialize `data` as a single pickle file in this transform's output dataset.
    output = Transforms.get_output()
    output_fs = output.filesystem()
    with output_fs.open('data.pickle', 'wb') as f:
        pickle.dump(data, f)

def from_pickle(transform_input):
    # Load the pickle file that to_pickle() wrote into the given input dataset.
    with transform_input.filesystem().open('data.pickle', 'rb') as f:
        data = pickle.load(f)
    return data
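
#################################################
## Illustrative sketches (not part of the DAG) ##
#################################################

# A hedged alternative sketch, not the transform above: capturing `vocabulary` in the
# UDF's closure means Spark serializes it along with the function for every task that
# runs it. For a large vocabulary, a broadcast variable ships one read-only copy to
# each executor instead. `spark` is assumed here to be an available SparkSession; how
# you obtain it depends on this tool, so treat that wiring as hypothetical.
def decode_with_broadcast(spark, topic_description_raw, vocabulary):
    bc_vocab = spark.sparkContext.broadcast(vocabulary)

    def indices_to_vocab_terms(indices_array):
        # Look each index up in the executor-local broadcast copy.
        return [bc_vocab.value[i] for i in indices_array]

    map_udf = udf(indices_to_vocab_terms, ArrayType(StringType()))
    return topic_description_raw.withColumn("termTerms", map_udf("termIndices"))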
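
# Another hedged sketch: on Spark 3.1+ (needed for the Python lambda form of the
# higher-order transform() function) the same decode can be done without a Python UDF
# by embedding the vocabulary as an array literal and indexing into it per element.
# element_at() is 1-based, hence the `i + 1`. This keeps the work in the JVM, but it
# is only practical for modest vocabulary sizes, since the whole array literal is
# baked into the query plan.
import pyspark.sql.functions as F

def decode_without_udf(topic_description_raw, vocabulary):
    vocab_array = F.array(*[F.lit(term) for term in vocabulary])
    return topic_description_raw.withColumn(
        "termTerms",
        F.transform("termIndices", lambda i: F.element_at(vocab_array, i + 1)),
    )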