1299 lines
56 KiB
Python
1299 lines
56 KiB
Python
# Kompleksowy eksperyment do pracy magisterskiej - NAPRAWIONA WERSJA
|
|
#
|
|
# NAPRAWY:
|
|
# 1. Dodano timeout'y dla operacji kwantowych (5 minut)
|
|
# 2. Zmniejszono liczbę shots z 1024 na 50
|
|
# 3. Ograniczono wymiar cech do maksymalnie 8
|
|
# 4. Wyłączono równoległe przetwarzanie (problematyczne)
|
|
# 5. Dodano checkpoint'ing co 5 iteracji
|
|
# 6. Uproszczono mapy cech - tylko proste warianty
|
|
# 7. Dodano lepszą obsługę błędów i logging
|
|
# 8. Wyłączono problematyczne eksperymenty
|
|
#
|
|
# INSTRUKCJE UŻYCIA:
|
|
# - Uruchom na serwerze w chmurze
|
|
# - Monitoruj logi w czasie rzeczywistym
|
|
# - Eksperyment może trwać 2-4 godziny (zamiast 30+)
|
|
# - Wyniki są zapisywane automatycznie
|
|
import numpy as np
|
|
import pandas as pd
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
import sklearn
|
|
from sklearn.svm import SVC
|
|
from sklearn.model_selection import train_test_split # GridSearchCV, cross_val_score - nie używane
|
|
from sklearn.preprocessing import StandardScaler
|
|
from sklearn.metrics import accuracy_score # classification_report, confusion_matrix, roc_auc_score, precision_score, recall_score, f1_score - nie używane
|
|
# from sklearn.inspection import permutation_importance # Nie używane
|
|
from sklearn.decomposition import PCA
|
|
# import json # Nie używane
|
|
# import os.path # Nie używane
|
|
import signal
|
|
# import multiprocessing as mp # Nie używane
|
|
# from functools import partial # Nie używane
|
|
import warnings
|
|
warnings.filterwarnings('ignore')
|
|
|
|
# Import bibliotek kwantowych - NAPRAWIONE IMPORTY
|
|
import qiskit
|
|
# Naprawiony import Aer dla nowszych wersji Qiskit
|
|
try:
|
|
from qiskit import Aer
|
|
print("✓ Import Aer z qiskit - sukces")
|
|
except ImportError:
|
|
try:
|
|
from qiskit_aer import Aer
|
|
print("✓ Import Aer z qiskit_aer - sukces")
|
|
except ImportError:
|
|
try:
|
|
from qiskit.providers.aer import Aer
|
|
print("✓ Import Aer z qiskit.providers.aer - sukces")
|
|
except ImportError:
|
|
print("❌ Błąd: Nie można zaimportować Aer z żadnego źródła")
|
|
print("Sprawdź czy qiskit-aer jest zainstalowany: pip install qiskit-aer")
|
|
sys.exit(1)
|
|
|
|
from qiskit.circuit.library import ZZFeatureMap, PauliFeatureMap, EfficientSU2
|
|
from qiskit.circuit.library.data_preparation import ZFeatureMap
|
|
|
|
# Naprawiony import QuantumKernel dla nowszych wersji
|
|
try:
|
|
from qiskit_machine_learning.kernels import QuantumKernel
|
|
print("✓ Import QuantumKernel z qiskit_machine_learning.kernels - sukces")
|
|
except ImportError:
|
|
try:
|
|
from qiskit_machine_learning.kernels.quantum_kernel import QuantumKernel
|
|
print("✓ Import QuantumKernel z qiskit_machine_learning.kernels.quantum_kernel - sukces")
|
|
except ImportError:
|
|
try:
|
|
from qiskit_machine_learning.kernels import FidelityQuantumKernel as QuantumKernel
|
|
print("✓ Import FidelityQuantumKernel jako QuantumKernel - sukces")
|
|
except ImportError:
|
|
print("❌ Błąd: Nie można zaimportować QuantumKernel")
|
|
print("Sprawdź wersję qiskit-machine-learning: pip install --upgrade qiskit-machine-learning")
|
|
sys.exit(1)
|
|
|
|
# Naprawiony import QSVC
|
|
try:
|
|
from qiskit_machine_learning.algorithms import QSVC
|
|
print("✓ Import QSVC z qiskit_machine_learning.algorithms - sukces")
|
|
except ImportError:
|
|
try:
|
|
from qiskit_machine_learning.algorithms.svm import QSVC
|
|
print("✓ Import QSVC z qiskit_machine_learning.algorithms.svm - sukces")
|
|
except ImportError:
|
|
print("❌ Błąd: Nie można zaimportować QSVC")
|
|
print("Sprawdź wersję qiskit-machine-learning: pip install --upgrade qiskit-machine-learning")
|
|
sys.exit(1)
|
|
# import dimod # Nie używane - usunięte
|
|
# import neal # Nie używane - usunięte
|
|
|
|
# ----------------- PARAMETRY KONFIGURACYJNE -----------------
|
|
|
|
# Wybór zmiennej docelowej
|
|
TARGET_VARIABLE = 'Primary_Diagnosis' # Możliwe wartości: 'Primary_Diagnosis' lub 'Grade'
|
|
|
|
# Wybór map cech do aktywacji w eksperymentach - UPROSZCZONE
|
|
FEATURE_MAPS_ENABLED = {
|
|
'ZZ1': False, # Tylko proste mapy cech
|
|
'ZZ2': False, # Wyłączone - zbyt złożone
|
|
'Pauli1': True, # Tylko 1 powtórzenie
|
|
'Pauli2': False, # Wyłączone - zbyt złożone
|
|
'Z1': True, # Prosta implementacja Z-FeatureMap
|
|
'Z2': True, # Wyłączone - zbyt złożone
|
|
'ZZ1_new': False, # Wyłączone
|
|
'ZZ2_new': False, # Wyłączone
|
|
'SU2_1': False, # Wyłączone - zbyt złożone
|
|
'SU2_2': False, # Wyłączone
|
|
'SU2_full': False, # Wyłączone
|
|
'Amplitude_l2': False, # Kodowanie amplitudowe - szybsze
|
|
'Amplitude_l1': False, # Wyłączone dla uproszczenia
|
|
'Amplitude_min-max': False # Wyłączone dla uproszczenia
|
|
}
|
|
|
|
# Parametry danych
|
|
DATA_FILE = 'dane/TCGA_GBM_LGG_Mutations_clean.csv'
|
|
TEST_SIZE = 0.3
|
|
RANDOM_STATE = 42
|
|
PCA_COMPONENTS = 12 # Zmniejszone z 16 na 8 dla szybszych obliczeń
|
|
|
|
# Parametry bezpieczeństwa i wydajności
|
|
QUANTUM_SHOTS = 50 # Zmniejszone z 1024 na 50
|
|
QUANTUM_TIMEOUT = 3000 # 5 minut timeout dla operacji kwantowych
|
|
MAX_FEATURE_DIMENSION = 8 # Maksymalny wymiar dla map cech
|
|
USE_PARALLEL = False # Wyłączone równoległe przetwarzanie
|
|
CHECKPOINT_INTERVAL = 5 # Zapisuj wyniki co 5 iteracji
|
|
|
|
# Wybór eksperymentów do przeprowadzenia
|
|
RUN_COMPLEXITY_EXPERIMENT = False # Wyłączone - zbyt czasochłonne
|
|
RUN_GENE_SUBSETS_EXPERIMENT = True
|
|
RUN_FEATURE_MAPPINGS_EXPERIMENT = False # Wyłączone - problematyczne
|
|
|
|
# Parametry wyjściowe
|
|
OUTPUT_DIR = 'wyniki/wyniki_eksperymentow_1109-2'
|
|
OUTPUT_FILE = os.path.join(OUTPUT_DIR, f'wyniki_eksperymentow_{TARGET_VARIABLE}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')
|
|
|
|
# Upewnij się, że katalog wyjściowy istnieje
|
|
if not os.path.exists(OUTPUT_DIR):
|
|
os.makedirs(OUTPUT_DIR)
|
|
|
|
# Klasa do przekierowania wyjścia do pliku i terminala jednocześnie
|
|
class Logger:
|
|
def __init__(self, filename):
|
|
self.terminal = sys.stdout
|
|
self.log = open(filename, 'w', encoding='utf-8')
|
|
self.checkpoint_count = 0
|
|
|
|
def write(self, message):
|
|
self.terminal.write(message)
|
|
self.log.write(message)
|
|
self.log.flush()
|
|
|
|
# Checkpoint co pewien czas
|
|
if 'Testing' in message and 'for' in message:
|
|
self.checkpoint_count += 1
|
|
if self.checkpoint_count % CHECKPOINT_INTERVAL == 0:
|
|
self.write(f'\n*** CHECKPOINT {self.checkpoint_count} - {datetime.now().strftime("%H:%M:%S")} ***\n')
|
|
|
|
def flush(self):
|
|
self.terminal.flush()
|
|
self.log.flush()
|
|
|
|
# Funkcja do bezpiecznego tworzenia backend'u kwantowego
|
|
def get_quantum_backend():
|
|
"""
|
|
Tworzy backend kwantowy z obsługą różnych wersji Qiskit.
|
|
|
|
Returns:
|
|
Backend kwantowy
|
|
"""
|
|
try:
|
|
# Próbuj różne sposoby importu Aer
|
|
try:
|
|
from qiskit import Aer
|
|
return Aer.get_backend('qasm_simulator')
|
|
except ImportError:
|
|
try:
|
|
from qiskit_aer import Aer
|
|
return Aer.get_backend('qasm_simulator')
|
|
except ImportError:
|
|
from qiskit.providers.aer import Aer
|
|
return Aer.get_backend('qasm_simulator')
|
|
except Exception as e:
|
|
print(f"Błąd podczas tworzenia backend'u: {e}")
|
|
# Fallback - użyj podstawowego backend'u
|
|
try:
|
|
from qiskit import BasicAer
|
|
return BasicAer.get_backend('qasm_simulator')
|
|
except:
|
|
raise RuntimeError("Nie można utworzyć backend'u kwantowego")
|
|
|
|
def create_quantum_kernel(feature_map, backend_or_instance=None):
|
|
"""
|
|
Tworzy jądro kwantowe z obsługą różnych wersji Qiskit Machine Learning.
|
|
|
|
Args:
|
|
feature_map: Mapa cech kwantowych
|
|
backend_or_instance: Backend kwantowy lub quantum_instance (opcjonalny)
|
|
|
|
Returns:
|
|
Jądro kwantowe
|
|
"""
|
|
if backend_or_instance is None:
|
|
backend_or_instance = get_quantum_backend()
|
|
|
|
try:
|
|
# Próbuj różne sposoby tworzenia jądra kwantowego
|
|
try:
|
|
# Nowa wersja - FidelityQuantumKernel (tylko quantum_instance)
|
|
from qiskit_machine_learning.kernels import FidelityQuantumKernel
|
|
if hasattr(backend_or_instance, 'backend'): # quantum_instance
|
|
return FidelityQuantumKernel(feature_map=feature_map, quantum_instance=backend_or_instance)
|
|
else: # backend - musimy utworzyć quantum_instance
|
|
from qiskit.utils import QuantumInstance
|
|
quantum_instance = QuantumInstance(backend=backend_or_instance, shots=QUANTUM_SHOTS)
|
|
return FidelityQuantumKernel(feature_map=feature_map, quantum_instance=quantum_instance)
|
|
except ImportError:
|
|
try:
|
|
# Stara wersja - QuantumKernel
|
|
if hasattr(backend_or_instance, 'backend'): # quantum_instance
|
|
return QuantumKernel(feature_map=feature_map, quantum_instance=backend_or_instance)
|
|
else: # backend
|
|
return QuantumKernel(feature_map=feature_map, backend=backend_or_instance)
|
|
except TypeError:
|
|
# Jeszcze starsza wersja - bez parametru backend
|
|
return QuantumKernel(feature_map=feature_map)
|
|
except Exception as e:
|
|
print(f"Błąd podczas tworzenia jądra kwantowego: {e}")
|
|
raise RuntimeError("Nie można utworzyć jądra kwantowego")
|
|
|
|
# Funkcja timeout dla operacji kwantowych
|
|
def timeout_handler(signum, frame):
|
|
raise TimeoutError("Operacja kwantowa przekroczyła limit czasu")
|
|
|
|
# Funkcja bezpiecznego wykonywania operacji kwantowych
|
|
def safe_quantum_operation(operation, timeout=QUANTUM_TIMEOUT):
|
|
"""
|
|
Wykonuje operację kwantową z timeout.
|
|
|
|
Args:
|
|
operation: Funkcja do wykonania
|
|
timeout: Maksymalny czas w sekundach
|
|
|
|
Returns:
|
|
Wynik operacji lub None w przypadku błędu
|
|
"""
|
|
try:
|
|
# Ustaw timeout
|
|
signal.signal(signal.SIGALRM, timeout_handler)
|
|
signal.alarm(timeout)
|
|
|
|
# Wykonaj operację
|
|
result = operation()
|
|
|
|
# Wyłącz timeout
|
|
signal.alarm(0)
|
|
return result
|
|
|
|
except TimeoutError:
|
|
print(f" Timeout po {timeout} sekundach - pomijam")
|
|
return None
|
|
except Exception as e:
|
|
print(f" Błąd: {str(e)}")
|
|
return None
|
|
finally:
|
|
signal.alarm(0)
|
|
|
|
# ----------------- FUNKCJE POMOCNICZE -----------------
|
|
|
|
def z_feature_map(feature_dimension, reps=1):
|
|
"""
|
|
Tworzy niestandardową mapę cech Z-FeatureMap.
|
|
|
|
Args:
|
|
feature_dimension: Wymiar wektora cech
|
|
reps: Liczba powtórzeń
|
|
|
|
Returns:
|
|
QuantumCircuit: Układ kwantowy reprezentujący mapę cech
|
|
"""
|
|
from qiskit import QuantumCircuit
|
|
from qiskit.circuit import Parameter, ParameterVector
|
|
|
|
# Tworzymy układ kwantowy z odpowiednią liczbą kubitów
|
|
qc = QuantumCircuit(feature_dimension)
|
|
|
|
# Tworzymy wektor parametrów
|
|
params = ParameterVector('x', feature_dimension)
|
|
|
|
# Dodajemy bramki Hadamarda na wszystkich kubitach
|
|
for i in range(feature_dimension):
|
|
qc.h(i)
|
|
|
|
# Powtarzamy blok reps razy
|
|
for _ in range(reps):
|
|
# Dodajemy rotacje Z na każdym kubicie
|
|
for i in range(feature_dimension):
|
|
qc.rz(params[i], i)
|
|
|
|
return qc
|
|
|
|
def zz_feature_map(feature_dimension, reps=1):
|
|
"""
|
|
Tworzy niestandardową mapę cech ZZ-FeatureMap.
|
|
|
|
Args:
|
|
feature_dimension: Wymiar wektora cech
|
|
reps: Liczba powtórzeń
|
|
|
|
Returns:
|
|
QuantumCircuit: Układ kwantowy reprezentujący mapę cech
|
|
"""
|
|
from qiskit import QuantumCircuit
|
|
from qiskit.circuit import ParameterVector
|
|
|
|
# Tworzymy układ kwantowy z odpowiednią liczbą kubitów
|
|
qc = QuantumCircuit(feature_dimension)
|
|
|
|
# Tworzymy wektor parametrów
|
|
params = ParameterVector('x', feature_dimension)
|
|
|
|
# Dodajemy bramki Hadamarda na wszystkich kubitach
|
|
for i in range(feature_dimension):
|
|
qc.h(i)
|
|
|
|
# Powtarzamy blok reps razy
|
|
for _ in range(reps):
|
|
# Dodajemy rotacje Z na każdym kubicie
|
|
for i in range(feature_dimension):
|
|
qc.rz(params[i], i)
|
|
|
|
# Dodajemy bramki ZZ między sąsiednimi kubitami
|
|
for i in range(feature_dimension - 1):
|
|
qc.cx(i, i+1)
|
|
qc.rz(params[i] * params[i+1], i+1)
|
|
qc.cx(i, i+1)
|
|
|
|
return qc
|
|
|
|
# Funkcje eksperymentalne
|
|
|
|
def experiment_complexity_impact(X, y, mutation_counts):
|
|
"""
|
|
Bada wpływ złożoności danych na skuteczność klasyfikacji.
|
|
|
|
Parameters:
|
|
-----------
|
|
X : DataFrame
|
|
Dane wejściowe (cechy)
|
|
y : Series
|
|
Etykiety klas (zmienna docelowa określona przez TARGET_VARIABLE)
|
|
mutation_counts : Series
|
|
Liczba mutacji dla każdego przypadku
|
|
|
|
Returns:
|
|
--------
|
|
dict
|
|
Słownik z wynikami dla każdego poziomu złożoności
|
|
"""
|
|
# Definiuj poziomy złożoności na podstawie liczby mutacji
|
|
complexity_levels = {}
|
|
|
|
# Poziom 1: Przypadki z małą liczbą mutacji (dolny kwartyl)
|
|
low_threshold = mutation_counts.quantile(0.25)
|
|
complexity_levels['Low Complexity'] = mutation_counts[mutation_counts <= low_threshold].index.tolist()
|
|
|
|
# Poziom 2: Przypadki ze średnią liczbą mutacji (między dolnym a górnym kwartylem)
|
|
high_threshold = mutation_counts.quantile(0.75)
|
|
complexity_levels['Medium Complexity'] = mutation_counts[(mutation_counts > low_threshold) &
|
|
(mutation_counts < high_threshold)].index.tolist()
|
|
|
|
# Poziom 3: Przypadki z dużą liczbą mutacji (górny kwartyl)
|
|
complexity_levels['High Complexity'] = mutation_counts[mutation_counts >= high_threshold].index.tolist()
|
|
|
|
# Wyświetl informacje o poziomach złożoności
|
|
print("\nPoziomy złożoności danych:")
|
|
for level, indices in complexity_levels.items():
|
|
print(f" {level}: {len(indices)} przypadków")
|
|
avg_mutations = mutation_counts.iloc[indices].mean()
|
|
print(f" Średnia liczba mutacji: {avg_mutations:.2f}")
|
|
|
|
results = {}
|
|
|
|
# Przygotuj wszystkie możliwe mapowania cech - podobnie jak w experiment_feature_mappings
|
|
def prepare_feature_maps(feature_dimension):
|
|
"""
|
|
Przygotowuje wszystkie możliwe mapowania cech dla danego wymiaru.
|
|
|
|
Args:
|
|
feature_dimension: Wymiar wektora cech
|
|
|
|
Returns:
|
|
Lista słowników z konfiguracjami map cech
|
|
"""
|
|
all_feature_maps = [
|
|
{'name': 'ZZ1_C10', 'base_name': 'ZZ1', 'map': ZZFeatureMap(feature_dimension=feature_dimension, reps=1), 'C': 10.0},
|
|
{'name': 'ZZ2_C01', 'base_name': 'ZZ2', 'map': ZZFeatureMap(feature_dimension=feature_dimension, reps=2), 'C': 0.1},
|
|
{'name': 'Pauli1_C10', 'base_name': 'Pauli1', 'map': PauliFeatureMap(feature_dimension=feature_dimension, reps=1), 'C': 10.0},
|
|
{'name': 'ZZ1_C1', 'base_name': 'ZZ1', 'map': ZZFeatureMap(feature_dimension=feature_dimension, reps=1), 'C': 1.0},
|
|
{'name': 'ZZ2_C1', 'base_name': 'ZZ2', 'map': ZZFeatureMap(feature_dimension=feature_dimension, reps=2), 'C': 1.0},
|
|
{'name': 'Pauli1_C1', 'base_name': 'Pauli1', 'map': PauliFeatureMap(feature_dimension=feature_dimension, reps=1), 'C': 1.0},
|
|
{'name': 'Pauli2_C1', 'base_name': 'Pauli2', 'map': PauliFeatureMap(feature_dimension=feature_dimension, reps=2), 'C': 1.0},
|
|
# Dodaj nowe mapy cech Z-FeatureMap
|
|
{'name': 'Z1_C01', 'base_name': 'Z1', 'map': z_feature_map(feature_dimension=feature_dimension, reps=1), 'C': 0.1},
|
|
{'name': 'Z1_C1', 'base_name': 'Z1', 'map': z_feature_map(feature_dimension=feature_dimension, reps=1), 'C': 1.0},
|
|
{'name': 'Z1_C10', 'base_name': 'Z1', 'map': z_feature_map(feature_dimension=feature_dimension, reps=1), 'C': 10.0},
|
|
{'name': 'Z2_C1', 'base_name': 'Z2', 'map': z_feature_map(feature_dimension=feature_dimension, reps=2), 'C': 1.0},
|
|
{'name': 'Z2_C01', 'base_name': 'Z2', 'map': z_feature_map(feature_dimension=feature_dimension, reps=2), 'C': 0.1},
|
|
{'name': 'Z2_C10', 'base_name': 'Z2', 'map': z_feature_map(feature_dimension=feature_dimension, reps=2), 'C': 10.0},
|
|
# Dodaj nowe mapy cech ZZ-FeatureMap (nowa implementacja)
|
|
{'name': 'ZZ1_new_C1', 'base_name': 'ZZ1_new', 'map': zz_feature_map(feature_dimension=feature_dimension, reps=1), 'C': 1.0},
|
|
{'name': 'ZZ2_new_C1', 'base_name': 'ZZ2_new', 'map': zz_feature_map(feature_dimension=feature_dimension, reps=2), 'C': 1.0},
|
|
# Dodaj mapy cech SU2
|
|
{'name': 'SU2_1_C1', 'base_name': 'SU2_1', 'map': EfficientSU2(num_qubits=feature_dimension, entanglement='linear', reps=1), 'C': 1.0},
|
|
{'name': 'SU2_2_C1', 'base_name': 'SU2_2', 'map': EfficientSU2(num_qubits=feature_dimension, entanglement='linear', reps=2), 'C': 1.0},
|
|
{'name': 'SU2_full_C1', 'base_name': 'SU2_full', 'map': EfficientSU2(num_qubits=feature_dimension, entanglement='full', reps=1), 'C': 1.0},
|
|
# Dodaj oficjalną implementację ZFeatureMap
|
|
{'name': 'ZFeatureMap1_C1', 'base_name': 'ZFeatureMap1', 'map': ZFeatureMap(feature_dimension=feature_dimension, reps=1), 'C': 1.0},
|
|
{'name': 'ZFeatureMap2_C1', 'base_name': 'ZFeatureMap2', 'map': ZFeatureMap(feature_dimension=feature_dimension, reps=2), 'C': 1.0},
|
|
# Dodaj kodowanie amplitudowe
|
|
{'name': 'Amplitude_l2_C1', 'base_name': 'Amplitude_l2', 'map': None, 'C': 1.0, 'amplitude_encoding': True, 'normalization': 'l2'},
|
|
{'name': 'Amplitude_l1_C1', 'base_name': 'Amplitude_l1', 'map': None, 'C': 1.0, 'amplitude_encoding': True, 'normalization': 'l1'},
|
|
{'name': 'Amplitude_min-max_C1', 'base_name': 'Amplitude_min-max', 'map': None, 'C': 1.0, 'amplitude_encoding': True, 'normalization': 'min-max'}
|
|
]
|
|
|
|
# Filtruj mapy cech na podstawie ustawień FEATURE_MAPS_ENABLED
|
|
feature_maps = []
|
|
for fm in all_feature_maps:
|
|
if fm['base_name'] in FEATURE_MAPS_ENABLED and FEATURE_MAPS_ENABLED[fm['base_name']]:
|
|
feature_maps.append(fm)
|
|
|
|
return feature_maps
|
|
|
|
# Dla każdego poziomu złożoności
|
|
for complexity_level, indices in complexity_levels.items():
|
|
print(f"\nTestowanie poziomu złożoności: {complexity_level}")
|
|
|
|
# Wybierz podzbiór danych
|
|
X_subset = X.iloc[indices]
|
|
y_subset = y.iloc[indices]
|
|
|
|
# Podział na zbiory treningowy i testowy
|
|
X_train, X_test, y_train, y_test = train_test_split(
|
|
X_subset, y_subset, test_size=TEST_SIZE, random_state=RANDOM_STATE
|
|
)
|
|
|
|
# Skalowanie danych
|
|
scaler = StandardScaler()
|
|
X_train_scaled = scaler.fit_transform(X_train)
|
|
X_test_scaled = scaler.transform(X_test)
|
|
|
|
# Redukcja wymiarowości dla modelu kwantowego
|
|
pca = PCA(n_components=PCA_COMPONENTS)
|
|
X_train_reduced = pca.fit_transform(X_train_scaled)
|
|
X_test_reduced = pca.transform(X_test_scaled)
|
|
|
|
# Klasyczny SVM (liniowy)
|
|
svm_linear = SVC(kernel='linear')
|
|
svm_linear.fit(X_train_scaled, y_train)
|
|
linear_acc = svm_linear.score(X_test_scaled, y_test)
|
|
|
|
# Klasyczny SVM (RBF)
|
|
svm_rbf = SVC(kernel='rbf')
|
|
svm_rbf.fit(X_train_scaled, y_train)
|
|
rbf_acc = svm_rbf.score(X_test_scaled, y_test)
|
|
|
|
# Przygotuj wyniki dla kwantowych SVM
|
|
quantum_results = {}
|
|
|
|
# Przygotuj wszystkie mapy cech
|
|
feature_maps = prepare_feature_maps(X_train_reduced.shape[1])
|
|
print(f"Aktywne mapy cech: {[fm['name'] for fm in feature_maps]}")
|
|
|
|
# Normalizacja danych do zakresu [0, 2π]
|
|
X_train_normalized = X_train_reduced.copy()
|
|
X_test_normalized = X_test_reduced.copy()
|
|
|
|
for i in range(X_train_reduced.shape[1]):
|
|
min_val = min(X_train_reduced[:, i].min(), X_test_reduced[:, i].min())
|
|
max_val = max(X_train_reduced[:, i].max(), X_test_reduced[:, i].max())
|
|
|
|
if max_val > min_val:
|
|
X_train_normalized[:, i] = 2 * np.pi * (X_train_reduced[:, i] - min_val) / (max_val - min_val)
|
|
X_test_normalized[:, i] = 2 * np.pi * (X_test_reduced[:, i] - min_val) / (max_val - min_val)
|
|
|
|
# Testuj każdą mapę cech - SEKWENCYJNIE z timeout
|
|
for i, fm in enumerate(feature_maps):
|
|
print(f" Testowanie mapy cech: {fm['name']} ({i+1}/{len(feature_maps)})")
|
|
|
|
def train_quantum_model():
|
|
try:
|
|
# Używamy quantum_instance zamiast backend
|
|
quantum_instance = qiskit.utils.QuantumInstance(
|
|
backend=get_quantum_backend(),
|
|
shots=QUANTUM_SHOTS
|
|
)
|
|
quantum_kernel = create_quantum_kernel(fm['map'], quantum_instance)
|
|
except (TypeError, AttributeError):
|
|
# Dla starszych wersji Qiskit
|
|
backend = get_quantum_backend()
|
|
backend.shots = QUANTUM_SHOTS
|
|
quantum_kernel = create_quantum_kernel(fm['map'], backend)
|
|
|
|
# Używamy quantum_kernel bezpośrednio z QSVC
|
|
qsvm = QSVC(quantum_kernel=quantum_kernel, C=fm['C'])
|
|
|
|
# Trenuj model
|
|
qsvm.fit(X_train_normalized, y_train)
|
|
|
|
# Testuj model
|
|
quantum_acc = qsvm.score(X_test_normalized, y_test)
|
|
return quantum_acc
|
|
|
|
# Wykonaj z timeout
|
|
result = safe_quantum_operation(train_quantum_model, QUANTUM_TIMEOUT)
|
|
|
|
if result is not None:
|
|
quantum_results[fm['name']] = result
|
|
print(f" Dokładność: {result:.4f}")
|
|
else:
|
|
quantum_results[fm['name']] = 0.0
|
|
print(f" Pominięte z powodu błędu/timeout")
|
|
|
|
# Znajdź najlepszy model kwantowy
|
|
best_quantum_acc = max(quantum_results.values()) if quantum_results else 0.0
|
|
best_quantum_model = max(quantum_results.items(), key=lambda x: x[1])[0] if quantum_results else "None"
|
|
|
|
results[complexity_level] = {
|
|
'linear_svm': linear_acc,
|
|
'rbf_svm': rbf_acc,
|
|
'best_quantum_svm': best_quantum_acc,
|
|
'best_quantum_model': best_quantum_model
|
|
}
|
|
|
|
# Dodaj wyniki dla każdej mapy cech
|
|
for name, acc in quantum_results.items():
|
|
results[complexity_level][name] = acc
|
|
|
|
print(f" Linear SVM: {linear_acc:.4f}")
|
|
print(f" RBF SVM: {rbf_acc:.4f}")
|
|
print(f" Best Quantum SVM: {best_quantum_acc:.4f} ({best_quantum_model})")
|
|
|
|
return results
|
|
|
|
def experiment_gene_subsets(data):
|
|
"""
|
|
Bada wpływ różnych podzbiorów genów na skuteczność klasyfikacji.
|
|
|
|
Parameters:
|
|
-----------
|
|
data : DataFrame
|
|
Pełny zbiór danych
|
|
|
|
Returns:
|
|
--------
|
|
dict
|
|
Słownik z wynikami dla każdego podzbioru genów
|
|
"""
|
|
# Definiuj podzbiory genów
|
|
gene_subsets = {
|
|
"Wszystkie geny": None, # Wszystkie geny
|
|
"Geny często mutowane": ["IDH1", "TP53", "ATRX", "PTEN"],
|
|
"Geny średnio mutowane": ["EGFR", "CIC", "MUC16", "PIK3CA", "NF1", "PIK3R1", "FUBP1"],
|
|
"Geny rzadko mutowane": ["RB1", "NOTCH1", "BCOR", "CSMD3", "SMARCA4", "GRIN2A", "IDH2", "FAT4"],
|
|
}
|
|
|
|
# Oblicz liczbę genów w każdym podzbiorze i liczbę dostępnych genów w danych
|
|
gene_counts = {}
|
|
for name, genes in gene_subsets.items():
|
|
if genes:
|
|
available_genes = [g for g in genes if g in data.columns]
|
|
gene_counts[name] = {
|
|
'total': len(genes),
|
|
'available': len(available_genes),
|
|
'missing': len(genes) - len(available_genes)
|
|
}
|
|
else:
|
|
# Dla "All Genes" policz wszystkie kolumny genów
|
|
gene_cols = [col for col in data.columns if col not in
|
|
['Grade', 'Project', 'Case_ID', 'Gender', 'Age_at_diagnosis',
|
|
'Primary_Diagnosis', 'Race']]
|
|
gene_counts[name] = {
|
|
'total': len(gene_cols),
|
|
'available': len(gene_cols),
|
|
'missing': 0
|
|
}
|
|
|
|
print("\nLiczba genów w każdym podzbiorze:")
|
|
for name, counts in gene_counts.items():
|
|
print(f" {name}: {counts['total']} genów (dostępne: {counts['available']}, brakujące: {counts['missing']})")
|
|
|
|
results = {}
|
|
|
|
# Dla każdego podzbioru genów
|
|
for name, genes in gene_subsets.items():
|
|
print(f"\nTesting gene subset: {name} (n={len(genes) if genes else 'all'})")
|
|
|
|
# Przygotuj dane
|
|
if genes:
|
|
# Sprawdź, które geny są dostępne w danych
|
|
available_genes = [g for g in genes if g in data.columns]
|
|
if len(available_genes) < len(genes):
|
|
print(f" Warning: Only {len(available_genes)}/{len(genes)} genes are available in the dataset")
|
|
|
|
if not available_genes:
|
|
print(" Skipping due to no available genes")
|
|
continue
|
|
|
|
# Wybierz tylko określone geny
|
|
X = data[available_genes].copy()
|
|
sample_size = len(genes)
|
|
else:
|
|
# Użyj wszystkich genów
|
|
gene_cols = [col for col in data.columns if col not in
|
|
['Grade', 'Project', 'Case_ID', 'Gender', 'Age_at_diagnosis',
|
|
'Primary_Diagnosis', 'Race']]
|
|
X = data[gene_cols].copy()
|
|
sample_size = len(gene_cols) # Liczba wszystkich genów
|
|
|
|
# Konwertuj dane tekstowe na binarne (0/1)
|
|
for col in X.columns:
|
|
if X[col].dtype == 'object': # Jeśli kolumna zawiera dane tekstowe
|
|
X[col] = X[col].apply(lambda x: 1 if str(x).upper() == 'MUTATED' else 0)
|
|
|
|
# Użyj zmiennej docelowej określonej w konfiguracji
|
|
y = data[TARGET_VARIABLE]
|
|
|
|
# Podział na zbiory treningowy i testowy
|
|
X_train, X_test, y_train, y_test = train_test_split(
|
|
X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE
|
|
)
|
|
|
|
# Skalowanie danych
|
|
scaler = StandardScaler()
|
|
X_train_scaled = scaler.fit_transform(X_train)
|
|
X_test_scaled = scaler.transform(X_test)
|
|
|
|
# Redukcja wymiarowości dla modelu kwantowego - ograniczona do MAX_FEATURE_DIMENSION
|
|
max_components = min(PCA_COMPONENTS, MAX_FEATURE_DIMENSION, X_train_scaled.shape[1])
|
|
if X_train_scaled.shape[1] > max_components:
|
|
pca = PCA(n_components=max_components)
|
|
X_train_reduced = pca.fit_transform(X_train_scaled)
|
|
X_test_reduced = pca.transform(X_test_scaled)
|
|
else:
|
|
X_train_reduced = X_train_scaled
|
|
X_test_reduced = X_test_scaled
|
|
|
|
print(f" Redukcja wymiarowości: {X_train_scaled.shape[1]} -> {max_components} wymiarów")
|
|
|
|
# Klasyczny SVM (liniowy)
|
|
svm_linear = SVC(kernel='linear')
|
|
svm_linear.fit(X_train_scaled, y_train)
|
|
linear_acc = svm_linear.score(X_test_scaled, y_test)
|
|
|
|
# Klasyczny SVM (RBF)
|
|
svm_rbf = SVC(kernel='rbf')
|
|
svm_rbf.fit(X_train_scaled, y_train)
|
|
rbf_acc = svm_rbf.score(X_test_scaled, y_test)
|
|
|
|
# Przygotuj wyniki dla kwantowych SVM
|
|
quantum_results = {}
|
|
|
|
# Konfiguracja 1: ZZ1, C=10.0 - z timeout
|
|
print(" Testing ZZ1, C=10.0")
|
|
|
|
def train_zz1_model():
|
|
feature_map_zz1 = ZZFeatureMap(feature_dimension=X_train_reduced.shape[1], reps=1)
|
|
try:
|
|
# Używamy quantum_instance zamiast backend
|
|
quantum_instance = qiskit.utils.QuantumInstance(
|
|
backend=get_quantum_backend(),
|
|
shots=QUANTUM_SHOTS
|
|
)
|
|
quantum_kernel_zz1 = create_quantum_kernel(feature_map_zz1, quantum_instance)
|
|
except (TypeError, AttributeError):
|
|
# Dla starszych wersji Qiskit
|
|
backend = get_quantum_backend()
|
|
backend.shots = QUANTUM_SHOTS
|
|
quantum_kernel_zz1 = create_quantum_kernel(feature_map_zz1, backend)
|
|
|
|
# Używamy quantum_kernel bezpośrednio z QSVC
|
|
qsvm_zz1 = QSVC(quantum_kernel=quantum_kernel_zz1, C=10.0)
|
|
|
|
# Trenuj model
|
|
qsvm_zz1.fit(X_train_reduced, y_train)
|
|
|
|
# Testuj model
|
|
zz1_acc = qsvm_zz1.score(X_test_reduced, y_test)
|
|
return zz1_acc
|
|
|
|
result = safe_quantum_operation(train_zz1_model, QUANTUM_TIMEOUT)
|
|
if result is not None:
|
|
quantum_results['ZZ1_C10'] = result
|
|
print(f" Accuracy: {result:.4f}")
|
|
else:
|
|
quantum_results['ZZ1_C10'] = 0.0
|
|
print(f" Pominięte z powodu błędu/timeout")
|
|
|
|
# Konfiguracja 2: ZZ2, C=0.1 - z timeout
|
|
print(" Testing ZZ2, C=0.1")
|
|
|
|
def train_zz2_model():
|
|
feature_map_zz2 = ZZFeatureMap(feature_dimension=X_train_reduced.shape[1], reps=2)
|
|
try:
|
|
# Używamy quantum_instance zamiast backend
|
|
quantum_instance = qiskit.utils.QuantumInstance(
|
|
backend=get_quantum_backend(),
|
|
shots=QUANTUM_SHOTS
|
|
)
|
|
quantum_kernel_zz2 = create_quantum_kernel(feature_map_zz2, quantum_instance)
|
|
except (TypeError, AttributeError):
|
|
# Dla starszych wersji Qiskit
|
|
backend = get_quantum_backend()
|
|
backend.shots = QUANTUM_SHOTS
|
|
quantum_kernel_zz2 = create_quantum_kernel(feature_map_zz2, backend)
|
|
|
|
# Używamy quantum_kernel bezpośrednio z QSVC
|
|
qsvm_zz2 = QSVC(quantum_kernel=quantum_kernel_zz2, C=0.1)
|
|
|
|
# Trenuj model
|
|
qsvm_zz2.fit(X_train_reduced, y_train)
|
|
|
|
# Testuj model
|
|
zz2_acc = qsvm_zz2.score(X_test_reduced, y_test)
|
|
return zz2_acc
|
|
|
|
result = safe_quantum_operation(train_zz2_model, QUANTUM_TIMEOUT)
|
|
if result is not None:
|
|
quantum_results['ZZ2_C01'] = result
|
|
print(f" Accuracy: {result:.4f}")
|
|
else:
|
|
quantum_results['ZZ2_C01'] = 0.0
|
|
print(f" Pominięte z powodu błędu/timeout")
|
|
|
|
# Konfiguracja 3: Pauli1, C=10.0 - z timeout
|
|
print(" Testing Pauli1, C=10.0")
|
|
|
|
def train_pauli1_model():
|
|
feature_map_pauli1 = PauliFeatureMap(feature_dimension=X_train_reduced.shape[1], reps=1)
|
|
try:
|
|
# Używamy quantum_instance zamiast backend
|
|
quantum_instance = qiskit.utils.QuantumInstance(
|
|
backend=get_quantum_backend(),
|
|
shots=QUANTUM_SHOTS
|
|
)
|
|
quantum_kernel_pauli1 = create_quantum_kernel(feature_map_pauli1, quantum_instance)
|
|
except (TypeError, AttributeError):
|
|
# Dla starszych wersji Qiskit
|
|
backend = get_quantum_backend()
|
|
backend.shots = QUANTUM_SHOTS
|
|
quantum_kernel_pauli1 = create_quantum_kernel(feature_map_pauli1, backend)
|
|
|
|
# Używamy quantum_kernel bezpośrednio z QSVC
|
|
qsvm_pauli1 = QSVC(quantum_kernel=quantum_kernel_pauli1, C=10.0)
|
|
|
|
# Trenuj model
|
|
qsvm_pauli1.fit(X_train_reduced, y_train)
|
|
|
|
# Testuj model
|
|
pauli1_acc = qsvm_pauli1.score(X_test_reduced, y_test)
|
|
return pauli1_acc
|
|
|
|
result = safe_quantum_operation(train_pauli1_model, QUANTUM_TIMEOUT)
|
|
if result is not None:
|
|
quantum_results['Pauli1_C10'] = result
|
|
print(f" Accuracy: {result:.4f}")
|
|
else:
|
|
quantum_results['Pauli1_C10'] = 0.0
|
|
print(f" Pominięte z powodu błędu/timeout")
|
|
|
|
# Znajdź najlepszy model kwantowy
|
|
best_quantum_acc = max(quantum_results.values()) if quantum_results else 0.0
|
|
best_quantum_model = max(quantum_results.items(), key=lambda x: x[1])[0] if quantum_results else "None"
|
|
|
|
results[name] = {
|
|
'linear_svm': linear_acc,
|
|
'rbf_svm': rbf_acc,
|
|
'best_quantum_svm': best_quantum_acc,
|
|
'best_quantum_model': best_quantum_model,
|
|
'ZZ1_C10': quantum_results.get('ZZ1_C10', 0.0),
|
|
'ZZ2_C01': quantum_results.get('ZZ2_C01', 0.0),
|
|
'Pauli1_C10': quantum_results.get('Pauli1_C10', 0.0),
|
|
'sample_size': sample_size
|
|
}
|
|
|
|
print(f" Linear SVM: {linear_acc:.4f}")
|
|
print(f" RBF SVM: {rbf_acc:.4f}")
|
|
print(f" Best Quantum SVM: {best_quantum_acc:.4f} ({best_quantum_model})")
|
|
|
|
return results
|
|
|
|
def experiment_feature_mappings(X_train, X_test, y_train, y_test):
|
|
"""
|
|
Testuje różne mapowania cech kwantowych i analizuje ich wpływ na wydajność.
|
|
|
|
Parameters:
|
|
-----------
|
|
X_train, X_test : array
|
|
Dane treningowe i testowe po redukcji wymiarowości
|
|
y_train, y_test : array
|
|
Etykiety treningowe i testowe
|
|
|
|
Returns:
|
|
--------
|
|
dict
|
|
Słownik z wynikami dla każdego mapowania cech
|
|
"""
|
|
# Normalizacja danych do zakresu [0, 2π]
|
|
X_train_normalized = X_train.copy()
|
|
X_test_normalized = X_test.copy()
|
|
|
|
for i in range(X_train.shape[1]):
|
|
min_val = min(X_train[:, i].min(), X_test[:, i].min())
|
|
max_val = max(X_train[:, i].max(), X_test[:, i].max())
|
|
|
|
if max_val > min_val:
|
|
X_train_normalized[:, i] = 2 * np.pi * (X_train[:, i] - min_val) / (max_val - min_val)
|
|
X_test_normalized[:, i] = 2 * np.pi * (X_test[:, i] - min_val) / (max_val - min_val)
|
|
|
|
# Funkcja do tworzenia Z-FeatureMap
|
|
def z_feature_map(feature_dimension, reps=1):
|
|
"""
|
|
Tworzy Z-FeatureMap - prostą mapę cech opartą na bramkach Z.
|
|
|
|
Parameters:
|
|
-----------
|
|
feature_dimension : int
|
|
Liczba wymiarów wektora cech (liczba kubitów)
|
|
reps : int
|
|
Liczba powtórzeń obwodu
|
|
|
|
Returns:
|
|
--------
|
|
QuantumCircuit
|
|
Obwód kwantowy implementujący Z-FeatureMap
|
|
"""
|
|
from qiskit import QuantumCircuit
|
|
from qiskit.circuit import Parameter, ParameterVector
|
|
|
|
qc = QuantumCircuit(feature_dimension)
|
|
|
|
# Parametry dla każdego kubitu
|
|
params = ParameterVector(f'x', feature_dimension)
|
|
|
|
# Powtórz obwód określoną liczbę razy
|
|
for _ in range(reps):
|
|
# Hadamard na wszystkich kubitach
|
|
for i in range(feature_dimension):
|
|
qc.h(i)
|
|
|
|
# Rotacje Z na każdym kubicie
|
|
for i in range(feature_dimension):
|
|
qc.rz(params[i], i)
|
|
|
|
return qc
|
|
|
|
# Przygotuj wszystkie możliwe mapowania cech
|
|
all_feature_maps = [
|
|
{'name': 'ZZ1_C10', 'base_name': 'ZZ1', 'map': ZZFeatureMap(feature_dimension=X_train.shape[1], reps=1), 'C': 10.0},
|
|
{'name': 'ZZ2_C01', 'base_name': 'ZZ2', 'map': ZZFeatureMap(feature_dimension=X_train.shape[1], reps=2), 'C': 0.1},
|
|
{'name': 'Pauli1_C10', 'base_name': 'Pauli1', 'map': PauliFeatureMap(feature_dimension=X_train.shape[1], reps=1), 'C': 10.0},
|
|
{'name': 'ZZ1_C1', 'base_name': 'ZZ1', 'map': ZZFeatureMap(feature_dimension=X_train.shape[1], reps=1), 'C': 1.0},
|
|
{'name': 'ZZ2_C1', 'base_name': 'ZZ2', 'map': ZZFeatureMap(feature_dimension=X_train.shape[1], reps=2), 'C': 1.0},
|
|
{'name': 'Pauli1_C1', 'base_name': 'Pauli1', 'map': PauliFeatureMap(feature_dimension=X_train.shape[1], reps=1), 'C': 1.0},
|
|
{'name': 'Pauli2_C1', 'base_name': 'Pauli2', 'map': PauliFeatureMap(feature_dimension=X_train.shape[1], reps=2), 'C': 1.0},
|
|
# Dodaj nowe mapy cech Z-FeatureMap
|
|
{'name': 'Z1_C01', 'base_name': 'Z1', 'map': z_feature_map(feature_dimension=X_train.shape[1], reps=1), 'C': 0.1},
|
|
{'name': 'Z1_C1', 'base_name': 'Z1', 'map': z_feature_map(feature_dimension=X_train.shape[1], reps=1), 'C': 1.0},
|
|
{'name': 'Z1_C10', 'base_name': 'Z1', 'map': z_feature_map(feature_dimension=X_train.shape[1], reps=1), 'C': 10.0},
|
|
{'name': 'Z2_C1', 'base_name': 'Z2', 'map': z_feature_map(feature_dimension=X_train.shape[1], reps=2), 'C': 1.0},
|
|
{'name': 'Z2_C01', 'base_name': 'Z2', 'map': z_feature_map(feature_dimension=X_train.shape[1], reps=2), 'C': 0.1},
|
|
{'name': 'Z2_C10', 'base_name': 'Z2', 'map': z_feature_map(feature_dimension=X_train.shape[1], reps=2), 'C': 10.0},
|
|
# Dodaj nowe mapy cech ZZ-FeatureMap (nowa implementacja)
|
|
{'name': 'ZZ1_new_C1', 'base_name': 'ZZ1_new', 'map': zz_feature_map(feature_dimension=X_train.shape[1], reps=1), 'C': 1.0},
|
|
{'name': 'ZZ2_new_C1', 'base_name': 'ZZ2_new', 'map': zz_feature_map(feature_dimension=X_train.shape[1], reps=2), 'C': 1.0},
|
|
# Dodaj mapy cech SU2
|
|
{'name': 'SU2_1_C1', 'base_name': 'SU2_1', 'map': EfficientSU2(num_qubits=X_train.shape[1], entanglement='linear', reps=1), 'C': 1.0},
|
|
{'name': 'SU2_2_C1', 'base_name': 'SU2_2', 'map': EfficientSU2(num_qubits=X_train.shape[1], entanglement='linear', reps=2), 'C': 1.0},
|
|
{'name': 'SU2_full_C1', 'base_name': 'SU2_full', 'map': EfficientSU2(num_qubits=X_train.shape[1], entanglement='full', reps=1), 'C': 1.0},
|
|
# Dodaj oficjalną implementację ZFeatureMap
|
|
{'name': 'ZFeatureMap1_C1', 'base_name': 'ZFeatureMap1', 'map': ZFeatureMap(feature_dimension=X_train.shape[1], reps=1), 'C': 1.0},
|
|
{'name': 'ZFeatureMap2_C1', 'base_name': 'ZFeatureMap2', 'map': ZFeatureMap(feature_dimension=X_train.shape[1], reps=2), 'C': 1.0},
|
|
# Dodaj kodowanie amplitudowe
|
|
{'name': 'Amplitude_l2_C1', 'base_name': 'Amplitude_l2', 'map': None, 'C': 1.0, 'amplitude_encoding': True, 'normalization': 'l2'},
|
|
{'name': 'Amplitude_l1_C1', 'base_name': 'Amplitude_l1', 'map': None, 'C': 1.0, 'amplitude_encoding': True, 'normalization': 'l1'},
|
|
{'name': 'Amplitude_min-max_C1', 'base_name': 'Amplitude_min-max', 'map': None, 'C': 1.0, 'amplitude_encoding': True, 'normalization': 'min-max'}
|
|
]
|
|
|
|
# Filtruj mapy cech na podstawie ustawień FEATURE_MAPS_ENABLED
|
|
feature_maps = []
|
|
for fm in all_feature_maps:
|
|
if fm['base_name'] in FEATURE_MAPS_ENABLED and FEATURE_MAPS_ENABLED[fm['base_name']]:
|
|
feature_maps.append(fm)
|
|
|
|
print(f"Aktywne mapy cech: {[fm['name'] for fm in feature_maps]}")
|
|
|
|
results = {}
|
|
|
|
# Dla porównania, trenuj klasyczny SVM z jądrem RBF
|
|
svm_rbf = SVC(kernel='rbf')
|
|
svm_rbf.fit(X_train, y_train)
|
|
rbf_acc = svm_rbf.score(X_test, y_test)
|
|
print(f"Dokładność klasycznego SVM (RBF): {rbf_acc:.4f}")
|
|
|
|
# Testuj każde mapowanie cech - SEKWENCYJNIE z timeout
|
|
for i, fm in enumerate(feature_maps):
|
|
print(f"\nTestowanie mapowania: {fm['name']} ({i+1}/{len(feature_maps)})")
|
|
|
|
def train_feature_mapping_model():
|
|
# Utwórz jądro kwantowe
|
|
try:
|
|
# Używamy quantum_instance zamiast backend
|
|
quantum_instance = qiskit.utils.QuantumInstance(
|
|
backend=get_quantum_backend(),
|
|
shots=QUANTUM_SHOTS
|
|
)
|
|
quantum_kernel = create_quantum_kernel(fm['map'], quantum_instance)
|
|
except (TypeError, AttributeError):
|
|
# Dla starszych wersji Qiskit
|
|
backend = get_quantum_backend()
|
|
backend.shots = QUANTUM_SHOTS
|
|
quantum_kernel = create_quantum_kernel(fm['map'], backend)
|
|
|
|
# Używamy quantum_kernel bezpośrednio z QSVC
|
|
qsvm = QSVC(quantum_kernel=quantum_kernel, C=fm['C'])
|
|
|
|
# Trenuj model
|
|
qsvm.fit(X_train_normalized, y_train)
|
|
|
|
# Testuj model
|
|
quantum_acc = qsvm.score(X_test_normalized, y_test)
|
|
|
|
# Oblicz macierz jądra na niewielkiej próbce
|
|
sample_size = min(20, len(X_train_normalized)) # Zmniejszone z 50 na 20
|
|
sample_indices = np.random.choice(len(X_train_normalized), sample_size, replace=False)
|
|
|
|
try:
|
|
kernel_matrix = quantum_kernel.evaluate(
|
|
X_train_normalized[sample_indices],
|
|
X_train_normalized[sample_indices]
|
|
)
|
|
# Weź tylko część rzeczywistą macierzy jądra dla analizy
|
|
kernel_matrix = np.real(kernel_matrix)
|
|
except Exception as e:
|
|
print(f" Błąd podczas obliczania macierzy jądra: {str(e)}")
|
|
kernel_matrix = np.eye(sample_size) # Macierz jednostkowa jako fallback
|
|
|
|
# Analizuj właściwości macierzy jądra
|
|
kernel_properties = {
|
|
'mean': np.mean(kernel_matrix),
|
|
'std': np.std(kernel_matrix),
|
|
'min': np.min(kernel_matrix),
|
|
'max': np.max(kernel_matrix)
|
|
}
|
|
|
|
return quantum_acc, kernel_properties
|
|
|
|
# Wykonaj z timeout
|
|
result = safe_quantum_operation(train_feature_mapping_model, QUANTUM_TIMEOUT)
|
|
|
|
if result is not None:
|
|
quantum_acc, kernel_properties = result
|
|
results[fm['name']] = {
|
|
'accuracy': quantum_acc,
|
|
'kernel_properties': kernel_properties,
|
|
'C': fm['C']
|
|
}
|
|
|
|
print(f" Dokładność: {quantum_acc:.4f}")
|
|
print(f" Właściwości macierzy jądra: śr={kernel_properties['mean']:.3f}, "
|
|
f"std={kernel_properties['std']:.3f}, min={kernel_properties['min']:.3f}, "
|
|
f"max={kernel_properties['max']:.3f}")
|
|
else:
|
|
results[fm['name']] = {
|
|
'accuracy': 0.0,
|
|
'kernel_properties': {'mean': 0, 'std': 0, 'min': 0, 'max': 0},
|
|
'C': fm['C']
|
|
}
|
|
print(f" Pominięte z powodu błędu/timeout")
|
|
|
|
# Porównaj z klasycznym SVM
|
|
quantum_accuracies = [results[fm['name']]['accuracy'] for fm in feature_maps]
|
|
if quantum_accuracies:
|
|
best_quantum_acc = max(quantum_accuracies)
|
|
best_map = feature_maps[np.argmax(quantum_accuracies)]['name']
|
|
|
|
print(f"\nNajlepsze mapowanie kwantowe: {best_map} (dokładność: {best_quantum_acc:.4f})")
|
|
print(f"Różnica względem klasycznego SVM: {best_quantum_acc - rbf_acc:.4f}")
|
|
else:
|
|
print("\nNie udało się wytrenować żadnego modelu kwantowego.")
|
|
|
|
return results
|
|
|
|
# Dodaj funkcje do kodowania amplitudowego
|
|
def prepare_data_for_amplitude_encoding(X, normalization='l2'):
|
|
"""
|
|
Przygotowuje dane do kodowania amplitudowego.
|
|
|
|
Args:
|
|
X: Dane wejściowe
|
|
normalization: Typ normalizacji ('l2', 'l1', lub 'min-max')
|
|
|
|
Returns:
|
|
Przygotowane dane
|
|
"""
|
|
X_prepared = X.copy()
|
|
|
|
# Normalizacja danych
|
|
if normalization == 'l2':
|
|
# Normalizacja L2 (suma kwadratów = 1)
|
|
norms = np.linalg.norm(X_prepared, axis=1, ord=2)
|
|
# Unikaj dzielenia przez zero
|
|
norms[norms == 0] = 1.0
|
|
X_prepared = X_prepared / norms[:, np.newaxis]
|
|
elif normalization == 'l1':
|
|
# Normalizacja L1 (suma wartości bezwzględnych = 1)
|
|
norms = np.linalg.norm(X_prepared, axis=1, ord=1)
|
|
# Unikaj dzielenia przez zero
|
|
norms[norms == 0] = 1.0
|
|
X_prepared = X_prepared / norms[:, np.newaxis]
|
|
elif normalization == 'min-max':
|
|
# Normalizacja min-max (wartości między 0 a 1)
|
|
for i in range(X_prepared.shape[0]):
|
|
min_val = np.min(X_prepared[i])
|
|
max_val = np.max(X_prepared[i])
|
|
if max_val > min_val:
|
|
X_prepared[i] = (X_prepared[i] - min_val) / (max_val - min_val)
|
|
# W przypadku gdy wszystkie wartości są równe
|
|
else:
|
|
X_prepared[i] = np.zeros_like(X_prepared[i])
|
|
# Dodatkowa normalizacja L2
|
|
norms = np.linalg.norm(X_prepared, axis=1, ord=2)
|
|
# Unikaj dzielenia przez zero
|
|
norms[norms == 0] = 1.0
|
|
X_prepared = X_prepared / norms[:, np.newaxis]
|
|
|
|
return X_prepared
|
|
|
|
def amplitude_kernel(x1, x2):
|
|
"""
|
|
Oblicza wartość jądra kwantowego dla dwóch wektorów przy użyciu kodowania amplitudowego.
|
|
|
|
Args:
|
|
x1, x2: Wektory cech
|
|
|
|
Returns:
|
|
Wartość jądra kwantowego
|
|
"""
|
|
# Iloczyn skalarny kwadrat (odpowiada nakładaniu się stanów kwantowych)
|
|
return np.abs(np.dot(x1, x2.conj()))**2
|
|
|
|
class AmplitudeKernel:
|
|
"""
|
|
Klasa implementująca jądro kwantowe z kodowaniem amplitudowym.
|
|
"""
|
|
def __init__(self, feature_dimension, normalization='l2'):
|
|
"""
|
|
Inicjalizacja jądra kwantowego z kodowaniem amplitudowym.
|
|
|
|
Parameters:
|
|
-----------
|
|
feature_dimension : int
|
|
Liczba wymiarów wektora cech
|
|
normalization : str
|
|
Typ normalizacji ('l2', 'l1', lub 'min-max')
|
|
"""
|
|
self.feature_dimension = feature_dimension
|
|
self.normalization = normalization
|
|
|
|
def evaluate(self, x1_vec, x2_vec):
|
|
"""
|
|
Oblicza macierz jądra kwantowego dla dwóch zbiorów wektorów.
|
|
|
|
Parameters:
|
|
-----------
|
|
x1_vec, x2_vec : ndarray
|
|
Zbiory wektorów cech
|
|
|
|
Returns:
|
|
--------
|
|
ndarray
|
|
Macierz jądra kwantowego
|
|
"""
|
|
# Przygotowanie danych
|
|
x1_prepared = prepare_data_for_amplitude_encoding(x1_vec, self.normalization)
|
|
x2_prepared = prepare_data_for_amplitude_encoding(x2_vec, self.normalization)
|
|
|
|
# Obliczenie macierzy jądra
|
|
kernel_matrix = np.zeros((x1_vec.shape[0], x2_vec.shape[0]))
|
|
for i in range(x1_vec.shape[0]):
|
|
for j in range(x2_vec.shape[0]):
|
|
kernel_matrix[i, j] = amplitude_kernel(x1_prepared[i], x2_prepared[j])
|
|
|
|
return kernel_matrix
|
|
|
|
def evaluate_lists(self, x1_vec, x2_vec):
|
|
"""
|
|
Oblicza macierz jądra kwantowego dla dwóch list wektorów.
|
|
|
|
Parameters:
|
|
-----------
|
|
x1_vec, x2_vec : list
|
|
Listy wektorów cech
|
|
|
|
Returns:
|
|
--------
|
|
ndarray
|
|
Macierz jądra kwantowego
|
|
"""
|
|
return self.evaluate(np.array(x1_vec), np.array(x2_vec))
|
|
|
|
# Główna funkcja eksperymentalna
|
|
def run_experiments():
|
|
# Przekierowanie wyjścia do pliku i konsoli
|
|
sys.stdout = Logger(OUTPUT_FILE)
|
|
|
|
# Informacje o środowisku
|
|
print("======= INFORMACJE O ŚRODOWISKU =======")
|
|
print(f"Data i czas rozpoczęcia: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
print(f"Wersja NumPy: {np.__version__}")
|
|
print(f"Wersja Pandas: {pd.__version__}")
|
|
print(f"Wersja scikit-learn: {sklearn.__version__}")
|
|
print(f"Wersja Qiskit: {qiskit.__version__}")
|
|
try:
|
|
import qiskit_machine_learning
|
|
print(f"Wersja qiskit-machine-learning: {qiskit_machine_learning.__version__}")
|
|
except:
|
|
print("Nie można określić wersji qiskit-machine-learning")
|
|
# print(f"Wersja dimod: {dimod.__version__}") # Nie używane
|
|
# print(f"Wersja neal: {neal.__version__}") # Nie używane
|
|
|
|
# Informacje o optymalizacjach
|
|
print("\n======= OPTYMALIZACJE BEZPIECZEŃSTWA =======")
|
|
print(f"Timeout operacji kwantowych: {QUANTUM_TIMEOUT} sekund")
|
|
print(f"Liczba shots: {QUANTUM_SHOTS}")
|
|
print(f"Maksymalny wymiar cech: {MAX_FEATURE_DIMENSION}")
|
|
print(f"Równoległe przetwarzanie: {'TAK' if USE_PARALLEL else 'NIE'}")
|
|
print(f"Interwał checkpoint: {CHECKPOINT_INTERVAL} iteracji")
|
|
|
|
# Wyświetl informacje o wybranych eksperymentach
|
|
print("\n======= WYBRANE EKSPERYMENTY =======")
|
|
print(f"Eksperyment 1 (Wpływ złożoności danych): {'TAK' if RUN_COMPLEXITY_EXPERIMENT else 'NIE'}")
|
|
print(f"Eksperyment 2 (Podzbiory genów): {'TAK' if RUN_GENE_SUBSETS_EXPERIMENT else 'NIE'}")
|
|
print(f"Eksperyment 3 (Mapowania cech): {'TAK' if RUN_FEATURE_MAPPINGS_EXPERIMENT else 'NIE'}")
|
|
|
|
# Wczytanie danych
|
|
print("\n======= WCZYTYWANIE DANYCH =======")
|
|
data_load_start_time = time.time()
|
|
data = pd.read_csv(DATA_FILE)
|
|
data_load_end_time = time.time()
|
|
data_load_time = data_load_end_time - data_load_start_time
|
|
print(f"Czas wczytywania danych: {data_load_time:.2f} sekund")
|
|
print(f"Wymiary danych: {data.shape}")
|
|
|
|
# Sprawdź strukturę danych
|
|
print("\n======= STRUKTURA DANYCH =======")
|
|
print(f"Kolumny w zbiorze danych: {data.columns.tolist()[:5]}...")
|
|
print(f"Typy danych w kolumnach:\n{data.dtypes.head()}")
|
|
print(f"Przykładowe dane:\n{data.head(2)}")
|
|
|
|
# Sprawdź wartości w kolumnie 'Grade'
|
|
print(f"\nUnikalne wartości w kolumnie 'Primary_Diagnosis': {data['Primary_Diagnosis'].unique()}")
|
|
print(f"Liczba próbek dla każdej klasy:\n{data['Primary_Diagnosis'].value_counts()}")
|
|
|
|
# Przygotowanie danych do eksperymentów
|
|
print("\n======= PRZYGOTOWANIE DANYCH DO EKSPERYMENTÓW =======")
|
|
gene_cols = [col for col in data.columns if col not in
|
|
['Grade', 'Project', 'Case_ID', 'Gender', 'Age_at_diagnosis',
|
|
'Primary_Diagnosis', 'Race']]
|
|
|
|
# Sprawdź typ danych w kolumnach genów
|
|
print(f"Przykładowe wartości w kolumnach genów: {data[gene_cols].iloc[0].values[:5]}")
|
|
|
|
# Konwertuj dane tekstowe na binarne (0/1)
|
|
X = data[gene_cols].copy()
|
|
for col in X.columns:
|
|
if X[col].dtype == 'object': # Jeśli kolumna zawiera dane tekstowe
|
|
# Zamień 'MUTATED' na 1, a wszystko inne na 0
|
|
X[col] = X[col].apply(lambda x: 1 if str(x).upper() == 'MUTATED' else 0)
|
|
|
|
# Użyj zmiennej docelowej określonej w konfiguracji
|
|
y = data[TARGET_VARIABLE]
|
|
|
|
# Wyświetl informacje o zmiennej docelowej
|
|
print(f"\nWybrana zmienna docelowa: {TARGET_VARIABLE}")
|
|
print(f"Unikalne wartości w kolumnie '{TARGET_VARIABLE}': {y.unique()}")
|
|
print(f"Liczba próbek dla każdej klasy:\n{y.value_counts()}")
|
|
|
|
# Oblicz liczbę mutacji dla każdego przypadku
|
|
mutation_counts = X.sum(axis=1)
|
|
print(f"Średnia liczba mutacji na przypadek: {mutation_counts.mean():.2f}")
|
|
print(f"Mediana liczby mutacji: {mutation_counts.median():.2f}")
|
|
print(f"Min/Max liczby mutacji: {mutation_counts.min():.0f}/{mutation_counts.max():.0f}")
|
|
|
|
# Inicjalizacja zmiennych do podsumowania
|
|
complexity_results = None
|
|
gene_subsets_results = None
|
|
feature_mappings_results = None
|
|
complexity_time = 0
|
|
gene_subsets_time = 0
|
|
feature_mappings_time = 0
|
|
|
|
# Eksperyment 1: Wpływ złożoności danych
|
|
if RUN_COMPLEXITY_EXPERIMENT:
|
|
print("\n======= EKSPERYMENT 1: WPŁYW ZŁOŻONOŚCI DANYCH =======")
|
|
print("UWAGA: Ten eksperyment może być bardzo czasochłonny!")
|
|
complexity_start_time = time.time()
|
|
complexity_results = experiment_complexity_impact(X, y, mutation_counts)
|
|
complexity_end_time = time.time()
|
|
complexity_time = complexity_end_time - complexity_start_time
|
|
print(f"\nCzas wykonania eksperymentu złożoności: {complexity_time:.2f} sekund ({complexity_time/60:.2f} minut)")
|
|
|
|
# Zapisz wyniki do pliku CSV
|
|
complexity_df = pd.DataFrame(complexity_results).T
|
|
complexity_df.to_csv(os.path.join(OUTPUT_DIR, 'wyniki_eksperymentu_zlozonosc.csv'))
|
|
print(f"Wyniki zapisane do pliku: {os.path.join(OUTPUT_DIR, 'wyniki_eksperymentu_zlozonosc.csv')}")
|
|
else:
|
|
print("\n======= EKSPERYMENT 1: POMINIĘTY (zbyt czasochłonny) =======")
|
|
|
|
# Eksperyment 2: Podzbiory genów
|
|
if RUN_GENE_SUBSETS_EXPERIMENT:
|
|
print("\n======= EKSPERYMENT 2: PODZBIORY GENÓW =======")
|
|
print("Używając uproszczonych map cech z timeout'ami...")
|
|
gene_subsets_start_time = time.time()
|
|
gene_subsets_results = experiment_gene_subsets(data)
|
|
gene_subsets_end_time = time.time()
|
|
gene_subsets_time = gene_subsets_end_time - gene_subsets_start_time
|
|
print(f"\nCzas wykonania eksperymentu podzbiorów genów: {gene_subsets_time:.2f} sekund ({gene_subsets_time/60:.2f} minut)")
|
|
|
|
# Zapisz wyniki do pliku CSV
|
|
gene_subsets_df = pd.DataFrame(gene_subsets_results).T
|
|
gene_subsets_df.to_csv(os.path.join(OUTPUT_DIR, 'wyniki_eksperymentu_geny.csv'))
|
|
print(f"Wyniki zapisane do pliku: {os.path.join(OUTPUT_DIR, 'wyniki_eksperymentu_geny.csv')}")
|
|
else:
|
|
print("\n======= EKSPERYMENT 2: POMINIĘTY =======")
|
|
|
|
# Eksperyment 3: Mapowania cech
|
|
if RUN_FEATURE_MAPPINGS_EXPERIMENT:
|
|
print("\n======= EKSPERYMENT 3: MAPOWANIA CECH =======")
|
|
print("UWAGA: Ten eksperyment może być bardzo czasochłonny!")
|
|
# Przygotuj dane do eksperymentu mapowania cech
|
|
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE)
|
|
scaler = StandardScaler()
|
|
X_train_scaled = scaler.fit_transform(X_train)
|
|
X_test_scaled = scaler.transform(X_test)
|
|
max_components = min(PCA_COMPONENTS, MAX_FEATURE_DIMENSION, X_train_scaled.shape[1])
|
|
pca = PCA(n_components=max_components)
|
|
X_train_reduced = pca.fit_transform(X_train_scaled)
|
|
X_test_reduced = pca.transform(X_test_scaled)
|
|
|
|
print(f"Redukcja wymiarowości: {X_train_scaled.shape[1]} -> {max_components} wymiarów")
|
|
|
|
feature_mappings_start_time = time.time()
|
|
feature_mappings_results = experiment_feature_mappings(X_train_reduced, X_test_reduced, y_train, y_test)
|
|
feature_mappings_end_time = time.time()
|
|
feature_mappings_time = feature_mappings_end_time - feature_mappings_start_time
|
|
print(f"\nCzas wykonania eksperymentu mapowania cech: {feature_mappings_time:.2f} sekund ({feature_mappings_time/60:.2f} minut)")
|
|
|
|
# Zapisz wyniki do pliku CSV
|
|
feature_mappings_df = pd.DataFrame({k: v['accuracy'] for k, v in feature_mappings_results.items()}, index=['accuracy']).T
|
|
feature_mappings_df['mean_kernel'] = [v['kernel_properties']['mean'] for k, v in feature_mappings_results.items()]
|
|
feature_mappings_df['std_kernel'] = [v['kernel_properties']['std'] for k, v in feature_mappings_results.items()]
|
|
feature_mappings_df.to_csv(os.path.join(OUTPUT_DIR, 'wyniki_eksperymentu_mapowania.csv'))
|
|
print(f"Wyniki zapisane do pliku: {os.path.join(OUTPUT_DIR, 'wyniki_eksperymentu_mapowania.csv')}")
|
|
else:
|
|
print("\n======= EKSPERYMENT 3: POMINIĘTY (problematyczny) =======")
|
|
|
|
# Podsumowanie eksperymentów
|
|
print("\n======= PODSUMOWANIE EKSPERYMENTÓW =======")
|
|
total_time = complexity_time + gene_subsets_time + feature_mappings_time
|
|
print(f"Całkowity czas wykonania eksperymentów: {total_time:.2f} sekund ({total_time/60:.2f} minut)")
|
|
|
|
if complexity_results:
|
|
print("\nWyniki eksperymentu złożoności danych:")
|
|
complexity_df = pd.DataFrame(complexity_results).T
|
|
print(complexity_df)
|
|
|
|
if gene_subsets_results:
|
|
print("\nWyniki eksperymentu podzbiorów genów:")
|
|
gene_subsets_df = pd.DataFrame(gene_subsets_results).T
|
|
print(gene_subsets_df)
|
|
|
|
if feature_mappings_results:
|
|
print("\nWyniki eksperymentu mapowania cech:")
|
|
feature_mappings_df = pd.DataFrame({k: v['accuracy'] for k, v in feature_mappings_results.items()}, index=['accuracy']).T
|
|
print(feature_mappings_df)
|
|
|
|
print(f"\nWszystkie wyniki zostały zapisane w katalogu: {OUTPUT_DIR}")
|
|
|
|
# Zamknięcie pliku wyjściowego
|
|
if hasattr(sys.stdout, 'log') and not sys.stdout.log.closed:
|
|
sys.stdout.log.close()
|
|
sys.stdout = sys.__stdout__
|
|
print(f"Wyniki zostały zapisane do pliku: {OUTPUT_FILE}")
|
|
|
|
# Uruchomienie eksperymentów
|
|
if __name__ == "__main__":
|
|
run_experiments() |