Kismuz / btgym

Scalable, event-driven, deep-learning-friendly backtesting library
https://kismuz.github.io/btgym/
GNU Lesser General Public License v3.0

Problem with observation_space.shape #4

Closed shokimble closed 7 years ago

shokimble commented 7 years ago

Hi,

I finally got around to trying a real "deep learning" implementation against btgym, and I've run into a problem. I don't know enough about OpenAI Gym to fully understand it, but judging by the implementation I am trying and by other implementations I have looked at, it seems that observation_space.shape returns inconsistent values.

I tried modifying this basic A3C implementation: https://github.com/jaara/AI-blog/blob/master/CartPole-A3C.py

I've attached my attempt below but when I run it I get the error:

ValueError: Error when checking : expected input_1 to have 2 dimensions, but got array with shape (1, 4, 30)

When I print observation_space.shape, it seems to change all the time, which is expected, but I believe it should always follow the same format (again, I'm a bit of a newbie with this stuff, so I could be wrong).

Are you able to take a look? I'll keep digging and try another implementation, probably DQN. The problem with most DQN or A3C implementations is that they expect the state to be a screenshot, i.e. an array of rows and columns, sometimes in RGB and often just 0s and 1s.

Keep up the good work BTW.

# OpenGym CartPole-v0 with A3C on GPU
# -----------------------------------
#
# A3C implementation with GPU optimizer threads.
# 
# Made as part of blog series Let's make an A3C, available at
# https://jaromiru.com/2017/02/16/lets-make-an-a3c-theory/
#
# author: Jaromir Janisch, 2017

import numpy as np
import tensorflow as tf

import gym, time, random, threading
from btgym import BTgymEnv

from keras.models import *
from keras.layers import *
from keras import backend as K

#-- constants
ENV = 'backtrader-v46'

RUN_TIME = 30
THREADS = 8
OPTIMIZERS = 2
THREAD_DELAY = 0.001

GAMMA = 0.99

N_STEP_RETURN = 8
GAMMA_N = GAMMA ** N_STEP_RETURN

EPS_START = 0.4
EPS_STOP  = .15
EPS_STEPS = 75000

MIN_BATCH = 32
LEARNING_RATE = 5e-3

LOSS_V = .5         # v loss coefficient
LOSS_ENTROPY = .01  # entropy coefficient

#---------
class Brain:
    train_queue = [ [], [], [], [], [] ]    # s, a, r, s', s' terminal mask
    lock_queue = threading.Lock()

    def __init__(self):
        self.session = tf.Session()
        K.set_session(self.session)
        K.manual_variable_initialization(True)

        self.model = self._build_model()
        self.graph = self._build_graph(self.model)

        self.session.run(tf.global_variables_initializer())
        self.default_graph = tf.get_default_graph()

        self.default_graph.finalize()   # avoid modifications

    def _build_model(self):

        l_input = Input( batch_shape=(None, NUM_STATE) )
        l_dense = Dense(16, activation='relu')(l_input)

        out_actions = Dense(NUM_ACTIONS, activation='softmax')(l_dense)
        out_value   = Dense(1, activation='linear')(l_dense)

        model = Model(inputs=[l_input], outputs=[out_actions, out_value])
        model._make_predict_function()  # have to initialize before threading

        return model

    def _build_graph(self, model):
        s_t = tf.placeholder(tf.float32, shape=(None, NUM_STATE))
        a_t = tf.placeholder(tf.float32, shape=(None, NUM_ACTIONS))
        r_t = tf.placeholder(tf.float32, shape=(None, 1)) # not immediate, but discounted n step reward

        p, v = model(s_t)

        log_prob = tf.log( tf.reduce_sum(p * a_t, axis=1, keep_dims=True) + 1e-10)
        advantage = r_t - v

        loss_policy = - log_prob * tf.stop_gradient(advantage)                                  # maximize policy
        loss_value  = LOSS_V * tf.square(advantage)                                             # minimize value error
        entropy = LOSS_ENTROPY * tf.reduce_sum(p * tf.log(p + 1e-10), axis=1, keep_dims=True)   # maximize entropy (regularization)

        loss_total = tf.reduce_mean(loss_policy + loss_value + entropy)

        optimizer = tf.train.RMSPropOptimizer(LEARNING_RATE, decay=.99)
        minimize = optimizer.minimize(loss_total)

        return s_t, a_t, r_t, minimize

    def optimize(self):
        if len(self.train_queue[0]) < MIN_BATCH:
            time.sleep(0)   # yield
            return

        with self.lock_queue:
            if len(self.train_queue[0]) < MIN_BATCH:    # more thread could have passed without lock
                return                                  # we can't yield inside lock

            s, a, r, s_, s_mask = self.train_queue
            self.train_queue = [ [], [], [], [], [] ]

        s = np.vstack(s)
        a = np.vstack(a)
        r = np.vstack(r)
        s_ = np.vstack(s_)
        s_mask = np.vstack(s_mask)

        if len(s) > 5*MIN_BATCH: print("Optimizer alert! Minimizing batch of %d" % len(s))

        v = self.predict_v(s_)
        r = r + GAMMA_N * v * s_mask    # set v to 0 where s_ is terminal state

        s_t, a_t, r_t, minimize = self.graph
        self.session.run(minimize, feed_dict={s_t: s, a_t: a, r_t: r})

    def train_push(self, s, a, r, s_):
        with self.lock_queue:
            self.train_queue[0].append(s)
            self.train_queue[1].append(a)
            self.train_queue[2].append(r)

            if s_ is None:
                self.train_queue[3].append(NONE_STATE)
                self.train_queue[4].append(0.)
            else:   
                self.train_queue[3].append(s_)
                self.train_queue[4].append(1.)

    def predict(self, s):
        with self.default_graph.as_default():
            p, v = self.model.predict(s)
            return p, v

    def predict_p(self, s):
        with self.default_graph.as_default():
            p, v = self.model.predict(s)        
            return p

    def predict_v(self, s):
        with self.default_graph.as_default():
            p, v = self.model.predict(s)        
            return v

#---------
frames = 0
class Agent:
    def __init__(self, eps_start, eps_end, eps_steps):
        self.eps_start = eps_start
        self.eps_end   = eps_end
        self.eps_steps = eps_steps

        self.memory = []    # used for n_step return
        self.R = 0.

    def getEpsilon(self):
        if(frames >= self.eps_steps):
            return self.eps_end
        else:
            return self.eps_start + frames * (self.eps_end - self.eps_start) / self.eps_steps   # linearly interpolate

    def act(self, s):
        eps = self.getEpsilon()         
        global frames; frames = frames + 1

        if random.random() < eps:
            return random.randint(0, NUM_ACTIONS-1)

        else:
            s = np.array([s])
            p = brain.predict_p(s)[0]

            # a = np.argmax(p)
            a = np.random.choice(NUM_ACTIONS, p=p)

            return a

    def train(self, s, a, r, s_):
        def get_sample(memory, n):
            s, a, _, _  = memory[0]
            _, _, _, s_ = memory[n-1]

            return s, a, self.R, s_

        a_cats = np.zeros(NUM_ACTIONS)  # turn action into one-hot representation
        a_cats[a] = 1 

        self.memory.append( (s, a_cats, r, s_) )

        self.R = ( self.R + r * GAMMA_N ) / GAMMA

        if s_ is None:
            while len(self.memory) > 0:
                n = len(self.memory)
                s, a, r, s_ = get_sample(self.memory, n)
                brain.train_push(s, a, r, s_)

                self.R = ( self.R - self.memory[0][2] ) / GAMMA
                self.memory.pop(0)      

            self.R = 0

        if len(self.memory) >= N_STEP_RETURN:
            s, a, r, s_ = get_sample(self.memory, N_STEP_RETURN)
            brain.train_push(s, a, r, s_)

            self.R = self.R - self.memory[0][2]
            self.memory.pop(0)  

    # possible edge case - if an episode ends in <N steps, the computation is incorrect

#---------
class Environment(threading.Thread):
    stop_signal = False

    def __init__(self, instancenumber=0, render=False, eps_start=EPS_START, eps_end=EPS_STOP, eps_steps=EPS_STEPS):
        threading.Thread.__init__(self)

        self.render = render

        #-- create btgym environment
        self.env = BTgymEnv(filename='../examples/data/DAT_ASCII_EURUSD_M1_2016.csv',
                          start_weekdays=[0, 1, 2],
                          episode_len_days=1,
                          episode_len_hours=23,
                          episode_len_minutes=55,
                          start_00=True,
                          start_cash=100,
                          broker_commission=0.002,
                          fixed_stake=10,
                          drawdown_call=30,
                          state_shape=(4,30),
                          state_low=None,
                          state_high=None,
                          port=5555 + instancenumber,
                          verbose=1,)

        self.agent = Agent(eps_start, eps_end, eps_steps)

    def runEpisode(self):
        s = self.env.reset()
        print(s)

        R = 0
        while True:         
            time.sleep(THREAD_DELAY) # yield 

            if self.render: self.env.render()

            a = self.agent.act(s)
            s_, r, done, info = self.env.step(a)

            if done: # terminal state
                s_ = None

            self.agent.train(s, a, r, s_)

            s = s_
            R += r

            if done or self.stop_signal:
                break

        print("Total R:", R)

    def run(self):
        while not self.stop_signal:
            self.runEpisode()

    def stop(self):
        self.stop_signal = True

#---------
class Optimizer(threading.Thread):
    stop_signal = False

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while not self.stop_signal:
            brain.optimize()

    def stop(self):
        self.stop_signal = True

#-- main
env_test = Environment(render=True, instancenumber=0, eps_start=0., eps_end=0.)
print(env_test.env.observation_space.shape[0])
NUM_STATE = env_test.env.observation_space.shape[0]
NUM_ACTIONS = env_test.env.action_space.n
NONE_STATE = np.zeros(NUM_STATE)

brain = Brain() # brain is global in A3C

envs = [Environment(instancenumber=i) for i in range(THREADS)]
opts = [Optimizer() for i in range(OPTIMIZERS)]

for o in opts:
    o.start()

for e in envs:
    e.start()

time.sleep(RUN_TIME)

for e in envs:
    e.stop()
for e in envs:
    e.join()

for o in opts:
    o.stop()
for o in opts:
    o.join()

print("Training finished")
env_test.run()

Kismuz commented 7 years ago

@shokimble, just use tf.squeeze() to get rid of the redundant dimension of a tensor, or [0,:,:] indexing for a NumPy array, but..

... A3C right from the start... that's quite a complex algorithm with a lot of moving parts. I don't think it will work with btgym right out of the box. At the least:

  1. You should set a different environment communication port for every worker, since A3C launches a separate environment [and server] instance for each one; otherwise they will interfere with each other.

  2. It is essential to featurise and normalise the environment state representation in some way; the original price signal is unbounded and non-stationary.

  3. A3C is [comparatively] fast but less sample-efficient than the DQN family; keep that in mind.
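
Point 2 can be approached in many ways. What follows is only a minimal sketch, not btgym's own preprocessing. It assumes the state is a NumPy array of shape (4, 30) in which each row is one strictly positive price channel over 30 time steps; that layout and the helper name `normalize_state` are assumptions made for illustration. Prices are converted to log-returns to reduce non-stationarity, and each row is then z-scored so the values are roughly bounded.

```python
import numpy as np

def normalize_state(state, eps=1e-8):
    """Hypothetical helper: log-returns followed by per-row z-scoring.

    Assumes `state` has shape (channels, time_steps) and holds
    strictly positive prices; neither is guaranteed by btgym.
    """
    state = np.asarray(state, dtype=np.float64)
    # Log-returns along the time axis: shape (channels, time_steps - 1).
    returns = np.diff(np.log(state), axis=1)
    # Standardise each channel to zero mean and unit variance.
    mean = returns.mean(axis=1, keepdims=True)
    std = returns.std(axis=1, keepdims=True)
    return (returns - mean) / (std + eps)

# Synthetic random-walk prices standing in for a (4, 30) state.
rng = np.random.default_rng(0)
steps = rng.normal(0.0, 1e-4, size=(4, 30))
prices = 1.1 * np.exp(np.cumsum(steps, axis=1))

features = normalize_state(prices)
print(features.shape)
```

Note that taking differences shortens the time axis by one step, so a network fed with these features would see 29 columns rather than 30.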

Download it here: https://yadi.sk/d/9140mMaC3Kx8aL

shokimble commented 7 years ago

Like I said still learning 😄

Squeeze looks like a no go. Not working still.

I have point 1 sorted, new port for each instance. It might be 2 that is the problem.

I'll try your example.

I wouldn't expect any RL to be able to beat the market (especially in FX). It really wasn't my starting point. I'm really interested in applications around the periphery like position sizing and risk etc.

I'm also very interested in games, but that's a different, if related, ballgame. You might have noticed my posts on trying to get StreetFighter 2 working. I have spent too much of my life trying to master that game. The reason it's so hard is that it was originally built with a simple feedback AI, so being able to beat it is also the kind of accomplishment I am after.

Kismuz commented 7 years ago

@shokimble, since a single state has shape (4, 30), the batch input tensor should have shape (None, 4, 30).

Change here:

  1. in MAIN:
    
    #-- main
    env_test = Environment(render=True, instancenumber=0, eps_start=0., eps_end=0.)
    print(env_test.env.observation_space.shape)
    (NUM_STATE_0, NUM_STATE_1)  = env_test.env.observation_space.shape

  2. in class Brain:

    .....................
    def _build_model(self):
    ....................
        l_input = Input( batch_shape=(None, NUM_STATE_0, NUM_STATE_1) )
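
With the input shape changed as above, a few other places in the posted script may also need to handle 2-D states. This is a guess from reading the code, not something confirmed in the thread. The NumPy-only sketch below uses constant-filled arrays as stand-ins for real observations. It shows where the (1, 4, 30) shape in the error comes from, and how `np.vstack` and `np.stack` differ when batching (4, 30) states.

```python
import numpy as np

STATE_SHAPE = (4, 30)  # matches state_shape=(4,30) in the script

# Stand-ins for three observations returned by env.step().
states = [np.full(STATE_SHAPE, float(i)) for i in range(3)]

# Agent.act(): wrapping one state in a list adds the batch axis.
single = np.array([states[0]])
print(single.shape)                      # (1, 4, 30)

# np.squeeze / [0, :, :] remove that axis again, giving one state.
print(np.squeeze(single, axis=0).shape)  # (4, 30)
print(single[0, :, :].shape)             # (4, 30)

# Brain.optimize(): np.vstack glues 2-D states along axis 0 ...
print(np.vstack(states).shape)           # (12, 30)
# ... whereas np.stack adds a new leading batch axis.
batch = np.stack(states)
print(batch.shape)                       # (3, 4, 30)

# A placeholder for a terminal next-state must match a single state.
none_state = np.zeros(STATE_SHAPE)
print(np.stack(states + [none_state]).shape)  # (4, 4, 30)
```

If this reading is right, the `tf.placeholder` for `s_t` would need the same `(None, NUM_STATE_0, NUM_STATE_1)` shape. A Keras `Dense` layer acts on the last axis of its input, so adding a `Flatten()` layer before the first `Dense` is one way to get a single action distribution per sample.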