Closed rafalsza closed 9 months ago
Hello, Thank you for your support. To answer your question, no I have not implemented any sort of "Volumized Order Block". Currently order blocks are the last candle before a FVG. Which specific Tradingview script are you referring to maybe I can look into implementing it.
1)Volumized Order Blocks | Flux Charts 2)MTF Order Block Finder try to find by name second one I've started to convert
I like the look of the "Volumized Order Blocks | Flux Charts" and if you are implementing the "MTF Order Block Finder" already I recon there should be an optional parameter you can pass to the ob function where you can specify the timeframe you want and if there is no timeframe specified then use the current chart data given.
this is the code I've managed to write so far
import matplotlib.pyplot as plt
import numpy as np
from binance.client import Client
from datetime import datetime
import pandas as pd
from loguru import logger
# Basic Configuration
resolution = "5" # Set the desired timeframe as a string
ob_period = 5
threshold = 0.3
bull_channels = 4
bear_channels = 4
# Advanced Configuration
doji = 0.05
fuzzy = 0.01
near_price = 0
# Style, Colors, and Drawing
style = "BOTH"
bullcolor = "green"
bearcolor = "red"
linewidth_hl = 1
linewidth_avg = 1
# Experimental
ob_shift = 1
ob_selector = "OHLC"
ob_search = 2
# Input Sanitization & Adjustments
ob_search = ob_search if ob_search >= ob_period else ob_period
ob_shift = ob_shift if ob_shift >= ob_period else ob_period
ob_period += 1
def import_data(symbol, start_str, timeframe):
logger.info(
"Importing data for symbol: {}, start: {}, timeframe: {}",
symbol,
start_str,
timeframe,
)
client = Client()
start_str = str(start_str)
end_str = f"{datetime.now()}"
df = pd.DataFrame(
client.get_historical_klines(
symbol=symbol, interval=timeframe, start_str=start_str, end_str=end_str
)
).astype(float)
df = df.iloc[:, :6]
df.columns = ["timestamp", "open", "high", "low", "close", "volume"]
df["time"] = df.timestamp
df.time = pd.to_datetime(df.time, unit="ms")
df = df.set_index("timestamp")
df.index = pd.to_datetime(df.index, unit="ms")
return df
# Returns the lowest wick of a data set and given search range
# A 'wick' is either: Low to Open, Low to Close
def low_wick_search(start_index, length, open_data, low_data, close_data):
logger.info(
"Searching for low wick at index {} with length {}", start_index, length
)
wick_high = float("nan")
wick_low = float("nan")
index = np.nan
for i in range(length + 1):
if low_data[start_index - i] > wick_low and i > 0:
continue
bar_dir = (
1 if close_data[start_index - i] - open_data[start_index - i] > 0 else -1
)
wick_high = (
close_data[start_index - i] if bar_dir == 1 else open_data[start_index - i]
)
wick_low = low_data[start_index - i]
index = i
return wick_high, wick_low, index
# Returns the highest wick of a data set and given search range
# A 'wick' is either: High to Open, High to Close
def high_wick_search(start_index, length, open_data, high_data, close_data):
logger.info(
"Searching for high wick at index {} with length {}", start_index, length
)
wick_high = float("nan")
wick_low = float("nan")
index = np.nan
for i in range(length + 1):
if high_data[start_index - i] < wick_high and i > 0:
continue
bar_dir = (
1 if close_data[start_index - i] - open_data[start_index - i] > 0 else -1
)
wick_low = (
open_data[start_index - i] if bar_dir == 1 else close_data[start_index - i]
)
wick_high = high_data[start_index - i]
index = i
return wick_high, wick_low, index
# Returns true if a candle has zero ticks (high, low, open, and close all equal)
def is_zero_candle(index, open_data, high_data, low_data, close_data):
base = open_data[index]
return (
base == close_data[index]
and base == high_data[index]
and base == low_data[index]
)
# Fetch historical stock data
symbol = "BTCUSDT" # Replace with the desired stock symbol
start_date = "2023-01-01"
end_date = "2024-01-29"
data = import_data(symbol, start_str=start_date, timeframe="1d")
# Extract necessary columns from the fetched data
open_data = data["open"].values
high_data = data["high"].values
low_data = data["low"].values
close_data = data["close"].values
# Use the functions
# Example: Find the lowest wick in the first 10 candles
low_wick_result = low_wick_search(9, 10, open_data, low_data, close_data)
print("Low Wick Result:", low_wick_result)
# Example: Find the highest wick in the first 10 candles
high_wick_result = high_wick_search(9, 10, open_data, high_data, close_data)
print("High Wick Result:", high_wick_result)
# Example: Check if the first candle has zero ticks
zero_candle_result = is_zero_candle(0, open_data, high_data, low_data, close_data)
print("Is Zero Candle:", zero_candle_result)
bullishOB = close_data[ob_period] < open_data[ob_period]
logger.info("bullishOB: {}", bullishOB)
bearishOB = close_data[ob_period] > open_data[ob_period]
logger.info("bearishOB: {}", bearishOB)
upcandles = 0
downcandles = 0
# Entire OB sequence has enough price movement to be valid (filter low volatility noise)
relmove = (
100 * ((abs(close_data[ob_period] - close_data[1])) / close_data[ob_period])
> threshold
)
logger.info("relmove: {}", relmove)
# Can't start with a Doji (active by default)
doji_candle = (
100 * abs(close_data[ob_period] - open_data[ob_period]) / open_data[ob_period]
> doji
)
logger.info("doji: {}", doji_candle)
for i in range(1, ob_period): # Check candles following the OB
if is_zero_candle(i, open_data, high_data, low_data, close_data):
continue
t_close = close_data[i]
t_open = open_data[i]
if np.abs(100 * (t_close - t_open) / t_open) < fuzzy:
upcandles += 1
downcandles += 1
continue
if t_close > t_open:
upcandles += 1
elif t_close < t_open:
downcandles += 1
if doji_candle and relmove:
OB_bull = bullishOB and (upcandles == (ob_period - 1))
logger.info("OB_bull: {},upcandles: {}", OB_bull, upcandles)
OB_bear = bearishOB and (downcandles == (ob_period - 1))
logger.info("OB_bear: {},downcandles: {}", OB_bear, downcandles)
OB_bull_chigh, OB_bull_clow = float("nan"), float(
"nan"
) # Initialize OB_bull_chigh and OB_bull_clow
OB_bear_chigh, OB_bear_clow = float("nan"), float(
"nan"
) # Initialize OB_bear_chigh and OB_bear_clow
if OB_bull:
logger.info("Bullish Order Block detected")
if ob_selector == "Context":
temp_high, temp_low, index = low_wick_search(
ob_period, ob_search, open_data, low_data, close_data
)
selector_shift = index
OB_bull_chigh = temp_high
OB_bull_clow = temp_low
elif ob_selector == "High/Low":
OB_bull_chigh = high_data[ob_period - ob_shift]
OB_bull_clow = low_data[ob_period - ob_shift]
elif ob_selector == "OHLC":
temp_high, temp_low, index = low_wick_search(
ob_period - 1, 0, open_data, low_data, close_data
)
OB_bull_chigh = temp_high
OB_bull_clow = temp_low
OB_bull_avg = (OB_bull_chigh + OB_bull_clow) / 2
if OB_bear:
logger.info("Bearish Order Block detected")
if ob_selector == "Context":
temp_high, temp_low, index = high_wick_search(
ob_period, ob_search, open_data, high_data, close_data
)
selector_shift = index
OB_bear_chigh = temp_high
OB_bear_clow = temp_low
elif ob_selector == "High/Low":
OB_bear_chigh = high_data[ob_period - 1]
OB_bear_clow = low_data[ob_period - 1]
elif ob_selector == "OHLC":
temp_high, temp_low, index = high_wick_search(
ob_period - 1, 0, open_data, high_data, close_data
)
OB_bear_chigh = temp_high
OB_bear_clow = temp_low
OB_bear_avg = (OB_bear_chigh + OB_bear_clow) / 2
# Plotting example (replace with your actual data)
plt.plot(data.index, data["close"], label="Close Price")
# Plotting order blocks
# if OB_bull:
# plt.fill_between(
# data.index[-ob_period:],
# OB_bull_clow,
# OB_bull_chigh,
# color=bullcolor,
# alpha=0.5,
# label="Bullish Order Block",
# )
#
# if OB_bear:
# plt.fill_between(
# data.index[-ob_period:],
# OB_bear_clow,
# OB_bear_chigh,
# color=bearcolor,
# alpha=0.5,
# label="Bearish Order Block",
# )
plt.title("Historical Stock Data with Order Blocks")
plt.xlabel("Date")
plt.ylabel("Close Price")
plt.legend()
plt.show()
`
Hi @rafalsza,
You don't seem to be using the OB function from smc, but instead you are identifying the order blocks using a different method, can I ask why this is?
And I think it would be more beneficial if you modify the existing OB function to add MTF support, what do you think?
yest, that's my goal. I want to use smc with some modifications to identify MTF order blocks. Code that I've pasted was an attempt to convert pinescript to python 1:1
oh I see, you then want to look into the "resample" pandas function, you will then be able to convert the timeframe of the OHLC data from one timeframe to another, then you can then call the OB function in smc using the resampled candle data. I hope this helps.
that's a problem cause I cannot understand how to do it according to pinescript
@rafalsza I think I need a more detailed description on what you are trying to achieve so I can help you. I create the SMC python functions from scratch and I don't copy any Pinescripts.
i'm interested in MTF order blocks and Volumized order blocks
Okay nice I will look into implementing these.
Hi,
Also im interested in order blocks and like you want to convert these scripts :
https://www.tradingview.com/v/bLdpFVuq/ or https://www.tradingview.com/v/piIbWMpY/
these both scripts based on same logic btw.
Hello,
I like the sound of Volumized Order Block https://www.tradingview.com/v/bLdpFVuq/ Can any one tell me the logic behind how this is calculated and I will look to implement it.
Hello,
I like the sound of Volumized Order Block https://www.tradingview.com/v/bLdpFVuq/ Can any one tell me the logic behind how this is calculated and I will look to implement it.
that d be great !
For bullish order blocks, the script checks if the closing price crosses above the detected swing high. If the closing price crosses above the high of the detected swing, it considers this a potential bullish order block. The script uses a loop to iterate over the detected swings (top) and, when the closing price crosses above the swing high, it marks the swing as crossed and calculates the relevant information for the order block, such as the start time, end time, and volume.
obVolume: The total volume associated with the order block. It is calculated by summing the volumes of the bars within the order block. bbVolume:
The volume associated with the breaker bar. If the order block is invalidated and marked as a breaker, this value represents the volume at the time of the break.
obLowVolume: The volume associated with the lower boundary of the order block.
obHighVolume: The volume associated with the upper boundary of the order block.
breaker: A boolean indicating whether the order block has been invalidated (broken). If true, the order block has been broken by price movement.
maybe this will help
I can't seem to get my head around this one, and Its a bit out of my scope. Can someone implement it and make a contribution push request to this project for me to review.
made it
Nice work! could you make a pull request to the project?
uploaded first version to my github https://github.com/rafalsza/trading-scripts/blob/main/volumized_order_blocks.py now we need to adapt the code to smartmoneyconcepts
uploaded first version to my github https://github.com/rafalsza/trading-scripts/blob/main/volumized_order_blocks.py now we need to adapt the code to smartmoneyconcepts
import ccxt
import pandas as pd
import pandas_ta as ta
import numpy as np
import os
from datetime import date, datetime, timezone, tzinfo
import time, schedule
import numpy as np
import requests
from math import floor
import ta as tl
import math
import functools
from pandas import DataFrame
import warnings
from io import StringIO
from pathlib import Path
from collections import deque
import itertools
import pprint
warnings.filterwarnings("ignore")
symbol = "ENJ/USDT" # Binance
pos_size =1
timeframe = "15m"
# API TANIMLAMALARI
account_binance = ccxt.binance({
"apiKey": 'API',
"secret": 'Key',
"enableRateLimit": True,
'options': {
'defaultType': 'future'
}
})
while True:
try:
orderTime = datetime.utcnow()
ohlcvLB = account_binance.fetch_ohlcv(symbol, timeframe)
dfLB = pd.DataFrame(ohlcvLB, columns=['time', 'open', 'high', 'low', 'close', 'volume'])
indiPoint = pd.DataFrame(columns=['time'])
if len(ohlcvLB):
def find_ob_swings2(df, length):
ob_swings = {"top": [], "bottom": []}
swing_type = 0
# Calculate the highest high and lowest low for the specified length
upper = df["high"].rolling(window=length).max()
lower = df["low"].rolling(window=length).min()
# Concatenate upper and lower to df
df["upper"] = upper
df["lower"] = lower
# Initialize the previous swing type
prev_swing_type = 0
# Iterate over each index in the dataframe
for i in range(len(df)):
try:
# Determine the swing type
if df["high"].iloc[i] > upper.iloc[i + length]:
swing_type = 0
elif df["low"].iloc[i] < lower.iloc[i + length]:
swing_type = 1
# Check if it's a new top or bottom
if swing_type == 0 and prev_swing_type != 0:
ob_swings["top"].append(
{
"index": i,
"loc": df.index[i],
"price": df["high"].iloc[i],
"volume": df["volume"].iloc[i],
"crossed": False,
}
)
elif swing_type == 1 and prev_swing_type != 1:
ob_swings["bottom"].append(
{
"index": i,
"loc": df.index[i],
"price": df["low"].iloc[i],
"volume": df["volume"].iloc[i],
"crossed": False,
}
)
# Update the previous swing type
prev_swing_type = swing_type
except IndexError:
pass
return ob_swings["top"], ob_swings["bottom"]
def findOrderBlocks(df, maxDistanceToLastBar, swingLength, obEndMethod, maxOrderBlocks):
bullishOrderBlocksList = []
bearishOrderBlocksList = []
bar_index = len(df) - 1
last_bar_index = max(0, bar_index - maxDistanceToLastBar)
if bar_index > last_bar_index - maxDistanceToLastBar:
top, btm = find_ob_swings2(df, swingLength)
useBody = False
# Bullish Order Block
for close_index in range(len(df)):
close_price = df["close"].iloc[close_index]
bullishBreaked = 0
if len(bullishOrderBlocksList) > 0:
for i in range(len(bullishOrderBlocksList) - 1, -1, -1):
currentOB = bullishOrderBlocksList[i]
if not currentOB["breaker"]:
if (
obEndMethod == "Wick"
and df.low.iloc[close_index - 1] < currentOB["bottom"]
) or (
obEndMethod != "Wick"
and df.low.iloc[close_index - 1]
< currentOB[["open", "close"]].min()
):
currentOB["breaker"] = True
currentOB["breakTime"] = df.index[close_index - 1]
currentOB["bbvolume"] = df["volume"].iloc[close_index - 1]
else:
if df.high.iloc[close_index] > currentOB["top"]:
bullishOrderBlocksList.pop(i)
elif (
i < 10
and top[i]["price"] < currentOB["top"]
and top[i]["price"] > currentOB["bottom"]
):
bullishBreaked = 1
last_top_index = None
for i in range(len(top)):
if top[i]["index"] < close_index:
last_top_index = i
if last_top_index is not None:
swing_top_price = top[last_top_index]["price"]
if close_price > swing_top_price and not top[last_top_index]["crossed"]:
top[last_top_index]["crossed"] = True
boxBtm = df.high.iloc[close_index - 1]
boxTop = df.low.iloc[close_index - 1]
boxLoc = df.index[close_index - 1]
for j in range(1, close_index - top[last_top_index]["index"]):
boxBtm = min(
df.low.iloc[top[last_top_index]["index"] + j], boxBtm
)
if boxBtm == df.low.iloc[top[last_top_index]["index"] + j]:
boxTop = df.high.iloc[top[last_top_index]["index"] + j]
boxLoc = (
df.index[top[last_top_index]["index"] + j]
if boxBtm == df.low.iloc[top[last_top_index]["index"] + j]
else boxLoc
)
newOrderBlockInfo = {
"top": boxTop,
"bottom": boxBtm,
"volume": df["volume"].iloc[close_index]
+ df["volume"].iloc[close_index - 1]
+ df["volume"].iloc[close_index - 2],
"type": "Bull",
"loc": boxLoc,
"loc_number": close_index,
"index": len(
bullishOrderBlocksList
),
"oblowvolume": df["volume"].iloc[close_index - 2],
"obhighvolume": (
df["volume"].iloc[close_index]
+ df["volume"].iloc[close_index - 1]
),
"breaker": False,
}
bullishOrderBlocksList.insert(0, newOrderBlockInfo)
if len(bullishOrderBlocksList) > maxOrderBlocks:
bullishOrderBlocksList.pop()
# break
for close_index in range(len(df)):
close_price = df["close"].iloc[close_index]
# Bearish Order Block
bearishBreaked = 0
if len(bearishOrderBlocksList) > 0:
for i in range(len(bearishOrderBlocksList) - 1, -1, -1):
currentOB = bearishOrderBlocksList[i]
if not currentOB["breaker"]:
if (
obEndMethod == "Wick"
and df.high.iloc[close_index] > currentOB["top"]
) or (
obEndMethod != "Wick"
and df.high.iloc[close_index - 1]
> currentOB[["open", "close"]].max()
):
currentOB["breaker"] = True
currentOB["breakTime"] = df.index[close_index]
currentOB["bbvolume"] = df["volume"].iloc[close_index]
else:
if df.low.iloc[close_index] < currentOB["bottom"]:
bearishOrderBlocksList.pop(i)
elif (
i < 10
and btm[i]["price"] > currentOB["bottom"]
and btm[i]["price"] < currentOB["top"]
):
bearishBreaked = 1
last_btm_index = None
for i in range(len(btm)):
if btm[i]["index"] < close_index:
last_btm_index = i
if last_btm_index is not None:
swing_btm_price = btm[last_btm_index]["price"]
if close_price < swing_btm_price and not btm[last_btm_index]["crossed"]:
btm[last_btm_index]["crossed"] = True
boxBtm = df.low.iloc[close_index - 1]
boxTop = df.high.iloc[close_index - 1]
boxLoc = df.index[close_index - 1]
for j in range(1, close_index - btm[last_btm_index]["index"]):
boxTop = max(
df.high.iloc[btm[last_btm_index]["index"] + j], boxTop
)
boxBtm = (
df.low.iloc[btm[last_btm_index]["index"] + j]
if boxTop == df.high.iloc[btm[last_btm_index]["index"] + j]
else boxBtm
)
boxLoc = (
df.index[btm[last_btm_index]["index"] + j]
if boxTop == df.high.iloc[btm[last_btm_index]["index"] + j]
else boxLoc
)
newOrderBlockInfo = {
"top": boxTop,
"bottom": boxBtm,
"volume": df["volume"].iloc[close_index]
+ df["volume"].iloc[close_index - 1]
+ df["volume"].iloc[close_index - 2],
"type": "Bear",
"loc": boxLoc,
"loc_number": close_index,
"index": len(bearishOrderBlocksList),
"oblowvolume": (
df["volume"].iloc[close_index]
+ df["volume"].iloc[close_index - 1]
),
"obhighvolume": df["volume"].iloc[close_index - 2],
"breaker": False,
}
bearishOrderBlocksList.insert(0, newOrderBlockInfo)
if len(bearishOrderBlocksList) > maxOrderBlocks:
bearishOrderBlocksList.pop()
break
return bullishOrderBlocksList
test = findOrderBlocks(dfLB,100, 10,"Wick",1)
pprint.pprint(test)
break
except Exception as e:
print(e)
continue
i guess these breaks should be deleted,
if len(bullishOrderBlocksList) > maxOrderBlocks:
bullishOrderBlocksList.pop()
# break
if len(bearishOrderBlocksList) > maxOrderBlocks:
bearishOrderBlocksList.pop()
# break
otherwise it prints old order blocks, (e.g. : assume we want to print 1 ob, it prints first, not a last.) and maxDistanceToLastBar feature doesnt work properly, when i limit it "100" it still shows order block that older than 100 bars
ENJUSDT.P - 15m - Binance ( 18 Feb 2024 14:45 UTC+3)
you can't compare spot data with perpetual, these are different dataframes
you can't compare spot data with perpetual, these are different dataframes
I changed time frame and spot with futures klines in code. Smth wrong in code
i made some changes, check on last version
Hi! Great work! I have a question about Volumized Order Block (tradingview script).Did you tried to convert this to python?