@@ -1998,8 +1998,9 @@ class ParsingFailure(Exception):
Exception.__init__(self, realexception, recipe)
class Parser(multiprocessing.Process):
- def __init__(self, jobs, results, quit, profile):
+ def __init__(self, jobs, jobid_queue, results, quit, profile):
self.jobs = jobs
+ self.jobid_queue = jobid_queue
self.results = results
self.quit = quit
multiprocessing.Process.__init__(self)
@@ -2064,12 +2065,14 @@ class Parser(multiprocessing.Process):
if self.quit.is_set():
break
- job = None
+ jobid = None
try:
- job = self.jobs.pop()
- except IndexError:
+ jobid = self.jobid_queue.get(True, 0.5)
+ except (ValueError, OSError, queue.Empty) as e:
havejobs = False
- if job:
+
+ if jobid is not None:
+ job = self.jobs[jobid]
result = self.parse(*job)
# Clear the siggen cache after parsing to control memory usage, its huge
bb.parse.siggen.postparsing_clean_cache()
@@ -2082,6 +2085,7 @@ class Parser(multiprocessing.Process):
except queue.Full:
pending.append(result)
finally:
+ self.jobs.close()
self.results.close()
self.results.join_thread()
@@ -2134,13 +2138,13 @@ class CookerParser(object):
self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array)
self.fromcache = set()
- self.willparse = set()
+ self.willparse = []
for mc in self.cooker.multiconfigs:
for filename in self.mcfilelist[mc]:
appends = self.cooker.collections[mc].get_file_appends(filename)
layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2]
if not self.bb_caches[mc].cacheValid(filename, appends):
- self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername))
+ self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername))
else:
self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername))
@@ -2159,22 +2163,26 @@ class CookerParser(object):
def start(self):
self.results = self.load_cached()
self.processes = []
+
if self.toparse:
bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
+ self.toparse_queue = multiprocessing.Queue(len(self.willparse))
self.parser_quit = multiprocessing.Event()
self.result_queue = multiprocessing.Queue()
- def chunkify(lst,n):
- return [lst[i::n] for i in range(n)]
- self.jobs = chunkify(list(self.willparse), self.num_processes)
-
+ # Have to pass in willparse at fork time so all parsing processes have the unpickleable data
+ # then access it by index from the parse queue.
for i in range(0, self.num_processes):
- parser = Parser(self.jobs[i], self.result_queue, self.parser_quit, self.cooker.configuration.profile)
+ parser = Parser(self.willparse, self.toparse_queue, self.result_queue, self.parser_quit, self.cooker.configuration.profile)
parser.start()
self.process_names.append(parser.name)
self.processes.append(parser)
+ for jobid in range(len(self.willparse)):
+ self.toparse_queue.put(jobid)
+ self.toparse_queue.close()
+
self.results = itertools.chain(self.results, self.parse_generator())
def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"):
Curerntly, recipes to parse are split into equal groups and passed to each parse thread at the start of parsing. We can replace this with a queue and collect a new job as each parsing process becomes idle to better spread load in the case of slow parsing jobs. Some of the data we need has to be passed in at fork time since it can't be pickled, so the job to parse is only referenced as an index in that list. This should better spread load for slow to parse recipes such as those with many class extensions. Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org> --- lib/bb/cooker.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-)