akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/

[elasticsearch] expose createBulk from ElasticsearchFlow to ElasticsearchSink #2358

Open aitorhh opened 4 years ago

aitorhh commented 4 years ago

Short description

A createBulk function was recently added to ElasticsearchFlow, but ElasticsearchSink has not been updated to match. I suggest adding the same createBulk function to the Sink.

Details

The existing ElasticsearchSink could be extended as follows. Note that this is not complete; it only illustrates the idea:

  def createBulk[T](indexName: String,
                    typeName: String,
                    settings: ElasticsearchWriteSettings = ElasticsearchWriteSettings.Default)(
      implicit elasticsearchClient: RestClient,
      sprayJsonWriter: JsonWriter[T]
  ): Sink[immutable.Seq[WriteMessage[T, NotUsed]], Future[Done]] =
    ElasticsearchFlow.createBulk[T, NotUsed](indexName, typeName, settings).toMat(Sink.ignore)(Keep.right)

Unfortunately, I have no time to provide a PR during the summer.

ennru commented 4 years ago

Thank you for suggesting this. As this sink doesn't provide anything over adding Sink.ignore, I suggest we leave it to the user to attach that.

aitorhh commented 4 years ago

True, and I've done that. But I guess it would be good for completeness to have the Sink mirror the Flow's capabilities? At least we have it here for reference :)
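For reference, attaching Sink.ignore on the user side could look roughly like the sketch below. It assumes the createBulk signature quoted earlier in this issue (implicit RestClient and spray-json JsonWriter); other Alpakka releases may expose a different API. The object BulkIndexing, the method indexAll, and its parameters docs and bulkSize are hypothetical names used only for illustration.

```scala
import akka.{Done, NotUsed}
import akka.stream.Materializer
import akka.stream.alpakka.elasticsearch.{ElasticsearchWriteSettings, WriteMessage}
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.elasticsearch.client.RestClient
import spray.json.JsonWriter

import scala.collection.immutable
import scala.concurrent.Future

// Hypothetical user-side helper; not part of Alpakka.
object BulkIndexing {

  // Indexes (id, document) pairs in bulks of `bulkSize` and completes
  // the returned Future when the stream has finished.
  def indexAll[T](indexName: String,
                  typeName: String,
                  docs: immutable.Seq[(String, T)],
                  bulkSize: Int)(
      implicit mat: Materializer,
      client: RestClient,
      writer: JsonWriter[T]
  ): Future[Done] =
    Source(docs)
      // Wrap each document in an index request without a pass-through value.
      .map { case (id, doc) => WriteMessage.createIndexMessage(id, doc) }
      // createBulk expects whole batches: immutable.Seq[WriteMessage[T, NotUsed]].
      .grouped(bulkSize)
      // Signature assumed to match the one quoted in this issue.
      .via(ElasticsearchFlow.createBulk[T, NotUsed](indexName, typeName, ElasticsearchWriteSettings.Default))
      // The user attaches the sink; Keep.right exposes Sink.ignore's Future[Done].
      .toMat(Sink.ignore)(Keep.right)
      .run()
}
```

Note that Sink.ignore discards the write results emitted by the flow, so a caller who needs to inspect per-document outcomes would attach a different sink instead.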