Message ID | 20250702222437.3733042-2-richard.purdie@linuxfoundation.org |
---|---|
State | New |
Headers | show |
Series | [1/2] cooker: Try and avoid parsing hangs | expand |
On Wed, Jul 2, 2025 at 4:24 PM Richard Purdie via lists.openembedded.org <richard.purdie=linuxfoundation.org@lists.openembedded.org> wrote: > > Curerntly, recipes to parse are split into equal groups and passed to > each parse thread at the start of parsing. We can replace this with > a queue and collect a new job as each parsing process becomes idle > to better spread load in the case of slow parsing jobs. > > Some of the data we need has to be passed in at fork time since it > can't be pickled, so the job to parse is only referenced as an index > in that list. > > This should better spread load for slow to parse recipes such as those > with many class extensions. > > Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org> > --- > lib/bb/cooker.py | 29 ++++++++++++++++++----------- > 1 file changed, 18 insertions(+), 11 deletions(-) > > diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py > index 91e3ee025ea..e88ad24cf61 100644 > --- a/lib/bb/cooker.py > +++ b/lib/bb/cooker.py > @@ -1998,8 +1998,9 @@ class ParsingFailure(Exception): > Exception.__init__(self, realexception, recipe) > > class Parser(multiprocessing.Process): > - def __init__(self, jobs, results, quit, profile): > + def __init__(self, jobs, jobid_queue, results, quit, profile): > self.jobs = jobs > + self.jobid_queue = jobid_queue > self.results = results > self.quit = quit > multiprocessing.Process.__init__(self) > @@ -2064,12 +2065,14 @@ class Parser(multiprocessing.Process): > if self.quit.is_set(): > break > > - job = None > + jobid = None > try: > - job = self.jobs.pop() > - except IndexError: > + jobid = self.jobid_queue.get(True, 0.5) > + except (ValueError, OSError): > havejobs = False > - if job: > + > + if jobid is not None: > + job = self.jobs[jobid] > result = self.parse(*job) > # Clear the siggen cache after parsing to control memory usage, its huge > bb.parse.siggen.postparsing_clean_cache() > @@ -2082,6 +2085,7 @@ class Parser(multiprocessing.Process): > except queue.Full: > 
pending.append(result) > finally: > + self.jobs.close() > self.results.close() > self.results.join_thread() > > @@ -2134,13 +2138,13 @@ class CookerParser(object): > > self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array) > self.fromcache = set() > - self.willparse = set() > + self.willparse = [] > for mc in self.cooker.multiconfigs: > for filename in self.mcfilelist[mc]: > appends = self.cooker.collections[mc].get_file_appends(filename) > layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2] > if not self.bb_caches[mc].cacheValid(filename, appends): > - self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername)) > + self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername)) > else: > self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername)) > > @@ -2159,22 +2163,25 @@ class CookerParser(object): > def start(self): > self.results = self.load_cached() > self.processes = [] > + > if self.toparse: > bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) > > + self.toparse_queue = multiprocessing.Queue(len(self.willparse)) > self.parser_quit = multiprocessing.Event() > self.result_queue = multiprocessing.Queue() > > - def chunkify(lst,n): > - return [lst[i::n] for i in range(n)] > - self.jobs = chunkify(list(self.willparse), self.num_processes) > + for jobid in range(len(self.willparse)): > + self.toparse_queue.put(jobid) It's generally not a good idea to assume you can push all the items on the queue before there are any consumers; if the queue fills, this will deadlock. It would be better to push the items to the queue after creating the processes. If you do that, I also don't see any reason to push the indexes instead of the jobs directly; basically instead of "chunkifying" and pre-determining the assignment of jobs to processes, have them all pull from one queue of jobs. 
> > for i in range(0, self.num_processes): > - parser = Parser(self.jobs[i], self.result_queue, self.parser_quit, self.cooker.configuration.profile) > + parser = Parser(self.willparse, self.toparse_queue, self.result_queue, self.parser_quit, self.cooker.configuration.profile) > parser.start() > self.process_names.append(parser.name) > self.processes.append(parser) > > + self.toparse_queue.close() > + > self.results = itertools.chain(self.results, self.parse_generator()) > > def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"): > > -=-=-=-=-=-=-=-=-=-=-=- > Links: You receive all messages sent to this group. > View/Reply Online (#17738): https://lists.openembedded.org/g/bitbake-devel/message/17738 > Mute This Topic: https://lists.openembedded.org/mt/113957083/3616693 > Group Owner: bitbake-devel+owner@lists.openembedded.org > Unsubscribe: https://lists.openembedded.org/g/bitbake-devel/unsub [JPEWhacker@gmail.com] > -=-=-=-=-=-=-=-=-=-=-=- >
On Thu, 2025-07-03 at 08:27 -0600, Joshua Watt wrote: > On Wed, Jul 2, 2025 at 4:24 PM Richard Purdie via > lists.openembedded.org > <richard.purdie=linuxfoundation.org@lists.openembedded.org> wrote: > > > > Curerntly, recipes to parse are split into equal groups and passed to > > each parse thread at the start of parsing. We can replace this with > > a queue and collect a new job as each parsing process becomes idle > > to better spread load in the case of slow parsing jobs. > > > > Some of the data we need has to be passed in at fork time since it > > can't be pickled, so the job to parse is only referenced as an index > > in that list. > > > > This should better spread load for slow to parse recipes such as those > > with many class extensions. > > > > Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org> > > --- > > lib/bb/cooker.py | 29 ++++++++++++++++++----------- > > 1 file changed, 18 insertions(+), 11 deletions(-) > > > > diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py > > index 91e3ee025ea..e88ad24cf61 100644 > > --- a/lib/bb/cooker.py > > +++ b/lib/bb/cooker.py > > @@ -1998,8 +1998,9 @@ class ParsingFailure(Exception): > > Exception.__init__(self, realexception, recipe) > > > > class Parser(multiprocessing.Process): > > - def __init__(self, jobs, results, quit, profile): > > + def __init__(self, jobs, jobid_queue, results, quit, profile): > > self.jobs = jobs > > + self.jobid_queue = jobid_queue > > self.results = results > > self.quit = quit > > multiprocessing.Process.__init__(self) > > @@ -2064,12 +2065,14 @@ class Parser(multiprocessing.Process): > > if self.quit.is_set(): > > break > > > > - job = None > > + jobid = None > > try: > > - job = self.jobs.pop() > > - except IndexError: > > + jobid = self.jobid_queue.get(True, 0.5) > > + except (ValueError, OSError): > > havejobs = False > > - if job: > > + > > + if jobid is not None: > > + job = self.jobs[jobid] > > result = self.parse(*job) > > # Clear the siggen cache after parsing to 
control memory usage, its huge > > bb.parse.siggen.postparsing_clean_cache() > > @@ -2082,6 +2085,7 @@ class Parser(multiprocessing.Process): > > except queue.Full: > > pending.append(result) > > finally: > > + self.jobs.close() > > self.results.close() > > self.results.join_thread() > > > > @@ -2134,13 +2138,13 @@ class CookerParser(object): > > > > self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array) > > self.fromcache = set() > > - self.willparse = set() > > + self.willparse = [] > > for mc in self.cooker.multiconfigs: > > for filename in self.mcfilelist[mc]: > > appends = self.cooker.collections[mc].get_file_appends(filename) > > layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2] > > if not self.bb_caches[mc].cacheValid(filename, appends): > > - self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername)) > > + self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername)) > > else: > > self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername)) > > > > @@ -2159,22 +2163,25 @@ class CookerParser(object): > > def start(self): > > self.results = self.load_cached() > > self.processes = [] > > + > > if self.toparse: > > bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) > > > > + self.toparse_queue = multiprocessing.Queue(len(self.willparse)) > > self.parser_quit = multiprocessing.Event() > > self.result_queue = multiprocessing.Queue() > > > > - def chunkify(lst,n): > > - return [lst[i::n] for i in range(n)] > > - self.jobs = chunkify(list(self.willparse), self.num_processes) > > + for jobid in range(len(self.willparse)): > > + self.toparse_queue.put(jobid) > > It's generally not a good idea to assume you can push all the items on > the queue before there are any consumers; if the queue fills, this > will deadlock. It would be better to push the items to the queue after > creating the processes. True. 
I did make sure the queue was large enough above; however, it would depend on the size of the job entries. > If you do that, I also don't see any reason to > push the indexes instead of the jobs directly; basically instead of > "chunkifying" and pre-determining the assignment of jobs to processes, > have them all pull from one queue of jobs. I added this comment to the patch in master next: # Have to pass in willparse at fork time so all parsing processes have the unpickleable data # then access it by index from the parse queue. If you don't do that, it can't rebuild the data, as the cache objects aren't pickleable :( Cheers, Richard
On Thu, Jul 3, 2025 at 8:30 AM Richard Purdie <richard.purdie@linuxfoundation.org> wrote: > > On Thu, 2025-07-03 at 08:27 -0600, Joshua Watt wrote: > > On Wed, Jul 2, 2025 at 4:24 PM Richard Purdie via > > lists.openembedded.org > > <richard.purdie=linuxfoundation.org@lists.openembedded.org> wrote: > > > > > > Curerntly, recipes to parse are split into equal groups and passed to > > > each parse thread at the start of parsing. We can replace this with > > > a queue and collect a new job as each parsing process becomes idle > > > to better spread load in the case of slow parsing jobs. > > > > > > Some of the data we need has to be passed in at fork time since it > > > can't be pickled, so the job to parse is only referenced as an index > > > in that list. > > > > > > This should better spread load for slow to parse recipes such as those > > > with many class extensions. > > > > > > Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org> > > > --- > > > lib/bb/cooker.py | 29 ++++++++++++++++++----------- > > > 1 file changed, 18 insertions(+), 11 deletions(-) > > > > > > diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py > > > index 91e3ee025ea..e88ad24cf61 100644 > > > --- a/lib/bb/cooker.py > > > +++ b/lib/bb/cooker.py > > > @@ -1998,8 +1998,9 @@ class ParsingFailure(Exception): > > > Exception.__init__(self, realexception, recipe) > > > > > > class Parser(multiprocessing.Process): > > > - def __init__(self, jobs, results, quit, profile): > > > + def __init__(self, jobs, jobid_queue, results, quit, profile): > > > self.jobs = jobs > > > + self.jobid_queue = jobid_queue > > > self.results = results > > > self.quit = quit > > > multiprocessing.Process.__init__(self) > > > @@ -2064,12 +2065,14 @@ class Parser(multiprocessing.Process): > > > if self.quit.is_set(): > > > break > > > > > > - job = None > > > + jobid = None > > > try: > > > - job = self.jobs.pop() > > > - except IndexError: > > > + jobid = self.jobid_queue.get(True, 0.5) > > > + except 
(ValueError, OSError): > > > havejobs = False > > > - if job: > > > + > > > + if jobid is not None: > > > + job = self.jobs[jobid] > > > result = self.parse(*job) > > > # Clear the siggen cache after parsing to control memory usage, its huge > > > bb.parse.siggen.postparsing_clean_cache() > > > @@ -2082,6 +2085,7 @@ class Parser(multiprocessing.Process): > > > except queue.Full: > > > pending.append(result) > > > finally: > > > + self.jobs.close() > > > self.results.close() > > > self.results.join_thread() > > > > > > @@ -2134,13 +2138,13 @@ class CookerParser(object): > > > > > > self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array) > > > self.fromcache = set() > > > - self.willparse = set() > > > + self.willparse = [] > > > for mc in self.cooker.multiconfigs: > > > for filename in self.mcfilelist[mc]: > > > appends = self.cooker.collections[mc].get_file_appends(filename) > > > layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2] > > > if not self.bb_caches[mc].cacheValid(filename, appends): > > > - self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername)) > > > + self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername)) > > > else: > > > self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername)) > > > > > > @@ -2159,22 +2163,25 @@ class CookerParser(object): > > > def start(self): > > > self.results = self.load_cached() > > > self.processes = [] > > > + > > > if self.toparse: > > > bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) > > > > > > + self.toparse_queue = multiprocessing.Queue(len(self.willparse)) > > > self.parser_quit = multiprocessing.Event() > > > self.result_queue = multiprocessing.Queue() > > > > > > - def chunkify(lst,n): > > > - return [lst[i::n] for i in range(n)] > > > - self.jobs = chunkify(list(self.willparse), self.num_processes) > > > + for jobid in range(len(self.willparse)): > > > + 
self.toparse_queue.put(jobid) > > > > It's generally not a good idea to assume you can push all the items on > > the queue before there are any consumers; if the queue fills, this > > will deadlock. It would be better to push the items to the queue after > > creating the processes. > > True. I did make sure the queue was large enough above however it would > depend on the size of the job entries. > > > If you do that, I also don't see any reason to > > push the indexes instead of the jobs directly; basically instead of > > "chunkifying" and pre-determining the assignment of jobs to processes, > > have them all pull from one queue of jobs. > > I added this comment to the patch in master next: > > # Have to pass in willparse at fork time so all parsing processes have the unpickleable data > # then access it by index from the parse queue. > > > As if you don't do that, it can't rebuild the data as the cache objects > aren't pickleable :( Ah, OK. We must be relying on the multiprocess being implemented with fork() then to pass the data. That's reasonable and the indexes make sense (but we should still push them after creating the process to avoid the possibility of deadlock) > > Cheers, > > Richard
On Thu, 2025-07-03 at 08:50 -0600, Joshua Watt wrote: > On Thu, Jul 3, 2025 at 8:30 AM Richard Purdie > <richard.purdie@linuxfoundation.org> wrote: > > > > On Thu, 2025-07-03 at 08:27 -0600, Joshua Watt wrote: > > > On Wed, Jul 2, 2025 at 4:24 PM Richard Purdie via > > > lists.openembedded.org > > > <richard.purdie=linuxfoundation.org@lists.openembedded.org> > > > wrote: > > > > > > > > Curerntly, recipes to parse are split into equal groups and > > > > passed to > > > > each parse thread at the start of parsing. We can replace this > > > > with > > > > a queue and collect a new job as each parsing process becomes > > > > idle > > > > to better spread load in the case of slow parsing jobs. > > > > > > > > Some of the data we need has to be passed in at fork time since > > > > it > > > > can't be pickled, so the job to parse is only referenced as an > > > > index > > > > in that list. > > > > > > > > This should better spread load for slow to parse recipes such > > > > as those > > > > with many class extensions. 
> > > > > > > > Signed-off-by: Richard Purdie > > > > <richard.purdie@linuxfoundation.org> > > > > --- > > > > lib/bb/cooker.py | 29 ++++++++++++++++++----------- > > > > 1 file changed, 18 insertions(+), 11 deletions(-) > > > > > > > > diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py > > > > index 91e3ee025ea..e88ad24cf61 100644 > > > > --- a/lib/bb/cooker.py > > > > +++ b/lib/bb/cooker.py > > > > @@ -1998,8 +1998,9 @@ class ParsingFailure(Exception): > > > > Exception.__init__(self, realexception, recipe) > > > > > > > > class Parser(multiprocessing.Process): > > > > - def __init__(self, jobs, results, quit, profile): > > > > + def __init__(self, jobs, jobid_queue, results, quit, > > > > profile): > > > > self.jobs = jobs > > > > + self.jobid_queue = jobid_queue > > > > self.results = results > > > > self.quit = quit > > > > multiprocessing.Process.__init__(self) > > > > @@ -2064,12 +2065,14 @@ class Parser(multiprocessing.Process): > > > > if self.quit.is_set(): > > > > break > > > > > > > > - job = None > > > > + jobid = None > > > > try: > > > > - job = self.jobs.pop() > > > > - except IndexError: > > > > + jobid = self.jobid_queue.get(True, 0.5) > > > > + except (ValueError, OSError): > > > > havejobs = False > > > > - if job: > > > > + > > > > + if jobid is not None: > > > > + job = self.jobs[jobid] > > > > result = self.parse(*job) > > > > # Clear the siggen cache after parsing to > > > > control memory usage, its huge > > > > bb.parse.siggen.postparsing_clean_cache() > > > > @@ -2082,6 +2085,7 @@ class Parser(multiprocessing.Process): > > > > except queue.Full: > > > > pending.append(result) > > > > finally: > > > > + self.jobs.close() > > > > self.results.close() > > > > self.results.join_thread() > > > > > > > > @@ -2134,13 +2138,13 @@ class CookerParser(object): > > > > > > > > self.bb_caches = > > > > bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, > > > > cooker.caches_array) > > > > self.fromcache = set() > > > > - self.willparse = set() > > 
> > + self.willparse = [] > > > > for mc in self.cooker.multiconfigs: > > > > for filename in self.mcfilelist[mc]: > > > > appends = > > > > self.cooker.collections[mc].get_file_appends(filename) > > > > layername = > > > > self.cooker.collections[mc].calc_bbfile_priority(filename)[2] > > > > if not self.bb_caches[mc].cacheValid(filename, > > > > appends): > > > > - self.willparse.add((mc, > > > > self.bb_caches[mc], filename, appends, layername)) > > > > + self.willparse.append((mc, > > > > self.bb_caches[mc], filename, appends, layername)) > > > > else: > > > > self.fromcache.add((mc, > > > > self.bb_caches[mc], filename, appends, layername)) > > > > > > > > @@ -2159,22 +2163,25 @@ class CookerParser(object): > > > > def start(self): > > > > self.results = self.load_cached() > > > > self.processes = [] > > > > + > > > > if self.toparse: > > > > bb.event.fire(bb.event.ParseStarted(self.toparse), > > > > self.cfgdata) > > > > > > > > + self.toparse_queue = > > > > multiprocessing.Queue(len(self.willparse)) > > > > self.parser_quit = multiprocessing.Event() > > > > self.result_queue = multiprocessing.Queue() > > > > > > > > - def chunkify(lst,n): > > > > - return [lst[i::n] for i in range(n)] > > > > - self.jobs = chunkify(list(self.willparse), > > > > self.num_processes) > > > > + for jobid in range(len(self.willparse)): > > > > + self.toparse_queue.put(jobid) > > > > > > It's generally not a good idea to assume you can push all the > > > items on > > > the queue before there are any consumers; if the queue fills, > > > this > > > will deadlock. It would be better to push the items to the queue > > > after > > > creating the processes. > > > > True. I did make sure the queue was large enough above however it > > would > > depend on the size of the job entries. 
> > > > > If you do that, I also don't see any reason to > > > push the indexes instead of the jobs directly; basically instead > > > of > > > "chunkifying" and pre-determining the assignment of jobs to > > > processes, > > > have them all pull from one queue of jobs. > > > > I added this comment to the patch in master next: > > > > # Have to pass in willparse at fork time so all parsing processes > > have the unpickleable data > > # then access it by index from the parse queue. > > > > > > As if you don't do that, it can't rebuild the data as the cache > > objects > > aren't pickleable :( > > Ah, OK. We must be relying on the multiprocess being implemented with > fork() then to pass the data. That's reasonable and the indexes make > sense (but we should still push them after creating the process to > avoid the possibility of deadlock) I changed it to queue after starting the parser processes however I think they exit early due to no work and it causes occasional build failures that way around so the code is going to need more extensive changes. Cheers, Richard
On Fri, 2025-07-04 at 22:42 +0100, Richard Purdie via lists.openembedded.org wrote: > On Thu, 2025-07-03 at 08:50 -0600, Joshua Watt wrote: > > On Thu, Jul 3, 2025 at 8:30 AM Richard Purdie > > <richard.purdie@linuxfoundation.org> wrote: > > > > > > On Thu, 2025-07-03 at 08:27 -0600, Joshua Watt wrote: > > > > On Wed, Jul 2, 2025 at 4:24 PM Richard Purdie via > > > > lists.openembedded.org > > > > <richard.purdie=linuxfoundation.org@lists.openembedded.org> > > > > wrote: > > > > > > > > > > Curerntly, recipes to parse are split into equal groups and > > > > > passed to > > > > > each parse thread at the start of parsing. We can replace this > > > > > with > > > > > a queue and collect a new job as each parsing process becomes > > > > > idle > > > > > to better spread load in the case of slow parsing jobs. > > > > > > > > > > Some of the data we need has to be passed in at fork time since > > > > > it > > > > > can't be pickled, so the job to parse is only referenced as an > > > > > index > > > > > in that list. > > > > > > > > > > This should better spread load for slow to parse recipes such > > > > > as those > > > > > with many class extensions. 
> > > > > > > > > > Signed-off-by: Richard Purdie > > > > > <richard.purdie@linuxfoundation.org> > > > > > --- > > > > > lib/bb/cooker.py | 29 ++++++++++++++++++----------- > > > > > 1 file changed, 18 insertions(+), 11 deletions(-) > > > > > > > > > > diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py > > > > > index 91e3ee025ea..e88ad24cf61 100644 > > > > > --- a/lib/bb/cooker.py > > > > > +++ b/lib/bb/cooker.py > > > > > @@ -1998,8 +1998,9 @@ class ParsingFailure(Exception): > > > > > Exception.__init__(self, realexception, recipe) > > > > > > > > > > class Parser(multiprocessing.Process): > > > > > - def __init__(self, jobs, results, quit, profile): > > > > > + def __init__(self, jobs, jobid_queue, results, quit, > > > > > profile): > > > > > self.jobs = jobs > > > > > + self.jobid_queue = jobid_queue > > > > > self.results = results > > > > > self.quit = quit > > > > > multiprocessing.Process.__init__(self) > > > > > @@ -2064,12 +2065,14 @@ class Parser(multiprocessing.Process): > > > > > if self.quit.is_set(): > > > > > break > > > > > > > > > > - job = None > > > > > + jobid = None > > > > > try: > > > > > - job = self.jobs.pop() > > > > > - except IndexError: > > > > > + jobid = self.jobid_queue.get(True, 0.5) > > > > > + except (ValueError, OSError): > > > > > havejobs = False > > > > > - if job: > > > > > + > > > > > + if jobid is not None: > > > > > + job = self.jobs[jobid] > > > > > result = self.parse(*job) > > > > > # Clear the siggen cache after parsing to > > > > > control memory usage, its huge > > > > > bb.parse.siggen.postparsing_clean_cache() > > > > > @@ -2082,6 +2085,7 @@ class Parser(multiprocessing.Process): > > > > > except queue.Full: > > > > > pending.append(result) > > > > > finally: > > > > > + self.jobs.close() > > > > > self.results.close() > > > > > self.results.join_thread() > > > > > > > > > > @@ -2134,13 +2138,13 @@ class CookerParser(object): > > > > > > > > > > self.bb_caches = > > > > > bb.cache.MulticonfigCache(self.cfgbuilder, 
self.cfghash, > > > > > cooker.caches_array) > > > > > self.fromcache = set() > > > > > - self.willparse = set() > > > > > + self.willparse = [] > > > > > for mc in self.cooker.multiconfigs: > > > > > for filename in self.mcfilelist[mc]: > > > > > appends = > > > > > self.cooker.collections[mc].get_file_appends(filename) > > > > > layername = > > > > > self.cooker.collections[mc].calc_bbfile_priority(filename)[2] > > > > > if not self.bb_caches[mc].cacheValid(filename, > > > > > appends): > > > > > - self.willparse.add((mc, > > > > > self.bb_caches[mc], filename, appends, layername)) > > > > > + self.willparse.append((mc, > > > > > self.bb_caches[mc], filename, appends, layername)) > > > > > else: > > > > > self.fromcache.add((mc, > > > > > self.bb_caches[mc], filename, appends, layername)) > > > > > > > > > > @@ -2159,22 +2163,25 @@ class CookerParser(object): > > > > > def start(self): > > > > > self.results = self.load_cached() > > > > > self.processes = [] > > > > > + > > > > > if self.toparse: > > > > > bb.event.fire(bb.event.ParseStarted(self.toparse), > > > > > self.cfgdata) > > > > > > > > > > + self.toparse_queue = > > > > > multiprocessing.Queue(len(self.willparse)) > > > > > self.parser_quit = multiprocessing.Event() > > > > > self.result_queue = multiprocessing.Queue() > > > > > > > > > > - def chunkify(lst,n): > > > > > - return [lst[i::n] for i in range(n)] > > > > > - self.jobs = chunkify(list(self.willparse), > > > > > self.num_processes) > > > > > + for jobid in range(len(self.willparse)): > > > > > + self.toparse_queue.put(jobid) > > > > > > > > It's generally not a good idea to assume you can push all the > > > > items on > > > > the queue before there are any consumers; if the queue fills, > > > > this > > > > will deadlock. It would be better to push the items to the queue > > > > after > > > > creating the processes. > > > > > > True. 
I did make sure the queue was large enough above however it > > > would > > > depend on the size of the job entries. > > > > > > > If you do that, I also don't see any reason to > > > > push the indexes instead of the jobs directly; basically instead > > > > of > > > > "chunkifying" and pre-determining the assignment of jobs to > > > > processes, > > > > have them all pull from one queue of jobs. > > > > > > I added this comment to the patch in master next: > > > > > > # Have to pass in willparse at fork time so all parsing processes > > > have the unpickleable data > > > # then access it by index from the parse queue. > > > > > > > > > As if you don't do that, it can't rebuild the data as the cache > > > objects > > > aren't pickleable :( > > > > Ah, OK. We must be relying on the multiprocess being implemented with > > fork() then to pass the data. That's reasonable and the indexes make > > sense (but we should still push them after creating the process to > > avoid the possibility of deadlock) > > I changed it to queue after starting the parser processes however I > think they exit early due to no work and it causes occasional build > failures that way around so the code is going to need more extensive > changes. I found that issue in the exception handling and have sent a v2 of the patch with that fixed. Cheers, Richard
diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py index 91e3ee025ea..e88ad24cf61 100644 --- a/lib/bb/cooker.py +++ b/lib/bb/cooker.py @@ -1998,8 +1998,9 @@ class ParsingFailure(Exception): Exception.__init__(self, realexception, recipe) class Parser(multiprocessing.Process): - def __init__(self, jobs, results, quit, profile): + def __init__(self, jobs, jobid_queue, results, quit, profile): self.jobs = jobs + self.jobid_queue = jobid_queue self.results = results self.quit = quit multiprocessing.Process.__init__(self) @@ -2064,12 +2065,14 @@ class Parser(multiprocessing.Process): if self.quit.is_set(): break - job = None + jobid = None try: - job = self.jobs.pop() - except IndexError: + jobid = self.jobid_queue.get(True, 0.5) + except (ValueError, OSError): havejobs = False - if job: + + if jobid is not None: + job = self.jobs[jobid] result = self.parse(*job) # Clear the siggen cache after parsing to control memory usage, its huge bb.parse.siggen.postparsing_clean_cache() @@ -2082,6 +2085,7 @@ class Parser(multiprocessing.Process): except queue.Full: pending.append(result) finally: + self.jobs.close() self.results.close() self.results.join_thread() @@ -2134,13 +2138,13 @@ class CookerParser(object): self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array) self.fromcache = set() - self.willparse = set() + self.willparse = [] for mc in self.cooker.multiconfigs: for filename in self.mcfilelist[mc]: appends = self.cooker.collections[mc].get_file_appends(filename) layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2] if not self.bb_caches[mc].cacheValid(filename, appends): - self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername)) + self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername)) else: self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername)) @@ -2159,22 +2163,25 @@ class CookerParser(object): def start(self): self.results = self.load_cached() 
self.processes = [] + if self.toparse: bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) + self.toparse_queue = multiprocessing.Queue(len(self.willparse)) self.parser_quit = multiprocessing.Event() self.result_queue = multiprocessing.Queue() - def chunkify(lst,n): - return [lst[i::n] for i in range(n)] - self.jobs = chunkify(list(self.willparse), self.num_processes) + for jobid in range(len(self.willparse)): + self.toparse_queue.put(jobid) for i in range(0, self.num_processes): - parser = Parser(self.jobs[i], self.result_queue, self.parser_quit, self.cooker.configuration.profile) + parser = Parser(self.willparse, self.toparse_queue, self.result_queue, self.parser_quit, self.cooker.configuration.profile) parser.start() self.process_names.append(parser.name) self.processes.append(parser) + self.toparse_queue.close() + self.results = itertools.chain(self.results, self.parse_generator()) def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"):
Currently, recipes to parse are split into equal groups and passed to each parse thread at the start of parsing. We can replace this with a queue and collect a new job as each parsing process becomes idle to better spread load in the case of slow parsing jobs. Some of the data we need has to be passed in at fork time since it can't be pickled, so the job to parse is only referenced as an index in that list. This should better spread load for slow-to-parse recipes such as those with many class extensions. Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org> --- lib/bb/cooker.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-)