Texera / texera

Hadoop evaluation #632

Closed AbyssV closed 5 years ago

AbyssV commented 6 years ago

Using Apache Hadoop to store and process data in a distributed system.

AbyssV commented 6 years ago

An Overview of Apache Hadoop 2.7

Apache Hadoop is a collection of open-source software utilities that facilitate using a network of many computers to solve problems involving massive amounts of data and computation. It provides a software framework for distributed storage and processing of big data using the MapReduce programming model (Wikipedia).

Note: We are using Hadoop 2.7.6 on both a local machine and an AWS Ubuntu 14.04 cluster (four instances: one NameNode and three DataNodes). At the time of writing, the latest version is Hadoop 3.0.3.

Beyond Batch

For all its strengths, MapReduce is fundamentally a batch processing system, and is not suitable for interactive analysis. You can’t run a query and get results back in a few seconds or less. Queries typically take minutes or more, so it’s best for offline use, where there isn’t a human sitting in the processing loop waiting for results. However, since its original incarnation, Hadoop has evolved beyond batch processing.

Indeed, the term “Hadoop” is sometimes used to refer to a larger ecosystem of projects, not just HDFS and MapReduce, that fall under the umbrella of infrastructure for distributed computing and large-scale data processing. Many of these are hosted by the Apache Software Foundation, which provides support for a community of open source software projects, including the original HTTP Server from which it gets its name.

The first component to provide online access was HBase, a key-value store that uses HDFS for its underlying storage. HBase provides both online read/write access of individual rows and batch operations for reading and writing data in bulk, making it a good solution for building applications on.

The real enabler for new processing models in Hadoop was the introduction of YARN (which stands for Yet Another Resource Negotiator) in Hadoop 2. YARN is a cluster resource management system, which allows any distributed program (not just MapReduce) to run on data in a Hadoop cluster.

Comparison with Other Systems

Hadoop isn’t the first distributed system for data storage and analysis, but it has some unique properties that set it apart from other systems that may seem similar. Here we look at some of them.

Relational Database Management Systems

If the data access pattern is dominated by seeks, it will take longer to read or write large portions of the dataset than streaming through it, which operates at the transfer rate. On the other hand, for updating a small proportion of records in a database, a traditional B-Tree (the data structure used in relational databases, which is limited by the rate at which it can perform seeks) works well. For updating the majority of a database, a B-Tree is less efficient than MapReduce, which uses Sort/Merge to rebuild the database.

In many ways, MapReduce can be seen as a complement to a Relational Database Management System (RDBMS). (The differences between the two systems are shown in Table 1-1.) MapReduce is a good fit for problems that need to analyze the whole dataset in a batch fashion, particularly for ad hoc analysis. An RDBMS is good for point queries or updates, where the dataset has been indexed to deliver low-latency retrieval and update times of a relatively small amount of data. MapReduce suits applications where the data is written once and read many times, whereas a relational database is good for datasets that are continually updated.
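The seek-versus-streaming trade-off can be made concrete with a back-of-the-envelope calculation. The sketch below is illustrative only: the seek time, transfer rate, record size, and dataset size are all assumed values chosen for the example, not measurements from this evaluation.

```python
# Back-of-the-envelope model; every constant below is an assumption.
SEEK_TIME_S = 0.010          # assumed average seek time: 10 ms
TRANSFER_RATE_BPS = 100e6    # assumed sustained transfer rate: 100 MB/s
RECORD_SIZE_B = 100          # assumed record size: 100 bytes
DATASET_SIZE_B = 1e12        # assumed dataset size: 1 TB


def seek_update_seconds(fraction):
    """Time to update `fraction` of the records, paying one seek per record."""
    records = DATASET_SIZE_B / RECORD_SIZE_B
    per_record = SEEK_TIME_S + RECORD_SIZE_B / TRANSFER_RATE_BPS
    return records * fraction * per_record


def streaming_rewrite_seconds():
    """Time to read and rewrite the whole dataset sequentially."""
    return 2 * DATASET_SIZE_B / TRANSFER_RATE_BPS


if __name__ == "__main__":
    for fraction in (0.0001, 0.01):
        hours = seek_update_seconds(fraction) / 3600
        print(f"seek-based update of {fraction:.2%} of records: {hours:.1f} h")
    print(f"streaming rewrite of everything: {streaming_rewrite_seconds() / 3600:.1f} h")
```

Under these assumed numbers, updating 0.01% of the records by seeking takes roughly 2.8 hours, while a full streaming rewrite takes roughly 5.6 hours; at 1% of the records the seek-based approach already needs hundreds of hours.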

However, the differences between relational databases and Hadoop systems are blurring. Relational databases have started incorporating some of the ideas from Hadoop, and from the other direction, Hadoop systems such as Hive are becoming more interactive (by moving away from MapReduce) and adding features like indexes and transactions that make them look more and more like traditional RDBMSs.

Another difference between Hadoop and an RDBMS is the amount of structure in the datasets on which they operate. Structured data is organized into entities that have a defined format, such as XML documents or database tables that conform to a particular predefined schema. This is the realm of the RDBMS. Semi-structured data, on the other hand, is looser, and though there may be a schema, it is often ignored, so it may be used only as a guide to the structure of the data: for example, a spreadsheet, in which the structure is the grid of cells, although the cells themselves may hold any form of data. Unstructured data does not have any particular internal structure: for example, plain text or image data. Hadoop works well on unstructured or semi-structured data because it is designed to interpret the data at processing time (so-called schema-on-read). This provides flexibility and avoids the costly data loading phase of an RDBMS, since in Hadoop it is just a file copy.

Relational data is often normalized to retain its integrity and remove redundancy. Normalization poses problems for Hadoop processing because it makes reading a record a nonlocal operation, and one of the central assumptions that Hadoop makes is that it is possible to perform (high-speed) streaming reads and writes. A web server log is a good example of a set of records that is not normalized (for example, the client hostnames are specified in full each time, even though the same client may appear many times), and this is one reason that logfiles of all kinds are particularly well suited to analysis with Hadoop. Note that Hadoop can perform joins; it’s just that they are not used as much as in the relational world.

MapReduce (and the other processing models in Hadoop) scales linearly with the size of the data. Data is partitioned, and the functional primitives (like map and reduce) can work in parallel on separate partitions. This means that if you double the size of the input data, a job will run twice as slowly. But if you also double the size of the cluster, a job will run as fast as the original one. This is not generally true of SQL queries.

A Brief History of Apache Hadoop

Hadoop was created by Doug Cutting, the creator of Apache Lucene, the widely used text search library. Hadoop has its origins in Apache Nutch, an open source web search engine, itself a part of the Lucene project. Building a web search engine from scratch was an ambitious goal, for not only is the software required to crawl and index websites complex to write, but it is also a challenge to run without a dedicated operations team, since there are so many moving parts. It’s expensive, too: Mike Cafarella and Doug Cutting estimated a system supporting a one billion-page index would cost around $500,000 in hardware, with a monthly running cost of $30,000. Nevertheless, they believed it was a worthy goal, as it would open up and ultimately democratize search engine algorithms. Nutch was started in 2002, and a working crawler and search system quickly emerged. However, its creators realized that their architecture wouldn’t scale to the billions of pages on the Web. Help was at hand with the publication of a paper in 2003 that described the architecture of Google’s distributed filesystem, called GFS, which was being used in production at Google. GFS, or something like it, would solve their storage needs for the very large files generated as a part of the web crawl and indexing process. In particular, GFS would free up time being spent on administrative tasks such as managing storage nodes. In 2004, Nutch’s developers set about writing an open source implementation, the Nutch Distributed Filesystem (NDFS). In 2004, Google published the paper that introduced MapReduce to the world. Early in 2005, the Nutch developers had a working MapReduce implementation in Nutch, and by the middle of that year all the major Nutch algorithms had been ported to run using MapReduce and NDFS. NDFS and the MapReduce implementation in Nutch were applicable beyond the realm of search, and in February 2006 they moved out of Nutch to form an independent subproject of Lucene called Hadoop. 
At around the same time, Doug Cutting joined Yahoo!, which provided a dedicated team and the resources to turn Hadoop into a system that ran at web scale. This was demonstrated in February 2008 when Yahoo! announced that its production search index was being generated by a 10,000-core Hadoop cluster. In January 2008, Hadoop was made its own top-level project at Apache, confirming its success and its diverse, active community. By this time, Hadoop was being used by many other companies besides Yahoo!, such as Last.fm, Facebook, and the New York Times. Today, Hadoop is widely used in mainstream enterprises. Hadoop’s role as a general purpose storage and analysis platform for big data has been recognized by the industry, and this fact is reflected in the number of products that use or incorporate Hadoop in some way. Commercial Hadoop support is available from large, established enterprise vendors, including EMC, IBM, Microsoft, and Oracle, as well as from specialist Hadoop companies such as Cloudera, Hortonworks, and MapR.

FYI

Google paper about the MapReduce model: Jeffrey Dean and Sanjay Ghemawat, “MapReduce: Simplified Data Processing on Large Clusters,” December 2004

Apache Hadoop 2.7.2 Documentation

AbyssV commented 6 years ago

An Overview of Hadoop Architecture

MapReduce

Overview

Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner. A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them, and re-executing failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

The MapReduce framework consists of a single master ResourceManager, one slave NodeManager per cluster-node, and one MRAppMaster per application (see YARN Architecture Guide). Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration. The Hadoop job client then submits the job (jar/executable etc.) and configuration to the ResourceManager, which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, and providing status and diagnostic information to the job-client.

Although the Hadoop framework is implemented in Java™, MapReduce applications need not be written in Java.

(A WordCount example with MapReduce function analysis will be presented in a later section.)

Workflow

WordCount is a simple application that counts the number of occurrences of each word in a given input set.

Inputs and Outputs

The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types. The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.

Input and output types of a MapReduce job:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
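To make the <k1, v1> -> map -> combine -> reduce -> <k3, v3> flow concrete, here is a minimal single-process sketch of WordCount in plain Python. It is not Hadoop code and does not use the Hadoop API; the function names (map_phase, combine_phase, reduce_phase, word_count) are made up for illustration, and each input line stands in for one input split.

```python
from collections import defaultdict
from itertools import groupby
from operator import itemgetter


def map_phase(offset, line):
    """map: <k1, v1> = <byte offset, line> -> list of <k2, v2> = <word, 1>."""
    return [(word, 1) for word in line.split()]


def combine_phase(pairs):
    """combine: local pre-aggregation of one mapper's output, <k2, v2> -> <k2, v2>."""
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return list(totals.items())


def reduce_phase(word, counts):
    """reduce: <k2, [v2, ...]> -> <k3, v3> = <word, total>."""
    return word, sum(counts)


def word_count(lines):
    # Each line plays the role of one input split handled by one mapper.
    mapped = []
    offset = 0
    for line in lines:
        mapped.extend(combine_phase(map_phase(offset, line)))
        offset += len(line) + 1
    # Stand-in for the framework's sort: order map output by key, then group.
    mapped.sort(key=itemgetter(0))
    return [
        reduce_phase(word, [count for _, count in group])
        for word, group in groupby(mapped, key=itemgetter(0))
    ]


if __name__ == "__main__":
    print(word_count(["Hello World Bye World", "Hello Hadoop Goodbye Hadoop"]))
    # [('Bye', 1), ('Goodbye', 1), ('Hadoop', 2), ('Hello', 2), ('World', 2)]
```

The sort between the combine and reduce steps is what lets each reduce call see all values for one key together, which is why key classes must be comparable.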

HDFS

Overview

HDFS is the primary distributed storage used by Hadoop applications. A HDFS cluster primarily consists of a NameNode that manages the file system metadata and DataNodes that store the actual data. The HDFS architecture diagram depicts basic interactions among NameNode, the DataNodes, and the clients. Clients contact NameNode for file metadata or file modifications and perform actual file I/O directly with the DataNodes. The following are some of the salient features that could be of interest to many users.

HDFS architecture

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant. HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for applications that have large data sets. HDFS relaxes a few POSIX requirements to enable streaming access to file system data. HDFS was originally built as infrastructure for the Apache Nutch web search engine project. HDFS is part of the Apache Hadoop Core project. The project URL is [http://hadoop.apache.org]

NameNode and DataNodes

HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.

The NameNode and DataNode are pieces of software designed to run on commodity machines. These machines typically run a GNU/Linux operating system (OS). HDFS is built using the Java language; any machine that supports Java can run the NameNode or the DataNode software. Usage of the highly portable Java language means that HDFS can be deployed on a wide range of machines. A typical deployment has a dedicated machine that runs only the NameNode software. Each of the other machines in the cluster runs one instance of the DataNode software. The architecture does not preclude running multiple DataNodes on the same machine but in a real deployment that is rarely the case. The existence of a single NameNode in a cluster greatly simplifies the architecture of the system. The NameNode is the arbitrator and repository for all HDFS metadata. The system is designed in such a way that user data never flows through the NameNode.

The File System Namespace

HDFS supports a traditional hierarchical file organization. A user or an application can create directories and store files inside these directories. The file system namespace hierarchy is similar to most other existing file systems; one can create and remove files, move a file from one directory to another, or rename a file. HDFS in Hadoop 2.x supports user quotas and access permissions. HDFS does not support hard links or soft links. However, the HDFS architecture does not preclude implementing these features. The NameNode maintains the file system namespace. Any change to the file system namespace or its properties is recorded by the NameNode. An application can specify the number of replicas of a file that should be maintained by HDFS. The number of copies of a file is called the replication factor of that file. This information is stored by the NameNode.

Data Replication

HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file. An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and can be changed later. Files in HDFS are write-once and have strictly one writer at any time. The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode.
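The block and replica bookkeeping described above can be sketched in a few lines of Python. This is a toy model, not HDFS code: split_into_blocks and place_replicas are hypothetical helper names, and the round-robin placement is a deliberate simplification (real HDFS uses a rack-aware placement policy decided by the NameNode).

```python
def split_into_blocks(file_size, block_size):
    """Return block lengths: all blocks are equal except possibly the last."""
    if file_size < 0 or block_size <= 0:
        raise ValueError("file_size must be >= 0 and block_size > 0")
    full, remainder = divmod(file_size, block_size)
    return [block_size] * full + ([remainder] if remainder else [])


def place_replicas(num_blocks, datanodes, replication):
    """Toy round-robin placement of each block's replicas on distinct DataNodes."""
    if replication > len(datanodes):
        raise ValueError("replication factor exceeds number of DataNodes")
    return {
        block_id: [datanodes[(block_id + r) % len(datanodes)] for r in range(replication)]
        for block_id in range(num_blocks)
    }


if __name__ == "__main__":
    # Sizes in MB for readability; the 128 MB block size is an assumed setting.
    blocks = split_into_blocks(300, 128)
    print(blocks)  # [128, 128, 44]
    print(place_replicas(len(blocks), ["datanode1", "datanode2", "datanode3"], 2))
```

A 300 MB file with a 128 MB block size yields two full blocks and one 44 MB block; with a replication factor of 2, each block is listed on two different DataNodes.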

The Persistence of File System Metadata

The HDFS namespace is stored by the NameNode. The NameNode uses a transaction log called the EditLog to persistently record every change that occurs to file system metadata. For example, creating a new file in HDFS causes the NameNode to insert a record into the EditLog indicating this. Similarly, changing the replication factor of a file causes a new record to be inserted into the EditLog. The NameNode uses a file in its local host OS file system to store the EditLog. The entire file system namespace, including the mapping of blocks to files and file system properties, is stored in a file called the FsImage. The FsImage is stored as a file in the NameNode’s local file system too.

The NameNode keeps an image of the entire file system namespace and file Blockmap in memory. This key metadata item is designed to be compact, such that a NameNode with 4 GB of RAM is plenty to support a huge number of files and directories. When the NameNode starts up, it reads the FsImage and EditLog from disk, applies all the transactions from the EditLog to the in-memory representation of the FsImage, and flushes out this new version into a new FsImage on disk. It can then truncate the old EditLog because its transactions have been applied to the persistent FsImage. This process is called a checkpoint. The NameNode itself performs a checkpoint when it starts up; in Hadoop 2.x, periodic checkpoints are normally produced by a Secondary NameNode (or by the Standby NameNode in an HA deployment).

The DataNode stores HDFS data in files in its local file system. The DataNode has no knowledge about HDFS files. It stores each block of HDFS data in a separate file in its local file system. The DataNode does not create all files in the same directory. Instead, it uses a heuristic to determine the optimal number of files per directory and creates subdirectories appropriately. It is not optimal to create all local files in the same directory because the local file system might not be able to efficiently support a huge number of files in a single directory. When a DataNode starts up, it scans through its local file system, generates a list of all HDFS data blocks that correspond to each of these local files and sends this report to the NameNode: this is the Blockreport.
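The EditLog/FsImage checkpoint cycle can be illustrated with a small toy model. Everything here is an assumption made for the sketch: ToyNameNode is a hypothetical class, and JSON files stand in for the real (binary) FsImage and EditLog formats.

```python
import json
import os
import tempfile


class ToyNameNode:
    """Toy model only: JSON files stand in for the real FsImage and EditLog."""

    def __init__(self, directory):
        self.fsimage_path = os.path.join(directory, "fsimage.json")
        self.editlog_path = os.path.join(directory, "editlog.jsonl")
        self.namespace = {}  # in-memory image: path -> replication factor
        self._startup_checkpoint()

    def _apply(self, op):
        if op["type"] in ("create", "set_replication"):
            self.namespace[op["path"]] = op["replication"]

    def _log(self, op):
        # Persist the change to the EditLog, then apply it in memory.
        with open(self.editlog_path, "a") as log:
            log.write(json.dumps(op) + "\n")
        self._apply(op)

    def create(self, path, replication=3):
        self._log({"type": "create", "path": path, "replication": replication})

    def set_replication(self, path, replication):
        self._log({"type": "set_replication", "path": path, "replication": replication})

    def _startup_checkpoint(self):
        # 1. load the last FsImage, 2. replay the EditLog,
        # 3. write a new FsImage, 4. truncate the EditLog.
        if os.path.exists(self.fsimage_path):
            with open(self.fsimage_path) as image:
                self.namespace = json.load(image)
        if os.path.exists(self.editlog_path):
            with open(self.editlog_path) as log:
                for line in log:
                    self._apply(json.loads(line))
        with open(self.fsimage_path, "w") as image:
            json.dump(self.namespace, image)
        open(self.editlog_path, "w").close()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as directory:
        namenode = ToyNameNode(directory)
        namenode.create("/logs/a.txt")
        namenode.set_replication("/logs/a.txt", 2)
        restarted = ToyNameNode(directory)  # simulates a NameNode restart
        print(restarted.namespace)
```

After the simulated restart, the namespace is rebuilt from the old image plus the logged edits, and the EditLog is empty again.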

HDFS Commands

Check Apache Hadoop 2.7.2 documentation for more HDFS commands: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html

YARN

Overview

The new architecture, introduced in hadoop-0.23, divides the two major functions of the JobTracker (resource management and job life-cycle management) into separate components. The new ResourceManager manages the global assignment of compute resources to applications and the per-application ApplicationMaster manages the application’s scheduling and coordination. An application is either a single job in the sense of classic MapReduce jobs or a DAG of such jobs. The ResourceManager and per-machine NodeManager daemon, which manages the user processes on that machine, form the computation fabric. The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.

YARN architecture

The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling/monitoring into separate daemons. The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job or a DAG of jobs. The ResourceManager and the NodeManager form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler. The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.

The ResourceManager has two main components: Scheduler and ApplicationsManager.

The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks either due to application failure or hardware failures. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so based on the abstract notion of a resource Container which incorporates elements such as memory, cpu, disk, network etc. The Scheduler has a pluggable policy which is responsible for partitioning the cluster resources among the various queues, applications etc. The current schedulers such as the CapacityScheduler and the FairScheduler are examples of plug-ins.

The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster, and providing the service for restarting the ApplicationMaster container on failure. The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.

MapReduce in hadoop-2.x maintains API compatibility with the previous stable release (hadoop-1.x). This means that all MapReduce jobs should still run unchanged on top of YARN with just a recompile.
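The division of labor between ResourceManager, NodeManager, and ApplicationMaster can be sketched as a toy model. This is not the YARN API: the class and method names below are hypothetical, the scheduler is a simple first-fit with no queues, and a container is reduced to a (memory, vcores) request.

```python
from dataclasses import dataclass, field


@dataclass
class NodeManager:
    name: str
    memory_mb: int
    vcores: int
    containers: list = field(default_factory=list)

    def can_fit(self, memory_mb, vcores):
        return self.memory_mb >= memory_mb and self.vcores >= vcores

    def launch(self, app_id, memory_mb, vcores):
        # Reserve the resources and record the running container.
        self.memory_mb -= memory_mb
        self.vcores -= vcores
        self.containers.append((app_id, memory_mb, vcores))


class ResourceManager:
    """Toy scheduler: first-fit over nodes, no queues, no task monitoring."""

    def __init__(self, nodes):
        self.nodes = nodes

    def allocate(self, app_id, memory_mb, vcores):
        for node in self.nodes:
            if node.can_fit(memory_mb, vcores):
                node.launch(app_id, memory_mb, vcores)
                return node.name
        return None  # nothing free; a real cluster would keep the request pending


class ApplicationMaster:
    """Per-application: requests containers and tracks where each task runs."""

    def __init__(self, app_id, resource_manager):
        self.app_id = app_id
        self.resource_manager = resource_manager
        self.task_locations = {}

    def run_tasks(self, num_tasks, memory_mb=1024, vcores=1):
        for task in range(num_tasks):
            self.task_locations[task] = self.resource_manager.allocate(
                self.app_id, memory_mb, vcores
            )
        return self.task_locations


if __name__ == "__main__":
    rm = ResourceManager([NodeManager("nm1", 2048, 2), NodeManager("nm2", 1024, 1)])
    print(ApplicationMaster("app_1", rm).run_tasks(4))
    # {0: 'nm1', 1: 'nm1', 2: 'nm2', 3: None}
```

Note that the toy ResourceManager only hands out capacity; tracking task status stays with the ApplicationMaster, mirroring the "pure scheduler" role described above.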

AbyssV commented 6 years ago

Installing Hadoop 2.7.6

Hadoop can be run in one of three modes: standalone (local) mode, pseudodistributed mode, and fully distributed mode.

Installing Hadoop Pseudodistributed mode on local machine

The instructions that follow are suitable for Unix-based systems, including Mac OS X (which is not a production platform, but is fine for development).

Check your Java version

The following command confirms that Java was installed correctly:

java -version

java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

Download Hadoop

wget http://apache.mirrors.tds.net/hadoop/common/hadoop-2.x.x/hadoop-2.x.x.tar.gz -P ~/Downloads/Hadoop

Check that Hadoop runs by typing:

hadoop version

Configure SSH

In pseudodistributed mode, we have to start daemons, and to do that using the supplied scripts we need to have SSH installed. Hadoop doesn’t actually distinguish between pseudodistributed and fully distributed modes; it merely starts daemons on the set of hosts in the cluster (defined by the slaves file) by SSHing to each host and starting a daemon process. Pseudodistributed mode is just a special case of fully distributed mode in which the (single) host is localhost, so we need to make sure that we can SSH to localhost and log in without having to enter a password.

First, make sure that SSH is installed and a server is running. On Ubuntu, for example, this is achieved with:

sudo apt-get install ssh

Then, to enable passwordless login, generate a new SSH key with an empty passphrase:

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

You may also need to run ssh-add if you are running ssh-agent. Test that you can connect with:

ssh localhost

If successful, you should not have to type in a password.

Set up environment variables

vi ~/.bash_profile

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home
export JRE_HOME=$JAVA_HOME/jre
export HADOOP_HOME=/Users/hadoop-2.7.6
export HADOOP_HOME_WARN_SUPPRESS=1

export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

source ~/.bash_profile

Modify configuration files

Enter your Hadoop configuration directory: /Users/hadoop-2.7.6/etc/hadoop

hadoop-env.sh

vim hadoop-env.sh

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home
export HADOOP_HEAPSIZE=2000
export HADOOP_OPTS="-Djava.security.krb5.realm=OX.AC.UK -Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk"

core-site.xml

vim core-site.xml

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/Users/hadoop-2.7.6/</value>
        <description>A base for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:8000</value>
    </property>
</configuration>

mapred-site.xml

vim mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapred.tasktracker.map.tasks.maximum</name>
        <value>2</value>
    </property>
    <property>
        <name>mapred.tasktracker.reduce.tasks.maximum</name>
        <value>2</value>
    </property>
</configuration>

hdfs-site.xml

vim hdfs-site.xml

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

yarn-site.xml

vim yarn-site.xml

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>
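Since all of the *-site.xml files above share the same <configuration>/<property>/<name>/<value> layout, a short script can read one back as a dictionary to double-check property names and values before starting the daemons. This is a minimal sketch using only the Python standard library; load_hadoop_conf is a hypothetical helper name, not part of Hadoop.

```python
import xml.etree.ElementTree as ET


def load_hadoop_conf(xml_text):
    """Return {name: value} for every <property> in a Hadoop *-site.xml document."""
    root = ET.fromstring(xml_text)
    if root.tag != "configuration":
        raise ValueError("expected a <configuration> root element")
    conf = {}
    for prop in root.findall("property"):
        name = prop.findtext("name")
        if name is None:
            raise ValueError("<property> without <name>")
        conf[name.strip()] = (prop.findtext("value") or "").strip()
    return conf


# Same content as the yarn-site.xml shown above.
YARN_SITE = """
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>
"""

if __name__ == "__main__":
    print(load_hadoop_conf(YARN_SITE))
    # {'yarn.nodemanager.aux-services': 'mapreduce_shuffle'}
```

To check a file on disk, read it with open(path).read() and pass the text to the same function.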

Formatting the HDFS filesystem

Before HDFS can be used for the first time, the filesystem must be formatted. This is done by running the following command:

hdfs namenode -format

Starting and stopping the daemons

To start the HDFS, YARN, and MapReduce daemons, type:

start-dfs.sh
start-yarn.sh
mr-jobhistory-daemon.sh start historyserver

Stopping the daemons is done as follows:

mr-jobhistory-daemon.sh stop historyserver
stop-yarn.sh
stop-dfs.sh

Installing Hadoop Fully distributed mode on AWS

_You need to set up AWS instances first. The steps are omitted here. Check AWS documentation for more help._

Update packages on the server

sudo apt-get update

Install Java Developers Kit

sudo apt-get install default-jdk

Get Java install location and copy the location into the /etc/bash.bashrc file

sudo update-alternatives --config java

Add environment variables

sudo chown ubuntu /etc/bash.bashrc
echo "export JAVA_HOME=/usr/lib/jvm/default-java" >> /etc/bash.bashrc
echo "export PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/bash.bashrc
sudo chown root /etc/bash.bashrc

Instantiate the environment variables

. /etc/bash.bashrc

or

sudo reboot

Confirm the java version

java -version

Passwordless SSH on your NameNode

sudo apt-get install openssh-server

Generate a key

ssh-keygen -f ~/.ssh/id_rsa -t rsa -P ""
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

or

ssh-copy-id -i hduser@localhost

Then test the connection and reboot:

ssh <machine_name> (or ssh localhost)
sudo reboot

Download Hadoop from Apache

wget http://apache.mirrors.tds.net/hadoop/common/hadoop-2.x.x/hadoop-2.x.x.tar.gz -P ~/Downloads/Hadoop

Uncompress the Hadoop tar file into the /usr/local folder

sudo tar zxvf ~/Downloads/Hadoop/hadoop-2.7.x.tar.gz -C /usr/local

Move all Hadoop related file from /usr/local to /usr/local/hadoop

sudo mv /usr/local/hadoop-* /usr/local/hadoop

Set environment variables on all Nodes

Add these variables to /etc/bash.bashrc

Repeat the steps above on all nodes.

#HADOOP Variables START
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_DATA_HOME=/home/$USER/hadoop_data/hdfs/
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
#HADOOP Variables END

Instantiate the environment variables

. /etc/bash.bashrc

Copy /etc/bash.bashrc to all nodes

You'll need to adjust permissions on the /etc/bash.bashrc file on the DataNodes first.

scp /etc/bash.bashrc datanode1:/etc/bash.bashrc
scp /etc/bash.bashrc datanode2:/etc/bash.bashrc
scp /etc/bash.bashrc datanode3:/etc/bash.bashrc

Hadoop Configuration Files on all Nodes

Open permissions to the HADOOP_CONF_DIR:

sudo chown ubuntu -R $HADOOP_CONF_DIR

$HADOOP_CONF_DIR/hadoop-env.sh change JAVA_HOME

export JAVA_HOME=/usr/lib/jvm/default-java

$HADOOP_CONF_DIR/core-site.xml change configuration element

Change the namenode_public_dns to your NameNode Public DNS

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode_public_dns:9000</value>
  </property>
</configuration>

$HADOOP_CONF_DIR/yarn-site.xml change configuration element

Change the namenode_public_dns to your NameNode Public DNS

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>namenode_public_dns</value>
  </property>
</configuration>

$HADOOP_CONF_DIR/mapred-site.xml

Copy the mapred-site.xml template and rename the new file mapred-site.xml:

sudo cp $HADOOP_CONF_DIR/mapred-site.xml.template $HADOOP_CONF_DIR/mapred-site.xml

Edit the configuration element in mapred-site.xml. Change the namenode_public_dns to your NameNode Public DNS.

<configuration>
  <property>
    <name>mapreduce.jobtracker.address</name>
    <value>namenode_public_dns:8021</value>
  </property>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

For Multi-Node Cluster Copy configuration files to DataNodes

scp $HADOOP_CONF_DIR/mapred-site.xml $HADOOP_CONF_DIR/yarn-site.xml $HADOOP_CONF_DIR/core-site.xml $HADOOP_CONF_DIR/hadoop-env.sh datanode1:$HADOOP_CONF_DIR
scp $HADOOP_CONF_DIR/mapred-site.xml $HADOOP_CONF_DIR/yarn-site.xml $HADOOP_CONF_DIR/core-site.xml $HADOOP_CONF_DIR/hadoop-env.sh datanode2:$HADOOP_CONF_DIR
scp $HADOOP_CONF_DIR/mapred-site.xml $HADOOP_CONF_DIR/yarn-site.xml $HADOOP_CONF_DIR/core-site.xml $HADOOP_CONF_DIR/hadoop-env.sh datanode3:$HADOOP_CONF_DIR
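With more than a few DataNodes, typing one scp line per host becomes error-prone. As a sketch, the commands can be generated from a host list instead. The helper name scp_commands is hypothetical, and the script only builds the command strings; it does not run them.

```python
import shlex

# The four files copied in the step above.
CONF_FILES = ["mapred-site.xml", "yarn-site.xml", "core-site.xml", "hadoop-env.sh"]


def scp_commands(datanodes, conf_dir="$HADOOP_CONF_DIR"):
    """Build one scp command per DataNode; nothing is executed here."""
    sources = " ".join(f"{conf_dir}/{name}" for name in CONF_FILES)
    return [f"scp {sources} {shlex.quote(node)}:{conf_dir}" for node in datanodes]


if __name__ == "__main__":
    for command in scp_commands(["datanode1", "datanode2", "datanode3"]):
        print(command)
```

The printed lines can be pasted into a shell on the NameNode, where $HADOOP_CONF_DIR is expanded as usual.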

NameNode Specific Configurations

Get your hostname on all nodes:

echo $(hostname)

Modify the /etc/hosts file

Give permission to ubuntu so we can make changes:

sudo chown ubuntu /etc/hosts

Add these 4 lines above the first entry "127.0.0.1 localhost" and change the dns and host name entries to your DNS and Host

#NameNode
namenode_ip namenode_public_dns namenode_hostname
#DataNode1
datanode1_ip datanode1_public_dns datanode1_hostname
#DataNode2
datanode2_ip datanode2_public_dns datanode2_hostname
#DataNode3
datanode3_ip datanode3_public_dns datanode3_hostname

Return ownership to the previous values:

sudo chown root /etc/hosts

or

sudo chmod 644 /etc/hosts

Modify $HADOOP_CONF_DIR/hdfs-site.xml

Single-Node

<configuration>

 <property>

 <name>dfs.replication</name>

 <value>1</value>

 <description>Default block replication.

 The actual number of replications can be specified when the file is created.

 The default is used if replication is not specified in create time.

 </description>

 </property>

 <property>

 <name>dfs.namenode.name.dir</name>

 <value>file:///app/hadoop/hadoop_data/hdfs/namenode</value>

 </property>

 <property>

 <name>dfs.datanode.data.dir</name>

 <value>file:///app/hadoop/hadoop_data/hdfs/datanode</value>

 </property>

 <property>

 <name>dfs.permissions.enabled</name>

 <value>false</value>

 <description>If "true", enable permission checking in HDFS. If "false", permission checking is turned off, but all other behavior is unchanged. Switching from one parameter value to the other does not change the mode, owner or group of files or directories.</description>

 </property>

</configuration>

Multi-Node

<configuration>

  <property>

    <name>dfs.replication</name>

    <value>3</value>

  </property>

  <property>

    <name>dfs.namenode.name.dir</name>

    <value>file:///usr/local/hadoop/hadoop_data/hdfs/namenode</value>

  </property>

</configuration>

Create the hadoop data directory in the $HADOOP_HOME directory

sudo mkdir -p $HADOOP_HOME/hadoop_data/hdfs/namenode

Create a file named "masters" in the $HADOOP_CONF_DIR directory containing the NameNode host name (for example ip-172.31.52.198):

echo "namenode_hostname" >> $HADOOP_CONF_DIR/masters

The file $HADOOP_CONF_DIR/masters should then contain:

namenode_hostname

Remove the old $HADOOP_CONF_DIR/slaves file:

sudo rm $HADOOP_CONF_DIR/slaves

Add the DataNode hosts to the $HADOOP_CONF_DIR/slaves file:

echo "datanode1_hostname" >> $HADOOP_CONF_DIR/slaves

echo "datanode2_hostname" >> $HADOOP_CONF_DIR/slaves

echo "datanode3_hostname" >> $HADOOP_CONF_DIR/slaves

Add DataNode hosts

datanode1_hostname

datanode2_hostname

datanode3_hostname

localhost entry

Update the $HADOOP_HOME directory ownership to ubuntu (that's you)

sudo chown -R ubuntu $HADOOP_HOME

DataNode Configurations

Modify each DataNode's $HADOOP_CONF_DIR/hdfs-site.xml file

<configuration>

  <property>

    <name>dfs.replication</name>

    <value>3</value>

  </property>

  <property>

    <name>dfs.datanode.data.dir</name>

    <value>file:///usr/local/hadoop/hadoop_data/hdfs/datanode</value>

  </property>

</configuration>

Copy the hdfs-site.xml from datanode1 to datanode2 and datanode3:

scp $HADOOP_CONF_DIR/hdfs-site.xml datanode2:$HADOOP_CONF_DIR

scp $HADOOP_CONF_DIR/hdfs-site.xml datanode3:$HADOOP_CONF_DIR

Create a data directory on each DataNode

sudo mkdir -p $HADOOP_HOME/hadoop_data/hdfs/datanode

Update the $HADOOP_HOME directory ownership to ubuntu on all DataNodes

sudo chown -R ubuntu $HADOOP_HOME

Starting up the Hadoop Cluster!

Format the HDFS:

hdfs namenode -format

Optional: Hadoop Eclipse plugin for local development

Make sure Hadoop and Eclipse are installed and all hadoop processes are running.

FYI

Hadoop Cluster Setup in the official documentation

Quoted from Hadoop: The Definitive Guide, 4th Edition

Each component in Hadoop is configured using an XML file. Common properties go in core-site.xml, and properties pertaining to HDFS, MapReduce, and YARN go into the appropriately named file: hdfs-site.xml, mapred-site.xml, and yarn-site.xml. These files are all located in the etc/hadoop subdirectory. To run Hadoop in a particular mode, you need to do two things: set the appropriate properties, and start the Hadoop daemons. Table A-1 shows the minimal set of properties to configure each mode.


In standalone mode, the local filesystem and the local MapReduce job runner are used. In the distributed modes, the HDFS and YARN daemons are started, and MapReduce is configured to use YARN.


These files are all found in the etc/hadoop directory of the Hadoop distribution. The configuration directory can be relocated to another part of the filesystem (outside the Hadoop installation, which makes upgrades marginally easier) as long as daemons are started with the --config option (or, equivalently, with the HADOOP_CONF_DIR environment variable set) specifying the location of this directory on the local filesystem.

AbyssV commented 6 years ago

Word Count example

WordCount is a simple application that counts the number of occurrences of each word in a given input set. The code example is provided here.

Prerequisite

Hadoop should be preinstalled and configured on all instances. See the Installing Hadoop Fully distributed mode on AWS section for more details.

In my case, I have one NameNode (master) and two DataNodes (slaves).

screen shot 2018-07-17 at 6 09 15 pm

Uploading/Compiling your Java program

To get an executable jar file, you can export one from Eclipse by clicking File -> Export -> JAR file

screen shot 2018-07-17 at 6 55 49 pm

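Then upload that file to your Ubuntu instance by typing scp -i key.pem your_file_directory ubuntu@public_dns:

Alternatively, compile the program on your Ubuntu machine by typing javac -classpath $(hadoop classpath) WordCount.java (plain javac WordCount.java fails unless the Hadoop libraries are already on the classpath), then package the classes by typing jar cf WordCount.jar WordCount*.class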

Create directories for input and output path under HDFS directory

On your master machine, type hdfs dfs -mkdir /wordcount to create a directory for your project, then type

hdfs dfs -mkdir /wordcount/input

hdfs dfs -mkdir /wordcount/output

to create the input path and the output path.

You can also type hdfs dfs -ls /wordcount to check your directory (it can also be browsed on the web page)

For more HDFS commands, check HDFS command documentation.

Move input files to your HDFS directory

I assume you already have some input files to test the WordCount program. If you don't, type the following in your local directory:

echo "This is a test file Hello Hello Hello World World World" > input1.txt

echo "This is another test file Hello Hello World World" > input2.txt

Repeat this with different file names to create as many test input files as you need.

Transfer your input files to HDFS input path by typing hdfs dfs -put input*.txt /wordcount/input

To check that your input file has been successfully put under the directory, type hdfs dfs -cat /wordcount/input/input1.txt The result will be shown on the screen

screen shot 2018-07-17 at 9 25 14 pm

You have done all the prerequisite work.

Application demonstration

Connect to instances

screen shot 2018-07-17 at 6 16 47 pm

On the master, start HDFS and YARN

$HADOOP_HOME/sbin/start-dfs.sh

$HADOOP_HOME/sbin/start-yarn.sh

You can check whether your Hadoop processes started correctly by typing jps. On the master, the following information should appear

screen shot 2018-07-17 at 6 22 11 pm

On slaves

screen shot 2018-07-17 at 6 22 39 pm

You can also check the web pages your_master_public_dns_address:50070 (Hadoop file system) and your_master_public_dns_address:8088 (resource manager)

screen shot 2018-07-17 at 6 23 44 pm screen shot 2018-07-17 at 6 24 20 pm

Your HDFS can also be browsed on the web page

screen shot 2018-07-17 at 6 24 08 pm

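Running your WordCount jar file

Type hadoop jar /home/ubuntu/hadoop-2.7.6/workplace/wordcount/WordCount.jar WordCount /wordcount/input /wordcount/output/2 to run your WordCount program. The output directory (here /wordcount/output/2) must not exist before the job starts, so use a new subdirectory for each run.

You should see some messages about the job

To check the output file, list the output directory you passed to the job with hdfs dfs -ls /wordcount/output/2 and print the part file, for example hdfs dfs -cat /wordcount/output/2/part-r-00000 (jobs written against the org.apache.hadoop.mapreduce API, like the code below, name reducer output part-r-NNNNN; the older mapred API uses part-NNNNN)

The result will be shown on the screen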

screen shot 2018-07-17 at 9 22 33 pm

Code analysis

Import built-in packages

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

Mapper Code

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }


The Mapper implementation, via the map method, processes one line at a time, as provided by the specified TextInputFormat. It then splits the line into tokens separated by whitespace, via the StringTokenizer, and emits a key-value pair of <word, 1> for each token.
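To make the tokenizing step concrete, here is a minimal plain-Java sketch of what the map method does to a single line. It deliberately avoids Hadoop classes so it can run on its own; the class name MapSketch and the use of String and Integer in place of Text and IntWritable are assumptions made for illustration only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Illustration only: String and Integer stand in for Hadoop's Text and IntWritable.
final class MapSketch {

    // Splits one input line on whitespace and emits a <word, 1> pair per token,
    // mirroring the loop in TokenizerMapper.map.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            pairs.add(new SimpleEntry<>(itr.nextToken(), 1));
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> pair : map("Hello Hello World")) {
            System.out.println("<" + pair.getKey() + ", " + pair.getValue() + ">");
        }
    }
}
```

For the line "Hello Hello World" this prints <Hello, 1> twice and <World, 1> once. Repeated words are not merged at this stage; that is left to the combiner and the reducer.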

Mapper maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.

The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the InputFormat for the job.

Overall, Mapper implementations are passed the Job for the job via the Job.setMapperClass(Class) method. The framework then calls map(WritableComparable, Writable, Context) for each key/value pair in the InputSplit for that task. Applications can then override the cleanup(Context) method to perform any required cleanup.

Output pairs are collected with calls to context.write(WritableComparable, Writable).

Applications can use the Counter to report their statistics.

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via Job.setGroupingComparatorClass(Class).

The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
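As a rough illustration of how keys are assigned to partitions, the sketch below applies the same arithmetic as Hadoop's default HashPartitioner, (hashCode & Integer.MAX_VALUE) % numReduceTasks, to plain Java strings. The class name PartitionSketch and the choice of three reduce tasks are hypothetical. Because Text computes its hash code differently from String, the partition numbers printed here may not match those on a real cluster.

```java
// Illustration only: uses String keys instead of Hadoop's Text.
final class PartitionSketch {

    // Masks off the sign bit so the value is non-negative, then takes the
    // remainder to pick one of numReduceTasks partitions.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3; // assumed value for this example
        for (String word : new String[] {"Hello", "World", "test", "Hello"}) {
            System.out.println(word + " -> partition " + partitionFor(word, numReduceTasks));
        }
    }
}
```

The point to notice is that the same key always lands in the same partition, which is what guarantees that a single Reducer sees every count for a given word.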

Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
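The effect of a combiner can be sketched without a cluster. The example below is plain Java, not Hadoop code: it takes the words that one map task would emit for the first test file created earlier and sums them locally, which is roughly what reusing IntSumReducer as the combiner does. The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: simulates local aggregation on the output of one map task.
final class CombinerSketch {

    // Each emitted word carries an implicit count of 1; sum those counts per word.
    static Map<String, Integer> combine(List<String> mapOutputKeys) {
        Map<String, Integer> combined = new TreeMap<>();
        for (String word : mapOutputKeys) {
            combined.merge(word, 1, Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<String> emitted = Arrays.asList(
            "This is a test file Hello Hello Hello World World World".split(" "));
        Map<String, Integer> combined = combine(emitted);
        System.out.println(emitted.size() + " records before combining");
        System.out.println(combined.size() + " records after combining: " + combined);
    }
}
```

For this input, 11 records go in and 7 come out, so fewer records have to be sent from the map side to the reduce side.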

The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed and the CompressionCodec to be used via the Configuration.

Reducer Code

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }


The Reducer implementation, via the reduce method, just sums up the values, which are the occurrence counts for each key (i.e. words in this example).

Reducer reduces a set of intermediate values which share a key to a smaller set of values.

The number of reduces for the job is set by the user via Job.setNumReduceTasks(int).

Overall, Reducer implementations are passed to the job via the Job.setReducerClass(Class) method and can override the setup(Context) method to initialize themselves. The framework then calls the reduce(WritableComparable, Iterable, Context) method for each <key, (list of values)> pair in the grouped inputs. Applications can then override the cleanup(Context) method to perform any required cleanup.

Reducer has 3 primary phases: shuffle, sort and reduce.

Shuffle: Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.

Sort: The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage.

The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.

Secondary Sort: If equivalence rules for grouping the intermediate keys are required to be different from those for grouping keys before reduction, then one may specify a Comparator via Job.setSortComparatorClass(Class). Since Job.setGroupingComparatorClass(Class) can be used to control how intermediate keys are grouped, these can be used in conjunction to simulate secondary sort on values.

Reduce: In this phase the reduce(WritableComparable, Iterable, Context) method is called for each <key, (list of values)> pair in the grouped inputs.
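The phases above can be imitated in a few lines of plain Java. This is only a sketch under simplifying assumptions: everything runs in one process, there is a single reducer and no combiner, and a TreeMap stands in for the framework's merge and grouping. The class and method names are hypothetical; the two input lines are the test files created earlier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: an in-memory stand-in for shuffle, sort, and reduce.
final class ReduceSketch {

    // Shuffle and sort: merge the outputs of all map tasks and group the
    // values by key, keeping the keys in sorted order.
    static Map<String, List<Integer>> shuffleAndSort(List<List<String>> mapOutputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (List<String> oneMapper : mapOutputs) {
            for (String word : oneMapper) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
        }
        return grouped;
    }

    // Reduce: called once per key with all of that key's values, as in IntSumReducer.
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int val : values) {
            sum += val;
        }
        return sum;
    }

    // Runs the grouped input through reduce and collects <word, count> pairs.
    static Map<String, Integer> run(List<List<String>> mapOutputs) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffleAndSort(mapOutputs).entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> mapOutputs = Arrays.asList(
            Arrays.asList("This is a test file Hello Hello Hello World World World".split(" ")),
            Arrays.asList("This is another test file Hello Hello World World".split(" ")));
        run(mapOutputs).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

With the two test lines as input, Hello and World each end up with a count of 5, because the grouping step brings together the values emitted by both simulated map tasks before reduce is called.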

The output of the reduce task is typically written to the FileSystem via Context.write(WritableComparable, Writable).

Applications can use the Counter to report their statistics.

The output of the Reducer is not sorted.

Driver Code

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


The main method specifies various facets of the job, such as the input/output paths (passed via the command line), key/value types, and input/output formats, in the Job. It then calls job.waitForCompletion to submit the job and monitor its progress.