Elucidation / mapf-multiagent-robot-planning

Multi-Agent PathFinding (MAPF) for 2D Robots moving inventory on a grid - Practice building environment + robots + planning + inventory management etc.
MIT License

write selective robot updates to redis #106

Closed Elucidation closed 1 year ago

Elucidation commented 1 year ago

Every world_sim / RA update cycle, every robot key and value gets written to redis, even if only a few have changed.

This isn't a big deal for ~50 robots, but for ~500 robots this gets very slow.

    def update_robots(self, robots: List[Robot], pipeline: 'redis.Pipeline' = None):
        # Execute update if pipeline not defined, else pass to pipeline.
        _pipeline = self.redis.pipeline() if pipeline is None else pipeline
        for robot in robots:
            robot_key = f'robot:{robot.robot_id}'
            # Every field of every robot is written, whether or not it changed
            _pipeline.hset(robot_key, mapping=robot.json_data())
            # Add robot to busy/free based on state
            if robot.state == RobotStatus.AVAILABLE:
                _pipeline.sadd('robots:free', robot_key)
                _pipeline.srem('robots:busy', robot_key)
            elif robot.state == RobotStatus.IN_PROGRESS:
                _pipeline.sadd('robots:busy', robot_key)
                _pipeline.srem('robots:free', robot_key)
        if pipeline is None:
            # No pipeline was passed in, so run the queued commands now
            _pipeline.execute()

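
A minimal sketch of one way to write only what changed, assuming a per-robot cache of the field values last queued for redis. `SelectiveRobotWriter`, `RecordingPipeline`, and the plain-string state values are hypothetical names for illustration and are not part of the repo; `RecordingPipeline` stands in for a redis pipeline so the sketch runs without a server.

```python
from typing import Any, Dict, Iterable, List, Tuple


class RecordingPipeline:
    """Hypothetical stand-in for a redis pipeline: records commands instead of sending them."""

    def __init__(self) -> None:
        self.commands: List[Tuple[Any, ...]] = []

    def hset(self, key: str, mapping: Dict[str, Any]) -> None:
        self.commands.append(('hset', key, dict(mapping)))

    def sadd(self, key: str, member: str) -> None:
        self.commands.append(('sadd', key, member))

    def srem(self, key: str, member: str) -> None:
        self.commands.append(('srem', key, member))


class SelectiveRobotWriter:
    """Queues writes only for robot fields that differ from the last queued values."""

    def __init__(self) -> None:
        # robot_key -> field values as of the last queued write (assumed cache)
        self._last_queued: Dict[str, Dict[str, Any]] = {}

    def queue_updates(self, robots: Iterable[Tuple[str, Dict[str, Any]]], pipeline) -> int:
        """Queue changed fields for (robot_key, data) pairs, return how many robots changed."""
        changed_robots = 0
        for robot_key, data in robots:
            previous = self._last_queued.get(robot_key, {})
            changed = {field: value for field, value in data.items()
                       if field not in previous or previous[field] != value}
            if not changed:
                continue  # Nothing new for this robot, so queue no commands
            pipeline.hset(robot_key, mapping=changed)
            # Only touch the free/busy sets when the state itself changed
            if 'state' in changed:
                if changed['state'] == 'AVAILABLE':
                    pipeline.sadd('robots:free', robot_key)
                    pipeline.srem('robots:busy', robot_key)
                elif changed['state'] == 'IN_PROGRESS':
                    pipeline.sadd('robots:busy', robot_key)
                    pipeline.srem('robots:free', robot_key)
            self._last_queued[robot_key] = dict(data)
            changed_robots += 1
        return changed_robots


writer = SelectiveRobotWriter()
first = RecordingPipeline()
writer.queue_updates([('robot:0', {'pos': '1,1', 'state': 'AVAILABLE'}),
                      ('robot:1', {'pos': '5,5', 'state': 'IN_PROGRESS'})], first)
second = RecordingPipeline()
# Only robot:0 moved, robot:1 is unchanged
writer.queue_updates([('robot:0', {'pos': '1,2', 'state': 'AVAILABLE'}),
                      ('robot:1', {'pos': '5,5', 'state': 'IN_PROGRESS'})], second)
print(len(first.commands), len(second.commands))  # prints "6 1"
```

The cache is updated when commands are queued, so this assumes the pipeline is always executed afterwards. If a queued pipeline can be dropped, the cache would need to be updated only after execute() succeeds. Fields removed from a robot are also not handled here.
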
Elucidation commented 1 year ago

Some profiling of the update with line_profiler:

Timer unit: 1e-07 s

Total time: 1.92166 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: update at line 335

Line #      Hits         Time  Per Hit   % Time  Line Contents
   335                                               @profile
   336                                               def update(self, robots=None, time_read=None):
   337                                                   """Process jobs and assign robots tasks.
   338                                                      If update takes longer than a threshold, the step is skipped and redis is not updated."""
   339        32        529.0     16.5      0.0          t_start = time.perf_counter() if time_read is None else time_read
   340        32      34058.0   1064.3      0.2          self.logger.debug('update start')
   342                                                   # 1 - Update to latest robots from WDB if not passed in
   343        32        338.0     10.6      0.0          t_load_robots = time.perf_counter()
   344        32      38685.0   1208.9      0.2          self.robots = self.wdb.get_robots() if robots is None else robots
   346        32        446.0     13.9      0.0          def update_too_long():
   347                                                       """Check if time since t_start > MAX_UPDATE_TIME_SEC"""
   348                                                       return (time.perf_counter() - t_start) > MAX_UPDATE_TIME_SEC
   349        32        810.0     25.3      0.0          if update_too_long():
   350                                                       self.logger.warning('update started too late at %.2f ms > %.2f ms threshold, skipping',
   351                                                                           (time.perf_counter() - t_start)*1000, MAX_UPDATE_TIME_SEC * 1000)
   352                                                       return
   353        32        547.0     17.1      0.0          t_load_robots = (time.perf_counter() - t_load_robots)*1000
   355                                                   # 2 - Check and update any jobs
   356        32        154.0      4.8      0.0          t_update_jobs = time.perf_counter()
   357        32       1343.0     42.0      0.0          job_keys = list(self.jobs)
   358                                                   # TODO : Replace this with round-robin
   359        32     114046.0   3563.9      0.6          shuffled_job_keys = random.sample(job_keys, len(job_keys))
   361                                                   # Get the dynamic obstacles for this timestep
   362        32    1055815.0  32994.2      5.5          self.latest_dynamic_obstacles = self.get_all_current_dynamic_obstacles()
   363                                                   # Only process jobs for up to MAX_TIME_CHECK_JOB_SEC locally and MAX_UPDATE_TIME_SEC total
   364        32        223.0      7.0      0.0          jobs_processed = 0
   365        32        131.0      4.1      0.0          processed_jobs: list[Job] = []
   366      2713       9345.0      3.4      0.0          for job_key in shuffled_job_keys:
   367                                                       # Create a copy of the job to be changed
   368      2713     333300.0    122.9      1.7              job = self.jobs[job_key].copy()
   369      2713   11755883.0   4333.2     61.2              self.check_and_update_job(job)
   370      2713      10346.0      3.8      0.1              jobs_processed += 1
   371      2713      15343.0      5.7      0.1              processed_jobs.append(job)
   372      2706      45174.0     16.7      0.2              if ((time.perf_counter() - t_start) > MAX_TIME_CHECK_JOB_SEC) or update_too_long():
   373         7         40.0      5.7      0.0                  break
   374        32        292.0      9.1      0.0          t_update_jobs = (time.perf_counter() - t_update_jobs)*1000
   376                                                   # 3 - Now check for any available robots and tasks for
   377                                                   # up to MAX_TIME_ASSIGN_JOB_SEC locally and MAX_UPDATE_TIME_SEC total
   378        32        116.0      3.6      0.0          t_assign = time.perf_counter()
   379        32        129.0      4.0      0.0          new_jobs: list[Job] = []
   380                                                   # Get available robots
   381        32        114.0      3.6      0.0          robots_assigned = 0
   382        32      11731.0    366.6      0.1          available_robots = self.get_available_robots()
   383        32        250.0      7.8      0.0          available_robots_count = len(available_robots)
   384                                                   # Get available new tasks
   385        32     410505.0  12828.3      2.1          all_new_tasks_count = self.redis_db.llen('tasks:new')
   386        32     328049.0  10251.5      1.7          new_tasks = self.redis_db.lrange(
   387        32        131.0      4.1      0.0              'tasks:new', 0, available_robots_count)
   388        32        353.0     11.0      0.0          new_tasks_count = len(new_tasks)
   389                                                   # For each available pair of robot and tasks
   390        82        931.0     11.4      0.0          for idx in range(min(available_robots_count, new_tasks_count)):
   391        82       1214.0     14.8      0.0              if (time.perf_counter() - t_assign) > MAX_TIME_ASSIGN_JOB_SEC:
   392                                                           break
   393        82       1464.0     17.9      0.0              if update_too_long():
   394                                                           break
   395                                                       # Assign new task to available robot, creating a new job
   396        82        389.0      4.7      0.0              task_key = new_tasks[idx]
   397        82        379.0      4.6      0.0              robot = available_robots[idx]
   398        82     315750.0   3850.6      1.6              new_job = self.assign_task_to_robot(task_key, robot)
   399        82        907.0     11.1      0.0              new_jobs.append(new_job)
   400        82        454.0      5.5      0.0              robots_assigned += 1
   401        32        517.0     16.2      0.0          t_assign = (time.perf_counter() - t_assign)*1000
   403                                                   # Revert changes if at this point update took too long
   404                                                   # Expectation: No redis writes were done up to this point.
   405        32        631.0     19.7      0.0          if update_too_long():
   406                                                       update_duration_ms = (time.perf_counter() - t_start)*1000
   407                                                       self.logger.error(
   408                                                           f'update end, reverted due to over threshold, '
   409                                                           f'took {update_duration_ms:.3f} ms > {MAX_UPDATE_TIME_SEC*1000} ms threshold, '
   410                                                           f'reverting processed {jobs_processed}/{len(shuffled_job_keys)} jobs, '
   411                                                           f'reverting assigned {robots_assigned}/{available_robots_count} available robots '
   412                                                           f'to {new_tasks_count}/{all_new_tasks_count} available tasks')
   413                                                       return
   415                                                   # 4 - Batch update robots, jobs, tasks now
   416        32        189.0      5.9      0.0          t_update_all = time.perf_counter()
   417        32       4165.0    130.2      0.0          pipeline = self.redis_db.pipeline()
   418        32    1832775.0  57274.2      9.5          self.wdb.update_robots(self.robots, pipeline=pipeline)
   420                                                   # replace stored jobs that were processed with the changed ones
   421      2713       9897.0      3.6      0.1          for job in processed_jobs:
   422                                                       # Either replace job in progress, or pop completed ones
   423      2678      14118.0      5.3      0.1              if job.state == JobState.COMPLETE:
   424                                                           # Remove allocation and job on completion
   425        35        207.0      5.9      0.0                  self.allocations[job.robot_id] = None
   426        35        585.0     16.7      0.0                  self.jobs.pop(job.job_id, None)
   427        35        114.0      3.3      0.0                  continue
   429      2678      12293.0      4.6      0.1              if job.state == JobState.ITEM_DROPPED:
   430                                                           # Notify task complete (Order Proc adds item to station)
   432                                                           pipeline.lpush('tasks:processed', job.task_key)
   433                                                           self.logger.info(f'Task {job.task_key} complete, '
   434                                                                            f'Robot {job.robot_id} successfully dropped item')
   436      2678      23726.0      8.9      0.1              self.jobs[job.job_id] = job
   437      2678      11598.0      4.3      0.1              self.allocations[job.robot_id] = job.job_id
   439                                                   # For newly created jobs, track them and make their tasks inprogress in redis
   440        82        305.0      3.7      0.0          for job in new_jobs:
   441        82        408.0      5.0      0.0              self.jobs[job.job_id] = job  # Track job
   442                                                   # Move task keys associated with new jobs from new -> inprogress
   443        22        121.0      5.5      0.0          if new_jobs:
   444        22        753.0     34.2      0.0              task_keys = [job.task_key for job in new_jobs]
   445        22       1408.0     64.0      0.0              pipeline.lpop('tasks:new', len(task_keys))
   446                                                       # Set tasks in progress
   447        22        952.0     43.3      0.0              pipeline.sadd('tasks:inprogress', *task_keys)
   449                                                   # Execute transactions on redis
   450        32    2648613.0  82769.2     13.8          pipeline.execute()
   451        32        993.0     31.0      0.0          t_update_all = (time.perf_counter() - t_update_all)*1000
   453        32        260.0      8.1      0.0          update_duration_ms = (time.perf_counter() - t_start)*1000
   454        32     159417.0   4981.8      0.8          self.logger.info(
   455       128       1156.0      9.0      0.0              f'update end, took {update_duration_ms:.3f} ms, '
   456        64        490.0      7.7      0.0              f'processed {jobs_processed}/{len(shuffled_job_keys)} jobs, '
   457        64        210.0      3.3      0.0              f'assigned {robots_assigned}/{available_robots_count} available robots '
   458        64        207.0      3.2      0.0              f'to {new_tasks_count}/{all_new_tasks_count} available tasks '
   459       128        751.0      5.9      0.0              f'[{t_load_robots:.3f}, {t_update_jobs:.3f}, {t_assign:.3f}, {t_update_all:.3f}] ms')
Elucidation commented 1 year ago

While check_and_update_job is the primary driver for now (61.2% of the time), line 418, self.wdb.update_robots(self.robots, pipeline=pipeline), still eats up 9.5% of the time. Since it's pipelined, that's not redis update time, just building the JSON data and queuing the commands on the pipeline.
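
For scale, the per-hit figures in the profile above can be converted to milliseconds using the "Timer unit: 1e-07 s" header. This is a small arithmetic sketch; the helper name `to_ms` is made up for illustration, and the input values are copied from lines 369, 418 and 450 of the profile.

```python
TIMER_UNIT_SEC = 1e-07  # "Timer unit" reported at the top of the profile


def to_ms(units: float, timer_unit_sec: float = TIMER_UNIT_SEC) -> float:
    """Convert a line_profiler time value (in timer units) to milliseconds."""
    return units * timer_unit_sec * 1000.0


check_job_ms = to_ms(4333.2)       # line 369, per call to check_and_update_job
update_robots_ms = to_ms(57274.2)  # line 418, per call to update_robots
execute_ms = to_ms(82769.2)        # line 450, per call to pipeline.execute
per_update_ms = 1.92166 / 32 * 1000.0  # total time over the 32 profiled updates

print(f'{check_job_ms:.2f} {update_robots_ms:.2f} {execute_ms:.2f} {per_update_ms:.2f}')
# prints "0.43 5.73 8.28 60.05"
```

So each update spends roughly 5.7 ms queuing robot writes and 8.3 ms executing the pipeline, out of about 60 ms per update.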

Elucidation commented 1 year ago

Profiled most of robot allocator in https://gist.github.com/Elucidation/739b2e3b29b74d2155ea716256730f0f

Elucidation commented 1 year ago

Some of the interesting lines:

Line Number   Hits  Total Time (ms)  Per Hit (ms)  % Time  Line Contents
        377    182         924110.0        5077.5     8.4  self.latest_dynamic_obstacles = self.get_all_current_dynamic_obstacles()
        383  17573         246275.3          14.0     2.2  job = self.jobs[job_key].copy()
        384  17573        6016957.7         342.4    54.5  self.check_and_update_job(job)
        400    182         257945.9        1417.3     2.3  all_new_tasks_count = self.redis_db.llen('tasks:new')
        401    182         195363.8        1073.4     1.8  new_tasks = self.redis_db.lrange('tasks:new', 0, available_robots_count)
        413    192          97182.1         506.2     0.9  new_job = self.assign_task_to_robot(task_key, robot)
        433    182        1165658.0        6404.7    10.6  self.wdb.update_robots(self.robots, pipeline=pipeline)
        448    111          52497.6         473.0     0.5  self.logger.info(f'Task {job.task_key} complete, ' f'Robot {job.robot_id} successfully dropped item')
        465    182        1633635.4        8976.0    14.8  pipeline.execute()
        469    182         101979.4         560.3     0.9  self.logger.info(f'update end, took {update_duration_ms:.3f} ms, ' f'processed {jobs_processed}/{len(shuffled_job_keys)} jobs, ' f'assigned {robots_assigned}/{available_robots_count} available robots ' f'to {new_tasks_count}/{all_new_tasks_count} available tasks ' f'[{t_load_robots:.3f}, {t_update_jobs:.3f}, {t_assign:.3f}, {t_update_all:.3f}] ms')
