Created
September 20, 2024 06:01
-
-
Save Sambosis/99a2d3d42df5c119fe79c5b39e39d26b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Install necessary packages | |
| # Uncomment the following lines if running in a new environment | |
| # !pip install fluidstack -q | |
| # !pip install pytorch_lightning tensorflow icecream tensorboardX rich wandb -q | |
| # Standard Library Imports | |
| from calendar import c | |
| import os | |
| import io | |
| import time | |
| import random | |
| from dataclasses import dataclass | |
| import multiprocessing | |
| # Third-Party Imports | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.metrics import precision_recall_curve | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| import torch.nn.functional as F | |
| from torch.utils.data import DataLoader, Dataset | |
| import pytorch_lightning as pl | |
| from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping, RichProgressBar | |
| from icecream import ic | |
| from tqdm import tqdm | |
| from sklearn.preprocessing import MinMaxScaler | |
| import matplotlib.pyplot as plt | |
| from PIL import Image | |
| import requests | |
| from rich.console import Console | |
| from rich.table import Table | |
| from rich.text import Text | |
| from rich.box import ROUNDED | |
| import wandb | |
| from pytorch_lightning.loggers import WandbLogger | |
# Shared Rich console instance for formatted terminal output.
console = Console()

# Report how many CPU cores this machine exposes.
num_cpus = multiprocessing.cpu_count()
print(f"Number of CPU cores available: {num_cpus}")
| # Configuration Dataclass | |
@dataclass
class Config:
    """Hyperparameters, file locations and runtime switches for the script.

    Every annotated attribute is a dataclass field, so it can be overridden
    through the constructor, e.g. ``Config(BATCH_SIZE=64)``.
    """

    VERSION_N: int = 68
    RECORDS_TO_LOAD: int = 225040
    N_PAST: int = 3 * 12 * 3  # 108 time steps of history per sample
    N_FUTURE: int = 1 * 12 * 2  # 24 time steps to forecast per sample
    BATCH_SIZE: int = 2100
    HIDDEN_SIZE: int = 512
    NUM_LAYERS: int = 6
    NUM_EPOCHS: int = 60
    HOT_RESTART: bool = False
    TRAIN_FIRST: bool = True
    EPOCH_TO_RESTART: int = 50
    BATCH_FACTOR: int = 81
    DEBUG_FREQ: int = 180
    # Core count captured when the class body runs. Deliberately left
    # unannotated: it is a plain class attribute, not a dataclass field.
    num_cpus = multiprocessing.cpu_count()
    DEBUG_ON: bool = True
    DATA_URL: str = 'https://sambo.us-iad-1.linodeobjects.com/fillnan_combined_df.csv'
    DATA_FILE: str = './data/fill_nan_df.csv'
    MODEL_PATH: str = "/home/ubuntu/models/TransformerModel/model-epoch=15-val_loss=3.64.ckpt"
    MODEL_SAVE_PATH: str = './models/TransformerModel'
    DEVICE: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    EPSILON: float = 1e-4
    # One worker per five cores, capped at 32 (0 on machines with < 5 cores).
    # Declared last so that turning it into a real field does not shift the
    # positional order of the fields above.
    NUM_WORKERS: int = min(32, num_cpus // 5)
# Global configuration object shared by the rest of the script.
cfg = Config()

# Fix the random seeds (including DataLoader workers) for reproducibility.
pl.seed_everything(42, workers=True)

# Show which device the configuration selected.
print(f"Using device: {cfg.DEVICE}")

# Switch IceCream debug output on or off according to the configuration.
if not cfg.DEBUG_ON:
    ic.disable()
else:
    ic.enable()

# Make sure the checkpoint/plot output directory exists.
os.makedirs(cfg.MODEL_SAVE_PATH, exist_ok=True)
| # PositionalEncoding Class | |
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to batch-first sequences.

    Args:
        d_model: Size of the last (embedding) dimension of the inputs.
            Both even and odd sizes are supported.
        max_len: Number of positions in the precomputed table.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model)
        )  # (ceil(d_model / 2),)
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)  # even columns
        # For odd d_model there is one fewer odd column than even columns,
        # so drop the surplus frequency instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])  # odd columns

        # Buffer: moves with the module across devices but is not trained.
        self.register_buffer('pe', pe.unsqueeze(0))  # (1, max_len, d_model)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension has size ``d_model``.
        """
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len, :]
| # Transformer-Based Model | |
class CryptoTransformer(nn.Module):
    """Encoder-decoder Transformer mapping a past window to a future window.

    Source and target sequences share one input projection, so both must
    carry ``input_size`` features per time step.

    Args:
        input_size: Number of features per time step.
        d_model: Width of the Transformer embeddings.
        nhead: Number of attention heads.
        num_encoder_layers: Number of stacked encoder layers.
        num_decoder_layers: Number of stacked decoder layers.
        dim_feedforward: Hidden size of the per-layer feed-forward network.
        dropout: Dropout probability inside the Transformer layers.
        activation: Activation used by the feed-forward networks.
        n_future: Forecast horizon (stored on the instance only).
        num_outputs: Number of values predicted per future time step.
        max_seq_length: Number of positions in the positional-encoding tables.
    """

    def __init__(
        self,
        input_size,
        d_model=512,
        nhead=8,
        num_encoder_layers=6,
        num_decoder_layers=6,
        dim_feedforward=2048,
        dropout=0.2,
        activation="relu",
        n_future=24,
        num_outputs=24,
        max_seq_length=5000
    ):
        super().__init__()
        self.input_size = input_size
        self.d_model = d_model
        self.n_future = n_future
        self.num_outputs = num_outputs

        # Projects raw features to the model width (used for src and tgt).
        self.input_fc = nn.Linear(input_size, d_model)

        # Separate positional-encoding modules for encoder and decoder inputs.
        self.pos_encoder = PositionalEncoding(d_model, max_len=max_seq_length)
        self.pos_decoder = PositionalEncoding(d_model, max_len=max_seq_length)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            activation=activation,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)

        decoder_layer = nn.TransformerDecoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            activation=activation,
            batch_first=True,
        )
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)

        # Maps decoder states to the predicted values.
        self.output_fc = nn.Linear(d_model, num_outputs)

        # NOTE(review): layer_norm and dropout are registered but never used
        # in forward(). They are kept so the module's state_dict keys stay
        # the same as before.
        self.layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` float causal mask.

        Entries on and below the diagonal are ``0.0``; entries above it are
        ``-inf`` so a position cannot attend to later positions.
        """
        return torch.triu(torch.full((sz, sz), float('-inf')), diagonal=1)

    def forward(self, src, tgt):
        """Run the encoder on ``src`` and the causally masked decoder on ``tgt``.

        Args:
            src: (batch_size, n_past, input_size)
            tgt: (batch_size, n_future, input_size)

        Returns:
            Tensor of shape (batch_size, n_future, num_outputs).
        """
        scale = np.sqrt(self.d_model)

        # Embed both sequences with the shared projection, then add positions.
        src = self.pos_encoder(self.input_fc(src) * scale)  # (batch, n_past, d_model)
        tgt = self.pos_decoder(self.input_fc(tgt) * scale)  # (batch, n_future, d_model)

        # Causal mask so each target step only sees earlier target steps.
        tgt_mask = self.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)

        memory = self.transformer_encoder(src)  # (batch, n_past, d_model)
        output = self.transformer_decoder(tgt, memory, tgt_mask=tgt_mask)  # (batch, n_future, d_model)

        return self.output_fc(output)  # (batch, n_future, num_outputs)
| # BalancedCryptoLoss | |
class BalancedCryptoLoss(nn.Module):
    """Weighted sum of several error terms minus an accuracy reward.

    ``forward`` returns a 9-tuple:
    ``(final_loss, mse_loss, mae_loss, perc_diff_loss, max_diff_loss,
    imbalance_loss, direction_loss, reward, perc_cutoff)``.

    Args:
        config: Object providing ``EPSILON`` and ``DEBUG_FREQ`` attributes.
    """

    def __init__(self, config: "Config"):
        super().__init__()
        self.config = config
        # Weights of the individual terms in the final loss.
        self.mse_weight = 800.0
        self.mae_weight = 15.0
        self.max_diff_weight = 3.0
        self.balance_weight = 35.0
        self.direction_weight = 0.001
        self.mean_diff_weight = 15.0
        self.perc_diff_weight = 15.0
        self.within_1pct_reward_weight = 30.0
        self.reward_scaling = 0.6
        self.epsilon = config.EPSILON  # added to the denominator of relative errors
        self.debug_freq = config.DEBUG_FREQ
        self.epoch = 0  # incremented on every forward() call
        self.mean_mean_diff = 0.0  # last mean absolute relative error, as a float
        self.perc_cutoff = 0.005  # relative-error threshold used by the reward

    def _signed_perc_diff(self, y_pred, y_true):
        """Return the signed relative error (positive means over-prediction)."""
        return (y_pred - y_true) / (self.epsilon + y_true)

    def directional_loss(self, preds, target):
        """Binary cross-entropy between step-to-step movement directions.

        Differences are taken along dimension 1. Signs are mapped from
        {-1, 0, 1} to {0, 0.5, 1} and clamped away from exactly 0 and 1.
        """
        # NOTE(review): sign() has zero gradient, so this term likely acts as
        # a monitored metric rather than a training signal - confirm intent.
        direction_pred = (preds[:, 1:] - preds[:, :-1]).sign()
        direction_true = (target[:, 1:] - target[:, :-1]).sign()
        direction_pred = torch.clamp((direction_pred + 1) / 2, 1e-7, 1 - 1e-7)
        direction_true = torch.clamp((direction_true + 1) / 2, 1e-7, 1 - 1e-7)
        return F.binary_cross_entropy(direction_pred, direction_true).mean() * self.direction_weight

    def mse_loss_component(self, y_pred, y_true):
        """Weighted mean squared error."""
        return F.mse_loss(y_pred, y_true) * self.mse_weight

    def mae_loss_component(self, y_pred, y_true):
        """Weighted mean absolute error."""
        return F.l1_loss(y_pred, y_true) * self.mae_weight

    def percentage_diff_component(self, y_pred, y_true):
        """Weighted mean absolute relative error; also caches its raw value."""
        mean_abs = torch.mean(torch.abs(self._signed_perc_diff(y_pred, y_true)))
        self.mean_mean_diff = mean_abs.item()
        return mean_abs * self.perc_diff_weight

    def max_diff_component(self, perc_diff):
        """Weighted mean of the maxima of ``perc_diff`` along dimension 1."""
        max_diffs, _ = torch.max(perc_diff, dim=1)
        return torch.mean(max_diffs) * self.max_diff_weight

    def imbalance_component(self, perc_diff):
        """Penalise a systematic bias towards over- or under-prediction.

        Args:
            perc_diff: Signed relative errors. With absolute values the
                under-prediction part is always zero and the term cannot
                detect bias.
        """
        overpredict = torch.relu(perc_diff)
        underpredict = torch.relu(-perc_diff)
        imbalance = torch.abs(torch.mean(overpredict, dim=1) - torch.mean(underpredict, dim=1))
        return torch.mean(imbalance) * self.balance_weight

    def reward_component(self, y_pred, y_true):
        """Weighted fraction of elements whose relative error is within the cutoff."""
        percentage_diff = torch.abs(self._signed_perc_diff(y_pred, y_true))
        within_cutoff = (percentage_diff <= self.perc_cutoff).float()
        return torch.mean(within_cutoff) * self.within_1pct_reward_weight

    def compute_all_losses(self, y_pred, y_true):
        """Return ``(mse, mae, perc_diff, max_diff, imbalance, direction, reward)``."""
        signed_diff = self._signed_perc_diff(y_pred, y_true)
        abs_diff = torch.abs(signed_diff)

        mse_loss = self.mse_loss_component(y_pred, y_true)
        mae_loss = self.mae_loss_component(y_pred, y_true)
        perc_diff_loss = self.percentage_diff_component(y_pred, y_true)
        max_diff_loss = self.max_diff_component(abs_diff)
        # The imbalance term needs the sign of the error; it previously
        # received absolute values, which made it a second copy of the mean
        # absolute relative error.
        imbalance_loss = self.imbalance_component(signed_diff)
        direction_loss = self.directional_loss(y_pred, y_true)
        reward = self.reward_component(y_pred, y_true)
        return mse_loss, mae_loss, perc_diff_loss, max_diff_loss, imbalance_loss, direction_loss, reward

    def forward(self, y_pred, y_true):
        """Combine all terms into the final loss (clamped at zero).

        Returns:
            The 9-tuple described in the class docstring.
        """
        self.epoch += 1
        (mse_loss, mae_loss, perc_diff_loss, max_diff_loss,
         imbalance_loss, direction_loss, reward) = self.compute_all_losses(y_pred, y_true)

        final_loss = (mse_loss + mae_loss + perc_diff_loss + max_diff_loss +
                      imbalance_loss + direction_loss - (reward * self.reward_scaling))
        # The subtracted reward could push the sum below zero.
        final_loss = torch.clamp(final_loss, min=0.0)

        perc_cutoff = self.get_perc_cutoff()
        return (final_loss, mse_loss, mae_loss, perc_diff_loss, max_diff_loss,
                imbalance_loss, direction_loss, reward, perc_cutoff)

    def set_perc_cutoff(self, perc_cutoff):
        """Set the relative-error threshold used by the reward."""
        self.perc_cutoff = perc_cutoff

    def get_last_mean_diff(self):
        """Return the mean absolute relative error cached by the last evaluation."""
        return self.mean_mean_diff

    def get_perc_cutoff(self):
        """Return the current relative-error threshold of the reward."""
        return self.perc_cutoff
| # Custom Dataset | |
class CryptoDataset(Dataset):
    """Sliding-window dataset over the rows of a DataFrame.

    Item ``i`` is the pair ``(x, y)`` where ``x`` holds rows
    ``i .. i + n_past - 1`` and ``y`` the following ``n_future`` rows, both
    as float32 tensors with one column per DataFrame column.

    Args:
        data: Source rows; all columns must be numeric.
        n_past: Number of rows in each input window.
        n_future: Number of rows in each target window.
    """

    def __init__(self, data: pd.DataFrame, n_past: int, n_future: int):
        self.data = data
        self.n_past = n_past
        self.n_future = n_future
        # Snapshot the values once as float32 so __getitem__ does cheap
        # NumPy slicing instead of a pandas .iloc lookup per sample.
        self._values = data.to_numpy(dtype=np.float32)

    def __len__(self):
        # Clamp at zero: data shorter than one full window yields no samples
        # (a negative value would make len() raise ValueError).
        return max(0, len(self._values) - self.n_past - self.n_future + 1)

    def __getitem__(self, idx):
        """Return ``(x, y)`` with shapes (n_past, num_features) and (n_future, num_features).

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        size = len(self)
        if idx < 0:
            idx += size
        if not 0 <= idx < size:
            raise IndexError(f"index out of range for dataset of length {size}")
        split = idx + self.n_past
        x = torch.tensor(self._values[idx:split])
        y = torch.tensor(self._values[split:split + self.n_future])
        return x, y
| # Utility Functions | |
def get_random_sample(dataframe: pd.DataFrame):
    """
    Draw one random (history, future) window pair from a DataFrame.

    The window lengths come from the global configuration: ``cfg.N_PAST``
    rows of history followed directly by ``cfg.N_FUTURE`` rows of targets.

    Args:
        dataframe (pd.DataFrame): DataFrame to sample from.

    Returns:
        tuple: (input_data, target_data) as float tensors.
    """
    # randint is inclusive on both ends, so the last valid start is allowed.
    start = random.randint(0, len(dataframe) - cfg.N_PAST - cfg.N_FUTURE)
    split = start + cfg.N_PAST
    stop = split + cfg.N_FUTURE

    history = dataframe.iloc[start:split].values
    future = dataframe.iloc[split:stop].values
    return torch.FloatTensor(history), torch.FloatTensor(future)
def prepare_input(input_data, device):
    """
    Turn a single sample into a batch of one on the requested device.

    Args:
        input_data (torch.FloatTensor): Input data for one sample.
        device: Device the resulting tensor is moved to.

    Returns:
        torch.FloatTensor: ``input_data`` with a new leading dimension of size 1.
    """
    batched = torch.unsqueeze(input_data, 0)
    return batched.to(device)
def convert_to_numpy(input_data, target, prediction):
    """
    Convert tensors to NumPy arrays.

    Each tensor is detached from the autograd graph first, so tensors that
    require gradients (e.g. raw model outputs) can be converted as well.

    Args:
        input_data (torch.FloatTensor): Input data.
        target (torch.FloatTensor): Target data.
        prediction (torch.FloatTensor): Prediction data.

    Returns:
        tuple: (input_np, target_np, prediction_np)
    """
    return tuple(
        tensor.detach().cpu().numpy()
        for tensor in (input_data, target, prediction)
    )
def moving_average(data, window_size):
    """
    Compute the unweighted moving average of a 1-D sequence.

    Only positions where the window fits completely are returned, so the
    result has ``len(data) - window_size + 1`` entries.

    Args:
        data (np.ndarray): Input data.
        window_size (int): Number of consecutive samples averaged together.

    Returns:
        np.ndarray: Moving average of the data.
    """
    box = np.ones(window_size)
    window_sums = np.convolve(data, box, mode='valid')
    return window_sums / window_size
def gaussian_smoothing(data, window_size, sigma):
    """
    Smooth a 1-D sequence with a normalised Gaussian kernel.

    Only positions where the kernel fits completely are returned, so the
    result has ``len(data) - window_size + 1`` entries.

    Args:
        data (np.ndarray): Input data.
        window_size (int): Number of kernel taps.
        sigma (float): Standard deviation of the Gaussian kernel.

    Returns:
        np.ndarray: Gaussian smoothed data.
    """
    # Sample the kernel on a grid that is symmetric around zero. The negation
    # must be applied after the floor division: ``-window_size // 2`` floors
    # the negated value and yields a lopsided grid for odd window sizes.
    half_width = window_size // 2
    x = np.linspace(-half_width, half_width, window_size)
    kernel = np.exp(-(x ** 2) / (2 * sigma ** 2))
    kernel /= kernel.sum()  # weights sum to 1, preserving the signal level
    return np.convolve(data, kernel, 'valid')
def visualize_predictions(target_np, prediction_np, n_future, scalers, filtered_df, model_save_path):
    """
    Plot target against prediction for every feature and save the figure.

    One subplot is drawn per column of ``filtered_df`` (at most 7 per row).
    Each series is inverse-transformed with its column's scaler and then
    exponentiated; columns whose name ends with 'XBT_price' are additionally
    multiplied by 50000. Raw curves are drawn faintly, Gaussian-smoothed
    curves boldly, and a dashed line connects each curve's first and last point.

    Args:
        target_np (np.ndarray): Target data, indexed as [step, feature].
        prediction_np (np.ndarray): Prediction data, indexed as [0, step, feature].
        n_future (int): Number of future intervals.
        scalers (dict): Dictionary of scalers for each column.
        filtered_df (pd.DataFrame): Provides the column names and feature count.
        model_save_path (str): Directory the image is written to.

    Returns:
        str: Path to the saved image.
    """
    max_cols = 7
    window_size = 6  # Adjust this value to change the smoothing level
    num_features = filtered_df.shape[1]
    num_cols = min(num_features, max_cols)
    num_rows = (num_features - 1) // max_cols + 1
    plt.figure(figsize=(18 * num_cols / max_cols, 6 * num_rows))

    steps = range(n_future)
    # 'valid' smoothing drops the first window_size - 1 points.
    smooth_steps = range(window_size - 1, n_future)
    endpoints = [0, n_future - 1]

    for j in range(num_features):
        plt.subplot(num_rows, num_cols, j + 1)
        col_name = filtered_df.columns[j]
        scaler = scalers[col_name]

        # Undo the scaling, then the log transform.
        target_inverted = np.exp(scaler.inverse_transform(target_np[:, j].reshape(-1, 1)).flatten())
        prediction_inverted = np.exp(scaler.inverse_transform(prediction_np[0, :, j].reshape(-1, 1)).flatten())
        if col_name.endswith('XBT_price'):
            target_inverted = target_inverted * 50000
            prediction_inverted = prediction_inverted * 50000

        target_smooth = gaussian_smoothing(target_inverted, window_size, sigma=4)
        prediction_smooth = gaussian_smoothing(prediction_inverted, window_size, sigma=4)

        # Raw series as faint lines.
        plt.plot(steps, target_inverted, 'g', alpha=0.4)
        plt.plot(steps, prediction_inverted, 'r', alpha=0.4)

        # Smoothed series as bold lines.
        plt.plot(smooth_steps, target_smooth, "g", label='Target', linewidth=2)
        plt.plot(smooth_steps, prediction_smooth, "r", label='Prediction', linewidth=2)

        # Dashed chord from the first to the last point of each series.
        plt.plot(endpoints, [target_inverted[0], target_inverted[-1]], 'g--')
        plt.plot(endpoints, [prediction_inverted[0], prediction_inverted[-1]], 'r--')

        # Common y-range covering both series, with a 10% margin.
        y_min = min(target_inverted.min(), prediction_inverted.min())
        y_max = max(target_inverted.max(), prediction_inverted.max())
        margin = 0.1 * (y_max - y_min)
        plt.ylim(y_min - margin, y_max + margin)

        plt.title(col_name)
        plt.legend()

    plt.tight_layout()
    time_date = time.strftime("%Y%m%d-%H%M%S")
    image_path = os.path.join(model_save_path, f"{time_date}_predictions.png")
    plt.savefig(image_path)
    plt.close()
    return image_path
| # Lightning Wrapper | |
class LightningWrapper(pl.LightningModule):
    """Lightning module wiring the model, loss, optimizer and wandb logging.

    Args:
        model: Sequence-to-sequence model called as ``model(src, tgt)``.
        criterion: Loss returning the 9-tuple ``(final_loss, mse_loss,
            mae_loss, perc_diff_loss, max_diff_loss, imbalance_loss,
            direction_loss, reward, perc_cutoff)``.
        optimizer: Pre-built optimizer returned by ``configure_optimizers``.
        scheduler: Pre-built LR scheduler, stepped on ``val_loss``.
        num_epochs: Planned number of epochs (stored only).
        scaler_dict: Per-column scalers used to undo scaling in plots.
        val_data: Validation DataFrame sampled for the prediction plots.
    """

    # Names of criterion outputs 1..7, in the order the criterion returns them.
    _COMPONENT_NAMES = (
        'mse_loss', 'mae_loss', 'perc_diff_loss', 'max_diff_loss',
        'imbalance_loss', 'direction_loss', 'reward',
    )

    def __init__(self, model, criterion, optimizer, scheduler, num_epochs: int, scaler_dict: dict, val_data: pd.DataFrame):
        super().__init__()
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.num_epochs = num_epochs
        self.scaler_dict = scaler_dict
        self.val_data = val_data  # For making predictions during logging
        self.perc_cutoff = criterion.get_perc_cutoff()
        self.reward = 0.0  # reward of the most recent validation batch

    def forward(self, src, tgt):
        return self.model(src, tgt)

    def _run_batch(self, batch, stage, total_key):
        """Run one teacher-forced batch, log every loss term, return (loss, reward)."""
        batch_X, batch_y = batch
        # Decoder input: the target shifted right by one step, with zeros
        # in the first position.
        tgt_input = torch.zeros_like(batch_y)
        tgt_input[:, 1:, :] = batch_y[:, :-1, :]
        y_pred = self.model(batch_X, tgt_input)

        outputs = self.criterion(y_pred, batch_y)
        final_loss, components = outputs[0], outputs[1:8]

        self.log(total_key, final_loss, on_step=False, on_epoch=True, prog_bar=True, sync_dist=True)
        for name, value in zip(self._COMPONENT_NAMES, components):
            self.log(f'{stage}/{name}', value, on_step=False, on_epoch=True, sync_dist=True)
        return final_loss, outputs[7]

    def training_step(self, batch, batch_idx):
        final_loss, _ = self._run_batch(batch, 'train', 'train/final_loss')
        return final_loss

    def validation_step(self, batch, batch_idx):
        # 'val_loss' is the key monitored by the LR scheduler and callbacks.
        final_loss, reward = self._run_batch(batch, 'val', 'val_loss')
        self.reward = reward
        return final_loss

    def configure_optimizers(self):
        return {
            'optimizer': self.optimizer,
            'lr_scheduler': {
                'scheduler': self.scheduler,
                'monitor': 'val_loss'
            }
        }

    def on_validation_epoch_end(self):
        # Only the main process renders and uploads the plot.
        if self.global_rank == 0:
            plot_path = self.generate_and_log_plots()
            if plot_path:
                with Image.open(plot_path) as img:
                    self.logger.experiment.log({
                        "Validation/Prediction_vs_Target": wandb.Image(img),
                        "global_step": self.global_step
                    })
                # The image was only a temporary artefact for the upload.
                os.remove(plot_path)

        # Everything below runs on every rank: self.log(..., sync_dist=True)
        # synchronises across processes, so all of them must reach it.
        # NOTE(review): self.reward is the weighted reward of the last
        # validation batch on this rank, not an epoch average - confirm that
        # 0.9 is the intended threshold on that scale.
        if self.reward > 0.9:
            self.criterion.set_perc_cutoff(self.criterion.get_perc_cutoff() * 0.8)
        self.log('val/perc_cutoff', self.criterion.get_perc_cutoff(), on_step=False, on_epoch=True, sync_dist=True)

    def _autoregressive_forecast(self, src, horizon):
        """Decode ``horizon`` steps, feeding each prediction back as decoder input.

        Starts from an all-zero first decoder step, the same start token the
        teacher-forced training input uses. Assumes the model emits as many
        values per step as it consumes, since outputs are fed back as inputs.

        Returns:
            Tensor of shape (batch, horizon, num_features).
        """
        was_training = self.model.training
        self.model.eval()
        try:
            with torch.no_grad():
                decoded = torch.zeros(src.size(0), 1, src.size(-1), device=src.device, dtype=src.dtype)
                for _ in range(horizon):
                    step_output = self.model(src, decoded)
                    decoded = torch.cat([decoded, step_output[:, -1:, :]], dim=1)
        finally:
            self.model.train(was_training)
        return decoded[:, 1:, :]  # drop the zero start token

    def generate_and_log_plots(self):
        """
        Generate prediction vs target plots and save them to a temporary file.
        Returns the path to the saved image.
        """
        # Forecast one random window of the validation data.
        input_data, target = get_random_sample(self.val_data)
        input_tensor = prepare_input(input_data, self.device)
        prediction = self._autoregressive_forecast(input_tensor, target.size(0))
        _, target_np, prediction_np = convert_to_numpy(input_tensor, target, prediction)

        # visualize_predictions only needs the column names and column count,
        # so the validation frame itself is passed.
        return visualize_predictions(
            target_np, prediction_np, cfg.N_FUTURE, self.scaler_dict, self.val_data, cfg.MODEL_SAVE_PATH
        )

    def on_train_epoch_end(self):
        # Only the main process should perform logging
        if self.global_rank != 0:
            return

        # Collect everything first and send a single wandb record per epoch
        # instead of one record per parameter tensor.
        payload = {'epoch': self.current_epoch}
        for name, param in self.named_parameters():
            payload[f"Weights/{name}"] = wandb.Histogram(param.detach().cpu().numpy())
            if param.grad is not None:
                payload[f"Gradients/{name}"] = wandb.Histogram(param.grad.detach().cpu().numpy())

        optimizer = self.optimizers()
        payload['learning_rate'] = optimizer.param_groups[0]['lr']
        self.logger.experiment.log(payload)

    def on_after_backward(self):
        # Only the main process should perform logging
        if self.global_rank != 0:
            return

        # Global L2 norm over all gradients, with one host sync in total.
        grad_norms = [p.grad.detach().norm(2) for p in self.model.parameters() if p.grad is not None]
        total_norm = torch.stack(grad_norms).norm(2).item() if grad_norms else 0.0
        self.logger.experiment.log({'Gradients/grad_total_norm': total_norm, 'step': self.global_step})
| # Data Loading and Preprocessing | |
def load_and_preprocess_data(file_path: str, download_url: str = None):
    """
    Load and preprocess data from a CSV file. If the file does not exist, download it.

    The CSV must contain a 'timestamp' column, which becomes the index. Only
    the last ``cfg.RECORDS_TO_LOAD`` rows are kept. Each column is replaced
    by its natural logarithm and then scaled with its own ``MinMaxScaler``.

    Args:
        file_path (str): Path to the CSV file.
        download_url (str, optional): URL to download the CSV file. Defaults to None.

    Returns:
        pd.DataFrame: Preprocessed DataFrame.
        dict: Dictionary of scalers used for each column.

    Raises:
        FileNotFoundError: If the file is missing and no URL was given.
        requests.exceptions.RequestException: If the download fails.
    """
    ic("Starting data loading and preprocessing...")
    start_time = time.time()

    if not os.path.exists(file_path):
        ic(f"File {file_path} does not exist.")
        parent_dir = os.path.dirname(file_path)
        if parent_dir:  # a bare filename has no directory to create
            os.makedirs(parent_dir, exist_ok=True)
        if not download_url:
            ic("Download URL not provided. Cannot download the file.")
            raise FileNotFoundError(f"The file {file_path} does not exist and no download URL was provided.")

        ic(f"Downloading file from {download_url}...")
        # Download next to the destination and rename on success, so an
        # interrupted transfer is never mistaken for a complete file later.
        partial_path = f"{file_path}.part"
        try:
            with requests.get(download_url, stream=True, timeout=(10, 120)) as response:
                response.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            os.replace(partial_path, file_path)
            ic(f"File downloaded and saved to {file_path}")
        except requests.exceptions.RequestException as e:
            ic(f"Failed to download the file: {e}")
            raise
        finally:
            # No-op after a successful rename; removes leftovers otherwise.
            if os.path.exists(partial_path):
                os.remove(partial_path)

    # Load the DataFrame
    df = pd.read_csv(file_path, parse_dates=['timestamp'])
    df.set_index('timestamp', inplace=True)
    # Copy so the column assignments below write to an independent frame
    # rather than to a slice of the full table.
    df = df.tail(cfg.RECORDS_TO_LOAD).copy()

    # NOTE(review): non-positive values become -inf/NaN under the log, and
    # the scalers are fitted on all loaded rows, i.e. before any train /
    # validation split made by the caller - confirm both are acceptable.
    scalers = {}
    for col in df.columns:
        df[col] = np.log(df[col])
        scalers[col] = MinMaxScaler()
        df[col] = scalers[col].fit_transform(df[[col]]).ravel()

    ic(f"Data preprocessing completed in {time.time() - start_time:.2f} seconds")
    ic(f"DataFrame shape: {df.shape}")
    return df, scalers
| # Main Execution Block | |
if __name__ == "__main__":
    torch.set_float32_matmul_precision("medium")

    # Optimizer settings, defined once so the values logged to wandb cannot
    # drift from the values actually used.
    LEARNING_RATE = 2e-4
    WEIGHT_DECAY = 1e-5

    # Load and preprocess data
    df, scalers = load_and_preprocess_data(cfg.DATA_FILE, cfg.DATA_URL)
    NUM_FEATURES = df.shape[1]

    # Initialize the Wandb logger and name your Wandb project
    logger = WandbLogger(project='my-awesome-project', log_model=True)
    logger.log_hyperparams({
        "batch_size": cfg.BATCH_SIZE,
        "hidden_size": cfg.HIDDEN_SIZE,
        "num_layers": cfg.NUM_LAYERS,
        "num_epochs": cfg.NUM_EPOCHS,
        "learning_rate": LEARNING_RATE,
        "weight_decay": WEIGHT_DECAY
    })

    # Chronological split: first 80% for training, remainder for validation.
    train_size = int(0.8 * len(df))
    train_data = df.iloc[:train_size]
    val_data = df.iloc[train_size:]

    train_dataset = CryptoDataset(train_data, cfg.N_PAST, cfg.N_FUTURE)
    val_dataset = CryptoDataset(val_data, cfg.N_PAST, cfg.N_FUTURE)

    # Pinned host memory is only useful when batches are copied to a GPU.
    use_pinned_memory = torch.cuda.is_available()
    # NOTE(review): the training loader is not shuffled - confirm that
    # sequential batches are intended.
    train_loader = DataLoader(
        train_dataset,
        batch_size=cfg.BATCH_SIZE,
        shuffle=False,
        num_workers=cfg.NUM_WORKERS,
        pin_memory=use_pinned_memory
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=cfg.BATCH_SIZE,
        shuffle=False,
        num_workers=cfg.NUM_WORKERS,
        pin_memory=use_pinned_memory
    )

    # Initialize Transformer Model
    model = CryptoTransformer(
        input_size=NUM_FEATURES,
        d_model=cfg.HIDDEN_SIZE,
        nhead=8,
        num_encoder_layers=cfg.NUM_LAYERS,
        num_decoder_layers=cfg.NUM_LAYERS,
        dim_feedforward=2048,
        dropout=0.2,
        activation="relu",
        n_future=cfg.N_FUTURE,
        num_outputs=NUM_FEATURES,
        max_seq_length=cfg.N_PAST + cfg.N_FUTURE
    ).to(cfg.DEVICE)

    # Initialize Loss Function
    criterion = BalancedCryptoLoss(cfg)

    # Initialize Optimizer and Scheduler
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.8, patience=5, min_lr=4e-6)

    # Handle Hot Restart
    if cfg.HOT_RESTART:
        try:
            # The checkpoint is a local file written by this project's own
            # training runs; do not point MODEL_PATH at untrusted files.
            checkpoint = torch.load(cfg.MODEL_PATH, map_location=cfg.DEVICE)
        except FileNotFoundError:
            print(f"No previous final model found at {cfg.MODEL_PATH}. Starting with a fresh model.")
        else:
            # Lightning checkpoints nest the weights under 'state_dict' and
            # prefix each key with the wrapper attribute name ('model.').
            # Loading the raw checkpoint dict with strict=False would match
            # no keys and silently leave the model untouched.
            state_dict = checkpoint
            if isinstance(checkpoint, dict) and "state_dict" in checkpoint:
                state_dict = checkpoint["state_dict"]
            prefix = "model."
            state_dict = {
                (key[len(prefix):] if key.startswith(prefix) else key): value
                for key, value in state_dict.items()
            }
            load_result = model.load_state_dict(state_dict, strict=False)
            if load_result.missing_keys or load_result.unexpected_keys:
                print(
                    f"Warning: checkpoint mismatch - {len(load_result.missing_keys)} missing and "
                    f"{len(load_result.unexpected_keys)} unexpected keys."
                )
            print(f"Loaded previously saved final model from {cfg.MODEL_PATH}")
    else:
        print("Starting with a fresh model.")

    if cfg.TRAIN_FIRST:
        # Initialize Lightning Wrapper
        wrapped_model = LightningWrapper(
            model=model,
            criterion=criterion,
            optimizer=optimizer,
            scheduler=scheduler,
            num_epochs=cfg.NUM_EPOCHS,
            scaler_dict=scalers,
            val_data=val_data
        )

        # Stop when 'val_loss' has not improved for 20 validation epochs.
        early_stopping_callback = EarlyStopping(
            monitor='val_loss',  # Ensure this matches the logged metric
            patience=20,
            mode='min'
        )

        # Keep the three checkpoints with the lowest 'val_loss'.
        checkpoint_callback = ModelCheckpoint(
            monitor='val_loss',  # Ensure this matches the logged metric
            dirpath=cfg.MODEL_SAVE_PATH,
            filename='model-{epoch:02d}-{val_loss:.2f}',
            save_top_k=3,
            mode='min',
        )

        progress_bar = RichProgressBar(refresh_rate=11)

        # Use every visible GPU with DDP; fall back to one CPU process when
        # no GPU is present (accelerator='gpu' with zero devices is rejected).
        gpu_count = torch.cuda.device_count()
        if gpu_count > 0:
            hardware_kwargs = {
                'accelerator': 'gpu',
                'devices': gpu_count,
                'strategy': 'ddp_find_unused_parameters_true',
            }
        else:
            hardware_kwargs = {'accelerator': 'cpu', 'devices': 1, 'strategy': 'auto'}

        # Initialize Trainer with Wandb logger
        trainer = pl.Trainer(
            max_epochs=cfg.NUM_EPOCHS,
            logger=logger,
            callbacks=[progress_bar, checkpoint_callback, early_stopping_callback],
            enable_progress_bar=True,
            log_every_n_steps=25,
            gradient_clip_val=10.0,
            **hardware_kwargs,
        )

        # Start Training
        trainer.fit(wrapped_model, train_dataloaders=train_loader, val_dataloaders=val_loader)
    else:
        print("Skipping training as TRAIN_FIRST is set to False.")

    wandb.finish()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment