mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License

NullPointerException when calling postStop() #268

Closed SnipyJulmy closed 2 years ago

SnipyJulmy commented 2 years ago

Hi @mjakubowski84

I sometimes receive a NullPointerException when the stream fails for some reason and the postStop() function is called.

Here is a sample stack trace:

Error during postStop in 
[ java.lang.NullPointerException at
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:177) at
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124) at 
org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) at 
com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic.$anonfun$postStop$1(ParquetPartitioningFlow.scala:378) at 
com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic.$anonfun$postStop$1$adapted(ParquetPartitioningFlow.scala:376)  at 
scala.collection.immutable.Map$Map2.foreach(Map.scala:273) at 
com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic.postStop(ParquetPartitioningFlow.scala:376)

Could it be related to PARQUET-544? I presume not (that one was resolved in 1.8.2/1.9.0, and parquet4s uses 1.12.2), but the exception is almost identical. Is it possible that a writer is closed multiple times within the postStop() function?

mjakubowski84 commented 2 years ago

Hi @SnipyJulmy .

That's an interesting error. Are you sure that you do not have some older version of Parquet on your classpath? Otherwise we will have to report this error to Parquet.

mjakubowski84 commented 2 years ago

I checked the Parquet code. This NPE can be thrown when the close function is called concurrently by multiple threads. If Akka does call postStop concurrently, then this error may happen. We can work around it by enforcing some synchronisation...

But actually this should not happen at all...

mjakubowski84 commented 2 years ago

PR #270 introduces synchronisation by using a concurrent map for tracking partition writers. I am not sure when Akka Streams performs concurrent calls to the logic of the flow. Maybe it happens when an async downstream fails.
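For readers following along, here is a minimal, self-contained sketch of the idea behind the fix (hypothetical names, not the actual parquet4s internals): `ConcurrentHashMap.remove` hands the stored writer to exactly one caller, so even if two cleanup paths race, `close()` runs only once.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object OnceOnlyClose extends App {
  // Hypothetical stand-in for a partition writer; counts how many times close() runs.
  final class FakeWriter {
    val closes = new AtomicInteger(0)
    def close(): Unit = closes.incrementAndGet()
  }

  val writers = new ConcurrentHashMap[String, FakeWriter]()
  val writer  = new FakeWriter
  writers.put("partition=a", writer)

  // Several threads race to clean up, as concurrent postStop calls might.
  // ConcurrentHashMap.remove returns the value to exactly one of them;
  // the others see null and skip the close.
  val threads = (1 to 8).map { _ =>
    new Thread(() => Option(writers.remove("partition=a")).foreach(_.close()))
  }
  threads.foreach(_.start())
  threads.foreach(_.join())

  println(writer.closes.get()) // exactly one close, regardless of the race
}
```

With a plain mutable Map, two racing threads could both look up the same writer and both call `close()`, which matches the double-close scenario suspected above.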

SnipyJulmy commented 2 years ago

I've checked twice and I am pretty sure that the only parquet-hadoop dependency available is 1.12.2.

Also, this error only occurred once, when we encountered some network issue. We had this log right before it:

Error transferring data from DatanodeInfoWithStorage[...]

So it seems to happen when something else fails first.

Thanks a lot for the quick answer 😉 !

mjakubowski84 commented 2 years ago

@SnipyJulmy Bugfix version 2.5.1 is now released. Please check if all is fine.

Xoerk commented 2 years ago

Hi, I noticed that you closed this issue, but I'm still facing the same problem even after upgrading to 2.5.1:

[2022-06-12 08:51:30,107] [INFO] [software.amazon.kinesis.coordinator.Scheduler] [] [PersistenceActorSystem-akka.actor.default-dispatcher-19] - Worker shutdown requested. MDC: {}
[2022-06-12 08:51:30,106] [INFO] [akka.stream.Materializer] [] [PersistenceActorSystem-akka.actor.default-dispatcher-19] - [kinesis-records] Downstream finished, cause: SubscriptionWithCancelException$StageWasCompleted$: null MDC: {akkaAddress=akka://PersistenceActorSystem, akkaUid=1971679635946598331, sourceThread=PersistenceActorSystem-akka.actor.default-dispatcher-17, akkaSource=akka.stream.Log(akka://PersistenceActorSystem/system/Materializers/StreamSupervisor-0), sourceActorSystem=PersistenceActorSystem, akkaTimestamp=08:51:30.105UTC}
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at kamon.instrumentation.executor.ExecutorInstrumentation$InstrumentedForkJoinPool$TimingRunnable.run(ExecutorInstrumentation.scala:691)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:480)
at akka.stream.impl.fusing.GraphInterpreter.afterStageHasRun(GraphInterpreter.scala:580)
at akka.stream.impl.fusing.GraphInterpreter.finalizeStage(GraphInterpreter.scala:600)
at com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic.postStop(ParquetPartitioningFlow.scala:356)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:273)
at com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic.$anonfun$postStop$1$adapted(ParquetPartitioningFlow.scala:356)
at com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic.$anonfun$postStop$1(ParquetPartitioningFlow.scala:358)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:177)
java.lang.NullPointerException: null
[2022-06-12 08:51:30,099] [ERROR] [akka.actor.RepointableActorRef] [] [PersistenceActorSystem-akka.actor.default-dispatcher-19] - Error during postStop in [com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic]: null MDC: {akkaAddress=akka://PersistenceActorSystem, akkaUid=1971679635946598331, sourceThread=PersistenceActorSystem-akka.actor.default-dispatcher-17, akkaSource=akka://PersistenceActorSystem/system/Materializers/StreamSupervisor-0/flow-6-0-ignoreSink, sourceActorSystem=PersistenceActorSystem, akkaTimestamp=08:51:30.093UTC}
, blocksActive=0, blockUploadsCompleted=0, blocksAllocated=1, blocksReleased=1, blocksActivelyAllocated=0, transferDuration=0 ms, totalUploadDuration=0 ms, effectiveBandwidth=0.0 bytes/s} MDC: {}
means=((action_executor_acquired.mean=(samples=0, sum=0, mean=0.0000)) (op_abort.mean=(samples=0, sum=0, mean=0.0000)) (multipart_upload_completed.mean=(samples=0, sum=0, mean=0.0000)) (multipart_upload_completed.failures.mean=(samples=0, sum=0, mean=0.0000)) (object_multipart_aborted.failures.mean=(samples=0, sum=0, mean=0.0000)) (action_executor_acquired.failures.mean=(samples=0, sum=0, mean=0.0000)) (object_multipart_aborted.mean=(samples=0, sum=0, mean=0.0000)) (op_abort.failures.mean=(samples=0, sum=0, mean=0.0000)));
maximums=((op_abort.max=-1) (action_executor_acquired.max=-1) (multipart_upload_completed.failures.max=-1) (object_multipart_aborted.failures.max=-1) (multipart_upload_completed.max=-1) (action_executor_acquired.failures.max=-1) (object_multipart_aborted.max=-1) (op_abort.failures.max=-1));
minimums=((action_executor_acquired.min=-1) (action_executor_acquired.failures.min=-1) (object_multipart_aborted.min=-1) (multipart_upload_completed.failures.min=-1) (object_multipart_aborted.failures.min=-1) (multipart_upload_completed.min=-1) (op_abort.min=-1) (op_abort.failures.min=-1));
gauges=((stream_write_block_uploads_pending=1) (stream_write_block_uploads_data_pending=9500));
[2022-06-12 08:51:30,065] [WARN] [org.apache.hadoop.fs.s3a.S3AInstrumentation] [] [PersistenceActorSystem-akka.actor.default-dispatcher-17] - Closing output stream statistics while data is still marked as pending upload in OutputStreamStatistics{counters=((stream_write_queue_duration=0) (op_abort=0) (action_executor_acquired.failures=0) (multipart_upload_completed=0) (multipart_upload_completed.failures=0) (object_multipart_aborted=0) (op_hsync=0) (stream_write_total_data=0) (stream_write_total_time=0) (op_abort.failures=0) (stream_write_block_uploads=1) (op_hflush=0) (stream_write_exceptions_completing_upload=0) (stream_write_exceptions=0) (action_executor_acquired=0) (object_multipart_aborted.failures=0) (stream_write_bytes=9500));
[2022-06-12 08:51:30,017] [DEBUG] [org.apache.parquet.hadoop.InternalParquetRecordWriter] [] [PersistenceActorSystem-akka.actor.default-dispatcher-18] - Flushing mem columnStore to file. allocated memory: 4047 MDC: {}
[2022-06-12 08:51:29,842] [DEBUG] [org.apache.parquet.hadoop.InternalParquetRecordWriter] [] [PersistenceActorSystem-akka.actor.default-dispatcher-18] - Flushing mem columnStore to file. allocated memory: 21028 MDC: {}
[2022-06-12 08:51:29,837] [DEBUG] [org.apache.parquet.hadoop.InternalParquetRecordWriter] [] [PersistenceActorSystem-akka.actor.default-dispatcher-20] - Flushing mem columnStore to file. allocated memory: 564400 MDC: {}

SBT:

val akkaVersion      = "2.6.19"
val alpakkaVersion   = "3.0.4"
val hadoopVersion    = "3.3.1"
val parquet4sVersion = "2.5.1"

Any other idea why this could happen?

mjakubowski84 commented 2 years ago

Hi @Xoerk! Do you have a stack trace from 2.5.1? It would help to track down the issue, because either the synchronisation is faulty or there's a bug in Parquet itself.

Xoerk commented 2 years ago

Hi @mjakubowski84, I attached one in the previous comment, but here is another stack trace:

[2022-06-12 10:48:09,425] [DEBUG] [com.github.mjakubowski84.parquet4s.ParquetWriter$] [] [PersistenceActorSystem-akka.actor.default-dispatcher-19] - Resolved following schema to write Parquet to "s3a://company-data-dev/customer/20220612/aae3a5b7-324c-498b-9147-9d8cdb7262a2.snappy.parquet":
[2022-06-12 10:48:09,381] [INFO] [software.amazon.kinesis.coordinator.PeriodicShardSyncManager] [] [PersistenceActorSystem-akka.actor.default-dispatcher-20] - Shutting down periodic shard sync task scheduler on worker company-persistence-svc-68655ddb85-t8sfx:1a886f21-73de-4f60-bedf-334a4ad0bf90 MDC: {}
[2022-06-12 10:48:09,381] [INFO] [software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider] [] [PersistenceActorSystem-akka.actor.default-dispatcher-20] - Successfully stopped leader election on the worker MDC: {}
[2022-06-12 10:48:09,381] [INFO] [software.amazon.kinesis.coordinator.PeriodicShardSyncManager] [] [PersistenceActorSystem-akka.actor.default-dispatcher-20] - Shutting down leader decider on worker company-persistence-svc-68655ddb85-t8sfx:1a886f21-73de-4f60-bedf-334a4ad0bf90 MDC: {}
[2022-06-12 10:48:09,380] [INFO] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator] [] [PersistenceActorSystem-akka.actor.default-dispatcher-20] - Worker company-persistence-svc-68655ddb85-t8sfx:1a886f21-73de-4f60-bedf-334a4ad0bf90 has successfully stopped lease-tracking threads MDC: {}
[2022-06-12 10:48:09,379] [INFO] [software.amazon.kinesis.coordinator.Scheduler] [] [PersistenceActorSystem-akka.actor.default-dispatcher-20] - Worker shutdown requested. MDC: {}
[2022-06-12 10:48:09,332] [INFO] [akka.stream.Materializer] [] [PersistenceActorSystem-akka.actor.default-dispatcher-20] - [kinesis-records] Downstream finished, cause: SubscriptionWithCancelException$StageWasCompleted$: null MDC: {akkaAddress=akka://PersistenceActorSystem, akkaUid=2836857205326094466, sourceThread=PersistenceActorSystem-akka.actor.default-dispatcher-17, akkaSource=akka.stream.Log(akka://PersistenceActorSystem/system/Materializers/StreamSupervisor-0), sourceActorSystem=PersistenceActorSystem, akkaTimestamp=10:48:09.330UTC}
MDC: {}
}
optional binary updateType (STRING);
.....
message com.company.transform.ParquetEvent {
[2022-06-12 10:48:09,325] [DEBUG] [com.github.mjakubowski84.parquet4s.ParquetWriter$] [] [PersistenceActorSystem-akka.actor.default-dispatcher-19] - Resolved following schema to write Parquet to "s3a://company-data-dev/company-ops/20220612/339e458f-3ae9-43f0-9fdb-b1f7fb08440b.snappy.parquet":
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at kamon.instrumentation.executor.ExecutorInstrumentation$InstrumentedForkJoinPool$TimingRunnable.run(ExecutorInstrumentation.scala:691)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:480)
at akka.stream.impl.fusing.GraphInterpreter.afterStageHasRun(GraphInterpreter.scala:580)
at akka.stream.impl.fusing.GraphInterpreter.finalizeStage(GraphInterpreter.scala:600)
at com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic.postStop(ParquetPartitioningFlow.scala:356)
at scala.collection.immutable.Map$Map4.foreach(Map.scala:493)
at com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic.$anonfun$postStop$1$adapted(ParquetPartitioningFlow.scala:356)
at com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic.$anonfun$postStop$1(ParquetPartitioningFlow.scala:358)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:177)
java.lang.NullPointerException: null
[2022-06-12 10:48:09,320] [ERROR] [akka.actor.RepointableActorRef] [] [PersistenceActorSystem-akka.actor.default-dispatcher-19] - Error during postStop in [com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow$Logic]: null MDC: {akkaAddress=akka://PersistenceActorSystem, akkaUid=2836857205326094466, sourceThread=PersistenceActorSystem-akka.actor.default-dispatcher-20, akkaSource=akka://PersistenceActorSystem/system/Materializers/StreamSupervisor-0/flow-6-0-ignoreSink, sourceActorSystem=PersistenceActorSystem, akkaTimestamp=10:48:09.313UTC}
, blocksActive=0, blockUploadsCompleted=0, blocksAllocated=1, blocksReleased=1, blocksActivelyAllocated=0, transferDuration=0 ms, totalUploadDuration=0 ms, effectiveBandwidth=0.0 bytes/s} MDC: {}
means=((object_multipart_aborted.failures.mean=(samples=0, sum=0, mean=0.0000)) (action_executor_acquired.failures.mean=(samples=0, sum=0, mean=0.0000)) (action_executor_acquired.mean=(samples=0, sum=0, mean=0.0000)) (op_abort.failures.mean=(samples=0, sum=0, mean=0.0000)) (op_abort.mean=(samples=0, sum=0, mean=0.0000)) (multipart_upload_completed.failures.mean=(samples=0, sum=0, mean=0.0000)) (object_multipart_aborted.mean=(samples=0, sum=0, mean=0.0000)) (multipart_upload_completed.mean=(samples=0, sum=0, mean=0.0000)));
maximums=((op_abort.max=-1) (object_multipart_aborted.max=-1) (object_multipart_aborted.failures.max=-1) (multipart_upload_completed.max=-1) (op_abort.failures.max=-1) (action_executor_acquired.failures.max=-1) (multipart_upload_completed.failures.max=-1) (action_executor_acquired.max=-1));
minimums=((object_multipart_aborted.failures.min=-1) (op_abort.min=-1) (action_executor_acquired.failures.min=-1) (action_executor_acquired.min=-1) (multipart_upload_completed.failures.min=-1) (op_abort.failures.min=-1) (object_multipart_aborted.min=-1) (multipart_upload_completed.min=-1));
gauges=((stream_write_block_uploads_data_pending=9006) (stream_write_block_uploads_pending=1));
[2022-06-12 10:48:09,281] [WARN] [org.apache.hadoop.fs.s3a.S3AInstrumentation] [] [PersistenceActorSystem-akka.actor.default-dispatcher-20] - Closing output stream statistics while data is still marked as pending upload in OutputStreamStatistics{counters=((object_multipart_aborted=0) (object_multipart_aborted.failures=0) (op_hsync=0) (action_executor_acquired=0) (action_executor_acquired.failures=0) (op_abort.failures=0) (stream_write_exceptions_completing_upload=0) (stream_write_queue_duration=0) (stream_write_total_data=0) (op_hflush=0) (multipart_upload_completed.failures=0) (stream_write_total_time=0) (multipart_upload_completed=0) (stream_write_bytes=9006) (stream_write_exceptions=0) (op_abort=0) (stream_write_block_uploads=1));
MDC: {}
}
optional binary bridgeId (STRING);

Do you need any other info? (Scala 2.12.15)

SnipyJulmy commented 2 years ago

Hi all, sorry @mjakubowski84 for the previous question, I wasn't following this carefully... We haven't encountered this issue anymore.

Could this be due to some network issues between the Writer and HDFS?

Or maybe the path is somehow null in postStop?

Xoerk commented 2 years ago

Hi again, we found the issue; it is not related to this package or to Parquet.

We had a permissions problem on the S3 bucket, which didn't allow writing the Parquet files.

I think we can close this issue.

marcinaylien commented 2 years ago

Yeah, but postStop still should not throw an NPE (the NPE is thrown by a debug log!) :) I am going to catch it and ignore it :)
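A sketch of what catch-and-ignore during cleanup could look like (a hypothetical helper, not the actual code that shipped in the fix): failures from close() are collected instead of rethrown, so a postStop-style path never propagates an NPE.

```scala
import scala.util.control.NonFatal

object CloseQuietly extends App {
  // Hypothetical helper: close each resource, swallowing (but recording)
  // non-fatal failures so the cleanup path itself never throws.
  def closeQuietly(closeables: Iterable[AutoCloseable]): List[Throwable] =
    closeables.toList.flatMap { c =>
      try { c.close(); Nil }
      catch { case NonFatal(e) => List(e) }
    }

  // One resource that closes fine, one that blows up like the reported NPE.
  val ok: AutoCloseable  = () => ()
  val bad: AutoCloseable = () => throw new NullPointerException("boom")

  val failures = closeQuietly(List(ok, bad))
  println(failures.map(_.getClass.getSimpleName).mkString(",")) // NullPointerException
}
```

Returning the collected throwables (rather than discarding them) leaves room to log them at a lower level, which matches the "catch it and ignore it" intent above.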

mjakubowski84 commented 2 years ago

Fix released in 2.6.0