bitbake-worker: add header with length of message

Commit Message

Etienne Cordonnier Sept. 21, 2023, 7:56 a.m. UTC
From: Etienne Cordonnier <ecordonnier@snap.com>

The IPC mechanism between runqueue.py and bitbake-worker is currently
not scalable:

The data is sent with the format <tag>pickled-data</tag>, and bitbake-worker
has no information about the size of the message. Therefore, bitbake-worker
calls select() and read() in a loop, and then calls "self.queue.find(b"</" + item + b">")"
for each chunk received.

This does not scale, because queue.find has linear complexity relative to the size of the queue,
and workerdata messages get very big, e.g. for builds which reference a lot of files in SRC_URI.
The number of chunks varies, but on my test system many chunks of 65536 bytes are sent, and each
iteration takes 0.1 seconds, making the transfer of the "workerdata" data very slow (on my test setup,
35 seconds before this fix and 1.5 seconds after it).

This commit adds a 4-byte header after <tag>, so that bitbake-worker knows how many bytes need to be
received and does not need to constantly search the whole queue for </tag>.

Signed-off-by: Etienne Cordonnier <ecordonnier@snap.com>
 bin/bitbake-worker | 34 +++++++++++++++++++++++-----------
 lib/bb/runqueue.py | 34 ++++++++++++++++++++++------------
 2 files changed, 45 insertions(+), 23 deletions(-)
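
The framing described in the commit message can be sketched in isolation as
follows. This is a minimal illustration, not the patch itself: the helper
names encode_frame and try_decode_frame are hypothetical, and unlike the
patch (which still calls find() once the whole frame has arrived) the sketch
locates the closing tag by offset only.

```python
import pickle

HEADER_LEN = 4  # size of the big-endian length field, as in the commit message


def encode_frame(name: str, obj) -> bytes:
    """Build <name> + 4-byte payload length + pickled payload + </name>."""
    tag = name.encode()
    payload = pickle.dumps(obj)
    header = len(payload).to_bytes(HEADER_LEN, "big")
    return b"<" + tag + b">" + header + payload + b"</" + tag + b">"


def try_decode_frame(queue: bytes, name: str):
    """Return (obj, remaining_queue), or None if the frame is incomplete."""
    tag = name.encode()
    opening = b"<" + tag + b">"
    closing = b"</" + tag + b">"
    if not queue.startswith(opening):
        return None
    start = len(opening) + HEADER_LEN
    if len(queue) < start:
        return None  # length header not fully received yet
    payload_len = int.from_bytes(queue[len(opening):start], "big")
    end = start + payload_len
    if len(queue) < end + len(closing):
        return None  # payload or closing tag not fully received yet
    if queue[end:end + len(closing)] != closing:
        raise ValueError("closing tag not found at the expected offset")
    return pickle.loads(queue[start:end]), queue[end + len(closing):]


# Usage: an incomplete frame is left alone, a complete one is consumed.
frame = encode_frame("workerdata", {"umask": "022"})
assert try_decode_frame(frame[:-1], "workerdata") is None
obj, rest = try_decode_frame(frame + b"<quit>", "workerdata")
```

Because the receiver compares lengths instead of scanning, the work done per
received chunk no longer grows with the size of the queue. A 4-byte field
caps the payload at 2**32 - 1 bytes; int.to_bytes raises OverflowError for
anything larger.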


Richard Purdie Sept. 21, 2023, 8:22 a.m. UTC | #1
On Thu, 2023-09-21 at 09:56 +0200, Etienne Cordonnier via
lists.yoctoproject.org wrote:
> From: Etienne Cordonnier <ecordonnier@snap.com>
> The IPC mechanism between runqueue.py and bitbake-worker is currently
> not scalable:
> The data is sent with the format <tag>pickled-data</tag>, and bitbake-worker
> has no information about the size of the message. Therefore, the bitbake-worker
> is calling select() and read() in a loop, and then calling "self.queue.find(b"</" + item + b">")"
> for each chunk received.
> This does not scale, because queue.find has a linear complexity relative to the size of the queue,
> and workerdata messages get very big e.g. for builds which reference a lot of files in SRC_URI.
> The number of chunks varies, but on my test system a lot of chunks of 65536 bytes are sent, and each
> iteration takes 0.1 seconds, making the transfer of the "workerdata" data very slow (on my test setup
> 35 seconds before this fix, and 1.5 seconds after this fix).
> This commit adds a 4 bytes header after <tag>, so that bitbake-worker knows how many bytes need to be
> received, and does not need to constantly search the whole queue for </tag>.
> Signed-off-by: Etienne Cordonnier <ecordonnier@snap.com>
> ---
>  bin/bitbake-worker | 34 +++++++++++++++++++++++-----------
>  lib/bb/runqueue.py | 34 ++++++++++++++++++++++------------
>  2 files changed, 45 insertions(+), 23 deletions(-)

This does look interesting. The IPC mechanism was never intended to
carry large data and it has changed over time in ways it was never
really designed for.

I'm guessing just one of the objects is particularly large? You mention
lots of files in SRC_URI causing the issue; I'd like to better
understand that.

I'm just wondering whether instead of improving the IPC, we could avoid
some of the data in the first place?


Etienne Cordonnier Sept. 21, 2023, 11:09 a.m. UTC | #2
Hi Richard,
We have a use case similar to that of the externalsrc class: we add whole
directories containing many source-code files to SRC_URI of various recipes
(instead of pointing to a tarball or to a git repository like poky recipes
do). For that reason, we transmit 180MB of "workerdata" for one build,
compared with 18MB of workerdata e.g. for core-image-sato. The workerdata is
that big because it contains the signatures of many files (the "sigdata"
field of the workerdata dictionary).
However, with this fix the transmission is quite fast (about 1.5 seconds for
the call to "RunQueue.send_pickled_data(worker, workerdata, "workerdata")",
and could probably be optimized even more by passing the length-header
first in the IPC and doing only one blocking read for all the data).
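
The further optimization mentioned above (length header first, then a single
read for the whole payload) might look like the sketch below. It is only an
illustration under stated assumptions: send_length_first, read_exact and
recv_length_first are hypothetical names, the stream is assumed to be a
blocking binary file object, and the message tags used by bitbake-worker are
left out entirely.

```python
import io
import pickle
import struct


def send_length_first(stream, obj) -> None:
    """Write a 4-byte big-endian length followed by the pickled payload."""
    payload = pickle.dumps(obj)
    stream.write(struct.pack(">I", len(payload)) + payload)


def read_exact(stream, n: int) -> bytes:
    """Read exactly n bytes, looping because read() may return fewer."""
    buf = bytearray()
    while len(buf) < n:
        chunk = stream.read(n - len(buf))
        if not chunk:
            raise EOFError("stream closed before the message was complete")
        buf.extend(chunk)
    return bytes(buf)


def recv_length_first(stream):
    """Read the length header, then the payload, and unpickle it."""
    (payload_len,) = struct.unpack(">I", read_exact(stream, 4))
    return pickle.loads(read_exact(stream, payload_len))


# Usage with an in-memory stream standing in for the worker's stdin pipe.
pipe = io.BytesIO()
send_length_first(pipe, {"sigdata": ["a", "b"]})
pipe.seek(0)
message = recv_length_first(pipe)
```

With the length known up front, the receiver never has to search the buffer
for a delimiter at all.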

Here is an analysis of the workerdata payload in my test setup (I dumped
workerdata to a file, then used awk '{ print length }' file.txt to count
each line):
'taskdeps': 16,520,856
'fakerootenv': 10,318,907
'fakerootdirs': 884,364
'fakerootnoenv': 569,865
'sigdata': 160,786,962
'logdefaultlevel': 23
'build_verbose_shell': 30
'build_verbose_stdout': 31
'logdefaultdomain': 24
'prhost': 16
'buildname': 31
'date': 20
'time': 18
'hashservaddr': 22
'umask': 17
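
A comparable per-key breakdown can also be taken from the pickled form
directly, without dumping to a text file. The sketch below is an assumption
about how one might measure it, not the method used for the numbers above;
pickled_sizes is a hypothetical helper and the sample dictionary is made up.

```python
import pickle


def pickled_sizes(data: dict) -> list[tuple[str, int]]:
    """Return (key, pickled size in bytes) pairs, largest first."""
    sizes = {key: len(pickle.dumps(value)) for key, value in data.items()}
    return sorted(sizes.items(), key=lambda item: item[1], reverse=True)


# Usage with a small stand-in for the real workerdata dictionary.
sample = {"sigdata": {"file%d" % i: "0" * 64 for i in range(1000)}, "umask": "022"}
for key, size in pickled_sizes(sample):
    print("%s: %d" % (key, size))
```

The per-key sizes will not add up exactly to the size of the whole pickled
dictionary, since each value is pickled on its own here and objects shared
between keys are counted more than once.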

Reducing the size of the data is maybe an option (I don't know enough about
bitbake architecture to judge whether it's possible or not), but I would
improve the IPC anyway, since there is no drawback in making it more


diff --git a/bin/bitbake-worker b/bin/bitbake-worker
index 451e6926..a4e78991 100755
--- a/bin/bitbake-worker
+++ b/bin/bitbake-worker
@@ -433,18 +433,30 @@  class BitbakeWorker(object):
                 while self.process_waitpid():
     def handle_item(self, item, func):
-        if self.queue.startswith(b"<" + item + b">"):
-            index = self.queue.find(b"</" + item + b">")
-            while index != -1:
-                try:
-                    func(self.queue[(len(item) + 2):index])
-                except pickle.UnpicklingError:
-                    workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
-                    raise
-                self.queue = self.queue[(index + len(item) + 3):]
-                index = self.queue.find(b"</" + item + b">")
+        opening_tag = b"<" + item + b">"
+        if not self.queue.startswith(opening_tag):
+            return
+        tag_len = len(opening_tag)
+        if len(self.queue) < tag_len + 4:
+            # we need to receive more data
+            return
+        header = self.queue[tag_len:tag_len + 4]
+        payload_len = int.from_bytes(header, 'big')
+        # closing tag has length (tag_len + 1)
+        if len(self.queue) < tag_len * 2 + 1 + payload_len:
+            # we need to receive more data
+            return
+        index = self.queue.find(b"</" + item + b">")
+        if index != -1:
+            try:
+                func(self.queue[(tag_len + 4):index])
+            except pickle.UnpicklingError:
+                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
+                raise
+            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
     def handle_cookercfg(self, data):
         self.cookercfg = pickle.loads(data)
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index c88d7129..9afb899c 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1318,6 +1318,16 @@  class RunQueue:
         self.worker = {}
         self.fakeworker = {}
+    @staticmethod
+    def send_pickled_data(worker, data, name):
+        msg = bytearray()
+        msg.extend(b"<" + name.encode() + b">")
+        pickled_data = pickle.dumps(data)
+        msg.extend(len(pickled_data).to_bytes(4, 'big'))
+        msg.extend(pickled_data)
+        msg.extend(b"</" + name.encode() + b">")
+        worker.stdin.write(msg)
     def _start_worker(self, mc, fakeroot = False, rqexec = None):
         logger.debug("Starting bitbake-worker")
         magic = "decafbad"
@@ -1353,9 +1363,9 @@  class RunQueue:
             "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
-        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
-        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
-        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
+        RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
+        RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
+        RunQueue.send_pickled_data(worker, workerdata, "workerdata")
         return RunQueueWorker(worker, workerpipe)
@@ -1365,7 +1375,7 @@  class RunQueue:
         logger.debug("Teardown for bitbake-worker")
-           worker.process.stdin.write(b"<quit></quit>")
+           RunQueue.send_pickled_data(worker.process, b"", "quit")
         except IOError:
@@ -1892,14 +1902,14 @@  class RunQueueExecute:
     def finish_now(self):
         for mc in self.rq.worker:
-                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
             except IOError:
                 # worker must have died?
         for mc in self.rq.fakeworker:
-                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
             except IOError:
                 # worker must have died?
@@ -2194,10 +2204,10 @@  class RunQueueExecute:
             if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                 if not mc in self.rq.fakeworker:
                     self.rq.start_fakeworker(self, mc)
-                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
-                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
             self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2295,10 +2305,10 @@  class RunQueueExecute:
                         self.rq.state = runQueueFailed
                         return True
-                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
-                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
             self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2500,9 +2510,9 @@  class RunQueueExecute:
         if changed:
             for mc in self.rq.worker:
-                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
             for mc in self.rq.fakeworker:
-                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
             hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))