Hadoop Tutorial & Interview Questions

Hadoop Tutorial

Big Data

Big Data is a term for data sets whose size is beyond the capacity of commonly used software tools to manage and process within a tolerable elapsed time. Big data sizes are a constantly moving target, as of 2012 ranging from a few dozen terabytes to many petabytes in a single data set. In short, it is a collection of data sets so large and complex that they become difficult to process using on-hand database management tools or traditional data processing applications.

Big Data is commonly defined by three characteristics:

  • Volume
  • Velocity
  • Variety

We already have RDBMS to store and process structured data. But of late we have been getting data in the form of videos, images and text. This data is called unstructured and semi-structured data, and it is difficult to store and process efficiently using an RDBMS. So we definitely have to find an alternative way to store and process this type of unstructured and semi-structured data.

HADOOP is one of the technologies for efficiently storing and processing large sets of data. HADOOP is entirely different from a traditional distributed file system and can overcome all the problems that exist in traditional distributed systems. HADOOP is an open source framework written in Java for storing data in a distributed file system and processing the data in a parallel manner across a cluster of commodity nodes.

The Motivation for Hadoop

What problems exist with ‘traditional’ large-scale computing systems?

What requirements should an alternative approach have?

How does Hadoop address those requirements?

Problems with Traditional Large-Scale Systems

  • Traditionally, computation has been processor-bound, operating on relatively small amounts of data
  • For decades, the primary push was to increase the computing power of a single machine
  • Distributed systems evolved to allow developers to use multiple machines for a single job

Distributed Systems: Data Storage

  • Typically, data for a distributed system is stored on a SAN
  • At compute time, data is copied to the compute nodes
  • Fine for relatively limited amounts of data

Distributed Systems: Problems

  • Programming for traditional distributed systems is complex
  • Data exchange requires synchronization
  • Finite bandwidth is available
  • Temporal dependencies are complicated
  • It is difficult to deal with partial failures of the system

The Data-Driven World

  • Modern systems have to deal with far more data than was the case in the past
  • Organizations are generating huge amounts of data
  • That data has inherent value, and cannot be discarded

Examples: Facebook – over 70PB of data, eBay – over 5PB of data, etc.

Many organizations are generating data at a rate of terabytes per day. Getting the data to the processors becomes the bottleneck.

Requirements for a New Approach

Partial Failure Support

  • The system must support partial failure
  • Failure of a component should result in a graceful degradation of application performance, not complete failure of the entire system.

Data Recoverability

  • If a component of the system fails, its workload should be assumed by still-functioning units in the system
  • Failure should not result in the loss of any data

Component Recovery

  • If a component of the system fails and then recovers, it should be able to rejoin the system without requiring a full restart of the entire system.

Consistency

Component failures during execution of a job should not affect the outcome of the job

Scalability

Adding load to the system should result in a graceful decline in the performance of individual jobs, not failure of the system.

Increasing resources should support a proportional increase in load capacity.

Hadoop’s History

  • Hadoop is based on work done by Google in the late 1990s to early 2000s.
  • Specifically, on papers describing the Google File System (GFS), published in 2003, and MapReduce, published in 2004.
  • This work takes a radically new approach to the problems of distributed computing so that it meets all the requirements of reliability and availability.
  • The core concept is distributing the data as it is initially stored in the system.
  • Individual nodes can work on data local to those nodes, so data does not have to be transmitted over the network.
  • Developers need not worry about network programming, temporal dependencies or low-level infrastructure.
  • Nodes talk to each other as little as possible. Developers should not write code which communicates between nodes.
  • Data is spread among the machines in advance, so that computation happens where the data is stored, wherever possible.
  • Data is replicated multiple times on the system for increased availability and reliability.
  • When data is loaded into the system, it is split into ‘blocks’, typically 64MB or 128MB in size.
  • Map tasks generally work on relatively small portions of data, typically a single block.
  • A master program allocates work to nodes such that a map task will work on a block of data stored locally on that node whenever possible.
  • Nodes work in parallel, each on its own part of the dataset.
  • If a node fails, the master detects that failure and re-assigns the work to another node on the system.
  • Restarting a task does not require communication with nodes working on other portions of the data.
  • If a failed node restarts, it is automatically added back to the system and assigned new tasks.

Hadoop Overview

Hadoop consists of two core components

  1.  HDFS
  2. MapReduce

There are many other projects based around the core concepts of Hadoop. All these projects are collectively called the Hadoop Ecosystem.

The Hadoop Ecosystem has:

  • Pig
  • Hive
  • Flume
  • Sqoop
  • Oozie

and so on…

A set of machines running HDFS and MapReduce is known as a Hadoop cluster, and the individual machines are known as nodes. A cluster can have as few as one node or as many as several thousand nodes. As the number of nodes increases, performance increases. Support for languages other than Java (C++, Ruby, Python, Perl, etc.) is provided through Hadoop Streaming.

HADOOP S/W AND H/W Requirements

Hadoop usually runs on open source operating systems (Linux, Ubuntu, etc.); CentOS/RHEL is mostly used in production.

On Windows, virtualization software (VMware Player, VMware Workstation, VirtualBox) is required to run another OS.

Java is a prerequisite for Hadoop installation.

 HDFS


There is no storage wastage in HDFS. For example, to store a 100MB file, HDFS uses two blocks: one of 64MB and one of 36MB. Each block occupies only as many OS-level blocks as its contents actually require.
Each block is replicated three times (configurable). Replicas are stored on different nodes; this
ensures both reliability and availability.
HDFS provides redundant storage for massive amounts of data using cheap and unreliable computers.
Files in HDFS are ‘write once’.

Different blocks from the same file will be stored on different machines. This allows for efficient MapReduce processing. There are five daemons in a Hadoop cluster:

  • NameNode
  • SecondaryNameNode
  • JobTracker
  • DataNode
  • TaskTracker

NameNode:

The NameNode keeps track of the name of each file, its permissions, the blocks which make up the file, and where those blocks are located. These details about the data are known as metadata.
Example:

The NameNode holds metadata for the two files (Foo.txt, Bar.txt)


The NameNode’s default port is 8020, and its web UI runs on port 50070.

The default HDFS home directory is /user/<username>.

DataNodes:


The NameNode daemon must be running at all times. If the NameNode stops, the cluster becomes inaccessible.
The system administrator must ensure that the NameNode hardware is reliable.

Secondary NameNode:

A separate daemon known as the SecondaryNameNode takes care of some housekeeping tasks for the NameNode. The SecondaryNameNode is not a backup NameNode; it serves only as a backup for the NameNode’s metadata.

Although files are split into 64MB or 128MB blocks, if a file is smaller than this, the full 64MB or 128MB will not be used.
Blocks are stored as standard files on the DataNodes, in a set of directories specified in the Hadoop
configuration files.

Without the metadata on the NameNode, there is no way to access the files in HDFS. When a client application wants to read a file:

It communicates with the NameNode to determine which blocks make up the file and which DataNodes those blocks reside on.
It then communicates directly with the DataNodes to read the data.

Accessing HDFS:

Applications can read and write HDFS files directly via the Java API (a short sketch follows below).
Typically, files are created on the local file system and must be moved into HDFS.
Likewise, files stored in HDFS may need to be moved to a machine’s local file system.
Access to HDFS from the command line is achieved with the “hadoop fs” command.
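A minimal sketch of the Java API route mentioned above, assuming a client configured to reach the cluster; the file paths here are illustrative only:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsAccessExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up core-site.xml etc. from the classpath
        FileSystem fs = FileSystem.get(conf);       // handle to the configured file system (HDFS)

        // copy a local file into HDFS
        fs.copyFromLocalFile(new Path("/tmp/foo.txt"), new Path("/user/training/foo.txt"));

        // read the file back from HDFS and print it line by line
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path("/user/training/foo.txt"))));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
    }
}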

Local File System commands:

pwd : print the present working directory.

ls : to list the files and folders of a directory.

ls -l : to long-list all files and directories.
If an entry starts with ‘d’, it is a directory.

If an entry starts with ‘-’, it is a file.

ls -a : to list all files and directories, including hidden ones.

If an entry starts with ‘.’, it is a hidden file.

mkdir : to create a directory

syntax: mkdir <directoryname>

cd : to change the directory

cd .. : to go to the parent directory

cd ~ : to go to the home directory

cd / : to go to the root directory

creating files:

Files can be created in three ways:
1. cat
2. vi
3. touch

Cat:

cat can be used in three ways:
1. for creating a file
2. for appending text
3. for displaying the file content

$ cat > filename – to create a file
After creating the file we have to save and exit. To save and exit the file we use
“ctrl+d”. If we want to exit the file without saving we use “ctrl+c”.

$ cat >> filename – to append content to an existing file
$ cat filename – to view the content of the file

Vi:

The vi editor can be used in three ways:
1. for creating a file
2. for appending text or modifying a file
3. for displaying the file content

The vi editor works with three modes:

1. Esc mode (press ‘Esc’)
2. Insert mode (press ‘i’)
3. Colon mode (press ‘shift+;’)

By default the vi editor is in Esc mode. To insert text into the file we have to enter insert mode by pressing ‘i’. After entering text, to save and exit the file we must enter colon mode. There is no direct way of moving from insert mode to colon mode; it must go through Esc mode.

[Diagram: vi editor mode transitions]

Touch:

This command can be used in only one way: for creating a file. After creating the file
we can add text in three ways:
1. by using cat >>
2. by using the vi editor
3. by using a text editor (manual creation through a text editor)

rm : to remove a file
rm -r : to recursively remove all files and sub-directories from a specified directory.
rm -rf : to recursively and forcibly remove files and sub-directories from a specified directory.
mv : to move a file/directory from one location to another location
cp : to copy a file/directory from one location to another location

Hadoop distributed filesystem commands

– To copy the file foo.txt from the local file system to HDFS we can use either the “-put” or the “-copyFromLocal” command.
$ hadoop fs -copyFromLocal foo.txt foo

– To get a file from HDFS to the local file system we can use either “-get” or “-copyToLocal”.
$ hadoop fs -copyToLocal foo/foo.txt file

– To get a directory listing of the user’s home directory in HDFS

$ hadoop fs -ls

– To get a directory listing of the HDFS root directory.
$ hadoop fs -ls /

– To get the contents of the file /user/training/foo.txt in HDFS
$ hadoop fs -cat /user/training/foo.txt

– To create a directory in HDFS.
$ hadoop fs -mkdir <directoryname>

– To delete a directory from HDFS
$ hadoop fs -rmr <directory path>

– To remove a file from HDFS
$ hadoop fs -rm <file path>

– To copy files from one location to another location

$ hadoop fs -cp <src> <dest>

– To move files from one location to another location
$ hadoop fs -mv <src> <dest>

– To find out free disk space
$ hadoop fs -df

– To find out disk usage
$ hadoop fs -du <src>

– To empty the Trash
$ hadoop fs -expunge

– To create an empty file
$ hadoop fs -touchz <file path>

HDFS Daemons:


Hadoop Cluster


– The client consults the NameNode
– The client writes the block directly to one DataNode
– The DataNodes replicate the block
– The cycle repeats for the next block

 


Pipelined Write


Data Node reading files from HDFS


MapReduce

MapReduce is a massively parallel processing technique for processing data which is distributed across a commodity cluster. MapReduce is a method for distributing a task across multiple nodes.
Each node processes data stored locally on that node whenever possible.

Hadoop can run MapReduce programs written in various languages. Generally we write MapReduce programs in Java, because Hadoop itself is developed in Java. Support for other languages such as C++, Ruby, Python, etc. is provided through Hadoop Streaming.
MapReduce consists of two phases:

  1. Map    2. Reduce

Features of MapReduce:

  • Automatic parallelization and distribution.
  • Fault tolerance, with status and monitoring tools.
  • It provides a clean abstraction for programmers.
  • MapReduce programs are usually written in Java, but they can be written in other scripting languages using the Hadoop Streaming API.
  • MapReduce abstracts all the “housekeeping” away from the developer. The developer can
    concentrate simply on writing the Map and Reduce functions.
  • The JobTracker’s default port number is 8021, and its web UI runs on port 50030.

JobTracker


MapReduce jobs are controlled by a software daemon known as the JobTracker.

The JobTracker resides on a ‘master node’.

– Clients submit MapReduce jobs to the JobTracker.
– The JobTracker assigns Map and Reduce tasks to other nodes on the cluster.
– These nodes each run a software daemon known as the TaskTracker.
– The TaskTracker is responsible for actually instantiating the Map or Reduce tasks and reporting progress back to the JobTracker.
– If a task attempt fails, another will be started by the JobTracker.


Mapper:

Hadoop attempts to ensure that mappers run on nodes which hold their portion of the data locally, to avoid network traffic.

One mapper can work on one individual split of the input file.

The mapper reads data in the form of (key, value) pairs.

It outputs either zero or more (key, value) pairs.

map(input_key, input_value) ----------> list of (intermediate_key, intermediate_value) pairs

The Mapper may use or completely ignore the input key.

For example, a standard pattern is to read a file one line at a time. The key is the byte offset into the file at which the line starts.

The value is the contents of the line itself.

So typically the key is considered irrelevant.

If the mapper writes anything out, the output must be in the form of (key, value) pairs.
The Mapper’s output is called intermediate data, and it is stored on the local file system, not in HDFS.

Example Mapper: Uppercase Mapper

– Turn the input into uppercase letters

let map(k, v) = emit(k.toUpper(), v.toUpper())
('foo', 'bar') ----------> ('FOO', 'BAR')
('foo', 'other') ----------> ('FOO', 'OTHER')

Example Mapper: Explode Mapper

– Output each input character separately.

let map(k, v) = foreach char c in v: emit(k, c)

('foo', 'bar') ----------> ('foo', 'b'), ('foo', 'a'), ('foo', 'r')

('bar', 'other') ----------> ('bar', 'o'), ('bar', 't'), ('bar', 'h'), ('bar', 'e'), ('bar', 'r')

Example Mapper: Changing Keyspaces

The key output by the mapper does not need to be identical to the input key.
Here we output the word length as the key:

let map(k, v) = emit(v.length(), v)
('foo', 'bar') ----------> (3, 'bar')
('baz', 'other') ----------> (5, 'other')
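A hedged Java version of this ‘word length as key’ mapper, written against the old (org.apache.hadoop.mapred) API used later in this tutorial; the class name and the assumption that each input line contains a single word are illustrative:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LengthKeyMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, IntWritable, Text> {

    public void map(LongWritable key, Text value,
                    OutputCollector<IntWritable, Text> output, Reporter reporter)
            throws IOException {
        String word = value.toString().trim();
        // the input key (byte offset) is ignored; emit (length of the word, the word itself)
        output.collect(new IntWritable(word.length()), new Text(word));
    }
}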

A job is a ‘full program’

– A complete execution of Mappers and Reducers over a dataset
– A task is the execution of a single Mapper or Reducer over a slice of data
– A task attempt is a particular instance of an attempt to execute a task
– There will be at least as many task attempts as there are tasks
– If a task attempt fails, another will be started by the JobTracker (starting a duplicate attempt for a slow-running task is known as speculative execution, described later).

Reducer:

After the Map phase is over, all the intermediate values for a given intermediate key are combined together into a list.This list is given to a Reducer.

There may be a single Reducer, or multiple Reducers. This is specified as part of the job configuration.

All values associated with a particular intermediate key are guaranteed to go to the same Reducer.
The intermediate keys, and their value lists, are passed to the Reducer in sorted key order. This step is known as the ‘shuffle and sort’.

The Reducer outputs zero or more final (key, value) pairs, and these will be written to HDFS.
The Reducer usually emits a single (key, value) pair for each input key.
The Reducer’s output is final; it is stored in HDFS, and after that the intermediate data is deleted.

Data Processing: Reduce


Example Reducer: Sum Reducer

Add up all the values associated with each intermediate key

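The snippet originally shown here is not reproduced; in the same pseudo-notation as the Mapper examples above, a Sum Reducer could be sketched as follows (the sample values are made up):

let reduce(k, vals) =
    sum = 0
    foreach int v in vals: sum += v
    emit(k, sum)

('bar', [9, 3, -17, 44]) ----------> ('bar', 39)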

Example Reducer: Identity Reducer

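The original snippet is likewise missing; an Identity Reducer simply re-emits every value it receives, sketched in the same pseudo-notation:

let reduce(k, vals) = foreach v in vals: emit(k, v)

('foo', ['bar', 'baz']) ----------> ('foo', 'bar'), ('foo', 'baz')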

MapReduce Example: Word Count

Count the number of occurrences of each word in a large amount of input data
-This is the ‘hello world’ of MapReduce programming


MapReduce Flow Diagram:


MapReduce: Data Localization

– Whenever possible, Hadoop will attempt to ensure that a Map task on a node is working on a block of data stored locally on that node via HDFS. If this is not possible, the Map task will have to transfer the data across the network as it processes that data.

– Once the Map tasks have finished, data is then transferred across the network to the Reducers. Although the Reducers may run on the same physical machines as the Map tasks, there is no concept of data locality for the Reducers.

– All Mappers will, in general, have to communicate with all Reducers.

MapReduce Internal Concept:

– The client submits the job to the JobTracker.

– The JobTracker contacts the NameNode for the metadata related to the job’s input file or directory.

– Based on the information obtained from the NameNode, the JobTracker divides the input into logical splits called input splits; each input split is processed by one map task.

– The input split size is based on various parameters; if the job input is in the form of files, it
is based on the size of the files and where the files are stored.

– Once map tasks are assigned to TaskTrackers (each TaskTracker has a fixed number of map and
reduce slots for performing the tasks given by the JobTracker), the TaskTracker initiates a separate
JVM for each task.

– Based on the InputFormat, its respective RecordReader class divides the split into a number of
records, each record being one (k, v) pair.

– The RecordReader gives the records one by one to the map task. Based on the map logic written,
we may get zero or more (k, v) pairs for each input (k, v) pair given to the map task.

– After all map tasks are completed, the intermediate data (the intermediate output stored
temporarily on the local file system) is copied to the reduce tasks.

– As Hadoop tries to process data wherever it is stored, there is a data-locality concept
for map tasks; but a reduce task needs to copy data from all the map tasks, so there is no
data-locality concept for reduce tasks. The default number of reduce tasks in Hadoop is one; we can make it zero if there is no need for a reduce phase, or set a variable number of reducers based on the requirement (a driver-code snippet follows this list).

– Between the map and reduce phases there is a default sort and shuffle phase, so the data arriving at
the reducer is in sorted order; shuffling means the Hadoop framework sends all
common keys as a single (k, list of values associated with the same key).

– Once a reduce task is completed, the intermediate data is deleted, and the final output from the reducer is stored in HDFS.

– Based on the number of reducers, the same number of output files is generated in the output directory.
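As noted in the list above, the number of reduce tasks can be changed from the driver code. With the old API’s JobConf this is a single call; the snippet below is illustrative only:

conf.setNumReduceTasks(0);   // map-only job: the mapper output is written directly to HDFS
conf.setNumReduceTasks(4);   // four reducers, so four output files (part-00000 .. part-00003)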


What if data isn’t local?


MapReduce: Is Shuffle and Sort a Bottleneck?

– It appears that the shuffle and sort phase is a bottleneck.
– The reduce method in the Reducers cannot start until all Mappers have finished their tasks.
– In practice, Hadoop will start to transfer data from Mappers to Reducers as the Mappers finish their work. This mitigates against a huge amount of data transfer starting as soon as the last Mapper finishes.

MapReduce: Is a Slow Mapper a Bottleneck?

– It is possible for one Map task to run more slowly than the others, due to faulty hardware or just a very slow machine. It would appear that this would create a bottleneck.

– The reduce method in the Reducer cannot start until every Mapper has finished.

– Hadoop uses speculative execution to mitigate against this.

– If a Mapper appears to be running significantly more slowly than the others, a new instance of the Mapper will be started on another machine, operating on the same data.

– The results of the first Mapper to finish will be used. Hadoop will kill the Mapper which is still running.

HADOOP Configuration files: There are three important configuration files in HADOOP:
1. core-site.xml
2. hdfs-site.xml
3. mapred-site.xml
The hadoop-env.sh file is for placing the paths and classpaths of Java, Hadoop and other components. (An illustrative sketch of these files follows.)
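As a hedged sketch of what these files contain for a pseudo-distributed Hadoop 1.x setup (the host and port values here are assumptions, though they match the default ports mentioned elsewhere in this tutorial):

core-site.xml:
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:8020</value>
  </property>
</configuration>

hdfs-site.xml:
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

mapred-site.xml:
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
</configuration>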

HADOOP DataTypes:

In Hadoop all data types are Hadoop classes; these are called the Hadoop box classes:

IntWritable
LongWritable
DoubleWritable
Text etc…

HADOOP Input Formats: 

Based on how the data is stored in a file, we have several input formats available in HADOOP.
If the input data is in the form of files, FileInputFormat is the base class, and based on how the data
is stored in the file we choose the InputFormat:

1. TextInputFormat

2. KeyValueTextInputFormat

3. SequenceFileInputFormat

4. SequenceFileAsTextInputFormat

Installing a Hadoop Cluster

Cluster installation is usually performed by the Hadoop administrator, and is outside the
scope of this course.

A separate Hadoop administration course is specifically aimed at those responsible for maintaining Hadoop
clusters.

Three types of cluster configurations are available:

Local Job Runner (no HDFS or MapReduce daemons; only the local file system and Java)
Pseudo-Distributed (all five daemons running on a single machine)
Fully Distributed (separate machines for the different daemons)
– However, it’s very useful to understand how the component parts of the Hadoop cluster
work together
– Typically, a developer will configure their machine to run in pseudo-distributed mode
– This effectively creates a single-machine cluster

All five Hadoop daemons run on the same machine. This is very useful for testing code before it is deployed to the real cluster.

– The easiest way to download and install Hadoop, either for a full cluster or in pseudo-distributed mode, is by using Cloudera’s Distribution including Apache Hadoop (CDH)

Hadoop is comprised of five separate daemons
NameNode

– Holds the metadata for HDFS

Secondary NameNode

– Performs housekeeping functions for the NameNode

– Is not a backup or hot standby for the NameNode!

DataNode

– Stores actual HDFS data blocks

JobTracker

– Manages MapReduce jobs, distributes individual tasks to machines running the…

TaskTracker

– Instantiates and monitors individual Map and Reduce tasks

Each daemon runs in its own Java Virtual Machine (JVM).
No node on a real cluster will run all five daemons, although this is technically possible.
We can consider nodes to be in two different categories:

Master Nodes
– Run the NameNode, Secondary NameNode, JobTracker daemons
– Only one of each of these daemons runs on the cluster

Slave Nodes
– Run the DataNode and TaskTracker daemons
– A slave node will run both of these daemons

Basic Cluster Configuration:

 

[Diagram: basic cluster configuration]

– On very small clusters, the NameNode, JobTracker and Secondary NameNode can all
reside on a single machine

– It is typical to put them on separate machines as the cluster grows beyond 20-30 nodes

– Each dotted box in the above diagram represents a separate Java Virtual Machine (JVM)

Submitting a Job:

– When a client submits a job, its configuration information is packaged into an XML file.
– This file, along with the .jar file containing the actual program code, is handed to the
JobTracker.
– The JobTracker then parcels out individual tasks to TaskTracker nodes.
– When a TaskTracker receives a request to run a task, it instantiates a separate JVM for
that task.
– TaskTracker nodes can be configured to run multiple tasks at the same time if the node
has enough processing power and memory .
– The intermediate data is held on the TaskTracker’s local disk.
– As Reducers start up, the intermediate data is distributed across the network to the
Reducers.
– Reducers write their final output to HDFS.
– Once the job has completed, the TaskTracker can delete the intermediate data from its
local disk.

Note: The intermediate data will not be deleted until the entire job completes.

A Sample MapReduce Program
Job Name: WordCount

Job Description: Counting the number of occurrences of each word in an input (the input may be a file, a list of files, a directory, a list of directories, etc.)

– We will examine the code for WordCount. This will demonstrate the Hadoop API.
– We will also investigate Hadoop Streaming, which allows you to write MapReduce programs in (virtually) any language.
– You will always create at least a Driver, a Mapper, and a Reducer.

The MapReduce Flow:


– Here the HDFS file is split into a number of input splits. Each input split is given to one mapper.

– Just as the Map interface in the Java Collections Framework deals with (key, value) pairs, Hadoop’s mappers and reducers also deal with (key, value) pairs.

– So each mapper gets (key, value) pairs from its corresponding input split.

– Here the input is a file, but map() accepts data only in the form of (key, value) pairs. To present
the data in a split as (key, value) pairs, every kind of input has an InputFormat, and for each
InputFormat there is a RecordReader. The RecordReader divides the split into a number of records,
each record being one (key, value) pair. The RecordReader submits one record at a time to the mapper;
once the mapper has processed that record, it submits the next (key, value) pair (the next record).

– Depending on the logic we write in the Mapper code, each mapper input (key, value) pair is processed and produces the mapper output (key, value) pairs.

– This mapper output is called the intermediate data.

– This intermediate data is shuffled and sorted and given as input (key, value) pairs to the reducer.

Shuffle and Sort:

Shuffling is a phase applied to the intermediate data, in which the individual values associated with a single identical key are grouped into one entity.

Sorting is also a phase applied to the intermediate data, which decides its order based on the key in each (key, value) pair.

Our MapReduce Program: WordCount
To investigate the API, we will dissect the WordCount program shown earlier. It consists of three portions:

The driver code: code that runs on the client to configure and submit the job.

The Mapper code: the code that implements the Mapper.

The Reducer code: the code that implements the Reducer.

Driver Code: In the driver code we need to specify the following things:
– The input for the job (InputFormat and HDFS location of the input)
– Where to store the final output
– The JobConf class for specifying job-specific configurations
– The Mapper class name
– The Map output key and value types
– The Reducer class name
– The Reduce output key and value types
– The JobClient to submit the job

Before we look at the code, we need to cover some basic Hadoop API concepts.

Getting Data to the Mapper:
The data passed to the Mapper is specified by an InputFormat.
– Specified in the driver code
– Defines the location of the input data, a file or directory, for example
– Determines how to split the input data into input splits
– Each Mapper deals with a single input split
– The InputFormat is a factory for RecordReader objects, which extract (key, value) pairs from the input file.

Some Standard InputFormats

FileInputFormat
– This is the base class used for all file-based InputFormats

TextInputFormat
– This is the default InputFormat class for any text-related file.
– The key is the byte offset within the file of that line
– Treats each \n-terminated line of the file as a value

KeyValueTextInputFormat
– Maps \n-terminated lines as ‘key SEP value’
– By default, the separator is a tab

SequenceFileInputFormat
– A binary file of (key, value) pairs with some additional metadata

SequenceFileAsTextInputFormat
– Similar to SequenceFileInputFormat, but maps (key.toString(), value.toString())

Keys and Values are Objects
– Keys and values in Hadoop are objects
– Values are objects which implement the Writable interface
– Keys are objects which implement the WritableComparable interface

In Java, for every primitive data type we have a corresponding wrapper class. For example,
int ——–> Integer
float ——–> Float
double ——–> Double

In the same way, Hadoop defines its own ‘box classes’ for all data types, as shown below:
– IntWritable for int
– LongWritable for long
– FloatWritable for float
– DoubleWritable for double
– Text for String
– etc.
What is Writable?

– Writable is an interface which makes serialization quick and easy for Hadoop.
– Any value’s type must implement the Writable interface.

What is WritableComparable?
– A WritableComparable is an interface which is Writable as well as Comparable.
– Two WritableComparables can be compared against each other to determine their order.
– Keys must be WritableComparables because they are passed to the Reducer in sorted order.

Note:
Despite their names, all Hadoop box classes implement both the Writable and
WritableComparable interfaces. For example, IntWritable is actually a WritableComparable.
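As a hedged sketch (the class and field names are invented for illustration), a custom key type has to implement the serialization and comparison methods described above:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class WordPair implements WritableComparable<WordPair> {
    private String word = "";
    private int count = 0;

    public void write(DataOutput out) throws IOException {    // serialization
        out.writeUTF(word);
        out.writeInt(count);
    }

    public void readFields(DataInput in) throws IOException { // deserialization
        word = in.readUTF();
        count = in.readInt();
    }

    public int compareTo(WordPair other) {                    // ordering used during shuffle and sort
        return word.compareTo(other.word);
    }

    // in practice you would also override hashCode() so that HashPartitioner
    // distributes keys evenly across reducers
}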

Hello World Job

WordCount Job
Step 1: Prepare some input for the WordCount job

Create a file:
$ cat > file
(type some text, then press ctrl+d to save)

Step 2:
Put this file into HDFS

$ hadoop fs -copyFromLocal file File

Step 3:
Write the MapReduce application programs to process the above file

The Driver Code:

Introduction:
– The driver code runs on the client machine
– It configures the job, then submits it to the cluster

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("please give input and output directories");
      return -1;
    }
    //JobConf class is used for job-specific configurations.
    JobConf conf = new JobConf(WordCount.class);
    //to set the name for the job
    conf.setJobName(this.getClass().getName());
    //to set the input path
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    //to set the output path
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    //to set the Mapper class
    conf.setMapperClass(WordMapper.class);
    //to set the Reducer class
    conf.setReducerClass(SumReducer.class);
    //to set the Mapper output key type
    conf.setMapOutputKeyClass(Text.class);
    //to set the Mapper output value type
    conf.setMapOutputValueClass(IntWritable.class);
    //to set the output key type from the reducer
    conf.setOutputKeyClass(Text.class);
    //to set the output value type from the reducer
    conf.setOutputValueClass(IntWritable.class);
    //JobClient class for submitting the job to the JobTracker
    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    //ToolRunner takes the Tool implementation (WordCount) and parses the command-line
    //arguments along with the input and output directories.
    int exitCode = ToolRunner.run(new WordCount(), args);
    System.exit(exitCode);
  }
}

Explanation of the Driver Code:

The Driver: Import Statements

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;


The Job’s Invocation

public int run(String[] args) throws Exception {
  if (args.length != 2) {
    System.out.printf("please give input and output directories");
    return -1;
  }

Creating a New JobConf Object

The JobConf class allows you to set configuration options for your MapReduce job
– The classes to be used for your Mapper and Reducer
– The input and output directories
– Many other options
Any options not explicitly set in your driver code will be read from your Hadoop configuration files
– Usually located in /etc/hadoop/conf
Any options not specified in your configuration files will receive Hadoop’s default values

Naming The Job

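The snippet originally shown here is not reproduced; in the driver code above, the job is named with:

conf.setJobName(this.getClass().getName());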

Specifying the InputFormat

The default InputFormat is TextInputFormat; it will be used unless you specify the InputFormat class explicitly.

To use an InputFormat other than the default, use e.g.

conf.setInputFormat(KeyValueTextInputFormat.class)

or

conf.setInputFormat(SequenceFileInputFormat.class)

or

conf.setInputFormat(SequenceFileAsTextInputFormat.class)

Determining Which Files To Read

By default, FileInputFormat.setInputPaths() will read all files from a specified directory and send them to the Mappers
– Exceptions: items whose names begin with a period (.) or an underscore (_)
– Globs can be specified to restrict the input
– For example, /2010/*/01/*
Alternatively, FileInputFormat.addInputPath() can be called multiple times, specifying a
single file or directory each time

Specifying Final Output with OutputFormat

FileOutputFormat.setOutputPath() specifies the directory to which the Reducers will write their final output

The driver can also specify the format of the output data
– The default is a plain text file

– It could be explicitly written as

conf.setOutputFormat(TextOutputFormat.class);

Specify the Classes for Mapper and Reducer

conf.setMapperClass(WordMapper.class);

conf.setReducerClass(SumReducer.class);


Specify the Final Output Data Types

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);


There are two ways to run your MapReduce job:

– JobClient.runJob(conf)

Blocks (waits for the job to complete before continuing)

– JobClient.submitJob(conf)

Does not block (the driver code continues as the job is running)

The JobClient determines the proper division of the input data into InputSplits.

The JobClient then sends the job information to the JobTracker daemon on the cluster.

We didn’t specify any InputFormat, so the default, TextInputFormat, is used.

We didn’t specify the number of Reducers, so the default of one reducer is used. Based on the number of reducers,

we get that number of output files.

We didn’t specify any OutputFormat, so the default, TextOutputFormat, is used.

The default Mapper is IdentityMapper.
The default Reducer is IdentityReducer.
The default Partitioner is HashPartitioner, etc.

Note: Revise the driver code once again so that you understand it clearly.

The Mapper Code:

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;

public class WordMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

  //implementing the map() method of the Mapper interface
  public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    //converting the value from Text type to String type
    String s = value.toString();
    //splitting the line into words and emitting (word, 1) for each word
    for (String word : s.split("\\W+")) {
      if (word.length() > 0) {
        output.collect(new Text(word), new IntWritable(1));
      }
    }
  }
}

The Reducer: Complete Code
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.Iterator;

public class SumReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

  //implementing the reduce() method of the Reducer interface
  public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    int wordCount = 0;
    //retrieving the values from the Iterator
    while (values.hasNext()) {
      IntWritable value = values.next();
      //converting the value from IntWritable type to int type and adding it to the running total
      wordCount += value.get();
    }
    output.collect(key, new IntWritable(wordCount));
  }
}

Step 4: Compile the above programs: $ javac -classpath $HADOOP_HOME/hadoop-core.jar *.java
Step 5: Create a jar file from all the .class files generated in the above step: $ jar cvf wc.jar *.class

Step 6: Run the jar file created above on the file stored in HDFS: $ hadoop jar wc.jar WordCount file File
Step 7: To see the content of the output file: $ hadoop fs -cat File/part-00000


Example Flow of Wordcount program:

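The original diagram is not reproduced; as a small worked illustration (the input line is made up), the WordCount flow looks like this:

Input split (one line):    "the cat sat on the mat"
Map output:                (the,1) (cat,1) (sat,1) (on,1) (the,1) (mat,1)
After shuffle and sort:    (cat,[1]) (mat,[1]) (on,[1]) (sat,[1]) (the,[1,1])
Reduce output (in HDFS):   (cat,1) (mat,1) (on,1) (sat,1) (the,2)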

What Is The New API?
When Hadoop 0.20 was released, a ‘New API’ was introduced
– It is designed to make the API easier to evolve in the future
– It favors abstract classes over interfaces

The ‘Old API’ was deprecated

However, the New API is still not absolutely feature-complete in Hadoop 0.20.x
The Old API should not have been deprecated as quickly as it was

Most developers still use the Old API

All the code examples in this course use the Old API as well as the New API.

Difference between old API and new API

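The original comparison table is not reproduced; the main differences, as they appear in the two versions of the WordCount code in this tutorial, can be summarized as follows:

– Old API classes live in the org.apache.hadoop.mapred package; New API classes live in org.apache.hadoop.mapreduce.
– In the Old API, Mapper and Reducer are interfaces (implemented by extending MapReduceBase); in the New API they are abstract classes that are extended directly.
– In the Old API, job configuration and submission use JobConf and JobClient.runJob()/submitJob(); in the New API they use Job and job.waitForCompletion().
– In the Old API, map() and reduce() receive an OutputCollector and a Reporter; in the New API both are replaced by a single Context object used for emitting output and reporting status.
– In the Old API, reduce() receives its values as an Iterator; in the New API it receives them as an Iterable.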

New API For Word Count
Driver Code:

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewWordCount extends Configured {

  public static void main(String[] args) throws Exception {

    if (args.length != 2) {
      System.out.printf("please give input and output directories");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(NewWordCount.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(NewWordMapper.class);
    job.setReducerClass(NewWordReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : -1);

  }

}

Mapper Code:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Mapper.Context;

public class NewWordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String s = value.toString();
    //splitting the line on spaces and emitting (word, 1) for each word
    for (String word : s.split(" ")) {
      if (word.length() > 0) {
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }
}

Reducer Code:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.Reducer.Context;

public class NewWordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int count = 0;
    //summing up all the counts associated with this key
    for (IntWritable value : values) {
      count += value.get();
    }
    context.write(key, new IntWritable(count));
  }
}

The Streaming API: Motivation

Many organizations have developers skilled in languages other than Java, such as

-Ruby
-Python
-Perl

The Streaming API allows developers to use any language they wish to write Mappers and Reducers
-As long as the language can read from standard input and write to standard output.

The Streaming API:

Advantages of the Streaming API:
-No need for non-Java coders to learn Java
-Fast development time
-Ability to use existing code libraries

How Streaming Works

To implement streaming, write separate Mapper and Reducer programs in the language of your choice
-They will receive input via stdin
-They should write their output to stdout
If TextInputFormat (the default) is used, the streaming Mapper just receives each line from the file on stdin
– No key is passed

The streaming Mapper’s and streaming Reducer’s output should be sent to stdout as key (tab)
value (newline)


Streaming Reducers: Caution

Recall that in Java, all the values associated with a key are passed to the Reducer as an Iterator

Using Hadoop Streaming, the Reducer receives its input as (key,value) pairs
-One per line of standard input

Your code will have to keep track of the key so that it can detect when values from a new key
start appearing.

Launching a Streaming Job
To launch a Streaming job, use e.g.,:

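The original example is not reproduced; a typical invocation looks like the following (the location of the streaming jar varies by distribution, and the mapper and reducer script names here are placeholders):

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
      -input myInputDir \
      -output myOutputDir \
      -mapper myMapper.py \
      -reducer myReducer.py \
      -file myMapper.py \
      -file myReducer.py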

Many other command-line options are available
-See the documentation for full details
Note that system commands can be used as a Streaming Mapper or Reducer
-For example: awk, grep, sed, or wc

 
