Support asynchronous logging #40

Closed: ablaom closed this issue 4 months ago

ablaom commented 1 year ago

It seems one cannot log runs to a single experiment asynchronously:

using MLJModels
using MLJBase
using MLFlowClient
using MLJFlow

logger = MLFlowLogger("", experiment_name="white moon")

X, y = @load_iris

using .Threads
model = (@iload DecisionTreeClassifier pkg=DecisionTree)()
# 5

@sync for i in 1:5
    Threads.@spawn evaluate(model, X, y; logger)
end

    nested task error: HTTP.Exceptions.StatusError(400, "POST", "/api/2.0/mlflow/experiments/create", HTTP.Messages.Response:
    HTTP/1.1 400 Bad Request
    Server: gunicorn
    Date: Sun, 24 Sep 2023 20:35:24 GMT
    Connection: close
    Content-Type: application/json
    Content-Length: 95

    {"error_code": "RESOURCE_ALREADY_EXISTS", "message": "Experiment 'white moon' already exists."}""")
      [1] mlfpost(mlf::MLFlow, endpoint::String; kwargs::Base.Pairs{Symbol, Union{Missing, Nothing, String}, Tuple{Symbol, Symbol, Symbol}, NamedTuple{(:name, :artifact_location, :tags), Tuple{String, Nothing, Missing}}})
        @ MLFlowClient ~/.julia/packages/MLFlowClient/Szkbv/src/utils.jl:74
      [2] mlfpost
        @ ~/.julia/packages/MLFlowClient/Szkbv/src/utils.jl:66 [inlined]
      [3] createexperiment(mlf::MLFlow; name::String, artifact_location::Nothing, tags::Missing)                                                                                    
        @ MLFlowClient ~/.julia/packages/MLFlowClient/Szkbv/src/experiments.jl:21
      [4] createexperiment
        @ ~/.julia/packages/MLFlowClient/Szkbv/src/experiments.jl:16 [inlined]
      [5] #getorcreateexperiment#7
        @ ~/.julia/packages/MLFlowClient/Szkbv/src/experiments.jl:103 [inlined]
      [6] log_evaluation(logger::MLFlowLogger, performance_evaluation::PerformanceEvaluation{MLJDecisionTreeInterface.DecisionTreeClassifier, Vector{LogLoss{Float64}}, Vector{Float64}, Vector{typeof(predict)}, Vector{Vector{Float64}}, Vector{Vector{Vector{Float64}}}, Vector{NamedTuple{(:tree, :raw_tree, :encoding, :features), Tuple{DecisionTree.InfoNode{Float64, UInt32}, DecisionTree.Root{Float64, UInt32}, Dict{UInt32, CategoricalArrays.CategoricalValue{String, UInt32}}, Vector{Symbol}}}}, Vector{NamedTuple{(:classes_seen, :print_tree, :features), Tuple{CategoricalArrays.CategoricalVector{String, UInt32, String, CategoricalArrays.CategoricalValue{String, UInt32}, Union{}}, MLJDecisionTreeInterface.TreePrinter{DecisionTree.Root{Float64, UInt32}}, Vector{Symbol}}}}, CV})
        @ MLJFlow ~/.julia/packages/MLJFlow/TqEtw/src/base.jl:2
      [7] evaluate!(mach::Machine{MLJDecisionTreeInterface.DecisionTreeClassifier, true}, resampling::Vector{Tuple{Vector{Int64}, UnitRange{Int64}}}, weights::Nothing, class_weights::Nothing, rows::Nothing, verbosity::Int64, repeats::Int64, measures::Vector{LogLoss{Float64}}, operations::Vector{typeof(predict)}, acceleration::CPU1{Nothing}, force::Bool, logger::MLFlowLogger, user_resampling::CV)                                                          
        @ MLJBase ~/.julia/packages/MLJBase/ByFwA/src/resampling.jl:1314
      [8] evaluate!(::Machine{MLJDecisionTreeInterface.DecisionTreeClassifier, true}, ::CV, ::Nothing, ::Nothing, ::Nothing, ::Int64, ::Int64, ::Vector{LogLoss{Float64}}, ::Vector{typeof(predict)}, ::CPU1{Nothing}, ::Bool, ::MLFlowLogger, ::CV)                           
        @ MLJBase ~/.julia/packages/MLJBase/ByFwA/src/resampling.jl:1335
      [9] evaluate!(mach::Machine{MLJDecisionTreeInterface.DecisionTreeClassifier, true}; resampling::CV, measures::Nothing, measure::Nothing, weights::Nothing, class_weights::Nothing, operations::Nothing, operation::Nothing, acceleration::CPU1{Nothing}, rows::Nothing, repeats::Int64, force::Bool, check_measure::Bool, verbosity::Int64, logger::MLFlowLogger)   
        @ MLJBase ~/.julia/packages/MLJBase/ByFwA/src/resampling.jl:1015
     [10] evaluate(::MLJDecisionTreeInterface.DecisionTreeClassifier, ::NamedTuple{(:sepal_length, :sepal_width, :petal_length, :petal_width), NTuple{4, Vector{Float64}}}, ::Vararg{Any}; cache::Bool, kwargs::Base.Pairs{Symbol, MLFlowLogger, Tuple{Symbol}, NamedTuple{(:logger,), Tuple{MLFlowLogger}}})         
        @ MLJBase ~/.julia/packages/MLJBase/ByFwA/src/resampling.jl:1029
     [11] (::var"#7#8")()
        @ Main ./threadingconstructs.jl:373

<repeats several times>
ablaom commented 1 year ago

@deyandyankov Could this possibly originate from a limitation of MLFlowClient.jl or mlflow itself?

deyandyankov commented 1 year ago

@ablaom was at least one experiment recorded? It seems to me that the first spawned evaluate call created the experiment (white moon), and the other tasks then tried to create an experiment with the same name, which mlflow rejects:

    {"error_code": "RESOURCE_ALREADY_EXISTS", "message": "Experiment 'white moon' already exists."}""")

As far as I can remember, MLFlowClient has no explicit restriction that disables concurrency.
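A minimal sketch of the race described above, runnable without an mlflow server: every task first checks whether the experiment exists and only then creates it, so several tasks can see "missing" and all try to create it. The in-memory SERVER dictionary and the functions get_experiment, create_experiment, get_or_create_unsafe and get_or_create_safe are hypothetical stand-ins, not MLFlowClient's API; the yield() call only widens the race window. Holding a client-side lock across the check and the create is one way to make the pair atomic.

```julia
# Hypothetical in-memory stand-in for the tracking server (illustration only).
const SERVER = Dict{String,Int}()          # experiment name => experiment id
const SERVER_LOCK = ReentrantLock()        # protects SERVER itself, like the server's DB

struct AlreadyExists <: Exception
    name::String
end

# "GET": return the id, or `missing` if there is no such experiment.
get_experiment(name) = lock(SERVER_LOCK) do
    get(SERVER, name, missing)
end

# "POST": fails if the name is taken, mimicking RESOURCE_ALREADY_EXISTS.
function create_experiment(name)
    lock(SERVER_LOCK) do
        haskey(SERVER, name) && throw(AlreadyExists(name))
        SERVER[name] = length(SERVER) + 1
    end
end

# Unsafe check-then-create: another task can run between the two calls.
function get_or_create_unsafe(name)
    id = get_experiment(name)
    yield()                                # widen the race window for the demo
    return ismissing(id) ? create_experiment(name) : id
end

# Safe version: a client-side lock makes check-then-create atomic.
const CLIENT_LOCK = ReentrantLock()
get_or_create_safe(name) = lock(CLIENT_LOCK) do
    id = get_experiment(name)
    yield()
    ismissing(id) ? create_experiment(name) : id
end

# Run `f(name)` from `n` concurrent tasks and count "already exists" failures.
function count_failures(f, name; n=5)
    failures = Threads.Atomic{Int}(0)
    @sync for _ in 1:n
        @async try
            f(name)
        catch e
            e isa AlreadyExists || rethrow()
            Threads.atomic_add!(failures, 1)
        end
    end
    return failures[]
end

unsafe_failures = count_failures(get_or_create_unsafe, "white moon")
safe_failures = count_failures(get_or_create_safe, "blue moon")
println("unsafe: ", unsafe_failures, " failures; safe: ", safe_failures, " failures")
```

The lock only serializes tasks within one Julia process; separate processes logging to the same server could still race.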

pebeto commented 1 year ago

@deyandyankov Could it be related to the way mlflow handles these pseudo-random names? Maybe it uses the Unix timestamp.

ablaom commented 1 year ago

> was at least one experiment recorded?


If we send a message to the service to create a new experiment, don't we need to block logging until the experiment is created and its name is allocated? I don't see any blocking happening at present. https://docs.julialang.org/en/v1/manual/asynchronous-programming/#Communicating-with-Channels
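One simple form of that blocking, sketched under assumptions: create the experiment once in its own task, and have every logging task fetch that task's result before logging; fetch blocks until the creation task has finished. request_experiment and log_run are hypothetical placeholders for the server calls, with sleep standing in for network latency.

```julia
# Hypothetical stand-ins for the server calls (illustration only).
const CREATE_CALLS = Threads.Atomic{Int}(0)    # how many create requests were sent

function request_experiment(name)
    Threads.atomic_add!(CREATE_CALLS, 1)
    sleep(0.1)                                  # simulate network latency
    return 42                                   # pretend experiment id
end

log_run(experiment_id, run) = (experiment_id, run)

# Create the experiment exactly once, in its own task...
experiment = Threads.@spawn request_experiment("white moon")

# ...and have every logging task block on that result before logging.
runs = Vector{Tuple{Int,Int}}(undef, 5)
@sync for i in 1:5
    Threads.@spawn begin
        id = fetch(experiment)                  # waits until creation has finished
        runs[i] = log_run(id, i)
    end
end
println(runs)
```

This avoids duplicate creation without a queue, but it assumes the experiment name is known before the runs are spawned.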

ablaom commented 12 months ago

Looks like mlflow does not (or at least at one point did not) support asynchronous actions: https://github.com/mlflow/mlflow/issues/1550#issuecomment-1024492066

pebeto commented 12 months ago

In that case, we could generate the random names ourselves, which would solve the naming problem you identified.
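A sketch of that idea, assuming each run may go to its own experiment: append a random UUID to a base name, the same approach MLFlowClient's createexperiment takes (via UUIDs.uuid4()) when no name is passed. unique_experiment_name is a hypothetical helper.

```julia
using UUIDs

# Hypothetical helper: a per-run experiment name that will not collide in practice.
unique_experiment_name(base::AbstractString) = string(base, "-", uuid4())

exp_names = [unique_experiment_name("white moon") for _ in 1:5]
foreach(println, exp_names)
```

Note the trade-off: the runs no longer share one experiment, which is what the original report wanted, so this sidesteps the race rather than fixing it.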

ablaom commented 12 months ago

I'm not sure I follow. Perhaps I misunderstand the problem. It would be great if you could post a PR to test your theory.

pebeto commented 11 months ago

mlflow 2.8.0 was released with experimental asynchronous logging for metrics, params, and tags. Maybe we can revisit this.


ablaom commented 11 months ago

Thanks for flagging the update!

I'm guessing this won't "just work" and that we need to buy into some new messaging, or something?

pebeto commented 9 months ago

This has not been fixed by mlflow yet. In addition to the response you posted, I'm getting four experiments with the same name (which should be impossible).

I suggest we either route requests through something like a queue in MLFlowClient to avoid this kind of issue, or simply disallow concurrency in our project. Below is the code we need to be aware of:

function mlfget(mlf, endpoint; kwargs...)
    apiuri = uri(mlf, endpoint, kwargs)
    apiheaders = headers(mlf, Dict("Content-Type" => "application/json"))
    try
        response = HTTP.get(apiuri, apiheaders)
        return JSON.parse(String(response.body))
    catch e
        rethrow()  # error handling abbreviated in this excerpt
    end
end

function mlfpost(mlf, endpoint; kwargs...)
    apiuri = uri(mlf, endpoint)
    apiheaders = headers(mlf, Dict("Content-Type" => "application/json"))
    body = JSON.json(kwargs)
    try
        response = HTTP.post(apiuri, apiheaders, body)
        return JSON.parse(String(response.body))
    catch e
        rethrow()  # error handling abbreviated in this excerpt
    end
end

function getexperiment(mlf::MLFlow, experiment_id::Integer)
    try
        result = _getexperimentbyid(mlf, experiment_id)
        return MLFlowExperiment(result)
    catch e
        # A 404 means "no such experiment"; any other error is re-raised.
        if isa(e, HTTP.ExceptionRequest.StatusError) && e.status == 404
            return missing
        end
        rethrow()
    end
end

function createexperiment(mlf::MLFlow; name=missing, artifact_location=missing, tags=missing)
    endpoint = "experiments/create"
    if ismissing(name)
        name = string(UUIDs.uuid4())
    end
    # The server answers 400 RESOURCE_ALREADY_EXISTS if `name` is already taken.
    result = mlfpost(mlf, endpoint; name=name, artifact_location=artifact_location, tags=tags)
    experiment_id = parse(Int, result["experiment_id"])
    getexperiment(mlf, experiment_id)
end

function getorcreateexperiment(mlf::MLFlow, experiment_name::String; artifact_location=missing, tags=missing)
    exp = getexperiment(mlf, experiment_name)
    # Not atomic: concurrent tasks can all see `missing` here and all try to create.
    if ismissing(exp)
        exp = createexperiment(mlf, name=experiment_name, artifact_location=artifact_location, tags=tags)
    end
    return exp
end
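Independently of queues or channels, getorcreateexperiment could be made tolerant of the race: attempt the create, and if the server answers that the experiment already exists, fetch it instead. A minimal sketch under assumptions, with an in-memory STORE and hypothetical lookup and create! functions standing in for the HTTP calls; in MLFlowClient the ResourceAlreadyExists check would presumably correspond to inspecting the 400 RESOURCE_ALREADY_EXISTS response shown in the trace at the top.

```julia
# Hypothetical stand-ins for the tracking server (illustration only).
struct ResourceAlreadyExists <: Exception
    name::String
end
const STORE = Dict{String,Int}()       # experiment name => id
const STORE_LOCK = ReentrantLock()     # the "server" enforces unique names

lookup(name) = lock(() -> get(STORE, name, missing), STORE_LOCK)

function create!(name)
    lock(STORE_LOCK) do
        haskey(STORE, name) && throw(ResourceAlreadyExists(name))
        STORE[name] = length(STORE) + 1
    end
end

# Get-or-create that tolerates losing the race to another client.
function getorcreate_tolerant(name)
    id = lookup(name)
    ismissing(id) || return id
    yield()                            # let other tasks interleave, as in the bug
    try
        return create!(name)
    catch e
        e isa ResourceAlreadyExists || rethrow()
        return lookup(name)            # another client created it first
    end
end

tasks = [Threads.@spawn(getorcreate_tolerant("white moon")) for _ in 1:5]
ids = fetch.(tasks)
println(ids)
```

This relies on the server enforcing unique names; the duplicate experiments reported above suggest mlflow may not always do so, so it is a mitigation rather than a guarantee.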

I don't know if we have something like Python async/await in Julia. Do you know someone who can help us with that? @ablaom

ablaom commented 9 months ago

Julia fully supports asynchronous programming: https://docs.julialang.org/en/v1/manual/asynchronous-programming/

What you call a "queue" is called a Channel.
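A minimal sketch of such a Channel-based queue, under assumptions: a single worker task owns the experiment table and handles requests one at a time, and each caller blocks on a private reply channel until its request has been served. CreateRequest, start_experiment_service and getorcreate_queued are hypothetical names, and the Dict stands in for the mlflow server.

```julia
# Hypothetical request: experiment name plus a one-slot channel for the reply.
struct CreateRequest
    name::String
    reply::Channel{Int}
end

# One worker task serves requests in order, so get-or-create never interleaves.
function start_experiment_service()
    requests = Channel{CreateRequest}(32)
    experiments = Dict{String,Int}()               # only the worker touches this
    worker = Threads.@spawn for req in requests    # runs until `requests` is closed
        id = get!(experiments, req.name, length(experiments) + 1)
        put!(req.reply, id)
    end
    return requests, worker
end

# Blocks the calling task until the worker has found or created the experiment.
function getorcreate_queued(requests, name)
    reply = Channel{Int}(1)
    put!(requests, CreateRequest(name, reply))
    return take!(reply)
end

requests, worker = start_experiment_service()
queued_ids = zeros(Int, 5)
@sync for i in 1:5
    Threads.@spawn queued_ids[i] = getorcreate_queued(requests, "white moon")
end
close(requests)
wait(worker)
println(queued_ids)
```

Because only the worker touches the table, the check and the create can no longer interleave; in MLFlowClient the worker would be the only task issuing experiment requests. Like a lock, this only covers a single Julia process.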

ablaom commented 9 months ago

I have asked @OkonSamuel to have a look into this. He has expertise in this area (but is also quite busy).

pebeto commented 9 months ago

Adding more information:

mlflow is not fully accepting async operations. I can't say this for certain, but it sometimes reports three experiments with the same name, which its own documentation says should be impossible. This may not be related to our code, but it could possibly be solved using channels.

ablaom commented 8 months ago

> mlflow is not fully accepting async operations

Did you mean "mlflow is now fully accepting async operations"?

ablaom commented 7 months ago

The proposal referenced above may resolve this issue.

pebeto commented 4 months ago

Closing it in favor of #41.