allisonwang-db / pyspark-data-sources

Custom PySpark Data Sources
https://allisonwang-db.github.io/pyspark-data-sources/
Apache License 2.0

Code of Stock Streaming DataSource #3

Closed · n8shadow closed this issue 2 weeks ago

n8shadow commented 3 weeks ago

Hi @allisonwang-db, thanks for publishing the code of the DataSource examples! May I kindly ask you to also publish the streaming data source code that was mentioned in the DAIS talk, especially the DataSourceStreamReader extension? Many thanks in advance!

allisonwang-db commented 3 weeks ago

Hi @n8shadow, @chaoqin-li1123 is working on adding more examples for streaming data sources!

n8shadow commented 3 weeks ago

That's great! I am looking for streaming examples since I am currently developing my own streaming data source and facing the following challenge.

Quite similar to this blog post, I want to take data from a custom API that continuously exposes the (latest) state of some system. However, instead of creating offsets and partitions based on IDs, I am using dates that go into a filter of the API request. Overall, I want to run the entire streaming job in a scheduled manner (let's say once a day). Obviously, tomorrow's data isn't available today, so I need Spark to only plan and process data up to the latest data available (let's say the current timestamp).

It is not clear to me how to make the custom streaming data source stop at a specific offset. I know I can implement the stop() method, but so far my attempts have failed.

Do you have a hint on how I can achieve this additional functionality? Any help would be much appreciated!

This might also be interesting for @chaoqin-li1123 to take into account when working on more examples.

chaoqin-li1123 commented 3 weeks ago

@n8shadow Can you paste your code here? stop() is not intended for stopping at a specific offset; it is intended for cleaning up resources when the query stops. For your use case, you can simply make the current timestamp an offset and read data for a time range (between the start offset and the end offset) in every batch.
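
For illustration, here is a minimal sketch of that idea (the class name, window size, and start date below are made up for the example and are not part of any existing code):

from datetime import datetime, timedelta

class CappedOffsetSketch:
    """Fragment of a DataSourceStreamReader showing only the offset logic."""

    BATCH_WINDOW = timedelta(minutes=180)  # hypothetical window per microbatch

    def __init__(self, start_dt: datetime):
        self.current_date_dt = start_dt

    def latestOffset(self) -> dict:
        # Advance by one window, but never past "now", so the reader cannot
        # overshoot into data that does not exist yet. If nothing new is
        # available, return the unchanged offset and the microbatch is empty.
        candidate = min(self.current_date_dt + self.BATCH_WINDOW, datetime.now())
        if candidate > self.current_date_dt:
            self.current_date_dt = candidate
        return {"offset_date": str(self.current_date_dt)}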

n8shadow commented 3 weeks ago

@chaoqin-li1123, sure, please find a simplified version of my code below. I actually do read time ranges based on start and end offsets, but since I am constantly advancing current_date, the offsets don't stop at the latest date for which data can actually be retrieved from the API. E.g., say I run the job today at noon ("2024-08-21 12:00:00"); at some point it will try to retrieve tomorrow's data (e.g. some range partition might be ["2024-08-22 12:00:00", "2024-08-22 15:00:00"]), which is not yet available.

Any hint on how to best avoid this "overshooting"?

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.types import StructType
from typing import Iterator, Tuple
import json
import requests
from datetime import datetime, timedelta

class CustomAPIStreamDataSource(DataSource):
    """
    Data source for streaming data from the custom API containing cancellation code information.
    """
    @classmethod
    def name(cls):
        return "custom_api"

    def schema(self):
        return "date string, id string, a_b string, a_c string"

    def streamReader(self, schema: StructType):
        return CustomAPIStreamReader(schema, self.options)

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class CustomAPIStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        ## API definitions
        self.api_url_load = "..."  # I am using a non-public API
        self.api_headers = {'Accept': 'application/vnd.sit.pom.simple-order-model-v2+json'}
        self.api_key = options.get("api_key")
        self.items_per_batch = int(options.get("items_per_batch", 500))  # each request can take 500 items at most from the API
        ##databricks workspace definitions
        self.dbx_ws_url = options.get("dbx_ws_url")
        self.dbx_ws_token = options.get("dbx_ws_token")
        self.progress_path = options.get("progress_path")
        self.progress_url = f"{self.dbx_ws_url}/api/2.0/fs/files{self.progress_path}progress.json"
        ##stream reader definitions
        self.latest_date_current_session_dt = datetime.now()
        self.latest_date_current_session_str = str(self.latest_date_current_session_dt)
        self.stop_date_str = "2024-06-02 00:00:00.000" # test date that I want the streaming job to stop at
        self.stop_date_dt = datetime.strptime(self.stop_date_str, '%Y-%m-%d %H:%M:%S.%f')
        self._load_progress()

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader. 
        """
        return {"offset_date": "2024-06-01 00:00:00.000"}

    def _load_progress(self):
        """
        Loads the current progress from the checkpoint location when starting a streaming session.
        """
        ## read progress file
        headers = {"Authorization": f"Bearer {self.dbx_ws_token}"}
        response = requests.get(self.progress_url, headers=headers)
        progress = response.json()

        # information retrieved from the progress file
        self.latest_date_last_session_str = str(progress.get('latest_date_last_session', 0))
        self.latest_date_last_session_dt = datetime.strptime(self.latest_date_last_session_str, '%Y-%m-%d %H:%M:%S.%f')
        self.current_date_str = str(progress.get('current_date', 0))
        self.current_date_dt = datetime.strptime(self.current_date_str, '%Y-%m-%d %H:%M:%S')

    def _save_progress(self):
        """
        Saves the current_date up to which data has been processed and the date the current
        session was started to the progress file location. Invoked by commit().
        """
        progress_url = f"{self.progress_url}?overwrite=true"

        headers = {
            "Authorization": f"Bearer {self.dbx_ws_token}",
            "Content-Type": "application/json"
        }
        data = json.dumps({"current_date": self.current_date_str, 
                           "latest_date_last_session": self.latest_date_current_session_str}) 
        response = requests.put(progress_url, headers=headers, data=data)

    def latestOffset(self) -> dict:
        """
        Returns the latest offset that the next microbatch will read to.
        Note: offsets are reported in the offsets file in the checkpoint.
        """
        ## implement stop criterion here(!?)
        if self.current_date_dt + timedelta(minutes=180) <= self.stop_date_dt:
            self.current_date_dt = self.current_date_dt + timedelta(minutes=180)
            self.current_date_str = str(self.current_date_dt)
            return {"offset_date": self.current_date_str}
        else:
            # How to stop here? self.stop() / CustomAPIStreamReader().stop() /
            # streamingContext.stop() did not work.
            pass

    def partitions(self, start: dict, end: dict):
        """
        Returns a range of partitions that is used to distribute tasks to executors.
        """
        return [RangePartition(start["offset_date"], end["offset_date"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data up to the end offset. It
        can be used to clean up resources.
        """
        self._save_progress()

    #def stop(self):
    #    """
    #    Stop this source and free any resources it has allocated.
    #    Invoked when the streaming query terminates.
    #    """
    #    ...

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as input and reads an iterator of tuples from the data source.
        """
        ## How to stop the reader when stop_date is reached?

        # get partition information
        start_date, end_date = partition.start, partition.end

        ## retrieve data from the API...
        filter_condition = f'[["date", ">=", "{start_date}"], "and", ["date", "<", "{end_date}"]]'
        params = {'take': self.items_per_batch,
                  'filter': filter_condition,
                  'key': self.api_key}
        response = requests.get(self.api_url_load, headers=self.api_headers, params=params)

        ## ...and yield the records as long as the response is non-empty
        if len(response.json()) > 0:
            for r in response.json():
                yield (r['date'],
                       r['id'],
                       r['a']['b'],
                       r['a']['c'])

chaoqin-li1123 commented 3 weeks ago

I see, so you want the query to stop when the source reaches a specific end offset? Can you use the availableNow trigger? Consume a batch that contains all available data and exit. @n8shadow
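
For reference, a rough sketch of what that suggestion looks like on the query side (the checkpoint path and target table name are placeholders):

spark.dataSource.register(CustomAPIStreamDataSource)

query = (
    spark.readStream.format("custom_api")
    .option("api_key", "...")
    .load()
    .writeStream
    .trigger(availableNow=True)  # process all currently available data, then stop
    .option("checkpointLocation", "/tmp/custom_api_checkpoint")  # placeholder path
    .toTable("custom_api_raw")  # placeholder table name
)
query.awaitTermination()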

n8shadow commented 3 weeks ago

Yes, exactly @chaoqin-li1123, I want the streaming job to terminate at a specific offset. Unfortunately, the availableNow trigger doesn't work for me out of the box: it shuts down the stream right after starting. Since I am kind of handling the source myself, I assume the problem is that Spark doesn't know about the available data, since this information must be provided by the source, I guess via reportLatestOffset as documented in SupportsTriggerAvailableNow. (Btw, I cannot use a one-time trigger since I can only process 500 items at most with one API request.) I would highly appreciate hints on how to report the latest offset appropriately.

Overall, it seems that it isn't that straightforward to terminate the application from within the application itself. One way might be to make it fail hard on some custom exception, taking care that the latest offset is held constant and all data are processed, but that doesn't seem very satisfactory to me.

Terminating the application from the outside (e.g. as suggested here) might be possible, but it requires the application to communicate with the outside and feels like quite a workaround.

chaoqin-li1123 commented 3 weeks ago

You can try calling query.processAllAvailable(); the query will terminate once the offset returned by latestOffset() no longer advances. @n8shadow
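
A minimal sketch of that approach (again with placeholder paths and table names):

query = (
    spark.readStream.format("custom_api")
    .option("api_key", "...")
    .load()
    .writeStream
    .option("checkpointLocation", "/tmp/custom_api_checkpoint")  # placeholder path
    .toTable("custom_api_raw")  # placeholder table name
)

# Blocks until processing catches up, i.e. until latestOffset() stops advancing,
# after which the query can be stopped explicitly.
query.processAllAvailable()
query.stop()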

n8shadow commented 2 weeks ago

Yes, this works. I have multiple endpoints of the same type that I retrieve data from, and I start them as separate streams. It seems that all queries become blocked once the final offset is reached (meaning latestOffset() doesn't advance) for a single one of them, but I can work around that by doing the API requests centrally. Finally, I listen to the query status and stop the query if no further data arrive. Thank you so much!
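
In case it helps others, a rough sketch of the status-based shutdown described above (the polling interval and idle threshold are arbitrary choices, not the exact implementation):

import time

def stop_when_idle(query, poll_seconds=30, idle_polls=3):
    """Stop the streaming query after it has reported no available data for a few polls."""
    idle = 0
    while query.isActive:
        status = query.status  # dict with 'isDataAvailable' and 'isTriggerActive'
        if not status["isDataAvailable"] and not status["isTriggerActive"]:
            idle += 1
        else:
            idle = 0
        if idle >= idle_polls:
            query.stop()
            break
        time.sleep(poll_seconds)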

chaoqin-li1123 commented 2 weeks ago

That is unexpected; if you call query.processAllAvailable() for different queries, they are independent. @n8shadow

n8shadow commented 2 weeks ago

It seems that the processAllAvailable() call wasn't correctly distributed across the queries. This is fixed now and the solution is a proper one, though it still has to be tested in our bigger framework. Thank you once more!