Blog4Java

A personal and Java blog, likely only for me

Getting started with Spark

Spark Introduction

Apache Spark is a cluster computing platform designed to be fast, expressive, high-level, general-purpose, fault-tolerant and compatible with Hadoop (Spark can work directly with HDFS, S3 and so on). Spark can also be defined as a framework for distributed processing and analysis of large amounts of data. People from Databricks (the company behind Spark) call it a distributed execution engine for large-scale analytics.

Spark improves efficiency over Hadoop because it uses in-memory computing primitives. According to the Apache Spark site, it can run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.

It also claims to improve usability through rich Scala, Python and Java APIs, as well as interactive shells in Scala and Python. Spark itself is written in Scala.

Spark Architecture

Spark has three main components:

Figure: The Apache Spark stack

Spark Core (API)

A high-level programming framework that allows programmers to focus on the logic rather than on the plumbing of distributed programming, that is, on the steps to be done without worrying about coordinating tasks, networking and so on.

These steps are defined through RDDs (Resilient Distributed Datasets), the main programming abstraction: a collection of items distributed across many compute nodes that can be manipulated in parallel.

Spark clustering

Spark itself doesn’t manage the cluster; instead, it supports three cluster managers:

  • Standalone: the simple cluster manager included with Spark
  • Apache Mesos
  • Hadoop YARN

Spark stack

Finally, Spark provides high level specialized components that are closely integrated in order to provide one great platform.

The current components are:

  • Spark SQL: for querying data via SQL.
  • Spark Streaming: for real-time processing of live streams of data.
  • GraphX: a library for manipulating graphs and performing graph-parallel computations.
  • MLlib: a machine learning library providing common algorithms (classification, regression, …)

Spark Usage

There are two ways to work with Spark:

  • The Spark interactive shells
  • Spark standalone applications

Spark Shell

It’s an interactive command-line shell with two implementations, one in Python and the other in Scala: a REPL that is very useful for learning the API or for data exploration.

Spark’s shells allow you to interact with data not only on your single machine, but on disk or in memory across many machines, thanks to the distributed nature of Spark.

Spark Applications

The other way to work with Spark is by creating standalone applications either in Python, Scala or Java. Use them for large scale data processing.

Spark main concepts

Driver program

It’s the program that launches the distributed operations on a cluster.

The Spark shell is a driver program.

The application that you write, with its main function that defines the datasets and applies operations on them, is a driver program.

Spark Context (sc)

It’s the main entry point to the Spark API.

When using the shell, a preconfigured SparkContext is automatically created and it’s available in the variable called sc.

When writing applications, the first thing that you need to create is your own instance of the SparkContext.

Resilient Distributed Dataset (RDD)

The goal of Spark is to let you operate on datasets on a single machine and have those operations work in the same way on a distributed cluster.

To achieve this, Spark offers the Resilient Distributed Dataset (RDD): an immutable collection (dataset) of objects that Spark spreads (distributed) across the cluster. RDDs are loaded from a data source and, since they are immutable, new RDDs are also created as the result of transformations on existing RDDs (map, filter, etc.). Finally, Spark automatically rebuilds an RDD on another node if a node fails (resilient).

There are two types of operations on RDDs:

  • Transformations: lazy operations to build RDDs based on the current RDD.
  • Actions: return a result to the driver or write the RDD to storage. An action triggers the computation that actually applies the transformations that were lazily defined.

In Spark jargon, this is called a Directed Acyclic Graph (DAG) of operations. RDDs track the series of transformations used to build them by maintaining a pointer to their parents.

Spark Installation

Go to https://spark.apache.org/downloads.html and then:

  1. Choose a Spark release (1.2.1 is the latest at the time of this writing)
  2. Choose a package type: select the package type of “Pre-built for Hadoop 2.4 and later”
  3. Choose a download type: Direct Download is OK, but the default Apache Mirror works well.
  4. Click on the link after Download Spark, for instance spark-1.2.1.tgz, to download Spark.

Unpack the downloaded file and move into that directory in order to use the interactive shell:
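
(For instance, assuming you downloaded the pre-built package for Hadoop 2.4; the exact file and directory names depend on the package you chose.)

    tar -xzf spark-1.2.1-bin-hadoop2.4.tgz
    cd spark-1.2.1-bin-hadoop2.4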

Using the Shell

The Python version of the Spark shell is available via the command bin/pyspark and the Scala version of the shell by using bin/spark-shell.

Note: the shells support code completion with the Tab key.

Let’s try the Scala shell:
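
(A minimal session from the Spark directory; README.md ships with the distribution, and the output is omitted.)

    $ ./bin/spark-shell
    scala> val lines = sc.textFile("README.md")
    scala> lines.count()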

To exit either shell, press Ctrl-D.

It’s possible to control the verbosity of the logging by creating a conf/log4j.properties file (copy the existing conf/log4j.properties.template; currently, Spark uses log4j 1.2.17, so you can find more details on the Apache log4j™ 1.2 website) and then changing the line:

log4j.rootCategory=INFO, console

To:

log4j.rootCategory=WARN, console

Now, with the shell we can try some commands, like examining the sc variable, creating RDDs, filtering them and so on.
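
For instance, something along these lines (README.md is just an example file):

    scala> sc
    scala> val lines = sc.textFile("README.md")
    scala> val sparkLines = lines.filter(line => line.contains("Spark"))
    scala> sparkLines.count()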

There is an INFO message that shows the URL of the Spark UI (INFO SparkUI: Started SparkUI at http://[ipaddress]:4040), so you can use it to see information about the tasks and the cluster.

Figure: The Spark UI at port 4040

Spark Operations

Once we have the Spark shell, let’s use it to take a look at the available operations before we dive into creating applications.

Creating RDDs

You can turn an existing collection into an RDD (parallelize it), load an external file (in several formats: text, JSON, CSV, SequenceFiles, objects) or even use an existing Hadoop InputFormat (with sc.hadoopFile()).
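
A minimal Java sketch, assuming sc is an already created JavaSparkContext:

    // Turn an existing collection of the driver program into an RDD
    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    // Load an external text file (one element per line)
    JavaRDD<String> lines = sc.textFile("README.md");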

Transformations

As we said earlier, transformations are lazily evaluated operations on RDDs that return a new RDD.

You can pass each element through a function (with map()), keep the elements that pass a predicate (with filter()), produce zero or more elements for each element (with flatMap()) and so on.
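
Continuing the previous sketch (lines is a JavaRDD<String>; note that in Spark 1.x the flatMap() function must return an Iterable):

    // Keep only the lines that contain "Spark"
    JavaRDD<String> sparkLines = lines.filter(line -> line.contains("Spark"));

    // Pass each line through a function that returns its length
    JavaRDD<Integer> lengths = lines.map(String::length);

    // Produce zero or more words for each line
    JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")));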

Actions

They are the operations that return a final value to the driver program or write data to an external storage system, and they trigger the evaluation of the transformations defined on the RDD.

For instance, retrieve the contents (collect()), return the first n elements (take()), count the number of elements (count()), combine the elements with an associative function (reduce()) or write the elements to a text file (saveAsTextFile()).
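
Continuing the same sketch, nothing is actually computed until one of these actions is called ("words-out" is just an example path):

    long total = lines.count();                         // number of elements
    List<String> firstTen = lines.take(10);             // first 10 elements
    List<String> matching = sparkLines.collect();       // bring the results back to the driver
    int totalLength = lengths.reduce((a, b) -> a + b);  // combine with an associative function
    words.saveAsTextFile("words-out");                  // write the elements to text files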

Key/Value Pairs

There is a special type of RDD, the pair RDD, whose elements are tuples, that is, key-value pairs, where both the key and the value can be of any type.

They are very useful for performing aggregations, grouping and counting. They are often obtained from some initial ETL (extract, transform, load) operations.

Pair RDDs can be partitioned across nodes to improve speed by making similar keys accessible on the same node.

Regarding operations, Spark offers special operations for pair RDDs that allow you to act on each key in parallel, for instance, reduceByKey() to aggregate data by key, join() to merge two RDDs by grouping elements with the same key, or even sortByKey().

Now let’s use the Shell to see how easily you can implement the MapReduce WordCount example in a single line:
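
(Something along these lines; input.txt and out are placeholders for your own paths.)

    scala> sc.textFile("input.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).saveAsTextFile("out")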

Spark Applications

To write a Spark application you can use Scala, Python or Java. I’m going to use Java 8 to take advantage of the new features of the language and get a less verbose syntax.

Word count Java application

First, use the appropriate dependency, for instance with Maven.
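
For Spark 1.2.1, which is built against Scala 2.10, something like this should do:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>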

Then, you have to instantiate your own SparkContext, which is done via a SparkConf object. We use the minimal configuration: the cluster URL (“local” to run on a local cluster) and an application name to identify the application on the cluster.
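
In Java, that boils down to something like this (the application name is just an example):

    SparkConf conf = new SparkConf().setMaster("local").setAppName("Word Count");
    JavaSparkContext sc = new JavaSparkContext(conf);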

Now, before writing the Java code, it’s necessary to explain the differences with Scala.

Spark is written in Scala and takes full advantage of its features, but Java lacks some of them, so Spark provides alternatives in the form of interfaces or concrete classes.

Let’s see the Word Count example in Spark written in Scala.
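
A typical version looks like this (the file names are placeholders, and the original snippet may differ slightly):

    val lines = sc.textFile("input.txt")
    val counts = lines.flatMap(line => line.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
    counts.saveAsTextFile("out")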

Java (before version 8) didn’t accept functions as parameters, so Spark provides interfaces in the org.apache.spark.api.java.function package that can be implemented, either as anonymous inner classes or as named classes, and passed as arguments to the operations (flatMap(), map(), reduceByKey(), …).

In our case, these are the functions that are needed:

  • FlatMapFunction<T, R> with the method Iterable<R> call(T t), to return zero or more output records from each input record (t).
  • PairFunction<T, K, V> with the method Tuple2<K, V> call(T t), to return key-value pairs (Tuple2); it can be used to construct pair RDDs.
  • Function2<T1, T2, R> with the method R call(T1 v1, T2 v2), a two-argument function that takes arguments of type T1 and T2 and returns an R.

Java doesn’t have a native implementation of Tuple (as Lukas Eder said on a side note, “Why the JDK doesn’t ship with built-in tuples like C#’s or Scala’s escapes me”, or, in other words, “Functional programming without tuples is like coffee without sugar: a bitter punch in your face.”).

For that reason, Spark relies on the Tuple implementations available in the scala package (for instance, scala.Tuple2).
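
For instance, before Java 8, the pair-creation step of the word count would look roughly like this, with an anonymous inner class implementing PairFunction and returning a scala.Tuple2 (words is assumed to be a JavaRDD<String> of words):

    JavaPairRDD<String, Integer> pairs = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });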

But Java has evolved, and now functions are first-class citizens, so it’s possible to pass them as parameters to other functions. Since the provided interfaces have a single abstract method, it’s very easy to write the Java 8 version of the word count in Spark using lambdas, and the result is almost as clear as the Scala version.
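
Here is a sketch of what it can look like (the class name and the input/output arguments are illustrative; the code in the repository may differ slightly):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class JavaWordCount {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("Word Count");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> lines = sc.textFile(args[0]);             // input text file

            JavaPairRDD<String, Integer> counts = lines
                    .flatMap(line -> Arrays.asList(line.split(" ")))  // FlatMapFunction
                    .mapToPair(word -> new Tuple2<>(word, 1))         // PairFunction
                    .reduceByKey((a, b) -> a + b);                    // Function2

            counts.saveAsTextFile(args[1]);                           // output directory
            sc.stop();
        }
    }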

The complete source code is available at GitHub.

Build and run

Now, we only have to build the project (with Maven) and submit it to Spark (with bin/spark-submit) from the root directory of the application (note: the out directory must not exist, so remove it first if needed with rm -r out):
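
(The class, jar and Spark paths below are placeholders; check the actual ones in the project.)

    mvn clean package
    # class, jar and paths are placeholders
    /path/to/spark-1.2.1-bin-hadoop2.4/bin/spark-submit \
        --class com.example.WordCount \
        target/your-application.jar input.txt out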

Finally, we can inspect the results in the out directory and compare them with the ones obtained using Hadoop.

Shared Variables

Spark closures and the variables they use are sent separately to the tasks running on the cluster, so the variables created in the driver program are received in each task as a new copy, and updates to these copies are not propagated back to the driver.

Spark has two kinds of shared variables, accumulators and broadcast variables, to solve that problem as well as issues related to the amount of data that is sent across the cluster.

Accumulators

Variables that can be used to aggregate values from worker nodes back to the driver program. In a nutshell:

  • They are created with SparkContext.accumulator(initialValue) that returns an org.apache.spark.Accumulator[T] (with T, the type of initialValue)
  • Worker code adds values with += in Scala or the function add() in Java.
  • The driver program can access it with value in Scala or value()/setValue() in Java (accessing it from worker code throws an exception)
  • The right value will be obtained after calling an action (remember that transformations are lazy operations)

Spark has built-in support for accumulators of numeric types (such as Integer and Double), but you can create custom accumulators by extending AccumulatorParam.

Let’s see an example that counts the empty lines in the file that we use to count words:
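
(A minimal sketch of the idea, not necessarily the post's exact code; lines and sc are the input JavaRDD<String> and the JavaSparkContext of the word count.)

    // Create an Accumulator initialized to 0
    final Accumulator<Integer> blankLines = sc.accumulator(0);

    // The FlatMapFunction adds 1 to the accumulator when the input line is empty
    JavaRDD<String> words = lines.flatMap(line -> {
        if (line.isEmpty()) {
            blankLines.add(1);  // worker code only adds to the accumulator
        }
        return Arrays.asList(line.split(" "));
    });

    words.mapToPair(word -> new Tuple2<>(word, 1))
         .reduceByKey((a, b) -> a + b)
         .saveAsTextFile("out");  // the action forces the evaluation

    // Only after the action does the accumulator hold the right value
    System.out.println("Empty lines: " + blankLines.value());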

  • We create an Accumulator initialized to 0.
  • We modify the FlatMapFunction to add 1 to the accumulator when the input line is empty.
  • We print the value of the accumulator after the saveAsTextFile() action.

Let’s try it and check the value printed by the driver.

Broadcast variables

A shared variable used to efficiently distribute large read-only values to all the worker nodes.

If you need to use the same variable in multiple parallel operations, you’d probably rather share it once instead of letting Spark send it separately for each operation.

In a nutshell:

  • They are created with SparkContext.broadcast(initValue) on an object of type T that has to be Serializable.
  • Access its value with value in Scala or value() in Java.
  • The value shouldn’t be modified after creation, because the change will only happen in one node.

Let’s see an example with a list of words that should not be included in the count (a short list, but enough to illustrate the concept):
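
(Again a minimal sketch, not necessarily the post's exact code; lines and sc are assumed as before.)

    // Create the broadcast variable: a short list of words to ignore
    final Broadcast<List<String>> stopWords =
            sc.broadcast(Arrays.asList("a", "an", "the", "of"));

    // Access the broadcast value on the workers with value() and use it in a filter
    JavaRDD<String> words = lines
            .flatMap(line -> Arrays.asList(line.split(" ")))
            .filter(word -> !stopWords.value().contains(word));

    words.mapToPair(word -> new Tuple2<>(word, 1))
         .reduceByKey((a, b) -> a + b)
         .saveAsTextFile("out");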

  • We create the broadcast variable: a list of words to ignore. Here it only contains four words, but it’s easy to see that the list could be much bigger.
  • We access the broadcast variable with the value() method and use it in a filter operation.

Author: Javier (@jbbarquero)

Java EE developer, swim addict, occasional videogames player, fan of Ben Alex and Shamus Young, and deeply in love with my wife. Sooner or later I'll dedicate a post to expand this simple introduction.
