TDAmeritrade / stumpy

STUMPY is a powerful and scalable Python library for modern time series analysis
https://stumpy.readthedocs.io/en/latest/
Other
3.61k stars 318 forks source link

Add Egress Support to STUMPI and AAMPI #210

Closed seanlaw closed 4 years ago

seanlaw commented 4 years ago

Currently, as data is streamed in, stumpi appends this data to a growing time series. It may be helpful to add an egress=True function parameter to throw away the oldest point in the time series. This will ensure that the time series is of a pre-determined size but the matrix profile values for the newest ingressed data points will only be updated relative to the current time series and exclude the egressed/removed data points in the matrix profile calculations.

This is currently handled by floss natively (i.e., floss egresses the oldest data point)

seanlaw commented 4 years ago

@hmcoservit I don't know when we'll be able to get to it but I've added it here

hmcoservit commented 4 years ago

Great, thank you @seanlaw it would be very useful

seanlaw commented 4 years ago

@hmcoservit I have been thinking about this and wanted to get your input.

Let's say we have a starting time series, T, of length n=20. First, we compute the matrix profile and, for some coincidental reason, for the window size m=5, all of the subsequences have their closest match pointing to the subsequence at i=0 (i.e., the matrix profile indices are all 0). This is fine.

Then, the next data point, t, arrives and the first point,T[0], is removed before appending t to the end of the T (so the length of T stays the same). We also remove the first value in the matrix profile/matrix profile indices that corresponds to the subsequence starting at i=0. We compute the distance profile with respect to this updated time series and new subsequence but, in this fictitious example, the distances are all larger than those found in the existing matrix profile so, aside from the final subsequence, nothing is changed. So, essentially, the matrix profile indices are still full of zeros (referencing the original full length time series) and the last/final subsequence has, say, i=5 (technically, this is i=4 relative to this window but we add 1 to account for the point that we had egressed) as its corresponding matrix profile index.

So, in this current example, it is possible that the matrix profile indices are pointing to an index that no longer exists (i.e., those time series values have already been removed/egressed from the moving time series). Obviously, as more and more points get added to the time series then the "old" matrix profile values will eventually be removed.

I don't think there is any problem with this, per se, and I just wanted to make sure that I am aligned with your mental model of how this should work.

hmcoservit commented 4 years ago

Thanks for the interesting example @seanlaw.

According to my understanding, adding Egress Support to STUMPI might make maintaining indices a bit complicated (but not necessarily). For the case of anomaly detection in real time, only the left matrix profile is important and indices may not be considered. It may remove the need to maintain historical global indices, only local indices can be maintained. For example, before a new point comes in the oldest point (subsequence) can be removed, the MP and indices can then be updated based on the current historical data available. I am entirely not sure about the limitations though.

So, in this current example, it is possible that the matrix profile indices are pointing to an index that no longer exists (i.e., those time series values have already been removed/egressed from the moving time series).

This can be normal, but what is important is that the coming new point does not point to any indices that do not exist. The most important thing would be the distance profile, or more importantly the left matrix profile (stream.leftP) where historical values do not update when future points arrive.

seanlaw commented 4 years ago

@hmcoservit This is a good discussion. I'll have to think through the best way to incorporate this.

Considering that z-normalization may not be the best for this, is this feature still a high priority for you?

hmcoservit commented 4 years ago

Thanks @seanlaw, this feature would be good to have but it would not be a priority yet. Since z-normalization is affecting the benchmark results, it would be great to focus on implementing a way to have non-normalized sub-sequences.

hmcoservit commented 4 years ago

@seanlaw After AAMPI, this feature would be a great addition.

seanlaw commented 4 years ago

@hmcoservit Sounds good. I'll try to find some time to look at this. To be consistent with floss, I feel like this "sliding window" should be the default behavior

seanlaw commented 4 years ago

@hmcoservit The more I think about it, I wonder if it is really the "right matrix profile" that you are after. With floss, it uses the "right matrix profile indices" and not the "left matrix profile indices" to compute the corrected arc curve. In the case of both aampi and stumpi, wouldn't it be "better" for the oldest data point in the streaming time series (or any data point for that matter) to have its nearest neighbor located to the right of itself? Using the "left matrix profile index" just seems wrong and is somewhat of a nightmare to account for when we need to egress the oldest data point as new data comes in.

I understand that you only care about the matrix profile value and not the index number (though, it's not entirely clear to me what the rationale is) but the two must be consistent with each other and never mismatched (i.e., the matrix profile value must be connected to the correct matrix profile index). However, I think that if you are okay with a subsequence pointing to a nearest neighbor that is already gone then we might be fine.

hmcoservit commented 4 years ago

@seanlaw Thanks for getting back at this.

It may be possible for historical points (that have not been removed yet) to point to non-existing indices I guess, but it is important that the coming new point does not point to any indices (subsequence) that does not exist. For the case of anomaly detection in real time, only the left matrix profile would be important for our use case.

For example, before a new point comes in the oldest point (subsequence) can be removed, the MP and indices can then be updated based on the current historical data available. I am again not entirely sure about the limitations though.

The most important thing would be the distance profile, or more importantly the left matrix profile (stream.leftP) where historical values do not update when future points arrive.

@hmcoservit The more I think about it, I wonder if it is really the "right matrix profile" that you are after. With floss, it uses the "right matrix profile indices" and not the "left matrix profile indices" to compute the corrected arc curve. In the case of both aampi and stumpi, wouldn't it be "better" for the oldest data point in the streaming time series (or any data point for that matter) to have its nearest neighbor located to the right of itself? Using the "left matrix profile index" just seems wrong and is somewhat of a nightmare to account for when we need to egress the oldest data point as new data comes in.

This made me think a bit, but I am not entirely sure how it can work in a real-time scenario where new points do not have nearest neighbor located to the right of themselves. Probably I am missing something here.

I may be wrong but can we really describe FLOSS as a real time algorithm? Since for regime detection we search for a minima in the matrix profile, we automatically rely on processing subsequences before and after the global/local minima occurs. Therefore, when data points are arriving in real time in FLOSS we can not answer if the current data point (or sub-sequence) is a start of a new regime for example. But with AAMPI using the left matrix profile, we can know if the current point is further than the rest (the past) and might be an anomaly.

seanlaw commented 4 years ago

It may be possible for historical points (that have not been removed yet) to point to non-existing indices I guess, but it is important that the coming new point does not point to any indices (subsequence) that does not exist. For the case of anomaly detection in real time, only the left matrix profile would be important for our use case.

Understood

For example, before a new point comes in the oldest point (subsequence) can be removed, the MP and indices can then be updated based on the current historical data available. I am again not entirely sure about the limitations though.

So, this part is the problematic part. If you update the MP indices based on the current historical data available (i.e., what is currently visible and without considering any egressed data points) then you are forced to perform a full matrix profile computation which is not good for streaming data. So, if we egress the oldest data points then current data points that point left towards that egressed data point must still keep that egressed data point as its nearest neighbor. Otherwise, updating the full matrix profile by recomputing pairwise distances would be too expensive. Currently, what we are doing is computing the distance profile between the single (newest) subsequence and the entire time series. This is reasonably cheap to compute for a single subsequence.

The most important thing would be the distance profile, or more importantly the left matrix profile (stream.leftP) where historical values do not update when future points arrive.

Cool! I agree with this. Only the global matrix profile and left matrix profile for the new subsequence should be updated.

This made me think a bit, but I am not entirely sure how it can work in a real-time scenario where new points do not have nearest neighbor located to the right of themselves. Probably I am missing something here.

I take it back. Forget my suggestion to use the right matrix profile 😄

I think I will simply try to:

  1. Egress the oldest data point at each update
  2. Only update the global matrix profile and left matrix profile (and indices) for the newest ingressed subsequebce
  3. Any subsequence that has been previously assigned an MP/left MP will not be changed
  4. This will be the default behavior

Are you in agreement with this proposal?

I may be wrong but can we really describe FLOSS as a real time algorithm? Since for regime detection we search for a minima in the matrix profile, we automatically rely on processing subsequences before and after the global/local minima occurs. Therefore, when data points are arriving in real time in FLOSS we can not answer if the current data point (or sub-sequence) is a start of a new regime for example. But with AAMPI using the left matrix profile, we can know if the current point is further than the rest (the past) and might be an anomaly.

You bring up a valid point that I had not considered. I think that there are two separate questions here:

  1. Can floss process streaming data? I would say, yes, certainly and by that answer I would consider it a real-time. Notice that I don't say "real-time algorithm" here
  2. Can the algorithm find a regime change as soon as it happens? No, it will probably struggle with this since it requires that you need to have seen enough of BOTH the left and right regimes before it can identify it. However, I don't think this is a fault of the algorithm. I'm guessing that this is likely by definition that regime change searches are a retrospective property but I'm by no means an expert here.

What do you think?

hmcoservit commented 4 years ago

Thanks again for the explanations @seanlaw

So, this part is the problematic part. If you update the MP indices based on the current historical data available (i.e., what is currently visible and without considering any egressed data points) then you are forced to perform a full matrix profile computation which is not good for streaming data. So, if we egress the oldest data points then current data points that point left towards that egressed data point must still keep that egressed data point as its nearest neighbor. Otherwise, updating the full matrix profile by recomputing pairwise distances would be too expensive. Currently, what we are doing is computing the distance profile between the single (newest) subsequence and the entire time series. This is reasonably cheap to compute for a single subsequence.

I think I see your point there and the difficulty of current data points pointing to an egressed point. Can we assume that in the case of the left matrix profile too? I think the process you explained below should solve this.

Cool! I agree with this. Only the global matrix profile and left matrix profile for the new subsequence should be updated. Great!

I take it back. Forget my suggestion to use the right matrix profile 😄

I think I will simply try to:

  1. Egress the oldest data point at each update
  2. Only update the global matrix profile and left matrix profile (and indices) for the newest ingressed subsequebce
  3. Any subsequence that has been previously assigned an MP/left MP will not be changed
  4. This will be the default behavior

Are you in agreement with this proposal?

Sounds good to me! ;)

You bring up a valid point that I had not considered. I think that there are two separate questions here:

  1. Can floss process streaming data? I would say, yes, certainly and by that answer I would consider it a real-time. Notice that I don't say "real-time algorithm" here
  2. Can the algorithm find a regime change as soon as it happens? No, it will probably struggle with this since it requires that you need to have seen enough of BOTH the left and right regimes before it can identify it. However, I don't think this is a fault of the algorithm. I'm guessing that this is likely by definition that regime change searches are a retrospective property but I'm by no means an expert here.

What do you think?

Thanks Sean, I agree with you on this and thanks for the explanation

seanlaw commented 4 years ago

Awesome! Thanks for talking through it with me. I'll work on this and let you know when it is available to clone and test

hmcoservit commented 4 years ago

@seanlaw You're welcome, looking forward to it

seanlaw commented 4 years ago

@hmcoservit It turned out to be a quite a bit of work to incorporate. Would you mind taking a look to see if I've made any mistakes or wrong assumptions?

hmcoservit commented 4 years ago

@seanlaw Thank you, I really appreciate the work you put into this. From my perspective it seems good! ;)

Here is also a small test I did:

T_full = np.random.rand(1000)
m = 4
warm_start = int(len(T_full)/10)

T_stream = T_full[:warm_start].copy()
stream = stumpy.aampi(T_stream, m , egress= True)

# incrementally update mp (shape of stream.T_ will always be same as warm_start )
for i in range(len(T_stream), len(T_full)):

    t = T_full[i]
    stream.update(t)
    # calculate another stream based on the egressed time series
    stream2 = stumpy.aampi(stream.T_, m , egress= False)
    # the last elements of left_P_ should be the same for both
    npt.assert_almost_equal(stream.left_P_[-1], stream2.left_P_[-1])

print('no errors')

What do you think?

seanlaw commented 4 years ago

It looks reasonable to me. I am more concerned that the output is what you expect and this sort of tests that. Is there any way that you can also check the "left matrix profile indices" to make sure they are correct? I think that the "matrix profile" values are the easy part but the indices may possibly be "off-by-one". I guess one way to check is to take the index and recompute the distance profile and compare. I know that you are more interested in the distances but it would be great if you could spend some time on verifying the indices match the distance as I feel like I've been staring at this for too long and have gotten "too close" to it to spot mistakes.

In case you missed it, egress=True should be the default now for both stumpi and aampi.

hmcoservit commented 4 years ago

It looks reasonable to me. I am more concerned that the output is what you expect and this sort of tests that. Is there any way that you can also check the "left matrix profile indices" to make sure they are correct? I think that the "matrix profile" values are the easy part but the indices may possibly be "off-by-one". I guess one way to check is to take the index and recompute the distance profile and compare. I know that you are more interested in the distances but it would be great if you could spend some time on verifying the indices match the distance as I feel like I've been staring at this for too long and have gotten "too close" to it to spot mistakes.

Totally understand your concern @seanlaw. I am not sure if this is a correct approach, but if we slide the indices for the second stream we will get matching indices and matching left matrix profile value. Let me know what you think, I can look into it further.

T_full = np.random.rand(1000)
m = 4
warm_start = int(len(T_full)/10)

T_stream = T_full[:warm_start].copy()
stream = stumpy.aampi(T_stream, m , egress= True)

n=0
# incrementally update mp (shape of stream.T_ will always be same as warm_start )
for i in range(len(T_stream), len(T_full)):

    t = T_full[i]
    stream.update(t)
    # calculate another stream based on the egressed time series
    stream2 = stumpy.aampi(stream.T_, m , egress= False)
    # the last elements of left_P_ should be the same for both
    npt.assert_almost_equal(stream.left_P_[-1], stream2.left_P_[-1])
    npt.assert_almost_equal(stream.I_[-1], stream2.I_[-1]+n)
    n+=1

print('no errors')
seanlaw commented 4 years ago

Yes, something like that. I was thinking more like for each matrix profile index, can we get grab the subsequence corresponding to that index and then compute the non-normalized Euclidean distance to it's nearest neighbor and see if it matches P[i].

So, let's say our current data being processed is T = [1.4, 4.8, 5.5, 7.8, 0.2, 3.3] with a window size of m=3 and the matrix profile and indices are P = [2.0, 0.4, 8.3, 3.1] and I = [0, 2, 1, 4], respectively (I just made these numbers up). So, you'd take, say, i=1 which correspond to I[i] = 2 and P[i] = 0.4 and you'd compute distance(T[i : i+m], T[I[i] : I[i] + m) and make sure that it is the same as P[i]

hmcoservit commented 4 years ago

Yes, something like that. I was thinking more like for each matrix profile index, can we get grab the subsequence corresponding to that index and then compute the non-normalized Euclidean distance to it's nearest neighbor and see if it matches P[i].

So, let's say our current data being processed is T = [1.4, 4.8, 5.5, 7.8, 0.2, 3.3] with a window size of m=3 and the matrix profile and indices are P = [2.0, 0.4, 8.3, 3.1] and I = [0, 2, 1, 4], respectively (I just made these numbers up). So, you'd take, say, i=1 which correspond to I[i] = 2 and P[i] = 0.4 and you'd compute distance(T[i : i+m], T[I[i] : I[i] + m) and make sure that it is the same as P[i]

@seanlaw Apologies for the delay. It seems like for each sub-sequence corresponding to an index, the euclidean distance to it's nearest neighbor matches the left matrix profile of that index. Do you find the test below to be correct?


def distance(sub1, sub2):
    D = 0
    for k in range(len(sub1)):
        D += (sub1[k] - sub2[k])**2
    return math.sqrt(D)

T_full = np.random.rand(1000)
m = 4
warm_start = int(len(T_full)/10)

T_stream = T_full[:warm_start].copy()
stream = stumpy.aampi(T_stream, m , egress= True)

n=0
# incrementally update mp (shape of stream.T_ will always be same as warm_start )
for i in range(len(T_stream), len(T_full)):

    t = T_full[i]
    stream.update(t)
    # calculate another stream based on the egressed time series
    stream2 = stumpy.aampi(stream.T_, m , egress= False)
    # the last elements of left_P_ should be the same for both
    npt.assert_almost_equal(stream.left_P_[-1], stream2.left_P_[-1])
    npt.assert_almost_equal(stream.I_[-1], stream2.I_[-1]+n)
    # left matrix profile value of the last element is same as the euclidean distance
    npt.assert_almost_equal(stream.left_P_[-1], distance(stream.T_[stream.I_[-1]-n:stream.I_[-1]+m-n], stream.T_[-m:] ))
    n+=1

print('no errors')
seanlaw commented 4 years ago

Aha! In fact there was an off-by-one recording error for the indices as I had suspected. I think that the matrix profile values are correct. It's only the indices that are referenced incorrectly. I'll do a little more testing and correct this.

seanlaw commented 4 years ago

Here's the test that failed for me:

def distance(sub1, sub2):
    D = 0
    for k in range(len(sub1)):
        D += (sub1[k] - sub2[k])**2
    return math.sqrt(D)

for i in range(100):
    T_full = np.random.rand(1000)
    m = 4
    warm_start = int(len(T_full)/10)
    warm_start = 10

    T_stream = T_full[:warm_start].copy()
    stream = stumpy.aampi(T_stream, m , egress= True)
    P = np.full(stream.P_.shape, np.inf)
    left_P = np.full(stream.left_P_.shape, np.inf)

    n=0
    # incrementally update mp (shape of stream.T_ will always be same as warm_start )
    for i in range(len(T_stream), len(T_full)):
        t = T_full[i]
        stream.update(t)
        for j, I in enumerate(stream.I_):
            if I < 0:
                P[j] = np.inf
            else:
                P[j] = distance(T_full[j + n + 1: j + n + 1 + m], T_full[I : I + m])
        npt.assert_almost_equal(stream.P_, P)
        for j, I in enumerate(stream.left_I_):
            if I < 0:
                left_P[j] = np.inf
            else:
                left_P[j] = distance(T_full[j + n + 1: j + n + 1 + m], T_full[I : I + m])
        npt.assert_almost_equal(stream.left_P_, left_P)
        n += 1
print('no errors')
seanlaw commented 4 years ago

I've pushed a fix for aampi and stumpi in this commit

seanlaw commented 4 years ago

This test may be a little faster and concise:

T_full = np.random.rand(1000)
m = 4
warm_start = int(len(T_full)/10)
warm_start = 10

T_stream = T_full[:warm_start].copy()
stream = stumpy.aampi(T_stream, m , egress= True)
P = np.full(stream.P_.shape, np.inf)
left_P = np.full(stream.left_P_.shape, np.inf)

T_full_subseq = stumpy.core.rolling_window(T_full, m)

n=0

for i in range(len(T_stream), len(T_full)):
    t = T_full[i]
    stream.update(t)

    P[:] = np.inf
    idx = np.argwhere(stream.I_ >= 0).flatten()
    P[idx] = norm(T_full_subseq[idx + n + 1] - T_full_subseq[stream.I_[idx]], axis=1)
    npt.assert_almost_equal(stream.P_, P)

    left_P[:] = np.inf
    idx = np.argwhere(stream.left_I_ >= 0).flatten()
    left_P[idx] = norm(T_full_subseq[idx + n + 1] - T_full_subseq[stream.left_I_[idx]], axis=1)
    npt.assert_almost_equal(stream.left_P_, left_P)

    n += 1
hmcoservit commented 4 years ago

Thanks @seanlaw , looks good. I will start further tests for anomaly detection.