trbs / pid

Pidfile featuring stale detection and file-locking, can also be used as context-manager or decorator
https://pypi.python.org/pypi/pid/
Apache License 2.0

PID file is not getting created #11

Closed sergani closed 8 years ago

sergani commented 8 years ago

Hello,

I feel I'm doing something wrong, but I checked the source code of pid and couldn't find the issue.

I'm basically doing the following:

try:
    with PidFile(piddir='.'):
        print 'Starting script'
        # do even more stuff that takes quite a lot of time...
except PidFileError:
    print 'Unable to start script - another instance is running!'

I tried piddir = '.', '/tmp/' and quite a few other things, but for some reason the pid file is not being created. I added a simple print statement to the source's __init__.py to show me the filename, and sure enough the filename is correct, but the file is not getting created.

Of course, I'm able to run multiple instances of the script.

Any thoughts?

Many thanks in advance //M

sergani commented 8 years ago

Forgot to mention, I'm using Python 2.7.10.

Thanks //M

trbs commented 8 years ago

Sounds like that should work.

I created a little test running Python 2.7.12 on my MacBook (I don't think it should matter much vs. 2.7.10):

$ cat test.py
from pid import PidFile
import time

with PidFile(piddir='.'):
    time.sleep(60)

When I run the first instance:

$ python test.py

I see the pidfile

$ ls -al test.py.pid
-rw-r--r--   1 trbs  staff      6 Sep  5 22:18 test.py.pid

When I run the second instance

$ python test.py
Traceback (most recent call last):
  File "test.py", line 4, in <module>
    with PidFile(piddir='.'):
  File "/Users/trbs/tmp/pid/pid/__init__.py", line 206, in __enter__
    self.create()
  File "/Users/trbs/tmp/pid/pid/__init__.py", line 170, in create
    raise PidFileAlreadyLockedError(exc)
pid.PidFileAlreadyLockedError: [Errno 35] Resource temporarily unavailable

Could you see if this simplified test case works for you?
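
For readers wondering where the "[Errno 35] Resource temporarily unavailable" in the traceback comes from: it is the error a non-blocking file lock reports when another holder already has the lock (EAGAIN is 35 on macOS and 11 on Linux). The sketch below illustrates that general technique with fcntl.flock on a Unix system. It is written for Python 3, and try_lock is a hypothetical helper made up for this illustration, not the pid library's actual implementation.

```python
import errno
import fcntl
import os
import tempfile


def try_lock(path):
    """Hypothetical helper: open path and take a non-blocking exclusive lock.

    Returns the open file on success, or None if the lock is already held.
    """
    handle = open(path, "a+")
    try:
        fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError as exc:
        handle.close()
        if exc.errno in (errno.EAGAIN, errno.EACCES):
            return None  # another open file already holds the lock
        raise
    return handle


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.pid")
    first = try_lock(path)    # succeeds: nobody holds the lock yet
    second = try_lock(path)   # fails: the lock is held through `first`
    first.close()             # closing the file releases the lock
    third = try_lock(path)    # succeeds again
    outcome = (first is not None, second is None, third is not None)
    third.close()

print(outcome)
```

The lock lives only as long as the file stays open, which is why a pidfile that gets closed early stops protecting against a second instance.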

sergani commented 8 years ago

Hello,

Thanks for the feedback - I tried the simple test case and sure enough, it works correctly. I even tried the following and it works:

from pid import PidFile
import time

try:
    with PidFile(piddir='.'):
        time.sleep(60)
except:
    print 'Can not start'

Should I post my running code?

Thanks //M

trbs commented 8 years ago

Yes please, hopefully we can figure out what goes wrong.

sergani commented 8 years ago

Hello,

Here you go:

from datetime           import datetime
from pid                import PidFile
from threading          import Thread
import helpers.dbconfig as config
import MySQLdb
import os
import paramiko
import socket
import sys
import time

date = datetime.strftime(datetime.now(), '%Y%m%d')   # current date
time = datetime.strftime(datetime.now(), '%H:%M') # current time

# DB configurations 
dbhost = config.db['host']
dbusername = config.db['username']
dbpassword = config.db['password']
dbname = config.db['name']

# DB Tables
tblclusters = config.db['clusters']
tblmsxs = config.db['msxs']
tbligrps = config.db['igrps']
tblreadings = 'igrp_readings_' + date

# print date
# print time

def get_db(host, username, password, db):
    '''Creates a DB connection and cursor'''
    try:
        con = MySQLdb.connect(host=host, user=username, passwd=password, db=db)
        cur = con.cursor()

        return con, cur

    except MySQLdb.Error as e:
        print 'Error %d: %s' % (e.args[0],e.args[1])
        sys.exit(1)

def close_db(db):
    '''Close the DB connection'''
    try:
        db.close()
    except MySQLdb.Error as e:
        print 'Error %d: %s' % (e.args[0],e.args[1])
        sys.exit(1)

def get_igrps_list(cur):
    '''Gets a list of the active IGRPs that the system will query each MSX for'''
    try:
        cur.execute('''select id, name from igrps where active = 1''')
    except MySQLdb.Error as e:
        print 'Error %d: %s' % (e.args[0],e.args[1])
        sys.exit(1)
    if cur.rowcount > 0:
        results = cur.fetchall()
        igrps = {}
        for result in results:
            igrps.setdefault(result[1], result[0])

        return igrps

def add_igrps_readings(cur, table, time, igrpid, igrpcalls, igrpmaxcalls, msxid, clusterid):
    '''Inserts the fetched IGRP reading into the DB'''
    try:
        cur.execute('''
            insert into %s (time, igrp_id, calls, max_calls, msx_id, cluster_id)
            values ("%s", %s, %s, %s, %s, %s)''' 
            % (table, time, igrpid, igrpcalls, igrpmaxcalls, msxid, clusterid)
            )
    except MySQLdb.Error as e:
        print 'Error %d: %s' % (e.args[0],e.args[1])
        db.rollback()
        sys.exit(1)

def get_active_msx_data(cur):
    '''Gets the ID, IP address, port number, username, password, private key, and cluster ID of the
    primary MSX under the repl_master cluster.

    It will return all available MSXs'''

    try:
        cur.execute('''select
        msxs.id, ip_address , port , username , password , private_key, cluster_id
        from msxs join clusters
        on clusters.id = msxs.cluster_id and
        clusters.active = 1 and
        msxs.primary_msx = 1''')

    except MySQLdb.Error as e:
        print 'Error %d: %s' % (e.args[0],e.args[1])
        sys.exit(1)

    if cur.rowcount > 0:
        return cur.fetchall()

def create_ssh_conn(ip, port, username, password, private_key=None):
    '''Create and return an SSH client object.

    Checks if there's a supplied private key file path, and will use it instead of the password.
    If neither is supplied, it'll exit.'''
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    if private_key is not None:
        privatekeyfile = os.path.expanduser(private_key)
        privatekey = paramiko.RSAKey.from_private_key_file(privatekeyfile)
        client.connect(ip, port=port, username=username, pkey=privatekey, timeout=5)
    elif password is not None:
        client.connect(ip, port=port, username=username, password=password, timeout=5)
    else:
        print 'No proper authentication method supplied - exiting!'
        sys.exit(1)
    return client

def add_readings(msx):
    '''The main function, and is the one that gets added to threads to fetch readings
    from the MSXs and insert them into the DB'''

    db, cur = get_db(dbhost, dbusername, dbpassword, dbname)

    # id, IP address, port number, username, password, private key, and cluster ID
    print 'Creating SSH connection for MSX ' + msx[1]
    try:
        sshconn = create_ssh_conn(msx[1], msx[2], msx[3], msx[4], msx[5])
    except socket.timeout:
        print 'Connection timed out :('
        close_db(db)
        sys.exit(1)
    except Exception as e:
        print e
        close_db(db)
        sys.exit(1)

    stdin, stdout, stderr = sshconn.exec_command(
        '''for igrp in %s; do cli igrp lkup $igrp | grep "Iedge Group\|maxCallsTotal\|  CallsTotal"; done''' % bashigrpslist)
    state = 0
    for line in stdout.readlines():
        # print line.rstrip()
        if state == 0:
            igrpname = str(line.rstrip().split()[2])
            state += 1
            continue
        elif state == 1:
            igrpcalls = int(line.rstrip().split()[1])
            state += 1
            continue
        else:
            igrpmaxcalls = str(line.rstrip().split()[1])
            state = 0

        if igrpname in igrpslist:
            # cur, table, time, igrpid, igrpcalls, igrpmaxcalls, msxid, clusterid
            igrpid = igrpslist[igrpname]

            try:
                add_igrps_readings(cur, tblreadings, time, str(igrpid), igrpcalls, igrpmaxcalls, msx[0], msx[6])
                db.commit()
            except MySQLdb.Error as e:
                print 'Error %d: %s' % (e.args[0],e.args[1])
                # Discard all DB changes and exit
                db.rollback()
                close_ssh_conn(sshconn)
                # close_db(db)
                sys.exit(1)

    close_ssh_conn(sshconn)

def close_ssh_conn(conn):
    '''Closes the SSH connection'''
    if conn:
        conn.close()

try:
    with PidFile(piddir='.'):
        print 'Starting Fetchr'

        # Get a DB and Cursor
        db, cur = get_db(dbhost, dbusername, dbpassword, dbname)

        # Get a list of the active MSXs and their data for query
        activemsxs = get_active_msx_data(cur)
        # print activemsxs

        # Create a list of the enabled IGRPs from the DB
        try:
            igrpslist = get_igrps_list(cur)
        except AttributeError:
            print 'There are no IGRPs in the DB - please run igrp_updatr.py first!'
            sys.exit(1)
        # print igrpslist

        # Create a space-separated list of the IGRPs
        bashigrpslist = ''
        for igrp in igrpslist.keys():
            bashigrpslist += igrp + ' '

        # print bashigrpslist

        # Loop over the MSXs
        for msx in activemsxs:
            t = Thread(target=add_readings, args=(msx,))
            t.start()

        close_db(db)

except:
    print 'Unable to start Fetchr - another instance is running!'

What do you think?

trbs commented 8 years ago

I think your use of non-daemon threads is the issue.

Would I be right to say that the most time-intensive part happens in these threads?

Once the loop has called t.start() on each thread and close_db(db) has closed the database connection, the with PidFile(...) block ends and the pidfile is closed, even though the threads are still working.

The main thread of the Python interpreter then finishes, but Python waits for all the other threads to finish before exiting (since they are not daemon threads). That is why the script keeps running without a pidfile.

You should join the threads, or better yet use a thread pool (what if there is a large number of MSXs?).
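
The effect can be reproduced without the pid library. The following is a minimal sketch, assuming Python 3; marker_file is a hypothetical stand-in for a pidfile (it only creates a file on entry and deletes it on exit), and worker and run are names invented for the demonstration. Each worker records whether the file still exists by the time its slow work is done.

```python
import os
import tempfile
import threading
import time
from contextlib import contextmanager


@contextmanager
def marker_file(path):
    """Hypothetical stand-in for a pidfile: create on enter, delete on exit."""
    with open(path, "w") as handle:
        handle.write(str(os.getpid()))
    try:
        yield path
    finally:
        os.remove(path)


def worker(path, seen):
    time.sleep(0.2)  # simulate slow work
    seen.append(os.path.exists(path))  # is the marker still there?


def run(join_inside):
    seen = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.pid")
        with marker_file(path):
            threads = [threading.Thread(target=worker, args=(path, seen))
                       for _ in range(3)]
            for t in threads:
                t.start()
            if join_inside:
                for t in threads:
                    t.join()  # keep the with block open until workers finish
        for t in threads:
            t.join()  # always wait before cleaning up the temp directory
    return seen


not_joined = run(join_inside=False)
joined = run(join_inside=True)
print(not_joined)
print(joined)
```

Without the join inside the block, every worker finds the marker already gone; with it, every worker still sees the file.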

Try the following in your code:

...
        # Loop over the MSXs
        threads = []
        for msx in activemsxs:
            t = Thread(target=add_readings, args=(msx,))
            t.start()
            threads.append(t)

        close_db(db)

        for t in threads:
            t.join()
...

This will force Python to wait for all the threads to finish before closing the PidFile.
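
The thread pool suggested above could look roughly like the sketch below. It assumes Python 3, where concurrent.futures is part of the standard library; add_readings here is a hypothetical stand-in that only sleeps, and run_all and max_workers=4 are illustrative choices rather than anything from the original script.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def add_readings(msx):
    """Hypothetical stand-in for the per-MSX work in the script above."""
    time.sleep(0.05)  # simulate the SSH query and DB insert
    return (msx, threading.current_thread().name)


def run_all(msxs, max_workers=4):
    # Leaving the executor's with block waits for every submitted task,
    # so an enclosing PidFile block would stay open until all work is done.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(add_readings, msxs))


results = run_all(["msx-%d" % i for i in range(10)])
worker_names = {name for _, name in results}
print(len(results), len(worker_names))
```

The pool caps the number of simultaneous connections no matter how many MSXs are configured, and it removes the need to collect and join the threads by hand.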

sergani commented 8 years ago

Many thanks for the tip - your proposed solution helped and now I see the .pid file, and a second instance refuses to start as long as the PID file is there. This is an internal solution for some of our systems, so the number of MSXs will not be large enough for us to worry about.

Thanks again! //M