{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import json\n",
    "import os\n",
    "import re\n",
    "import time\n",
    "import datetime\n",
    "import random\n",
    "import demoji\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import undetected_chromedriver as uc\n",
    "\n",
    "from datetime import date\n",
    "from glob import glob\n",
    "from pathlib import Path\n",
    "from selenium.webdriver import ActionChains\n",
    "from selenium.webdriver.common.by import By\n",
    "from selenium.webdriver.common.keys import Keys\n",
    "from selenium.webdriver.support import expected_conditions as EC\n",
    "from selenium.webdriver.support.ui import WebDriverWait\n",
    "from selenium.common.exceptions import TimeoutException\n",
    "from tqdm import tqdm"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**CSS selectors last updated on 23/11/2023** (they may need refreshing whenever the ChatGPT UI changes)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# CSS selectors for the ChatGPT web UI.\n",
    "THREE_DOTS = \"#radix-\\:r1p\\:\"\n",
    "DELETE_BTN = \"#radix-\\:r1q\\: > div.flex.gap-2.m-1\\.5.rounded.px-5.py-2\\.5.text-sm.cursor-pointer.focus\\:ring-0.hover\\:bg-black\\/5.dark\\:hover\\:bg-white\\/5.radix-disabled\\:pointer-events-none.radix-disabled\\:opacity-50.group.text-red-500\"\n",
    "CONFIRM_DELETE_BTN = \".btn-danger\"\n",
    "RESPONSE_AREA = \".text-token-text-primary .p\\-4.gizmo\\:py\\-2\"\n",
    "HOME_MODAL_CONFIRM_BTN = \"#radix\\-\\:rh\\: button.btn.relative.btn-primary\"\n",
    "\n",
    "JSON_PATTERN = r\"\\[\\n*\\s*{(?:\\n*.*\\n)*\\]\"\n",
    "\n",
    "# Messages ChatGPT shows when the hourly request limit is reached.\n",
    "REACH_REQUEST_LIMIT_MESSAGE = [\n",
    "    \"!\\nYou've reached our limit of messages per hour. Please try again later.\",\n",
    "    \"Too many requests in 1 hour. Try again later.\",\n",
    "    \"!\\nChatGPT\\nYou've reached our limit of messages per hour. Please try again later.\"\n",
    "]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cwd = Path.cwd()\n",
    "save_path = cwd / \"data/chatgpt\"\n",
    "# Make sure the output directory used at the end of the notebook exists.\n",
    "(save_path / \"raw\").mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "# Seed data is used for few-shot prompting.\n",
    "with open(cwd / \"data/train.json\", \"r\") as f:\n",
    "    seed_instruction_data = json.load(f)\n",
    "num_prompt_instructions = 4  # Number of shots."
   ]
  },
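  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The next cell is an added illustration, not part of the original pipeline: a minimal sketch of the record layout that `prompt_with_fewshot` and the main loop assume for `data/train.json` (a top-level `\"data\"` list of dicts with `question`, `choices`, `answer`, and an optional `explanation`). The example values are made up."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Illustrative sketch only: the keys are inferred from prompt_with_fewshot()\n",
    "# and response_to_instructions(); the values below are invented.\n",
    "example_seed_record = {\n",
    "    \"question\": \"What is 7 + 5?\",\n",
    "    \"choices\": [\"A. 10\", \"B. 11\", \"C. 12\", \"D. 13\"],\n",
    "    \"explanation\": \"7 + 5 = 12.\",\n",
    "    \"answer\": \"C. 12\",\n",
    "}\n",
    "# data/train.json is expected to look like: {\"data\": [example_seed_record, ...]}\n",
    "assert {\"question\", \"choices\", \"answer\"} <= set(example_seed_record)"
   ]
  },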
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def init_browser(headless=True):\n",
    "    driver = uc.Chrome(\n",
    "        headless=headless,\n",
    "        use_subprocess=False,\n",
    "        version_main=119)\n",
    "    return driver\n",
    "\n",
    "def select_element_if_clickable(driver, css_class, t=5):\n",
    "    element = WebDriverWait(driver, t).until(\n",
    "        EC.element_to_be_clickable((By.CSS_SELECTOR, css_class))\n",
    "    )\n",
    "    return element\n",
    "\n",
    "def go_to_home(br):\n",
    "    br.get(chatgpt_url)\n",
    "    time.sleep(5)\n",
    "    try:\n",
    "        # Dismiss the welcome modal if it is shown.\n",
    "        br.find_element(By.CSS_SELECTOR, HOME_MODAL_CONFIRM_BTN).click()\n",
    "    except Exception:\n",
    "        pass\n",
    "\n",
    "def enter_prompt(br, prompt):\n",
    "    prompt_input = br.find_element(By.CSS_SELECTOR, \"#prompt-textarea\")\n",
    "    prompt_input.clear()\n",
    "    js_send_keys(br, prompt_input, prompt)\n",
    "    time.sleep(1)\n",
    "    # A final keystroke makes the page register the injected text, then submit.\n",
    "    prompt_input.send_keys(Keys.BACK_SPACE)\n",
    "    prompt_input.send_keys(Keys.ENTER)\n",
    "    time.sleep(15)\n",
    "\n",
    "\n",
    "def js_send_keys(br, el, text):\n",
    "    # Set the textarea value via JavaScript; much faster than send_keys for long prompts.\n",
    "    JS_ADD_TEXT_TO_INPUT = \"\"\"\n",
    "    var elm = arguments[0], txt = arguments[1];\n",
    "    elm.value += txt;\n",
    "    elm.dispatchEvent(new Event('change'));\n",
    "    \"\"\"\n",
    "    if text == \"\":\n",
    "        print(\"warning: empty prompt text\")\n",
    "    br.execute_script(JS_ADD_TEXT_TO_INPUT, el, text)\n",
    "\n",
    "\n",
    "def parse_response(br, response):\n",
    "    \"\"\"Parse Task type / Instruction / Input / Output responses (kept for reference; unused below).\"\"\"\n",
    "    task_types = []\n",
    "    instructions = []\n",
    "    inputs_ = []\n",
    "    outputs = []\n",
    "    for line in response.split(\"\\n\"):\n",
    "        line = line.strip()\n",
    "        if \"Task type\" in line:\n",
    "            task_types.append(line.replace(\"Task type: \", \"\"))\n",
    "        elif \"Instruction\" in line:\n",
    "            instructions.append(line.replace(\"Instruction: \", \"\"))\n",
    "        elif \"Input\" in line:\n",
    "            inputs_.append(line.replace(\"Input: \", \"\"))\n",
    "        elif \"Output\" in line:\n",
    "            outputs.append(line.replace(\"Output: \", \"\"))\n",
    "    m_l = min(len(task_types), len(instructions), len(inputs_), len(outputs))\n",
    "    return task_types[:m_l], instructions[:m_l], inputs_[:m_l], outputs[:m_l]\n",
    "\n",
    "def is_prompt_ok(br):\n",
    "    # Check that the last user message was actually entered into the chat.\n",
    "    try:\n",
    "        content = br.find_element(\n",
    "            By.CSS_SELECTOR,\n",
    "            'div.flex.flex-grow.flex-col.gap-3.max-w-full > div[data-message-author-role=\"user\"]',\n",
    "        ).text\n",
    "    except Exception:\n",
    "        return False\n",
    "    if 'Give me the next set of diverse task instructions in Vietnamese' in content:\n",
    "        return False\n",
    "    return len(content) > 1\n",
    "\n",
    "def wait_when_limit_reached(br, t=60 * 60, navigate_home=True):\n",
    "    time.sleep(t)\n",
    "\n",
    "    if navigate_home:\n",
    "        go_to_home(br)\n",
    "\n",
    "def delete_current_chat(br):\n",
    "    # Delete the current conversation via the three-dots menu.\n",
    "    three_dot = select_element_if_clickable(br, THREE_DOTS, 5)\n",
    "    three_dot.click()\n",
    "    time.sleep(1)\n",
    "\n",
    "    delete_button = select_element_if_clickable(br, DELETE_BTN, 20)\n",
    "    delete_button.click()\n",
    "    time.sleep(1)\n",
    "    confirm_delete_button = select_element_if_clickable(br, CONFIRM_DELETE_BTN, 5)\n",
    "    confirm_delete_button.click()\n",
    "    time.sleep(1)\n",
    "\n",
    "def response_to_instructions(br, response):\n",
    "    \"\"\"Parse generated tasks from the response. Adapt this to your own prompt format.\"\"\"\n",
    "    instructions = []\n",
    "    for task in re.split(r'\\n\\n*[0-9]+\\s*\\n*', response):\n",
    "        res = re.split(r'(Question|Choices|Answer|Explanation):\\s+', task)\n",
    "        if len(res) != 9:\n",
    "            continue\n",
    "        task_question = res[2].strip()\n",
    "        task_choices = res[4].strip().split('\\n')\n",
    "        task_explanation = res[6].strip()\n",
    "        task_answer = res[8].strip()\n",
    "        instructions.append({\"question\": task_question, \"choices\": task_choices, \"explanation\": task_explanation, \"answer\": task_answer})\n",
    "    return instructions\n",
    "\n",
    "def prompt_with_fewshot(prompt_instructions, prompt_path=cwd / \"data/prompt/math_alpaca_v2.txt\"):\n",
    "    # Build the few-shot prompt: instruction header followed by the sampled examples.\n",
    "    prompt = open(prompt_path).read() + \"\\n\"\n",
    "\n",
    "    for idx, task_dict in enumerate(prompt_instructions):\n",
    "        question = task_dict['question']\n",
    "        choices = task_dict['choices']\n",
    "        explanation = task_dict.get('explanation', '')\n",
    "        answer = task_dict['answer']\n",
    "\n",
    "        prompt += f'### {idx + 1}\\n'\n",
    "        prompt += f'Question: {question}\\n'\n",
    "        prompt += 'Choices: '\n",
    "        for choice in choices:\n",
    "            prompt += choice + '\\n'\n",
    "        prompt += f'Explanation: {explanation}\\n'\n",
    "        prompt += f'Answer: {answer}\\n'\n",
    "\n",
    "    prompt += '###'\n",
    "    return prompt"
   ]
  },
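  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Added sanity check (an illustration, not part of the original notebook): a hand-written response in the numbered `Question / Choices / Explanation / Answer` layout that the few-shot prompt asks for, fed through `response_to_instructions` to show what the parser expects and returns."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The browser argument is unused by response_to_instructions, so None is fine here.\n",
    "toy_response = (\n",
    "    \"1\\nQuestion: What is 2 + 3?\\nChoices: A. 4\\nB. 5\\nC. 6\\n\"\n",
    "    \"Explanation: 2 + 3 = 5.\\nAnswer: B. 5\\n\\n\"\n",
    "    \"2\\nQuestion: What is 10 - 4?\\nChoices: A. 5\\nB. 6\\nC. 7\\n\"\n",
    "    \"Explanation: 10 - 4 = 6.\\nAnswer: B. 6\"\n",
    ")\n",
    "parsed = response_to_instructions(None, toy_response)\n",
    "assert len(parsed) == 2 and parsed[0][\"answer\"] == \"B. 5\"\n",
    "parsed"
   ]
  },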
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Main loop"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Close any browser left over from a previous run before starting a new one.\n",
    "try:\n",
    "    br.quit()\n",
    "except Exception:\n",
    "    pass\n",
    "\n",
    "chatgpt_url = \"https://chat.openai.com\"\n",
    "br = init_browser(headless=False)\n",
    "br.get(chatgpt_url)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Note:** at this point, log in to ChatGPT manually in the opened browser window (or script the login yourself; this notebook does not automate it)."
   ]
  },
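  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optional helper (an added sketch, not part of the original workflow): after logging in manually, this cell waits until the chat input appears so the main loop does not start on a half-loaded page. It reuses the `#prompt-textarea` selector from `enter_prompt`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sketch: block (up to 5 minutes) until the prompt textarea is present after manual login.\n",
    "try:\n",
    "    WebDriverWait(br, 5 * 60).until(\n",
    "        EC.presence_of_element_located((By.CSS_SELECTOR, \"#prompt-textarea\"))\n",
    "    )\n",
    "    print(\"Logged in and ready.\")\n",
    "except TimeoutException:\n",
    "    print(\"Prompt textarea not found; finish logging in and re-run this cell.\")"
   ]
  },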
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "all_data = []\n",
    "\n",
    "try:\n",
    "    reach_limit = False\n",
    "    for _ in range(1000):\n",
    "        new_chat = True\n",
    "        last_response = None\n",
    "\n",
    "        # Repeat n=5 times in the same conversation.\n",
    "        # This helps decrease duplication but also decreases diversity.\n",
    "        for _ in range(5):\n",
    "            if new_chat:\n",
    "                prompt_fewshot_examples = random.sample(seed_instruction_data['data'], num_prompt_instructions)\n",
    "                prompt = prompt_with_fewshot(prompt_fewshot_examples)\n",
    "\n",
    "                enter_prompt(br, prompt)\n",
    "                new_chat = False\n",
    "            else:\n",
    "                enter_prompt(br, \"Increase the difficulty a little bit and generate 10 more questions. Make sure they are feasible for elementary students.\")\n",
    "            if not is_prompt_ok(br):\n",
    "                break\n",
    "            response = br.find_elements(By.CSS_SELECTOR, RESPONSE_AREA)[-1].text\n",
    "\n",
    "            # Hourly request limit reached: wait 30 minutes before continuing.\n",
    "            if response in REACH_REQUEST_LIMIT_MESSAGE:\n",
    "                print(response)\n",
    "                reach_limit = True\n",
    "                wait_when_limit_reached(\n",
    "                    br, t=30 * 60, navigate_home=False\n",
    "                )\n",
    "                break\n",
    "            elif response.startswith(\"I apologize\") or response == last_response or 'network error' in response:\n",
    "                go_to_home(br)\n",
    "                new_chat = True\n",
    "            else:\n",
    "                instructions = response_to_instructions(br, response)\n",
    "                all_data.extend(instructions)\n",
    "                last_response = response\n",
    "        try:\n",
    "            delete_current_chat(br)\n",
    "        except Exception:\n",
    "            go_to_home(br)\n",
    "        print(f'Got {len(all_data)} instructions so far.')\n",
    "except Exception as e:\n",
    "    raise e\n",
    "finally:\n",
    "    if len(all_data) > 0:\n",
    "        # Unique save file name based on the current timestamp.\n",
    "        uid = datetime.datetime.now().strftime(\"%Y%m%d-%H%M%S\")\n",
    "        with open(save_path / \"raw\" / f\"{uid}.json\", \"w\") as f:\n",
    "            json.dump(all_data, f, ensure_ascii=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python310",
   "language": "python",
   "name": "python310"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}