SrinivasMushnoori / repex

An implementation of the RepEx package as an application written in the EnTK API
MIT License

Sliding Window appears to be generating incorrect exchange lists #32

Closed: SrinivasMushnoori closed this issue 5 years ago

SrinivasMushnoori commented 5 years ago

Replica lists after the first replica completes MD:

waitlist is: 
1
sorted waitlist is: 
1
1
exchange list is: 
1
exchange size is  4  and exchange list length is  1

sorted_waitlist is twice as long as it should be and has duplicate entries. This points to an issue around line 194. A fix could look like:

# sorted() returns a new list, so this replaces the old contents instead of adding to them
self._sorted_waitlist = sorted(self._waitlist, key=lambda x: x.rid)
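A minimal sketch of why rebuilding the list helps. It assumes, without confirmation from the RepEx source, that the old code around line 194 added sorted entries onto a `self._sorted_waitlist` that persists between calls. `Replica` and `Exchange` are hypothetical stand-ins with only the attributes this snippet needs.

```python
from typing import List


class Replica:
    """Hypothetical stand-in for a RepEx replica; only rid is used here."""

    def __init__(self, rid: int) -> None:
        self.rid = rid


class Exchange:
    """Hypothetical stand-in holding the two lists from the snippet above."""

    def __init__(self) -> None:
        self._waitlist: List[Replica] = []
        self._sorted_waitlist: List[Replica] = []

    def sort_accumulating(self) -> None:
        # Assumed buggy pattern: sorted entries are added on top of whatever
        # the previous call left behind, so replicas repeat across calls.
        self._sorted_waitlist.extend(sorted(self._waitlist, key=lambda x: x.rid))

    def sort_rebuilding(self) -> None:
        # Fixed pattern: sorted() builds a new list that replaces the old one.
        self._sorted_waitlist = sorted(self._waitlist, key=lambda x: x.rid)


ex = Exchange()
for rid in (1, 3):                      # two replicas finish MD, one after the other
    ex._waitlist.append(Replica(rid))
    ex.sort_accumulating()
accumulated = [r.rid for r in ex._sorted_waitlist]

ex.sort_rebuilding()
rebuilt = [r.rid for r in ex._sorted_waitlist]
print(accumulated)  # [1, 1, 3]
print(rebuilt)      # [1, 3]
```

Because `sorted()` always returns a fresh list, the separate `list()` reset is not needed.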

Attempting fix.

SrinivasMushnoori commented 5 years ago

This happens further down the line as well:

waitlist is: 
1
3
0
sorted waitlist is: 
0
1
3
3
4
exchange list is: 
1
3
3
3
exchange size is  4  and exchange list length is  4

This suggests that the incorrect sorted list may be artificially inflating the exchange list.

SrinivasMushnoori commented 5 years ago

Fix applied, but the issue is not fully resolved.

waitlist is: 
1
3
0
end of waitlist
sorted waitlist is: 
0
1
3
end of sorted waitlist
exchange list is: 
1
3
3
3
end of exchange list
exchange size is  4  and exchange list length is  4

Somehow it's always replica 3 that's repeated. Investigating.

SrinivasMushnoori commented 5 years ago

After this commit, sliding_window seems to be triggered multiple times per cycle.

waitlist is: 
1
3
0
2
end of waitlist
sorted waitlist is: 
0
1
2
3
end of sorted waitlist
rid_list in sliding window is:  [0, 1, 2]
Exchange list generated by sliding window is:  [1, 3, 1, 3, 2]
rid_list in sliding window is:  [1, 2, 3]
Exchange list generated by sliding window is:  [1, 3, 1, 3, 2, 3]
exchange list is: 
1
3
1
3
2
3
end of exchange list
exchange size is  4  and exchange list length is  6

Investigating.

SrinivasMushnoori commented 5 years ago

Suspicion: see here

In these lines, the replica appended to the exchange list appears to be the one at the END of the sorted waitlist. Therefore, if sliding_window is triggered twice because two replicas finished MD, the sorted waitlists at the end of both MD phases may have the same last element, and that replica gets appended twice.

A fix could look like:

if len(rid_list) < exchange_size:
    self._exchange_list.append(new_replica)  # new_replica is the replica that joined the waitlist last
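A small simulation of the suspicion above. It assumes replica IDs are plain integers and that replicas 1, 3 and 0 join the waitlist in that order, as in the earlier logs. `append_last_sorted` and `append_new_replica` are hypothetical helpers, not RepEx functions.

```python
from typing import List


def append_last_sorted(exchange_list: List[int], sorted_waitlist: List[int],
                       exchange_size: int) -> None:
    # Suspected bug: append whichever replica sits at the END of the sorted waitlist.
    if len(exchange_list) < exchange_size:
        exchange_list.append(sorted_waitlist[-1])


def append_new_replica(exchange_list: List[int], new_replica: int,
                       exchange_size: int) -> None:
    # Proposed fix: append the replica that joined the waitlist last.
    if len(exchange_list) < exchange_size:
        exchange_list.append(new_replica)


exchange_size = 4
waitlist: List[int] = []
buggy: List[int] = []
fixed: List[int] = []

for new_replica in (1, 3, 0):           # replicas finishing MD, in arrival order
    waitlist.append(new_replica)
    sorted_waitlist = sorted(waitlist)
    append_last_sorted(buggy, sorted_waitlist, exchange_size)
    append_new_replica(fixed, new_replica, exchange_size)

print(buggy)  # [1, 3, 3]: replica 3 stays last in the sorted list and is appended twice
print(fixed)  # [1, 3, 0]
```

This reproduces the repeated replica 3 seen in the earlier exchange lists.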
SrinivasMushnoori commented 5 years ago

An odd set of observations:

waitlist is:  [3, 2, 1, 5, 0, 4]
sorted waitlist is:  [0, 1, 2, 3, 4, 5]
rid_list in sliding window is:  [0, 1, 2, 3, 4, 5]
The new replica is  4
Exchange list generated by sliding window is:  [3, 2, 1, 5, 0]
exchange list returned by sliding window is:  [3, 2, 1, 5, 0]
exchange size is  6  and exchange list length is  5
replica  0  should suspend now

Above, the new replica (replica 4) is not added to the exchange list despite the check

if len(rid_list) < exchange_size:
    self._exchange_list.append(new_replica[0])

where exchange_size is 6.

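The condition should probably compare against self._exchange_list rather than rid_list, since that's the list we ultimately need to fill: here rid_list already holds 6 replicas, so len(rid_list) < exchange_size is False, even though the exchange list has only 5 entries.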
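Plugging the numbers from the log above into both versions of the check shows the difference. The two functions are hypothetical helpers written only for this comparison; they return a new list instead of mutating `self._exchange_list`.

```python
from typing import List


def check_rid_list(exchange_list: List[int], rid_list: List[int],
                   new_replica: int, exchange_size: int) -> List[int]:
    result = list(exchange_list)
    # Original check: measures the window (rid_list), not the list being built.
    if len(rid_list) < exchange_size:
        result.append(new_replica)
    return result


def check_exchange_list(exchange_list: List[int], new_replica: int,
                        exchange_size: int) -> List[int]:
    result = list(exchange_list)
    # Suggested check: measures the exchange list itself.
    if len(result) < exchange_size:
        result.append(new_replica)
    return result


rid_list = [0, 1, 2, 3, 4, 5]      # from the log: rid_list in sliding window
exchange_list = [3, 2, 1, 5, 0]    # from the log: exchange list generated so far
new_replica = 4
exchange_size = 6

before = check_rid_list(exchange_list, rid_list, new_replica, exchange_size)
after = check_exchange_list(exchange_list, new_replica, exchange_size)
print(before)  # [3, 2, 1, 5, 0]
print(after)   # [3, 2, 1, 5, 0, 4]
```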

SrinivasMushnoori commented 5 years ago

The issue turned out to be that the code was pointing to the wrong exchange list.

SrinivasMushnoori commented 5 years ago
2019-05-10 13:31:53,804: radical.entk.wfprocessor.0001: wfprocessor                     : dequeue-thread : ERROR   : Execution failed in post_exec of stage stage.0002
Traceback (most recent call last):
  File "/home/scm177/VirtualEnvs/Env_RepEx/local/lib/python2.7/site-packages/radical/entk/appman/wfprocessor.py", line 390, in _dequeue
    resumed_pipe_uids = stage.post_exec()
  File "async_sliding_window.py", line 443, in _after_md
    self._check_ex(self)
  File "async_sliding_window.py", line 206, in _check_exchange
    self._exchange_list = self._sliding_window(self._sorted_waitlist, self._exchange_size, self._window_size)
  File "async_sliding_window.py", line 290, in _sliding_window
    if len(self._exchange_list) < exchange_size:     #### "exchange list" appears to not exist.
TypeError: object of type 'NoneType' has no len()
2019-05-10 13:31:53,806: radical.entk.wfprocessor.0001: wfprocessor                     : dequeue-thread : ERROR   : Unable to receive message from completed queue: object of type 'NoneType' has no len()
2019-05-10 13:31:53,806: radical.entk.wfprocessor.0001: wfprocessor                     : dequeue-thread : ERROR   : Error in dequeue-thread: object of type 'NoneType' has no len()
Traceback (most recent call last):
  File "/home/scm177/VirtualEnvs/Env_RepEx/local/lib/python2.7/site-packages/radical/entk/appman/wfprocessor.py", line 390, in _dequeue
    resumed_pipe_uids = stage.post_exec()
  File "async_sliding_window.py", line 443, in _after_md
    self._check_ex(self)
  File "async_sliding_window.py", line 206, in _check_exchange
    self._exchange_list = self._sliding_window(self._sorted_waitlist, self._exchange_size, self._window_size)
  File "async_sliding_window.py", line 290, in _sliding_window
    if len(self._exchange_list) < exchange_size:     #### "exchange list" appears to not exist.
TypeError: object of type 'NoneType' has no len()
SrinivasMushnoori commented 5 years ago

In lines 273-274 the except is triggered because the list self._exchange_list does not appear to exist (the TypeError shows it is None). However, line 74 disagrees: it defines an empty list when the exchange object is instantiated. So, if nothing else, len(self._exchange_list) should return 0.
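One possible explanation, not confirmed anywhere in this thread: line 206 assigns the return value of `_sliding_window` back to `self._exchange_list`. If `_sliding_window` reaches a path without a `return`, Python returns `None`, so the empty list from line 74 is replaced by `None` after the first call and the next call fails like the traceback above. The classes below are hypothetical, stripped-down stand-ins, not the RepEx code.

```python
from typing import List, Optional


class BuggyExchange:
    def __init__(self) -> None:
        self._exchange_list: Optional[List[int]] = []   # empty list at instantiation

    def _sliding_window(self, sorted_waitlist: List[int], exchange_size: int):
        for rid in sorted_waitlist:
            if len(self._exchange_list) < exchange_size and rid not in self._exchange_list:
                self._exchange_list.append(rid)
        # No return statement, so this method implicitly returns None.

    def _check_exchange(self, sorted_waitlist: List[int], exchange_size: int) -> None:
        # Overwrites the attribute with whatever _sliding_window returns.
        self._exchange_list = self._sliding_window(sorted_waitlist, exchange_size)


class FixedExchange(BuggyExchange):
    def _sliding_window(self, sorted_waitlist: List[int], exchange_size: int):
        super()._sliding_window(sorted_waitlist, exchange_size)
        return self._exchange_list                      # hand the list back explicitly


buggy_ex = BuggyExchange()
buggy_ex._check_exchange([0, 1], exchange_size=4)       # attribute silently becomes None
try:
    buggy_ex._check_exchange([0, 1, 2], exchange_size=4)
    error = None
except TypeError as err:
    error = str(err)
print(error)  # object of type 'NoneType' has no len()

fixed_ex = FixedExchange()
fixed_ex._check_exchange([0, 1], exchange_size=4)
fixed_ex._check_exchange([0, 1, 2], exchange_size=4)
print(fixed_ex._exchange_list)  # [0, 1, 2]
```

Either returning the list explicitly or not reassigning the attribute at the call site avoids the `None`.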

SrinivasMushnoori commented 5 years ago

Update:

2019-05-10 13:51:27,937: radical.entk.wfprocessor.0001: wfprocessor                     : dequeue-thread : ERROR   : Execution failed in post_exec of stage stage.0002
Traceback (most recent call last):
  File "/home/scm177/VirtualEnvs/Env_RepEx/local/lib/python2.7/site-packages/radical/entk/appman/wfprocessor.py", line 390, in _dequeue
    resumed_pipe_uids = stage.post_exec()
  File "async_sliding_window.py", line 443, in _after_md
    self._check_ex(self)
  File "async_sliding_window.py", line 216, in _check_exchange
    replica.suspend()
  File "/home/scm177/VirtualEnvs/Env_RepEx/local/lib/python2.7/site-packages/radical/entk/pipeline/pipeline.py", line 255, in suspend
    'suspend() called on Pipeline %s that is already suspended' % self._uid)
EnTKError: suspend() called on Pipeline pipeline.0001 that is already suspended
2019-05-10 13:51:27,938: radical.entk.wfprocessor.0001: wfprocessor                     : dequeue-thread : ERROR   : Unable to receive message from completed queue: suspend() called on Pipeline pipeline.0001 that is already suspended
2019-05-10 13:51:27,939: radical.entk.wfprocessor.0001: wfprocessor                     : dequeue-thread : ERROR   : Error in dequeue-thread: suspend() called on Pipeline pipeline.0001 that is already suspended
Traceback (most recent call last):
  File "/home/scm177/VirtualEnvs/Env_RepEx/local/lib/python2.7/site-packages/radical/entk/appman/wfprocessor.py", line 390, in _dequeue
    resumed_pipe_uids = stage.post_exec()
  File "async_sliding_window.py", line 443, in _after_md
    self._check_ex(self)
  File "async_sliding_window.py", line 216, in _check_exchange
    replica.suspend()
  File "/home/scm177/VirtualEnvs/Env_RepEx/local/lib/python2.7/site-packages/radical/entk/pipeline/pipeline.py", line 255, in suspend
    'suspend() called on Pipeline %s that is already suspended' % self._uid)
EnTKError: suspend() called on Pipeline pipeline.0001 that is already suspended

This error shows up right after:

waitlist is:  [5, 3]
sorted waitlist is:  [3, 5]
rid_list in sliding window is:  [3, 3]
The new replica is  new_replica seems to not exist or something
Hit Ctrl+C in 10 seconds to terminate
rid_list in sliding window is:  [5, 5]
The new replica is  new_replica seems to not exist or something
Hit Ctrl+C in 10 seconds to terminate
replica  5  should suspend now

Clearly the same replica is being pushed multiple times: rid_list holds [3, 3] and then [5, 5], even though the waitlist itself has no duplicates. Investigating.
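Until the duplicate is tracked down, a guard before suspending avoids the `EnTKError`; the later logs in this thread print "is already suspended, moving on", which suggests a guard along these lines was added. `FakePipeline` and `suspend_once` are hypothetical stand-ins, and the `suspended` flag is an assumption made for the sketch, not how EnTK exposes pipeline state.

```python
from typing import Iterable, List


class FakePipeline:
    """Hypothetical stand-in for an EnTK Pipeline; it only mimics the double-suspend error."""

    def __init__(self, uid: str) -> None:
        self.uid = uid
        self.suspended = False          # assumed flag, used only for this sketch

    def suspend(self) -> None:
        if self.suspended:
            raise RuntimeError('suspend() called on Pipeline %s that is already suspended' % self.uid)
        self.suspended = True


def suspend_once(replicas: Iterable[FakePipeline]) -> List[str]:
    """Suspend each replica at most once; return the uids that were actually suspended."""
    newly_suspended = []
    for replica in replicas:
        if replica.suspended:
            continue                    # already suspended, moving on
        replica.suspend()
        newly_suspended.append(replica.uid)
    return newly_suspended


p1 = FakePipeline('pipeline.0001')
result = suspend_once([p1, p1])         # the same replica appears twice
print(result)                           # ['pipeline.0001']: the duplicate entry is skipped
```

This only hides the symptom; the duplicate entries still need to be removed at their source.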

SrinivasMushnoori commented 5 years ago

After 1 replica finishes MD:

waitlist is:  [4]
sorted waitlist is:  [4]
rid_list in sliding window is:  [4]
The new replica is  4
replica  4  should suspend now

After 2 finish:

waitlist is:  [4, 0]
sorted waitlist is:  [0, 4]
rid_list in sliding window is:  [0, 0]
The new replica is  0
rid_list in sliding window is:  [4, 4]
The new replica is  4
replica  4  should suspend now
replica  4  is already suspended, moving on

The issue seems to be around line 243 (sliding_window).

EDIT: Have we fought and fixed this before? sliding_window seems to be getting triggered more than once!

SrinivasMushnoori commented 5 years ago

Issue still persists:

After first MD completion:

waitlist is:  [4]
sorted waitlist is:  [4]
rid_list in sliding window is:  [4]
The new replica is  4
Exchange list generated by sliding window is:  [4]
replica  4  should suspend now

After two MD completions:

waitlist is:  [4, 0]
sorted waitlist is:  [0, 4]
rid_list in sliding window is:  [0, 0]
The new replica is  0
Exchange list generated by sliding window is:  [0]
rid_list in sliding window is:  [4, 4]
The new replica is  4
Exchange list generated by sliding window is:  [4]
replica  4  should suspend now
replica  4  is already suspended, moving on

The issue seems to stem from this line:

rid_list =  [replica for r in sorted_waitlist 
            if (replica.rid >= rid_start and replica.rid < rid_end)]

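That is, in the sliding window function the line above is ALREADY inside a loop over replicas. The comprehension iterates over r but tests and collects replica, the enclosing loop's variable, so it only considers the current replica and fills rid_list with one copy of it per entry in the waitlist (whenever that replica falls inside the window).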
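The effect is easy to reproduce outside RepEx. The sketch assumes the enclosing loop iterates over the sorted waitlist with a variable named `replica` and uses a hypothetical window `rid_start = 0`, `rid_end = 5`; `Replica` is a stand-in with only an `rid` attribute. With replicas 0 and 4 waiting, it reproduces the `[0, 0]` and `[4, 4]` pattern from the log, while the corrected comprehension tests and collects `r` itself.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Replica:
    """Hypothetical stand-in for a RepEx replica; only rid is used here."""
    rid: int


def buggy_rid_lists(sorted_waitlist: List[Replica], rid_start: int, rid_end: int) -> List[List[int]]:
    results = []
    for replica in sorted_waitlist:                 # the enclosing loop
        # Bug: iterates over r, but tests and collects the outer `replica`
        rid_list = [replica for r in sorted_waitlist
                    if (replica.rid >= rid_start and replica.rid < rid_end)]
        results.append([x.rid for x in rid_list])
    return results


def fixed_rid_list(sorted_waitlist: List[Replica], rid_start: int, rid_end: int) -> List[int]:
    # Fix: test and collect each r; the window is built once, with no enclosing loop
    rid_list = [r for r in sorted_waitlist if rid_start <= r.rid < rid_end]
    return [r.rid for r in rid_list]


waitlist = [Replica(0), Replica(4)]
buggy_out = buggy_rid_lists(waitlist, 0, 5)
fixed_out = fixed_rid_list(waitlist, 0, 5)
print(buggy_out)   # [[0, 0], [4, 4]]
print(fixed_out)   # [0, 4]
```

Building rid_list once per call, rather than once per waiting replica, also explains why sliding_window appeared to run more than once per cycle.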

SrinivasMushnoori commented 5 years ago

Partial changes made to that line (Line 273).

SrinivasMushnoori commented 5 years ago

This has finally been fixed. Keeping ticket open until commit/merge.