<?xml version="1.0" encoding="UTF-8"?><rss version="2.0"
	xmlns:content="http://purl.org/rss/1.0/modules/content/"
	xmlns:wfw="http://wellformedweb.org/CommentAPI/"
	xmlns:dc="http://purl.org/dc/elements/1.1/"
	xmlns:atom="http://www.w3.org/2005/Atom"
	xmlns:sy="http://purl.org/rss/1.0/modules/syndication/"
	xmlns:slash="http://purl.org/rss/1.0/modules/slash/"
	>

<channel>
	<title>Blog4Java &#187; Architecture</title>
	<atom:link href="http://malsolo.com/blog4java/?feed=rss2&#038;tag=architecture" rel="self" type="application/rss+xml" />
	<link>http://malsolo.com/blog4java</link>
	<description>A personal and Java blog, likely only for me</description>
	<lastBuildDate>Tue, 31 Mar 2015 15:52:42 +0000</lastBuildDate>
	<language>en-US</language>
	<sy:updatePeriod>hourly</sy:updatePeriod>
	<sy:updateFrequency>1</sy:updateFrequency>
	<generator>http://wordpress.org/?v=4.1.1</generator>
	<item>
		<title>Getting started with Spark</title>
		<link>http://malsolo.com/blog4java/?p=679</link>
		<comments>http://malsolo.com/blog4java/?p=679#comments</comments>
		<pubDate>Mon, 02 Mar 2015 15:27:16 +0000</pubDate>
		<dc:creator><![CDATA[Javier (@jbbarquero)]]></dc:creator>
				<category><![CDATA[Big Data]]></category>
		<category><![CDATA[Architecture]]></category>
		<category><![CDATA[Hadoop]]></category>
		<category><![CDATA[Java]]></category>
		<category><![CDATA[Spark]]></category>

		<guid isPermaLink="false">http://malsolo.com/blog4java/?p=679</guid>
		<description><![CDATA[Spark Introduction Apache Spark is a cluster computing platform designed to be fast, expressive, high level, general-purpose, fault-tolerant and compatible with Hadoop (Spark can work directly with HDFS, S3 and so on). Spark can also be defined as a framework &#8230; <a href="http://malsolo.com/blog4java/?p=679">Continue reading <span class="meta-nav">&#8594;</span></a>]]></description>
				<content:encoded><![CDATA[<h1>Spark Introduction</h1>
<p>Apache Spark is a cluster <strong>computing platform</strong> designed to be fast, expressive, high level, general-purpose, fault-tolerant and compatible with Hadoop (Spark can work directly with HDFS, S3 and so on). Spark can also be defined as a framework for distributed processing and analysis of large amounts of data. People from Databricks (the company behind Spark) call it a distributed execution engine for large scale analytics.</p>
<p>Spark improves efficiency over Hadoop because it uses in-memory computing primitives. According to the <a title="Apache Spark™" href="https://spark.apache.org/" target="_blank">Apache Spark site</a>, it can run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.</p>
<p>It also claims to improve usability through rich Scala, Python and Java APIs as well as an interactive shell, in Scala and Python. Spark is written in Scala.</p>
<h2>Spark Architecture</h2>
<p>Spark has three main components:</p>
<div id="attachment_688" style="width: 630px" class="wp-caption alignnone"><a href="http://malsolo.com/blog4java/wp-content/uploads/2015/02/apache_spark_stack.png"><img class="size-large wp-image-688" src="http://malsolo.com/blog4java/wp-content/uploads/2015/02/apache_spark_stack-1024x534.png" alt="The Apache Spark stack" width="620" height="323" /></a><p class="wp-caption-text">Apache Spark stack</p></div>
<h3>Spark Core (API)</h3>
<p>A high level programming framework that allows programmers to focus on the logic and not on the plumbing of distributed programming, that is, on the steps to be done without worrying about coordinating tasks, networking and so on.</p>
<p>These steps are defined through RDDs (Resilient Distributed Datasets), the main programming abstraction, which represents a collection of items distributed across many compute nodes that can be manipulated in parallel.</p>
<h3>Spark clustering</h3>
<p>Spark itself doesn&#8217;t manage the cluster, but it supports three cluster managers:</p>
<ul>
<li>Standalone: a simple cluster manager included in Spark itself called the Standalone Scheduler.</li>
<li><a title="Apache Hadoop YARN" href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html" target="_blank">Hadoop YARN</a>: see my <a title=" Getting started with Hadoop" href="http://malsolo.com/blog4java/?p=516" target="_blank">introduction to Apache Hadoop</a>.</li>
<li><a title="Apache Mesos" href="http://mesos.apache.org/" target="_blank">Apache Mesos</a>.</li>
</ul>
<h3>Spark stack</h3>
<p>Finally, Spark provides high level specialized components that are closely integrated in order to provide one great platform.</p>
<p>The current components are:</p>
<ul>
<li>Spark SQL: for querying data via SQL.</li>
<li>Spark Streaming: for real-time processing of live streams of data.</li>
<li>GraphX: a library for manipulating graphs and performing graph-parallel computations.</li>
<li>MLlib: a machine learning library that provides common algorithms (classification, regression, &#8230;)</li>
</ul>
<h2>Spark Usage</h2>
<p>There are two ways to work with Spark:</p>
<ul>
<li>The Spark interactive shells</li>
<li>Spark standalone applications</li>
</ul>
<h3>Spark Shell</h3>
<p>It&#8217;s an interactive shell from the command line that has two implementations, one in Python and the other in Scala. It is a <a title="REPL: Read–eval–print loop" href="http://en.wikipedia.org/wiki/Read%E2%80%93eval%E2%80%93print_loop" target="_blank">REPL</a> that is very useful for learning the API or for data exploration.</p>
<p>Spark’s shells allow you to interact with data not only on your single machine, but on disk or in memory across many machines, thanks to the distributed nature of Spark.</p>
<h3>Spark Applications</h3>
<p>The other way to work with Spark is by creating standalone applications either in Python, Scala or Java. Use them for large scale data processing.</p>
<h2>Spark main concepts</h2>
<h3>Driver program</h3>
<p>It&#8217;s the program that launches the distributed operations on a cluster.</p>
<p>The Spark shell is a driver program.</p>
<p>The application that you write, with its <em>main</em> function that defines the datasets and applies operations on them, is also a driver program.</p>
<h3>Spark Context (sc)</h3>
<p>It&#8217;s the main entry point to the Spark API.</p>
<p>When using the shell, a preconfigured SparkContext is automatically created and it&#8217;s available in the variable called <strong><em>sc</em></strong>.</p>
<p>When writing applications, the first thing that you need to create is your own instance of the SparkContext.</p>
<h3>Resilient Distributed Dataset (RDD)</h3>
<p>The goal of Spark is to allow you to operate on datasets on a single machine and have these operations work in the same way on a distributed cluster.</p>
<p>To achieve this, Spark offers the <strong>Resilient Distributed Dataset</strong> (RDD): RDDs are <span style="text-decoration: underline;">immutable collections</span> (<em>dataset</em>) of objects that Spark distributes (<em>distributed</em>) through the cluster. They are loaded from a source of data and, since they are immutable, new RDDs are also created as the result of transformations on existing RDDs (map, filter, etc.). Finally, Spark automatically rebuilds them on a node if there is a failure on another node (<em>resilient</em>).</p>
<p>There are two types of operations on RDDs:</p>
<ul>
<li><strong>Transformations</strong>: lazy operations to build RDDs based on the current RDD.</li>
<li><strong>Actions</strong>: return a result or write the RDD to storage. An action triggers the computation that actually applies the pending transformations that were lazily defined.</li>
</ul>
<p>The RDDs track the series of transformations used to build them by maintaining pointers to their parents. In the Spark jargon, the resulting lineage is called a Directed Acyclic Graph (DAG) of operations.</p>
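<p>The split between lazy transformations and eager actions can be hard to picture at first. As a rough analogy only, and using plain Java 8 streams instead of Spark, the sketch below shows the same idea on a single machine: <em>map()</em> merely describes the work, and nothing runs until a terminal operation asks for a result. The class and method names (<em>LazyPipelineSketch</em>, <em>squares</em>) are made up for the illustration.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy (not Spark code): stream pipelines are lazy in a way
// that resembles RDD transformations and actions.
class LazyPipelineSketch {

    // Counts how many times the mapping function has actually been invoked.
    static final AtomicInteger CALLS = new AtomicInteger();

    // "Transformation": only describes the work, nothing is computed yet.
    static Stream<Integer> squares(List<Integer> numbers) {
        return numbers.stream().map(x -> {
            CALLS.incrementAndGet();
            return x * x;
        });
    }

    public static void main(String[] args) {
        Stream<Integer> pending = squares(Arrays.asList(1, 2, 3));
        // Prints 0: the mapping function has not run yet.
        System.out.println("Calls before the terminal operation: " + CALLS.get());

        // "Action": the terminal operation forces the evaluation.
        List<Integer> result = pending.collect(Collectors.toList());
        // Prints [1, 4, 9] and 3 calls.
        System.out.println("Result: " + result + ", calls: " + CALLS.get());
    }
}
```

<p>Spark goes further than this analogy, since an RDD keeps its lineage and can be recomputed after a failure, whereas a Java stream can only be consumed once.</p>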
<h1>Spark Installation</h1>
<p>Go to <a title="Download Spark" href="https://spark.apache.org/downloads.html" target="_blank">https://spark.apache.org/downloads.html</a> and then:</p>
<ol>
<li>Choose a Spark release (1.2.1 is the latest at the time of this writing)</li>
<li>Choose a package type: select the package type of <em>“Pre-built for Hadoop 2.4 and later”</em></li>
<li>Choose a download type: <em>Direct Download</em> is OK, but the default <em>Apache Mirror</em> works well.</li>
<li>Click on the link after <em>Download Spark</em>, for instance <strong><em>spark-1.2.1-bin-hadoop2.4.tgz</em></strong>, to download Spark.</li>
</ol>
<p>Unpack the downloaded file and move into that directory in order to use the interactive shell:</p><pre class="crayon-plain-tag">$ tar -xf spark-1.2.1-bin-hadoop2.4.tgz
$ cd spark-1.2.1-bin-hadoop2.4</pre><p></p>
<h2>Using the Shell</h2>
<p>The Python version of the Spark shell is available via the command <strong>bin/pyspark</strong> and the Scala version of the shell by using <strong>bin/spark-shell</strong>.</p>
<p>Note: the shell supports code completion with the Tab key.</p>
<p>Let&#8217;s try the <strong>Scala</strong> shell:</p><pre class="crayon-plain-tag">$ bin/spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/02/26 17:23:45 INFO SecurityManager: Changing view acls to: Javier
15/02/26 17:23:45 INFO SecurityManager: Changing modify acls to: Javier
15/02/26 17:23:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Javier); users with modify permissions: Set(Javier)
15/02/26 17:23:45 INFO HttpServer: Starting HTTP Server
15/02/26 17:23:45 INFO Utils: Successfully started service 'HTTP class server' on port 46130.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31)
Type in expressions to have them evaluated.
Type :help for more information.
15/02/26 17:23:50 WARN Utils: Your hostname, xxx resolves to a loopback address: 127.0.1.1; using 192.168.2.49 instead (on interface eth0)
15/02/26 17:23:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/02/26 17:23:50 INFO SecurityManager: Changing view acls to: Javier
15/02/26 17:23:50 INFO SecurityManager: Changing modify acls to: Javier
15/02/26 17:23:50 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Javier); users with modify permissions: Set(Javier)
15/02/26 17:23:51 INFO Slf4jLogger: Slf4jLogger started
15/02/26 17:23:51 INFO Remoting: Starting remoting
15/02/26 17:23:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@xxx.malsolo.lan:55248]
15/02/26 17:23:51 INFO Utils: Successfully started service 'sparkDriver' on port 55248.
15/02/26 17:23:51 INFO SparkEnv: Registering MapOutputTracker
15/02/26 17:23:51 INFO SparkEnv: Registering BlockManagerMaster
15/02/26 17:23:52 INFO DiskBlockManager: Created local directory at /tmp/spark-1420fe71-6907-408a-b44c-9547ba1a2c49/spark-909fad01-a3df-484b-bd30-1ea6006396e9
15/02/26 17:23:52 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/02/26 17:23:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/26 17:23:52 INFO HttpFileServer: HTTP File server directory is /tmp/spark-82586699-a230-47e4-8148-2cc4dcc741ec/spark-72f09be4-797a-4612-a845-e4fd1e578e76
15/02/26 17:23:52 INFO HttpServer: Starting HTTP Server
15/02/26 17:23:52 INFO Utils: Successfully started service 'HTTP file server' on port 41493.
15/02/26 17:23:53 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/26 17:23:53 INFO SparkUI: Started SparkUI at http://xxx.malsolo.lan:4040
15/02/26 17:23:53 INFO Executor: Starting executor ID  on host localhost
15/02/26 17:23:53 INFO Executor: Using REPL class URI: http://192.168.2.49:46130
15/02/26 17:23:53 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@xxx.malsolo.lan:55248/user/HeartbeatReceiver
15/02/26 17:23:53 INFO NettyBlockTransferService: Server created on 40938
15/02/26 17:23:53 INFO BlockManagerMaster: Trying to register BlockManager
15/02/26 17:23:53 INFO BlockManagerMasterActor: Registering block manager localhost:40938 with 265.1 MB RAM, BlockManagerId(, localhost, 40938)
15/02/26 17:23:53 INFO BlockManagerMaster: Registered BlockManager
15/02/26 17:23:53 INFO SparkILoop: Created spark context..
Spark context available as sc.

scala&gt;</pre><p>To exit either shell, press Ctrl-D.</p><pre class="crayon-plain-tag">scala&gt; Stopping spark context.
15/02/26 17:27:40 INFO SparkUI: Stopped Spark web UI at http://xxx.malsolo.lan:4040
15/02/26 17:27:40 INFO DAGScheduler: Stopping DAGScheduler
15/02/26 17:27:41 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/02/26 17:27:41 INFO MemoryStore: MemoryStore cleared
15/02/26 17:27:41 INFO BlockManager: BlockManager stopped
15/02/26 17:27:41 INFO BlockManagerMaster: BlockManagerMaster stopped
15/02/26 17:27:41 INFO SparkContext: Successfully stopped SparkContext
15/02/26 17:27:41 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/02/26 17:27:41 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/02/26 17:27:41 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
$</pre><p>Spark currently uses log4j 1.2.17, so you can find more details at the <a title="Apache log4j™ 1.2" href="http://logging.apache.org/log4j/1.2/" target="_blank">Apache log4j™ 1.2</a> website. To control the verbosity of the logging, create a <em>conf/log4j.properties</em> file from the existing <em>conf/log4j.properties.template</em> and change the line:</p>
<p><strong>log4j.rootCategory=INFO, console</strong></p>
<p>To:</p>
<p><strong>log4j.rootCategory=WARN, console</strong></p>
<p>Now, with the shell we can try some commands like examining the sc variable, creating RDDs, filtering them and so on.</p><pre class="crayon-plain-tag">scala&gt; sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@76af34b5

scala&gt; val lines = sc.textFile("README.md")
lines: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at &lt;console&gt;:12

scala&gt; lines.count()
res1: Long = 98                                                                 

scala&gt; lines.first()
res2: String = # Apache Spark</pre><p></p>
<p>There is an INFO message that reports the URL of the Spark UI (INFO SparkUI: Started SparkUI at http://[ipaddress]:4040), so you can use it to see information about the tasks and the cluster.</p>
<div id="attachment_725" style="width: 904px" class="wp-caption alignnone"><a href="http://malsolo.com/blog4java/wp-content/uploads/2015/03/SparkUI-2.png"><img src="http://malsolo.com/blog4java/wp-content/uploads/2015/03/SparkUI-2.png" alt="Spark UI at 4040" width="894" height="574" class="size-full wp-image-725" /></a><p class="wp-caption-text">Spark UI</p></div>
<h1>Spark Operations</h1>
<p>Once we have the Spark shell, let&#8217;s use it to take a look at the available operations before we dive into creating applications.</p>
<h2>Creating RDDs</h2>
<p>You can turn an existing collection into an RDD (parallelize it), load an external file (several formats: text, JSON, CSV, SequenceFiles, objects) or even use an existing Hadoop InputFormat (with <em>sc.hadoopFile()</em>).</p>
<p></p><pre class="crayon-plain-tag">scala&gt; val numbers = sc.parallelize(List(1,2,3))
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at &lt;console&gt;:12

scala&gt; val lines = sc.textFile("README.md")
lines: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[2] at textFile at &lt;console&gt;:12

scala&gt;</pre><p></p>
<h2>Transformations</h2>
<p>As we said earlier, transformations are lazily evaluated operations on RDDs that return a new RDD.</p>
<p>You can pass each element through a function (with <em>map()</em>) or keep elements that pass a predicate (with <em>filter()</em>) or produce zero or more elements for each element (with <em>flatMap()</em>) and so on.</p>
<p></p><pre class="crayon-plain-tag">scala&gt; val squares = numbers.map(x =&gt; x*x)
squares: org.apache.spark.rdd.RDD[Int] = MappedRDD[3] at map at &lt;console&gt;:14

scala&gt; val spark = lines.filter(line =&gt; line.contains("Spark"))
spark: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at filter at &lt;console&gt;:14

scala&gt; val sequences = numbers.flatMap(x =&gt; 1 to x)
sequences: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[5] at flatMap at &lt;console&gt;:14

scala&gt; val words = lines.flatMap(line =&gt; line.split(" "))
words: org.apache.spark.rdd.RDD[String] = FlatMappedRDD[6] at flatMap at &lt;console&gt;:14

scala&gt;</pre><p></p>
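<p>For readers coming from Java, <em>flatMap()</em> is probably the least familiar of these transformations. The following sketch reproduces the <em>numbers.flatMap(x =&gt; 1 to x)</em> example with plain Java 8 streams. It is an analogy on a local list, not Spark code, and the class name <em>FlatMapSketch</em> is only an assumption for the example.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java analogy of numbers.flatMap(x => 1 to x) on a local list.
class FlatMapSketch {

    // Each input x produces the elements 1..x, and all the
    // partial results are flattened into a single list.
    static List<Integer> sequences(List<Integer> numbers) {
        return numbers.stream()
                .flatMap(x -> IntStream.rangeClosed(1, x).boxed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints [1, 1, 2, 1, 2, 3]
        System.out.println(sequences(Arrays.asList(1, 2, 3)));
    }
}
```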
<h2>Actions</h2>
<p>Actions are the operations that return a final value to the driver program or write data to an external storage system. They trigger the evaluation of the pending transformations on the RDD.</p>
<p>For instance, retrieve the contents (<em>collect()</em>), return the first n elements (<em>take()</em>), count the number of elements (<em>count()</em>), combine elements with an associative function (<em>reduce()</em>) or write elements to a text file (<em>saveAsTextFile()</em>).</p>
<p></p><pre class="crayon-plain-tag">scala&gt; sequences.collect()
res0: Array[Int] = Array(1, 1, 2, 1, 2, 3)

scala&gt; squares.take(2)
res1: Array[Int] = Array(1, 4)

scala&gt; words.count()
res2: Long = 524

scala&gt; numbers.reduce((x, y) =&gt; x + y)
res4: Int = 6

scala&gt; spark.saveAsTextFile("borrar.txt")

scala&gt;</pre><p></p>
<h2>Key/Value Pairs</h2>
<p>There is a special type of RDD, <strong>Pair RDDs</strong>, whose elements are tuples, that is, key-value pairs where key and value can be of any type.</p>
<p>They are very useful for performing aggregations, grouping and counting. They can be obtained from some initial ETL (extract, transform, load) operations.</p>
<p>Pair RDDs can be partitioned across nodes to improve speed by making similar keys accessible on the same node.</p>
<p>Regarding operations, Spark offers special operations for Pair RDDs that allow you to act on each key in parallel, for instance, <em>reduceByKey()</em> to aggregate data by key, <em>join()</em> to merge two RDDs by grouping elements with the same key, or even <em>sortByKey()</em>.</p>
<p></p><pre class="crayon-plain-tag">scala&gt; val pets = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 2)))
pets: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at &lt;console&gt;:12

scala&gt; pets.collect()
res12: Array[(String, Int)] = Array((cat,1), (dog,1), (cat,2))

scala&gt; pets.reduceByKey((a, b) =&gt; a + b).collect()
res9: Array[(String, Int)] = Array((dog,1), (cat,3))

scala&gt; pets.groupByKey().collect()
res10: Array[(String, Iterable[Int])] = Array((dog,CompactBuffer(1)), (cat,CompactBuffer(1, 2)))

scala&gt; pets.sortByKey().collect()
res11: Array[(String, Int)] = Array((cat,1), (cat,2), (dog,1))

scala&gt;</pre><p></p>
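<p>To make the semantics of <em>reduceByKey()</em> concrete, here is a minimal single-machine sketch in plain Java of what the operation computes for the <em>pets</em> data: the values of equal keys are merged with the given function. It is not Spark code and says nothing about how Spark shuffles data between nodes. The names <em>ReduceByKeySketch</em>, <em>pair</em> and <em>sumByKey</em> are hypothetical.</p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local, plain-Java sketch of what reduceByKey((a, b) => a + b) computes.
class ReduceByKeySketch {

    // Small helper to build a key-value pair (Java has no built-in tuple).
    static Map.Entry<String, Integer> pair(String key, int value) {
        return new SimpleEntry<String, Integer>(key, value);
    }

    // Merges the values that share a key by adding them up.
    // A TreeMap is used only to get a deterministic (sorted) output.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            totals.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pets =
                Arrays.asList(pair("cat", 1), pair("dog", 1), pair("cat", 2));
        // Prints {cat=3, dog=1}
        System.out.println(sumByKey(pets));
    }
}
```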
<p>Now let&#8217;s use the Shell to see how easily you can implement the MapReduce WordCount example in a single line:</p>
<p></p><pre class="crayon-plain-tag">scala&gt; val counts = lines.flatMap(line =&gt; line.split(" ")).map(word =&gt; (word, 1)).reduceByKey((x, y) =&gt; x + y)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at &lt;console&gt;:14

scala&gt; counts.collect()
res16: Array[(String, Int)] = Array((package,1), (this,1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (YARN,,1), (have,1), (pre-built,1), (locally.,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (first,1), (documentation,3), (Configuration,1), (learning,,1), (graph,1), (Hive,2), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (&lt;http://spark.apache.org/&gt;,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (are,1), (systems.,1), (params,1), (scala&gt;,1), (provides,1), (refer,2), (configure,1), (Interactive,2), (distribution.,1), (can,6), (build,3), (when,1), (Apache,1),...
scala&gt;</pre><p></p>
<h1>Spark Applications</h1>
<p>A Spark application can be written in Scala, Python or Java. I&#8217;m going to use Java 8 to take advantage of the new features of the language and get a less verbose syntax.</p>
<h2>Word count Java application</h2>
<p>First, use the appropriate dependency. For instance, with Maven:</p>
<p></p><pre class="crayon-plain-tag">&lt;dependency&gt;
			&lt;groupId&gt;org.apache.spark&lt;/groupId&gt;
			&lt;artifactId&gt;spark-core_2.10&lt;/artifactId&gt;
			&lt;version&gt;1.2.1&lt;/version&gt;
		&lt;/dependency&gt;</pre><p></p>
<p>Then, you have to instantiate your own <strong>SparkContext</strong>, which is done via a <strong>SparkConf</strong> object. We use the minimal configuration: the cluster URL (&#8220;local&#8221; to use a local cluster) and an application name to identify the application on the cluster:</p>
<p></p><pre class="crayon-plain-tag">SparkConf conf = new SparkConf().setMaster(&quot;local&quot;).setAppName(&quot;Word Count with Spark&quot;);
		JavaSparkContext sc = new JavaSparkContext(conf);</pre><p></p>
<p>Now, before writing Java code, it&#8217;s necessary to explain the differences with Scala.</p>
<p>Spark is written in Scala and it takes full advantage of its features, but Java lacks some of them, so Spark provides alternatives through interfaces or concrete classes.</p>
<p>Let&#8217;s see the Word Count example in Spark written in Scala:</p>
<p></p><pre class="crayon-plain-tag">val file = sc.textFile(&quot;file&quot;)
val counts = file.flatMap(line =&gt; line.split(&quot; &quot;))
                 .map(word =&gt; (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile(&quot;out&quot;)</pre><p></p>
<p>Before Java 8, Java didn&#8217;t accept functions as parameters, so Spark provides interfaces in the <em>org.apache.spark.api.java.function</em> package to be implemented, either as anonymous inner classes or as named classes, and passed as arguments to the operations (<em>flatMap()</em>, <em>map()</em>, <em>reduceByKey()</em>, &#8230;).</p>
<p>In our case, these are the functions that are needed:</p>
<ul>
<li><strong>FlatMapFunction&lt;T, R&gt;</strong> with the method <em>Iterable&lt;R&gt; call(T t)</em> to return zero or more output records from each input record (t).</li>
<li><strong>PairFunction&lt;T, K, V&gt;</strong> with the method <em>Tuple2&lt;K, V&gt; call(T t)</em> to return key-value pairs (Tuple2&lt;K, V&gt;), and can be used to construct PairRDDs.</li>
<li><strong>Function2&lt;T1, T2, R&gt;</strong> with the method <em>R call(T1 v1, T2 v2)</em>, a two-argument function that takes arguments of type T1 and T2 and returns an R.</li>
</ul>
<p>Java doesn&#8217;t have a native implementation of tuples. As <a href="https://twitter.com/lukaseder" title="Lukas Eder twitter account" target="_blank">Lukas Eder</a> said in a side note <a href="http://blog.jooq.org/2015/01/23/how-to-translate-sql-group-by-and-aggregations-to-java-8/" title="How to Translate SQL GROUP BY and Aggregations to Java 8" target="_blank">here</a>: &#8220;Why the JDK doesn’t ship with built-in tuples like C#’s or Scala’s escapes me.&#8221; In other words, <strong><em>&#8220;Functional programming without tuples is like coffee without sugar: A bitter punch in your face.&#8221;</em></strong></p>
<p>For that reason, Spark&#8217;s Java API relies on the tuple classes of the <em>scala</em> package, such as <em>Tuple2</em>.</p>
<p>But Java has evolved: with Java 8, functions are first class citizens and can be passed as parameters to other functions. Since each of the provided interfaces has a single abstract method, it’s very easy to write the Java 8 version of the word count in Spark using lambdas, and the result is almost as clear as the Scala version:</p>
<p></p><pre class="crayon-plain-tag">JavaRDD&lt;String&gt; lines = sc.textFile(&quot;file&quot;);
		JavaPairRDD&lt;String, Integer&gt; counts = lines.flatMap(line -&gt; Arrays.asList(line.split(&quot; &quot;)))
			.mapToPair(word -&gt; new Tuple2&lt;String, Integer&gt;(word, 1))
			.reduceByKey((x, y) -&gt; x + y);
		counts.saveAsTextFile(&quot;out&quot;);</pre><p></p>
<p>The complete source code is <a href="https://github.com/jbbarquero/spark-examples" title="spark-examples" target="_blank">available at GitHub</a>.</p>
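<p>Lambdas work with these interfaces because each one declares a single abstract method. The sketch below illustrates the point in plain Java with a hypothetical stand-in interface, <em>PairFn</em>, which only mimics the shape of a Spark-style function interface (it is not part of Spark) and uses <em>Map.Entry</em> in place of a tuple. The anonymous inner class and the lambda implement exactly the same behaviour.</p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Plain-Java illustration: why a lambda can replace an anonymous inner class.
class LambdaVersusAnonymousClass {

    // Hypothetical stand-in for a Spark-style function interface:
    // one abstract method, so a lambda can implement it.
    interface PairFn<T, K, V> {
        Map.Entry<K, V> call(T t);
    }

    // Pre-Java 8 style: an anonymous inner class.
    static final PairFn<String, String, Integer> VERBOSE =
            new PairFn<String, String, Integer>() {
                @Override
                public Map.Entry<String, Integer> call(String word) {
                    return new SimpleEntry<String, Integer>(word, 1);
                }
            };

    // Java 8 style: the same behaviour written as a lambda.
    static final PairFn<String, String, Integer> CONCISE =
            word -> new SimpleEntry<String, Integer>(word, 1);

    public static void main(String[] args) {
        System.out.println(VERBOSE.call("spark")); // prints spark=1
        System.out.println(CONCISE.call("spark")); // prints spark=1
    }
}
```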
<h2>Build and run</h2>
<p>Now, we only have to build the project (with Maven) and submit it to Spark (with <strong>bin/spark-submit</strong>). Run the following from the root directory of the application (note: the <em>out</em> directory must not exist, so remove it first if necessary with <strong><em>rm -r out</em></strong>):</p>
<p></p><pre class="crayon-plain-tag">$ mvn clean install
$ ~/Applications/spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class com.malsolo.spark.examples.WordCount target/spark-examples-0.0.1-SNAPSHOT.jar</pre><p></p>
<p>Finally, we can see the results to compare with the ones obtained <a href="http://malsolo.com/blog4java/?p=516" title=" Getting started with Hadoop" target="_blank">using Hadoop</a>:</p>
<p></p><pre class="crayon-plain-tag">$ cat out/part-00000 | grep President
(President,,26)
(President,72)
(President.,8)
(Vice-President,5)
(Vice-President,,5)
(Vice-President;,1)
(President;,3)
(Vice-President.,1)

$ cat out/part-00000 | grep United
(United,85)

$ cat out/part-00000 | grep State
(State,47)
(States,46)
(States.",1)
(State.,6)
(States,,55)
(State,,20)
(States:,2)
(States.,8)
(Statement,1)
(States;,13)
(State;,4)</pre><p></p>
<h1>Shared Variables</h1>
<p>Spark closures and the variables they use are sent separately to the tasks running on the cluster, so each task receives a new copy of the variables created in the driver program, and updates to these copies are not propagated back to the driver.</p>
<p>Spark has two kinds of shared variables, <strong>accumulators</strong> and <strong>broadcast variables</strong>, to solve that problem as well as issues related to the amount of data that is sent across the cluster.</p>
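<p>The copy semantics can be simulated without a cluster. The following plain-Java sketch is an illustration under a simplifying assumption: it uses standard Java serialization to mimic shipping a captured variable to a task, which is not necessarily how Spark serializes closures. The names <em>ClosureCopySketch</em> and <em>shipToTask</em> are invented for the example.</p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Simulates a driver variable travelling to a task as a serialized copy.
class ClosureCopySketch {

    // Serializes and deserializes the value, so the caller gets a copy
    // that shares no state with the original.
    static int[] shipToTask(int[] driverValue) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(driverValue);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (int[]) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not copy the value", e);
        }
    }

    public static void main(String[] args) {
        int[] counter = {0};                  // lives in the "driver"
        int[] taskCopy = shipToTask(counter); // what the "task" receives
        taskCopy[0] += 5;                     // update made by the task
        // Prints: driver sees 0, task sees 5
        System.out.println("driver sees " + counter[0] + ", task sees " + taskCopy[0]);
    }
}
```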
<h2>Accumulators</h2>
<p>Variables that can be used to aggregate values from worker nodes back to the driver program. In a nutshell:</p>
<ul>
<li>They are created with <em>SparkContext.accumulator(initialValue)</em> that returns an <em>org.apache.spark.Accumulator[T]</em> (with T, the type of initialValue)</li>
<li>Worker code adds values with <em>+=</em> in Scala or the function <em>add()</em> in Java.</li>
<li>The driver program can access the value with <em>value</em> in Scala or <em>value()</em>/<em>setValue()</em> in Java (accessing it from worker code throws an exception)</li>
<li>The right value will be obtained after calling an <em><strong>action</strong></em> (remember that <u><em><strong>transformations</strong></em> are lazy operations</u>)</li>
</ul>
<p>Spark has built-in support for accumulators of numeric types such as Integer and Double, and you can create custom accumulators by implementing <a href="http://spark.apache.org/docs/1.2.1/api/java/index.html?org/apache/spark/AccumulatorParam.html" title="AccumulatorParam API" target="_blank">AccumulatorParam</a>.</p>
<p>Let&#8217;s see an example that counts the empty lines in the file that we use to count words:</p>
<p></p><pre class="crayon-plain-tag">public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster(&quot;local&quot;).setAppName(&quot;Word Count with Spark&quot;);
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		JavaRDD&lt;String&gt; lines = sc.textFile(INPUT_FILE_TEXT);
		
		final Accumulator&lt;Integer&gt; blankLines = sc.accumulator(0);
		
		@SuppressWarnings(&quot;resource&quot;)
		JavaPairRDD&lt;String, Integer&gt; counts = lines.flatMap(line -&gt; 
			{
				if (&quot;&quot;.equals(line)) {
					blankLines.add(1);
				}
				return Arrays.asList(line.split(&quot; &quot;));
			})
			.mapToPair(word -&gt; new Tuple2&lt;String, Integer&gt;(word, 1))
			.reduceByKey((x, y) -&gt; x + y);

		counts.saveAsTextFile(OUTPUT_FILE_TEXT);
		
		System.out.println(&quot;Blank lines: &quot; + blankLines.value());
		
		sc.close();
	}</pre><p></p>
<ul>
<li>In line 7 we create an Accumulator&lt;Integer&gt; initialized to 0</li>
<li>In lines 11 to 16 we modify the FlatMapFunction to add 1 if the input line is empty</li>
<li>In line 22 we print the value of the accumulator, after the <em>saveAsTextFile()</em> action.</li>
</ul>
<p>Let&#8217;s try:</p>
<p></p><pre class="crayon-plain-tag">$ rm -r out
$ mvn clean install
$ ~/Applications/spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class com.malsolo.spark.examples.WordCount target/spark-examples-0.0.1-SNAPSHOT.jar

Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/02 15:26:22 WARN Utils: Your hostname, xxx resolves to a loopback address: 127.0.1.1; using yyy.yyy.y.yy instead (on interface eth0)
15/03/02 15:26:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/03/02 15:26:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Blank lines: 169
$</pre><p></p>
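<p>For intuition, the accumulator pattern (workers only add, the driver reads once the work has finished) can be mimicked on a single JVM. This is a loose analogy, assuming a <em>LongAdder</em> and a parallel stream as stand-ins for the accumulator and the worker tasks. It does not show how Spark implements accumulators, and the class name <em>BlankLineCounterSketch</em> is hypothetical.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// Single-JVM analogy of counting blank lines with an accumulator.
class BlankLineCounterSketch {

    static long countBlankLines(List<String> lines) {
        LongAdder blankLines = new LongAdder();   // plays the accumulator role
        lines.parallelStream().forEach(line -> {  // "worker" code, may run on several threads
            if (line.isEmpty()) {
                blankLines.increment();           // workers only add
            }
        });
        // forEach has completed here, so the total is final.
        return blankLines.sum();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("We the People", "", "of the United States", "");
        System.out.println("Blank lines: " + countBlankLines(lines)); // prints Blank lines: 2
    }
}
```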
<h2>Broadcast variables</h2>
<p>Shared variables that efficiently distribute large read-only values to all the worker nodes.</p>
<p>If you need to use the same variable in multiple parallel operations, you&#8217;d probably rather share it than let Spark send it separately for each operation.</p>
<p>In a nutshell:</p>
<ul>
<li>They are created with SparkContext.broadcast(initValue) on an object of type T that has to be Serializable.</li>
<li>Access its value with <em>value</em> in Scala or <em>value()</em> in Java.</li>
<li>The value shouldn&#8217;t be modified after creation, because the change will only happen in one node.</li>
</ul>
<p>Let’s see an example with a list of words that must not be included in the count (a short list, but enough to see the concept):</p>
<p></p><pre class="crayon-plain-tag">public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster(&quot;local&quot;).setAppName(&quot;Word Count with Spark&quot;);
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		JavaRDD&lt;String&gt; lines = sc.textFile(INPUT_FILE_TEXT);
		
		final Accumulator&lt;Integer&gt; blankLines = sc.accumulator(0);
		
		final Broadcast&lt;List&lt;String&gt;&gt; wordsToIgnore = sc.broadcast(getWordsToIgnore());
		
		@SuppressWarnings(&quot;resource&quot;)
		JavaPairRDD&lt;String, Integer&gt; counts = lines.flatMap(line -&gt; 
			{
				if (&quot;&quot;.equals(line)) {
					blankLines.add(1);
				}
				return Arrays.asList(line.split(&quot; &quot;));
			})
			.filter(word -&gt; !wordsToIgnore.value().contains(word))
			.mapToPair(word -&gt; new Tuple2&lt;String, Integer&gt;(word, 1))
			.reduceByKey((x, y) -&gt; x + y);
		
		counts.saveAsTextFile(OUTPUT_FILE_TEXT);
		
		System.out.println(&quot;Blank lines: &quot; + blankLines.value());
		
		sc.close();
	}

	private static List&lt;String&gt; getWordsToIgnore() {
		return Arrays.asList(&quot;the&quot;, &quot;of&quot;, &quot;and&quot;, &quot;for&quot;);
	}</pre><p></p>
<ul>
<li>In line 9 we create the broadcast variable: a list of words to ignore. In lines 30 to 32 we only return 4 words, but it&#8217;s easy to see that the list could be much bigger.</li>
<li>In line 19 we access the broadcast variable with the <em>value()</em> method and use it in a filter method.</li>
</ul>
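<p>One design note on the ignore list: with only four words a <em>List</em> is fine, but if the list grows, a <em>Set</em> gives constant-time lookups inside the filter. The sketch below shows the filtering logic on a single machine with plain Java 8 streams. It is not the Spark job itself, and the names <em>StopWordFilterSketch</em> and <em>countWords</em> are assumptions made for the example.</p>

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Local word count that skips a set of ignored words.
class StopWordFilterSketch {

    // A HashSet makes contains() a constant-time lookup on average.
    static final Set<String> WORDS_TO_IGNORE =
            new HashSet<>(Arrays.asList("the", "of", "and", "for"));

    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .filter(word -> !WORDS_TO_IGNORE.contains(word))
                // TreeMap only to get a deterministic, sorted result.
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("the cat and the dog", "cat of the year");
        // Prints {cat=2, dog=1, year=1}
        System.out.println(countWords(lines));
    }
}
```

<p>Since <em>HashSet</em> is Serializable, the same kind of collection could be passed to <em>broadcast()</em> in the Spark version.</p>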
<h1>Resources</h1>
<ul>
<li>Source code at <a href="https://github.com/jbbarquero/spark-examples" title="spark-examples" target="_blank">GitHub</a></li>
<li><a title="Learning Spark" href="http://shop.oreilly.com/product/0636920028512.do" target="_blank">Learning Spark</a>. By Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia (O&#8217;Reilly Media)</li>
<li><a title="Cloudera Developer Training for Apache Spark" href="http://cloudera.com/content/cloudera/en/training/courses/spark-training.html" target="_blank">Cloudera Developer Training for Apache Spark</a>. By Diana Carroll (Cloudera training)</li>
<li><a title="Parallel Programming with Spark" href="https://www.youtube.com/watch?v=7k4yDKBYOcw" target="_blank">Parallel Programming with Spark (Part 1 &amp; 2)</a>. By Matei Zaharia (UC Berkeley AMPLab YouTube channel)</li>
<li><a title="Advanced Spark Features" href="https://www.youtube.com/watch?v=w0Tisli7zn4" target="_blank">Advanced Spark Features</a>. By Matei Zaharia (UC Berkeley AMPLab YouTube channel)</li>
<li><a title="A Deeper Understanding of Spark Internals" href="https://www.youtube.com/watch?v=dmL0N3qfSc8" target="_blank">A Deeper Understanding of Spark Internals</a>. By Aaron Davidson (Apache Spark YouTube channel)</li>
</ul>
]]></content:encoded>
			<wfw:commentRss>http://malsolo.com/blog4java/?feed=rss2&#038;p=679</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
		<item>
		<title>Getting started with Hadoop</title>
		<link>http://malsolo.com/blog4java/?p=516</link>
		<comments>http://malsolo.com/blog4java/?p=516#comments</comments>
		<pubDate>Wed, 25 Feb 2015 15:36:14 +0000</pubDate>
		<dc:creator><![CDATA[Javier (@jbbarquero)]]></dc:creator>
				<category><![CDATA[Big Data]]></category>
		<category><![CDATA[Architecture]]></category>
		<category><![CDATA[Hadoop]]></category>
		<category><![CDATA[Java]]></category>

		<guid isPermaLink="false">http://malsolo.com/blog4java/?p=516</guid>
		<description><![CDATA[Hadoop Introduction Hadoop is an open source framework for distributed fault-tolerant data storage and batch processing. It allows you to write applications for processing really huge data sets across clusters of computers using simple programming model with linear scalability on &#8230; <a href="http://malsolo.com/blog4java/?p=516">Continue reading <span class="meta-nav">&#8594;</span></a>]]></description>
				<content:encoded><![CDATA[<h1>Hadoop Introduction</h1>
<p>Hadoop is an open source framework for distributed fault-tolerant data storage and batch processing. It allows you to write applications for processing really huge data sets across clusters of computers using simple programming model with linear scalability on commodity hardware. Commodity hardware means cheaper hardware than the dedicated servers that are sold by many vendors. Linear scalability means that you have only to add more machines (nodes) to the Hadoop cluster.</p>
<p>The key concept for Hadoop is <strong><em>move-code-to-data</em></strong>: data is distributed across the nodes of the Hadoop cluster, and the applications (the jar files) are later sent to those nodes in order to process the data locally, instead of vice versa (as in Java EE, where applications are centralized in an application server and the data is collected to it over the network).</p>
<p>At its core, Hadoop has two parts:</p>
<p>· <strong>Hadoop Distributed File System</strong> (<strong>HDFS™</strong>): a distributed file system that provides high-throughput access to application data.<br />
· <strong>YARN</strong> (<strong>Yet Another Resource Negotiator</strong>): a framework for job scheduling and cluster resource management.</p>
<p>As the definition on the Apache Hadoop website shows (<a href="http://hadoop.apache.org/#What+Is+Apache+Hadoop%3F" title="What is Apache Hadoop?" target="_blank">what is Apache Hadoop?</a>), Hadoop offers as a third component <strong>Hadoop MapReduce</strong>, a batch-based, distributed computing framework modeled after Google’s paper on MapReduce. It allows you to parallelize work over a large amount of raw data by splitting the input dataset into independent chunks which are processed by the map tasks (initial ingestion and transformation) in parallel, whose outputs are sorted and then passed to the reduce tasks (aggregation or summarization).</p>
<p>In the previous version of Hadoop (Hadoop 1), the implementation of MapReduce was based on a master <em>JobTracker</em>, for resource management and job scheduling/monitoring, and per-node slaves called <em>TaskTracker</em> to launch and tear down tasks. But it had scalability problems, especially on very large clusters (more than 4,000 nodes).</p>
<p>So MapReduce has undergone a complete overhaul and is now called MapReduce 2.0 (MRv2). It is no longer a core part by itself: currently, <u><strong>MapReduce</strong> is a YARN-based system</u>. That&#8217;s the reason why we can say that Hadoop has two main parts: HDFS and YARN.</p>
<h1>Hadoop ecosystem</h1>
<p>Besides the two core technologies, the distributed file system (HDFS) and MapReduce (MR), there are a lot of projects that extend Hadoop with additional useful technologies, so that we can consider all of them an ecosystem around Hadoop.</p>
<p>Here is a list of some of these projects, grouped by category:</p>
<ul>
<li><strong>Data Ingestion:</strong> to move data from and into HDFS
<ul>
<li><u>Flume</u>: a system for moving data into HDFS from remote systems using configurable memory-resident daemons that watch for data on those systems and then forward the data to Hadoop. For example, weblogs from multiple servers to HDFS.</li>
<li><u>Sqoop</u>: a tool for efficient bulk transfer of data between structured data stores (such as relational databases) and HDFS.</li>
</ul>
</li>
<li><strong>Data Processing:</strong>
<ul>
<li><u>Pig</u>: a procedural language for querying and transforming data with scripts written in a data flow language called Pig Latin.</li>
<li><u>Hive</u>: a declarative SQL-like language.</li>
<li><u>Spark</u>: an in-memory distributed data processing engine that breaks problems up over all of the Hadoop nodes but keeps the data in memory for better performance. Lost data can be rebuilt from an external store (usually HDFS) using the details stored in the Resilient Distributed Dataset (RDD).</li>
<li><u>Storm</u>: a distributed real-time computation system for processing fast, large streams of data.</li>
</ul>
</li>
<li><strong>Data Formats:</strong>
<ul>
<li><u>Avro</u>: a language-neutral data serialization system. Schemas are expressed in JSON.</li>
<li><u>Parquet</u>: a compressed columnar storage format that can efficiently store nested data.</li>
</ul>
</li>
<li><strong>Storage:</strong>
<ul>
<li><u>HBase</u>: a scalable, distributed database that supports structured data storage for large tables.</li>
<li><u>Accumulo</u>: a scalable, distributed database that supports structured data storage for large tables.</li>
</ul>
</li>
<li><strong>Coordination:</strong>
<ul>
<li><u>Zookeeper</u>: a high-performance coordination service for distributed applications.</li>
</ul>
</li>
<li><strong>Machine Learning:</strong>
<ul>
<li><u>Mahout</u>: a scalable machine learning and data mining library: classification, clustering, pattern mining, collaborative filtering and so on.</li>
</ul>
</li>
<li><strong>Workflow Management:</strong>
<ul>
<li><u>Oozie</u>: a service for running and scheduling workflows of Hadoop jobs (including Map-Reduce, Pig, Hive, and Sqoop jobs).</li>
</ul>
</li>
</ul>
<h1>Hadoop installation</h1>
<p>To install Hadoop on a single machine to try it out, just download the compressed file for the desired version and unpack it on the filesystem.</p>
<h2>Prerequisites</h2>
<p>There is some required software for running Apache Hadoop:</p>
<ul>
<li>Java. You also need to tell Hadoop where Java is installed via the JAVA_HOME environment variable.</li>
<p></p><pre class="crayon-plain-tag">$ java -version
java version "1.8.0_31"
Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
$ echo $JAVA_HOME
/usr/lib/jvm/java-8-oracle</pre><p></p>
<li>ssh: I have Ubuntu 14.04 that comes with ssh, but I had to manually install a server.</li>
<p></p><pre class="crayon-plain-tag">$ which ssh
/usr/bin/ssh
$ which sshd
/usr/sbin/sshd</pre><p></p>
<li>On Mac OS X, make sure <strong>Remote Login</strong> (under <strong>System Preferences</strong> -&gt; <strong>Sharing</strong>) is enabled for the current user or for all users.</li>
<li>On Windows, the best option is to follow the instructions at the Wiki: <a href="http://wiki.apache.org/hadoop/Hadoop2OnWindows" title="Build and Install Hadoop 2.x or newer on Windows" target="_blank">Build and Install Hadoop 2.x or newer on Windows</a>.</li>
</ul>
<h2>Download and install</h2>
<p>To get a Hadoop distribution, download a recent stable release from one of the <a href="http://www.apache.org/dyn/closer.cgi/hadoop/common/" title="Apache Download Mirrors" target="_blank">Apache Download Mirrors</a>.</p>
<p>There are several directories: for the current version, the last stable version, the last stable v1 version and so on. Basically, you&#8217;ll download a gzipped tar file named <strong>hadoop-x.y.z.tar.gz</strong>, for instance hadoop-2.6.0.tar.gz.</p>
<p>You can unpack it wherever you want and then add its <strong>bin</strong> and <strong>sbin</strong> directories to the PATH. For example:</p>
<p></p><pre class="crayon-plain-tag">$ tar xzf hadoop-2.6.0.tar.gz
$
$ export HADOOP_HOME=~/Applications/hadoop-2.6.0
$ export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin</pre><p></p>
<p>Now you can verify the installation by typing <strong>hadoop version</strong>:</p>
<p></p><pre class="crayon-plain-tag">$ hadoop version
Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using ~/Applications/hadoop-2.6.0/share/hadoop/common/hadoop-common-2.6.0.jar
$</pre><p></p>
<h2>Configuration</h2>
<p>Hadoop has three supported modes:</p>
<ul>
<li>Local (Standalone) Mode: a single Java process with no daemons running. For development, testing and debugging.</li>
<li>Pseudo-Distributed Mode: each Hadoop daemon runs in a separate Java process. For simulating a cluster on a small scale.</li>
<li>Fully-Distributed Mode: the Hadoop daemons run on a cluster of machines. If you want to take a look, see the official documentation: <a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html" title="Hadoop Cluster Setup" target="_blank">Hadoop MapReduce Next Generation &#8211; Cluster Setup</a>.</li>
</ul>
<p>In standalone mode there is no further action to take: the default properties are enough and there are no daemons to run.</p>
<p>In pseudo-distributed mode, you have to set up your computer as described at <a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html" title="Hadoop MapReduce Next Generation - Setting up a Single Node Cluster." target="_blank">Hadoop MapReduce Next Generation &#8211; Setting up a Single Node Cluster</a>. But let&#8217;s review the steps needed.</p>
<p>You need at least a minimum configuration with four files in <strong>HADOOP_HOME/etc/hadoop/</strong>:</p>
<ul>
<li><strong>core-site.xml</strong>. Common configuration, default values at <a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/core-default.xml" title="core-default.xml" target="_blank">Configuration: core-default.xml</a></li>
<p></p><pre class="crayon-plain-tag">&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;
&lt;?xml-stylesheet type=&quot;text/xsl&quot; href=&quot;configuration.xsl&quot;?&gt;
&lt;configuration&gt;
	&lt;property&gt;
		&lt;name&gt;fs.defaultFS&lt;/name&gt;
		&lt;value&gt;hdfs://localhost:8020&lt;/value&gt;
	&lt;/property&gt;
&lt;/configuration&gt;</pre><p></p>
<p><em>fs.defaultFS</em> replaces the deprecated <em>fs.default.name</em>; its default value is <em>file:///</em>.</p>
<li><strong>hdfs-site.xml</strong>. HDFS configuration, default values at <a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml" title="hdfs-default.xml" target="_blank">Configuration: hdfs-default.xml</a></li>
<p></p><pre class="crayon-plain-tag">&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;
&lt;?xml-stylesheet type=&quot;text/xsl&quot; href=&quot;configuration.xsl&quot;?&gt;
&lt;configuration&gt;
	&lt;property&gt;
		&lt;name&gt;dfs.replication&lt;/name&gt;
		&lt;value&gt;1&lt;/value&gt;
	&lt;/property&gt;
&lt;/configuration&gt;</pre><p></p>
<p><em>dfs.replication</em> is the default block replication, used unless another value is specified at creation time. The default value is 3, but we use 1 because we have only one node.</p>
<p>Other useful values are:</p>
<p><em>dfs.namenode.name.dir</em>, local path for storing the fsimage by the NN (defaults to file://${hadoop.tmp.dir}/dfs/name with hadoop.tmp.dir configurable at core-site.xml with default value /tmp/hadoop-${user.name})</p>
<p><em>dfs.datanode.data.dir</em>, local path for storing blocks by the DN (defaults to file://${hadoop.tmp.dir}/dfs/data)</p>
<li><strong>mapred-site.xml</strong>. MapReduce configuration, default values at <a href="http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml" title="mapred-default.xml" target="_blank">Configuration: mapred-default.xml</a></li>
<p></p><pre class="crayon-plain-tag">&lt;?xml version=&quot;1.0&quot;?&gt;
&lt;?xml-stylesheet type=&quot;text/xsl&quot; href=&quot;configuration.xsl&quot;?&gt;
&lt;configuration&gt;
	&lt;property&gt;
		&lt;name&gt;mapreduce.framework.name&lt;/name&gt;
		&lt;value&gt;yarn&lt;/value&gt;
	&lt;/property&gt;
&lt;/configuration&gt;</pre><p></p>
<p><em>mapreduce.framework.name</em>, the runtime framework for executing MapReduce jobs: local, classic or yarn.</p>
<p>Other useful values are:</p>
<p><em>mapreduce.jobtracker.system.dir</em>, the directory where MapReduce stores control files (defaults to ${hadoop.tmp.dir}/mapred/system).</p>
<p><em>mapreduce.cluster.local.dir</em>, the local directory where MapReduce stores intermediate data files (defaults to ${hadoop.tmp.dir}/mapred/local)</p>
<li><strong>yarn-site.xml</strong>. YARN configuration, default values at <a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/yarn-default.xml" title="yarn-default.xml" target="_blank">Configuration: yarn-default.xml</a></li>
</ul>
<p></p><pre class="crayon-plain-tag">&lt;?xml version=&quot;1.0&quot;?&gt;
&lt;configuration&gt;
	&lt;property&gt;
		&lt;name&gt;yarn.resourcemanager.hostname&lt;/name&gt;
		&lt;value&gt;localhost&lt;/value&gt;
	&lt;/property&gt;
	&lt;property&gt;
		&lt;name&gt;yarn.nodemanager.aux-services&lt;/name&gt;
		&lt;value&gt;mapreduce_shuffle&lt;/value&gt;
	&lt;/property&gt;
&lt;/configuration&gt;</pre><p></p>
<p><em>yarn.resourcemanager.hostname</em>, the host name of the Resource Manager.</p>
<p><em>yarn.nodemanager.aux-services</em>, list of auxiliary services executed by the Node Manager. The value mapreduce_shuffle is for the Shuffle/Sort in MapReduce, which is an auxiliary service in Hadoop 2.x.</p>
<h2>Configuring SSH</h2>
<p>Pseudodistributed mode is like fully distributed mode with a single host: localhost. In order to start the daemons on the set of hosts in the cluster, SSH is used. So we&#8217;ll configure SSH to log in without a password.</p>
<p>Remember that you need to have SSH installed and a server running. On Ubuntu, try this if you need to:</p>
<p></p><pre class="crayon-plain-tag">$ sudo apt-get install ssh</pre><p></p>
<p>Now create an SSH key with an empty passphrase:</p>
<p></p><pre class="crayon-plain-tag">$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa_hadoop
$ cat ~/.ssh/id_rsa_hadoop.pub >> ~/.ssh/authorized_keys</pre><p></p>
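<p>Finally, test that you can connect without a password. Because the key above uses a non-default file name, ssh may not pick it up automatically; if you are still prompted, pass it explicitly with <em>-i ~/.ssh/id_rsa_hadoop</em> or add an <em>IdentityFile</em> entry to <em>~/.ssh/config</em>:</p>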
<p></p><pre class="crayon-plain-tag">$ ssh localhost</pre><p></p>
<h2>First steps with HDFS</h2>
<p>Before using HDFS for the first time, some steps must be performed:</p>
<h3>Formatting the HDFS filesystem</h3>
<p>Just run the following command:</p>
<p></p><pre class="crayon-plain-tag">$ hdfs namenode -format</pre><p></p>
<h3>Starting the daemons</h3>
<p>To start the HDFS, YARN, and MapReduce daemons, type:</p>
<p></p><pre class="crayon-plain-tag">$ start-dfs.sh
$ start-yarn.sh
$ mr-jobhistory-daemon.sh start historyserver</pre><p></p>
<p>You can check which processes are running with Java&#8217;s <strong>jps</strong> command:</p>
<p></p><pre class="crayon-plain-tag">$ jps
25648 NodeManager
25521 ResourceManager
25988 JobHistoryServer
25355 SecondaryNameNode
25180 DataNode
26025 Jps
$</pre><p></p>
<h3>Stopping the daemons</h3>
<p>When you are done, you can stop the daemons with:</p>
<p></p><pre class="crayon-plain-tag">$ mr-jobhistory-daemon.sh stop historyserver
$ stop-yarn.sh
$ stop-dfs.sh</pre><p></p>
<h3>Creating A User Directory</h3>
<p>You can create a home directory for a user with the following command:</p>
<p></p><pre class="crayon-plain-tag">$ hadoop fs -mkdir -p ~/Documents/hadoop-home/</pre><p></p>
<h1>Other Hadoop installations</h1>
<p>Another way to get Hadoop installed is to use products from companies that include Apache Hadoop or some kind of derivative:</p>
<ul>
<li><a href="http://aws.amazon.com/elasticmapreduce/" title="Amazon EMR" target="_blank">Amazon Elastic MapReduce (Amazon EMR)</a></li>
<li><a href="http://www.cloudera.com/content/cloudera/en/downloads.html" title="CDH" target="_blank">Cloudera&#8217;s Distribution including Apache Hadoop (CDH)</a></li>
<li><a href="http://hortonworks.com/hdp/" title="HDP" target="_blank">Hortonworks Data Platform Powered by Apache Hadoop (HDP)</a></li>
<li><a href="https://www.mapr.com/" title="MapR" target="_blank">MapR Technologies</a></li>
<li><a href="http://pivotal.io/big-data/pivotal-hd" title="Pivotal HD" target="_blank">Pivotal HD</a></li>
</ul>
<h1>Hadoop Distributed File System (HDFS)</h1>
<p>HDFS is a filesystem designed for distributed storage of very large files (hundreds of megabytes, gigabytes, or terabytes in size) and distributed processing using commodity hardware. It is a hierarchical UNIX-like file system, but internally it splits large files into blocks (typically from 32 MB to 128 MB; 64 MB was the default in Hadoop 1 and 128 MB is the default in Hadoop 2), in order to distribute and replicate these blocks among the nodes of the Hadoop cluster. The applications that use HDFS usually write data once and read it many times.</p>
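<p>To get a feeling for the numbers, here is a minimal sketch of the block arithmetic. It is plain Java, not a Hadoop API: the class <em>BlockMath</em>, the 200 MB file and the 64 MB block size are all assumptions chosen for illustration.</p>

```java
// Hypothetical helper for illustration; this is plain arithmetic, not a Hadoop API.
final class BlockMath {
    static final long MB = 1024L * 1024L;

    /** Number of blocks a file is split into: the ceiling of fileSize / blockSize. */
    static long blockCount(long fileSizeBytes, long blockSizeBytes) {
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    /** Size of the last block, which may be smaller than the block size. */
    static long lastBlockSize(long fileSizeBytes, long blockSizeBytes) {
        long rest = fileSizeBytes % blockSizeBytes;
        return (rest == 0 && fileSizeBytes > 0) ? blockSizeBytes : rest;
    }

    /** Total number of block copies stored in the cluster for one file. */
    static long replicaCount(long fileSizeBytes, long blockSizeBytes, int replication) {
        return blockCount(fileSizeBytes, blockSizeBytes) * replication;
    }

    public static void main(String[] args) {
        long fileSize = 200 * MB;  // assumed file size
        long blockSize = 64 * MB;  // assumed block size
        System.out.println("Blocks: " + blockCount(fileSize, blockSize));
        System.out.println("Last block (MB): " + lastBlockSize(fileSize, blockSize) / MB);
        System.out.println("Block copies with replication 3: "
                + replicaCount(fileSize, blockSize, 3));
    }
}
```

<p>With these assumed values the file needs three full blocks plus a smaller fourth one, and a replication factor of 3 multiplies the number of stored block copies by three.</p>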
<p>The HDFS has two types of nodes:</p>
<ul>
<li>The master <strong>NameNode</strong> (NN), which stores the filesystem tree and the metadata for locating the files and directories in the tree that are actually located in the DataNodes. It keeps this information in memory; however, to guard against data loss, it&#8217;s also saved to disk using two files: the namespace image and the edit log.
<ul>
<li>fsimage: a point in time snapshot of what HDFS looks like.</li>
<li>edit log: the deltas or changes to HDFS since the last snapshot.</li>
</ul>
<p>Both are periodically merged.</p>
</li>
<li>The <strong>DataNode</strong>s (DN), which are responsible for serving the actual file data (once the client knows which one to use after contacting the NameNode). They also send heartbeats every 3 seconds (by default) and block reports every hour (by default) to the NN, both for maintenance purposes.</li>
</ul>
<p>There is also a poorly named node, the <strong>Secondary NameNode</strong>, which is neither a failover node nor a backup node: it periodically merges the namespace image with the edit log to prevent the edit log from becoming too large. Thus, a better name for it is <strong>Checkpoint Node</strong>.</p>
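<p>The merge of the namespace image with the edit log can be pictured with a toy model. This is only a sketch under strong assumptions: the namespace is reduced to a sorted set of paths, and the names <em>CheckpointSketch</em>, <em>Edit</em> and <em>checkpoint()</em> are hypothetical, not Hadoop classes or methods.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Toy model for illustration, not Hadoop code: the namespace is just a sorted set of paths.
final class CheckpointSketch {

    /** One logged change to the namespace: a path was created or deleted. */
    static final class Edit {
        final boolean create;
        final String path;

        Edit(boolean create, String path) {
            this.create = create;
            this.path = path;
        }
    }

    /** Replays the edit log on top of the last snapshot and empties the log. */
    static TreeSet<String> checkpoint(TreeSet<String> fsimage, List<Edit> editLog) {
        TreeSet<String> merged = new TreeSet<String>(fsimage);
        for (Edit edit : editLog) {
            if (edit.create) {
                merged.add(edit.path);
            } else {
                merged.remove(edit.path);
            }
        }
        editLog.clear(); // after the merge, the edit log starts again from empty
        return merged;
    }

    public static void main(String[] args) {
        TreeSet<String> fsimage = new TreeSet<String>(Arrays.asList("/logs", "/tmp"));
        List<Edit> editLog = new ArrayList<Edit>();
        editLog.add(new Edit(true, "/user"));  // a directory was created
        editLog.add(new Edit(false, "/tmp"));  // a directory was deleted
        System.out.println("New snapshot: " + checkpoint(fsimage, editLog));
        System.out.println("Pending edits: " + editLog.size());
    }
}
```

<p>The point of the sketch is that the snapshot plus the deltas always describe the current state, so merging them only shortens the log; it doesn&#8217;t change what HDFS looks like.</p>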
<h2>The Command-Line Interface</h2>
<p>Once you have installed Hadoop, you can interact with HDFS, as well as other file systems that Hadoop supports (local filesystem, HFTP FS, S3 FS, and others), using the command line. The FS shell is invoked by:</p>
<p></p><pre class="crayon-plain-tag">$ hadoop fs &lt;args&gt;</pre><p></p>
<p>This assumes that hadoop is in your PATH, as we saw above.</p>
<p>You can find a list of available commands at <a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html" title="FS shell" target="_blank">File System Shell</a>.</p>
<p>You can perform operations like:</p>
<ul>
<li><a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#copyFromLocal" title="copyFromLocal" target="_blank">copyFromLocal</a> (putting files into HDFS)</li>
<li><a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#copyToLocal" title="copyToLocal" target="_blank">copyToLocal</a> (getting files from HDFS)</li>
<li><a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#mkdir" title="mkdir" target="_blank">mkdir</a> (creating directories in HDFS)</li>
<li><a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#ls" title="ls" target="_blank">ls</a> (listing files in HDFS)</li>
</ul>
<h2>Data exchange with HDFS</h2>
<p>Hadoop is mainly written in Java, and the core class for HDFS is the abstract class <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java" title="fs.FileSystem" target="_blank">org.apache.hadoop.fs.FileSystem</a>, which represents a filesystem in Hadoop. Its concrete subclasses provide implementations ranging from the local filesystem (<a href="https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java" title="fs.LocalFileSystem" target="_blank">fs.LocalFileSystem</a>) to HDFS (<a href="https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java" title="hdfs.DistributedFileSystem" target="_blank">hdfs.DistributedFileSystem</a>), Amazon S3 (<a href="https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java" title="fs.s3native.NativeS3FileSystem" target="_blank">fs.s3native.NativeS3FileSystem</a>) and many more (read-only HTTP, FTP server, &#8230;).</p>
<h3>Reading data</h3>
<p>Reading data using the Java API involves obtaining the abstract <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java" title="fs.FileSystem" target="_blank">FileSystem</a> via one of the factory methods (<strong><em>get()</em></strong>), or the convenience method for retrieving the local filesystem (<strong><em>getLocal()</em></strong>):</p>
<p><strong>public static FileSystem get(Configuration conf) throws IOException<br />
public static FileSystem get(URI uri, Configuration conf) throws IOException<br />
public static FileSystem get(URI uri, Configuration conf, String user)<br />
throws IOException<br />
public static LocalFileSystem getLocal(Configuration conf) throws IOException</strong></p>
<p>And then obtaining an input stream for a file (that can later be closed):</p>
<p><strong>public FSDataInputStream open(Path f) throws IOException<br />
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException</strong></p>
<p>With these methods at hand, the flow of the data in HDFS is as follows:</p>
<div id="attachment_601" style="width: 650px" class="wp-caption alignnone"><a href="http://malsolo.com/blog4java/wp-content/uploads/2014/11/HDFS_Client_Read_File.jpg"><img src="http://malsolo.com/blog4java/wp-content/uploads/2014/11/HDFS_Client_Read_File.jpg" alt="HDFS read" width="640" height="367" class="size-full wp-image-601" /></a><p class="wp-caption-text">Reading from HDFS</p></div>
<ol>
<li>The client calls the <strong>open()</strong> method on the FileSystem object to read a file; for HDFS, this object is an instance of <strong>DistributedFileSystem</strong>.</li>
<li>The DistributedFileSystem asks the <strong>NameNode</strong> for the block locations. The NameNode returns an ordered list of the DataNodes that have a copy of the block (sorted by proximity to the client). The DistributedFileSystem returns an <strong>FSDataInputStream</strong> to the client for it to read data from.</li>
<li>The client calls <strong>read()</strong> on the input stream.</li>
<li>The FSDataInputStream reads data for the client from the DataNode until there is no more data in that node.</li>
<li>The FSDataInputStream manages the closing and opening of connections to DataNodes transparently while serving data to the client. It also manages validation (checksum) and errors (by trying to read data from a replica).</li>
<li>When the client has finished reading, it calls <strong>close()</strong> on the FSDataInputStream.</li>
</ol>
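<p>The way the stream falls back to another replica can be sketched with a toy model. This is not the HDFS client code: <em>ReplicaReadSketch</em>, <em>Replica</em> and <em>readBlock()</em> are hypothetical names, a null payload stands for an unreachable DataNode, and a CRC32 stands in for whatever checksum HDFS really uses.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;

// Toy model for illustration, not the HDFS client: a replica is a node name plus the bytes it holds.
final class ReplicaReadSketch {

    static final class Replica {
        final String node;
        final byte[] data; // null simulates a DataNode that cannot be reached

        Replica(String node, byte[] data) {
            this.node = node;
            this.data = data;
        }
    }

    static long checksum(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    /**
     * Tries the replicas in the given order (closest first) and returns the first one
     * that is reachable and passes the checksum validation.
     */
    static Replica readBlock(List<Replica> sortedByProximity, long expectedChecksum) {
        for (Replica replica : sortedByProximity) {
            if (replica.data == null) {
                continue; // unreachable node: try the next replica
            }
            if (checksum(replica.data) != expectedChecksum) {
                continue; // corrupt copy: try the next replica
            }
            return replica;
        }
        throw new IllegalStateException("No healthy replica found");
    }

    public static void main(String[] args) {
        byte[] block = "some block data".getBytes(StandardCharsets.UTF_8);
        byte[] corrupt = "some block dat?".getBytes(StandardCharsets.UTF_8);
        List<Replica> replicas = Arrays.asList(
                new Replica("datanode-1", null),
                new Replica("datanode-2", corrupt),
                new Replica("datanode-3", block));
        Replica served = readBlock(replicas, checksum(block));
        System.out.println("Served by " + served.node);
    }
}
```

<p>In this sketch the closest node is down and the second copy is corrupt, so the third DataNode ends up serving the block, and the caller never notices the two failed attempts.</p>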
<h3>Writing data</h3>
<p>The Java API allows you to create files with create methods (that, by the way, also create any parent directories of the file that don&#8217;t already exist). The API also includes a <strong>Progressable</strong> interface to be notified of the progress of the data being written to the datanodes. It&#8217;s also possible to append data to an existing file, but this functionality is optional (S3 doesn&#8217;t support it for the time being).</p>
<p><strong>public FSDataOutputStream create(Path f) throws IOException<br />
public FSDataOutputStream append(Path f) throws IOException</strong></p>
<p>The output stream will be used for writing the data. Furthermore, the FSDataOutputStream can inform of the current position in the file.</p>
<p>The flow of the data written to HDFS with these methods is as follows:</p>
<div id="attachment_602" style="width: 650px" class="wp-caption alignnone"><a href="http://malsolo.com/blog4java/wp-content/uploads/2014/11/HDFS_Client_Write_File.png"><img src="http://malsolo.com/blog4java/wp-content/uploads/2014/11/HDFS_Client_Write_File.png" alt="HDFS write" width="640" height="416" class="size-full wp-image-602" /></a><p class="wp-caption-text">Writing to HDFS</p></div>
<ol>
<li>The client creates the file by calling <strong>create()</strong> on <strong>DistributedFileSystem</strong>.</li>
<li>DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it. The NameNode checks that the file doesn&#8217;t already exist and that the client has permission to create it. If there is any problem, an IOException is thrown; otherwise, the client gets an FSDataOutputStream for writing data to.</li>
<li>The data written by the client is split into packets that are sent to a <em>data queue</em>.</li>
<li>The data queue is consumed by the Data Streamer, which streams the packets to a pipeline of DataNodes (as many as the replication factor). Each DataNode stores the packet and sends it to the next DataNode in the pipeline.</li>
<li>There is another queue, the <em>ack queue</em>, that contains the packets that are waiting to be acknowledged by all the datanodes in the pipeline. If a DataNode fails during a write operation, the pipeline is re-arranged transparently for the client.</li>
<li>When the client has finished writing data, it calls <strong>close()</strong> on the stream.</li>
<li>The remaining packets are flushed and, after receiving all the acknowledgments, the NameNode is notified that the write to the file is completed.</li>
</ol>
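<p>The interplay between the data queue and the ack queue is easier to follow with a small simulation. Again, this is a toy model and not the HDFS client: packets are plain strings, each DataNode is a list, and <em>WritePipelineSketch</em> with its methods is a hypothetical name. Failure handling is left out on purpose.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model for illustration, not the HDFS client: packets are strings and each DataNode is a list.
final class WritePipelineSketch {
    final Deque<String> dataQueue = new ArrayDeque<String>();
    final Deque<String> ackQueue = new ArrayDeque<String>();
    final List<List<String>> pipeline = new ArrayList<List<String>>();

    /** Builds a pipeline with one simulated DataNode per replica. */
    WritePipelineSketch(int replication) {
        for (int i = 0; i < replication; i++) {
            pipeline.add(new ArrayList<String>());
        }
    }

    /** Client side: split the data into fixed-size packets and put them in the data queue. */
    void write(String data, int packetSize) {
        for (int i = 0; i < data.length(); i += packetSize) {
            dataQueue.add(data.substring(i, Math.min(data.length(), i + packetSize)));
        }
    }

    /** Streamer side: send one packet down the pipeline; it then waits in the ack queue. */
    void streamOne() {
        String packet = dataQueue.poll();
        if (packet == null) {
            return; // nothing left to send
        }
        ackQueue.add(packet);
        for (List<String> dataNode : pipeline) {
            dataNode.add(packet); // each node stores the packet and passes it on
        }
    }

    /** Called once every DataNode in the pipeline has acknowledged the oldest packet. */
    void ackOne() {
        ackQueue.poll();
    }

    /** close(): flush the remaining packets and wait for all the acknowledgments. */
    void close() {
        while (!dataQueue.isEmpty()) {
            streamOne();
        }
        while (!ackQueue.isEmpty()) {
            ackOne();
        }
    }

    public static void main(String[] args) {
        WritePipelineSketch out = new WritePipelineSketch(3);
        out.write("abcdefgh", 3);
        out.streamOne();
        System.out.println("data queue: " + out.dataQueue + ", ack queue: " + out.ackQueue);
        out.close();
        System.out.println("first DataNode holds: " + out.pipeline.get(0));
    }
}
```

<p>A packet only leaves the ack queue when the whole pipeline has confirmed it, which is what lets the real client resend unacknowledged packets if a DataNode fails.</p>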
<h1>Apache YARN (Yet Another Resource Negotiator)</h1>
<p>YARN is Hadoop’s cluster resource management system. It provides APIs for requesting and working with cluster resources, meant to be used not by user code but by higher-level frameworks like MapReduce v2, Spark, Tez&#8230;</p>
<p>YARN separates resource management and job scheduling/monitoring into separate daemons. In Hadoop 1.x these two functions were performed by the JobTracker, which became a bottleneck when scaling the number of nodes in the cluster.</p>
<h2>YARN Components</h2>
<p>There are five major component types in a YARN cluster:</p>
<ul>
<li><strong>Resource Manager (RM)</strong>: a global per-cluster daemon that is solely responsible for allocating and managing resources available within the cluster.</li>
<li><strong>Node Manager (NM)</strong>: a per-node daemon that is responsible for creating, monitoring, and killing containers.</li>
<li><strong>Application Master (AM)</strong>: a per-application daemon whose duty is to negotiate resources from the ResourceManager and to work with the NodeManager(s) to execute and monitor the tasks.</li>
<li><strong>Container</strong>: an abstract representation of a resource set that is given to a particular application: memory and CPU. It&#8217;s a computational unit (one node runs several containers, but a container cannot cross a node boundary). The AM is a specialized container that is used to bootstrap and manage the entire application&#8217;s life cycle.</li>
<li><strong>Application Client</strong>: it submits applications to the RM and it specifies the type of AM needed to execute the application (for instance, MapReduce).</li>
</ul>
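<p>The rule that a container cannot cross a node boundary has a visible consequence, shown in the sketch below. It is a toy model and not the YARN API: <em>ContainerAllocationSketch</em> is a hypothetical class, nodes only track free memory in MB, and the first-fit strategy is an assumption made for simplicity (the real schedulers are more elaborate).</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model for illustration, not the YARN API: nodes only track their free memory in MB.
final class ContainerAllocationSketch {
    private final Map<String, Integer> freeMemoryMb = new LinkedHashMap<String, Integer>();

    void addNode(String name, int memoryMb) {
        freeMemoryMb.put(name, memoryMb);
    }

    int freeMemory(String node) {
        return freeMemoryMb.get(node);
    }

    /**
     * First-fit allocation (an assumption of this sketch): returns the first node that can
     * host the whole container, or null if no single node has enough free memory.
     */
    String allocate(int containerMb) {
        for (Map.Entry<String, Integer> node : freeMemoryMb.entrySet()) {
            if (node.getValue() >= containerMb) {
                node.setValue(node.getValue() - containerMb);
                return node.getKey();
            }
        }
        return null;
    }

    /** Gives the memory of a finished container back to its node. */
    void release(String node, int containerMb) {
        freeMemoryMb.merge(node, containerMb, Integer::sum);
    }

    public static void main(String[] args) {
        ContainerAllocationSketch rm = new ContainerAllocationSketch();
        rm.addNode("node-1", 2048);
        rm.addNode("node-2", 2048);
        System.out.println("1024 MB container on: " + rm.allocate(1024));
        System.out.println("1536 MB container on: " + rm.allocate(1536));
        // 1024 MB + 512 MB are still free in total, but no single node can host 1536 MB
        System.out.println("1536 MB container on: " + rm.allocate(1536));
    }
}
```

<p>The last request is rejected even though the cluster as a whole has enough free memory, because the free memory is spread over two nodes.</p>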
<h2>Anatomy of a YARN Request</h2>
<p>These are the steps involved in the submission of a job to the YARN framework.</p>
<div id="attachment_649" style="width: 632px" class="wp-caption alignnone"><a href="http://malsolo.com/blog4java/wp-content/uploads/2015/02/yarn_architecture.gif"><img src="http://malsolo.com/blog4java/wp-content/uploads/2015/02/yarn_architecture.gif" alt="Anatomy of a YARN Request" width="622" height="385" class="size-full wp-image-649" /></a><p class="wp-caption-text">YARN architecture</p></div>
<ol>
<li>The client submits a job to the RM asking to run an AM process (Job Submission in the picture above).</li>
<li>The RM looks for resources to acquire a container on a node to launch an instance of the AM.</li>
<li>The AM registers with the RM to enable the client to query the RM for details about the AM.</li>
<li>Now the AM is running. It could run the computation itself and return the result to the client, or it could request more containers from the RM to run a distributed computation (Resource Request in the picture above).</li>
<li>The application code executing in the launched containers (the tasks) reports its status to the AM through an application-specific protocol (MapReduce status in the picture above, which assumes that the YARN application being executed is MapReduce).</li>
<li>Once the application completes execution, the AM deregisters with the RM, and the containers used are released back to the system.</li>
</ol>
<p>This process applies to each client that submits jobs. In the picture above there are two clients (the red one and the blue one).</p>
<h1>Hadoop first program: WordCount MapReduce</h1>
<p>MapReduce is a paradigm for data processing that uses two key phases:</p>
<ol>
<li><strong>Map</strong>: it performs a transformation on input key-value pairs to generate intermediate key-value pairs.</li>
<li><strong>Reduce</strong>: it performs a summarize function on intermediate key-value groups to generate the final output of key-value pairs.</li>
<li>The groups that are the input of the Reduce phase are created by sorting the output of the Map phase, in an operation called <strong>Sort/Shuffle</strong> (in YARN, it is an auxiliary service).</li>
</ol>
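<p>Before moving to Hadoop, the three steps above can be tried in plain Java. The following is only a sketch of the paradigm on a single machine, with no Hadoop classes involved; <em>MapReducePhasesSketch</em> and its method names are hypothetical.</p>

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model in plain Java for illustration; no Hadoop involved.
final class MapReducePhasesSketch {

    /** Map: one input line becomes a list of intermediate (word, 1) pairs. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<Map.Entry<String, Integer>>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
            }
        }
        return out;
    }

    /** Sort/Shuffle: group the intermediate values by key, with the keys in sorted order. */
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> groups = new TreeMap<String, List<Integer>>();
        for (Map.Entry<String, Integer> pair : pairs) {
            groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<Integer>()).add(pair.getValue());
        }
        return groups;
    }

    /** Reduce: summarize all the values of one key, here by adding them up. */
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    /** Runs the three steps one after another on a list of input lines. */
    static TreeMap<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<Map.Entry<String, Integer>>();
        for (String line : lines) {
            intermediate.addAll(map(line));
        }
        TreeMap<String, Integer> result = new TreeMap<String, Integer>();
        for (Map.Entry<String, List<Integer>> group : shuffle(intermediate).entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("to be or", "not to be")));
    }
}
```

<p>In a real cluster the map calls run in parallel on different nodes and the grouping happens over the network, but the contract of each step is the same as in this sketch.</p>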
<h2>Writing the program</h2>
<p>To write a MapReduce program in Java and run it in Hadoop, you need to provide a Mapper class, a Reducer class, and a driver program to run a job.</p>
<p>Let&#8217;s begin with <u>the Mapper class</u>, which emits each word with a count of 1:</p>
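<p></p><pre class="crayon-plain-tag">import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper&lt;LongWritable, Text, Text, IntWritable&gt; {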
	
	private final static IntWritable ONE = new IntWritable(1);
	private Text word = new Text();
	
	@Override
	protected void map(LongWritable key, Text value,
			Mapper&lt;LongWritable, Text, Text, IntWritable&gt;.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			context.write(word, ONE);
		}
	}

}</pre><p></p>
<p>The type parameters of the Mapper class are worth highlighting. In this case:</p>
<ol>
<li>The input key, a long (the byte offset of the line within the file) that will be ignored</li>
<li>The input value, a line of text</li>
<li>The output key, the word to be counted</li>
<li>The output value, the count for the word, always one, as we said before.</li>
</ol>
<p>As you can see, instead of using Java types, it&#8217;s better to use Hadoop&#8217;s basic types, which are optimized for network serialization (available in the <em>org.apache.hadoop.io</em> package).</p>
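<p>To get a feeling for why this matters, the following sketch compares two ways of turning an int into bytes using only the JDK. The class <em>CompactSerializationSketch</em> and its methods are hypothetical stand-ins, not Hadoop classes: <em>writeCompact</em> only imitates the idea of a type that writes its raw fields to a <em>DataOutput</em>, while <em>writeJavaSerialized</em> uses standard Java serialization, which also stores class metadata.</p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class CompactSerializationSketch {

    // Hypothetical stand-in for a compact type: it writes only the raw value.
    static byte[] writeCompact(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value); // an int always takes 4 bytes here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads back a value written by writeCompact.
    static int readCompact(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Standard Java serialization of a boxed Integer, including its class description.
    static byte[] writeJavaSerialized(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(Integer.valueOf(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println("compact bytes: " + writeCompact(1).length);
        System.out.println("java serialized bytes: " + writeJavaSerialized(1).length);
        System.out.println("round trip: " + readCompact(writeCompact(42)));
    }
}
```

<p>With millions of intermediate records travelling between mappers and reducers, the extra bytes per record add up quickly, which is the motivation for the compact types.</p>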
<p>The basic approach is to override the <em>map()</em> method and use the key and value input parameters, together with the Context instance, to write the output: each word with its count (always one, for the time being).</p>
<p>Let&#8217;s continue with <u>the Reducer class</u>.</p>
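<p></p><pre class="crayon-plain-tag">import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {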
	@Override
	protected void reduce(Text key, Iterable&lt;IntWritable&gt; values,
			Reducer&lt;Text, IntWritable, Text, IntWritable&gt;.Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		context.write(key, new IntWritable(sum));
	}
	
}</pre><p></p>
<p>The intermediate result from the Mapper will be partitioned by MapReduce in such a way that the same reducer will receive all output records containing the same key. MapReduce will also sort all the map output keys and will call the <em>reduce()</em> method once for each key, along with all the output values for that key.</p>
<p>Thus, to write a Reducer class, you override the <em>reduce()</em> method, whose parameters are a single key, the values for that key as an iterable, and an instance of the Context to write the final result to.</p>
<p>In our case, the reducer will sum the counts that each word carries (always one) and write the result to the context.</p>
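<p>The partitioning step mentioned above can be sketched in plain Java. To my understanding, Hadoop&#8217;s default hash partitioner uses a formula of this shape: mask the sign bit of the key&#8217;s hash code and take the remainder by the number of reduce tasks. The class <em>PartitionSketch</em>, the method <em>partitionFor</em> and the value of three reducers are assumptions for the illustration. It also hashes Java strings rather than Hadoop <em>Text</em> keys, so the concrete partition numbers will differ from the ones in a real job.</p>

```java
import java.util.Arrays;

public class PartitionSketch {

    // Masks the sign bit so the result is never negative, then maps the hash to a reducer index.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 3; // assumed value, only for this illustration
        for (String word : Arrays.asList("the", "of", "shall", "the", "of")) {
            // The same word always lands on the same reducer index.
            System.out.println(word + " -> reducer " + partitionFor(word, numReducers));
        }
    }
}
```

<p>Because the result depends only on the key, every occurrence of the same word ends up in the same partition, which is what allows a single reducer to see all the counts for that word.</p>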
<p>Finally, <u>the Driver class</u>, the class that runs the MapReduce job.</p>
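<p></p><pre class="crayon-plain-tag">import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountDriver {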
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] myArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (myArgs.length != 2) {
			System.err.println(&quot;Usage: WordCountDriver &lt;input path&gt; &lt;output path&gt;&quot;);
			System.exit(-1);
		}
		Job job = Job.getInstance(conf, &quot;Classic WordCount&quot;);
		job.setJarByClass(WordCountDriver.class);
		
		FileInputFormat.addInputPath(job, new Path(myArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(myArgs[1]));
		
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		
		//job.setMapOutputKeyClass(Text.class);
		//job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}</pre><p></p>
<p>First, create a Hadoop Configuration (the default values are enough for this example) and use the <em>GenericOptionsParser</em> class to consume the generic Hadoop arguments, so that <em>getRemainingArgs()</em> returns only the application-specific ones (here, the input and output paths).</p>
<p>To configure, submit and control the execution of the job, as well as to monitor its progress, use a <em>Job</em> object. Configure it (via its setter methods) before submitting the job, because calling a setter after submission throws an <em>IllegalStateException</em>.</p>
<p>In a Hadoop cluster, the JAR package will be distributed around the cluster. To allow Hadoop to locate this JAR, we pass a class to the Job’s <em>setJarByClass()</em> method.</p>
<p>Next, we specify the input and output paths by calling the static <em>addInputPath()</em> (or <em>setInputPaths()</em>) method on <em>FileInputFormat</em> (with a file, directory or file pattern) and the static <em>setOutputPath()</em> method on <em>FileOutputFormat</em> respectively. The output directory must not exist yet, so that the job cannot overwrite the output of another job.</p>
<p>Then, the job is configured with the Mapper class and the Reducer class.</p>
<p>There is no need to specify the map output types because they are the same as the ones produced by the Reducer class, but we do need to indicate the output types for the reduce function.</p>
<p>Finally, the <em>waitForCompletion()</em> method on Job submits the job and waits for it to finish. The argument is a flag for verbosity in the generated output. The return value indicates success (true) or failure (false). We use it for the program’s exit code (0 or 1).</p>
<h2>Running the program</h2>
<p>The source code is available at <a href="https://github.com/jbbarquero/mapreduce" title="Mapreduce first program" target="_blank">GitHub</a>. You can download it, go to the directory and just run the following commands:</p>
<p></p><pre class="crayon-plain-tag">$ mvn clean install
$ export HADOOP_CLASSPATH=target/mapreduce-0.0.1-SNAPSHOT.jar
$ hadoop com.malsolo.hadoop.mapreduce.WordCountDriver data/the_constitution_of_the_united_states.txt out</pre><p></p>
<p>You will see something like this:</p>
<p></p><pre class="crayon-plain-tag">15/02/25 15:30:47 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/02/25 15:30:47 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/02/25 15:30:47 INFO input.FileInputFormat: Total input paths to process : 1
15/02/25 15:30:47 INFO mapreduce.JobSubmitter: number of splits:1
15/02/25 15:30:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local284822998_0001
15/02/25 15:30:47 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/02/25 15:30:47 INFO mapreduce.Job: Running job: job_local284822998_0001
15/02/25 15:30:47 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/02/25 15:30:47 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/02/25 15:30:48 INFO mapred.LocalJobRunner: Waiting for map tasks
15/02/25 15:30:48 INFO mapred.LocalJobRunner: Starting task: attempt_local284822998_0001_m_000000_0
15/02/25 15:30:48 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
15/02/25 15:30:48 INFO mapred.MapTask: Processing split: file:.../mapreduce/data/the_constitution_of_the_united_states.txt:0+45119
15/02/25 15:30:48 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/02/25 15:30:48 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/02/25 15:30:48 INFO mapred.MapTask: soft limit at 83886080
15/02/25 15:30:48 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/02/25 15:30:48 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/02/25 15:30:48 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/02/25 15:30:48 INFO mapred.LocalJobRunner: 
15/02/25 15:30:48 INFO mapred.MapTask: Starting flush of map output
15/02/25 15:30:48 INFO mapred.MapTask: Spilling map output
15/02/25 15:30:48 INFO mapred.MapTask: bufstart = 0; bufend = 75556; bufvoid = 104857600
15/02/25 15:30:48 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26183792(104735168); length = 30605/6553600
15/02/25 15:30:48 INFO mapred.MapTask: Finished spill 0
15/02/25 15:30:48 INFO mapred.Task: Task:attempt_local284822998_0001_m_000000_0 is done. And is in the process of committing
15/02/25 15:30:48 INFO mapred.LocalJobRunner: map
15/02/25 15:30:48 INFO mapred.Task: Task 'attempt_local284822998_0001_m_000000_0' done.
15/02/25 15:30:48 INFO mapred.LocalJobRunner: Finishing task: attempt_local284822998_0001_m_000000_0
15/02/25 15:30:48 INFO mapred.LocalJobRunner: map task executor complete.
15/02/25 15:30:48 INFO mapred.LocalJobRunner: Waiting for reduce tasks
15/02/25 15:30:48 INFO mapred.LocalJobRunner: Starting task: attempt_local284822998_0001_r_000000_0
15/02/25 15:30:48 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
15/02/25 15:30:48 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@1bd34bf7
15/02/25 15:30:48 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=334338464, maxSingleShuffleLimit=83584616, mergeThreshold=220663392, ioSortFactor=10, memToMemMergeOutputsThreshold=10
15/02/25 15:30:48 INFO reduce.EventFetcher: attempt_local284822998_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
15/02/25 15:30:48 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local284822998_0001_m_000000_0 decomp: 90862 len: 90866 to MEMORY
15/02/25 15:30:48 INFO reduce.InMemoryMapOutput: Read 90862 bytes from map-output for attempt_local284822998_0001_m_000000_0
15/02/25 15:30:48 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 90862, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->90862
15/02/25 15:30:48 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
15/02/25 15:30:48 INFO mapred.LocalJobRunner: 1 / 1 copied.
15/02/25 15:30:48 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
15/02/25 15:30:48 INFO mapred.Merger: Merging 1 sorted segments
15/02/25 15:30:48 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 90857 bytes
15/02/25 15:30:48 INFO reduce.MergeManagerImpl: Merged 1 segments, 90862 bytes to disk to satisfy reduce memory limit
15/02/25 15:30:48 INFO reduce.MergeManagerImpl: Merging 1 files, 90866 bytes from disk
15/02/25 15:30:48 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
15/02/25 15:30:48 INFO mapred.Merger: Merging 1 sorted segments
15/02/25 15:30:48 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 90857 bytes
15/02/25 15:30:48 INFO mapred.LocalJobRunner: 1 / 1 copied.
15/02/25 15:30:48 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
15/02/25 15:30:48 INFO mapred.Task: Task:attempt_local284822998_0001_r_000000_0 is done. And is in the process of committing
15/02/25 15:30:48 INFO mapred.LocalJobRunner: 1 / 1 copied.
15/02/25 15:30:48 INFO mapred.Task: Task attempt_local284822998_0001_r_000000_0 is allowed to commit now
15/02/25 15:30:48 INFO output.FileOutputCommitter: Saved output of task 'attempt_local284822998_0001_r_000000_0' to file:.../out/_temporary/0/task_local284822998_0001_r_000000
15/02/25 15:30:48 INFO mapred.LocalJobRunner: reduce > reduce
15/02/25 15:30:48 INFO mapred.Task: Task 'attempt_local284822998_0001_r_000000_0' done.
15/02/25 15:30:48 INFO mapred.LocalJobRunner: Finishing task: attempt_local284822998_0001_r_000000_0
15/02/25 15:30:48 INFO mapred.LocalJobRunner: reduce task executor complete.
15/02/25 15:30:48 INFO mapreduce.Job: Job job_local284822998_0001 running in uber mode : false
15/02/25 15:30:48 INFO mapreduce.Job:  map 100% reduce 100%
15/02/25 15:30:48 INFO mapreduce.Job: Job job_local284822998_0001 completed successfully
15/02/25 15:30:48 INFO mapreduce.Job: Counters: 33
	File System Counters
		FILE: Number of bytes read=283490
		FILE: Number of bytes written=809011
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=872
		Map output records=7652
		Map output bytes=75556
		Map output materialized bytes=90866
		Input split bytes=175
		Combine input records=0
		Combine output records=0
		Reduce input groups=1697
		Reduce shuffle bytes=90866
		Reduce input records=7652
		Reduce output records=1697
		Spilled Records=15304
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=8
		CPU time spent (ms)=0
		Physical memory (bytes) snapshot=0
		Virtual memory (bytes) snapshot=0
		Total committed heap usage (bytes)=525336576
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=45119
	File Output Format Counters 
		Bytes Written=17405
$</pre><p></p>
<p>And you can take a look at the result using:</p>
<p></p><pre class="crayon-plain-tag">$ sort -k2 -h -r out/part-r-00000 | head -20
the	663
of	494
shall	293
and	256
to	183
be	178
or	157
in	139
by	101
a	94
United	85
for	81
any	79
President	72
The	64
have	64
as	64
States,	55
such	52
State	47
$</pre><p></p>
<p>Regarding this example, I have to mention a couple of things:</p>
<ol>
<li>
	The code includes a data directory containing a text file (yes, the Constitution of the United States)
<ul>
<li>Yes, the program needs some improvements, such as ignoring commas and other punctuation.</li>
<li>It&#8217;s funny to see the most repeated words (the, of, shall, and, to, be, or, in, by, a) and the most important words (United, States, President).</li>
</ul>
</li>
<li>Due to a problem with <a href="http://wiki.apache.org/hadoop/HadoopIPv6" title="Hadoop and IPv6" target="_blank">Hadoop and IPv6</a>, the example doesn&#8217;t work in pseudo-distributed mode: it fails with a connection exception (<font color="red"><em>java.net.ConnectException: Call [&#8230;] to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused</em></font>). For this example it is enough to use local mode (just restore the original core-site.xml, hdfs-site.xml, mapred-site.xml and yarn-site.xml; you can also stop the HDFS, YARN, and MapReduce daemons, see above).</li>
</ol>
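<p>As a possible starting point for the improvement mentioned in the first item, here is a small sketch of a tokenizer that drops punctuation, so that &#8220;States,&#8221; and &#8220;States&#8221; count as the same word. The class <em>TokenNormalizerSketch</em> and the method <em>normalizedTokens</em> are hypothetical and not part of the repository. Lower-casing the text (so that &#8220;The&#8221; and &#8220;the&#8221; are merged) is an extra assumption of mine that you may not want.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class TokenNormalizerSketch {

    // Lower-cases the line and splits it on every run of characters
    // that are not letters, digits or apostrophes.
    static List<String> normalizedTokens(String line) {
        List<String> tokens = new ArrayList<>();
        for (String raw : line.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{Nd}']+")) {
            if (!raw.isEmpty()) { // a leading separator produces one empty string
                tokens.add(raw);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(normalizedTokens("The United States, and the President."));
    }
}
```

<p>In the mapper, a loop over these tokens could take the place of the <em>StringTokenizer</em> loop, calling <em>word.set(token)</em> and <em>context.write(word, ONE)</em> for each one.</p>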
<h1>Resources</h1>
<ul>
<li><a href="http://shop.oreilly.com/product/0636920033448.do" title="Hadoop: The Definitive Guide" target="_blank">Hadoop: The Definitive Guide, 4th Edition</a>. By Tom White (O&#8217;Reilly Media)</li>
<li><a href="http://www.apress.com/9781430248637?gtmf=s" title="Pro Apache Hadoop" target="_blank">Pro Apache Hadoop, 2nd Edition</a>. By Sameer Wadkar, Madhu Siddalingaiah, Jason Venner (Apress)</li>
<li><a href="https://www.packtpub.com/big-data-and-business-intelligence/mastering-hadoop" title="Mastering Hadoop" target="_blank">Mastering Hadoop</a>. By Sandeep Karanth (Packt publishing)</li>
<li><a href="http://www.manning.com/holmes2/" title="Hadoop in Practice, Second Edition" target="_blank">Hadoop in Practice, Second Edition</a>. By Alex Holmes (Manning publications)</li>
<li><a href="http://www.manning.com/lam2/" title="Hadoop in Action, Second Edition" target="_blank">Hadoop in Action, Second Edition</a>. By Chuck P. Lam and Mark W. Davis (Manning publications)</li>
<li><a href="https://www.youtube.com/watch?v=xYnS9PQRXTg" title="Hadoop - Just the Basics for Big Data Rookies" target="_blank">Hadoop &#8211; Just the Basics for Big Data Rookies</a>. By Adam Shook (SpringDeveloper YouTube channel)</li>
<li><a href="https://www.youtube.com/watch?v=tIPA6vMZomQ" title="Getting started with Spring Data and Apache Hadoop" target="_blank">Getting started with Spring Data and Apache Hadoop</a>. By Thomas Risberg, Janne Valkealahti (SpringDeveloper YouTube channel)</li>
<li><a href="https://www.youtube.com/watch?v=IcuTdJgUFmo" title="Hadoop 201 -- Deeper into the Elephant" target="_blank">Hadoop 201 &#8212; Deeper into the Elephant</a>. By Roman Shaposhnik (SpringDeveloper YouTube channel)</li>
<li><a href="http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-common/SingleCluster.html" title="Setting up a Single Node Cluster" target="_blank">Hadoop MapReduce Next Generation &#8211; Setting up a Single Node Cluster</a>.</li>
<li><a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html" title="FileSystemShell" target="_blank">The File System (FS) shell</a></li>
<li><a href="http://www.adictosaltrabajo.com/tutoriales/tutoriales.php?pagina=mapreduce_basic" title="Primeros pasos con Hadoop: instalación y configuración en Linux" target="_blank">Primeros pasos con Hadoop: instalación y configuración en Linux</a>. By Juan Alonso Ramos (Adictos al trabajo)</li>
<li><a href="http://blog.cloudera.com/blog/2014/06/how-to-install-a-virtual-apache-hadoop-cluster-with-vagrant-and-cloudera-manager/" title="Hadoop virtual cluster: Vagrant and CDH" target="_blank">How-to: Install a Virtual Apache Hadoop Cluster with Vagrant and Cloudera Manager</a>. By Justin Kestelyn (@kestelyn, Cloudera blog)</li>
<li><a href="http://hortonworks.com/blog/building-hadoop-vm-quickly-ambari-vagrant/" title="Hadoop VM: Ambari and Vagrant, HDP" target="_blank">How to build a Hadoop VM with Ambari and Vagrant</a>. By Saptak Sen (Hortonworks blog)</li>
<li><a href="http://hadoopguide.blogspot.com.es/2013/05/hadoop-hdfs-data-flow-io-classes.html" title="Hadoop HDFS Data Flow IO Classes" target="_blank">Hadoop HDFS Data Flow IO Classes</a>. By Shrey Mehrotra (Hadoop Ecosystem : Hadoop 2.x)</li>
</ul>
]]></content:encoded>
			<wfw:commentRss>http://malsolo.com/blog4java/?feed=rss2&#038;p=516</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
	</channel>
</rss>
