
@tapyu
Last active September 14, 2025 00:00

Revisions

  1. tapyu revised this gist Sep 14, 2025. 1 changed file with 0 additions and 0 deletions.
    Binary file added repro_issue.png
  2. tapyu revised this gist Sep 13, 2025. 1 changed file with 246 additions and 173 deletions.
    419 changes: 246 additions & 173 deletions repro_issue.py
    """
    https://stackoverflow.com/questions/79759086/n-jobs-2-breaks-reproducibility
    Simple CNNClassifier MWE: n_jobs reproducibility issue
    Focused demonstration based on:
    https://stackoverflow.com/questions/79759086/n-jobs-2-breaks-reproducibility
    This MWE focuses on:
    1. CNNClassifier only (as requested)
    2. Learning curves (score vs epochs)
    3. Small dataset for speed
    4. Clear visualization of reproducibility differences
    """

    import numpy as np
    import random
    import os
    import matplotlib.pyplot as plt
    import tensorflow as tf
    from sklearn.model_selection import StratifiedKFold
    from sktime.classification.deep_learning import CNNClassifier
    from sktime.classification.model_selection import TSCGridSearchCV
    import warnings
    warnings.filterwarnings('ignore')
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Reduce TensorFlow logging


    def set_seed(seed=42):
        """Set all random seeds for reproducibility (mimics original question)."""
        random.seed(seed)
        np.random.seed(seed)
        os.environ['PYTHONHASHSEED'] = str(seed)

        # Set TensorFlow seeds
        import tensorflow as tf
        tf.random.set_seed(seed)
        tf.keras.utils.set_random_seed(seed)
        tf.config.experimental.enable_op_determinism()


    def allow_memory_growth():
        gpus = tf.config.list_physical_devices("GPU")
        if gpus:
            try:
                for gpu in gpus:
                    tf.config.experimental.set_memory_growth(gpu, True)
            except RuntimeError as e:
                print(e)


    def create_time_series_data(seed=42):
        """Create small time series dataset for fast execution."""
        set_seed(seed)

        n_samples = 60     # Very small for speed
        n_timepoints = 30  # Short series
        n_classes = 2      # Binary classification

        X = []
        y = []

        for i in range(n_samples):
            class_label = i % n_classes

            if class_label == 0:
                # Pattern 1: Sine wave with noise
                t = np.linspace(0, 2*np.pi, n_timepoints)
                ts = np.sin(t) + 100 * np.random.randn(n_timepoints)
            else:
                # Pattern 2: Linear trend with noise
                ts = np.linspace(-1, 1, n_timepoints) + 100 * np.random.randn(n_timepoints)

            X.append(ts)
            y.append(class_label)

        return np.array(X), np.array(y)


    def run_single_cnn_experiment(n_jobs, seed=42, run_id=1):
        """Run a single CNN experiment and return results."""

        print(f"  Run {run_id} with n_jobs={n_jobs}...")

        # Set seed before each run (critical for reproducibility test)
        set_seed(seed)

        # Allow memory growth (mimics original question setup)
        allow_memory_growth()

        # Create dataset
        X, y = create_time_series_data(seed)

        # Create CNNClassifier (similar to original question setup)
        cnn = CNNClassifier(
            n_epochs=10,      # Very small for speed
            batch_size=4,
            verbose=False,
            random_state=seed,
            n_conv_layers=1,  # Simplified
            kernel_size=3
        )

        # Create cross-validation (mimics original question)
        cv = StratifiedKFold(
            n_splits=3,
            shuffle=True,
            random_state=seed
        )

        # Simple parameter grid (mimics original grid search)
        param_grid = {
            'batch_size': [4, 8],
            'kernel_size': [3, 5]
        }

        # GridSearchCV (similar to TSCGridSearchCV from original question)
        grid_search = TSCGridSearchCV(
            estimator=cnn,
            param_grid=param_grid,
            cv=cv,
            n_jobs=n_jobs,  # This is where the issue occurs
            scoring='accuracy'
        )

        # Fit and get results
        grid_search.fit(X, y)

        return {
            'run_id': run_id,
            'n_jobs': n_jobs,
            'best_params': grid_search.best_params_,
            'best_score': grid_search.best_score_,
            'all_scores': grid_search.cv_results_['mean_test_score'],
            'learning_curve': grid_search.best_estimator_.summary()['accuracy']
        }


    def test_cnn_reproducibility():
        """Test CNNClassifier reproducibility with n_jobs=1 vs n_jobs=2."""

        print("=" * 60)
        print("CNNClassifier n_jobs Reproducibility Test")
        print("=" * 60)
        print("Based on: https://stackoverflow.com/questions/79759086/")
        print("Testing with small dataset for maximum speed...")

        # Test n_jobs=1 (should be reproducible)
        print("\n1. Testing n_jobs=1 (should be reproducible)")
        print("-" * 50)

        results_n1 = []
        for i in range(3):
            result = run_single_cnn_experiment(n_jobs=1, seed=42, run_id=i+1)
            results_n1.append(result)
            print(f"   Best: {result['best_params']}, Score: {result['best_score']:.6f}")

        # Check reproducibility for n_jobs=1 by comparing the learning curve and best hyperparameters
        reproducible_n1 = all(
            r['learning_curve'] == results_n1[0]['learning_curve'] and
            r['best_params'] == results_n1[0]['best_params']
            for r in results_n1[1:]
        )

        print(f"\n YES n_jobs=1 reproducible: {reproducible_n1}")

        # Test n_jobs=2 (may break reproducibility)
        print("\n2. Testing n_jobs=2 (may break reproducibility)")
        print("-" * 50)

        results_n2 = []
        for i in range(3):
            result = run_single_cnn_experiment(n_jobs=2, seed=42, run_id=i+1)
            results_n2.append(result)
            print(f"   Best: {result['best_params']}, Score: {result['best_score']:.6f}")

        # Check reproducibility for n_jobs=2
        reproducible_n2 = all(
            r['learning_curve'] == results_n2[0]['learning_curve'] and
            r['best_params'] == results_n2[0]['best_params']
            for r in results_n2[1:]
        )

        print(f"\n {'YES' if reproducible_n2 else 'NO'} n_jobs=2 reproducible: {reproducible_n2}")

        # Create learning curve visualization
        create_learning_curves_plot(results_n1, results_n2, reproducible_n1, reproducible_n2)

        # Final summary
        print("\n" + "=" * 60)
        print("FINAL RESULTS")
        print("=" * 60)
        print(f"n_jobs=1: Reproducible = {reproducible_n1}")
        print(f"n_jobs=2: Reproducible = {reproducible_n2}")

        if reproducible_n1 and not reproducible_n2:
            print("\n🎯 SUCCESS: Demonstrated the n_jobs>=2 reproducibility issue!")
            print("   n_jobs=1 is reproducible, but n_jobs=2 is not.")
        elif not reproducible_n2:
            print("\n⚠️ PARTIAL: n_jobs=2 shows non-reproducible behavior.")
        else:
            print("\nYES No reproducibility issue detected in this run.")
            print("   Note: The issue may be intermittent or system-dependent.")
            print("   Try running multiple times or with different parameters.")


    def create_learning_curves_plot(results_n1, results_n2, repro_n1, repro_n2):
        """Create learning curve plots comparing n_jobs=1 vs n_jobs=2."""

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(14, 10))

        # Colors for different runs
        colors = ['blue', 'red', 'green']

        # Plot 1: Best scores comparison for n_jobs=1
        runs = [r['run_id'] for r in results_n1]
        scores = [r['best_score'] for r in results_n1]

        bars1 = ax1.bar(runs, scores, color='lightblue', alpha=0.7, edgecolor='blue')
        ax1.set_title(f'n_jobs=1: Best Scores per Run\n(Reproducible: {"YES" if repro_n1 else "NO"})')
        ax1.set_xlabel('Run Number')
        ax1.set_ylabel('Best CV Score')
        ax1.set_ylim([min(scores) - 0.01, max(scores) + 0.01])
        ax1.grid(True, alpha=0.3)

        # Add value labels on bars
        for bar, score in zip(bars1, scores):
            ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001,
                     f'{score:.4f}', ha='center', va='bottom', fontsize=9)

        # Plot 2: Best scores comparison for n_jobs=2
        runs = [r['run_id'] for r in results_n2]
        scores = [r['best_score'] for r in results_n2]

        bars2 = ax2.bar(runs, scores, color='lightcoral', alpha=0.7, edgecolor='red')
        ax2.set_title(f'n_jobs=2: Best Scores per Run\n(Reproducible: {"YES" if repro_n2 else "NO"})')
        ax2.set_xlabel('Run Number')
        ax2.set_ylabel('Best CV Score')
        ax2.set_ylim([min(scores) - 0.01, max(scores) + 0.01])
        ax2.grid(True, alpha=0.3)

        # Add value labels on bars
        for bar, score in zip(bars2, scores):
            ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001,
                     f'{score:.4f}', ha='center', va='bottom', fontsize=9)

        # Plot 3: All CV scores for n_jobs=1 (learning curve style)
        for i, result in enumerate(results_n1):
            param_indices = range(len(result['learning_curve']))
            ax3.plot(param_indices, result['learning_curve'],
                     marker='o', color=colors[i], alpha=0.8,
                     label=f'Run {result["run_id"]}', linewidth=2)

        ax3.set_title('n_jobs=1: CV Scores Across Parameter Sets')
        ax3.set_xlabel('Parameter Set Index')
        ax3.set_ylabel('CV Score')
        ax3.legend()
        ax3.grid(True, alpha=0.3)

        # Plot 4: All CV scores for n_jobs=2 (learning curve style)
        for i, result in enumerate(results_n2):
            param_indices = range(len(result['learning_curve']))
            ax4.plot(param_indices, result['learning_curve'],
                     marker='s', color=colors[i], alpha=0.8,
                     label=f'Run {result["run_id"]}', linewidth=2)

        ax4.set_title('n_jobs=2: CV Scores Across Parameter Sets')
        ax4.set_xlabel('Parameter Set Index')
        ax4.set_ylabel('CV Score')
        ax4.legend()
        ax4.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig('./cnn_learning_curves.png', dpi=150, bbox_inches='tight')
        print("\n📊 Learning curves plot saved: cnn_learning_curves.png")

        # Interpretation guide
        print("\n🔍 PLOT INTERPRETATION:")
        print("   • Overlapping lines in plots 3&4 = Reproducible results")
        print("   • Separated lines = Non-reproducible results")
        print("   • Compare n_jobs=1 vs n_jobs=2 patterns")


    if __name__ == "__main__":
        test_cnn_reproducibility()
  3. tapyu created this gist Sep 13, 2025.
    1 change: 1 addition & 0 deletions .take_a_look.md
    # "Take a look" scripts
    204 changes: 204 additions & 0 deletions repro_issue.py
    import os
    import tensorflow as tf
    from pathlib import Path
    import sys

    import hydra
    from loguru import logger
    import ctypes
    import numpy as np
    from omegaconf import DictConfig
    import pandas as pd
    import yaml
    from sklearn.model_selection import StratifiedKFold
    from sktime.classification.model_selection import TSCGridSearchCV

    from iono_scint_charact.hydra_cfg import set_hydra_cfg
    from iono_scint_charact.utils.data import get_processed_dataset, get_md5sum_file
    from iono_scint_charact.utils.classifier import get_classifier, get_options  # TODO: reimplement get_options() and use it to digest the input cfg for CV
    import random


    def _set_seed(seed: int):
        """Sets all random seeds for reproducibility."""
        # NOTE: If it does not ensure reproducibility, try to uncomment the functions below. If the error persists, then it is likely that your Nvidia GPU is not properly configured to run deep learning models. Please, see the README.md file.
        # Set seed
        random.seed(seed)
        # Deterministic TF ops
        tf.config.experimental.enable_op_determinism()
        ## environment variables
        os.environ['PYTHONHASHSEED'] = str(seed)
        os.environ["TF_DETERMINISTIC_OPS"] = "1"
        # seeds
        np.random.seed(seed)
        tf.random.set_seed(seed)
        tf.keras.utils.set_random_seed(seed)
        # Avoid JIT/XLA kernel changes
        tf.config.optimizer.set_jit(False)
        tf.config.threading.set_inter_op_parallelism_threads(1)


    def _allow_memory_growth():
        gpus = tf.config.list_physical_devices("GPU")
        if gpus:
            try:
                for gpu in gpus:
                    tf.config.experimental.set_memory_growth(gpu, True)
                    logger.info(f"Enable memory growth for {gpu.name}.")
                    assert tf.config.experimental.get_memory_growth(gpu), f"Failed to enable memory growth for {gpu.name}."
            except RuntimeError as e:
                logger.error(e)


    def _check_gpu_environment(req: DictConfig) -> None:
        build_info = tf.sysconfig.get_build_info()
        py_ver = sys.version_info

        # get cuDNN shared library version
        libcudnn = ctypes.cdll.LoadLibrary("libcudnn.so")

        if tf.__version__ != req.tf_version:
            logger.error(f"TensorFlow {req.tf_version} required, found {tf.__version__}.")
            raise RuntimeError("GPU environment check failed.")

        if req.cuda_version not in build_info.get("cuda_version"):
            logger.error(f"CUDA {req.cuda_version} required, found {build_info.get('cuda_version')}.")
            raise RuntimeError("GPU environment check failed.")

        if (libcudnn_version := libcudnn.cudnnGetVersion()) != req.cudnn_version:
            logger.error(f"cuDNN {req.cudnn_version} required, found {libcudnn_version}.")
            raise RuntimeError("GPU environment check failed.")

        if not (req.python_version[0][0] <= py_ver.major <= req.python_version[1][0] and
                req.python_version[0][1] <= py_ver.minor <= req.python_version[1][1]):
            logger.error(
                f"Python {req.python_version[0]}-{req.python_version[1]} required, "
                f"found {py_ver.major}.{py_ver.minor}."
            )
            raise RuntimeError("GPU environment check failed.")


    def _train(cfg: DictConfig) -> None:
        _check_gpu_environment(cfg.gpu_requirements)
        ### get parameters ###
        # get data
        data_path = Path(cfg.data.prefix.processed) / "processed_dataset.h5"
        X, y, _ = get_processed_dataset(cfg, data_path, "Training")
        # set GPU memory growth
        if cfg.train.is_memory_growth:
            _allow_memory_growth()
        # enable asynchronous CUDA memory allocation
        os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"  # NOTE: set TensorFlow to use NVIDIA’s stream-ordered memory allocator (based on `cudaMallocAsync/cudaFreeAsync`) instead of TF’s legacy BFC (Best-Fit with Coalescing) GPU allocator. In other words, it switches TF’s GPU allocator to the async CUDA allocator (introduced in CUDA 11.2+), which tends to suffer far less fragmentation than BFC. `cudaMallocAsync/cudaFreeAsync` tie allocations/frees to CUDA streams, so memory can be returned to a pool and reused without device-wide synchronization. This reduces stalls and fragmentation that often bite deep-learning workloads with many short-lived tensors. Instead of large fixed reservations and fragmented free lists in the BFC dump, the async allocator uses a memory pool that grows as needed and reuses blocks more effectively, mitigating the “allocator ran out of memory” failures that can occur even though total VRAM is available.
        os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"  # NOTE: disable oneDNN optimizations to avoid unwanted behavior in some operations (e.g., `BatchNormalization`). Using this library can provide significant speedups, sometimes 2–4×, especially on Intel CPUs with features like AVX‑512, VNNI, etc. However, because oneDNN may change the order of operations, it can introduce tiny floating-point numerical differences compared to the standard TensorFlow path. This is purely due to rounding, not a bug, but something to be aware of. Setting TF_ENABLE_ONEDNN_OPTS=0 disables these optimizations, restoring TensorFlow’s default computation behavior.
        # model
        Clf = get_classifier(cfg.model.name)
        # instantiate the model
        _set_seed(cfg.seed)
        clf = Clf(random_state=cfg.seed)
        # instantiate the K-fold cross-validation object
        cv = StratifiedKFold(
            n_splits=cfg.train.n_folds,
            shuffle=False,  # cfg.train.is_shuffle,
            random_state=cfg.seed
        )
        # instantiate the grid search object
        grid_search = TSCGridSearchCV(
            estimator=clf,
            param_grid=dict(cfg.train.grid_params),
            cv=cv,
            n_jobs=cfg.train.n_jobs,
            scoring=cfg.train.scoring,
            verbose=4-(logger.level(cfg.log.cli.level).no//10),
            return_train_score=cfg.train.is_return_train_score,
        )
        # train classifier with k-fold cross-validation
        logger.info(f"Start training with grid search over {len(cfg.train.grid_params)} hyperparameter sets for the classifier {cfg.model.name} and seed={cfg.seed}.")
        _set_seed(cfg.seed)  # ???: the seed needs to be reset before calling `fit()`, otherwise the results will not be reproducible (?)
        logger.info(f"The seed used during the training phase is {cfg.seed}.")
        grid_search.fit(X, y)
        logger.info(f"The best parameter set is {grid_search.best_params_}, leading to an average {cfg.train.scoring} of {grid_search.best_score_:.4f}.")
        if cfg.log.cli.level == "DEBUG":
            grid_search.best_estimator_.model_.summary()  # HACK: `summary()` directly sends to STDOUT, so no `print` or `logger.debug()` is required
        # save best parameters for DVC Studio consumption
        dvcstudio_bypass_path = Path(cfg.out.prefix) / "dvcstudio_bypass.yaml"
        with open(dvcstudio_bypass_path, "w") as f:
            # Store best estimator hyperparameters as top-level keys
            grid_search.best_params_['best_score'] = float(grid_search.best_score_)
            yaml.safe_dump(grid_search.best_params_, f, default_flow_style=False, sort_keys=False)
        # save classifier
        clf_path = Path(cfg.model.prefix) / "classifier"
        grid_search.best_estimator_.save(clf_path)
        if cfg.log.cli.level == "DEBUG":
            clf_digest = get_md5sum_file(f"{clf_path}.zip", cfg.md5sum_digest_chunksize)
            logger.debug(f"The MD5 checksum for best trained classifier: {clf_digest}.")

        # save per-fold validation scores of the best parameter set
        # NOTE: `cv_test_scores_best` (actually, the validation scores during CV): What: One score per split on the held‑out validation fold (the “test” fold in CV) for the hyperparameter set that ultimately won; When: During grid_search.fit, after training on the 4‑fold union, it evaluates on the 1 held‑out fold. Do this for all 5 splits → 5 numbers.
        # NOTE: `cv_train_scores_best`: What: One score per split on the training portion of that split (i.e., the union of the 4 folds used to fit for that split), for the same best hyperparameter set; When: Also during grid_search.fit, right after fitting on the 4‑fold union, it scores on that same training data. It’s not 4 separate scores; it’s a single score over the concatenated 4 folds per split → 5 numbers.
        # NOTE: Both sets are produced during cross-validation and are used for diagnostics (only the mean validation/test score drives model selection). After selection, with refit=True, the best_estimator_ is retrained on the full training data; that refit doesn’t produce per‑fold scores.
        try:
            cvres = grid_search.cv_results_
            n_splits = cfg.train.n_folds
            split_keys = [f"split{i}_test_score" for i in range(n_splits)]
            scores = [float(cvres[k][grid_search.best_index_]) for k in split_keys]
            # Write as YAML mapping so DVC Experiments treats each as a scalar
            cv_scores_yaml = {f"fold_{i}": s for i, s in enumerate(scores, start=1)}
            cv_scores_path = Path(cfg.out.prefix) / "cv_val_scores_best.yaml"
            with open(cv_scores_path, "w") as f:
                yaml.safe_dump({"cv_val_scores_best": cv_scores_yaml}, f, sort_keys=False)
            logger.info(f"Saved per-fold validation {cfg.train.scoring} for best params at {cv_scores_path}.")
        except Exception as e:
            logger.warning(f"Could not extract per-fold validation scores for best params: {e}")

        # save per-fold TRAIN scores of the best parameter set (if requested)
        if getattr(cfg.train, "is_return_train_score", False):
            try:
                cvres = grid_search.cv_results_
                n_splits = cfg.train.n_folds
                split_train_keys = [f"split{i}_train_score" for i in range(n_splits)]
                train_scores_available = all(k in cvres for k in split_train_keys)

                train_scores = None
                if train_scores_available:
                    train_scores = [float(cvres[k][grid_search.best_index_]) for k in split_train_keys]

                # mean train score may also be present
                mean_train_score = None
                if "mean_train_score" in cvres:
                    mean_train_score = float(cvres["mean_train_score"][grid_search.best_index_])

                # Only write if we actually have something
                if train_scores is not None or mean_train_score is not None:
                    payload = {}
                    if train_scores is not None:
                        payload["cv_train_scores_best"] = {
                            f"fold_{i}": s for i, s in enumerate(train_scores, start=1)
                        }
                    if mean_train_score is not None:
                        payload["mean_train_score_best"] = mean_train_score

                    cv_train_scores_path = Path(cfg.out.prefix) / "cv_train_scores_best.yaml"
                    with open(cv_train_scores_path, "w") as f:
                        yaml.safe_dump(payload, f, sort_keys=False)
                    logger.info(
                        f"Saved train scores for best params at {cv_train_scores_path}."
                    )
                else:
                    logger.warning(
                        "return_train_score enabled but train scores not found in cv_results_."
                    )
            except Exception as e:
                logger.warning(f"Could not extract train scores for best params: {e}")
        # save learning curve
        learning_curve = grid_search.best_estimator_.summary()["accuracy"]
        df = pd.DataFrame({
            "epoch": np.arange(1, len(learning_curve)+1),
            "accuracies": learning_curve,
        })
        learning_curve_path = Path(cfg.out.prefix) / "learning_curve.csv"
        df.to_csv(learning_curve_path, index=False)


    @hydra.main(version_base=None, config_path="../", config_name="params.yaml")
    def _main(cfg: DictConfig) -> int:
        cfg = set_hydra_cfg(cfg)
        _train(cfg)
        return 0


    if __name__ == "__main__":
        sys.exit(_main())
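
A possible follow-up to the revision 2 MWE (not part of the gist): pin joblib's parallel backend explicitly around the fit and compare two identically seeded n_jobs=2 runs per backend. The sketch below assumes it is appended to revision 2's repro_issue.py, so set_seed, allow_memory_growth, and create_time_series_data are the functions defined there; fit_with_backend is a hypothetical helper, and parallel_backend is joblib's public context manager. Whether either backend restores reproducibility is exactly what would have to be measured.

    # Hypothetical extension of repro_issue.py (revision 2): compare two seeded
    # n_jobs=2 fits under an explicitly pinned joblib backend.
    from joblib import parallel_backend
    from sklearn.model_selection import StratifiedKFold
    from sktime.classification.deep_learning import CNNClassifier
    from sktime.classification.model_selection import TSCGridSearchCV

    def fit_with_backend(backend, n_jobs=2, seed=42):
        """Run one grid search with the given joblib backend and return its mean CV scores."""
        set_seed(seed)
        allow_memory_growth()
        X, y = create_time_series_data(seed)
        cnn = CNNClassifier(n_epochs=10, batch_size=4, verbose=False,
                            random_state=seed, n_conv_layers=1, kernel_size=3)
        cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=seed)
        grid_search = TSCGridSearchCV(
            estimator=cnn,
            param_grid={'batch_size': [4, 8], 'kernel_size': [3, 5]},
            cv=cv,
            n_jobs=n_jobs,
            scoring='accuracy'
        )
        # "loky" forces process-based workers, "threading" forces threads.
        with parallel_backend(backend, n_jobs=n_jobs):
            grid_search.fit(X, y)
        return list(grid_search.cv_results_['mean_test_score'])

    for backend in ("loky", "threading"):
        first = fit_with_backend(backend)
        second = fit_with_backend(backend)
        # Equal score lists across the two seeded runs indicate a reproducible fit.
        print(f"{backend}: reproducible = {first == second}")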
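
One more note on the environment variables set inside _train (revision 3): TensorFlow reads at least TF_ENABLE_ONEDNN_OPTS when the library is first imported, so setting it after the import may have no effect. The snippet below is a hedged sketch of the usual pattern, not a confirmed fix for this script.

    # Sketch: export the TF-related variables before TensorFlow is imported,
    # so they are visible when the library initializes.
    import os
    os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"
    os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
    os.environ["TF_DETERMINISTIC_OPS"] = "1"

    import tensorflow as tf  # imported only after the variables are in place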