JuliaDynamics / DiscreteEvents.jl

Discrete event generation and simulation in Julia
MIT License

Preempting jobs in a queue #53

Open hdavid16 opened 1 year ago

hdavid16 commented 1 year ago

When a job of higher priority enters a queue (i.e., a PriorityQueue), we should have an option to preempt the job that is currently in service. If a preemption occurs, the time the active job has already spent in service is stored and the job is moved back into the queue. It may be possible to do this with interrupt!; we may just need to sit down and write an example of how to do it.

hdavid16 commented 1 year ago

Supplementary information:

Here is an example of how to use a PriorityQueue rather than a Channel to model an M/M/1 queue:

using DiscreteEvents, DataStructures, Distributions, Random

mutable struct Client
    id::Int
    arrive::Float64  # arrival time at Server A
    start::Float64  # beginning of service time at Server A
    leave::Float64  # end of service time
    priority::Int # client priority
end

mutable struct Server
    id::Int
    svcdist::Distribution
    tbusy::Float64
end

has_pending_jobs(input) = !isempty(input)
function serve(clk, srvr, input, output, limit)
    # client = take!(input) # use this if priorities don't matter (queue is a Channel)
    @wait clk has_pending_jobs(input) # !isempty(input) fails here and needs to be wrapped (the macro should be made more robust)
    client = dequeue!(input)
    st = clk.time
    client.start = clk.time
    @delay clk rand(srvr.svcdist)
    client.leave = clk.time
    srvr.tbusy += clk.time - st
    push!(output, client)
    length(output) == limit && stop!(clk)
end

function arrive(clk, input, count)
    count[1] += 1
    client = Client(count[1], clk.time, 0.0, 0.0, rand(0:10))
    # push!(input, client) # use this if priorities don't matter (queue is a Channel)
    enqueue!(input, client, client.priority)
end

Random.seed!(0)

N = 100
M_serve = Exponential(1)
M_arr = Exponential(1)
count = [0]

clock = Clock()
# input = Channel{Client}(Inf) # use this if priorities don't matter (queue is a Channel)
input = PriorityQueue{Client, Real}()
output = Client[]

srvr = Server(1, M_serve, 0.0)
@event arrive(clock, input, count) every M_arr
@process serve(clock, srvr, input, output, N)
@run! clock 20

hdavid16 commented 1 year ago

Preemption can be done as follows:

using DiscreteEvents, DataStructures, Distributions, Random

mutable struct Client
    id::Int
    arrive::Float64  # arrival time at Server A
    start::Union{Float64,Nothing}  # beginning of service time at Server A (nothing after a preemption)
    leave::Float64  # end of service time
    service::Float64 # time in service
    priority::Int # client priority
end

mutable struct Server
    id::Int
    svcdist::Distribution
    job::Union{Client,Nothing}  # client currently in service, or nothing when idle
    tbusy::Float64
end

struct Preempt end #preemption event

has_pending_jobs(input) = !isempty(input)
function serve(clk, srvr, input, output, limit)
    @wait clk has_pending_jobs(input) #!isempty(input) fails and needs to be wrapped (need to make macro more robust)
    @wait clk isnothing(srvr.job)
    try 
        client = dequeue!(input)
        srvr.job = client
        st = clk.time
        client.start = clk.time
        duration = rand(truncated(srvr.svcdist, client.service, Inf)) - client.service # sampled remaining processing time, given the service already received
        @delay clk duration
        client.leave = clk.time
        srvr.tbusy += clk.time - st
        push!(output, client)
        srvr.job = nothing
        length(output) == limit && stop!(clk)
    catch exc
        if exc isa PrcException && exc.event isa Preempt
            println("Preempted client $(srvr.job.id) at $(tau(clk))")
            if !isnothing(srvr.job.start) # update service time only if the job has a start time
                elapsed = clk.time - srvr.job.start
                srvr.job.service += elapsed
                srvr.tbusy += elapsed # count the interrupted service spell as busy time
            end
            srvr.job.start = nothing
            enqueue!(input, srvr.job, srvr.job.priority)
            srvr.job = nothing
        else
            rethrow(exc)
        end
    end
end

function arrive_preempt(clk, srvr, srv_prc, input, count, M_arr, limit)
    count[1] += 1
    client = Client(count[1], clk.time, 0.0, 0.0, 0.0, rand(0:10))
    enqueue!(input, client, client.priority)
    if !isnothing(srvr.job) && client.priority < srvr.job.priority
        prc = clk.processes[srv_prc]
        interrupt!(prc, Preempt(), nothing)
    end
    @delay clk rand(M_arr)
    count[1] == limit && stop!(clk)
end

Random.seed!(0)

N = 100
M_serve = Exponential(1)
M_arr = Exponential(1)
count = [0]

clock = Clock()
input = PriorityQueue{Client, Real}()
output = Client[]

srvr = Server(1, M_serve, nothing, 0.0)
srv_prc = @process serve(clock, srvr, input, output, N)
@process arrive_preempt(clock, srvr, srv_prc, input, count, M_arr, N*2)
@run! clock 50

hdavid16 commented 1 year ago

There are issues with DataStructures.jl when it comes to breaking ties: https://github.com/JuliaCollections/DataStructures.jl/issues/498#issuecomment-1527779264

It makes more sense to serve jobs of the same priority in FIFO (first in, first out) order. However, DataStructures.jl does not guarantee this.

pbayer commented 1 year ago

So do you think we should implement this ourselves? How important is that?

I understand, for example, that it could be used to model multitasking. What kind of applications are you thinking of?

hdavid16 commented 1 year ago

I found that you can use a tuple of the priority and the arrival time as the queue priority. Ties between jobs of equal priority are then broken by arrival time, which gives FIFO order. So I don't think we need to implement anything here. I can just add a server multitasking example that uses this to the list of examples.
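
For illustration, here is a minimal sketch of the tuple approach. It assumes the same enqueue!/dequeue! calls from DataStructures.jl used in the examples above. The Job struct and the sample jobs are hypothetical and only serve to show the ordering. Julia compares tuples element by element, so the arrival time only matters when two priorities are equal.

```julia
using DataStructures

# Hypothetical job type for illustration; not the Client struct from the examples above.
struct Job
    id::Int
    priority::Int     # lower value is served first
    arrive::Float64   # arrival time, used only to break ties
end

# The queue priority is the tuple (priority, arrival time).
queue = PriorityQueue{Job, Tuple{Int, Float64}}()

jobs = [Job(1, 5, 0.0), Job(2, 3, 1.0), Job(3, 3, 0.5), Job(4, 5, 0.2)]
for job in jobs
    enqueue!(queue, job, (job.priority, job.arrive))
end

# Drain the queue and record the order in which jobs are served.
served = Int[]
while !isempty(queue)
    push!(served, dequeue!(queue).id)
end
println(served)  # [3, 2, 1, 4]
```

Jobs 2 and 3 share priority 3, and job 3 is served first because it arrived earlier. The same holds for jobs 1 and 4 at priority 5. In the examples above, this would presumably mean declaring the queue with a tuple value type and calling enqueue!(input, client, (client.priority, client.arrive)).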