collective / collective.zamqp

AMQP-integration for Zope2
https://pypi.python.org/pypi/collective.zamqp

having issue setting up zamqp #17

Closed toropok closed 2 years ago

toropok commented 2 years ago

Hi,

I'm getting this error message after the instance starts. What may cause it?

2022-04-01T08:49:25 ERROR Zope.SiteErrorLog 1648799365.570.757291335619 http://localhost/superuser.ping
Traceback (innermost last):
  Module ZServer.ZPublisher.Publish, line 144, in publish
  Module ZPublisher.mapply, line 85, in mapply
  Module ZServer.ZPublisher.Publish, line 44, in call_object
  Module collective.zamqp.datatypes, line 97, in <lambda>
  Module collective.zamqp.keepalive, line 118, in ping
  Module collective.zamqp.producer, line 276, in publish
  Module zope.component._api, line 165, in getUtility
ComponentLookupError: (<InterfaceClass collective.zamqp.interfaces.ISerializer>, 'text/plain')
datakurre commented 2 years ago

Which Plone and Python versions are you using? According to the error, not all code from the package has been properly loaded. I am not sure how that error is possible without even more critical errors 😮
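
For readers unfamiliar with this kind of failure, here is a minimal sketch of why a named-utility lookup raises when the registrations were never loaded. The `Registry` class is a toy stand-in written for this illustration, not the `zope.component` API, and the string `"ISerializer"` merely stands in for the real interface object.

```python
class ComponentLookupError(LookupError):
    """Raised when nothing is registered for (interface, name)."""


class Registry(object):
    """Toy named-utility registry (hypothetical, NOT zope.component)."""

    def __init__(self):
        self._utilities = {}

    def register_utility(self, interface, name, component):
        self._utilities[(interface, name)] = component

    def get_utility(self, interface, name=""):
        try:
            return self._utilities[(interface, name)]
        except KeyError:
            raise ComponentLookupError((interface, name))


registry = Registry()

# Before any registrations are loaded, the lookup fails in the same way
# as the ComponentLookupError in the traceback.
try:
    registry.get_utility("ISerializer", "text/plain")
    lookup_failed = False
except ComponentLookupError:
    lookup_failed = True

# Once the registration has run (in Zope, this would happen when the
# package's configuration is loaded), the same lookup succeeds.
registry.register_utility("ISerializer", "text/plain", lambda body: body)
serializer = registry.get_utility("ISerializer", "text/plain")
```

The point of the sketch is only that the lookup key is the pair (interface, name), so a missing registration for `'text/plain'` fails even if the calling code itself imported fine.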

toropok commented 2 years ago

Plone 5.2.7 (5215) CMF 2.5.4 Zope 4.6.3 Python 2.7.18 (default, Mar 8 2021, 13:02:45) [GCC 9.3.0] PIL 6.2.2 (Pillow) WSGI: Off. Server: ZServer 4.0.2

toropok commented 2 years ago

No more critical errors appeared in the logs (at least none that I can see).

The error is raised every 60 seconds, which I suppose is the default heartbeat.

datakurre commented 2 years ago

I would assume this still works on that version.

To check whether the issue only affects the keepalive feature, try dropping keepalive from the connection configuration. It is not really required with heartbeat, and I don't remember anymore why we didn't have heartbeat support in the beginning.

datakurre commented 2 years ago

keepalive was for a manual "heartbeat" with an implicit producer and consumer. The heartbeat option instead sets up protocol-level heartbeat support.

toropok commented 2 years ago

Ok. I will try removing the "keepalive" setting from buildout.cfg.

datakurre commented 2 years ago

Also, both heartbeat and keepalive should only matter if there are firewalls cutting idle connections between Plone and RabbitMQ (we had such firewalls, and heartbeat was mandatory).

toropok commented 2 years ago

We've started both locally: Plone on port 8081 and RabbitMQ on its default port.

Here is my cfg:

[instance]
<= instance_base
recipe = plone.recipe.zope2instance
http-address = 8081
#eggs =
#    collective.zamqp
environment-vars =
    ZAMQP_LOGLEVEL DEBUG

zope-conf-additional =
    %import collective.taskqueue
    %import collective.zamqp
    <taskqueue />
    <taskqueue-server />
    <amqp-broker-connection>
        connection_id   superuser
        hostname       
        username        admin
        password        admin
        port            5672
#        keepalive       60
    </amqp-broker-connection>
    <amqp-consuming-server>
        connection_id   superuser
        site_id         senaite20
        user_id         admin
        port            8081
    </amqp-consuming-server>

[worker]
<= instance
http-address = 8085
zserver-threads = 1
environment-vars =
    ZAMQP_LOGLEVEL DEBUG

zope-conf-additional =
    %import collective.zamqp
    <amqp-broker-connection>
        connection_id   superuser
        hostname        
        port            5672
        username        admin
        password        admin
        heartbeat       120
 #       keepalive       60
    </amqp-broker-connection>
    <amqp-consuming-server>
        connection_id   superuser
        use_vhm         off
        site_id         senaite20
        user_id         admin
        port            8081
    </amqp-consuming-server>

I've removed "keepalive" and the recurring error disappeared, but no connection to RabbitMQ is established and there is no sign of any attempt to connect.

datakurre commented 2 years ago

@toropok Hang on, I'm sure we'll get it running.

zamqp does not do much without a package using it and declaring connection / consumer objects, which could be the reason for the initial error

Please, try to restore keepalive, and add

zcml = collective.zamqp

to the buildout. Let's see if the keepalive connection opens and runs without error after that.

datakurre commented 2 years ago

Here's an old package with both simple and advanced examples https://github.com/datakurre/collective.zamqpdemo

datakurre commented 2 years ago

@toropok Please also set ZAMQP_LOGLEVEL INFO. I'm sorry for it being very unintuitive. It defines which log level zamqp uses for its own logging. When it is set to INFO, zamqp logs its debug-level messages at INFO level as well, so they show up with the default Plone logging settings.
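
A minimal sketch of the behaviour described above, using only the standard `logging` module. It is a model of the idea (one configurable level used for every emitted message), not the actual collective.zamqp source; the helper `make_zamqp_style_log`, the `ListHandler` class, and the DEBUG fallback are assumptions made for illustration.

```python
import logging
import os


def make_zamqp_style_log(logger, environ=None):
    """Return a log(message) function that always emits at one level.

    The level name is read from ZAMQP_LOGLEVEL. Falling back to DEBUG
    when the variable is unset is an assumption made for this sketch.
    """
    environ = os.environ if environ is None else environ
    level_name = environ.get("ZAMQP_LOGLEVEL", "DEBUG").upper()
    level = getattr(logging, level_name, logging.DEBUG)

    def log(message):
        logger.log(level, message)

    return log


class ListHandler(logging.Handler):
    """Collects (level name, message) pairs instead of printing them."""

    def __init__(self):
        logging.Handler.__init__(self)
        self.records = []

    def emit(self, record):
        self.records.append((record.levelname, record.getMessage()))


# A logger that lets INFO and above through, as a stand-in for a
# logging setup that hides DEBUG messages.
demo_logger = logging.getLogger("zamqp_loglevel_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
handler = ListHandler()
demo_logger.addHandler(handler)

# Emitted at DEBUG: dropped by the INFO threshold.
make_zamqp_style_log(demo_logger, {"ZAMQP_LOGLEVEL": "DEBUG"})("connecting")
# Emitted at INFO: passes the threshold and is recorded.
make_zamqp_style_log(demo_logger, {"ZAMQP_LOGLEVEL": "INFO"})("connected")
```

In this model, setting the variable to DEBUG makes the messages quieter, not louder, which matches why ZAMQP_LOGLEVEL DEBUG showed no connection activity in the logs.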

toropok commented 2 years ago

Thanks

@datakurre I've added zcml = collective.zamqp to the buildout.cfg and it connected to RabbitMQ server:

2022-04-04T09:23:07 INFO collective.zamqp Connection 'superuser' connecting
2022-04-04T09:23:07 INFO collective.zamqp Connection 'superuser' connected
2022-04-04T09:23:07 INFO collective.zamqp Channel for connection 'superuser' opened
2022-04-04T09:23:07 INFO collective.zamqp Producer ready to publish to exchange '' with routing key '' on connection 'superuser'
2022-04-04T09:23:07 INFO collective.zamqp Producer declared exchange 'collective.zamqp' on connection 'superuser'
2022-04-04T09:23:07 INFO collective.zamqp Consumer declared queue 'collective.zamqp.vasya' on connection 'superuser'
2022-04-04T09:23:07 INFO collective.zamqp Consumer ready to consume queue 'collective.zamqp.vasya' on connection 'superuser'
2022-04-04T09:23:07 INFO collective.zamqp Producer declared queue 'collective.zamqp.vasya' on connection 'superuser'
2022-04-04T09:23:07 INFO collective.zamqp Producer bound queue 'collective.zamqp.vasya' to exchange 'collective.zamqp' on connection 'superuser'
2022-04-04T09:23:07 INFO collective.zamqp Producer ready to publish to exchange 'collective.zamqp' with routing key 'collective.zamqp.vasya' on connection 'superuser'

However, I've encountered problems with consuming messages.

I chose to try message.py from the collective.zamqp demo. However, because we use Plone 5.2, it throws this exception:

ImportError: cannot import name 'IPossibleSite'

My guess is that Grok is no longer supported in Plone 5.2, so I've tried to use ZCML instead.

I've added a subscriber to configure.zcml:

    <subscriber
        for="senaite.fhir.messageprocessor.IMessage
                collective.zamqp.interfaces.IMessageArrivedEvent"
        handler="senaite.fhir.messageprocessor.consumeItem"
    />

And here is my code in __init__.py:

class IMessage(Interface):
    """Message marker interface"""

class DummyConsumer(Consumer):
    connection_id = CONNECTION_ID
    queue = "collective.zamqp.vasya"
    marker = IMessage
    durable = False

def consumeItem(message, event):
    """Consume item creation message"""

    logger.info("============= BODY: {}".format(message.body))

    message.ack()

No message arrives at the consumeItem handler, yet the message is removed from the RabbitMQ queue. Maybe we should use another approach?

datakurre commented 2 years ago

Curious.

When is ImportError: cannot import name 'IPossibleSite' being raised?

Grok mostly seems to still work, because ZAMQP components are now being registered and the connection is established.

I wonder where those messages go. It should log when it receives a message, and also when a message is left unacked. It does not auto-ack messages.

datakurre commented 2 years ago

I'm able to reproduce that IPossibleSite issue.

toropok commented 2 years ago

> When is ImportError: cannot import name 'IPossibleSite' being raised?

It happens right at instance start, and the start expectedly fails:

File "/home/senaite/senaitelims20/src/senaite.fhir.messageprocessor/src/senaite/fhir/messageprocessor/__init__.py", line 18, in <module>
    from five import grok
  File "/home/senaite/buildout-cache/eggs/cp27mu/five.grok-1.3.2-py2.7.egg/five/grok/__init__.py", line 22, in <module>
    from five.grok.components import Model, Container, Site, LocalUtility
  File "/home/senaite/buildout-cache/eggs/cp27mu/five.grok-1.3.2-py2.7.egg/five/grok/components.py", line 21, in <module>
    from zope.location.interfaces import IPossibleSite
zope.configuration.xmlconfig.ZopeXMLConfigurationError: File "/home/senaite/buildout-cache/eggs/cp27mu/Products.CMFPlone-5.2.7-py2.7.egg/Products/CMFPlone/meta.zcml", line 38.4-42.10
    File "/home/senaite/senaitelims20/parts/instance/etc/site.zcml", line 12.2-12.39
    ImportError: cannot import name IPossibleSite

This definitely happens when we import the grok package: 'from five import grok'

We used your Stack Overflow post as an example.

datakurre commented 2 years ago

@toropok Thanks for your patience.

I can confirm that five.grok is the issue and it is no longer supported on Plone 5.2. I was surprised that collective.zamqp still runs (our Python 2.7 sites relying on zamqp are still on some 5.1.x version). It seems to work, because it uses lower level grok packages instead.

I'll figure out the ZCML way to register things and ping back. We've done that, but I am currently out of the office for some time and therefore unable to access our code.

toropok commented 2 years ago

@datakurre great! eager to get news from you. we're totally stuck with it :)

tnx

datakurre commented 2 years ago

@toropok Wow. That message.py from zamqpdemo seems to demonstrate all kinds of advanced AMQP concepts, so eventually I need to make an example for you from that Stack Overflow answer.

Meanwhile, Grok still works for ZAMQP related stuff (plone.directives.form and other five.grok related things remain unsupported):

In the addon's configure.zcml, declare the grok namespace:

<configure xmlns="http://namespaces.zope.org/zope"
...
           xmlns:grok="http://namespaces.zope.org/grok"
...
>

and include collective.zamqp (this replaces zcml-line from buildout):

    <include package="collective.zamqp" />

Because ZAMQP includes grokcore.component, you are now able to grok ZAMQP related modules, with Consumers, Producers and subscribers:

    <grok:grok package=".message"/>

In the module itself, grok (for grok.subscribe) must be imported with:

import grokcore.component as grok

I tested that producers, consumers and subscribers from that otherwise outdated message.py worked as expected.

Next I'll move on to the Stack Overflow example and translate it to ZCML. There are some implicit defaults that rely on Grok, but those don't matter if connection_ids, queues and routing_keys are always set explicitly.

datakurre commented 2 years ago

@toropok Standalone example:

example.py

from Products.statusmessages.interfaces import IStatusMessage
from collective.zamqp.consumer import Consumer
from collective.zamqp.interfaces import IProducer
from collective.zamqp.producer import Producer
from plone.app.textfield import RichText
from plone.app.textfield import RichTextValue
from plone.autoform.form import AutoExtensibleForm
from plone.dexterity.utils import createContentInContainer
from plone.supermodel import model
from z3c.form import button
from z3c.form import form
from zope import schema
from zope.component import getUtility
from zope.component.hooks import getSite
from zope.interface import Interface

import logging
logger = logging.getLogger("amqpdemo")

class CreateItemProducer(Producer):
    """Produces item creation requests"""

    connection_id = "superuser"
    serializer = "msgpack"
    routing_key = "amqpdemo.create"
    queue = "amqpdemo.create"

    durable = False

class ICreateItemMessage(Interface):
    """Marker interface for item creation message"""

class CreateItemConsumer(Consumer):
    """Consumes item creation messages"""

    connection_id = "superuser"
    marker = ICreateItemMessage
    queue = "amqpdemo.create"

    durable = False

def createItem(message, event):
    """Consume item creation message"""
    logger.info(message.body)
    portal = getSite()
    data = message.body.copy()
    data["text"] = RichTextValue(
        raw=data.get("text") or "",
        mimeType="text/html",
        outputMimeType="text/html",
        encoding="utf-8",
    )
    createContentInContainer(
        portal,
        "Document",
        checkConstraints=True,
        **data
    )
    message.ack()

class IDocument(model.Schema):
    """Status form schema"""

    title = schema.TextLine(
        title=u"Title",
    )

    description = schema.Text(
        title=u"Description",
    )

    text = RichText(
        title=u"Body text",
        default_mime_type='text/html',
        output_mime_type='text/html',
        allowed_mime_types=('text/html',),
    )

class Form(AutoExtensibleForm, form.Form):
    """Form"""

    schema = IDocument
    ignoreContext = True

    label = u"Create document"
    description = u"Queues a document creation."

    def update(self):
        self.request.set("disable_border", True)
        super(Form, self).update()

    @button.buttonAndHandler(u"Send")
    def queueMessage(self, action):
        data, errors = self.extractData()
        if not errors:
            producer = getUtility(IProducer, name="amqpdemo.create")
            producer._register()  # register for transaction
            producer.publish(dict(
                title=data["title"],
                description=data["description"],
                text=data["text"].output,
            ))

            IStatusMessage(self.request).addStatusMessage(
                           u"Queued: %s" % data["title"],
                           "info")
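
The inline comment on `producer._register()` in the form handler says it registers the producer for the transaction. Assuming this means the message is only sent if the request's transaction commits, the idea can be sketched with a toy producer. `BufferedProducer` and its `commit`/`abort` methods are hypothetical names for this illustration, not part of collective.zamqp.

```python
class BufferedProducer(object):
    """Toy producer that holds messages until commit (hypothetical)."""

    def __init__(self, send):
        self._send = send      # callable that actually delivers a message
        self._pending = []     # messages published in the open transaction

    def publish(self, message):
        # Nothing is delivered yet; the message waits for commit().
        self._pending.append(message)

    def commit(self):
        # Deliver everything that was published, then start afresh.
        pending, self._pending = self._pending, []
        for message in pending:
            self._send(message)

    def abort(self):
        # Discard everything published in the failed transaction.
        self._pending = []


delivered = []
producer = BufferedProducer(delivered.append)

producer.publish({"title": "Lost"})
producer.abort()    # e.g. the request raised an error

producer.publish({"title": "Hello World"})
producer.commit()   # the request finished successfully
```

With this design, a request that fails halfway does not leave a queued "create document" message behind for content that was never submitted successfully.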

example.zcml

<configure xmlns="http://namespaces.zope.org/zope"
           xmlns:zcml="http://namespaces.zope.org/zcml"
           xmlns:browser="http://namespaces.zope.org/browser">

    <include package="collective.zamqp" />

    <utility
        factory=".example.CreateItemProducer"
        provides="collective.zamqp.interfaces.IProducer"
        name="amqpdemo.create"
        />

    <utility
        factory=".example.CreateItemConsumer"
        provides="collective.zamqp.interfaces.IConsumer"
        name="amqpdemo.create"
        />

    <subscriber
        for=".example.ICreateItemMessage
             collective.zamqp.interfaces.IMessageArrivedEvent"
        handler=".example.createItem"
        />

    <browser:page
        for="*"
        name="create-document"
        class=".example.Form"
        permission="zope2.View"
        />

</configure>

buildout-instance

environment-vars =
    ZAMQP_LOGLEVEL INFO
zope-conf-additional =
   %import collective.zamqp
   <amqp-broker-connection>
       connection_id superuser
       hostname localhost
       virtual_host /
       username guest
       password guest
       heartbeat 120
       keepalive 30
   </amqp-broker-connection>
   <amqp-consuming-server>
       connection_id superuser
       site_id Plone
       user_id admin
   </amqp-consuming-server>
datakurre commented 2 years ago
2022-04-04 14:32:30 INFO ZServer HTTP server started at Mon Apr  4 14:32:30 2022
        Hostname: 0.0.0.0
        Port: 8080
2022-04-04 14:32:30 INFO collective.zamqp AMQP Broker connection 'superuser' created. hostname: 'localhost', port: '5672', virtual_host: '/', username: 'guest', heartbeat: '120', prefetch_count: '0', tx_select: 'False'
2022-04-04 14:32:30 INFO collective.zamqp Setting up keepalive (30 s) for connection 'superuser'
2022-04-04 14:32:30 INFO ZServer Clock server for "/superuser.ping" started (user: Anonymous, period: 30)
2022-04-04 14:32:30 INFO collective.zamqp AMQP Consuming Server for connection 'superuser' started (site 'Plone' user: 'admin')
2022-04-04 14:32:34 INFO Zope Ready to handle requests
2022-04-04 14:32:34 INFO collective.zamqp Connection 'superuser' connecting
2022-04-04 14:32:34 INFO collective.zamqp Connection 'superuser' connected
2022-04-04 14:32:34 INFO collective.zamqp Channel for connection 'superuser' opened
2022-04-04 14:32:34 INFO collective.zamqp Producer ready to publish to exchange '' with routing key '' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Producer declared exchange 'collective.zamqp' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Consumer declared queue 'collective.zamqp.superuser' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Consumer ready to consume queue 'collective.zamqp.superuser' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Consumer declared queue 'amqpdemo.create' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Consumer ready to consume queue 'amqpdemo.create' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Producer declared queue 'amqpdemo.create' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Producer ready to publish to exchange '' with routing key 'amqpdemo.create' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Producer declared queue 'collective.zamqp.superuser' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Producer bound queue 'collective.zamqp.superuser' to exchange 'collective.zamqp' on connection 'superuser'
2022-04-04 14:32:34 INFO collective.zamqp Producer ready to publish to exchange 'collective.zamqp' with routing key 'collective.zamqp.superuser' on connection 'superuser'
2022-04-04 14:32:50 INFO collective.zamqp Received message '1' sent to exchange '' with routing key 'amqpdemo.create'
2022-04-04 14:32:50 INFO collective.zamqp Worker started processing message '1' (status = 'RECEIVED', age = '0:00:00.006058')
2022-04-04 14:32:50 INFO amqpdemo {u'text': u'<p style="text-align: left;"><strong>BOLD DRAGONS</strong></p>', u'description': u'HERE DRAGONS', u'title': u'Hello World'}
2022-04-04 14:32:50 INFO collective.zamqp Letting Zope to commit database transaction for message '1' (status = 'RECEIVED', age = '0:00:00.061184')
2022-04-04 14:32:50 INFO collective.zamqp Handled message '1' (status = 'ACK', age = '0:00:00.128827')
^C2022-04-04 14:32:53 INFO SignalHandler Caught signal SIGINT
2022-04-04 14:32:53 INFO Z2 Shutting down
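
When checking whether a message was really acknowledged, it can help to pull the status and age out of log lines like the ones above. The following is a small sketch using only the standard library; the regular expression is derived from the lines shown in this thread, and the helper name `message_statuses` is made up for this example.

```python
import re

# Matches e.g.: message '1' (status = 'ACK', age = '0:00:00.128827')
STATUS_RE = re.compile(
    r"message '(?P<id>[^']+)' "
    r"\(status = '(?P<status>\w+)', age = '(?P<age>[^']+)'\)"
)


def message_statuses(lines):
    """Return (message id, status, age) tuples found in log lines."""
    found = []
    for line in lines:
        match = STATUS_RE.search(line)
        if match:
            found.append(
                (match.group("id"), match.group("status"), match.group("age"))
            )
    return found


log_lines = [
    "2022-04-04 14:32:50 INFO collective.zamqp Received message '1' "
    "sent to exchange '' with routing key 'amqpdemo.create'",
    "2022-04-04 14:32:50 INFO collective.zamqp Worker started processing "
    "message '1' (status = 'RECEIVED', age = '0:00:00.006058')",
    "2022-04-04 14:32:50 INFO collective.zamqp Handled message '1' "
    "(status = 'ACK', age = '0:00:00.128827')",
]

statuses = message_statuses(log_lines)
```

A message whose last recorded status is not ACK would be a candidate for the "removed from the queue but never handled" situation discussed earlier in the thread.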
toropok commented 2 years ago

Wow! Thanks will try that soon.

toropok commented 2 years ago

@datakurre

thank you so much for your time and assistance, we've finally succeeded )))

It was not obvious that `collective.zamqp` creates an auxiliary exchange and queue for the keepalive feature only. Apart from that, it works fine. Thanks again.

datakurre commented 2 years ago

@toropok Agreed. As said above, keepalive is redundant and not needed with heartbeat.