Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to parse / read Outlook PST-Files with Python?

Searching the Internet for accessing Outlook PST-Files with Python gives very little results (and most of the stuff shown is outdated). Does anyone know how to read a PST with or without a library? Unfortunately I am not good enough in programming to build a PST-reader without the help of a library.

My target is to get the following information about the content:

  • Number of Items per Folder
  • Type of Item (Mail, Meeting, Contact...)
  • Size of Items
  • Attachments including size
  • maybe other Meta-Data like date, recipients etc. (optional)

I already tried the following things:

  1. libpff / pypff : crashes and seems to read whole file in memory before doing something (no good solution as the PST-files are kept on a slow network storage).

  2. Libratom : same problem as it is based on libpff.

  3. Libpst : unclear how this is used / comes as a binary (no explanation how to install) / see answer on this post / does not seems to be maintained or updated.

  4. win32 (mounting PST in Outlook) : one tutorial showed how to mount the PST into a locally installed Outlook and getting the contents with MAPI-access, but this is also very, very slow and not a good solution as Outlook is needed.

  5. Asponse Email Python : promising at the start though documentation is not very good (no Python examples / different naming e.g. for the PersonalStorage object and many others / stops after 50 items per folder (maybe a limit of the non-free version, but unclear due to lack of explanation on the publishers website).

This is an example from the Asponse-website:

personalStorage = PersonalStorage.from_file(dataDir + "Outlook.pst")

folderInfoCollection = personalStorage.root_folder.get_sub_folders()

for folderInfo in folderInfoCollection:

    print("Folder: " + folderInfo.display_name)
    print("Total Items: " + str(folderInfo.content_count))
    print("Total Unread Items: " + str(folderInfo.content_unread_count))
    print("----------------------")

I did heavy googling to find the fitting import-statement to make this run.

Does anyone have a stable clear approach for reading Outlook PST files? Even a solution using Asponse would be great exceeding the 50 items limit.

like image 341
Ulrich Avatar asked Mar 17 '26 05:03

Ulrich


2 Answers

Redemption (I am its author) can be another choice - it is a wrapper around Extended MAPI, so you would still need to have Outlook installed (for its MAPI system), but unlike Outlook Object Model, it can be used from a service and does not require starting outlook.exe and/or adding PST files to the user's default profile. You can use either RDOSession.LogonPstStore (it creates and deletes a temporary profile configured to use the specified PST file) and/or RDOSession.Stores.AddPstStore to the add a PST file to an existing session (e.g. used by Outlook or created by LogonPstStore).

like image 110
Dmitry Streblechenko Avatar answered Mar 20 '26 13:03

Dmitry Streblechenko


I have been working on this as well, and finally got to a working solution! Here is my code that worked on a 16GB pst file.

from libratom.lib.pff import PffArchive
import os
import json
import re
from tqdm import tqdm
from unidecode import unidecode
import logging
from collections import defaultdict

"""
    Needs a .json file email_list.json with the following format:
    blacklist - list of blacklisted sender emails
    whitelist - list of whitelisted sender emails
"""

# Configure logging
logging.basicConfig(
    filename='email_extraction.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

logging.info("Logging is configured.")

LIMIT = 100000

# Load whitelist and blacklist from matt_list.yml
with open("email_list.json", "r") as file:
    lists = json.load(file)
    WHITELIST = lists.get("whitelist", [])
    BLACKLIST = lists.get("blacklist", [])

def is_good_email(message, sender_email):
    if sender_email in WHITELIST:
        return True
    if sender_email in BLACKLIST:
        return False
    return True

def extract_header_info(headers):
    # Extract the sender name and email address from the headers
    sender_re = re.search(r"From: (.+?) <(.+?)>", headers)
    if sender_re:
        sender_name = sender_re.group(1).strip('"')
        sender_email = sender_re.group(2)
    else:
        sender_name = "Unknown Sender"
        sender_email = "Unknown Email"

    # Extract the timestamp from the headers
    timestamp_re = re.search(r"Date: (.+)", headers)
    if timestamp_re:
        timestamp = timestamp_re.group(1)
    else:
        timestamp = "Unknown Timestamp"

    return sender_name, sender_email, timestamp

def clean_subject(subject):
    subject_ascii = unidecode(subject)
    clean_subject = re.sub(r'[\\/*?:"<>|]', "_", subject_ascii).strip().rstrip(". ")
    return clean_subject

def save_body(message, message_folder):
    # Extract the email body (plain text, HTML, or RTF)
    if message.plain_text_body:
        body = message.plain_text_body
        body_file = os.path.join(message_folder, "body.txt")
        with open(body_file, "w", encoding="utf-8") as f:
            f.write(body)
    elif message.html_body:
        body = message.html_body
        body_file = os.path.join(message_folder, "body.html")
        with open(body_file, "w", encoding="utf-8") as f:
            f.write(body)
    elif message.rtf_body:
        try:
            body = message.rtf_body
            body_file = os.path.join(message_folder, "body.rtf")
            # Decode RTF body from bytes to string
            with open(body_file, "wb") as msg_file:
                msg_file.write(body)
        except UnicodeEncodeError:
            # Handle encoding error by using a different encoding
            logging.error("Encoding error encountered while processing RTF body.")
            body = "Encoding error: Unable to extract body content."
            body_file = os.path.join(message_folder, "body.txt")
            with open(body_file, "w", encoding="utf-8") as f:
                f.write(body)
    else:
        logging.warning("No body content found")
        body = "No body content available"
        body_file = os.path.join(message_folder, "body.txt")
        with open(body_file, "w", encoding="utf-8") as f:
            f.write(body)

    return body_file

def save_attachments(message, message_folder):
    # Initialize a list to store attachment paths
    attachment_paths = []

    try:
        # Check if the message has attachments
        if message.attachments:
            for attachment in message.attachments:
                attachment_name = attachment.name or "Unnamed_attachment"
                attachment_path = os.path.join(message_folder, attachment_name)
                with open(attachment_path, "wb") as f:
                    f.write(attachment.read_buffer(attachment.get_size()))
                attachment_paths.append(attachment_path)
    except OSError as e:
        logging.error("Error saving attachment %s %s: %s", message.subject, attachment_name, e)

    return attachment_paths

def download_emails(pst_file_path, output_folder):
    """Extract and save the first 10 email bodies from the given .pst file."""
    # Open the .pst file using PffArchive from libratom
    with PffArchive(pst_file_path) as archive:
        # Initialize a counter to keep track of the number of processed emails
        email_count = 0
        name_counts = defaultdict(int)
        senders = set()
        email_list = []

        # Iterate through all folders in the .pst file
        for folder in archive.folders():
            if folder.name != "Inbox":
                continue
            # Loop through each message in the folder
            for index in tqdm(range(folder.get_number_of_sub_messages())):
                # Get the message using the index
                message = folder.get_sub_message(index)

                if email_count >= LIMIT:
                    break

                if message.subject and message.subject == "Your daily briefing":
                    continue # spooky stuff

                if not message.transport_headers:
                    logging.warning("No headers found for message %s", message.subject)
                    continue

                header_str = message.transport_headers.strip()
                sender_name, sender_email, timestamp = extract_header_info(header_str)

                # skip bad emails
                if not is_good_email(message, sender_email):
                    continue

                subject = message.subject or "(No Subject)"
                clean_subject_name = clean_subject(subject)

                # Check for duplicate subject names and append a number to the name
                if clean_subject_name in name_counts:
                    name_counts[clean_subject_name] += 1
                    clean_subject_name = f"{clean_subject_name}_{name_counts[clean_subject_name]}"
                else:
                    name_counts[clean_subject_name] = 1

                message_folder = os.path.join(output_folder, folder.name, clean_subject_name)
                try:
                    os.makedirs(message_folder, exist_ok=True)
                except OSError as e:
                    logging.error("Error creating folder %s: subject %s clean %s", message_folder, subject, clean_subject_name)
                    continue

                body_file = save_body(message, message_folder)
                attachment_paths = save_attachments(message, message_folder)

                # Add attachment paths to the email dictionary
                senders.add(sender_email)
                email_list.append({
                    "subject": subject,
                    "sender_name": sender_name,
                    "sender_email": sender_email,
                    "body": body_file,
                    "timestamp": timestamp,
                    "attachments": attachment_paths
                })

                email_count += 1

    print("SENDERS", len(senders))
    print("POST FILTER EMAIL COUNT", len(email_list))

    with open("emails.json", "w", encoding="utf-8") as json_file:
        json.dump(email_list, json_file, indent=4)

def clean_workspace(output_folder):
    if os.path.exists(output_folder):
        for root, dirs, files in os.walk(output_folder, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))
    else:
        os.makedirs(output_folder, exist_ok=True)

def main():
    # Replace with your .pst file path
    pst_file_path = 'backup.pst'
    output_folder = "./email_data"
    clean_workspace(output_folder)

    download_emails(pst_file_path, output_folder)

if __name__ == "__main__":
    main()

like image 29
Antonio Ferris Avatar answered Mar 20 '26 11:03

Antonio Ferris



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!