MerlinR / Hexital

Hexital - Incremental Technical Analysis Library
MIT License
9 stars 2 forks source link

not sure about correctness of ADX #20

Open iSkywalker168 opened 4 days ago

iSkywalker168 commented 4 days ago

image As you can see, tulipy, talib, and pandas_ta produce different ADX values at index 32.

It seems talib and pandas_ta produce very close ADX values, so I tried to calculate ADX myself with the formula from https://en.wikipedia.org/wiki/Average_directional_movement_index

and got an ADX of 9.41588407 at index 31, which is very close to talib and pandas_ta, but I am not sure whether my calculation is correct. Do you have any ideas?

here is my testing code(need to download the attachment):

import pandas as pd
import pandas_ta as pta
from dataclasses import dataclass
import talib
from datetime import timedelta
import hexital
import math
import json
import tulipy
import pickle
import copy
from talipp.indicator_util import composite_to_lists
import talipp.indicators
from talipp.ohlcv import OHLCVFactory, OHLCV

# Cross-checks hexital's incremental indicator readings against the values
# pandas_ta computes over the whole frame, candle by candle. talib and tulipy
# ADX values are also computed so they can be inspected side by side.

pd.options.display.max_columns = None
pd.options.display.max_rows = None
pd.options.display.width = None
pd.options.display.precision = 10
pd.options.display.unicode.east_asian_width = True

# SECURITY: pickle.load can execute arbitrary code embedded in the file.
# Only load this attachment if you trust where it came from.
with open(r"D:\work\xstarwalker168\Python\Finance\data\DOGE`FDUSD`5m`pandas_ta.zip", "rb") as file:
    df = pickle.load(file)

df.rename(columns={"OpenTime": "timestamp"}, inplace=True)
df.columns = df.columns.str.lower()
# Snapshot the rows as dicts *before* the timestamp column becomes the index,
# so every dict still carries its "timestamp" key.
ohlcv_dicts = df.to_dict(orient="records")

# necessary to set index as datetime for pandas_ta
df["timestamp"] = pd.to_datetime(df["timestamp"])
df.set_index("timestamp", inplace=True)

# Convert the columns to NumPy arrays.
# (Named so that the builtin ``open`` is not shadowed.)
open_prices = df["open"].to_numpy()
high = df["high"].to_numpy()
low = df["low"].to_numpy()
close = df["close"].to_numpy()
volume = df["volume"].to_numpy()

# Look-back length shared by every indicator.
# (Named ``period`` so that the builtin ``len`` is not shadowed.)
period = 16
decimal_places = 10

# Reference values from pandas_ta, computed over the full frame.
pta_atr = pta.atr(df.high, df.low, df.close, length=period).round(decimal_places).to_list()
pta_vwap = pta.vwap(df.high, df.low, df.close, df.volume).round(decimal_places).to_list()
pta_rsi = pta.rsi(df.close, period).round(decimal_places).to_list()
pta_sma = pta.sma(df.close, period).round(decimal_places).to_list()
pta_ema = pta.ema(df.close, period).round(decimal_places).to_list()
pta_wma = pta.wma(df.close, period).round(decimal_places).to_list()
pta_rma = pta.rma(df.close, period).round(decimal_places).to_list()
pta_vwma = pta.vwma(df.close, df.volume, period).round(decimal_places).to_list()

pta_adx = pta.adx(df.high, df.low, df.close, length=period, lensig=period)
pta_adx = pta_adx.round(decimal_places)
talib_adx = talib.ADX(df.high, df.low, df.close, period)
tulipy_adx = tulipy.adx(high, low, close, period)

pta_bbands = pta.bbands(df.close, length=period, std=2.0)
pta_bbands = pta_bbands.round(decimal_places)

# Lifespan handed to every hexital indicator: 3 * period candles of 5 minutes.
candles_lifespan = timedelta(minutes=(3 * period) * 5)
hexital_atr_obj = hexital.ATR(period=period, round_value=decimal_places, candles_lifespan=candles_lifespan)
hexital_vwap_obj = hexital.VWAP(round_value=decimal_places, candles_lifespan=candles_lifespan)
hexital_rsi_obj = hexital.RSI(period=period, round_value=decimal_places, candles_lifespan=candles_lifespan)
hexital_bbands_obj = hexital.BBANDS(period=period, round_value=decimal_places, candles_lifespan=candles_lifespan)
hexital_sma_obj = hexital.SMA(period=period, round_value=decimal_places, candles_lifespan=candles_lifespan)
hexital_ema_obj = hexital.EMA(period=period, round_value=decimal_places, candles_lifespan=candles_lifespan)
hexital_wma_obj = hexital.WMA(period=period, round_value=decimal_places, candles_lifespan=candles_lifespan)
hexital_rma_obj = hexital.RMA(period=period, round_value=decimal_places, candles_lifespan=candles_lifespan)
hexital_vwma_obj = hexital.VWMA(period=period, round_value=decimal_places, candles_lifespan=candles_lifespan)
hexital_adx_obj = hexital.ADX(period=period, period_signal=period, round_value=decimal_places, candles_lifespan=candles_lifespan)

for index, ohlcv_dict in enumerate(ohlcv_dicts):
    candle = hexital.Candle.from_dict(ohlcv_dict)
    print(f"checking {index}~")

    # Each indicator receives its own deep copy of the candle.

    # BBANDS
    hexital_bbands_obj.append(copy.deepcopy(candle))
    hexital_bbands_val = hexital_bbands_obj.reading()
    hexital_bbands_bbl = hexital_bbands_val["BBL"]
    hexital_bbands_bbm = hexital_bbands_val["BBM"]
    hexital_bbands_bbu = hexital_bbands_val["BBU"]
    # Compare against None explicitly: a truthiness test would silently skip
    # the comparison whenever a reading is a legitimate 0.0.
    if all(value is not None for value in (hexital_bbands_bbl, hexital_bbands_bbm, hexital_bbands_bbu)):
        assert math.isclose(hexital_bbands_bbl, pta_bbands.iloc[index][f"BBL_{period}_2.0"], abs_tol=1e-9)
        assert math.isclose(hexital_bbands_bbm, pta_bbands.iloc[index][f"BBM_{period}_2.0"], abs_tol=1e-9)
        assert math.isclose(hexital_bbands_bbu, pta_bbands.iloc[index][f"BBU_{period}_2.0"], abs_tol=1e-9)

    # RSI
    hexital_rsi_obj.append(copy.deepcopy(candle))
    hexital_rsi_val = hexital_rsi_obj.reading()
    if hexital_rsi_val is not None:
        assert math.isclose(hexital_rsi_val, pta_rsi[index], abs_tol=1e-9)

    # VWAP
    hexital_vwap_obj.append(copy.deepcopy(candle))
    hexital_vwap_val = hexital_vwap_obj.reading()
    if hexital_vwap_val is not None:
        assert math.isclose(hexital_vwap_val, pta_vwap[index], abs_tol=1e-9)

    # ATR
    hexital_atr_obj.append(copy.deepcopy(candle))
    hexital_atr_val = hexital_atr_obj.reading()
    if hexital_atr_val is not None:
        assert math.isclose(hexital_atr_val, pta_atr[index], abs_tol=1e-9)

    # ADX
    hexital_adx_obj.append(copy.deepcopy(candle))
    hexital_adx_val = hexital_adx_obj.reading()
    hexital_adx_adx = hexital_adx_val["ADX"]
    hexital_adx_dmp = hexital_adx_val["DM_Plus"]
    hexital_adx_dmn = hexital_adx_val["DM_Neg"]
    # Explicit None checks for the same reason as BBANDS: 0.0 is a valid
    # reading and must still be compared.
    if all(value is not None for value in (hexital_adx_adx, hexital_adx_dmp, hexital_adx_dmn)):
        assert math.isclose(hexital_adx_adx, pta_adx.iloc[index][f"ADX_{period}"], abs_tol=1e-9)
        assert math.isclose(hexital_adx_dmp, pta_adx.iloc[index][f"DMP_{period}"], abs_tol=1e-9)
        assert math.isclose(hexital_adx_dmn, pta_adx.iloc[index][f"DMN_{period}"], abs_tol=1e-9)

    # SMA
    hexital_sma_obj.append(copy.deepcopy(candle))
    hexital_sma_val = hexital_sma_obj.reading()
    if hexital_sma_val is not None:
        assert math.isclose(hexital_sma_val, pta_sma[index], abs_tol=1e-9)

    # EMA
    hexital_ema_obj.append(copy.deepcopy(candle))
    hexital_ema_val = hexital_ema_obj.reading()
    if hexital_ema_val is not None:
        assert math.isclose(hexital_ema_val, pta_ema[index], abs_tol=1e-9)

    # WMA
    hexital_wma_obj.append(copy.deepcopy(candle))
    hexital_wma_val = hexital_wma_obj.reading()
    if hexital_wma_val is not None:
        assert math.isclose(hexital_wma_val, pta_wma[index], abs_tol=1e-9)

    # RMA
    hexital_rma_obj.append(copy.deepcopy(candle))
    hexital_rma_val = hexital_rma_obj.reading()
    if hexital_rma_val is not None:
        # pandas is calculating wrong
        # assert math.isclose(hexital_rma_val, pta_rma[index], abs_tol=1e-9)
        ...

    # VWMA
    hexital_vwma_obj.append(copy.deepcopy(candle))
    hexital_vwma_val = hexital_vwma_obj.reading()
    if hexital_vwma_val is not None:
        assert math.isclose(hexital_vwma_val, pta_vwma[index], abs_tol=1e-9)

DOGEFDUSD5m`pandas_ta.zip

and here is my own ADX calculation code:

import pandas as pd
import numpy as np
import pickle

def calculate_adx(df, period=16):
    """Compute ADX from a price frame, using simple rolling means as the smoother.

    Every smoothing step (ATR, smoothed +DM/-DM, and the final ADX) is a
    ``rolling(window=period, min_periods=1).mean()``, i.e. a plain moving
    average rather than a recursive smoother.

    Side effect: the intermediate columns ``TR``, ``UpMove``, ``DownMove``,
    ``+DM``, ``-DM``, ``ATR``, ``Smoothed+DM``, ``Smoothed-DM``, ``+DI``,
    ``-DI``, ``DX`` and ``ADX`` are written onto ``df`` itself.

    Args:
        df: DataFrame with ``High``, ``Low`` and ``Close`` columns.
        period: Rolling window length used by every smoothing step.

    Returns:
        A one-column DataFrame (``ADX``) sharing ``df``'s index.

    Raises:
        AssertionError: If any of the required columns is missing.
    """
    # Ensure df has the correct column names
    assert all(col in df.columns for col in ["High", "Low", "Close"]), "DataFrame must contain 'High', 'Low', and 'Close' columns."

    high = df["High"]
    low = df["Low"]
    # Shift once up front instead of re-shifting the whole column per row.
    prev_close = df["Close"].shift(1)

    # Calculate True Range (TR): the largest of the three candidate ranges.
    # The row-wise max skips NaN, so the first row (which has no previous
    # close) falls back to High - Low.
    df["TR"] = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    # Calculate +DM and -DM
    df["UpMove"] = high - high.shift(1)
    df["DownMove"] = low.shift(1) - low

    # Only the dominant, strictly positive move counts; the other side is 0.
    df["+DM"] = np.where((df["UpMove"] > df["DownMove"]) & (df["UpMove"] > 0), df["UpMove"], 0)
    df["-DM"] = np.where((df["DownMove"] > df["UpMove"]) & (df["DownMove"] > 0), df["DownMove"], 0)

    # Calculate ATR (Average True Range)
    df["ATR"] = df["TR"].rolling(window=period, min_periods=1).mean()

    # Calculate Smoothed +DM and -DM
    df["Smoothed+DM"] = df["+DM"].rolling(window=period, min_periods=1).mean()
    df["Smoothed-DM"] = df["-DM"].rolling(window=period, min_periods=1).mean()

    # Calculate +DI and -DI
    df["+DI"] = 100 * df["Smoothed+DM"] / df["ATR"]
    df["-DI"] = 100 * df["Smoothed-DM"] / df["ATR"]

    # Calculate DX (NaN wherever +DI and -DI are both zero)
    df["DX"] = 100 * abs(df["+DI"] - df["-DI"]) / (df["+DI"] + df["-DI"])

    # Calculate ADX
    df["ADX"] = df["DX"].rolling(window=period, min_periods=1).mean()

    return df[["ADX"]]

def calculate_adx2(high, low, close, period=16):
    """Compute ADX with recursive smoothing over parallel price sequences.

    +DM, -DM and TR are seeded at index ``period - 1`` with the sum of their
    first ``period`` entries and then smoothed recursively
    (``prev - prev / period + current``). The first ADX is written at index
    ``2 * period - 1`` as the average of the ``period`` DX values at indices
    ``period .. 2 * period - 1``; later values follow
    ``(prev_adx * (period - 1) + dx) / period``. Every DX value from
    ``period`` onward is therefore used exactly once.

    Args:
        high: Sequence of high prices.
        low: Sequence of low prices, same length as ``high``.
        close: Sequence of close prices, same length as ``high``.
        period: Smoothing length; must be at least 1.

    Returns:
        A NumPy float array of the input length. Entries before index
        ``2 * period - 1`` are 0; if the input is too short to produce any
        ADX value the whole array is 0.

    Raises:
        AssertionError: If the three inputs differ in length.
        ValueError: If ``period`` is smaller than 1.
    """
    # Ensure arrays are of the same length
    assert len(high) == len(low) == len(close), "Input arrays must be of the same length."
    if period < 1:
        raise ValueError("period must be a positive integer.")

    high = np.asarray(high, dtype=float)
    low = np.asarray(low, dtype=float)
    close = np.asarray(close, dtype=float)
    size = high.shape[0]

    adx = np.zeros(size)
    first_adx = 2 * period - 1
    if size <= first_adx:
        # Not enough candles for even one ADX value.
        return adx

    # Calculate +DM, -DM, and TR (index 0 stays 0: there is no previous candle)
    plus_dm = np.zeros(size)
    minus_dm = np.zeros(size)
    tr = np.zeros(size)

    up_move = high[1:] - high[:-1]
    down_move = low[:-1] - low[1:]
    # Only the dominant, strictly positive move counts; the other side is 0.
    plus_dm[1:] = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
    minus_dm[1:] = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)

    prev_close = close[:-1]
    tr[1:] = np.maximum(
        np.maximum(high[1:] - low[1:], np.abs(high[1:] - prev_close)),
        np.abs(low[1:] - prev_close),
    )

    # Smooth the +DM, -DM, and TR using the recursive smoothing formula
    smooth_plus_dm = np.zeros(size)
    smooth_minus_dm = np.zeros(size)
    smooth_tr = np.zeros(size)

    # Initialize the first smoothed values at the index period-1
    seed = period - 1
    smooth_plus_dm[seed] = np.sum(plus_dm[:period])
    smooth_minus_dm[seed] = np.sum(minus_dm[:period])
    smooth_tr[seed] = np.sum(tr[:period])

    # Continue smoothing for the remaining periods
    for i in range(period, size):
        smooth_plus_dm[i] = smooth_plus_dm[i - 1] - (smooth_plus_dm[i - 1] / period) + plus_dm[i]
        smooth_minus_dm[i] = smooth_minus_dm[i - 1] - (smooth_minus_dm[i - 1] / period) + minus_dm[i]
        smooth_tr[i] = smooth_tr[i - 1] - (smooth_tr[i - 1] / period) + tr[i]

    # Calculate +DI and -DI. Where the smoothed TR is zero (warm-up slots or a
    # completely flat market) the DIs are left at 0 instead of dividing 0 by 0.
    has_range = smooth_tr > 0
    plus_di = np.zeros(size)
    minus_di = np.zeros(size)
    np.divide(smooth_plus_dm, smooth_tr, out=plus_di, where=has_range)
    np.divide(smooth_minus_dm, smooth_tr, out=minus_di, where=has_range)
    plus_di *= 100
    minus_di *= 100

    # Calculate DX. It is undefined when both DIs are zero; those slots keep
    # the value 0 and are flagged so the ADX recursion can skip them.
    di_total = plus_di + minus_di
    dx_defined = di_total > 0
    dx = np.zeros(size)
    np.divide(np.abs(plus_di - minus_di), di_total, out=dx, where=dx_defined)
    dx *= 100

    # First ADX: average of the `period` DX values ending at 2 * period - 1.
    # Undefined DX slots contribute 0 to the sum.
    adx[first_adx] = np.sum(dx[period:first_adx + 1]) / period
    for i in range(first_adx + 1, size):
        if dx_defined[i]:
            adx[i] = ((adx[i - 1] * (period - 1)) + dx[i]) / period
        else:
            # No directional information on this bar: carry the previous ADX
            # forward rather than letting an undefined DX turn the rest to NaN.
            adx[i] = adx[i - 1]

    return adx

# Driver: load the pickled candle frame and print the ADX computed by
# calculate_adx2 with a period of 16.

# SECURITY: pickle.load can execute arbitrary code embedded in the file.
# Only load this attachment if you trust where it came from.
with open(r"DOGE`FDUSD`5m`pandas_ta.zip", "rb") as file:
    df = pickle.load(file)

df.rename(columns={"OpenTime": "timestamp"}, inplace=True)
# Row dicts are taken while "timestamp" is still a regular column.
ohlcv_dicts = df.to_dict(orient="records")

# necessary to set index as datetime for pandas_ta
df["timestamp"] = pd.to_datetime(df["timestamp"])
df.set_index("timestamp", inplace=True)

pd.options.display.max_columns = None
pd.options.display.max_rows = None
pd.options.display.width = None
pd.options.display.precision = 10
pd.options.display.unicode.east_asian_width = True

# Named ``open_prices`` so that the builtin ``open`` is not shadowed.
open_prices = df["Open"].to_numpy()
high = df["High"].to_numpy()
low = df["Low"].to_numpy()
close = df["Close"].to_numpy()
volume = df["Volume"].to_numpy()
# Calculate ADX (calculate_adx2 returns a NumPy array, not a DataFrame)
adx_values = calculate_adx2(high, low, close, 16)
print(adx_values)
MerlinR commented 5 hours ago

I need to come back to ADX. I have some differences in the first hundred or so readings, and I couldn't pinpoint the issue. I think part of the issue is that ADX in pandas_ta uses RMA, which does not calculate accurately, and with ADX using it multiple times the difference gets amplified.