I'm beginning to learn Spark using pyspark and wonder what the meaning of the following log message is?
UserWarning: Please install psutil to have better support with spilling
The operation that is causing the spill is a join
between two RDDs:
print(user_types.join(user_genres).collect())
This may sound somewhat obvious, but my first question is
I did indeed install psutil
, and the warning went away, but I'd like to understand what exactly is occurring. There is a very similar question here, but the OP was asking mostly how to just install psutil
.
Spill here means writing the in-memory dataframes to disk, which reduces the performance of pyspark, since writing to disk is slow.
To check the used memory of node.
This is the original snippet from pyspark source code shuffle.py taken from here which throws the warning. Below code defines a function to get the used memory if psutil is present or if the system is linux.
try:
import psutil
def get_used_memory():
""" Return the used memory in MB """
process = psutil.Process(os.getpid())
if hasattr(process, "memory_info"):
info = process.memory_info()
else:
info = process.get_memory_info()
return info.rss >> 20
except ImportError:
def get_used_memory():
""" Return the used memory in MB """
if platform.system() == 'Linux':
for line in open('/proc/self/status'):
if line.startswith('VmRSS:'):
return int(line.split()[1]) >> 10
else:
warnings.warn("Please install psutil to have better "
"support with spilling")
if platform.system() == "Darwin":
import resource
rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
return rss >> 20
# TODO: support windows
return 0
Below code invokes writing the dataframes to disk if used memory of node is greater than pre-set limit.
def mergeCombiners(self, iterator, check=True):
""" Merge (K,V) pair by mergeCombiner """
iterator = iter(iterator)
# speedup attribute lookup
d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
c = 0
for k, v in iterator:
d[k] = comb(d[k], v) if k in d else v
if not check:
continue
c += 1
if c % batch == 0 and get_used_memory() > self.memory_limit:
self._spill()
self._partitioned_mergeCombiners(iterator, self._next_limit())
break
This code actually writes aka spills the dataframes to disk in case the memory used is greater than preset limit.
def _spill(self):
"""
dump already partitioned data into disks.
It will dump the data in batch for better performance.
"""
global MemoryBytesSpilled, DiskBytesSpilled
path = self._get_spill_dir(self.spills)
if not os.path.exists(path):
os.makedirs(path)
used_memory = get_used_memory()
if not self.pdata:
# The data has not been partitioned, it will iterator the
# dataset once, write them into different files, has no
# additional memory. It only called when the memory goes
# above limit at the first time.
# open all the files for writing
streams = [open(os.path.join(path, str(i)), 'w')
for i in range(self.partitions)]
for k, v in self.data.iteritems():
h = self._partition(k)
# put one item in batch, make it compatitable with load_stream
# it will increase the memory if dump them in batch
self.serializer.dump_stream([(k, v)], streams[h])
for s in streams:
DiskBytesSpilled += s.tell()
s.close()
self.data.clear()
self.pdata = [{} for i in range(self.partitions)]
else:
for i in range(self.partitions):
p = os.path.join(path, str(i))
with open(p, "w") as f:
# dump items in batch
self.serializer.dump_stream(self.pdata[i].iteritems(), f)
self.pdata[i].clear()
DiskBytesSpilled += os.path.getsize(p)
self.spills += 1
gc.collect() # release the memory as much as possible
MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With