usegalaxy-eu / tpv-metascheduler-api

Metascheduler for TPV as Service
MIT License

Additional stats available in the Galaxy database #6

Open sebastian-luna-valero opened 4 months ago

sebastian-luna-valero commented 4 months ago

Hi @sanjaysrikakulam

As discussed in today's WP4 meeting, in addition to queued_job_count, what other stats can we get from the Galaxy database about jobs?

Would it be possible to get

Many thanks! Sebastian

sanjaysrikakulam commented 4 months ago

In general, we can get all of those; we need to construct the appropriate queries, run them via TPV, and send the results to the TPV API as additional destination attributes. However, if you need cgroup data about a job, I am unsure whether we get that when we use remote resources. I need to cross-check and will update you on all of this soon.
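
As a rough illustration of the idea, a ranking request could carry extra per-destination stats alongside the existing queued_job_count. This is a hedged sketch only: the endpoint path and payload fields below are invented for illustration and are not the actual tpv-metascheduler-api contract.

```python
# Hypothetical sketch: ship extra per-destination stats to the metascheduler API.
# The endpoint path and field names are placeholders, not the real API contract.
import requests

destinations = [
    {
        "id": "pulsar_be_tpv",
        "queued_job_count": 12,          # attribute already discussed
        "avg_queue_time_seconds": 2107,  # candidate additional attribute
        "avg_run_time_seconds": 22,      # candidate additional attribute
    },
    {
        "id": "pulsar_sanjay_tpv",
        "queued_job_count": 1,
        "avg_queue_time_seconds": 12,
        "avg_run_time_seconds": 14,
    },
]

response = requests.post(
    "http://localhost:8000/rank",        # placeholder URL
    json={"destinations": destinations},
    timeout=10,
)
print(response.json())
```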

sebastian-luna-valero commented 4 months ago

An additional parameter from @enolfc that would be interesting:

pauldg commented 4 months ago

Reference to @abdulrahmanazab's comment on additional stats in his PR:

Originally posted by @abdulrahmanazab in https://github.com/usegalaxy-eu/tpv-metascheduler-api/issues/7#issuecomment-1960958822

pauldg commented 4 months ago

@abdulrahmanazab @sanjaysrikakulam @sebastian-luna-valero

I've been working on this query to get the average queue time from the Galaxy DB, since we already keep a history of job states (new, queued, running) for each job.

SELECT
  j.destination_id,
  AVG(qt.duration) AS average_waiting_time
FROM (
      SELECT 
          queued.job_id,
          queued.create_time AS queued_time,
          running.create_time AS running_time,
          running.create_time - queued.create_time AS duration
      FROM 
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'queued' GROUP BY job_id) AS queued
      JOIN 
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'running' GROUP BY job_id) AS running
      ON 
          queued.job_id = running.job_id
      ) AS qt 
JOIN 
    (
      SELECT 
        id, 
        destination_id
      FROM
        job
      WHERE
        LOWER(destination_id) LIKE LOWER('%pulsar%')
    ) AS j
ON
    j.id = qt.job_id
GROUP BY
  j.destination_id
ORDER BY
  j.destination_id;

The output is something like this:

  destination_id   | average_waiting_time 
-------------------+----------------------
 pulsar_be_tpv     | 00:35:07.283159
 pulsar_sanjay_tpv | 00:00:11.969259

I'll have to figure out how to execute this query within TPV, but we can still do this without needing another database.

sebastian-luna-valero commented 4 months ago

Amazing, thank you very much @pauldg

pauldg commented 4 months ago

Some more statistics within a time window as well:

  destination_id   | count | avg_queue_time  | min_queue_time  | median_queue_time | perc_95_queue_time | perc_99_queue_time | max_queue_time  |  avg_run_time   |  min_run_time   | median_run_time | perc_95_run_time | perc_99_run_time |  max_run_time
-------------------+-------+-----------------+-----------------+-------------------+--------------------+--------------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------
 pulsar_be_tpv     |    21 | 00:35:07.283159 | 00:00:01.072093 | 00:00:10.20019    | 02:27:28.891529    | 05:34:23.599588    | 06:21:07.276603 | 00:00:22.076166 | 00:00:08.381275 | 00:00:16.316088 | 00:00:47.203214  | 00:00:48.153607  | 00:00:48.391205
 pulsar_sanjay_tpv |     2 | 00:00:11.969259 | 00:00:11.271614 | 00:00:11.969259   | 00:00:12.59714     | 00:00:12.652951    | 00:00:12.666904 | 00:00:13.602212 | 00:00:13.602212 | 00:00:13.602212 | 00:00:13.602212  | 00:00:13.602212  | 00:00:13.602212

pauldg commented 4 months ago

In https://github.com/usegalaxy-eu/tpv-metascheduler-api/pull/9 I've converted the query to SQLAlchemy; it now works with TPV, and I've tested it on my Galaxy test instance.
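
For reference, a minimal sketch of what running the query from Python via SQLAlchemy could look like. This is not the code from the PR; the connection string is a placeholder and the table/column names are the ones used in the SQL above.

```python
# Minimal sketch: run the average-queue-time query against the Galaxy DB via
# SQLAlchemy and collect one value per destination. Placeholder DSN.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/galaxy")

AVG_QUEUE_TIME_SQL = text("""
    SELECT j.destination_id, AVG(qt.duration) AS average_waiting_time
    FROM (
        SELECT queued.job_id,
               running.create_time - queued.create_time AS duration
        FROM (SELECT job_id, MIN(create_time) AS create_time
              FROM job_state_history WHERE state = 'queued' GROUP BY job_id) AS queued
        JOIN (SELECT job_id, MIN(create_time) AS create_time
              FROM job_state_history WHERE state = 'running' GROUP BY job_id) AS running
          ON queued.job_id = running.job_id
    ) AS qt
    JOIN (SELECT id, destination_id FROM job
          WHERE LOWER(destination_id) LIKE LOWER(:pattern)) AS j
      ON j.id = qt.job_id
    GROUP BY j.destination_id
    ORDER BY j.destination_id
""")

with engine.connect() as conn:
    rows = conn.execute(AVG_QUEUE_TIME_SQL, {"pattern": "%pulsar%"}).fetchall()
    avg_queue_time = {row.destination_id: row.average_waiting_time for row in rows}
```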

pauldg commented 3 months ago

Updated SQL query for these stats:

SELECT
  j.destination_id,
  COUNT(j.id),
  AVG(qt.queue_time) avg_queue_time,
  MIN(qt.queue_time) min_queue_time,
  PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY qt.queue_time) as median_queue_time,
  PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY qt.queue_time) as perc_95_queue_time,
  PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY qt.queue_time) as perc_99_queue_time,
  MAX(qt.queue_time) as max_queue_time, 
  AVG(qt.run_time) avg_run_time,
  MIN(qt.run_time) min_run_time,
  PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY qt.run_time) as median_run_time,
  PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY qt.run_time) as perc_95_run_time,
  PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY qt.run_time) as perc_99_run_time,
  MAX(qt.run_time) max_run_time
FROM (
      SELECT 
          queued.job_id,
          queued.create_time AS queued_time,
          running.create_time AS running_time,
          finished.create_time AS finished_time,
          running.create_time - queued.create_time AS queue_time,
          finished.create_time - running.create_time AS run_time
      FROM 
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'queued' GROUP BY job_id) AS queued
      JOIN 
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'running' GROUP BY job_id) AS running
      ON 
          queued.job_id = running.job_id
      LEFT JOIN 
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'ok' GROUP BY job_id) AS finished
      ON 
          queued.job_id = finished.job_id
      ) AS qt 
JOIN 
    (
      SELECT 
        id, 
        destination_id
      FROM
        job
      WHERE
        LOWER(destination_id) LIKE LOWER('%pulsar%')
        AND create_time > (NOW() AT TIME ZONE 'UTC' - '30 months'::interval)
    ) AS j
ON
    j.id = qt.job_id
GROUP BY
  j.destination_id
ORDER BY
  j.destination_id;
pauldg commented 3 months ago

Updated query grouped by destinations and tools:

SELECT
  j.destination_id,
  j.tool_id,
  COUNT(j.id),
  AVG(qt.queue_time) avg_queue_time,
  MIN(qt.queue_time) min_queue_time,
  PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY qt.queue_time) as median_queue_time,
  PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY qt.queue_time) as perc_95_queue_time,
  PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY qt.queue_time) as perc_99_queue_time,
  MAX(qt.queue_time) as max_queue_time,
  AVG(qt.run_time) avg_run_time,
  MIN(qt.run_time) min_run_time,
  PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY qt.run_time) as median_run_time,
  PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY qt.run_time) as perc_95_run_time,
  PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY qt.run_time) as perc_99_run_time,
  MAX(qt.run_time) max_run_time
FROM (
      SELECT
          queued.job_id,
          queued.create_time AS queued_time,
          running.create_time AS running_time,
          finished.create_time AS finished_time,
          running.create_time - queued.create_time AS queue_time,
          finished.create_time - running.create_time AS run_time
      FROM
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'queued' GROUP BY job_id) AS queued
      JOIN
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'running' GROUP BY job_id) AS running
      ON
          queued.job_id = running.job_id
      LEFT JOIN
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'ok' GROUP BY job_id) AS finished
      ON
          queued.job_id = finished.job_id
      ) AS qt
JOIN
    (
      SELECT
        id,
        destination_id,
        tool_id
      FROM
        job
      WHERE
        create_time > (NOW() AT TIME ZONE 'UTC' - '30 days'::interval)
    ) AS j
ON
    j.id = qt.job_id
GROUP BY
  j.destination_id, j.tool_id
ORDER BY
  j.destination_id;

sanjaysrikakulam commented 3 months ago

As I mentioned in the meetings, we cannot run such large queries from TPV in real time to compute a tool's resource consumption and so on.

Here is the output of such a query (shared by @pauldg; I had to make a minor correction to it). The query took roughly 30 minutes to run.

EXPLAIN ANALYZE SELECT
  j.destination_id,
  j.tool_id,
  COUNT(j.id),
  AVG(qt.queue_time) avg_queue_time,
  MIN(qt.queue_time) min_queue_time,
  PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY qt.queue_time) as median_queue_time,
  PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY qt.queue_time) as perc_95_queue_time,
  PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY qt.queue_time) as perc_99_queue_time,
  MAX(qt.queue_time) as max_queue_time,
  AVG(qt.run_time) avg_run_time,
  MIN(qt.run_time) min_run_time,
  PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY qt.run_time) as median_run_time,
  PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY qt.run_time) as perc_95_run_time,
  PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY qt.run_time) as perc_99_run_time,
  MAX(qt.run_time) max_run_time
FROM (
      SELECT
          queued.job_id,
          queued.create_time AS queued_time,
          running.create_time AS running_time,
          finished.create_time AS finished_time,
          running.create_time - queued.create_time AS queue_time,
          finished.create_time - running.create_time AS run_time
      FROM
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'queued' GROUP BY job_id) AS queued
      JOIN
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'running' GROUP BY job_id) AS running
      ON
          queued.job_id = running.job_id
      LEFT JOIN
          (SELECT job_id, MIN(create_time) AS create_time FROM job_state_history WHERE state = 'ok' GROUP BY job_id) AS finished
      ON
          queued.job_id = finished.job_id
      ) AS qt
JOIN
    (
      SELECT
        id,
        destination_id,
        tool_id
      FROM
        job
      WHERE
        LOWER(tool_id) LIKE LOWER('toolshed.g2.bx.psu.edu/repos/iuc/rgrnastar/rna_star/2.5.2b-2') AND
        create_time > (NOW() AT TIME ZONE 'UTC' - '30 days'::interval)
    ) AS j
ON
    j.id = qt.job_id
GROUP BY
  j.destination_id, j.tool_id
ORDER BY
  j.destination_id;

                                                                                                     QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=20609750.29..20612153.91 rows=1 width=271) (actual time=1852717.712..1852717.850 rows=0 loops=1)
   Group Key: job.destination_id, job.tool_id
   ->  Sort  (cost=20609750.29..20609836.13 rows=34337 width=99) (actual time=1852717.709..1852717.846 rows=0 loops=1)
         Sort Key: job.destination_id, job.tool_id
         Sort Method: quicksort  Memory: 25kB
         ->  Merge Left Join  (cost=19147047.01..20607163.43 rows=34337 width=99) (actual time=1852717.677..1852717.814 rows=0 loops=1)
               Merge Cond: (job_state_history.job_id = job_state_history_2.job_id)
               ->  Merge Join  (cost=12859740.39..14198904.25 rows=15 width=95) (actual time=1852717.674..1852717.805 rows=0 loops=1)
                     Merge Cond: (job_state_history.job_id = job_state_history_1.job_id)
                     ->  Nested Loop  (cost=6261400.95..7479929.88 rows=1 width=87) (actual time=1852717.671..1852717.794 rows=0 loops=1)
                           ->  Finalize GroupAggregate  (cost=6261400.39..6376341.57 rows=453686 width=12) (actual time=255919.731..305355.773 rows=64941978 loops=1)
                                 Group Key: job_state_history.job_id
                                 ->  Gather Merge  (cost=6261400.39..6367267.85 rows=907372 width=12) (actual time=255919.706..283216.742 rows=64941978 loops=1)
                                       Workers Planned: 2
                                       Workers Launched: 0
                                       ->  Sort  (cost=6260400.36..6261534.58 rows=453686 width=12) (actual time=255919.277..277330.797 rows=64941978 loops=1)
                                             Sort Key: job_state_history.job_id
                                             Sort Method: external merge  Disk: 1652224kB
                                             ->  Partial HashAggregate  (cost=6213236.68..6217773.54 rows=453686 width=12) (actual time=160403.939..226229.219 rows=64941978 loops=1)
                                                   Group Key: job_state_history.job_id
                                                   Batches: 261  Memory Usage: 66073kB  Disk Usage: 2356256kB
                                                   ->  Parallel Seq Scan on job_state_history  (cost=0.00..6078239.91 rows=26999353 width=12) (actual time=0.447..127713.895 rows=66006610 loops=1)
                                                         Filter: ((state)::text = 'queued'::text)
                                                         Rows Removed by Filter: 398740044
                           ->  Index Scan using job_pkey on job  (cost=0.57..2.42 rows=1 width=75) (actual time=0.024..0.024 rows=0 loops=64941978)
                                 Index Cond: (id = job_state_history.job_id)
                                 Filter: ((lower((tool_id)::text) ~~ 'toolshed.g2.bx.psu.edu/repos/iuc/rgrnastar/rna_star/2.5.2b-2'::text) AND (create_time > (timezone('UTC'::text, now()) - '30 days'::interval)))
                                 Rows Removed by Filter: 1
                     ->  Finalize GroupAggregate  (cost=6598339.43..6713280.61 rows=453686 width=12) (never executed)
                           Group Key: job_state_history_1.job_id
                           ->  Gather Merge  (cost=6598339.43..6704206.89 rows=907372 width=12) (never executed)
                                 Workers Planned: 2
                                 Workers Launched: 0
                                 ->  Sort  (cost=6597339.41..6598473.62 rows=453686 width=12) (never executed)
                                       Sort Key: job_state_history_1.job_id
                                       ->  Partial HashAggregate  (cost=6550175.72..6554712.58 rows=453686 width=12) (never executed)
                                             Group Key: job_state_history_1.job_id
                                             ->  Parallel Seq Scan on job_state_history job_state_history_1  (cost=0.00..6078239.91 rows=94387162 width=12) (never executed)
                                                   Filter: ((state)::text = 'running'::text)
               ->  Finalize GroupAggregate  (cost=6287306.63..6402247.81 rows=453686 width=12) (never executed)
                     Group Key: job_state_history_2.job_id
                     ->  Gather Merge  (cost=6287306.63..6393174.09 rows=907372 width=12) (never executed)
                           Workers Planned: 2
                           Workers Launched: 0
                           ->  Sort  (cost=6286306.60..6287440.82 rows=453686 width=12) (never executed)
                                 Sort Key: job_state_history_2.job_id
                                 ->  Partial HashAggregate  (cost=6239142.92..6243679.78 rows=453686 width=12) (never executed)
                                       Group Key: job_state_history_2.job_id
                                       ->  Parallel Seq Scan on job_state_history job_state_history_2  (cost=0.00..6078239.91 rows=32180601 width=12) (never executed)
                                             Filter: ((state)::text = 'ok'::text)
 Planning Time: 1.327 ms
 Execution Time: 1852954.493 ms
(52 rows)

We can try to optimize the query with CTEs and the like; even then, it is not feasible to keep it in TPV for real-time querying of the Galaxy DB. This might only work when the Galaxy DB is really small.

pauldg commented 3 months ago

Thanks for checking, Sanjay! In that case I would propose that we move our approach to an InfluxDB and populate this information, for example, once a week.

sanjaysrikakulam commented 3 months ago

> Thanks for checking, Sanjay! In that case I would propose that we move our approach to an InfluxDB and populate this information, for example, once a week.

Yup, I agree! We need to collect and store all these metrics using the TIG stack, as the AU does, and then have the TPV API query them.

Plan of action:

  1. Identify the exact metrics we would need
  2. Create an appropriate SQL query with all the necessary attributes (tool-by-tool basis, destination basis, etc.).
  3. Design the InfluxDB measurement such that little computation is required when the TPV API queries it, so TPV spends no more than a few milliseconds fetching the necessary data (see the sketch at the end of this comment).
  4. We need a plan for configuring the remote/Pulsar destinations to ship the data to the InfluxDB. The EU could bake the Pulsar images with the required credentials and scripts to push the data to the EU's InfluxDB. This way, we do not have to establish a dedicated resource for this and can use what the EU already has.

These are some of the things that came to my mind. There may be more once we move forward.
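
To make item 3 a bit more concrete, here is a minimal sketch of a pre-aggregated measurement, assuming the InfluxDB 1.x Python client; the measurement, tag, and field names are placeholders. The idea is that all the heavy SQL runs at collection time, so the TPV API read side only has to fetch the latest point per destination/tool.

```python
# Minimal sketch (InfluxDB 1.x Python client): write one pre-aggregated point
# per destination/tool so reads stay cheap. All names below are placeholders.
from influxdb import InfluxDBClient

client = InfluxDBClient(host="influxdb.example.org", port=8086,
                        database="galaxy_stats")

points = [
    {
        "measurement": "destination_queue_run_time",
        "tags": {"destination_id": "pulsar_be_tpv", "tool_id": "api_test"},
        "fields": {
            "count": 21,
            "avg_queue_seconds": 2107.3,
            "median_queue_seconds": 10.2,
            "avg_run_seconds": 22.1,
        },
    },
]
client.write_points(points)
```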

sebastian-luna-valero commented 3 months ago

Thanks both!

Regarding point 4 above, I was not expecting to rely on destinations to push stats back to a central site, but instead to run the big SQL query in Galaxy (frequency TBD) and dump the output into the central InfluxDB for the TPV API to query.

What do you think?

sebastian-luna-valero commented 3 months ago

@abdulrahmanazab @lcodo what's your opinion on Sanjay's plan of action drafted above?

sanjaysrikakulam commented 3 months ago

> Thanks both!
>
> Regarding point 4 above, I was not expecting to rely on destinations to push stats back to a central site, but instead to run the big SQL query in Galaxy (frequency TBD) and dump the output into the central InfluxDB for the TPV API to query.
>
> What do you think?

If @abdulrahmanazab still wants to have real-time stats about the cluster and other things about the remote destination for his algorithms, then the big SQL query might not be sufficient.

sebastian-luna-valero commented 3 months ago

I would put a couple of meta-scheduling algorithms in place with the stats provided by Galaxy first, and then afterwards work on the real-time stats; but this is just my opinion. I would like to know @abdulrahmanazab's thoughts on this.

abdulrahmanazab commented 2 months ago

For job X, TPV checks whether destination D has at least one machine with the capacity to host the job (in terms of CPU, etc.).

Static information about the job (available in TPV):

Dynamic information about the job (frequency: daily):

Dynamic information about the destination (frequency: 30 mins):

pauldg commented 2 months ago

> Plan of action:
>
>   1. Identify the exact metrics we would need
>   2. Create an appropriate SQL query with all the necessary attributes (tool-by-tool basis, destination basis, etc.).

I've added the query (with InfluxDB-compatible output) to gxadmin to get statistics grouped by destination and tool: https://github.com/galaxyproject/gxadmin/pull/155, e.g. gxadmin iquery destination-queue-run-time --seconds --older-than=90

destination-queue-run-time,destination_id=condor_tpv,tool_id=api_test count=10,avg_queue=43.4000000000000000,min_queue=41,median_queue=42.5,perc_95_queue=48.199999999999996,perc_99_queue=49.64,max_queue=50,avg_run=9.5000000000000000,min_run=6,median_run=6.5,perc_95_run=22.49999999999999,perc_99_run=26.1,max_run=27
destination-queue-run-time,destination_id=condor_tpv,tool_id=Show\ beginning1 count=4,avg_queue=42.0000000000000000,min_queue=42,median_queue=42,perc_95_queue=42,perc_99_queue=42,max_queue=42,avg_run=15.5000000000000000,min_run=12,median_run=14,perc_95_run=21.099999999999998,perc_99_run=21.82,max_run=22
destination-queue-run-time,destination_id=pulsar_be_tpv,tool_id=api_test count=31,avg_queue=3.6451612903225806,min_queue=1,median_queue=1,perc_95_queue=15.5,perc_99_queue=19.499999999999996,max_queue=21,avg_run=12.7096774193548387,min_run=7,median_run=8,perc_95_run=38.5,perc_99_run=47.4,max_run=48
>   3. Design the InfluxDB measurement such that little computation is required when the TPV API queries it, so TPV spends no more than a few milliseconds fetching the necessary data.

Is the measurement okay as is? I think making this fast mainly depends on how often Telegraf collects these stats. Additionally, if we know the number of destination/tool combinations, we could select the top X rows from the measurement; I think this would be faster than a WHERE clause that filters on a specific tool or destination and timestamp over the whole database.
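
For the read side, fetching the latest pre-aggregated values per destination/tool could look roughly like this, assuming the InfluxDB 1.x Python client and the measurement and field names from the gxadmin output above:

```python
# Sketch: grab the most recent pre-aggregated stats per destination/tool in a
# single cheap query, rather than filtering the raw data at request time.
from influxdb import InfluxDBClient

client = InfluxDBClient(host="influxdb.example.org", port=8086,
                        database="galaxy_stats")

result = client.query(
    'SELECT LAST("avg_queue") AS avg_queue, LAST("avg_run") AS avg_run '
    'FROM "destination-queue-run-time" '
    'GROUP BY "destination_id", "tool_id"'
)
for (_, tags), points in result.items():
    for point in points:
        print(tags["destination_id"], tags["tool_id"],
              point["avg_queue"], point["avg_run"])
```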

I've already started collecting these statistics in InfluxDB on my test instance.

>   4. We need a plan for configuring the remote/Pulsar destinations to ship the data to the InfluxDB. The EU could bake the Pulsar images with the required credentials and scripts to push the data to the EU's InfluxDB. This way, we do not have to establish a dedicated resource for this and can use what the EU already has.

I've also deployed monitoring scripts on my Pulsar and started collecting these measurements in the same central InfluxDB on my test instance. I'm still thinking about how we identify the different destinations; a sketch of one option follows below. I think the easiest way to get this deployed on all Pulsar destinations would be through https://github.com/usegalaxy-eu/pulsar-deployment . The question then is where the central InfluxDB should be deployed.
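
On the destination-identification question, one simple option would be to have the deployment (e.g. pulsar-deployment) drop the destination name into the node's environment or monitoring config and attach it as a tag on every point. A hedged sketch, where the environment variable name, measurement name, and fields are all assumptions:

```python
# Rough sketch: a Pulsar-side script that reports node load to the central
# InfluxDB, tagging each point with a destination_id provided by the
# deployment (the variable name here is an assumption, not an existing one).
import os

import psutil  # assumes psutil is installed on the Pulsar node
from influxdb import InfluxDBClient

destination_id = os.environ.get("TPV_DESTINATION_ID", "unknown_destination")

client = InfluxDBClient(host="influxdb.example.org", port=8086,
                        database="galaxy_stats")
client.write_points([
    {
        "measurement": "pulsar_node_load",  # placeholder measurement name
        "tags": {"destination_id": destination_id},
        "fields": {
            "cpu_percent": psutil.cpu_percent(interval=1),
            "mem_percent": psutil.virtual_memory().percent,
        },
    },
])
```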

pauldg commented 2 months ago

> Dynamic information about the destination (frequency: 30 mins):
>
>   • Current, median queue size
>   • Current, median number of jobs running (this can give an indication of how popular the destination is in terms of usage)

These two are supposed to be the current queue size and the current number of running jobs, right? @abdulrahmanazab

abdulrahmanazab commented 2 months ago

Action point: provide a written/graphical version of the algorithm