Message ID | 20211211182524.1807371-1-JPEWhacker@gmail.com |
---|---|
State | New |
Headers | show |
Series | [bitbake-devel,RFC] bitbake-worker: Switch to use asyncio | expand |
> -----Original Message----- > From: bitbake-devel@lists.openembedded.org <bitbake- > devel@lists.openembedded.org> On Behalf Of Joshua Watt > Sent: den 11 december 2021 19:25 > To: bitbake-devel@lists.openembedded.org > Cc: richard.purdie@linuxfoundation.org; ross@burtonini.com; kergoth@gmail.com; Joshua Watt <JPEWhacker@gmail.com> > Subject: [bitbake-devel][RFC] bitbake-worker: Switch to use asyncio > > Switches bitbake-worker to use asyncio. This is a good canidate for > initial modernization using asyncio because it is self-contained will a Change "will" to "with". //Peter > well defined interface to the bitbake server process. > > Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> > --- > bitbake/bin/bitbake-worker | 554 +++++++++++++++++++------------------ > 1 file changed, 284 insertions(+), 270 deletions(-) > > diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker > index bf96207edc..d5cc4fa248 100755 > --- a/bitbake/bin/bitbake-worker > +++ b/bitbake/bin/bitbake-worker > @@ -11,16 +11,14 @@ sys.path.insert(0, > os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), ' > from bb import fetch2 > import logging > import bb > -import select > import errno > import signal > import pickle > import traceback > -import queue > import shlex > import subprocess > from multiprocessing import Lock > -from threading import Thread > +import asyncio > > if sys.getfilesystemencoding() != "utf-8": > sys.exit("Please use a locale setting which supports UTF-8 (such as > LANG=en_US.UTF-8).\nPython can't change the filesystem locale after > loading so we need a UTF-8 when Python starts or things won't work.") > @@ -53,14 +51,15 @@ except: > > logger = logging.getLogger("BitBake") > > -worker_pipe = sys.stdout.fileno() > -bb.utils.nonblockingfd(worker_pipe) > -# Need to guard against multiprocessing being used in child processes > -# and multiple processes trying to write to the parent at the same time > -worker_pipe_lock = None > > -handler = 
bb.event.LogHandler() > -logger.addHandler(handler) > +async def connect_stdout(): > + loop = asyncio.get_event_loop() > + w_transport, w_protocol = await > loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout) > + writer = asyncio.StreamWriter(w_transport, w_protocol, None, loop) > + return writer > + > +log_handler = bb.event.LogHandler() > +logger.addHandler(log_handler) > > if 0: > # Code to write out a log file of all events passing through the > worker > @@ -71,73 +70,270 @@ if 0: > consolelog.setFormatter(conlogformat) > logger.addHandler(consolelog) > > -worker_queue = queue.Queue() > - > -def worker_fire(event, d): > - data = b"<event>" + pickle.dumps(event) + b"</event>" > - worker_fire_prepickled(data) > +async def read_messages(fd, handlers): > + buf = b"" > + event = asyncio.Event() > + done = False > > -def worker_fire_prepickled(event): > - global worker_queue > + def read_data(): > + nonlocal buf > + nonlocal fd > + nonlocal event > + nonlocal done > > - worker_queue.put(event) > + try: > + data = os.read(fd, 102400) > + except (OSError, IOError) as e: > + if e.errno != errno.EAGAIN: > + raise > + return > > -# > -# We can end up with write contention with the cooker, it can be trying > to send commands > -# and we can be trying to send event data back. Therefore use a separate > thread for writing > -# back data to cooker. 
> -# > -worker_thread_exit = False > + if len(data) == 0: > + done = True > + else: > + buf += data > + event.set() > > -def worker_flush(worker_queue): > - worker_queue_int = b"" > - global worker_pipe, worker_thread_exit > + asyncio.get_event_loop().add_reader(fd, read_data) > > - while True: > + try: > + while not done: > + for name, handler in handlers.items(): > + prefix = b"<" + name + b">" > + if buf.startswith(prefix): > + suffix = b"</" + name + b">" > + index = buf.find(suffix) > + if index != -1: > + try: > + workerlog_write("%d: Handling %r\n" % (fd, > name)) > + await handler(buf[len(prefix):index]) > + except pickle.UnpicklingError: > + workerlog_write("Unable to unpickle data: > %s\n" % ":".join("{:02x}".format(c) for c in buf)) > + raise > + > + buf = buf[index + len(suffix):] > + break > + # TODO: The old code would keep looking for an > ending > + # tag in a loop, so that a stream like > + # <A>foo</A>bar</A> was valid. This doesn't > appear to > + # be necessary anymore? > + #index = self.buf.find(b"</" + item + b">") > + else: > + # Nothing found in the buffer. Wait for more data > + await event.wait() > + event.clear() > + finally: > + asyncio.get_event_loop().remove_reader(fd) > + > +class ChildHandler(object): > + def __init__(self, writer, task, pid, pipeinfd, pipeoutfd): > + self.task = task > + self.writer = writer > + self.pid = pid > + self.pipeinfd = pipeinfd > + if pipeoutfd >= 0: > + os.close(pipeoutfd) > + > + self.done_event = asyncio.Event() > + self.loop = asyncio.get_running_loop() > + > + asyncio.get_child_watcher().add_child_handler(self.pid, > self._child_watcher_callback) > + > + def _child_watcher_callback(self, pid, status): > + # The callback may be called in a thread, so call_soon_threadsafe > is > + # recommended to get back to the main loop. 
> + self.loop.call_soon_threadsafe(self.child_exited, pid, status) > + > + async def main_loop(self): > + bb.utils.nonblockingfd(self.pipeinfd) > + await read_messages(self.pipeinfd, { > + b"event": self.handle_event, > + }) > + os.close(self.pipeinfd) > + await self.done_event.wait() > + > + async def handle_event(self, data): > + self.writer.write(b"<event>") > + self.writer.write(data) > + self.writer.write(b"</event>") > + await self.writer.drain() > + > + def child_exited(self, pid, status): > try: > - worker_queue_int = worker_queue_int + worker_queue.get(True, > 1) > - except queue.Empty: > - pass > - while (worker_queue_int or not worker_queue.empty()): > + if pid != self.pid: > + return > + > + workerlog_write("Exit code of %d for pid %d (fd %d)\n" % > (status, pid, self.pipeinfd)) > + > + asyncio.get_child_watcher().remove_child_handler(self.pid) > + > + if os.WIFEXITED(status): > + status = os.WEXITSTATUS(status) > + elif os.WIFSIGNALED(status): > + # Per shell conventions for $?, when a process exits due > to > + # a signal, we return an exit code of 128 + SIGNUM > + status = 128 + os.WTERMSIG(status) > + > + self.writer.write(b"<exitcode>") > + self.writer.write(pickle.dumps((self.task, status))) > + self.writer.write(b"</exitcode>") > + > + self.done_event.set() > + except Exception as e: > + workerlog_write("%s\n%s\n" % (traceback.format_exc(), e)) > + raise e > + > + def close(self): > + if not self.done_event.is_set(): > try: > - (_, ready, _) = select.select([], [worker_pipe], [], 1) > - if not worker_queue.empty(): > - worker_queue_int = worker_queue_int + > worker_queue.get() > - written = os.write(worker_pipe, worker_queue_int) > - worker_queue_int = worker_queue_int[written:] > - except (IOError, OSError) as e: > - if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: > - raise > - if worker_thread_exit and worker_queue.empty() and not > worker_queue_int: > - return > + os.kill(-self.pid, signal.SIGTERM) > + except OSError: > + pass > + > + > 
+class MainHandler(object): > + def __init__(self, writer): > + self.writer = writer > + self.cookercfg = None > + self.databuilder = None > + self.data = None > + self.extraconfigdata = None > + self.children = [] > + self.child_tasks = [] > > -worker_thread = Thread(target=worker_flush, args=(worker_queue,)) > -worker_thread.start() > + async def main_loop(self): > + loop = asyncio.get_running_loop() > + fd = sys.stdin.fileno() > + bb.utils.nonblockingfd(fd) > > -def worker_child_fire(event, d): > - global worker_pipe > - global worker_pipe_lock > + loop.add_signal_handler(signal.SIGTERM, self.signal_handler) > + loop.add_signal_handler(signal.SIGHUP, self.signal_handler) > > - data = b"<event>" + pickle.dumps(event) + b"</event>" > - try: > - worker_pipe_lock.acquire() > - while(len(data)): > - written = worker_pipe.write(data) > - data = data[written:] > - worker_pipe_lock.release() > - except IOError: > - sigterm_handler(None, None) > - raise > + try: > + await read_messages(fd, { > + b"cookerconfig": self.handle_cookercfg, > + b"extraconfigdata": self.handle_extraconfigdata, > + b"workerdata": self.handle_workerdata, > + b"newtaskhashes": self.handle_newtaskhashes, > + b"runtask": self.handle_runtask, > + b"finishnow": self.handle_finishnow, > + b"ping": self.handle_ping, > + b"quit": self.handle_quit, > + } > + ) > + finally: > + loop.remove_signal_handler(signal.SIGTERM) > + loop.remove_signal_handler(signal.SIGHUP) > + > + def signal_handler(self, signum, stackframe): > + loop = asyncio.get_running_loop() > + > + if signum == signal.SIGTERM: > + bb.warn("Worker received SIGTERM, shutting down...") > + elif signum == signal.SIGHUP: > + bb.warn("Worker received SIGHUP, shutting down...") > + > + self.handle_finishnow(None) > + loop.remove_signal_handler(signal.SIGTERM) > + os.kill(os.getpid(), signal.SIGTERM) > + > + async def handle_cookercfg(self, data): > + self.cookercfg = pickle.loads(data) > + self.databuilder = > 
bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) > + self.databuilder.parseBaseConfiguration() > + self.data = self.databuilder.data > + > + async def handle_extraconfigdata(self, data): > + self.extraconfigdata = pickle.loads(data) > + > + async def handle_workerdata(self, data): > + self.workerdata = pickle.loads(data) > + bb.build.verboseShellLogging = > self.workerdata["build_verbose_shell"] > + bb.build.verboseStdoutLogging = > self.workerdata["build_verbose_stdout"] > + bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"] > + bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] > + for mc in self.databuilder.mcdata: > + self.databuilder.mcdata[mc].setVar("PRSERV_HOST", > self.workerdata["prhost"]) > + self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", > self.workerdata["hashservaddr"]) > + > + async def handle_newtaskhashes(self, data): > + self.workerdata["newhashes"] = pickle.loads(data) > + > + async def handle_ping(self, _): > + logger.warning("Pong from bitbake-worker!") > + > + async def handle_quit(self, data): > + global normalexit > + normalexit = True > + sys.exit(0) > + > + async def run_child(self, child): > + await child.main_loop() > + self.children.remove(child) > + self.child_tasks.remove(asyncio.current_task()) > + > + async def handle_runtask(self, data): > + fn, task, taskname, taskhash, unihash, quieterrors, appends, > taskdepdata, dry_run_exec = pickle.loads(data) > + workerlog_write("Handling runtask %s %s %s\n" % (task, fn, > taskname)) > + > + pid, pipeinfd, pipeoutfd = fork_off_task(self.cookercfg, > self.data, self.databuilder, self.workerdata, fn, task, taskname, > taskhash, unihash, appends, taskdepdata, self.extraconfigdata, > quieterrors, dry_run_exec) > + > + child = ChildHandler(self.writer, task, pid, pipeinfd, pipeoutfd) > + self.children.append(child) > + > + t = asyncio.ensure_future(self.run_child(child)) > + self.child_tasks.append(t) > + > + async def handle_finishnow(self, _=None): 
> + for c in self.children: > + c.close() > + > + workerlog_write("Waiting for %d child tasks: %s\n" % > (len(self.child_tasks), > + " ".join(str(c.pid) for c in self.children))) > + > + # Wait for all outstanding children to exit > + await asyncio.gather(*self.child_tasks) > + > +async def main(): > + writer = await connect_stdout() > + worker_queue = [] > + > + def worker_fire(event, d): > + nonlocal worker_queue > + > + async def flush_worker_queue(): > + nonlocal writer > + nonlocal worker_queue > + > + if worker_queue: > + for m in worker_queue: > + writer.write(m) > + worker_queue = [] > + await writer.drain() > + > + # To ensure the messages are sent out in the order they are > received, > + # put them in a list then schedule a task to write them out > + data = b"<event>" + pickle.dumps(event) + b"</event>" > + worker_queue.append(data) > + asyncio.ensure_future(flush_worker_queue()) > > -bb.event.worker_fire = worker_fire > + bb.event.worker_fire = worker_fire > + > + handler = MainHandler(writer) > + > + await asyncio.gather(handler.main_loop()) > + > +normalexit = False > > lf = None > #lf = open("/tmp/workercommandlog", "w+") > def workerlog_write(msg): > + global lf > if lf: > lf.write(msg) > lf.flush() > > + > def sigterm_handler(signum, frame): > signal.signal(signal.SIGTERM, signal.SIG_DFL) > os.killpg(0, signal.SIGTERM) > @@ -191,9 +387,7 @@ def fork_off_task(cfg, data, databuilder, workerdata, > fn, task, taskname, taskha > sys.stderr.flush() > > try: > - pipein, pipeout = os.pipe() > - pipein = os.fdopen(pipein, 'rb', 4096) > - pipeout = os.fdopen(pipeout, 'wb', 0) > + pipeinfd, pipeoutfd = os.pipe() > pid = os.fork() > except OSError as e: > logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror)) > @@ -201,18 +395,30 @@ def fork_off_task(cfg, data, databuilder, > workerdata, fn, task, taskname, taskha > > if pid == 0: > def child(): > - global worker_pipe > - global worker_pipe_lock > - pipein.close() > + os.close(pipeinfd) > > 
bb.utils.signal_on_parent_exit("SIGTERM") > > + pipeout = os.fdopen(pipeoutfd, 'wb', 0) > + pipelock = Lock() > + def worker_child_fire(event, d): > + nonlocal pipeout > + nonlocal pipelock > + > + data = b"<event>" + pickle.dumps(event) + b"</event>" > + try: > + with pipelock: > + while(len(data)): > + written = pipeout.write(data) > + data = data[written:] > + except IOError: > + sigterm_handler(None, None) > + raise > + > # Save out the PID so that the event can include it the > # events > bb.event.worker_pid = os.getpid() > bb.event.worker_fire = worker_child_fire > - worker_pipe = pipeout > - worker_pipe_lock = Lock() > > # Make the child the process group leader and ensure no > # child process will be controlled by the current terminal > @@ -315,225 +521,33 @@ def fork_off_task(cfg, data, databuilder, > workerdata, fn, task, taskname, taskha > else: > os.environ[key] = value > > - return pid, pipein, pipeout > - > -class runQueueWorkerPipe(): > - """ > - Abstraction for a pipe between a worker thread and the worker server > - """ > - def __init__(self, pipein, pipeout): > - self.input = pipein > - if pipeout: > - pipeout.close() > - bb.utils.nonblockingfd(self.input) > - self.queue = b"" > - > - def read(self): > - start = len(self.queue) > - try: > - self.queue = self.queue + (self.input.read(102400) or b"") > - except (OSError, IOError) as e: > - if e.errno != errno.EAGAIN: > - raise > - > - end = len(self.queue) > - index = self.queue.find(b"</event>") > - while index != -1: > - msg = self.queue[:index+8] > - assert msg.startswith(b"<event>") and msg.count(b"<event>") > == 1 > - worker_fire_prepickled(msg) > - self.queue = self.queue[index+8:] > - index = self.queue.find(b"</event>") > - return (end > start) > - > - def close(self): > - while self.read(): > - continue > - if len(self.queue) > 0: > - print("Warning, worker child left partial message: %s" % > self.queue) > - self.input.close() > - > -normalexit = False > + return pid, pipeinfd, pipeoutfd > 
> -class BitbakeWorker(object): > - def __init__(self, din): > - self.input = din > - bb.utils.nonblockingfd(self.input) > - self.queue = b"" > - self.cookercfg = None > - self.databuilder = None > - self.data = None > - self.extraconfigdata = None > - self.build_pids = {} > - self.build_pipes = {} > - > - signal.signal(signal.SIGTERM, self.sigterm_exception) > - # Let SIGHUP exit as SIGTERM > - signal.signal(signal.SIGHUP, self.sigterm_exception) > - if "beef" in sys.argv[1]: > - bb.utils.set_process_name("Worker (Fakeroot)") > - else: > - bb.utils.set_process_name("Worker") > - > - def sigterm_exception(self, signum, stackframe): > - if signum == signal.SIGTERM: > - bb.warn("Worker received SIGTERM, shutting down...") > - elif signum == signal.SIGHUP: > - bb.warn("Worker received SIGHUP, shutting down...") > - self.handle_finishnow(None) > - signal.signal(signal.SIGTERM, signal.SIG_DFL) > - os.kill(os.getpid(), signal.SIGTERM) > - > - def serve(self): > - while True: > - (ready, _, _) = select.select([self.input] + [i.input for i > in self.build_pipes.values()], [] , [], 1) > - if self.input in ready: > - try: > - r = self.input.read() > - if len(r) == 0: > - # EOF on pipe, server must have terminated > - self.sigterm_exception(signal.SIGTERM, None) > - self.queue = self.queue + r > - except (OSError, IOError): > - pass > - if len(self.queue): > - self.handle_item(b"cookerconfig", self.handle_cookercfg) > - self.handle_item(b"extraconfigdata", > self.handle_extraconfigdata) > - self.handle_item(b"workerdata", self.handle_workerdata) > - self.handle_item(b"newtaskhashes", > self.handle_newtaskhashes) > - self.handle_item(b"runtask", self.handle_runtask) > - self.handle_item(b"finishnow", self.handle_finishnow) > - self.handle_item(b"ping", self.handle_ping) > - self.handle_item(b"quit", self.handle_quit) > - > - for pipe in self.build_pipes: > - if self.build_pipes[pipe].input in ready: > - self.build_pipes[pipe].read() > - if len(self.build_pids): > - while 
self.process_waitpid(): > - continue > - > - > - def handle_item(self, item, func): > - if self.queue.startswith(b"<" + item + b">"): > - index = self.queue.find(b"</" + item + b">") > - while index != -1: > - try: > - func(self.queue[(len(item) + 2):index]) > - except pickle.UnpicklingError: > - workerlog_write("Unable to unpickle data: %s\n" % > ":".join("{:02x}".format(c) for c in self.queue)) > - raise > - self.queue = self.queue[(index + len(item) + 3):] > - index = self.queue.find(b"</" + item + b">") > - > - def handle_cookercfg(self, data): > - self.cookercfg = pickle.loads(data) > - self.databuilder = > bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) > - self.databuilder.parseBaseConfiguration() > - self.data = self.databuilder.data > - > - def handle_extraconfigdata(self, data): > - self.extraconfigdata = pickle.loads(data) > - > - def handle_workerdata(self, data): > - self.workerdata = pickle.loads(data) > - bb.build.verboseShellLogging = > self.workerdata["build_verbose_shell"] > - bb.build.verboseStdoutLogging = > self.workerdata["build_verbose_stdout"] > - bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"] > - bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] > - for mc in self.databuilder.mcdata: > - self.databuilder.mcdata[mc].setVar("PRSERV_HOST", > self.workerdata["prhost"]) > - self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", > self.workerdata["hashservaddr"]) > - > - def handle_newtaskhashes(self, data): > - self.workerdata["newhashes"] = pickle.loads(data) > - > - def handle_ping(self, _): > - workerlog_write("Handling ping\n") > - > - logger.warning("Pong from bitbake-worker!") > - > - def handle_quit(self, data): > - workerlog_write("Handling quit\n") > - > - global normalexit > - normalexit = True > - sys.exit(0) > - > - def handle_runtask(self, data): > - fn, task, taskname, taskhash, unihash, quieterrors, appends, > taskdepdata, dry_run_exec = pickle.loads(data) > - workerlog_write("Handling 
runtask %s %s %s\n" % (task, fn, > taskname)) > - > - pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, > self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, > appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec) > - > - self.build_pids[pid] = task > - self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) > - > - def process_waitpid(self): > - """ > - Return none is there are no processes awaiting result collection, > otherwise > - collect the process exit codes and close the information pipe. > - """ > - try: > - pid, status = os.waitpid(-1, os.WNOHANG) > - if pid == 0 or os.WIFSTOPPED(status): > - return False > - except OSError: > - return False > - > - workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) > - > - if os.WIFEXITED(status): > - status = os.WEXITSTATUS(status) > - elif os.WIFSIGNALED(status): > - # Per shell conventions for $?, when a process exits due to > - # a signal, we return an exit code of 128 + SIGNUM > - status = 128 + os.WTERMSIG(status) > - > - task = self.build_pids[pid] > - del self.build_pids[pid] > - > - self.build_pipes[pid].close() > - del self.build_pipes[pid] > - > - worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, > status)) + b"</exitcode>") > - > - return True > +try: > + if "beef" in sys.argv[1]: > + bb.utils.set_process_name("Worker (Fakeroot)") > + else: > + bb.utils.set_process_name("Worker") > > - def handle_finishnow(self, _): > - if self.build_pids: > - logger.info("Sending SIGTERM to remaining %s tasks", > len(self.build_pids)) > - for k, v in iter(self.build_pids.items()): > - try: > - os.kill(-k, signal.SIGTERM) > - os.waitpid(-1, 0) > - except: > - pass > - for pipe in self.build_pipes: > - self.build_pipes[pipe].read() > + loop = asyncio.get_event_loop() > > -try: > - worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) > if not profiling: > - worker.serve() > + loop.run_until_complete(main()) > else: > profname = 
"profile-worker.log" > prof = profile.Profile() > try: > - profile.Profile.runcall(prof, worker.serve) > + profile.Profile.runcall(prof, loop.run_until_complete, > main()) > finally: > prof.dump_stats(profname) > bb.utils.process_profilelog(profname) > -except BaseException as e: > +except Exception as e: > + workerlog_write("%s\n%s\n" % (traceback.format_exc(), e)) > if not normalexit: > - import traceback > sys.stderr.write(traceback.format_exc()) > sys.stderr.write(str(e)) > -finally: > - worker_thread_exit = True > - worker_thread.join() > > -workerlog_write("exiting") > +workerlog_write("exiting\n") > if not normalexit: > sys.exit(1) > sys.exit(0) > -- > 2.33.0
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index bf96207edc..d5cc4fa248 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker @@ -11,16 +11,14 @@ sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), ' from bb import fetch2 import logging import bb -import select import errno import signal import pickle import traceback -import queue import shlex import subprocess from multiprocessing import Lock -from threading import Thread +import asyncio if sys.getfilesystemencoding() != "utf-8": sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.") @@ -53,14 +51,15 @@ except: logger = logging.getLogger("BitBake") -worker_pipe = sys.stdout.fileno() -bb.utils.nonblockingfd(worker_pipe) -# Need to guard against multiprocessing being used in child processes -# and multiple processes trying to write to the parent at the same time -worker_pipe_lock = None -handler = bb.event.LogHandler() -logger.addHandler(handler) +async def connect_stdout(): + loop = asyncio.get_event_loop() + w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout) + writer = asyncio.StreamWriter(w_transport, w_protocol, None, loop) + return writer + +log_handler = bb.event.LogHandler() +logger.addHandler(log_handler) if 0: # Code to write out a log file of all events passing through the worker @@ -71,73 +70,270 @@ if 0: consolelog.setFormatter(conlogformat) logger.addHandler(consolelog) -worker_queue = queue.Queue() - -def worker_fire(event, d): - data = b"<event>" + pickle.dumps(event) + b"</event>" - worker_fire_prepickled(data) +async def read_messages(fd, handlers): + buf = b"" + event = asyncio.Event() + done = False -def worker_fire_prepickled(event): - global worker_queue + def read_data(): + nonlocal buf + nonlocal fd + nonlocal event + 
nonlocal done - worker_queue.put(event) + try: + data = os.read(fd, 102400) + except (OSError, IOError) as e: + if e.errno != errno.EAGAIN: + raise + return -# -# We can end up with write contention with the cooker, it can be trying to send commands -# and we can be trying to send event data back. Therefore use a separate thread for writing -# back data to cooker. -# -worker_thread_exit = False + if len(data) == 0: + done = True + else: + buf += data + event.set() -def worker_flush(worker_queue): - worker_queue_int = b"" - global worker_pipe, worker_thread_exit + asyncio.get_event_loop().add_reader(fd, read_data) - while True: + try: + while not done: + for name, handler in handlers.items(): + prefix = b"<" + name + b">" + if buf.startswith(prefix): + suffix = b"</" + name + b">" + index = buf.find(suffix) + if index != -1: + try: + workerlog_write("%d: Handling %r\n" % (fd, name)) + await handler(buf[len(prefix):index]) + except pickle.UnpicklingError: + workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in buf)) + raise + + buf = buf[index + len(suffix):] + break + # TODO: The old code would keep looking for an ending + # tag in a loop, so that a stream like + # <A>foo</A>bar</A> was valid. This doesn't appear to + # be necessary anymore? + #index = self.buf.find(b"</" + item + b">") + else: + # Nothing found in the buffer. 
Wait for more data + await event.wait() + event.clear() + finally: + asyncio.get_event_loop().remove_reader(fd) + +class ChildHandler(object): + def __init__(self, writer, task, pid, pipeinfd, pipeoutfd): + self.task = task + self.writer = writer + self.pid = pid + self.pipeinfd = pipeinfd + if pipeoutfd >= 0: + os.close(pipeoutfd) + + self.done_event = asyncio.Event() + self.loop = asyncio.get_running_loop() + + asyncio.get_child_watcher().add_child_handler(self.pid, self._child_watcher_callback) + + def _child_watcher_callback(self, pid, status): + # The callback may be called in a thread, so call_soon_threadsafe is + # recommended to get back to the main loop. + self.loop.call_soon_threadsafe(self.child_exited, pid, status) + + async def main_loop(self): + bb.utils.nonblockingfd(self.pipeinfd) + await read_messages(self.pipeinfd, { + b"event": self.handle_event, + }) + os.close(self.pipeinfd) + await self.done_event.wait() + + async def handle_event(self, data): + self.writer.write(b"<event>") + self.writer.write(data) + self.writer.write(b"</event>") + await self.writer.drain() + + def child_exited(self, pid, status): try: - worker_queue_int = worker_queue_int + worker_queue.get(True, 1) - except queue.Empty: - pass - while (worker_queue_int or not worker_queue.empty()): + if pid != self.pid: + return + + workerlog_write("Exit code of %d for pid %d (fd %d)\n" % (status, pid, self.pipeinfd)) + + asyncio.get_child_watcher().remove_child_handler(self.pid) + + if os.WIFEXITED(status): + status = os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + # Per shell conventions for $?, when a process exits due to + # a signal, we return an exit code of 128 + SIGNUM + status = 128 + os.WTERMSIG(status) + + self.writer.write(b"<exitcode>") + self.writer.write(pickle.dumps((self.task, status))) + self.writer.write(b"</exitcode>") + + self.done_event.set() + except Exception as e: + workerlog_write("%s\n%s\n" % (traceback.format_exc(), e)) + raise e + + def close(self): + 
if not self.done_event.is_set(): try: - (_, ready, _) = select.select([], [worker_pipe], [], 1) - if not worker_queue.empty(): - worker_queue_int = worker_queue_int + worker_queue.get() - written = os.write(worker_pipe, worker_queue_int) - worker_queue_int = worker_queue_int[written:] - except (IOError, OSError) as e: - if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: - raise - if worker_thread_exit and worker_queue.empty() and not worker_queue_int: - return + os.kill(-self.pid, signal.SIGTERM) + except OSError: + pass + + +class MainHandler(object): + def __init__(self, writer): + self.writer = writer + self.cookercfg = None + self.databuilder = None + self.data = None + self.extraconfigdata = None + self.children = [] + self.child_tasks = [] -worker_thread = Thread(target=worker_flush, args=(worker_queue,)) -worker_thread.start() + async def main_loop(self): + loop = asyncio.get_running_loop() + fd = sys.stdin.fileno() + bb.utils.nonblockingfd(fd) -def worker_child_fire(event, d): - global worker_pipe - global worker_pipe_lock + loop.add_signal_handler(signal.SIGTERM, self.signal_handler) + loop.add_signal_handler(signal.SIGHUP, self.signal_handler) - data = b"<event>" + pickle.dumps(event) + b"</event>" - try: - worker_pipe_lock.acquire() - while(len(data)): - written = worker_pipe.write(data) - data = data[written:] - worker_pipe_lock.release() - except IOError: - sigterm_handler(None, None) - raise + try: + await read_messages(fd, { + b"cookerconfig": self.handle_cookercfg, + b"extraconfigdata": self.handle_extraconfigdata, + b"workerdata": self.handle_workerdata, + b"newtaskhashes": self.handle_newtaskhashes, + b"runtask": self.handle_runtask, + b"finishnow": self.handle_finishnow, + b"ping": self.handle_ping, + b"quit": self.handle_quit, + } + ) + finally: + loop.remove_signal_handler(signal.SIGTERM) + loop.remove_signal_handler(signal.SIGHUP) + + def signal_handler(self, signum, stackframe): + loop = asyncio.get_running_loop() + + if signum == 
signal.SIGTERM: + bb.warn("Worker received SIGTERM, shutting down...") + elif signum == signal.SIGHUP: + bb.warn("Worker received SIGHUP, shutting down...") + + self.handle_finishnow(None) + loop.remove_signal_handler(signal.SIGTERM) + os.kill(os.getpid(), signal.SIGTERM) + + async def handle_cookercfg(self, data): + self.cookercfg = pickle.loads(data) + self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) + self.databuilder.parseBaseConfiguration() + self.data = self.databuilder.data + + async def handle_extraconfigdata(self, data): + self.extraconfigdata = pickle.loads(data) + + async def handle_workerdata(self, data): + self.workerdata = pickle.loads(data) + bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"] + bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"] + bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"] + bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] + for mc in self.databuilder.mcdata: + self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) + self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"]) + + async def handle_newtaskhashes(self, data): + self.workerdata["newhashes"] = pickle.loads(data) + + async def handle_ping(self, _): + logger.warning("Pong from bitbake-worker!") + + async def handle_quit(self, data): + global normalexit + normalexit = True + sys.exit(0) + + async def run_child(self, child): + await child.main_loop() + self.children.remove(child) + self.child_tasks.remove(asyncio.current_task()) + + async def handle_runtask(self, data): + fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data) + workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) + + pid, pipeinfd, pipeoutfd = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, 
self.extraconfigdata, quieterrors, dry_run_exec) + + child = ChildHandler(self.writer, task, pid, pipeinfd, pipeoutfd) + self.children.append(child) + + t = asyncio.ensure_future(self.run_child(child)) + self.child_tasks.append(t) + + async def handle_finishnow(self, _=None): + for c in self.children: + c.close() + + workerlog_write("Waiting for %d child tasks: %s\n" % (len(self.child_tasks), + " ".join(str(c.pid) for c in self.children))) + + # Wait for all outstanding children to exit + await asyncio.gather(*self.child_tasks) + +async def main(): + writer = await connect_stdout() + worker_queue = [] + + def worker_fire(event, d): + nonlocal worker_queue + + async def flush_worker_queue(): + nonlocal writer + nonlocal worker_queue + + if worker_queue: + for m in worker_queue: + writer.write(m) + worker_queue = [] + await writer.drain() + + # To ensure the messages are sent out in the order they are received, + # put them in a list then schedule a task to write them out + data = b"<event>" + pickle.dumps(event) + b"</event>" + worker_queue.append(data) + asyncio.ensure_future(flush_worker_queue()) -bb.event.worker_fire = worker_fire + bb.event.worker_fire = worker_fire + + handler = MainHandler(writer) + + await asyncio.gather(handler.main_loop()) + +normalexit = False lf = None #lf = open("/tmp/workercommandlog", "w+") def workerlog_write(msg): + global lf if lf: lf.write(msg) lf.flush() + def sigterm_handler(signum, frame): signal.signal(signal.SIGTERM, signal.SIG_DFL) os.killpg(0, signal.SIGTERM) @@ -191,9 +387,7 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha sys.stderr.flush() try: - pipein, pipeout = os.pipe() - pipein = os.fdopen(pipein, 'rb', 4096) - pipeout = os.fdopen(pipeout, 'wb', 0) + pipeinfd, pipeoutfd = os.pipe() pid = os.fork() except OSError as e: logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror)) @@ -201,18 +395,30 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha 
if pid == 0: def child(): - global worker_pipe - global worker_pipe_lock - pipein.close() + os.close(pipeinfd) bb.utils.signal_on_parent_exit("SIGTERM") + pipeout = os.fdopen(pipeoutfd, 'wb', 0) + pipelock = Lock() + def worker_child_fire(event, d): + nonlocal pipeout + nonlocal pipelock + + data = b"<event>" + pickle.dumps(event) + b"</event>" + try: + with pipelock: + while(len(data)): + written = pipeout.write(data) + data = data[written:] + except IOError: + sigterm_handler(None, None) + raise + # Save out the PID so that the event can include it the # events bb.event.worker_pid = os.getpid() bb.event.worker_fire = worker_child_fire - worker_pipe = pipeout - worker_pipe_lock = Lock() # Make the child the process group leader and ensure no # child process will be controlled by the current terminal @@ -315,225 +521,33 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha else: os.environ[key] = value - return pid, pipein, pipeout - -class runQueueWorkerPipe(): - """ - Abstraction for a pipe between a worker thread and the worker server - """ - def __init__(self, pipein, pipeout): - self.input = pipein - if pipeout: - pipeout.close() - bb.utils.nonblockingfd(self.input) - self.queue = b"" - - def read(self): - start = len(self.queue) - try: - self.queue = self.queue + (self.input.read(102400) or b"") - except (OSError, IOError) as e: - if e.errno != errno.EAGAIN: - raise - - end = len(self.queue) - index = self.queue.find(b"</event>") - while index != -1: - msg = self.queue[:index+8] - assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1 - worker_fire_prepickled(msg) - self.queue = self.queue[index+8:] - index = self.queue.find(b"</event>") - return (end > start) - - def close(self): - while self.read(): - continue - if len(self.queue) > 0: - print("Warning, worker child left partial message: %s" % self.queue) - self.input.close() - -normalexit = False + return pid, pipeinfd, pipeoutfd -class BitbakeWorker(object): - def 
__init__(self, din): - self.input = din - bb.utils.nonblockingfd(self.input) - self.queue = b"" - self.cookercfg = None - self.databuilder = None - self.data = None - self.extraconfigdata = None - self.build_pids = {} - self.build_pipes = {} - - signal.signal(signal.SIGTERM, self.sigterm_exception) - # Let SIGHUP exit as SIGTERM - signal.signal(signal.SIGHUP, self.sigterm_exception) - if "beef" in sys.argv[1]: - bb.utils.set_process_name("Worker (Fakeroot)") - else: - bb.utils.set_process_name("Worker") - - def sigterm_exception(self, signum, stackframe): - if signum == signal.SIGTERM: - bb.warn("Worker received SIGTERM, shutting down...") - elif signum == signal.SIGHUP: - bb.warn("Worker received SIGHUP, shutting down...") - self.handle_finishnow(None) - signal.signal(signal.SIGTERM, signal.SIG_DFL) - os.kill(os.getpid(), signal.SIGTERM) - - def serve(self): - while True: - (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) - if self.input in ready: - try: - r = self.input.read() - if len(r) == 0: - # EOF on pipe, server must have terminated - self.sigterm_exception(signal.SIGTERM, None) - self.queue = self.queue + r - except (OSError, IOError): - pass - if len(self.queue): - self.handle_item(b"cookerconfig", self.handle_cookercfg) - self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) - self.handle_item(b"workerdata", self.handle_workerdata) - self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) - self.handle_item(b"runtask", self.handle_runtask) - self.handle_item(b"finishnow", self.handle_finishnow) - self.handle_item(b"ping", self.handle_ping) - self.handle_item(b"quit", self.handle_quit) - - for pipe in self.build_pipes: - if self.build_pipes[pipe].input in ready: - self.build_pipes[pipe].read() - if len(self.build_pids): - while self.process_waitpid(): - continue - - - def handle_item(self, item, func): - if self.queue.startswith(b"<" + item + b">"): - index = self.queue.find(b"</" + 
item + b">") - while index != -1: - try: - func(self.queue[(len(item) + 2):index]) - except pickle.UnpicklingError: - workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) - raise - self.queue = self.queue[(index + len(item) + 3):] - index = self.queue.find(b"</" + item + b">") - - def handle_cookercfg(self, data): - self.cookercfg = pickle.loads(data) - self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) - self.databuilder.parseBaseConfiguration() - self.data = self.databuilder.data - - def handle_extraconfigdata(self, data): - self.extraconfigdata = pickle.loads(data) - - def handle_workerdata(self, data): - self.workerdata = pickle.loads(data) - bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"] - bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"] - bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"] - bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] - for mc in self.databuilder.mcdata: - self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) - self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"]) - - def handle_newtaskhashes(self, data): - self.workerdata["newhashes"] = pickle.loads(data) - - def handle_ping(self, _): - workerlog_write("Handling ping\n") - - logger.warning("Pong from bitbake-worker!") - - def handle_quit(self, data): - workerlog_write("Handling quit\n") - - global normalexit - normalexit = True - sys.exit(0) - - def handle_runtask(self, data): - fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data) - workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) - - pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec) - - 
self.build_pids[pid] = task - self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) - - def process_waitpid(self): - """ - Return none is there are no processes awaiting result collection, otherwise - collect the process exit codes and close the information pipe. - """ - try: - pid, status = os.waitpid(-1, os.WNOHANG) - if pid == 0 or os.WIFSTOPPED(status): - return False - except OSError: - return False - - workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) - - if os.WIFEXITED(status): - status = os.WEXITSTATUS(status) - elif os.WIFSIGNALED(status): - # Per shell conventions for $?, when a process exits due to - # a signal, we return an exit code of 128 + SIGNUM - status = 128 + os.WTERMSIG(status) - - task = self.build_pids[pid] - del self.build_pids[pid] - - self.build_pipes[pid].close() - del self.build_pipes[pid] - - worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>") - - return True +try: + if "beef" in sys.argv[1]: + bb.utils.set_process_name("Worker (Fakeroot)") + else: + bb.utils.set_process_name("Worker") - def handle_finishnow(self, _): - if self.build_pids: - logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) - for k, v in iter(self.build_pids.items()): - try: - os.kill(-k, signal.SIGTERM) - os.waitpid(-1, 0) - except: - pass - for pipe in self.build_pipes: - self.build_pipes[pipe].read() + loop = asyncio.get_event_loop() -try: - worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) if not profiling: - worker.serve() + loop.run_until_complete(main()) else: profname = "profile-worker.log" prof = profile.Profile() try: - profile.Profile.runcall(prof, worker.serve) + profile.Profile.runcall(prof, loop.run_until_complete, main()) finally: prof.dump_stats(profname) bb.utils.process_profilelog(profname) -except BaseException as e: +except Exception as e: + workerlog_write("%s\n%s\n" % (traceback.format_exc(), e)) if not normalexit: - import traceback 
sys.stderr.write(traceback.format_exc()) sys.stderr.write(str(e)) -finally: - worker_thread_exit = True - worker_thread.join() -workerlog_write("exiting") +workerlog_write("exiting\n") if not normalexit: sys.exit(1) sys.exit(0)
Switches bitbake-worker to use asyncio. This is a good candidate for initial modernization using asyncio because it is self-contained with a well-defined interface to the bitbake server process. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/bin/bitbake-worker | 554 +++++++++++++++++++------------------ 1 file changed, 284 insertions(+), 270 deletions(-)