#* person_topic_assignment:
#*   attr:
#*     fillcolor: '3'
#*   desc: Assign full patient histories to topics.
#*   ext: py
#*   inputs:
#*   - vectorize_concepts
#*   - lda_model
#*

# In this node we use the fitted model's .transform() to take the original person data
# and assign each person to topics via the probabilistic model.

from pyspark.ml.functions import vector_to_array

def person_topic_assignment(lda_model, vectorize_concepts):
    # pull the fitted LDA model out of the object produced by the upstream lda_model node
    ldamodel = lda_model.stages[0].model
    result = ldamodel.transform(vectorize_concepts)

    # In the result, concept_names and concept_vector are carried over from the input table.
    # What's new is the topicDistribution column, which has type VectorUDT (as does the
    # concept_vector column from the vectorizer). For each person it holds the
    # probabilities that that person's concepts were drawn from each "topic".
    # VectorUDT is a very PySpark-specific column type (under the hood it uses Spark's
    # "struct" data type plus custom functions for working with structs of that shape).
    # Fortunately, vector_to_array() converts such a vector column back into a regular
    # array column. While we're at it, drop the other columns - we just want the
    # distribution of topics per person_id.
    result = (
        result
        .withColumn("topic_dist_array", vector_to_array("topicDistribution"))
        .drop("concept_vector")
        .drop("topicDistribution")
        .drop("concept_names")
    )

    # In this output, the topic_dist_array column contains an array of the weights
    # associated with each topic.
    return result

#################################################
## Global imports and functions included below ##
#################################################

import pickle

def to_pickle(data):
    # write an arbitrary picklable object to this node's output dataset as data.pickle
    output = Transforms.get_output()
    output_fs = output.filesystem()
    with output_fs.open('data.pickle', 'wb') as f:
        pickle.dump(data, f)

def from_pickle(transform_input):
    # read back an object previously written with to_pickle() from an input dataset
    with transform_input.filesystem().open('data.pickle', 'rb') as f:
        data = pickle.load(f)
    return data
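
#########################################################
## Example: consuming the per-person topic weights     ##
#########################################################

# A minimal sketch of how a downstream node might reshape the output above.
# It assumes the result carries a person_id column alongside topic_dist_array
# (person_id comes through from vectorize_concepts in this pipeline); the
# function name and column aliases below are illustrative, not part of the
# workbook. posexplode() is a standard pyspark.sql function.
from pyspark.sql import functions as F

def person_topic_long(person_topic_assignment):
    # one row per (person_id, topic) with that topic's weight
    return person_topic_assignment.select(
        "person_id",
        F.posexplode("topic_dist_array").alias("topic", "topic_weight"),
    )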
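
#########################################################
## Example: pairing to_pickle / from_pickle            ##
#########################################################

# A minimal sketch, assuming the convention the global helpers above were written
# for: a node that wants to hand a non-DataFrame object (e.g. a fitted model) to
# downstream nodes calls to_pickle(obj), and a downstream node recovers it by
# passing the corresponding input to from_pickle(). The node names below are
# hypothetical, not part of this workbook.

def save_fitted_model(fitted_model):
    # persist any picklable object as data.pickle in this node's output dataset
    to_pickle(fitted_model)

def load_fitted_model(upstream_node):
    # read data.pickle back from the upstream node's dataset
    return from_pickle(upstream_node)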