# Zenith-FX/Price_Qiskit_Visual.py
#
# 450 lines
# 17 KiB
# Python
# Raw Permalink Normal View History
#
# 2025-11-14 07:30:33 +01:00
import MetaTrader5 as mt5
import numpy as np
from qiskit import QuantumCircuit, transpile, QuantumRegister, ClassicalRegister
from qiskit_aer import AerSimulator
from Crypto.Hash import SHA256
import pandas as pd
from datetime import datetime, timedelta
import matplotlib
matplotlib.use('Agg') # Use Agg backend - no GUI required
import matplotlib.pyplot as plt
import os
# Default figure appearance applied to every chart rendered by this module.
plt.rcParams.update({
    'figure.figsize': (7.5, 4.5),
    'figure.dpi': 100,
    'savefig.dpi': 150,
    'font.family': 'sans-serif',
})

# Folder that receives the generated images; created at import time if absent.
SAVE_DIR = "quantum_trading_results"
os.makedirs(SAVE_DIR, exist_ok=True)
def initialize_mt5():
    """Start the MetaTrader 5 connection.

    Returns:
        True when ``mt5.initialize()`` reports success. Otherwise an error
        message is printed and False is returned.
    """
    if mt5.initialize():
        return True
    print("MT5 initialization error")
    return False
def get_price_data(symbol="EURUSD", timeframe=mt5.TIMEFRAME_D1, n_candles=256, offset=0):
    """Retrieve price data from MT5 as a DataFrame indexed by candle time.

    Args:
        symbol: Instrument name passed to ``mt5.copy_rates_from_pos``.
        timeframe: MT5 timeframe constant (daily candles by default).
        n_candles: Number of candles to request.
        offset: Start position passed to ``mt5.copy_rates_from_pos``.

    Returns:
        A DataFrame whose index is the ``time`` column converted from epoch
        seconds, or None when MT5 returns nothing (None or an empty result).
    """
    rates = mt5.copy_rates_from_pos(symbol, timeframe, offset, n_candles)
    # An empty result is as unusable as a failed request: callers read
    # ``iloc[-1]`` from the frame, so report both cases the same way.
    if rates is None or len(rates) == 0:
        return None
    df = pd.DataFrame(rates)
    df['time'] = pd.to_datetime(df['time'], unit='s')
    return df.set_index('time')
def prices_to_binary(df):
    """Convert close-to-close price movements into a binary string.

    Each candle after the first contributes ``"1"`` when its close is strictly
    above the previous close and ``"0"`` otherwise.

    Args:
        df: DataFrame with a ``close`` column; the index type is irrelevant.

    Returns:
        The movement bits, left-padded with ``"0"`` to at least 256 characters
        (``str.zfill``). An empty or single-row frame yields 256 zeros.
    """
    # Work on the raw values so the comparison is purely positional; indexing
    # the Series with ``[i]`` is a label lookup that only falls back to
    # position on non-integer indexes (and that fallback is deprecated).
    closes = df['close'].to_numpy()
    rising = closes[1:] > closes[:-1]
    return "".join("1" if up else "0" for up in rising).zfill(256)
def calculate_future_horizon(current_data, future_data, horizon=10):
    """Encode the future closes as a string of up/down bits.

    The first future close is compared with the last close of
    ``current_data``; every later close is compared with the one before it.
    A strictly higher close yields ``"1"``, anything else ``"0"``.

    Returns:
        The bit string, left-padded with ``"0"`` to at least ``horizon``
        characters.
    """
    bits = []
    previous_close = current_data['close'].iloc[-1]
    for close in future_data['close']:
        bits.append("1" if close > previous_close else "0")
        previous_close = close
    return "".join(bits).zfill(horizon)
def calculate_trend_ratio(binary_sequence):
    """Return the shares of ``'1'`` and ``'0'`` characters in a sequence.

    Args:
        binary_sequence: String expected to consist of ``'0'``/``'1'``.

    Returns:
        ``(ones_ratio, zeros_ratio)``, each relative to the total length of
        the string. An empty string yields ``(0.0, 0.0)`` instead of raising
        ZeroDivisionError.
    """
    total = len(binary_sequence)
    if total == 0:
        # Nothing to measure; avoid dividing by zero.
        return 0.0, 0.0
    ones = binary_sequence.count('1')
    zeros = binary_sequence.count('0')
    return ones / total, zeros / total
def predict_horizon(quantum_counts, horizon=10):
    """Derive a predicted bit string from the ten most frequent states.

    For every position ``0 .. horizon-1`` the ten highest-count states vote
    with a weight of ``count / total shots``, using the character at that
    position of their bitstring. States shorter than the position do not
    vote. The bit is ``"1"`` only when the weight for ``'1'`` strictly
    exceeds the weight for the other characters.

    Args:
        quantum_counts: Mapping of state bitstring to observed count.
        horizon: Number of bits to produce.

    Returns:
        A string of ``horizon`` characters, each ``"0"`` or ``"1"``.
    """
    shots = sum(quantum_counts.values())
    ranked = sorted(quantum_counts.items(), key=lambda item: item[1], reverse=True)
    leaders = ranked[:10]

    bits = []
    for position in range(horizon):
        up_weight = 0
        down_weight = 0
        for bitstring, hits in leaders:
            if position >= len(bitstring):
                continue  # this state has no character at the position
            share = hits / shots
            if bitstring[position] == '1':
                up_weight += share
            else:
                down_weight += share
        bits.append("1" if up_weight > down_weight else "0")
    return "".join(bits)
def predict_trend(binary_sequence):
    """Label a bit sequence "BULL" when more than half of it is ``'1'``.

    The share of ones comes from ``calculate_trend_ratio``; any share of
    0.5 or less is labelled "BEAR".
    """
    ones_share, _ = calculate_trend_ratio(binary_sequence)
    if ones_share > 0.5:
        return "BULL"
    return "BEAR"
def verify_prediction(current_data, future_data):
    """Report the direction that actually occurred.

    Returns:
        "BULL" when the last close of ``future_data`` is strictly above the
        last close of ``current_data``, otherwise "BEAR".
    """
    final_future_close = future_data['close'].iloc[-1]
    final_current_close = current_data['close'].iloc[-1]
    if final_future_close > final_current_close:
        return "BULL"
    return "BEAR"
def sha256_to_binary(input_data):
    """Return the SHA-256 digest of ``input_data`` as a 256-character bit string.

    The hexadecimal digest is read as an integer and rendered in binary,
    zero-padded on the left to 256 characters.
    """
    digest = SHA256.new()
    digest.update(input_data)
    as_integer = int(digest.hexdigest(), 16)
    return format(as_integer, '0256b')
def qpe_dlog(a, N, num_qubits):
    """Build the measurement circuit used by ``analyze_market_state``.

    The circuit has ``num_qubits`` counting qubits plus one extra qubit:

    1. Hadamard on every counting qubit, X on the extra qubit.
    2. For counting qubit ``q``, a controlled phase of
       ``2*pi * (a**(2**q) mod N) / N`` onto the extra qubit.
    3. A barrier, then a Hadamard / negative controlled-phase ladder and a
       qubit-order reversal by swaps on the counting qubits.
    4. Measurement of all counting qubits into the classical register.

    Args:
        a: Integer base of the modular powers.
        N: Integer modulus (must be non-zero).
        num_qubits: Number of counting qubits / classical bits.

    Returns:
        The assembled ``QuantumCircuit``.
    """
    # NOTE(review): step 3 looks like an inverse-QFT, but the Hadamard is
    # applied before the controlled phases of each qubit - confirm the gate
    # order is intended. It is preserved exactly here.
    extra = num_qubits
    qr = QuantumRegister(num_qubits + 1)
    cr = ClassicalRegister(num_qubits)
    qc = QuantumCircuit(qr, cr)

    for q in range(num_qubits):
        qc.h(q)
    qc.x(extra)

    for q in range(num_qubits):
        # Three-argument pow gives the same residue as a**(2**q) % N without
        # materialising the astronomically large intermediate power.
        residue = pow(a, 1 << q, N)
        qc.cp(2 * np.pi * residue / N, q, extra)
    qc.barrier()

    for i in range(num_qubits):
        qc.h(i)
        for j in range(i):
            qc.cp(-np.pi / float(2 ** (i - j)), j, i)
    for i in range(num_qubits // 2):
        qc.swap(i, num_qubits - 1 - i)

    qc.measure(range(num_qubits), range(num_qubits))
    return qc
def analyze_market_state(price_binary, num_qubits=22, *, a=70000000, N=17000000, shots=3000):
    """Simulate the ``qpe_dlog`` circuit and return its most frequent outcome.

    Args:
        price_binary: Accepted for interface compatibility; the visible code
            never reads it, so the result does not depend on price data.
        num_qubits: Number of counting qubits passed to ``qpe_dlog``.
        a: Base passed to ``qpe_dlog`` (previously hard-coded).
        N: Modulus passed to ``qpe_dlog`` (previously hard-coded).
        shots: Number of simulator shots (previously hard-coded to 3000).

    Returns:
        ``(value, counts)`` where ``counts`` is the simulator's count
        dictionary and ``value`` is the most frequent bitstring parsed as a
        base-2 integer.
    """
    circuit = qpe_dlog(a, N, num_qubits)
    simulator = AerSimulator()
    compiled_circuit = transpile(circuit, simulator)
    counts = simulator.run(compiled_circuit, shots=shots).result().get_counts()
    best_match = max(counts, key=counts.get)
    return int(best_match, 2), counts
def compare_horizons(real_horizon, predicted_horizon):
    """Compare the dominant direction of two bit strings.

    A string is "BULL" when strictly more than half of its characters are
    ``'1'`` and "BEAR" otherwise.

    Returns:
        "WIN" when both strings have the same dominant direction, else "LOSS".
    """
    def dominant_direction(bits):
        return "BULL" if bits.count('1') > len(bits) / 2 else "BEAR"

    if dominant_direction(real_horizon) == dominant_direction(predicted_horizon):
        return "WIN"
    return "LOSS"
def save_histogram_plot(quantum_counts, filename="quantum_probabilities.png"):
    """Save a bar chart of the ten most frequent states.

    Bar heights are each state's count as a percentage of all counts.

    Args:
        quantum_counts: Mapping of state bitstring to observed count.
        filename: File name created inside ``SAVE_DIR``.

    Returns:
        Path of the written image.
    """
    shots = sum(quantum_counts.values())
    ranked = sorted(quantum_counts.items(), key=lambda item: item[1], reverse=True)

    labels = []
    percentages = []
    for bitstring, hits in ranked[:10]:
        labels.append(bitstring)
        percentages.append(hits / shots * 100)

    # One fixed colour per bar, in rank order.
    palette = ['blue', 'green', 'red', 'purple', 'orange', 'brown', 'pink', 'gray', 'olive', 'cyan']
    positions = range(len(labels))

    plt.figure(figsize=(7.5, 4))
    plt.bar(positions, percentages, color=palette[:len(labels)])
    plt.xticks(positions, labels, rotation=45, fontsize=8)
    plt.xlabel("Quantum State")
    plt.ylabel("Probability (%)")
    plt.title("Top 10 Probable Quantum States")
    plt.tight_layout()

    full_path = os.path.join(SAVE_DIR, filename)
    plt.savefig(full_path)
    plt.close()
    return full_path
def visualize_price_chart(historical_data, future_data, filename="price_chart.png"):
    """Save a close-price chart of recent history followed by the actual future.

    The last 30 rows of ``historical_data`` are drawn in blue, the rows of
    ``future_data`` in green, and a dashed red vertical line marks the last
    historical timestamp.

    Args:
        historical_data: DataFrame with a ``close`` column and a datetime index.
        future_data: DataFrame of the same layout; may be empty.
        filename: File name created inside ``SAVE_DIR``.

    Returns:
        Path of the written image.

    Raises:
        ValueError: If both frames are empty, so there is nothing to plot.
    """
    recent_history = historical_data.iloc[-30:]
    combined_data = pd.concat([recent_history, future_data])
    if combined_data.empty:
        raise ValueError("No price data to plot")

    fig, ax = plt.subplots(figsize=(7.5, 4))
    try:
        # Plot each frame directly rather than slicing the concatenation with
        # -len(future_data): that slice degenerates when future_data is empty
        # (x[:-0] is empty and x[-0:] is everything).
        ax.plot(recent_history.index, recent_history['close'],
                label='Historical Data', color='blue')
        ax.plot(future_data.index, future_data['close'],
                label='Actual Data', color='green')
        if len(recent_history) > 0:
            # Separation point between known history and the verified future.
            ax.axvline(recent_history.index[-1],
                       color='red', linestyle='--', label='Event Horizon')
        ax.set_title(f'Price Chart {combined_data.index[0].strftime("%Y-%m-%d")} - {combined_data.index[-1].strftime("%Y-%m-%d")}')
        ax.set_xlabel('Date')
        ax.set_ylabel('Close Price')
        ax.legend()
        fig.tight_layout()
        full_path = os.path.join(SAVE_DIR, filename)
        fig.savefig(full_path)
    finally:
        # Always release the figure, even when plotting or saving fails.
        plt.close(fig)
    return full_path
def visualize_binary_comparison(real_horizon, predicted_horizon, filename="horizon_comparison.png"):
    """Save a two-row chart comparing the real and predicted bit strings.

    Each bit is drawn as a unit-height bar: green for ``'1'``, red for any
    other character. The real horizon is the upper row, the prediction the
    lower row.

    Returns:
        Path of the written image inside ``SAVE_DIR``.
    """
    def draw_bits(axis, bits):
        # One bar per bit, coloured by direction.
        for position, bit in enumerate(bits):
            fill = 'green' if bit == '1' else 'red'
            axis.bar(position, 1, color=fill, edgecolor='black', linewidth=0.5)

    fig, (real_axis, predicted_axis) = plt.subplots(2, 1, figsize=(7.5, 4), sharex=True)

    draw_bits(real_axis, real_horizon)
    real_axis.set_title('Real Horizon')
    real_axis.set_yticks([])

    draw_bits(predicted_axis, predicted_horizon)
    predicted_axis.set_title('Predicted Horizon')
    predicted_axis.set_xlabel('Horizon Bit')
    predicted_axis.set_yticks([])

    # Shared legend explaining the two colours.
    from matplotlib.patches import Patch
    legend_entries = [
        Patch(color='green', label='Uptrend (1)'),
        Patch(color='red', label='Downtrend (0)'),
    ]
    fig.legend(handles=legend_entries, loc='upper right')
    plt.tight_layout()

    full_path = os.path.join(SAVE_DIR, filename)
    plt.savefig(full_path)
    plt.close()
    return full_path
def visualize_probabilities(sorted_states, horizon_length, filename="bit_probabilities.png"):
    """Save a line chart of the per-bit vote weights.

    For each position ``0 .. horizon_length-1`` the first ten entries of
    ``sorted_states`` vote with weight ``count / total`` according to the
    character at that position, where ``total`` is the sum of counts over all
    entries of ``sorted_states``. Entries shorter than the position do not vote.

    Args:
        sorted_states: Sequence of ``(bitstring, count)`` pairs.
        horizon_length: Number of bit positions to chart.
        filename: File name created inside ``SAVE_DIR``.

    Returns:
        Path of the written image.
    """
    shots = sum(hits for _, hits in sorted_states)
    leaders = sorted_states[:10]

    # One [weight_for_1, weight_for_0] row per horizon position.
    rows = []
    for position in range(horizon_length):
        up_weight = 0
        down_weight = 0
        for bitstring, hits in leaders:
            if position >= len(bitstring):
                continue
            share = hits / shots
            if bitstring[position] == '1':
                up_weight += share
            else:
                down_weight += share
        rows.append([up_weight, down_weight])

    fig, ax = plt.subplots(figsize=(7.5, 4))
    data = np.array(rows)
    positions = range(horizon_length)

    ax.plot(positions, data[:, 0], 'g-', marker='o', label='Uptrend (1)')
    ax.plot(positions, data[:, 1], 'r-', marker='x', label='Downtrend (0)')

    # Numeric labels just above the "1" curve and just below the "0" curve.
    for position in positions:
        up_value = data[position, 0]
        down_value = data[position, 1]
        ax.text(position, up_value + 0.02, f'{data[position, 0]:.2f}', ha='center')
        ax.text(position, down_value - 0.05, f'{data[position, 1]:.2f}', ha='center')

    ax.set_xticks(range(horizon_length))
    ax.set_xticklabels([f'{position+1}' for position in range(horizon_length)])
    ax.set_xlabel('Horizon Bit')
    ax.set_ylabel('Probability')
    ax.set_ylim(0, 1)
    ax.set_title('Probabilities for Each Bit in Horizon')
    ax.legend()
    plt.tight_layout()

    full_path = os.path.join(SAVE_DIR, filename)
    plt.savefig(full_path)
    plt.close()
    return full_path
def analyze_from_point(timepoint_offset, n_candles=256, horizon_length=10, symbol="EURUSD"):
    """Run the full analysis from a past point and report it on stdout.

    Fetches ``n_candles`` candles at offset ``timepoint_offset +
    horizon_length`` as history and ``horizon_length`` candles at offset
    ``timepoint_offset`` as the verification window. It then runs the circuit
    simulation, saves four charts (each attempt is independent and only
    prints a message on failure) and prints the predicted versus real
    horizon with per-bit vote weights.

    Args:
        timepoint_offset: Number of candles back from the current moment.
        n_candles: Number of historical candles to load.
        horizon_length: Number of bits/candles in the horizon.
        symbol: Instrument to analyse.

    Returns:
        None. Any exception is caught and printed.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    try:
        # Candles before the event horizon point.
        history = get_price_data(
            symbol=symbol,
            n_candles=n_candles,
            offset=timepoint_offset + horizon_length,
        )
        if history is None:
            print("Failed to retrieve historical data")
            return

        # Candles after the point, used only to verify the prediction.
        verification = get_price_data(
            symbol=symbol,
            n_candles=horizon_length,
            offset=timepoint_offset,
        )
        if verification is None:
            print("Failed to retrieve verification data")
            return

        # The event horizon point is the last historical candle.
        horizon_point_price = history['close'].iloc[-1]
        horizon_point_time = history.index[-1]
        print(f"\n=== ANALYSIS FROM EVENT HORIZON POINT ===")
        print(f"Price at event horizon point: {horizon_point_price:.5f}")
        print(f"Time of event horizon point: {horizon_point_time}")

        movement_bits = prices_to_binary(history)
        print("\nBinary price sequence before event horizon (last 64 bits):")
        print(movement_bits[-64:])

        print("\nRunning quantum simulation...")
        _, counts = analyze_market_state(movement_bits)
        print("Quantum simulation complete")

        # Chart 1: histogram of the most frequent states.
        try:
            histogram_path = save_histogram_plot(counts, f"{timestamp}_quantum_probabilities.png")
            print(f"Probability histogram saved: {histogram_path}")
        except Exception as e:
            print(f"Could not create probability histogram: {str(e)}")

        # Table of the twenty most frequent states.
        print("\nMatrix of probable projections (top-20 states):")
        print("{:<22} {:<10} {:<10}".format("State", "Frequency", "Probability"))
        print("-" * 42)
        shots = sum(counts.values())
        ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
        for bitstring, hits in ranked[:20]:
            percent = hits / shots * 100
            print("{:<22} {:<10} {:.2f}%".format(bitstring, hits, percent))

        real_horizon = calculate_future_horizon(history, verification, horizon_length)
        predicted_horizon = predict_horizon(counts, horizon_length)

        # Chart 2: real versus predicted bits.
        try:
            comparison_path = visualize_binary_comparison(real_horizon, predicted_horizon, f"{timestamp}_horizon_comparison.png")
            print(f"Horizon comparison saved: {comparison_path}")
        except Exception as e:
            print(f"Could not create horizon comparison: {str(e)}")

        # Chart 3: per-bit vote weights.
        try:
            probabilities_path = visualize_probabilities(ranked, horizon_length, f"{timestamp}_bit_probabilities.png")
            print(f"Probability chart saved: {probabilities_path}")
        except Exception as e:
            print(f"Could not create probability chart: {str(e)}")

        # Chart 4: price history with the verification window.
        try:
            price_chart_path = visualize_price_chart(history, verification, f"{timestamp}_price_chart.png")
            print(f"Price chart saved: {price_chart_path}")
        except Exception as e:
            print(f"Could not create price chart: {str(e)}")

        print("\n=== COMPARISON OF PREDICTION WITH REALITY ===")
        print("Real horizon after the point:")
        print(real_horizon)
        print("\nPredicted horizon of future prices:")
        print(predicted_horizon)

        result = compare_horizons(real_horizon, predicted_horizon)
        horizon_accuracy = sum(a == b for a, b in zip(real_horizon, predicted_horizon)) / horizon_length
        print(f"\nNumber of 1s in the real horizon: {real_horizon.count('1')}/{len(real_horizon)}")
        print(f"Number of 1s in the prediction: {predicted_horizon.count('1')}/{len(predicted_horizon)}")
        print(f"Bit match accuracy: {horizon_accuracy:.2%}")
        print(f"Direction comparison result: {result}")

        # Per-bit breakdown using the ten most frequent states.
        print("\nAnalysis of probability distribution for event horizon bits:")
        print("{:<5} {:<10} {:<10} {:<10}".format("Bit", "Prob.1", "Prob.0", "Prediction"))
        print("-" * 35)
        leaders = ranked[:10]
        for position in range(horizon_length):
            up_weight = 0
            down_weight = 0
            for bitstring, hits in leaders:
                if position >= len(bitstring):
                    continue
                share = hits / shots
                if bitstring[position] == '1':
                    up_weight += share
                else:
                    down_weight += share
            bit_call = "1" if up_weight > down_weight else "0"
            print("{:<5} {:<10.2%} {:<10.2%} {:<10}".format(
                position + 1, up_weight, down_weight, bit_call
            ))
    except Exception as e:
        print(f"Error during analysis: {str(e)}")
def main():
    """Interactive entry point.

    Initialises MT5, asks for the symbol, the candle offset and the horizon
    length, then runs ``analyze_from_point``. Errors are printed rather than
    raised, and the MT5 connection is shut down in every case once
    initialisation has succeeded.
    """
    connected = initialize_mt5()
    if not connected:
        return
    try:
        print("\n=== QUANTUM TRADING ANALYZER ===")
        print("This tool analyzes financial markets using quantum computing")

        # Empty answers fall back to the defaults where one exists.
        symbol_answer = input("Enter symbol for analysis (default EURUSD): ")
        symbol = symbol_answer or "EURUSD"

        offset_answer = input("Enter event horizon point offset (number of candles back from current moment): ")
        offset = int(offset_answer)

        length_answer = input("Enter event horizon length (default 10): ")
        horizon_length = int(length_answer or "10")

        analyze_from_point(offset, horizon_length=horizon_length, symbol=symbol)
        print(f"\nAnalysis completed. All visualizations saved in folder: {os.path.abspath(SAVE_DIR)}")
    except Exception as e:
        print(f"Error in main function: {str(e)}")
    finally:
        mt5.shutdown()


if __name__ == "__main__":
    main()