vasia / gelly-streaming

An experimental Graph Streaming API for Apache Flink
Apache License 2.0

How to use GellyStream API in Scala code #32

Closed: lilicao701 closed this issue 7 years ago

lilicao701 commented 7 years ago

Hello, I want to use the GellyStream algorithm API in my Scala project. I translated the ConnectedComponentsExample.java code into Scala, but when I run it, it fails with this error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:232)
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:223)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:257)
    at com.huawei.flink_test.gelly_stream_test.ConnectedComponent$.main(ConnectedComponent.scala:120)
    at com.huawei.flink_test.gelly_stream_test.ConnectedComponent.main(ConnectedComponent.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

My code:

val edges: GraphStream[lang.Long, NullValue, NullValue] = getGraphStream(socketStream, split, env)
val init_val: (lang.Long, lang.Long) = (0L, 0L)

val cc_new: ConnectedComponents[lang.Long, NullValue] = new ConnectedComponents[java.lang.Long, NullValue](mergeWindowTime)
val cc = edges.aggregate(cc_new)
val vertexInfo = cc.flatMap(new FlatMapFunction[DisjointSet[lang.Long], (lang.Long, lang.Long)] {
  override def flatMap(set: DisjointSet[lang.Long], out: Collector[Tuple2[lang.Long, lang.Long]]) = {
    val matches = set.getMatches
    wrapAsScala.mapAsScalaMap(matches).foreach(out.collect)
  }
})

val keyValuePair = vertexInfo.keyBy(0).timeWindow(Time.of(printWindowTime, TimeUnit.MILLISECONDS))
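The stack trace points at the `keyBy(0)` call on the Java `DataStream` (`org.apache.flink.streaming.api.datastream.DataStream.keyBy`). Because the `FlatMapFunction` declares the Scala tuple `(lang.Long, lang.Long)` as its output type, the Java API appears to analyze that type as a `GenericType`, and, as the message says, field positions are only accepted as keys for tuple data types (for example Flink's `org.apache.flink.api.java.tuple.Tuple2`). Two likely workarounds, not tested against a specific Flink or gelly-streaming version: emit Flink's `Tuple2` from the `flatMap` so `keyBy(0)` applies, or keep the Scala tuple and key with a `KeySelector` instead of a position. The sketch below illustrates the second option in plain Scala. `Collector` and `KeySelector` here are hypothetical stand-ins with the same single-method shape as Flink's `org.apache.flink.util.Collector` and `org.apache.flink.api.java.functions.KeySelector`, defined only so the block compiles without Flink; `KeyBySketch`, `emitMatches`, `byVertex` and `run` are illustrative names, and the `java.util.Map` argument stands in for the result of `DisjointSet.getMatches`. It also replaces the deprecated `wrapAsScala` converter with `scala.jdk.CollectionConverters` (Scala 2.13+).

```scala
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

// Hypothetical stand-ins with the same single-method shape as Flink's
// org.apache.flink.util.Collector and org.apache.flink.api.java.functions.KeySelector,
// defined here only so this sketch compiles without Flink on the classpath.
trait Collector[T] { def collect(record: T): Unit }
trait KeySelector[IN, KEY] extends Serializable { def getKey(value: IN): KEY }

object KeyBySketch {
  type Pair = (java.lang.Long, java.lang.Long)

  // Same logic as the flatMap body in the question: one (vertex, matched vertex)
  // pair per map entry, using CollectionConverters instead of wrapAsScala.
  def emitMatches(matches: java.util.Map[java.lang.Long, java.lang.Long], out: Collector[Pair]): Unit =
    matches.asScala.foreach { case (vertex, matched) => out.collect((vertex, matched)) }

  // Extracts the key with a function instead of a field position, so it does not
  // depend on Flink recognizing the Scala tuple as a tuple type.
  val byVertex: KeySelector[Pair, java.lang.Long] = new KeySelector[Pair, java.lang.Long] {
    def getKey(pair: Pair): java.lang.Long = pair._1
  }

  // Runs both steps on an in-memory map and groups the pairs by key,
  // a rough stand-in for the partitioning keyBy performs.
  def run(matches: java.util.Map[java.lang.Long, java.lang.Long]): Map[java.lang.Long, List[Pair]] = {
    val buffer = ListBuffer.empty[Pair]
    emitMatches(matches, new Collector[Pair] { def collect(record: Pair): Unit = buffer += record })
    buffer.toList.groupBy(pair => byVertex.getKey(pair))
  }

  def main(args: Array[String]): Unit = {
    val matches = new java.util.HashMap[java.lang.Long, java.lang.Long]()
    matches.put(1L, 1L)
    matches.put(2L, 1L)
    matches.put(3L, 3L)
    println(run(matches))
  }
}
```

In an actual job, the anonymous class would implement Flink's `KeySelector` and be passed as `vertexInfo.keyBy(byVertex)` before `.timeWindow(...)`; the `groupBy` in `run` only approximates what `keyBy` does and says nothing about windowing.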
vasia commented 7 years ago

Hi @lilicao701, thank you for trying out gelly-stream. I haven't tried using it in Scala code and I wouldn't recommend it at the moment. We might want to add a Scala API in the future, but for now I would suggest you try it out in Java. -V.