acsicuib / YAFS

Yet Another Fog Simulator (YAFS)
MIT License

Preemption of tasks #59

Closed Sherlocked556 closed 2 years ago

Sherlocked556 commented 2 years ago

Hi @wisaaco, does the simulator provide a way to preempt tasks and resume them later? I would like to implement the following: a fog node is executing a task that is partially complete when I deploy another task of higher priority (I distinguish priorities by message type) on the same node. The node should run the high-priority task first and then complete the remainder of the original task.

wisaaco commented 2 years ago

Hi @Sherlocked556,

It is not trivial, but it is possible. You need to modify a couple of functions in the core package. All messages are consumed from a queue identified by application id, module id (service), and instance id. In your case, each priority type should have its own queue. The point where messages are currently consumed is:

msg = yield self.consumer_pipes["%s%s%i" % (app_name, module, ides)].get()  # line 550, core.py

At that point (line 550) of the code, the message with the highest priority should be selected, so you need to implement an ordered selection of messages by priority. To sum up, the message needs a new attribute, e.g. a priority type, and that attribute has to be checked whenever the message is inserted into a pipe or consumed. An example of insertion is at line 241 of core.py. The interrupted task/message can be reinserted into its corresponding queue, with another new attribute indicating its remaining execution time.
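A minimal sketch of the ordered selection described above, written with the standard library only. `QueuedMessage`, `PriorityPipe`, and the field names `priority` and `remaining_time` are hypothetical and are not part of YAFS; the convention that a lower number means higher priority is also an assumption.

```python
import heapq
import itertools
from dataclasses import dataclass, field


@dataclass(order=True)
class QueuedMessage:
    # Hypothetical message wrapper; these are not YAFS attributes.
    priority: int                               # assumption: lower value is served first
    seq: int                                    # arrival order, keeps FIFO among equal priorities
    name: str = field(compare=False)
    remaining_time: float = field(compare=False)


class PriorityPipe:
    """Queue that always hands out the highest-priority message first."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()

    def put(self, name, priority, remaining_time):
        item = QueuedMessage(priority, next(self._counter), name, remaining_time)
        heapq.heappush(self._heap, item)

    def get(self):
        return heapq.heappop(self._heap)

    def __len__(self):
        return len(self._heap)


pipe = PriorityPipe()
pipe.put("sensor-data", priority=5, remaining_time=10.0)
pipe.put("alarm", priority=1, remaining_time=3.0)
# An interrupted message is reinserted with whatever service time it has left.
pipe.put("sensor-data-resumed", priority=5, remaining_time=6.0)

order = [pipe.get().name for _ in range(len(pipe))]
print(order)
```

SimPy also ships a `PriorityStore` that could play a similar role inside a simulation process; whether it can replace the entries of `consumer_pipes` without further changes has not been checked here.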

Regarding the analysis of the results, I think the easiest way is to keep generating the traces in the file as before. Using pandas, you can find out which traces were interrupted and calculate the response time of the different messages, with or without priority. Best
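One way such an analysis might look, as a sketch only. It assumes that an interrupted message writes one trace row per execution slice under the same message id, and the column names (`id`, `message`, `priority`, `time_emit`, `time_in`, `time_out`) are illustrative; check them against the CSV your modified simulator actually produces.

```python
import io

import pandas as pd

# Made-up trace data standing in for the simulator's results file.
csv_text = """id,message,priority,time_emit,time_in,time_out
1,M.low,5,0.0,0.0,4.0
2,M.high,1,4.0,4.0,7.0
1,M.low,5,0.0,7.0,13.0
"""
traces = pd.read_csv(io.StringIO(csv_text))

# One row per message: number of execution slices, first emission, last completion.
per_msg = traces.groupby("id").agg(
    message=("message", "first"),
    slices=("time_in", "size"),
    emitted=("time_emit", "min"),
    finished=("time_out", "max"),
)

# Assumption: more than one slice means the message was interrupted at least once.
per_msg["interrupted"] = per_msg["slices"] > 1
per_msg["response_time"] = per_msg["finished"] - per_msg["emitted"]
print(per_msg)
```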

sanket099 commented 2 years ago

Hi @wisaaco, a quick question: how can a message be interrupted and reinserted into the queue in core.py? Which lines or functions would be responsible for that?

wisaaco commented 2 years ago

Controlling the interruption of a DES process within the simulator requires incorporating more events. Let's take it one step at a time.

Each DES process that represents an "app-module-instance" simulates the service time with a simple timeout event. In our case, it is in line 591:

yield self.env.timeout(service_time)

This "app-module-instance" (aka. DES process) will become available to "execute" another task when this service time has elapsed. We need to incorporate more events to break this timeout condition.

Let me introduce a small example to illustrate the new functionality you need. It uses some features of the simpy library, which YAFS relies on to manage the simulation.

import simpy

env = simpy.Environment()

interruption = env.event()

def my_proc1(env):
    while True:
        print("On proc1")
        print(env.active_process)  # prints the Process object running my_proc1
        yield env.timeout(10) | interruption  # Magic point: resume on whichever event fires first
        print("I'm waking up at time:", env.now)

def my_proc2(env):
    while True:
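        # wait 2 time units
        yield env.timeout(2)
        print("On proc2")
        print(env.active_process)  # prints the Process object running my_proc2
        # A simpy event can be triggered only once; a second succeed() raises RuntimeError
        if not interruption.triggered:
            interruption.succeed()  # trigger the interruption event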

p1 = env.process(my_proc1(env))
p2 = env.process(my_proc2(env))

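# Let's run this simulation one event at a time

# Two events at time 0: the start of proc1 and the start of proc2
env.step()  # starts proc1, which runs until its first yield
print(env.now)
env.step()  # starts proc2, which runs until its first yield
print(env.now)

# One event at time 2:
env.step()  # proc2's timeout fires and proc2 triggers the interruption
print(env.now)

# The previous step scheduled the "interruption" event, also at time 2
env.step()  # processes the interruption event
env.step()  # same time (2): proc1 wakes up before its 10-unit timeout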

In this code, we run two DES processes (proc1 and proc2), each with a different timeout: proc1 waits 10 time units per iteration and proc2 waits 2. However, proc2 can break the timeout condition of proc1 by triggering the interruption event. Try running this code by invoking "env.step()" sequentially to see the result.

Going back to the YAFS simulator, one way to implement this mechanism is to have one interrupt event per "app-module-instance" process. This event can be triggered each time a message is transmitted to the module (def __network_process). When the event is triggered, the consuming module (def __add_consumer_module) has to decide which message to execute: the current one or the new one with higher priority. It is not trivial, but it is possible.

Initially, we designed YAFS as an M/M/1/FIFO model... but obviously, it can integrate more scheduling models.

wisaaco commented 2 years ago

I forgot to address two points of the question: tracking the remaining execution time and requeueing the task. Each message should have a new attribute recording the time already computed. The time at which to requeue it is defined in the variable self.last_busy_time = {} # dict(zip(edges, [0.0] * len(edges))). You should check this value to put the message in the "future list of events".