#* model_tests_covid_gapplyCollect: #* attr: #* fillcolor: '7' #* desc: 'The big one: running the logistic regression tests and estimated marginal #* means. #* #* ' #* ext: R #* inputs: #* - epoch_topic_stat_features #* - analysis_r_code #* model_tests_covid_gapplyCollect <- function(epoch_topic_stat_features, analysis_r_code) { # for logging options("width"=200) epoch_features <- epoch_topic_stat_features #%>% SparkR::filter(SparkR::rlike(epoch_topic_stat_features$topic_name, "^T-2(0|3) ")) #return(wrapper(list("topic2test"), epoch_features)) # Create schema # we want a model per topic, so we'll use a list of unique topic names and loop over them, # running the model for each. unique_topics <- SparkR::select(epoch_features, c("topic_name")) %>% SparkR::distinct() %>% SparkR::collect() %>% pull(topic_name) %>% str_sort(numeric = TRUE) print(unique_topics) schema <- structType(structField("contrast", "string"), structField("p_value", "double"), structField("estimate", "double"), structField("asymp_LCL", "double"), structField("asymp_UCL", "double"), structField("group", "string"), structField("topic_name", "string") ) print(schema) res <- SparkR::gapply(epoch_features, epoch_features$"topic_name", wrapper, schema) #str(res) return(res) #res_df_list <- list() # for(topic_name in unique_topics[1:5]) { # # grab data for this topic # topic_df <- SparkR::filter(epoch_features, # epoch_features$topic_name == topic_name) %>% # SparkR::collect() # # Get the result. This wrapper was needed in testing to capture models that error out # # (but didn't happen in the final tests) # # The wrapper the topic name as well as a list of length 1, # # a leftover of when I tried to dispatch the wrapper using SparkR gapply() (never got it to work) # topic_res_df <- wrapper(list(topic_name), topic_df) # # cuz I'm impatient and want to see them hot off the press! # print(topic_name) # print(topic_res_df) # # the result will be NULL if there was an error; that's ok, # # it will be dropped later by the do.call(rbind, res_df_list) # # (yes, appending in a loop is O(n^2), it's a small loop tho) # res_df_list <- c(res_df_list, list(topic_res_df)) # } # # combine the individual results into a single DF and return # all_res <- do.call(rbind, res_df_list) # return(all_res) }