KxSystems / pyq

PyQ — Python for kdb+
http://code.kx.com/q/interfaces
Apache License 2.0

Segmentation fault when using threading #59

Closed zak-b2c2 closed 6 years ago

zak-b2c2 commented 6 years ago

Hi guys,

I'm getting a segmentation fault when running a q process that loads a Python script (.p). The use case is fairly simple: I'm exposing Python subscription and unsubscription functions for Redis to q, then calling them so that they run in a separate thread. That way, q can still work in "parallel".

$ cat redis.q
system "l /home/zakariyya/redis_thread.p";
.z.pc:{ if[x ~ hndl; -2" lost handle to tp"; if[`res in key `.;unsubscribe_redis enlist (res)] ] };
.z.exit:{ if[`res in key `.;unsubscribe_redis enlist (res)]; };
hndl:hopen 6006;
upd:{[t;d] data:d; neg[hndl](`upd;`tbl; data)};
show .z.i
$ q redis.q
KDB+ 3.5 2018.02.26 Copyright (C) 1993-2018 Kx Systems
l64/ 8()core 15999MB zakariyya zakariyya-pc-2193 127.0.1.1 EXPIRE 2018.07.01 zak********* KOD #51578

3645i
q)res:subscribe_redis("quote*";"quote")

And the Python script

$ cat redis_thread.p
import threading
import sys
import time
import redis
import signal
import os
from functools import partial
from pyq import q, K, _k

class PublisherThread(threading.Thread):

    def __init__(self, channel, r, kdb_table):
        super(PublisherThread, self).__init__()
        self._stopper = threading.Event()
        self.channel = channel
        self.kdb_table = kdb_table
        self.redis = r
        self.pubsub = self.redis.pubsub()
        self.redis.client_setname("kdb-feed-" + self.kdb_table)
        self.pubsub.psubscribe(self.channel)

    def stop(self):
        print('Closing Redis connection...')
        self.pubsub.close()
        self._stopper = True

    @property
    def stopped(self):
        return self._stopper.isSet()

    def run(self):
        while not self.stopped:
            try:
                msg = self.pubsub.get_message()
                if msg:
                    if msg['type'] in ('message', 'pmessage'):
                        #print(msg)
                        qmsg = K.string(msg)
                        q('upd', self.kdb_table, qmsg)
                        time.sleep(0.001)
            except _k.error as e:
                print('Caught Q error. Cannot insert data to table')
                self.stop()
            except Exception as e:
                print('Received unhandled exception. Cannot insert data to table')
                self.stop()

class RedisManager(object):

    def __init__(self, subscriber_init):
        self._subscribers_store = {}
        self.subscriber_init = subscriber_init

    def add(self, feed, kdb_table):
        print('Subscribing')
        key = ':'.join([feed, kdb_table])
        self._subscribers_store[key] = self.subscriber_init(feed, kdb_table)
        return key

    def remove(self, key):
        self._subscribers_store[key].stop()
        del self._subscribers_store[key]
        return True

def subscriber_init(feed, kdb_table, redis_client):
    t = PublisherThread(feed, redis_client, kdb_table)
    t.start()
    return t

# Partially apply subscriber_init and hand the result to RedisManager
redis_manager = RedisManager(
    partial(subscriber_init,
         redis_client=redis.StrictRedis(host='XX.XXX.XXX.XXX', port=6399, db=0)))

# Create and expose Python functions as q callables

def q_subscribe_redis(feed, kdb_table):
    return redis_manager.add(str(feed), str(kdb_table))

def q_unsubscribe_redis(key):
    return redis_manager.remove(str(key))

q.subscribe_redis = q_subscribe_redis
q.unsubscribe_redis = q_unsubscribe_redis

And the segmentation fault:

q)Sorry, this application or an associated library has encountered a fatal error and will exit.
If known, please email the steps to reproduce this error to tech@kx.com
with a copy of the kdb+ startup banner.
Thank you.
/home/zakariyya/q/q64/l64/q() [0x47a8b1]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x11390) [0x7fc0aa739390]
/home/zakariyya/q/q64/l64/q(r0+0) [0x41bd40]
/home/zakariyya/q/q64/l64/q() [0x408a9c]
/home/zakariyya/q/q64/l64/q() [0x40fb6f]
/home/zakariyya/q/q64/l64/q() [0x4042c8]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7fc0aa37e830]
/home/zakariyya/q/q64/l64/q() [0x4043a1]
rlwrap: warning: q crashed, killed by SIGSEGV (core dumped).
rlwrap itself has not crashed, but for transparency,
it will now kill itself with the same signal

warnings can be silenced by the --no-warnings (-n) option
Segmentation fault (core dumped)
abalkin commented 6 years ago

I did not have time to reproduce your issue, but from the backtrace it looks like you are passing a K object from one thread to another resulting in r0 being called in one thread on an object created in another thread. To avoid this you should convert K objects to native Python objects whenever they can be shared between threads.
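A minimal sketch of that advice (my illustration, not code from this issue): build the K object and convert it to a native Python value on the thread that owns it, and hand only the native value across. The [int(x) for x in ...] conversion is an assumption about how K vectors iterate in pyq; use whatever native conversion your version provides.

import queue
import threading

from pyq import q

shared = queue.Queue()

def worker():
    # Receives plain Python ints; no K object ever crosses the thread boundary.
    print(shared.get())

t = threading.Thread(target=worker)
t.start()

k_vec = q('1 2 3')                       # K object created on the main thread
shared.put([int(x) for x in k_vec])      # convert to native Python before sharing
t.join()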

abalkin commented 6 years ago

@zak-b2c2 - your redis.q script attempts to hopen a handle to port 6006. What process is listening on this port? An unmodified redis server does not accept kdb+ connections.

zak-b2c2 commented 6 years ago

@abalkin - Thanks for the quick update. From what I can see in the code, not much is being shared between threads. And as you can see for yourself, I make sure to redefine d as data in 'upd'.

I've also changed:

                        qmsg = K.string(msg)
                        q('upd', self.kdb_table, qmsg)

to

                        q('upd', self.kdb_table, msg)

so that the data is never cast to a K object.

As for port 6006, it is a simple q process that gets updates from the redis.q process

tbl:([] msg:());
upd:{x insert enlist enlist y}; / y is whatever is sent from upstream
abalkin commented 6 years ago

Thanks for the missing part, but I still cannot reproduce your problem. For me, your redis.q script hangs after printing "Subscribing" and after a few minutes prints a connection error:

$ q redis.q
KDB+ 3.5 2017.06.19 Copyright (C) 1993-2017 Kx Systems
m32/ 20()core 65536MB a **** NONEXPIRE

98033i
q)subscribe_redis("quote*";"quote")
Subscribing
'ConnectionError
  [0]  subscribe_redis("quote*";"quote")
       ^

Note that when you call q('upd', self.kdb_table, msg), msg is implicitly cast to a q symbol, which is probably not what you want.
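An illustrative check of that conversion (my sketch, assuming pyq's rule that a bare Python str crosses into q as a symbol while K.string gives an explicit char vector; verify against your pyq version):

from pyq import q, K

q.x = 'quote'                 # implicit conversion of a Python str
q.y = K.string('quote')       # explicit char-vector conversion
print(q('type each (x;y)'))   # expected: -11h (symbol) vs 10h (char vector)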

I am running a redis server on the default 6379 port. How do you run the redis server?

abalkin commented 6 years ago

Aha, I missed the

redis_client=redis.StrictRedis(host='XX.XXX.XXX.XXX', port=6399, db=0)

part. Let me restart redis on port 6399.

zak-b2c2 commented 6 years ago

Sorry, yes I blurred out the host and port for security reasons. It's an in-house Redis server

abalkin commented 6 years ago

OK, let me try 127.0.0.1 ...

abalkin commented 6 years ago

Now it looks like I am getting a connection but still no crash:

q)subscribe_redis("quote*";"quote")
Subscribing
`quote*:quote
q)

Apparently, there is something on your redis server that causes the problem. Can you reproduce the issue using a local redis instance with a minimal setup that you can share?

zak-b2c2 commented 6 years ago

Two things:

zak-b2c2 commented 6 years ago

Ok I've managed to reproduce on a local instance

I have 3 q processes with different functionalities:

27814i
q)res:subscribe_redis("quote*";"quote")
Subscribing
q)

- Process 3 - "Tickerplant"

$ q
q)tbl:([] msg:());
q)upd:{x insert enlist enlist y};


And the error raised on Process 2

q)Sorry, this application or an associated library has encountered a fatal error and will exit.
If known, please email the steps to reproduce this error to tech@kx.com
with a copy of the kdb+ startup banner.
Thank you.
/home/zakariyya/q/q64/l64/q() [0x47a8b1]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x11390) [0x7f8a1de58390]
/home/zakariyya/q/q64/l64/q(r0+0) [0x41bd40]
/home/zakariyya/q/q64/l64/q() [0x408a9c]
/home/zakariyya/q/q64/l64/q() [0x40fb6f]
/home/zakariyya/q/q64/l64/q() [0x4042c8]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7f8a1da9d830]
/home/zakariyya/q/q64/l64/q() [0x4043a1]
rlwrap: warning: q crashed, killed by SIGSEGV (core dumped).
rlwrap itself has not crashed, but for transparency,
it will now kill itself with the same signal

warnings can be silenced by the --no-warnings (-n) option
Segmentation fault (core dumped)

abalkin commented 6 years ago

Unfortunately, without being able to reproduce your issue, I don't think we can help you. A few general remarks:

  1. Your redis.q process sends asynchronous updates to a separate kdb+ instance and then sleeps for a fraction of a second. Why not use synchronous updates?

  2. Do you need to run a Redis client and kdb+ in separate processes? Why not have the redis.q process manage the kdb+ tables directly? (A rough sketch follows after this list.)

  3. Do you need multiple threads? Take a look at the asyncio framework or its Python 2 precursors.
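For remark 2, a rough sketch of what keeping the table in the listener's own process might look like (my illustration, not code from this thread; the table layout is copied from the "Tickerplant" snippet above and local_upd is a hypothetical name):

from pyq import q

# Keep the table in the same process as the Redis listener,
# so no hopen/neg[hndl] IPC is needed.
q('tbl:([] msg:())')

def local_upd(msg):
    # Same shape as the Tickerplant's upd: {x insert enlist enlist y}
    q('{`tbl insert enlist enlist x}', msg)

q.local_upd = local_upd

The listener loop would then call local_upd(m) directly instead of sending m over a handle.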

abalkin commented 6 years ago

Sorry, I missed your post. I'll try to reproduce your steps.

abalkin commented 6 years ago

Still no crash. On the "Tickerplant", I get

q)count tbl
9500
q)count tbl
9817
q)count tbl
9983
q)count tbl
10645
q)count tbl
11032
q)-5#tbl
msg
----------------------------------------------------------------
`type`pattern`channel`data!("pmessage";"quote*";"quote";"Hello")
`type`pattern`channel`data!("pmessage";"quote*";"quote";"Hello")
`type`pattern`channel`data!("pmessage";"quote*";"quote";"Hello")
`type`pattern`channel`data!("pmessage";"quote*";"quote";"Hello")
`type`pattern`channel`data!("pmessage";"quote*";"quote";"Hello")

I'll check if it behaves differently with Python 2.7 and the released version of PyQ. My current setup is

$ pyq --versions
PyQ 4.2.0.dev141+b6bfbfa
NumPy 1.14.0
KDB+ 3.5 (2017.06.19) m32
Python 3.6.4 (default, Jan  8 2018, 14:03:33)
[GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.39.2)]
abalkin commented 6 years ago

Same story with

$ pyq --versions
PyQ 4.1.3
NumPy 1.14.2
KDB+ 3.5 (2017.06.19) m32
Python 2.7.14 (default, Jan  8 2018, 14:00:34)
[GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.39.2)]

The "tbl" is happily growing on the "Tickerplant".

q)count tbl
2236
q)count tbl
2400
q)count tbl
2521
zak-b2c2 commented 6 years ago

Thanks a lot for replicating on your end. Will try with Python 3

Two things:

zak-b2c2 commented 6 years ago

I've just tried on a clean EC2 instance, using both Python 2 and Python 3 following the installation guide

Python 2

(py2) ubuntu@ip-XX-X-X-XX:~$ q redis.q 
KDB+ 3.5 2018.02.26 Copyright (C) 1993-2018 Kx Systems
l64/ 8()core 61440MB ubuntu ip--XX-X-X-XX -XX.X.X.XX EXPIRE 2018.07.01 zak********* KOD #51578

478i
q)res:subscribe_redis("quote*";"quote")
Subscribing
q)Sorry, this application or an associated library has encountered a fatal error and will exit.
If known, please email the steps to reproduce this error to tech@kx.com
with a copy of the kdb+ startup banner.
Thank you.
/home/ubuntu/q/l64/q() [0x47a8b1]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x11390) [0x7f1a6ba3b390]
/home/ubuntu/q/l64/q(r0+0) [0x41bd40]
/home/ubuntu/q/l64/q() [0x408a9c]
/home/ubuntu/q/l64/q() [0x40fb6f]
/home/ubuntu/q/l64/q() [0x4042c8]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7f1a6b680830]
/home/ubuntu/q/l64/q() [0x4043a1]
rlwrap: warning: q crashed, killed by SIGSEGV (core dumped).
rlwrap itself has not crashed, but for transparency,
it will now kill itself with the same signal

warnings can be silenced by the --no-warnings (-n) option
Segmentation fault (core dumped)
(py2) ubuntu@ip--XX-X-X-XX:~$ pyq --versions
PyQ 4.1.3
KDB+ 3.5 (2018.02.26) l64
Python 2.7.12 (default, Dec  4 2017, 14:50:18) 
[GCC 5.4.0 20160609]

Python 3

(py3) ubuntu@ip-XX-X-X-XX:~$ q redis.q 
KDB+ 3.5 2018.02.26 Copyright (C) 1993-2018 Kx Systems
l64/ 8()core 61440MB ubuntu ip-XX-X-X-XX XX.X.X.XX EXPIRE 2018.07.01 zak********* KOD #51578

965i
q)res:subscribe_redis("quote*";"quote")
Subscribing
q)Sorry, this application or an associated library has encountered a fatal error and will exit.
If known, please email the steps to reproduce this error to tech@kx.com
with a copy of the kdb+ startup banner.
Thank you.
/home/ubuntu/q/l64/q() [0x47a8b1]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x11390) [0x7f7c96d89390]
/home/ubuntu/q/l64/q(r0+0) [0x41bd40]
/home/ubuntu/q/l64/q() [0x408a9c]
/home/ubuntu/q/l64/q() [0x40fb6f]
/home/ubuntu/q/l64/q() [0x4042c8]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7f7c969ce830]
/home/ubuntu/q/l64/q() [0x4043a1]
rlwrap: warning: q crashed, killed by SIGSEGV (core dumped).
rlwrap itself has not crashed, but for transparency,
it will now kill itself with the same signal

warnings can be silenced by the --no-warnings (-n) option
Segmentation fault (core dumped)
(py3) ubuntu@ip-XX-X-X-XX:~$ pyq --versions
PyQ 4.1.3
KDB+ 3.5 (2018.02.26) l64
Python 3.5.2 (default, Nov 23 2017, 16:37:01) 
[GCC 5.4.0 20160609]
abalkin commented 6 years ago

I let the setup run for a while and after almost half a million messages, got

q) lost handle to tp
Closing Redis connection...
'snd. OS reports: Bad file descriptor
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python36/3.6.4_2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "<string>", line 24, in run
  File "<string>", line 20, in stopped
AttributeError: 'bool' object has no attribute 'isSet'

There is too much irrelevant code in your redis_thread.p. Please simplify it by getting rid of the classes and leaving just a simple "run" function that you start with a basic Thread instance. If you can reproduce the issue in fewer than 10 lines of code, I will take another look.

I strongly recommend not using threads, but if you must, don't make async calls from the thread: use hndl instead of neg[hndl] in upd, and use pubsub.listen() instead of pubsub.get_message().

zak-b2c2 commented 6 years ago

Thanks for the advice. I've reduced the complexity of the code. Process 1 and Process 3 remain untouched. As for Process 2, I now have the following code, essentially a "dumb" listener:

Sync sending blocks the process too, so I've had to switch to async. I've also changed from get_message() to listen().

r.p code:

$ cat r.p
import sys
import redis
import threading
import cmd
import time

from pyq import q

def callback():
    r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)

    channel = 'quote*'
    p = r.pubsub()
    p.psubscribe(channel)

    print('monitoring channel', channel)
    for m in p.listen():
        if m is None:
            continue
        else:
            q.upd(m)

def start_feed():
    my_thread = threading.Thread(target=callback)
    my_thread.setDaemon(True)
    my_thread.start()

q.start = start_feed
q)\l r.p
q)hndl:hopen 6006
q)upd:{[d] data:d; neg[hndl](`upd;`tbl; data)};
q)start()
q)('monitoring channel', 'quote*')

In parallel, I've spent the last 24 hours trying different Linux builds (AWS r3.2xlarge running Ubuntu 16.04, AWS t2.micro running RHEL 7.4, AWS m4.xlarge running RHEL 7.4, personal desktop running Ubuntu 16.04), different Python versions (3.6.0, 3.5.2, 2.7.12, 2.7.5), q binaries (32-bit and 64-bit) and CPU affinity (taskset on one core or no taskset); the results are mixed:

PyQ 4.1.3
KDB+ 3.5 (2018.02.26) l64
Python 2.7.12 (default, Dec  4 2017, 14:50:18) 
[GCC 5.4.0 20160609]
PyQ 4.1.3
KDB+ 3.5 (2018.02.26) l64
Python 2.7.5 (default, Feb 20 2018, 09:19:12) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)]

FWIW, I have the 64-bit license which sends heartbeat packets to Kx's license daemon, but I doubt that could interfere...

abalkin commented 6 years ago

I've simplified your r.p further as follows:

import redis
import threading
import time

from pyq import q

def run(r, channel):
    p = r.pubsub()
    p.psubscribe(channel)
    print('monitoring channel', channel)
    for m in p.listen():
        assert m is not None
        q.upd(m)

def start():
    t = threading.Thread(target=run,
        args=(redis.StrictRedis(host='127.0.0.1', port=6379, db=0),
              "quote*"))
    t.setDaemon(True)
    t.start()

q.start = start

q)hndl:hopen 6006;
  upd:{[d] data:d; neg[hndl](`upd;`tbl; data)};
  start();

This makes it self-contained and you can run it simply as q r.p. I did get a crash when running under rlwrap:

q: warning: q crashed, killed by SIGSEGV.
q itself has not crashed, but for transparency,
it will now kill itself (without dumping core) with the same signal

warnings can be silenced by the --no-warnings (-n) option

but when I restarted without rlwrap, I no longer saw a crash.

From your report, I see that you also use rlwrap. Please try to reproduce the issue without rlwrap.

zak-b2c2 commented 6 years ago

Thanks for getting back. I've just run as mentioned (without rlwrap). Again seg-fault... :(

(pyq2) zakariyya@zakariyya-PC-2193:~/Work/pyq2$ ./q/l64/q r.p
KDB+ 3.5 2018.02.26 Copyright (C) 1993-2018 Kx Systems
l64/ 4()core 15999MB zakariyya zakariyya-pc-2193 127.0.1.1 EXPIRE 2018.07.01 zak********* KOD #51578

q)hndl:hopen 6006
q)upd:{[d] data:d; neg[hndl](`upd;`tbl; data)};
q)
q)start()
q)('monitoring channel', 'quote*')

q)Sorry, this application or an associated library has encountered a fatal error and will exit.
If known, please email the steps to reproduce this error to tech@kx.com
with a copy of the kdb+ startup banner.
Thank you.
./q/l64/q() [0x47a8b1]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x11390) [0x7fa7bb84e390]
./q/l64/q(r0+0) [0x41bd40]
./q/l64/q() [0x408a9c]
./q/l64/q() [0x40fb6f]
./q/l64/q() [0x4042c8]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7fa7bb493830]
./q/l64/q() [0x4043a1]
Segmentation fault (core dumped)
(pyq2) zakariyya@zakariyya-PC-2193:~/Work/pyq2$ pyq --versions
PyQ 4.1.3
KDB+ 3.5 (2018.02.26) l64
Python 2.7.12 (default, Dec  4 2017, 14:50:18) 
[GCC 5.4.0 20160609]
abalkin commented 6 years ago

OK, after 300,000 messages, I did see a crash. I have two ideas to try next:

  1. Let's take redis out of the picture by replacing get_message() with dict(type='pmessage', ...).
  2. I still blame the async IPC call from the thread. Let's replace neg[hndl] with hndl in the listener's upd function.
abalkin commented 6 years ago

I was successful in reproducing the problem without Redis. The following script requires only a virgin kdb+ instance listening on port 6006:

$ cat r2.p
import threading
import time

from pyq import q

def run():
    while 1:
        q.upd()
        time.sleep(0.001)

def start():
    t = threading.Thread(target=run)
    t.setDaemon(True)
    t.start()

q.start = start

q)h:neg hopen 6006;
  upd:{h({n+::x};1)};
  start();

It reliably crashes after less than 10,000 cycles. At this point we should probably report this to Kx, but I won't be surprised if they respond with the advice to move IPC calls to the main thread.

abalkin commented 6 years ago

... and finally, I have proof that this issue has nothing to do with PyQ:

$ q -s 4
KDB+ 3.5 2018.04.25 Copyright (C) 1993-2018 Kx Systems
m32/ 20()core 65536MB **** NONEXPIRE

q)h:neg hopen 6006
q)upd:{h({n+::x};1)}
q)upd peach til 100000
[1]    74350 segmentation fault (core dumped)  q -s 4

(on the remote end, n got incremented to 4)

I am reassigning this issue to @awilson-kx to see what the Kx team has to say about this behavior.

For completeness, the back trace is

$ lldb -f $VIRTUAL_ENV/q/m32/q -c /cores/core.74350
(lldb) target create "/Users/a/.virtualenvs/3/q/m32/q" --core "/cores/core.74350"
Core file '/cores/core.74350' (i386) was loaded.
(lldb) process status
Process 0 stopped
* thread #1, stop reason = signal SIGSTOP
    frame #0: 0xa754745e libsystem_kernel.dylib`semaphore_wait_trap + 10
libsystem_kernel.dylib`semaphore_wait_trap:
->  0xa754745e <+10>: retl
    0xa754745f <+11>: nop

libsystem_kernel.dylib`semaphore_wait_signal_trap:
    0xa7547460 <+0>:  movl   $0xffffffdb, %eax         ; imm = 0xFFFFFFDB
    0xa7547465 <+5>:  calll  0xa7552f64                ; _sysenter_trap
  thread #2, stop reason = signal SIGSTOP
    frame #0: 0x0000af67 q`___lldb_unnamed_symbol61$$q + 951
q`___lldb_unnamed_symbol61$$q:
->  0xaf67 <+951>: movl   (%eax), %eax
    0xaf69 <+953>: testl  %eax, %eax
    0xaf6b <+955>: jne    0xaf1b                    ; <+875>
    0xaf6d <+957>: jmp    0xad0d                    ; <+349>
  thread #3, stop reason = signal SIGSTOP
    frame #0: 0x0002011a q`r0 + 10
q`r0:
->  0x2011a <+10>: movl   0x4(%eax), %edx
    0x2011d <+13>: testl  %edx, %edx
    0x2011f <+15>: je     0x20140                   ; <+48>
    0x20121 <+17>: movl   0x82ffe(%ebx), %ecx
  thread #4, stop reason = signal SIGSTOP
    frame #0: 0xa754745e libsystem_kernel.dylib`semaphore_wait_trap + 10
libsystem_kernel.dylib`semaphore_wait_trap:
->  0xa754745e <+10>: retl
    0xa754745f <+11>: nop

libsystem_kernel.dylib`semaphore_wait_signal_trap:
    0xa7547460 <+0>:  movl   $0xffffffdb, %eax         ; imm = 0xFFFFFFDB
    0xa7547465 <+5>:  calll  0xa7552f64                ; _sysenter_trap
  thread #5, stop reason = signal SIGSTOP
    frame #0: 0xa754745e libsystem_kernel.dylib`semaphore_wait_trap + 10
libsystem_kernel.dylib`semaphore_wait_trap:
->  0xa754745e <+10>: retl
    0xa754745f <+11>: nop

libsystem_kernel.dylib`semaphore_wait_signal_trap:
    0xa7547460 <+0>:  movl   $0xffffffdb, %eax         ; imm = 0xFFFFFFDB
    0xa7547465 <+5>:  calll  0xa7552f64                ; _sysenter_trap
zak-b2c2 commented 6 years ago

Thanks a lot for taking the time to look into this, and glad to know I wasn't going crazy :sweat_smile:

I've actually already implemented the main-thread solution, which funnily enough needs to make sync calls to the "Tickerplant", otherwise the packets get thrown God knows where. I guess the split-thread solution was an interesting approach to having q and Python run in parallel and manage their own bits (Python for Redis management, q for IPC and reconnecting to the TP...)
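A rough guess at what that main-thread solution might look like (my sketch, not the author's actual code; the poll_redis name and the .z.ts wiring are illustrative assumptions):

import redis

from pyq import q

r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
p = r.pubsub()
p.psubscribe('quote*')

def poll_redis():
    # Runs on q's main thread (e.g. from a .z.ts timer): drain pending messages
    # and forward each one via the q-level upd, which uses a sync handle.
    while True:
        m = p.get_message()
        if m is None:
            break
        if m['type'] in ('message', 'pmessage'):
            q.upd(m)

q.poll_redis = poll_redis

# On the q side (sync IPC, main thread only):
# hndl:hopen 6006; upd:{[d] hndl(`upd;`tbl;d)}; .z.ts:{poll_redis()}; system"t 100"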

Interestingly, the processes are still running on the AWS RHEL 7.4 boxes after 10 hours now (I'll leave them overnight, but it looks like they've passed the threshold of 10,000 cycles).