import csv
import json
import os
import time
from datetime import datetime
from io import StringIO

import pandas as pd
from bs4 import BeautifulSoup as bs
from dateutil.relativedelta import relativedelta
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

# Initialize Chrome options
chrome_options = Options()

today = datetime.today().strftime("%Y-%m-%d")
SCROLL_PAUSE_TIME = 1.5
MAX_SCROLLS = False
FILENAME = "./tracker-export-2024-10-28_Arpit_shortened.csv"
COLUMN_NAME = "company-linkedin"

# LinkedIn Credentials
username = "aqoyoxuma@email1.io"
password = "!@#$%^&*()"

## Setup export directory
# Construct the base export directory path
export_dir_base = os.path.join(os.getcwd(), "export")
export_dir = ""

# Check if the base export directory exists
if not os.path.exists(export_dir_base):
    export_dir = export_dir_base
else:
    # If the base export directory exists, find the next available index
    index = 0
    while os.path.exists(f"{export_dir_base}{index}"):
        index += 1
    export_dir = f"{export_dir_base}{index}"

# Create the export directory
os.makedirs(export_dir, exist_ok=True)

# Initialize WebDriver for Chrome
browser = webdriver.Chrome(options=chrome_options)


# Helper functions for date and reaction conversions
def get_past_date(days=0, weeks=0, months=0, years=0):
    date_format = "%Y-%m-%d"
    dtObj = datetime.strptime(today, date_format)
    past_date = dtObj - relativedelta(
        days=days, weeks=weeks, months=months, years=years
    )
    past_date_str = past_date.strftime(date_format)
    return past_date_str


def get_actual_date(date):
    today = datetime.today().strftime("%Y-%m-%d")
    current_year = datetime.today().strftime("%Y")
    past_date = date

    if "hour" in date:
        past_date = today
    elif "day" in date:
        past_date = get_past_date(days=int(date.split(" ")[0]))
    elif "week" in date:
        past_date = get_past_date(weeks=int(date.split(" ")[0]))
    elif "month" in date:
        past_date = get_past_date(months=int(date.split(" ")[0]))
    elif "year" in date:
        past_date = get_past_date(years=int(date.split(" ")[0]))
    else:
        # Absolute dates come in as "MM-DD" or "MM-DD-YYYY"; zero-pad and normalize
        split_date = date.split("-")
        if len(split_date) == 2:
            past_month = split_date[0]
            past_day = split_date[1]
            if len(past_month) < 2:
                past_month = "0" + past_month
            if len(past_day) < 2:
                past_day = "0" + past_day
            past_date = f"{current_year}-{past_month}-{past_day}"
        elif len(split_date) == 3:
            past_month = split_date[0]
            past_day = split_date[1]
            past_year = split_date[2]
            if len(past_month) < 2:
                past_month = "0" + past_month
            if len(past_day) < 2:
                past_day = "0" + past_day
            past_date = f"{past_year}-{past_month}-{past_day}"
    return past_date


def convert_abbreviated_to_number(s):
    if "K" in s:
        return int(float(s.replace("K", "")) * 1000)
    elif "M" in s:
        return int(float(s.replace("M", "")) * 1000000)
    else:
        return int(s)


# Functions to extract text from a container
def get_text(container, selector, attributes):
    try:
        element = container.find(selector, attributes)
        if element:
            return element.text.strip()
    except Exception as e:
        print(e)
    return ""


def get_aria_label(container, selector, attributes):
    try:
        element = container.find(selector, attributes)
        if element:
            return element.get("aria-label")
        else:
            return "NA"
    except Exception as e:
        print(e)
        return ""
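
# A minimal sketch (never called by the script) showing what the helpers above return.
# The sample strings are assumptions modelled on the relative dates LinkedIn renders
# (e.g. "3 days ago", "2 weeks ago") and the "MM-DD" form handled in get_actual_date;
# run it manually if you want to sanity-check the conversions.
def _demo_date_helpers():
    for sample in ["5 hours ago", "3 days ago", "2 weeks ago", "4 months ago", "10-28"]:
        print(f"{sample!r} -> {get_actual_date(sample)}")
    print("'1.2K' ->", convert_abbreviated_to_number("1.2K"))
    print("'3M'   ->", convert_abbreviated_to_number("3M"))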
"update-components-image"}, "Image"), ("article", {"class": "update-components-article"}, "Article"), ("div", {"class": "feed-shared-external-video__meta"}, "Youtube Video"), ( "div", { "class": "feed-shared-mini-update-v2 feed-shared-update-v2__update-content-wrapper artdeco-card" }, "Shared Post", ), ( "div", {"class": "feed-shared-poll ember-view"}, "Other: Poll, Shared Post, etc", ), ] for selector, attrs, media_type in media_info: element = container.find(selector, attrs) if element: link = element.find("a", href=True) return link["href"] if link else "None", media_type return "None", "Unknown" def save_csv_file(data, file_name="file.csv"): """ Save a Pandas DataFrame to a CSV file in the ./export/ directory. If the export directory already exists, it will create a new directory with an incremented index (e.g., ./export0/, ./export1/, etc.). Parameters: data (pandas.DataFrame): The data to be saved to the CSV file. file_name (str, optional): The name of the CSV file to be saved. Defaults to 'file.csv'. """ # Construct the full file path csv_export_dir = os.path.join(export_dir, "csv") os.makedirs(csv_export_dir, exist_ok=True) file_path = os.path.join(csv_export_dir, file_name) # Save the DataFrame to a CSV file data.to_csv(file_path, index=False) print(f"CSV file saved at: {file_path}") # Navigate to the posts page of the company def scrape_page(page): post_page = page + "/posts" post_page = post_page.replace("//posts", "/posts") browser.get(post_page) # Extract company name from URL company_name = page.rstrip("/").split("/")[-1].replace("-", " ").title() print(company_name) # Set parameters for scrolling through the page last_height = browser.execute_script("return document.body.scrollHeight") scrolls = 0 no_change_count = 0 # Scroll through the page until no new content is loaded while True: # Expand all ...more texts buttons = browser.find_elements( By.CSS_SELECTOR, "button.feed-shared-inline-show-more-text__see-more-less-toggle", ) for button in buttons: if button.is_displayed(): try: webdriver.ActionChains(browser).move_to_element( button ).perform() # Scroll to the button button.click() ## [!NOTE] THIS CAN FAIL specially if there is a popup on the screen time.sleep(1) # Add a small delay after each click except Exception as e: print(f"Could not click on button: {e}") browser.execute_script("window.scrollTo(0, document.body.scrollHeight);") time.sleep(SCROLL_PAUSE_TIME) new_height = browser.execute_script("return document.body.scrollHeight") no_change_count = no_change_count + 1 if new_height == last_height else 0 if no_change_count >= 3 or (MAX_SCROLLS and scrolls >= MAX_SCROLLS): break last_height = new_height scrolls += 1 # Parse the page source with BeautifulSoup company_page = browser.page_source linkedin_soup = bs(company_page.encode("utf-8"), "html.parser") # Save the parsed HTML to a file file_name = os.path.join(export_dir, "soup") os.makedirs(file_name, exist_ok=True) file_name = os.path.join(file_name, f"{company_name}_soup.txt") with open(file_name, "w+") as t: t.write(linkedin_soup.prettify()) # Extract post containers from the HTML containers = [ container for container in linkedin_soup.find_all( "div", {"class": "feed-shared-update-v2"} ) if "activity" in container.get("data-urn", "") ] # Define a data structure to hold all the post information posts_data = [] # Main loop to process each container index = 0 for container in containers: post_text = get_text( container, "div", {"class": "feed-shared-update-v2__description-wrapper"} ) post_date = get_aria_label( 
container, "a", {"class": "app-aware-link update-components-actor__sub-description-link"}, ) post_date = get_actual_date(post_date) # media_link, media_type = get_media_info(container) # # Reactions (likes) # reactions_element = container.find_all(lambda tag: tag.name == 'button' and 'aria-label' in tag.attrs and 'reaction' in tag['aria-label'].lower()) # reactions_idx = 1 if len(reactions_element) > 1 else 0 # post_reactions = reactions_element[reactions_idx].text.strip() if reactions_element and reactions_element[reactions_idx].text.strip() != '' else 0 # # Comments # comment_element = container.find_all(lambda tag: tag.name == 'button' and 'aria-label' in tag.attrs and 'comment' in tag['aria-label'].lower()) # comment_idx = 1 if len(comment_element) > 1 else 0 # post_comments = comment_element[comment_idx].text.strip() if comment_element and comment_element[comment_idx].text.strip() != '' else 0 # # Shares # shares_element = container.find_all(lambda tag: tag.name == 'button' and 'aria-label' in tag.attrs and 'repost' in tag['aria-label'].lower()) # shares_idx = 1 if len(shares_element) > 1 else 0 # post_shares = shares_element[shares_idx].text.strip() if shares_element and shares_element[shares_idx].text.strip() != '' else 0 posts_data.append({ "index": index, "post_date": post_date, "post_text": post_text, # "media_link" : media_link, # "media_type" : media_type }) index += 1 try: final = json.dumps(posts_data, indent=2) try: df = pd.read_json(StringIO(final)) df.sort_values(by="post_date", inplace=True, ascending=False) csv_file = f"{company_name}_posts.csv" save_csv_file(df, csv_file) print(f"Data exported to {csv_file}") except Exception as e: print("error: ", e) except Exception as e: print("error: ", e) def get_linkedin_urls_from_csv(file_path): """ Read a CSV file and return a list of values from the 'linkedin_urls' column. Parameters: file_path (str): The path to the CSV file. Returns: list: A list of LinkedIn URLs from the 'linkedin_urls' column. """ linkedin_urls = [] with open(file_path, "r") as csv_file: reader = csv.DictReader(csv_file) for row in reader: linkedin_urls.append(row[COLUMN_NAME]) return linkedin_urls def init(): # Set LinkedIn page URL for scraping page = "https://www.linkedin.com/company/nike" # Open LinkedIn login page browser.get("https://www.linkedin.com/login") # Enter login credentials and submit elementID = browser.find_element(By.ID, "username") elementID.send_keys(username) elementID = browser.find_element(By.ID, "password") elementID.send_keys(password) elementID.submit() while True: print("Seens a verification prompt came plese complete human verification, Waiting 5 secs..") if(browser.current_url == "https://www.linkedin.com/feed/"): print("Completed verification") break time.sleep(5) pages = get_linkedin_urls_from_csv(FILENAME) for page in pages: scrape_page(page) if __name__ == "__main__": init()