Closed wescpy closed 2 years ago
Hi @wescpy I was able to run the following code without exception:
def main():
# tally recent visitor counts from queue then delete those tasks
tallies = {}
acks = set()
#with psc_client:
print("pull")
rsp = psc_client.pull(subscription=SUB_PATH, max_messages=1)
msgs = rsp.received_messages
for rcvd_msg in msgs:
acks.add(rcvd_msg.ack_id)
visitor = rcvd_msg.message.data.decode('utf-8')
tallies[visitor] = tallies.get(visitor, 0) + 1
if acks:
psc_client.acknowledge(subscription=SUB_PATH, ack_ids=acks)
psc_client.close()
# increment those counts in Datastore and return
if tallies:
with ds_client.context():
for visitor in tallies:
counter = VisitorCount.query(VisitorCount.visitor == visitor).get()
if not counter:
counter = VisitorCount(visitor=visitor, counter=0)
counter.put()
counter.counter += tallies[visitor]
counter.put()
return 'DONE (with %d task[s] logging %d visitor[s])\r\n' % (len(msgs), len(tallies))
here is my pip freeze:
cachetools==3.1.1
certifi==2021.10.8
chardet==4.0.0
click==7.1.2
enum34==1.1.10
Flask==1.1.4
futures==3.3.0
google-api-core==1.32.0
google-auth==1.35.0
google-cloud-core==1.7.3
google-cloud-datastore==1.15.5
google-cloud-ndb==1.11.1
google-cloud-pubsub==1.7.2
googleapis-common-protos==1.52.0
grpc-google-iam-v1==0.12.3
grpcio==1.39.0
idna==2.10
itsdangerous==1.1.0
Jinja2==2.11.3
MarkupSafe==1.1.1
packaging==20.9
protobuf==3.17.3
pyasn1==0.4.8
pyasn1-modules==0.2.8
pymemcache==3.5.2
pyparsing==2.4.7
pytz==2022.1
redis==3.5.3
requests==2.27.1
rsa==4.5
six==1.16.0
urllib3==1.26.10
Werkzeug==1.0.1
Thanks for the follow-up. Let me take a look at what you did and see if I can replicate it in my dev env then try to get it to run on the GAE platform where I encountered this issue. There's a good possibility that the grpcio
I'm using (v1.39) conflicts with the one available on App Engine servers (v1.0.0).
FYI I was using virtualenv to run, which I highly recommend if you can.
Thx for the reco... using virtualenv
was useful. I too, was able to get a cmd-line version working (w/grpcio 1.39
) on my local Python 2 dev environment. Furthermore, when I rollback to grpcio==1.0.0
, I was able to repro the error. Using successive "binary tree" testing, I discovered the fix for this bug was integrated in grpcio
version 1.12.0 with this PR fixing this bug.
Conclusions:
grpcio<1.12.0
, and so long as grpcio>=1.12.0
is a prerequisite to install this version of the client library (or a proper warning given in its README
), then you can close this issue as "WaI" as it's more of a grpcio
issue vs. python-pubsub
mainly a library-compatibility thing.grpcio==1.0.0
. I will file a bug for the App Engine team to upgrade to >=1.12.0
.UPDATE: I filed a bug against the App Engine team to upgrade the built-in version of grpcio
, but in the meantime, all users who run into this issue with their Python 2 App Engine apps using Cloud Pub/Sub can just "push" the exception handler I outlined earlier up to their code, e.g., don't use the context manager for the subscriber client, and wrap the close()
:
try:
psc_client.close()
except AttributeError: # special Py2 handler for grpcio<1.12.0
pass
Environment details
google-cloud-pubsub
version: 1.7.2Also see:
Steps to reproduce
main.py
and convert this to a cmd-line app that can be easily verified, else deploy this to Python 2 App Engine.log()
function, which fails when trying to close the subscriber client (psc_client
). Remove myAttributeError
exception handler before you run it, and you should get one of the exceptions below when you execute it.The issue is this line of v1.7.2 of the Pub/Sub client library. Basically
self.api.transport.channel
doesn't have aclose
attribute, so this call fails. It doesn't work as a context manager nor a plain call toclose()
:Using context manager (
with
statement):Using normal
close()
:Code example
(see
main.py
in above code sample)Stack trace
(see above)
Root cause speculation
Call to
self.api.transport.channel.close()
fails with a subscriber (subscription client). Either you should fixself.api.transport.channel
so this call doesn't fail or wrap thisclose()
call with an exception handler at that lower-level, basically backporting possible fixes from future versions of this module. Users have no idea what to do except to add extra code to suppressAttributeError
here, and this is not a documented use case for this library—by that I mean this is not a user error, and they shouldn't be expected to write an exception handler here.The problem is that the proper comms channels should actually be closed in this call. I also thought that
channel
has been deprecated and thattransport
s/b used, so I don't know why the call is like this.Fix recommendation
To summarize, the offending code is this line: https://github.com/googleapis/python-pubsub/blob/v1.7.2/google/cloud/pubsub_v1/subscriber/client.py#L253
CURRENTLY:
So my code doesn't throw this exception, I had to make this...
TEMPORARY BUT BAD FIX:
The real fix is to get the correct object in line 253 that needs to be closed and really close and release the gRPC communication resource, whether
channel
,transport
, or something else. Again, you may need to investigate a future version of this file and backport the minimal necessary to get this to work for all remaining Python 2 Pub/Sub users out there. So far, it seems to only affect (synchronous pull) subscriber clients.