elastic / elasticsearch-hadoop

Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

NPE in HeartBeat.java in development snapshot #148

Closed rbraley closed 10 years ago

rbraley commented 10 years ago

I'm not sure whether I am missing something, but I thought this error might be upstream. Here is how to reproduce it.

build.sbt

name := "elasticsearch-spark-example"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core" % "0.9.0-incubating",
  "org.elasticsearch"  % "elasticsearch-hadoop" % "1.3.0.BUILD-SNAPSHOT"
)

resolvers ++= Seq(
  "Typesafe Sonatype Snapshots" at "http://typesafe.artifactoryonline.com/typesafe/sonatype-snapshots/",
  "Sonatype OSS" at "http://oss.sonatype.org/content/repositories/snapshots/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Conjars" at "http://conjars.org/repo"
)

SimpleApp.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsInputFormat

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new JobConf()
    conf.set("es.resource", "demo/demo")

    val sc = new SparkContext("local", "Simple App")

    val data = sc.hadoopRDD(conf, classOf[EsInputFormat[String,String]], classOf[String], classOf[String], 6)

    data foreach println
  }
}

Here's the stack trace:

$ sbt run
[info] Loading project definition from /Users/rbraley/IdeaProjects/elasticsearch-spark-example/project
[info] Set current project to elasticsearch-spark-example (in build file:/Users/rbraley/IdeaProjects/elasticsearch-spark-example/)
[info] Updating {file:/Users/rbraley/IdeaProjects/elasticsearch-spark-example/}elasticsearch-spark-example...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Running io.traintracks.elasticsearch.spark.example.SimpleApp
14/02/24 17:28:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/02/24 17:28:00 INFO Remoting: Starting remoting
14/02/24 17:28:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.2.88:61423]
14/02/24 17:28:00 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.2.88:61423]
14/02/24 17:28:00 INFO spark.SparkEnv: Registering BlockManagerMaster
14/02/24 17:28:00 INFO storage.DiskBlockManager: Created local directory at /var/folders/rn/p2d7mh016b34qvm47jybmg380000gn/T/spark-local-20140224172800-6e2d
14/02/24 17:28:00 INFO storage.MemoryStore: MemoryStore started with capacity 890.9 MB.
14/02/24 17:28:00 INFO network.ConnectionManager: Bound socket to port 61424 with id = ConnectionManagerId(192.168.2.88,61424)
14/02/24 17:28:00 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/02/24 17:28:00 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager 192.168.2.88:61424 with 890.9 MB RAM
14/02/24 17:28:00 INFO storage.BlockManagerMaster: Registered BlockManager
14/02/24 17:28:00 INFO spark.HttpServer: Starting HTTP Server
14/02/24 17:28:00 INFO server.Server: jetty-7.6.8.v20121106
14/02/24 17:28:00 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:61425
14/02/24 17:28:00 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.2.88:61425
14/02/24 17:28:00 INFO spark.SparkEnv: Registering MapOutputTracker
14/02/24 17:28:00 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/rn/p2d7mh016b34qvm47jybmg380000gn/T/spark-bec0c9ef-0032-4894-a4cb-25da4a33e0b0
14/02/24 17:28:00 INFO spark.HttpServer: Starting HTTP Server
14/02/24 17:28:00 INFO server.Server: jetty-7.6.8.v20121106
14/02/24 17:28:00 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:61426
14/02/24 17:28:01 INFO server.Server: jetty-7.6.8.v20121106
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/02/24 17:28:01 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/02/24 17:28:01 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/02/24 17:28:01 INFO ui.SparkUI: Started Spark Web UI at http://192.168.2.88:4040
2014-02-24 17:28:01.361 java[80822:6313] Unable to load realm info from SCDynamicStore
14/02/24 17:28:01 INFO storage.MemoryStore: ensureFreeSpace(32969) called with curMem=0, maxMem=934163251
14/02/24 17:28:01 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.2 KB, free 890.9 MB)
14/02/24 17:28:01 INFO mr.EsInputFormat: Discovered mapping {demo=[test=STRING]} for [demo/demo]
14/02/24 17:28:01 INFO mr.EsInputFormat: Created [5] shard-splits
14/02/24 17:28:01 INFO spark.SparkContext: Starting job: foreach at SimpleApp.scala:18
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Got job 0 (foreach at SimpleApp.scala:18) with 5 output partitions (allowLocal=false)
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Final stage: Stage 0 (foreach at SimpleApp.scala:18)
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Missing parents: List()
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Submitting Stage 0 (HadoopRDD[0] at hadoopRDD at SimpleApp.scala:16), which has no missing parents
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Submitting 5 missing tasks from Stage 0 (HadoopRDD[0] at hadoopRDD at SimpleApp.scala:16)
14/02/24 17:28:01 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 5 tasks
14/02/24 17:28:01 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/02/24 17:28:01 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2705 bytes in 5 ms
14/02/24 17:28:01 INFO executor.Executor: Running task ID 0
14/02/24 17:28:01 INFO storage.BlockManager: Found block broadcast_0 locally
14/02/24 17:28:01 INFO rdd.HadoopRDD: Input split: ShardInputSplit [node=[tXu2eOa5SsS3xwBId-m81A/Magnum I|127.0.0.1:9200],shard=0]
14/02/24 17:28:01 ERROR executor.Executor: Exception in task ID 0
java.lang.NullPointerException
    at org.elasticsearch.hadoop.mr.HeartBeat.<init>(HeartBeat.java:51)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.init(EsInputFormat.java:204)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.<init>(EsInputFormat.java:167)
    at org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.<init>(EsInputFormat.java:328)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:449)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:66)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/02/24 17:28:01 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/02/24 17:28:01 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 2705 bytes in 0 ms
14/02/24 17:28:01 INFO executor.Executor: Running task ID 1
14/02/24 17:28:01 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/02/24 17:28:01 INFO storage.BlockManager: Found block broadcast_0 locally
14/02/24 17:28:01 INFO rdd.HadoopRDD: Input split: ShardInputSplit [node=[tXu2eOa5SsS3xwBId-m81A/Magnum I|127.0.0.1:9200],shard=1]
14/02/24 17:28:01 ERROR executor.Executor: Exception in task ID 1
java.lang.NullPointerException
    at org.elasticsearch.hadoop.mr.HeartBeat.<init>(HeartBeat.java:51)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.init(EsInputFormat.java:204)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.<init>(EsInputFormat.java:167)
    at org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.<init>(EsInputFormat.java:328)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:449)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:66)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/02/24 17:28:01 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
    at org.elasticsearch.hadoop.mr.HeartBeat.<init>(HeartBeat.java:51)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.init(EsInputFormat.java:204)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.<init>(EsInputFormat.java:167)
    at org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.<init>(EsInputFormat.java:328)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:449)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getRecordReader(EsInputFormat.java:66)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/02/24 17:28:01 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/02/24 17:28:01 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/02/24 17:28:01 INFO scheduler.TaskSchedulerImpl: Ignoring update with state RUNNING from TID 1 because its task set is gone
14/02/24 17:28:01 INFO scheduler.TaskSchedulerImpl: Ignoring update with state FAILED from TID 1 because its task set is gone
14/02/24 17:28:01 INFO scheduler.DAGScheduler: Failed to run foreach at SimpleApp.scala:18
[error] (run-main) org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/02/24 17:28:02 INFO network.ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 10 s, completed Feb 24, 2014 5:28:02 PM
costin commented 10 years ago

Hi,

The error stems from a missing Hadoop property that we use to identify the running task. I'm not sure whether Spark sets it: it appears to set it internally when writing to Hadoop, but I'm not clear whether it is set when reading from Hadoop.

The master branch tries a different property and additionally logs a long error message listing all the properties. Can you please post/upload the properties somewhere so I can take a better look at them?

Thanks.
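
For readers following along, the lookup described above can be sketched roughly as follows. This is a minimal illustration, not the actual elasticsearch-hadoop code: the class `TaskIdLookup` and its methods are hypothetical, a plain `Map` stands in for the Hadoop `Configuration`, and the two candidate keys (`mapred.task.id` and `mapreduce.task.attempt.id`, the classic and newer Hadoop names for the task attempt id) are an assumption about which properties are tried.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not the elasticsearch-hadoop implementation.
final class TaskIdLookup {

    // Assumed candidate keys: the classic and the newer Hadoop property names.
    static final List<String> CANDIDATE_KEYS =
            Arrays.asList("mapred.task.id", "mapreduce.task.attempt.id");

    /** Returns the first non-blank candidate value, or empty if none is set. */
    static Optional<String> findTaskId(Map<String, String> props) {
        for (String key : CANDIDATE_KEYS) {
            String value = props.get(key);
            if (value != null && !value.trim().isEmpty()) {
                return Optional.of(value.trim());
            }
        }
        return Optional.empty();
    }

    /** Builds a diagnostic message listing every property, sorted by key. */
    static String describeMissing(Map<String, String> props) {
        return "Cannot determine task id - current properties are " + new TreeMap<>(props);
    }
}
```

Returning an `Optional` rather than a possibly-null string forces the caller to decide what to do when the runtime (here, Spark) does not set the property, instead of failing later with a `NullPointerException`.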

fedesilva commented 10 years ago

Here is a properties dump from my environment, as described in #144:

14/02/24 16:55:58 ERROR EsInputFormat: Cannot determine task id - current properties are {mapred.task.cache.levels=2, ha.failover-controller.cli-check.rpc-timeout.ms=20000, mapred.job.restart.recover=true, ipc.client.connect.max.retries.on.timeouts=45, map.sort.class=org.apache.hadoop.util.QuickSort, hadoop.tmp.dir=/tmp/hadoop-${user.name}, es.internal.mr.target.resource=logstash-batanga-radio-2014.02.18/logs, ha.health-monitor.check-interval.ms=1000, ipc.client.idlethreshold=4000, mapred.system.dir=${hadoop.tmp.dir}/mapred/system, kfs.blocksize=67108864, fs.trash.checkpoint.interval=0, mapred.job.tracker.persist.jobstatus.hours=0, io.skip.checksum.errors=false, mapred.cluster.reduce.memory.mb=-1, mapred.child.tmp=./tmp, es.internal.es.version=0.90.5, mapred.skip.reduce.max.skip.groups=0, mapred.heartbeats.in.second=100, mapred.jobtracker.instrumentation=org.apache.hadoop.mapred.JobTrackerMetricsInst, mapred.tasktracker.dns.nameserver=default, fs.defaultFS=file:///, io.sort.factor=10, mapred.task.timeout=600000, mapred.max.tracker.failures=4, hadoop.rpc.socket.factory.class.default=org.apache.hadoop.net.StandardSocketFactory, mapred.job.tracker.jobhistory.lru.cache.size=5, kfs.replication=3, mapred.skip.map.auto.incr.proc.count=true, mapreduce.job.complete.cancel.delegation.tokens=true, io.mapfile.bloom.size=1048576, hadoop.rpc.protection=authentication, es.query=

{      
  "query": {
     "match_all": {}
  },
  "fields": [
    "btng.listenerdjid"
  ]
}
, mapreduce.reduce.shuffle.connect.timeout=180000, hadoop.ssl.require.client.cert=false, hadoop.skip.worker.version.check=false, tasktracker.http.threads=40, mapred.job.shuffle.merge.percent=0.66, io.bytes.per.checksum=512, mapred.output.compress=false, mapred.healthChecker.script.timeout=600000, file.stream-buffer-size=4096, ha.failover-controller.new-active.rpc-timeout.ms=60000, mapred.reduce.slowstart.completed.maps=0.05, mapred.reduce.max.attempts=4, es.ser.reader.value.class=org.elasticsearch.hadoop.mr.WritableValueReader, ha.zookeeper.acl=world:anyone:rwcda, mapreduce.ifile.readahead.bytes=4194304, fs.ftp.host.port=21, mapred.skip.map.max.skip.records=0, kfs.client-write-packet-size=65536, kfs.bytes-per-checksum=512, mapred.cluster.map.memory.mb=-1, hadoop.security.group.mapping=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback, hadoop.ssl.keystores.factory.class=org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory, s3.replication=3, net.topology.node.switch.mapping.impl=org.apache.hadoop.net.ScriptBasedMapping, mapred.job.tracker.persist.jobstatus.dir=/jobtracker/jobsInfo, fs.s3.buffer.dir=${hadoop.tmp.dir}/s3, job.end.retry.attempts=0, s3native.bytes-per-checksum=512, mapred.local.dir.minspacestart=0, mapred.output.compression.type=RECORD, s3.client-write-packet-size=65536, io.mapfile.bloom.error.rate=0.005, ftp.bytes-per-checksum=512, mapred.cluster.max.reduce.memory.mb=-1, mapred.max.tracker.blacklists=4, mapred.task.profile.maps=0-2, hadoop.security.group.mapping.ldap.search.attr.group.name=cn, mapred.userlog.retain.hours=24, ha.health-monitor.rpc-timeout.ms=45000, mapred.job.tracker.persist.jobstatus.active=false, hadoop.security.authorization=false, local.cache.size=10737418240, s3.bytes-per-checksum=512, mapreduce.shuffle.ssl.enabled=${hadoop.ssl.enabled}, mapred.min.split.size=0, mapred.map.tasks=2, mapred.child.java.opts=-Xmx200m, mapred.map.child.log.level=INFO, mapred.job.queue.name=default, 
mapred.job.tracker.retiredjobs.cache.size=1000, ipc.server.listen.queue.size=128, mapred.inmem.merge.threshold=1000, job.end.retry.interval=30000, mapred.skip.attempts.to.start.skipping=2, s3native.blocksize=67108864, mapred.reduce.tasks=1, mapred.merge.recordsBeforeProgress=10000, mapred.userlog.limit.kb=0, file.replication=1, mapred.job.reduce.memory.mb=-1, ftp.client-write-packet-size=65536, hadoop.work.around.non.threadsafe.getpwuid=false, mapred.job.shuffle.input.buffer.percent=0.70, io.sort.spill.percent=0.80, mapreduce.shuffle.ssl.port=50443, hadoop.http.staticuser.user=dr.who, mapred.map.tasks.speculative.execution=true, hadoop.http.authentication.type=simple, hadoop.util.hash.type=murmur, hadoop.security.instrumentation.requires.admin=false, mapred.map.max.attempts=4, mapreduce.job.acl-view-job= , mapreduce.ifile.readahead=true, io.map.index.interval=128, mapred.job.tracker.handler.count=10, mapreduce.reduce.shuffle.read.timeout=180000, mapred.tasktracker.expiry.interval=600000, hadoop.ssl.client.conf=ssl-client.xml, mapred.reduce.child.log.level=INFO, mapred.jobtracker.maxtasks.per.job=-1, mapred.jobtracker.job.history.block.size=3145728, keep.failed.task.files=false, hadoop.kerberos.kinit.command=kinit, ipc.client.tcpnodelay=false, mapred.task.profile.reduces=0-2, fs.AbstractFileSystem.hdfs.impl=org.apache.hadoop.fs.Hdfs, mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec, io.map.index.skip=0, hadoop.http.authentication.token.validity=36000, ipc.server.tcpnodelay=false, hadoop.jetty.logs.serve.aliases=true, ftp.replication=3, ha.failover-controller.graceful-fence.connection.retries=1, jobclient.progress.monitor.poll.interval=1000, ha.health-monitor.sleep-after-disconnect.ms=1000, es.resource=logstash-batanga-radio-2014.02.18/logs, mapred.job.map.memory.mb=-1, file.client-write-packet-size=65536, mapred.reduce.tasks.speculative.execution=true, fs.AbstractFileSystem.viewfs.impl=org.apache.hadoop.fs.viewfs.ViewFs, 
hadoop.security.group.mapping.ldap.search.filter.group=(objectClass=group), mapreduce.tasktracker.outofband.heartbeat=false, mapreduce.reduce.input.limit=-1, fs.s3n.block.size=67108864, net.topology.script.number.args=100, dfs.ha.fencing.ssh.connect-timeout=30000, hadoop.security.authentication=simple, tfile.fs.output.buffer.size=262144, mapred.job.reuse.jvm.num.tasks=1, mapred.jobtracker.completeuserjobs.maximum=100, hadoop.security.groups.cache.secs=300, ha.failover-controller.graceful-fence.rpc-timeout.ms=5000, fs.AbstractFileSystem.file.impl=org.apache.hadoop.fs.local.LocalFs, mapred.task.tracker.task-controller=org.apache.hadoop.mapred.DefaultTaskController, ha.health-monitor.connect-retry-interval.ms=1000, kfs.stream-buffer-size=4096, fs.s3.maxRetries=4, mapred.cluster.max.map.memory.mb=-1, file.blocksize=67108864, mapreduce.reduce.shuffle.maxfetchfailures=10, fs.ftp.host=0.0.0.0, file.bytes-per-checksum=512, ha.zookeeper.parent-znode=/hadoop-ha, mapreduce.job.acl-modify-job= , mapred.local.dir=${hadoop.tmp.dir}/mapred/local, fs.s3.sleepTimeSeconds=10, fs.trash.interval=0, mapred.submit.replication=10, hadoop.relaxed.worker.version.check=true, mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec, mapred.tasktracker.dns.interface=default, ftp.stream-buffer-size=4096, mapred.job.tracker=local, hadoop.http.authentication.signature.secret.file=${user.home}/hadoop-http-auth-signature-secret, io.seqfile.sorter.recordlimit=1000000, s3.blocksize=67108864, mapreduce.tasktracker.cache.local.numberdirectories=10000, mapred.jobtracker.taskScheduler=org.apache.hadoop.mapred.JobQueueTaskScheduler, mapred.line.input.format.linespermap=1, fs.permissions.umask-mode=022, mapred.tasktracker.instrumentation=org.apache.hadoop.mapred.TaskTrackerMetricsInst, hadoop.ssl.server.conf=ssl-server.xml, mapreduce.jobtracker.split.metainfo.maxsize=10000000, jobclient.completion.poll.interval=5000, mapred.local.dir.minspacekill=0, s3native.stream-buffer-size=4096, 
io.sort.record.percent=0.05, hadoop.http.authentication.kerberos.principal=HTTP/_HOST@LOCALHOST, mapred.temp.dir=${hadoop.tmp.dir}/mapred/temp, mapred.tasktracker.reduce.tasks.maximum=2, mapred.tasktracker.tasks.sleeptime-before-sigkill=5000, mapred.job.reduce.input.buffer.percent=0.0, mapred.tasktracker.indexcache.mb=10, es.internal.hosts=, hadoop.security.group.mapping.ldap.search.filter.user=(&(objectClass=user)(sAMAccountName={0})), fs.automatic.close=true, mapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.MapTask$MapOutputBuffer, mapred.skip.reduce.auto.incr.proc.count=true, s3.stream-buffer-size=4096, ha.zookeeper.session-timeout.ms=5000, io.seqfile.compress.blocksize=1000000, hadoop.http.filter.initializers=org.apache.hadoop.http.lib.StaticUserWebFilter, fs.s3.block.size=67108864, mapred.tasktracker.taskmemorymanager.monitoring-interval=5000, hadoop.http.authentication.simple.anonymous.allowed=true, mapred.acls.enabled=false, mapred.queue.default.state=RUNNING, mapreduce.jobtracker.staging.root.dir=${hadoop.tmp.dir}/mapred/staging, ftp.blocksize=67108864, mapreduce.shuffle.ssl.address=0.0.0.0, mapred.queue.names=default, mapred.task.tracker.http.address=0.0.0.0:50060, mapred.disk.healthChecker.interval=60000, mapred.reduce.parallel.copies=5, io.seqfile.lazydecompress=true, hadoop.common.configuration.version=0.23.0, hadoop.ssl.enabled=false, hadoop.security.group.mapping.ldap.search.attr.member=member, io.sort.mb=100, ipc.client.connection.maxidletime=10000, mapred.task.tracker.report.address=127.0.0.1:0, mapred.compress.map.output=false, hadoop.security.uid.cache.secs=14400, mapred.healthChecker.interval=60000, ipc.client.kill.max=10, ipc.client.connect.max.retries=10, io.seqfile.local.dir=${hadoop.tmp.dir}/io/local, mapred.user.jobconf.limit=5242880, mapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapred.ReduceTask$ReduceCopier, io.native.lib.available=true, mapred.job.tracker.http.address=0.0.0.0:50030, 
io.file.buffer.size=4096, mapred.jobtracker.restart.recover=false, io.serializations=org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization, tfile.fs.input.buffer.size=262144, mapred.task.profile=false, hadoop.security.group.mapping.ldap.ssl=false, jobclient.output.filter=FAILED, fs.df.interval=60000, s3native.client-write-packet-size=65536, hadoop.http.authentication.kerberos.keytab=${user.home}/hadoop.keytab, s3native.replication=3, mapred.tasktracker.map.tasks.maximum=2, tfile.io.chunk.size=1048576, hadoop.ssl.hostname.verifier=DEFAULT}
costin commented 10 years ago

Thanks. I've pushed a fix to master which ignores the task id in the heartbeat (since it's optional). However, it looks to me like a bug, since several Hadoop properties that should be present are missing. I'll probably raise this on the Spark mailing list at some point (if you guys do that in the meantime, even better).

Either way, please try the latest master and let me know how it goes. Thanks!

costin commented 10 years ago

@fedesilva @rbraley guys, if you are available in 2h or so maybe we can connect on IRC to speed things up? Just in case there are still bugs, we can chat directly and I'll do my best to fix the issues as they appear.

Let me know if that works for you guys.

fedesilva commented 10 years ago

The latest master fixes the NPE. I have successfully iterated over an RDD.

I am now getting the following, which does not seem to abort the tasks.

14/02/24 18:47:56 WARN HadoopRDD: Exception in RecordReader.close()
java.lang.NullPointerException
    at org.elasticsearch.hadoop.mr.ReportingUtils.report(ReportingUtils.java:38)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.close(EsInputFormat.java:274)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:174)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
    at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
    at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
fedesilva commented 10 years ago

Regarding IRC: Ok, I'm online in elasticsearch IRC channel right now as fedesilva and will be available until 23 GMT. I am GMT -3 (Montevideo).

costin commented 10 years ago

Can you please try the latest master? It should address this NPE (the reporting facility seems to be disabled).
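
A guard of the kind described here might look like the sketch below. It is an illustration under stated assumptions, not the actual fix: `StatusSink` is a hypothetical stand-in for a Hadoop-style reporter, and `SafeReporting.report` is an invented name showing how a missing (null) reporter can be skipped rather than dereferenced.

```java
// Hypothetical stand-in for a Hadoop-style progress reporter (illustration only).
interface StatusSink {
    void setStatus(String status);
}

final class SafeReporting {

    /**
     * Forwards the status to the sink when one is available.
     * Returns false when reporting is disabled (sink is null), true otherwise.
     */
    static boolean report(StatusSink sink, String status) {
        if (sink == null) {
            return false; // reporting disabled: skip instead of dereferencing null
        }
        sink.setStatus(status);
        return true;
    }
}
```

With a guard like this, closing a record reader in an environment that supplies no reporter becomes a no-op for the reporting step instead of raising an exception.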


costin commented 10 years ago

Cool - see you in a bit (my nick is typically costin) :)


fedesilva commented 10 years ago

Ok, see you soon.

costin commented 10 years ago

As this is fixed, I'm closing the issue. Guys, please keep opening issues if you encounter any problems, but do mention each new issue under #151.

rbraley commented 10 years ago

Hi guys, #152 is also related to this. I will hang out in IRC as rbraley for a while so we can try to resolve issues together.
