alibaba / Alink

Alink is the Machine Learning algorithm platform based on Flink, developed by the PAI team of Alibaba computing platform.
Apache License 2.0

ProphetTrainBatchOp Error #232

Open joyatcloudfall opened 1 year ago

joyatcloudfall commented 1 year ago

I ran the example code for ProphetTrainBatchOp and got the unclassified error shown after the code.

from pyalink.alink import *

import datetime
import pandas as pd

useLocalEnv(1)

data = pd.DataFrame([
            [1,  datetime.datetime.fromtimestamp(1), 10.0],
            [1,  datetime.datetime.fromtimestamp(2), 11.0],
            [1,  datetime.datetime.fromtimestamp(3), 12.0],
            [1,  datetime.datetime.fromtimestamp(4), 13.0],
            [1,  datetime.datetime.fromtimestamp(5), 14.0],
            [1,  datetime.datetime.fromtimestamp(6), 15.0],
            [1,  datetime.datetime.fromtimestamp(7), 16.0],
            [1,  datetime.datetime.fromtimestamp(8), 17.0],
            [1,  datetime.datetime.fromtimestamp(9), 18.0],
            [1,  datetime.datetime.fromtimestamp(10), 19.0]
])

source = dataframeToOperator(data, schemaStr='id int, ts timestamp, val double', op_type='batch')

prophetModel = source.link(\
                    ProphetTrainBatchOp()\
                        .setTimeCol("ts")\
                        .setValueCol("val")
                )

ProphetPredictBatchOp()\
    .setValueCol("data")\
    .setPredictNum(4)\
    .setPredictionCol("pred")\
    .linkFrom(
         prophetModel,
         source.link(
            GroupByBatchOp()
                .setGroupByPredicate("id")
                .setSelectClause("id, mtable_agg(ts, val) as data")
            )
    )\
    .print()

Reference: https://www.yuque.com/pinshu/alink_doc/prophettrainbatchop

Caused by: ERROR: 0x0860080000001001-Unclassified error: Failed to call addValue with value: 1,"{""data"":{""ts"":[""1970-01-01 08:00:01.0"",""1970-01-01 08:00:02.0"",""1970-01-01 08:00:03.0"",""1970-01-01 08:00:04.0"",""1970-01-01 08:00:05.0"",""1970-01-01 08:00:06.0"",""1970-01-01 08:00:07.0"",""1970-01-01 08:00:08.0"",""1970-01-01 08:00:09.0"",""1970-01-01 08:00:10.0""],""val"":[10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]},""schema"":""ts TIMESTAMP,val DOUBLE""}",.
    at com.alibaba.alink.common.lazy.LazyEvaluation.addValue(LazyEvaluation.java:45)
    at com.alibaba.alink.python.utils.OperatorCsvCollector.lambda$null$0(OperatorCsvCollector.java:62)
    at com.alibaba.alink.operator.batch.BatchOperator.lambda$lazyCollect$5(BatchOperator.java:646)
    at io.reactivex.rxjava3.internal.observers.LambdaObserver.onNext(LambdaObserver.java:63)
    at io.reactivex.rxjava3.subjects.ReplaySubject$UnboundedReplayBuffer.replay(ReplaySubject.java:791)
    at io.reactivex.rxjava3.subjects.ReplaySubject.onNext(ReplaySubject.java:365)
    at com.alibaba.alink.common.lazy.LazyEvaluation.addValue(LazyEvaluation.java:42)
    ... 14 more

I'd appreciate it if anyone could help me figure this out.
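
To make the failing value in the error message easier to read, the row can be decoded with Python's standard `csv` and `json` modules. This is only a sketch for inspecting the message text; it does not use Alink and does not diagnose the failure. The variable names are illustrative. Reading the empty third field as the `pred` column is an assumption based on the column order in the script, not something the message confirms.

```python
import csv
import io
import json

# Row copied from the error message: the text after "value: ",
# without the final "." that ends the sentence.
failed_row = (
    '1,"{""data"":{""ts"":[""1970-01-01 08:00:01.0"",""1970-01-01 08:00:02.0"",'
    '""1970-01-01 08:00:03.0"",""1970-01-01 08:00:04.0"",""1970-01-01 08:00:05.0"",'
    '""1970-01-01 08:00:06.0"",""1970-01-01 08:00:07.0"",""1970-01-01 08:00:08.0"",'
    '""1970-01-01 08:00:09.0"",""1970-01-01 08:00:10.0""],'
    '""val"":[10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]},'
    '""schema"":""ts TIMESTAMP,val DOUBLE""}",'
)

# The row is CSV with doubled quotes as escapes, so csv.reader can split it.
fields = next(csv.reader(io.StringIO(failed_row)))

# The second field is a JSON document holding the aggregated table.
payload = json.loads(fields[1])

print("number of CSV fields:", len(fields))
print("first field (id):", fields[0])
print("schema:", payload["schema"])
print("rows in data:", len(payload["data"]["ts"]))
# Assumption: the third field is where the "pred" column would appear.
print("third field is empty:", fields[2] == "")
```

The decoded row contains the ten input points under the schema `ts TIMESTAMP,val DOUBLE`, followed by an empty trailing field.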