Skip to content

Instantly share code, notes, and snippets.

@javiermolinar
Last active March 6, 2023 14:20
Show Gist options
  • Save javiermolinar/e2ab76c20d5459a1d4e0172ae921d20e to your computer and use it in GitHub Desktop.
Save javiermolinar/e2ab76c20d5459a1d4e0172ae921d20e to your computer and use it in GitHub Desktop.
Create and download exports from Google Vault
from __future__ import print_function
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2.credentials import Credentials
from google.cloud import storage
from zipfile import ZipFile
import os
import pathlib
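# NOTE(review): leftover quickstart comment ("If modifying these scopes, delete the file token.json.") — this script defines no OAuth scopes and reads no token.json.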
import mailbox
import bs4
def get_html_text(html):
    """Return the visible text of an HTML document's ``<body>``.

    The markup is parsed with BeautifulSoup's ``lxml`` parser; text nodes are
    stripped and joined with single spaces.

    Args:
        html: HTML markup to parse.

    Returns:
        The body text, or ``None`` when an ``AttributeError`` is raised while
        extracting it (e.g. the parsed document has no ``<body>``).
    """
    try:
        soup = bs4.BeautifulSoup(html, "lxml")
        text = soup.body.get_text(" ", strip=True)
    except AttributeError:  # message contents empty
        text = None
    return text
class GmailMboxMessage:
    """Extract headers and readable text from one message of a Gmail mbox.

    Args:
        email_data: The ``mailbox.mboxMessage`` to wrap.

    Raises:
        TypeError: If ``email_data`` is not a ``mailbox.mboxMessage``.
    """

    def __init__(self, email_data):
        if not isinstance(email_data, mailbox.mboxMessage):
            raise TypeError("Variable must be type mailbox.mboxMessage")
        self.email_data = email_data

    def parse_email(self):
        """Return the main headers and the body text of the message.

        Returns:
            A dict with the keys ``email_labels``, ``email_date``,
            ``email_from``, ``email_to``, ``email_subject`` (header values,
            ``None`` when the header is absent) and ``email_text`` (the list
            produced by :meth:`read_email_payload`).
        """
        return {
            "email_labels": self.email_data["X-Gmail-Labels"],
            "email_date": self.email_data["Date"],
            "email_from": self.email_data["From"],
            "email_to": self.email_data["To"],
            "email_subject": self.email_data["Subject"],
            "email_text": self.read_email_payload(),
        }

    def read_email_payload(self):
        """Return one ``(content_type, encoding, text)`` tuple per leaf part.

        Multipart messages are flattened depth-first. A single-part message
        yields exactly one tuple.
        """
        if self.email_data.is_multipart():
            parts = self._get_email_messages(self.email_data.get_payload())
        else:
            # Hand over the message itself rather than its raw payload string
            # so its Content-Type / Content-Transfer-Encoding headers are
            # honoured instead of being reported as "NA".
            parts = [self.email_data]
        return [self._read_email_text(part) for part in parts]

    def _get_email_messages(self, email_payload):
        """Yield every non-multipart part found in ``email_payload``.

        ``email_payload`` is an iterable of message parts; nested lists/tuples
        and nested multipart containers are descended into recursively.
        """
        for msg in email_payload:
            if isinstance(msg, (list, tuple)):
                yield from self._get_email_messages(msg)
            elif msg.is_multipart():
                yield from self._get_email_messages(msg.get_payload())
            else:
                yield msg

    @staticmethod
    def _decode_payload(msg):
        """Return the body of a leaf part as text.

        The Content-Transfer-Encoding (base64, quoted-printable, ...) is undone
        and the bytes are decoded with the part's declared charset, falling
        back to UTF-8. Undecodable bytes are replaced rather than raising.
        Returns ``None`` when the part has no decodable payload.
        """
        raw = msg.get_payload(decode=True)
        if raw is None:
            return None
        charset = msg.get_content_charset() or "utf-8"
        try:
            return raw.decode(charset, errors="replace")
        except LookupError:  # charset label unknown to Python
            return raw.decode("utf-8", errors="replace")

    def _read_email_text(self, msg):
        """Return ``(content_type, encoding, text)`` for one part.

        Args:
            msg: A message part, or a bare ``str`` payload. A ``str`` carries
                no headers, so it is reported as ``"NA"``/``"NA"`` and parsed
                as HTML.

        Returns:
            The part's content type, its Content-Transfer-Encoding header
            (``"NA"`` when absent) and the extracted text. The text is ``None``
            for parts that are neither ``text/plain`` nor ``text/html``.
        """
        if isinstance(msg, str):
            return ("NA", "NA", get_html_text(msg))

        content_type = msg.get_content_type()
        encoding = msg.get("Content-Transfer-Encoding", "NA")
        if "text/plain" in content_type:
            msg_text = self._decode_payload(msg)
        elif "text/html" in content_type:
            html = self._decode_payload(msg)
            msg_text = None if html is None else get_html_text(html)
        else:
            msg_text = None
        return (content_type, encoding, msg_text)
def _wait_for_export(service, matter_id, export_id, poll_seconds=5):
    """Poll a Vault export until it leaves ``IN_PROGRESS`` and return it."""
    import time  # local import: only this helper needs it

    while True:
        export = (
            service.matters()
            .exports()
            .get(matterId=matter_id, exportId=export_id)
            .execute()
        )
        if export["status"] != "IN_PROGRESS":
            return export
        # Pause between polls instead of hammering the API in a busy loop.
        time.sleep(poll_seconds)


def _download_export_files(storage_client, export):
    """Download every file of a finished export into the working directory.

    Zip archives are extracted in place after download.
    """
    for file_info in export["cloudStorageSink"]["files"]:
        object_name = file_info["objectName"]
        local_name = object_name.split("/")[-1]
        # Each file entry names its own bucket; do not assume the first one.
        bucket = storage_client.bucket(bucket_name=file_info["bucketName"])
        blob = bucket.get_blob(object_name)
        if blob is None:
            print(f"Export object not found in bucket: {object_name}")
            continue
        blob.download_to_filename(local_name)
        if local_name.lower().endswith(".zip"):
            with ZipFile(local_name) as archive:
                archive.extractall()


def _print_mbox_messages(directory):
    """Parse and print every message of every ``.mbox`` file in ``directory``."""
    for entry in os.listdir(directory):
        if not entry.endswith(".mbox"):
            continue
        mbox_obj = mailbox.mbox(os.path.join(directory, entry))
        try:
            for email_obj in mbox_obj:
                print(GmailMboxMessage(email_obj).parse_email())
        finally:
            mbox_obj.close()


def main():
    """Create a Vault mail export, download it and print the exported mail.

    Steps:
        1. Create a matter and a MAIL export in it.
        2. Poll the export until it is no longer in progress.
        3. Download the export files from Cloud Storage, unzipping archives.
        4. Parse every ``.mbox`` file in the working directory and print each
           message.

    ``HttpError`` raised by the Google API client is printed, not propagated.
    """
    # An OAuth 2.0 access token must be supplied here before running.
    creds = Credentials(
        token=""
    )
    try:
        service = build("vault", "v1", credentials=creds)
        # Create a matter
        matter = service.matters().create(body={"name": "test_program"}).execute()
        # Create an export
        export = (
            service.matters()
            .exports()
            .create(
                matterId=matter["matterId"],
                body={
                    "name": "test:export",
                    "query": {
                        "corpus": "MAIL",
                        "dataScope": "ALL_DATA",
                        "searchMethod": "ACCOUNT",
                        "terms": "label:^deleted",
                        "startTime": "2023-01-01T00:00:00Z",
                        "endTime": "2023-01-11T00:00:00Z",
                        # NOTE(review): not a valid address (looks redacted);
                        # replace with the account(s) to search.
                        "accountInfo": {"emails": ["[email protected]"]},
                        "mailOptions": {},
                        "timeZone": "Europe/Madrid",
                        "method": "ACCOUNT",
                    },
                },
            )
            .execute()
        )
        export = _wait_for_export(service, matter["matterId"], export["id"])
        if export["status"] != "COMPLETED":
            # A failed export has no files to download.
            print(f"Export ended with status {export['status']}; nothing to download")
            return
        # Download file
        _download_export_files(storage.Client(credentials=creds), export)
        _print_mbox_messages(pathlib.Path().resolve())
    except HttpError as err:
        print(err)
# Run the export workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
@javiermolinar
Copy link
Author

Pip freeze

beautifulsoup4==4.11.1
bs4==0.0.1
cachetools==5.3.0
certifi==2022.12.7
charset-normalizer==3.0.1
google-api-core==2.11.0
google-api-python-client==2.73.0
google-auth==2.16.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.8.0
google-cloud-core==2.3.2
google-cloud-storage==2.7.0
google-crc32c==1.5.0
google-resumable-media==2.4.1
googleapis-common-protos==1.58.0
httplib2==0.21.0
idna==3.4
oauthlib==3.2.2
protobuf==4.21.12
pyasn1==0.4.8
pyasn1-modules==0.2.8
pyparsing==3.0.9
requests==2.28.2
requests-oauthlib==1.3.1
rsa==4.9
six==1.16.0
soupsieve==2.3.2.post1
uritemplate==4.1.1
urllib3==1.26.14

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment