bryanyang0528 / ksql-python

A Python wrapper for the KSQL REST API.
MIT License
159 stars 67 forks

*return* value, don't *print* it? #29

Closed r2evans closed 2 years ago

r2evans commented 6 years ago

Is there a reason that BaseAPI.query prints the returned data to the console instead of returning it? The .ksql method returns the value (as I would expect), so why is the behavior different?

For the time being, I'm inheriting and overwriting this method:

     def query(self, query_string, encoding='utf-8', chunk_size=128):
         """
         Process streaming incoming data.

         """
         r = self._request(endpoint='query', sql_string=query_string)

-        for chunk in r.iter_content(chunk_size=chunk_size):
-            if chunk != b'\n':
-                print(chunk.decode(encoding))
+        # iter_lines yields whole lines, so each JSON record stays in one element; blank lines are skipped
+        return [line.decode(encoding) for line in r.iter_lines(chunk_size=chunk_size) if line not in (b'\n', b'')]

Side note: I use .iter_lines instead of .iter_content because the latter splits long lines across chunks. For example, if I return the response r immediately after self._request(...), I can get something like this:

In [884]: r = myclient.query("select a,b from t_loop limit 3")
In [885]: list(r.iter_content(chunk_size=128))
Out[885]: 
[b'\n',
 b'\n',
 b'\n',
 b'\n',
 b'\n',
 b'\n',
 b'{"row":{"columns":["90","1"]},"errorMessage":null}',
 b'\n',
 b'\n',
 b'{"row":{"columns":["94","2"]},"errorMessage":null}',
 b'\n',
 b'{"row":{"columns":["47","2"]},"errorMessage":null}',
 b'\n',
 b'\n',
 b'\n',
 b'\n{"row":null,"errorMessage":{"@type":"generic_error","error_code":50000,"message":"LIMIT reached for the partition.","stackTrace',
 b'":["io.confluent.ksql.structured.QueuedSchemaKStream$QueuePopulator.apply(QueuedSchemaKStream.java:179)","io.confluent.ksql.stru',

where the long error message continues, one fragment per element of the returned list. If I want to use any of these, I can json.loads each element until I reach the error, at which point b'\n{"row":null,"errorMessage":..."stackTrace' will not parse. Why? Because it is incomplete JSON and must be concatenated with the subsequent lines (32 lines for this error). If I wanted to json.loads them, I'd have to try a single element; if that fails, concatenate it with the next; if that fails, keep going until I find a complete run of elements that contains the entire JSON. Possible but inefficient.
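
For illustration, the try-and-concatenate approach described above might look roughly like the sketch below. The helper name parse_chunks is hypothetical and not part of ksql-python, and the sketch assumes that a chunk never contains the end of one record together with the start of the next.

```python
import json


def parse_chunks(chunks, encoding="utf-8"):
    """Yield parsed JSON records from byte chunks that may split a record.

    Hypothetical helper: accumulates chunks until the buffer parses as JSON.
    """
    pending = b""
    for chunk in chunks:
        pending += chunk
        try:
            text = pending.decode(encoding).strip()
            if not text:
                # Only newlines/whitespace so far: nothing to parse.
                pending = b""
                continue
            record = json.loads(text)
        except ValueError:
            # Covers both JSONDecodeError and UnicodeDecodeError:
            # the record is incomplete, so keep accumulating.
            continue
        yield record
        pending = b""


# Fragments shaped like the iter_content output above (error shortened).
chunks = [
    b"\n",
    b'{"row":{"columns":["90","1"]},"errorMessage":null}',
    b"\n",
    b'\n{"row":null,"errorMessage":{"message":"LIMIT',
    b' reached"}}',
]
records = list(parse_chunks(chunks))
```

Every failed attempt re-decodes and re-parses the whole buffer, which is the inefficiency mentioned above.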

Instead:

In [887]: r = myclient.query("select a,b from t_loop limit 3")
In [888]: list(r.iter_lines())
Out[888]: 
[b'',
 b'',
 b'',
 b'',
 b'',
 b'',
 b'{"row":{"columns":["71","2"]},"errorMessage":null}',
 b'{"row":{"columns":["4","3"]},"errorMessage":null}',
 b'',
 b'{"row":{"columns":["91","3"]},"errorMessage":null}',
 b'',
 b'',
 b'',
 b'{"row":null,"errorMessage":{"@type":"generic_error","error_code":50000,"message":"LIMIT reached for the partition.","stackTrace":["io.confluent.ksql.structured.QueuedSchemaKStream$QueuePopulator.apply(QueuedSchemaKStream.java:179)","io.confluent.ksql.structured.QueuedSchemaKStream$Que...

and the error message (truncated here) is contained in one element in the list. This way, I know that a long return-value is always in one element.
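
Because each element is now a complete JSON document, the list can be parsed element by element. A minimal sketch, assuming the record shape shown above; split_rows_and_errors is a hypothetical helper, and the sample error record is abridged (its stackTrace is emptied) purely for illustration.

```python
import json


def split_rows_and_errors(lines, encoding="utf-8"):
    """Separate data rows from error payloads in iter_lines()-style output."""
    rows, errors = [], []
    for line in lines:
        if not line:  # skip the empty lines between records
            continue
        record = json.loads(line.decode(encoding))
        if record.get("errorMessage") is not None:
            errors.append(record["errorMessage"])
        elif record.get("row") is not None:
            rows.append(record["row"]["columns"])
    return rows, errors


# Sample input modeled on the output above; the error record is abridged.
sample = [
    b"",
    b'{"row":{"columns":["71","2"]},"errorMessage":null}',
    b'{"row":{"columns":["4","3"]},"errorMessage":null}',
    b"",
    b'{"row":{"columns":["91","3"]},"errorMessage":null}',
    b'{"row":null,"errorMessage":{"@type":"generic_error","error_code":50000,'
    b'"message":"LIMIT reached for the partition.","stackTrace":[]}}',
]
rows, errors = split_rows_and_errors(sample)
```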

r2evans commented 6 years ago

Okay, on further reflection, I believe I understand the distinction: your assumption is that the query is streaming/ongoing, in which case printing rows as they arrive might be preferred.

Alternative: produce a generator, thereby allowing the client to decide. So instead of

In [1]: client.query('select * from table1')
{"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null}
{"row":{"columns":[1512787753200,"key1",1,2,3]},"errorMessage":null}
{"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
{"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}

it could be

In [2]: for ret in client.query('select * from table1'): print(ret)
{"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null}
{"row":{"columns":[1512787753200,"key1",1,2,3]},"errorMessage":null}
{"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
{"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}

This would enable:

In [3]: list(client.query('select * from table1'))
['{"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null}',
 '{"row":{"columns":[1512787753200,"key1",1,2,3]},"errorMessage":null}',
 '{"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}',
 '{"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}']
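
A generator along those lines could be sketched as follows. This is not the library's actual implementation: query_lines is a hypothetical function that takes any object exposing a requests-style iter_lines method, and FakeResponse is a stand-in so the example runs without a KSQL server.

```python
def query_lines(response, encoding="utf-8", chunk_size=128):
    """Lazily yield decoded, non-empty lines from a streaming response."""
    for line in response.iter_lines(chunk_size=chunk_size):
        if line:
            yield line.decode(encoding)


class FakeResponse:
    """Hypothetical stand-in for a streaming HTTP response object."""

    def __init__(self, lines):
        self._lines = lines

    def iter_lines(self, chunk_size=512):
        # A real response would read from the network; this replays a list.
        return iter(self._lines)


response = FakeResponse([
    b"",
    b'{"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null}',
    b"",
    b'{"row":{"columns":[1512787753200,"key1",1,2,3]},"errorMessage":null}',
])

# The caller decides: iterate lazily for an ongoing query, or collect
# everything with list() when the query is known to terminate.
results = list(query_lines(response))
```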

bryanyang0528 commented 6 years ago

LGTM. Could you create a new pull request for this update, or use yield for this issue as in #28?

r2evans commented 6 years ago

I had not seen #28. It seems the only difference is that I suggest the use of .iter_lines as well. What else would you need in the PR?

bryanyang0528 commented 6 years ago

I think using yield should solve this issue?

KenCox94 commented 2 years ago

I'm assuming this issue is resolved, so I am going to close it. @bryanyang0528 can reopen if we are still having this issue.

r2evans commented 2 years ago

I haven't checked recently, but why are you assuming that this is resolved?

KenCox94 commented 2 years ago

@r2evans I'll open it again.

KenCox94 commented 2 years ago

@r2evans would you like to make a pull request related to this or would you like the issue perpetually open?

r2evans commented 2 years ago

I apologize for not immediately recognizing that e7a004ab81ae35963befc287bef8318fd8c74a9f resolves the issue by yielding the chunk instead of printing it. Having said that, at least be honest about it: either you believe it's resolved and don't know off-hand which commit; or you believe it's not an issue and you're not concerned with proving, resolving, or explaining yourself.

Thanks for maintaining this module.

KenCox94 commented 2 years ago

Considering how quickly you responded to a comment and issue closure on something that's about 4 years old, I had assumed you were more astute on this. So apologies for seeming curt.

r2evans commented 2 years ago

I've become accustomed to issues I open being resolved via a commit, either implicitly (the PR comment includes text that causes GH to auto-close it, à la https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue) or explicitly by a maintainer's comment about being fixed. To be candid, however, I was exploring KSQL for other needs and found this was a reasonable start at using it in Python; ultimately the deciding "authority" chose against KSQL for performance reasons, and my need to access it went away. The reason for my comment is that I still think the original issue was valid for somebody (perhaps pedagogic, since it is not currently practical for me), so I was searching for a reason. I wish I had seen the fix back in 2018; it would have been useful at the time.

Either way, (1) the original issue was resolved; (2) my response unfortunately came across as pedantic. Perhaps I've been victim to several recent issues being automatically closed because the maintainers did nothing with them, not because the bug was fixed or mitigated (nextcloud, I'm looking at you).

Again, thanks.