2019 Data Science Bowl - Part 2

➡ Return to tylerburleigh.com

In Part 1, I started by reading in the data and getting it into a container that could be used for analysis, given the size of the dataset and the memory constraints of my system. Because the data was so large, I had to use a framework designed for “Big Data”: I chose Apache Spark, via the sparklyr package.

In this part, I’ll start to look at the JSON data contained within the event_data column and figure out how I might parse and analyze it at a larger scale.

load_quietly <- function(package) { # Quietly load package
  suppressWarnings(suppressMessages(library(deparse(substitute(package)), character.only=TRUE))) 
}
load_quietly(tidyverse)
load_quietly(data.table)
load_quietly(jsonlite)
load_quietly(sparklyr)
load_quietly(DT)

Start the Spark connection and read in the parquet data without caching the table in memory.

sc <- spark_connect(master = "local")
train_s <- spark_read_parquet(sc, 'data/train.parquet', memory = FALSE)
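
As a side note, if local memory had been tighter, the driver memory for a local connection can be raised through spark_config() before connecting. This is just a sketch I’m noting for reference; the "8G" value is illustrative, not something this dataset necessarily needs.

# Optional alternative to the plain spark_connect() call above (illustrative value)
conf <- spark_config()
conf$`sparklyr.shell.driver-memory` <- "8G"
sc <- spark_connect(master = "local", config = conf)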

Next I’ll peek at the data.

glimpse(train_s)
## Observations: ??
## Variables: 11
## Database: spark_connection
## $ event_id        <chr> "d2e9262e", "2fb91ec1", "a6d66e51", "71e712d8"...
## $ game_session    <chr> "da8d88f7055d8a5e", "da8d88f7055d8a5e", "da8d8...
## $ timestamp       <dttm> 2019-08-02 22:20:04, 2019-08-02 22:20:05, 201...
## $ event_data      <chr> "\"{\"\"\"\"cloud\"\"\"\":\"\"\"\"middle\"\"\"...
## $ installation_id <chr> "eca4e454", "eca4e454", "eca4e454", "eca4e454"...
## $ event_count     <int> 101, 102, 103, 104, 105, 106, 107, 108, 109, 1...
## $ event_code      <int> 4020, 4025, 5000, 5010, 4020, 4021, 4021, 4025...
## $ game_time       <int> 199840, 200610, 203444, 205507, 207659, 208786...
## $ title           <chr> "Watering Hole (Activity)", "Watering Hole (Ac...
## $ type            <chr> "Activity", "Activity", "Activity", "Activity"...
## $ world           <chr> "MAGMAPEAK", "MAGMAPEAK", "MAGMAPEAK", "MAGMAP...
train_s %>% head
## # Source: spark<?> [?? x 11]
##   event_id game_session timestamp           event_data installation_id
##   <chr>    <chr>        <dttm>              <chr>      <chr>          
## 1 d2e9262e da8d88f7055~ 2019-08-02 22:20:04 "\"{\"\"\~ eca4e454       
## 2 2fb91ec1 da8d88f7055~ 2019-08-02 22:20:05 "\"{\"\"\~ eca4e454       
## 3 a6d66e51 da8d88f7055~ 2019-08-02 22:20:08 "\"{\"\"\~ eca4e454       
## 4 71e712d8 da8d88f7055~ 2019-08-02 22:20:10 "\"{\"\"\~ eca4e454       
## 5 d2e9262e da8d88f7055~ 2019-08-02 22:20:12 "\"{\"\"\~ eca4e454       
## 6 f50fc6c1 da8d88f7055~ 2019-08-02 22:20:13 "\"{\"\"\~ eca4e454       
## # ... with 6 more variables: event_count <int>, event_code <int>,
## #   game_time <int>, title <chr>, type <chr>, world <chr>

Parsing the JSON

So the main objective is to develop a process for parsing the JSON data. In its current form, the event_data column is double-escaped: the JSON string is wrapped in an extra pair of " characters and the inner quotes show up as runs of repeated " characters, so I’ll need to take care of that.

train_s %>%
  select(event_data) %>%
  head(1)
## Warning: `overscope_eval_next()` is deprecated as of rlang 0.2.0.
## Please use `eval_tidy()` with a data mask instead.
## This warning is displayed once per session.
## Warning: `overscope_clean()` is deprecated as of rlang 0.2.0.
## This warning is displayed once per session.
## # Source: spark<?> [?? x 1]
##   event_data                                                               
##   <chr>                                                                    
## 1 "\"{\"\"\"\"cloud\"\"\"\":\"\"\"\"middle\"\"\"\",\"\"\"\"cloud_size\"\"\~

I can use gsub to remove these characters, but I have to do it multiple times to account for the different number of double-quotes around the { and } brackets vs. the inner elements.

train_s %>%
  select(event_data) %>%
  head(1) %>%
  collect(.) -> x
x
## # A tibble: 1 x 1
##   event_data                                                               
##   <chr>                                                                    
## 1 "\"{\"\"\"\"cloud\"\"\"\":\"\"\"\"middle\"\"\"\",\"\"\"\"cloud_size\"\"\~
# Remove double-escaping
gsub('\"\"\"\"', '"', x, fixed=T) %>% 
  gsub('\"{', '{', ., fixed=T) %>% 
  gsub('}\"', '}', ., fixed=T) -> x
x
## [1] "{\"cloud\":\"middle\",\"cloud_size\":3,\"water_level\":1,\"event_count\":101,\"game_time\":199840,\"event_code\":4020}"
# Convert to JSON
fromJSON(x)
## $cloud
## [1] "middle"
## 
## $cloud_size
## [1] 3
## 
## $water_level
## [1] 1
## 
## $event_count
## [1] 101
## 
## $game_time
## [1] 199840
## 
## $event_code
## [1] 4020

gsub inside of Spark

The above approach is maybe not ideal because it involves collecting the data from Spark into local memory rather than cleaning it where it lives. An alternative approach would be to use spark_apply to run the gsub operation directly on the Spark side when the data is accessed.

train_s %>%
  head() %>%
  spark_apply(function(df) {
    library(dplyr)

    remove_double_escape <- function(e){
      gsub('\"\"\"\"', '"', e, fixed=T) %>% 
        gsub('\"{', '{', ., fixed=T) %>% 
        gsub('}\"', '}', ., fixed=T)
    }

    df %>% mutate(event_data_fix = remove_double_escape(event_data))
  }) %>%
  collect(.) %>%
  select(event_data_fix)
## # A tibble: 6 x 1
##   event_data_fix                                                           
##   <chr>                                                                    
## 1 "{\"cloud\":\"middle\",\"cloud_size\":3,\"water_level\":1,\"event_count\~
## 2 "{\"cloud\":\"middle\",\"cloud_size\":3,\"water_level\":3,\"coordinates\~
## 3 "{\"water_level\":3,\"event_count\":103,\"game_time\":203444,\"event_cod~
## 4 "{\"duration\":750,\"event_count\":104,\"game_time\":205507,\"event_code~
## 5 "{\"cloud\":\"left\",\"cloud_size\":1,\"water_level\":0,\"event_count\":~
## 6 "{\"cloud\":\"middle\",\"cloud_size\":3,\"event_count\":106,\"game_time\~

Chaining gsub with JSON => dataframe conversion

This is great, but what about actually parsing the JSON? Unfortunately a dataframe column cannot “hold” parsed JSON in a way that’s useful for analysis.

One solution, then, is to follow the gsub with a rowwise operation that parses the cleaned column as flat JSON and then expands it into a dataframe where each JSON variable gets its own column.

train_s %>%
  head() %>%
  spark_apply(function(df) {
    library(dplyr)

    remove_double_escape <- function(e){
      gsub('\"\"\"\"', '"', e, fixed=T) %>% 
        gsub('\"{', '{', ., fixed=T) %>% 
        gsub('}\"', '}', ., fixed=T)
    }

    df %>% mutate(event_data_fix = remove_double_escape(event_data)) 
      
  }) %>%
  collect(.) %>%
  rowwise() %>%
  do(data.frame(fromJSON(.$event_data_fix, flatten = T)))
## Warning in bind_rows_(x, .id): Unequal factor levels: coercing to character
## Warning in bind_rows_(x, .id): binding character and factor vector,
## coercing into character vector

## Warning in bind_rows_(x, .id): binding character and factor vector,
## coercing into character vector

## Warning in bind_rows_(x, .id): binding character and factor vector,
## coercing into character vector
## Source: local data frame [6 x 12]
## Groups: <by row>
## 
## # A tibble: 6 x 12
##   cloud cloud_size water_level event_count game_time event_code
## * <chr>      <int>       <int>       <int>     <int>      <int>
## 1 midd~          3           1         101    199840       4020
## 2 midd~          3           3         102    200610       4025
## 3 <NA>          NA           3         103    203444       5000
## 4 <NA>          NA          NA         104    205507       5010
## 5 left           1           0         105    207659       4020
## 6 midd~          3          NA         106    208786       4021
## # ... with 6 more variables: coordinates.x <int>, coordinates.y <int>,
## #   coordinates.stage_width <int>, coordinates.stage_height <int>,
## #   filled <lgl>, duration <int>

So this seems like a pretty good process, at least for now.
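
As an aside, the factor warnings above come from data.frame() converting the parsed strings to factors (stringsAsFactors still defaults to TRUE in the R version I’m using); passing stringsAsFactors = FALSE, or using purrr instead of rowwise()/do(), would quiet them. Here’s a toy sketch of the purrr version on two already-cleaned strings taken from the output above, rather than on the Spark table itself.

# Toy sketch: parse already-cleaned JSON strings and row-bind them;
# bind_rows() (inside map_dfr) fills fields missing from a row with NA.
cleaned_examples <- c(
  '{"cloud":"middle","cloud_size":3,"water_level":1,"event_count":101,"game_time":199840,"event_code":4020}',
  '{"duration":750,"event_count":104,"game_time":205507,"event_code":5010}'
)
purrr::map_dfr(cleaned_examples,
               ~ as.data.frame(fromJSON(.x), stringsAsFactors = FALSE))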

Writing a function for later

I can write a function to abstract what I’ve just done, to make it easier later on when I want to look at the JSON data. In this function, I’ll also join the new columns to the old columns, while removing event_data.

# extract_json 
# spark_df: the spark dataframe
# nrows: the number of rows to return
extract_json <- function(spark_df, nrows = 5){
  
  spark_df %>%
    head(nrows) %>%
    collect(.) -> tmp_df
  
  spark_df %>%
    head(nrows) %>%
    spark_apply(function(df) {
      library(dplyr)
  
      remove_double_escape <- function(e){
        gsub('\"\"\"\"', '"', e, fixed=T) %>% 
          gsub('\"{', '{', ., fixed=T) %>% 
          gsub('}\"', '}', ., fixed=T)
      }
  
      df %>% mutate(tmp = remove_double_escape(event_data)) 
        
    }) %>%
    collect(.) %>%
    rowwise() %>%
    do(data.frame(fromJSON(.$tmp, flatten = T))) %>%
    bind_cols(tmp_df, .) %>%
    select(-event_data)
}

extract_json(train_s) -> x
## Warning in bind_rows_(x, .id): Unequal factor levels: coercing to character
## Warning in bind_rows_(x, .id): binding character and factor vector,
## coercing into character vector

## Warning in bind_rows_(x, .id): binding character and factor vector,
## coercing into character vector
datatable(x, rownames = FALSE,
          options = list(pageLength = 5, scrollX = '400px', dom = 't'))

➡ Go to Part 3