Upload files to "/"

This commit is contained in:
Michał Kasprowicz 2025-08-21 06:32:39 +00:00
parent fd8e9a8936
commit b507fdc131
5 changed files with 1668 additions and 0 deletions

436
qsvm.py Normal file
View File

@ -0,0 +1,436 @@
import numpy as np
import pandas as pd
import os
import sys
import time
from datetime import datetime
import json
import gc
import signal
import multiprocessing
from multiprocessing import Pool
# Import bibliotek kwantowych
from qiskit import Aer
from qiskit.circuit.library import ZZFeatureMap, PauliFeatureMap, EfficientSU2
from qiskit_machine_learning.kernels import QuantumKernel
from qiskit_machine_learning.algorithms import QSVC
import dimod
# Dodanie zalecanego zamiennika dla ZZFeatureMap
from qiskit.circuit import QuantumCircuit, Parameter
from qiskit.circuit.library.data_preparation import ZFeatureMap
# Dodanie bibliotek do kodowania amplitudowego
from qiskit.circuit import QuantumCircuit
from qiskit.extensions import Initialize
import scipy.linalg as la
# Dodanie biblioteki UMAP
import umap
# Maksymalny czas trwania eksperymentu
MAX_EXECUTION_TIME = 24 * 60 * 60 * 7 * 3
def timeout_handler(signum, frame):
print("\n\n======= PRZEKROCZONO MAKSYMALNY CZAS WYKONANIA =======")
print(f"Eksperyment został przerwany po {MAX_EXECUTION_TIME/3600:.1f} godzinach.")
sys.exit(1)
# Ustawienie obsługi sygnału
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(MAX_EXECUTION_TIME)
# ----------------- CZĘŚĆ 0: PARAMETRY KONFIGURACYJNE -----------------
# Parametry danych
DATA_FILES = [
'dane/TCGA_GBM_LGG_Mutations_all.csv',
'dane/zaszumione_rozszerzone/TCGA_GBM_LGG_Mutations_noise_1percent_added.csv',
'dane/zaszumione_rozszerzone/TCGA_GBM_LGG_Mutations_noise_5percent_added.csv',
'dane/zaszumione_rozszerzone/TCGA_GBM_LGG_Mutations_noise_10percent_added.csv',
'dane/zaszumione_rozszerzone/TCGA_GBM_LGG_Mutations_noise_15percent_added.csv',
'dane/zaszumione_rozszerzone/TCGA_GBM_LGG_Mutations_noise_20percent_added.csv',
'dane/zaszumione/TCGA_GBM_LGG_Mutations_noise_1percent_substituted.csv',
'dane/zaszumione/TCGA_GBM_LGG_Mutations_noise_5percent_substituted.csv',
'dane/zaszumione/TCGA_GBM_LGG_Mutations_noise_10percent_substituted.csv',
'dane/zaszumione/TCGA_GBM_LGG_Mutations_noise_15percent_substituted.csv',
'dane/zaszumione/TCGA_GBM_LGG_Mutations_noise_20percent_substituted.csv'
]
TEST_SIZE = 0.3
RANDOM_STATE = 42
# Parametry redukcji wymiarowości
USE_PCA = True
USE_TSNE = False
USE_UMAP = False
PCA_COMPONENTS = 14
TSNE_COMPONENTS = 3
TSNE_PERPLEXITY = 100
TSNE_LEARNING_RATE = 50
TSNE_MAX_ITER = 1000
UMAP_COMPONENTS = 14
UMAP_NEIGHBORS = 15
UMAP_MIN_DIST = 0.8
UMAP_METRIC = 'euclidean'
EVALUATE_SILHOUETTE = False
OPTIMAL_SILHOUETTE_SCORE = 0.1964
# Wybór eksperymentów do przeprowadzenia
RUN_CLASSIC_SVM = True
RUN_QUANTUM_SVM = True
RUN_HYBRID_APPROACH = True
# Parametry klasycznego SVM
SVM_PARAM_GRID = {
'C': [0.1, 1, 10, 100],
'gamma': ['scale', 'auto', 0.1, 0.01],
'kernel': ['linear', 'rbf', 'poly']
}
SVM_CV = 5
# Parametry kwantowego SVM
BACKEND_NAME = 'qasm_simulator'
C_VALUES = [0.1, 1.0, 10.0]
QSVM_CV = 10
# Parametry wyżarzania kwantowego
NUM_READS = 100
QUBO_PENALTY = 10.0
# Parametry analizy cech
IMPORTANCE_THRESHOLD = 0.01
# Parametry wyjściowe
OUTPUT_DIR = f'wyniki/2025-08-04-dim_reduction-{QSVM_CV}-fold'
# Parametry IBM Quantum Cloud
USE_IBM_QUANTUM = False
IBM_BACKEND = 'qasm_simulator'
IBM_REAL_BACKEND = 'qasm_simulator'
IBM_TOKEN = None
IBM_INSTANCE = None
IBM_MAX_SHOTS = 1024
IBM_OPTIMIZATION_LEVEL = 1
IBM_RESILIENCE_LEVEL = 1
# Upewnij się, że katalog wyjściowy istnieje
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
# Przekierowanie wyjścia do pliku i konsoli
class Logger:
def __init__(self, filename):
self.terminal = sys.stdout
self.log = open(filename, 'w', encoding='utf-8')
def write(self, message):
self.terminal.write(message)
self.log.write(message)
self.log.flush()
def flush(self):
self.terminal.flush()
self.log.flush()
def close(self):
self.log.close()
# Funkcja do zapisywania szczegółowych metryk
def save_metrics(y_true, y_pred, model_name):
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, accuracy_score, precision_score, recall_score, f1_score
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)
try:
roc_auc = roc_auc_score(y_true, y_pred)
except:
roc_auc = "N/A"
print(f"\nSzczegółowe metryki dla modelu {model_name}:")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC: {roc_auc}")
cm = confusion_matrix(y_true, y_pred)
print("\nMacierz pomyłek:")
print(cm)
return {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'roc_auc': roc_auc,
'confusion_matrix': cm.tolist()
}
# Funkcja do zapisywania wyników pośrednich
def save_results_cache(results_dict, cache_file):
with open(cache_file, 'w') as f:
json.dump(results_dict, f)
# Funkcja do wczytywania wyników pośrednich
def load_results_cache(cache_file):
if os.path.exists(cache_file):
try:
with open(cache_file, 'r') as f:
return json.load(f)
except:
print("Nie udało się wczytać pliku cache. Tworzenie nowego.")
return {
'quantum_results': [],
'quantum_times': {},
'completed_feature_maps': [],
'hybrid_scores': {},
'hybrid_eval_times': {}
}
# Funkcja do inicjalizacji lokalnego symulatora
def initialize_ibm_quantum():
print("\n======= INICJALIZACJA LOKALNEGO SYMULATORA =======")
print("IBM Quantum Cloud wyłączone - używanie lokalnego symulatora")
try:
backend = Aer.get_backend('qasm_simulator')
print("✓ Zainicjalizowano lokalny symulator Qiskit Aer")
print(f"✓ Backend: {backend.name}")
return None, backend, True
except Exception as e:
print(f"BŁĄD podczas inicjalizacji lokalnego symulatora: {str(e)}")
return None, None, False
# Funkcja do przygotowania danych
def prepare_data(data_file):
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics import silhouette_score
print(f"\n======= PRZYGOTOWANIE DANYCH =======")
start_time_data = time.time()
# Wczytanie danych
data = pd.read_csv(data_file)
print(f"Wymiary oryginalnych danych: {data.shape}")
# Wydzielenie zmiennej docelowej
y = data['Primary_Diagnosis']
print(f"Unikalne wartości zmiennej docelowej: {y.unique()}")
print(f"Rozkład klas: {y.value_counts().to_dict()}")
# Usunięcie kolumn identyfikacyjnych
id_columns = ['Project', 'Case_ID']
data_processed = data.drop(id_columns + ['Grade'], axis=1)
print(f"Wymiary danych po usunięciu kolumn identyfikacyjnych: {data_processed.shape}")
# Przekształcenie kolumn kategorycznych na binarne
print("Przekształcanie kolumn kategorycznych na binarne...")
categorical_columns = data_processed.select_dtypes(include=['object']).columns.tolist()
print(f"Kolumny kategoryczne do przekształcenia: {categorical_columns}")
# Przekształcenie każdej kolumny kategorycznej
for col in categorical_columns:
unique_values = data_processed[col].unique()
print(f"Kolumna {col} zawiera wartości: {unique_values}")
if len(unique_values) == 2:
if set(unique_values) == {'Yes', 'No'}:
data_processed[col] = data_processed[col].map({'Yes': 1, 'No': 0})
elif set(unique_values) == {'Male', 'Female'}:
data_processed[col] = data_processed[col].map({'Male': 1, 'Female': 0})
elif set(unique_values) == {'GBM', 'LGG'}:
data_processed[col] = data_processed[col].map({'GBM': 1, 'LGG': 0})
else:
data_processed[col] = pd.factorize(data_processed[col])[0]
else:
try:
data_processed[col] = pd.to_numeric(data_processed[col], errors='raise')
print(f"Kolumna {col} została przekształcona na liczbową.")
except:
print(f"Kolumna {col} zostanie zakodowana jako one-hot.")
dummies = pd.get_dummies(data_processed[col], prefix=col, drop_first=True)
data_processed = pd.concat([data_processed, dummies], axis=1)
data_processed.drop(col, axis=1, inplace=True)
# Sprawdzenie, czy wszystkie dane są numeryczne
non_numeric = data_processed.select_dtypes(include=['object']).columns.tolist()
if non_numeric:
print(f"UWAGA: Pozostały kolumny nieliczbowe: {non_numeric}")
print("Usuwanie pozostałych kolumn nieliczbowych...")
data_processed = data_processed.drop(non_numeric, axis=1)
print(f"Wymiary danych po przekształceniu: {data_processed.shape}")
# Wypełnienie brakujących wartości
if data_processed.isnull().sum().sum() > 0:
print("Wypełnianie brakujących wartości...")
data_processed.fillna(data_processed.mean(), inplace=True)
# Przygotowanie danych do modelowania
X = data_processed.values
print(f"Wymiary danych wejściowych: {X.shape}")
# Przetwarzanie cech
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
print(f"Wymiary danych po skalowaniu: {X_scaled.shape}")
# Redukcja wymiarowości
print("\n======= REDUKCJA WYMIAROWOŚCI =======")
# Przygotowanie różnych wersji zredukowanych danych
X_reduced_versions = {}
if USE_PCA:
pca_components = min(PCA_COMPONENTS, X_scaled.shape[1])
print(f"Liczba komponentów PCA: {pca_components}")
pca = PCA(n_components=pca_components)
X_reduced_pca = pca.fit_transform(X_scaled)
print(f"Wymiary danych po redukcji PCA: {X_reduced_pca.shape}")
print(f"Wyjaśniona wariancja: {sum(pca.explained_variance_ratio_):.4f}")
X_reduced_versions['pca'] = X_reduced_pca
if USE_TSNE:
print(f"\nStosowanie t-SNE na danych skalowanych...")
tsne = TSNE(
n_components=TSNE_COMPONENTS,
perplexity=TSNE_PERPLEXITY,
learning_rate=TSNE_LEARNING_RATE,
n_iter=TSNE_MAX_ITER,
random_state=RANDOM_STATE
)
tsne_start_time = time.time()
X_reduced_tsne = tsne.fit_transform(X_scaled)
tsne_end_time = time.time()
tsne_time = tsne_end_time - tsne_start_time
print(f"Wymiary danych po redukcji t-SNE: {X_reduced_tsne.shape}")
print(f"Czas wykonania t-SNE: {tsne_time:.2f} sekund")
if EVALUATE_SILHOUETTE:
try:
silhouette_avg = silhouette_score(X_reduced_tsne, y)
print(f"Silhouette Score dla t-SNE: {silhouette_avg:.4f}")
except Exception as e:
print(f"Nie udało się obliczyć Silhouette Score: {str(e)}")
X_reduced_versions['tsne'] = X_reduced_tsne
if USE_UMAP:
print(f"\nStosowanie UMAP na danych skalowanych...")
umap_reducer = umap.UMAP(
n_components=UMAP_COMPONENTS,
n_neighbors=UMAP_NEIGHBORS,
min_dist=UMAP_MIN_DIST,
metric=UMAP_METRIC,
random_state=RANDOM_STATE
)
umap_start_time = time.time()
X_reduced_umap = umap_reducer.fit_transform(X_scaled)
umap_end_time = time.time()
umap_time = umap_end_time - umap_start_time
print(f"Wymiary danych po redukcji UMAP: {X_reduced_umap.shape}")
print(f"Czas wykonania UMAP: {umap_time:.2f} sekund")
if EVALUATE_SILHOUETTE:
try:
silhouette_avg_umap = silhouette_score(X_reduced_umap, y)
print(f"Silhouette Score dla UMAP: {silhouette_avg_umap:.4f}")
except Exception as e:
print(f"Nie udało się obliczyć Silhouette Score dla UMAP: {str(e)}")
X_reduced_versions['umap'] = X_reduced_umap
# Wybór domyślnej wersji zredukowanych danych
if 'pca' in X_reduced_versions:
X_reduced = X_reduced_versions['pca']
print("Używanie danych zredukowanych przez PCA jako domyślnych.")
elif 'umap' in X_reduced_versions:
X_reduced = X_reduced_versions['umap']
print("Używanie danych zredukowanych przez UMAP jako domyślnych.")
elif 'tsne' in X_reduced_versions:
X_reduced = X_reduced_versions['tsne']
print("Używanie danych zredukowanych przez t-SNE jako domyślnych.")
else:
X_reduced = X_scaled
print("Nie wybrano żadnej metody redukcji wymiarowości. Używanie oryginalnych danych skalowanych.")
# Podział na zbiory treningowy i testowy
X_train_reduced, X_test_reduced, y_train, y_test = train_test_split(
X_reduced, y, test_size=TEST_SIZE, random_state=RANDOM_STATE
)
# Podział dla oryginalnych danych
X_train, X_test, _, _ = train_test_split(X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE)
print(f"Wymiary zbioru treningowego: {X_train_reduced.shape}")
print(f"Wymiary zbioru testowego: {X_test_reduced.shape}")
end_time_data = time.time()
data_preparation_time = end_time_data - start_time_data
print(f"\nCzas przygotowania danych: {data_preparation_time:.2f} sekund")
return {
'X_train': X_train,
'X_test': X_test,
'X_train_reduced': X_train_reduced,
'X_test_reduced': X_test_reduced,
'y_train': y_train,
'y_test': y_test,
'data_processed': data_processed,
'preparation_time': data_preparation_time
}
def run_experiment_parallel(exp_file):
"""Uruchom pojedynczy eksperyment"""
if exp_file == 'qsvm1_zz.py':
import qsvm1_zz
return qsvm1_zz.run_experiment()
elif exp_file == 'qsvm2_pauli.py':
import qsvm2_pauli
return qsvm2_pauli.run_experiment()
elif exp_file == 'qsvm3_z.py':
import qsvm3_z
return qsvm3_z.run_experiment()
elif exp_file == 'qsvm4_amplitude.py':
import qsvm4_amplitude
return qsvm4_amplitude.run_experiment()
elif exp_file == 'qsvm5_hybrid.py':
import qsvm5_hybrid
return qsvm5_hybrid.run_experiment()
else:
print(f"Nieznany eksperyment: {exp_file}")
return None
def run_all_experiments_parallel():
"""Uruchom wszystkie eksperymenty równolegle"""
experiment_files = [
'qsvm1_zz.py',
'qsvm2_pauli.py',
'qsvm3_z.py',
'qsvm4_amplitude.py',
'qsvm5_hybrid.py'
]
# Użyj 5 procesów (jeden na eksperyment)
with Pool(5) as p:
results = p.map(run_experiment_parallel, experiment_files)
return results
if __name__ == "__main__":
print("======= KONTROLER EKSPERYMENTÓW QSVM =======")
print(f"CPU cores: {multiprocessing.cpu_count()}")
print(f"Uruchamianie 5 eksperymentów równolegle...")
results = run_all_experiments_parallel()
print("======= WSZYSTKIE EKSPERYMENTY ZAKOŃCZONE =======")

316
qsvm1_zz.py Normal file
View File

@ -0,0 +1,316 @@
import numpy as np
import pandas as pd
import os
import sys
import time
from datetime import datetime
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, accuracy_score, precision_score, recall_score, f1_score
from sklearn.inspection import permutation_importance
from sklearn.decomposition import PCA
import json
import gc
# Import bibliotek kwantowych
from qiskit import Aer
from qiskit.circuit.library import ZZFeatureMap
from qiskit_machine_learning.kernels import QuantumKernel
from qiskit_machine_learning.algorithms import QSVC
# Import funkcji z głównego modułu
import qsvm
def run_experiment():
"""
Eksperyment 1: ZZ1 i ZZ2 Feature Maps
Testuje klasyczny SVM i kwantowy SVM z mapami cech ZZ1 i ZZ2
"""
print("======= EKSPERYMENT 1: ZZ1 i ZZ2 FEATURE MAPS =======")
# Konfiguracja eksperymentu
FEATURE_MAPS = {
'ZZ1': {'reps': 1, 'enabled': True},
'ZZ2': {'reps': 2, 'enabled': True}
}
# Dla każdego pliku danych
for data_file in qsvm.DATA_FILES:
if not os.path.exists(data_file):
print(f"Pominięto {data_file} - plik nie istnieje")
continue
print(f"\n======= PRZETWARZANIE PLIKU: {data_file} =======")
# Utwórz nazwę pliku wyjściowego
file_base_name = os.path.basename(data_file).split('.')[0]
output_file = os.path.join(qsvm.OUTPUT_DIR, f'wyniki_zz_{file_base_name}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')
# Utwórz plik cache
cache_file = os.path.join(qsvm.OUTPUT_DIR, f'qsvm_zz_cache_{file_base_name}.json')
# Przekierowanie wyjścia
logger = qsvm.Logger(output_file)
sys.stdout = logger
try:
# Przygotowanie danych
data_dict = qsvm.prepare_data(data_file)
X_train = data_dict['X_train']
X_test = data_dict['X_test']
X_train_reduced = data_dict['X_train_reduced']
X_test_reduced = data_dict['X_test_reduced']
y_train = data_dict['y_train']
y_test = data_dict['y_test']
data_processed = data_dict['data_processed']
# Inicjalizacja backendu
ibm_service, ibm_backend, ibm_success = qsvm.initialize_ibm_quantum()
# ----------------- KLASYCZNY SVM -----------------
if qsvm.RUN_CLASSIC_SVM:
print("\n======= KLASYCZNY SVM (BASELINE) =======")
start_time_classic = time.time()
# Trenowanie modelu
grid = GridSearchCV(SVC(), qsvm.SVM_PARAM_GRID, cv=qsvm.SVM_CV, scoring='accuracy')
grid.fit(X_train, y_train)
print("Najlepsze parametry klasycznego SVM:", grid.best_params_)
print("Dokładność klasycznego SVM:", grid.best_score_)
# Ewaluacja modelu
classic_pred = grid.predict(X_test)
print("Raport klasyfikacji (klasyczny SVM):")
print(classification_report(y_test, classic_pred, zero_division=0))
# Zapisz szczegółowe metryki
classic_metrics = qsvm.save_metrics(y_test, classic_pred, "Klasyczny SVM")
end_time_classic = time.time()
classic_svm_time = end_time_classic - start_time_classic
print(f"\nCzas trenowania i ewaluacji klasycznego SVM: {classic_svm_time:.2f} sekund")
else:
print("\n======= KLASYCZNY SVM (BASELINE) - POMINIĘTY =======")
classic_svm_time = 0
classic_metrics = None
# ----------------- KWANTOWY SVM -----------------
if qsvm.RUN_QUANTUM_SVM:
print("\n======= KWANTOWY SVM Z ZZ FEATURE MAPS =======")
start_time_quantum = time.time()
# Wczytaj cache
cache = qsvm.load_results_cache(cache_file)
quantum_results = cache.get('quantum_results', [])
# Tworzenie map cech
feature_maps = []
feature_dimension = X_train_reduced.shape[1]
for name, config in FEATURE_MAPS.items():
if config['enabled']:
feature_map = ZZFeatureMap(feature_dimension=feature_dimension, reps=config['reps'])
feature_maps.append({'name': name, 'map': feature_map})
print(f"Testowanie {len(feature_maps)} map cech: {[fm['name'] for fm in feature_maps]}")
# Testowanie każdej mapy cech
for fm in feature_maps:
for C in qsvm.C_VALUES:
# Sprawdź cache
already_tested = False
for name, c_val, _ in quantum_results:
if name == fm['name'] and c_val == C:
already_tested = True
break
if already_tested:
print(f"Pomijanie już przetestowanej kombinacji: {fm['name']}, C={C}")
continue
fm_start_time = time.time()
try:
print(f"Testowanie {fm['name']} z C={C}...")
# Debugowanie danych
print(f" Wymiary danych: X_train_reduced {X_train_reduced.shape}")
print(f" Sprawdzenie NaN: {np.isnan(X_train_reduced).sum()}")
print(f" Sprawdzenie inf: {np.isinf(X_train_reduced).sum()}")
print(f" Zakres danych: [{X_train_reduced.min():.4f}, {X_train_reduced.max():.4f}]")
# Utworzenie quantum kernel z debugowaniem
quantum_kernel = QuantumKernel(
feature_map=fm['map'],
quantum_instance=ibm_backend
)
# Test quantum kernel
print(f" Testowanie quantum kernel...")
try:
test_kernel = quantum_kernel.evaluate(X_train_reduced[:2], X_train_reduced[:2])
print(f" Test kernel shape: {test_kernel.shape}")
print(f" Test kernel range: [{test_kernel.min():.4f}, {test_kernel.max():.4f}]")
if np.isnan(test_kernel).any() or np.isinf(test_kernel).any():
print(f" BŁĄD: Kernel zawiera NaN lub inf!")
continue
except Exception as e:
print(f" BŁĄD testowania kernel: {str(e)}")
continue
# Utworzenie SVM z niestandardowym jądrem i debugowaniem
def custom_kernel(X, Y):
try:
kernel_matrix = quantum_kernel.evaluate(X, Y)
# Sprawdź czy kernel jest poprawny
if np.isnan(kernel_matrix).any() or np.isinf(kernel_matrix).any():
print(f" BŁĄD: Kernel matrix zawiera NaN lub inf!")
return np.eye(len(X), len(Y)) # Fallback
return kernel_matrix
except Exception as e:
print(f" BŁĄD kernel evaluation: {str(e)}")
return np.eye(len(X), len(Y)) # Fallback
qsvm_model = SVC(kernel=custom_kernel, C=C, random_state=qsvm.RANDOM_STATE)
# Walidacja krzyżowa z debugowaniem
cv_start_time = time.time()
scores = []
kf = KFold(n_splits=qsvm.QSVM_CV, shuffle=True, random_state=qsvm.RANDOM_STATE)
for fold, (train_idx, val_idx) in enumerate(kf.split(X_train_reduced)):
X_cv_train, X_cv_val = X_train_reduced[train_idx], X_train_reduced[val_idx]
y_cv_train, y_cv_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
print(f" Fold {fold+1}/{qsvm.QSVM_CV}: train {X_cv_train.shape}, val {X_cv_val.shape}")
try:
qsvm_model.fit(X_cv_train, y_cv_train)
score = qsvm_model.score(X_cv_val, y_cv_val)
scores.append(score)
print(f" Fold {fold+1} score: {score:.4f}")
except Exception as e:
print(f" BŁĄD fold {fold+1}: {str(e)}")
scores.append(0.0) # Fallback
if len(scores) > 0:
mean_score = np.mean(scores)
std_score = np.std(scores)
print(f" Wszystkie scores: {scores}")
print(f" Mean score: {mean_score:.4f} ± {std_score:.4f}")
else:
mean_score = 0.0
print(f" BŁĄD: Brak poprawnych scores!")
cv_end_time = time.time()
cv_time = cv_end_time - cv_start_time
quantum_results.append((fm['name'], C, mean_score))
fm_end_time = time.time()
fm_time = fm_end_time - fm_start_time
print(f"Dokładność kwantowego SVM z {fm['name']}, C={C}: {mean_score:.4f} (czas: {fm_time:.2f} s)")
# Zapisz wyniki pośrednie
cache['quantum_results'] = quantum_results
qsvm.save_results_cache(cache, cache_file)
except Exception as e:
print(f"BŁĄD dla {fm['name']}, C={C}: {str(e)}")
# Dodaj fallback wynik
quantum_results.append((fm['name'], C, 0.0))
cache['quantum_results'] = quantum_results
qsvm.save_results_cache(cache, cache_file)
continue
# Znajdź najlepszy model kwantowy
if quantum_results:
best_qsvm = max(quantum_results, key=lambda x: x[2])
print(f"\nNajlepszy kwantowy SVM: {best_qsvm[0]} z C={best_qsvm[1]}, dokładność: {best_qsvm[2]:.4f}")
# Ewaluacja najlepszego modelu
best_feature_map = None
for fm in feature_maps:
if fm['name'] == best_qsvm[0]:
best_feature_map = fm['map']
break
if best_feature_map:
quantum_kernel_best = QuantumKernel(
feature_map=best_feature_map,
quantum_instance=ibm_backend
)
qsvm_best = SVC(kernel=quantum_kernel_best.evaluate, C=best_qsvm[1])
qsvm_best.fit(X_train_reduced, y_train)
quantum_pred = qsvm_best.predict(X_test_reduced)
print("Raport klasyfikacji (najlepszy kwantowy SVM):")
print(classification_report(y_test, quantum_pred, zero_division=0))
quantum_metrics = qsvm.save_metrics(y_test, quantum_pred, f"Kwantowy SVM {best_qsvm[0]}")
else:
print("Nie udało się wytrenować żadnego modelu kwantowego.")
quantum_metrics = None
end_time_quantum = time.time()
quantum_svm_time = end_time_quantum - start_time_quantum
print(f"\nCałkowity czas dla kwantowego SVM: {quantum_svm_time:.2f} sekund")
else:
print("\n======= KWANTOWY SVM - POMINIĘTY =======")
quantum_svm_time = 0
quantum_metrics = None
# ----------------- ANALIZA WYNIKÓW -----------------
print("\n======= PORÓWNANIE WYNIKÓW =======")
if classic_metrics:
print(f"Klasyczny SVM: {classic_metrics['accuracy']:.4f}")
if quantum_metrics:
print(f"Kwantowy SVM: {quantum_metrics['accuracy']:.4f}")
# Analiza znaczenia cech (tylko dla klasycznego SVM)
if qsvm.RUN_CLASSIC_SVM and classic_metrics:
print("\n======= ANALIZA ZNACZENIA CECH =======")
importance_start_time = time.time()
result = permutation_importance(grid.best_estimator_, X_test, y_test, n_repeats=10, random_state=qsvm.RANDOM_STATE)
important_features = []
feature_columns = list(data_processed.columns)
for i in range(len(feature_columns)):
if result.importances_mean[i] > qsvm.IMPORTANCE_THRESHOLD:
important_features.append((feature_columns[i], result.importances_mean[i]))
print("Najważniejsze cechy dla klasyfikacji:")
for feature, importance in sorted(important_features, key=lambda x: x[1], reverse=True):
print(f" {feature}: {importance:.4f}")
importance_end_time = time.time()
importance_time = importance_end_time - importance_start_time
print(f"\nCzas analizy znaczenia cech: {importance_time:.2f} sekund")
# Podsumowanie
print("\n======= PODSUMOWANIE EKSPERYMENTU ZZ =======")
print(f"Data i czas zakończenia: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
total_time = time.time() - data_dict['preparation_time']
print(f"Całkowity czas eksperymentu: {total_time:.2f} sekund")
except Exception as e:
print(f"BŁĄD podczas przetwarzania {data_file}: {str(e)}")
finally:
# Zamknięcie pliku wyjściowego
logger.close()
sys.stdout = logger.terminal
# Czyszczenie pamięci
gc.collect()
print("\n======= EKSPERYMENT 1 ZAKOŃCZONY =======")
if __name__ == "__main__":
run_experiment()

316
qsvm2_pauli.py Normal file
View File

@ -0,0 +1,316 @@
import numpy as np
import pandas as pd
import os
import sys
import time
from datetime import datetime
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, accuracy_score, precision_score, recall_score, f1_score
from sklearn.inspection import permutation_importance
from sklearn.decomposition import PCA
import json
import gc
# Import bibliotek kwantowych
from qiskit import Aer
from qiskit.circuit.library import PauliFeatureMap
from qiskit_machine_learning.kernels import QuantumKernel
from qiskit_machine_learning.algorithms import QSVC
# Import funkcji z głównego modułu
import qsvm
def run_experiment():
"""
Eksperyment 2: Pauli1 i Pauli2 Feature Maps
Testuje klasyczny SVM i kwantowy SVM z mapami cech Pauli1 i Pauli2
"""
print("======= EKSPERYMENT 2: PAULI1 i PAULI2 FEATURE MAPS =======")
# Konfiguracja eksperymentu
FEATURE_MAPS = {
'Pauli1': {'reps': 1, 'enabled': True},
'Pauli2': {'reps': 2, 'enabled': True}
}
# Dla każdego pliku danych
for data_file in qsvm.DATA_FILES:
if not os.path.exists(data_file):
print(f"Pominięto {data_file} - plik nie istnieje")
continue
print(f"\n======= PRZETWARZANIE PLIKU: {data_file} =======")
# Utwórz nazwę pliku wyjściowego
file_base_name = os.path.basename(data_file).split('.')[0]
output_file = os.path.join(qsvm.OUTPUT_DIR, f'wyniki_pauli_{file_base_name}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')
# Utwórz plik cache
cache_file = os.path.join(qsvm.OUTPUT_DIR, f'qsvm_pauli_cache_{file_base_name}.json')
# Przekierowanie wyjścia
logger = qsvm.Logger(output_file)
sys.stdout = logger
try:
# Przygotowanie danych
data_dict = qsvm.prepare_data(data_file)
X_train = data_dict['X_train']
X_test = data_dict['X_test']
X_train_reduced = data_dict['X_train_reduced']
X_test_reduced = data_dict['X_test_reduced']
y_train = data_dict['y_train']
y_test = data_dict['y_test']
data_processed = data_dict['data_processed']
# Inicjalizacja backendu
ibm_service, ibm_backend, ibm_success = qsvm.initialize_ibm_quantum()
# ----------------- KLASYCZNY SVM -----------------
if qsvm.RUN_CLASSIC_SVM:
print("\n======= KLASYCZNY SVM (BASELINE) =======")
start_time_classic = time.time()
# Trenowanie modelu
grid = GridSearchCV(SVC(), qsvm.SVM_PARAM_GRID, cv=qsvm.SVM_CV, scoring='accuracy')
grid.fit(X_train, y_train)
print("Najlepsze parametry klasycznego SVM:", grid.best_params_)
print("Dokładność klasycznego SVM:", grid.best_score_)
# Ewaluacja modelu
classic_pred = grid.predict(X_test)
print("Raport klasyfikacji (klasyczny SVM):")
print(classification_report(y_test, classic_pred, zero_division=0))
# Zapisz szczegółowe metryki
classic_metrics = qsvm.save_metrics(y_test, classic_pred, "Klasyczny SVM")
end_time_classic = time.time()
classic_svm_time = end_time_classic - start_time_classic
print(f"\nCzas trenowania i ewaluacji klasycznego SVM: {classic_svm_time:.2f} sekund")
else:
print("\n======= KLASYCZNY SVM (BASELINE) - POMINIĘTY =======")
classic_svm_time = 0
classic_metrics = None
# ----------------- KWANTOWY SVM -----------------
if qsvm.RUN_QUANTUM_SVM:
print("\n======= KWANTOWY SVM Z PAULI FEATURE MAPS =======")
start_time_quantum = time.time()
# Wczytaj cache
cache = qsvm.load_results_cache(cache_file)
quantum_results = cache.get('quantum_results', [])
# Tworzenie map cech
feature_maps = []
feature_dimension = X_train_reduced.shape[1]
for name, config in FEATURE_MAPS.items():
if config['enabled']:
feature_map = PauliFeatureMap(feature_dimension=feature_dimension, reps=config['reps'])
feature_maps.append({'name': name, 'map': feature_map})
print(f"Testowanie {len(feature_maps)} map cech: {[fm['name'] for fm in feature_maps]}")
# Testowanie każdej mapy cech
for fm in feature_maps:
for C in qsvm.C_VALUES:
# Sprawdź cache
already_tested = False
for name, c_val, _ in quantum_results:
if name == fm['name'] and c_val == C:
already_tested = True
break
if already_tested:
print(f"Pomijanie już przetestowanej kombinacji: {fm['name']}, C={C}")
continue
fm_start_time = time.time()
try:
print(f"Testowanie {fm['name']} z C={C}...")
# Debugowanie danych
print(f" Wymiary danych: X_train_reduced {X_train_reduced.shape}")
print(f" Sprawdzenie NaN: {np.isnan(X_train_reduced).sum()}")
print(f" Sprawdzenie inf: {np.isinf(X_train_reduced).sum()}")
print(f" Zakres danych: [{X_train_reduced.min():.4f}, {X_train_reduced.max():.4f}]")
# Utworzenie quantum kernel z debugowaniem
quantum_kernel = QuantumKernel(
feature_map=fm['map'],
quantum_instance=ibm_backend
)
# Test quantum kernel
print(f" Testowanie quantum kernel...")
try:
test_kernel = quantum_kernel.evaluate(X_train_reduced[:2], X_train_reduced[:2])
print(f" Test kernel shape: {test_kernel.shape}")
print(f" Test kernel range: [{test_kernel.min():.4f}, {test_kernel.max():.4f}]")
if np.isnan(test_kernel).any() or np.isinf(test_kernel).any():
print(f" BŁĄD: Kernel zawiera NaN lub inf!")
continue
except Exception as e:
print(f" BŁĄD testowania kernel: {str(e)}")
continue
# Utworzenie SVM z niestandardowym jądrem i debugowaniem
def custom_kernel(X, Y):
try:
kernel_matrix = quantum_kernel.evaluate(X, Y)
# Sprawdź czy kernel jest poprawny
if np.isnan(kernel_matrix).any() or np.isinf(kernel_matrix).any():
print(f" BŁĄD: Kernel matrix zawiera NaN lub inf!")
return np.eye(len(X), len(Y)) # Fallback
return kernel_matrix
except Exception as e:
print(f" BŁĄD kernel evaluation: {str(e)}")
return np.eye(len(X), len(Y)) # Fallback
qsvm_model = SVC(kernel=custom_kernel, C=C, random_state=qsvm.RANDOM_STATE)
# Walidacja krzyżowa z debugowaniem
cv_start_time = time.time()
scores = []
kf = KFold(n_splits=qsvm.QSVM_CV, shuffle=True, random_state=qsvm.RANDOM_STATE)
for fold, (train_idx, val_idx) in enumerate(kf.split(X_train_reduced)):
X_cv_train, X_cv_val = X_train_reduced[train_idx], X_train_reduced[val_idx]
y_cv_train, y_cv_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
print(f" Fold {fold+1}/{qsvm.QSVM_CV}: train {X_cv_train.shape}, val {X_cv_val.shape}")
try:
qsvm_model.fit(X_cv_train, y_cv_train)
score = qsvm_model.score(X_cv_val, y_cv_val)
scores.append(score)
print(f" Fold {fold+1} score: {score:.4f}")
except Exception as e:
print(f" BŁĄD fold {fold+1}: {str(e)}")
scores.append(0.0) # Fallback
if len(scores) > 0:
mean_score = np.mean(scores)
std_score = np.std(scores)
print(f" Wszystkie scores: {scores}")
print(f" Mean score: {mean_score:.4f} ± {std_score:.4f}")
else:
mean_score = 0.0
print(f" BŁĄD: Brak poprawnych scores!")
cv_end_time = time.time()
cv_time = cv_end_time - cv_start_time
quantum_results.append((fm['name'], C, mean_score))
fm_end_time = time.time()
fm_time = fm_end_time - fm_start_time
print(f"Dokładność kwantowego SVM z {fm['name']}, C={C}: {mean_score:.4f} (czas: {fm_time:.2f} s)")
# Zapisz wyniki pośrednie
cache['quantum_results'] = quantum_results
qsvm.save_results_cache(cache, cache_file)
except Exception as e:
print(f"BŁĄD dla {fm['name']}, C={C}: {str(e)}")
# Dodaj fallback wynik
quantum_results.append((fm['name'], C, 0.0))
cache['quantum_results'] = quantum_results
qsvm.save_results_cache(cache, cache_file)
continue
# Znajdź najlepszy model kwantowy
if quantum_results:
best_qsvm = max(quantum_results, key=lambda x: x[2])
print(f"\nNajlepszy kwantowy SVM: {best_qsvm[0]} z C={best_qsvm[1]}, dokładność: {best_qsvm[2]:.4f}")
# Ewaluacja najlepszego modelu
best_feature_map = None
for fm in feature_maps:
if fm['name'] == best_qsvm[0]:
best_feature_map = fm['map']
break
if best_feature_map:
quantum_kernel_best = QuantumKernel(
feature_map=best_feature_map,
quantum_instance=ibm_backend
)
qsvm_best = SVC(kernel=quantum_kernel_best.evaluate, C=best_qsvm[1])
qsvm_best.fit(X_train_reduced, y_train)
quantum_pred = qsvm_best.predict(X_test_reduced)
print("Raport klasyfikacji (najlepszy kwantowy SVM):")
print(classification_report(y_test, quantum_pred, zero_division=0))
quantum_metrics = qsvm.save_metrics(y_test, quantum_pred, f"Kwantowy SVM {best_qsvm[0]}")
else:
print("Nie udało się wytrenować żadnego modelu kwantowego.")
quantum_metrics = None
end_time_quantum = time.time()
quantum_svm_time = end_time_quantum - start_time_quantum
print(f"\nCałkowity czas dla kwantowego SVM: {quantum_svm_time:.2f} sekund")
else:
print("\n======= KWANTOWY SVM - POMINIĘTY =======")
quantum_svm_time = 0
quantum_metrics = None
# ----------------- ANALIZA WYNIKÓW -----------------
print("\n======= PORÓWNANIE WYNIKÓW =======")
if classic_metrics:
print(f"Klasyczny SVM: {classic_metrics['accuracy']:.4f}")
if quantum_metrics:
print(f"Kwantowy SVM: {quantum_metrics['accuracy']:.4f}")
# Analiza znaczenia cech (tylko dla klasycznego SVM)
if qsvm.RUN_CLASSIC_SVM and classic_metrics:
print("\n======= ANALIZA ZNACZENIA CECH =======")
importance_start_time = time.time()
result = permutation_importance(grid.best_estimator_, X_test, y_test, n_repeats=10, random_state=qsvm.RANDOM_STATE)
important_features = []
feature_columns = list(data_processed.columns)
for i in range(len(feature_columns)):
if result.importances_mean[i] > qsvm.IMPORTANCE_THRESHOLD:
important_features.append((feature_columns[i], result.importances_mean[i]))
print("Najważniejsze cechy dla klasyfikacji:")
for feature, importance in sorted(important_features, key=lambda x: x[1], reverse=True):
print(f" {feature}: {importance:.4f}")
importance_end_time = time.time()
importance_time = importance_end_time - importance_start_time
print(f"\nCzas analizy znaczenia cech: {importance_time:.2f} sekund")
# Podsumowanie
print("\n======= PODSUMOWANIE EKSPERYMENTU PAULI =======")
print(f"Data i czas zakończenia: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
total_time = time.time() - data_dict['preparation_time']
print(f"Całkowity czas eksperymentu: {total_time:.2f} sekund")
except Exception as e:
print(f"BŁĄD podczas przetwarzania {data_file}: {str(e)}")
finally:
# Zamknięcie pliku wyjściowego
logger.close()
sys.stdout = logger.terminal
# Czyszczenie pamięci
gc.collect()
print("\n======= EKSPERYMENT 2 ZAKOŃCZONY =======")
if __name__ == "__main__":
run_experiment()

269
qsvm3_z.py Normal file
View File

@ -0,0 +1,269 @@
import numpy as np
import pandas as pd
import os
import sys
import time
from datetime import datetime
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, accuracy_score, precision_score, recall_score, f1_score
from sklearn.inspection import permutation_importance
from sklearn.decomposition import PCA
import json
import gc
# Import bibliotek kwantowych
from qiskit import Aer
from qiskit.circuit.library.data_preparation import ZFeatureMap
from qiskit_machine_learning.kernels import QuantumKernel
from qiskit_machine_learning.algorithms import QSVC
# Import funkcji z głównego modułu
import qsvm
def run_experiment():
"""
Eksperyment 3: Z1 i Z2 Feature Maps
Testuje klasyczny SVM i kwantowy SVM z mapami cech Z1 i Z2
"""
print("======= EKSPERYMENT 3: Z1 i Z2 FEATURE MAPS =======")
# Konfiguracja eksperymentu
FEATURE_MAPS = {
'Z1': {'reps': 1, 'enabled': True},
'Z2': {'reps': 2, 'enabled': True}
}
# Dla każdego pliku danych
for data_file in qsvm.DATA_FILES:
if not os.path.exists(data_file):
print(f"Pominięto {data_file} - plik nie istnieje")
continue
print(f"\n======= PRZETWARZANIE PLIKU: {data_file} =======")
# Utwórz nazwę pliku wyjściowego
file_base_name = os.path.basename(data_file).split('.')[0]
output_file = os.path.join(qsvm.OUTPUT_DIR, f'wyniki_z_{file_base_name}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')
# Utwórz plik cache
cache_file = os.path.join(qsvm.OUTPUT_DIR, f'qsvm_z_cache_{file_base_name}.json')
# Przekierowanie wyjścia
logger = qsvm.Logger(output_file)
sys.stdout = logger
try:
# Przygotowanie danych
data_dict = qsvm.prepare_data(data_file)
X_train = data_dict['X_train']
X_test = data_dict['X_test']
X_train_reduced = data_dict['X_train_reduced']
X_test_reduced = data_dict['X_test_reduced']
y_train = data_dict['y_train']
y_test = data_dict['y_test']
data_processed = data_dict['data_processed']
# Inicjalizacja backendu
ibm_service, ibm_backend, ibm_success = qsvm.initialize_ibm_quantum()
# ----------------- KLASYCZNY SVM -----------------
if qsvm.RUN_CLASSIC_SVM:
print("\n======= KLASYCZNY SVM (BASELINE) =======")
start_time_classic = time.time()
# Trenowanie modelu
grid = GridSearchCV(SVC(), qsvm.SVM_PARAM_GRID, cv=qsvm.SVM_CV, scoring='accuracy')
grid.fit(X_train, y_train)
print("Najlepsze parametry klasycznego SVM:", grid.best_params_)
print("Dokładność klasycznego SVM:", grid.best_score_)
# Ewaluacja modelu
classic_pred = grid.predict(X_test)
print("Raport klasyfikacji (klasyczny SVM):")
print(classification_report(y_test, classic_pred, zero_division=0))
# Zapisz szczegółowe metryki
classic_metrics = qsvm.save_metrics(y_test, classic_pred, "Klasyczny SVM")
end_time_classic = time.time()
classic_svm_time = end_time_classic - start_time_classic
print(f"\nCzas trenowania i ewaluacji klasycznego SVM: {classic_svm_time:.2f} sekund")
else:
print("\n======= KLASYCZNY SVM (BASELINE) - POMINIĘTY =======")
classic_svm_time = 0
classic_metrics = None
# ----------------- KWANTOWY SVM -----------------
if qsvm.RUN_QUANTUM_SVM:
print("\n======= KWANTOWY SVM Z Z FEATURE MAPS =======")
start_time_quantum = time.time()
# Wczytaj cache
cache = qsvm.load_results_cache(cache_file)
quantum_results = cache.get('quantum_results', [])
# Tworzenie map cech
feature_maps = []
feature_dimension = X_train_reduced.shape[1]
for name, config in FEATURE_MAPS.items():
if config['enabled']:
feature_map = ZFeatureMap(feature_dimension=feature_dimension, reps=config['reps'])
feature_maps.append({'name': name, 'map': feature_map})
print(f"Testowanie {len(feature_maps)} map cech: {[fm['name'] for fm in feature_maps]}")
# Testowanie każdej mapy cech
for fm in feature_maps:
for C in qsvm.C_VALUES:
# Sprawdź cache
already_tested = False
for name, c_val, _ in quantum_results:
if name == fm['name'] and c_val == C:
already_tested = True
break
if already_tested:
print(f"Pomijanie już przetestowanej kombinacji: {fm['name']}, C={C}")
continue
fm_start_time = time.time()
try:
print(f"Testowanie {fm['name']} z C={C}...")
# Utworzenie quantum kernel
quantum_kernel = QuantumKernel(
feature_map=fm['map'],
quantum_instance=ibm_backend
)
# Utworzenie SVM z niestandardowym jądrem
def custom_kernel(X, Y):
return quantum_kernel.evaluate(X, Y)
qsvm_model = SVC(kernel=custom_kernel, C=C)
# Walidacja krzyżowa
cv_start_time = time.time()
scores = []
kf = KFold(n_splits=qsvm.QSVM_CV, shuffle=True, random_state=qsvm.RANDOM_STATE)
for train_idx, val_idx in kf.split(X_train_reduced):
X_cv_train, X_cv_val = X_train_reduced[train_idx], X_train_reduced[val_idx]
y_cv_train, y_cv_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
qsvm_model.fit(X_cv_train, y_cv_train)
score = qsvm_model.score(X_cv_val, y_cv_val)
scores.append(score)
mean_score = np.mean(scores)
cv_end_time = time.time()
cv_time = cv_end_time - cv_start_time
quantum_results.append((fm['name'], C, mean_score))
fm_end_time = time.time()
fm_time = fm_end_time - fm_start_time
print(f"Dokładność kwantowego SVM z {fm['name']}, C={C}: {mean_score:.4f} (czas: {fm_time:.2f} s)")
# Zapisz wyniki pośrednie
cache['quantum_results'] = quantum_results
qsvm.save_results_cache(cache, cache_file)
except Exception as e:
print(f"Błąd dla {fm['name']}, C={C}: {str(e)}")
continue
# Znajdź najlepszy model kwantowy
if quantum_results:
best_qsvm = max(quantum_results, key=lambda x: x[2])
print(f"\nNajlepszy kwantowy SVM: {best_qsvm[0]} z C={best_qsvm[1]}, dokładność: {best_qsvm[2]:.4f}")
# Ewaluacja najlepszego modelu
best_feature_map = None
for fm in feature_maps:
if fm['name'] == best_qsvm[0]:
best_feature_map = fm['map']
break
if best_feature_map:
quantum_kernel_best = QuantumKernel(
feature_map=best_feature_map,
quantum_instance=ibm_backend
)
qsvm_best = SVC(kernel=quantum_kernel_best.evaluate, C=best_qsvm[1])
qsvm_best.fit(X_train_reduced, y_train)
quantum_pred = qsvm_best.predict(X_test_reduced)
print("Raport klasyfikacji (najlepszy kwantowy SVM):")
print(classification_report(y_test, quantum_pred, zero_division=0))
quantum_metrics = qsvm.save_metrics(y_test, quantum_pred, f"Kwantowy SVM {best_qsvm[0]}")
else:
print("Nie udało się wytrenować żadnego modelu kwantowego.")
quantum_metrics = None
end_time_quantum = time.time()
quantum_svm_time = end_time_quantum - start_time_quantum
print(f"\nCałkowity czas dla kwantowego SVM: {quantum_svm_time:.2f} sekund")
else:
print("\n======= KWANTOWY SVM - POMINIĘTY =======")
quantum_svm_time = 0
quantum_metrics = None
# ----------------- ANALIZA WYNIKÓW -----------------
print("\n======= PORÓWNANIE WYNIKÓW =======")
if classic_metrics:
print(f"Klasyczny SVM: {classic_metrics['accuracy']:.4f}")
if quantum_metrics:
print(f"Kwantowy SVM: {quantum_metrics['accuracy']:.4f}")
# Analiza znaczenia cech (tylko dla klasycznego SVM)
if qsvm.RUN_CLASSIC_SVM and classic_metrics:
print("\n======= ANALIZA ZNACZENIA CECH =======")
importance_start_time = time.time()
result = permutation_importance(grid.best_estimator_, X_test, y_test, n_repeats=10, random_state=qsvm.RANDOM_STATE)
important_features = []
feature_columns = list(data_processed.columns)
for i in range(len(feature_columns)):
if result.importances_mean[i] > qsvm.IMPORTANCE_THRESHOLD:
important_features.append((feature_columns[i], result.importances_mean[i]))
print("Najważniejsze cechy dla klasyfikacji:")
for feature, importance in sorted(important_features, key=lambda x: x[1], reverse=True):
print(f" {feature}: {importance:.4f}")
importance_end_time = time.time()
importance_time = importance_end_time - importance_start_time
print(f"\nCzas analizy znaczenia cech: {importance_time:.2f} sekund")
# Podsumowanie
print("\n======= PODSUMOWANIE EKSPERYMENTU Z =======")
print(f"Data i czas zakończenia: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
total_time = time.time() - data_dict['preparation_time']
print(f"Całkowity czas eksperymentu: {total_time:.2f} sekund")
except Exception as e:
print(f"BŁĄD podczas przetwarzania {data_file}: {str(e)}")
finally:
# Zamknięcie pliku wyjściowego
logger.close()
sys.stdout = logger.terminal
# Czyszczenie pamięci
gc.collect()
print("\n======= EKSPERYMENT 3 ZAKOŃCZONY =======")
if __name__ == "__main__":
run_experiment()

331
qsvm4_amplitude.py Normal file
View File

@ -0,0 +1,331 @@
import numpy as np
import pandas as pd
import os
import sys
import time
from datetime import datetime
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, accuracy_score, precision_score, recall_score, f1_score
from sklearn.inspection import permutation_importance
from sklearn.decomposition import PCA
import json
import gc
# Import bibliotek kwantowych
from qiskit import Aer
from qiskit.circuit.library import ZZFeatureMap
from qiskit_machine_learning.kernels import QuantumKernel
from qiskit_machine_learning.algorithms import QSVC
# Import funkcji z głównego modułu
import qsvm
# Funkcja do przygotowania danych dla kodowania amplitudowego
def prepare_data_for_amplitude_encoding(data, normalization='l2'):
"""
Przygotowuje dane dla kodowania amplitudowego z różnymi normalizacjami.
Args:
data: Dane wejściowe
normalization: Typ normalizacji ('l2', 'l1', 'min-max')
Returns:
Przygotowane dane
"""
if normalization == 'l2':
# Normalizacja L2
norms = np.linalg.norm(data, axis=1, ord=2)
norms[norms == 0] = 1.0
return data / norms[:, np.newaxis]
elif normalization == 'l1':
# Normalizacja L1
norms = np.linalg.norm(data, axis=1, ord=1)
norms[norms == 0] = 1.0
return data / norms[:, np.newaxis]
elif normalization == 'min-max':
# Normalizacja min-max
min_vals = np.min(data, axis=1, keepdims=True)
max_vals = np.max(data, axis=1, keepdims=True)
range_vals = max_vals - min_vals
range_vals[range_vals == 0] = 1.0
return (data - min_vals) / range_vals
else:
raise ValueError(f"Nieznana normalizacja: {normalization}")
# Funkcja jądra amplitudowego
def amplitude_kernel(x1, x2):
"""
Oblicza jądro amplitudowe między dwoma wektorami.
Args:
x1, x2: Wektory wejściowe
Returns:
Wartość jądra amplitudowego
"""
# Oblicz iloczyn skalarny
dot_product = np.dot(x1, x2)
# Jądro amplitudowe to kwadrat iloczynu skalarnego
return dot_product ** 2
# Klasa jądra amplitudowego
class AmplitudeKernel:
def __init__(self, feature_dimension, normalization='l2'):
self.feature_dimension = feature_dimension
self.normalization = normalization
def evaluate(self, x1_vec, x2_vec):
"""Oblicza macierz jądra amplitudowego"""
# Przygotowanie danych
x1_prepared = prepare_data_for_amplitude_encoding(x1_vec, self.normalization)
x2_prepared = prepare_data_for_amplitude_encoding(x2_vec, self.normalization)
# Obliczanie macierzy jądra
kernel_matrix = np.zeros((x1_prepared.shape[0], x2_prepared.shape[0]))
for i in range(x1_prepared.shape[0]):
for j in range(x2_prepared.shape[0]):
kernel_matrix[i, j] = amplitude_kernel(x1_prepared[i], x2_prepared[j])
return kernel_matrix
def run_experiment():
"""
Eksperyment 4: Amplitude Encoding
Testuje klasyczny SVM i kwantowy SVM z kodowaniem amplitudowym
"""
print("======= EKSPERYMENT 4: AMPLITUDE ENCODING =======")
# Konfiguracja eksperymentu
AMPLITUDE_NORMALIZATIONS = ['l2', 'l1', 'min-max']
# Dla każdego pliku danych
for data_file in qsvm.DATA_FILES:
if not os.path.exists(data_file):
print(f"Pominięto {data_file} - plik nie istnieje")
continue
print(f"\n======= PRZETWARZANIE PLIKU: {data_file} =======")
# Utwórz nazwę pliku wyjściowego
file_base_name = os.path.basename(data_file).split('.')[0]
output_file = os.path.join(qsvm.OUTPUT_DIR, f'wyniki_amplitude_{file_base_name}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')
# Utwórz plik cache
cache_file = os.path.join(qsvm.OUTPUT_DIR, f'qsvm_amplitude_cache_{file_base_name}.json')
# Przekierowanie wyjścia
logger = qsvm.Logger(output_file)
sys.stdout = logger
try:
# Przygotowanie danych
data_dict = qsvm.prepare_data(data_file)
X_train = data_dict['X_train']
X_test = data_dict['X_test']
X_train_reduced = data_dict['X_train_reduced']
X_test_reduced = data_dict['X_test_reduced']
y_train = data_dict['y_train']
y_test = data_dict['y_test']
data_processed = data_dict['data_processed']
# Inicjalizacja backendu
ibm_service, ibm_backend, ibm_success = qsvm.initialize_ibm_quantum()
# ----------------- KLASYCZNY SVM -----------------
if qsvm.RUN_CLASSIC_SVM:
print("\n======= KLASYCZNY SVM (BASELINE) =======")
start_time_classic = time.time()
# Trenowanie modelu
grid = GridSearchCV(SVC(), qsvm.SVM_PARAM_GRID, cv=qsvm.SVM_CV, scoring='accuracy')
grid.fit(X_train, y_train)
print("Najlepsze parametry klasycznego SVM:", grid.best_params_)
print("Dokładność klasycznego SVM:", grid.best_score_)
# Ewaluacja modelu
classic_pred = grid.predict(X_test)
print("Raport klasyfikacji (klasyczny SVM):")
print(classification_report(y_test, classic_pred, zero_division=0))
# Zapisz szczegółowe metryki
classic_metrics = qsvm.save_metrics(y_test, classic_pred, "Klasyczny SVM")
end_time_classic = time.time()
classic_svm_time = end_time_classic - start_time_classic
print(f"\nCzas trenowania i ewaluacji klasycznego SVM: {classic_svm_time:.2f} sekund")
else:
print("\n======= KLASYCZNY SVM (BASELINE) - POMINIĘTY =======")
classic_svm_time = 0
classic_metrics = None
# ----------------- KWANTOWY SVM Z AMPLITUDE ENCODING -----------------
if qsvm.RUN_QUANTUM_SVM:
print("\n======= KWANTOWY SVM Z AMPLITUDE ENCODING =======")
start_time_quantum = time.time()
# Wczytaj cache
cache = qsvm.load_results_cache(cache_file)
quantum_results = cache.get('quantum_results', [])
# Testowanie każdej normalizacji
for normalization in AMPLITUDE_NORMALIZATIONS:
feature_map_name = f'Amplitude_{normalization}'
for C in qsvm.C_VALUES:
# Sprawdź cache
already_tested = False
for name, c_val, _ in quantum_results:
if name == feature_map_name and c_val == C:
already_tested = True
break
if already_tested:
print(f"Pomijanie już przetestowanej kombinacji: {feature_map_name}, C={C}")
continue
fm_start_time = time.time()
try:
print(f"Testowanie {feature_map_name} z C={C}...")
# Utworzenie jądra amplitudowego
amplitude_kernel_obj = AmplitudeKernel(
feature_dimension=X_train_reduced.shape[1],
normalization=normalization
)
# Utworzenie SVM z niestandardowym jądrem
def custom_kernel(X, Y):
return amplitude_kernel_obj.evaluate(X, Y)
qsvm_model = SVC(kernel=custom_kernel, C=C)
# Walidacja krzyżowa
cv_start_time = time.time()
scores = []
kf = KFold(n_splits=qsvm.QSVM_CV, shuffle=True, random_state=qsvm.RANDOM_STATE)
for train_idx, val_idx in kf.split(X_train_reduced):
X_cv_train, X_cv_val = X_train_reduced[train_idx], X_train_reduced[val_idx]
y_cv_train, y_cv_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
qsvm_model.fit(X_cv_train, y_cv_train)
score = qsvm_model.score(X_cv_val, y_cv_val)
scores.append(score)
mean_score = np.mean(scores)
cv_end_time = time.time()
cv_time = cv_end_time - cv_start_time
quantum_results.append((feature_map_name, C, mean_score))
fm_end_time = time.time()
fm_time = fm_end_time - fm_start_time
print(f"Dokładność kwantowego SVM z {feature_map_name}, C={C}: {mean_score:.4f} (czas: {fm_time:.2f} s)")
# Zapisz wyniki pośrednie
cache['quantum_results'] = quantum_results
qsvm.save_results_cache(cache, cache_file)
except Exception as e:
print(f"Błąd dla {feature_map_name}, C={C}: {str(e)}")
continue
# Znajdź najlepszy model kwantowy
if quantum_results:
best_qsvm = max(quantum_results, key=lambda x: x[2])
print(f"\nNajlepszy kwantowy SVM: {best_qsvm[0]} z C={best_qsvm[1]}, dokładność: {best_qsvm[2]:.4f}")
# Ewaluacja najlepszego modelu
best_normalization = best_qsvm[0].split('_')[1]
print(f"Ewaluacja najlepszego modelu z kodowaniem amplitudowym (normalizacja: {best_normalization})...")
# Utworzenie jądra kwantowego
amplitude_kernel_best = AmplitudeKernel(
feature_dimension=X_train_reduced.shape[1],
normalization=best_normalization
)
# Utworzenie klasyfikatora SVC z niestandardowym jądrem
def custom_kernel(X, Y):
return amplitude_kernel_best.evaluate(X, Y)
qsvm_best = SVC(kernel=custom_kernel, C=best_qsvm[1])
# Trenowanie modelu
qsvm_best.fit(X_train_reduced, y_train)
# Ewaluacja modelu
quantum_pred = qsvm_best.predict(X_test_reduced)
print(f"Raport klasyfikacji (najlepszy kwantowy SVM z kodowaniem amplitudowym, normalizacja: {best_normalization}):")
print(classification_report(y_test, quantum_pred, zero_division=0))
# Zapisz szczegółowe metryki
quantum_metrics = qsvm.save_metrics(y_test, quantum_pred, f"Kwantowy SVM z kodowaniem amplitudowym ({best_normalization})")
else:
print("Nie udało się wytrenować żadnego modelu kwantowego.")
quantum_metrics = None
end_time_quantum = time.time()
quantum_svm_time = end_time_quantum - start_time_quantum
print(f"\nCałkowity czas dla kwantowego SVM: {quantum_svm_time:.2f} sekund")
else:
print("\n======= KWANTOWY SVM - POMINIĘTY =======")
quantum_svm_time = 0
quantum_metrics = None
# ----------------- ANALIZA WYNIKÓW -----------------
print("\n======= PORÓWNANIE WYNIKÓW =======")
if classic_metrics:
print(f"Klasyczny SVM: {classic_metrics['accuracy']:.4f}")
if quantum_metrics:
print(f"Kwantowy SVM: {quantum_metrics['accuracy']:.4f}")
# Analiza znaczenia cech (tylko dla klasycznego SVM)
if qsvm.RUN_CLASSIC_SVM and classic_metrics:
print("\n======= ANALIZA ZNACZENIA CECH =======")
importance_start_time = time.time()
result = permutation_importance(grid.best_estimator_, X_test, y_test, n_repeats=10, random_state=qsvm.RANDOM_STATE)
important_features = []
feature_columns = list(data_processed.columns)
for i in range(len(feature_columns)):
if result.importances_mean[i] > qsvm.IMPORTANCE_THRESHOLD:
important_features.append((feature_columns[i], result.importances_mean[i]))
print("Najważniejsze cechy dla klasyfikacji:")
for feature, importance in sorted(important_features, key=lambda x: x[1], reverse=True):
print(f" {feature}: {importance:.4f}")
importance_end_time = time.time()
importance_time = importance_end_time - importance_start_time
print(f"\nCzas analizy znaczenia cech: {importance_time:.2f} sekund")
# Podsumowanie
print("\n======= PODSUMOWANIE EKSPERYMENTU AMPLITUDE =======")
print(f"Data i czas zakończenia: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
total_time = time.time() - data_dict['preparation_time']
print(f"Całkowity czas eksperymentu: {total_time:.2f} sekund")
except Exception as e:
print(f"BŁĄD podczas przetwarzania {data_file}: {str(e)}")
finally:
# Zamknięcie pliku wyjściowego
logger.close()
sys.stdout = logger.terminal
# Czyszczenie pamięci
gc.collect()
print("\n======= EKSPERYMENT 4 ZAKOŃCZONY =======")
if __name__ == "__main__":
run_experiment()