mesos / storm

Storm on Mesos!
Apache License 2.0

Selfless Offer Handling #200

Closed JessicaLHartog closed 7 years ago

JessicaLHartog commented 7 years ago

To avoid holding on to Mesos resources that we can't use, we've added logic here to:

- decline all buffered offers when no topologies need assignments
- suppress offers once nothing needs to be scheduled, so Mesos stops sending offers we would only decline
- revive offers as soon as a topology needs assignments again
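A minimal sketch of the first of these ideas, using the Mesos SchedulerDriver API (the class, method, and parameter names below are hypothetical stand-ins for the plugin's real bookkeeping, not the actual implementation):

```java
import java.util.List;

import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;

/**
 * Illustrative sketch only: hand every buffered offer back to Mesos
 * when no topology currently needs worker slots.
 */
public final class OfferDecliner {
  public static void declineUnusedOffers(SchedulerDriver driver,
                                         List<Protos.Offer> bufferedOffers,
                                         boolean topologiesNeedAssignments) {
    if (topologiesNeedAssignments) {
      return; // keep the offers so the scheduler can place workers on them
    }
    for (Protos.Offer offer : bufferedOffers) {
      driver.declineOffer(offer.getId()); // resources return to the Mesos allocator
    }
    bufferedOffers.clear();
  }
}
```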

This PR also addresses the following issues:

JessicaLHartog commented 7 years ago

A sample of the important lines of the old scheduling behavior from the nimbus.log file ({...} added where lines are removed):

2017-06-08T23:52:24.903+0000 b.s.d.nimbus [INFO] Activating sample-topology: sample-topology-83-1496965944
2017-06-08T23:52:25.675+0000 s.m.MesosNimbus [INFO] resourceOffers: Recording offer: {"offer_id":"7e316567-c185-4814-8395-f8a3bbcc8568-O23331760","hostname":"worker15.colo","cpus(*)":"25.5","disk(*)":"1872546.0","mem(*)":"84337.0","ports(*)":"[31002,31005-31009,31011-32000]"}
{...}
2017-06-08T23:52:25.677+0000 s.m.MesosNimbus [INFO] resourceOffers: After processing offers, now have 20 offers buffered: [
{...}
{"offer_id":"7e316567-c185-4814-8395-f8a3bbcc8568-O23331760","hostname":"worker15.colo","cpus(*)":"25.5","disk(*)":"1872546.0","mem(*)":"84337.0","ports(*)":"[31002,31005-31009,31011-32000]"},
{...}
2017-06-08T23:52:26.466+0000 s.m.s.DefaultScheduler [INFO] Topologies that need assignments: #{"sample-topology-83-1496965944"}
{...}
2017-06-08T23:52:26.467+0000 s.m.u.MesosCommon [INFO] Available resources at worker15.colo: cpus: 25.5 (dynamic: 0.0, static: 0.0, unreserved: 25.5), mem: 84337.0 (dynamic: 0.0, static: 0.0, unreserved: 84337.0), ports: [31002-31002,31005-31009,31011-32000]
2017-06-08T23:52:26.467+0000 s.m.s.DefaultScheduler [INFO] Trying to find 5 slots for sample-topology-83-1496965944
2017-06-08T23:52:26.467+0000 s.m.s.DefaultScheduler [INFO] sample-topology-83-1496965944 with requestedWorkerCpu 0.5 and requestedWorkerMem 1250 does fit onto worker19.colo with resources cpus: 3.0 (dynamic: 0.0, static: 0.0, unreserved: 3.0), mem: 58801.0 (dynamic: 0.0, static: 0.0, unreserved: 58801.0), ports: [31007-31007,31010-31010,31026-31026,31028-32000]
{...}
2017-06-08T23:52:26.468+0000 s.m.s.DefaultScheduler [INFO] Number of available slots for sample-topology-83-1496965944: 5
2017-06-08T23:52:26.468+0000 s.m.s.DefaultScheduler [INFO] allSlotsAvailableForScheduling: 5 available slots: [worker19.colo:31007, worker18.colo:31006, worker16.colo:31007, worker9.colo:31010, worker17.colo:31006]
2017-06-08T23:52:26.824+0000 s.m.s.DefaultScheduler [INFO] Scheduling the following worker slots from cluster.getAvailableSlots: [worker19.colo:31007, worker9.colo:31010, worker17.colo:31006, worker18.colo:31006, worker16.colo:31007]
2017-06-08T23:52:26.824+0000 s.m.s.DefaultScheduler [INFO] Schedule the per-topology slots: {sample-topology-83-1496965944, [worker19.colo:31007, worker9.colo:31010, worker17.colo:31006, worker18.colo:31006, worker16.colo:31007]}
2017-06-08T23:52:26.824+0000 s.m.s.DefaultScheduler [INFO] topologyId: sample-topology-83-1496965944, slotsRequested: 5, slotsAssigned: 0, slotsAvailable: 5
2017-06-08T23:52:26.824+0000 s.m.s.DefaultScheduler [INFO] schedule: Cluster assignment for sample-topology-83-1496965944. Requesting 5 slots, with 5 slots available, and 0 currently assigned. Setting new assignment (node:port, executorsPerWorkerList) as: (worker19.colo:31007, [[1, 1], [6, 6], [11, 11], [16, 16], [21, 21], [26, 26], [31, 31], [36, 36], [41, 41], [46, 46], [51, 51], [56, 56], [61, 61], [66, 66], [71, 71], [76, 76], [81, 81], [86, 86], [91, 91], [96, 96], [101, 101], [106, 106]]), (worker9.colo:31010, [[2, 2], [7, 7], [12, 12], [17, 17], [22, 22], [27, 27], [32, 32], [37, 37], [42, 42], [47, 47], [52, 52], [57, 57], [62, 62], [67, 67], [72, 72], [77, 77], [82, 82], [87, 87], [92, 92], [97, 97], [102, 102]]), (worker17.colo:31006, [[3, 3], [8, 8], [13, 13], [18, 18], [23, 23], [28, 28], [33, 33], [38, 38], [43, 43], [48, 48], [53, 53], [58, 58], [63, 63], [68, 68], [73, 73], [78, 78], [83, 83], [88, 88], [93, 93], [98, 98], [103, 103]]), (worker18.colo:31006, [[4, 4], [9, 9], [14, 14], [19, 19], [24, 24], [29, 29], [34, 34], [39, 39], [44, 44], [49, 49], [54, 54], [59, 59], [64, 64], [69, 69], [74, 74], [79, 79], [84, 84], [89, 89], [94, 94], [99, 99], [104, 104]]), (worker16.colo:31007, [[5, 5], [10, 10], [15, 15], [20, 20], [25, 25], [30, 30], [35, 35], [40, 40], [45, 45], [50, 50], [55, 55], [60, 60], [65, 65], [70, 70], [75, 75], [80, 80], [85, 85], [90, 90], [95, 95], [100, 100], [105, 105]])
2017-06-08T23:52:27.218+0000 b.s.d.nimbus [INFO] Setting new assignment for topology id sample-topology-83-1496965944: #backtype.storm.daemon.common.Assignment{:master-code-dir "/var/groupon/storm/storm_local/nimbus/stormdist/sample-topology-83-1496965944", :node->host {"worker16.colo" "worker16.colo", "worker17.colo" "worker17.colo", "worker18.colo" "worker18.colo", "worker9.colo" "worker9.colo", "worker19.colo" "worker19.colo"}, :executor->node+port {[2 2] ["worker9.colo" 31010], [34 34] ["worker18.colo" 31006], ... 
2017-06-08T23:52:27.227+0000 s.m.MesosNimbus [INFO] assignSlots: sample-topology-83-1496965944 being assigned to 5 slots (worker:port, cpu, mem) as follows: (worker17.colo:31006, 0.5, 1250.0), (worker16.colo:31007, 0.5, 1250.0), (worker18.colo:31006, 0.5, 1250.0), (worker19.colo:31007, 0.5, 1250.0), (worker9.colo:31010, 0.5, 1250.0)
2017-06-08T23:52:27.227+0000 s.m.u.MesosCommon [INFO] Available resources at worker19.colo: cpus: 3.0 (dynamic: 0.0, static: 0.0, unreserved: 3.0), mem: 58801.0 (dynamic: 0.0, static: 0.0, unreserved: 58801.0), ports: [31007-31007,31010-31010,31026-31026,31028-32000]
{...}
2017-06-08T23:52:27.230+0000 s.m.MesosNimbus [INFO] Using offerIDs: [7e316567-c185-4814-8395-f8a3bbcc8568-O23331758] on host: worker19.colo to launch tasks: [{"task_id":"worker19.colo-31007-1496965947.230","slave_id":"7e316567-c185-4814-8395-f8a3bbcc8568-S77","cpus":"0.5","mem":"1250.0","ports":"[31007]","executor_id":"sample-topology-83-1496965944"}]
{...}
2017-06-08T23:52:32.727+0000 s.m.MesosNimbus [INFO] resourceOffers: Recording offer: {"offer_id":"7e316567-c185-4814-8395-f8a3bbcc8568-O23331778","hostname":"worker18.colo","cpus(*)":"7.5","disk(*)":"1872546.0","mem(*)":"43691.0","ports(*)":"[31013,31016,31018-32000]"}
{...}
2017-06-08T23:52:32.728+0000 s.m.MesosNimbus [INFO] resourceOffers: After processing offers, now have 20 offers buffered: [
{"offer_id":"7e316567-c185-4814-8395-f8a3bbcc8568-O23331769","hostname":"worker13.colo","cpus(*)":"26.5","disk(*)":"1872546.0","mem(*)":"85193.0","ports(*)":"[31005-32000]"},
{...}
2017-06-08T23:52:33.125+0000 s.m.s.DefaultScheduler [INFO] Declining all offers that are currently buffered because no topologies need assignments
2017-06-08T23:52:33.493+0000 s.m.s.DefaultScheduler [INFO] Scheduling the following worker slots from cluster.getAvailableSlots: []

Notably, after this point offers continue to come in once the refusal window elapses, i.e. the number of seconds (set as part of a filter) for which declined offers are withheld from the framework.
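For reference, a hedged sketch of what declining with such a filter looks like against the Mesos SchedulerDriver API (the helper name and the timeout value are illustrative, not what this framework actually uses):

```java
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;

/** Illustrative sketch: declining an offer with a refuse_seconds filter. */
final class FilteredDecline {
  static void declineWithFilter(SchedulerDriver driver, Protos.Offer offer,
                                double refuseSeconds) {
    // Mesos withholds this agent's resources from the framework for roughly
    // refuseSeconds; once that window elapses, offers for the same agent
    // start arriving again, which is the behavior noted above.
    Protos.Filters filters = Protos.Filters.newBuilder()
        .setRefuseSeconds(refuseSeconds) // e.g. 120.0; value is illustrative only
        .build();
    driver.declineOffer(offer.getId(), filters);
  }
}
```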

A sample of the important lines of the new scheduling behavior from the nimbus.log file ({...} added where lines are removed):

2017-06-08T23:25:40.194+0000 b.s.d.nimbus [INFO] Activating sample-topology: sample-topology-1-1496964340
2017-06-08T23:25:40.469+0000 s.m.s.StormSchedulerImpl [INFO] Topologies that need assignments: #{"sample-topology-1-1496964340"}
2017-06-08T23:25:40.469+0000 s.m.s.StormSchedulerImpl [INFO] (REVIVE OFFERS) We have topologies that need assignments, but offers are currently suppressed. Reviving offers.
2017-06-08T23:25:40.486+0000 s.m.MesosNimbus [INFO] resourceOffers: Recording offer: {"offer_id":"2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365965","hostname":"worker5.colo","cpus(*)":"17.5","disk(*)":"880435.0","mem(*)":"84895.0","ports(*)":"[31000-31001,31006,31008-32000]"}
{...}
2017-06-08T23:25:40.488+0000 s.m.MesosNimbus [INFO] resourceOffers: After processing offers, now have 7 offers buffered: [
{"offer_id":"2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365965","hostname":"worker5.colo","cpus(*)":"17.5","disk(*)":"880435.0","mem(*)":"84895.0","ports(*)":"[31000-31001,31006,31008-32000]"}, ... ]

2017-06-08T23:25:40.526+0000 s.m.u.MesosCommon [INFO] Available resources at worker5.colo: cpus: 17.5 (dynamic: 0.0, static: 0.0, unreserved: 17.5), mem: 84895.0 (dynamic: 0.0, static: 0.0, unreserved: 84895.0), ports: [31000-31001,31006-31006,31008-32000]
{...}
2017-06-08T23:25:44.008+0000 s.m.s.StormSchedulerImpl [INFO] Topologies that need assignments: #{"sample-topology-1-1496964340"}
2017-06-08T23:25:44.009+0000 s.m.u.MesosCommon [INFO] Available resources at worker5.colo: cpus: 17.5 (dynamic: 0.0, static: 0.0, unreserved: 17.5), mem: 84895.0 (dynamic: 0.0, static: 0.0, unreserved: 84895.0), ports: [31000-31001,31006-31006,31008-32000]
{...}
2017-06-08T23:25:44.010+0000 s.m.s.StormSchedulerImpl [INFO] Trying to find 5 slots for sample-topology-1-1496964340
2017-06-08T23:25:44.010+0000 s.m.s.StormSchedulerImpl [INFO] sample-topology-1-1496964340 with requestedWorkerCpu 0.5 and requestedWorkerMem 1250 does fit onto worker5.colo with resources cpus: 17.5 (dynamic: 0.0, static: 0.0, unreserved: 17.5), mem: 84895.0 (dynamic: 0.0, static: 0.0, unreserved: 84895.0), ports: [31000-31001,31006-31006,31008-32000]
{...}
2017-06-08T23:25:44.042+0000 s.m.s.StormSchedulerImpl [INFO] Scheduling the following worker slots from cluster.getAvailableSlots: [worker7.colo:31010, worker5.colo:31000, worker2.colo:31004, worker6.colo:31002, worker4.colo:31010]
2017-06-08T23:25:44.042+0000 s.m.s.StormSchedulerImpl [INFO] Schedule the per-topology slots: {sample-topology-1-1496964340, [worker7.colo:31010, worker5.colo:31000, worker2.colo:31004, worker6.colo:31002, worker4.colo:31010]}
2017-06-08T23:25:44.042+0000 s.m.s.StormSchedulerImpl [INFO] topologyId: sample-topology-1-1496964340, slotsRequested: 5, slotsAssigned: 0, slotsAvailable: 5
2017-06-08T23:25:44.045+0000 s.m.s.StormSchedulerImpl [INFO] schedule: Cluster assignment for sample-topology-1-1496964340. Requesting 5 slots, with 5 slots available, and 0 currently assigned. Setting new assignment (node:port, executorsPerWorkerList) as: (worker7.colo:31010, [[1, 1], [6, 6], [11, 11], [16, 16], [21, 21], [26, 26], [31, 31], [36, 36], [41, 41], [46, 46], [51, 51], [56, 56], [61, 61], [66, 66], [71, 71], [76, 76], [81, 81], [86, 86], [91, 91], [96, 96], [101, 101], [106, 106]]), (worker5.colo:31000, [[2, 2], [7, 7], [12, 12], [17, 17], [22, 22], [27, 27], [32, 32], [37, 37], [42, 42], [47, 47], [52, 52], [57, 57], [62, 62], [67, 67], [72, 72], [77, 77], [82, 82], [87, 87], [92, 92], [97, 97], [102, 102]]), (worker2.colo:31004, [[3, 3], [8, 8], [13, 13], [18, 18], [23, 23], [28, 28], [33, 33], [38, 38], [43, 43], [48, 48], [53, 53], [58, 58], [63, 63], [68, 68], [73, 73], [78, 78], [83, 83], [88, 88], [93, 93], [98, 98], [103, 103]]), (worker6.colo:31002, [[4, 4], [9, 9], [14, 14], [19, 19], [24, 24], [29, 29], [34, 34], [39, 39], [44, 44], [49, 49], [54, 54], [59, 59], [64, 64], [69, 69], [74, 74], [79, 79], [84, 84], [89, 89], [94, 94], [99, 99], [104, 104]]), (worker4.colo:31010, [[5, 5], [10, 10], [15, 15], [20, 20], [25, 25], [30, 30], [35, 35], [40, 40], [45, 45], [50, 50], [55, 55], [60, 60], [65, 65], [70, 70], [75, 75], [80, 80], [85, 85], [90, 90], [95, 95], [100, 100], [105, 105]])
2017-06-08T23:25:44.085+0000 b.s.d.nimbus [INFO] Setting new assignment for topology id sample-topology-1-1496964340: #backtype.storm.daemon.common.Assignment{:master-code-dir "/var/groupon/storm/storm_local/nimbus/stormdist/sample-topology-1-1496964340", :node->host {"worker6.colo" "worker6.colo", "worker7.colo" "worker7.colo", "worker2.colo" "worker2.colo", "worker4.colo" "worker4.colo", "worker5.colo" "worker5.colo"}, :executor->node+port {[2 2] ["worker5.colo" 31000], [34 34] ["worker6.colo" 31002], ...}}
2017-06-08T23:25:44.091+0000 s.m.MesosNimbus [INFO] assignSlots: sample-topology-1-1496964340 being assigned to 5 slots (worker:port, cpu, mem) as follows: (worker4.colo:31010, 0.5, 1250.0), (worker7.colo:31010, 0.5, 1250.0), (worker5.colo:31000, 0.5, 1250.0), (worker2.colo:31004, 0.5, 1250.0), (worker6.colo:31002, 0.5, 1250.0)
2017-06-08T23:25:44.092+0000 s.m.u.MesosCommon [INFO] Available resources at worker5.colo: cpus: 17.5 (dynamic: 0.0, static: 0.0, unreserved: 17.5), mem: 84895.0 (dynamic: 0.0, static: 0.0, unreserved: 84895.0), ports: [31000-31001,31006-31006,31008-32000]
{...}
2017-06-08T23:25:44.098+0000 s.m.MesosNimbus [INFO] Using offerIDs: [2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365965] on host: worker5.colo to launch tasks: [{"task_id":"worker5.colo-31000-1496964344.096","slave_id":"e2054b8b-a272-499d-90a1-3aeda5e9da12-S5","cpus":"0.5","mem":"1250.0","ports":"[31000]","executor_id":"sample-topology-1-1496964340"}]
{...}
2017-06-08T23:25:44.106+0000 b.s.d.nimbus [INFO] Cleaning up sample-topology-1-1496872842
2017-06-08T23:25:50.083+0000 s.m.MesosNimbus [INFO] resourceOffers: Recording offer: {"offer_id":"2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365972","hostname":"worker6.colo","cpus(*)":"8.0","disk(*)":"880435.0","mem(*)":"82093.0","ports(*)":"[31011-31019,31023-32000]"}
{...}
2017-06-08T23:25:50.086+0000 s.m.MesosNimbus [INFO] resourceOffers: After processing offers, now have 7 offers buffered: [
{...}
{"offer_id":"2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365972","hostname":"worker6.colo","cpus(*)":"8.0","disk(*)":"880435.0","mem(*)":"82093.0","ports(*)":"[31011-31019,31023-32000]"},
{...}
2017-06-08T23:25:54.387+0000 s.m.s.StormSchedulerImpl [INFO] Declining all offers that are currently buffered because no topologies need assignments. Declined offer ids: [2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365976, 2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365969, 2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365975, 2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365968, 2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365974, 2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365972, 2f4e392f-b3a7-446a-96bd-f360e89243ed-O7365973]
2017-06-08T23:25:54.388+0000 s.m.s.StormSchedulerImpl [INFO] (SUPPRESS OFFERS) We don't have any topologies that need assignments, but offers are still flowing. Suppressing offers.
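The (REVIVE OFFERS) and (SUPPRESS OFFERS) lines above correspond to toggling the offer stream on and off around whether any topology needs assignments. A rough sketch of that pattern with the Mesos SchedulerDriver API (the class, field, and method names are hypothetical, not the plugin's actual code):

```java
import org.apache.mesos.SchedulerDriver;

/** Illustrative sketch of the suppress/revive toggle seen in the log above. */
final class OfferStreamToggle {
  private boolean offersSuppressed = false;

  /** Called from the scheduling loop with the current need for slots. */
  synchronized void update(SchedulerDriver driver, boolean topologiesNeedAssignments) {
    if (topologiesNeedAssignments && offersSuppressed) {
      driver.reviveOffers();    // ask Mesos to start sending offers again
      offersSuppressed = false;
    } else if (!topologiesNeedAssignments && !offersSuppressed) {
      driver.suppressOffers();  // stop offers we would only end up declining
      offersSuppressed = true;
    }
  }
}
```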

The full version, with more offer details, can be found here.

erikdw commented 7 years ago

I haven't read through yet -- I'm tempted to run this in our staging environment before merging it in, but maybe that is being too conservative.

JessicaLHartog commented 7 years ago

@erikdw Rebased this after the latest merge. Should be good to go, LMK!

erikdw commented 7 years ago

Merged this. @JessicaLHartog can you backport this to the storm-0.x branch and send another PR please?