winking324 / pyactivemq

Automatically exported from code.google.com/p/pyactivemq
Apache License 2.0

pyactivemq ActiveMQ client hangs. #37

Open GoogleCodeExporter opened 9 years ago

GoogleCodeExporter commented 9 years ago
What steps will reproduce the problem?

I've created a script that subscribes to 2 topics as a durable subscriber and to 2 
queues as a consumer.  The application works fine for several hours: I am able 
to read messages from the topics and queues and to send messages.  However, after a 
few hours the application hangs when run from the command line (Cygwin bash shell). 
If I run it in PyCharm (a Python IDE), the application stops after the same few 
hours with exit code -1073741819 (0xC0000005), which is an access violation.  I 
registered an exception listener, but its onException() method is never called. 
I've tried valiantly to compile the code so I can run this in the debugger, but 
trunk does not compile, and I can't seem to find the right combination of 
Apache and activemq project code to get it to compile.  I'd be happy to 
debug this if someone could provide more helpful hints on compilation.  I use 
VS2008 but also have VS2010 and could probably round up the VS2006 compiler on 
a different machine. 
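
For reference, the two forms of the exit code quoted above are the same 32-bit value: PyCharm reports it as a signed integer, while Windows lists it as the unsigned status code 0xC0000005 (STATUS_ACCESS_VIOLATION). A minimal sketch of the conversion follows; the helper name `to_ntstatus_hex` is made up for illustration and is not part of pyactivemq.

```python
def to_ntstatus_hex(exit_code):
    """Reinterpret a signed 32-bit process exit code as unsigned hex."""
    # Masking with 0xFFFFFFFF maps a negative value to its
    # two's-complement 32-bit representation.
    return '0x%08X' % (exit_code & 0xFFFFFFFF)


print(to_ntstatus_hex(-1073741819))  # prints 0xC0000005
```

This only confirms what the number means; it says nothing about which native call triggered the violation.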

What is the expected output? What do you see instead?
I would expect the activemq interface to continue working or tell me why it 
isn't working.  A thrown exception that I could catch and handle would be 
outstanding.

What version of the product are you using? On what operating system?
pyactivemq-0.1.0 / Python 2.6 / Windows 7.

Please provide any additional information below.

The messages that this application gets are text XML docs.

Here are some code snippets (these won't run without the extra code around 
them, but are provided so you can see the basics of how I'm using the pyactivemq 
interface).

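###########################################################################
# Imports the snippets below rely on (Python 2.6, matching the report).
# Event is assumed to be threading.Event, and AcknowledgeMode is assumed
# to come from pyactivemq.
import sys
import time
import traceback
import Queue
from threading import Event

import pyactivemq
from pyactivemq import AcknowledgeMode

###########################################################################
class TextListener(pyactivemq.MessageListener):
    def __init__(self, name, parent):
        pyactivemq.MessageListener.__init__(self)
        self.monitor = Event()
        self.name = name
        self.msgCount = 0
        self.parent = parent  # The parent object, which owns msgQ
    #######################################################################
    def onMessage(self, message):
        if isinstance(message, pyactivemq.TextMessage):
            try:
                #print ('%s: Reading Message: ' % self.name) + message.text
                self.msgCount += 1
                try:
                    # Hand the message to the parent; this blocks for up
                    # to 60 seconds if the queue is full.
                    self.parent.msgQ.put(
                        (self.name, message.text), timeout=60)
                except Queue.Full:
                    print("Queue is full or blocking!")

            except Exception:
                print("Failed to put message on the queue")
                print(traceback.format_exc())
        else:
            # Any non-text message sets the monitor event.
            self.monitor.set()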

###########################################################################
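
The listener above does a blocking put (up to 60 seconds) inside the message callback. One possible variation, sketched here with only the standard library, is to make the callback side non-blocking and let a separate worker thread drain the queue. This is a hedged illustration, not a diagnosis of the hang: `HandoffBuffer`, `offer`, and `drain` are hypothetical names, and the queue size is arbitrary.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as used in the report


class HandoffBuffer(object):
    """Hypothetical stand-in for the parent object that owns msgQ."""

    def __init__(self, maxsize=1000):
        self.msgQ = queue.Queue(maxsize)
        self.dropped = 0

    def offer(self, name, text):
        """Called from the listener callback; never blocks."""
        try:
            self.msgQ.put_nowait((name, text))
            return True
        except queue.Full:
            # Count the drop instead of stalling the callback thread.
            self.dropped += 1
            return False


def drain(buffer, handler, stop_event):
    """Worker loop: process queued messages until stopped and empty."""
    while not stop_event.is_set() or not buffer.msgQ.empty():
        try:
            name, text = buffer.msgQ.get(timeout=0.1)
        except queue.Empty:
            continue
        handler(name, text)
```

In this arrangement onMessage would call `offer(self.name, message.text)`, and the main program would run `drain` in its own `threading.Thread`. Whether dropping messages on a full queue is acceptable depends on the application.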

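class ExceptionListener(pyactivemq.ExceptionListener):
    def __init__(self, parent):
        pyactivemq.ExceptionListener.__init__(self)
        self.parent = parent
        #TODO: What goes in here?
    def onException(self, ex):
        print("Fatal exception?  Shutting down.")
        # No Python exception is being handled at this point, so
        # traceback.format_exc() has nothing to report; print the
        # exception object that was passed in instead.
        print(ex)
        # Note: sys.exit() only raises SystemExit in the calling thread, so
        # it may not end the process if this runs on a library thread.
        sys.exit(-999)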
###########################################################################
###########################################################################
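
Since calling sys.exit() from a callback may not stop the whole process, one alternative is for the listener to record the error and signal the main thread, which then shuts down in an orderly way. The sketch below uses only `threading.Event`; `ShutdownSignal` and `RecordingExceptionListener` are hypothetical names, and using `str(ex)` is an assumption about the object the library passes in. It would not help in the case reported here, where onException() is never called at all.

```python
import threading


class ShutdownSignal(object):
    """Hypothetical helper: lets a callback ask the main thread to exit."""

    def __init__(self):
        self._event = threading.Event()
        self.reason = None

    def request(self, reason):
        # Keep only the first reason; later calls leave the flag set.
        if self.reason is None:
            self.reason = reason
        self._event.set()

    def wait(self, timeout=None):
        """Block up to `timeout` seconds; True if shutdown was requested."""
        self._event.wait(timeout)
        return self._event.is_set()


class RecordingExceptionListener(object):
    # In the real script this would derive from
    # pyactivemq.ExceptionListener, as the class above does; a plain
    # object keeps the sketch self-contained.
    def __init__(self, signal):
        self.signal = signal

    def onException(self, ex):
        # str(ex) is an assumption about what the library passes in.
        self.signal.request(str(ex))
```

The main thread could then loop on `signal.wait(1.0)` and, once it returns True, close the connection and call sys.exit() itself.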
def startRegularSubscriber(self):
    print('Starting regular subscriber')
    topicName = 'regular.update.topic'

    self.updateTopic = self.session.createTopic(topicName)
    self.connection.stop()
    self.regularSubscriber = self.session.createDurableConsumer(self.updateTopic, 
        "MakeItLast", '', False)
    self.regularSubscriber.messageListener = self.TextListener('reg',self)
    print('Subscribed to %s' % topicName)
    self.connection.start()
    print("Started connection")

###########################################################################
def startEodSubscriber(self):
    print('Starting eod subscriber')
    topicName = 'eod.update.topic'

    self.eodTopic = self.session.createTopic(topicName)
    self.connection.stop()
    self.eodSubscriber = self.session.createDurableConsumer(self.eodTopic, 
        "MakeItLast2", '', False)
    self.eodSubscriber.messageListener = self.TextListener('eod',self)
    print('Subscribed to %s' % topicName)
    self.connection.start()
    print("Started connection")

###########################################################################

connection = connectionFactory.createConnection('', '', clientID)
print('Connection has ID %s' % connection.clientID)
ex= ExceptionListener(self)
connection.exceptionListener= ex 

session = connection.createSession(AcknowledgeMode.AUTO_ACKNOWLEDGE)

rqstQueue = session.createQueue('report.request.queue')
rqstProducer = session.createProducer(rqstQueue)

clientQueueName = 'client.specific.update.queue'
clientQueue = session.createQueue(clientQueueName)

clientSelector = "ClientId='%s'" % connection.clientID
print('CS = %s' % clientSelector)
clientSubscriber = session.createConsumer(clientQueue, 
    clientSelector, True)

clientSubscriber.messageListener = TextListener('client',self)
print('Subscribed to %s' % clientQueueName)

connection.start()

print("Started connection")

startRegularSubscriber()
time.sleep(5) # Adding a sleep between subscriptions seems to prevent
              # Re-connect issues.  I'm not sure why.
startEodSubscriber()
time.sleep(5)

Original issue reported on code.google.com by st...@kelker.net on 4 Jan 2011 at 4:37