byzer-org / byzer-lang

Byzer (former MLSQL): A low-code open-source programming language for data pipeline, analytics and AI.
https://www.byzer.org
Apache License 2.0
1.83k stars 547 forks source link

`map` does not work in schema definition #1626

Open MichelZhan opened 2 years ago

MichelZhan commented 2 years ago

Define a map field in the byzer-python schema as in the code below:

!python env "PYTHON_ENV=source activate dev";
!python conf "schema=st(field(top,map(string,string)))";
!python conf "runIn=driver";
!python conf "dataMode=model";

run command as Ray.`` where 
inputTable="command"
and outputTable="output"
and code='''
from pyjava.api.mlsql import RayContext, PythonContext
context.build_result([{"top": {"a": "string"}}])
''';

This throws a java.lang.UnsupportedOperationException:

Error while decoding: java.lang.UnsupportedOperationException
createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -1), lambdavariable(MapObject, StringType, true, -1).toString, input[0, map<string,string>, true].keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -2), lambdavariable(MapObject, StringType, true, -2).toString, input[0, map<string,string>, true].valueArray, None).array, true, false), true, false), StructField(top,MapType(StringType,StringType,true),true))
java.lang.RuntimeException: Error while decoding: java.lang.UnsupportedOperationException
createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -1), lambdavariable(MapObject, StringType, true, -1).toString, input[0, map<string,string>, true].keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -2), lambdavariable(MapObject, StringType, true, -2).toString, input[0, map<string,string>, true].valueArray, None).array, true, false), true, false), StructField(top,MapType(StringType,StringType,true),true))
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:186)
tech.mlsql.ets.Ray.$anonfun$distribute_execute$20(Ray.scala:182)
scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
scala.collection.Iterator.foreach(Iterator.scala:941)
scala.collection.Iterator.foreach$(Iterator.scala:941)
scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:184)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47)
scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
scala.collection.AbstractIterator.to(Iterator.scala:1429)
scala.collection.TraversableOnce.toList(TraversableOnce.scala:299)
scala.collection.TraversableOnce.toList$(TraversableOnce.scala:299)
scala.collection.AbstractIterator.toList(Iterator.scala:1429)
tech.mlsql.ets.Ray.distribute_execute(Ray.scala:180)
tech.mlsql.ets.Ray.train(Ray.scala:56)
tech.mlsql.dsl.adaptor.TrainAdaptor.parse(TrainAdaptor.scala:116)
streaming.dsl.ScriptSQLExecListener.execute$1(ScriptSQLExec.scala:368)
streaming.dsl.ScriptSQLExecListener.exitSql(ScriptSQLExec.scala:407)
streaming.dsl.parser.DSLSQLParser$SqlContext.exitRule(DSLSQLParser.java:296)
org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:47)
org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:30)
org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28)
streaming.dsl.ScriptSQLExec$._parse(ScriptSQLExec.scala:159)
streaming.dsl.ScriptSQLExec$.parse(ScriptSQLExec.scala:146)
streaming.rest.RestController.$anonfun$script$1(RestController.scala:153)
tech.mlsql.job.JobManager$.run(JobManager.scala:74)
tech.mlsql.job.JobManager$$anon$1.run(JobManager.scala:91)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
hellozepp commented 2 years ago

We currently do not support the map type.

A column vector backed by Apache Arrow. Currently calendar interval type and map type are not supported.
hellozepp commented 2 years ago

Total jobs: 1 current job:1 job script:run command as ShowTableExt.where parameters='''["5ec0cffe278f4494904511706071d476"]''' Total jobs: 5 current job:1 job script:run command as PythonCommand. where parameters='''["env","PYTHON_ENV=source activate ray1.8.0"]''' as e213a9f98144443fa7ab4e0ad95fae07 Total jobs: 5 current job:2 job script:run command as PythonCommand.where parameters='''["conf","schema=st(field(top,map(string,string)))"]''' as 03c02d5cdcc648cb9ee06a27a1c17cc3 Total jobs: 5 current job:3 job script:run command as PythonCommand. where parameters='''["conf","runIn=driver"]''' as dab0c0a0cf6749ea8718d175cc4d6b3d Total jobs: 5 current job:4 job script:run command as PythonCommand.where parameters='''["conf","dataMode=model"]''' as dde717fb7f324eef8916a08cf26845e8 Total jobs: 5 current job:5 job script:run command as Ray. where inputTable="data" and outputTable="output" and code=''' from pyjava.api.mlsql import RayContext, PythonContext ray_context = RayContext.connect(globals(),None) ray_context.build_result([{"top": {"a": "string"}}]) ''' Error while decoding: java.lang.UnsupportedOperationException createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -1), lambdavariable(MapObject, StringType, true, -1).toString, input[0, map<string,string>, true].keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -2), lambdavariable(MapObject, StringType, true, -2).toString, input[0, map<string,string>, true].valueArray, None).array, true, false), true, false), StructField(top,MapType(StringType,StringType,true),true)) java.lang.RuntimeException: Error while decoding: 
java.lang.UnsupportedOperationException createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -1), lambdavariable(MapObject, StringType, true, -1).toString, input[0, map<string,string>, true].keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObject, StringType, true, -2), lambdavariable(MapObject, StringType, true, -2).toString, input[0, map<string,string>, true].valueArray, None).array, true, false), true, false), StructField(top,MapType(StringType,StringType,true),true)) org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:186) tech.mlsql.ets.Ray.$anonfun$distribute_execute$20(Ray.scala:182) scala.collection.Iterator$$anon$10.next(Iterator.scala:459) scala.collection.Iterator$$anon$11.next(Iterator.scala:494) scala.collection.Iterator.foreach(Iterator.scala:941) scala.collection.Iterator.foreach$(Iterator.scala:941) scala.collection.AbstractIterator.foreach(Iterator.scala:1429) scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:184) scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47) scala.collection.TraversableOnce.to(TraversableOnce.scala:315) scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) scala.collection.AbstractIterator.to(Iterator.scala:1429) scala.collection.TraversableOnce.toList(TraversableOnce.scala:299) scala.collection.TraversableOnce.toList$(TraversableOnce.scala:299) scala.collection.AbstractIterator.toList(Iterator.scala:1429) 
tech.mlsql.ets.Ray.distribute_execute(Ray.scala:180) tech.mlsql.ets.Ray.train(Ray.scala:56) tech.mlsql.dsl.adaptor.TrainAdaptor.parse(TrainAdaptor.scala:116) streaming.dsl.ScriptSQLExecListener.execute$1(ScriptSQLExec.scala:408) streaming.dsl.ScriptSQLExecListener.exitSql(ScriptSQLExec.scala:447) streaming.dsl.parser.DSLSQLParser$SqlContext.exitRule(DSLSQLParser.java:296) org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:47) org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:30) org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) streaming.dsl.ScriptSQLExec$._parse(ScriptSQLExec.scala:160) streaming.dsl.ScriptSQLExec$.parse(ScriptSQLExec.scala:147) streaming.rest.RestController.$anonfun$script$1(RestController.scala:153) tech.mlsql.job.JobManager$.run(JobManager.scala:74) tech.mlsql.job.JobManager$$anon$1.run(JobManager.scala:91) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748) caused by: java.lang.UnsupportedOperationException org.apache.spark.sql.vectorized.ArrowColumnVector$ArrowVectorAccessor.getMap(ArrowColumnVector.java:244) org.apache.spark.sql.vectorized.ArrowColumnVector.getMap(ArrowColumnVector.java:126) org.apache.spark.sql.vectorized.ColumnarBatchRow.getMap(ColumnarBatch.java:245) org.apache.spark.sql.vectorized.ColumnarBatchRow.getMap(ColumnarBatch.java:132) org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.MapObjects_0$(Unknown Source) org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.StaticInvoke_0$(Unknown Source) org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source) org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:182) tech.mlsql.ets.Ray.$anonfun$distribute_execute$20(Ray.scala:182) 
scala.collection.Iterator$$anon$10.next(Iterator.scala:459) scala.collection.Iterator$$anon$11.next(Iterator.scala:494) scala.collection.Iterator.foreach(Iterator.scala:941) scala.collection.Iterator.foreach$(Iterator.scala:941) scala.collection.AbstractIterator.foreach(Iterator.scala:1429) scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:184) scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47) scala.collection.TraversableOnce.to(TraversableOnce.scala:315) scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) scala.collection.AbstractIterator.to(Iterator.scala:1429) scala.collection.TraversableOnce.toList(TraversableOnce.scala:299) scala.collection.TraversableOnce.toList$(TraversableOnce.scala:299) scala.collection.AbstractIterator.toList(Iterator.scala:1429) tech.mlsql.ets.Ray.distribute_execute(Ray.scala:180) tech.mlsql.ets.Ray.train(Ray.scala:56) tech.mlsql.dsl.adaptor.TrainAdaptor.parse(TrainAdaptor.scala:116) streaming.dsl.ScriptSQLExecListener.execute$1(ScriptSQLExec.scala:408) streaming.dsl.ScriptSQLExecListener.exitSql(ScriptSQLExec.scala:447) streaming.dsl.parser.DSLSQLParser$SqlContext.exitRule(DSLSQLParser.java:296) org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:47) org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:30) org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) streaming.dsl.ScriptSQLExec$._parse(ScriptSQLExec.scala:160) streaming.dsl.ScriptSQLExec$.parse(ScriptSQLExec.scala:147) streaming.rest.RestController.$anonfun$script$1(RestController.scala:153) tech.mlsql.job.JobManager$.run(JobManager.scala:74) tech.mlsql.job.JobManager$$anon$1.run(JobManager.scala:91) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748) File[101] have been saved.