apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
8.06k stars 1.83k forks source link

[Bug] [connectors-v2] 使用localfile 本地导入超过50万行的excel,速度越来越慢,提示oom #8040

Open dwave opened 1 week ago

dwave commented 1 week ago

Search before asking

What happened

使用 LocalFile 从本地导入超过 50 万行的 Excel 文件时,速度越来越慢,最终提示 OOM(内存溢出)。[image] [image]

SeaTunnel Version

2.3.8

SeaTunnel Config

# Job-level settings for this reproduction.
env {
# job.mode is set to BATCH for this run.
"job.mode"=BATCH
"job.name"="SeaTunnel_Job"
# NOTE(review): presumably selects where the save-mode step executes — confirm against the engine docs.
"savemode.execute.location"=CLUSTER
}
# Source: reads one local Excel workbook and declares every column as a string.
# NOTE(review): the stack trace in this report shows ExcelReadStrategy.readProcess
# constructing an XSSFWorkbook from the input stream, and the OutOfMemoryError is
# raised inside that constructor — i.e. before any row of this schema is emitted.
source {
  LocalFile{
    # Name under which this source's output is registered for downstream blocks.
    result_table_name = "fake1"
    # NOTE(review): a field delimiter is presumably irrelevant for file_format_type = "excel" — confirm.
    delimiter = "#"
    # Skip the first row of the sheet (the header row).
    skip_header_row_number = 1
    path = "/data/sale_detail_v2.xlsx"
    file_format_type = "excel"
    # The original report spelled this key `datatime_format`; the documented
    # LocalFile option is `datetime_format`, so the misspelled key was
    # presumably not picked up by the connector.
    datetime_format = "yyyy-MM-dd HH:mm:ss"
    # All 56 columns are declared as string, so no numeric/date conversion is
    # requested from the reader.
    schema {
        fields {
            order_id = string
            order_label = string
            payment_time = string
            product_name = string
            product_type = string
            product_specification = string
            purchase_quantity = string
            appointment_store = string
            applicable_store_type = string
            product_id = string
            coupon_code = string
            coupon_status = string
            total_times = string
            redeemed_times = string
            remaining_times = string
            redemption_time = string
            user_strike_price = string
            user_single_strike_price = string
            voucher_redeemed_value = string
            order_actual_received = string
            selling_amount = string
            merchant_subsidy = string
            merchant_subsidy_details = string
            product_payment = string
            platform_subsidy = string
            platform_subsidy_details = string
            brand_merchant_subsidy = string
            brand_merchant_subsidy_details = string
            software_service_fee = string
            talent_commission = string
            artisan_incentive_commission = string
            service_provider_commission = string
            insurance_cost = string
            pre_sale_price = string
            appointment_surcharge = string
            software_service_fee_rate = string
            talent_commission_rate = string
            artisan_incentive_commission_rate = string
            service_provider_commission_rate = string
            service_provider_name = string
            sales_role = string
            sales_channel = string
            order_owner_nickname = string
            order_owner_uid = string
            talent_nickname = string
            talent_douyin_number = string
            talent_uid = string
            content_address = string
            artisan_nickname = string
            artisan_douyin_number = string
            artisan_uid = string
            store_staff_nickname = string
            store_staff_douyin_number = string
            store_staff_uid = string
            store_staff_incentive_amount = string
            store_staff_incentive_amount_rate = string
        }
    }
  }
}
# No transforms are configured; the block is intentionally empty.
transform {
}

# Sink: StarRocks, targeting table "excel_sale_detail_v2" in database "test".
sink {
    StarRocks {
        labelPrefix="seatunnel_v4"
        # Flush thresholds for a write batch: row count and byte size.
        batch_max_rows=102500
        # 52428800 = 50 * 1024 * 1024.
        batch_max_bytes=52428800
        enable_upsert_delete=false
        schema_save_mode=IGNORE
        data_save_mode=DROP_DATA
        # DDL template with ${...} placeholders for database, table, primary key and fields.
        save_mode_create_template="CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n${rowtype_primary_key},\n${rowtype_fields}\n) ENGINE=OLAP\n PRIMARY KEY (${rowtype_primary_key})\nDISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES (\n    replication_num = 1 \n)"
        # 180000 ms = 3 minutes.
        http_socket_timeout_ms=180000
        # NOTE(review): the source block in this report sets result_table_name = "fake1",
        # which differs from this value — confirm whether the config was edited before posting.
        source_table_name=Table15546062651361
        table="excel_sale_detail_v2"
        database=test
        # NOTE(review): the node URL, username, password and the JDBC host below are blank,
        # presumably redacted by the reporter — confirm.
        nodeUrls=[
            ""
        ]
        username=
        password=
        base-url="jdbc:mysql://:9030/test"
    }
    # Closes the sink block (indentation as in the original report).
    }

Running Command

./bin/seatunnel.sh  -DJvmOption="-Xms10G -Xmx10G"  --config ./config/sale_excel.conf -e local

Error Exception

[…]Task (1/1)] end with state FAILED and Exception: java.lang.OutOfMemoryError: Java heap space
        at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
        at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
        at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
        at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
        at org.apache.poi.util.IOUtils.toByteArray(IOUtils.java:185)
        at org.apache.poi.util.IOUtils.toByteArray(IOUtils.java:149)
        at org.apache.poi.util.IOUtils.toByteArray(IOUtils.java:136)
        at org.apache.poi.openxml4j.util.ZipArchiveFakeEntry.<init>(ZipArchiveFakeEntry.java:47)
        at org.apache.poi.openxml4j.util.ZipInputStreamZipEntrySource.<init>(ZipInputStreamZipEntrySource.java:53)
        at org.apache.poi.openxml4j.opc.ZipPackage.<init>(ZipPackage.java:106)
        at org.apache.poi.openxml4j.opc.OPCPackage.open(OPCPackage.java:307)
        at org.apache.poi.ooxml.util.PackageHelper.open(PackageHelper.java:47)
        at org.apache.poi.xssf.usermodel.XSSFWorkbook.<init>(XSSFWorkbook.java:309)
        at org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy.readProcess(ExcelReadStrategy.java:94)
        at org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy.resolveArchiveCompressedInputStream(AbstractReadStrategy.java:241)
        at org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy.read(ExcelReadStrategy.java:78)
        at org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader.pollNext(MultipleTableFileSourceReader.java:81)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct