commoncrawl / cc-mrjob

Demonstration of using Python to process the Common Crawl dataset with the mrjob framework
MIT License

Not working anymore on EMR? "subprocess failed with code 1" #22

Closed: joergrech closed this issue 6 years ago

joergrech commented 7 years ago

Hi, I have some problems getting the MR jobs to run on EMR. The scripts work locally, and they work with 1 or 10 WARC files on EMR, but with 100 I always get some failures of the type "PipeMapRed.waitOutputThreads(): subprocess failed with code 1". I start the script with the following command:

python email_counter_emr.py -r emr --jobconf mapreduce.task.timeout=3600000 --conf-path mrjob.conf --no-output --output-dir=s3://tw-mapreduce/emailscan-8/ input/test-100.warc

NOTE: I use the 2017-10-16 Common Crawl files in test-100.warc, e.g. crawl-data/CC-MAIN-2017-43/segments/1508187820466.2/warc/CC-MAIN-20171016214209-20171016234209-00000.warc.gz

My mrjob.conf file:

runners:
  emr:
    region: us-west-2
    zone: us-west-2c

    instance_type: c3.8xlarge
    core_instance_bid_price: '1.0'
    num_core_instances: 10

    # We also install packages needed for streaming compressed files from S3 or reading WARC files
    image_version: 3.0.4   # There's a newer AMI version but it has issues with the released stable mrjob
    interpreter: python2.7 # EMR comes with Python 2.6 by default -- installing Python 2.7 takes a while but might be necessary
    bootstrap:
    - sudo yum --releasever=2014.09 install -y python27 python27-devel gcc-c++
    - sudo python2.7 get-pip.py
    - sudo pip2.7 install boto mrjob simplejson warc
    - sudo pip2.7 install https://github.com/commoncrawl/gzipstream/archive/master.zip

My MR script email_counter_emr.py to identify emails (based on tag_counter.py, but shortened):

#!/usr/bin/env python

import gzip
import logging
import os.path as Path
import warc
import boto
from boto.s3.key import Key
from gzipstream import GzipStreamFile
from mrjob.job import MRJob
from mrjob.util import log_to_stream

import re
from collections import Counter

# Note: Python's built-in re module does not support \p{L} (Unicode property
# classes, available in the third-party "regex" module); inside these character
# classes it just adds the literal characters p, {, L, }.
EMAIL_PATTERN = re.compile(r"\b[a-z0-9\p{L}\.\_\%\+\-]+@[a-z0-9\p{L}\.\-]+\.[a-z]{2,}\b")
FILE_PATTERN  = re.compile(r".*\.(jpe?g|a?png|gif|tiff?|bmp|ico|dxf|webp|svg|eps|pdf|html?|css|js|py|downloads?|hidden|stream|chnla|aspx|invalid|jira|comand|tld|funky|nix)$")

class EmailCounter(MRJob):
    def configure_args(self):
        super(EmailCounter, self).configure_args()
        self.pass_arg_through('--runner')
        self.pass_arg_through('-r')

    def process_record(self, record):
        # We're only interested in the HTTP responses
        if record['Content-Type'] != 'application/http; msgtype=response':
            return
        payload = record.payload.read()

        # The HTTP response is defined by a specification: first part is headers (metadata)
        # and then following two CRLFs (newlines) has the data for the response
        headers, body = payload.split('\r\n\r\n', 1)
        if 'Content-Type: text/html' not in headers:
            return
        for email in EMAIL_PATTERN.findall(body.lower()):
            # filter out files and dirty emails
            if not (FILE_PATTERN.match(email) or (".." in email)):
                yield (email, 1)
            else:
                yield "x@x.x", 0

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))

    def mapper(self, _, line):
        # If we're on EC2 or running on a Hadoop cluster, pull files via S3
        if self.options.runner in ['emr', 'hadoop']:
            # Connect to Amazon S3 using anonymous credentials
            conn = boto.connect_s3(anon=True)
            pds = conn.get_bucket('commoncrawl')
            k = Key(pds, line)
            ccfile = warc.WARCFile(fileobj=GzipStreamFile(k))
        # If we're local, use files on the local file system
        else:
            line = Path.join(Path.abspath("/Users/jorgrech/repos/cc-mrjob"), line)
            ccfile = warc.WARCFile(fileobj=gzip.open(line))

        for _i, record in enumerate(ccfile):
            for key, value in self.process_record(record):
                yield (key, value)

if __name__ == '__main__':
    EmailCounter.run()

The logs have the following info. syslog:

...
2017-11-05 19:40:37,023 INFO org.apache.hadoop.mapreduce.Job (main):  map 98% reduce 31%
2017-11-05 19:40:46,080 INFO org.apache.hadoop.mapreduce.Job (main):  map 99% reduce 31%
2017-11-05 19:45:18,145 INFO org.apache.hadoop.mapreduce.Job (main): Task Id : attempt_1509908316427_0001_m_000065_1, Status : FAILED
2017-11-05 19:45:19,149 INFO org.apache.hadoop.mapreduce.Job (main):  map 98% reduce 31%
2017-11-05 19:45:29,202 INFO org.apache.hadoop.mapreduce.Job (main):  map 99% reduce 31%
2017-11-05 19:49:52,766 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 100%
2017-11-05 19:49:52,775 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1509908316427_0001 failed with state FAILED due to: Task failed task_1509908316427_0001_m_000019
Job failed as tasks failed. failedMaps:1 failedReduces:0
...

stderr:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:433)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

Since you mention "run over batches of files" in the README, I was hoping you have some leads on what could be wrong or how I can debug this problem. Running the ~8900 files of the 2017-10-16 crawl in batches of 10 WARC files would be a bit excessive and more expensive.

sebastian-nagel commented 7 years ago

Thanks for reporting this. Can you add the task logs from the failed task:

2017-11-05 19:45:18,145 INFO org.apache.hadoop.mapreduce.Job (main): Task Id : attempt_1509908316427_0001_m_000065_1, Status : FAILED

It can be a problem caused by a specific input (e.g., invalid encoding). It may also be a more fundamental problem: a user reported a job failing with many (too many) failures fetching data from S3 (mostly "503 Slow Down"). I've also observed this issue a month ago. No clue why it appeared only recently and not in the years before. In my case it's fixed by upgrading to boto3 (#18, #21); I only want to verify the solution with more tests on various examples. In case the task log shows a similar problem, could you report whether the patch to use boto3 fixes your problem? Thanks!

joergrech commented 7 years ago

I hope you mean the file task-attempts/application_1509908316427_0001/container_1509908316427_0001_01_000065/syslog:

2017-11-05 19:00:34,590 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2017-11-05 19:00:34,593 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2017-11-05 19:00:35,345 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2017-11-05 19:00:35,372 INFO [main] org.apache.hadoop.metrics2.sink.cloudwatch.CloudWatchSink: Initializing the CloudWatchSink for metrics.
2017-11-05 19:00:35,631 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSinkAdapter: Sink file started
2017-11-05 19:00:35,728 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 300 second(s).
2017-11-05 19:00:35,728 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
2017-11-05 19:00:35,745 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
2017-11-05 19:00:35,745 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1509908316427_0001, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@8f9c8a7)
2017-11-05 19:00:35,859 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
2017-11-05 19:00:36,547 INFO [main] amazon.emr.metrics.MetricsSaver: MetricsSaver YarnChild root:hdfs:///mnt/var/lib/hadoop/metrics/ period:60 instanceId:i-038ee0934f815f02a jobflow:j-1R3D55QZR1BO9
2017-11-05 19:00:36,758 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2017-11-05 19:00:36,759 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2017-11-05 19:00:37,031 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1509908316427_0001
2017-11-05 19:00:37,357 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2017-11-05 19:00:37,358 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2017-11-05 19:00:37,583 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
2017-11-05 19:00:37,583 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
2017-11-05 19:00:37,585 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
2017-11-05 19:00:37,599 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
2017-11-05 19:00:37,600 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2017-11-05 19:00:37,601 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: job.local.dir is deprecated. Instead, use mapreduce.job.local.dir
2017-11-05 19:00:37,602 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.cache.localFiles is deprecated. Instead, use mapreduce.job.cache.local.files
2017-11-05 19:00:37,602 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
2017-11-05 19:00:38,204 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
2017-11-05 19:00:39,072 INFO [main] org.apache.hadoop.mapred.MapTask: Host name: ip-172-31-15-75.us-west-2.compute.internal
2017-11-05 19:00:40,002 INFO [main] com.amazon.ws.emr.hadoop.fs.EmrFileSystem: Created AmazonS3 with InstanceProfileCredentialsProvider
2017-11-05 19:00:41,168 INFO [main] org.apache.hadoop.mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
2017-11-05 19:00:41,713 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: s3://mrjob-a9c34337a7ce8845/tmp/email_counter_emr.jorgrech.20171105.185113.194241/files/test-100.warc:4284+68
2017-11-05 19:00:41,935 INFO [main] com.amazon.ws.emr.hadoop.fs.EmrFileSystem: Created AmazonS3 with InstanceProfileCredentialsProvider
2017-11-05 19:00:42,042 INFO [main] com.hadoop.compression.lzo.GPLNativeCodeLoader: Loaded native gpl library
2017-11-05 19:00:42,046 INFO [main] com.hadoop.compression.lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev c7d54fffe5a853c437ee23413ba71fc6af23c91d]
2017-11-05 19:00:42,073 INFO [main] com.amazon.ws.emr.hadoop.fs.EmrFileSystem: Opening 's3://mrjob-a9c34337a7ce8845/tmp/email_counter_emr.jorgrech.20171105.185113.194241/files/test-100.warc' for reading
2017-11-05 19:00:42,100 INFO [main] com.amazon.ws.emr.hadoop.fs.EmrFileSystem: Stream for key 'tmp/email_counter_emr.jorgrech.20171105.185113.194241/files/test-100.warc' seeking to position '4284'
2017-11-05 19:00:42,168 INFO [main] org.apache.hadoop.mapred.MapTask: numReduceTasks: 79
2017-11-05 19:00:42,174 INFO [main] org.apache.hadoop.mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2017-11-05 19:00:42,266 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 52428796(209715184)
2017-11-05 19:00:42,266 INFO [main] org.apache.hadoop.mapred.MapTask: mapreduce.task.io.sort.mb: 200
2017-11-05 19:00:42,266 INFO [main] org.apache.hadoop.mapred.MapTask: soft limit at 167772160
2017-11-05 19:00:42,266 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufvoid = 209715200
2017-11-05 19:00:42,266 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 52428796; length = 13107200
2017-11-05 19:00:42,299 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/usr/bin/python2.7, email_counter_emr.py, --step-num=0, --mapper, -r, emr]
2017-11-05 19:00:42,307 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start
2017-11-05 19:00:42,310 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2017-11-05 19:00:42,312 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length
2017-11-05 19:00:42,318 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir
2017-11-05 19:00:42,320 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
2017-11-05 19:00:42,708 INFO [Thread-15] org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2017-11-05 19:00:42,709 INFO [main] org.apache.hadoop.streaming.PipeMapRed: mapRedFinished
2017-11-05 19:00:42,714 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
2017-11-05 19:00:42,727 INFO [main] org.apache.hadoop.io.compress.CodecPool: Got brand-new compressor [.snappy]
2017-11-05 19:00:44,215 INFO [main] org.apache.hadoop.mapred.Task: Task:attempt_1509908316427_0001_m_000063_0 is done. And is in the process of committing
2017-11-05 19:00:45,073 INFO [main] org.apache.hadoop.mapred.Task: Task 'attempt_1509908316427_0001_m_000063_0' done.
2017-11-05 19:00:45,175 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Stopping MapTask metrics system...
2017-11-05 19:00:45,176 INFO [file] org.apache.hadoop.metrics2.impl.MetricsSinkAdapter: file thread interrupted.
2017-11-05 19:00:45,176 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system stopped.
2017-11-05 19:00:45,176 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system shutdown complete.

The differences from the other syslog files seem to start at the line before the first "Thread" log entry (here Thread-15). The file for task 000003 has the following output:

...
2017-11-05 19:00:41,090 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
2017-11-05 19:00:41,133 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2017-11-05 19:00:51,149 INFO [Thread-14] org.apache.hadoop.streaming.PipeMapRed: Records R/W=1/1
2017-11-05 19:01:04,131 INFO [Thread-14] org.apache.hadoop.streaming.PipeMapRed: Records R/W=1/1277
...

sebastian-nagel commented 7 years ago

No, that's not the right log file: it belongs to container ID container_1509908316427_0001_01_000065, not to task attempt attempt_1509908316427_0001_m_000065_1.

joergrech commented 7 years ago

Can you give me a hint where I can find this log? I can only find container dirs under "task-attempts". In S3 there is the following directory structure:

daemons
- (different nodes with hadoop and yarn logs)
node
- (different nodes with bootstrap action logs)
steps
- 1
  - controller
  - stderr
  - stdout
  - syslog
task-attempts
- application_...
  - container_1509908316427_0001_01_000001 ... 000251

sebastian-nagel commented 7 years ago

On Hadoop YARN the first container runs the ApplicationMaster; its syslog (should be task-attempts/application_.../container_*_0001_01_000001/syslog) contains the mapping between task attempts and container IDs, e.g.:

Assigned container container_..._0001_01_000002 to attempt_..._0001_m_000000_0

It may be easier to use the YARN web UI or the command line (yarn logs ...) to access the log files.
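
For example, with YARN log aggregation enabled, something along these lines should fetch all container logs of the application into one file (the output redirect is just an illustration):

yarn logs -applicationId application_1509908316427_0001 > app_1509908316427_0001.log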

joergrech commented 7 years ago

Ok, good to know. It's a 75 MB file so here are the lines regarding attempt_1509908316427_0001_m_000065_1:

2017-11-05 19:21:24,544 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1509908316427_0001_m_000065_1 TaskAttempt Transitioned from NEW to UNASSIGNED
2017-11-05 19:21:24,544 INFO [Thread-80] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Added attempt_1509908316427_0001_m_000065_1 to list of failed maps
2017-11-05 19:25:24,696 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigned container container_1509908316427_0001_01_000246 to attempt_1509908316427_0001_m_000065_1
2017-11-05 19:25:24,701 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1509908316427_0001_m_000065_1 TaskAttempt Transitioned from UNASSIGNED to ASSIGNED
2017-11-05 19:25:24,706 INFO [ContainerLauncher #8] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Processing the event EventType: CONTAINER_REMOTE_LAUNCH for container container_1509908316427_0001_01_000246 taskAttempt attempt_1509908316427_0001_m_000065_1
2017-11-05 19:25:24,706 INFO [ContainerLauncher #8] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Launching attempt_1509908316427_0001_m_000065_1
2017-11-05 19:25:24,713 INFO [ContainerLauncher #8] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Shuffle port returned by ContainerManager for attempt_1509908316427_0001_m_000065_1 : 13562
2017-11-05 19:25:24,713 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: TaskAttempt: [attempt_1509908316427_0001_m_000065_1] using containerId: [container_1509908316427_0001_01_000246 on NM: [ip-172-31-15-75.us-west-2.compute.internal:9103]
2017-11-05 19:25:24,713 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1509908316427_0001_m_000065_1 TaskAttempt Transitioned from ASSIGNED to RUNNING
2017-11-05 19:25:26,111 INFO [IPC Server handler 6 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: JVM with ID: jvm_1509908316427_0001_m_000246 given task: attempt_1509908316427_0001_m_000065_1
2017-11-05 19:25:29,970 INFO [IPC Server handler 29 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Ping from attempt_1509908316427_0001_m_000065_1
2017-11-05 19:25:33,018 INFO [IPC Server handler 51 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Status update from attempt_1509908316427_0001_m_000065_1
2017-11-05 19:25:33,019 INFO [IPC Server handler 51 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Progress of TaskAttempt attempt_1509908316427_0001_m_000065_1 is : 0.667
2017-11-05 19:25:36,440 INFO [IPC Server handler 34 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Status update from attempt_1509908316427_0001_m_000065_1
2017-11-05 19:25:36,440 INFO [IPC Server handler 34 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Progress of TaskAttempt attempt_1509908316427_0001_m_000065_1 is : 0.667
... more status updates and progress
2017-11-05 19:28:59,064 INFO [IPC Server handler 28 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Status update from attempt_1509908316427_0001_m_000065_1
2017-11-05 19:28:59,064 INFO [IPC Server handler 28 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Progress of TaskAttempt attempt_1509908316427_0001_m_000065_1 is : 0.667
2017-11-05 19:29:02,081 INFO [IPC Server handler 32 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Status update from attempt_1509908316427_0001_m_000065_1
2017-11-05 19:29:02,081 INFO [IPC Server handler 32 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Progress of TaskAttempt attempt_1509908316427_0001_m_000065_1 is : 0.667
2017-11-05 19:29:05,082 INFO [IPC Server handler 35 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Ping from attempt_1509908316427_0001_m_000065_1
2017-11-05 19:29:08,083 INFO [IPC Server handler 49 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Ping from attempt_1509908316427_0001_m_000065_1
... more pings
2017-11-05 19:45:14,428 INFO [IPC Server handler 43 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Ping from attempt_1509908316427_0001_m_000065_1
2017-11-05 19:45:17,430 INFO [IPC Server handler 58 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Ping from attempt_1509908316427_0001_m_000065_1
2017-11-05 19:45:17,920 INFO [IPC Server handler 45 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Status update from attempt_1509908316427_0001_m_000065_1
2017-11-05 19:45:17,920 INFO [IPC Server handler 45 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Progress of TaskAttempt attempt_1509908316427_0001_m_000065_1 is : 0.667
2017-11-05 19:45:17,921 FATAL [IPC Server handler 56 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1509908316427_0001_m_000065_1 - exited : java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
2017-11-05 19:45:17,921 INFO [IPC Server handler 56 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Diagnostics report from attempt_1509908316427_0001_m_000065_1: Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
2017-11-05 19:45:17,921 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report from attempt_1509908316427_0001_m_000065_1: Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
2017-11-05 19:45:17,922 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1509908316427_0001_m_000065_1 TaskAttempt Transitioned from RUNNING to FAIL_CONTAINER_CLEANUP
2017-11-05 19:45:17,922 INFO [ContainerLauncher #4] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Processing the event EventType: CONTAINER_REMOTE_CLEANUP for container container_1509908316427_0001_01_000246 taskAttempt attempt_1509908316427_0001_m_000065_1
2017-11-05 19:45:17,922 INFO [ContainerLauncher #4] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: KILLING attempt_1509908316427_0001_m_000065_1
2017-11-05 19:45:17,929 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1509908316427_0001_m_000065_1 TaskAttempt Transitioned from FAIL_CONTAINER_CLEANUP to FAIL_TASK_CLEANUP
2017-11-05 19:45:17,929 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1509908316427_0001_m_000065_1 TaskAttempt Transitioned from FAIL_TASK_CLEANUP to FAILED
2017-11-05 19:45:19,418 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report from attempt_1509908316427_0001_m_000065_1: Container killed by the ApplicationMaster.

sebastian-nagel commented 7 years ago

Yes, but the reason for the failure should be in the logs of the container that ran attempt 000065_1. Could you check .../container_1509908316427_0001_01_000246/{syslog,stderr,stdout}? I would expect a Python exception logged there.

joergrech commented 7 years ago

And here are the exceptions from Hadoop in the syslog file:

2017-11-05 19:45:17,921 FATAL [IPC Server handler 56 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1509908316427_0001_m_000065_1 - exited : java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:433)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

2017-11-05 19:45:17,921 INFO [IPC Server handler 56 on 58077] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Diagnostics report from attempt_1509908316427_0001_m_000065_1: Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:433)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

2017-11-05 19:45:17,921 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report from attempt_1509908316427_0001_m_000065_1: Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:433)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

The FATAL one looks like it is access-control related.

joergrech commented 7 years ago

Sorry, missed your reply. Here is the stderr for container 246:

Traceback (most recent call last):
  File "email_counter_emr.py", line 99, in <module>
    EmailCounter.run()
  File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 424, in run
    mr_job.execute()
  File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 433, in execute
    self.run_mapper(self.options.step_num)
  File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 517, in run_mapper
    for out_key, out_value in mapper(key, value) or ():
  File "email_counter_emr.py", line 92, in mapper
    for key, value in self.process_record(record):
  File "email_counter_emr.py", line 45, in process_record
    payload = record.payload.read()
  File "/usr/lib/python2.7/site-packages/warc/utils.py", line 59, in read
    return self._read(self.length)
  File "/usr/lib/python2.7/site-packages/warc/utils.py", line 69, in _read
    content = self.buf + self.fileobj.read(size)
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 67, in read
    result = super(GzipStreamFile, self).read(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 48, in readinto
    data = self.read(len(b))
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 43, in read
    return self.read(size)
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 43, in read
    return self.read(size)
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 43, in read
    return self.read(size)
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 43, in read
    return self.read(size)
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 43, in read
    return self.read(size)
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 38, in read
    raw = self.stream.read(io.DEFAULT_BUFFER_SIZE)
  File "/usr/lib/python2.7/site-packages/boto/s3/key.py", line 412, in read
    data = self.resp.read(size)
  File "/usr/lib/python2.7/site-packages/boto/connection.py", line 413, in read
    return http_client.HTTPResponse.read(self, amt)
  File "/usr/lib64/python2.7/httplib.py", line 567, in read
    s = self.fp.read(amt)
  File "/usr/lib64/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
  File "/usr/lib64/python2.7/ssl.py", line 246, in recv
    return self.read(buflen)
  File "/usr/lib64/python2.7/ssl.py", line 165, in read
    return self._sslobj.read(len)
socket.error: [Errno 104] Connection reset by peer

sebastian-nagel commented 7 years ago

Ok, the reason seems to be a network error while reading from S3. This may happen infrequently (and then does no harm because the task attempt is retried), but I recommend trying the boto3 update (#21): the file is first downloaded and buffered in a temp file, which prevents network errors during reading/processing.
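
For reference, a minimal sketch of that approach (not the literal patch from #21; the helper name and structure are mine):

import gzip
import tempfile

import boto3
import warc
from botocore import UNSIGNED
from botocore.client import Config

def open_warc_from_s3(key_name):
    # Anonymous (unsigned) access to the public commoncrawl bucket
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    # Buffer the whole object in a local temp file first: boto3 retries
    # transient errors itself, and the WARC parsing below never touches
    # the network.
    temp = tempfile.TemporaryFile(mode='w+b')
    s3.download_fileobj('commoncrawl', key_name, temp)
    temp.seek(0)
    return warc.WARCFile(fileobj=gzip.GzipFile(fileobj=temp))

In the mapper this replaces the GzipStreamFile(k) wrapper, i.e., ccfile = open_warc_from_s3(line.strip()).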

joergrech commented 7 years ago

Great! I copied the changes into my files and it works without a problem on 100 files! I will check on the full corpus.

Many thanks!

sebastian-nagel commented 7 years ago

Better to run it on chunks of 10-20% of a monthly archive and merge the results afterwards. Merging is trivial for counting email addresses, and losing a job after 90% is done is the worse experience.
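
If it helps, the WARC path listing can be chunked with standard tools and each chunk submitted as a separate job (the chunk size below is illustrative):

split -l 1000 warc.paths warc.paths.chunk-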

joergrech commented 7 years ago

Sadly, my MR job did not go through with 10,000 lines from warc.paths: it had one task that failed three times, and the job stopped after 6 hours. I'm wondering if this might be related to the region. I'm starting the cluster in us-west-2 (Oregon), and the latency might be a problem (I've read that the Common Crawl data resides in us-east-1). I tried to start the cluster in us-east-1 but could not get it working. If I set it in mrjob.conf I get an XML error that is probably caused by boto3 (see https://github.com/boto/boto3/issues/125):

botocore.exceptions.ClientError: An error occurred (MalformedXML) when calling the CreateBucket operation: The XML you provided was not well-formed or did not validate against our published schema

I tried to set the region via aws configure set default.region us-east-1 but that does not work; it looks like the default remains us-west-2 (Oregon). Is there another way to configure the region? Have you ever run an EMR cluster in us-east-1?

sebastian-nagel commented 7 years ago

I would first check why this task failed: web data is weird and might trigger unexpected bugs. Yes, run the processing in us-east-1; it should be faster, and otherwise you're charged for the inter-region data transfer. Default region via aws configure: that only affects the AWS CLI tools. Afaik, the settings in ~/.aws/config are not used by boto/boto3.

kleneway commented 6 years ago

Any update on this? I'm also having trouble running on us-east-1 (getting the same botocore.exceptions... error)

sebastian-nagel commented 6 years ago

The underlying mrjob has changed significantly during the last months; the mrjob.conf needs a complete overhaul. I hope to get this done, including testing, soon. You may be able to avoid the CreateBucket error by using an already existing bucket in the us-east-1 region, see http://mrjob.readthedocs.io/en/stable/guides/emr-opts.html#option-cloud_tmp_dir.

sebastian-nagel commented 6 years ago

The mrjob.conf is updated to use the us-east-1 region and to explicitly define cloud_tmp_dir, which avoids the "An error occurred (MalformedXML) when calling the CreateBucket operation" error.
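
The relevant part of the configuration looks roughly like this (the bucket name is a placeholder for an existing bucket in us-east-1):

runners:
  emr:
    region: us-east-1
    cloud_tmp_dir: s3://your-existing-bucket/tmp/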

I've verified the updated configuration by running

python tag_counter_emr.py -r emr --conf-path mrjob.conf --no-output --output-dir s3://my-output-bucket/cc-mrjob-test/ input/test-100.warc

as described in the README. Output dir and cloud_tmp_dir have been set appropriately: a bucket with write permissions, and an output dir that does not exist yet. Thanks!