apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
7.9k stars, 4.27k forks

[Feature Request]: Python-based Unbounded WebSocket IO Connector #33229

Open CWrecker opened 5 days ago

CWrecker commented 5 days ago

What would you like to happen?

Apache Beam lacks a native Python-based IO connector that can ingest data directly from a socket. This feature would enable users to easily integrate streaming data sources, such as those emitting messages over TCP/IP sockets, into their Apache Beam pipelines.

Many real-time data sources, such as custom data generators, IoT devices, and legacy systems, often send data over sockets. Building a socket-based IO connector in Python would allow Beam pipelines to process this data seamlessly without requiring users to implement custom socket reading logic outside the Beam ecosystem.
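For illustration, the kind of custom socket reading logic mentioned above might look like the following standard-library sketch. The newline-delimited JSON framing and the helper name `iter_json_lines` are assumptions made for this example; the issue does not specify a wire format. `socket.socketpair()` stands in for a real TCP peer.

```python
import json
import socket
from typing import Any, Iterator


def iter_json_lines(sock: socket.socket, bufsize: int = 4096) -> Iterator[Any]:
    """Yield one decoded JSON value per newline-terminated message.

    Assumed framing (hypothetical): each message is a JSON document
    followed by a single newline byte.
    """
    buffer = b""
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:  # an empty read means the peer closed the connection
            break
        buffer += chunk
        # A chunk may hold several messages, or only part of one.
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)


# Demo: a connected socket pair plays the role of sender and receiver.
sender, receiver = socket.socketpair()
sender.sendall(b'{"team": "red", "score": 3}\n{"team": "blue", ')
sender.sendall(b'"score": 5}\n')  # second message arrives in two pieces
sender.close()
messages = list(iter_json_lines(receiver))
receiver.close()
```

The inner loop is what handles messages that are split across reads or batched into one read, which is the part users currently have to write themselves.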

Primary question: any advice on implementing an unbounded source would be appreciated. I have only recently begun to dig into Apache Beam.

Additional Context

Existing IO connectors in Beam are often geared towards standard services like Kafka, Pub/Sub, etc. Adding support for sockets will cater to users dealing with more specialized or ad-hoc data sources.

Current approach to reading from a WebSocket


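import datetime
import json
import time

import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.window import GlobalWindows
# Assumption: create_connection comes from the websocket-client package.
from websocket import create_connection


class ReadFromWebSocket(beam.DoFn):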
    """
    A custom DoFn to read messages from a WebSocket stream.
    """
    def __init__(self, ws_url):
        """
        Initializes the WebSocket reader with the target URL.
        """
        self.ws_url = ws_url
        self.ws_connection = None

    def setup(self):
        """
        Set up the WebSocket connection.
        """
        self.ws_connection = create_connection(self.ws_url)
        print(f"Connected to WebSocket: {self.ws_url}")

    def process(self, element, *args, **kwargs):
        """
        Reads data from the WebSocket and outputs it as elements.
        """
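        # NOTE: this loop only exits on an error, so process() never returns
        # for the single dummy element while the connection stays healthy.
        try:
            while True:
                # recv() blocks until a message arrives (the default in
                # websocket-client), so no sleep is needed to avoid busy-waiting.
                message = json.loads(self.ws_connection.recv())
                # Stamp each message with its arrival time (Unix seconds).
                yield beam.window.TimestampedValue(
                    message, datetime.datetime.now().timestamp())
        except Exception as e:
            print(f"Error while reading WebSocket: {e}")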

    def teardown(self):
        """
        Clean up the WebSocket connection.
        """
        if self.ws_connection:
            self.ws_connection.close()
            print(f"Closed WebSocket connection to {self.ws_url}")

Pipeline Example

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        # Start with a single dummy element that triggers the custom DoFn.
        | "CreateStart" >> beam.Create([None])
        | "ReadFromWebSocket" >> beam.ParDo(ReadFromWebSocket(ws_url))
        # Global window; fire a pane each time at least 10 elements have arrived.
        | "WindowIntoGlobal" >> beam.WindowInto(
            GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(10)),
            accumulation_mode=AccumulationMode.ACCUMULATING)
        # Extract and sum team/score pairs from the event data.
        | "ExtractAndSumScore" >> ExtractAndSumScore('team')
        | "PrintMessages" >> beam.Map(print)  # Replace with actual processing logic
    )


class ExtractAndSumScore(beam.PTransform):
  """A transform to extract key/score information and sum the scores.
  The constructor argument `field` determines whether 'team' or 'user' info is
  extracted.
  """
  def __init__(self, field):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    # super().__init__()
    beam.PTransform.__init__(self)
    self.field = field

  def expand(self, pcoll):
    print(pcoll)
    return (
        pcoll
        | beam.Map(lambda elem: (elem[self.field], elem['score']))
        | beam.CombinePerKey(sum))
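For readers new to Beam, here is a plain-Python sketch of what the `Map` plus `CombinePerKey(sum)` steps in the transform above compute over one batch of events. It does not use Beam; the function name `extract_and_sum` and the sample events are made up for this illustration.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, Mapping


def extract_and_sum(events: Iterable[Mapping[str, Any]], field: str) -> Dict[Any, float]:
    """Sum event['score'] for each distinct value of event[field]."""
    totals: Dict[Any, float] = defaultdict(int)
    for event in events:
        # Same key/value extraction as the lambda passed to beam.Map.
        totals[event[field]] += event["score"]
    return dict(totals)


# Hypothetical sample events with the fields the transform expects.
events = [
    {"team": "red", "user": "ann", "score": 3},
    {"team": "blue", "user": "bob", "score": 5},
    {"team": "red", "user": "cy", "score": 4},
]
print(extract_and_sum(events, "team"))  # {'red': 7, 'blue': 5}
```

Each incoming message therefore needs both the chosen `field` key and a `score` key; a message missing either one would raise a `KeyError` inside the `Map` step.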

The current pipeline stalls when combined with a window and aggregation.
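One possible explanation, offered as an assumption rather than a confirmed diagnosis: `process()` loops forever on the single dummy element, so the bundle containing that element never finishes, and runners generally hand a bundle's output to downstream grouping steps only once the bundle completes. A common way around this is to read in bounded batches so that each call returns. The sketch below is Beam-independent; `read_batch`, its parameters, and the fake connection are hypothetical names introduced here.

```python
import json
import time
from typing import Any, Callable, Iterator, Tuple, Type


def read_batch(
    recv: Callable[[], str],
    max_messages: int = 100,
    max_seconds: float = 1.0,
    timeout_errors: Tuple[Type[BaseException], ...] = (TimeoutError,),
) -> Iterator[Tuple[Any, float]]:
    """Yield at most max_messages (payload, unix_timestamp) pairs, then return.

    The deadline is only checked between recv() calls, so recv() should have
    its own timeout; pass the exception type(s) it raises as timeout_errors.
    """
    deadline = time.monotonic() + max_seconds
    count = 0
    while count < max_messages and time.monotonic() < deadline:
        try:
            raw = recv()
        except timeout_errors:
            break  # nothing arrived within the connection's own timeout
        yield json.loads(raw), time.time()
        count += 1


# Demo with a fake connection that holds three queued messages.
queued = iter([
    '{"team": "red", "score": 1}',
    '{"team": "blue", "score": 2}',
    '{"team": "red", "score": 3}',
])


def fake_recv() -> str:
    try:
        return next(queued)
    except StopIteration:
        raise TimeoutError("no message available")


first = [payload for payload, _ in read_batch(fake_recv, max_messages=2)]
second = [payload for payload, _ in read_batch(fake_recv, max_messages=2)]
```

In a pipeline, something would still have to call such a function repeatedly. Replacing `beam.Create([None])` with a periodic source such as `PeriodicImpulse`, or moving to a splittable DoFn, are two options that may be worth exploring; neither has been tested against this issue.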

Issue Priority

Priority: 3 (nice-to-have improvement)

Issue Components

CWrecker commented 5 days ago

.take-issue

github-actions[bot] commented 4 days ago

Label p2 cannot be managed because it does not exist in the repo. Please check your spelling.

github-actions[bot] commented 4 days ago

Label cannot be managed because it does not exist in the repo. Please check your spelling.

CWrecker commented 4 days ago

.set-labels P2,python,io,'new feature'

damondouglas commented 4 days ago

cc: @damondouglas