Getting started with Hadoop


Hadoop Introduction

Hadoop is an open source framework for distributed, fault-tolerant data storage and batch processing. It allows you to write applications that process really huge data sets across clusters of computers using a simple programming model, with linear scalability on commodity hardware. Commodity hardware means cheaper hardware than the dedicated servers sold by many vendors. Linear scalability means that, to handle more data, you only have to add more machines (nodes) to the Hadoop cluster.

The key concept for Hadoop is move-code-to-data: the data is distributed across the nodes of the Hadoop cluster and the applications (the jar files) are later sent to those nodes, instead of the other way around (as in Java EE, where applications are centralized in an application server and the data is brought to them over the network), in order to process the data locally.

At its core, Hadoop has two parts:

· Hadoop Distributed File System (HDFS™): a distributed file system that provides high-throughput access to application data.
· YARN (Yet Another Resource Negotiator): a framework for job scheduling and cluster resource management.

As you can see in the definition on the Apache Hadoop website (What is Apache Hadoop?), Hadoop offers as a third component Hadoop MapReduce, a batch-based, distributed computing framework modeled after Google’s paper on MapReduce. It allows you to parallelize work over a large amount of raw data by splitting the input dataset into independent chunks that are processed by the map tasks (initial ingestion and transformation) in parallel, whose outputs are sorted and then passed to the reduce tasks (aggregation or summarization).

In the previous version of Hadoop (Hadoop 1), the implementation of MapReduce was based on a master JobTracker, for resource management and job scheduling/monitoring, and per-node slaves called TaskTrackers that launched and tore down tasks. But it had scalability problems, especially with very large clusters (more than 4,000 nodes).

So MapReduce has undergone a complete overhaul and is now called MapReduce 2.0 (MRv2), but it is no longer a core component on its own: currently, MapReduce is a YARN-based system. That’s the reason why we can say that Hadoop has two main parts: HDFS and YARN.

Hadoop ecosystem

Besides the two core technologies, the distributed file system (HDFS) and MapReduce (MR), there are a lot of projects that extend Hadoop with additional useful technologies, so that we can consider all of them an ecosystem around Hadoop.

Next, a list of some of these projects, organized into rough categories:

  • Data Ingestion: to move data from and into HDFS
    • Flume: a system for moving data into HDFS from remote systems using configurable memory-resident daemons that watch for data on those systems and then forward the data to Hadoop. For example, weblogs from multiple servers to HDFS.
    • Sqoop: a tool for efficient bulk transfer of data between structured data stores (such as relational databases) and HDFS.
  • Data Processing:
    • Pig: a platform for querying and transforming data with scripts written in a procedural, data flow language called Pig Latin.
    • Hive: a declarative, SQL-like language (HiveQL) for querying data.
    • Spark: an in-memory distributed data processing engine that breaks problems up over the Hadoop nodes, but keeps the data in memory for better performance as Resilient Distributed Datasets (RDDs), which can be rebuilt, if lost, from an external store (usually HDFS).
    • Storm: a distributed real-time computation system for processing fast, large streams of data.
  • Data Formats:
    • Avro: a language-neutral data serialization system whose schemas are expressed in JSON.
    • Parquet: a compressed columnar storage format that can efficiently store nested data
  • Storage:
    • HBase: a scalable, distributed database that supports structured data storage for large tables.
    • Accumulo: a scalable, distributed key/value store based on Google’s BigTable design, known for its cell-level access control.
  • Coordination:
    • Zookeeper: a high-performance coordination service for distributed applications.
  • Machine Learning:
    • Mahout: a scalable machine learning and data mining library: classification, clustering, pattern mining, collaborative filtering and so on.
  • Workflow Management:
    • Oozie: a service for running and scheduling workflows of Hadoop jobs (including Map-Reduce, Pig, Hive, and Sqoop jobs).

Hadoop installation

To install Hadoop on a single machine to try it out, just download the compressed file for the desired version and unpack it on the filesystem.

Prerequisites

There is some required software for running Apache Hadoop:

  • Java. It’s also necessary to tell Hadoop where Java is installed via the JAVA_HOME environment variable.
  • ssh: I have Ubuntu 14.04, which comes with the ssh client, but I had to install the server manually.
  • On Mac OS X, make sure Remote Login (under System Preferences -> Sharing) is enabled for the current user or for all users.
  • On Windows, the best option is to follow the instructions at the Wiki: Build and Install Hadoop 2.x or newer on Windows.

Download and install

To get a Hadoop distribution, download a recent stable release from one of the Apache Download Mirrors.

There are several directories, for the current release, the latest stable release, the latest stable 1.x release and so on. Basically, you’ll download a gzipped tar file named hadoop-x.y.z.tar.gz, for instance: hadoop-2.6.0.tar.gz.

You can unpack it wherever you want and then add its bin and sbin directories to the PATH. For example:
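A minimal sketch, assuming a bash shell and the home directory as the install location (adjust the paths to your own setup):

tar xzf hadoop-2.6.0.tar.gz -C ~
export HADOOP_HOME=~/hadoop-2.6.0
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin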

Now you can verify the installation by typing hadoop version:
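If the PATH is set correctly, this prints the release (2.6.0 in our case) along with some build information:

hadoop version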

Configuration

Hadoop has three supported modes:

  • Local (Standalone) Mode: Hadoop runs as a single Java process with no daemons running. Useful for development, testing and debugging.
  • Pseudo-Distributed Mode: each Hadoop daemon runs in a separate Java process. For simulating a cluster on a small scale.
  • Fully-Distributed Mode: the Hadoop daemons run on a cluster of machines. If you want to take a look, see the official documentation: Hadoop MapReduce Next Generation – Cluster Setup.

In standalone mode, there is no further action to take: the default properties are enough and there are no daemons to run.

In pseudodistributed mode, you have to set up your computer as described at Hadoop MapReduce Next Generation – Setting up a Single Node Cluster. But let’s review the steps needed.

You need at least a minimal configuration in four files under HADOOP_HOME/etc/hadoop/ (a pseudo-distributed sketch of all four files is shown right after this list):

  • core-site.xml. Common configuration, default values at Configuration: core-default.xml
    • fs.defaultFS replaces the deprecated fs.default.name; its default value is file:/// and in pseudo-distributed mode it points to the local NameNode (e.g. hdfs://localhost/).

  • hdfs-site.xml. HDFS configuration, default values at Configuration: hdfs-default.xml
    • dfs.replication is the default block replication, unless another value is specified at file creation time. The default value is 3, but we use 1 because we have only one node.

    Other useful values are:

    dfs.namenode.name.dir, local path for storing the fsimage by the NN (defaults to file://${hadoop.tmp.dir}/dfs/name with hadoop.tmp.dir configurable at core-site.xml with default value /tmp/hadoop-${user.name})

    dfs.datanode.data.dir, local path for storing blocks by the DN (defaults to file://${hadoop.tmp.dir}/dfs/data)

  • mapred-site.xml. MapReduce configuration, default values at Configuration: mapred-default.xml
    • mapreduce.framework.name, the runtime framework for executing MapReduce jobs: local, classic or yarn.

    Other useful values are:

    mapreduce.jobtracker.system.dir, the directory where MapReduce stores control files (defaults to ${hadoop.tmp.dir}/mapred/system).

    mapreduce.cluster.local.dir, the local directory where MapReduce stores intermediate data files (defaults to ${hadoop.tmp.dir}/mapred/local)

  • yarn-site.xml. YARN configuration, default values at Configuration: yarn-default.xml
    • yarn.resourcemanager.hostname, the host name of the Resource Manager.
    • yarn.nodemanager.aux-services, list of auxiliary services executed by the Node Manager. The value mapreduce_shuffle enables the Shuffle/Sort phase of MapReduce, which is an auxiliary service in Hadoop 2.x.
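As a reference, here is a minimal sketch of the four files for pseudo-distributed mode, along the lines of the official single-node setup; localhost and the URI scheme are assumptions that may differ in your environment:

<!-- core-site.xml -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost/</value>
  </property>
</configuration>

<!-- hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

<!-- mapred-site.xml -->
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

<!-- yarn-site.xml -->
<configuration>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>localhost</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>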

Configuring SSH

Pseudo-distributed mode is like fully distributed mode with a single host: localhost. SSH is used to start the daemons on the set of hosts in the cluster, so we’ll configure SSH to allow logging in without a password.

Remember that you need to have SSH installed and an SSH server running. On Ubuntu, try this if you need to:
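On a Debian/Ubuntu system the server lives in the openssh-server package, so something like this should be enough:

sudo apt-get install openssh-server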

Now create an SSH key with an empty passphrase:
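For example (assuming an RSA key and the default key location):

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys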

Finally, test that you can connect without a password by trying:
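A simple check is to open a session against the local machine:

ssh localhost

If everything is in place, you should get a shell without being prompted for a password.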

First steps with HDFS

Before using HDFS for the first time, some steps must be performed:

Formatting the HDFS filesystem

Just run the following command:
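Formatting creates an empty filesystem and the storage directories for the NameNode; in Hadoop 2.x the command is:

hdfs namenode -format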

Starting the daemons

To start the HDFS, YARN, and MapReduce daemons, type:
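Assuming HADOOP_HOME/sbin is on the PATH as shown above, the usual scripts are:

start-dfs.sh
start-yarn.sh
mr-jobhistory-daemon.sh start historyserver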

You can check which processes are running with Java’s jps command:
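Running it after the start scripts above should list the daemons (names may vary slightly between versions):

jps

Besides Jps itself, you should see the NameNode, DataNode and SecondaryNameNode (HDFS), the ResourceManager and NodeManager (YARN), and the JobHistoryServer (MapReduce).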

Stopping the daemons

When you’re done, you can stop the daemons with:
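Mirroring the start scripts:

mr-jobhistory-daemon.sh stop historyserver
stop-yarn.sh
stop-dfs.sh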

Creating A User Directory

You can create a home directory for a user with the following command:
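For instance, for the current user (in pseudo-distributed mode, the user that started the daemons acts as the HDFS superuser, so no special permissions are needed):

hadoop fs -mkdir -p /user/$USER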

Other Hadoop installations

There are other ways to get Hadoop installed, namely using products from companies that bundle Apache Hadoop or some kind of derivative of it:

Hadoop Distributed File System (HDFS)

HDFS is a filesystem designed for distributed storage of very large files (hundreds of megabytes, gigabytes, or terabytes in size) and distributed processing using commodity hardware. It is a hierarchical UNIX-like file system, but internally it splits large files into blocks (typically 64 MB or 128 MB; the default is 64 MB in Hadoop 1 and 128 MB in Hadoop 2), in order to distribute and replicate these blocks among the nodes of the Hadoop cluster. The applications that use HDFS usually write data once and read data many times.

The HDFS has two types of nodes:

  • The master NameNode (NN), that stores the filesystem tree and the metadata for all the files and directories in the tree, whose actual data is stored in the DataNodes. It keeps this information in memory; however, to protect against data loss, it is also saved to disk using two files: the namespace image and the edit log.
    • fsimage: a point in time snapshot of what HDFS looks like.
    • edit log: the deltas or changes to HDFS since the last snapshot.

    Both are periodically merged.

  • The DataNodes (DN), that are responsible for serving the actual file data (once the client knows which one to use after contacting the NameNode). They also send heartbeats every 3 seconds (by default) and block reports every hour (by default) to the NN, both for maintenance purposes.

There is also a node poorly named Secondary NameNode, which is neither a failover node nor a backup node: it periodically merges the namespace image with the edit log to prevent the edit log from becoming too large. Thus, the best name for it is Checkpoint Node.

The Command-Line Interface

Once you have installed Hadoop, you can interact with HDFS, as well as other file systems that Hadoop supports (local filesystem, HFTP FS, S3 FS, and others), using the command line. The FS shell is invoked by:
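That is, the hadoop command followed by fs and the desired operation, for instance listing the root of the filesystem:

hadoop fs -ls /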

Provided that you have hadoop in the PATH, as we saw above.

You can find a list of available commands at File System Shell.

You can perform operations like:
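A few typical examples (the paths are hypothetical; relative paths are resolved against the user's HDFS home directory):

hadoop fs -mkdir input                           # create a directory
hadoop fs -put data/constitution.txt input       # copy a local file into HDFS
hadoop fs -ls input                              # list its contents
hadoop fs -cat input/constitution.txt            # print a file
hadoop fs -get input/constitution.txt copy.txt   # copy a file back to the local filesystem
hadoop fs -rm input/constitution.txt             # delete a file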

Data exchange with HDFS

Hadoop is mainly written in Java, and the core class for filesystem access is the abstract class org.apache.hadoop.fs.FileSystem, which represents a filesystem in Hadoop. Its several concrete subclasses provide implementations, from the local filesystem (fs.LocalFileSystem) to HDFS (hdfs.DistributedFileSystem), Amazon S3 (fs.s3native.NativeS3FileSystem) and many more (read-only HTTP, FTP server, …).

Reading data

Reading data using the Java API involves obtaining the abstract FileSystem via one of the factory methods (get()), or via the convenience method for retrieving the local filesystem (getLocal()):

public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException
public static LocalFileSystem getLocal(Configuration conf) throws IOException

And then obtaining an input stream for a file (that can later be closed):

public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

With these methods in hand, the flow of the data in HDFS is the following:

[Figure: Reading from HDFS]

  1. The client calls the open() method to read a file. A DistributedFileSystem is returned.
  2. The DistributedFileSystem asks the NameNode for the block locations. The NameNode returns an ordered list of the DataNodes that have a copy of each block (sorted by proximity to the client). The DistributedFileSystem returns an FSDataInputStream to the client for it to read data from.
  3. The client calls read() on the input stream.
  4. The FSDataInputStream reads data for the client from the DataNode until there is no more data in that node.
  5. The FSDataInputStream manages the closing and opening of connections to DataNodes for serving data to the client in a transparent way. It also manages validation (checksums) and errors (by trying to read the data from a replica).
  6. When the client has finished reading, it calls close() on the FSDataInputStream.
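To tie these calls together, here is a minimal sketch along the lines of the classic FileSystemCat example (the class name and the URI passed as argument are assumptions):

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Prints the content of the file whose URI is given as the first argument
public class FileSystemCat {

    public static void main(String[] args) throws Exception {
        String uri = args[0]; // e.g. hdfs://localhost/user/javier/input/constitution.txt
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));               // returns an FSDataInputStream
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}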

Writing data

The Java API allows you to create files with create() methods (which, by the way, also create any parent directories of the file that don’t already exist). The API also includes a Progressable interface to be notified of the progress of the data being written to the datanodes. It’s also possible to append data to an existing file, but this functionality is optional (S3 doesn’t support it for the time being).

public FSDataOutputStream create(Path f) throws IOException
public FSDataOutputStream append(Path f) throws IOException

The output stream is used for writing the data. Furthermore, the FSDataOutputStream can report the current position in the file.

The flow of the data written to HDFS with these methods is the following:

[Figure: Writing to HDFS]

  1. The client creates the file by calling create() on DistributedFileSystem.
  2. The DistributedFileSystem makes an RPC call to the NameNode to create a new file in the filesystem’s namespace, with no blocks associated with it. The NameNode checks file existence and permissions, throwing an IOException if there is any problem; otherwise, it returns an FSDataOutputStream for writing data to.
  3. The data written by the client is split into packets that are sent to a data queue.
  4. The data queue is consumed by the DataStreamer, which streams the packets to a pipeline of DataNodes (one per replication factor). Each DataNode stores the packet and sends it to the next DataNode in the pipeline.
  5. There is another queue, the ack queue, that contains the packets waiting to be acknowledged by all the DataNodes in the pipeline. If a DataNode fails during a write operation, the pipeline is re-arranged transparently for the client.
  6. When the client has finished writing data, it calls close() on the stream.
  7. The remaining packets are flushed and, after receiving all the acknowledgments, the NameNode is notified that the write to the file is completed.
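A minimal sketch of the writing side, copying a local file into HDFS and using a Progressable to print a dot as data is written (the class name and paths are assumptions):

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

// Copies a local file (first argument) to an HDFS destination (second argument)
public class FileCopyToHdfs {

    public static void main(String[] args) throws Exception {
        String localSrc = args[0]; // e.g. data/constitution.txt
        String dst = args[1];      // e.g. hdfs://localhost/user/javier/constitution.txt

        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            @Override
            public void progress() {
                System.out.print(".");  // called periodically while data is written
            }
        });
        IOUtils.copyBytes(in, out, 4096, true); // true: close both streams when finished
    }
}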

Apache YARN (Yet Another Resource Negotiator)

YARN is Hadoop’s cluster resource management system. It provides APIs for requesting and working with cluster resources; these APIs are meant to be used not by user code directly, but by higher-level frameworks such as MapReduce v2, Spark, Tez…

YARN separates resource management and job scheduling/monitoring into separate daemons. In Hadoop 1.x these two functions were performed by the JobTracker, which was a bottleneck when scaling the number of nodes in the cluster.

YARN Components

There are five major component types in a YARN cluster:

  • Resource Manager (RM): a global per-cluster daemon that is solely responsible for allocating and managing resources available within the cluster.
  • Node Manager (NM): a per-node daemon that is responsible for creating, monitoring, and killing containers.
  • Application Master (AM): a per-application daemon whose duty is to negotiate resources from the ResourceManager and to work with the NodeManager(s) to execute and monitor the tasks.
  • Container: an abstract representation of a set of resources (memory and CPU) that is given to a particular application. It’s a computational unit (one node runs several containers, but a container cannot cross a node boundary). The AM itself runs in a specialized container that is used to bootstrap and manage the entire application’s life cycle.
  • Application Client: it submits applications to the RM and it specifies the type of AM needed to execute the application (for instance, MapReduce).

Anatomy of a YARN Request

These are the steps involved in the submission of a job to the YARN framework.

[Figure: YARN architecture (anatomy of a YARN request)]

  1. The client submits a job to the RM asking to run an AM process (Job Submission in the picture above).
  2. The RM looks for resources to acquire a container on a node to launch an instance of the AM.
  3. The AM registers with the RM to enable the client to query the RM for details about the AM.
  4. Now the AM is running, and it could run the computation itself, returning the result to the client, or it could request more containers from the RM to run a distributed computation (Resource Request in the picture above).
  5. The application code executing in the launched containers (tasks) reports its status to the AM through an application-specific protocol (MapReduce status in the picture above, which assumes that the YARN application being executed is MapReduce).
  6. Once the application completes execution, the AM deregisters with the RM, and the containers used are released back to the system.

This process applies for each client that submits jobs. In the picture above there are two clients (the red one and the blue one).

Hadoop first program: WordCount MapReduce

MapReduce is a paradigm for data processing that uses two key phases:

  1. Map: it performs a transformation on input key-value pairs to generate intermediate key-value pairs.
  2. Reduce: it performs a summarization function on intermediate key-value groups to generate the final output of key-value pairs.
  3. The groups that form the input of the Reduce phase are created by sorting the output of the Map phase, in an operation known as Shuffle/Sort (in YARN, it is an auxiliary service).

Writing the program

To write a MapReduce program in Java and run it in Hadoop, you need to provide a Mapper class, a Reducer class, and a driver program to run the job.

Let’s begin with the Mapper class; it will emit each word with a count of 1:
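The real code is in the GitHub project; what follows is a minimal sketch of such a mapper (the class name WordCountMapper and the whitespace-based tokenization are assumptions):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (word, 1) for every word found in each input line
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The key (the offset of the line in the file) is ignored
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}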

Highlights here are the parameters of the Mapper class, in this case:

  1. The input key, a long that will be ignored
  2. The input value, a line of text
  3. The output key, the word to be counted
  4. The output value, the count for the word, always one, as we said before.

As you can see, instead of using Java types, it’s better to use Hadoop basic types that are optimized for network serialization (available in the org.apache.hadoop.io package)

The basic approach is to override the map() method and make use of the key and value input parameters, as well as the instance of a Context to write the output to: the words with their count (one, for the time being).

Let’s continue with the Reducer class.

The intermediate result from the Mapper will be partitioned by MapReduce in such a way that the same reducer will receive all output records containing the same key. MapReduce will also sort all the map output keys and will call each reducer only once for each output key along with a list of all the output values for this key.

Thus, to write a Reducer class, you override the reduce() method, which takes as parameters the key, the list of values as an Iterable, and an instance of the Context to write the final result to.

In our case, the reducer will sum the counts that each word carries (always one) and it will write the result to the context.
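Again, a minimal sketch (the class name WordCountReducer is an assumption):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums all the counts received for a word and writes the total
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}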

Finally, the Driver class, the class that runs the MapReduce job.

First, create a Hadoop Configuration (the default values are enough for this example) and use the GenericOptionsParser class to parse only the generic Hadoop arguments.

To configure, submit and control the execution of the job, as well as to monitor its progress, use a Job object. Take care of configuring it (via its set() methods) before submitting the job or an IllegalStateException will be thrown.

In a Hadoop cluster, the JAR package will be distributed around the cluster; to allow Hadoop to locate this JAR we pass a class to the Job’s setJarByClass() method.

Next, we specify the input and output paths by calling the static addInputPath() (or setInputPaths()) method on FileInputFormat (with a file, directory or file pattern) and the static setOutputPath() method on FileOutputFormat (with a non-existing directory, in order to avoid data loss from another job), respectively.

Then, the job is configured with the Mapper class and the Reducer class.

There is no need to specify the map output types because they are the same as the ones produced by the Reducer class, but we need to indicate the output types for the reduce function.

Finally, the waitForCompletion() method on Job submits the job and waits for it to finish. The argument is a flag for verbosity in the generated output. The return value indicates success (true) or failure (false). We use it for the program’s exit code (0 or 1).
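Putting the previous steps together, a sketch of such a driver could look like this (the class names are the hypothetical ones used above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// Configures and submits the WordCount job
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);   // lets Hadoop locate the JAR in the cluster

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // must not exist yet

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Output types of the reduce function (the map output uses the same types)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}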

Running the program

The source code is available on GitHub. You can download it, go to the directory and just run the following commands:
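The exact commands depend on the project layout; assuming a Maven build and the hypothetical class and directory names used in the sketches above, they would look something like:

mvn clean package
hadoop jar target/wordcount.jar WordCountDriver data output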

You will see something like this:

And you can take a look at the result using:
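In local mode the output directory lives on the local filesystem, so something like this works (part-r-00000 is the standard name of the first reducer's output file; the output directory name depends on how you ran the job):

cat output/part-r-00000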

Regarding this example, I have to mention a couple of things:

  1. The code includes a data directory containing a text file (yes, the constitution of the USA)

    • Yes, the program needs some improvements, for instance to avoid counting commas and other punctuation as part of the words.
    • It’s funny to see the most repeated words (the, of, shall, and, to, be, or, in, by, a) and the most important words (United, States, President)
  2. Due to a problem with Hadoop and IPv6, this example doesn’t work in pseudo-distributed mode; it fails with a connection exception (java.net.ConnectException: Call […] to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused). For this example it is enough to use local mode (just restore the original core-site.xml, hdfs-site.xml, mapred-site.xml and yarn-site.xml, and you can also stop the HDFS, YARN, and MapReduce daemons. See above)

Resources

Author: Javier (@jbbarquero)

Java EE developer, swim addict, occasional videogames player, fan of Ben Alex and Shamus Young, and deeply in love with my wife. Sooner or later I'll dedicate a post to expand this simple introduction.
