Searching the Internet for ways to access Outlook PST files with Python gives very few results (and most of what turns up is outdated). Does anyone know how to read a PST with or without a library? Unfortunately I am not good enough at programming to build a PST reader without the help of a library.
My target is to get the following information about the content:
I already tried the following things:
libpff / pypff : crashes and seems to read the whole file into memory before doing anything (not a good solution, as the PST files are kept on slow network storage).
Libratom : same problem as it is based on libpff.
Libpst : unclear how this is used / comes as a binary (no explanation of how to install it) / see answer on this post / does not seem to be maintained or updated.
win32 (mounting PST in Outlook) : one tutorial showed how to mount the PST into a locally installed Outlook and getting the contents with MAPI-access, but this is also very, very slow and not a good solution as Outlook is needed.
Aspose Email Python : promising at the start, though the documentation is not very good (no Python examples / different naming, e.g. for the PersonalStorage object and many others) / stops after 50 items per folder (maybe a limit of the non-free version, but unclear due to the lack of explanation on the publisher's website).
This is an example from the Aspose website:
# Open the PST file and report the item counts of every folder directly
# below the root folder.
pst_path = dataDir + "Outlook.pst"
personalStorage = PersonalStorage.from_file(pst_path)
for folderInfo in personalStorage.root_folder.get_sub_folders():
    print("Folder: " + folderInfo.display_name)
    print("Total Items: " + str(folderInfo.content_count))
    print("Total Unread Items: " + str(folderInfo.content_unread_count))
    print("----------------------")
I did heavy googling to find the fitting import-statement to make this run.
Does anyone have a stable, clear approach for reading Outlook PST files? Even a solution using Aspose that gets past the 50-item limit would be great.
Redemption (I am its author) can be another choice - it is a wrapper around Extended MAPI, so you would still need to have Outlook installed (for its MAPI system), but unlike the Outlook Object Model, it can be used from a service and does not require starting outlook.exe and/or adding PST files to the user's default profile. You can use either RDOSession.LogonPstStore (it creates and deletes a temporary profile configured to use the specified PST file) and/or RDOSession.Stores.AddPstStore to add a PST file to an existing session (e.g. one used by Outlook or created by LogonPstStore).
I have been working on this as well, and finally got to a working solution! Here is my code that worked on a 16GB pst file.
from libratom.lib.pff import PffArchive
import os
import json
import re
from tqdm import tqdm
from unidecode import unidecode
import logging
from collections import defaultdict
"""
Needs a .json file email_list.json with the following format:
blacklist - list of blacklisted sender emails
whitelist - list of whitelisted sender emails
"""
# Configure logging: every record at DEBUG level or above is written to
# email_extraction.log.
logging.basicConfig(
    filename='email_extraction.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logging.info("Logging is configured.")

# Upper bound on the number of emails exported by download_emails().
LIMIT = 100000

# Load the sender whitelist and blacklist from email_list.json.
# JSON is read as UTF-8 explicitly so the result does not depend on the
# platform's default encoding.
with open("email_list.json", "r", encoding="utf-8") as file:
    lists = json.load(file)

# Frozensets give constant-time membership tests; the lists are consulted
# once per message. A missing or null entry is treated as an empty list.
WHITELIST = frozenset(lists.get("whitelist") or ())
BLACKLIST = frozenset(lists.get("blacklist") or ())
def is_good_email(message, sender_email):
    """Decide whether an email from ``sender_email`` should be exported.

    A whitelisted sender is always accepted, even when it is also on the
    blacklist. Any other sender is accepted unless it is blacklisted.
    ``message`` is not used by the decision.

    Returns:
        True when the email should be kept, False when it should be skipped.
    """
    whitelisted = sender_email in WHITELIST
    return whitelisted or sender_email not in BLACKLIST
def _header_value(headers, name):
    """Return the unfolded value of the first ``name`` header, or "" if absent.

    The header name must start a line (so ``From`` does not match inside
    ``X-Original-From``) and is matched case-insensitively. Continuation
    lines (lines starting with a space or tab) are joined onto the value.
    """
    pattern = r"^" + re.escape(name) + r":[ \t]*(.*(?:\r?\n[ \t]+.*)*)"
    match = re.search(pattern, headers, re.MULTILINE | re.IGNORECASE)
    if not match:
        return ""
    # Unfold continuation lines, then drop the trailing "\r" of CRLF input.
    return re.sub(r"\s*\r?\n[ \t]+", " ", match.group(1)).strip()


def extract_header_info(headers):
    """Extract sender name, sender address and date from raw transport headers.

    Handles ``Name <addr>``, ``"Name" <addr>``, ``<addr>`` and bare ``addr``
    forms of the ``From`` header, including values folded over several lines.

    Args:
        headers: The raw header block of a message as one string.

    Returns:
        A ``(sender_name, sender_email, timestamp)`` tuple of strings. Parts
        that cannot be determined are reported as "Unknown Sender",
        "Unknown Email" and "Unknown Timestamp" respectively.
    """
    sender_name = ""
    sender_email = ""

    from_value = _header_value(headers, "From")
    bracketed = re.match(r"(.*?)\s*<([^<>]+)>", from_value)
    if bracketed:
        sender_name = bracketed.group(1).strip().strip('"').strip()
        sender_email = bracketed.group(2).strip()
    else:
        # No angle brackets: accept a bare address such as "user@example.com".
        bare = re.search(r"[^\s<>()\",;]+@[^\s<>()\",;]+", from_value)
        if bare:
            sender_email = bare.group(0)

    timestamp = _header_value(headers, "Date")

    return (
        sender_name or "Unknown Sender",
        sender_email or "Unknown Email",
        timestamp or "Unknown Timestamp",
    )
def clean_subject(subject):
    """Turn an email subject into a string usable as a folder name.

    The subject is transliterated with ``unidecode``; backslash, slash,
    ``*``, ``?``, ``:``, double quote, ``<``, ``>`` and ``|`` are each
    replaced by an underscore; surrounding whitespace is trimmed; and
    trailing dots and spaces are removed.
    """
    ascii_text = unidecode(subject)
    sanitized = re.sub(r'[\\/*?:"<>|]', "_", ascii_text)
    return sanitized.strip().rstrip(". ")
def save_body(message, message_folder):
    """Write the body of ``message`` into ``message_folder``.

    The first non-empty representation wins, in this order: plain text
    (``body.txt``), HTML (``body.html``), RTF (``body.rtf``). When the
    message has none of them, a warning is logged and a placeholder
    ``body.txt`` is written instead.

    A body delivered as ``bytes`` is written unchanged in binary mode; a
    ``str`` body is written as UTF-8 text.

    Args:
        message: Object exposing ``plain_text_body``, ``html_body`` and
            ``rtf_body`` attributes.
        message_folder: Existing directory that receives the body file.

    Returns:
        The path of the file that was written.
    """
    candidates = (
        ("plain_text_body", "body.txt"),
        ("html_body", "body.html"),
        ("rtf_body", "body.rtf"),
    )
    for attribute, filename in candidates:
        body = getattr(message, attribute)
        if body:
            break
    else:
        logging.warning("No body content found")
        body = "No body content available"
        filename = "body.txt"

    body_file = os.path.join(message_folder, filename)
    # Pick the file mode from the actual type so neither bytes nor str
    # bodies raise a TypeError on write.
    if isinstance(body, (bytes, bytearray)):
        with open(body_file, "wb") as f:
            f.write(body)
    else:
        with open(body_file, "w", encoding="utf-8") as f:
            f.write(body)
    return body_file
def _unique_attachment_name(raw_name, used_names):
    """Return a filesystem-safe file name that is not yet in ``used_names``.

    Path separators, other characters that are invalid in file names and
    control characters are replaced by underscores, so the name can never
    point outside the target folder. A missing or empty name becomes
    "Unnamed_attachment". Name clashes (compared case-insensitively) are
    resolved by appending ``_2``, ``_3``, ... before the extension. The
    chosen name is recorded in ``used_names``.
    """
    name = re.sub(r'[\\/*?:"<>|\x00-\x1f]', "_", raw_name or "")
    name = name.strip().rstrip(". ")
    if not name:
        name = "Unnamed_attachment"
    stem, extension = os.path.splitext(name)
    candidate = name
    suffix = 1
    while candidate.casefold() in used_names:
        suffix += 1
        candidate = f"{stem}_{suffix}{extension}"
    used_names.add(candidate.casefold())
    return candidate


def save_attachments(message, message_folder):
    """Save every attachment of ``message`` into ``message_folder``.

    A failure on one attachment is logged and does not stop the remaining
    attachments from being saved.

    Args:
        message: Object exposing ``subject`` and ``attachments``; each
            attachment provides ``name``, ``get_size()`` and
            ``read_buffer(size)``.
        message_folder: Existing directory that receives the files.

    Returns:
        The list of paths of the attachments that were written.
    """
    attachment_paths = []
    # Reserve the names used for the message body so an attachment called
    # e.g. "body.txt" cannot overwrite it.
    used_names = {"body.txt", "body.html", "body.rtf"}
    try:
        for attachment in message.attachments or ():
            # Bound before the try so the error log below can always use it.
            attachment_name = "Unnamed_attachment"
            try:
                attachment_name = _unique_attachment_name(attachment.name, used_names)
                # Read first, so a failed read does not leave an empty file.
                data = attachment.read_buffer(attachment.get_size())
                attachment_path = os.path.join(message_folder, attachment_name)
                with open(attachment_path, "wb") as f:
                    f.write(data)
            except OSError as e:
                logging.error("Error saving attachment %s %s: %s", message.subject, attachment_name, e)
                continue
            attachment_paths.append(attachment_path)
    except OSError as e:
        # Raised while accessing or iterating the attachment collection itself.
        logging.error("Error reading attachments of %s: %s", message.subject, e)
    return attachment_paths
def download_emails(pst_file_path, output_folder):
    """Export the messages of every folder named "Inbox" in a .pst file.

    Each exported message gets its own directory
    ``<output_folder>/Inbox/<cleaned subject>`` that receives the body file
    and the attachments. Repeated subjects get ``_2``, ``_3``, ... appended
    to the directory name. A summary of all exported messages is written to
    ``emails.json`` in the current working directory, and the number of
    distinct senders and exported emails is printed. At most ``LIMIT``
    messages are exported.

    A message is skipped when its subject is "Your daily briefing", when it
    has no transport headers, when ``is_good_email`` rejects its sender, or
    when its output directory cannot be created.

    Args:
        pst_file_path: Path of the .pst file to read.
        output_folder: Directory below which the message folders are created.
    """
    email_count = 0
    name_counts = defaultdict(int)
    senders = set()
    email_list = []

    # Open the .pst file using PffArchive from libratom
    with PffArchive(pst_file_path) as archive:
        for folder in archive.folders():
            if email_count >= LIMIT:
                # Limit reached: no need to walk the remaining folders.
                break
            if folder.name != "Inbox":
                continue

            for index in tqdm(range(folder.get_number_of_sub_messages())):
                # Check the limit before loading the message, so nothing is
                # read from the archive that will not be exported.
                if email_count >= LIMIT:
                    break
                message = folder.get_sub_message(index)

                if message.subject == "Your daily briefing":
                    continue  # spooky stuff

                if not message.transport_headers:
                    logging.warning("No headers found for message %s", message.subject)
                    continue
                header_str = message.transport_headers.strip()
                sender_name, sender_email, timestamp = extract_header_info(header_str)

                # skip bad emails
                if not is_good_email(message, sender_email):
                    continue

                subject = message.subject or "(No Subject)"
                clean_subject_name = clean_subject(subject)

                # The first message with a given name keeps it; later ones
                # get "_<occurrence>" appended so folders do not collide.
                name_counts[clean_subject_name] += 1
                occurrence = name_counts[clean_subject_name]
                if occurrence > 1:
                    clean_subject_name = f"{clean_subject_name}_{occurrence}"

                message_folder = os.path.join(output_folder, folder.name, clean_subject_name)
                try:
                    os.makedirs(message_folder, exist_ok=True)
                except OSError as e:
                    logging.error(
                        "Error creating folder %s: subject %s clean %s: %s",
                        message_folder, subject, clean_subject_name, e,
                    )
                    continue

                body_file = save_body(message, message_folder)
                attachment_paths = save_attachments(message, message_folder)

                senders.add(sender_email)
                email_list.append({
                    "subject": subject,
                    "sender_name": sender_name,
                    "sender_email": sender_email,
                    "body": body_file,
                    "timestamp": timestamp,
                    "attachments": attachment_paths
                })
                email_count += 1

    print("SENDERS", len(senders))
    print("POST FILTER EMAIL COUNT", len(email_list))
    with open("emails.json", "w", encoding="utf-8") as json_file:
        json.dump(email_list, json_file, indent=4)
def clean_workspace(output_folder):
    """Make sure ``output_folder`` exists and, if it is a directory, is empty.

    A missing folder is created. An existing directory is kept, but
    everything inside it is deleted. Symbolic links inside it are removed
    without touching what they point to. If the path exists but is not a
    directory, nothing is done.

    Args:
        output_folder: Path of the workspace directory.
    """
    import shutil  # local import: only this function needs it

    if not os.path.exists(output_folder):
        os.makedirs(output_folder, exist_ok=True)
        return
    if not os.path.isdir(output_folder):
        return

    # Take a snapshot of the entries first so nothing is deleted while the
    # directory is still being scanned.
    with os.scandir(output_folder) as scan:
        entries = list(scan)
    for entry in entries:
        if entry.is_dir(follow_symlinks=False):
            shutil.rmtree(entry.path)
        else:
            # Regular files and symlinks (including links to directories).
            os.remove(entry.path)
def main():
    """Run clean_workspace on ./email_data, then export backup.pst into it."""
    # Replace with your .pst file path
    source_pst = 'backup.pst'
    export_dir = "./email_data"

    clean_workspace(export_dir)
    download_emails(source_pst, export_dir)


if __name__ == "__main__":
    main()
If you love us, you can donate to us via PayPal or buy me a coffee so we can maintain and grow. Thank you!
Donate Us With