diff mbox series

[v2] cooker: Use a queue to feed parsing jobs

Message ID 20250705062601.3875763-1-richard.purdie@linuxfoundation.org
State New
Headers show
Series [v2] cooker: Use a queue to feed parsing jobs | expand

Commit Message

Richard Purdie July 5, 2025, 6:26 a.m. UTC
Curerntly, recipes to parse are split into equal groups and passed to
each parse thread at the start of parsing. We can replace this with
a queue and collect a new job as each parsing process becomes idle
to better spread load in the case of slow parsing jobs.

Some of the data we need has to be passed in at fork time since it
can't be pickled, so the job to parse is only referenced as an index
in that list.

This should better spread load for slow to parse recipes such as those
with many class extensions.

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
 lib/bb/cooker.py | 32 ++++++++++++++++++++------------
 1 file changed, 20 insertions(+), 12 deletions(-)
diff mbox series

Patch

diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py
index 91e3ee025ea..8b959cc942e 100644
--- a/lib/bb/cooker.py
+++ b/lib/bb/cooker.py
@@ -1998,8 +1998,9 @@  class ParsingFailure(Exception):
         Exception.__init__(self, realexception, recipe)
 
 class Parser(multiprocessing.Process):
-    def __init__(self, jobs, results, quit, profile):
+    def __init__(self, jobs, jobid_queue, results, quit, profile):
         self.jobs = jobs
+        self.jobid_queue = jobid_queue
         self.results = results
         self.quit = quit
         multiprocessing.Process.__init__(self)
@@ -2064,12 +2065,14 @@  class Parser(multiprocessing.Process):
                 if self.quit.is_set():
                     break
 
-                job = None
+                jobid = None
                 try:
-                    job = self.jobs.pop()
-                except IndexError:
+                    jobid = self.jobid_queue.get(True, 0.5)
+                except (ValueError, OSError, queue.Empty) as e:
                     havejobs = False
-                if job:
+
+                if jobid is not None:
+                    job = self.jobs[jobid]
                     result = self.parse(*job)
                     # Clear the siggen cache after parsing to control memory usage, its huge
                     bb.parse.siggen.postparsing_clean_cache()
@@ -2082,6 +2085,7 @@  class Parser(multiprocessing.Process):
                     except queue.Full:
                         pending.append(result)
         finally:
+            self.jobs.close()
             self.results.close()
             self.results.join_thread()
 
@@ -2134,13 +2138,13 @@  class CookerParser(object):
 
         self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array)
         self.fromcache = set()
-        self.willparse = set()
+        self.willparse = []
         for mc in self.cooker.multiconfigs:
             for filename in self.mcfilelist[mc]:
                 appends = self.cooker.collections[mc].get_file_appends(filename)
                 layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2]
                 if not self.bb_caches[mc].cacheValid(filename, appends):
-                    self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername))
+                    self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername))
                 else:
                     self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername))
 
@@ -2159,22 +2163,26 @@  class CookerParser(object):
     def start(self):
         self.results = self.load_cached()
         self.processes = []
+
         if self.toparse:
             bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
 
+            self.toparse_queue = multiprocessing.Queue(len(self.willparse))
             self.parser_quit = multiprocessing.Event()
             self.result_queue = multiprocessing.Queue()
 
-            def chunkify(lst,n):
-                return [lst[i::n] for i in range(n)]
-            self.jobs = chunkify(list(self.willparse), self.num_processes)
-
+            # Have to pass in willparse at fork time so all parsing processes have the unpickleable data
+            # then access it by index from the parse queue.
             for i in range(0, self.num_processes):
-                parser = Parser(self.jobs[i], self.result_queue, self.parser_quit, self.cooker.configuration.profile)
+                parser = Parser(self.willparse, self.toparse_queue, self.result_queue, self.parser_quit, self.cooker.configuration.profile)
                 parser.start()
                 self.process_names.append(parser.name)
                 self.processes.append(parser)
 
+            for jobid in range(len(self.willparse)):
+                self.toparse_queue.put(jobid)
+            self.toparse_queue.close()
+
             self.results = itertools.chain(self.results, self.parse_generator())
 
     def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"):