Datatamer / tamr-client

Programmatically interact with Tamr
https://tamr-client.readthedocs.io
Apache License 2.0

TC: record.stream() not returning records #481

Closed tiems90 closed 3 years ago

tiems90 commented 3 years ago

Using tc.dataset.record.stream is not returning any records of the associated dataset. This was confirmed against a dataset and the TUC implementation, which did work.

🐛 bug report

tc_ud = tc.dataset.unified.from_project(s,projects[-1])
next(tc.dataset.record.stream(s, tc_ud))

Gives an error for all projects.

🤔 Expected Behavior

I would expect to iterate over the records of a dataset.

😯 Current Behavior

Calling next() on the stream raises an error. Alternatively, [x for x in tc.dataset.record.stream(s, tc_ud)] returns an empty list, which it should not.

🔦 Context

It is possible to set stream=False in the function, but this loads the full query into memory.

💻 Code Sample

import os

os.environ["TAMR_CLIENT_BETA"] = '1'
import tamr_client as tc
tc_ud = tc.dataset.unified.from_project(s,projects[-1])
next(tc.dataset.record.stream(s, tc_ud))
[x for x in tc.dataset.record.stream(s, tc_ud)]

🌍 Your Environment

Software Version(s)
tamr-unify-client 1.0.0
Tamr server 2021.001
Python 3.8.6
Operating System macOS

pcattori commented 3 years ago

@tiems90 : FYI I added python to your codeblocks to get better syntax highlighting. 😄

skalish commented 3 years ago

@tiems90 Can you share the error that is raised?

tiems90 commented 3 years ago

Sure; the error is mostly related to next() being called on an empty iterator.

StopIteration                             Traceback (most recent call last)
<ipython-input-8-7a850c8bbd17> in <module>
----> 1 next(tc.dataset.record.stream(s, tc_ud))

StopIteration: 

[x for x in tc.dataset.record.stream(s, tc_ud)] does not throw an error, but just shows an empty list ([]).
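Both symptoms are what plain Python does with an iterator that produces nothing, so they point to the stream being empty rather than to two separate bugs. A minimal sketch using only built-ins (no Tamr objects involved; the variable names are just for illustration):

```python
# An iterator that yields nothing, standing in for the empty record stream.
empty = iter([])

# next() on an exhausted iterator raises StopIteration...
try:
    next(empty)
    outcome = "got a record"
except StopIteration:
    outcome = "StopIteration"

# ...unless a default is supplied as the second argument.
default = next(iter([]), None)

# A list comprehension over the same kind of iterator is simply empty.
collected = [x for x in iter([])]

print(outcome, default, collected)
```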

skalish commented 3 years ago

A little investigation into this: we have a function in the response module

def ndjson(response: requests.Response, **kwargs) -> Iterator[JsonDict]:
    for line in response.iter_lines(**kwargs):
        yield json.loads(line)

It is called in the record.stream function

def stream(session: Session, dataset: AnyDataset) -> Iterator[JsonDict]:
    with session.get(str(dataset.url) + "/records", stream=True) as r:
        return response.ndjson(r)

This definition currently causes the error.

If I compose these two together, rewriting so that

def stream(session: Session, dataset: AnyDataset) -> Iterator[JsonDict]:
    with session.get(str(dataset.url) + "/records", stream=True) as r:
        for line in r.iter_lines():
            yield json.loads(line)

the problem vanishes.

It is not yet clear to me why this is the case.

pcattori commented 3 years ago

It could be the semantics about returning generators. I'm curious if redefining stream like so would solve it:

def stream(session: Session, dataset: AnyDataset) -> Iterator[JsonDict]:
    with session.get(str(dataset.url) + "/records", stream=True) as r:
        yield from response.ndjson(r)

I believe this changes stream from (a) a non-generator function that returns a generator to (b) a generator function itself. Naively those two would behave the same way, but I think Python might have some nuance about this...
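One likely explanation is the interaction with the `with` block: a plain `return` leaves the block, and so closes the response, before the returned generator has read anything, whereas a generator function stays suspended inside the block until it is exhausted. A minimal sketch of that difference, using a hypothetical `FakeResponse` and `fake_get` in place of the real `requests` response and session (their behaviour on close is an assumption made for illustration, not the library's documented behaviour):

```python
import json
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List


class FakeResponse:
    """Hypothetical stand-in for a streamed HTTP response."""

    def __init__(self, lines: List[str]) -> None:
        self._lines = lines
        self.closed = False

    def iter_lines(self) -> Iterator[str]:
        for line in self._lines:
            if self.closed:
                return  # assumption: a closed response yields nothing more
            yield line

    def close(self) -> None:
        self.closed = True


@contextmanager
def fake_get(lines: List[str]) -> Iterator[FakeResponse]:
    """Hypothetical stand-in for session.get(..., stream=True)."""
    r = FakeResponse(lines)
    try:
        yield r
    finally:
        r.close()  # runs as soon as the with block is exited


def ndjson(r: FakeResponse) -> Iterator[Dict[str, Any]]:
    for line in r.iter_lines():
        yield json.loads(line)


def stream_return(lines: List[str]) -> Iterator[Dict[str, Any]]:
    # Regular function: the with block exits on return, so the response
    # is closed before the caller consumes the generator.
    with fake_get(lines) as r:
        return ndjson(r)


def stream_yield_from(lines: List[str]) -> Iterator[Dict[str, Any]]:
    # Generator function: execution pauses inside the with block, so the
    # response stays open until iteration finishes.
    with fake_get(lines) as r:
        yield from ndjson(r)


data = ['{"id": 1}', '{"id": 2}']
print(list(stream_return(data)))
print(list(stream_yield_from(data)))
```

In this sketch the `return` variant produces no records and the `yield from` variant produces both, which matches the empty list reported above.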

skalish commented 3 years ago

yield from works! (for me at least)

github-actions[bot] commented 3 years ago

:tada: This issue has been resolved in version 1.0.1 :tada:

The release is available on:

Your semantic-release bot :package::rocket: