Hadoop Elefant

wall < "Hadoop installieren & konfigurieren"

Broadcast message from spillerm@unixe.de (pts/1) (Mi Sep 17 12:25:54 2008):
4
Diesen Beitrag schrieb ich vor 10 Jahren. Behalte das beim Lesen bitte im Hinterkopf.

Hier möchte ich Euch zusammenfassend aufschreiben, wie Hadoop initial eingerichtet und betrieben werden kann; für Fine-Tuning ist natürlich immer Raum, und ich möchte einfach den Leuten, die das Zeug einsetzen (müssen) einige erste Anhaltspunkte liefern — die derzeit verfügbare Dokumentation ist ein wenig… na egal ;)

Ich gehe hier von drei Maschinen aus, die im selben Subnetz hängen: ein Master (master01 10.16.0.1), der die Arbeit an die Nodes verteilen wird, und zwei Nodes (node01 10.16.0.100 & node02 10.16.0.101), die rechnen werden. Im Laufe dieses Artikels müssen also folgende Schritte bewältigt werden:

  • ein einheitliches Konzept für alle Maschinen überlegen
  • auf allen Maschinen die identische Java-Version installieren
  • die aktuelle Hadoop-Version herunterladen
  • Hadoop installieren und konfigurieren
  • die benötigten Dienste starten und überwachen
  • einen Testlauf random-sort starten

Das einheitliche Konzept

Ich stelle das Konzept vor, dass sich für mich bislang ganz gut bewährt hat, aber prinzipiell kann das jeder machen, wie er will.

  • Java wird nach /opt installiert (jdk1.6.0_02) und ein Symlink gesetzt (java), so dass die Konfiguration von Hadoop nicht extra angepasst werden muss, wenn ein neues Java installiert wird.
  • Für die Arbeitsdaten wird die Partition /data benutzt.
  • Die Software soll nicht unter der UID root laufen; ich entscheide mich stattdessen für den/die aussagekräftige/n User/Gruppe compute; User und Gruppe müssen entsprechend auf Master und Nodes eingerichtet werden, Home-Verzeichnis /home/compute.

Java installieren

Dieser Arbeitsschritt unterscheidet sich nicht für Master und Nodes! Ich entscheide mich für Java-1.6 und transportiere die Datei jdk-6u2-linux-i586.bin nach /opt. Anschliessend wird sie ausgeführt und entpackt sich automatisch in den Ordner /opt/jdk1.6.0_02, jetzt kann ich meinen Symlink setzen:

$ cd /opt
$ ./jdk-6u2-linux-i586.bin ## Abfrage mit 'yes' bestätigen
$ ln -s jdk1.6.0_02 java

Hadoop installieren

Auch diese Arbeitsschritte sind prinzipiell für Master und Nodes gleich! Ich arbeite nun unter dem Usernamen compute (nicht als root, und das ist wichtig).

$ cd /home/compute
$ mkdir -p var/run
$ wget http://apache.mirroring.de/hadoop/core/hadoop-0.18.0/hadoop-0.18.0.tar.gz
$ tar xvfz hadoop-0.18.0.tar.gz
$ mv hadoop-0.18.0 Hadoop-0.18.0
$ ln -s Hadoop-0.18.0 hadoop

Ich habe nun also im Home-Verzeichnis des Users compute die Ordner var und var/run angelegt, die derzeit aktuelle Version der Software per wget heruntergeladen, sie ausgepackt und einen Symlink gesetzt. Wie geht es weiter?

Hadoop konfigurieren

Wir befinden uns nun im Ordner /home/compute/hadoop/conf. Auch diese Schritte müssen sowohl auf dem Master als auch auf den Nodes ausgeführt werden. Eine unterschiedliche Konfiguration ist möglich, verursacht aber erhöhten Wartungsaufwand.

masters

In diesem Beispiel belasse ich die Datei masters leer; sie ist wichtig, wenn mehrere Master vorhanden sind und secondary namenodes betrieben werden sollen — in unserem Mini-Cluster ist dies nicht der Fall.

slaves

Die Datei slaves enthält eine Liste aller Nodes, die in Zukunft rechnen sollen; es ist sinnvoll, den Master, der in aller Regel neben dem Verteilen der Jobs auch noch andere Aufgaben wie DHCP, DNS etc. zu bewältigen hat, nicht mitrechnen zu lassen. Die Datei sieht also so aus:

## Konfig: /home/compute/hadoop/conf/slaves
10.16.0.100
10.16.0.101

hadoop-env.sh

Darüberhinaus muss der Software mitgeteilt werden, wo eine gültige Java-Version zu finden ist; das erledigen wir in der Datei hadoop-env.sh; die Variable JAVA_HOME ist die einzig wirklich wichtige, die anderen sind alle optional — ich passe aber auch den Speicherort für die PID-Files an, denn /tmp gefällt mir nicht:

## Konfig: /home/compute/hadoop/conf/hadoop-env.sh
export JAVA_HOME=/opt/java
# export HADOOP_CLASSPATH=
# export HADOOP_HEAPSIZE=2000
# export HADOOP_OPTS=-server
export HADOOP_NAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_NAMENODE_OPTS"
export HADOOP_SECONDARYNAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_SECONDARYNAMENODE_OPTS"
export HADOOP_DATANODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_DATANODE_OPTS"
export HADOOP_BALANCER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_BALANCER_OPTS"
export HADOOP_JOBTRACKER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_JOBTRACKER_OPTS"
# export HADOOP_TASKTRACKER_OPTS=
# export HADOOP_CLIENT_OPTS
# export HADOOP_SSH_OPTS="-o ConnectTimeout=1 -o SendEnv=HADOOP_CONF_DIR"
# export HADOOP_LOG_DIR=${HADOOP_HOME}/logs
# export HADOOP_SLAVES=${HADOOP_HOME}/conf/slaves
# export HADOOP_MASTER=master:/home/$USER/src/hadoop
# export HADOOP_SLAVE_SLEEP=0.1
export HADOOP_PID_DIR=/home/compute/var/run
# export HADOOP_IDENT_STRING=$USER
# export HADOOP_NICENESS=10

hadoop-site.xml

In dieser Datei definieren wir eigene Einträge; wichtig ist, nicht die Datei hadoop-default.xml zu editieren! Eine leere hadoop-site.xml ist aber schon im Package enthalten, diese werden wir nun bearbeiten; die einzelnen Punkte sind in der Datei ordentlich dokumentiert. Insgesamt verwenden wir die folgenden Daten:

  • mapred.job.tracker master01:9001 — Host und Port, auf dem der MapReduce job tracker läuft
  • mapred.system.dir /home/compute/tmp/mapred/system shared directory zur Aufbewahrung von Kontrolldateien
  • mapred.local.dir /data/tmp zur Aufbewahrung von Zwischenergebnissen
  • fs.default.name hdfs://master01:8020/ default file system
  • dfs.data.dir /data/dfs/data — wohin im lokalen Filesystem wandern die Blöcke des DFS?
  • dfs.name.dir /data/dfs/name — wohin wandert im lokalen Filesystem die name table?
  • mapred.child.java.opts -Xmx1024m — Java-Optionen für den Tasktracker-Child-Prozess, Speicherangabe im MB (1024MB in diesem Beispiel)
  • mapred.tasktracker.tasks.maximum 4 — Maximale Anzahl von Tasks für einen Tasktracker
  • mapred.reduce.parallel.copies 20
  • mapred.tasktracker.reduce.tasks.maximum 4
  • io.sort.factor 50
  • io.sort.mb 1000

Die vollständige Konfiguration sieht dann so aus:

## Konfig: /home/compute/hadoop/conf/hadoop-site.xml 
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>
  <name>mapred.job.tracker</name>
  <value>master01:9001</value>
  <description>The host and port that the MapReduce job tracker runs
  at.  If "local", then jobs are run in-process as a single map
  and reduce task.
  </description>
</property>

<property>
  <name>mapred.system.dir</name>
  <value>/home/compute/tmp/mapred/system</value>
  <description>The shared directory where MapReduce stores control files.
  </description>
</property>

<property>
  <name>mapred.local.dir</name>
  <value>/data/tmp</value>
  <description>The local directory where MapReduce stores intermediate
  data files.  May be a comma-separated list of
  directories on different devices in order to spread disk i/o.
  Directories that do not exist are ignored.
  </description>
</property>

<property>
  <name>fs.default.name</name>
  <value>hdfs://master01:8020/</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.
 </description>
</property>

<property>
  <name>dfs.data.dir</name>
  <value>/data/dfs/data</value>
  <description>Determines where on the local filesystem an DFS data node
  should store its blocks.  If this is a comma-delimited
  list of directories, then data will be stored in all named
  directories, typically on different devices.
  Directories that do not exist are ignored.
  </description>
</property>

<property>
  <name>dfs.name.dir</name>
  <value>/data/dfs/name</value>
  <description>Determines where on the local filesystem the DFS name node
      should store the name table.  If this is a comma-delimited list
      of directories then the name table is replicated in all of the
      directories, for redundancy.
 </description>
</property>

<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx1024m</value>
  <description>Java opts for the task tracker child processes.  Subsumes
  'mapred.child.heap.size' (If a mapred.child.heap.size value is found
  in a configuration, its maximum heap size will be used and a warning
  emitted that heap.size has been deprecated). Also, the following symbol,
  if present, will be interpolated: @taskid@ is replaced by current TaskID.
  Any other occurrences of '@' will go unchanged. For
  example, to enable verbose gc logging to a file named for the taskid in
  /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
        -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
  </description>
</property>

<property>
  <name>mapred.reduce.parallel.copies</name>
  <value>20</value>
  <description>The default number of parallel transfers run by reduce
  during the copy(shuffle) phase.
  </description>
</property>

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>4</value>
  <description>The maximum number of map tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>4</value>
  <description>The maximum number of reduce tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>

<property>
  <name>io.sort.factor</name>
  <value>50</value>
  <description>The number of streams to merge at once while sorting
  files.  This determines the number of open file handles.
 </description>
</property>

<property>
  <name>io.sort.mb</name>
  <value>1000</value>
  <description>The total amount of buffer memory to use while sorting
  files, in megabytes.  By default, gives each merge stream 1MB, which
  should minimize seeks.
 </description>
</property>
</configuration>

Benötigte Ordner und Zugriffsrechte

Und nun legen wir die benötigten Ordner mal an:

$ mkdir -p /home/compute/tmp/mapred/system
$ mkdir -p /data/tmp
$ mkdir -p /data/dfs/data
$ mkdir -p /data/dfs/name

Anschliessend stellen wir sicher, dass überall die korrekten Zugriffsrechte eingestellt sind:

$ chown -R compute:compute /data/
$ chmod 0777 /data/
$ chmod a+t /data/
$ chown -R compute:compute /home/compute

HDFS: Filesystem initialisieren

Jetzt muss das HDFS vorbereitet werden; hierzu bewegen wir uns als User compute auf master01 nach /home/compute/hadoop:

$ bin/hadoop namenode -format
08/08/11 16:39:00 INFO dfs.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = master01/127.0.0.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.18.0
STARTUP_MSG:   build = http://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18 -r 686010; compiled by 'hadoopqa' on Thu Aug 14 19:48:33 UTC 2008
 ************************************************************/
Re-format filesystem in /data/dfs/name ? (Y or N) Y
08/08/11 16:39:29 INFO fs.FSNamesystem: fsOwner=compute,compute,dialout,cdrom,floppy,audio,video,plugdev
08/08/11 16:39:29 INFO fs.FSNamesystem: supergroup=supergroup
08/08/11 16:39:29 INFO fs.FSNamesystem: isPermissionEnabled=true
08/08/11 16:39:29 INFO dfs.Storage: Storage directory /data/dfs/name has been successfully formatted.
08/08/11 16:39:29 INFO dfs.NameNode: SHUTDOWN_MSG: 
 /************************************************************
 SHUTDOWN_MSG: Shutting down NameNode at master01/127.0.0.1
 ************************************************************/

Dienste starten

Jetzt, wo alles soweit konfiguriert ist, können wir es wagen, die Dienste zu starten; dies gestaltet sich vergleichsweise unspektakulär:

$ /home/compute/hadoop/bin/start-dfs.sh
$ /home/compute/hadoop/bin/start-mapred.sh

In logs/ finden sich die Logfiles, die im Fehlerfalle zu durchforsten sind. Ein /opt/java/bin/jps gibt Aufschluss darüber, welche Prozesse gerade laufen:

$ /opt/java/bin/jps
9769 Jps
4838 DataNode
5276 TaskTracker

Ein erster Testlauf

Um das Setup einmal zu testen gibt es die Möglichkeit, in einem ersten Arbeitsschritt zufällige Zahlen zu generieren und diese im zweiten Arbeitsschritt sortieren zu lassen. Per default werden 10GB an Daten generiert — was unter Umständen, je nach Setup, viel zu viel sein kann. Doch wie der Homepage zu entnehmen ist, kann diese Default-Einstellung verändert werden. Hierzu legen wir uns ein kleines Konfig-File an:

## Konfig: /home/compute/hadoop/conf/randomwriter.conf
test.randomwrite.bytes_per_map=512

Vor jedem Lauf müssen wir den Ordner mit den zuvor generierten Zufallszahlen löschen, erst dann können wir ihn neu erstellen:

## Ordner mit bereits generierten Zufallszahlen löschen
$ bin/hadoop dfs -rmr rand-sort
## Ordner mit Zufallszahlen unter Zuhilfenahme des Konfig-Files erstellen
$ bin/hadoop jar hadoop-0.16.4-examples.jar randomwriter rand conf/randomwriter.conf
## Wenn dies fehlerfrei durchläuft: Zufallszahlen sortieren 
$ bin/hadoop jar hadoop-0.16.4-examples.jar sort rand rand-sort

Webinterfaces

Hadoop bringt diverse Webinterfaces mit, die die Arbeit mit dem System erleichtern:

  • NameNodeOverview: http://10.16.0.1:50070/
  • TaskTrackerOverview: http://10.16.0.1:50030/

Mittels ganglia und rrdtool können lustige Bildchen bzgl. der Auslastung des Gesamt-Clusters und jedes einzelnen Nodes gemalt werden — und wie das geht, habe ich euch hier aufnotiert!

4