trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
https://trino.io
Apache License 2.0
9.85k stars 2.85k forks source link

Hive's CombineTextInputFormat #21842

Open sjdurfey opened 1 month ago

sjdurfey commented 1 month ago

I use the hive metastore pretty heavily and we currently have hundreds of tables that make use of the CombineTextInputFormat for the storage descriptor. I was recently trying to upgrade to Trino 445 from Trino 409 and found https://github.com/trinodb/trino/issues/15921 which mentions removing Hive dependencies in the code base. I spun it up and tried querying against my metastore and was getting errors like this when querying text based tables:

io.trino.spi.TrinoException: Unsupported storage format: mydatabase.mytable:<UNPARTITIONED> StorageFormat{serde=org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, inputFormat=org.apache.hadoop.mapred.lib.CombineTextInputFormat, outputFormat=org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat}

This error tracks with the code changes in the HiveStorageFormat and the HiveClassNames. Our hive tables are used to query with Trino but also run spark jobs against them. So, using the CombineTextInputFormat is ideal to help deal with the small file problem in spark. However, that means those tables can't be queried via Trino.

A few questions:

raunaqmorarka commented 1 month ago

cc: @dain @electrum @findinpath

electrum commented 1 month ago

This should be easy to fix. Try adding the following lines to the HIVE_STORAGE_FORMATS map in HiveStorageFormat:

.put(new SerdeAndInputFormat(LAZY_SIMPLE_SERDE_CLASS, "org.apache.hadoop.mapred.lib.CombineTextInputFormat"), TEXTFILE)
.put(new SerdeAndInputFormat(LAZY_SIMPLE_SERDE_CLASS, "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat"), TEXTFILE)

If that works, then please send a PR.

Trino has never supported combined splits, so there should be no performance difference.

sjdurfey commented 1 month ago

I added that and I do get back data, however, the trino logs are flooded with exceptions. They are DEBUG, so, perhaps that is fine and normal. They don't show up with INFO logging. If these are fine, then I'll get out a PR for the change. Thanks!

2024-05-08T19:08:29.870-0400    DEBUG   task-notification-0 io.trino.execution.TaskStateMachine Task 20240508_230827_00000_ihhvj.1.0.0 is CANCELED
2024-05-08T19:08:29.870-0400    DEBUG   SplitRunner-25  io.trino.execution.executor.dedicated.SplitProcessor    Driver was interrupted
io.trino.spi.TrinoException: Driver was interrupted
    at io.trino.operator.Driver.lambda$process$8(Driver.java:327)
    at io.trino.operator.Driver.tryWithLock(Driver.java:709)
    at io.trino.operator.Driver.process(Driver.java:298)
    at io.trino.operator.Driver.processForDuration(Driver.java:269)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
    at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:76)
    at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
    at io.trino.$gen.Trino_dev____20240508_230700_2.run(Unknown Source)
    at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
    at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:174)
    at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
    Suppressed: io.trino.spi.TrinoException: Error opening Hive split s3://mys3bucket/<some part path>/part-00000-451f2b6b-005a-44f9-aaa1-4d4e979cf644.c000.gz (offset=0, length=1114348): Read of file s3://mys3bucket/<same part path>/part-00000-451f2b6b-005a-44f9-aaa1-4d4e979cf644.c000.gz failed: null
        at io.trino.plugin.hive.line.LinePageSourceFactory.createPageSource(LinePageSourceFactory.java:156)
        at io.trino.plugin.hive.HivePageSourceProvider.createHivePageSource(HivePageSourceProvider.java:200)
        at io.trino.plugin.hive.HivePageSourceProvider.createPageSource(HivePageSourceProvider.java:136)
        at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider.createPageSource(ClassLoaderSafeConnectorPageSourceProvider.java:48)
        at io.trino.split.PageSourceManager.createPageSource(PageSourceManager.java:61)
        at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:264)
        at io.trino.operator.Driver.processInternal(Driver.java:403)
        at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
        ... 17 more
    Caused by: java.io.IOException: Read of file s3://mys3bucket/<some part path>/part-00000-451f2b6b-005a-44f9-aaa1-4d4e979cf644.c000.gz failed: null
        at io.trino.filesystem.hdfs.HdfsTrinoInputStream.read(HdfsTrinoInputStream.java:108)
        at java.base/java.io.InputStream.readNBytes(InputStream.java:412)
        at java.base/java.io.InputStream.readAllBytes(InputStream.java:349)
        at io.trino.plugin.hive.line.LinePageSourceFactory.createPageSource(LinePageSourceFactory.java:140)
        ... 24 more
    Caused by: java.io.InterruptedIOException
        at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.propagate(TrinoS3FileSystem.java:1641)
        at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.openStream(TrinoS3FileSystem.java:1600)
        at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.openStream(TrinoS3FileSystem.java:1553)
        at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.seekStream(TrinoS3FileSystem.java:1546)
        at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.lambda$read$1(TrinoS3FileSystem.java:1490)
        at io.trino.hdfs.s3.RetryDriver.run(RetryDriver.java:125)
        at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.read(TrinoS3FileSystem.java:1489)
        at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:345)
        at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
        at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:405)
        at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
        at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
        at io.trino.filesystem.hdfs.HdfsTrinoInputStream.read(HdfsTrinoInputStream.java:102)
        ... 27 more
Caused by: java.lang.Exception: Interrupted By
    at java.base/java.lang.Thread.getStackTrace(Thread.java:2450)
    at io.trino.operator.Driver$DriverLock.interruptCurrentOwner(Driver.java:858)
    at io.trino.operator.Driver.close(Driver.java:178)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.close(SqlTaskExecution.java:909)
    at io.trino.execution.executor.dedicated.TaskEntry.destroy(TaskEntry.java:88)
    at io.trino.execution.executor.dedicated.ThreadPerDriverTaskExecutor.removeTask(ThreadPerDriverTaskExecutor.java:143)
    at io.trino.execution.SqlTaskExecution.lambda$createTaskHandle$3(SqlTaskExecution.java:232)
    at io.trino.execution.StateMachine.fireStateChangedListener(StateMachine.java:240)
    at io.trino.execution.StateMachine.lambda$fireStateChanged$0(StateMachine.java:232)
    ... 3 more
electrum commented 1 month ago

These logs are normal. The Driver was interrupted message for SplitRunner occurs when a task is cancelled, which usually happens due to a LIMIT query or terminating manually in the CLI.