humbleOldSage opened 10 months ago
Spent a week and a half figuring out the pipeline flow (including the trip segmentation flow), partly through a discussion with @mukuFlash03, who pointed to https://github.com/e-mission/e-mission-docs/issues/950 as a good way to understand the flow of the prediction pipeline.

The pipeline begins from the `run_intake_pipeline_for_user` function in `intake_stage.py`, which also shows the various stage calls. We'll currently focus on the `eaist.segment_current_trips` stage (in `emission/analysis/intake/segmentation/trip_segmentation.py`).
The very first point of improvement I could find is related to the invalidation of out-of-order data:

```python
out_of_order_points = loc_df[loc_df.ts.diff() < 0]
if len(out_of_order_points) > 0:
    ...
    out_of_order_id_list = out_of_order_points["_id"].tolist()
    logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
    for ooid in out_of_order_id_list:
        ts.invalidate_raw_entry(ooid)
```
where `invalidate_raw_entry` is as below:

```python
def invalidate_raw_entry(self, obj_id):
    self.timeseries_db.update_one({"_id": obj_id, "user_id": self.user_id},
                                  {"$set": {"invalid": True}})
```
This is done one ID at a time, which can be inefficient, especially with a large list. We can improve this as below: create a list of update operations, something like

```python
from pymongo import UpdateOne

update_operations = [
    UpdateOne({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}})
    for obj_id in out_of_order_id_list
]
```

and then use the `bulk_write` method to execute all updates at once:

```python
if update_operations:
    self.timeseries_db.bulk_write(update_operations)
```
If the list gets too big, we can split it into batches and issue one bulk operation per batch, as sketched below.
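A minimal sketch of the batched variant, written as a standalone helper (the name `invalidate_raw_entries_batched` and the `batch_size` default are illustrative, not from the PR):

```python
from pymongo import UpdateOne

def invalidate_raw_entries_batched(collection, user_id, id_list, batch_size=1000):
    # Issue one bulk_write per batch so a very large id list does not
    # turn into a single enormous request.
    for i in range(0, len(id_list), batch_size):
        ops = [
            UpdateOne({"_id": obj_id, "user_id": user_id},
                      {"$set": {"invalid": True}})
            for obj_id in id_list[i:i + batch_size]
        ]
        if ops:
            collection.bulk_write(ops)
```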
To test the above, I flipped the `loc_df` dataframe as

```python
loc_df = loc_df[::-1]
```

thus allowing us to test the out-of-order part of the code.
Another thing I tried was experimenting with the scale of the data by appending the dataframe to the end of itself:

```python
for _ in range(n_fold):
    loc_df = pd.concat([loc_df, loc_df], ignore_index=True)
```

where `n_fold` is the number of times the dataframe is doubled. After this I reversed the dataframe as above.
Results were as below:

| Trips considered as out-of-order | 327 | 327*5 |
|---|---|---|
| Time taken by old implementation (s) | 0.363 | 12.101 |
| Time taken by new implementation (s) | 0.037 | 0.777 |
I did try the batched method (with `bulk_write`) as well, using different batch sizes (500, 1000, 2000), but there was no improvement compared to a single `bulk_write`.
There are two other dependencies on `invalidate_raw_entry` that we'll have to handle after this update: `dwell_segmentation_dist_filter.py` and `abstract_timeseries.py`.

```
$ grep -rl invalidate_raw_entry | grep -v __pycache__
./emission/analysis/intake/segmentation/trip_segmentation.py
./emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py
./emission/storage/timeseries/abstract_timeseries.py
./emission/storage/timeseries/builtin_timeseries.py
```
For example, in the issue here: https://github.com/e-mission/e-mission-docs/issues/950 we did not start by looking at the code and determining what incremental improvement we should make. Instead, we looked at the logs and determined, through timestamps, where the time was being spent, then why, and then how to fix it.
Looking into the Android runtime for now.

It took ~6.8s to run the entire `segment_current_trips` from `testSegmentationWrapperAndroid` in `TestTripSegmentation.py`, out of which ~6.775s is spent running `segment_into_trips` from `dwell_segmentation_time_filter.py`. Of this, almost all the time (~6.774s) is spent in the for loop (335 iterations):

```python
for idx, row in filtered_points_df.iterrows():
    ...
```
Inside the loop, each iteration took ~0.0043s to run the first part, from the beginning of the loop:

```python
currPoint = ad.AttrDict(row)
...
```

till

```python
last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)
```

and another ~0.011s to run the second part, from

```python
if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df):
```

till the rest of the loop, i.e., till

```python
else:
    prevPoint = currPoint
```

Meaning that the second part of the loop is currently taking the most time.
In this part, `has_trip_ended` is by far the major contributor; inside it, `is_tracking_restarted_in_range` took ~0.005s and `get_ongoing_motion_in_range` took ~0.004s (this is still inside the for loop).
To improve `is_tracking_restarted_in_range`:

There was this db call

```python
tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
                    endTs=end_ts)
transition_df = timeseries.get_data_df("statemachine/transition", tq)
```

which was run in every iteration. Instead, we can place this upstream, outside the for loop in `segment_into_trips` (`dwell_segmentation_time_filter.py`), and get all the transitions for the user in a df beforehand. However, there's a call to `is_tracking_restarted_in_range` from further upstream, in `segment_current_trips` -> `create_places_and_trips` -> `found_untracked_period` -> `_is_tracking_restarted` (`trip_segmentation.py`), so we'll make the db call from `segment_current_trips`.
Once we have the df, since all the rows are sorted by `ts`, we can do a log(n) search for the start and end indices in every iteration, as below:

```python
transition_df_start_idx = transition_df.ts.searchsorted(start_ts, side='left')
transition_df_end_idx = transition_df.ts.searchsorted(end_ts, side='right')
transition_df_for_current = transition_df.iloc[transition_df_start_idx:transition_df_end_idx]
```
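Putting the two pieces together, a minimal sketch of what the refactored check could look like, assuming the prefetched dataframe is passed down as a parameter (the exact signature in the PR may differ):

```python
def is_tracking_restarted_in_range(start_ts, end_ts, transition_df):
    # transition_df is fetched once, upstream in segment_current_trips,
    # and is sorted by ts, so each per-iteration lookup is an O(log n)
    # slice instead of a round trip to the database.
    start_idx = transition_df.ts.searchsorted(start_ts, side='left')
    end_idx = transition_df.ts.searchsorted(end_ts, side='right')
    transition_df_for_current = transition_df.iloc[start_idx:end_idx]
    # ... the existing restart checks then run over transition_df_for_current ...
    return len(transition_df_for_current) > 0
```

(The final check here is a placeholder; the real function inspects the transitions themselves.)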
Similar changes in `get_ongoing_motion_in_range`, where I moved

```python
tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
                    endTs=end_ts)
motion_list = list(timeseries.find_entries(["background/motion_activity"], tq))
```

upstream to extract the data into a df, and replaced it with:

```python
motion_df_start_idx = motion_df.ts.searchsorted(start_ts, side='left')
motion_df_end_idx = motion_df.ts.searchsorted(end_ts, side='right')
motion_list = motion_df.iloc[motion_df_start_idx:motion_df_end_idx]
```
With these changes, the runtime came down to ~0.0001s (from ~0.005s) for `is_tracking_restarted_in_range` and ~0.0001s (from ~0.004s) for `get_ongoing_motion_in_range`.

So, all in all, the second part of the for loop that took ~0.011s came down to ~0.0006s.
In total, the `segment_current_trips` runtime reduced to ~2.12s. https://github.com/e-mission/e-mission-server/pull/953/commits/1d1b31f1f4160af775479b8646a0c9726ca98771 handles these changes. This was bound to fail.
And with these changes inside the for loop, the first part is now the one that takes longer to run (~0.0043s per iteration, as mentioned above).
I tried some pandas improvements which might be overkill (in which case we can roll this back); these reduced the overall runtime from ~2.12s to ~1.5s. For this, I started by adding a vectorised implementation in `calDistance` (`emission/core/common.py`):
```python
import numpy as np

def calDistance(point1, point2, coordinates=False):
    # ... rest of the existing code ...

    # vectorised haversine path: both inputs are arrays of [lon, lat] rows
    if isinstance(point1, np.ndarray) and isinstance(point2, np.ndarray):
        dLat = np.radians(point1[:,1] - point2[:,1])
        dLon = np.radians(point1[:,0] - point2[:,0])
        lat1 = np.radians(point1[:,1])
        lat2 = np.radians(point2[:,1])
        a = (np.sin(dLat/2) ** 2) + ((np.sin(dLon/2) ** 2) * np.cos(lat1) * np.cos(lat2))
        c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
        d = earthRadius * c
        return d

    # ... rest of the implementation ...
```
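For illustration, a minimal usage sketch of the vectorised path (the coordinate values are made-up sample data; rows are [longitude, latitude] to match the indexing above):

```python
import numpy as np

# two small arrays of [lon, lat] rows; distances are computed row-by-row
points_a = np.array([[-122.25, 37.87], [-122.26, 37.88]])
points_b = np.array([[-122.27, 37.89], [-122.28, 37.90]])

# returns an array of two haversine distances (units follow earthRadius)
distances = calDistance(points_a, points_b)
```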
and in `segment_into_trips`, the calculations for `last5MinTimes` and others are vectorised.
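As one example of what that vectorisation might look like (assuming `timeToLast` computes the time from each of the last-5-minutes points to the current point, which is what the surrounding code suggests):

```python
# before: one Python-level function call per row
# last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)

# after: a single vectorised subtraction over the ts column
last5MinTimes = currPoint.ts - last5MinsPoints_df.ts.to_numpy()
```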
For the iOS wrapper, I made the necessary changes to support the generalised trip flow, and that brought the iOS runtime to 0.3s.
However, when running the combinedWrapper (iOS+Android), the `loc_df` db call

```python
loc_df = ts.get_data_df("background/filtered_location", time_query)
```

doesn't work when placed upstream in `trip_segmentation.py` and passed down to `dwell_segmentation_dist_filter.py` and `dwell_segmentation_time_filter.py`. It only seems to work correctly when the call is made separately in each of `dwell_segmentation_dist_filter.py` and `dwell_segmentation_time_filter.py`.

For now, on dummy data, this additional call increases the Android runtime from ~1.5s to ~1.6s and the iOS runtime from 0.3s to 0.4s.

Let's try to figure out why the upstream call isn't working.

Also, the current CombinedWrapper runtime is 2.1s.
On the side, after yesterday's PR, I was expecting all the tests to pass. I figured out from the logs that the `testJackUntrackedTimeMar12` test from `emission/tests/analysisTests/intakeTests/TestPipelineRealData.py` was failing after `get_ongoing_motion_in_range` was shifted from list-based to df-based in `restart_checking.py`. The dataframe implementation was unable to handle the empty-df case. This was fixed accordingly.
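The fix was presumably along these lines (a sketch, not the exact diff): guard the df-based path so an empty dataframe, which may not even have a `ts` column when `get_data_df` finds no entries, short-circuits cleanly:

```python
def get_ongoing_motion_in_range(start_ts, end_ts, motion_df):
    # an empty result from get_data_df may have no ts column to search,
    # so bail out early instead of failing on the column access
    if len(motion_df) == 0:
        return motion_df
    start_idx = motion_df.ts.searchsorted(start_ts, side='left')
    end_idx = motion_df.ts.searchsorted(end_ts, side='right')
    return motion_df.iloc[start_idx:end_idx]
```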
In `dwell_segmentation_time_filter.py`, the calculations for `last10PointsDistances` and `last5MinsDistances` are now vectorised. To support the vectorised calculations, `calDistance` in `common.py` now has the numpy-related changes described above.
This brought the runtime for the entire loop

```python
for idx, row in filtered_points_df.iterrows():
```

from ~2s to ~1.2s over 327 iterations.
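A sketch of what the vectorised distance calculation could look like, using the numpy path of `calDistance` from above (the window construction and column names are illustrative; this assumes the dataframe exposes `longitude`/`latitude` columns and a positional index):

```python
import numpy as np

# [lon, lat] rows for the current point's last-10-points window
last10Coords = filtered_points_df.iloc[max(0, idx - 9):idx + 1][
    ["longitude", "latitude"]].to_numpy()

# broadcast the current point against the window and compute all
# haversine distances in a single vectorised calDistance call
currCoords = np.tile([currPoint.longitude, currPoint.latitude],
                     (len(last10Coords), 1))
last10PointsDistances = calDistance(last10Coords, currCoords)
```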
I think we should revisit this from first principles and then validate that what Satyam came up with aligns with what you find. I would like to handle this in a principled fashion. We can instrument in two ways:

- additional logs
- stats (like the existing ones in `emission/pipeline/intake_stage.py`)

Pro of the logs: it is quick and easy to see as you are making changes. Pro of the stats: you can see the behavior in production without having to pull logs from cloudwatch.

I would suggest additional logs initially so you can fix it, and potentially generate some stats around improvements on pulled data and do lots of analysis; then we deploy to production, and then we can run analyses (before and after) on multiple deployments of various sizes. See the sketch after this list for the log-based option.
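For the log-based option, a minimal sketch of the kind of timing instrumentation that could be added around a stage (plain stdlib timing; the codebase may already have its own timer/stats helpers that would be preferable):

```python
import logging
import time

before = time.time()
eaist.segment_current_trips(user_id)
logging.info("segment_current_trips for %s took %.3f s",
             user_id, time.time() - before)
```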
Aim : To improve the performance of the trip segmentation stage of the pipeline by reducing the number of DB calls and performing more in-memory operations (potentially using pandas).