diff mbox series

[bitbake-devel] cooker: Use shared counter for processing parser jobs

Message ID 20250708154222.1479350-1-JPEWhacker@gmail.com
State New
Headers show
Series [bitbake-devel] cooker: Use shared counter for processing parser jobs | expand

Commit Message

Joshua Watt July 8, 2025, 3:42 p.m. UTC
Instead of pre-partitioning which jobs will go to which parser
processes, pass the list of all jobs to all the parser processes
(efficiently via fork()), then use a shared counter of the next index
in the list that needs to be processed. This allows the parser processes
to run independently without needing to be fed by the parent process, and
load balances them much better.

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 bitbake/lib/bb/cooker.py | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)
diff mbox series

Patch

diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
index 2bb80e330d3..dc131939ed0 100644
--- a/bitbake/lib/bb/cooker.py
+++ b/bitbake/lib/bb/cooker.py
@@ -26,6 +26,7 @@  import json
 import pickle
 import codecs
 import hashserv
+import ctypes
 
 logger      = logging.getLogger("BitBake")
 collectlog  = logging.getLogger("BitBake.Collection")
@@ -1998,8 +1999,9 @@  class ParsingFailure(Exception):
         Exception.__init__(self, realexception, recipe)
 
 class Parser(multiprocessing.Process):
-    def __init__(self, jobs, results, quit, profile):
+    def __init__(self, jobs, next_job_id, results, quit, profile):
         self.jobs = jobs
+        self.next_job_id = next_job_id
         self.results = results
         self.quit = quit
         multiprocessing.Process.__init__(self)
@@ -2065,10 +2067,14 @@  class Parser(multiprocessing.Process):
                     break
 
                 job = None
-                try:
-                    job = self.jobs.pop()
-                except IndexError:
-                    havejobs = False
+                if havejobs:
+                    with self.next_job_id.get_lock():
+                        if self.next_job_id.value < len(self.jobs):
+                            job = self.jobs[self.next_job_id.value]
+                            self.next_job_id.value += 1
+                        else:
+                            havejobs = False
+
                 if job:
                     result = self.parse(*job)
                     # Clear the siggen cache after parsing to control memory usage, its huge
@@ -2134,13 +2140,13 @@  class CookerParser(object):
 
         self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array)
         self.fromcache = set()
-        self.willparse = set()
+        self.willparse = []
         for mc in self.cooker.multiconfigs:
             for filename in self.mcfilelist[mc]:
                 appends = self.cooker.collections[mc].get_file_appends(filename)
                 layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2]
                 if not self.bb_caches[mc].cacheValid(filename, appends):
-                    self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername))
+                    self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername))
                 else:
                     self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername))
 
@@ -2159,18 +2165,18 @@  class CookerParser(object):
     def start(self):
         self.results = self.load_cached()
         self.processes = []
+
         if self.toparse:
             bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
 
+            next_job_id = multiprocessing.Value(ctypes.c_int, 0)
             self.parser_quit = multiprocessing.Event()
             self.result_queue = multiprocessing.Queue()
 
-            def chunkify(lst,n):
-                return [lst[i::n] for i in range(n)]
-            self.jobs = chunkify(list(self.willparse), self.num_processes)
-
+            # Have to pass in willparse at fork time so all parsing processes have the unpickleable data
+            # then access it by index from the parse queue.
             for i in range(0, self.num_processes):
-                parser = Parser(self.jobs[i], self.result_queue, self.parser_quit, self.cooker.configuration.profile)
+                parser = Parser(self.willparse, next_job_id, self.result_queue, self.parser_quit, self.cooker.configuration.profile)
                 parser.start()
                 self.process_names.append(parser.name)
                 self.processes.append(parser)