apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1 #21400

Closed: asfimport closed this issue 3 years ago

asfimport commented 5 years ago

Creating this in the Arrow project as the traceback seems to suggest this is an issue in Arrow. This is a continuation of the conversation at https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=P_1Nst5AjjCRg0MExO5Kby9i-g@mail.gmail.com%3E

When I run a GROUPED_MAP UDF in Spark using PySpark, I run into the error:


  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 265, in __iter__
  File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: read length must be positive or -1

as the size of the dataset I want to group on increases. Here is a code snippet with which I can reproduce the error. Note: my actual dataset is much larger, has many more unique IDs, and is a valid use case where I cannot simplify this groupby in any way. I have stripped out all the logic to make this example as simple as I could.


import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'
import findspark
findspark.init()
import pyspark
from pyspark.sql import functions as F, types as T
import pandas as pd

spark = pyspark.sql.SparkSession.builder.getOrCreate()

pdf1 = pd.DataFrame(
    [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
    columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
)
df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')

pdf2 = pd.DataFrame(
    [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
    columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
)
df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')

def myudf(df):
    return df

df4 = df3
udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)

df5 = df4.groupBy('df1_c1').apply(udf)
print('df5.count()', df5.count())

# df5.write.parquet('/tmp/temp.parquet', mode='overwrite')

I have tried running this on Amazon EMR with Spark 2.3.1 and 20GB RAM per executor too.

Environment: Cloudera cdh5.13.3, Cloudera Spark 2.3.0.cloudera3
Reporter: Abdeali Kothari

Note: This issue was originally created as ARROW-4890. Please see the migration documentation for further details.

asfimport commented 5 years ago

Arvind Ravish: I get the same thing when running the repro code in Databricks. Can we get some description about what the error means?

Plan looks like this

== Physical Plan ==
*(7) HashAggregate(keys=[], functions=[finalmerge_count(merge count#506L) AS count(1)#502L])
+- Exchange SinglePartition
   +- *(6) HashAggregate(keys=[], functions=[partial_count(1) AS count#506L])
      +- *(6) Project
         +- FlatMapGroupsInPandas [df1_c1#72L], myudf(df1_c1#72L, df1_c2#73, df1_c3#74, df1_c4#75, df2_c1#108L, df2_c2#109, df2_c3#110, df2_c4#111, df2_c5#112, df2_c6#113), [df1_c1#264L, df1_c2#265, df1_c3#266, df1_c4#267, df2_c1#268L, df2_c2#269, df2_c3#270, df2_c4#271, df2_c5#272, df2_c6#273]
            +- *(5) Project [df1_c1#72L, df1_c1#72L, df1_c2#73, df1_c3#74, df1_c4#75, df2_c1#108L, df2_c2#109, df2_c3#110, df2_c4#111, df2_c5#112, df2_c6#113]
               +- *(5) SortMergeJoin [df1_c1#72L], [df2_c1#108L], Inner
                  :- *(2) Sort [df1_c1#72L ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(df1_c1#72L, 200)
                  :     +- *(1) Project [df1_c1#72L, df1_c2#73, df1_c3#74, df1_c4#75]
                  :        +- *(1) Filter isnotnull(df1_c1#72L)
                  :           +- *(1) Scan ExistingRDD[index#71L,df1_c1#72L,df1_c2#73,df1_c3#74,df1_c4#75]
                  +- *(4) Sort [df2_c1#108L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(df2_c1#108L, 200)
                        +- *(3) Project [df2_c1#108L, df2_c2#109, df2_c3#110, df2_c4#111, df2_c5#112, df2_c6#113]
                           +- *(3) Filter isnotnull(df2_c1#108L)
                              +- *(3) Scan ExistingRDD[index#107L,df2_c1#108L,df2_c2#109,df2_c3#110,df2_c4#111,df2_c5#112,df2_c6#113]

(attached screenshot: image-2019-07-04-12-03-57-002.png)

An error occurred while calling o1471.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 93 in stage 39.0 failed 4 times, most recent failure: Lost task 93.3 in stage 39.0 (TID 1261, 10.139.64.6, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 403, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 398, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 296, in dump_stream
    for series in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 319, in load_stream
    for batch in generator():
  File "/databricks/spark/python/pyspark/serializers.py", line 314, in generator
    for batch in reader:
  File "pyarrow/ipc.pxi", line 268, in __iter__ (/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:70278)
  File "pyarrow/ipc.pxi", line 284, in pyarrow.lib._RecordBatchReader.read_next_batch (/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:70534)
  File "pyarrow/error.pxi", line 79, in pyarrow.lib.check_status (/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:8345)
pyarrow.lib.ArrowIOError: read length must be positive or -1

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

asfimport commented 5 years ago

Micah Kornfield / @emkornfield: This seems to indicate that the stream of data being passed to Arrow isn't in the correct format (or Arrow is misinterpreting it).

asfimport commented 4 years ago

SURESH CHAGANTI: [~emkornfield@gmail.com] is there any limit on how much data we can send to pandas_udf? I am also seeing the same error as above, and my groups are pretty large: around 200M records, roughly 2 to 4 GB in size.

asfimport commented 4 years ago

Micah Kornfield / @emkornfield: Yes.  I believe it is 2GB per shard currently.
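
For context on that limit, here is a hedged sketch (reusing df3 and the df1_c1 key from the repro above) of how one might flag groups that could approach a ~2GB per-batch cap before running the UDF. The 200-bytes-per-row figure is an illustrative assumption; estimate it from a sample of your own data.

from pyspark.sql import functions as F

EST_BYTES_PER_ROW = 200  # assumption: measure on a sample of your data
group_sizes = (
    df3.groupBy("df1_c1")
       .count()
       .withColumn("approx_bytes", F.col("count") * EST_BYTES_PER_ROW)
)
# Groups whose estimated Arrow payload would exceed 2 GiB
group_sizes.filter(F.col("approx_bytes") > 2 * 1024**3).show()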

asfimport commented 4 years ago

SURESH CHAGANTI: Got it, thank you [~emkornfield@gmail.com]. Is there any way we can increase that size?

I am assuming the data that gets sent to the pandas_udf method is in uncompressed format.
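
One hedged way to check that assumption on real data is to convert a representative pandas sample of a single group to an Arrow table and read its in-memory (uncompressed) size; sample_pdf below is a hypothetical placeholder for such a sample.

import pyarrow as pa

# sample_pdf: assumed to be a pandas DataFrame holding one representative group
table = pa.Table.from_pandas(sample_pdf, preserve_index=False)
print(f"approx uncompressed Arrow size: {table.nbytes / 1024**3:.2f} GiB")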

 

asfimport commented 4 years ago

SURESH CHAGANTI: [~AbdealiJK] did you find any fix or workaround for this error?

asfimport commented 4 years ago

Abdeali Kothari: No, I had to stop using pandas UDFs due to this and find another approach for the transformations I need to do.

asfimport commented 4 years ago

SURESH CHAGANTI: Thank you [~AbdealiJK]. May I ask which other approach you are using?

asfimport commented 4 years ago

Abdeali Kothari: We changed our pipeline to do it with joins and explodes, using Spark functions.

It was a while back; I don't remember the exact specifics of that pipeline.

asfimport commented 4 years ago

Micah Kornfield / @emkornfield: "I am assuming the data that gets sent to pandas_udf method is in the uncompressed format", yes I believe this to be true but this isn't really the main limitation.  

 

Currently, ArrowBufs (the components that hold memory) are limited to less than 2GB each. I need to clean up https://github.com/apache/arrow/pull/5020 to address this (and other limitations). I actually might have mis-stated the actual limit per shard for toPandas functions.

[~cutlerb]  do you know what the actual limits are? I can't seem to find any documentation on it.

asfimport commented 4 years ago

SURESH CHAGANTI: Thank you [~emkornfield@gmail.com], I appreciate your time. I have run multiple tests with different data sizes, and any shard larger than 2GB failed. Glad to see the fix is in progress; it would be great if this change rolls out soon. I will be happy to contribute to this issue.

asfimport commented 4 years ago

Bryan Cutler / @BryanCutler: Sorry, I'm not sure of any documentation with the limits. It would be great to get that down somewhere and there should be a better error message for this, but maybe it should be done on the Spark side.

asfimport commented 4 years ago

Micah Kornfield / @emkornfield: I agree, it should probably be on the Spark side (assuming the root cause is hitting caps in Arrow).

asfimport commented 4 years ago

SURESH CHAGANTI: Sure, I will create an issue against Spark, thank you! For now, to get around the issue, I will build the code with https://github.com/apache/arrow/pull/5020 and see what happens. Thank you [~emkornfield@gmail.com] & @BryanCutler

asfimport commented 4 years ago

SURESH CHAGANTI: I guess this branch addresses the 2 GB issue (https://github.com/emkornfield/arrow/tree/int64_address). Could you please confirm? Thank you.

  

asfimport commented 4 years ago

Micah Kornfield / @emkornfield: Yes that is the one. It hasn't been tested too much yet, but if you have time to check to see if it works for you that would be great.

asfimport commented 4 years ago

SURESH CHAGANTI: thank you. I will keep you guys posted on my testing. 

asfimport commented 4 years ago

Raz Cohen: Hey guys, is there any news regarding the 2GB issue?

asfimport commented 4 years ago

Micah Kornfield / @emkornfield: There have been a few PRs checked in, but the full end-to-end IPC path has not been tested yet.  CC [~fan_li_ya]

asfimport commented 4 years ago

Liya Fan / @liyafan82: Sure. [~emkornfield@gmail.com] is right. After [~emkornfield@gmail.com] has finished the implementation of the 64-bit buffer, we have a few follow-up work items to do before we can claim that the 2GB restriction is removed:

  1. In ARROW-7610, we apply the 64-bit buffer to vector implementations and add integration tests. This work item is ongoing.
  2. In ARROW-6111, we provide new vectors to support 64-bit buffers, as the current ones have an offset width of 4 bytes (see the sketch after this list). This work item is ongoing. (We would appreciate it if anyone could provide feedback/review comments for the PRs for 1 and 2.)
  3. We need another work item to make everything work in IPC scenarios. This is tracked by ARROW-7746 and has not started yet. (We would appreciate it if anyone could propose a solution to this issue. Otherwise, I will try to provide one some days later.)
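
A minimal pyarrow illustration of the 4-byte offset limit mentioned in item 2 (shown on the Python side, since the JIRAs above track the Java work): the regular string type indexes its character data with 32-bit offsets, which caps a single array near 2GB of text, while large_string uses 64-bit offsets. This is a sketch of the type difference only, not of the Spark code path.

import pyarrow as pa

# string() arrays use int32 offsets, so one array can address at most ~2GB of text
print(pa.string())        # string
# large_string() uses int64 offsets and removes that per-array cap
print(pa.large_string())  # large_string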
asfimport commented 4 years ago

Ruslan Dautkhanov: [~fan_li_ya] it looks like all the follow-up JIRAs you mentioned are now resolved in Arrow 1.0. Does this automatically resolve this JIRA, or are there some follow-ups left?

I think Netty still has a 2GB limit? Would we still be running into that limit too? Thanks.


java.lang.IndexOutOfBoundsException: index: 0, length: 1073741824 (expected: range(0, 0)) 
at io.netty.buffer.ArrowBuf.checkIndex(ArrowBuf.java:716) 
at io.netty.buffer.ArrowBuf.setBytes(ArrowBuf.java:954) 
at org.apache.arrow.vector.BaseVariableWidthVector.reallocDataBuffer(BaseVariableWidthVector.java:508) 
at org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1239) 
at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1028)
asfimport commented 4 years ago

Micah Kornfield / @emkornfield: [~Tagar]  this doesn't automatically resolve the JIRA.  But I think most of the work in Arrow is taken care of as of 1.0.  I would guess there is likely still work to do in Spark and there might be a few more bugs to work out.   

 

We've introduced an alternative allocator that isn't based on Netty.

asfimport commented 4 years ago

Ruslan Dautkhanov: Thanks for the details [~emkornfield@gmail.com]! Great to know the issue with the Netty allocator and some other related issues are now resolved. Understood that there might be other things to complete on the Arrow side to lift the 2GB limitation. Created https://issues.apache.org/jira/browse/SPARK-32294 for the Spark side.

asfimport commented 3 years ago

Dmitry Kravchuk: I've tried pyarrow 2.0.0 on Spark 2.4.4 and still got the familiar error: "OSError: Invalid IPC message: negative bodyLength".

Is there any news on resolving this issue?

asfimport commented 3 years ago

Liya Fan / @liyafan82: [~dishka_krauch] It is difficult to find the cause without more details about the problem. The possible reasons that come to mind include:

  1. If you are using Flight for IPC, buffers larger than 2GB are not supported, as restricted by GRPC.
  2. Please make sure you are using UnsafeAllocationManager as the allocation manager (not NettyAllocationManager)
asfimport commented 3 years ago

Dmitry Kravchuk: [~fan_li_ya] I just used the code from the description of this issue on my Hadoop cluster.

Which details do you need to look at the whole problem? I can give you everything, just let me know.

asfimport commented 3 years ago

Liya Fan / @liyafan82: [~dishka_krauch] The problem occurred during IPC from Python to Java? I think the stack traces and some other logs would be helpful.

asfimport commented 3 years ago

Dmitry Kravchuk: [~fan_li_ya] okay, here we go.

Spark version - 2.4.4

Python env:

asfimport commented 3 years ago

Liya Fan / @liyafan82: [~dishka_krauch] Thanks for the detailed information. It seems the stack trace of the root cause is as follows:


java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)

The negative capacity was likely caused by integer overflow. According to the Arrow specification, the message size is represented as a 32-bit little-endian integer (see https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc), so the message size cannot exceed 2GB.

To overcome this constraint, we would need to change the specification, which would be a large change involving multiple languages. If you really need to do so, maybe you can open a discussion on the mailing list.
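
For reference, a small pyarrow sketch of the framing being discussed (assuming the current, post-0.15 IPC format): each encapsulated message starts with a 0xFFFFFFFF continuation marker followed by a 32-bit little-endian metadata length.

import struct
import pyarrow as pa

# Write one tiny record batch to an in-memory IPC stream
batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["x"])
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
buf = sink.getvalue()

# The stream begins with the schema message: continuation marker, then
# the 32-bit little-endian length of the flatbuffer metadata that follows
continuation, metadata_len = struct.unpack_from("<Ii", buf, 0)
print(hex(continuation), metadata_len)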

asfimport commented 3 years ago

Micah Kornfield / @emkornfield: [~fan_li_ya] I believe the 32-bit integer is only for the message metadata, not the actual message body size. (All buffers use 64-bit lengths.)

 

Dmitry, are you using stock Spark 2.4.4? If so, I believe the Java Arrow version it depends on is quite old, and at the very least you would have to set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1 to have anything work (https://arrow.apache.org/blog/2019/10/06/0.15.0-release/).

I think there is still some coding work to upgrade Spark to a new version of Arrow that is potentially capable of handling data > 2GB.

asfimport commented 3 years ago

Liya Fan / @liyafan82: @emkornfield You are right. According to the above stack trace, the exception was thrown when reading the schema.

asfimport commented 3 years ago

Dmitry Kravchuk: [~fan_li_ya] @emkornfield thanks for your replies!

Yes, I'm using Spark 2.4.4.

And yes, the main goal is to pass more than 2GB to pandas_udf functions.

I've used ARROW_PRE_0_15_IPC_FORMAT=1 in my spark-submit:

 


%sh
cd /home/zeppelin/code && \
export PYSPARK_DRIVER_PYTHON=/home/zeppelin/envs/env3/bin/python && \
export PYSPARK_PYTHON=./env3/bin/python && \
export ARROW_PRE_0_15_IPC_FORMAT=1 && \
spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 5 \
--executor-cores 5 \
--driver-memory 8G \
--executor-memory 8G \
--conf spark.executor.memoryOverhead=4G \
--conf spark.driver.memoryOverhead=4G \
--archives /home/zeppelin/env3.tar.gz#env3 \
--jars "/opt/deltalake/delta-core_2.11-0.5.0.jar" \
--py-files jobs.zip,"/opt/deltalake/delta-core_2.11-0.5.0.jar" main.py \
--job temp

 

With pyarrow version 0.15.0 and a 172 MB dataset in pandas_udf, I still got error number 2 (see the detailed log in my previous big message).

Any suggestions?

asfimport commented 3 years ago

Dmitry Kravchuk: I found out that this error is related to pyarrow versions after 0.14.1: https://stackoverflow.com/questions/58458415/pandas-scalar-udf-failing-illegalargumentexception

How can I fix it? Upgrade Spark?

The environment variable ARROW_PRE_0_15_IPC_FORMAT=1 didn't help either.

I've tried pyarrow version 2.0.0, but it still throws the java.lang.IllegalArgumentException.

UPD: I've worked around this as follows (note the udf function), using the 172 MB dataset:


import pyspark
from pyspark.sql import functions as F, types as T
import pandas as pd

def analyze(spark):

    pdf1 = pd.DataFrame(
        [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
        columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
    )
    df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')

    pdf2 = pd.DataFrame(
        [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
        columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
    )
    df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(4899)]).reset_index()).drop('index')
    df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')

    def myudf(df):
        import os
        os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
        return df

    df4 = df3 \
        .withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
        .withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
        .withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
        .withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
        .withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
        .withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
        .withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
        .withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
        .withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
        .withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))
    print(df4.printSchema())

    udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)

    df5 = df4.groupBy('df1_c1').apply(udf)
    print('df5.count()', df5.count())
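
A hedged alternative to setting the variable inside the UDF is to propagate it to the driver and executors when the session is created. spark.executorEnv.* and spark.yarn.appMasterEnv.* are standard Spark settings, but whether they reach the Python workers in a given deployment is an assumption to verify, not a confirmed fix.

import os
import pyspark

# Driver-side Python workers pick the variable up from the driver environment
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

spark = (
    pyspark.sql.SparkSession.builder
    # assumed to propagate the variable to executor-side Python workers on YARN
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .config("spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)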

 

asfimport commented 3 years ago

Dmitry Kravchuk: Finally, I've tried pandas_udf with a 1.72 GB dataset, pyarrow version 2.0.0, and the ARROW_PRE_0_15_IPC_FORMAT=1 setting; spark-submit returns the error "OSError: Invalid IPC message: negative bodyLength" (see detailed log number 4 in my previous messages).

https://github.com/apache/arrow/blob/f95781eca2c924a4c0882c5b619e18f521ae93d3/cpp/src/arrow/ipc/message.cc#L189

Any thoughts?

 


import pyspark
from pyspark.sql import functions as F, types as T
import pandas as pd

def analyze(spark):

    pdf1 = pd.DataFrame(
        [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
        columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
    )
    df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')

    pdf2 = pd.DataFrame(
        [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
        columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
    )
    df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
    df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')

    def myudf(df):
        import os
        os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
        return df

    df4 = df3 \
        .withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
        .withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
        .withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
        .withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
        .withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
        .withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
        .withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
        .withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
        .withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
        .withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))
    print(df4.printSchema())

    udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)

    df5 = df4.groupBy('df1_c1').apply(udf)
    print('df5.count()', df5.count())

 

asfimport commented 3 years ago

Liya Fan / @liyafan82: ARROW_PRE_0_15_IPC_FORMAT controls the write_legacy_ipc_format flag, which determines whether we write 0xffffffff in the message header. However, for the Java and C++ implementations, the message length is represented with a 32-bit integer regardless of the value of write_legacy_ipc_format, so the ARROW_PRE_0_15_IPC_FORMAT flag does not help with this problem.

To solve the problem, we need to either reduce the message size or remove the 2GB constraint on the message size (the latter may involve changing the specification, and the change to the implementations would also be large).
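
Until that happens, one hedged way to "reduce the message size" on the user side is to salt the group key so that no single group handed to the GROUPED_MAP UDF approaches the cap. This only works when the per-group logic can operate on partial groups; the sketch below reuses df4, df1_c1, and the schema from the repro above, and n_salt is an illustrative parameter.

from pyspark.sql import functions as F

n_salt = 16  # illustrative; choose so each (key, salt) slice stays well under 2GB

def myudf_salted(pdf):
    # identity transform on one slice of a group; drop the helper column
    # so the output matches the declared schema (df4.schema)
    return pdf.drop(columns=["salt"])

udf_salted = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf_salted)

df_salted = df4.withColumn("salt", (F.rand() * n_salt).cast("int"))
df5 = df_salted.groupBy("df1_c1", "salt").apply(udf_salted)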

asfimport commented 3 years ago

Dmitry Kravchuk: [~fan_li_ya] do I need to create new issue for all this stuff?

asfimport commented 3 years ago

Liya Fan / @liyafan82: I think some discussions are needed.

asfimport commented 3 years ago

Dmitry Kravchuk: [~fan_li_ya] okay.

https://issues.apache.org/jira/browse/ARROW-10957

asfimport commented 3 years ago

Micah Kornfield / @emkornfield: This should be fixed as of Spark 3.1, which upgrades the Java Arrow dependency to 2.0.