jamesshocking / Spark-REST-API-UDF

Example of how to leverage Apache Spark distributed capabilities to call REST-API using a UDF
MIT License

API throttle #1

Open · Seojunkim opened this issue 2 years ago

Seojunkim commented 2 years ago

Is it possible to throttle the API calls if the API only supports a low number of requests per second (RPS)?

jamesshocking commented 2 years ago

When Spark executes a Python UDF, it hands the rows to a separate Python worker process, which runs the code and returns the results to Spark for further processing. This means that, just like any HTTP/HTTPS request made from Python with a library such as Requests, you can use regular Python code to implement a delay. The catch is that each UDF call is independent of every other: calls run in parallel across partitions and executors with no shared state, so the challenge is coordinating requests across UDF calls.
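To illustrate what "regular Python code" can and cannot do here, below is a minimal sketch of a hypothetical MinIntervalThrottle helper (not part of Spark or Requests). It keeps successive calls made from one Python process at least 1 / max_rps seconds apart. If it were created at module level inside a UDF, it would only pace the calls handled by that one worker process; other workers and executors would each have their own copy, which is exactly the coordination problem described above.

```python
import time


class MinIntervalThrottle:
    """Hypothetical helper: keeps successive calls made from *this* Python
    process at least 1 / max_rps seconds apart. It knows nothing about
    other Python workers or executors, so it cannot enforce a global limit."""

    def __init__(self, max_rps, clock=time.monotonic, sleep=time.sleep):
        if max_rps <= 0:
            raise ValueError("max_rps must be positive")
        self.interval = 1.0 / max_rps
        self._clock = clock          # injectable for testing
        self._sleep = sleep          # injectable for testing
        self._next_allowed = None    # earliest time the next call may proceed

    def wait(self):
        """Block until this process may make its next request."""
        now = self._clock()
        if self._next_allowed is not None and now < self._next_allowed:
            self._sleep(self._next_allowed - now)
            now = self._next_allowed
        self._next_allowed = now + self.interval
```

Calling wait() immediately before requests.get(...) spaces out that process's requests; the aggregate rate against the API still grows with the number of Python workers running in parallel.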

The only way I know of coordinating requests across UDF calls is to add an extra column to the DataFrame and pass it to the UDF, telling it how long to wait before making the HTTP/S call.
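One way to choose a value for that delay column is a small calculation, sketched below under stated assumptions: rows within one partition are processed one after another by the Python worker, and at most N partitions (tasks) run at the same time. Each task then makes at most 1 / delay requests per second, so N concurrent tasks make at most N / delay in total, and setting delay = N / target_rps keeps the average at or below the target. Request latency is ignored, which only lowers the real rate. The function name per_call_delay is hypothetical.

```python
def per_call_delay(target_rps, concurrent_tasks):
    """Hypothetical helper: seconds each UDF call should sleep so that
    `concurrent_tasks` tasks, each calling the API one row at a time,
    stay at or below `target_rps` on average.

    Assumes sequential calls within a task and ignores request latency
    (latency only makes the real rate lower, so the result is conservative)."""
    if target_rps <= 0 or concurrent_tasks <= 0:
        raise ValueError("target_rps and concurrent_tasks must be positive")
    return concurrent_tasks / target_rps


# e.g. 4 tasks running at once against an API limited to 2 requests per second
print(per_call_delay(2, 4))  # 2.0
```

Note that this limits the average rate, not bursts: if all N tasks start together, up to N requests can still be sent at nearly the same moment. The number of concurrent tasks is bounded by the total executor cores and by the partition count, which can be set with repartition() before applying the UDF.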

To implement the wait itself, you could use Python's time.sleep function (https://realpython.com/python-sleep/).

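from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
import requests
import time

# Reuses the existing session in a notebook, otherwise creates one
spark = SparkSession.builder.appName("rest-api-udf").getOrCreate()

# UDF body: wait `delay` seconds, then call the REST API and return the parsed JSON.
# Returns None on a network error or a non-200 response, so the row gets a null.
def executeRestApi(url, delay=0):
    headers = {
        'content-type': "application/json"
    }
    time.sleep(delay or 0)  # delay in seconds, taken from the "delay" column
    try:
        res = requests.get(url, headers=headers)
    except requests.exceptions.RequestException:
        return None

    if res.status_code == 200:
        return res.json()  # Spark converts the returned dict using the schema below

    return None

# Assumed response schema for the vPIC getallmakes endpoint, based on the
# fields selected at the end (Results, Make_ID, Make_Name); adjust it to your API.
schema = StructType([
    StructField("Count", IntegerType(), True),
    StructField("Message", StringType(), True),
    StructField("SearchCriteria", StringType(), True),
    StructField("Results", ArrayType(
        StructType([
            StructField("Make_ID", IntegerType(), True),
            StructField("Make_Name", StringType(), True)
        ])
    ), True)
])

udf_executeRestApi = udf(executeRestApi, schema)

# One request with a 10-second delay
RestApiRequest = Row("url", "delay")
request_df = spark.createDataFrame([
    RestApiRequest("https://vpic.nhtsa.dot.gov/api/vehicles/getallmakes?format=json", 10)
]).withColumn("execute", udf_executeRestApi(col("url"), col("delay")))

request_df.select(explode(col("execute.Results")).alias("results")) \
    .select(col("results.Make_ID"), col("results.Make_Name")).show()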