medzin / beam-postgres

Light IO transforms for Postgres read/write in Apache Beam pipelines.
Apache License 2.0

How to update values from PCollection. #1

Open SnoozingSimian opened 1 year ago

SnoozingSimian commented 1 year ago

Hey, I have been trying to use your library (which is very useful, by the way) to update a few records in my database. I am providing a snippet of code here which should give some insight into what I am trying to achieve.

import apache_beam as beam
from beam_postgres.io import WriteToPostgres
from dataclasses import dataclass

DB_USERNAME = "<db user name>"
DB_PASSWORD = "<db pass>"
DB_HOST = "<db host>"
DB_PORT = "<db port>"
DB_DATABASENAME = "<db name>"

@dataclass
class DocId:
    document_id: str

class ProcessVals(beam.DoFn):
    def process(self, value):
        if value is not None:
            yield DocId(None)
        else:
            yield DocId(value)

def check_if_none(value):
    if value.document_id is None:
        return False
    else:
        return True

with beam.Pipeline() as p:
    data = p | "Creating" >> beam.Create(
        ['02267a6d-0a9f-40bf-9051-4971961cb0ac', 
         '05db919e-adda-41c2-9197-54623dff6d1a', 
         '04c607ec-64e8-420e-b5a5-bb63e035ba2f', 
         None, 
         None]
       )

    data2 = data | "Molding" >> beam.ParDo(ProcessVals()) | "Filter" >> beam.Filter(check_if_none)

    data2 | "Writing example records to database" >> WriteToPostgres(
        conninfo = f"host={DB_HOST} dbname={DB_DATABASENAME} user={DB_USERNAME} password={DB_PASSWORD}",
        statement ="UPDATE documents SET is_available = true WHERE document_id = %s",
        )

Unfortunately, this does not seem to be working: the pipeline runs fine, but I do not see the changes reflected in my database. I am really new to the Apache Beam space and am having trouble troubleshooting this; could you help?

medzin commented 1 year ago

Hi, thanks for using the library! I would start by adding error logging, just like in this example: https://github.com/medzin/beam-postgres/blob/main/examples/write_error.py#L31
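
A minimal sketch of that approach, reusing the data2 collection from your snippet and assuming (as the linked example suggests) that the write transform emits (record, error) tuples for failed rows:

import logging

# Assumption based on the linked write_error.py example: failed writes
# appear to be emitted downstream as (record, error) tuples, which can be
# logged with a simple Map step.
(
    data2
    | "Writing example records to database" >> WriteToPostgres(
        conninfo=f"host={DB_HOST} dbname={DB_DATABASENAME} user={DB_USERNAME} password={DB_PASSWORD}",
        statement="UPDATE documents SET is_available = true WHERE document_id = %s",
    )
    | "Logging errors" >> beam.Map(logging.error)
)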

Also, it looks like there is a typo in the process method of the ProcessVals class:

if value is not None:
    yield DocId(None)

This DoFn will always create DocId objects with document_id set to None, so filtering will always return an empty collection.
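
For reference, a corrected version of the DoFn would be:

class ProcessVals(beam.DoFn):
    def process(self, value):
        # Wrap the actual value when it is present; otherwise emit
        # DocId(None) so the downstream filter can drop it.
        if value is not None:
            yield DocId(value)
        else:
            yield DocId(None)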

SnoozingSimian commented 1 year ago

Thank you for your prompt reply. Yes, that seems to have been the case. As you might guess, this is just an imitation of the actual code I am trying to run. Let me recheck whether I have made the same mistake in my main code.

I tried to log the error and here is the error I got.

(DocId(document_id='0230d273-ca7f-42d4-bf2d-5552d8ec9efa'), ProgrammingError('the query has 1 placeholders but 0 parameters were passed'))

It seems like I am passing the correct object, and it still looks like it does not recognise the parameter. For reference, this is the query I am using:

UPDATE documents SET is_available = true WHERE document_id = %s

medzin commented 1 year ago

The error message suggests that document_id is set to None. You could try printing the data classes before writing with beam.Map(print).
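
A pass-through sketch of that idea (the step name "Printing records" is hypothetical):

# beam.Map(print) alone would replace every element with None, so print
# each record and forward it unchanged instead.
debugged = data2 | "Printing records" >> beam.Map(lambda r: print(r) or r)

Writing debugged instead of data2 then shows every record on the console without altering the pipeline.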

SnoozingSimian commented 1 year ago

But the error message shows the dataclass value, right? The document_id is clearly set to a valid value.

medzin commented 1 year ago

What runner are you using to run the pipeline (e.g. DataflowRunner)?

SnoozingSimian commented 1 year ago

Right now I'm testing with the direct runner, but eventually I'll be using the Dataflow runner.

medzin commented 1 year ago

I experienced a few problems with data class serialization on Dataflow: fields appeared to be set, yet some class metadata was lost after deserialization, and the function call here returned an empty tuple. Data classes only worked without problems when I put my models in a dedicated Python package outside the pipeline code. I use apache-beam[gcp]==2.42.0 and Dataflow Flex Templates.
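
A sketch of that layout (the module name models.py is hypothetical):

# models.py: a dedicated module shipped with the job (for example via a
# setup.py passed with --setup_file) instead of being defined in __main__.
from dataclasses import dataclass

@dataclass
class DocId:
    document_id: str

The pipeline then uses from models import DocId instead of defining the class inline, so workers import the real class rather than unpickling one from the main session.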

SnoozingSimian commented 1 year ago

The issue is that I am not even using Dataflow yet, just the plain old direct runner. I see that the code example above actually updates the required documents, and in my main code I am using the exact same environment (apache-beam==2.46.0) and Python version (3.10.8). I am also using the same dataclass and query, so I am not exactly sure why they are behaving differently.

medzin commented 1 year ago

Well, I'm using Python 3.9.16, which could be the difference causing the problem. Still, I suggest you put the data class in a dedicated Python package and check if it removes the problematic behavior.

SnoozingSimian commented 1 year ago

Okay, let me try that. I'll report the results.

SnoozingSimian commented 1 year ago

Hey, so what I did was use tuples instead of dataclasses. It seems to work now, although I am not entirely sure what was causing the line you mentioned to fail.
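
A sketch of that workaround; since the statement takes a single parameter, one-element tuples are assumed:

class ProcessVals(beam.DoFn):
    def process(self, value):
        # Yield a plain one-element tuple instead of a DocId instance, so
        # the writer no longer depends on dataclass metadata. Skipping None
        # here also makes the separate Filter step unnecessary.
        if value is not None:
            yield (value,)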

medzin commented 1 year ago

The problem is that astuple returns an empty tuple for data classes defined in the main session. It looks like the __dataclass_fields__ attribute (which is used inside the astuple implementation) is somehow corrupted during the serialization/deserialization process.
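
An illustrative snippet of that dependency (it does not reproduce the Dataflow behavior, which only shows up after pickling across the worker boundary):

from dataclasses import astuple, dataclass, fields

@dataclass
class DocId:
    document_id: str

row = DocId("02267a6d-0a9f-40bf-9051-4971961cb0ac")

# astuple iterates the fields recorded in __dataclass_fields__; with intact
# metadata it returns the one-element tuple below, but a class whose field
# metadata was lost would silently yield (), and the writer would then pass
# zero parameters to a query with one placeholder.
print(astuple(row))  # ('02267a6d-0a9f-40bf-9051-4971961cb0ac',)
print(fields(row))   # a single Field entry for document_id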