While SPARK-15799 has been merged, satisfying CRAN requirements proved to be challenging (see for example discussions about 2.2.2, 2.3.1, 2.4.0), and the packages has been subsequently removed (see for example SparkR was removed from CRAN on 2018-05-01, CRAN SparkR package removed? Dec 05, 2017 This post grew out of some notes I was making on the differences between SparkR and sparklyr, two packages that provide an R interface to Spark. I’m currently working on a project where I’ll be interacting with data in Spark, so wanted to get a sense of options using R. Those unfamiliar with sparklyr might benefit from reading the first half of this previous post, where I cover the idea of.
-->This document shows how to predict flight arrival delays using a ScaleR logistic regression model. The example uses flight delay and weather data, joined using SparkR.
Although both packages run on Apache Hadoop’s Spark execution engine, they are blocked from in-memory data sharing as they each require their own respective Spark sessions. Until this issue is addressed in an upcoming version of ML Server, the workaround is to maintain non-overlapping Spark sessions, and to exchange data through intermediate files. The instructions here show that these requirements are straightforward to achieve.
This example was initially shared in a talk at Strata 2016 by Mario Inchiosa and Roni Burd. You can find this talk at Building a Scalable Data Science Platform with R.
The code was originally written for ML Server running on Spark in an HDInsight cluster on Azure. But the concept of mixing the use of SparkR and ScaleR in one script is also valid in the context of on-premises environments.
The steps in this document assume that you have an intermediate level of knowledge of R and R the ScaleR library of ML Server. You are introduced to SparkR while walking through this scenario.
The airline and weather datasets
The flight data is available from the U.S. government archives. It is also available as a zip from AirOnTimeCSV.zip.
The weather data can be downloaded as zip files in raw form, by month, from the National Oceanic and Atmospheric Administration repository. For this example, download the data for May 2007 – December 2012. Use the hourly data files and
YYYYMMMstation.txt
file within each of the zips.Setting up the Spark environment
Use the following code to set up the Spark environment:
Next, add
Spark_Home
to the search path for R packages. Adding it to the search path allows you to use SparkR, and initialize a SparkR session:Preparing the weather data
To prepare the weather data, subset it to the columns needed for modeling:
- 'Visibility'
- 'DryBulbCelsius'
- 'DewPointCelsius'
- 'RelativeHumidity'
- 'WindSpeed'
- 'Altimeter'
Then add an airport code associated with the weather station and convert the measurements from local time to UTC.
Begin by creating a file to map the weather station (WBAN) info to an airport code. The following code reads each of the hourly raw weather data files, subsets to the columns we need, merges the weather station mapping file, adjusts the date times of measurements to UTC, and then writes out a new version of the file:
Importing the airline and weather data to Spark DataFrames
Now we use the SparkR read.df() function to import the weather and airline data to Spark DataFrames. This function, like many other Spark methods, are executed lazily, meaning that they are queued for execution but not executed until required.
Data cleansing and transformation
Next we do some cleanup on the airline data we’ve imported to rename columns. We only keep the variables needed, and round scheduled departure times down to the nearest hour to enable merging with the latest weather data at departure:
Now we perform similar operations on the weather data:
Joining the weather and airline data
We now use the SparkR join() function to do a left outer join of the airline and weather data by departure AirportID and datetime. The outer join allows us to retain all the airline data records even if there is no matching weather data. Following the join, we remove some redundant columns, and rename the kept columns to remove the incoming DataFrame prefix introduced by the join.
In a similar fashion, we join the weather and airline data based on arrival AirportID and datetime:
Save results to CSV for exchange with ScaleR
That completes the joins we need to do with SparkR. We save the data from the final Spark DataFrame 'joinedDF5' to a CSV for input to ScaleR and then close out the SparkR session. We explicitly tell SparkR to save the resultant CSV in 80 separate partitions to enable sufficient parallelism in ScaleR processing:
Import to XDF for use by ScaleR
We could use the CSV file of joined airline and weather data as-is for modeling via a ScaleR text data source. But we import it to XDF first, since it is more efficient when running multiple operations on the dataset:
Splitting data for training and test
We use rxDataStep to split out the 2012 data for testing and keep the rest for training:
Train and test a logistic regression model
Now we are ready to build a model. To see the influence of weather data on delay in the arrival time, we use ScaleR’s logistic regression routine. We use it to model whether an arrival delay of greater than 15 minutes is influenced by the weather at the departure and arrival airports:
Now let’s see how it does on the test data by making some predictions and looking at ROC and AUC.
Scoring elsewhere
We can also use the model for scoring data on another platform. By saving it to an RDS file and then transferring and importing that RDS into a destination scoring environment such as MIcrosoft SQL Server R Services. It is important to ensure that the factor levels of the data to be scored match those on which the model was built. That match can be achieved by extracting and saving the column information associated with the modeling data via ScaleR’s
rxCreateColInfo()
function and then applying that column information to the input data source for prediction. In the following we save a few rows of the test dataset and extract and use the column information from this sample in the prediction script:Summary
In this article, we’ve shown how it’s possible to combine use of SparkR for data manipulation with ScaleR for model development in Hadoop Spark. This scenario requires that you maintain separate Spark sessions, only running one session at a time, and exchange data via CSV files. Although straightforward, this process should be even easier in an upcoming ML Services release, when SparkR and ScaleR can share a Spark session and so share Spark DataFrames.
Next steps and more information
- For more information on use of ML Server on Apache Spark, see the Getting started guide.
- For general information on ML Server, see the Get started with R article.
- For information on ML Services on HDInsight, see Overview of ML Services on HDInsight and Get started with ML Services on Azure HDInsight.
For more information on use of SparkR, see:
- Apache SparkR document.
- SparkR Overview from Databricks.
SparkR is an R package that provides a light-weight frontend to use Spark fromR.
NOTE: As of April 2015, SparkR has been merged into Apache Spark and is shipping in an upcoming release (1.4) due early summer 2015. This repo currently targets users using released versions of Spark. This repo no longer accepts new pull requests, and they should now be submitted to apache/spark; see here for some instructions.
NOTE: The API from the upcoming Spark release (1.4) will not have the same API as described by this github repo. Initial support for Spark in R be focussed on high level operations instead of low level ETL. This may change in the (1.5) version. You can contribute and follow SparkR developments on the Apache Spark mailing lists and issue tracker.
Installing SparkR
Requirements
SparkR requires
- Scala 2.10, and
- Spark version >= 0.9.0 and <= 1.2.
Current build by default uses Apache Spark 1.1.0. You can also build SparkR against adifferent Spark version (>= 0.9.0) by modifying
pkg/src/build.sbt
.DataFrame: DataFrame was introduced in Spark 1.3; the 1.3-compatible SparkR version can be found in the
sparkr-sql
branch, which includes a preliminary R API to work with DataFrames.Package installation
To develop SparkR, you can build the scala package and the R package using
If you wish to try out the package directly from github, you can use
install_github
from devtools
. Note that you can specify which branch, tag etc to install from.SparkR by default uses Apache Spark 1.1.0. You can switch to a different Sparkversion by setting the environment variable
SPARK_VERSION
. For example, touse Apache Spark 1.3.0, you can runSparkR by default links to Hadoop 1.0.4. To use SparkR with other Hadoopversions, you will need to rebuild SparkR with the same version that Spark islinkedto.For example to use SparkR with a CDH 4.2.0 MR1 cluster, you can run
By default, SparkR uses sbt to build an assemblyjar. If you wish to use maven instead, you can setthe environment variable
USE_MAVEN=1
. For exampleIf you are building SparkR from behind a proxy, you can setup maven to use the right proxyserver.
Building from source from GitHub
Run the following within R to pull source code from GitHub and build locally. It is possibleto specify build dependencies by starting R with environment values:
- Start R
- Run install_github
note: replace repo and branchname
Running sparkR
If you have cloned and built SparkR, you can start using it by launching the SparkRshell with
The
sparkR
script automatically creates a SparkContext with Spark by default inlocal mode. To specify the Spark master of a cluster for the automatically createdSparkContext, you can runIf you have installed it directly from github, you can include the SparkRpackage and then initialize a SparkContext. For example to run with a localSpark master you can launch R and then run
To increase the memory used by the driver you can export the SPARK_MEMenvironment variable. For example to use 1g, you can run
In a cluster setting to set the amount of memory used by the executors you canpass the variable
spark.executor.memory
to the SparkContext constructor.Finally, to stop the cluster run
sparkR.stop() can be invoked to terminate a SparkContext created previously via sparkR.init(). Then you can call sparkR.init() again to create a new SparkContext that may have different configurations.
Examples, Unit tests
SparkR comes with several sample programs in the
examples
directory.To run one of them, use ./sparkR <filename> <args>
. For example:You can also run the unit-tests for SparkR by running (you need to install the testthat package first):
Running on EC2
Instructions for running SparkR on EC2 can be found in theSparkR wiki.
Running on YARN
Currently, SparkR supports running on YARN with the
yarn-client
mode. These steps show how to build SparkR with YARN support and run SparkR programs on a YARN cluster:Alternatively, install_github can be use (on CDH in this case):
Then within R,
Before launching an application, make sure each worker node has a local copy of
lib/SparkR/sparkr-assembly-0.1.jar
. With a cluster launched with the spark-ec2
script, do:Or run the above installation steps on all worker node.
Finally, when launching an application, the environment variable
YARN_CONF_DIR
needs to be set to the directory which contains the client-side configuration files for the Hadoop cluster (with a cluster launched with spark-ec2
, this defaults to /root/ephemeral-hdfs/conf/
):Running on a cluster using sparkR-submit
sparkR-submit is a script introduced to facilitate submission of SparkR jobs to a Spark supported cluster (e.g. Standalone, Mesos, YARN).It supports the same commandline parameters as spark-submit. SPARK_HOME and JAVA_HOME must be defined.
On YARN, YARN_CONF_DIR must be defined. sparkR-submit supports YARN deploy modes: yarn-client and yarn-cluster.
sparkR-submit is installed with the SparkR package. By default, it can be found under the default Library ('library' subdirectory of R_HOME)
For example, to run on YARN (CDH 5.3.0),
Report Issues/Feedback
For better tracking and collaboration, issues and TODO items are reported to the Apache Spark JIRA under the component tag 'SparkR'.
In your pull request, please cross reference the ticket item created and append '[SPARKR]' (e.g.: '[SPARK-1234] [SPARKR] Pull request').