# btc_env.py (forked from lefnire/tforce_btc_trader)
"""BTC trading environment. Trains on BTC price history to learn to buy/sell/hold.
This is an environment tailored towards TensorForce, not OpenAI Gym. Gym environments are
a standard used by many projects (Baselines, Coach, etc) and so would make sense to use; and TForce is compatible with
Gym envs. It's just that there are hoops to jump through converting a Gym env to TForce, and the code was ugly. I actually
had it that way (search the Git history if you want the Gym env); but one day I decided "I'm not having success with
any of these other projects, TForce is the best - I'm just gonna stick to that", and this approach was cleaner.
I do still want to try NervanaSystems/Coach; it's new since I started developing. It will require converting this
env back to Gym format. Anyone wanna give it a go?
"""
import random, time, requests, pdb, gdax, math, pickle, os, shutil
from scipy.stats import truncnorm
from enum import Enum
import numpy as np
import pandas as pd
import talib.abstract as tlib
from box import Box
from tensorforce.environments import Environment
from tensorforce.execution import Runner
from sklearn import preprocessing
from sklearn.pipeline import make_pipeline
from data.data import Exchange, EXCHANGE
from data import data
from autoencoder import AutoEncoder
class Mode(Enum):
TRAIN = 1
TEST = 2
LIVE = 3
TEST_LIVE = 4
# See 6fc4ed2 for Scaling states/rewards
class BitcoinEnv(Environment):
EPISODE_LEN = 5000
def __init__(self, hypers, cli_args={}):
"""Initialize hyperparameters (done here instead of __init__ since OpenAI-Gym controls instantiation)"""
self.hypers = h = Box(hypers)
self.conv2d = self.hypers['net.type'] == 'conv2d'
self.cli_args = cli_args
# cash/value start at .4 each (denominated in BTC, roughly $3.5k apiece at the time). Increase/decrease this
# depending on how much you'll put into your exchange accounts to trade with. Presumably the agent will learn
# to work with what you've got (cash/value are state inputs), but starting capital does affect the learning process.
self.start_cash, self.start_value = .4, .4
# We have these "accumulator" objects, which collect values over steps, over episodes, etc. Easier to keep
# same-named variables separate this way.
self.acc = Box(
episode=dict(
i=0,
total_steps=0,
sharpes=[],
returns=[],
uniques=[],
),
step=dict(), # setup in reset()
tests=dict(
i=0,
n_tests=0
)
)
self.mode = Mode.TRAIN
self.conn = data.engine.connect()
# gdax min order size = .01btc; kraken = .002btc
self.min_trade = {Exchange.GDAX: .01, Exchange.KRAKEN: .002}[EXCHANGE]
self.update_btc_price()
# Our data is too high-dimensional for the way MemoryModel handles batched episodes. Reduce it (don't like this)
all_data = data.db_to_dataframe(self.conn, arbitrage=h.arbitrage)
self.all_observations, self.all_prices = self.xform_data(all_data)
self.all_prices_diff = self.diff(self.all_prices, True)
# Action space
if h.action_type == 'single':
# In single_action we discard any vals b/w [-min_trade, +min_trade] and call it "hold" (in execute())
self.actions_ = dict(type='float', shape=(), min_value=-1., max_value=1.)
elif h.action_type == 'multi':
# In multi-modal, hold is an actual action (in which case we discard "amount")
self.actions_ = dict(
action=dict(type='int', shape=(), num_actions=3),
amount=dict(type='float', shape=(), min_value=0., max_value=1.))
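# Illustrative only: a multi-mode action from the agent might look like
# {'action': 2, 'amount': 0.25}, i.e. buy using 25% of available cash; action 1
# is a hold regardless of amount (see execute()).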
# Observation space
stationary_ct = 2
self.cols_ = self.all_observations.shape[1]
self.states_ = dict(
series=dict(type='float', shape=self.cols_), # all state values that are time-ish
stationary=dict(type='float', shape=stationary_ct) # everything that doesn't care about time
)
if self.conv2d:
# width = step-window (150 time-steps)
# height = nothing (1)
# channels = features/inputs (price actions, OHLCV, etc).
self.states_['series']['shape'] = (h.step_window, 1, self.cols_)
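# e.g. with step_window=150 and (hypothetically) 10 observation columns, the conv2d
# series state has shape (150, 1, 10): 150 timesteps wide, 1 tall, 10 channels.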
def __str__(self): return 'BitcoinEnv'
def close(self): self.conn.close()
@property
def states(self): return self.states_
@property
def actions(self): return self.actions_
# We don't want random-seeding for reproducibility! We _want_ two runs to give different results, because we only
# trust the hyper combo which consistently gives positive results.
def seed(self, seed=None): return
def update_btc_price(self):
try:
self.btc_price = int(requests.get(f"https://api.cryptowat.ch/markets/{EXCHANGE.value}/btcusd/price").json()['result']['price'])
except Exception:
# On the first call self.btc_price doesn't exist yet, so fall back safely
self.btc_price = getattr(self, 'btc_price', None) or 8000
def diff(self, arr, percent=True):
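# For example, pd.Series([100, 110, 110]).pct_change() -> [NaN, 0.1, 0.0]; below we
# zero that leading NaN, mask >99th-percentile spikes and +/-inf as NaN, then
# forward/back-fill so the agent never sees NaNs.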
series = pd.Series(arr)
diff = series.pct_change() if percent else series.diff()
diff.iloc[0] = 0 # always NaN, nothing to compare to
# Remove outliers (turn them to NaN)
q = diff.quantile(0.99)
diff = diff.mask(diff > q, np.nan)
# then forward-fill the NaNs.
return diff.replace([np.inf, -np.inf], np.nan).ffill().bfill().values
def xform_data(self, df):
"""
Some special handling of the price data. First, we don't want prices to be absolute, since we want the agent
to learn actions _relative_ to states; that is, states need to be made "relative" somehow. This
is called "stationary time series"; they fluctuate around y=0, like visualizing audio rather than a line graph.
Next, we don't want absolute price changes, since that's still not relative enough (prices change in larger
amounts when the BTC price is already large - we want to learn the pattern, not the numbers). So the solution
is percent-changes. Now - making everything a percent-change from its past makes it so you can track that
field's history, but you lose how it relates to the other fields in its cross-section. So here's what we do.
Anchor all the price fields to the target (close-price); so they're relative w/i the cross-section. Then set
target to its percent-change over time. Leave the volume stuff alone, we _do_ want that absolute. Then scale
everything. Crazy, I know; but IMO makes sense. Hit me if you have a better idea.
"""
columns = []
ind_ct = self.hypers.indicators_count
tables_ = data.get_tables(self.hypers.arbitrage)
for table in tables_:
for col in table['cols']:
name_col = f'{table["name"]}_{col}'
if name_col == data.target:
columns.append(self.diff(df[name_col], True))
elif col in table['price_cols']:
columns.append(df[name_col]/df[data.target])
else:
columns.append(df[name_col])
# Add extra indicator columns
ohlcv = table.get('ohlcv', {})
if ohlcv and ind_ct:
ind = pd.DataFrame()
# TA-Lib requires specifically-named columns (OHLCV)
for k, v in ohlcv.items():
ind[k] = df[f'{table["name"]}_{v}']
# Sort these by effectiveness. I'm no expert, so if this seems off please submit a PR! Later after
# you've optimized the other hypers, come back here and create a hyper for every indicator you want to
# try (zoom in on indicators)
best_indicators = [
tlib.MOM,
tlib.SMA,
# tlib.BBANDS, # TODO signature different; special handling
tlib.RSI,
tlib.EMA,
tlib.ATR
]
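# e.g. indicators_count=2 uses just MOM and SMA; each indicator is divided by the
# target close below so it stays on the same relative scale as the price columns.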
for i in range(ind_ct):
columns.append(best_indicators[i](ind, timeperiod=self.hypers.indicators_window) / df[data.target])
states = np.column_stack(columns)
prices = df[data.target].values
# Remove the padding at the start of all data. Indicators are rolling/aggregate fns, so they aren't
# meaningful until we have that much historical data behind them
if ind_ct:
states = states[self.hypers.indicators_window:]
prices = prices[self.hypers.indicators_window:]
# Pre-scale all price actions up-front, since they don't change. We'll scale changing values real-time elsewhere
states = preprocessing.robust_scale(states, quantile_range=(1., 99.))
# Reducing the dimensionality of our states (OHLCV + indicators + arbitrage => 5 or 6 weights)
# because TensorForce's memory branch changed Policy Gradient models' batching from timesteps to episodes.
# This takes up way too much GPU RAM for us, so we had to cut back in quite a few areas (num steps to train
# per episode, episode batch_size, and especially state size):
if self.cli_args.autoencode:
ae = AutoEncoder()
states = ae.fit_transform_tied(states)
return states, prices
def use_dataset(self, mode, full_set=False):
"""Fetches, transforms, and stores the portion of data you'll be working with (ie, 80% train data, 20% test
data, or the live database). Make sure to call this before reset()!
"""
self.mode = mode
if mode in (Mode.LIVE, Mode.TEST_LIVE):
self.conn = data.engine_live.connect()
# Work with 6000 timesteps up until the present (play w/ diff numbers, depends on LSTM)
# Offset=0 because data.py currently pulls recent-to-oldest, then reverses
rampup = int(1e5) # 6000 # FIXME temporarily using big number to build up Scaler (since it's not saved)
limit, offset = (rampup, 0) # if not self.conv2d else (self.hypers.step_window + 1, 0)
df, self.last_timestamp = data.db_to_dataframe(
self.conn, limit=limit, offset=offset, arbitrage=self.hypers.arbitrage, last_timestamp=True)
# save away for now so we can keep transforming it as we add new data (find a more efficient way)
self.df = df
else:
row_ct = data.count_rows(self.conn, arbitrage=self.hypers.arbitrage)
split = .9 # Using 90% training data.
n_train, n_test = int(row_ct * split), int(row_ct * (1 - split))
if mode == Mode.TEST:
offset = n_train
limit = 40000 if full_set else 10000 # should be `n_test` in full_set, getting idx errors
else:
# Grab a random window from the 90% training data. The random bit is important so the agent
# sees a variety of data. The window-size bit is a hack: as long as the agent doesn't die (doesn't cause
# `terminal=True`), PPO's MemoryModel can keep filling up until it crashes TensorFlow. This ensures
# there's a stopping point (limit). I'd rather see how far it can get w/o dying and figure out a better solution.
limit = self.EPISODE_LEN
offset_start = 0 if not self.conv2d else self.hypers.step_window + 1
offset = random.randint(offset_start, n_train - self.EPISODE_LEN)
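# e.g. if n_train were 500k rows, each training episode would start at a random
# offset in [offset_start, 500k - EPISODE_LEN] and cover EPISODE_LEN=5000 rows.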
self.offset, self.limit = offset, limit
self.prices = self.all_prices[offset:offset+limit]
self.prices_diff = self.all_prices_diff[offset:offset+limit]
def get_next_state(self, i, stationary):
i = i + self.offset
series = self.all_observations[i]
if self.conv2d:
# Take note of the +1 here. LSTM uses a single index [i], which grabs the list's end. Conv uses a window,
# [-something:i], which _excludes_ the list's end (due to Python indexing). Without this +1, conv would
# have a 1-step-behind delayed response.
window = self.all_observations[i - self.hypers.step_window + 1:i + 1]
series = np.expand_dims(window, axis=1)
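# Illustrative: with step_window=150 and i=200 (after adding self.offset), the
# slice above covers rows 51..200 inclusive; expand_dims then gives it the
# (step_window, 1, n_cols) shape declared for the conv2d series state.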
return dict(series=series, stationary=stationary)
def reset(self):
step_acc, ep_acc = self.acc.step, self.acc.episode
step_acc.i = 0
step_acc.cash, step_acc.value = self.start_cash, self.start_value
step_acc.hold_value = self.start_value
step_acc.totals = Box(
trade=[self.start_cash + self.start_value],
hold=[self.start_cash + self.start_value]
)
step_acc.signals = []
ep_acc.i += 1
stationary = [1., 1.]
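# stationary inputs are [cash, value] normalized by their starting amounts,
# so both begin at 1. (see execute() for how they evolve)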
return self.get_next_state(0, stationary)
def execute(self, actions):
step_acc, ep_acc = self.acc.step, self.acc.episode
totals = step_acc.totals
h = self.hypers
if h.action_type == 'single':
act_pct = actions
elif h.action_type == 'multi':
# Two actions: `action` (buy/sell/hold) and `amount` (how much)
act_pct = {
0: -1, # make amount negative
1: 0, # hold
2: 1 # make amount positive
}[actions['action']] * actions['amount']
# multi-action min_trade accounted for in constructor
act_btc = act_pct * (step_acc.cash if act_pct > 0 else step_acc.value)
fee = {
Exchange.GDAX: 0.0025, # https://support.gdax.com/customer/en/portal/articles/2425097-what-are-the-fees-on-gdax-
Exchange.KRAKEN: 0.0026 # https://www.kraken.com/en-us/help/fees
}[EXCHANGE]
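# e.g. buying with act_btc=0.2 on GDAX credits value with 0.2*(1-0.0025)=0.1995 BTC
# while debiting cash by the full 0.2 (see the branch below).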
# Perform the trade. In training mode, we'll let it dip into negative here, but then kill and punish below.
# In testing/live, we'll just block the trade if they can't afford it
if act_pct > 0:
if step_acc.cash < self.min_trade:
act_btc = -(self.start_cash + self.start_value)
elif act_btc < self.min_trade:
act_btc = 0
else:
step_acc.value += act_btc - act_btc*fee
step_acc.cash -= act_btc
elif act_pct < 0:
if step_acc.value < self.min_trade:
act_btc = -(self.start_cash + self.start_value)
elif abs(act_btc) < self.min_trade:
act_btc = 0
else:
step_acc.cash += abs(act_btc) - abs(act_btc)*fee
step_acc.value -= abs(act_btc)
step_acc.signals.append(float(act_btc))
# next delta. [1,2,2].pct_change() == [NaN, 1, 0]
pct_change = self.prices_diff[step_acc.i + 1]
step_acc.value += pct_change * step_acc.value
total_now = step_acc.value + step_acc.cash
totals.trade.append(total_now)
# track what the total would be "if I held", so we can measure the actual reward's _advantage_ over holding
hold_before = step_acc.hold_value
step_acc.hold_value += pct_change * hold_before
totals.hold.append(step_acc.hold_value + self.start_cash)
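# e.g. a +1% price step grows both the agent's BTC value and the buy-and-hold
# value by 1%; cash sits still in both, so any gap between the two totals comes
# purely from the agent's trades (and fees).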
reward = 0
step_acc.i += 1
ep_acc.total_steps += 1
stationary = [step_acc.cash/self.start_cash, step_acc.value/self.start_value]
next_state = self.get_next_state(step_acc.i, stationary)
terminal = int(step_acc.i + 1 >= self.limit)
if step_acc.value < 0 or step_acc.cash < 0:
terminal = True
if terminal and self.mode in (Mode.TRAIN, Mode.TEST):
# We're done.
step_acc.signals.append(0) # Add one last signal (to match length)
reward = self.sharpe()
if terminal and self.mode in (Mode.LIVE, Mode.TEST_LIVE):
# See 6fc4ed2 for prior live-mode code which worked. Much has changed since then and it won't work in
# that state, so removing and leaving to you to fix (and submit PR please!)
raise NotImplementedError
# if step_acc.value <= 0 or step_acc.cash <= 0: terminal = 1
return next_state, terminal, reward
def sharpe(self):
"""https://www.investopedia.com/terms/s/sharperatio.asp
Sharpe = (portfolio_return - risk_free_rate) / portfolio_std
Here, buy-and-hold returns stand in for the risk-free benchmark.
"""
totals = self.acc.step.totals
if np.count_nonzero(totals.trade) == 0:
return 0.
diff = (pd.Series(totals.trade).pct_change() - pd.Series(totals.hold).pct_change())[1:]
numerator = self.cumm_return() # diff.mean()
denominator = 1 # FIXME cumm_return / std() in different scales, getting funky results
# denominator = pd.Series(totals.trade).pct_change().std() - pd.Series(totals.hold).pct_change().std()
return numerator / denominator
def cumm_return(self):
totals = self.acc.step.totals
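# e.g. trade total 0.8 -> 1.0 (+25%) vs buy-and-hold 0.8 -> 0.9 (+12.5%)
# gives 0.25 - 0.125 = 0.125.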
return (totals.trade[-1] / totals.trade[0] - 1) - (totals.hold[-1] / totals.hold[0] - 1)
def episode_finished(self, runner):
step_acc, ep_acc, test_acc = self.acc.step, self.acc.episode, self.acc.tests
signals = step_acc.signals
totals = step_acc.totals
n_uniques = float(len(np.unique(signals)))
sharpe = self.sharpe()
cumm_ret = self.cumm_return()
ep_acc.sharpes.append(float(sharpe))
ep_acc.returns.append(float(cumm_ret))
ep_acc.uniques.append(n_uniques)
# Print (limit to note-worthy)
lt_0 = len([s for s in signals if s < 0])
eq_0 = len([s for s in signals if s == 0])
gt_0 = len([s for s in signals if s > 0])
completion = int(test_acc.i / test_acc.n_tests * 100)
steps = f"\tSteps: {step_acc.i}"
print(f"{completion}%{steps}\tSharpe: {'%.3f'%sharpe}\tReturn: {'%.3f'%cumm_ret}\tTrades:\t{lt_0}[<0]\t{eq_0}[=0]\t{gt_0}[>0]")
return True
def run_deterministic(self, runner, print_results=True):
next_state, terminal = self.reset(), False
while not terminal:
next_state, terminal, reward = self.execute(runner.agent.act(next_state, deterministic=True, independent=True))
if print_results: self.episode_finished(None)
def train_and_test(self, agent, n_steps, n_tests, early_stop):
test_acc = self.acc.tests
n_steps = n_steps * 10000
test_acc.n_tests = n_tests
test_acc.i = 0
timesteps_each = n_steps // n_tests
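# e.g. n_steps=5 becomes 50,000 total timesteps; with n_tests=10 the runner trains
# in 5,000-timestep chunks, running a deterministic test pass after each chunk.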
runner = Runner(agent=agent, environment=self)
try:
while test_acc.i <= n_tests:
self.use_dataset(Mode.TRAIN)
# max_episode_timesteps not required, since we kill on (cash|value)<0 or max_repeats
runner.run(timesteps=timesteps_each)
self.use_dataset(Mode.TEST)
self.run_deterministic(runner, print_results=True)
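# Early stopping (illustrative): with early_stop=3, training ends once the last 3
# test-set sharpes are all positive (test_acc.i jumps to n_tests, ending the loop).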
if early_stop > 0:
sharpes = np.array(self.acc.episode.sharpes[-early_stop:])
if test_acc.i >= early_stop and np.all(sharpes > 0):
test_acc.i = n_tests
test_acc.i += 1
except KeyboardInterrupt:
# Lets us kill training with Ctrl-C and skip straight to the final test. This is useful in case you're
# keeping an eye on terminal and see "there! right there, stop you found it!" (where early_stop & n_steps
# are the more methodical approaches)
pass
# On last "how would it have done IRL?" run, without getting in the way (no killing on repeats, 0-balance)
print('Running no-kill test-set')
self.use_dataset(Mode.TEST, full_set=True)
self.run_deterministic(runner, print_results=True)
def run_live(self, agent, test=True):
self.live_at_head = False
self.acc.tests.n_tests = 1 # not used (but referenced for %completion)
gdax_conf = data.config_json['GDAX']
self.gdax_client = gdax.AuthenticatedClient(gdax_conf['key'], gdax_conf['b64secret'], gdax_conf['passphrase'])
# self.gdax_client = gdax.AuthenticatedClient(gdax_conf['key'], gdax_conf['b64secret'], gdax_conf['passphrase'],
# api_url="https://api-public.sandbox.gdax.com")
accounts = self.gdax_client.get_accounts()
self.start_cash = float([a for a in accounts if a['currency'] == 'USD'][0]['balance']) / self.btc_price
self.start_value = float([a for a in accounts if a['currency'] == 'BTC'][0]['balance'])
print(f'Starting total: {self.start_cash + self.start_value}')
runner = Runner(agent=agent, environment=self)
self.use_dataset(Mode.TEST_LIVE if test else Mode.LIVE)
self.run_deterministic(runner, print_results=True)