apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

Whether to support restful api (http) as input source? #1270

Open tmljob opened 2 years ago

tmljob commented 2 years ago

Search before asking

Description

Whether to support a RESTful API (HTTP) as an input source.

Usage Scenario

Our data collection scenarios need to call third-party interfaces or accept pushes from third-party interfaces.

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

kalencaya commented 2 years ago

Good idea. I have faced similar confusion, since my main and core work involves collecting data from third-party open-platform HTTP interfaces.

But interface rate limiting and authorization prevent me from syncing the data with Flink, so I have to do it another way.

I'm glad and willing to communicate more with you.

tmljob commented 2 years ago

@kalencaya I tried to develop an HTTP input plugin based on version 1.X. The functionality is still fairly simple, but I have verified that it runs, and it can be improved further on this basis. The code is as follows:


import io.github.interestinglab.waterdrop.apis.BaseStaticInput
import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import java.io.IOException

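// A simple static HTTP input: fetches one JSON response from the configured path and exposes it as a Dataset[Row].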
class HttpStatic extends BaseStaticInput {
  var config: Config = ConfigFactory.empty()

  override def getDataset(spark: SparkSession): Dataset[Row] = {
    // Fetch the HTTP response body and parse it as a single-record JSON dataset.
    val path = config.getString("path")
    val response = getRequest(path)
    val rdd = spark.sparkContext.parallelize(Array(response))
    spark.read.json(rdd)
  }

  /**
   * Set Config
   * @param config
   */
  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  /**
   * Get Config
   * @return
   */
  override def getConfig(): Config = {
    this.config
  }

  override def checkConfig(): (Boolean, String) = {
    config.hasPath("path") match {
      case true => (true, "")
      case false => (false, "please specify [path] as non-empty string")
    }
  }

  def getRequest(url: String): String = {
    // Execute an HTTP GET and return the response body as a string.
    val httpClient = HttpClientBuilder.create().build()
    val httpGet = new HttpGet(url)
    var response: CloseableHttpResponse = null
    var result = ""
    try {
      response = httpClient.execute(httpGet)
      result = EntityUtils.toString(response.getEntity)
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
    } finally {
      try {
        if (response != null) response.close()
        if (httpClient != null) httpClient.close()
      } catch {
        case e: IOException =>
          e.printStackTrace()
      }
    }
    result
  }

  def postRequest(url: String): String = {
    // Execute an HTTP POST without a request body and return the response body as a string.
    val httpClient = HttpClientBuilder.create().build()
    val httpPost = new HttpPost(url)
    httpPost.setHeader("Content-Type", "application/json;charset=utf-8")

    var response: CloseableHttpResponse = null
    var result = ""

    try {
      response = httpClient.execute(httpPost)
      result = EntityUtils.toString(response.getEntity)
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
    } finally {
      try {
        if (response != null) response.close()
        if (httpClient != null) httpClient.close()
      } catch {
        case e: IOException =>
          e.printStackTrace()
      }
    }
    result
  }

  def postRequest(url: String, bodyData: String): String = {
    // Execute an HTTP POST with a JSON body and return the response body as a string.
    val httpClient = HttpClientBuilder.create().build()
    val httpPost = new HttpPost(url)
    httpPost.setHeader("Content-Type", "application/json;charset=utf-8")
    // Set request timeouts.
    httpPost.setConfig(RequestConfig
      .custom()
      .setConnectTimeout(5000)
      .setConnectionRequestTimeout(1000)
      .setSocketTimeout(5000)
      .build())

    val entity = new StringEntity(bodyData, "utf-8")
    entity.setChunked(false)
    httpPost.setEntity(entity)
    var response: CloseableHttpResponse = null
    var result = ""

    try {
      response = httpClient.execute(httpPost)
      result = EntityUtils.toString(response.getEntity, "UTF-8")
    } finally {
      try {
        if (response != null) response.close()
        if (httpClient != null) httpClient.close()
      } catch {
        case e: IOException =>
          e.printStackTrace()
      }
    }
    result
  }

}
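For reference, a job config along the following lines should drive it; note that the plugin name httpstatic and the example URL are only assumptions here (the actual name depends on how the plugin is packaged and registered):

input {
  httpstatic {
    path = "https://example.com/api/data"
  }
}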
kalencaya commented 2 years ago

Thanks for sharing.

My data sync work is always limited by the rate limiters of some HTTP interfaces, especially for incremental sync. A typical HTTP interface that supports incremental sync takes request parameters like the following (or some variant):

{
    "startTime": "yyyy-MM-dd HH:mm:ss",
    "endTime": "yyyy-MM-dd HH:mm:ss",
    "pageIndex": 1,
    "pageSize": 50
}

Because of rate limiting and these request-parameter restrictions, I have to control concurrency and handle annoying network failures to get accurate and fast sync results. So my core work is to split the sync task, execute the splits concurrently, and update the sync offset (the latest synced time interval). I also believe the Flink source framework could handle this process better.
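As a rough sketch of that splitting step (not code from this thread), the sync window could be cut into fixed time slices, with each slice paged until it is drained, so the slices can run concurrently and be retried independently. fetchPage below is a hypothetical stand-in for the real HTTP call:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object SyncSplitter {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Split [start, end) into stepMinutes-sized slices and page through each slice.
  def syncWindow(start: LocalDateTime,
                 end: LocalDateTime,
                 stepMinutes: Long,
                 pageSize: Int,
                 fetchPage: (String, String, Int, Int) => Seq[String]): Seq[String] = {
    Iterator.iterate(start)(_.plusMinutes(stepMinutes))
      .takeWhile(_.isBefore(end))
      .flatMap { sliceStart =>
        val candidate = sliceStart.plusMinutes(stepMinutes)
        val sliceEnd = if (candidate.isAfter(end)) end else candidate
        // Request successive pages of this slice until the interface returns an empty page.
        Iterator.from(1)
          .map(page => fetchPage(fmt.format(sliceStart), fmt.format(sliceEnd), page, pageSize))
          .takeWhile(_.nonEmpty)
          .flatMap(identity)
      }
      .toSeq
  }
}

Each completed slice can then advance the sync offset, which is roughly the checkpointing a Flink or SeaTunnel source framework would provide out of the box.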

HTTP authorization requires every request to provide credentials, and I have more than 2,500 credential accounts. Unbalanced hot data can appear in any credential account at any time, so I had to start a sync job for every credential account and every HTTP interface, and that is just for the incremental scenarios. How to handle this many HTTP interface credentials is what really blocks me now.

Now I'm taking advantage of Akka's excellent concurrency and xxl-job to refactor my sync framework. You can contact me on WeChat; my ID there is also kalencaya.

zhaomin1423 commented 2 years ago

I am willing to work with you. My WeChat ID is 602128569.

tmljob commented 2 years ago

I can't find you through that WeChat account; please see whether you can add me through this one: tmljob123. I will set up a WeChat group so we can discuss together with the other contributor. @kalencaya

zhaomin1423 commented 2 years ago

My opinion is that we could implement a spark-connector-http based on the Spark DataSource API and then integrate it.
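A minimal sketch of what that could look like, using the older DataSource V1 RelationProvider API for brevity; the package name, class names, and the url option below are placeholders, not the actual connector:

package org.example.http

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, Row, SQLContext}

// Resolved via spark.read.format("org.example.http"), which looks up this DefaultSource.
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new HttpRelation(parameters("url"))(sqlContext)
}

class HttpRelation(url: String)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // Let Spark infer the schema from one fetch of the endpoint.
  override def schema: StructType = fetchAsJson().schema

  override def buildScan(): RDD[Row] = fetchAsJson().rdd

  // Fetch the HTTP response body and parse it as a JSON Dataset.
  private def fetchAsJson(): Dataset[Row] = {
    val spark = sqlContext.sparkSession
    import spark.implicits._
    val body = scala.io.Source.fromURL(url).mkString
    spark.read.json(spark.createDataset(Seq(body)))
  }
}

It would then be read with spark.read.format("org.example.http").option("url", "...").load() and wrapped as a SeaTunnel input plugin. Note the endpoint is fetched twice here (schema inference plus scan); a real connector would cache the response or accept a user-provided schema.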

CalvinKirs commented 2 years ago

@tmljob @zhaomin1423 Contributions welcome

zhaomin1423 commented 2 years ago

I will implement a spark-connector-http soon.

ashulin commented 2 years ago

For generality, I think the following points need to be considered (a hypothetical config sketch covering them follows the list):

  1. How to handle HTTP API authorization: SSL or token?
  2. How to fetch new data: by updating timestamps, by paging, or not at all?
  3. How to deserialize the data (JSON)? Similar to Kafka JSON deserialization, each user's format may be different.
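As one way to picture those three concerns together, a source config might look roughly like the following; every option name here is illustrative only, not an existing SeaTunnel option:

source {
  Http {
    url    = "https://example.com/api/orders"
    method = "GET"

    # 1. authorization: a token header (client SSL certs would be configured similarly)
    headers {
      Authorization = "Bearer <token>"
    }

    # 2. incremental fetch: time window plus pagination
    params {
      startTime = "2022-01-01 00:00:00"
      endTime   = "2022-01-02 00:00:00"
      pageIndex = 1
      pageSize  = 50
    }

    # 3. deserialization: a user-provided schema for the JSON payload
    schema {
      fields {
        id         = bigint
        name       = string
        updateTime = timestamp
      }
    }
  }
}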