import os
import json
import time
import asyncio
import subprocess

from modal import App, Image, Secret, Volume, build, enter, exit, gpu, method

# We first set out the configuration variables for our script.

## Embedding Containers Configuration
# GPU_CONCURRENCY = 100
MODEL_ID = "nomic-ai/nomic-embed-text-v1.5"
MODEL_SLUG = MODEL_ID.split("/")[-1]
MODEL_DIR = "/model"
MODEL_REVISION = "main"

GPU_CONCURRENCY = 100
# GPU_CONFIG = gpu.A100(size="80GB")
GPU_CONFIG = gpu.A100(size="40GB")
# GPU_CONFIG = gpu.H100()
# BATCH_SIZE = 512
BATCH_SIZE = 64
# BATCH_SIZE = 128
# MAX_TOKENS = 8192
MAX_TOKENS = 2048

## Dataset-Specific Configuration
DATASET_READ_VOLUME = Volume.from_name(
    "embedding-fineweb-edu", create_if_missing=True
)
EMBEDDING_CHECKPOINT_VOLUME = Volume.from_name(
    "checkpoint", create_if_missing=True
)
DATASET_DIR = "/data"
# DATASET_SAVE = "fineweb-edu-sample-10BT"
DATASET_SAVE = "fineweb-edu-sample-10BT-100k"
CHECKPOINT_DIR = "/checkpoint"
SAVE_TO_DISK = True

## Upload-Specific Configuration
# DATASET_HF_UPLOAD_REPO_NAME = "enjalot/fineweb-edu-sample-10BT"
DATASET_HF_UPLOAD_REPO_NAME = f"enjalot/{DATASET_SAVE}"
UPLOAD_TO_HF = True


def download_model_to_image(model_dir, model_name, model_revision):
    """Download the model weights into the image at build time."""
    from huggingface_hub import snapshot_download
    from transformers.utils import move_cache

    os.makedirs(model_dir, exist_ok=True)

    snapshot_download(
        repo_id=model_name,
        revision=model_revision,
        local_dir=model_dir,
        ignore_patterns=["*.pt", "*.bin"],  # Using safetensors
    )
    move_cache()


st_image = (
    Image.debian_slim(python_version="3.10")
    .pip_install(
        "torch==2.1.2",
        "numpy==1.26.3",
        "transformers==4.39.3",
        "hf-transfer==0.1.6",
        "huggingface_hub==0.22.2",
        "einops==0.7.0",
    )
    .env({"HF_HUB_ENABLE_HF_TRANSFER": "1"})
    .run_function(
        download_model_to_image,
        timeout=60 * 20,
        kwargs={
            "model_dir": MODEL_DIR,
            "model_name": MODEL_ID,
            "model_revision": MODEL_REVISION,
        },
        secrets=[Secret.from_name("huggingface-secret")],
    )
)

with st_image.imports():
    import numpy as np
    import torch
    from torch.cuda.amp import autocast
    from transformers import AutoTokenizer, AutoModel

app = App("fineweb-embeddings-st")


@app.cls(
    gpu=GPU_CONFIG,
    # cpu=16,
    concurrency_limit=GPU_CONCURRENCY,
    timeout=60 * 10,
    container_idle_timeout=60 * 10,
    allow_concurrent_inputs=1,
    image=st_image,
)
class TransformerModel:
    @enter()
    def start_engine(self):
        # import torch
        # from transformers import AutoTokenizer, AutoModel
        self.device = torch.device("cuda")

        print("🥶 cold starting inference")
        start = time.monotonic_ns()
        self.model = AutoModel.from_pretrained(
            MODEL_ID, trust_remote_code=True, safe_serialization=True
        )  # , rotary_scaling_factor=2
        self.tokenizer = AutoTokenizer.from_pretrained(
            "bert-base-uncased", model_max_length=MAX_TOKENS
        )
        self.model.to(self.device)
        self.model.eval()
        print(f"CUDA memory allocated: {torch.cuda.memory_allocated() / 1e6} MB")

        duration_s = (time.monotonic_ns() - start) / 1e9
        print(f"🏎️ engine started in {duration_s:.0f}s")

    @method()
    def embed(self, inputs):
        # import numpy as np
        # import torch
        tok = self.tokenizer
        # TODO: better understanding of how this gets called
        print("inputs", len(inputs))
        start = time.monotonic_ns()
        texts = [x[1] for x in inputs]
        texts = [
            t if len(t) <= 8000 else tok.decode(tok.encode(t)[:MAX_TOKENS])
            for t in texts
        ]
        print("truncated in", (time.monotonic_ns() - start) / 1e9)
        print("texts", len(texts))
        # print(f"CUDA memory allocated before encoding: {torch.cuda.memory_allocated() / 1e6} MB")
        start = time.monotonic_ns()
        encoded_input = tok(texts, padding=True, truncation=True, return_tensors='pt')
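        # `encoded_input` is a dict of tensors (input_ids, attention_mask, ...)
        # padded to the longest text in this batch and truncated to the
        # tokenizer's model_max_length (MAX_TOKENS).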
print("encoded in", (time.monotonic_ns() - start) / 1e9) start = time.monotonic_ns() # print("moving to device") encoded_input = {key: value.to(self.device) for key, value in encoded_input.items()} # print("moved to device", (time.monotonic_ns() - start) / 1e9) # print("encoded input size", encoded_input['input_ids'].nelement() * encoded_input['input_ids'].element_size() / 1e6, "MB") # print(f"CUDA memory allocated after encoding: {torch.cuda.memory_allocated() / 1e6} MB") start = time.monotonic_ns() # print(torch.cuda.memory_summary(device=None, abbreviated=False)) with torch.no_grad(), autocast(): print(f"CUDA memory allocated before embedding: {torch.cuda.memory_allocated() / 1e6} MB") model_output = self.model(**encoded_input) print(f"CUDA memory allocated after model output: {torch.cuda.memory_allocated() / 1e6} MB") # print(f"model output size: {model_output.nelement() * model_output.element_size() / 1e6} MB") embeddings = model_output[0][:, 0] # print(f"Embedding size: {embeddings.nelement() * embeddings.element_size() / 1e6} MB") # print(f"CUDA memory allocated after embedding: {torch.cuda.memory_allocated() / 1e6} MB") normalized_embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1) normalized_embeddings_cpu = normalized_embeddings.cpu().numpy() # Clean up torch memory del encoded_input del model_output del embeddings del normalized_embeddings torch.cuda.empty_cache() duration_s = (time.monotonic_ns() - start) / 1e9 print(f"embedding took {duration_s:.0f}s") return inputs, normalized_embeddings_cpu def generate_chunks_from_dataset(xs, max_tokens: int): """ Generate chunks from a dataset. Args: xs (list): The dataset containing dictionaries with "id", "text" keys. chunk_size (int): The size of each chunk. Yields: tuple: A tuple containing the id and a chunk of text. """ for data in xs: yield (data["id"], "clustering: " + data["text"]) def generate_batches(xs, batch_size): batch = [] for x in xs: batch.append(x) if len(batch) == batch_size: yield batch batch = [] if batch: yield batch def load_dataset_from_disk(): """ Load a dataset from disk and return a subset of the training data. Returns: Dataset: A subset of the training data. """ import time from datasets import load_from_disk start = time.perf_counter() # Load the dataset as a Hugging Face dataset print(f"Loading dataset from {DATASET_DIR}/{DATASET_SAVE}") dataset = load_from_disk(f"{DATASET_DIR}/{DATASET_SAVE}") print(f"Dataset loaded in {time.perf_counter()-start:.2f} seconds") # return dataset["train"] # TODO: have the 100k subset be proper subset return dataset#["train"] def save_dataset_to_intermediate_checkpoint(acc_chunks, embeddings, batch_size): """Saves the dataset to an intermediate checkpoint. Args: acc_chunks (list): Accumulated chunks embeddings (list): Accumulated embeddings batch_size (int): Batch size """ import pyarrow as pa from datasets import Dataset table = pa.Table.from_arrays( [ pa.array([chunk[0] for chunk in acc_chunks]), # id pa.array([chunk[1] for chunk in acc_chunks]), # text pa.array(embeddings), ], names=["id", "text", "embedding"], ) path_parent_folder = f"{CHECKPOINT_DIR}/{DATASET_SAVE}/{MODEL_SLUG}-{batch_size}" dataset = Dataset(table) dataset.save_to_disk(path_parent_folder) EMBEDDING_CHECKPOINT_VOLUME.commit() print(f"Saved checkpoint at {path_parent_folder}") def upload_result_to_hf(batch_size: int) -> None: """ Uploads the result to the Hugging Face Hub. Args: batch_size (int): The batch size for the model. 
def load_dataset_from_disk():
    """
    Load the dataset saved at DATASET_DIR/DATASET_SAVE from the Modal volume.

    Returns:
        Dataset: The loaded dataset.
    """
    import time
    from datasets import load_from_disk

    start = time.perf_counter()
    # Load the dataset as a Hugging Face dataset
    print(f"Loading dataset from {DATASET_DIR}/{DATASET_SAVE}")
    dataset = load_from_disk(f"{DATASET_DIR}/{DATASET_SAVE}")
    print(f"Dataset loaded in {time.perf_counter()-start:.2f} seconds")
    # return dataset["train"]
    # TODO: have the 100k subset be a proper subset
    return dataset  # ["train"]


def save_dataset_to_intermediate_checkpoint(acc_chunks, embeddings, batch_size):
    """Save the accumulated chunks and embeddings to an intermediate checkpoint.

    Args:
        acc_chunks (list): Accumulated chunks
        embeddings (list): Accumulated embeddings
        batch_size (int): Batch size
    """
    import pyarrow as pa
    from datasets import Dataset

    table = pa.Table.from_arrays(
        [
            pa.array([chunk[0] for chunk in acc_chunks]),  # id
            pa.array([chunk[1] for chunk in acc_chunks]),  # text
            pa.array(embeddings),
        ],
        names=["id", "text", "embedding"],
    )
    path_parent_folder = f"{CHECKPOINT_DIR}/{DATASET_SAVE}/{MODEL_SLUG}-{batch_size}"
    dataset = Dataset(table)
    dataset.save_to_disk(path_parent_folder)
    EMBEDDING_CHECKPOINT_VOLUME.commit()
    print(f"Saved checkpoint at {path_parent_folder}")


def upload_result_to_hf(batch_size: int) -> None:
    """
    Upload the checkpointed result to the Hugging Face Hub.

    Args:
        batch_size (int): The batch size used for the model.

    Returns:
        None
    """
    import os
    import time
    from huggingface_hub import HfApi

    path_parent_folder = f"{CHECKPOINT_DIR}/{DATASET_SAVE}/{MODEL_SLUG}-{batch_size}"
    api = HfApi(token=os.environ["HUGGINGFACE_TOKEN"])
    api.create_repo(
        repo_id=DATASET_HF_UPLOAD_REPO_NAME,
        private=False,
        repo_type="dataset",
        exist_ok=True,
    )

    print(f"Pushing to hub {DATASET_HF_UPLOAD_REPO_NAME}")
    start = time.perf_counter()
    api.upload_folder(
        folder_path=path_parent_folder,
        repo_id=DATASET_HF_UPLOAD_REPO_NAME,
        repo_type="dataset",
        multi_commits=True,
        multi_commits_verbose=True,
    )
    end = time.perf_counter()
    print(f"Uploaded in {end-start}s")


@app.function(
    # cpu=1,
    image=Image.debian_slim().pip_install(
        "datasets", "pyarrow", "hf_transfer", "huggingface_hub", "transformers"
    ),
    volumes={
        DATASET_DIR: DATASET_READ_VOLUME,
        CHECKPOINT_DIR: EMBEDDING_CHECKPOINT_VOLUME,
    },
    timeout=86400,
    secrets=[Secret.from_name("huggingface-secret")],
)
def embed_dataset(batch_size: int = 512 * 50):
    """
    Embed the dataset with the TransformerModel containers.

    Args:
        batch_size (int): The batch size to use. Defaults to 512 * 50.

    Returns:
        dict: A dictionary containing the benchmark results.
    """
    import datetime
    import time

    if UPLOAD_TO_HF and not SAVE_TO_DISK:
        raise ValueError(
            "Uploading to HF requires SAVE_TO_DISK to be set to true in case of intermediate failure."
        )

    data = load_dataset_from_disk()
    model = TransformerModel()

    start = time.perf_counter()
    print("generating chunks")
    text_chunks = generate_chunks_from_dataset(data, max_tokens=MAX_TOKENS)
    print("generated chunks", time.perf_counter() - start)
    start = time.perf_counter()
    print("generating batches")
    batches = generate_batches(text_chunks, batch_size=batch_size)
    print("generated batches", time.perf_counter() - start)
    start = time.perf_counter()

    acc_chunks = []
    embeddings = []
    print("BATCHES", len(data) / batch_size)
    i = 0
    for resp in model.embed.map(
        batches, order_outputs=False, return_exceptions=True
    ):
        if isinstance(resp, Exception):
            print(f"Exception: {resp}")
            # continue
            return
        batch_chunks, batch_embeddings = resp
        acc_chunks.extend(batch_chunks)
        embeddings.extend(batch_embeddings)
        print("done with batch", i)
        i += 1

    end = time.perf_counter()
    duration = end - start

    resp = {
        "batch_size": batch_size,
        "n_gpu": GPU_CONCURRENCY,
        "duration_mins": duration / 60,
    }

    if SAVE_TO_DISK:
        save_dataset_to_intermediate_checkpoint(
            acc_chunks, embeddings, batch_size
        )

    if UPLOAD_TO_HF:
        upload_result_to_hf(batch_size)

    return resp


@app.local_entrypoint()
def full_job():
    batch_size = BATCH_SIZE

    with open("benchmarks.json", "a") as f:
        benchmark = embed_dataset.remote(batch_size=batch_size)
        f.write(json.dumps(benchmark, indent=2) + "\n")
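# Rough sketch of how to launch the job (the filename below is hypothetical;
# adjust it to wherever this script is saved). It assumes the
# "embedding-fineweb-edu" volume already contains the dataset saved under
# DATASET_SAVE, and that a Modal secret named "huggingface-secret" providing
# HUGGINGFACE_TOKEN exists:
#
#   modal run fineweb_embeddings.py
#
# `modal run` invokes the @app.local_entrypoint() above, which kicks off
# embed_dataset remotely and appends the benchmark result to benchmarks.json.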