Open lazarillo opened 1 week ago
@lazarillo Is this for all the BigQueryIO methods or just for StorageWrite? For the latter, it is quite easy since it is done through x-lang and Java already supports that. cc @ahmedabu98
@liferoad Java does support it, but xlang introduces another bottleneck because it requires us to send Beam Rows from Python to Java.
We would have to convert protos in Python to Beam Rows and send them over to Java, where they would be converted back to protos and written to BigQuery.
@liferoad , I do not use StorageWrite because I have found some situations where working with the cross-language APIs gets confusing and/or weird. So I try to stick with just one SDK for the entire pipeline. (I cannot recall one at the moment, sorry.)
So yes, I understand that I can make cross-language calls, but I am trying to stay away from that.
I hadn't even thought of @ahmedabu98 's point, but yes, it sounds like that wouldn't help the performance issue, anyway.
I actually have the following notes in my internal repo, which might give some idea as to why this sort of feature is useful.
As I'm sure anyone reading this thread is aware, the way that Google handles timestamps across all of its products is all over the place.
I actually have notes within our repo to make sure that I (or anyone else) does not forget all of these idiosyncrasies:
Timestamp expectations are all over the place within the Googleverse.
- Protocol buffers represent timestamps with the well-known Timestamp message, which has the fields seconds and nanos. (Note, it is nanos, not nanoseconds.)
- BigQuery, on the other hand, does not accept the Timestamp message; you must instead provide an integer which is the number of microseconds since epoch (1970-01-01). Yes, microseconds.
- When using WriteToBigQuery with a protobuf message, you have to first convert to JSON or a dict or something similar. In this case (when working with JSON, or a dict, which is pushed to a JSON representation behind the scenes as best I can tell), the timestamp must either be an integer which is the number of seconds since epoch, or a float which is the number of milliseconds since epoch. So the expected resolution depends upon the primitive type! Best to avoid using a numeric timestamp in this case.
Yes, you read that correctly: when working with timestamps in the Googleverse, a timestamp must be represented as nanoseconds, microseconds, milliseconds, or seconds, depending upon the resource you're talking to and the data type of the timestamp itself (and maybe the language you're using).
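To make the mismatch concrete, here is a minimal sketch using the well-known protobuf Timestamp type; the BigQuery-side expectations are the ones described in the notes above, not actual API calls:

```python
# Minimal sketch of the resolution mismatch described above. Only the
# well-known protobuf Timestamp type is a real API here.
from google.protobuf import timestamp_pb2

ts = timestamp_pb2.Timestamp()
ts.GetCurrentTime()  # populates the fields `seconds` and `nanos` (not "nanoseconds")

# Protobuf's own representation: seconds plus nanos.
print(ts.seconds, ts.nanos)

# What BigQuery's proto write path expects instead: an int64 of MICROseconds
# since epoch.
micros_since_epoch = ts.ToMicroseconds()

# The JSON/dict path used by WriteToBigQuery interprets a plain number at yet
# another resolution depending on whether it is an int or a float (per the
# notes above), hence the advice to avoid numeric timestamps there entirely.
```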
Best practice to deal with all of this:
- Represent timestamps with the protobuf Timestamp message, because when this is converted to JSON or a dict, the protobuf conversion code manages this by converting it to a string representation using the proper scale.
- Avoid representing a timestamp as a raw int or float.
So the addition of this feature will drastically simplify our workflow of type checking, etc.
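As a small illustration of the first point, protobuf's JSON conversion renders a Timestamp as an RFC 3339 string, so the value handed to WriteToBigQuery carries no ambiguous numeric resolution:

```python
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict

ts = timestamp_pb2.Timestamp(seconds=1_700_000_000, nanos=123_000_000)
# For well-known types, MessageToDict returns the JSON form directly; for a
# Timestamp that is an RFC 3339 string, e.g. "2023-11-14T22:13:20.123Z".
print(MessageToDict(ts))
```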
We do not currently have a plan to improve the BigQueryIO methods other than StorageWrite. This is why I asked whether you use StorageWrite.
OK. Bummer to hear that. I cannot be the only one who thinks that working cross-language isn't worth the hassle.
How viable is it to create a PR to implement this sort of thing?
You are definitely welcome to make any PR to improve this experience.
I think we have two issues here if we plan to work on StorageWrite:
Another idea (only for your case, if using BigQuery as a sink directly is simple enough) is that you could create your own IO following this BigQuery example: https://cloud.google.com/bigquery/docs/samples/bigquerystorage-append-rows-raw-proto2#bigquerystorage_append_rows_raw_proto2-python
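For reference, a condensed sketch of the pattern in that sample, adapted to the default stream; my_proto_pb2.MyRow and the table coordinates are placeholders, and error handling is omitted:

```python
# Sketch: append serialized protobuf rows to BigQuery with the Storage Write
# API Python client. my_proto_pb2.MyRow is a hypothetical generated message.
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2

import my_proto_pb2  # placeholder generated proto module


def append_proto_rows(project, dataset, table, rows):
    client = bigquery_storage_v1.BigQueryWriteClient()
    stream_name = f"{client.table_path(project, dataset, table)}/_default"

    # Describe the row message so the service can decode the serialized bytes.
    proto_descriptor = descriptor_pb2.DescriptorProto()
    my_proto_pb2.MyRow.DESCRIPTOR.CopyToProto(proto_descriptor)
    writer_schema = types.ProtoSchema(proto_descriptor=proto_descriptor)

    template = types.AppendRowsRequest(
        write_stream=stream_name,
        proto_rows=types.AppendRowsRequest.ProtoData(writer_schema=writer_schema),
    )
    stream = writer.AppendRowsStream(client, template)

    # Rows stay as serialized protobuf bytes end to end.
    proto_rows = types.ProtoRows(serialized_rows=[r.SerializeToString() for r in rows])
    request = types.AppendRowsRequest(
        proto_rows=types.AppendRowsRequest.ProtoData(rows=proto_rows))
    stream.send(request).result()  # blocks until the append is acknowledged
    stream.close()
```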
It could be pretty challenging if it's your first time delving into Beam code. Our BigQuery connectors are our most complex.
You also don't have a Python-native Storage Write API connector to work off of. And I think to work with protos, you'd need Storage Write API. From a cursory search, I don't think the legacy streaming API supports protos, but the Storage Write API Python client does.
With that said, it's definitely feasible and we'd welcome such an improvement. You could open a PR with the functionality and unit/integration tests and someone will review it. Just make sure it is well integrated with the existing WriteToBigQuery API
Side note - Beam is shifting towards leaning more on cross-language transforms, so if there's something we can improve (I know there's plenty) we'd love the feedback. Your comment on timestamps was pretty comprehensive
I will think more on my biggest pain points going cross-language, and provide a specific example when I remember / experience it.
But for now, I'll give one pain point: I am really bad at Java. I am a machine learning / data engineer. If I could, I'd work with modern languages like Go or Rust, but I need to use a language that is well supported for my career. So that means Java, Python, or Scala. (Less so Scala these days, but it was quite big when Spark initially started.) I chose Python because it's the easiest to work with, but also because I vehemently believe that code should be easy to read, and that object orientation is only sometimes the right choice. So Python seemed better than Java. Lastly, the JVM was far more useful before Docker came onto the scene, so even that perk lost its luster.
I very much rely upon discovered code when documentation is insufficient. I can read just about any Python code, no matter how complex, and understand what it is doing and how I can interact with it (and whether I should or not). But with Java (or Go), I am too ignorant of the language: I need good docs in order to know what to do and how to interact with the SDK.
I think the Apache Beam docs are great. But Beam is such a complex beast that often I learn what I really need to know by going straight to the source code (which is linked to throughout the documentation, which is awesome).
Because of all this, I try to stay with the pure Python code as much as possible. It allows me more confidence to create my pipeline. Even a "simple" pipeline is quite complex, I've found in my experience.
Thanks, @ahmedabu98. The timestamps thing is by no means Beam's fault... that's a Google thing, and specifically a GCP thing. I know there is strong overlap. But still, Beam should not need to compensate for GCP, because Beam needs to be cross-runner to be successful, IMHO.
I know I just said I'm not a fan of cross-language. But I do like the cross-runner work!
I did try to use the StorageWrite API about 1 year ago, and I was having trouble getting it to work. That was in an Airflow job, not within Dataflow / Beam, and it was a while ago. I guess I'll try StorageWrite again, if it's where all the work is going.
Can I ask: why is there a strong push for cross-language, instead of creating separate SDKs for each language? Is it easier to maintain? I assume there is a loss in efficiency or bandwidth by going cross-language, right?
I ask because we were considering moving our Dataflow work to Go once Go is more fully supported. But if Go is supported via cross-language integrations, then I guess there are no speed or memory reasons to move to Go.
Yes, it is easier to maintain. For our own tests, the efficiency is not a problem with cross-lang. For Go, most IOs will be supported via cross-lang. cc @lostluck @kennknowles @robertwb @chamikaramj they may have more comments.
I try to stay with the pure Python code as much as possible
We're ideally aiming for a multi-language experience where the user is not even aware that they are using a cross-language transform. They might still need a JVM or docker, but ideally they'd just worry about writing pure Python code like you say
I did try to use the StorageWrite API about 1 year ago, and I was having trouble getting it to work
Might be worth a shot trying it again -- there was quite a bit of work done around 6 months ago to fix multiple issues with Storage Write API
why is there a strong push for cross-language
Yep it's a lot easier to maintain, and significantly lowers the threshold for new SDKs to become usable. Also new features added to the main transform can almost instantly become available to foreign SDKs. With that said, there's nothing stopping SDKs from still developing and building native transforms.
I assume there a loss in efficiency or bandwidth by going cross language
There's some -- cross-language introduces a fusion break between SDKs, and elements need to get serialized to pass through.
were considering moving our Dataflow work to Go once Go is more fully supported
@lostluck can speak more on Go's roadmap
OK, thanks everyone!
I'm fighting other issues at the moment, but once I get a chance, I'll revisit the StorageWriteAPI and see if it can work for me.
Regarding Beam Go's roadmap, it's tied very tightly to Prism (https://beam.apache.org/documentation/runners/prism/) which is written in Go, and replaced that SDK's direct runner.
Basically the Go SDK can't move forward until the local experience lets you run and test everything. Which ultimately includes Cross Language transforms too.
Which means that Prism needs to be able to execute any Python and Java (and other SDK) pipelines too!
Currently, actual work on Prism is a bit on hold since I had to run the 2.59 release, and do a few talks at Beam Summit, and as of next week I'm on a month long vacation. But it's always moving forward.
Updating the Python API to accept protos instead of dicts is a very valid feature request. I think this is orthogonal to the fact that the Python transform itself is cross-language (uses Java underneath). We just need to introduce a Beam Row (which is the standard cross-language data type) that includes the proto bytes, the BQ schema generated from the proto, and maybe some additional information provided by the customer (the Java API requires providing a Java class type).
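A very rough sketch of what such a Row could carry; the field names are purely illustrative and this is not an existing Beam API:

```python
import apache_beam as beam

def to_transport_row(msg, bq_schema_json):
    # Hypothetical shape only: proto bytes plus the BQ schema derived from the
    # proto, packaged as a schema'd Beam Row for the cross-language boundary.
    return beam.Row(
        serialized_proto=msg.SerializeToString(),
        bq_schema=bq_schema_json,
    )
```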
Regarding timestamp inconsistencies, this is also something we should resolve (also orthogonal to cross-language and other implementation details).
What would you like to happen?
To my knowledge, there is no way to write protocol buffers to BigQuery using the Python SDK.
I am currently writing them by converting them to a dict (or to JSON if that were better, but using dict for now) and then writing the dict to BigQuery.
But the point of protocol buffers is that they reduce the data that we're sending over the wire, typically by around 80%. So, converting back to a dict or JSON right before sending the data over the wire is counter-productive.
Is this feature already on the roadmap? Could it be placed on the roadmap? Is this a feature that I could help implement, or would it be far too complex with under-the-hood elements?
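For context, the conversion step described above looks roughly like this today (message type, table, and dispositions are placeholders):

```python
# Sketch of the current workaround: convert each proto to a dict and hand the
# dicts to WriteToBigQuery. my_proto_pb2.MyRow is a hypothetical message type.
import apache_beam as beam
from google.protobuf.json_format import MessageToDict

import my_proto_pb2  # placeholder generated proto module


def run(protos):
    with beam.Pipeline() as p:
        (
            p
            | beam.Create(protos)  # PCollection of MyRow messages
            | beam.Map(lambda m: MessageToDict(m, preserving_proto_field_name=True))
            | beam.io.WriteToBigQuery(
                "my-project:my_dataset.my_table",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            )
        )
```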
Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
Issue Components