Pickle dump with progress bar

I have a really big JSON object that I want to dump into a pickle file. Is there a way to display a progress bar while using pickle.dump?

Sidd asked Jun 03 '15

3 Answers

You can monitor the progress of the file as it's being read (whether or not pickle reads the entire file in advance of decoding it is another question).

class TQDMBytesReader(object):
    """File-like wrapper that advances a tqdm bar as bytes are read."""

    def __init__(self, fd, **kwargs):
        self.fd = fd
        from tqdm import tqdm
        self.tqdm = tqdm(**kwargs)

    def read(self, size=-1):
        data = self.fd.read(size)
        self.tqdm.update(len(data))
        return data

    def readline(self):
        data = self.fd.readline()
        self.tqdm.update(len(data))
        return data

    def __enter__(self):
        self.tqdm.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        return self.tqdm.__exit__(*args, **kwargs)

Sample usage:

def test_tqdm_reader():
    import os
    from pickle import Unpickler
    with open("/tmp/list.pkl", "rb") as fd:
        total = os.path.getsize("/tmp/list.pkl")
        with TQDMBytesReader(fd, total=total) as pbfd:
            up = Unpickler(pbfd)
            obj = up.load()
        print(f"Loaded {str(obj)}")
technomage answered Oct 21 '22

The only way that I know of is to define __getstate__/__setstate__ methods that return "sub objects", which can refresh the GUI when they get pickled/unpickled. For example, if your object is a list, you could use something like this:

import pickle

class SubList:
    on_pickling = None

    def __init__(self, sublist):
        print('SubList', sublist)
        self.data = sublist

    def __getstate__(self):
        if SubList.on_pickling is not None:
            print('SubList pickle state fetch: calling sub callback')
            SubList.on_pickling()
        return self.data

    def __setstate__(self, obj):
        if SubList.on_pickling is not None:
            print('SubList pickle state restore: calling sub callback')
            SubList.on_pickling()
        self.data = obj


class ListSubPickler:
    def __init__(self, data: list):
        self.data = data

    def __getstate__(self):
        print('creating SubLists for pickling long list')
        num_chunks = 10
        # max(1, ...) guards against a zero step when the list is shorter than num_chunks
        span = max(1, len(self.data) // num_chunks)
        sub_lists = [SubList(self.data[i:(i + span)]) for i in range(0, len(self.data), span)]
        return sub_lists

    def __setstate__(self, subpickles):
        self.data = []
        print('restoring Pickleable(list)')
        for subpickle in subpickles:
            self.data.extend(subpickle.data)
        print('final', self.data)


def refresh():
    # do something: refresh GUI (for example, qApp.processEvents() for Qt), show progress, etc
    print('refreshed')

If you run the following in that script,

data = list(range(100))  # your large data object
list_pickler = ListSubPickler(data)
SubList.on_pickling = refresh

print('\ndumping pickle of', list_pickler)
pickled = pickle.dumps(list_pickler)

print('\nloading from pickle')
new_list_pickler = pickle.loads(pickled)
assert new_list_pickler.data == data

print('\nloading from pickle, without on_pickling')
SubList.on_pickling = None
new_list_pickler = pickle.loads(pickled)
assert new_list_pickler.data == data

You will see that the refresh callback gets called 10 times (once per chunk). So if you have a 2 GB list that takes about a minute to dump and you want roughly 60 * 10 = 600 GUI refreshes (ten per second), you would set the number of chunks to 600.

The code is easily modified for a dict, a numpy array, etc.; a rough dict variant is sketched below.
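
For instance, a dict variant could split on items rather than list indices. This is only a rough sketch of the author's idea; SubDict, DictSubPickler, and the chunking over items() are my additions.

class SubDict:
    on_pickling = None

    def __init__(self, subdict):
        self.data = subdict

    def __getstate__(self):
        # Fire the callback once per chunk while pickling.
        if SubDict.on_pickling is not None:
            SubDict.on_pickling()
        return self.data

    def __setstate__(self, obj):
        # Fire the callback once per chunk while unpickling.
        if SubDict.on_pickling is not None:
            SubDict.on_pickling()
        self.data = obj

class DictSubPickler:
    def __init__(self, data: dict):
        self.data = data

    def __getstate__(self):
        # Split the items into chunks so each chunk triggers one callback.
        items = list(self.data.items())
        num_chunks = 10
        span = max(1, len(items) // num_chunks)
        return [SubDict(dict(items[i:i + span]))
                for i in range(0, len(items), span)]

    def __setstate__(self, subpickles):
        self.data = {}
        for subpickle in subpickles:
            self.data.update(subpickle.data)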

Oliver answered Oct 21 '22


I found a solution that I find quite satisfying. It is not 100% accurate, but so far the inaccuracy has been completely unnoticeable.

The basic idea of my approach is pretty simple: while pickle is dumping or loading a file, I compare the file size to the size of the data being dumped / loaded. The implementation, however, is a little tricky.

In order to continuously check the file size, you need threading (as far as I am aware). I used PyQt5's QThread since my application already runs on PyQt5, but there are probably other ways to do this.

You need two workers running in two corresponding threads. The first worker handles the dumping / loading of the file, whereas the other checks the file size. Something like this:

import os
import time
import pickle
import numpy as np
import psutil

from PyQt5.QtWidgets import (
    QDialog, QProgressBar, QVBoxLayout, QWidget, QPushButton, QApplication,
    QTextEdit
)
from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject, QThread
from PyQt5.QtGui import QTextCursor

from rememberme import memory


class Worker(QObject):
    status_update = pyqtSignal(str)
    progress_update = pyqtSignal(float)
    finished = pyqtSignal(object)

    def __init__(self, file, data, process):
        super().__init__()
        self._isRunning = True
        self._success = False
        self.return_value = False
        self.file = file
        self.data = data
        self.process = process

    def run(self):
        self.status_update.emit(f'\n{self.process.title()}ing file...')
        if self.process == 'sav':
            with open(self.file, 'wb') as output_file:
                pickle.dump(self.data, output_file, pickle.HIGHEST_PROTOCOL)
        else:
            with open(self.file, 'rb') as input_file:
                self.return_value = pickle.load(input_file)

        self.status_update.emit('done.')
        self.progress_update.emit(100)
        self._success = True
        self.finished.emit(self.return_value)

    def stop(self):
        self._isRunning = False
        if self._success:
            self.status_update.emit(f' File {self.process}ed.')
        else:
            self.status_update.emit(
                f' {self.process.title()}ing process canceled.'
            )


class SizeChecker(QObject):
    progress_update = pyqtSignal(float)

    def __init__(self, target_size, file, process):
        super().__init__()
        self.file = file
        self.process = process
        self.target_size = target_size
        self._isRunning = True

    def run(self):
        if self.process == 'sav':
            while self._isRunning:
                time.sleep(0.01)
                progress = os.path.getsize(self.file) / self.target_size * 100
                self.progress_update.emit(progress)
        else:
            while self._isRunning:
                time.sleep(0.01)
                process = psutil.Process(os.getpid()).memory_info().rss
                progress = (process - self.file) / self.target_size * 100
                self.progress_update.emit(progress)

    def stop(self):
        self._isRunning = False


class Progress(QDialog):
    def __init__(self):
        super().__init__()
        self.progress = QProgressBar()
        lay = QVBoxLayout(self)
        lay.addWidget(self.progress)


class Widget(QWidget):
    def __init__(self, parent=None):
        super(Widget, self).__init__(parent)
        dump_btn = QPushButton("dump")
        dump_btn.clicked.connect(lambda: self.handle('sav'))
        load_btn = QPushButton("load")
        load_btn.clicked.connect(lambda: self.handle('load'))
        self.status = QTextEdit()

        self.file = 'test'
        self.data = [np.full(1000, 1000) for _ in range(500000)] # some arbitrary data
        self.popup = None
        self.worker_thread = QThread()
        self.worker = None
        self.checker_thread = QThread()
        self.size_checker = None

        lay = QVBoxLayout(self)
        lay.addWidget(dump_btn)
        lay.addWidget(load_btn)
        lay.addWidget(self.status)
        lay.addStretch()

    @pyqtSlot()
    def handle(self, process):
        self.popup = Progress()
        self.popup.setWindowTitle(f'{process.title()}ing data...')
        self.popup.finished.connect(self.finish_process)
        self.popup.show()
        data = self.data if process == 'sav' else None
        self.worker = Worker(self.file, data, process)
        if process == 'sav':
            target_size = memory(self.data)
            file = self.file
        else:
            target_size = os.path.getsize(self.file)
            file = psutil.Process(os.getpid()).memory_info().rss
        self.size_checker = SizeChecker(target_size, file, process)
        self.size_checker.moveToThread(self.checker_thread)
        self.size_checker.progress_update.connect(self.update_progress)
        self.checker_thread.started.connect(self.size_checker.run)
        self.checker_thread.start()

        self.worker.moveToThread(self.worker_thread)
        self.worker.status_update.connect(self.report_status)
        self.worker.progress_update.connect(self.update_progress)
        self.worker.finished.connect(self.finish_process)
        self.worker_thread.started.connect(self.worker.run)
        self.worker_thread.start()

    def finish_process(self):
        self.size_checker.stop()
        self.size_checker.progress_update.disconnect(self.update_progress)
        self.checker_thread.started.disconnect(self.size_checker.run)
        self.size_checker = None
        self.checker_thread.terminate()

        self.worker.stop()
        self.worker.status_update.disconnect(self.report_status)
        self.worker.progress_update.disconnect(self.update_progress)
        self.worker.finished.disconnect(self.finish_process)
        self.worker_thread.started.disconnect(self.worker.run)
        self.worker = None
        self.worker_thread.terminate()
        self.popup.finished.disconnect(self.finish_process)
        if self.popup.isVisible():
            self.popup.close()

    def update_progress(self, value):
        self.popup.progress.setValue(value)

    def report_status(self, text):
        self.status.insertPlainText(text)
        self.status.moveCursor(QTextCursor.End)


if __name__ == '__main__':
    import sys
    app = QApplication(sys.argv)
    w = Widget()
    w.resize(640, 480)
    w.show()
    sys.exit(app.exec_())

In this example you can see that, when dumping data to a file, I use the memory function of the RememberMe tool (as explained here) to get an idea of how much memory is allocated to the data. I then continuously compare the size of the file the data is being written to against that number and emit the percentage. As I said, this is not 100% accurate, since the file size and the allocated RAM differ by a few percent, but it is usually good enough.
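
For readers not using Qt, the same dump-side idea can be sketched with just the standard library: poll the growing file size from a background thread while pickle.dump runs in the foreground. This is my own minimal sketch, not the author's code; dump_with_file_size_progress is a hypothetical helper, and size_estimate stands in for the RememberMe memory estimate described above.

import os
import pickle
import threading
import time

def dump_with_file_size_progress(data, path, size_estimate):
    """Minimal sketch: report dump progress by polling the output file size."""
    done = threading.Event()

    def poll():
        while not done.is_set():
            try:
                written = os.path.getsize(path)
            except OSError:
                written = 0
            print(f"\rdumping: {written / size_estimate * 100:5.1f}%", end="")
            time.sleep(0.1)

    watcher = threading.Thread(target=poll, daemon=True)
    watcher.start()
    try:
        with open(path, "wb") as fh:
            pickle.dump(data, fh, pickle.HIGHEST_PROTOCOL)
    finally:
        done.set()
        watcher.join()
    print("\rdumping: 100.0%")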

Loading a file is even trickier. The file size, as well as the allocated RAM of the whole Python process (as explained here) prior to loading the file, is stored as a reference. Then, during the loading process, the difference between the initial and the current RAM allocation of the Python process is compared to the file size. Again, this is not 100% accurate, but typically close enough.
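
A corresponding load-side sketch, again using plain threading and psutil rather than Qt; load_with_ram_progress is a hypothetical helper, and the RAM-delta heuristic is the same approximation the answer describes.

import os
import pickle
import threading
import time

import psutil

def load_with_ram_progress(path):
    """Minimal sketch: estimate load progress from the process's RAM growth."""
    file_size = os.path.getsize(path)
    rss_before = psutil.Process(os.getpid()).memory_info().rss
    done = threading.Event()

    def poll():
        while not done.is_set():
            rss_now = psutil.Process(os.getpid()).memory_info().rss
            progress = (rss_now - rss_before) / file_size * 100
            print(f"\rloading: {min(progress, 100.0):5.1f}%", end="")
            time.sleep(0.1)

    watcher = threading.Thread(target=poll, daemon=True)
    watcher.start()
    try:
        with open(path, "rb") as fh:
            obj = pickle.load(fh)
    finally:
        done.set()
        watcher.join()
    print("\rloading: 100.0%")
    return obj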

I'm sure someone more skilled can improve this method, but I think the general idea is pretty good.

mapf answered Oct 21 '22