
StreamSets Tutorials
Apache License 2.0

StreamSets pipeline breaks if the number of fields exceeds the original field count #117

Closed tommy24b closed 4 years ago

tommy24b commented 4 years ago

```
Pipeline Status: RUNNING_ERROR: For input string: "0."
java.lang.NumberFormatException: For input string: "0."
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at oracle.sql.NUMBER.toBytes(NUMBER.java:1904)
	at oracle.sql.NUMBER.<init>(NUMBER.java:287)
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical(OraclePreparedStatement.java:8156)
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:7995)
	at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:8559)
	at oracle.jdbc.driver.OraclePreparedStatementWrapper.setObject(OraclePreparedStatementWrapper.java:225)
	at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.setObject(HikariProxyPreparedStatement.java)
	at com.streamsets.pipeline.lib.jdbc.JdbcBaseRecordWriter.setParamsToStatement(JdbcBaseRecordWriter.java:532)
	at com.streamsets.pipeline.lib.jdbc.JdbcGenericRecordWriter.setParameters(JdbcGenericRecordWriter.java:292)
	at com.streamsets.pipeline.lib.jdbc.JdbcGenericRecordWriter.processQueue(JdbcGenericRecordWriter.java:213)
	at com.streamsets.pipeline.lib.jdbc.JdbcGenericRecordWriter.write(JdbcGenericRecordWriter.java:159)
	at com.streamsets.pipeline.lib.jdbc.JdbcGenericRecordWriter.writeBatch(JdbcGenericRecordWriter.java:114)
	at com.streamsets.pipeline.lib.jdbc.JdbcUtil.write(JdbcUtil.java:1114)
	at com.streamsets.pipeline.stage.destination.jdbc.JdbcTarget.write(JdbcTarget.java:276)
	at com.streamsets.pipeline.stage.destination.jdbc.JdbcTarget.write(JdbcTarget.java:257)
	at com.streamsets.pipeline.api.base.configurablestage.DTarget.write(DTarget.java:34)
	at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:303)
	at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:244)
	at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:311)
	at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:220)
	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.processPipe(ProductionPipelineRunner.java:850)
	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.lambda$executeRunner$3(ProductionPipelineRunner.java:894)
	at com.streamsets.datacollector.runner.PipeRunner.acceptConsumer(PipeRunner.java:221)
	at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:142)
	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(ProductionPipelineRunner.java:893)
	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(ProductionPipelineRunner.java:871)
	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:599)
	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:390)
	at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:516)
	at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:112)
	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:75)
	at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:720)
	at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:151)
	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
	at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
	at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

tommy24b commented 4 years ago

[screenshot attached]

tommy24b commented 4 years ago

I'm using StreamSets to load a file that updates continuously into a database. I'm running into an error where the fields are mismatched with the fields I specified in the JDBC Producer.

Below is a sample line from the file tail:

```
date=202002228, address="71.110.2.144", duration=10ms, url=google.com
```

In my pipeline I'm using these expressions.

Field Replacer 1:

- `/text` = `${str:replaceAll(record:value('/text'),"address=",',')}` — this removes the `address=` expression
- `/text` = `${str:split(record:value('/text'), ',')}` — this splits the line into fields

Field Replacer 2:

- `/text[2]` = `${str:replaceAll(record:value('/text[2]'),"\"",'')}` — this takes care of the double quotes
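For anyone unfamiliar with StreamSets EL, the two Field Replacer stages above amount to the following string operations, sketched here in plain Python purely for illustration (using the sample tail line from the question):

```python
# Rough Python equivalent of the Field Replacer expressions above,
# shown only to illustrate what they do to one sample record.
line = 'date=202002228, address="71.110.2.144", duration=10ms, url=google.com'

# Field Replacer 1, step 1: str:replaceAll(record:value('/text'), "address=", ',')
step1 = line.replace('address=', ',')

# Field Replacer 1, step 2: str:split(record:value('/text'), ',')
fields = step1.split(',')

# Field Replacer 2: strip the double quotes from /text[2]
fields[2] = fields[2].replace('"', '')

print(fields[2])  # the bare IP address: 71.110.2.144
```

Note that with a second IP inside the quoted `address` value, the split on `,` produces an extra field, which is exactly the field-count mismatch that breaks the JDBC Producer.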

The issue is that sometimes the address field contains two IPs in the file tail, and that breaks the pipeline. How do I skip the address field when it has two IP addresses, as in the following?

```
date=202002228, address="71.110.2.144, 71.110.4.174", duration=10ms, url=google.com
```
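One way to make the split robust is to parse `key=value` pairs while treating a quoted value as a single field, so a comma inside `address="..."` no longer adds fields. This is only an illustrative sketch in Python, not a StreamSets stage:

```python
import re

# Sketch of a quote-aware parse of key=value pairs: a quoted value
# (like address) may itself contain commas without adding fields.
line = ('date=202002228, address="71.110.2.144, 71.110.4.174", '
        'duration=10ms, url=google.com')

# For each pair, prefer the quoted capture group, else the unquoted one.
pairs = {
    key: quoted if quoted else plain
    for key, quoted, plain in re.findall(r'(\w+)=(?:"([^"]*)"|([^,]+))', line)
}

# A record with two IPs can now be detected and skipped (or split) cleanly.
ips = [ip.strip() for ip in pairs['address'].split(',')]
print(len(ips))  # 2
```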

I'm using the community version, streamsets-datacollector-all-3.13.0, with the Text data format. Please advise if this can be done in a better way.

metadaddy commented 4 years ago

Take a look at the Stream Selector to send records along a different path depending on a condition - for instance, if a field contains a comma, you could process it differently.
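As a sketch of that suggestion: assuming the comma-bearing value ends up in `/text[2]` as in the Field Replacer config above, a Stream Selector condition along these lines (using SDC's `str:contains` EL function) should route the two-IP records down a separate path; treat the field path as a hypothetical that must match your actual record layout:

```
${str:contains(record:value('/text[2]'), ',')}
```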

We have several community channels available for asking general questions - please take a look at https://streamsets.com/community/

Closing the issue, since it doesn't relate to the tutorials.