PetrGlad / python-prevayler

Prevayler implemented in Python
ISC License
6 stars 2 forks source link

HybridLog: support relational queries #2

Closed mbucc closed 12 years ago

mbucc commented 13 years ago

Some data is a better fit for SQL.

For example, I need to be able to count the number of events on any date (or date range), in any venue (or venue combination), in any category (or category combination). N!/(N-K)! gets big pretty fast---so, too many states to store in memory. But SQL is perfect for this.

Attached HybridLog uses in-memory SQLite database to keep with the spirit of prevayler. Snapshots just dump the whole db to disk. Can even handle schema changes as an event. :)

class HybridLog(Log):
        SQLDUMP_SUFFIX = "sql"
        SCHEMA_FILENAME = "schema.sql"

        def __init__(self, dataDir):
                Log.__init__(self, dataDir)
                self.conn = None

        def make_sql_filename(self, serialid):
                basename = "%u.%s" % (serialid, self.SQLDUMP_SUFFIX)
                return os.path.join(self.dataDir, basename)

        def make_schema_filename(self):
                return os.path.join(self.dataDir, self.SCHEMA_FILENAME)

        def loadInitState(self, initStateConstructor):          
                (initialState, replayLog) = \
                    Log.loadInitState(self, initStateConstructor)

                (self.serialid, snapshot_id, loglist) = self.findPieces()               
                if snapshot_id:
                        sqlfn = self.make_sql_filename(snapshot_id)
                else:
                        sqlfn = self.make_schema_filename()
                fp = open(sqlfn, 'r')
                s = fp.read()
                fp.close()
                self.conn = sqlite3.connect(":memory:")
                self.conn.executescript(s)
                return (initialState, replayLog)

        def putSnapshot(self, root):
                Log.putSnapshot(self, root)
                fn = self.make_sql_filename(self.serialid)
                with open(fn, 'w') as fp:
                    for line in self.conn.iterdump():
                        fp.write('%s\n' % line)
                fp.close()

        def close(self):
                Log.close(self)
                self.conn.close()
                self.conn = None
mbucc commented 13 years ago

Though this design requires call() to take third param---the db connection.

I didn't want to persist the connection as state in root. (I should probably fork and then give you a patch---but I will wait to see if you like the idea ...)

PetrGlad commented 12 years ago

I think better implementation can use an object persistent system that holds connection to sqlite and execute prevayler transactions with that object as parameter. As a rule of thumb I prefer not to use subclassing if composition suffices. So implementation for this approach may look like:

psys = PSys(Log(dataDir), InMemorySqLite())
psys.exe(SqlTn("create table graph (a int, b int, weight float)"))
psys.exe(SqlTn("insert into graph (a, b, weight) values (1, 2, 0.5)"))

where InMemorySqLite constructor initializes connection to empty in-memory db and SqlTn.__call__ executes update on that connection using normal python's db api. To support snapshots InMemorySqLite.__getstate__() and __setstate__() should be overridden to save and restore state of whole db.

Is this appropriate for you?

mbucc commented 12 years ago

I agree re: composition.

I need to think some on this. Thinking out loud ...

(1) If a transaction updates both log and sqlite, don't they both belong inside one exe() call so read operations are consistent. It's possible that a read occurs between the two locks (the two exe() calls).

(2) Some transactions update both the SQL state and the in-memory state. For example, a cache---if key not in root, get data from SQL and load into root[key], otherwise return root[key]. (Although so far, I haven't needed caching as in-memory SQLite is plenty fast.)

(3) How could this work with a distributed system where multiple PSys instances are kept in sync by message passing?

By the way, I find this Martin Fowler article as a good reference for the topic of prevalyer http://martinfowler.com/eaaDev/EventSourcing.html.

What did you read that motivated you to write this code?

mbucc commented 12 years ago

I think db connection is state.

I wonder if I can right transaction code that simply uses root['db_connection'] and then we just use Log as is.

PetrGlad commented 12 years ago

It's up to you how you organize transactions however prevalyer (both java, and this one) is so extremely simple due to some constraints on your application. I suggest you reading available examples on sources of java version (http://prevayler.org). One of these requirements is that your application is assumed to be tingle threaded (or at least that part that operates on prevayler objects should be in single thread - let's call it an agent). Reads can be also executed as transactions. But we need some way to avoid writing RO transactions to log (no need to save them), for example by adding Psys.read method.

You may send log to other instance (slave) to replay it there. But you asking for too much complexity if you need multi-master updates.

Ok in more detail if there was something unclear:

from pv.core import PSys, Log
from pysqlite2 import dbapi2 as sqlite
import os, time

class Root:
    def __init__(self):
        self.attributes = {}
        self.dbConn = sqlite.connect(":memory:")        
        self.createInitialSchema()

    def createInitialSchema(self):
        "Alternatively it can also be a separate transaction"
        cur = self.dbConn.cursor()
        cur.execute("create table props(name string, value string);")

    def __getstate__(self, value):        
        value = {"attributes": self.attributes,
                 "dbState" : self.dbConn.iterdump()}
        return value

    def __setstate__(self, value):
        self.attributes = value["attributes"]        
        self.dbConn = sqlite.connect(":memory:")
        for sqlCmd in value["dbState"]:
            self.dbConn.cursor().execute(sqlCmd) 

class DbTx:
    def __init__(self, name, val):
        self.name = name
        self.val = val    

    def __call__(self, root): 
        cur = root.dbConn.cursor()
        cur.execute("insert into props (name, value) values (?, ?)", [self.name, self.val])
        root.attributes["dirty"] = True

def someFn(root):
    "Simple operation"    
    root.attributes["dirty"] = False

def showProps(root):
    cur = root.dbConn.cursor()
    return cur.execute("select * from props").fetchall()

if __name__ == "__main__":
    dataDir = "./data"
    if not os.path.isdir(dataDir):
        os.makedirs(dataDir)
    psys = PSys(Log(dataDir), lambda : Root())
    psys.exe(DbTx("level", time.time()))
    psys.exe(someFn)
    print "props", showProps(psys.root)
    print "attributes", psys.root.attributes
    psys.log.close()

You can at most save a description of how to re-establish connection (e.g. connection uri). Connection itself must not be persisted since next time you load it it will be invalid anyway.

mbucc commented 12 years ago

Ok, I get it---define my own Root object. It's a better idea; very nice. I will try it out.

Re: RO transactions, I just grab psys.root instance and read what I need. so for me, no code change is needed to support this.

Re: single-threaded writer, yes, makes sense too. so, writes just have to be fast enough to meet any peak load. should not be too hard, as logging to file and updating in-memory sqlite3 are both really fast.

Closing.

PetrGlad commented 12 years ago

And since you've asked. A the time I wrote this I needed to solve some performance problems with a billing system at work and looked for alternatives to a very expensive server+DBMS. So this was an experiment in that direction.