 

What does pyspark need psutil for? (faced "UserWarning: Please install psutil to have better support with spilling")

I'm beginning to learn Spark using pyspark, and I'm wondering what the following log message means:

UserWarning: Please install psutil to have better support with spilling

The operation that is causing the spill is a join between two RDDs:

print(user_types.join(user_genres).collect())

This may sound somewhat obvious, but my first question is: what exactly is "spilling" here, and why does pyspark need psutil for it?

I did indeed install psutil, and the warning went away, but I'd like to understand what exactly is occurring. There is a very similar question here, but the OP was mostly asking how to install psutil.
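
For context, here is a minimal, self-contained sketch (with made-up data) of the kind of key-based join that shuffles both RDDs and, with enough data and a low worker memory limit, can make the Python worker spill:

from pyspark import SparkContext

sc = SparkContext("local[2]", "spill-demo")
user_types = sc.parallelize([(1, "admin"), (2, "guest")])
user_genres = sc.parallelize([(1, "rock"), (2, "jazz")])

# join() shuffles both RDDs by key; with large inputs the worker's
# in-memory merge can exceed its limit and spill to disk.
print(user_types.join(user_genres).collect())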

asked Jul 07 '18 by Yu Chen


1 Answer

Spilling here means writing the in-memory data to disk, which reduces the performance of pyspark, since writing to disk is much slower than keeping data in memory.

Why psutil

To check the used memory of the node (i.e., how much memory the worker process is using), so pyspark knows when it has to spill.
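
For reference, this is roughly what pyspark does with psutil: it asks the OS for the resident set size (RSS) of the current worker process. A minimal sketch, independent of Spark:

import os
import psutil

# RSS (resident set size) of this process, shifted from bytes to MB
rss_mb = psutil.Process(os.getpid()).memory_info().rss >> 20
print("used memory: %d MB" % rss_mb)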

This is the original snippet from the pyspark source file shuffle.py (taken from here) that throws the warning. The code below defines a function that returns the used memory: it uses psutil if it is installed, and falls back to reading /proc/self/status if the system is Linux.

Importing psutil and defining get_used_memory

# imports this snippet relies on (module-level imports in shuffle.py)
import os
import platform
import warnings

try:
    import psutil
    def get_used_memory():
        """ Return the used memory in MB """
        process = psutil.Process(os.getpid())
        if hasattr(process, "memory_info"):
            info = process.memory_info()
        else:
            info = process.get_memory_info()
        return info.rss >> 20
except ImportError:
    def get_used_memory():
        """ Return the used memory in MB """
        if platform.system() == 'Linux':
            for line in open('/proc/self/status'):
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) >> 10
        else:
            warnings.warn("Please install psutil to have better "
                          "support with spilling")
            if platform.system() == "Darwin":
                import resource
                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                return rss >> 20
            # TODO: support windows
        return 0
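
A quick aside on the bit shifts above: rss from psutil is reported in bytes, so >> 20 converts it to MB, while VmRSS in /proc/self/status is reported in kB, so >> 10 converts it to MB. A tiny sanity check:

rss_bytes = 512 * 1024 * 1024   # 512 MB expressed in bytes
print(rss_bytes >> 20)          # 512 (bytes -> MB)
vmrss_kb = 512 * 1024           # 512 MB expressed in kB
print(vmrss_kb >> 10)           # 512 (kB -> MB)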

Writing to disk

The code below triggers the write to disk if the used memory of the node is greater than the preset limit (self.memory_limit).

def mergeCombiners(self, iterator, check=True):
    """ Merge (K,V) pair by mergeCombiner """
    iterator = iter(iterator)
    # speedup attribute lookup
    d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
    c = 0
    for k, v in iterator:
        d[k] = comb(d[k], v) if k in d else v
        if not check:
            continue
        c += 1
        # every `batch` items, check memory and spill if over the limit
        if c % batch == 0 and get_used_memory() > self.memory_limit:
            self._spill()
            self._partitioned_mergeCombiners(iterator, self._next_limit())
            break
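
The limit compared against here, self.memory_limit, comes from the spark.python.worker.memory setting (512m by default), so raising it is one way to make the worker spill less often. A hedged tuning sketch:

from pyspark import SparkConf, SparkContext

# Give each Python worker more room before it starts spilling to disk.
conf = SparkConf().set("spark.python.worker.memory", "1g")
sc = SparkContext(conf=conf)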

Spill

This code actually writes, i.e. spills, the partitioned data to disk when the memory used is greater than the preset limit.

def _spill(self):
    """
    dump already partitioned data into disks.
    It will dump the data in batch for better performance.
    """
    global MemoryBytesSpilled, DiskBytesSpilled
    path = self._get_spill_dir(self.spills)
    if not os.path.exists(path):
        os.makedirs(path)
    used_memory = get_used_memory()
    if not self.pdata:
        # The data has not been partitioned yet: iterate over the
        # dataset once and write the items into different files, using
        # no additional memory. This branch is only taken the first
        # time memory goes above the limit.
        # open all the files for writing
        streams = [open(os.path.join(path, str(i)), 'w')
                   for i in range(self.partitions)]
        for k, v in self.data.iteritems():
            h = self._partition(k)
            # put one item per batch to stay compatible with load_stream
            # (dumping many items per batch would increase memory usage)
            self.serializer.dump_stream([(k, v)], streams[h])
        for s in streams:
            DiskBytesSpilled += s.tell()
            s.close()
        self.data.clear()
        self.pdata = [{} for i in range(self.partitions)]
    else:
        for i in range(self.partitions):
            p = os.path.join(path, str(i))
            with open(p, "w") as f:
                # dump items in batch
                self.serializer.dump_stream(self.pdata[i].iteritems(), f)
            self.pdata[i].clear()
            DiskBytesSpilled += os.path.getsize(p)
    self.spills += 1
    gc.collect()  # release the memory as much as possible
    MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
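
For intuition about what serializer.dump_stream does inside _spill, here is a small sketch outside Spark's code path (assuming pyspark is installed): PickleSerializer.dump_stream serializes an iterator of objects to a file-like stream, which is what _spill does per partition file.

import io
from pyspark.serializers import PickleSerializer

ser = PickleSerializer()
buf = io.BytesIO()
# write two (key, value) pairs to an in-memory "file", as _spill would to disk
ser.dump_stream([("k1", 1), ("k2", 2)], buf)
print(len(buf.getvalue()), "bytes spilled")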
answered Nov 06 '22 by Rahul Chawla