StarRocks / starrocks

The world's fastest open query engine for sub-second analytics both on and off the data lakehouse. With the flexibility to support nearly any scenario, StarRocks provides best-in-class performance for multi-dimensional analytics, real-time analytics, and ad-hoc queries. A Linux Foundation project.
https://starrocks.io
Apache License 2.0

[BugFix] fix partition id integer overflow in SparkDpp (backport #52965) #53016

Closed mergify[bot] closed 3 days ago

mergify[bot] commented 3 days ago


## Bugfix cherry-pick branch check:
- [x] I have checked the version labels which the pr will be auto-backported to the target branch
  - [x] 3.3
  - [x] 3.2
  - [x] 3.1
  - [x] 3.0
  - [x] 2.5
<hr>This is an automatic backport of pull request #52965 done by [Mergify](https://mergify.com).
## Why I'm doing:
+ 1. Error description

```
java.lang.NumberFormatException: For input string: "2151045907"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:583)
	at java.lang.Integer.parseInt(Integer.java:615)
	at com.starrocks.load.loadv2.dpp.SparkDpp$1.call(SparkDpp.java:276)
	at com.starrocks.load.loadv2.dpp.SparkDpp$1.call(SparkDpp.java:225)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreachPartition$1(JavaRDDLike.scala:219)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreachPartition$1$adapted(JavaRDDLike.scala:219)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1016)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1016)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1463)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

+ 2. Reason description
+ 2.1) bucketKeyMap initialization

```java
@SerializedName(value = "partitionId")
public long partitionId;
```

```java
// use bucket number as the parallel reduce task number
int reduceNum = 0;
for (EtlJobConfig.EtlPartition partition : partitionInfo.partitions) {
    for (int i = 0; i < partition.bucketNum; i++) {
        bucketKeyMap.put(partition.partitionId + "_" + i, reduceNum);
        reduceNum++;
    }
}
```
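As a minimal, self-contained sketch of the construction above (this is not StarRocks code: `Partition` is a simplified stand-in for `EtlJobConfig.EtlPartition`, and the sample IDs are invented, with the second one taken from the stack trace to exceed `Integer.MAX_VALUE`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BucketKeyMapDemo {
    // simplified stand-in for EtlJobConfig.EtlPartition
    static class Partition {
        final long partitionId;
        final int bucketNum;

        Partition(long partitionId, int bucketNum) {
            this.partitionId = partitionId;
            this.bucketNum = bucketNum;
        }
    }

    public static void main(String[] args) {
        // the second partition ID exceeds Integer.MAX_VALUE (2147483647)
        Partition[] partitions = {
            new Partition(10001L, 2),
            new Partition(2151045907L, 2),
        };

        // use bucket number as the parallel reduce task number
        Map<String, Integer> bucketKeyMap = new LinkedHashMap<>();
        int reduceNum = 0;
        for (Partition p : partitions) {
            for (int i = 0; i < p.bucketNum; i++) {
                bucketKeyMap.put(p.partitionId + "_" + i, reduceNum);
                reduceNum++;
            }
        }
        // → {10001_0=0, 10001_1=1, 2151045907_0=2, 2151045907_1=3}
        System.out.println(bucketKeyMap);
    }
}
```

Note that the key stores the `long` partition ID as a string, so the overflow only surfaces later, when the ID is parsed back out of the key.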

+ 2.2) bucketKeyMap parse process

```java
int partitionId = Integer.parseInt(bucketKey[0]);
```


## What I'm doing:

```java
long partitionId = Long.parseLong(bucketKey[0]);
```

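The before/after behavior can be checked with a small sketch (the sample ID 2151045907 is the value from the stack trace above; the key construction mirrors the `bucketKeyMap` code, outside of StarRocks):

```java
public class PartitionIdParseDemo {
    public static void main(String[] args) {
        // a legal long partition ID that does not fit in an int
        long partitionId = 2151045907L;
        int bucketIndex = 0;

        // key built as in bucketKeyMap: "<partitionId>_<bucketIndex>"
        String[] bucketKey = (partitionId + "_" + bucketIndex).split("_");

        // before the fix: Integer.parseInt rejects values above Integer.MAX_VALUE
        try {
            Integer.parseInt(bucketKey[0]);
        } catch (NumberFormatException e) {
            // NumberFormatException: For input string: "2151045907"
            System.out.println("overflow: " + e.getMessage());
        }

        // after the fix: Long.parseLong covers the full range of partition IDs
        long parsed = Long.parseLong(bucketKey[0]);
        System.out.println(parsed); // → 2151045907
    }
}
```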

Fixes #issue

## What type of PR is this:

- [x] BugFix
- [ ] Feature
- [ ] Enhancement
- [ ] Refactor
- [ ] UT
- [ ] Doc
- [ ] Tool

Does this PR entail a change in behavior?

- [ ] Yes, this PR will result in a change in behavior.
- [x] No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

- [ ] Interface/UI changes: syntax, type conversion, expression evaluation, display information
- [ ] Parameter changes: default values, similar parameters but with different default values
- [ ] Policy changes: use new policy to replace old one, functionality automatically enabled
- [ ] Feature removed
- [ ] Miscellaneous: upgrade & downgrade compatibility, etc.

## Checklist:

- [ ] I have added test cases for my bug fix or my new feature
- [ ] This pr needs user documentation (for new or modified features or behaviors)
  - [ ] I have added documentation for my new feature or new function
- [x] This is a backport pr
sonarcloud[bot] commented 3 days ago

Quality Gate passed

Issues
0 New issues
0 Accepted issues

Measures
0 Security Hotspots
0.0% Coverage on New Code
0.0% Duplication on New Code

See analysis details on SonarQube Cloud