# Gist by @Sambosis, created September 20, 2024 06:01
# Install necessary packages
# Uncomment the following lines if running in a new environment
# !pip install fluidstack -q
# !pip install pytorch_lightning tensorflow icecream tensorboardX rich wandb -q
# Standard Library Imports
import os
import io
import time
import random
from dataclasses import dataclass
import multiprocessing
# Third-Party Imports
import numpy as np
import pandas as pd
from sklearn.metrics import precision_recall_curve
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping, RichProgressBar
from icecream import ic
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from PIL import Image
import requests
from rich.console import Console
from rich.table import Table
from rich.text import Text
from rich.box import ROUNDED
import wandb
from pytorch_lightning.loggers import WandbLogger
# Initialize Rich Console
console = Console()
num_cpus = multiprocessing.cpu_count()
print(f"Number of CPU cores available: {num_cpus}")
# Configuration Dataclass
@dataclass
class Config:
    VERSION_N: int = 68
    RECORDS_TO_LOAD: int = 225040
    N_PAST: int = 3 * 12 * 3  # 108 past intervals fed to the encoder
    N_FUTURE: int = 1 * 12 * 2  # 24 future intervals predicted by the decoder
    BATCH_SIZE: int = 2100
    HIDDEN_SIZE: int = 512
    NUM_LAYERS: int = 6
    NUM_EPOCHS: int = 60
    HOT_RESTART: bool = False
    TRAIN_FIRST: bool = True
    EPOCH_TO_RESTART: int = 50
    BATCH_FACTOR: int = 81
    DEBUG_FREQ: int = 180
    num_cpus = multiprocessing.cpu_count()
    NUM_WORKERS = min(32, num_cpus // 5)
    DEBUG_ON: bool = True
    DATA_URL: str = 'https://sambo.us-iad-1.linodeobjects.com/fillnan_combined_df.csv'
    DATA_FILE: str = './data/fill_nan_df.csv'
    MODEL_PATH: str = "/home/ubuntu/models/TransformerModel/model-epoch=15-val_loss=3.64.ckpt"
    MODEL_SAVE_PATH: str = './models/TransformerModel'
    DEVICE: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    EPSILON: float = 1e-4
# Initialize Configuration
cfg = Config()
# Set Random Seed for Reproducibility
pl.seed_everything(42, workers=True)
# Print Device Information
print(f"Using device: {cfg.DEVICE}")
# Initialize IceCream Debugging
if cfg.DEBUG_ON:
    ic.enable()
else:
    ic.disable()
# Ensure Model Save Directory Exists
os.makedirs(cfg.MODEL_SAVE_PATH, exist_ok=True)
# PositionalEncoding Class
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)  # (max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))  # (d_model/2,)
        pe[:, 0::2] = torch.sin(position * div_term)  # Even indices
        pe[:, 1::2] = torch.cos(position * div_term)  # Odd indices
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len, :]
        return x
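# The helper below is a minimal, illustrative sanity check for the sinusoidal
# encoding above (PE[pos, 2i] = sin(pos / 10000^(2i/d_model)),
# PE[pos, 2i+1] = cos(pos / 10000^(2i/d_model))). The name
# `_demo_positional_encoding` is not part of the original pipeline; the
# function is only defined here for reference and is never called.
def _demo_positional_encoding():
    pe = PositionalEncoding(d_model=8, max_len=16)
    x = torch.zeros(2, 10, 8)          # (batch, seq_len, d_model)
    out = pe(x)
    assert out.shape == (2, 10, 8)     # encoding is added element-wise, shape unchanged
    return out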
# Transformer-Based Model
class CryptoTransformer(nn.Module):
    def __init__(
        self,
        input_size,
        d_model=512,
        nhead=8,
        num_encoder_layers=6,
        num_decoder_layers=6,
        dim_feedforward=2048,
        dropout=0.2,
        activation="relu",
        n_future=24,
        num_outputs=24,
        max_seq_length=5000
    ):
        super(CryptoTransformer, self).__init__()
        self.input_size = input_size
        self.d_model = d_model
        self.n_future = n_future
        self.num_outputs = num_outputs
        # Input linear layer
        self.input_fc = nn.Linear(input_size, d_model)
        # Positional Encoding
        self.pos_encoder = PositionalEncoding(d_model, max_len=max_seq_length)
        self.pos_decoder = PositionalEncoding(d_model, max_len=max_seq_length)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            activation=activation,
            batch_first=True  # inputs are (batch, seq, feature)
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            activation=activation,
            batch_first=True  # inputs are (batch, seq, feature)
        )
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)
        # Output linear layer
        self.output_fc = nn.Linear(d_model, num_outputs)
        # Layer normalization
        self.layer_norm = nn.LayerNorm(d_model)
        # Dropout
        self.dropout = nn.Dropout(dropout)

    def generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask  # (sz, sz)

    def forward(self, src, tgt):
        """
        Args:
            src: (batch_size, n_past, num_features)
            tgt: (batch_size, n_future, num_features)
        Returns:
            out: (batch_size, n_future, num_outputs)
        """
        # Input embedding, scaled by sqrt(d_model) as in the original Transformer
        src = self.input_fc(src) * np.sqrt(self.d_model)  # (batch_size, n_past, d_model)
        tgt = self.input_fc(tgt) * np.sqrt(self.d_model)  # (batch_size, n_future, d_model)
        # Add positional encoding
        src = self.pos_encoder(src)  # (batch_size, n_past, d_model)
        tgt = self.pos_decoder(tgt)  # (batch_size, n_future, d_model)
        # Causal mask so each decoder position only attends to earlier positions
        tgt_mask = self.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)  # (n_future, n_future)
        # Transformer forward pass
        memory = self.transformer_encoder(src)  # (batch_size, n_past, d_model)
        output = self.transformer_decoder(tgt, memory, tgt_mask=tgt_mask)  # (batch_size, n_future, d_model)
        # Final linear layer
        out = self.output_fc(output)  # (batch_size, n_future, num_outputs)
        return out  # (batch_size, n_future, num_outputs)
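# Minimal shape sketch for the encoder-decoder model above, assuming the
# batch_first layout used in forward(). `_demo_crypto_transformer` is an
# illustrative helper only (not part of the training pipeline) and is never
# called; the small sizes are arbitrary.
def _demo_crypto_transformer():
    demo_model = CryptoTransformer(input_size=4, d_model=32, nhead=4,
                                   num_encoder_layers=1, num_decoder_layers=1,
                                   dim_feedforward=64, n_future=6, num_outputs=4)
    src = torch.randn(2, 12, 4)    # (batch, n_past, num_features)
    tgt = torch.randn(2, 6, 4)     # (batch, n_future, num_features)
    out = demo_model(src, tgt)
    assert out.shape == (2, 6, 4)  # (batch, n_future, num_outputs)
    return out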
# BalancedCryptoLoss
class BalancedCryptoLoss(nn.Module):
    def __init__(self, config: Config):
        super(BalancedCryptoLoss, self).__init__()
        self.config = config
        self.mse_weight = 800.0
        self.mae_weight = 15.0
        self.max_diff_weight = 3.0
        self.balance_weight = 35.0
        self.direction_weight = 0.001
        self.mean_diff_weight = 15.0
        self.perc_diff_weight = 15.0
        self.within_1pct_reward_weight = 30.0
        self.reward_scaling = 0.6
        self.epsilon = config.EPSILON
        self.debug_freq = config.DEBUG_FREQ
        self.epoch = 0
        self.mean_mean_diff = 0.0
        self.perc_cutoff = 0.005

    def directional_loss(self, preds, target):
        direction_pred = (preds[:, 1:] - preds[:, :-1]).sign()
        direction_true = (target[:, 1:] - target[:, :-1]).sign()
        # Convert signs to 0 and 1
        direction_pred = (direction_pred + 1) / 2
        direction_true = (direction_true + 1) / 2
        # Clamp values to prevent BCE from receiving exact 0 or 1
        direction_pred = torch.clamp(direction_pred, 1e-7, 1 - 1e-7)
        direction_true = torch.clamp(direction_true, 1e-7, 1 - 1e-7)
        return F.binary_cross_entropy(direction_pred, direction_true).mean() * self.direction_weight

    def mse_loss_component(self, y_pred, y_true):
        return F.mse_loss(y_pred, y_true) * self.mse_weight

    def mae_loss_component(self, y_pred, y_true):
        return F.l1_loss(y_pred, y_true) * self.mae_weight

    def percentage_diff_component(self, y_pred, y_true):
        perc_diff = torch.abs((y_pred - y_true) / (self.epsilon + y_true))
        self.mean_mean_diff = torch.mean(perc_diff).item()
        return torch.mean(perc_diff) * self.perc_diff_weight

    def max_diff_component(self, perc_diff):
        max_diffs, _ = torch.max(perc_diff, dim=1)
        return torch.mean(max_diffs) * self.max_diff_weight

    def imbalance_component(self, perc_diff):
        overpredict = torch.relu(perc_diff)
        underpredict = torch.relu(-perc_diff)
        imbalance = torch.abs(torch.mean(overpredict, dim=1) - torch.mean(underpredict, dim=1))
        return torch.mean(imbalance) * self.balance_weight

    def reward_component(self, y_pred, y_true):
        percentage_diff = torch.abs((y_pred - y_true) / (self.epsilon + y_true))
        within_1pct = (percentage_diff <= self.perc_cutoff).float()
        within_1pct_ratio = torch.mean(within_1pct)
        return within_1pct_ratio * self.within_1pct_reward_weight

    def compute_all_losses(self, y_pred, y_true):
        mse_loss = self.mse_loss_component(y_pred, y_true)
        mae_loss = self.mae_loss_component(y_pred, y_true)
        perc_diff_loss = self.percentage_diff_component(y_pred, y_true)
        max_diff_loss = self.max_diff_component(torch.abs((y_pred - y_true) / (self.epsilon + y_true)))
        imbalance_loss = self.imbalance_component(torch.abs((y_pred - y_true) / (self.epsilon + y_true)))
        direction_loss = self.directional_loss(y_pred, y_true)
        reward = self.reward_component(y_pred, y_true)
        return mse_loss, mae_loss, perc_diff_loss, max_diff_loss, imbalance_loss, direction_loss, reward

    def forward(self, y_pred, y_true):
        self.epoch += 1
        # Compute Loss Components
        mse_loss, mae_loss, perc_diff_loss, max_diff_loss, imbalance_loss, direction_loss, reward = self.compute_all_losses(y_pred, y_true)
        # Combine Loss Components
        final_loss = (mse_loss + mae_loss + perc_diff_loss + max_diff_loss +
                      imbalance_loss + direction_loss - (reward * self.reward_scaling))
        # Clamp Final Loss to prevent negative values
        final_loss = torch.clamp(final_loss, min=0.0)
        perc_cutoff = self.get_perc_cutoff()
        return final_loss, mse_loss, mae_loss, perc_diff_loss, max_diff_loss, imbalance_loss, direction_loss, reward, perc_cutoff

    def set_perc_cutoff(self, perc_cutoff):
        self.perc_cutoff = perc_cutoff

    def get_last_mean_diff(self):
        return self.mean_mean_diff

    def get_perc_cutoff(self):
        return self.perc_cutoff
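# For reference, the scalar returned by BalancedCryptoLoss.forward() is,
# before clamping to >= 0:
#   final_loss = w_mse*MSE + w_mae*MAE + w_perc*mean(|d%|) + w_max*mean(max|d%|)
#                + w_bal*imbalance + w_dir*BCE(direction) - reward_scaling*reward
# where d% = (y_pred - y_true) / (y_true + epsilon), each component already
# carries its weight from __init__, and reward is the fraction of points within
# perc_cutoff scaled by within_1pct_reward_weight. This comment only summarizes
# the code above.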
# Custom Dataset
class CryptoDataset(Dataset):
    def __init__(self, data: pd.DataFrame, n_past: int, n_future: int):
        self.data = data
        self.n_past = n_past
        self.n_future = n_future

    def __len__(self):
        return len(self.data) - self.n_past - self.n_future + 1

    def __getitem__(self, idx):
        x = self.data.iloc[idx:idx + self.n_past].values  # (n_past, num_features)
        y = self.data.iloc[idx + self.n_past:idx + self.n_past + self.n_future].values  # (n_future, num_features)
        return torch.FloatTensor(x), torch.FloatTensor(y)
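# Windowing sketch: the dataset yields len(data) - n_past - n_future + 1 sliding
# windows; with 10 rows, n_past=3 and n_future=2 there are 6 samples, and sample
# idx=0 uses rows 0-2 as input and rows 3-4 as target. `_demo_crypto_dataset` is
# an illustrative helper only and is never called.
def _demo_crypto_dataset():
    demo_df = pd.DataFrame({'a': np.arange(10, dtype=float)})
    ds = CryptoDataset(demo_df, n_past=3, n_future=2)
    assert len(ds) == 6
    x, y = ds[0]
    assert x.shape == (3, 1) and y.shape == (2, 1)
    return x, y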
# Utility Functions
def get_random_sample(dataframe: pd.DataFrame):
    """
    Retrieve a random sample from the DataFrame.
    Args:
        dataframe (pd.DataFrame): DataFrame to sample from.
    Returns:
        tuple: (input_data, target_data)
    """
    random_index = random.randint(0, len(dataframe) - cfg.N_PAST - cfg.N_FUTURE)
    input_data = dataframe.iloc[random_index:random_index + cfg.N_PAST].values
    target_data = dataframe.iloc[random_index + cfg.N_PAST:random_index + cfg.N_PAST + cfg.N_FUTURE].values
    return torch.FloatTensor(input_data), torch.FloatTensor(target_data)

def prepare_input(input_data, device):
    """
    Prepare input tensor for the model.
    Args:
        input_data (torch.FloatTensor): Input data.
        device (torch.device): Device to move the tensor to.
    Returns:
        torch.FloatTensor: Input tensor with a batch dimension, on the target device.
    """
    return input_data.unsqueeze(0).to(device)

def convert_to_numpy(input_data, target, prediction):
    """
    Convert tensors to NumPy arrays.
    Args:
        input_data (torch.FloatTensor): Input data.
        target (torch.FloatTensor): Target data.
        prediction (torch.FloatTensor): Prediction data.
    Returns:
        tuple: (input_np, target_np, prediction_np)
    """
    return input_data.cpu().numpy(), target.cpu().numpy(), prediction.cpu().numpy()

def moving_average(data, window_size):
    """
    Compute the moving average of the data.
    Args:
        data (np.ndarray): Input data.
        window_size (int): Window size for moving average.
    Returns:
        np.ndarray: Moving average of the data.
    """
    return np.convolve(data, np.ones(window_size), 'valid') / window_size

def gaussian_smoothing(data, window_size, sigma):
    """
    Compute the Gaussian smoothing of the data.
    Args:
        data (np.ndarray): Input data.
        window_size (int): Window size for Gaussian smoothing.
        sigma (float): Standard deviation of the Gaussian kernel.
    Returns:
        np.ndarray: Gaussian smoothed data.
    """
    # Generate Gaussian kernel
    x = np.linspace(-window_size // 2, window_size // 2, window_size)
    kernel = np.exp(-(x ** 2) / (2 * sigma ** 2))
    kernel /= kernel.sum()
    # Convolve data with Gaussian kernel
    return np.convolve(data, kernel, 'valid')
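# Note on the smoothing helpers above: np.convolve(..., 'valid') returns
# len(data) - window_size + 1 points, which is why visualize_predictions()
# plots the smoothed curves starting at x = window_size - 1.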
def visualize_predictions(target_np, prediction_np, n_future, scalers, filtered_df, model_save_path):
    """
    Visualize and save prediction vs target plots.
    Args:
        target_np (np.ndarray): Target data.
        prediction_np (np.ndarray): Prediction data.
        n_future (int): Number of future intervals.
        scalers (dict): Dictionary of scalers for each column.
        filtered_df (pd.DataFrame): Filtered DataFrame for plotting.
        model_save_path (str): Path to save the plots.
    Returns:
        str: Path to the saved image.
    """
    num_features = filtered_df.shape[1]
    max_cols = 7
    num_rows = (num_features - 1) // max_cols + 1
    num_cols = min(num_features, max_cols)
    plt.figure(figsize=(18 * num_cols / max_cols, 6 * num_rows))
    window_size = 6  # Adjust this value to change the smoothing level
    for j in range(num_features):
        plt.subplot(num_rows, num_cols, j + 1)
        col_name = filtered_df.columns[j]
        if col_name.endswith('XBT_price'):
            target_inverted = np.exp(scalers[col_name].inverse_transform(target_np[:, j].reshape(-1, 1)).flatten()) * 50000
            prediction_inverted = np.exp(scalers[col_name].inverse_transform(prediction_np[0, :, j].reshape(-1, 1)).flatten()) * 50000
        else:
            target_inverted = np.exp(scalers[col_name].inverse_transform(target_np[:, j].reshape(-1, 1)).flatten())
            prediction_inverted = np.exp(scalers[col_name].inverse_transform(prediction_np[0, :, j].reshape(-1, 1)).flatten())
        # Smooth the inverted data
        target_smooth = gaussian_smoothing(target_inverted, window_size, sigma=4)
        prediction_smooth = gaussian_smoothing(prediction_inverted, window_size, sigma=4)
        # Plot original data as light lines
        plt.plot(range(n_future), target_inverted, 'g', alpha=0.4)
        plt.plot(range(n_future), prediction_inverted, 'r', alpha=0.4)
        # Plot smoothed data as bold lines
        plt.plot(range(window_size - 1, n_future), target_smooth, "g", label='Target', linewidth=2)
        plt.plot(range(window_size - 1, n_future), prediction_smooth, "r", label='Prediction', linewidth=2)
        # Plot straight lines from start to end
        plt.plot([0, n_future - 1], [target_inverted[0], target_inverted[-1]], 'g--')
        plt.plot([0, n_future - 1], [prediction_inverted[0], prediction_inverted[-1]], 'r--')
        # Set y-axis limits to accommodate both target and prediction
        y_min = min(target_inverted.min(), prediction_inverted.min())
        y_max = max(target_inverted.max(), prediction_inverted.max())
        y_range = y_max - y_min
        plt.ylim(y_min - 0.1 * y_range, y_max + 0.1 * y_range)
        plt.title(col_name)
        plt.legend()
    plt.tight_layout()
    time_date = time.strftime("%Y%m%d-%H%M%S")
    image_path = os.path.join(model_save_path, f"{time_date}_predictions.png")
    plt.savefig(image_path)
    plt.close()
    return image_path
# Lightning Wrapper
class LightningWrapper(pl.LightningModule):
    def __init__(self, model, criterion, optimizer, scheduler, num_epochs: int, scaler_dict: dict, val_data: pd.DataFrame):
        super().__init__()
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.num_epochs = num_epochs
        self.scaler_dict = scaler_dict
        self.val_data = val_data  # For making predictions during logging
        self.perc_cutoff = criterion.get_perc_cutoff()
        self.reward = 0.0

    def forward(self, src, tgt):
        return self.model(src, tgt)

    def training_step(self, batch, batch_idx):
        batch_X, batch_y = batch
        # Shift target sequence to the right and prepend zeros (teacher forcing)
        tgt_input = torch.zeros_like(batch_y)
        tgt_input[:, 1:, :] = batch_y[:, :-1, :]
        y_pred = self.model(batch_X, tgt_input)
        final_loss, mse_loss, mae_loss, perc_diff_loss, max_diff_loss, imbalance_loss, direction_loss, reward, perc_cutoff = self.criterion(y_pred, batch_y)
        # Log all loss components
        self.log('train/final_loss', final_loss, on_step=False, on_epoch=True, prog_bar=True, sync_dist=True)
        self.log('train/mse_loss', mse_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('train/mae_loss', mae_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('train/perc_diff_loss', perc_diff_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('train/max_diff_loss', max_diff_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('train/imbalance_loss', imbalance_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('train/direction_loss', direction_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('train/reward', reward, on_step=False, on_epoch=True, sync_dist=True)
        return final_loss

    def validation_step(self, batch, batch_idx):
        batch_X, batch_y = batch
        tgt_input = torch.zeros_like(batch_y)
        tgt_input[:, 1:, :] = batch_y[:, :-1, :]
        y_pred = self.model(batch_X, tgt_input)
        final_loss, mse_loss, mae_loss, perc_diff_loss, max_diff_loss, imbalance_loss, direction_loss, reward, perc_cutoff = self.criterion(y_pred, batch_y)
        # Log all loss components
        self.log('val_loss', final_loss, on_step=False, on_epoch=True, prog_bar=True, sync_dist=True)
        self.log('val/mse_loss', mse_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('val/mae_loss', mae_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('val/perc_diff_loss', perc_diff_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('val/max_diff_loss', max_diff_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('val/imbalance_loss', imbalance_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('val/direction_loss', direction_loss, on_step=False, on_epoch=True, sync_dist=True)
        self.log('val/reward', reward, on_step=False, on_epoch=True, sync_dist=True)
        self.reward = reward
        return final_loss

    def configure_optimizers(self):
        return {
            'optimizer': self.optimizer,
            'lr_scheduler': {
                'scheduler': self.scheduler,
                'monitor': 'val_loss'
            }
        }

    def on_validation_epoch_end(self):
        # Only the main process should perform plot logging
        if self.global_rank == 0:
            plot_path = self.generate_and_log_plots()
            if plot_path:
                # Read the image and log it to Wandb
                img = Image.open(plot_path)
                self.logger.experiment.log({
                    "Validation/Prediction_vs_Target": wandb.Image(img),
                    "global_step": self.global_step
                })
                # Optionally, remove the image file after logging
                os.remove(plot_path)
        # If the reward is greater than 0.9, tighten the percentage cutoff
        if self.reward > 0.9:
            self.criterion.set_perc_cutoff(self.criterion.get_perc_cutoff() * 0.8)
        self.log('val/perc_cutoff', self.criterion.get_perc_cutoff(), on_step=False, on_epoch=True, sync_dist=True)

    def generate_and_log_plots(self):
        """
        Generate prediction vs target plots and save them to a temporary file.
        Returns the path to the saved image.
        """
        # Make predictions on a random sample from validation data
        sample = get_random_sample(self.val_data)
        input_data, target = sample
        input_tensor = prepare_input(input_data, self.device)
        tgt_input = torch.zeros_like(target).unsqueeze(0).to(self.device)
        prediction = self.model(input_tensor, tgt_input)
        _, target_np, prediction_np = convert_to_numpy(input_tensor, target, prediction)
        # Prepare DataFrame for plotting
        predicted_df = pd.DataFrame(prediction_np.squeeze(), columns=self.val_data.columns)
        # Keep the random start index in range so start_idx + N_PAST - 1 is a valid position
        start_idx = random.randint(0, len(self.val_data) - cfg.N_PAST - cfg.N_FUTURE)
        last_timestamp = self.val_data.index[start_idx + cfg.N_PAST - 1]
        predicted_df.index = pd.date_range(start=last_timestamp, periods=cfg.N_FUTURE, freq='5min')
        start_timestamp = last_timestamp - 3 * pd.Timedelta(minutes=5)
        end_timestamp = last_timestamp + (3 + cfg.N_FUTURE) * pd.Timedelta(minutes=5)
        filtered_df = self.val_data[(self.val_data.index >= start_timestamp) & (self.val_data.index <= end_timestamp)]
        # Generate and save plot
        image_path = visualize_predictions(target_np, prediction_np, cfg.N_FUTURE, self.scaler_dict, filtered_df, cfg.MODEL_SAVE_PATH)
        return image_path

    def on_train_epoch_end(self):
        # Only the main process should perform logging
        if self.global_rank == 0:
            # Log weights and biases histograms
            for name, param in self.named_parameters():
                self.logger.experiment.log({
                    f"Weights/{name}": wandb.Histogram(param.detach().cpu().numpy()),
                    'epoch': self.current_epoch
                })
                # Log gradients if they exist
                if param.grad is not None:
                    self.logger.experiment.log({
                        f"Gradients/{name}": wandb.Histogram(param.grad.detach().cpu().numpy()),
                        'epoch': self.current_epoch
                    })
            # Log learning rate
            optimizer = self.optimizers()
            lr = optimizer.param_groups[0]['lr']
            self.logger.experiment.log({'learning_rate': lr, 'epoch': self.current_epoch})

    def on_after_backward(self):
        # Only the main process should perform logging
        if self.global_rank == 0:
            total_norm = 0.0
            for p in self.model.parameters():
                if p.grad is not None:
                    param_norm = p.grad.detach().data.norm(2)
                    total_norm += param_norm.item() ** 2
            total_norm = total_norm ** 0.5
            self.logger.experiment.log({'Gradients/grad_total_norm': total_norm, 'step': self.global_step})
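# The target shift used in training_step/validation_step above is standard
# teacher forcing: the decoder input at step t is the ground-truth value from
# step t-1, with an all-zeros row as the start token. `_demo_shift_targets` is a
# minimal, illustrative version of that shift; it is not called anywhere.
def _demo_shift_targets(batch_y: torch.Tensor) -> torch.Tensor:
    tgt_input = torch.zeros_like(batch_y)      # start-token row stays all zeros
    tgt_input[:, 1:, :] = batch_y[:, :-1, :]   # shift targets right by one step
    return tgt_input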
# Data Loading and Preprocessing
def load_and_preprocess_data(file_path: str, download_url: str = None):
    """
    Load and preprocess data from a CSV file. If the file does not exist, download it.
    Args:
        file_path (str): Path to the CSV file.
        download_url (str, optional): URL to download the CSV file. Defaults to None.
    Returns:
        pd.DataFrame: Preprocessed DataFrame.
        dict: Dictionary of scalers used for each column.
    """
    ic("Starting data loading and preprocessing...")
    start_time = time.time()
    # Check if the file exists
    if not os.path.exists(file_path):
        ic(f"File {file_path} does not exist.")
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        if download_url:
            ic(f"Downloading file from {download_url}...")
            try:
                response = requests.get(download_url, stream=True)
                response.raise_for_status()
                with open(file_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                ic(f"File downloaded and saved to {file_path}")
            except requests.exceptions.RequestException as e:
                ic(f"Failed to download the file: {e}")
                raise
        else:
            ic("Download URL not provided. Cannot download the file.")
            raise FileNotFoundError(f"The file {file_path} does not exist and no download URL was provided.")
    # Load the DataFrame
    df = pd.read_csv(file_path, parse_dates=['timestamp'])
    df.set_index('timestamp', inplace=True)
    df = df.tail(cfg.RECORDS_TO_LOAD)
    # Preprocess the data: log-transform each column, then min-max scale it
    scalers = {}
    for col in df.columns:
        df[col] = np.log(df[col])
        scalers[col] = MinMaxScaler()
        df[col] = scalers[col].fit_transform(df[[col]])
    ic(f"Data preprocessing completed in {time.time() - start_time:.2f} seconds")
    ic(f"DataFrame shape: {df.shape}")
    return df, scalers
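# Because each column is log-transformed and then min-max scaled above, a scaled
# value is mapped back to the original (pre-log) scale via
# exp(scaler.inverse_transform(...)), which is what visualize_predictions() does
# (with an extra *50000 rescaling for columns ending in 'XBT_price').
# `_demo_invert_column` is an illustrative helper only and is never called.
def _demo_invert_column(scaled_col: np.ndarray, scaler: MinMaxScaler) -> np.ndarray:
    log_values = scaler.inverse_transform(scaled_col.reshape(-1, 1)).flatten()
    return np.exp(log_values)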
# Main Execution Block
if __name__ == "__main__":
    torch.set_float32_matmul_precision("medium")
    # Load and preprocess data
    df, scalers = load_and_preprocess_data(cfg.DATA_FILE, cfg.DATA_URL)
    NUM_FEATURES = df.shape[1]
    # Initialize the Wandb logger and name your Wandb project
    logger = WandbLogger(project='my-awesome-project', log_model=True)  # log_model=True uploads checkpoints
    # Log hyperparameters to Wandb
    logger.log_hyperparams({
        "batch_size": cfg.BATCH_SIZE,
        "hidden_size": cfg.HIDDEN_SIZE,
        "num_layers": cfg.NUM_LAYERS,
        "num_epochs": cfg.NUM_EPOCHS,
        "learning_rate": 2e-4,
        "weight_decay": 1e-5
    })
    # Split data into training and validation
    train_size = int(0.8 * len(df))
    train_data = df.iloc[:train_size]
    val_data = df.iloc[train_size:]
    # Create Datasets
    train_dataset = CryptoDataset(train_data, cfg.N_PAST, cfg.N_FUTURE)
    val_dataset = CryptoDataset(val_data, cfg.N_PAST, cfg.N_FUTURE)
    train_loader = DataLoader(
        train_dataset,
        batch_size=cfg.BATCH_SIZE,
        shuffle=False,
        num_workers=cfg.NUM_WORKERS,
        pin_memory=True
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=cfg.BATCH_SIZE,
        shuffle=False,
        num_workers=cfg.NUM_WORKERS,
        pin_memory=True
    )
    # Initialize Transformer Model
    model = CryptoTransformer(
        input_size=NUM_FEATURES,
        d_model=cfg.HIDDEN_SIZE,
        nhead=8,
        num_encoder_layers=cfg.NUM_LAYERS,
        num_decoder_layers=cfg.NUM_LAYERS,
        dim_feedforward=2048,
        dropout=0.2,
        activation="relu",
        n_future=cfg.N_FUTURE,
        num_outputs=NUM_FEATURES,
        max_seq_length=cfg.N_PAST + cfg.N_FUTURE
    ).to(cfg.DEVICE)
    # Initialize Loss Function
    criterion = BalancedCryptoLoss(cfg)
    # Initialize Optimizer and Scheduler
    optimizer = optim.AdamW(model.parameters(), lr=2e-4, weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.8, patience=5, min_lr=4e-6)
    # Handle Hot Restart
    if cfg.HOT_RESTART:
        try:
            model.load_state_dict(torch.load(cfg.MODEL_PATH), strict=False)
            print(f"Loaded previously saved final model from {cfg.MODEL_PATH}")
        except FileNotFoundError:
            print(f"No previous final model found at {cfg.MODEL_PATH}. Starting with a fresh model.")
    else:
        print("Starting with a fresh model.")
    if cfg.TRAIN_FIRST:
        # Initialize Lightning Wrapper
        wrapped_model = LightningWrapper(
            model=model,
            criterion=criterion,
            optimizer=optimizer,
            scheduler=scheduler,
            num_epochs=cfg.NUM_EPOCHS,
            scaler_dict=scalers,
            val_data=val_data
        )
        # EarlyStopping callback
        early_stopping_callback = EarlyStopping(
            monitor='val_loss',  # Must match the logged metric name
            patience=20,
            mode='min'
        )
        # ModelCheckpoint callback
        checkpoint_callback = ModelCheckpoint(
            monitor='val_loss',  # Must match the logged metric name
            dirpath=cfg.MODEL_SAVE_PATH,
            filename='model-{epoch:02d}-{val_loss:.2f}',
            save_top_k=3,
            mode='min',
        )
        # Initialize Progress Bar Callback
        progress_bar = RichProgressBar(refresh_rate=11)  # Set your desired refresh rate
        # Initialize Trainer with Wandb logger
        trainer = pl.Trainer(
            max_epochs=cfg.NUM_EPOCHS,
            logger=logger,  # Use Wandb logger here
            accelerator='gpu',
            devices=torch.cuda.device_count(),
            strategy='ddp_find_unused_parameters_true',  # Distributed Data Parallel
            callbacks=[progress_bar, checkpoint_callback, early_stopping_callback],
            enable_progress_bar=True,
            log_every_n_steps=25,
            # precision=16,  # Optional: Use mixed precision for faster training
            gradient_clip_val=10.0,  # Optional: Gradient clipping
        )
        # Start Training
        trainer.fit(wrapped_model, train_dataloaders=train_loader, val_dataloaders=val_loader)
    else:
        print("Skipping training as TRAIN_FIRST is set to False.")
    wandb.finish()