# -------------------------------------------------------#
# Script for building and comparative testing of         #
# several models on one data set.                        #
# The script creates four models:                        #
# - a 2D convolutional layer                             #
# - a recurrent network with an LSTM block               #
# - Multi-Head Self-Attention (1 and 4 layers)           #
# While training, 1% of the training sample is           #
# set aside to validate the results.                     #
# After training, the models are evaluated on a          #
# test sample (a separate data file).                    #
# -------------------------------------------------------#
# Import libraries
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
import MetaTrader5 as mt5

# Connect to the MetaTrader 5 terminal
if not mt5.initialize():
    print("initialize() failed, error code =", mt5.last_error())
    quit()
# Request the path to the terminal "sandbox"
path = os.path.join(mt5.terminal_info().data_path, r'MQL5\Files')
mt5.shutdown()
# Load the training sample
filename = os.path.join(path, 'study_data.csv')
data = np.asarray(pd.read_table(filename,
                                sep=',',
                                header=None,
                                skipinitialspace=True,
                                encoding='utf-8',
                                float_precision='high',
                                dtype=np.float64,
                                low_memory=False))

# Split the training sample into input data and targets
inputs = data.shape[1] - 2
targets = 2
train_data = data[:, 0:inputs]
train_target = data[:, inputs:]

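# For instance, each CSV row stores the input features first and the two
# target values last, so train_data has shape (rows, inputs) and
# train_target has shape (rows, 2).
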
# Model with a 2D convolutional layer
model3 = keras.Sequential([keras.layers.InputLayer(input_shape=(inputs,)),
                           # Reshape the tensor to 4 dimensions. Only 3 are specified,
                           # because the 4th dimension is defined by the batch size
                           keras.layers.Reshape((-1, 4, 1)),
                           # Convolutional layer with 8 filters
                           keras.layers.Conv2D(8, (3, 1), 1, activation=tf.nn.swish,
                                               kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           # Sub-sampling (pooling) layer
                           keras.layers.MaxPooling2D((2, 1), strides=1),
                           # Flatten the tensor to 2 dimensions for the fully connected layers
                           keras.layers.Flatten(),
                           keras.layers.Dense(40, activation=tf.nn.swish,
                                              kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           keras.layers.Dense(40, activation=tf.nn.swish,
                                              kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           keras.layers.Dense(40, activation=tf.nn.swish,
                                              kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           keras.layers.Dense(targets, activation=tf.nn.tanh)
                           ])
model3.summary()
#keras.utils.plot_model(model3, show_shapes=True, to_file=os.path.join(path,'model3.png'),dpi=72,show_layer_names=False,rankdir='LR')

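# Shape walk-through (assuming, as the Reshape implies, 4 features per bar):
# (batch, inputs) -> Reshape -> (batch, inputs/4, 4, 1)
# -> Conv2D(8, (3,1)) -> (batch, inputs/4 - 2, 4, 8)
# -> MaxPooling2D((2,1), strides=1) -> (batch, inputs/4 - 3, 4, 8)
# -> Flatten -> (batch, (inputs/4 - 3) * 32), fed to the dense layers.
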
# LSTM-block model without fully connected layers
model4 = keras.Sequential([keras.layers.InputLayer(input_shape=(inputs,)),
                           # Reshape the tensor to 3 dimensions. Only 2 are specified,
                           # because the 3rd dimension is defined by the batch size
                           keras.layers.Reshape((-1, 4)),
                           # 2 successive LSTM blocks
                           # the 1st contains 40 units
                           keras.layers.LSTM(40,
                                             kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5),
                                             return_sequences=False),
                           # the 2nd produces the result instead of a fully connected layer
                           keras.layers.Reshape((-1, 2)),
                           keras.layers.LSTM(targets)
                           ])
model4.summary()
#keras.utils.plot_model(model4, show_shapes=True, to_file=os.path.join(path,'model4.png'),dpi=72,show_layer_names=False,rankdir='LR')

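# Note on the output head: LSTM(40) with return_sequences=False emits a
# (batch, 40) vector; Reshape((-1, 2)) turns it into a 20-step pseudo-sequence
# of pairs, which the final LSTM(targets) consumes to produce (batch, 2).
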
# Multi-Head Self-Attention model
@tf.keras.utils.register_keras_serializable(package="Custom", name='MHAttention')
class MHAttention(tf.keras.layers.Layer):
    def __init__(self, key_size, heads, **kwargs):
        super(MHAttention, self).__init__(**kwargs)

        self.m_iHeads = heads
        self.m_iKeysSize = key_size
        self.m_iDimension = self.m_iHeads * self.m_iKeysSize

        self.m_cQuerys = tf.keras.layers.Dense(self.m_iDimension)
        self.m_cKeys = tf.keras.layers.Dense(self.m_iDimension)
        self.m_cValues = tf.keras.layers.Dense(self.m_iDimension)
        self.m_cNormAttention = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.m_cNormOutput = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def build(self, input_shape):
        self.m_iWindow = input_shape[-1]
        self.m_cW0 = tf.keras.layers.Dense(self.m_iWindow)
        self.m_cFF1 = tf.keras.layers.Dense(4 * self.m_iWindow, activation=tf.nn.swish)
        self.m_cFF2 = tf.keras.layers.Dense(self.m_iWindow)

    # Split the projected tensor into separate attention heads
    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.m_iHeads, self.m_iKeysSize))
        return tf.transpose(x, perm=[0, 2, 1, 3])

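    # call() below implements the classic Transformer encoder block:
    # scaled dot-product attention, computed per head,
    #   Attention(Q, K, V) = softmax(Q @ K^T / sqrt(key_size)) @ V
    # followed by the output projection W0, a residual connection with
    # LayerNormalization, and a two-layer feed-forward block.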
    def call(self, data):
        batch_size = tf.shape(data)[0]

        # Project the inputs into queries, keys and values
        query = self.m_cQuerys(data)
        key = self.m_cKeys(data)
        value = self.m_cValues(data)

        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        # Scaled dot-product attention scores
        score = tf.matmul(query, key, transpose_b=True)
        score = score / tf.math.sqrt(tf.cast(self.m_iKeysSize, tf.float32))
        score = tf.nn.softmax(score, axis=-1)

        attention = tf.matmul(score, value)

        # Concatenate the heads back into a single tensor
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        attention = tf.reshape(attention, (batch_size, -1, self.m_iDimension))

        # Output projection, residual connection and normalization
        attention = self.m_cW0(attention)
        attention = self.m_cNormAttention(data + attention)

        # Feed-forward block with a second residual connection
        output = self.m_cFF1(attention)
        output = self.m_cFF2(output)
        output = self.m_cNormOutput(attention + output)
        return output

    def get_config(self):
        config = {'key_size': self.m_iKeysSize,
                  'heads': self.m_iHeads,
                  'dimension': self.m_iDimension,
                  'window': self.m_iWindow
                  }
        base_config = super(MHAttention, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    @classmethod
    def from_config(cls, config):
        dimension = config.pop('dimension')
        window = config.pop('window')
        layer = cls(**config)
        layer._build_from_signature(dimension, window)
        return layer

    def _build_from_signature(self, dimension, window):
        self.m_iDimension = dimension
        self.m_iWindow = window

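# A quick shape check for the custom layer (illustrative, may be uncommented):
#   x = tf.zeros((1, 10, 4))
#   print(MHAttention(4, 8)(x).shape)   # -> (1, 10, 4), same as the input,
#                                       #    so the layer stacks freely
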
heads = 8
key_dimension = 4
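# With 8 heads of size 4, the internal Q/K/V projections are
# 8 * 4 = 32-dimensional, and W0 maps the result back to the
# 4-dimensional description of each sequence element.
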
# Model with one Multi-Head Self-Attention layer
model5 = keras.Sequential([keras.layers.InputLayer(input_shape=(inputs,)),
                           # Reshape the tensor to 3 dimensions. Only 2 are specified,
                           # because the 3rd dimension is defined by the batch size:
                           # the 1st dimension is the sequence elements,
                           # the 2nd dimension is the description vector of one element
                           keras.layers.Reshape((-1, 4)),
                           MHAttention(key_dimension, heads),
                           # Flatten the tensor to 2 dimensions for the fully connected layers
                           keras.layers.Flatten(),
                           keras.layers.Dense(40, activation=tf.nn.swish,
                                              kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           keras.layers.Dense(40, activation=tf.nn.swish,
                                              kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           keras.layers.Dense(40, activation=tf.nn.swish,
                                              kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           keras.layers.Dense(targets, activation=tf.nn.tanh)
                           ])
model5.summary()
#keras.utils.plot_model(model5, show_shapes=True, to_file=os.path.join(path,'model5.png'),dpi=72,show_layer_names=False,rankdir='LR')

# Model with four successive Multi-Head Self-Attention layers
model6 = keras.Sequential([keras.layers.InputLayer(input_shape=(inputs,)),
                           # Reshape the tensor to 3 dimensions. Only 2 are specified,
                           # because the 3rd dimension is defined by the batch size:
                           # the 1st dimension is the sequence elements,
                           # the 2nd dimension is the description vector of one element
                           keras.layers.Reshape((-1, 4)),
                           MHAttention(key_dimension, heads),
                           MHAttention(key_dimension, heads),
                           MHAttention(key_dimension, heads),
                           MHAttention(key_dimension, heads),
                           # Flatten the tensor to 2 dimensions for the fully connected layers
                           keras.layers.Flatten(),
                           keras.layers.Dense(40, activation=tf.nn.swish,
                                              kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           keras.layers.Dense(40, activation=tf.nn.swish,
                                              kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           keras.layers.Dense(40, activation=tf.nn.swish,
                                              kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)),
                           keras.layers.Dense(targets, activation=tf.nn.tanh)
                           ])
model6.summary()

model3.compile(optimizer='Adam',
               loss='mean_squared_error',
               metrics=['accuracy'])

model4.compile(optimizer='Adam',
               loss='mean_squared_error',
               metrics=['accuracy'])

model5.compile(optimizer='Adam',
               loss='mean_squared_error',
               metrics=['accuracy'])

#model5=keras.models.load_model(os.path.join(path,'attention.h5'))

model6.compile(optimizer='Adam',
               loss='mean_squared_error',
               metrics=['accuracy'])

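# Early stopping: interrupt training once the training loss has not improved
# for 5 consecutive epochs, rather than always running the full 500 epochs.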
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=5)

history3 = model3.fit(train_data, train_target,
                      epochs=500, batch_size=1000,
                      callbacks=[callback],
                      verbose=2,
                      validation_split=0.01,
                      shuffle=True)
model3.save(os.path.join(path,'conv2d'))

history4 = model4.fit(train_data, train_target,
                      epochs=500, batch_size=1000,
                      callbacks=[callback],
                      verbose=2,
                      validation_split=0.01,
                      shuffle=False)
model4.save(os.path.join(path,'rnn'))

history5 = model5.fit(train_data, train_target,
                      epochs=500, batch_size=1000,
                      callbacks=[callback],
                      verbose=2,
                      validation_split=0.01,
                      shuffle=True)
model5.save(os.path.join(path,'attention'))

history6 = model6.fit(train_data, train_target,
                      epochs=500, batch_size=1000,
                      callbacks=[callback],
                      verbose=2,
                      validation_split=0.01,
                      shuffle=True)
model6.save(os.path.join(path,'attention2'))

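# To reload a saved attention model later, the custom layer must be known to
# Keras; it is registered above via register_keras_serializable, so a call
# like the following should work (illustrative):
#   model5 = keras.models.load_model(os.path.join(path, 'attention'))
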
# Plot the model training results
plt.figure()
plt.plot(history3.history['loss'], label='Conv2D Train')
plt.plot(history3.history['val_loss'], label='Conv2D Validation')
plt.plot(history4.history['loss'], label='LSTM only Train')
plt.plot(history4.history['val_loss'], label='LSTM only Validation')
plt.plot(history5.history['loss'], label='MH Attention Train')
plt.plot(history5.history['val_loss'], label='MH Attention Validation')
plt.plot(history6.history['loss'], label='MH Attention 4l Train')
plt.plot(history6.history['val_loss'], label='MH Attention 4l Validation')
plt.ylabel('$MSE$ $Loss$')
plt.xlabel('$Epochs$')
plt.title('Model training dynamics')
plt.legend(loc='upper right', fontsize='x-small', ncol=2)

plt.figure()
plt.plot(history3.history['accuracy'], label='Conv2D Train')
plt.plot(history3.history['val_accuracy'], label='Conv2D Validation')
plt.plot(history4.history['accuracy'], label='LSTM only Train')
plt.plot(history4.history['val_accuracy'], label='LSTM only Validation')
plt.plot(history5.history['accuracy'], label='MH Attention Train')
plt.plot(history5.history['val_accuracy'], label='MH Attention Validation')
plt.plot(history6.history['accuracy'], label='MH Attention 4l Train')
plt.plot(history6.history['val_accuracy'], label='MH Attention 4l Validation')
plt.ylabel('$Accuracy$')
plt.xlabel('$Epochs$')
plt.title('Model training dynamics')
plt.legend(loc='lower right', fontsize='x-small', ncol=2)

# Load the test sample
test_filename = os.path.join(path, 'test_data.csv')
test = np.asarray(pd.read_table(test_filename,
                                sep=',',
                                header=None,
                                skipinitialspace=True,
                                encoding='utf-8',
                                float_precision='high',
                                dtype=np.float64,
                                low_memory=False))
# Split the test sample into input data and targets
test_data = test[:, 0:inputs]
test_target = test[:, inputs:]

# Check the models on the test sample
test_loss3, test_acc3 = model3.evaluate(test_data, test_target, verbose=2)
test_loss4, test_acc4 = model4.evaluate(test_data, test_target, verbose=2)
test_loss5, test_acc5 = model5.evaluate(test_data, test_target, verbose=2)
test_loss6, test_acc6 = model6.evaluate(test_data, test_target, verbose=2)

# Log the test results
print('Conv2D Model')
print('Test accuracy:', test_acc3)
print('Test loss:', test_loss3)

print('LSTM only Model')
print('Test accuracy:', test_acc4)
print('Test loss:', test_loss4)

print('MH Attention Model')
print('Test accuracy:', test_acc5)
print('Test loss:', test_loss5)

print('MH Attention 4l Model')
print('Test accuracy:', test_acc6)
print('Test loss:', test_loss6)

plt.figure()
plt.bar(['Conv2D', 'LSTM', 'MH Attention', 'MH Attention 4l'],
        [test_loss3, test_loss4, test_loss5, test_loss6])
plt.ylabel('$MSE$ $Loss$')
plt.title('Test results')

plt.figure()
plt.bar(['Conv2D', 'LSTM', 'MH Attention', 'MH Attention 4l'],
        [test_acc3, test_acc4, test_acc5, test_acc6])
plt.ylabel('$Accuracy$')
plt.title('Test results')

plt.show()