<?xml version="1.0" encoding="UTF-8"?><rss version="2.0"
	xmlns:content="http://purl.org/rss/1.0/modules/content/"
	xmlns:wfw="http://wellformedweb.org/CommentAPI/"
	xmlns:dc="http://purl.org/dc/elements/1.1/"
	xmlns:atom="http://www.w3.org/2005/Atom"
	xmlns:sy="http://purl.org/rss/1.0/modules/syndication/"
	xmlns:slash="http://purl.org/rss/1.0/modules/slash/"
	>

<channel>
	<title>Blog4Java &#187; Spark</title>
	<atom:link href="http://malsolo.com/blog4java/?feed=rss2&#038;tag=spark" rel="self" type="application/rss+xml" />
	<link>http://malsolo.com/blog4java</link>
	<description>A personal and Java blog, likely only for me</description>
	<lastBuildDate>Tue, 31 Mar 2015 15:52:42 +0000</lastBuildDate>
	<language>en-US</language>
	<sy:updatePeriod>hourly</sy:updatePeriod>
	<sy:updateFrequency>1</sy:updateFrequency>
	<generator>http://wordpress.org/?v=4.1.1</generator>
	<item>
		<title>Getting started with Spark</title>
		<link>http://malsolo.com/blog4java/?p=679</link>
		<comments>http://malsolo.com/blog4java/?p=679#comments</comments>
		<pubDate>Mon, 02 Mar 2015 15:27:16 +0000</pubDate>
		<dc:creator><![CDATA[Javier (@jbbarquero)]]></dc:creator>
				<category><![CDATA[Big Data]]></category>
		<category><![CDATA[Architecture]]></category>
		<category><![CDATA[Hadoop]]></category>
		<category><![CDATA[Java]]></category>
		<category><![CDATA[Spark]]></category>

		<guid isPermaLink="false">http://malsolo.com/blog4java/?p=679</guid>
		<description><![CDATA[Spark Introduction Apache Spark is a cluster computing platform designed to be fast, expressive, high level, general-purpose, fault-tolerant and compatible with Hadoop (Spark can work directly with HDFS, S3 and so on). Spark can also be defined as a framework &#8230; <a href="http://malsolo.com/blog4java/?p=679">Continue reading <span class="meta-nav">&#8594;</span></a>]]></description>
				<content:encoded><![CDATA[<h1>Spark Introduction</h1>
<p>Apache Spark is a cluster <strong>computing platform</strong> designed to be fast, expressive, high level, general-purpose, fault-tolerant and compatible with Hadoop (Spark can work directly with HDFS, S3 and so on). Spark can also be defined as a framework for distributed processing and analysis of large amounts of data. People from Databricks (the company behind Spark) call it a distributed execution engine for large-scale analytics.</p>
<p>Spark improves efficiency over Hadoop because it uses in-memory computing primitives. According to the <a title="Apache Spark™" href="https://spark.apache.org/" target="_blank">Apache Spark site</a>, it can run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.</p>
<p>It also claims to improve usability through rich Scala, Python and Java APIs as well as an interactive shell, in Scala and Python. Spark is written in Scala.</p>
<h2>Spark Architecture</h2>
<p>Spark has three main components:</p>
<div id="attachment_688" style="width: 630px" class="wp-caption alignnone"><a href="http://malsolo.com/blog4java/wp-content/uploads/2015/02/apache_spark_stack.png"><img class="size-large wp-image-688" src="http://malsolo.com/blog4java/wp-content/uploads/2015/02/apache_spark_stack-1024x534.png" alt="The Apache Spark stack" width="620" height="323" /></a><p class="wp-caption-text">Apache Spark stack</p></div>
<h3>Spark Core (API)</h3>
<p>A high level programming framework that allows programmers to focus on the logic and not on the plumbing of distributed programming, that is, on the steps to be done, without worrying about coordinating tasks, networking and so on.</p>
<p>These steps are defined with RDDs (Resilient Distributed Datasets), the main programming abstraction, which represents a collection of items distributed across many compute nodes that can be manipulated in parallel.</p>
<h3>Spark clustering</h3>
<p>Spark itself doesn&#8217;t manage the cluster, but it supports three cluster managers:</p>
<ul>
<li>Standalone: a simple cluster manager included in Spark itself called the Standalone Scheduler.</li>
<li><a title="Apache Hadoop YARN" href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html" target="_blank">Hadoop YARN</a>: see my <a title=" Getting started with Hadoop" href="http://malsolo.com/blog4java/?p=516" target="_blank">introduction to Apache Hadoop</a>.</li>
<li><a title="Apache Mesos" href="http://mesos.apache.org/" target="_blank">Apache Mesos</a>.</li>
</ul>
<h3>Spark stack</h3>
<p>Finally, Spark provides high level specialized components that are closely integrated in order to provide one great platform.</p>
<p>The current components are:</p>
<ul>
<li>Spark SQL: for querying data via SQL.</li>
<li>Spark Streaming: for real-time processing of live streams of data.</li>
<li>GraphX: a library for manipulating graphs and performing graph-parallel computations.</li>
<li>MLlib: a machine learning library that provides common algorithms (classification, regression, &#8230;)</li>
</ul>
<h2>Spark Usage</h2>
<p>There are two ways to work with Spark:</p>
<ul>
<li>The Spark interactive shells</li>
<li>Spark standalone applications</li>
</ul>
<h3>Spark Shell</h3>
<p>The Spark shell is an interactive command-line shell with two implementations, one in Python and the other in Scala. It is a <a title="REPL: Read–eval–print loop" href="http://en.wikipedia.org/wiki/Read%E2%80%93eval%E2%80%93print_loop" target="_blank">REPL</a> that is very useful for learning the API or for data exploration.</p>
<p>Spark’s shells allow you to interact with data not only on your single machine, but on disk or in memory across many machines, thanks to the distributed nature of Spark.</p>
<h3>Spark Applications</h3>
<p>The other way to work with Spark is by creating standalone applications in Python, Scala or Java. Use them for large-scale data processing.</p>
<h2>Spark main concepts</h2>
<h3>Driver program</h3>
<p>It&#8217;s the program that launches the distributed operations on a cluster.</p>
<p>The Spark shell is a driver program.</p>
<p>The application that you write, with its <em>main</em> function that defines the datasets and applies operations on them, is also a driver program.</p>
<h3>Spark Context (sc)</h3>
<p>It&#8217;s the main entry point to the Spark API.</p>
<p>When using the shell, a preconfigured SparkContext is automatically created and it&#8217;s available in the variable called <strong><em>sc</em></strong>.</p>
<p>When writing applications, the first thing that you need to create is your own instance of the SparkContext.</p>
<h3>Resilient Distributed Dataset (RDD)</h3>
<p>The goal of Spark is to allow you to operate on datasets on a single machine and to have these operations work in the same way on a distributed cluster.</p>
<p>To achieve this, Spark offers the <strong>Resilient Distributed Dataset</strong> (RDD). RDDs are <span style="text-decoration: underline;">immutable collections</span> (<em>dataset</em>) of objects that Spark distributes (<em>distributed</em>) across the cluster. They are loaded from a source of data and, since they are immutable, new RDDs are also created as a result of transformations on existing RDDs (map, filter, etc.). Finally, Spark automatically rebuilds them on a node if there is a failure in another node (<em>resilient</em>).</p>
<p>There are two types of operations on RDDs:</p>
<ul>
<li><strong>Transformations</strong>: lazy operations to build RDDs based on the current RDD.</li>
<li><strong>Actions</strong>: return a result or write the RDD to storage. An action implies a computation that actually applies the pending transformations that were lazily defined.</li>
</ul>
<p>In Spark jargon, the resulting chain of operations is called a Directed Acyclic Graph (DAG) of operations. Each RDD tracks the series of transformations used to build it by maintaining a pointer to its parents.</p>
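<p>The difference between lazily defined transformations and actions can be sketched with plain Java 8 streams. This is only an analogy under stated assumptions: it uses no Spark API at all, and the class name <em>LazyPipelineSketch</em> and its members are hypothetical. The intermediate <em>map()</em> plays the role of a transformation and the terminal <em>collect()</em> plays the role of an action.</p>
<p></p><pre class="crayon-plain-tag">import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPipelineSketch {

	// Counts how many times the mapping function really runs.
	static final AtomicInteger calls = new AtomicInteger();

	// &quot;Transformation&quot;: only describes the work, nothing is computed yet.
	static Stream&lt;Integer&gt; squares(List&lt;Integer&gt; numbers) {
		return numbers.stream().map(x -&gt; {
			calls.incrementAndGet();
			return x * x;
		});
	}

	public static void main(String[] args) {
		Stream&lt;Integer&gt; pending = squares(Arrays.asList(1, 2, 3));
		System.out.println(&quot;Calls before the terminal operation: &quot; + calls.get());

		// &quot;Action&quot;: the terminal operation triggers the pending map().
		List&lt;Integer&gt; result = pending.collect(Collectors.toList());
		System.out.println(&quot;Result: &quot; + result + &quot;, calls: &quot; + calls.get());
	}
}</pre><p></p>
<p>The first message reports 0 calls and the second one reports 3, because the mapping function only runs once the terminal operation asks for the values.</p>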
<h1>Spark Installation</h1>
<p>Go to <a title="Download Spark" href="https://spark.apache.org/downloads.html" target="_blank">https://spark.apache.org/downloads.html</a> and then:</p>
<ol>
<li>Choose a Spark release (1.2.1 is the latest at the time of this writing)</li>
<li>Choose a package type: select the package type of <em>“Pre-built for Hadoop 2.4 and later”</em></li>
<li>Choose a download type: <em>Direct Download</em> is OK, but the default <em>Apache Mirror</em> works well.</li>
<li>Click on the link after <em>Download Spark</em>, for instance <strong><em>spark-1.2.1-bin-hadoop2.4.tgz</em></strong>, to download Spark.</li>
</ol>
<p>Unpack the downloaded file and move into that directory in order to use the interactive shell:</p><pre class="crayon-plain-tag">$ tar -xf spark-1.2.1-bin-hadoop2.4.tgz
$ cd spark-1.2.1-bin-hadoop2.4</pre><p></p>
<h2>Using the Shell</h2>
<p>The Python version of the Spark shell is available via the command <strong>bin/pyspark</strong> and the Scala version of the shell by using <strong>bin/spark-shell</strong>.</p>
<p>Note: both shells support code completion with the Tab key.</p>
<p>Let&#8217;s try the <strong>Scala</strong> shell:</p><pre class="crayon-plain-tag">$ bin/spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/02/26 17:23:45 INFO SecurityManager: Changing view acls to: Javier
15/02/26 17:23:45 INFO SecurityManager: Changing modify acls to: Javier
15/02/26 17:23:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Javier); users with modify permissions: Set(Javier)
15/02/26 17:23:45 INFO HttpServer: Starting HTTP Server
15/02/26 17:23:45 INFO Utils: Successfully started service 'HTTP class server' on port 46130.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31)
Type in expressions to have them evaluated.
Type :help for more information.
15/02/26 17:23:50 WARN Utils: Your hostname, xxx resolves to a loopback address: 127.0.1.1; using 192.168.2.49 instead (on interface eth0)
15/02/26 17:23:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/02/26 17:23:50 INFO SecurityManager: Changing view acls to: Javier
15/02/26 17:23:50 INFO SecurityManager: Changing modify acls to: Javier
15/02/26 17:23:50 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Javier); users with modify permissions: Set(Javier)
15/02/26 17:23:51 INFO Slf4jLogger: Slf4jLogger started
15/02/26 17:23:51 INFO Remoting: Starting remoting
15/02/26 17:23:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@xxx.malsolo.lan:55248]
15/02/26 17:23:51 INFO Utils: Successfully started service 'sparkDriver' on port 55248.
15/02/26 17:23:51 INFO SparkEnv: Registering MapOutputTracker
15/02/26 17:23:51 INFO SparkEnv: Registering BlockManagerMaster
15/02/26 17:23:52 INFO DiskBlockManager: Created local directory at /tmp/spark-1420fe71-6907-408a-b44c-9547ba1a2c49/spark-909fad01-a3df-484b-bd30-1ea6006396e9
15/02/26 17:23:52 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/02/26 17:23:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/26 17:23:52 INFO HttpFileServer: HTTP File server directory is /tmp/spark-82586699-a230-47e4-8148-2cc4dcc741ec/spark-72f09be4-797a-4612-a845-e4fd1e578e76
15/02/26 17:23:52 INFO HttpServer: Starting HTTP Server
15/02/26 17:23:52 INFO Utils: Successfully started service 'HTTP file server' on port 41493.
15/02/26 17:23:53 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/26 17:23:53 INFO SparkUI: Started SparkUI at http://xxx.malsolo.lan:4040
15/02/26 17:23:53 INFO Executor: Starting executor ID  on host localhost
15/02/26 17:23:53 INFO Executor: Using REPL class URI: http://192.168.2.49:46130
15/02/26 17:23:53 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@xxx.malsolo.lan:55248/user/HeartbeatReceiver
15/02/26 17:23:53 INFO NettyBlockTransferService: Server created on 40938
15/02/26 17:23:53 INFO BlockManagerMaster: Trying to register BlockManager
15/02/26 17:23:53 INFO BlockManagerMasterActor: Registering block manager localhost:40938 with 265.1 MB RAM, BlockManagerId(, localhost, 40938)
15/02/26 17:23:53 INFO BlockManagerMaster: Registered BlockManager
15/02/26 17:23:53 INFO SparkILoop: Created spark context..
Spark context available as sc.

scala&gt;</pre><p>To exit either shell, press Ctrl-D.</p><pre class="crayon-plain-tag">scala&gt; Stopping spark context.
15/02/26 17:27:40 INFO SparkUI: Stopped Spark web UI at http://xxx.malsolo.lan:4040
15/02/26 17:27:40 INFO DAGScheduler: Stopping DAGScheduler
15/02/26 17:27:41 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/02/26 17:27:41 INFO MemoryStore: MemoryStore cleared
15/02/26 17:27:41 INFO BlockManager: BlockManager stopped
15/02/26 17:27:41 INFO BlockManagerMaster: BlockManagerMaster stopped
15/02/26 17:27:41 INFO SparkContext: Successfully stopped SparkContext
15/02/26 17:27:41 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/02/26 17:27:41 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/02/26 17:27:41 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
$</pre><p>It&#8217;s possible to control the verbosity of the logging through log4j (Spark currently uses log4j 1.2.17, so you can find more details at the <a title="Apache log4j™ 1.2" href="http://logging.apache.org/log4j/1.2/" target="_blank">Apache log4j™ 1.2</a> website). Create a <em>conf/log4j.properties</em> file from the existing <em>conf/log4j.properties.template</em> and then change the line:</p>
<p><strong>log4j.rootCategory=INFO, console</strong></p>
<p>To:</p>
<p><strong>log4j.rootCategory=WARN, console</strong></p>
<p>Now, with the shell, we can try some commands, like examining the <em>sc</em> variable, creating RDDs, filtering them and so on.</p><pre class="crayon-plain-tag">scala&gt; sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@76af34b5

scala&gt; val lines = sc.textFile("README.md")
lines: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at &lt;console&gt;:12

scala&gt; lines.count()
res1: Long = 98                                                                 

scala&gt; lines.first()
res2: String = # Apache Spark</pre><p></p>
<p>There is an INFO message that shows the URL of the Spark UI (INFO SparkUI: Started SparkUI at http://[ipaddress]:4040), so you can use it to see information about the tasks and the cluster.</p>
<div id="attachment_725" style="width: 904px" class="wp-caption alignnone"><a href="http://malsolo.com/blog4java/wp-content/uploads/2015/03/SparkUI-2.png"><img src="http://malsolo.com/blog4java/wp-content/uploads/2015/03/SparkUI-2.png" alt="Spark UI at 4040" width="894" height="574" class="size-full wp-image-725" /></a><p class="wp-caption-text">Spark UI</p></div>
<h1>Spark Operations</h1>
<p>Once we have the Spark shell, let&#8217;s use it to take a look at the available operations before we dive into creating applications.</p>
<h2>Creating RDDs</h2>
<p>You can turn an existing collection into an RDD (parallelize it), load an external file (several formats: text, JSON, CSV, SequenceFiles, objects) or even use an existing Hadoop InputFormat (with <em>sc.hadoopFile()</em>).</p>
<p></p><pre class="crayon-plain-tag">scala&gt; val numbers = sc.parallelize(List(1,2,3))
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at &lt;console&gt;:12

scala&gt; val lines = sc.textFile("README.md")
lines: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[2] at textFile at &lt;console&gt;:12

scala&gt;</pre><p></p>
<h2>Transformations</h2>
<p>As we said earlier, transformations are lazily evaluated operations on RDDs that return a new RDD.</p>
<p>You can pass each element through a function (with <em>map()</em>) or keep elements that pass a predicate (with <em>filter()</em>) or produce zero or more elements for each element (with <em>flatMap()</em>) and so on.</p>
<p></p><pre class="crayon-plain-tag">scala&gt; val squares = numbers.map(x =&gt; x*x)
squares: org.apache.spark.rdd.RDD[Int] = MappedRDD[3] at map at &lt;console&gt;:14

scala&gt; val spark = lines.filter(line =&gt; line.contains("Spark"))
spark: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at filter at &lt;console&gt;:14

scala&gt; val sequences = numbers.flatMap(x =&gt; 1 to x)
sequences: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[5] at flatMap at &lt;console&gt;:14

scala&gt; val words = lines.flatMap(line =&gt; line.split(" "))
words: org.apache.spark.rdd.RDD[String] = FlatMappedRDD[6] at flatMap at &lt;console&gt;:14

scala&gt;</pre><p></p>
<h2>Actions</h2>
<p>Actions are the operations that return a final value to the driver program or write data to an external storage system. They trigger the evaluation of the pending transformations on the RDD.</p>
<p>For instance, retrieve the contents (<em>collect()</em>), return the first n elements (<em>take()</em>), count the number of elements (<em>count()</em>), combine elements with a commutative and associative function (<em>reduce()</em>) or write elements to a text file (<em>saveAsTextFile()</em>).</p>
<p></p><pre class="crayon-plain-tag">scala&gt; sequences.collect()
res0: Array[Int] = Array(1, 1, 2, 1, 2, 3)

scala&gt; squares.take(2)
res1: Array[Int] = Array(1, 4)

scala&gt; words.count()
res2: Long = 524

scala&gt; numbers.reduce((x, y) =&gt; x + y)
res4: Int = 6

scala&gt; spark.saveAsTextFile("borrar.txt")

scala&gt;</pre><p></p>
<h2>Key/Value Pairs</h2>
<p>There is a special type of RDD, the <strong>Pair RDD</strong>, whose elements are tuples, that is, key-value pairs in which the key and the value can be of any type.</p>
<p>They are very useful for performing aggregations, grouping and counting. They can be obtained from some initial ETL (extract, transform, load) operations.</p>
<p>Pair RDDs can be partitioned across nodes to improve speed, by making elements with the same key accessible on the same node.</p>
<p>Regarding operations, Spark offers special operations for Pair RDDs that allow you to act on each key in parallel, for instance, <em>reduceByKey()</em> to aggregate data by key, <em>join()</em> to merge two RDDs by grouping elements with the same key, or even <em>sortByKey()</em>.</p>
<p></p><pre class="crayon-plain-tag">scala&gt; val pets = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 2)))
pets: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at &lt;console&gt;:12

scala&gt; pets.collect()
res12: Array[(String, Int)] = Array((cat,1), (dog,1), (cat,2))

scala&gt; pets.reduceByKey((a, b) =&gt; a + b).collect()
res9: Array[(String, Int)] = Array((dog,1), (cat,3))

scala&gt; pets.groupByKey().collect()
res10: Array[(String, Iterable[Int])] = Array((dog,CompactBuffer(1)), (cat,CompactBuffer(1, 2)))

scala&gt; pets.sortByKey().collect()
res11: Array[(String, Int)] = Array((cat,1), (cat,2), (dog,1))

scala&gt;</pre><p></p>
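<p>To make the semantics of <em>reduceByKey()</em> more concrete, here is a minimal single-machine sketch in plain Java 8. It is not Spark code and it ignores partitioning and distribution entirely. The class <em>ReduceByKeySketch</em> and its helper methods are hypothetical names used only for illustration.</p>
<p></p><pre class="crayon-plain-tag">import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

public class ReduceByKeySketch {

	// Small helper to build a key-value pair (plain JDK, not scala.Tuple2).
	static Map.Entry&lt;String, Integer&gt; pair(String key, int value) {
		return new SimpleEntry&lt;String, Integer&gt;(key, value);
	}

	// Merges the values of equal keys with the given function, in the spirit of reduceByKey().
	static Map&lt;String, Integer&gt; reduceByKey(List&lt;Map.Entry&lt;String, Integer&gt;&gt; pairs,
			BinaryOperator&lt;Integer&gt; combine) {
		// A TreeMap keeps the keys sorted, only to get a stable printed order.
		Map&lt;String, Integer&gt; result = new TreeMap&lt;&gt;();
		for (Map.Entry&lt;String, Integer&gt; p : pairs) {
			result.merge(p.getKey(), p.getValue(), combine);
		}
		return result;
	}

	public static void main(String[] args) {
		List&lt;Map.Entry&lt;String, Integer&gt;&gt; pets =
				Arrays.asList(pair(&quot;cat&quot;, 1), pair(&quot;dog&quot;, 1), pair(&quot;cat&quot;, 2));
		// Prints {cat=3, dog=1}
		System.out.println(reduceByKey(pets, (a, b) -&gt; a + b));
	}
}</pre><p></p>
<p>The combining function receives two values that share a key and returns one, which is the same contract as the function passed to <em>reduceByKey()</em> in the shell session above.</p>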
<p>Now let&#8217;s use the Shell to see how easily you can implement the MapReduce WordCount example in a single line:</p>
<p></p><pre class="crayon-plain-tag">scala&gt; val counts = lines.flatMap(line =&gt; line.split(" ")).map(word =&gt; (word, 1)).reduceByKey((x, y) =&gt; x + y)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at &lt;console&gt;:14

scala&gt; counts.collect()
res16: Array[(String, Int)] = Array((package,1), (this,1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (YARN,,1), (have,1), (pre-built,1), (locally.,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (first,1), (documentation,3), (Configuration,1), (learning,,1), (graph,1), (Hive,2), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (&lt;http://spark.apache.org/&gt;,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (are,1), (systems.,1), (params,1), (scala&gt;,1), (provides,1), (refer,2), (configure,1), (Interactive,2), (distribution.,1), (can,6), (build,3), (when,1), (Apache,1),...
scala&gt;</pre><p></p>
<h1>Spark Applications</h1>
<p>For writing a Spark Application it&#8217;s possible to use Scala, Python or Java. What I&#8217;m going to do is to use Java 8 to take advantage of the new features of the language in order to have a less verbose syntax.</p>
<h2>Word count Java application</h2>
<p>First, add the appropriate dependency. For instance, with Maven:</p>
<p></p><pre class="crayon-plain-tag">&lt;dependency&gt;
			&lt;groupId&gt;org.apache.spark&lt;/groupId&gt;
			&lt;artifactId&gt;spark-core_2.10&lt;/artifactId&gt;
			&lt;version&gt;1.2.1&lt;/version&gt;
		&lt;/dependency&gt;</pre><p></p>
<p>Then, you have to instantiate your own <strong>SparkContext</strong>, which is done via a <strong>SparkConf</strong> object. We use the minimal configuration: a cluster URL (&#8220;local&#8221; to run Spark locally) and an application name to identify the application on the cluster:</p>
<p></p><pre class="crayon-plain-tag">SparkConf conf = new SparkConf().setMaster(&quot;local&quot;).setAppName(&quot;Word Count with Spark&quot;);
		JavaSparkContext sc = new JavaSparkContext(conf);</pre><p></p>
<p>Now, before writing Java code, it&#8217;s necessary to explain the differences with Scala.</p>
<p>Spark is written in Scala and it takes full advantage of its features, but Java lacks some of them, so Spark provides alternatives with interfaces or concrete classes.</p>
<p>Let&#8217;s see the Word Count example in Spark written in Scala:</p>
<p></p><pre class="crayon-plain-tag">val file = sc.textFile(&quot;file&quot;)
val counts = file.flatMap(line =&gt; line.split(&quot; &quot;))
                 .map(word =&gt; (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile(&quot;out&quot;)</pre><p></p>
<p>Before Java 8, Java didn&#8217;t accept functions as parameters, so Spark provides interfaces in the <em>org.apache.spark.api.java.function</em> package. They can be implemented either as anonymous inner classes or as named classes, and their instances are passed as arguments to the operations (<em>flatMap()</em>, <em>map()</em>, <em>reduceByKey()</em>, &#8230;).</p>
<p>In our case, these are the interfaces that are needed:</p>
<ul>
<li><strong>FlatMapFunction&lt;T, R&gt;</strong> with the method <em>Iterable&lt;R&gt; call(T t)</em> to return zero or more output records from each input record (t).</li>
<li><strong>PairFunction&lt;T, K, V&gt;</strong> with the method <em>Tuple2&lt;K, V&gt; call(T t)</em> to return key-value pairs (Tuple2&lt;K, V&gt;); it can be used to construct PairRDDs.</li>
<li><strong>Function2&lt;T1, T2, R&gt;</strong> with the method <em>R call(T1 v1, T2 v2)</em>, a two-argument function that takes arguments of type T1 and T2 and returns an R.</li>
</ul>
<p>Java doesn&#8217;t have a native implementation of tuples. As <a href="https://twitter.com/lukaseder" title="Lukas Eder twitter account" target="_blank">Lukas Eder</a> said in a side note <a href="http://blog.jooq.org/2015/01/23/how-to-translate-sql-group-by-and-aggregations-to-java-8/" title="How to Translate SQL GROUP BY and Aggregations to Java 8" target="_blank">here</a>: &#8220;Why the JDK doesn’t ship with built-in tuples like C#’s or Scala’s escapes me.&#8221; In other words, <strong><em>&#8220;Functional programming without tuples is like coffee without sugar: A bitter punch in your face.&#8221;</em></strong></p>
<p>For that reason, the Spark Java API relies on the tuple classes of the <em>scala</em> package, such as <em>scala.Tuple2</em>.</p>
<p>But Java has evolved, and since Java 8 lambda expressions make it possible to pass functions as parameters to other functions. Since each of the provided interfaces has a single abstract method, it’s very easy to write the Java 8 version of the word count in Spark using lambdas, and the result is almost as clear as the Scala version:</p>
<p></p><pre class="crayon-plain-tag">JavaRDD&lt;String&gt; lines = sc.textFile(&quot;file&quot;);
		JavaPairRDD&lt;String, Integer&gt; counts = lines.flatMap(line -&gt; Arrays.asList(line.split(&quot; &quot;)))
			.mapToPair(word -&gt; new Tuple2&lt;String, Integer&gt;(word, 1))
			.reduceByKey((x, y) -&gt; x + y);
		counts.saveAsTextFile(&quot;out&quot;);</pre><p></p>
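<p>Why a lambda can replace an anonymous inner class here can be shown with a small plain-JDK sketch. The interface <em>TwoArgFunction</em> below is a hypothetical stand-in that only mirrors the shape of the <em>Function2</em> interface described above. It is not the Spark type, so that the example compiles without any Spark dependency.</p>
<p></p><pre class="crayon-plain-tag">public class LambdaVsAnonymousSketch {

	// Hypothetical stand-in with the same shape as the Function2 interface described above.
	interface TwoArgFunction&lt;T1, T2, R&gt; {
		R call(T1 v1, T2 v2);
	}

	// Pre-Java 8 style: an anonymous inner class implementing the single method.
	static final TwoArgFunction&lt;Integer, Integer, Integer&gt; SUM_ANONYMOUS =
			new TwoArgFunction&lt;Integer, Integer, Integer&gt;() {
				@Override
				public Integer call(Integer x, Integer y) {
					return x + y;
				}
			};

	// Java 8 style: the compiler maps the lambda onto the single abstract method.
	static final TwoArgFunction&lt;Integer, Integer, Integer&gt; SUM_LAMBDA = (x, y) -&gt; x + y;

	public static void main(String[] args) {
		System.out.println(SUM_ANONYMOUS.call(2, 3)); // prints 5
		System.out.println(SUM_LAMBDA.call(2, 3));    // prints 5
	}
}</pre><p></p>
<p>Both constants behave identically. The lambda version is simply the shorter way of implementing an interface that declares a single abstract method.</p>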
<p>The complete source code is <a href="https://github.com/jbbarquero/spark-examples" title="spark-examples" target="_blank">available at GitHub</a>.</p>
<h2>Build and run</h2>
<p>Now we only have to build the project (with Maven) and submit it to Spark (with <strong>bin/spark-submit</strong>). Run the following from the root directory of the application (note: the <em>out</em> directory must not exist, so remove it first if necessary with <strong><em>rm -r out</em></strong>):</p>
<p></p><pre class="crayon-plain-tag">$ mvn clean install
$ ~/Applications/spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class com.malsolo.spark.examples.WordCount target/spark-examples-0.0.1-SNAPSHOT.jar</pre><p></p>
<p>Finally, we can see the results to compare with the ones obtained <a href="http://malsolo.com/blog4java/?p=516" title=" Getting started with Hadoop" target="_blank">using Hadoop</a>:</p>
<p></p><pre class="crayon-plain-tag">$ cat out/part-00000 | grep President
(President,,26)
(President,72)
(President.,8)
(Vice-President,5)
(Vice-President,,5)
(Vice-President;,1)
(President;,3)
(Vice-President.,1)

$ cat out/part-00000 | grep United
(United,85)

$ cat out/part-00000 | grep State
(State,47)
(States,46)
(States.",1)
(State.,6)
(States,,55)
(State,,20)
(States:,2)
(States.,8)
(Statement,1)
(States;,13)
(State;,4)</pre><p></p>
<h1>Shared Variables</h1>
<p>Spark closures and the variables they use are sent separately to the tasks running on the cluster. The variables created in the driver program are therefore received in the tasks as new copies, and updates to these copies are not propagated back to the driver.</p>
<p>Spark has two kinds of shared variables, <strong>accumulators</strong> and <strong>broadcast variables</strong>, which solve that problem and also reduce the amount of data that is sent across the cluster.</p>
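<p>The copy semantics can be illustrated without Spark. The following plain-JDK sketch assumes that a task receives its variables as a serialized copy, and simulates that with a Java serialization round trip. The <em>Counter</em> class and the method <em>copyBySerialization()</em> are hypothetical names, and nothing here is part of the Spark API.</p>
<p></p><pre class="crayon-plain-tag">import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCopySketch {

	// Hypothetical mutable state that a task would capture.
	static class Counter implements Serializable {
		private static final long serialVersionUID = 1L;
		int value = 0;
	}

	// Round-trips the object through Java serialization, which yields an independent copy.
	static Counter copyBySerialization(Counter original) {
		try {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
				out.writeObject(original);
			}
			try (ObjectInputStream in =
					new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
				return (Counter) in.readObject();
			}
		} catch (IOException | ClassNotFoundException e) {
			throw new IllegalStateException(&quot;Serialization round trip failed&quot;, e);
		}
	}

	public static void main(String[] args) {
		Counter driverSide = new Counter();
		Counter taskSide = copyBySerialization(driverSide); // what a task would receive
		taskSide.value += 5;                                // update made on the copy
		System.out.println(&quot;task copy: &quot; + taskSide.value
				+ &quot;, driver original: &quot; + driverSide.value);
	}
}</pre><p></p>
<p>The program prints a task copy of 5 and a driver original of 0: the update never reaches the original object, which is the situation that shared variables address.</p>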
<h2>Accumulators</h2>
<p>Variables that can be used to aggregate values from worker nodes back to the driver program. In a nutshell:</p>
<ul>
<li>They are created with <em>SparkContext.accumulator(initialValue)</em> that returns an <em>org.apache.spark.Accumulator[T]</em> (with T, the type of initialValue)</li>
<li>Worker code adds values with <em>+=</em> in Scala or the function <em>add()</em> in Java.</li>
<li>The driver program accesses the value with <em>value</em> in Scala or <em>value()</em>/<em>setValue()</em> in Java (accessing the value from worker code throws an exception)</li>
<li>The right value will be obtained after calling an <em><strong>action</strong></em> (remember that <u><em><strong>transformations</strong></em> are lazy operations</u>)</li>
</ul>
<p>Spark has built-in support for accumulators of numeric types such as Integer and Double, but you can create custom accumulators by implementing <a href="http://spark.apache.org/docs/1.2.1/api/java/index.html?org/apache/spark/AccumulatorParam.html" title="AccumulatorParam API" target="_blank">AccumulatorParam</a>.</p>
<p>Let&#8217;s see an example that counts the empty lines in the file that we use to count words:</p>
<p></p><pre class="crayon-plain-tag">public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster(&quot;local&quot;).setAppName(&quot;Word Count with Spark&quot;);
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		JavaRDD&lt;String&gt; lines = sc.textFile(INPUT_FILE_TEXT);
		
		final Accumulator&lt;Integer&gt; blankLines = sc.accumulator(0);
		
		@SuppressWarnings(&quot;resource&quot;)
		JavaPairRDD&lt;String, Integer&gt; counts = lines.flatMap(line -&gt; 
			{
				if (&quot;&quot;.equals(line)) {
					blankLines.add(1);
				}
				return Arrays.asList(line.split(&quot; &quot;));
			})
			.mapToPair(word -&gt; new Tuple2&lt;String, Integer&gt;(word, 1))
			.reduceByKey((x, y) -&gt; x + y);

		counts.saveAsTextFile(OUTPUT_FILE_TEXT);
		
		System.out.println(&quot;Blank lines: &quot; + blankLines.value());
		
		sc.close();
	}</pre><p></p>
<ul>
<li>In line 7 we create an Accumulator&lt;Integer&gt; initialized to 0</li>
<li>In lines 11 to 16 we modify the FlatMapFunction to add 1 if the input line is empty</li>
<li>In line 22 we print the value of the accumulator, after the <em>saveAsTextFile()</em> action has run.</li>
</ul>
<p>Let&#8217;s try:</p>
<p></p><pre class="crayon-plain-tag">$ rm -r out
$ mvn clean install
$ ~/Applications/spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class com.malsolo.spark.examples.WordCount target/spark-examples-0.0.1-SNAPSHOT.jar

Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/02 15:26:22 WARN Utils: Your hostname, xxx resolves to a loopback address: 127.0.1.1; using yyy.yyy.y.yy instead (on interface eth0)
15/03/02 15:26:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/03/02 15:26:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Blank lines: 169
$</pre><p></p>
<h2>Broadcast variables</h2>
<p>Shared variables that efficiently distribute large read-only values to all the worker nodes.</p>
<p>If you need to use the same variable in multiple parallel operations, it&#8217;s likely you’d rather share it instead of letting Spark send it separately for each operation.</p>
<p>In a nutshell:</p>
<ul>
<li>They are created with SparkContext.broadcast(initValue) on an object of type T that has to be Serializable.</li>
<li>Access its value with <em>value</em> in Scala or <em>value()</em> in Java.</li>
<li>The value shouldn&#8217;t be modified after creation, because the change will only happen in one node.</li>
</ul>
<p>Let’s see an example with a list of words that must be excluded from the count (a short list, but enough to see the concept):</p>
<p></p><pre class="crayon-plain-tag">public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster(&quot;local&quot;).setAppName(&quot;Word Count with Spark&quot;);
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		JavaRDD&lt;String&gt; lines = sc.textFile(INPUT_FILE_TEXT);
		
		final Accumulator&lt;Integer&gt; blankLines = sc.accumulator(0);
		
		final Broadcast&lt;List&lt;String&gt;&gt; wordsToIgnore = sc.broadcast(getWordsToIgnore());
		
		@SuppressWarnings(&quot;resource&quot;)
		JavaPairRDD&lt;String, Integer&gt; counts = lines.flatMap(line -&gt; 
			{
				if (&quot;&quot;.equals(line)) {
					blankLines.add(1);
				}
				return Arrays.asList(line.split(&quot; &quot;));
			})
			.filter(word -&gt; !wordsToIgnore.value().contains(word))
			.mapToPair(word -&gt; new Tuple2&lt;String, Integer&gt;(word, 1))
			.reduceByKey((x, y) -&gt; x + y);
		
		counts.saveAsTextFile(OUTPUT_FILE_TEXT);
		
		System.out.println(&quot;Blank lines: &quot; + blankLines.value());
		
		sc.close();
	}

	private static List&lt;String&gt; getWordsToIgnore() {
		return Arrays.asList(&quot;the&quot;, &quot;of&quot;, &quot;and&quot;, &quot;for&quot;);
	}</pre><p></p>
<ul>
<li>In line 9 we create the broadcast variable: a list of words to ignore. In lines 30 to 31 we only return 4 words, but it&#8217;s easy to see that the list could be much bigger.</li>
<li>In line 19 we access the broadcast variable with the <em>value()</em> method and use it in a filter method.</li>
</ul>
<h1>Resources</h1>
<ul>
<li>Source code at <a href="https://github.com/jbbarquero/spark-examples" title="spark-examples" target="_blank">GitHub</a></li>
<li><a title="Learning Spark" href="http://shop.oreilly.com/product/0636920028512.do" target="_blank">Learning Spark</a>. By Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia (O&#8217;Reilly Media)</li>
<li><a title="Cloudera Developer Training for Apache Spark" href="http://cloudera.com/content/cloudera/en/training/courses/spark-training.html" target="_blank">Cloudera Developer Training for Apache Spark</a>. By Diana Carroll (Cloudera training)</li>
<li><a title="Parallel Programming with Spark" href="https://www.youtube.com/watch?v=7k4yDKBYOcw" target="_blank">Parallel Programming with Spark (Part 1 &amp; 2)</a>. By Matei Zaharia (UC Berkeley AMPLab YouTube channel)</li>
<li><a title="Advanced Spark Features" href="https://www.youtube.com/watch?v=w0Tisli7zn4" target="_blank">Advanced Spark Features</a>. By Matei Zaharia (UC Berkeley AMPLab YouTube channel)</li>
<li><a title="A Deeper Understanding of Spark Internals" href="https://www.youtube.com/watch?v=dmL0N3qfSc8" target="_blank">A Deeper Understanding of Spark Internals</a>. By Aaron Davidson (Apache Spark YouTube channel)</li>
</ul>
]]></content:encoded>
			<wfw:commentRss>http://malsolo.com/blog4java/?feed=rss2&#038;p=679</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
	</channel>
</rss>
