[2/2] cooker: Use a queue to feed parsing jobs

Message ID 20250702222437.3733042-2-richard.purdie@linuxfoundation.org
State New
Series [1/2] cooker: Try and avoid parsing hangs

Commit Message

Richard Purdie July 2, 2025, 10:24 p.m. UTC
Currently, recipes to parse are split into equal groups and passed to
each parsing process at the start of parsing. We can replace this with
a queue and have each parsing process collect a new job as it becomes
idle, which spreads the load better when some jobs are slow to parse.

Some of the data we need has to be passed in at fork time since it
can't be pickled, so the job to parse is only referenced as an index
into that fork-time list.

This should better spread the load for slow-to-parse recipes such as
those with many class extensions.

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
 lib/bb/cooker.py | 29 ++++++++++++++++++-----------
 1 file changed, 18 insertions(+), 11 deletions(-)

Comments

Joshua Watt July 3, 2025, 2:27 p.m. UTC | #1
On Wed, Jul 2, 2025 at 4:24 PM Richard Purdie via
lists.openembedded.org
<richard.purdie=linuxfoundation.org@lists.openembedded.org> wrote:
>
> Currently, recipes to parse are split into equal groups and passed to
> each parsing process at the start of parsing. We can replace this with
> a queue and have each parsing process collect a new job as it becomes
> idle, which spreads the load better when some jobs are slow to parse.
>
> Some of the data we need has to be passed in at fork time since it
> can't be pickled, so the job to parse is only referenced as an index
> into that fork-time list.
>
> This should better spread the load for slow-to-parse recipes such as
> those with many class extensions.
>
> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
> ---
>  lib/bb/cooker.py | 29 ++++++++++++++++++-----------
>  1 file changed, 18 insertions(+), 11 deletions(-)
>
> diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py
> index 91e3ee025ea..e88ad24cf61 100644
> --- a/lib/bb/cooker.py
> +++ b/lib/bb/cooker.py
> @@ -1998,8 +1998,9 @@ class ParsingFailure(Exception):
>          Exception.__init__(self, realexception, recipe)
>
>  class Parser(multiprocessing.Process):
> -    def __init__(self, jobs, results, quit, profile):
> +    def __init__(self, jobs, jobid_queue, results, quit, profile):
>          self.jobs = jobs
> +        self.jobid_queue = jobid_queue
>          self.results = results
>          self.quit = quit
>          multiprocessing.Process.__init__(self)
> @@ -2064,12 +2065,14 @@ class Parser(multiprocessing.Process):
>                  if self.quit.is_set():
>                      break
>
> -                job = None
> +                jobid = None
>                  try:
> -                    job = self.jobs.pop()
> -                except IndexError:
> +                    jobid = self.jobid_queue.get(True, 0.5)
> +                except (ValueError, OSError):
>                      havejobs = False
> -                if job:
> +
> +                if jobid is not None:
> +                    job = self.jobs[jobid]
>                      result = self.parse(*job)
>                      # Clear the siggen cache after parsing to control memory usage, its huge
>                      bb.parse.siggen.postparsing_clean_cache()
> @@ -2082,6 +2085,7 @@ class Parser(multiprocessing.Process):
>                      except queue.Full:
>                          pending.append(result)
>          finally:
> +            self.jobs.close()
>              self.results.close()
>              self.results.join_thread()
>
> @@ -2134,13 +2138,13 @@ class CookerParser(object):
>
>          self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array)
>          self.fromcache = set()
> -        self.willparse = set()
> +        self.willparse = []
>          for mc in self.cooker.multiconfigs:
>              for filename in self.mcfilelist[mc]:
>                  appends = self.cooker.collections[mc].get_file_appends(filename)
>                  layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2]
>                  if not self.bb_caches[mc].cacheValid(filename, appends):
> -                    self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername))
> +                    self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername))
>                  else:
>                      self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername))
>
> @@ -2159,22 +2163,25 @@ class CookerParser(object):
>      def start(self):
>          self.results = self.load_cached()
>          self.processes = []
> +
>          if self.toparse:
>              bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
>
> +            self.toparse_queue = multiprocessing.Queue(len(self.willparse))
>              self.parser_quit = multiprocessing.Event()
>              self.result_queue = multiprocessing.Queue()
>
> -            def chunkify(lst,n):
> -                return [lst[i::n] for i in range(n)]
> -            self.jobs = chunkify(list(self.willparse), self.num_processes)
> +            for jobid in range(len(self.willparse)):
> +                self.toparse_queue.put(jobid)

It's generally not a good idea to assume you can push all the items on
the queue before there are any consumers; if the queue fills, this
will deadlock. It would be better to push the items to the queue after
creating the processes. If you do that, I also don't see any reason to
push the indexes instead of the jobs directly; basically instead of
"chunkifying" and pre-determining the assignment of jobs to processes,
have them all pull from one queue of jobs.
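
As a minimal sketch of the pattern being suggested here (hypothetical names and
job payloads, not the actual cooker.py code): start the consumer processes
first, then feed one shared queue that every worker pulls from.

import multiprocessing
import queue

def do_work(job):
    # Stand-in for the real parsing work.
    return job * 2

def worker(job_queue, result_queue):
    # Every worker pulls from the same shared queue until it stays empty.
    while True:
        try:
            job = job_queue.get(True, 0.5)
        except queue.Empty:
            break
        result_queue.put(do_work(job))

if __name__ == "__main__":
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    # Start the consumers before feeding the queue so a full queue can
    # never block the producer while nobody is reading from it.
    workers = [multiprocessing.Process(target=worker, args=(job_queue, result_queue))
               for _ in range(4)]
    for w in workers:
        w.start()
    for job in range(100):
        job_queue.put(job)

    results = [result_queue.get() for _ in range(100)]
    for w in workers:
        w.join()

(The timeout-based exit in the worker is itself a little racy if the producer
is slow to feed the queue, which comes up later in the thread.)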

>
>              for i in range(0, self.num_processes):
> -                parser = Parser(self.jobs[i], self.result_queue, self.parser_quit, self.cooker.configuration.profile)
> +                parser = Parser(self.willparse, self.toparse_queue, self.result_queue, self.parser_quit, self.cooker.configuration.profile)
>                  parser.start()
>                  self.process_names.append(parser.name)
>                  self.processes.append(parser)
>
> +            self.toparse_queue.close()
> +
>              self.results = itertools.chain(self.results, self.parse_generator())
>
>      def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"):
>
Richard Purdie July 3, 2025, 2:30 p.m. UTC | #2
On Thu, 2025-07-03 at 08:27 -0600, Joshua Watt wrote:
> On Wed, Jul 2, 2025 at 4:24 PM Richard Purdie via
> lists.openembedded.org
> <richard.purdie=linuxfoundation.org@lists.openembedded.org> wrote:
> > 
> > [...]
> >
> > -            def chunkify(lst,n):
> > -                return [lst[i::n] for i in range(n)]
> > -            self.jobs = chunkify(list(self.willparse), self.num_processes)
> > +            for jobid in range(len(self.willparse)):
> > +                self.toparse_queue.put(jobid)
> 
> It's generally not a good idea to assume you can push all the items on
> the queue before there are any consumers; if the queue fills, this
> will deadlock. It would be better to push the items to the queue after
> creating the processes.

True. I did make sure the queue was large enough above; however, it would
depend on the size of the job entries.

>  If you do that, I also don't see any reason to
> push the indexes instead of the jobs directly; basically instead of
> "chunkifying" and pre-determining the assignment of jobs to processes,
> have them all pull from one queue of jobs.

I added this comment to the patch in master-next:

# Have to pass in willparse at fork time so all parsing processes have the unpickleable data
# then access it by index from the parse queue.

If you don't do that, the parser processes can't rebuild the data, as the
cache objects aren't pickleable :(
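
Roughly, the shape of that, as a minimal sketch assuming the fork start method
(UnpicklableCache and the recipe names are stand-ins, not the real cooker.py
objects):

import multiprocessing
import queue

class UnpicklableCache:
    # Stand-in for the cache objects that cannot cross a pickle boundary.
    def __init__(self, name):
        self.name = name

# Built in the parent before the workers are created; with fork() each
# child sees it through the inherited address space, not via pickling.
willparse = [UnpicklableCache("recipe-%d.bb" % i) for i in range(20)]

def worker(jobid_queue, result_queue):
    while True:
        try:
            jobid = jobid_queue.get(True, 0.5)
        except queue.Empty:
            break
        # Only the small integer index travels through the queue; the
        # unpicklable object is looked up in the fork-inherited list.
        result_queue.put(willparse[jobid].name)

if __name__ == "__main__":
    multiprocessing.set_start_method("fork")  # relies on fork(), POSIX only
    jobid_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(jobid_queue, result_queue))
             for _ in range(4)]
    for p in procs:
        p.start()
    for jobid in range(len(willparse)):
        jobid_queue.put(jobid)
    parsed = [result_queue.get() for _ in range(len(willparse))]
    for p in procs:
        p.join()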

Cheers,

Richard
Joshua Watt July 3, 2025, 2:50 p.m. UTC | #3
On Thu, Jul 3, 2025 at 8:30 AM Richard Purdie
<richard.purdie@linuxfoundation.org> wrote:
>
> On Thu, 2025-07-03 at 08:27 -0600, Joshua Watt wrote:
> > On Wed, Jul 2, 2025 at 4:24 PM Richard Purdie via
> > lists.openembedded.org
> > <richard.purdie=linuxfoundation.org@lists.openembedded.org> wrote:
> > >
> > > [...]
> > >
> > > -            def chunkify(lst,n):
> > > -                return [lst[i::n] for i in range(n)]
> > > -            self.jobs = chunkify(list(self.willparse), self.num_processes)
> > > +            for jobid in range(len(self.willparse)):
> > > +                self.toparse_queue.put(jobid)
> >
> > It's generally not a good idea to assume you can push all the items on
> > the queue before there are any consumers; if the queue fills, this
> > will deadlock. It would be better to push the items to the queue after
> > creating the processes.
>
> True. I did make sure the queue was large enough above; however, it would
> depend on the size of the job entries.
>
> >  If you do that, I also don't see any reason to
> > push the indexes instead of the jobs directly; basically instead of
> > "chunkifying" and pre-determining the assignment of jobs to processes,
> > have them all pull from one queue of jobs.
>
> I added this comment to the patch in master-next:
>
> # Have to pass in willparse at fork time so all parsing processes have the unpickleable data
> # then access it by index from the parse queue.
>
> If you don't do that, the parser processes can't rebuild the data, as the
> cache objects aren't pickleable :(
Ah, OK. We must be relying on multiprocessing being implemented with
fork() to pass the data. That's reasonable, and the indexes make sense
(but we should still push them after creating the processes to avoid
the possibility of deadlock).

>
> Cheers,
>
> Richard
Richard Purdie July 4, 2025, 9:42 p.m. UTC | #4
On Thu, 2025-07-03 at 08:50 -0600, Joshua Watt wrote:
> On Thu, Jul 3, 2025 at 8:30 AM Richard Purdie
> <richard.purdie@linuxfoundation.org> wrote:
> > 
> > On Thu, 2025-07-03 at 08:27 -0600, Joshua Watt wrote:
> > > On Wed, Jul 2, 2025 at 4:24 PM Richard Purdie via
> > > lists.openembedded.org
> > > <richard.purdie=linuxfoundation.org@lists.openembedded.org>
> > > wrote:
> > > > 
> > > > [...]
> > > >
> > > > -            def chunkify(lst,n):
> > > > -                return [lst[i::n] for i in range(n)]
> > > > -            self.jobs = chunkify(list(self.willparse), self.num_processes)
> > > > +            for jobid in range(len(self.willparse)):
> > > > +                self.toparse_queue.put(jobid)
> > >
> > > It's generally not a good idea to assume you can push all the items on
> > > the queue before there are any consumers; if the queue fills, this
> > > will deadlock. It would be better to push the items to the queue after
> > > creating the processes.
> >
> > True. I did make sure the queue was large enough above; however, it
> > would depend on the size of the job entries.
> >
> > > If you do that, I also don't see any reason to push the indexes
> > > instead of the jobs directly; basically instead of "chunkifying" and
> > > pre-determining the assignment of jobs to processes, have them all
> > > pull from one queue of jobs.
> >
> > I added this comment to the patch in master-next:
> >
> > # Have to pass in willparse at fork time so all parsing processes have the unpickleable data
> > # then access it by index from the parse queue.
> >
> > If you don't do that, the parser processes can't rebuild the data, as
> > the cache objects aren't pickleable :(
>
> Ah, OK. We must be relying on multiprocessing being implemented with
> fork() to pass the data. That's reasonable, and the indexes make sense
> (but we should still push them after creating the processes to avoid
> the possibility of deadlock).

I changed it to queue the jobs after starting the parser processes;
however, I think the parsers then exit early due to having no work, and
it causes occasional build failures that way around, so the code is
going to need more extensive changes.
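
For what it's worth, one generic way to avoid workers giving up while the
queue is still being fed is to have them exit only on an explicit end-of-work
marker rather than on a get() timeout; a rough sketch of that pattern (not a
description of the actual BitBake change):

import multiprocessing

SENTINEL = None  # one end-of-work marker is queued per worker

def worker(job_queue, result_queue):
    while True:
        # Block until something arrives; a worker only exits when it sees
        # a sentinel, not because the queue happened to be empty briefly.
        job = job_queue.get()
        if job is SENTINEL:
            break
        result_queue.put(job * 2)

if __name__ == "__main__":
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(job_queue, result_queue))
             for _ in range(4)]
    for p in procs:
        p.start()
    for job in range(100):
        job_queue.put(job)
    for _ in procs:
        job_queue.put(SENTINEL)
    results = [result_queue.get() for _ in range(100)]
    for p in procs:
        p.join()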

Cheers,

Richard
Richard Purdie July 5, 2025, 6:27 a.m. UTC | #5
On Fri, 2025-07-04 at 22:42 +0100, Richard Purdie via lists.openembedded.org wrote:
> [...]
>
> I changed it to queue the jobs after starting the parser processes;
> however, I think the parsers then exit early due to having no work, and
> it causes occasional build failures that way around, so the code is
> going to need more extensive changes.

I found that issue in the exception handling and have sent a v2 of the
patch with that fixed.
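
For context on the exception handling: a blocking get() with a timeout on a
multiprocessing.Queue signals "nothing arrived in time" by raising
queue.Empty, which the (ValueError, OSError) handler in the posted version
would not catch. A small standalone demonstration of the behaviour (not the
v2 fix itself):

import multiprocessing
import queue

q = multiprocessing.Queue()
try:
    q.get(True, 0.5)
except (ValueError, OSError):
    print("not reached: a timeout is neither a ValueError nor an OSError")
except queue.Empty:
    print("a timed-out get() raises queue.Empty, which needs its own handler")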

Cheers,

Richard

Patch

diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py
index 91e3ee025ea..e88ad24cf61 100644
--- a/lib/bb/cooker.py
+++ b/lib/bb/cooker.py
@@ -1998,8 +1998,9 @@  class ParsingFailure(Exception):
         Exception.__init__(self, realexception, recipe)
 
 class Parser(multiprocessing.Process):
-    def __init__(self, jobs, results, quit, profile):
+    def __init__(self, jobs, jobid_queue, results, quit, profile):
         self.jobs = jobs
+        self.jobid_queue = jobid_queue
         self.results = results
         self.quit = quit
         multiprocessing.Process.__init__(self)
@@ -2064,12 +2065,14 @@  class Parser(multiprocessing.Process):
                 if self.quit.is_set():
                     break
 
-                job = None
+                jobid = None
                 try:
-                    job = self.jobs.pop()
-                except IndexError:
+                    jobid = self.jobid_queue.get(True, 0.5)
+                except (ValueError, OSError):
                     havejobs = False
-                if job:
+
+                if jobid is not None:
+                    job = self.jobs[jobid]
                     result = self.parse(*job)
                     # Clear the siggen cache after parsing to control memory usage, its huge
                     bb.parse.siggen.postparsing_clean_cache()
@@ -2082,6 +2085,7 @@  class Parser(multiprocessing.Process):
                     except queue.Full:
                         pending.append(result)
         finally:
+            self.jobs.close()
             self.results.close()
             self.results.join_thread()
 
@@ -2134,13 +2138,13 @@  class CookerParser(object):
 
         self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array)
         self.fromcache = set()
-        self.willparse = set()
+        self.willparse = []
         for mc in self.cooker.multiconfigs:
             for filename in self.mcfilelist[mc]:
                 appends = self.cooker.collections[mc].get_file_appends(filename)
                 layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2]
                 if not self.bb_caches[mc].cacheValid(filename, appends):
-                    self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername))
+                    self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername))
                 else:
                     self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername))
 
@@ -2159,22 +2163,25 @@  class CookerParser(object):
     def start(self):
         self.results = self.load_cached()
         self.processes = []
+
         if self.toparse:
             bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
 
+            self.toparse_queue = multiprocessing.Queue(len(self.willparse))
             self.parser_quit = multiprocessing.Event()
             self.result_queue = multiprocessing.Queue()
 
-            def chunkify(lst,n):
-                return [lst[i::n] for i in range(n)]
-            self.jobs = chunkify(list(self.willparse), self.num_processes)
+            for jobid in range(len(self.willparse)):
+                self.toparse_queue.put(jobid)
 
             for i in range(0, self.num_processes):
-                parser = Parser(self.jobs[i], self.result_queue, self.parser_quit, self.cooker.configuration.profile)
+                parser = Parser(self.willparse, self.toparse_queue, self.result_queue, self.parser_quit, self.cooker.configuration.profile)
                 parser.start()
                 self.process_names.append(parser.name)
                 self.processes.append(parser)
 
+            self.toparse_queue.close()
+
             self.results = itertools.chain(self.results, self.parse_generator())
 
     def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"):