bryanyang0528 / ksql-python

A python wrapper for the KSQL REST API.
MIT License
159 stars, 64 forks

generator is never returned #34

Closed sammerry closed 6 years ago

sammerry commented 6 years ago

Related to #29

Rather than returning a generator, query returns void/None
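For readers unfamiliar with this kind of bug, here is a minimal sketch of the pattern being described. The function names are hypothetical stand-ins, not the library's actual code: a wrapper that calls a generator function without returning it hands `None` back to the caller.

```python
def _stream_rows(rows):
    """Hypothetical stand-in for a streaming helper: yields rows one at a time."""
    for row in rows:
        yield row


def query_broken(rows):
    # The generator object is created but never handed back,
    # so the caller receives None.
    _stream_rows(rows)


def query_fixed(rows):
    # Returning the generator lets the caller iterate over the results.
    return _stream_rows(rows)


print(query_broken(["a", "b"]))
print(list(query_fixed(["a", "b"])))
```

The first call prints `None`; the second yields the rows because the generator actually reaches the caller.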

codecov[bot] commented 6 years ago

Codecov Report

Merging #34 into master will not change coverage. The diff coverage is 0%.

@@           Coverage Diff           @@
##           master      #34   +/-   ##
=======================================
  Coverage   78.51%   78.51%           
=======================================
  Files           6        6           
  Lines         270      270           
  Branches       33       33           
=======================================
  Hits          212      212           
  Misses         48       48           
  Partials       10       10

Impacted Files    Coverage Δ
ksql/client.py    82.35% <0%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update 908c681...b0991d8.

bryanyang0528 commented 6 years ago

Thanks @sammerry. But query is for reading data in a streaming fashion. I'm not sure whether data can be returned in the new version. If you would like to select data one-off, you can use ksql.

sammerry commented 6 years ago

Whenever I try to make a query using ksql with the following steps, I get this error. Is there some other way to query on the /ksql endpoint?

>>> import ksql
>>> client = ksql.KSQLAPI('http://ksql:8088')
>>> client.ksql('SELECT * FROM teststream LIMIT 1')
ValueError: Status Code: 400.
Message: b'{"@type":"statement_error","error_code":40002,"message":"SELECT and PRINT queries must use the /query endpoint","stackTrace": [],"statementText":"SELECT * FROM teststream LIMIT 1;","entities":[]}

SELECT and PRINT queries must use the /query endpoint

sammerry commented 6 years ago

Without returning the generator from the /query endpoint, you can't access the data returned. Can you give me an example SELECT statement? I may not be understanding something here.

maulikjs commented 6 years ago

@sammerry For streaming queries such as PRINT and SELECT, you use

>>> import ksql
>>> client = ksql.KSQLAPI('http://ksql:8088')
>>> client.query('SELECT * FROM teststream LIMIT 1')

and not

client.ksql(*)

bryanyang0528 commented 6 years ago

@sammerry Sorry for confusing you. @maulikjs's example is right: ksql is for commands other than SELECT, and query is for SELECT.

I found that the official documentation shows an example of this case: https://docs.confluent.io/current/ksql/docs/api.html#post--query
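To make the split concrete, here is a rough sketch of the routing rule implied by the server's error message ("SELECT and PRINT queries must use the /query endpoint"). The helper `endpoint_for` is hypothetical and not part of ksql-python; it only illustrates which statements would go to which REST path.

```python
def endpoint_for(statement):
    """Hypothetical helper: pick the REST path for a KSQL statement."""
    words = statement.split()
    if not words:
        raise ValueError("empty statement")
    # Per the server's error message, SELECT and PRINT must use /query;
    # this sketch assumes everything else goes to /ksql.
    if words[0].upper() in {"SELECT", "PRINT"}:
        return "/query"
    return "/ksql"


print(endpoint_for("SELECT * FROM teststream LIMIT 1"))
print(endpoint_for("SHOW STREAMS"))
```

In terms of the client shown in this thread, statements mapped to `/query` correspond to `client.query(...)`, and the rest to `client.ksql(...)`.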

I think returning the generator is OK for getting the response now. @maulikjs, could you help me test this?

bryanyang0528 commented 6 years ago

@sammerry I found that the return works, but the parser for the response from query is not complete. Could you help me write a parser for it? Alternatively, I can merge this PR first and update the parser later.

>>> print([x for x in client.query("select * from teststream3 limit 2")])
['{"row":{"columns":[1532787319799,null,"qq4"]},"errorMessage":null}', '{"row":{"columns":[1532787320830,null,"fsdafasklfjas;"]},"errorMessage":null}', '\n{"row":null,"errorMessage":{"message":"LIMIT reached for the partition.","stackTrace":["io.confluent.ksql.structured.QueuedSche', 'maKStream$QueuePopulator.apply(QueuedSchemaKStream.java:184)","io.confluent.ksql.structured.QueuedSchemaKStream$QueuePopulator.a', 'pply(QueuedSchemaKStream.java:161)","org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStream', 'Peek.java:42)","org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)","org.apache.kafka.strea', 'ms.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)","org.apache.kafka.streams.processor.int', 'ernals.ProcessorNode.process(ProcessorNode.java:124)","org.apache.kafka.streams.processor.internals.AbstractProcessorContext.for', 'ward(AbstractProcessorContext.java:174)","org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.proces', 's(KStreamMapValues.java:41)","org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)","org.apac', 'he.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)","org.apache.kafka.streams', '.processor.internals.ProcessorNode.process(ProcessorNode.java:124)","org.apache.kafka.streams.processor.internals.AbstractProces', 'sorContext.forward(AbstractProcessorContext.java:174)","org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStrea', 'mTransformValuesProcessor.process(KStreamTransformValues.java:169)","org.apache.kafka.streams.processor.internals.ProcessorNode$', '1.run(ProcessorNode.java:46)","org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsIm', 
'pl.java:208)","org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)","org.apache.kafka.str', 'eams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)","org.apache.kafka.streams.kstream.', 'internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)","org.apache.kafka.streams.processor.internals.', 'ProcessorNode$1.run(ProcessorNode.java:46)","org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(St', 'reamsMetricsImpl.java:208)","org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)","org.ap', 'ache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)","org.apache.kafka.st', 'reams.processor.internals.SourceNode.process(SourceNode.java:80)","org.apache.kafka.streams.processor.internals.StreamTask.proce', 'ss(StreamTask.java:224)","org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94', ')","org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)","org.apache.kafka.streams.processor.', 'internals.StreamThread.processAndMaybeCommit(StreamThread.java:923)","org.apache.kafka.streams.processor.internals.StreamThread.', 'runOnce(StreamThread.java:803)","org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)","org.', 'apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)"]}}']
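The output above shows why a parser is needed: the response arrives in fixed-size chunks, so a single JSON object (here, the long error message) is split across many list items. One possible approach, offered as a sketch rather than the parser that was eventually merged, is to buffer the chunks and decode complete objects with the standard library's `json.JSONDecoder.raw_decode`. The function name `parse_chunks` is an assumption made for illustration.

```python
import json


def parse_chunks(chunks):
    """Yield one parsed JSON object per complete object in a chunked stream.

    `chunks` is any iterable of strings, such as the pieces yielded by
    client.query() in the output above. Hypothetical helper, not library code.
    """
    decoder = json.JSONDecoder()
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while True:
            # Drop newlines and spaces that separate objects.
            buffer = buffer.lstrip()
            if not buffer:
                break
            try:
                obj, end = decoder.raw_decode(buffer)
            except json.JSONDecodeError:
                # Object is incomplete; wait for the next chunk.
                break
            yield obj
            buffer = buffer[end:]
    # Any incomplete trailing data left in the buffer is silently dropped.


chunks = [
    '{"row":{"columns":[1,null,"a"]},"errorMessage":null}',
    '\n{"row":null,"errorMessage":{"mess',
    'age":"LIMIT reached for the partition."}}',
]
for record in parse_chunks(chunks):
    print(record)
```

Because the decoder only emits an object once it is complete, the caller sees whole rows regardless of where the chunk boundaries fall.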

davit555 commented 6 years ago

Any updates, guys? By the way, I don't know why, but installing the library with pip install ksql installs a strange version that does not yield a generator; it just prints the results from the KSQL server. In general, if you are building a generator, you should return it so that the client can iterate over the result.

bryanyang0528 commented 6 years ago

@davit555 I forgot to update PyPI. I'll merge this PR after completing the parser.

bryanyang0528 commented 6 years ago

solved in #31