from datetime import datetime, timedelta import concurrent.futures import csv import html import os import time from bs4 import BeautifulSoup from dotenv import load_dotenv import nltk import openai import pandas as pd from sec_api import ExtractorApi, QueryApi import tiktoken # Load environment variables for sensitive data and configuration load_dotenv() # Global configurations for tickers, API keys, and output settings # Create a .env file with your sec and openai API keys # SEC_API_KEY="..." # OPENAI_API_KEY="sk-..." # You can get your free api key here https://sec-api.io/signup/free SEC_API_KEY = os.getenv("SEC_API_KEY") # Your OpenAI API key OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") # Add more tickers here TICKERS = ["AAPL"] FILING_URLS_FILE = "filing_urls.csv" OUTPUT_BASE_DIR = datetime.now().strftime('%Y%m%d_%H%M%S') os.makedirs(OUTPUT_BASE_DIR, exist_ok=True) # Initialize SEC and OpenAI API clients queryApi = QueryApi(api_key=SEC_API_KEY) extractorApi = ExtractorApi(api_key=SEC_API_KEY) openai.api_key = OPENAI_API_KEY encoding = tiktoken.encoding_for_model("gpt-3.5-turbo") def get_date_range(): """ Returns a date range of one week from today's date. """ end_date = datetime.now().strftime('%Y-%m-%d') # Іelect dataframe here start_date = (datetime.now() - timedelta(days=90)).strftime('%Y-%m-%d') return start_date, end_date def fetch_filing_urls(start_date, end_date): """ Fetches filing URLs from the SEC API for the specified tickers within the given date range. """ base_query = { "query": { "query_string": { "query": "PLACEHOLDER", "time_zone": "America/New_York" } }, "from": "0", "size": "200", "sort": [{"filedAt": {"order": "desc"}}] } with open(FILING_URLS_FILE, "w", newline='') as log_file: writer = csv.writer(log_file) writer.writerow(['Company', 'Ticker', 'Filed Date', 'Report Year', 'Report Type', 'URL', 'Report Date']) for ticker in TICKERS: print(f"Starting download for ticker {ticker}") universe_query = ( f'formType:(\"10-K\" OR \"10-Q\") AND ' f'filedAt:[{start_date} TO {end_date}] AND ' f'ticker:{ticker}' ) base_query["query"]["query_string"]["query"] = universe_query for from_batch in range(0, 9800, 200): base_query["from"] = str(from_batch) response = queryApi.get_filings(base_query) if len(response["filings"]) == 0: break rows = [ [ x['companyName'], ticker, x['filedAt'], int(x['filedAt'][:4]), x['formType'], x["linkToFilingDetails"], x['filedAt'] ] for x in response["filings"] ] writer.writerows(rows) print(f"Filing URLs downloaded for {ticker}") def mark_tables_in_html(html_content): """ Marks tables in the provided HTML content for easier processing later. """ soup = BeautifulSoup(html_content, 'html.parser') for table in soup.find_all('table'): table_str = '##TABLE_START\n' + str(table) + '\n##TABLE_END' table.replace_with(BeautifulSoup(table_str, 'html.parser')) return soup.get_text() def split_text(input_text, token_limit=6000): """ Splits the text into sections ensuring that each section is below the specified token limit so ChatGPT can process it. """ sections = [] current_section = "" current_count = 0 table_flag = False sentences = nltk.sent_tokenize(input_text) for sentence in sentences: tokens = nltk.word_tokenize(sentence) if '##TABLE_START' in tokens: table_flag = True elif '##TABLE_END' in tokens: table_flag = False token_count = len(encoding.encode(sentence)) if current_count + token_count <= token_limit or table_flag: current_section += sentence + " " current_count += token_count else: sections.append(current_section.strip()) current_section = sentence + " " current_count = token_count if not table_flag and current_count + len(encoding.encode(current_section)) > token_limit: sections.append(current_section.strip()) current_section = "" current_count = 0 if current_section: sections.append(current_section.strip()) return sections def process_report(row): """ Extracts the Management Discussion & Analysis section from a 10-K or 10-Q report and processes it. """ report_type = row['Report Type'] filing_url = row['URL'] if report_type == "10-K": section_text = extractorApi.get_section(filing_url, '7', 'html') elif report_type == "10-Q": section_text = extractorApi.get_section(filing_url, 'part1item2', 'html') else: print(f"Unknown report type: {report_type} for company: {row['Company']}, year: {row['Report Year']}") return marked_text = mark_tables_in_html(section_text) decoded_text = html.unescape(marked_text) sections = split_text(decoded_text) for section in sections: with open(os.path.join(OUTPUT_BASE_DIR, 'all_reports.csv'), 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow([row['Company'], row['Report Year'], report_type, row['Report Date'], section]) def summarize_row(row, index): """ Summarizes a row from the extracted report using gpt-3.5-16k """ while True: try: # Use the OpenAI API to summarize the text response = openai.ChatCompletion.create( model="gpt-3.5-turbo-16k", messages = [ { "role": "system", "content": "You are an assistant." }, { "role": "user", "content": ( f'This is a table/page from a Management Discussion & Analysis section ' f'of {row["Report Year"]} {row["Report Type"]} report from {row["Company"]} published. ' 'Using only data provided below please write a short and structured executive summary, ' f'use numbers and relative metrics.\n\n Table/page:\n"{row["Section"]}"' ) } ] ) # Extract the assistant's reply summarized_text = response['choices'][0]['message']['content'] return index, summarized_text except Exception as e: print(f"An error occurred: {e}. Retrying...") time.sleep(5) def generate_summaries_gpt35(): """ Uses gpt-3.5-16k to generate summaries for all the reports in 4 parallel streams. """ input_file = os.path.join(OUTPUT_BASE_DIR, 'all_reports.csv') df = pd.read_csv(input_file) with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(summarize_row, row, index) for index, row in df.iterrows()] for future in concurrent.futures.as_completed(futures): index, summarized_text = future.result() # Add the summarized text to the original dataframe df.loc[index, 'Summarized'] = summarized_text # Save the dataframe with the summarized text to a new csv file after each summary output_file = os.path.join(OUTPUT_BASE_DIR, os.path.basename(input_file).split('.')[0] + '_gpt35_summary.csv') df.sort_index().to_csv(output_file, quoting=csv.QUOTE_NONNUMERIC, index=False) print(f"Total lines processed: {len(df[df['Summarized'].notnull()])}") def create_3_tweets(row, index): """ Uses gpt-4 to generate 3 tweets per summary. """ while True: try: # Use the OpenAI API to write funny tweets response = openai.ChatCompletion.create( model="gpt-4", messages = [ { "role": "system", "content": "You are one of the best comedians in the world writing hilarious jokes about the stocks." }, { "role": "user", "content": ( f'Write 3 funny and sarcastic tweets about {row["Company"]} ' f'performance based on the summary of their {row["Report Type"]} ' f'financial report for {row["Report Year"]} below. ' 'Make sure to use numbers and metrics, be insightful. ' 'Try to be really creative, mix satire, sarcasm, unexpectedness, ' 'exaggeration, provocation and risk to create the top jokes:' f'\n"{row["Summarized"]}"' ) } ] ) summarized_text = response['choices'][0]['message']['content'] return index, summarized_text except Exception as e: print(f"An error occurred: {e}. Retrying...") time.sleep(5) def generate_tweets_gpt4(): """ Uses gpt-4 to generate tweets for all the summaries in 2 parallel streams. """ # Adjust this path to match where the summarized reports are stored input_file = os.path.join(OUTPUT_BASE_DIR, 'all_reports_gpt35_summary.csv') df = pd.read_csv(input_file, encoding='utf-8') with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: futures = [executor.submit(create_3_tweets, row, index) for index, row in df.iterrows()] for future in concurrent.futures.as_completed(futures): index, tweet_text = future.result() df.loc[index, 'Tweets'] = tweet_text output_file = input_file.split('.')[0] + '_gpt4_tweets.csv' df.sort_index().to_csv(output_file, quoting=csv.QUOTE_NONNUMERIC, index=False, encoding='utf-8') print(f"Total lines processed: {len(df[df['Tweets'].notnull()])}") def main(): # Downloading the required dataset for sentence tokenization nltk.download('punkt') # Fetch the date range and filing URLs start_date, end_date = get_date_range() fetch_filing_urls(start_date, end_date) # Initialize an empty DataFrame to store the filings filings_df = pd.read_csv(FILING_URLS_FILE) # Initialize the CSV file to store all reports with open(os.path.join(OUTPUT_BASE_DIR, 'all_reports.csv'), 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['Company', 'Report Year', 'Report Type', 'Report Date', 'Section']) # Process each report directly for _, row in filings_df.iterrows(): process_report(row) # Summarize the reports generate_summaries_gpt35() # Create funny tweets generate_tweets_gpt4() if __name__ == '__main__': main()