trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)

Concurrent inserts fail with a 400 error; unclear whether AWS Glue or Trino is at fault. Same problem with Iceberg and Hive. #16762

Open bbarclay opened 1 year ago

bbarclay commented 1 year ago

I'm running Trino in Kubernetes and inserting into AWS Glue tables, both as Iceberg and as Hive tables. I'm unsure whether this is an AWS Glue problem or a Trino issue. It happens whenever I run the insert script concurrently, or run multiple copies of it at the same time.

  1. Start two copies of the same insert script simultaneously and they collide, causing this error.
  2. Run a single script with more than 1 worker and the error occurs.
  3. The error message differs slightly for Iceberg tables, but it is essentially the same failure. Something on the AWS side is blocking the concurrent updates, and the error codes are generic and hard to find a resolution for.

MY CODE:

from trino.dbapi import connect
import concurrent.futures
from multiprocessing import freeze_support

# Connect through the Iceberg catalog; the Hive catalog shows the same behavior
conn = connect(
    host="localhost",
    port=8080,
    user="trino",
    catalog="iceberg",
    schema="order_data",
)
cur = conn.cursor()

def doWork(i):
    print(i)
    try:
        # Each worker inserts a single row into the same table
        cur.execute(
            "INSERT INTO order_test3 (order_id, order_date, order_customer_id, order_status) "
            "VALUES ('" + str(i) + "', '2021-01-01', '" + str(i) + "', 'COMPLETE')"
        )
        # Fetch the result so the query runs to completion
        response = cur.fetchall()
        print(response)
        print(i)
    except Exception as e:
        print(i)
        print("Error")
        print(e)
        # Print the detailed error
        print(e.args[0])

def main():
    # Submit the inserts to 10 worker processes so they run concurrently
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        for i in range(1500, 1510):
            print(i)
            executor.submit(doWork, i)

if __name__ == '__main__':
    freeze_support()  # enable multiprocessing support when the script is frozen
    main()

THE ERROR:

TrinoQueryError(type=INTERNAL_ERROR, name=GENERIC_INTERNAL_ERROR, message="Failed to commit to Glue table: order_data.order_test3", query_id=20230328_033606_00016_fe4cq)

{
  "message": "Failed to commit to Glue table: order_data.order_test3",
  "errorCode": 65536,
  "errorName": "GENERIC_INTERNAL_ERROR",
  "errorType": "INTERNAL_ERROR",
  "failureInfo": {
    "type": "org.apache.iceberg.exceptions.CommitFailedException",
    "message": "Failed to commit to Glue table: order_data.order_test3",
    "stack": [
      "io.trino.plugin.iceberg.catalog.glue.GlueIcebergTableOperations.commitToExistingTable(GlueIcebergTableOperations.java:147)",
      "io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.commit(AbstractIcebergTableOperations.java:151)",
      "org.apache.iceberg.BaseTransaction.lambda$commitSimpleTransaction$5(BaseTransaction.java:403)",
      "org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)",
      "org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)",
      "org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)",
      "org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)",
      "org.apache.iceberg.BaseTransaction.commitSimpleTransaction(BaseTransaction.java:394)",
      "org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:277)",
      "io.trino.plugin.iceberg.IcebergMetadata.finishInsert(IcebergMetadata.java:880)",
      "io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:527)",
      "io.trino.metadata.MetadataManager.finishInsert(MetadataManager.java:932)",
      "io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4128)",
      "io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)",
      "io.trino.operator.Driver.processInternal(Driver.java:396)",
      "io.trino.operator.Driver.lambda$process$8(Driver.java:299)",
      "io.trino.operator.Driver.tryWithLock(Driver.java:691)",
      "io.trino.operator.Driver.process(Driver.java:291)",
      "io.trino.operator.Driver.processForDuration(Driver.java:262)",
      "io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:773)",
      "io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)",
      "io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:523)",
      "io.trino.$gen.Trino_410____20230328_032911_2.run(Unknown Source)",
      "java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)",
      "java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)",
      "java.base/java.lang.Thread.run(Thread.java:833)"
    ],
    "cause": {
      "type": "com.amazonaws.services.glue.model.ConcurrentModificationException",
      "message": "Update table failed due to concurrent modifications. (Service: AWSGlue; Status Code: 400; Error Code: ConcurrentModificationException; Request ID: 00ed5238-e716-41da-91dd-89bdf628d75c; Proxy: null)",
      "stack": [
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)",
        "com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)",
        "com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)",
        "com.amazonaws.services.glue.AWSGlueClient.doInvoke(AWSGlueClient.java:12473)",
        "com.amazonaws.services.glue.AWSGlueClient.invoke(AWSGlueClient.java:12440)",
        "com.amazonaws.services.glue.AWSGlueClient.invoke(AWSGlueClient.java:12429)",
        "com.amazonaws.services.glue.AWSGlueClient.executeUpdateTable(AWSGlueClient.java:12198)",
        "com.amazonaws.services.glue.AWSGlueClient.updateTable(AWSGlueClient.java:12167)",
        "io.trino.plugin.iceberg.catalog.glue.GlueIcebergTableOperations.lambda$commitToExistingTable$1(GlueIcebergTableOperations.java:143)",
        "io.trino.plugin.hive.aws.AwsApiCallStats.call(AwsApiCallStats.java:37)",
        "io.trino.plugin.iceberg.catalog.glue.GlueIcebergTableOperations.commitToExistingTable(GlueIcebergTableOperations.java:143)",
        "io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.commit(AbstractIcebergTableOperations.java:151)",
        "org.apache.iceberg.BaseTransaction.lambda$commitSimpleTransaction$5(BaseTransaction.java:403)",
        "org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)",
        "org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)",
        "org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)",
        "org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)",
        "org.apache.iceberg.BaseTransaction.commitSimpleTransaction(BaseTransaction.java:394)",
        "org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:277)",
        "io.trino.plugin.iceberg.IcebergMetadata.finishInsert(IcebergMetadata.java:880)",
        "io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:527)",
        "io.trino.metadata.MetadataManager.finishInsert(MetadataManager.java:932)",
        "io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4128)",
        "io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)",
        "io.trino.operator.Driver.processInternal(Driver.java:396)",
        "io.trino.operator.Driver.lambda$process$8(Driver.java:299)",
        "io.trino.operator.Driver.tryWithLock(Driver.java:691)",
        "io.trino.operator.Driver.process(Driver.java:291)",
        "io.trino.operator.Driver.processForDuration(Driver.java:262)",
        "io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:773)",
        "io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)",
        "io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:523)",
        "io.trino.$gen.Trino_410____20230328_032911_2.run(Unknown Source)",
        "java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)",
        "java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)",
        "java.base/java.lang.Thread.run(Thread.java:833)"
      ],
      "suppressed": []
    },
    "suppressed": []
  }
}
electrum commented 1 year ago

This error should be classified better. It should be translated to TrinoException with error code TRANSACTION_CONFLICT. However, the root cause is the concurrency model used by Iceberg.

The table can only be updated by one writer at a time. When there are multiple writers, Glue rejects the update for all but the first writer because the table version they saw is outdated; this is what saves you from losing updates. Iceberg will retry up to 4 times, but with a concurrency of 10, it's likely that one of your workers is always "unlucky" and fails each retry attempt.
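As a client-side workaround for the repro script above, retrying a failed INSERT can keep a worker from giving up once Iceberg's own retries are exhausted; it does not remove the contention, it only spreads the writers out. A minimal sketch, assuming the conflict surfaces to the Python client as trino.exceptions.TrinoQueryError (as in the error above); the attempt count and backoff values are arbitrary:

import random
import time

from trino.dbapi import connect
from trino.exceptions import TrinoQueryError

def insert_with_retry(i, max_attempts=5):
    # A fresh connection per call keeps the sketch self-contained; a real
    # worker would reuse one connection per process.
    conn = connect(host="localhost", port=8080, user="trino",
                   catalog="iceberg", schema="order_data")
    cur = conn.cursor()
    sql = (
        "INSERT INTO order_test3 (order_id, order_date, order_customer_id, order_status) "
        "VALUES ('" + str(i) + "', '2021-01-01', '" + str(i) + "', 'COMPLETE')"
    )
    for attempt in range(1, max_attempts + 1):
        try:
            cur.execute(sql)
            return cur.fetchall()  # wait for the query (and the commit) to finish
        except TrinoQueryError:
            # The Glue conflict currently surfaces as GENERIC_INTERNAL_ERROR;
            # give up after the last attempt, otherwise back off with jitter.
            if attempt == max_attempts:
                raise
            time.sleep(random.uniform(0.5, 2.0) * attempt)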

Iceberg is not designed for doing many small inserts from independent writers. Some options:
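For illustration, one option along those lines is to funnel the rows through a single writer and insert them in batches, so each batch results in a single Iceberg commit instead of ten competing ones. A minimal sketch, reusing the order_test3 table and connection settings from the script above; the batch size is arbitrary:

from trino.dbapi import connect

def insert_batch(cur, ids):
    # One multi-row INSERT means one Iceberg commit for the whole batch.
    values = ", ".join(
        "('" + str(i) + "', '2021-01-01', '" + str(i) + "', 'COMPLETE')" for i in ids
    )
    cur.execute(
        "INSERT INTO order_test3 (order_id, order_date, order_customer_id, order_status) "
        "VALUES " + values
    )
    cur.fetchall()  # wait for the query (and the commit) to finish

conn = connect(host="localhost", port=8080, user="trino",
               catalog="iceberg", schema="order_data")
cur = conn.cursor()

# A single writer submits the batches sequentially, so commits never collide.
ids = list(range(1500, 1510))
batch_size = 100
for start in range(0, len(ids), batch_size):
    insert_batch(cur, ids[start:start + batch_size])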

bbarclay commented 1 year ago

The same thing happens with Athena.