Ok, this took far too long to rework to use jansson. I think I spent at least five hours figuring out what was wrong with my format strings and working out the one-off jansson interfaces to make this work; it might have been more. This at least gets the thing working and starts to give us something to manage jansson values and build up serializers. It also fixes the memory leaks in resource from unfreed jansson objects.
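For anyone else who ends up fighting these format strings, the parts that cost me time were `s%` (string plus explicit length), `I` (json_int_t), and the `O` vs `o` distinction: `O` takes its own reference to the passed json_t, while `o` steals yours. A minimal standalone sketch of those semantics (illustrative only, not code from this PR):

// Illustration of the jansson pack conventions used in the change below.
// Not part of this PR; just the documented semantics of s%, I, O, and o.
#include <jansson.h>
#include <cstdio>
#include <cstdlib>
#include <string>

int main ()
{
    std::string policy = "fcfs";
    json_t *counts = json_pack ("{s:I}", "pending", (json_int_t)10);

    json_error_t err;
    json_t *o = json_pack_ex (&err,
                              0,
                              "{s:s% s:O}",
                              // s% consumes a char* plus a size_t length
                              "policy", policy.data (), policy.length (),
                              // O increments the refcount of counts;
                              // o would have stolen the reference instead
                              "action_counts", counts);
    json_decref (counts);  // safe: the packed object holds its own reference
    if (!o) {
        fprintf (stderr, "json_pack_ex: %s\n", err.text);
        return 1;
    }
    char *s = json_dumps (o, JSON_COMPACT);
    printf ("%s\n", s);
    free (s);
    json_decref (o);
    return 0;
}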
Hopefully this is good enough for the release, and I'll spend a bit more time later to build something with a uniform interface for object- and array-like things in C++, because this feels like a loss:
{
- using namespace boost::json;
- auto &obj = jv.emplace_object ();
- obj = {{"policy", this->policy ()},
- {"queue_depth", m_queue_depth},
- {"max_queue_depth", m_max_queue_depth},
- {"queue_parameters", value_from (m_qparams)},
- {"policy_parameters", value_from (m_pparams)},
- {"action counts",
- {{"pending", m_pq_cnt},
- {"running", m_rq_cnt},
- {"reserved", m_oq_cnt},
- {"rejected", m_dq_cnt},
- {"complete", m_cq_cnt},
- {"cancelled", m_cancel_cnt},
- {"reprioritized", m_reprio_cnt}}}};
+ json::value qparams;
+ to_json (qparams, m_qparams);
+ json::value pparams;
+ to_json (pparams, m_pparams);
char buf[128] = {};
- auto add_queue = [&] (::boost::json::array &a, auto &map) {
+ auto add_queue = [&] (json_t *a, auto &map) {
for (auto &[k, jobid] : map) {
if (flux_job_id_encode (jobid, "f58plain", buf, sizeof buf) < 0)
- a.push_back (jobid);
+ json_array_append_new (a, json_integer (jobid));
else
- a.push_back (buf);
+ json_array_append_new (a, json_string (buf));
}
};
- auto &pending_queues = obj["pending_queues"].emplace_object ();
- add_queue (pending_queues["pending"].emplace_array (), m_pending);
- add_queue (pending_queues["pending_provisional"].emplace_array (), m_pending_provisional);
- add_queue (pending_queues["blocked"].emplace_array (), m_blocked);
- auto &scheduled_queues = obj["scheduled_queues"].emplace_object ();
- add_queue (scheduled_queues["running"].emplace_array (), m_running);
- add_queue (scheduled_queues["rejected"].emplace_array (), m_rejected);
- add_queue (scheduled_queues["canceled"].emplace_array (), m_canceled);
+ json::value pending;
+ pending.emplace_object ();
+ json::value pending_arr;
+ pending_arr.emplace_array ();
+ json_object_set (pending.get (), "pending", pending_arr.get ());
+ add_queue (pending_arr.get (), m_pending);
+ pending_arr.emplace_array ();
+ json_object_set (pending.get (), "pending_provisional", pending_arr.get ());
+ add_queue (pending_arr.get (), m_pending_provisional);
+ pending_arr.emplace_array ();
+ json_object_set (pending.get (), "blocked", pending_arr.get ());
+ add_queue (pending_arr.get (), m_blocked);
+
+ json::value scheduled;
+ scheduled.emplace_object ();
+ json::value scheduled_arr;
+ scheduled_arr.emplace_array ();
+ json_object_set (scheduled.get (), "running", scheduled_arr.get ());
+ add_queue (scheduled_arr.get (), m_running);
+ scheduled_arr.emplace_array ();
+ json_object_set (scheduled.get (), "rejected", scheduled_arr.get ());
+ add_queue (scheduled_arr.get (), m_rejected);
+ scheduled_arr.emplace_array ();
+ json_object_set (scheduled.get (), "canceled", scheduled_arr.get ());
+ add_queue (scheduled_arr.get (), m_canceled);
+
+ json_error_t err = {0};
+ jv = json::value (json::no_incref{},
+ json_pack_ex (&err,
+ 0,
+ // begin object
+ "{"
+ // policy
+ "s:s%"
+ // queue_depth
+ "s:I"
+ // max_queue_depth
+ "s:I"
+ // queue parameters
+ "s:O"
+ // policy parameters
+ "s:O"
+ // action counts
+ "s:o"
+ // pending queues
+ "s:O"
+ // scheduled queues
+ "s:O"
+ // end object
+ "}",
+ // VALUE START
+ // policy, str+length style
+ "policy",
+ this->policy ().data (),
+ this->policy ().length (),
+ // queue_depth
+ "queue_depth",
+ (json_int_t)m_queue_depth,
+ // max_queue_depth
+ "max_queue_depth",
+ (json_int_t)m_max_queue_depth,
+ // queue parameters
+ "queue_parameters",
+ qparams.get (),
+ // policy parameters
+ "policy_parameters",
+ pparams.get (),
+ // action counts
+ "action_counts",
+ json_pack ("{s:I s:I s:I s:I s:I s:I s:I}",
+ "pending",
+ m_pq_cnt,
+ "running",
+ m_rq_cnt,
+ "reserved",
+ m_oq_cnt,
+ "rejected",
+ m_dq_cnt,
+ "complete",
+ m_cq_cnt,
+ "cancelled",
+ m_cancel_cnt,
+ "reprioritized",
+ m_reprio_cnt),
+ // pending queues
+ "pending_queues",
+ pending.get (),
+ // scheduled queues
+ "scheduled_queues",
+ scheduled.get ()));
+ if (!jv.get ()) {
+ throw std::runtime_error (err.text);
+ }
}
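For context on the `json::value` helper above: it is (roughly) a small RAII owner around `json_t *`. The sketch below is my approximation of the idea, not the exact class added in this PR:

// Sketch of a minimal RAII owner for jansson values, approximating the
// json::value helper used in the diff above (names and details assumed).
#include <jansson.h>
#include <utility>

namespace json {
struct no_incref {};  // tag: take ownership without adding a reference

class value {
  public:
    value () = default;
    value (no_incref, json_t *v) : m_v (v) {}  // steal the reference
    value (const value &o) : m_v (json_incref (o.m_v)) {}
    value (value &&o) noexcept : m_v (std::exchange (o.m_v, nullptr)) {}
    value &operator= (value o) noexcept { std::swap (m_v, o.m_v); return *this; }
    ~value () { json_decref (m_v); }

    json_t *get () const { return m_v; }
    // replace the held value with a fresh object or array
    json_t *emplace_object () { reset (json_object ()); return m_v; }
    json_t *emplace_array () { reset (json_array ()); return m_v; }

  private:
    void reset (json_t *v) { json_decref (m_v); m_v = v; }
    json_t *m_v = nullptr;
};
} // namespace json

Since `json_object_set` takes its own reference to the value it stores, an owner like this can decref unconditionally in its destructor, which is what makes reusing `pending_arr` for several arrays above safe.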
I think we should test the other JSON libraries you listed to see if they have a substantial impact on Fluxion performance. I suspect partial cancel can be accelerated with a faster JSON library. I also wonder whether JSON manipulation will become a bottleneck with large JGFs.
I'll open another issue or discussion and we can look over some benchmarks, options, and tradeoffs. Thanks for reviewing this, and sorry to everyone for being a bit grumpy about it; I had everything basically working before realizing the CI is missing jammy and having to rework it all. Hopefully the stats will be useful.
Attention: Patch coverage is 86.02941% with 19 lines in your changes missing coverage. Please review. Project coverage is 75.5%. Comparing base (8b2cb13) to head (fdf1038). Report is 124 commits behind head on master.
problem: it's hard to get information about the current state of queues without debugging fluxion
solution: add a stats callback that prints information about each queue including:
This does introduce a dependency on boost::json, which technically requires a newer version of boost. I think we already require a newer version for other reasons, and boost 1.78 is already installed on TOSS4 from EPEL. We may need to tweak the rhel8 container; we'll see. Here is some example output from a test where there are 10 jobs blocked on a drained node and two jobs that have completed:
There is plenty more information we could surface here, including the timestamps of jobs moving between states, the current state each job considers itself to be in, etc. One thing I think we definitely want is something indicating partial-release status, but we're not currently storing it, so I'd rather add that after it gets worked out for the final release.
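For anyone skimming, here is a rough sketch of the shape such a stats callback could take; the handler name, the `queue_stats_t` struct, and its fields are stand-ins, while `flux_respond_pack`/`flux_respond_error` and the handler signature come from flux-core:

// Hypothetical sketch of a stats request handler; illustrative only.
#include <flux/core.h>
#include <jansson.h>
#include <cerrno>
#include <string>

struct queue_stats_t {        // stand-in for the real queue object
    std::string policy;
    json_int_t queue_depth;
    json_int_t pending;
};

static void stats_request_cb (flux_t *h,
                              flux_msg_handler_t *mh,
                              const flux_msg_t *msg,
                              void *arg)
{
    (void)mh;
    auto *stats = static_cast<queue_stats_t *> (arg);
    if (flux_respond_pack (h,
                           msg,
                           "{s:s s:I s:I}",
                           "policy", stats->policy.c_str (),
                           "queue_depth", stats->queue_depth,
                           "pending", stats->pending) < 0)
        flux_respond_error (h, msg, errno, "error packing stats response");
}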