# Create a Google Vault mail export, download the exported files from Cloud
# Storage, unpack them, and print the parsed contents of every .mbox file
# found in the current working directory.
from __future__ import print_function

import mailbox
import os
import pathlib
import time
from zipfile import ZipFile

import bs4
from google.cloud import storage
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Seconds to wait between two export status checks, so the Vault API is not
# hammered by a tight polling loop.
EXPORT_POLL_INTERVAL_SECONDS = 10


def get_html_text(html):
    """Return the visible text of an HTML document, or None if it has none.

    Args:
        html: HTML markup (may be None or empty).

    Returns:
        The text of the ``<body>`` element with fragments joined by single
        spaces and stripped, or None when there is no input or no body.
    """
    if not html:
        return None
    try:
        return bs4.BeautifulSoup(html, "lxml").body.get_text(" ", strip=True)
    except AttributeError:
        # No <body> element was produced: message contents empty.
        return None


def _decode_text_part(part):
    """Return the decoded text of a non-multipart message part, or None.

    The Content-Transfer-Encoding (base64, quoted-printable, ...) is undone
    by ``get_payload(decode=True)``; the resulting bytes are decoded with the
    charset declared on the part, falling back to UTF-8.
    """
    raw = part.get_payload(decode=True)
    if raw is None:
        return None
    charset = part.get_content_charset() or "utf-8"
    try:
        return raw.decode(charset, errors="replace")
    except LookupError:
        # The declared charset is unknown to Python; best-effort fallback.
        return raw.decode("utf-8", errors="replace")


class GmailMboxMessage:
    """Wrap a ``mailbox.mboxMessage`` and expose its headers and text parts."""

    def __init__(self, email_data):
        """Store the message to parse.

        Raises:
            TypeError: if ``email_data`` is not a ``mailbox.mboxMessage``.
        """
        if not isinstance(email_data, mailbox.mboxMessage):
            raise TypeError("Variable must be type mailbox.mboxMessage")
        self.email_data = email_data

    def parse_email(self):
        """Return a dict with the main headers and the extracted body parts."""
        return {
            "email_labels": self.email_data["X-Gmail-Labels"],
            "email_date": self.email_data["Date"],
            "email_from": self.email_data["From"],
            "email_to": self.email_data["To"],
            "email_subject": self.email_data["Subject"],
            "email_text": self.read_email_payload(),
        }

    def read_email_payload(self):
        """Return one ``(content_type, encoding, text)`` tuple per leaf part.

        A non-multipart message is handled as a single part. The message
        object itself (not just its payload string) is passed on so that its
        own Content-Type and Content-Transfer-Encoding are honoured.
        """
        if self.email_data.is_multipart():
            parts = self._get_email_messages(self.email_data.get_payload())
        else:
            parts = [self.email_data]
        return [self._read_email_text(part) for part in parts]

    def _get_email_messages(self, email_payload):
        """Yield every non-multipart part found in ``email_payload``.

        Nested lists/tuples and nested multipart containers are flattened
        recursively, in document order.
        """
        for msg in email_payload:
            if isinstance(msg, (list, tuple)):
                yield from self._get_email_messages(msg)
            elif msg.is_multipart():
                yield from self._get_email_messages(msg.get_payload())
            else:
                yield msg

    def _read_email_text(self, msg):
        """Extract the text of one part.

        Args:
            msg: a message part, or a bare string (treated as HTML with
                unknown type and encoding, reported as "NA").

        Returns:
            ``(content_type, encoding, text)``; ``text`` is None for parts
            that are neither text/plain nor text/html, or that are empty.
        """
        if isinstance(msg, str):
            return ("NA", "NA", get_html_text(msg))

        content_type = msg.get_content_type()
        encoding = msg.get("Content-Transfer-Encoding", "NA")
        if "text/plain" in content_type:
            msg_text = _decode_text_part(msg)
        elif "text/html" in content_type:
            msg_text = get_html_text(_decode_text_part(msg))
        else:
            msg_text = None
        return (content_type, encoding, msg_text)


def _wait_for_export(service, matter_id, export_id,
                     poll_interval=EXPORT_POLL_INTERVAL_SECONDS):
    """Poll the export until its status is no longer "IN_PROGRESS".

    Returns the last export resource fetched from the API.
    """
    while True:
        export = (
            service.matters()
            .exports()
            .get(matterId=matter_id, exportId=export_id)
            .execute()
        )
        if export["status"] != "IN_PROGRESS":
            return export
        time.sleep(poll_interval)


def _download_export_files(storage_client, export):
    """Download every file listed in the export's Cloud Storage sink.

    Each object is saved in the current directory under the last component
    of its object name; archives whose local name ends in ".zip" are
    extracted in place.
    """
    sink_files = export.get("cloudStorageSink", {}).get("files", [])
    for file_info in sink_files:
        object_name = file_info["objectName"]
        local_name = object_name.split("/")[-1]
        # Each entry carries its own bucket name, so do not assume that all
        # files live in the bucket of the first entry.
        bucket = storage_client.bucket(bucket_name=file_info["bucketName"])
        blob = bucket.get_blob(object_name)
        if blob is None:
            # get_blob() returns None when the object does not exist.
            print(f"Export object not found, skipping: {object_name}")
            continue
        blob.download_to_filename(local_name)
        if local_name.lower().endswith(".zip"):
            with ZipFile(local_name) as archive:
                archive.extractall()


def _print_mbox_emails(directory):
    """Parse and print every message of every .mbox file in ``directory``."""
    for entry in sorted(os.listdir(directory)):
        if not entry.endswith(".mbox"):
            continue
        mbox_obj = mailbox.mbox(str(directory / entry))
        try:
            for email_obj in mbox_obj:
                print(GmailMboxMessage(email_obj).parse_email())
        finally:
            # Release the underlying file handle.
            mbox_obj.close()


def main():
    """Export mail through the Vault API and print the exported messages.

    Creates a matter and a mail export, waits for the export to finish,
    downloads and unpacks the exported files, then prints every message
    found in the .mbox files of the current directory. ``HttpError`` raised
    by the API client is printed instead of propagated.

    Raises:
        RuntimeError: if the export ends in a status other than "COMPLETED".
    """
    try:
        # NOTE(review): the token is left blank in this file; a valid OAuth2
        # access token must be supplied for the calls below to succeed.
        creds = Credentials(
            token=""
        )
        service = build("vault", "v1", credentials=creds)

        # Create a matter
        matter = service.matters().create(body={"name": "test_program"}).execute()

        # Create an export
        export = (
            service.matters()
            .exports()
            .create(
                matterId=matter["matterId"],
                body={
                    "name": "test:export",
                    "query": {
                        "corpus": "MAIL",
                        "dataScope": "ALL_DATA",
                        "searchMethod": "ACCOUNT",
                        "terms": "label:^deleted",
                        "startTime": "2023-01-01T00:00:00Z",
                        "endTime": "2023-01-11T00:00:00Z",
                        "accountInfo": {"emails": ["antonio@onnatesting.com"]},
                        "mailOptions": {},
                        "timeZone": "Europe/Madrid",
                        "method": "ACCOUNT",
                    },
                },
            )
            .execute()
        )

        export = _wait_for_export(service, matter["matterId"], export["id"])
        if export["status"] != "COMPLETED":
            # Without this check a failed export would only surface later as
            # a confusing lookup error on the missing storage sink.
            raise RuntimeError(
                f"Vault export {export['id']} ended with status "
                f"{export['status']}"
            )

        # Download the exported files
        storage_client = storage.Client(credentials=creds)
        _download_export_files(storage_client, export)

        # Parse every .mbox file now present in the working directory
        _print_mbox_emails(pathlib.Path().resolve())
    except HttpError as err:
        print(err)


if __name__ == "__main__":
    main()