-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.py
90 lines (80 loc) · 4.39 KB
/
stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import asyncio
from attrs import define
import time
from dataclasses import replace
from typing import Callable
from time import time
import matplotlib.pyplot as plt
from compare_funcs import naive, miss_both_changing
from currency import CurrencyList
from async_retriever import currencies_pair_generator, Measurement
@define
class Stream:
""" Стриминг значений фьючерсов """
btc_stream = CurrencyList()
eth_stream = CurrencyList()
new_eth_stream = CurrencyList()
async def _process_values(self, change_func: Callable[[float, float], float], running_time: int | float = None):
""" Вычисляем значения (биткойна, эфира, изменения эфира, нового эфира и т д) в течение running_time """
time_expired = running_time + int(time()) if running_time else float("inf")
async for btc_measurement, eth_measurement in currencies_pair_generator():
if time_expired > time():
btc_diff = self._diff_value(self.btc_stream, btc_measurement)
eth_diff = self._diff_value(self.eth_stream, eth_measurement)
coeff = change_func(btc_diff, eth_diff)
new_measurement_value = eth_measurement.value * (1 + coeff)
new_eth_measurement = replace(eth_measurement,
value=round(new_measurement_value, 2),
currency='NEW_ETH')
self._follow(new_eth_measurement)
else:
break
def _follow(self, measurement: Measurement, percent: float = 1.) -> None:
"""
Проверяем изменение текущего измерения по сравнению с экстремумами,
если изменение больше чем percent процентов, выводит сообщение
"""
stream = self.new_eth_stream
if len(stream):
current_measurement_value = measurement.value
increasing = (current_measurement_value / stream.max_value - 1) * 100
decreasing = - (current_measurement_value / stream.min_value - 1) * 100
if increasing > percent:
stream.max_value = current_measurement_value
stream.min_value = current_measurement_value
print('%sUSDT фьючерс вырос на %s процентов!' % (measurement.currency, increasing))
if decreasing > percent:
stream.max_value = current_measurement_value
stream.min_value = current_measurement_value
print('%sUSDT фьючерс упал на %s процентов!' % (measurement.currency, decreasing))
self.new_eth_stream.append(measurement)
def _diff_value(self, stream: CurrencyList, measurement: Measurement) -> float:
""" Относительное изменение биткойна/эфира """
if len(stream):
previous_measurement_value = stream[-1].value
current_measurement_value = measurement.value
stream.append(measurement)
return current_measurement_value / previous_measurement_value - 1
else:
stream.append(measurement)
return 0
def start(self, change_func: Callable = naive, running_time: int | float = None):
coro = self._process_values(running_time=running_time, change_func=change_func)
asyncio.run(coro)
def _plot_stream(self, stream: CurrencyList):
""" Добавление списка фьючерсов на график """
assert len(stream), 'Empty CurrencyList given'
x = list(range(len(self.eth_stream)))
label = stream[0].currency
plt.plot(x, stream.to_list(), '-o', label=label)
def visualize_eth_values(self):
""" Запускает интерактивный векторный график, после выполнения .start(*args) """
self._plot_stream(self.eth_stream)
self._plot_stream(self.new_eth_stream)
plt.legend()
plt.show()
if __name__ == "__main__":
streaming = Stream()
streaming.start(change_func=miss_both_changing)
# # расскоментить чтобы посмотреть график, но при это необходимо передать running_time в start()
# s.visualize_eth_values()