Last active
March 6, 2023 14:20
-
-
Save javiermolinar/e2ab76c20d5459a1d4e0172ae921d20e to your computer and use it in GitHub Desktop.
Create and download exports from Google Vault
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import print_function | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| from google.oauth2.credentials import Credentials | |
| from google.cloud import storage | |
| from zipfile import ZipFile | |
| import os | |
| import pathlib | |
| # If modifying these scopes, delete the file token.json. | |
| import mailbox | |
| import bs4 | |
def get_html_text(html):
    """Return the visible text of the ``<body>`` of an HTML document.

    The markup is parsed with BeautifulSoup using the ``lxml`` parser. The
    text fragments of the body are stripped and joined with single spaces.

    Args:
        html: Markup to parse.

    Returns:
        The extracted text, or ``None`` if an ``AttributeError`` is raised
        while extracting it (per the original author: the message contents
        were empty).
    """
    try:
        document = bs4.BeautifulSoup(html, "lxml")
        body = document.body
        return body.get_text(" ", strip=True)
    except AttributeError:
        # Message contents empty: nothing to extract text from.
        return None
class GmailMboxMessage:
    """Wrap one ``mailbox.mboxMessage`` and extract its headers and text parts."""

    def __init__(self, email_data):
        """Store the message to be parsed.

        Args:
            email_data: The message to wrap.

        Raises:
            TypeError: If ``email_data`` is not a ``mailbox.mboxMessage``.
        """
        if not isinstance(email_data, mailbox.mboxMessage):
            raise TypeError("Variable must be type mailbox.mboxMessage")
        self.email_data = email_data

    def parse_email(self):
        """Return the message's main headers and its extracted body parts.

        Returns:
            A dict with the keys ``email_labels``, ``email_date``,
            ``email_from``, ``email_to`` and ``email_subject`` (raw header
            lookups on the wrapped message) and ``email_text`` (the list
            produced by :meth:`read_email_payload`).
        """
        return {
            "email_labels": self.email_data["X-Gmail-Labels"],
            "email_date": self.email_data["Date"],
            "email_from": self.email_data["From"],
            "email_to": self.email_data["To"],
            "email_subject": self.email_data["Subject"],
            "email_text": self.read_email_payload(),
        }

    def read_email_payload(self):
        """Extract the text of every leaf part of the message.

        Returns:
            A list of ``(content_type, encoding, text)`` tuples, one per leaf
            part, as produced by :meth:`_read_email_text`. A non-multipart
            message yields a single tuple built from its raw payload.
        """
        email_payload = self.email_data.get_payload()
        if self.email_data.is_multipart():
            email_messages = list(self._get_email_messages(email_payload))
        else:
            email_messages = [email_payload]
        return [self._read_email_text(msg) for msg in email_messages]

    def _get_email_messages(self, email_payload):
        """Yield the leaf (non-multipart) parts of a payload, depth first.

        Args:
            email_payload: An iterable whose items are message parts or
                nested lists/tuples of parts.
        """
        for msg in email_payload:
            if isinstance(msg, (list, tuple)):
                yield from self._get_email_messages(msg)
            elif msg.is_multipart():
                yield from self._get_email_messages(msg.get_payload())
            else:
                yield msg

    def _read_email_text(self, msg):
        """Return ``(content_type, encoding, text)`` for one message part.

        Args:
            msg: Either a message part, or a plain string (the raw payload of
                a non-multipart message).

        Returns:
            A tuple ``(content_type, encoding, text)``:

            * For a string, both ``content_type`` and ``encoding`` are
              ``"NA"`` and the text is run through ``get_html_text``.
            * For a part, ``encoding`` is the ``Content-Transfer-Encoding``
              header exactly as found (``"NA"`` when absent). ``text`` is the
              payload for ``text/plain``, the ``get_html_text`` result for
              ``text/html``, and ``None`` for base64-encoded parts and for
              every other content type.
        """
        if isinstance(msg, str):
            # A bare payload string carries no per-part headers to inspect.
            return ("NA", "NA", get_html_text(msg))

        content_type = msg.get_content_type()
        encoding = msg.get("Content-Transfer-Encoding", "NA")
        # Content-Transfer-Encoding values are case-insensitive (RFC 2045),
        # so "BASE64"/"Base64" must be recognised too; otherwise the raw
        # base64 blob would be returned as if it were readable text.
        is_base64 = "base64" in str(encoding).lower()

        if is_base64:
            msg_text = None
        elif "text/plain" in content_type:
            # NOTE: get_payload() is called without decoding, so other
            # transfer encodings are returned as stored in the message.
            msg_text = msg.get_payload()
        elif "text/html" in content_type:
            msg_text = get_html_text(msg.get_payload())
        else:
            msg_text = None
        return (content_type, encoding, msg_text)
def main():
    """Create a Vault mail export, download it, and print the exported emails.

    Steps:
      1. Build a Vault API client and create a matter named "test_program".
      2. Start a MAIL export for one account and poll it until it is no
         longer IN_PROGRESS.
      3. Download every exported Cloud Storage object into the current
         working directory, extracting ``.zip`` archives in place.
      4. Parse every ``*.mbox`` file in the current working directory and
         print each message as a dict.

    An ``HttpError`` raised along the way is caught and printed.

    Raises:
        RuntimeError: If the export ends in any status other than COMPLETED.
    """
    import time  # local import: only the polling loop below needs it

    # Pause between status requests so the Vault API is not polled in a
    # tight loop while the export is being prepared.
    poll_interval_seconds = 5

    creds = None
    try:
        # NOTE: the access token is blank here; a valid token has to be
        # filled in before the API calls below can succeed.
        creds = Credentials(
            token=""
        )
        service = build("vault", "v1", credentials=creds)

        # Create a matter to hold the export.
        matter = service.matters().create(body={"name": "test_program"}).execute()

        # Create an export.
        export = (
            service.matters()
            .exports()
            .create(
                matterId=matter["matterId"],
                body={
                    "name": "test:export",
                    "query": {
                        "corpus": "MAIL",
                        "dataScope": "ALL_DATA",
                        "searchMethod": "ACCOUNT",
                        "terms": "label:^deleted",
                        "startTime": "2023-01-01T00:00:00Z",
                        "endTime": "2023-01-11T00:00:00Z",
                        "accountInfo": {"emails": ["[email protected]"]},
                        "mailOptions": {},
                        "timeZone": "Europe/Madrid",
                        "method": "ACCOUNT",
                    },
                },
            )
            .execute()
        )

        # Poll until the export leaves IN_PROGRESS, sleeping between requests.
        while True:
            export = (
                service.matters()
                .exports()
                .get(matterId=matter["matterId"], exportId=export["id"])
                .execute()
            )
            status = export["status"]
            if status != "IN_PROGRESS":
                break
            time.sleep(poll_interval_seconds)

        # Anything other than COMPLETED means there is nothing to download;
        # fail with a clear message instead of a KeyError further down.
        if status != "COMPLETED":
            raise RuntimeError(
                f"Vault export {export['id']} ended with status {status}"
            )

        # Download every exported object next to the script.
        storage_client = storage.Client(credentials=creds)
        for sink_file in export["cloudStorageSink"]["files"]:
            object_name = sink_file["objectName"]
            local_name = object_name.split("/")[-1]
            # Each entry names its own bucket; do not assume the first one.
            bucket = storage_client.bucket(bucket_name=sink_file["bucketName"])
            blob = bucket.get_blob(object_name)
            blob.download_to_filename(local_name)
            # Only real archives are extracted (suffix check, not substring).
            if local_name.lower().endswith(".zip"):
                with ZipFile(local_name) as archive:
                    archive.extractall()

        # Parse and print every mbox file found in the working directory.
        path = pathlib.Path().resolve()
        for entry in os.listdir(path):
            if not entry.endswith(".mbox"):
                continue
            mbox_obj = mailbox.mbox(f"{path}/{entry}")
            try:
                for email_obj in mbox_obj:
                    print(GmailMboxMessage(email_obj).parse_email())
            finally:
                # Release the underlying file handle once the box is read.
                mbox_obj.close()
    except HttpError as err:
        print(err)
# Run the export workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Pip freeze