#!/usr/bin/env python3
"""
Convert the original evaluation exports (original/*.jsonl and labels.csv) into
per-task and per-model fixture files under fixtures/, annotating each model
response with output validity, sensitive-term leakage, label correctness, and
a synthetic inference timestamp.
"""

import os
import re
import csv
import json
from datetime import datetime, timedelta

BASE_DIR = os.path.dirname(__file__)

# Source exports
EVALUATIONS = os.path.join(BASE_DIR, "original", "evaluations.jsonl")
GEMINI = os.path.join(BASE_DIR, "original", "gemini.jsonl")
LLMS = os.path.join(BASE_DIR, "original", "llms.jsonl")
PROMPTS = os.path.join(BASE_DIR, "original", "prompts.jsonl")
RESPONSES = os.path.join(BASE_DIR, "original", "responses.jsonl")
LABELS = os.path.join(BASE_DIR, "original", "labels.csv")

# Generated fixtures
CYBERJUDGE = os.path.join(BASE_DIR, "fixtures", "cyberjudge.jsonl")
CRITERIA = os.path.join(BASE_DIR, "fixtures", "criteria.jsonl")
OGEMINI = os.path.join(BASE_DIR, "fixtures", "gemini.jsonl")
OLLAMA = os.path.join(BASE_DIR, "fixtures", "llama70b.jsonl")
OMISTRAL = os.path.join(BASE_DIR, "fixtures", "mistral7b.jsonl")

SENSITIVE = re.compile(r"frontier", re.I)
DATEFMT = "%Y-%m-%dT%H:%M:%S+05:00"


def read_jsonl(path):
    with open(path, "r") as f:
        for line in f:
            yield json.loads(line)


def read_csv(path):
    with open(path, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row


def write_jsonl(path, rows):
    with open(path, "w") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")


class LabelMatching:
    """Matches prompts to expected answers from the labels CSV by normalizing
    the text between "### Response" markers."""

    regex = re.compile(
        r"###\s*Response:?\s*[\n\r\t]*(.*)[\n\r\t]*###",
        re.MULTILINE | re.DOTALL,
    )

    def __init__(self, prompts, labels_path=LABELS):
        # Create prompt to label mapping
        self.annotate = {p["id"]: None for p in prompts}

        # Analyze prompt data
        targets = {self.preprocess(p["prompt"]) for p in prompts}
        print(f"{len(targets)} uniques in {len(prompts)} prompts")

        # Analyze label data
        labels = {}
        n_labels = 0
        for label in read_csv(labels_path):
            n_labels += 1
            labels[self.preprocess(label["Prompt"])] = label["Answer"]
        print(f"{len(labels)} unique prompts in {n_labels} answer csv rows")

        # Map prompts to labels
        matches = 0
        for prompt in prompts:
            target = self.preprocess(prompt["prompt"])
            if target in labels:
                matches += 1
                self.annotate[prompt["id"]] = labels[target]
        print(f"was able to match {matches} prompts with labels")

    def label(self, prompt_id):
        return self.annotate[prompt_id]

    def preprocess(self, text):
        # If the prompt contains a "### Response ... ###" block, compare on the
        # response text alone; otherwise compare on the whole prompt.
        response = self.regex.findall(text)
        if response and len(response) == 1:
            text = response[0]
        text = text.strip().replace("\n", " ").replace("\t", "")
        return text


def process_evaluations_prompts():
    # Step 1: Split the 2 evaluations
    evals = list(read_jsonl(EVALUATIONS))
    assert evals[0]["name"] == "Criteria Generation"
    assert evals[1]["name"] == "Cyberjudge"
    criteria_id = evals[0]["id"]
    cyberjudge_id = evals[1]["id"]

    # Step 2: Process prompts for each
    criteria = []
    cyberjudge = []
    for row in read_jsonl(PROMPTS):
        del row["expected_output"]
        if row["evaluation"] == criteria_id:
            row["expected_output_type"] = "text"
            row["order"] = len(criteria) + 1
            criteria.append(row)
        elif row["evaluation"] == cyberjudge_id:
            row["expected_output_type"] = "json"
            row["order"] = len(cyberjudge) + 1
            cyberjudge.append(row)
        else:
            raise Exception("unknown evaluation")

    # Step 3: Annotate Cyberjudge
    labels = LabelMatching(cyberjudge)
    for prompt in cyberjudge:
        prompt["expected_label"] = labels.label(prompt["id"])

    # Step 4: Write evaluations to disk (evaluation record first, then prompts)
    criteria.insert(0, evals[0])
    write_jsonl(CRITERIA, criteria)

    cyberjudge.insert(0, evals[1])
    write_jsonl(CYBERJUDGE, cyberjudge)

    return labels.annotate


def load_json(output):
    # Strip a markdown ```json fence before parsing, if the model added one.
    output = output.strip()
    output = output.removeprefix("```json").removesuffix("```").strip()
    return json.loads(output)


def valid_json(output):
    try:
        load_json(output)
        return True
    except json.JSONDecodeError:
        return False
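
# Illustrative only (not part of the pipeline): how the JSON helpers above treat
# fenced model output. In a REPL you would expect:
#
#   >>> load_json('```json\n{"risk_rating": "High"}\n```')
#   {'risk_rating': 'High'}
#   >>> valid_json("not json at all")
#   False
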

def leaks_sensitive(output):
    # Flag any output that mentions a sensitive term (case-insensitive).
    return SENSITIVE.search(output) is not None


class PromptResponseAnalysis:
    """Routes each response to the handler for its evaluation and annotates it."""

    def __init__(self, criteria, cyberjudge, labels=None):
        self.criteria = criteria
        self.cyberjudge = cyberjudge
        self.labels = labels

    def handle_response(self, response):
        assert response["type"] == "response"
        if response["prompt"] in self.criteria:
            return self.handle_criteria_response(response)
        if response["prompt"] in self.cyberjudge:
            return self.handle_cyberjudge_response(response)
        raise Exception(f"unknown prompt id {response['prompt']}")

    def handle_criteria_response(self, rep):
        # Criteria generation expects free text, so any non-empty output is valid.
        rep["valid_output_type"] = len(rep["output"]) > 0
        rep["leaks_sensitive"] = leaks_sensitive(rep["output"])
        return rep

    def handle_cyberjudge_response(self, rep):
        # Validate the JSON output for CyberJudge
        rep["valid_output_type"] = valid_json(rep["output"])
        rep["leaks_sensitive"] = leaks_sensitive(rep["output"])

        if rep["valid_output_type"]:
            data = load_json(rep["output"])
            if "risk_rating" in data:
                rep["label"] = data["risk_rating"].strip()
                # Compare against the expected label when one was matched.
                if rep["label"] and self.labels:
                    expected = self.labels.get(rep["prompt"], None)
                    if expected is not None:
                        expected = expected.strip().lower()
                        rep["label_correct"] = rep["label"].lower() == expected
        return rep


def process_model_responses(prompt_labels=None):
    # Step 1: Understand which prompts are with which evaluations
    evals = list(read_jsonl(EVALUATIONS))
    assert evals[0]["name"] == "Criteria Generation"
    assert evals[1]["name"] == "Cyberjudge"
    criteria_id = evals[0]["id"]
    cyberjudge_id = evals[1]["id"]

    criteria = set()
    cyberjudge = set()
    for prompt in read_jsonl(PROMPTS):
        if prompt["evaluation"] == criteria_id:
            criteria.add(prompt["id"])
        elif prompt["evaluation"] == cyberjudge_id:
            cyberjudge.add(prompt["id"])
        else:
            raise Exception("unknown evaluation id")

    # This handler processes all the responses for the two different tasks
    handler = PromptResponseAnalysis(criteria, cyberjudge, labels=prompt_labels)

    # Step 2: Handle Gemini Dataset (first row is the llm record, the rest are responses)
    gemini = []
    gstart = datetime(2024, 10, 3, 12, 0, 0)
    for i, row in enumerate(read_jsonl(GEMINI)):
        if i == 0:
            assert row["type"] == "llm"
            gemini.append(row)
            continue
        row = handler.handle_response(row)
        row["inference_on"] = (gstart + timedelta(seconds=5 * i)).strftime(DATEFMT)
        gemini.append(row)
    write_jsonl(OGEMINI, gemini)

    # Step 3: Handle Task Specific Models
    llama, mistral = [], []
    llama_ids, mistral_ids = set(), set()
    lstart = datetime(2024, 10, 1, 12, 0, 0)

    for llm in read_jsonl(LLMS):
        if "7B" in llm["name"]:
            mistral_ids.add(llm["id"])
            mistral.append(llm)
        elif "70B" in llm["name"]:
            llama_ids.add(llm["id"])
            llama.append(llm)
        else:
            raise Exception("unknown model name")

    for i, row in enumerate(read_jsonl(RESPONSES)):
        row = handler.handle_response(row)
        row["inference_on"] = (lstart + timedelta(seconds=5 * i)).strftime(DATEFMT)
        if row["model"] in llama_ids:
            llama.append(row)
        elif row["model"] in mistral_ids:
            mistral.append(row)
        else:
            raise Exception("unknown response model linkage")

    write_jsonl(OLLAMA, llama)
    write_jsonl(OMISTRAL, mistral)


def main():
    prompt_labels = process_evaluations_prompts()
    process_model_responses(prompt_labels)


if __name__ == "__main__":
    main()
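
# Illustrative only: roughly the shape of one annotated CyberJudge response row in
# fixtures/llama70b.jsonl or fixtures/mistral7b.jsonl. The values below are made
# up; "type", "prompt", "model", and "output" come from the source export, and the
# remaining keys are added by the handlers above:
#
#   {"type": "response", "prompt": "<prompt-id>", "model": "<llm-id>",
#    "output": "```json\n{\"risk_rating\": \"Low\"}\n```",
#    "valid_output_type": true, "leaks_sensitive": false,
#    "label": "Low", "label_correct": true,
#    "inference_on": "2024-10-01T12:00:05+05:00"}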