google / fhir-data-pipes

A collection of tools for extracting FHIR resources and analytics services on top of that data.
https://google.github.io/fhir-data-pipes/
Apache License 2.0
151 stars 84 forks source link

Task Fhir Resource failing with "Composite converter must be given a single composite element, received: uuid" #912

Closed cchigoriwa closed 8 months ago

cchigoriwa commented 9 months ago

With Task Resource added to "resourceList", I tried to run the full pipeline and incremental pipeline but this is failing producing the following exception:

java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Pipeline execution failed at com.google.fhir.analytics.EtlUtils.runMultiplePipelines(EtlUtils.java:136) at com.google.fhir.analytics.EtlUtils.runMultiplePipelinesWithTimestamp(EtlUtils.java:76) at com.google.fhir.analytics.PipelineManager$PipelineThread.run(PipelineManager.java:554) Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Pipeline execution failed at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at com.google.fhir.analytics.EtlUtils.runMultiplePipelines(EtlUtils.java:123) ... 2 more Caused by: java.lang.RuntimeException: Pipeline execution failed at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:112) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:321) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:307) at com.google.fhir.analytics.EtlUtils$PipelineSupplier.get(EtlUtils.java:157) at com.google.fhir.analytics.EtlUtils$PipelineSupplier.get(EtlUtils.java:147) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) at akka.dispatch.OnComplete.internal(Future.scala:300) at akka.dispatch.OnComplete.internal(Future.scala:297) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) at jdk.internal.reflect.GeneratedMethodAccessor64.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ... 5 more Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException: Exception in chained task 'FlatMap (FlatMap at JdbcIO fetch for Task/CoGroupByKey/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult).output)' at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211) at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:122) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:59) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkNonMergingReduceFunction.reduce(FlinkNonMergingReduceFunction.java:103) at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:120) at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:145) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:519) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException: Exception in chained task 'FlatMap (FlatMap at JdbcIO fetch for Task/CoGroupByKey/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult).output)' at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:81) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:221) at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275) at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411) at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192) Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException: Exception in chained task 'FlatMap (FlatMap at JdbcIO fetch for Task/Join Resource and Tag Collections/ParMultiDo(Anonymous).output)' at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) at com.google.fhir.analytics.JdbcFetchHapi$FetchRowsJdbcIo$3$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211) at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:122) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:59) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:60) at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:35) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:221) at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275) at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411) at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192) at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211) at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:122) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:59) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkNonMergingReduceFunction.reduce(FlinkNonMergingReduceFunction.java:103) at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:120) at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:145) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:519) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException: Exception in chained task 'FlatMap (FlatMap at JdbcIO fetch for Task/Join Resource and Tag Collections/ParMultiDo(Anonymous).output)' at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:81) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:221) at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275) at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411) at com.google.fhir.analytics.JdbcFetchHapi$FetchRowsJdbcIo$3.process(JdbcFetchHapi.java:293) Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Composite converter must be given a single composite element, received: uuid at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) at com.google.fhir.analytics.ConvertResourceFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211) at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:122) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:59) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:60) at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:35) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:221) at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275) at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411) at com.google.fhir.analytics.JdbcFetchHapi$FetchRowsJdbcIo$3.process(JdbcFetchHapi.java:293) at com.google.fhir.analytics.JdbcFetchHapi$FetchRowsJdbcIo$3$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211) at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:122) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:59) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:60) at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:35) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:221) at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275) at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411) at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn.processElement(CoGroupByKey.java:192) at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructCoGbkResultFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211) at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:122) at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:59) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.FlinkNonMergingReduceFunction.reduce(FlinkNonMergingReduceFunction.java:103) at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:120) at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:145) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:519) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.lang.IllegalArgumentException: Composite converter must be given a single composite element, received: uuid at com.cerner.bunsen.definitions.HapiCompositeConverter.toHapiConverter(HapiCompositeConverter.java:264) at com.cerner.bunsen.definitions.HapiChoiceConverter.toHapiConverter(HapiChoiceConverter.java:105) at com.cerner.bunsen.definitions.HapiCompositeConverter.lambda$toHapiConverter$1(HapiCompositeConverter.java:354) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) at com.cerner.bunsen.definitions.HapiCompositeConverter.toHapiConverter(HapiCompositeConverter.java:365) at com.cerner.bunsen.avro.converters.DefinitionToAvroVisitor$MultiValuedToAvroConverter.toHapiConverter(DefinitionToAvroVisitor.java:377) at com.cerner.bunsen.definitions.HapiCompositeConverter.lambda$toHapiConverter$1(HapiCompositeConverter.java:354) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) at com.cerner.bunsen.definitions.HapiCompositeConverter.toHapiConverter(HapiCompositeConverter.java:365) at com.cerner.bunsen.avro.AvroConverter.(AvroConverter.java:32) at com.cerner.bunsen.avro.AvroConverter.visitResource(AvroConverter.java:89) at com.cerner.bunsen.avro.AvroConverter.forResource(AvroConverter.java:155) at com.cerner.bunsen.avro.AvroConverter.forResource(AvroConverter.java:133) at com.google.fhir.analytics.ParquetUtil.getConverter(ParquetUtil.java:184) at com.google.fhir.analytics.ParquetUtil.getResourceSchema(ParquetUtil.java:273) at com.google.fhir.analytics.ParquetUtil.createWriter(ParquetUtil.java:220) at com.google.fhir.analytics.ParquetUtil.write(ParquetUtil.java:235) at com.google.fhir.analytics.ConvertResourceFn.writeResource(ConvertResourceFn.java:145) at com.google.fhir.analytics.ConvertResourceFn.processElement(ConvertResourceFn.java:197)

cchigoriwa commented 9 months ago

I think uuid was omitted from "fhir-data-pipes::bunsen::bunsen-core::com.cerner.bunsen.definitions.StructureDefinitions.PRIMITIVE_TYPES". By adding uuid to PRIMITE_TYPES this error disappears.

I rebuilt the entire project by running

mvn clean install.

I then run

java -cp fhir-data-pipes/pipelines/batch/target/batch-bundled.jar  \
    com.google.fhir.analytics.FhirEtl \
    --fhirServerUrl=http://localhost:8091/fhir \
    --outputParquetPath=/tmp/TEST/ \
    --resourceList=Task

The error disappears. @bashir2 @pmanko @fredhersch @gdevanla @rajeshvelicheti @cchigoriwa @gitcliff @ibacher @bashir2 @mozzy11 @omarismail94 @kimaina @atulai-sg @williamito @rayyang29 @jecihjoy @LJNIC @vbothe23 @suriyan3 @tawfiqkhalilieh @fredhersch @gdevanla @abhia-dev @suruchee

bashir2 commented 9 months ago

Thanks @cchigoriwa for reporting this issue; if you drop --resourceList=Task does that work for you (without your uuid change)?

cchigoriwa commented 9 months ago

Hi @bashir2 If I drop --resourceList=Task (without my uuid change), it works for me. However, it only fetches and processes patient, encounter, and observation resources. I think these are default resources if you don't explicitly specify a resource list.   I encounter this issue when I include "task" in the resource list and there is at least one task resource in the source Fhir store.   Reference-> fhir-data-pipes:pipelines::batch::com.google.fhir.analytics.FhirEtlOptions.java

bashir2 commented 8 months ago

Sorry @cchigoriwa for the delay on this. I finally managed to spend some time on this and sent #937 for review. The fix is basically what you mentioned, the only difference is that I have done some related clean-ups and added unit-tests to prevent similar cases in the future. Are you interested in reviewing that PR? If you prefer to send a similar PR, I would be happy to review and merge that instead; we love to get community contributions. Once more, thanks for reporting this issue.

cchigoriwa commented 8 months ago

Hi, @bashir2. I appreciate your addressing the problem and submitting Pull Request #937 for review.

I'm glad to hear that the fix aligns with my suggestions. The additional clean-ups and including unit tests to prevent similar cases in the future are valuable improvements.

I would be more than happy to review the PR #937.

bashir2 commented 8 months ago

Okay great @cchigoriwa I just added you to the list of collaborators (you should have received an invite). I think once you accept that invitation you should be able to provide your review for that PR; thank you.