# -------------------------------------------------------# # Скрипт для созданиия и сравнительного тестирования # # нескольких моделей на одном наборе данных. # # В скрипте создается три модели: # # - 2-х мерный свёрточный слой # # - реккурентная сеть с LSTM блоком # # - Multi-Head Self-Attention # # При обучении моделей из обучающей выборки выделяется # # 1% выборки для валидации результатов. # # После обучения проводится проверка работоспособности # # модели на тестовой выборке (отдельный файл данных) # # -------------------------------------------------------# # Импорт библиотек import os import pandas as pd import numpy as np import tensorflow as tf from tensorflow import keras import matplotlib.pyplot as plt import MetaTrader5 as mt5 # Подключаемся к терминалу MetaTrader 5 if not mt5.initialize(): print("initialize() failed, error code =",mt5.last_error()) quit() # Запрашиваем путь в "песочницу" path=os.path.join(mt5.terminal_info().data_path,r'MQL5\Files') mt5.shutdown() # Загрузка обучающей выборки filename = os.path.join(path,'study_data.csv') data = np.asarray( pd.read_table(filename, sep=',', header=None, skipinitialspace=True, encoding='utf-8', float_precision='high', dtype=np.float64, low_memory=False)) # Разделение обучающей выборки на исходные данные и цели inputs=data.shape[1]-2 targerts=2 train_data=data[:,0:inputs] train_target=data[:,inputs:] # Модель с 2-х мерным свёрточным слоем model3 = keras.Sequential([keras.layers.InputLayer(input_shape=inputs), # Переформатируем тензор в 4-х мерный. Указываем 3 измерения, т.к. 4-е измерение определяется размером пакета keras.layers.Reshape((-1,4,1)), # Свёрточный слой с 8-ю фильтрами keras.layers.Conv2D(8,(3,1),1,activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), # Подвыборочный слой keras.layers.MaxPooling2D((2,1),strides=1), # Переформатируем тензор в 2-х мерный для полносвязных слоёв keras.layers.Flatten(), keras.layers.Dense(40, activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), keras.layers.Dense(40, activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), keras.layers.Dense(40, activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), keras.layers.Dense(targerts, activation=tf.nn.tanh) ]) model3.summary() #keras.utils.plot_model(model3, show_shapes=True, to_file=os.path.join(path,'model3.png'),dpi=72,show_layer_names=False,rankdir='LR') # Модель LSTM блок без полносвязных слоёв model4 = keras.Sequential([keras.layers.InputLayer(input_shape=inputs), # Переформатируем тензор в 3-х мерный. Указываем 2 измерения, т.к. 3-е измерение определяется размером пакета keras.layers.Reshape((-1,4)), # 2 последовательных LSTM блока # 1-й содержит 40 элементами keras.layers.LSTM(40, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5), return_sequences=False), # 2-й выдаёт результат вместо полносвязного слоя keras.layers.Reshape((-1,2)), keras.layers.LSTM(targerts) ]) model4.summary() #keras.utils.plot_model(model4, show_shapes=True, to_file=os.path.join(path,'model4.png'),dpi=72,show_layer_names=False,rankdir='LR') # Модель Multi-Head Self-Attention @tf.keras.utils.register_keras_serializable(package="Custom", name='MHAttention') class MHAttention(tf.keras.layers.Layer): def __init__(self,key_size, heads, **kwargs): super(MHAttention, self).__init__(**kwargs) self.m_iHeads = heads self.m_iKeysSize = key_size self.m_iDimension=self.m_iHeads*self.m_iKeysSize; self.m_cQuerys = tf.keras.layers.Dense(self.m_iDimension) self.m_cKeys = tf.keras.layers.Dense(self.m_iDimension) self.m_cValues = tf.keras.layers.Dense(self.m_iDimension) self.m_cNormAttention=tf.keras.layers.LayerNormalization(epsilon=1e-6) self.m_cNormOutput=tf.keras.layers.LayerNormalization(epsilon=1e-6) def build(self, input_shape): self.m_iWindow=input_shape[-1] self.m_cW0 = tf.keras.layers.Dense(self.m_iWindow) self.m_cFF1=tf.keras.layers.Dense(4*self.m_iWindow, activation=tf.nn.swish) self.m_cFF2=tf.keras.layers.Dense(self.m_iWindow) def split_heads(self, x, batch_size): x = tf.reshape(x, (batch_size, -1, self.m_iHeads, self.m_iKeysSize)) return tf.transpose(x, perm=[0, 2, 1, 3]) def call(self, data): batch_size = tf.shape(data)[0] query = self.m_cQuerys(data) key = self.m_cKeys(data) value = self.m_cValues(data) query = self.split_heads(query, batch_size) key = self.split_heads(key, batch_size) value = self.split_heads(value, batch_size) score = tf.matmul(query, key, transpose_b=True) score = score / tf.math.sqrt(tf.cast(self.m_iKeysSize, tf.float32)) score = tf.nn.softmax(score, axis=-1) attention = tf.matmul(score, value) attention = tf.transpose(attention, perm=[0, 2, 1, 3]) attention = tf.reshape(attention,(batch_size, -1, self.m_iDimension)) attention = self.m_cW0(attention) attention=self.m_cNormAttention(data + attention) output=self.m_cFF1(attention) output=self.m_cFF2(output) output=self.m_cNormOutput(attention+output) return output def get_config(self): config={'key_size': self.m_iKeysSize, 'heads': self.m_iHeads, 'dimension': self.m_iDimension, 'window': self.m_iWindow } base_config = super(MHAttention, self).get_config() return dict(list(base_config.items()) + list(config.items())) @classmethod def from_config(cls, config): dimension=config.pop('dimension') window=config.pop('window') layer = cls(**config) layer._build_from_signature(dimension, window) return layer def _build_from_signature(self, dimension, window): self.m_iDimension=dimension self.m_iWindow=window heads=8 key_dimension=4 model5 = keras.Sequential([keras.layers.InputLayer(input_shape=inputs), # Переформатируем тензор в 3-х мерный. Указываем 2 измерения, # т.к. 3-е измерение определяется размером пакета # первое измерени - элементы последовательности # второе измерение - ветор описаниие одного элемента keras.layers.Reshape((-1,4)), MHAttention(key_dimension,heads), # Переформатируем тензор в 2-х мерный для полносвязных слоёв keras.layers.Flatten(), keras.layers.Dense(40, activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), keras.layers.Dense(40, activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), keras.layers.Dense(40, activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), keras.layers.Dense(targerts, activation=tf.nn.tanh) ]) model5.summary() #keras.utils.plot_model(model5, show_shapes=True, to_file=os.path.join(path,'model5.png'),dpi=72,show_layer_names=False,rankdir='LR') model6 = keras.Sequential([keras.layers.InputLayer(input_shape=inputs), # Переформатируем тензор в 3-х мерный. Указываем 2 измерения, # т.к. 3-е измерение определяется размером пакета # первое измерени - элементы последовательности # второе измерение - ветор описаниие одного элемента keras.layers.Reshape((-1,4)), MHAttention(key_dimension,heads), MHAttention(key_dimension,heads), MHAttention(key_dimension,heads), MHAttention(key_dimension,heads), # Переформатируем тензор в 2-х мерный для полносвязных слоёв keras.layers.Flatten(), keras.layers.Dense(40, activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), keras.layers.Dense(40, activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), keras.layers.Dense(40, activation=tf.nn.swish, kernel_regularizer=keras.regularizers.l1_l2(l1=1e-7, l2=1e-5)), keras.layers.Dense(targerts, activation=tf.nn.tanh) ]) model6.summary() model3.compile(optimizer='Adam', loss='mean_squared_error', metrics=['accuracy']) model4.compile(optimizer='Adam', loss='mean_squared_error', metrics=['accuracy']) model5.compile(optimizer='Adam', loss='mean_squared_error', metrics=['accuracy']) #model5=keras.models.load_model(os.path.join(path,'attention.h5')) model6.compile(optimizer='Adam', loss='mean_squared_error', metrics=['accuracy']) callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=5) history3 = model3.fit(train_data, train_target, epochs=500, batch_size=1000, callbacks=[callback], verbose=2, validation_split=0.01, shuffle=True) model3.save(os.path.join(path,'conv2d')) history4 = model4.fit(train_data, train_target, epochs=500, batch_size=1000, callbacks=[callback], verbose=2, validation_split=0.01, shuffle=False) model4.save(os.path.join(path,'rnn')) history5 = model5.fit(train_data, train_target, epochs=500, batch_size=1000, callbacks=[callback], verbose=2, validation_split=0.01, shuffle=True) model5.save(os.path.join(path,'attention')) history6 = model6.fit(train_data, train_target, epochs=500, batch_size=1000, callbacks=[callback], verbose=2, validation_split=0.01, shuffle=True) model6.save(os.path.join(path,'attention2')) # Отрисовка результатов обучения моделей plt.figure() plt.plot(history3.history['loss'], label='Conv2D Train') plt.plot(history3.history['val_loss'], label='Conv2D Validation') plt.plot(history4.history['loss'], label='LSTM only Train') plt.plot(history4.history['val_loss'], label='LSTM only Validation') plt.plot(history5.history['loss'], label='MH Attention Train') plt.plot(history5.history['val_loss'], label='MH Attention Validation') plt.plot(history6.history['loss'], label='MH Attention 4l Train') plt.plot(history6.history['val_loss'], label='MH Attention 4l Validation') plt.ylabel('$MSE$ $Loss$') plt.xlabel('$Epochs$') plt.title('Dinamic of Models train') plt.legend(loc='upper right',fontsize='x-small',ncol=2) plt.figure() plt.plot(history3.history['accuracy'], label='Conv2D Train') plt.plot(history3.history['val_accuracy'], label='Conv2D Validation') plt.plot(history4.history['accuracy'], label='LSTM only Train') plt.plot(history4.history['val_accuracy'], label='LSTM only Validation') plt.plot(history5.history['accuracy'], label='MH Attention Train') plt.plot(history5.history['val_accuracy'], label='MH Attention Validation') plt.plot(history6.history['accuracy'], label='MH Attention 4l Train') plt.plot(history6.history['val_accuracy'], label='MH Attention 4l Validation') plt.ylabel('$Accuracy$') plt.xlabel('$Epochs$') plt.title('Dinamic of Models train') plt.legend(loc='lower right',fontsize='x-small',ncol=2) # Загрузка тестовой выборки test_filename = os.path.join(path,'test_data.csv') test = np.asarray( pd.read_table(test_filename, sep=',', header=None, skipinitialspace=True, encoding='utf-8', float_precision='high', dtype=np.float64, low_memory=False)) # Разделение тестовой выборки на исходные данные и цели test_data=test[:,0:inputs] test_target=test[:,inputs:] # Проверка результатов моделей на тестовой выборке test_loss3, test_acc3 = model3.evaluate(test_data, test_target, verbose=2) test_loss4, test_acc4 = model4.evaluate(test_data, test_target, verbose=2) test_loss5, test_acc5 = model5.evaluate(test_data, test_target, verbose=2) test_loss6, test_acc6 = model6.evaluate(test_data, test_target, verbose=2) # Вывод результатов тестирования в журнал print('Conv2D Model') print('Test accuracy:', test_acc3) print('Test loss:', test_loss3) print('LSTM only Model') print('Test accuracy:', test_acc4) print('Test loss:', test_loss4) print('MH Attention Model') print('Test accuracy:', test_acc5) print('Test loss:', test_loss5) print('MH Attention 4l Model') print('Test accuracy:', test_acc6) print('Test loss:', test_loss6) plt.figure() plt.bar(['Conv2D','LSTM', 'MH Attention','MH Attention 4l'],[test_loss3,test_loss4,test_loss5,test_loss6]) plt.ylabel('$MSE$ $Loss$') plt.title('Result of test') plt.figure() plt.bar(['Conv2D','LSTM', 'MH Attention','MH Attention 4l'],[ttest_acc3,test_acc4,test_acc5,test_acc6]) plt.ylabel('$Accuracy$') plt.title('Result of test') plt.show()