I am in the process of improving a program that parses XML and categorises and indexes its subtrees. The actual program is too large to show here, so I have brought it down to a minimal test case showing the issue I encounter.
The idea is:
alpino_ds
nodes in a file, in parallelNote that in the actual code there are some more caveats:
dict
s of four levels deep consisting of dicts
, set
s, int
s, and string
s, as well as dict-to-filehandle, and Counter()
objects;ThreadPoolExecutor
) and even though there was some gain (I calculated around 5% improvement in speed), this is not good enough for me;alpino_ds
tags per file. That is the main reason I want to run things in parallel - there is just so much data. That means that the nested objects get quite big as well, so merging/sharing these objects between processes may be a bottleneck in itself.Example code:
from pathlib import Path
from collections import Counter
from copy import copy
from lxml import etree
import concurrent.futures
class XmlGrinder:
def __init__(self, m=1):
if m is False:
self.m = 1
elif m == 0:
self.m = None
else:
self.m = m
self.max_a = 7
self.max_b = 1000
self.pdin = self.pdout = None
self.pattern_counter = self.fhs = self.corpus = None
def grind(self, din, dout):
self.pdin = Path(din)
self.pdout = Path(dout)
for file in self.pdin.glob('*.xml'):
self._grind_xml(file)
def _grind_xml(self, pfin):
self.pattern_counter = Counter()
self.filenames = set()
self.fhs = {}
self.corpus = pfin.stem
with concurrent.futures.ProcessPoolExecutor(max_workers=self.m) as executor:
jobs = []
context = etree.iterparse(str(pfin), tag='alpino_ds')
for _, node in context:
attrs = node.attrib
# node has to have id
if 'id' not in attrs:
continue
jobs.append(executor.submit(self._process_node, etree.tostring(node)))
# Makes sure our memory usage is kept in check by getting rid of unused elements
# Borrowed from https://stackoverflow.com/a/7171543/1150683
node.clear()
# Also eliminate now-empty references from the root node to elem
for ancestor in node.xpath('ancestor-or-self::*'):
while ancestor.getprevious() is not None:
del ancestor.getparent()[0]
# Get rid of xml iterator
del context
sentence_nr = 0
for job in concurrent.futures.as_completed(jobs):
sentence_nr += 1
print(f"Processed {self.corpus} sentence {sentence_nr:,d}", job.result(), flush=True)
# self.* variables are empty! :-(
print('pattern counter:', self.pattern_counter)
print('filenames:', self.filenames)
print('filehandles:', self.fhs)
# won't do anything because fh is empty:
for fh in self.fhs.values():
fh.close()
def _process_node(self, xml_str):
node = etree.fromstring(xml_str)
all_cats = ''
for subnode in node.iter('node'):
children_size = sum(1 for _ in subnode.iterchildren('node'))
descendants_size = sum(1 for _ in subnode.iter('node'))
# Size requirements of children and descendants
if children_size < 1 \
or self.max_a < children_size \
or descendants_size > self.max_b:
continue
# get attribute of node
cat = subnode.attrib['cat']
all_cats += cat
self.pattern_counter[cat] += 1
# Create new XML tree
tree_xml = etree.Element('tree', {
'index': f"{cat}-{self.pattern_counter[cat]}"
})
tree_xml.append(copy(subnode))
# open filehandle and write new tree to file
if cat not in self.fhs:
(self.pdout / self.corpus).mkdir(exist_ok=True, parents=True)
tree_filename = self.pdout / self.corpus / f"{self.corpus}-{cat}-trees.xml"
# open file handle and keep it open, only close after loop
self.fhs[cat] = tree_filename.open(mode='a', encoding='utf-8')
self.fhs[cat].write('\n\t\t' + etree.tostring(tree_xml, encoding='unicode'))
return all_cats
if __name__ == '__main__':
# use m=[int] to enable multiple cores or m=0 to utilise all cores
xml_grindr = XmlGrinder()
xml_grindr.grind(r'../data', r'../output')
Sample XML (save it to an XML file and put it inside a directory; use that directory as the first argument of xml_grindr.grind()
):
<?xml version="1.0" encoding="UTF-8"?><treebank><alpino_ds version="1.3" id="18.head.1.s.1"><node begin="0" cat="top" end="4" id="0" rel="top"><node begin="0" cat="conj" end="4" id="1" rel="--"><node begin="0" end="1" frame="within_word_conjunct" id="2" lcat="np" lemma="_" pos="prefix" postag="SPEC(afgebr)" pt="spec" rel="cnj" root="taal" sense="taal" spectype="afgebr" word="taal-"/><node begin="1" conjtype="neven" end="2" frame="conj(en)" id="3" lcat="vg" lemma="en" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="en"/><node begin="2" cat="mwu" end="4" id="4" mwu_root="spraaktechnologienieuws jul'03" mwu_sense="spraaktechnologienieuws jul'03" rel="cnj"><node begin="2" end="3" frame="proper_name(both)" genus="onz" getal="ev" graad="basis" id="5" lcat="np" lemma="spraaktechnologienieuws" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,onz,stan)" pt="n" rel="mwp" root="spraaktechnologienieuws" sense="spraaktechnologienieuws" word="spraaktechnologienieuws"/><node begin="3" end="4" frame="proper_name(both)" id="6" lcat="np" lemma="_" num="both" pos="name" postag="SPEC(symb)" pt="spec" rel="mwp" root="jul'03" sense="jul'03" spectype="symb" word="jul'03"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="noun(de,count,sg)" gen="de" id="1" lcat="np" lemma="1" num="sg" numtype="hoofd" pos="noun" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="1" sense="1" word="1"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="mwu" end="5" id="1" mwu_root="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" mwu_sense="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Discussie" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="DISCUSSIE" sense="DISCUSSIE" word="DISCUSSIE"/><node begin="1" end="2" frame="proper_name(both)" id="3" lcat="np" lemma="over" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="OVER" sense="OVER" vztype="init" word="OVER"/><node begin="2" end="3" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="4" lcat="np" lemma="taaltechnologie" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALTECHNOLOGIE" sense="TAALTECHNOLOGIE" word="TAALTECHNOLOGIE"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="iN" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="IN" sense="IN" vztype="init" word="IN"/><node begin="4" end="5" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="taalschrift" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALSCHRIFT" sense="TAALSCHRIFT" word="TAALSCHRIFT"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="2" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="2" sense="2" special="hoofd" word="2"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.2"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="mwu" end="7" id="1" mwu_root="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&C" mwu_sense="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&C" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Amerikaans" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="AMERIKAANSE" sense="AMERIKAANSE" word="AMERIKAANSE"/><node begin="1" end="2" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="3" lcat="np" lemma="overheid" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="OVERHEID" sense="OVERHEID" word="OVERHEID"/><node begin="2" end="3" frame="proper_name(both)" id="4" lcat="np" lemma="kiezen" num="both" pos="name" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="mwp" root="KIEST" sense="KIEST" word="KIEST" wvorm="pv"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="voor" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VOOR" sense="VOOR" vztype="init" word="VOOR"/><node begin="4" conjtype="neven" end="5" frame="proper_name(both)" id="6" lcat="np" lemma="linkfactory" num="both" pos="name" postag="VG(neven)" pt="vg" rel="mwp" root="LINKFACTORY" sense="LINKFACTORY" word="LINKFACTORY"/><node begin="5" end="6" frame="proper_name(both)" id="7" lcat="np" lemma="van" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VAN" sense="VAN" vztype="init" word="VAN"/><node begin="6" end="7" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="8" lcat="np" lemma="l&amp;C" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="L&C" sense="L&C" word="L&C"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="3" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="3" sense="3" special="hoofd" word="3"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.2"><node begin="0" cat="top" end="6" id="0" rel="top"><node begin="0" cat="np" end="6" id="1" rel="--"><node begin="0" buiging="met-e" end="1" frame="noun(both,both,both)" gen="both" graad="basis" id="2" lcat="np" lemma="Spraakgestuurde" naamval="stan" num="both" pos="noun" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="hd" root="spraakgestuurde" sense="spraakgestuurde" word="SPRAAKGESTUURDE"/><node begin="1" cat="mwu" end="6" id="3" mwu_root="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" mwu_sense="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" rel="app"><node begin="1" buiging="met-e" end="2" frame="proper_name(both)" graad="basis" id="4" lcat="np" lemma="Last-minuat" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="LAST-MINUTE" sense="LAST-MINUTE" word="LAST-MINUTE"/><node begin="2" end="3" frame="proper_name(both)" getal="mv" graad="basis" id="5" lcat="np" lemma="taalcursus" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TAALCURSUS" sense="TAALCURSUS" word="TAALCURSUS"/><node begin="3" end="4" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="Via" naamval="stan" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,zijd,stan)" pt="n" rel="mwp" root="VIA" sense="VIA" word="VIA"/><node begin="4" end="5" frame="proper_name(both)" id="7" lcat="np" lemma="dE" lwtype="bep" naamval="stan" npagr="rest" num="both" pos="name" postag="LID(bep,stan,rest)" pt="lid" rel="mwp" root="DE" sense="DE" word="DE"/><node begin="5" end="6" frame="proper_name(both)" getal="mv" graad="basis" id="8" lcat="np" lemma="telefoon" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TELEFOON" sense="TELEFOON" word="TELEFOON"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="4" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="4" sense="4" special="hoofd" word="4"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="np" end="5" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" id="2" lcat="np" lemma="_" num="pl" pos="noun" postag="SPEC(deeleigen)" pt="spec" rel="hd" root="aio_vacature" sense="aio_vacature" spectype="deeleigen" word="AIO-VACATURES"/><node begin="1" cat="pp" end="5" id="3" rel="mod"><node begin="1" end="2" frame="preposition(in,[])" id="4" lcat="pp" lemma="iN" pos="prep" postag="VZ(init)" pt="vz" rel="hd" root="in" sense="in" vztype="init" word="IN"/><node begin="2" cat="conj" end="5" id="5" rel="obj1"><node begin="2" end="3" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="6" lcat="np" lemma="Tilburg" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="TILBURG" sense="TILBURG" word="TILBURG"/><node begin="3" conjtype="neven" end="4" frame="conj(en)" id="7" lcat="vg" lemma="eN" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="EN"/><node begin="4" end="5" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="8" lcat="np" lemma="Amsterdam" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="AMSTERDAM" sense="AMSTERDAM" word="AMSTERDAM"/></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.1.s.1"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="smain" end="7" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" getal="mv" graad="basis" id="2" index="1" lcat="np" lemma="computer" ntype="soort" num="pl" pos="noun" postag="N(soort,mv,basis)" pt="n" rel="su" root="computer" sense="computer" word="Computers"/><node begin="1" end="2" frame="verb(hebben,pl,aux(te_inf))" id="3" infl="pl" lcat="smain" lemma="hoeven" pos="verb" postag="WW(pv,tgw,mv)" pt="ww" pvagr="mv" pvtijd="tgw" rel="hd" root="hoef" sc="aux(te_inf)" sense="hoef" tense="present" word="hoeven" wvorm="pv"/><node begin="0" cat="ti" end="7" id="4" rel="vc"><node begin="4" end="5" frame="complementizer(te)" id="5" lcat="cp" lemma="te" pos="comp" postag="VZ(init)" pt="vz" rel="cmp" root="te" sc="te" sense="te" vztype="init" word="te"/><node begin="0" cat="inf" end="7" id="6" rel="body"><node begin="0" end="1" id="7" index="1" rel="su"/><node begin="5" buiging="zonder" end="6" frame="verb('hebben/zijn',inf,aux(inf))" id="8" infl="inf" lcat="inf" lemma="kunnen" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="kan" sc="aux(inf)" sense="kan" word="kunnen" wvorm="inf"/><node begin="0" cat="inf" end="7" id="9" rel="vc"><node begin="0" end="1" id="10" index="1" rel="su"/><node begin="2" cat="np" end="4" id="11" rel="obj1"><node begin="2" buiging="zonder" end="3" frame="determiner(geen,nwh,mod,pro,yparg,nwkpro,geen)" id="12" infl="geen" lcat="detp" lemma="geen" naamval="stan" npagr="agr" pdtype="det" pos="det" positie="prenom" postag="VNW(onbep,det,stan,prenom,zonder,agr)" pt="vnw" rel="det" root="geen" sense="geen" vwtype="onbep" wh="nwh" word="geen"/><node begin="3" end="4" frame="noun(het,mass,sg)" gen="het" genus="onz" getal="ev" graad="basis" id="13" lcat="np" lemma="Nederlands" naamval="stan" ntype="eigen" num="sg" pos="noun" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="hd" root="Nederlands" sense="Nederlands" word="Nederlands"/></node><node begin="6" buiging="zonder" end="7" frame="verb(hebben,inf(no_e),transitive)" id="14" infl="inf(no_e)" lcat="inf" lemma="verstaan" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="versta" sc="transitive" sense="versta" word="verstaan" wvorm="inf"/></node></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.2.s.2"><node begin="0" cat="top" end="14" id="0" rel="top"><node begin="7" end="8" frame="punct(komma)" id="1" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="10" end="11" frame="punct(komma)" id="2" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="0" cat="smain" end="13" id="3" rel="--"><node begin="0" end="1" frame="er_adverb(voor)" id="4" lcat="pp" lemma="daarvoor" pos="pp" postag="BW()" pt="bw" rel="mod" root="daarvoor" sense="daarvoor" special="er" word="Daarvoor"/><node begin="1" end="2" frame="verb(unacc,sg3,intransitive)" id="5" infl="sg3" lcat="smain" lemma="verlopen" pos="verb" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="hd" root="verloop" sc="intransitive" sense="verloop" tense="present" word="verloopt" wvorm="pv"/><node begin="2" cat="np" end="5" id="6" rel="su"><node begin="2" end="3" frame="determiner(de)" id="7" infl="de" lcat="detp" lemma="de" lwtype="bep" naamval="stan" npagr="rest" pos="det" postag="LID(bep,stan,rest)" pt="lid" rel="det" root="de" sense="de" word="de"/><node aform="base" begin="3" buiging="met-e" end="4" frame="adjective(e)" graad="basis" id="8" infl="e" lcat="ap" lemma="menselijk" naamval="stan" pos="adj" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mod" root="menselijk" sense="menselijk" vform="adj" word="menselijke"/><node begin="4" end="5" frame="noun(de,count,sg)" gen="de" genus="zijd" getal="ev" graad="basis" id="9" lcat="np" lemma="communicatie" naamval="stan" ntype="soort" num="sg" pos="noun" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="hd" root="communicatie" sense="communicatie" word="communicatie"/></node><node begin="5" cat="ap" end="7" id="10" rel="mod"><node begin="5" end="6" frame="intensifier" id="11" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="6" buiging="zonder" end="7" frame="adjective(no_e(adv))" graad="basis" id="12" infl="no_e" lcat="ap" lemma="subtiel" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="subtiel" sense="subtiel" vform="adj" word="subtiel"/></node><node begin="8" cat="ppart" end="10" id="13" rel="mod"><node begin="8" end="9" frame="intensifier" id="14" lcat="advp" lemma="te" pos="adv" postag="VZ(init)" pt="vz" rel="mod" root="te" sense="te" special="intensifier" vztype="init" word="te"/><node aform="base" begin="9" buiging="zonder" end="10" frame="adjective(ge_no_e(adv))" id="15" infl="no_e" lcat="ppart" lemma="nuanceren" pos="adj" positie="vrij" postag="WW(vd,vrij,zonder)" pt="ww" rel="hd" root="genuanceerd" sense="genuanceerd" vform="psp" word="genuanceerd" wvorm="vd"/></node><node begin="11" cat="ap" end="13" id="16" rel="mod"><node begin="11" end="12" frame="intensifier" id="17" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="12" buiging="zonder" end="13" frame="adjective(no_e(adv))" graad="basis" id="18" infl="no_e" lcat="ap" lemma="rijk" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="rijk" sense="rijk" vform="adj" word="rijk"/></node></node><node begin="13" end="14" frame="punct(punt)" id="19" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds></treebank>
Pipfile
, in case you wish to set up a local environment for testing (the above script probably works with 3.4 and up, though):
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
lxml = ">=4.2.1"
[dev-packages]
[requires]
python_version = "3.6"
Depending on how you named the XML, the output of the script will be something as follows:
Processing WRPEE-dummy sentence 1 topconjmwu
Processing WRPEE-dummy sentence 2 top
Processing WRPEE-dummy sentence 3 topmwu
Processing WRPEE-dummy sentence 4 top
Processing WRPEE-dummy sentence 5 topmwu
Processing WRPEE-dummy sentence 6 top
Processing WRPEE-dummy sentence 7 topnpmwu
Processing WRPEE-dummy sentence 8 top
Processing WRPEE-dummy sentence 9 topnpppconj
Processing WRPEE-dummy sentence 10 topsmaintiinfinfnp
Processing WRPEE-dummy sentence 11 topsmainnpapppartap
pattern counter: Counter()
filenames: set()
filehandles: {}
This shows that the variables that I want to be shared between processes is indeed not shared or modified. (I am aware that this happens because the process and its variables are forked.) In this dummy example it may not seem very important (even though it is clear that now I can't close the opened file handles), but in the large program these and other variables are modified and used inside the forked processes. The question is, then, how to make that work.
I read about Pipe() and Queue() and it seems to me that I would need Queue()
. In addition, because I will be reading and writing very often from the different processes to the same object I think I need a Manager()
as well. This is the part that I am uncertain about, however. I tried reading this topic but it only confused me more. Furthermore, the comments suggest that even on 3.6.4 there may be issues when using complex objects. Remember that the actual data that I am sharing is nested and consists of different types.
The question, in summary, thus is: how can I re-write the above example code to ensure that all Process
es have (non-blocking) read/write access to instance variables inside _process_node
and methods that are called from within that process? I am willing to update my current Python version (3.6.4) to 3.7, and to use additional libraries.
class multiprocessing.shared_memory. ShareableList (sequence=None, *, name=None) ¶ Provides a mutable list-like object where all values stored within are stored in a shared memory block. This constrains storable values to only the int, float, bool, str (less than 10M bytes each), bytes (less than 10M bytes each), and None built-in data types.
SharedMemoryManager ([address authkey]]) ¶ A subclass of BaseManager which can be used for the management of shared memory blocks across processes. A call to start () on a SharedMemoryManager instance causes a new process to be started. This new process’s sole purpose is to manage the life cycle of all shared memory blocks created through it.
Processes are conventionally limited to only have access to their own process memory space but shared memory permits the sharing of data between processes, avoiding the need to instead send messages between processes containing that data.
Each shared memory block is assigned a unique name. In this way, one process can create a shared memory block with a particular name and a different process can attach to that same shared memory block using that same name. As a resource for sharing data across processes, shared memory blocks may outlive the original process that created them.
The multiprocessing
library lets you make use of parallelism in concurrent Python code. Without multiprocessing
the Python GIL tends to get in the way of true parallel execution, but you should see multiprocessing
code as no different from other concurrency techniques. Basically, the biggest difference between multiprocessing
and threads, is that state is shared via slow-ish IPC calls.
This means you need to carefully handle shared resources. Your current implementation doesn't do a great job of this; you have multiple concurrent tasks access shared resources without regard for what others may be doing. There are multiple opportunities for race conditions in your code, where multiple tasks can write to the same file, or where a nested data structure is updated without regard for other updates.
When you have to update shared data structures or files, you usually can pick between two options:
Note that you'll have to pass either of these objects (synchronisation primitives or queues) to child processes explicitly, see the programming guidelines; don't use references on an instance to share state.
For your case, I'd go with queues and dedicated tasks; your bottleneck is the data processing, writing data to disk and updating a few data structures with the results of the analysis tasks is relatively fast in comparison.
So use a single task to write to files; just put the serialised XML string together with the cat
value into a dedicated queue, and have a separate task that pulls these from the queue and writes them to files. This separate task is then responsible for all file access, including opening and closing. This serialises file access and removes the possibility for race conditions and clobbered writes. If the data for these files comes in so thick and fast as to make this task a bottleneck, create tasks per target file.
Do the same for the shared data structures; send mutations to a queue, leave it to a dedicated task to merge the data. Updating proxy objects is not really suitable because their changes propagate to other processes via RPC calls, increasing the chances for race conditions, and locking won't guarantee that the data is consistent across all task processes!
For your simple example, updates to the Counter()
object are not actually shared; each child process inherits a copy when it forks and updates that local copy, and the parent process will never see the changes made. So you'd use a local, new Counter()
instance, and push that into a queue. A dedicated task can then receive these from the queue and update a local Counter()
instance with the values by using total_counter.update(queued_counter)
, again ensuring that updates are serialised.
To illustrate, here is a contrived example counting Lorem Ipsum data; a series of count_words
tasks do the counting, but pass the Counter()
object they produce to a queue for a separate collating task to combine into a final word count. A separate logging task writes data from a logging queue to disk:
import datetime
import random
import re
import time
from collections import Counter
from functools import partial
from multiprocessing import Manager, Pool
from io import TextIOWrapper
from urllib.request import urlopen
COMPLETE = "COMPLETE"
def collating_task(countsqueue, logqueue):
wordcounts = Counter()
# Loop until COMPLETE is found in the queue
for counts in iter(countsqueue.get, COMPLETE):
wordcounts.update(counts)
logqueue.put(
f"collating: updating with {len(counts)} words "
f"(total {len(wordcounts)})"
)
return wordcounts
def logging_task(logqueue):
# Loop until COMPLETE is found in the queue
with open('logfile.txt', 'w') as logf:
for message in iter(logqueue.get, COMPLETE):
print(datetime.datetime.now(), message, flush=True, file=logf)
def count_words(line, countsqueue, logqueue):
findwords = re.compile(r"\w+").findall
counts = Counter(findwords(line))
logqueue.put(f"counting: counted {len(counts)} words")
# a random short delay to make this task 'heavy'
time.sleep(random.uniform(0.0, 0.05))
countsqueue.put(counts)
def main():
# Random latin text, 1000 paragraphs
loripsum_response = urlopen("https://loripsum.net/api/1000/long/plaintext")
text = list(TextIOWrapper(loripsum_response, encoding="utf8"))
print(f"Will process {len(text)} lines of data")
# create managed queues that can be passed in as arguments. The alternative
# is to create globals or Process() objects with queues passed in.
manager = Manager()
countsqueue, logqueue = manager.Queue(), manager.Queue()
with Pool() as pool:
# start processing tasks, these loop forever until signalled
collator = pool.apply_async(collating_task, (countsqueue, logqueue))
logger = pool.apply_async(logging_task, (logqueue,))
# process lines, blocks until complete
pool.map(partial(count_words, countsqueue=countsqueue, logqueue=logqueue), text)
countsqueue.put(COMPLETE)
wordcounts = collator.get()
logqueue.put(COMPLETE)
logger.wait()
print(f"Counted {len(wordcounts)} different words; top 5 is:")
for word, count in wordcounts.most_common(5):
print(f'{word:<10} {count:4d}')
if __name__ == "__main__":
main()
which produces something like:
Will process 2000 lines of data
Counted 5651 different words; top 5 is:
et 2078
in 2074
est 2036
non 1911
ut 1477
and a largish logfile.txt
with information pushed to the logging queue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With