ML Combining R with Spark and H2O DL framework

OperAI - Operational AI
9 min readMay 23, 2021

R programming language

R is a functional programming and scripting language written in a functional style to help to understand the intent and to ensure that the implementation corresponds to that very intent. R is becoming an essential tool for research, finance and analytics-driven companies such as Google, Facebook, LinkedIn, IBM, Microsoft and Amazon. There is a growing importance of R based on survey conducted by IEEE. Goggle used R to demonstrate the value of massively parallel computational infrastructure for forecasting based on the MapReduce paradigm. Both R and Python are on the rise with the convergence of the Internet of Things (IoT) and machine learning (ML).

Apache Spark

Spark emerged as a framework to address the two limitations of Hadoop related to speed and tolerance to partitioning. Spark works on the top of Hadoop, and while Hadoop processes data on disk Spark processes data in memory. Spark uses also RDD concept to process the partitioning required in a computing cluster setting of scalable parallel processing.

ML with R & Spark combined

In addition to R machine learning algorithms Spark supports also machine learning algorithms including both linear and non-linear algorithms such as such general linear models (GLM), decision trees algorithms and feature transformations that can be carried out together as chain with dplyr package pipelines.

The machine learning algorithms are performed in Spark cluster within sparklyr package.

sparklyr with an Apache Spark cluster — https://spark.rstudio.com/

For example to predict a car’s fuel consumption (mpg) for example based on its two features weight (wt), and the number of cylinders the engine contains (cyl) using GLM ML regression model. The assumption is based that a relationship might exist between mpg and each of the two features and that it is linear.

This relationship can be a function where fuel consumption (mpg), as a response variable is dependent on the weight and the number of cylinders of car.

mpg = function (wt, cyl)

To connect to the car table with mpg, wt and cyl variable under Spark, the function copy_to () can be used/

# copy mtcars into sparkmtcars_tbl <- copy_to(sc, mtcars)

There is also another function called spark_connect ():

sc <- spark_connect(master = "local", version = "2.1.0")mtcars_tbl <- copy_to(sc, mtcars, "mtcars")

Data preparation involving transformation and indexing to split data into a training set and test set to 70% and 30% respectively:

# transform the car data set, and then partition into training set and test setpartitions <- mtcars_tbl %>% filter(hp >= 100) %>% mutate(cyl8 = cyl == 8) %>%sdf_partition(training = 0.7, test = 0.7, seed = 1240)

Once the car data is split in the two sets, the model is fit to the training set

# fit a linear model to the training setfit <- partitions$training %>% ml_linear_regression(response = "mpg", features = c("wt", "cyl"))## * No rows dropped by 'na.omit' call

Fitting the ML models

library(rsparkling)library(sparklyr)library(dplyr)library(h2o)mtcars_glm <- h2o.glm(x = c("wt", "cyl"),                     y = "mpg",                     training_frame = mtcars_h2o,                     lambda_search = TRUE)

The models produced by Spark can be summarized, like in R using summary() to display information on the extent and quality of the model fit as well as the precision and accuracy of ML predictions.

summary(fit)

Instead of using built-in car data we can read and write data in CSV or JSON, then store data in HDFS, S3, or on the local filesystem of cluster nodes.

temp_csv <- tempfile(fileext = ".csv")temp_json <- tempfile(fileext = ".json")iris_csv_tbl <- spark_read_csv(sc, "iris_csv", temp_csv)spark_write_json(iris_tbl, temp_json)iris_json_tbl <- spark_read_json(sc, "iris_json", temp_json)src_tbls(sc)

Distributed computing — Spark

Instead of using all of the data it is possible to partition the data to manageable sizes or groups, from large datasets, by making queries directly against tables within a Spark cluster.

The spark_connection object implements a DBIinterface for Spark and dbGetQuery to perform the SQL and return the result that can be used in turn as data frame in R.

library(DBI)iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")iris_preview

As an example arbitrary r code across can be carried out over (distributed) cluster using spark_apply of iris data:

spark_apply(iris_tbl, function(data) { data[1:4] + rgamma(1,2)})

Another example of grouping data where iris data is used to group data by species and then use GLM model for each group (species).

iris_tbl %>%spark_apply(nrow, group_by = "Species") iris_tbl %>% spark_apply(   function(e) summary(glm(Petal_Length ~ Petal_Width, e))$r.squared,   names = "r.squared",   group_by = "Species")

There is possibility to also perform an operation over each group of rows grouped by columns to and make use of any package within the closure:

spark_apply( iris_tbl, function(e) broom::tidy(lm(Petal_Width ~ Petal_Length, e)), names = c("term", "estimate", "std.error", "statistic", "p.value"), group_by = "Species")

H2O ML-Deep Learning

Parallel Distributed ML Algorithm with R and H2O, which is an open source math engine for Big Data that can be used to compute parallel distributed machine learning algorithms.

Download H2O directly at http://h2o.ai/download. # Install H2O’s R package from CRAN at https://cran.r-project.org/web/packages/h2o/.

# install.packages("h2o")# run h2olibrary(h2o)

Start H2O on your local machine using all available processor cores. By default, CRAN policies limit use to only 2 cores.

h2o.init(nthreads = -1)

To launch H2O locally with default initialization arguments, use the following:

h2o.init()
# Checking Cluster Status# To check the status and health of the H2O cluster, use h2o.clusterInfo()h2o.clusterInfo()
# Import dataset into H2O and display summarydata(iris)# Converts R object "iris" into H2O object "iris.hex"iris.hex = as.h2o(iris, destination_frame= "iris.hex")head(iris.hex)# Search for factors in the data, if anyh2o.anyFactor(iris.hex)##Displays the titles of the columnscolnames(iris.hex)names(iris.hex)##Displays data in a graphplot(iris$Petal.Length, iris$Petal.Width, pch=21, bg=c("red","green3","blue")[unclass(iris$Species)], xlab="Petal Length", ylab="Petal Width")## Getting Quantiles of the data# Returns the percentiles at 0, 10, 20, ..., 100%iris.qs <- quantile(iris.hex$Sepal.Length, probs =(1:10)/10)iris.qs
# Take the outliers or the bottom and top 10% of iris data
Sepal.Length.outliers <- iris.hex[iris.hex$Sepal.Length <=iris.qs["10%"] | iris.hex$Sepal.Length >= iris.qs["90%"],]# Check that the number of rows return is about 20% of the original datanrow(iris.hex)nrow(Sepal.Length.outliers)nrow(Sepal.Length.outliers)/nrow(iris.hex)# View quantiles and histogramsquantile(x = iris.hex$Sepal.Length, na.rm = TRUE)h2o.hist(iris.hex$Sepal.Length)#### Data split with target assined categorical valueslibrary(keras)# keras as a package uses the pipe operator (%>%) to connect functions or operations together# KerasR uses the dollar $ sign as operator. The pipe operator generally improves the readability of codes# Determine sample sizeind <- sample(2, nrow(iris), replace=TRUE, prob=c(0.67, 0.33))# Split the `iris` datairis.training <- iris[ind==1, 1:4]iris.test <- iris[ind==2, 1:4]# Split the class attributeiris.trainingtarget <- iris[ind==1, 5]iris.testtarget <- iris[ind==2, 5]# One hot encode training target valuesiris.trainLabels <- to_categorical(iris.trainingtarget)# One hot encode test target valuesiris.testLabels <- to_categorical(iris.testtarget)# Print out the iris.testLabels to double check the resultprint(iris.testLabels)# Generating Random Numbers# Creates object for uniform distribution on iris data setrnd_i <- h2o.runif(iris.hex)summary (rnd_i) ## Summarize the results of h2o.runif## Construct test and train sets## Create training set with threshold of 0.67iris.train <- iris.hex[rnd_i <= 0.67,]##Assign name to training setiris.train <- h2o.assign(iris.train, "iris.train")## Create test set with threshold to filter values greater than 0.67iris.test <- iris.hex[rnd_i > 0.67,]## Assign name to test setiris.test <- h2o.assign(iris.test, "iris.test")## Combine results of test & training sets, then display resultnrow(iris.train) + nrow(iris.test)nrow(iris.hex) ## Matches the full set# Or by spiliting frames on two separate subsets based on a specified ratio# Splits data in prostate data frame with a ratio of 0.75iris.split <- h2o.splitFrame(data = iris.hex , ratios = 0.75)# Creates training set from 1st data set in splitiris.train <- iris.split[[1]]# Creates testing set from 2st data set in splitiris.test <- iris.split[[2]]# Getting frames by creating a reference object to the data frame in H2O# index.hex <- h2o.getFrame(id = "index.hex")# Getting Models in H2O by creating a reference object for the model in H2O, using h2o.getModel().gbm.model <- h2o.getModel(model_id = "GBM_8e4591a9b413407b983d73fbd9eb44cf")
# Listing H2O Objects by generating a list of all H2O objects generated during a session and each object‘s size in bytes.
h2o.ls()

Running Models in H2O — # Some of the model types:

1 Gradient Boosting Machine (GBM)

2 Generalized Linear Models (GLM)

3 K-means

4 Principal Components Analysis (PCA)

library(h2o)h2o.init(nthreads = -1)data(iris)iris.hex <- as.h2o(iris,destination_frame = "iris.hex")iris.gbm01 <- h2o.gbm(y = 1, x = 2:5, training_frame = iris.hex, ntrees = 10,max_depth = 3,min_rows = 2, learn_rate = 0.2, distribution= "gaussian")

To find the most important variables type:

names(iris.gbm01@model)iris.gbm01@model$variable_importances# In the case of classification model that uses labels we will use distribution="multinomial":iris.gbm02 <- h2o.gbm(y = 5, x = 1:4, training_frame = iris.hex, ntrees = 15, max_depth = 5, min_rows =2, learn_rate = 0.01, distribution= "multinomial")names(iris.gbm2@model)iris.gbm02@model$variable_importances

Generalized linear models (GLM) are commonly-used sa they are extremely fast,and scales extremely well for models with a limited number of predictors.

binary_data <- read.csv("https://raw.githubusercontent.com/abari212/data/master/PRESENCE_ABSENCE_CASE.csv", header=T, dec=".",sep=",")# Converts R object "binary_data" into H2O object "binary_dat.hex"binary_data.hex = as.h2o(binary_data, destination_frame= "binary_data.hex")head(binary_data.hex)names(binary_data.hex)binary_data.glm01 <- h2o.glm(y = "Y", x = c("X1","X2","X3","X4","X5","X6", "X7","X8"),training_frame = binary_data.hex, family = "binomial", nfolds = 10, alpha = 0.5)binary_data.glm01@model$cross_validation_metricsnames(binary_data.glm01@model)

Checking the accuracy of the model

The accuracy is based on the Receiver Operating Characteristic(ROC) curve. The ROC curve has its origin in engineering for diagnostic test evaluation. The curve has two main axis one for true positive rate values (Sensitivity), which is plotted in function of the axis of false positive rate (100-Specificity), where each point on the curve corresponds to a particular decision threshold. ROC curve is a way to carry out cost/benefit analysis of diagnostic decision making process.

The Area Under Curve (AUC) of the ROC curve to assess the model’s performance

AUC, which is the Area Under Curve of the ROC curve, is the most common measure of accuracy metrics for machine learning techniques for binary problems. AUC values vary between an area under the curve of 1, where the entire graph would fall beneath the curve to 0 when the area under the curve is equal 0.0.

# AUC valuesbinary_data.glm01@model$cross_validation_metrics_summary
# Set predictor and response variablesnames(binary_data.hex)Y = "Y"X = x = c("X1","X2","X3","X4","X5","X6", "X7","X8")# Define the data for the model and display the results## Construct test and train sets# Generating Random Numbers# Creates object for uniform distribution on iris data setrnd_i <- h2o.runif(binary_data.hex)summary (rnd_i) ## Summarize the results of h2o.runif## Create training set with threshold of 0.67binary_data.train <- binary_data.hex[rnd_i <= 0.67,]##Assign name to training setbinary_data.train <- h2o.assign(binary_data.train, "binary_data.train")## Create test set with threshold to filter values greater than 0.67binary_data.test <- binary_data.hex[rnd_i > 0.67,]## Assign name to test setbinary_data.test <- h2o.assign(binary_data.test, "binary_data.test")## Combine results of test & training sets, then display resultnrow(binary_data.train) + nrow(binary_data.test)nrow(binary_data.hex) ## Matches the full setbinary_data.glm02 <- h2o.glm(training_frame=binary_data.train, x=X, y=Y, family = "gaussian", alpha = 0.5)# View model information: training statistics, performance, important variablessummary(binary_data.glm02)# Predict using GLM modelbinary_data.pred = h2o.predict(object = binary_data.glm02, newdata =binary_data.test)# Look at summary of predictions: probability of TRUEnames(binary_data.pred)binary_data.pred[1] Mark Landry with assistance from Spencer Aiello (2018).

More on ML in this book on Big Data.

--

--

OperAI - Operational AI

OperAI develops embedded ML/AI-based solutions to speed up and streamline operational processes at the edges of the cloud.