frenetic-lang / pyretic

The Pyretic language and runtime system
http://frenetic-lang.org/pyretic/

Issue 3 - dynamic policy does not destroy previous versions of policy #29

Closed danieledc closed 10 years ago

danieledc commented 10 years ago

Hello everyone, I am Daniele De Cicco, a master's student in Computer Engineering at the University of Naples (Italy). Since November I have been working on my master's thesis at the Catholic University of Louvain (Belgium). I am developing my project with Pyretic and have recently run into three different anomalies:

If I create a dynamic policy that changes periodically and installs a new query each time, the thread associated with the previous version of the policy is not destroyed but keeps running.

This code clearly exposes the problem:

from pyretic.lib.corelib import *
from pyretic.lib.std import *
from pyretic.lib.query import *
from threading import Thread, current_thread
from time import sleep

def count_packets_printer(counts):
    print '\n'
    print current_thread()
    print '----SWITCH MONITOR------'
    print counts

def port_monitoring(dst_port,time_interval):
    q=count_packets(interval=time_interval,group_by=['srcip','dstip']) 
    q.register_callback(count_packets_printer)
    return (match(dstport=dst_port) >> q)

class test(DynamicPolicy):
    def __init__(self):
        super(test,self).__init__(false)
        self.start()

    def start(self):
        x = 75
        while True:
            self.policy = port_monitoring(x, 5)
            x += 1
            sleep(4)

def main():
    return test()

I hope I was clear enough. Thanks in advance, Daniele
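
The behavior reported above can be reproduced without Pyretic. The following is a minimal sketch under stated assumptions: `PeriodicCounter` and `leaked_threads` are hypothetical names, not part of Pyretic, and the class only imitates a query object that starts one timer thread per instance.

```python
import threading
import time


class PeriodicCounter(object):
    """Hypothetical stand-in for a query that reports on a timer (not Pyretic code)."""

    def __init__(self, interval):
        self.interval = interval
        # Each instance starts its own worker thread on construction.
        self.thread = threading.Thread(target=self._report_forever)
        self.thread.daemon = True  # do not block interpreter exit
        self.thread.start()

    def _report_forever(self):
        # Nothing ever tells this loop to end.
        while True:
            time.sleep(self.interval)


def leaked_threads(replacements, interval=0.05):
    """Replace the 'current' counter several times and return how many
    extra threads are still alive afterwards."""
    before = threading.active_count()
    current = None
    for _ in range(replacements):
        # Rebinding drops the old object, but its worker thread keeps running.
        current = PeriodicCounter(interval)
    return threading.active_count() - before
```

Calling `leaked_threads(3)` replaces the counter three times and reports three surviving threads, which mirrors one extra thread per policy update.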

robertsoule commented 10 years ago

Hi Daniele, do you have an email address where I can contact you off of this forum? I'd like to ask you a few questions about your project.

joshreich commented 10 years ago

So, I'm not sure this is really a problem so much as something we should think about. You are correct that each time a new count_packets is created it spawns a new thread. When a count_packets instance is removed from the current policy, it is disconnected, not destroyed, so the thread hangs around even though it doesn't do anything. It shouldn't use much in the way of resources, though having dozens or hundreds of these hanging around probably isn't a great idea. We could easily end the threads of count_packets instances that are disconnected. The only issue is what happens if you write a policy that creates a count_packets instance, stops using it, and then starts using that same instance again. We'd need to hook up logic to respawn the thread and make sure all of the state (e.g., bucket counters) remains the same. I don't think this is conceptually difficult, but the details will need to be implemented and there might be something I'm overlooking. Since this is part of the querying library, I'm assigning to @ngsrinivas .
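
One way the stop-and-respawn idea could look is sketched below. This is not the Pyretic implementation: `RestartableCounter` and all of its methods are hypothetical, and the sketch assumes the counters live on the object rather than in the thread, so they survive a stop and a later restart.

```python
import threading


class RestartableCounter(object):
    """Hypothetical counter whose worker thread can be stopped and respawned."""

    def __init__(self, interval, callback):
        self.interval = interval
        self.callback = callback
        self.counts = {}  # state lives on the object, not in the thread
        self._lock = threading.Lock()
        self._stop_event = None
        self._thread = None

    def record(self, key):
        with self._lock:
            self.counts[key] = self.counts.get(key, 0) + 1

    def _run(self, stop_event):
        # Event.wait returns False on timeout and True once the event is set.
        while not stop_event.wait(self.interval):
            with self._lock:
                snapshot = dict(self.counts)
            self.callback(snapshot)

    def start(self):
        if self.is_running():
            return
        self._stop_event = threading.Event()  # fresh event for each worker
        self._thread = threading.Thread(target=self._run,
                                        args=(self._stop_event,))
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        if self._thread is None:
            return
        self._stop_event.set()  # wakes the worker immediately
        self._thread.join()
        self._thread = None

    def is_running(self):
        return self._thread is not None and self._thread.is_alive()
```

With this shape, a policy that drops the counter would call `stop()`, and one that reuses the same instance would call `start()` again; `counts` is untouched in between.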

mcanini commented 10 years ago

Well, maybe the developer knows what's best to do. In our case, we'd like to be able to terminate the thread once the query becomes disconnected. Is there a way to get hold of a handle to it?

joshreich commented 10 years ago

I just pushed a small update to the query library (on a branch named after you: https://github.com/frenetic-lang/pyretic/tree/marco) that should let you do this.

Try changing the above code to:

def start(self):
    x = 75
    while True:
        old_pol = self.policy
        self.policy = port_monitoring(x, 5)
        q = ast_fold(add_query_sub_pols, set(), old_pol)
        q.stop()
        x += 1
        sleep(4)

ngsrinivas commented 10 years ago

@joshreich Neat! This commit can be merged into master IMO.

danieledc commented 10 years ago

Good job guys. It works properly with the code I posted.

I also tried this:

    def start(self):
        x=75
        self.policy = port_monitoring(x,5)
        while(x<77):
            old_pol = self.policy
            self.policy = port_monitoring(x,5)
            q = ast_fold(add_query_sub_pols,set(),old_pol)
            q.stop()
            x +=1
            sleep(4)
        old_pol = self.policy
        self.policy = drop
        q =ast_fold(add_query_sub_pols,set(),old_pol)
        q.stop()

and I noticed that the thread of the last query (port_monitoring(76,5)) is not killed. In other words, the solution works fine only when a query is replaced by another query.

Do I need to open a new issue about it?

Thanks in advance, Daniele

joshreich commented 10 years ago

Hi Daniele,

Sorry for the delayed response. How did you instrument the code posted above? If you checked immediately after stop() was called on the last query's thread, there's a very high chance you would see that it is still running, since the current implementation periodically polls self.running (which is set by stop()). You need to give it a couple of seconds. I've pushed an example with instrumentation to the marco branch (it also has a couple of fixes). Take a look and see if this addresses your problem.

-Josh
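
The timing effect described above can be sketched in isolation. This is a hypothetical illustration, not the query library's code: `PollingWorker` and `check_after_stop` are made-up names, and the sketch assumes that `stop()` simply sets `running` to `False` and that the worker re-reads the flag once per poll interval.

```python
import threading
import time


class PollingWorker(object):
    """Hypothetical worker that re-checks a 'running' flag once per interval."""

    def __init__(self, poll_interval):
        self.poll_interval = poll_interval
        self.running = True
        self.started_polling = threading.Event()
        self.thread = threading.Thread(target=self._loop)
        self.thread.daemon = True
        self.thread.start()

    def _loop(self):
        while self.running:
            self.started_polling.set()
            # The flag is only re-read after this sleep returns.
            time.sleep(self.poll_interval)

    def stop(self):
        # Assumption: stop() only requests shutdown; it does not wait.
        self.running = False


def check_after_stop(poll_interval=0.3):
    """Return (alive right after stop(), alive after waiting for the thread)."""
    worker = PollingWorker(poll_interval)
    worker.started_polling.wait()      # make sure the loop is already sleeping
    worker.stop()
    alive_immediately = worker.thread.is_alive()
    worker.thread.join(poll_interval * 5)  # give the worker time to notice
    alive_after_wait = worker.thread.is_alive()
    return alive_immediately, alive_after_wait
```

A check made right after `stop()` still sees the thread alive because it is in the middle of its sleep; joining with a timeout (or simply waiting a little longer than the poll interval) gives a reliable answer.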

danieledc commented 10 years ago

Hi Josh, sorry for the late reply. I have followed the example you pushed and now my code works properly. Thank you very much.

Regards, Daniele

joshreich commented 10 years ago

excellent, glad to hear it.