akka / alpakka-samples

Example projects building Reactive Integrations using Alpakka
https://akka.io/alpakka-samples/

Fix csv source for the alpakka-sample-http-csv-to-kafka #68

Open giorgiopers opened 4 years ago

giorgiopers commented 4 years ago

Nasdaq changed the way it shares data with the outside world. Just to see the sample working, I've used another open-source CSV:

val uriCsv = "https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2019-financial-year-provisional/Download-data/annual-enterprise-survey-2019-financial-year-provisional-csv.csv"

It's probably not the best link to use, but the project currently doesn't work at all because its source is missing.

giorgiopers commented 4 years ago

I'm also looking into why the publishing flow just hangs when the error occurs.

ennru commented 4 years ago

Thank you for suggesting a new data source for this example.

PKRoma commented 3 years ago

I have found that the Nasdaq URI can be replaced with https://api.nasdaq.com/api/screener/stocks?tableonly=true&limit=25&exchange=NASDAQ&download=true and it works.

However, this new Nasdaq URI only responds to HTTP/2 requests, so the example code must be changed to support HTTP/2. This is no mean feat; it isn't as simple as making a change here or there.

It would be great if the examples were updated to use HTTP/2.

PKRoma commented 3 years ago

For example, step_001_http_request can be changed to this code, but it's only a hack to make it work:

/*
 * Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
 */

package samples

import akka.Done
import akka.actor._
import akka.http.scaladsl._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges, ResponsePromise }
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Flow, Sink, Source }

import scala.concurrent.{ Future, Promise }

object Main extends App {

  // Explicit type annotation on the implicit avoids inference surprises
  implicit val actorSystem: ActorSystem = ActorSystem("alpakka-samples")

  import actorSystem.dispatcher

  val httpRequest = HttpRequest(uri = "https://api.nasdaq.com/api/screener/stocks?tableonly=true&limit=25&exchange=NASDAQ&download=true")
    .withHeaders(Accept(MediaRanges.`text/*`))

  val future: Future[Done] =
    Source
      .single(httpRequest) //: HttpRequest
      .mapAsync(1)(singleRequest(Http().connectionTo("api.nasdaq.com").http2())(_)) //: HttpResponse
      .runWith(Sink.foreach(println))

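  // Terminate the actor system on success and on failure, so the app does not hang after an error
  future.onComplete { result =>
    println(s"Done: $result")
    actorSystem.terminate()
  }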

  // Boilerplate to deal with http2 from https://doc.akka.io/docs/akka-http/current/client-side/http2.html
  def singleRequest(connection: Flow[HttpRequest, HttpResponse, Any], bufferSize: Int = 100): HttpRequest => Future[HttpResponse] = {
    val queue =
      // Explicit element type so that queue.offer accepts HttpRequest values
      Source.queue[HttpRequest](bufferSize, OverflowStrategy.dropNew)
        .via(connection)
        .to(Sink.foreach { response =>
          // complete the response promise with the response when it arrives
          val responseAssociation = response.attribute(ResponsePromise.Key).get
          responseAssociation.promise.trySuccess(response)
        })
        .run()

    req => {
      // create a promise of the response for each request and set it as an attribute on the request
      val p = Promise[HttpResponse]()
      queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p)))
        // return the future response
        .flatMap(_ => p.future)
    }
  }

}
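
The singleRequest boilerplate above pairs each response with its request by attaching a Promise to the request and completing it when the response arrives. As a rough sketch of that idea only, here is the same correlation in plain Scala without Akka. The names Request, Response, connection and PromiseCorrelationSketch are hypothetical stand-ins, not Akka HTTP types, and the fake connection simply echoes the request path.

```scala
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseCorrelationSketch {
  // Hypothetical stand-ins for HttpRequest and HttpResponse (not Akka HTTP types).
  final case class Response(body: String)
  final case class Request(path: String, promise: Promise[Response])

  // Stand-in for the connection flow plus the Sink.foreach in the code above:
  // it produces a response asynchronously and completes the promise
  // that travels with the request.
  def connection(req: Request): Unit =
    Future {
      req.promise.trySuccess(Response(s"response for ${req.path}"))
    }

  // Same shape as singleRequest: wrap a fire-and-forget send function into
  // a function that returns a Future of the matching response.
  def singleRequest(send: Request => Unit): String => Future[Response] =
    path => {
      val p = Promise[Response]() // one promise per request
      send(Request(path, p))      // the request carries its own promise
      p.future                    // completed once the response arrives
    }

  def main(args: Array[String]): Unit = {
    val request = singleRequest(connection)
    val a = request("/a")
    val b = request("/b")
    println(Await.result(a, 1.second).body)
    println(Await.result(b, 1.second).body)
  }
}
```

Because each request carries its own promise, responses can arrive in any order and still complete the right future, which is why the Akka version stores the promise as a request attribute rather than relying on ordering.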

ennru commented 3 years ago

Thank you for digging up the new URL. Would you be in a position to suggest these changes in a PR?