
using hadoopy.launch() with wc.py example causes error in StreamJob #41

tylere opened this issue 13 years ago

tylere commented 13 years ago

hadoopy test notes

Environment

Start with a 'clean' system::

sudo apt-get remove --purge -y hadoop-0.20
sudo rm -fr /var/run/hadoop-0.20 /var/log/hadoop-0.20 /var/lib/hadoop-0.20 /etc/hadoop-0.20
Install Hadoop with the pseudo-distributed configuration::

sudo apt-get install -y hadoop-0.20
sudo apt-get install -y hadoop-0.20-conf-pseudo

Start the services::

for service in /etc/init.d/hadoop-0.20-*; do sudo $service start; done

Check on the services::

for service in /etc/init.d/hadoop-0.20-*; do sudo $service status; done
hadoop-0.20-datanode is running
hadoop-0.20-jobtracker is running
hadoop-0.20-namenode is running
hadoop-0.20-secondarynamenode is running
hadoop-0.20-tasktracker is running

Copy some data to HDFS::

hadoop-0.20 fs -mkdir input
hadoop-0.20 fs -put /etc/hadoop-0.20/conf/*.xml input
hadoop-0.20 fs -ls input

Try running an example job::

hadoop-0.20 jar /usr/lib/hadoop-0.20/hadoop-*-examples.jar grep input output 'dfs[a-z.]+'
11/11/18 15:37:52 WARN snappy.LoadSnappy: Snappy native library is available
11/11/18 15:37:52 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/11/18 15:37:52 INFO snappy.LoadSnappy: Snappy native library loaded
11/11/18 15:37:52 INFO mapred.FileInputFormat: Total input paths to process : 7
11/11/18 15:37:53 INFO mapred.JobClient: Running job: job_201111181442_0016
11/11/18 15:37:54 INFO mapred.JobClient:  map 0% reduce 0%
11/11/18 15:37:58 INFO mapred.JobClient:  map 28% reduce 0%
11/11/18 15:38:00 INFO mapred.JobClient:  map 42% reduce 0%
11/11/18 15:38:01 INFO mapred.JobClient:  map 57% reduce 0%
11/11/18 15:38:03 INFO mapred.JobClient:  map 85% reduce 0%
11/11/18 15:38:04 INFO mapred.JobClient:  map 100% reduce 0%
11/11/18 15:38:06 INFO mapred.JobClient:  map 100% reduce 33%
11/11/18 15:38:08 INFO mapred.JobClient:  map 100% reduce 100%
11/11/18 15:38:08 INFO mapred.JobClient: Job complete: job_201111181442_0016
11/11/18 15:38:08 INFO mapred.JobClient: Counters: 23
11/11/18 15:38:08 INFO mapred.JobClient:   Job Counters 
11/11/18 15:38:08 INFO mapred.JobClient:     Launched reduce tasks=1
11/11/18 15:38:08 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=15554
11/11/18 15:38:08 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
11/11/18 15:38:08 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
11/11/18 15:38:08 INFO mapred.JobClient:     Launched map tasks=7
11/11/18 15:38:08 INFO mapred.JobClient:     Data-local map tasks=7
11/11/18 15:38:08 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=9706
11/11/18 15:38:08 INFO mapred.JobClient:   FileSystemCounters
11/11/18 15:38:08 INFO mapred.JobClient:     FILE_BYTES_READ=191
11/11/18 15:38:08 INFO mapred.JobClient:     HDFS_BYTES_READ=17711
11/11/18 15:38:08 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=442160
11/11/18 15:38:08 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=313
11/11/18 15:38:08 INFO mapred.JobClient:   Map-Reduce Framework
11/11/18 15:38:08 INFO mapred.JobClient:     Reduce input groups=7
11/11/18 15:38:08 INFO mapred.JobClient:     Combine output records=7
11/11/18 15:38:08 INFO mapred.JobClient:     Map input records=424
11/11/18 15:38:08 INFO mapred.JobClient:     Reduce shuffle bytes=227
11/11/18 15:38:08 INFO mapred.JobClient:     Reduce output records=7
11/11/18 15:38:08 INFO mapred.JobClient:     Spilled Records=14
11/11/18 15:38:08 INFO mapred.JobClient:     Map output bytes=171
11/11/18 15:38:08 INFO mapred.JobClient:     Map input bytes=16962
11/11/18 15:38:08 INFO mapred.JobClient:     Combine input records=7
11/11/18 15:38:08 INFO mapred.JobClient:     Map output records=7
11/11/18 15:38:08 INFO mapred.JobClient:     SPLIT_RAW_BYTES=749
11/11/18 15:38:08 INFO mapred.JobClient:     Reduce input records=7
11/11/18 15:38:08 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/11/18 15:38:08 INFO mapred.FileInputFormat: Total input paths to process : 1
11/11/18 15:38:08 INFO mapred.JobClient: Running job: job_201111181442_0017
11/11/18 15:38:09 INFO mapred.JobClient:  map 0% reduce 0%
11/11/18 15:38:12 INFO mapred.JobClient:  map 100% reduce 0%
11/11/18 15:38:20 INFO mapred.JobClient:  map 100% reduce 100%
11/11/18 15:38:20 INFO mapred.JobClient: Job complete: job_201111181442_0017
11/11/18 15:38:20 INFO mapred.JobClient: Counters: 23
11/11/18 15:38:20 INFO mapred.JobClient:   Job Counters 
11/11/18 15:38:20 INFO mapred.JobClient:     Launched reduce tasks=1
11/11/18 15:38:20 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=2798
11/11/18 15:38:20 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
11/11/18 15:38:20 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
11/11/18 15:38:20 INFO mapred.JobClient:     Launched map tasks=1
11/11/18 15:38:20 INFO mapred.JobClient:     Data-local map tasks=1
11/11/18 15:38:20 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=8014
11/11/18 15:38:20 INFO mapred.JobClient:   FileSystemCounters
11/11/18 15:38:20 INFO mapred.JobClient:     FILE_BYTES_READ=191
11/11/18 15:38:20 INFO mapred.JobClient:     HDFS_BYTES_READ=427
11/11/18 15:38:20 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=108532
11/11/18 15:38:20 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=129
11/11/18 15:38:20 INFO mapred.JobClient:   Map-Reduce Framework
11/11/18 15:38:20 INFO mapred.JobClient:     Reduce input groups=1
11/11/18 15:38:20 INFO mapred.JobClient:     Combine output records=0
11/11/18 15:38:20 INFO mapred.JobClient:     Map input records=7
11/11/18 15:38:20 INFO mapred.JobClient:     Reduce shuffle bytes=0
11/11/18 15:38:20 INFO mapred.JobClient:     Reduce output records=7
11/11/18 15:38:20 INFO mapred.JobClient:     Spilled Records=14
11/11/18 15:38:20 INFO mapred.JobClient:     Map output bytes=171
11/11/18 15:38:20 INFO mapred.JobClient:     Map input bytes=227
11/11/18 15:38:20 INFO mapred.JobClient:     Combine input records=0
11/11/18 15:38:20 INFO mapred.JobClient:     Map output records=7
11/11/18 15:38:20 INFO mapred.JobClient:     SPLIT_RAW_BYTES=114
11/11/18 15:38:20 INFO mapred.JobClient:     Reduce input records=7
Install hadoopy

Make a Python virtual environment::

mkvirtualenv --no-site-packages hadoopy
pip install hadoopy

Test out hadoopy

Set up a working directory::

mkdir ~/temp/hadoopy
cd ~/temp/hadoopy

Download the wc.py example::

wget https://raw.github.com/bwhite/hadoopy/master/tests/wc.py

Command line test (without args)::

python wc.py
Hadoopy Wordcount Demo

Command line test (map/sort/reduce)::

echo "a b a a b c" | python wc.py map | sort | python wc.py reduce
a   3
b   2
c   1
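
For readers following along, wc.py has roughly this shape (a simplified sketch, not a verbatim copy of the file in the repository): hadoopy.run wires the mapper and reducer into the map, reduce, and pipe subcommands exercised above, and the real script also registers a combiner, which is why the launch command further down includes one::

"""Hadoopy Wordcount Demo"""
import hadoopy


def mapper(key, value):
    # value is a line of text; emit (word, 1) for each whitespace-separated token
    for word in value.split():
        yield word, 1


def reducer(key, values):
    # key is a word; values is an iterator of partial counts to sum
    yield key, sum(values)


if __name__ == '__main__':
    # hadoopy.run dispatches on the command-line arguments ("map", "reduce", "pipe map", ...)
    hadoopy.run(mapper, reducer)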

Download some test files::

mkdir ~/temp/hadoopy/playground
cd ~/temp/hadoopy/playground
wget https://raw.github.com/bwhite/hadoopy/master/examples/data/wc-input-alice.tb
wget https://raw.github.com/bwhite/hadoopy/master/examples/data/wc-input-alice.txt
wget https://raw.github.com/bwhite/hadoopy/master/examples/data/wc-input-alice.txt.gz
cd ~/temp/hadoopy

Copy the local files to HDFS::

hadoop fs -put ~/temp/hadoopy/playground playground

List them out::

hadoop fs -ls playground
Found 3 items
-rw-r--r--   1 taericks supergroup     259835 2011-11-18 14:58 /user/taericks/playground/wc-input-alice.tb
-rw-r--r--   1 taericks supergroup     167529 2011-11-18 14:58 /user/taericks/playground/wc-input-alice.txt
-rw-r--r--   1 taericks supergroup      60638 2011-11-18 14:58 /user/taericks/playground/wc-input-alice.txt.gz

Output using hadoopy.launch(), which fails::

(hadoopy)taericks@precisionm6500b:~/temp/hadoopy$ python -c "import hadoopy; out = hadoopy.launch('playground/wc-input-alice.txt', 'playground/out/', 'wc.py')"
/\----------Hadoop Output----------/\
hadoopy: Running[hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -output playground/out/ -input playground/wc-input-alice.txt -mapper "python wc.py pipe map" -reducer "python wc.py pipe reduce" -combiner "python wc.py pipe combine" -file wc.py -jobconf mapred.job.name=wc -io typedbytes -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat -inputformat AutoInputFormat]
11/11/18 15:31:07 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [wc.py, /var/lib/hadoop-0.20/cache/taericks/hadoop-unjar7362629401782511066/] [] /tmp/streamjob2464703749544658811.jar tmpDir=null
11/11/18 15:31:08 INFO mapred.FileInputFormat: Total input paths to process : 1
11/11/18 15:31:08 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/taericks/mapred/local]
11/11/18 15:31:08 INFO streaming.StreamJob: Running job: job_201111181442_0013
11/11/18 15:31:08 INFO streaming.StreamJob: To kill this job, run:
11/11/18 15:31:08 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201111181442_0013
11/11/18 15:31:08 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201111181442_0013
11/11/18 15:31:09 INFO streaming.StreamJob:  map 0%  reduce 0%
11/11/18 15:31:12 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/18 15:31:19 INFO streaming.StreamJob:  map 100%  reduce 17%
11/11/18 15:31:22 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/18 15:31:29 INFO streaming.StreamJob:  map 100%  reduce 17%
11/11/18 15:31:32 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/18 15:31:39 INFO streaming.StreamJob:  map 100%  reduce 17%
11/11/18 15:31:43 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/18 15:31:50 INFO streaming.StreamJob:  map 100%  reduce 33%
11/11/18 15:31:53 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/18 15:31:54 INFO streaming.StreamJob:  map 100%  reduce 100%
11/11/18 15:31:54 INFO streaming.StreamJob: To kill this job, run:
11/11/18 15:31:54 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201111181442_0013
11/11/18 15:31:54 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201111181442_0013
11/11/18 15:31:54 ERROR streaming.StreamJob: Job not successful. Error: NA
11/11/18 15:31:54 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/terickson/.virtualenvs/hadoopy/lib/python2.6/site-packages/hadoopy/_runner.py", line 214, in launch
    raise subprocess.CalledProcessError(process.returncode, ' '.join(cmd))
subprocess.CalledProcessError: Command 'hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -output playground/out/ -input playground/wc-input-alice.txt -mapper "python wc.py pipe map" -reducer "python wc.py pipe reduce" -combiner "python wc.py pipe combine" -file wc.py -jobconf mapred.job.name=wc -io typedbytes -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat -inputformat AutoInputFormat' returned non-zero exit status 1

By contrast, using hadoopy.launch_frozen() works::

(hadoopy)taericks@precisionm6500b:~/temp/hadoopy$ python -c "import hadoopy; out = hadoopy.launch_frozen('playground/wc-input-alice.txt', 'playground/out/', 'wc.py')"
/\----------Hadoop Output----------/\
hadoopy: Running[hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -output playground/out/ -input playground/wc-input-alice.txt -mapper "_frozen/wc pipe map" -reducer "_frozen/wc pipe reduce" -combiner "_frozen/wc pipe combine" -jobconf "mapred.cache.archives=_hadoopy_temp/1321658960.368775/_frozen.tar#_frozen" -jobconf "mapreduce.job.cache.archives=_hadoopy_temp/1321658960.368775/_frozen.tar#_frozen" -jobconf mapred.job.name=wc -io typedbytes -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat -inputformat AutoInputFormat]
11/11/18 15:29:25 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [/var/lib/hadoop-0.20/cache/taericks/hadoop-unjar455082660605498019/] [] /tmp/streamjob1065141797242303883.jar tmpDir=null
11/11/18 15:29:26 INFO mapred.FileInputFormat: Total input paths to process : 1
11/11/18 15:29:26 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/taericks/mapred/local]
11/11/18 15:29:26 INFO streaming.StreamJob: Running job: job_201111181442_0012
11/11/18 15:29:26 INFO streaming.StreamJob: To kill this job, run:
11/11/18 15:29:26 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201111181442_0012
11/11/18 15:29:26 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201111181442_0012
11/11/18 15:29:27 INFO streaming.StreamJob:  map 0%  reduce 0%
11/11/18 15:29:31 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/18 15:29:38 INFO streaming.StreamJob:  map 100%  reduce 33%
11/11/18 15:29:40 INFO streaming.StreamJob:  map 100%  reduce 100%
11/11/18 15:29:41 INFO streaming.StreamJob: Job complete: job_201111181442_0012
11/11/18 15:29:41 INFO streaming.StreamJob: Output: playground/out/
\/----------Hadoop Output----------\/
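
Side note for anyone reproducing this: since the jobs write typedbytes SequenceFile output, the result of the successful launch_frozen run can be read back from Python with hadoopy.readtb. A minimal sketch, assuming the same output path as above::

import hadoopy

# readtb yields (key, value) pairs from the typedbytes SequenceFiles in the
# given HDFS directory; for the wordcount job, keys are words and values are counts.
counts = dict(hadoopy.readtb('playground/out/'))
print(counts.get('alice', 0))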
bwhite commented 12 years ago

Can you post the stderr of the task? You can find it by going to http://localhost:50030/jobdetails.jsp?jobid=job_201111181442_0013, clicking on failed tasks, going to a specific task, then stderr. I suspect the task simply had a problem launching Python: when the cluster runs python, it probably isn't using your virtualenv. In the pseudo-distributed case this should go away if you install hadoopy outside the virtualenv, or if you use launch_frozen (as you did). The benefit of launch_frozen is that it doesn't rely on any Python installation on the cluster (in your case the base Python packages don't appear to include hadoopy); instead it brings everything along in a single package.

tylere commented 12 years ago

What you described makes sense; the stderr log (below) indicates that hadoopy cannot be found. However, I prefer not to install Python packages system-wide. Is there a way to make hadoopy.launch() use the Python environment it was invoked from?

stderr log

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:479)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)
Traceback (most recent call last):
  File "wc.py", line 22, in <module>
    import hadoopy
ImportError: No module named hadoopy
log4j:WARN No appenders could be found for logger (org.apache.hadoop.hdfs.DFSClient).
log4j:WARN Please initialize the log4j system properly.
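
A quick way to confirm the interpreter mismatch (a hypothetical diagnostic script, not part of hadoopy) is a trivial streaming mapper that reports which Python the task actually ran and whether it can import hadoopy::

#!/usr/bin/env python
# echo_python.py -- hypothetical diagnostic mapper
import sys

try:
    import hadoopy
    status = 'hadoopy found at %s' % hadoopy.__file__
except ImportError:
    status = 'hadoopy NOT importable'

# Drain stdin so streaming is satisfied, then emit one record describing
# the interpreter this task ran under.
for _ in sys.stdin:
    pass
print('%s\t%s' % (sys.executable, status))

Running this as the -mapper of a plain streaming job would presumably show the system interpreter rather than the virtualenv one.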

bwhite commented 12 years ago

I'll look into it later; one option would be to add an optional parameter that switches into a virtualenv right away. If you're interested in looking into it yourself, check out the code for hadoopy.launch; it should be fairly straightforward.
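
Until such an option exists, one workaround that fits the pseudo-distributed setup (where the virtualenv path exists on the only node) is to have the script activate the virtualenv itself before importing hadoopy, via virtualenv's activate_this.py hook. This is only a sketch of the idea, not something hadoopy does for you; the path is an example, and a multi-node cluster would need the environment on every node (or launch_frozen)::

# Hypothetical prologue for wc.py: run before "import hadoopy" so that tasks
# started with the system "python" still resolve packages from the virtualenv.
# The path below is an example and must point at your actual environment.
activate_this = '/home/terickson/.virtualenvs/hadoopy/bin/activate_this.py'
execfile(activate_this, dict(__file__=activate_this))

import hadoopy  # now importable from the virtualenv's site-packages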