Last active: September 14, 2025 00:00
Revisions
- tapyu revised this gist on Sep 14, 2025: 1 changed file with 0 additions and 0 deletions (the changed file cannot be displayed).
- tapyu revised this gist on Sep 13, 2025: 1 changed file with 246 additions and 173 deletions. The revised script follows:
"""
Simple CNNClassifier MWE: n_jobs reproducibility issue

Focused demonstration based on:
https://stackoverflow.com/questions/79759086/n-jobs-2-breaks-reproducibility

This MWE focuses on:
1. CNNClassifier only (as requested)
2. Learning curves (score vs epochs)
3. Small dataset for speed
4. Clear visualization of reproducibility differences
"""
import os
import random
import warnings

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn.model_selection import StratifiedKFold
from sktime.classification.deep_learning import CNNClassifier
from sktime.classification.model_selection import TSCGridSearchCV

warnings.filterwarnings('ignore')
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Reduce TensorFlow logging


def set_seed(seed=42):
    """Set all random seeds for reproducibility (mimics original question)."""
    random.seed(seed)
    np.random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    # Set TensorFlow seeds
    tf.random.set_seed(seed)
    tf.config.experimental.enable_op_determinism()


def allow_memory_growth():
    """Enable GPU memory growth (mimics original question setup)."""
    gpus = tf.config.list_physical_devices("GPU")
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
                assert tf.config.experimental.get_memory_growth(gpu), \
                    f"Failed to enable memory growth for {gpu.name}."
        except RuntimeError as e:
            print(e)


def create_time_series_data(seed=42):
    """Create a small time series dataset for fast execution."""
    set_seed(seed)
    n_samples = 60     # Very small for speed
    n_timepoints = 30  # Short series
    n_classes = 2      # Binary classification

    X = []
    y = []
    for i in range(n_samples):
        class_label = i % n_classes
        if class_label == 0:
            # Pattern 1: Sine wave with noise
            t = np.linspace(0, 2 * np.pi, n_timepoints)
            ts = np.sin(t) + 100 * np.random.randn(n_timepoints)
        else:
            # Pattern 2: Linear trend with noise
            ts = np.linspace(-1, 1, n_timepoints) + 100 * np.random.randn(n_timepoints)
        X.append(ts)
        y.append(class_label)
    return np.array(X), np.array(y)


def run_single_cnn_experiment(n_jobs, seed=42, run_id=1):
    """Run a single CNN experiment and return its results."""
    print(f"  Run {run_id} with n_jobs={n_jobs}...")

    # Set seed before each run (critical for the reproducibility test)
    set_seed(seed)

    # Allow memory growth (mimics original question setup)
    allow_memory_growth()

    # Create dataset
    X, y = create_time_series_data(seed)

    # Create CNNClassifier (similar to original question setup)
    cnn = CNNClassifier(
        n_epochs=10,      # Very small for speed
        batch_size=4,
        verbose=False,
        random_state=seed,
        n_conv_layers=1,  # Simplified
        kernel_size=3
    )

    # Create cross-validation (mimics original question)
    cv = StratifiedKFold(
        n_splits=3,
        shuffle=True,
        random_state=seed
    )

    # Simple parameter grid (mimics original grid search)
    param_grid = {
        'batch_size': [4, 8],
        'kernel_size': [3, 5]
    }

    # Grid search (TSCGridSearchCV, as in the original question)
    grid_search = TSCGridSearchCV(
        estimator=cnn,
        param_grid=param_grid,
        cv=cv,
        n_jobs=n_jobs,  # This is where the issue occurs
        scoring='accuracy'
    )

    # Fit and get results
    grid_search.fit(X, y)

    return {
        'run_id': run_id,
        'n_jobs': n_jobs,
        'best_params': grid_search.best_params_,
        'best_score': grid_search.best_score_,
        'all_scores': grid_search.cv_results_['mean_test_score'],
        'learning_curve': grid_search.best_estimator_.summary()['accuracy']
    }


def test_cnn_reproducibility():
    """Test CNNClassifier reproducibility with n_jobs=1 vs n_jobs=2."""
    print("=" * 60)
    print("CNNClassifier n_jobs Reproducibility Test")
    print("=" * 60)
    print("Based on: https://stackoverflow.com/questions/79759086/")
    print("Testing with small dataset for maximum speed...")

    # Test n_jobs=1 (should be reproducible)
    print("\n1. Testing n_jobs=1 (should be reproducible)")
    print("-" * 50)
    results_n1 = []
    for i in range(3):
        result = run_single_cnn_experiment(n_jobs=1, seed=42, run_id=i + 1)
        results_n1.append(result)
        print(f"   Best: {result['best_params']}, Score: {result['best_score']:.6f}")

    # Check reproducibility for n_jobs=1 by comparing the learning curve
    # and the best hyperparameters across runs
    reproducible_n1 = all(
        r['learning_curve'] == results_n1[0]['learning_curve']
        and r['best_params'] == results_n1[0]['best_params']
        for r in results_n1[1:]
    )
    print(f"\n   YES n_jobs=1 reproducible: {reproducible_n1}")

    # Test n_jobs=2 (may break reproducibility)
    print("\n2. Testing n_jobs=2 (may break reproducibility)")
    print("-" * 50)
    results_n2 = []
    for i in range(3):
        result = run_single_cnn_experiment(n_jobs=2, seed=42, run_id=i + 1)
        results_n2.append(result)
        print(f"   Best: {result['best_params']}, Score: {result['best_score']:.6f}")

    # Check reproducibility for n_jobs=2
    reproducible_n2 = all(
        r['learning_curve'] == results_n2[0]['learning_curve']
        and r['best_params'] == results_n2[0]['best_params']
        for r in results_n2[1:]
    )
    print(f"\n   {'YES' if reproducible_n2 else 'NO'} n_jobs=2 reproducible: {reproducible_n2}")

    # Create learning curve visualization
    create_learning_curves_plot(results_n1, results_n2, reproducible_n1, reproducible_n2)

    # Final summary
    print("\n" + "=" * 60)
    print("FINAL RESULTS")
    print("=" * 60)
    print(f"n_jobs=1: Reproducible = {reproducible_n1}")
    print(f"n_jobs=2: Reproducible = {reproducible_n2}")

    if reproducible_n1 and not reproducible_n2:
        print("\n🎯 SUCCESS: Demonstrated the n_jobs>=2 reproducibility issue!")
        print("   n_jobs=1 is reproducible, but n_jobs=2 is not.")
    elif not reproducible_n2:
        print("\n⚠️ PARTIAL: n_jobs=2 shows non-reproducible behavior.")
    else:
        print("\nYES No reproducibility issue detected in this run.")
        print("   Note: The issue may be intermittent or system-dependent.")
        print("   Try running multiple times or with different parameters.")


def create_learning_curves_plot(results_n1, results_n2, repro_n1, repro_n2):
    """Create learning curve plots comparing n_jobs=1 vs n_jobs=2."""
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(14, 10))

    # Colors for different runs
    colors = ['blue', 'red', 'green']

    # Plot 1: Best scores comparison for n_jobs=1
    runs = [r['run_id'] for r in results_n1]
    scores = [r['best_score'] for r in results_n1]
    bars1 = ax1.bar(runs, scores, color='lightblue', alpha=0.7, edgecolor='blue')
    ax1.set_title(f'n_jobs=1: Best Scores per Run\n(Reproducible: {"YES" if repro_n1 else "NO"})')
    ax1.set_xlabel('Run Number')
    ax1.set_ylabel('Best CV Score')
    ax1.set_ylim([min(scores) - 0.01, max(scores) + 0.01])
    ax1.grid(True, alpha=0.3)
    # Add value labels on bars
    for bar, score in zip(bars1, scores):
        ax1.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.001,
                 f'{score:.4f}', ha='center', va='bottom', fontsize=9)

    # Plot 2: Best scores comparison for n_jobs=2
    runs = [r['run_id'] for r in results_n2]
    scores = [r['best_score'] for r in results_n2]
    bars2 = ax2.bar(runs, scores, color='lightcoral', alpha=0.7, edgecolor='red')
    ax2.set_title(f'n_jobs=2: Best Scores per Run\n(Reproducible: {"YES" if repro_n2 else "NO"})')
    ax2.set_xlabel('Run Number')
    ax2.set_ylabel('Best CV Score')
    ax2.set_ylim([min(scores) - 0.01, max(scores) + 0.01])
    ax2.grid(True, alpha=0.3)
    # Add value labels on bars
    for bar, score in zip(bars2, scores):
        ax2.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.001,
                 f'{score:.4f}', ha='center', va='bottom', fontsize=9)

    # Plot 3: All CV scores for n_jobs=1 (learning curve style)
    for i, result in enumerate(results_n1):
        param_indices = range(len(result['learning_curve']))
        ax3.plot(param_indices, result['learning_curve'], marker='o', color=colors[i],
                 alpha=0.8, label=f'Run {result["run_id"]}', linewidth=2)
    ax3.set_title('n_jobs=1: CV Scores Across Parameter Sets')
    ax3.set_xlabel('Parameter Set Index')
    ax3.set_ylabel('CV Score')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Plot 4: All CV scores for n_jobs=2 (learning curve style)
    for i, result in enumerate(results_n2):
        param_indices = range(len(result['learning_curve']))
        ax4.plot(param_indices, result['learning_curve'], marker='s', color=colors[i],
                 alpha=0.8, label=f'Run {result["run_id"]}', linewidth=2)
    ax4.set_title('n_jobs=2: CV Scores Across Parameter Sets')
    ax4.set_xlabel('Parameter Set Index')
    ax4.set_ylabel('CV Score')
    ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('./cnn_learning_curves.png', dpi=150, bbox_inches='tight')
    print("\n📊 Learning curves plot saved: cnn_learning_curves.png")

    # Interpretation guide
    print("\n🔍 PLOT INTERPRETATION:")
    print("   • Overlapping lines in plots 3&4 = Reproducible results")
    print("   • Separated lines = Non-reproducible results")
    print("   • Compare n_jobs=1 vs n_jobs=2 patterns")


if __name__ == "__main__":
    test_cnn_reproducibility()
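Editor's note (not part of the gist): the sketch below is a minimal, self-contained illustration of the mechanism the MWE above probes. It assumes joblib's default loky process backend, which is typically what sits behind an sklearn-style n_jobs argument: seeds set in the parent process do not carry over into worker processes, so any randomness executed inside a worker must be reseeded per task to stay deterministic. The function draw and the seeds used here are purely illustrative.

import numpy as np
from joblib import Parallel, delayed

def draw(seed=None):
    # loky workers are fresh processes and do not inherit the parent's RNG state;
    # without an explicit per-task seed, each run of this script can differ.
    if seed is not None:
        np.random.seed(seed)
    return float(np.random.rand())

np.random.seed(42)  # seeds only the parent process
unseeded = Parallel(n_jobs=2)(delayed(draw)() for _ in range(4))
seeded = Parallel(n_jobs=2)(delayed(draw)(seed=42 + i) for i in range(4))
print("no per-task seed:  ", unseeded)  # typically changes between runs
print("with per-task seed:", seeded)    # identical across runs

If the grid search's nondeterminism comes from this effect, reseeding inside each fit (rather than only in the parent, as set_seed does) would be the analogous remedy; whether that fully explains the sktime behavior is left open here.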
- tapyu created this gist on Sep 13, 2025, adding two files, reproduced below.
The first file is a one-line Markdown note:

# "Take a look" scripts

The second file is the training script (204 lines):

import os
import sys
import ctypes
import random
from pathlib import Path

import hydra
import numpy as np
import pandas as pd
import tensorflow as tf
import yaml
from loguru import logger
from omegaconf import DictConfig
from sklearn.model_selection import StratifiedKFold
from sktime.classification.model_selection import TSCGridSearchCV

from iono_scint_charact.hydra_cfg import set_hydra_cfg
from iono_scint_charact.utils.data import get_processed_dataset, get_md5sum_file
from iono_scint_charact.utils.classifier import get_classifier, get_options
# TODO: reimplement get_options() and use it to digest the input cfg for CV


def _set_seed(seed: int):
    """Set all random seeds for reproducibility."""
    # NOTE: If this does not ensure reproducibility, try uncommenting the calls below.
    # If the error persists, your Nvidia GPU is likely not properly configured to run
    # deep learning models; see the README.md file.
    # Python seed
    random.seed(seed)
    # Deterministic TF ops
    tf.config.experimental.enable_op_determinism()
    # Environment variables
    os.environ['PYTHONHASHSEED'] = str(seed)
    os.environ["TF_DETERMINISTIC_OPS"] = "1"
    # Library seeds
    np.random.seed(seed)
    tf.random.set_seed(seed)
    tf.keras.utils.set_random_seed(seed)
    # Avoid JIT/XLA kernel changes
    tf.config.optimizer.set_jit(False)
    tf.config.threading.set_inter_op_parallelism_threads(1)


def _allow_memory_growth():
    gpus = tf.config.list_physical_devices("GPU")
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
                logger.info(f"Enabled memory growth for {gpu.name}.")
                assert tf.config.experimental.get_memory_growth(gpu), \
                    f"Failed to enable memory growth for {gpu.name}."
        except RuntimeError as e:
            logger.error(e)


def _check_gpu_environment(req: DictConfig) -> None:
    build_info = tf.sysconfig.get_build_info()
    py_ver = sys.version_info
    # Get the cuDNN shared library version
    libcudnn = ctypes.cdll.LoadLibrary("libcudnn.so")
    if tf.__version__ != req.tf_version:
        logger.error(f"TensorFlow {req.tf_version} required, found {tf.__version__}.")
        raise RuntimeError("GPU environment check failed.")
    if req.cuda_version not in build_info.get("cuda_version"):
        logger.error(f"CUDA {req.cuda_version} required, found {build_info.get('cuda_version')}.")
        raise RuntimeError("GPU environment check failed.")
    # Parenthesize the walrus so the version (not the comparison result) is bound
    if (libcudnn_version := libcudnn.cudnnGetVersion()) != req.cudnn_version:
        logger.error(f"cuDNN {req.cudnn_version} required, found {libcudnn_version}.")
        raise RuntimeError("GPU environment check failed.")
    if not (req.python_version[0][0] <= py_ver.major <= req.python_version[1][0]
            and req.python_version[0][1] <= py_ver.minor <= req.python_version[1][1]):
        logger.error(
            f"Python {req.python_version[0]}-{req.python_version[1]} required, "
            f"found {py_ver.major}.{py_ver.minor}."
        )
        raise RuntimeError("GPU environment check failed.")


def _train(cfg: DictConfig) -> None:
    _check_gpu_environment(cfg.gpu_requirements)

    ### get parameters ###
    # get data
    data_path = Path(cfg.data.prefix.processed) / "processed_dataset.h5"
    X, y, _ = get_processed_dataset(cfg, data_path, "Training")

    # set GPU memory growth
    if cfg.train.is_memory_growth:
        _allow_memory_growth()

    # enable asynchronous CUDA memory allocation
    os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"
    # NOTE: this makes TensorFlow use NVIDIA's stream-ordered memory allocator (based on
    # `cudaMallocAsync/cudaFreeAsync`, introduced in CUDA 11.2+) instead of TF's legacy BFC
    # (Best-Fit with Coalescing) GPU allocator. `cudaMallocAsync/cudaFreeAsync` tie
    # allocations and frees to CUDA streams, so memory can be returned to a pool and reused
    # without device-wide synchronization. This reduces the stalls and fragmentation that
    # often bite deep-learning workloads with many short-lived tensors. Instead of large
    # fixed reservations and fragmented free lists (as seen in BFC dumps), the async
    # allocator uses a memory pool that grows as needed and reuses blocks more effectively,
    # avoiding "allocator ran out of memory" errors that can occur even though total VRAM
    # is available.
    os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
    # NOTE: disable oneDNN optimizations to avoid unwanted behavior in some operations
    # (e.g., `BatchNormalization`). oneDNN can provide significant speedups, sometimes
    # 2-4x, especially on Intel CPUs with features such as AVX-512 and VNNI. However,
    # because oneDNN may change the order of operations, it can introduce tiny
    # floating-point differences compared to the standard TensorFlow path. This is purely
    # a rounding effect, not a bug, but something to be aware of. Setting
    # TF_ENABLE_ONEDNN_OPTS=0 disables these optimizations and restores TensorFlow's
    # default computation behavior.

    # model
    Clf = get_classifier(cfg.model.name)

    # instantiate the model
    _set_seed(cfg.seed)
    clf = Clf(random_state=cfg.seed)

    # instantiate the K-fold cross-validation object
    cv = StratifiedKFold(
        n_splits=cfg.train.n_folds,
        shuffle=False,  # cfg.train.is_shuffle
        random_state=cfg.seed
    )

    # instantiate the grid search object
    grid_search = TSCGridSearchCV(
        estimator=clf,
        param_grid=dict(cfg.train.grid_params),
        cv=cv,
        n_jobs=cfg.train.n_jobs,
        scoring=cfg.train.scoring,
        verbose=4 - (logger.level(cfg.log.cli.level).no // 10),
        return_train_score=cfg.train.is_return_train_score,
    )

    # train classifier with k-fold cross-validation
    logger.info(
        f"Start training with grid search over {len(cfg.train.grid_params)} hyperparameter "
        f"sets for the classifier {cfg.model.name} and seed={cfg.seed}."
    )
    _set_seed(cfg.seed)  # ???: the seed needs to be reset before calling `fit()`, otherwise the results will not be reproducible (?)
    logger.info(f"The seed used during the training phase is {cfg.seed}.")
    grid_search.fit(X, y)
    logger.info(
        f"The best parameter set is {grid_search.best_params_}, leading to an average "
        f"{cfg.train.scoring} of {grid_search.best_score_:.4f}."
    )
    if cfg.log.cli.level == "DEBUG":
        grid_search.best_estimator_.model_.summary()  # HACK: `summary()` writes directly to STDOUT, so no `print` or `logger.debug()` is required

    # save best parameters for DVC Studio consumption
    dvcstudio_bypass_path = Path(cfg.out.prefix) / "dvcstudio_bypass.yaml"
    with open(dvcstudio_bypass_path, "w") as f:
        # Store best estimator hyperparameters as top-level keys
        grid_search.best_params_['best_score'] = float(grid_search.best_score_)
        yaml.safe_dump(grid_search.best_params_, f, default_flow_style=False, sort_keys=False)

    # save classifier
    clf_path = Path(cfg.model.prefix) / "classifier"
    grid_search.best_estimator_.save(clf_path)
    if cfg.log.cli.level == "DEBUG":
        clf_digest = get_md5sum_file(f"{clf_path}.zip", cfg.md5sum_digest_chunksize)
        logger.debug(f"The MD5 checksum for the best trained classifier: {clf_digest}.")

    # save per-fold validation scores of the best parameter set
    # NOTE: `cv_val_scores_best` (the validation scores during CV, exposed as "test" scores
    # in cv_results_): one score per split on the held-out validation fold for the
    # hyperparameter set that ultimately won. During grid_search.fit, after training on the
    # 4-fold union, the estimator is evaluated on the 1 held-out fold; doing this for all
    # 5 splits yields 5 numbers.
    # NOTE: `cv_train_scores_best`: one score per split on the training portion of that
    # split (i.e., the union of the 4 folds used to fit for that split), for the same best
    # hyperparameter set. It is not 4 separate scores; it is a single score over the
    # concatenated 4 folds per split, again yielding 5 numbers.
    # NOTE: Both sets are produced during cross-validation and are used for diagnostics
    # (only the mean validation score drives model selection). After selection, with
    # refit=True, best_estimator_ is retrained on the full training data; that refit does
    # not produce per-fold scores.
    try:
        cvres = grid_search.cv_results_
        n_splits = cfg.train.n_folds
        split_keys = [f"split{i}_test_score" for i in range(n_splits)]
        scores = [float(cvres[k][grid_search.best_index_]) for k in split_keys]
        # Write as a YAML mapping so DVC Experiments treats each value as a scalar
        cv_scores_yaml = {f"fold_{i}": s for i, s in enumerate(scores, start=1)}
        cv_scores_path = Path(cfg.out.prefix) / "cv_val_scores_best.yaml"
        with open(cv_scores_path, "w") as f:
            yaml.safe_dump({"cv_val_scores_best": cv_scores_yaml}, f, sort_keys=False)
        logger.info(f"Saved per-fold validation {cfg.train.scoring} for best params at {cv_scores_path}.")
    except Exception as e:
        logger.warning(f"Could not extract per-fold validation scores for best params: {e}")

    # save per-fold TRAIN scores of the best parameter set (if requested)
    if getattr(cfg.train, "is_return_train_score", False):
        try:
            cvres = grid_search.cv_results_
            n_splits = cfg.train.n_folds
            split_train_keys = [f"split{i}_train_score" for i in range(n_splits)]
            train_scores_available = all(k in cvres for k in split_train_keys)
            train_scores = None
            if train_scores_available:
                train_scores = [float(cvres[k][grid_search.best_index_]) for k in split_train_keys]
            # mean train score may also be present
            mean_train_score = None
            if "mean_train_score" in cvres:
                mean_train_score = float(cvres["mean_train_score"][grid_search.best_index_])
            # Only write if we actually have something
            if train_scores is not None or mean_train_score is not None:
                payload = {}
                if train_scores is not None:
                    payload["cv_train_scores_best"] = {
                        f"fold_{i}": s for i, s in enumerate(train_scores, start=1)
                    }
                if mean_train_score is not None:
                    payload["mean_train_score_best"] = mean_train_score
                cv_train_scores_path = Path(cfg.out.prefix) / "cv_train_scores_best.yaml"
                with open(cv_train_scores_path, "w") as f:
                    yaml.safe_dump(payload, f, sort_keys=False)
                logger.info(f"Saved train scores for best params at {cv_train_scores_path}.")
            else:
                logger.warning("return_train_score enabled but train scores not found in cv_results_.")
        except Exception as e:
            logger.warning(f"Could not extract per-fold train scores for best params: {e}")

    # save learning curve
    learning_curve = grid_search.best_estimator_.summary()["accuracy"]
    df = pd.DataFrame({
        "epoch": np.arange(1, len(learning_curve) + 1),
        "accuracies": learning_curve,
    })
    learning_curve_path = Path(cfg.out.prefix) / "learning_curve.csv"
    df.to_csv(learning_curve_path, index=False)


@hydra.main(version_base=None, config_path="../", config_name="params.yaml")
def _main(cfg: DictConfig) -> int:
    cfg = set_hydra_cfg(cfg)
    _train(cfg)
    return 0


if __name__ == "__main__":
    sys.exit(_main())
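Editor's note (not part of the gist): _train() ends by writing a learning_curve.csv with columns epoch and accuracies under cfg.out.prefix. A minimal sketch for inspecting that file afterwards is shown below; the plotting code and output file name are assumptions, and the CSV path must be adjusted to wherever your Hydra output directory places it.

import matplotlib.pyplot as plt
import pandas as pd

# Read the learning curve written by _train(); adjust the path as needed.
df = pd.read_csv("learning_curve.csv")  # columns: epoch, accuracies
plt.plot(df["epoch"], df["accuracies"], marker="o")
plt.xlabel("Epoch")
plt.ylabel("Training accuracy")
plt.title("Learning curve of the best estimator")
plt.grid(True, alpha=0.3)
plt.savefig("learning_curve.png", dpi=150, bbox_inches="tight")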