jamesshocking / Spark-REST-API-UDF

Example of how to leverage Apache Spark distributed capabilities to call REST-API using a UDF
MIT License

Paginating api requests #2

Open kijewskimateusz opened 2 years ago

kijewskimateusz commented 2 years ago

Hello!

First, thank you for the detailed guide on how to handle API requests using worker nodes. Based on your manual, I'm trying to implement a similar solution for paginated APIs. I've come up with the idea of a while loop that executes an API request for each page, but I'm not sure whether this solution will support parallel Spark execution when it comes to extracting the data hidden in the results struct.

import requests
import json
from pyspark.sql.functions import udf, col, explode, max
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.sql import Row

def executeRestApi(verb, url):
  headers = {
      'content-type': "application/json"
  }
  res = None
  # Make the API request and return the parsed JSON so that Spark can map it
  # onto the schema defined below; return None on failure.
  try:
    if verb == "get":
      res = requests.get(url, headers=headers)
    else:
      res = requests.post(url, headers=headers)
  except Exception:
    return None
  if res is not None and res.status_code == 200:
    return json.loads(res.text)
  return None

schema = StructType([
  StructField("count", IntegerType(), True),
  StructField("next", StringType(), True),
  StructField("previous", StringType(), True),
  StructField("results", ArrayType(
    StructType([
      StructField("name", StringType()),
      StructField("url", StringType())
    ])
  ))
])

udf_executeRestApi = udf(executeRestApi, schema)

from pyspark.sql import Row
limit = 10     # page size requested from the API
offset = 0     # how many records have been requested so far
count = 999    # placeholder; replaced by the real total from the first response
enum = 0
url = "https://pokeapi.co/api/v2/pokemon?limit={limit}".format(limit=limit)

RestApiRequestRow = Row("verb", "url")

while offset < count:
    request_df = spark.createDataFrame([RestApiRequestRow("get", url)])
    if enum == 0:
        result_df = request_df \
            .withColumn("result", udf_executeRestApi(col("verb"), col("url")))
        count = result_df.select(col("result.count")).distinct().collect()[0][0]
        url = result_df.select(col("result.next")).distinct().collect()[0][0]
    else:
        append_df = request_df \
            .withColumn("result", udf_executeRestApi(col("verb"), col("url")))
        result_df = result_df.union(append_df)
        count = append_df.select(col("result.count")).distinct().collect()[0][0]
        url = append_df.select(col("result.next")).distinct().collect()[0][0]
    offset += limit
    enum += 1

somuworld commented 2 years ago

Did this work? This approach looks like it would not provide the required parallelism: withColumn is being called in every iteration, causing the API calls to happen in sequence rather than in parallel.

jamesshocking commented 2 years ago

@somuworld - apologies for the late reply. I agree, and you are correct: this wouldn't create the parallelism that is needed. I supplied a different answer, albeit via email.

The problem with this solution, as you rightly point out, is that the dataframe will execute a task for each iteration of the loop. A better solution would be to first identify how many pages are available, build a dataframe with one row per page, and then apply an action. The requests would then execute in parallel; see the sketch below.

The catch would be if the REST API weren't able to provide a total count of pages. In that case I would argue that Spark isn't the right tool for this type of job. Better to use Python on its own, write the results to a file, and then use Spark.
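
For illustration, here is a minimal sketch of that approach against the pokeapi endpoint used above, reusing the executeRestApi UDF and schema defined earlier. The page size of 100, the single probe request used to read the total count, and the final explode are assumptions for the example, not code from the repo.

# Probe the API once on the driver to learn the total record count.
probe = requests.get("https://pokeapi.co/api/v2/pokemon?limit=1")
total = json.loads(probe.text)["count"]

# Build one row per page up front, then let the UDF fire in parallel.
limit = 100
RestApiRequestRow = Row("verb", "url")
pages = [
    RestApiRequestRow(
        "get",
        "https://pokeapi.co/api/v2/pokemon?limit={limit}&offset={offset}".format(
            limit=limit, offset=offset)
    )
    for offset in range(0, total, limit)
]

request_df = spark.createDataFrame(pages)
result_df = request_df.withColumn("result", udf_executeRestApi(col("verb"), col("url")))

# The action below triggers all of the requests, distributed across the executors.
pokemon_df = result_df.select(explode(col("result.results")).alias("pokemon"))
pokemon_df.show()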

somuworld commented 2 years ago

Yes, and thank you, it worked for me like a charm. One question though: how do we handle expiring tokens? Our API uses OAuth 2.0, and we have multiple API endpoints where we paginate through multiple pages, which takes a long time to complete. The bearer access token is valid for only 15 minutes. I'm able to renew the token within the UDF when the old one expires, but I'm unable to pass it on to the next API call. I tried using Databricks Delta tables (we run our code on Databricks), but it looks like we can't access them from a UDF when it is called from a dataframe.

The only option I see is to write the access token to a file and read it from the UDF; whenever the token expires, renew it and update the file from the UDF itself. But I'm not happy with this approach, as it involves reading the file for each and every API call, which might affect performance. Any thoughts @jamesshocking?

jamesshocking commented 2 years ago

@somuworld, the only alternative that I can think of is to use the foreachPartition method on the RDD object, which you can access via dataframe.rdd.

Sadly, broadcast variables are read-only and accumulators are numeric, leaving either your option (a file or a memcache of sorts) or the RDD route.

As soon as you execute an action, the foreachPartition code will execute. You pass a function to it, and within that function you can declare a singleton representing your shared token, along with a separate function to execute your HTTP request.
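
As a rough sketch of that idea (not code from the repo): the token endpoint, the TokenCache class and the way the response is consumed below are all hypothetical; only the pattern of one lazily refreshed token per executor, reused across the rows of a partition, reflects the suggestion above.

import time
import requests

class TokenCache:
    # Singleton-style holder: one token per executor process, refreshed lazily.
    _token = None
    _expires_at = 0.0

    @classmethod
    def get(cls):
        if cls._token is None or time.time() >= cls._expires_at:
            # Hypothetical OAuth 2.0 client-credentials call; substitute your auth endpoint.
            resp = requests.post(
                "https://auth.example.com/oauth2/token",
                data={"grant_type": "client_credentials"}
            )
            payload = resp.json()
            cls._token = payload["access_token"]
            # Refresh a minute before the reported expiry (default 15 minutes).
            cls._expires_at = time.time() + payload.get("expires_in", 900) - 60
        return cls._token

def process_partition(rows):
    # Runs on the executor; every row in the partition reuses the cached token.
    for row in rows:
        headers = {"Authorization": "Bearer {}".format(TokenCache.get())}
        response = requests.get(row.url, headers=headers)
        # handle or persist the response as needed

# Executing the action runs process_partition once per partition of request_df.
request_df.rdd.foreachPartition(process_partition)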