TA-Lib / ta-lib-python

Python wrapper for TA-Lib (http://ta-lib.org/).
http://ta-lib.github.io/ta-lib-python
Other
9.46k stars 1.74k forks source link

Unable to access a ta-lib function from multiple threads #540

Closed robcaulk closed 1 year ago

robcaulk commented 1 year ago

Over at freqtrade, we are using ta-lib in a multithreaded fashion and we have noticed that, on occasion, the library will throw an error that looks like this:

  File "/home/opc/freqtrade/user_data/strategies/EI3v2_RSI_CONS_TRAIL_AI.py", line 425, in populate_any_indicators
    informative[f"%-{coin}rsi-period_{t}"] = ta.RSI(informative, timeperiod=t)
  File "talib/_abstract.pxi", line 439, in talib._ta_lib.Function.__call__
  File "talib/_abstract.pxi", line 350, in talib._ta_lib.Function.outputs
  File "/home/opc/freqtrade/.env/lib64/python3.9/site-packages/pandas/core/series.py", line 442, in __init__
    com.require_length_match(data, index)
  File "/home/opc/freqtrade/.env/lib64/python3.9/site-packages/pandas/core/common.py", line 557, in require_length_match
    raise ValueError(
ValueError: Length of values (4005) does not match length of index (999)

Which is due to two threads simultaneously wanting to use ta.RSI() and using the ta.abstract interface

Is there a standard way to use ta-lib from multiple threads simultaneously? Or does ta-lib need a modification in the backend? If so, do you have any idea where in the ta-lib backend we should start looking?

mrjbq7 commented 1 year ago

Have a look at https://github.com/mrjbq7/ta-lib/blob/master/tools/threads_talib.py. I just updated it to assert the results are as expected and it seems to work properly in multiple threads.

I then made this diff and tested the abstract interface which also seems to work:

diff --git a/tools/threads_talib.py b/tools/threads_talib.py
index ce861a4a..209bb971 100644
--- a/tools/threads_talib.py
+++ b/tools/threads_talib.py
@@ -3,6 +3,7 @@ from __future__ import print_function
 import numpy
 from numpy.testing import assert_array_equal
 import talib
+import talib.abstract
 import sys

 TEST_LEN = int(sys.argv[1]) if len(sys.argv) > 1 else 10000
@@ -23,15 +24,15 @@ def loop():
     global total
     while total < LOOPS:
         total += 1
-        out1 = talib.MA(data)
+        out1 = talib.abstract.MA(data)
         assert_array_equal(out1, ref1)
-        out2, out3, out4 = talib.BBANDS(data)
+        out2, out3, out4 = talib.abstract.BBANDS(data)
         assert_array_equal(out2, ref2)
         assert_array_equal(out3, ref3)
         assert_array_equal(out4, ref4)
-        out5 = talib.KAMA(data)
+        out5 = talib.abstract.KAMA(data)
         assert_array_equal(out5, ref5)
-        out6 = talib.CDLMORNINGDOJISTAR(data, data, data, data)
+        out6 = talib.abstract.CDLMORNINGDOJISTAR(data, data, data, data)
         assert_array_equal(out6, ref6)

 import time

Would you be able to help me make a test case that fails?

robcaulk commented 1 year ago

Sure no problem. Thanks for the attention to the issue :-)

mrjbq7 commented 1 year ago

In case you want to workaround it, you can cache your indicators per-thread:

import threading
import talib.abstract

def indicator(name, local=threading.local()):
    value = getattr(local, name, None)
    if value is None:
        value = talib.abstract.Function(name)
        local.name = value
    return value

# in a thread
df['RSI'] = indicator('RSI')(prices)

I have a couple options to fix this...

1) make it so that abstract.RSI creates a thread-local function like the above example, but then from talib.abstract import RSI will not be thread-safe.

2) change the abstract.Function to store all of its values in thread-local or thread-safe storage...

3) maybe something else

Thoughts?

mrjbq7 commented 1 year ago

Okay, I pushed a fix, the Abstract API now has all the local caching stored behind a threading.local()... and your test case passes.

Are you interested in testing this? I'd be happy to do some more testing and make a new release soon for you.

robcaulk commented 1 year ago

If the test passes, then our use case passes. Thanks for the fix.

mrjbq7 commented 1 year ago

Oh, I’m sorry! I just adapted the code to be run under Pytest as a test case on the automated builders.

I thought the PR was example code! I’ll merge that PR. Sorry!

On Sep 7, 2022, at 3:30 AM, Robert Caulk @.***> wrote:

 If the test passes, then our use case passes. Thanks for the fix.

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you modified the open/close state.

mrjbq7 commented 1 year ago

@robcaulk I released version 0.4.25 with the fixes for threading... want to try using it with freqtrade using your parallel code?

robcaulk commented 1 year ago

Thanks alot, I'll try it as soon as I get a chance!

robcaulk commented 1 year ago

@robcaulk I released version 0.4.25 with the fixes for threading... want to try using it with freqtrade using your parallel code?

This finally made its way down into FreqAI. We removed all parallel locks and no longer see any issues.

Thanks again for the support!