alibaba / Alink

Alink is the Machine Learning algorithm platform based on Flink, developed by the PAI team of Alibaba computing platform.
Apache License 2.0
3.58k stars 802 forks source link

alink1.5.2 getOutputTable().to_pandas() raise ERROR #197

Open ygean opened 2 years ago

ygean commented 2 years ago

Sample code is below:

input_users = [[617127], [4841758], [382730], [1399180]]
df_users = pd.DataFrame(input_users)
sdata = StreamOperator.fromDataframe(df_users, schemaStr='user bigint')
predictor = ItemCfItemsPerUserRecommStreamOp(model)\
    .setUserCol("user")\
    .setReservedCols(["user"])\
    .setK(3)\
    .setRecommCol("prediction_result");
b = predictor.linkFrom(sdata).getOutputTable()
StreamOperator.execute()
b.to_pandas()

This raises the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [37], in <module>
----> 1 b.to_pandas()

File ~/.local/lib/python3.8/site-packages/pyflink/table/table.py:998, in Table.to_pandas(self)
    995 gateway = get_gateway()
    996 max_arrow_batch_size = self._j_table.getTableEnvironment().getConfig().getConfiguration()\
    997     .getInteger(gateway.jvm.org.apache.flink.python.PythonOptions.MAX_ARROW_BATCH_SIZE)
--> 998 batches_iterator = gateway.jvm.org.apache.flink.table.runtime.arrow.ArrowUtils\
    999     .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
   1000 if batches_iterator.hasNext():
   1001     import pytz

File ~/.local/lib/python3.8/site-packages/py4j/java_gateway.py:1285, in JavaMember.__call__(self, *args)
   1279 command = proto.CALL_COMMAND_NAME +\
   1280     self.command_header +\
   1281     args_command +\
   1282     proto.END_COMMAND_PART
   1284 answer = self.gateway_client.send_command(command)
-> 1285 return_value = get_return_value(
   1286     answer, self.gateway_client, self.target_id, self.name)
   1288 for temp_arg in temp_args:
   1289     temp_arg._detach()

File ~/.local/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: java.lang.UnsupportedOperationException: Python vectorized UDF doesn't support logical type LEGACY('RAW', 'ANY<com.alibaba.alink.common.MTable>') currently.
    at org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:947)
    at org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:821)
    at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:202)
    at org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:107)
    at org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:227)
    at org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:218)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:219)
    at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:662)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

dependency list: pandas==1.3.5

ygean commented 2 years ago

After downgrading Alink to 1.4.0 and reinstalling pandas==1.1.5, it works.

Fanoid commented 2 years ago

Hi. to_pandas is provided by PyFlink, not PyAlink. From the error message, it seems that to_pandas does not support Alink-defined types: java.lang.UnsupportedOperationException: Python vectorized UDF doesn't support logical type LEGACY('RAW', 'ANY<com.alibaba.alink.common.MTable>') currently.

In the previous PyAlink version, this operator didn't use the Alink-defined type (MTable), so it worked.

ygean commented 2 years ago

@Fanoid Thanks.