new file mode 100644
new file mode 100644
@@ -0,0 +1,58 @@
+import json
+import pathlib
+import os
+
+
+def get_image_directory(machine=None):
+ """
+ Get the DEPLOY_DIR_IMAGE for the specified machine
+ (or the configured machine if not set).
+ """
+ try:
+ import bb.tinfoil
+ except ImportError as e:
+ raise RuntimeError("Cannot connect to BitBake, did you oe-init-build-env?") from e
+
+ if machine:
+ os.environ["MACHINE"] = machine
+
+ with bb.tinfoil.Tinfoil() as tinfoil:
+ tinfoil.prepare(config_only=True)
+ image_dir = tinfoil.config_data.getVar("DEPLOY_DIR_IMAGE")
+ return pathlib.Path(image_dir)
+
+def find(machine):
+ image_dir = get_image_directory(machine)
+ # All .fvpconf configuration files
+ configs = image_dir.glob("*.fvpconf")
+ # Just the files
+ configs = [p for p in configs if p.is_file() and not p.is_symlink()]
+ if not configs:
+ print(f"Cannot find any .fvpconf in {image_dir}")
+ raise RuntimeError()
+ # Sorted by modification time
+ configs = sorted(configs, key=lambda p: p.stat().st_mtime)
+ return configs[-1]
+
+
+def load(config_file):
+ with open(config_file) as f:
+ config = json.load(f)
+
+ # Ensure that all expected keys are present
+ def sanitise(key, value):
+ if key not in config or config[key] is None:
+ config[key] = value
+ sanitise("fvp-bindir", "")
+ sanitise("exe", "")
+ sanitise("parameters", {})
+ sanitise("data", {})
+ sanitise("applications", {})
+ sanitise("terminals", {})
+ sanitise("args", [])
+ sanitise("console", "")
+
+ if not config["exe"]:
+ raise ValueError("Required value FVP_EXE not set in machine configuration")
+
+ return config
new file mode 100644
@@ -0,0 +1,115 @@
+import asyncio
+import re
+import subprocess
+import os
+import shutil
+import sys
+
+from .terminal import terminals
+
+
+def cli_from_config(config, terminal_choice):
+ cli = []
+ if config["fvp-bindir"]:
+ cli.append(os.path.join(config["fvp-bindir"], config["exe"]))
+ else:
+ cli.append(config["exe"])
+
+ for param, value in config["parameters"].items():
+ cli.extend(["--parameter", f"{param}={value}"])
+
+ for value in config["data"]:
+ cli.extend(["--data", value])
+
+ for param, value in config["applications"].items():
+ cli.extend(["--application", f"{param}={value}"])
+
+ for terminal, name in config["terminals"].items():
+ # If terminals are enabled and this terminal has been named
+ if terminal_choice != "none" and name:
+ # TODO if raw mode
+ # cli.extend(["--parameter", f"{terminal}.mode=raw"])
+ # TODO put name into terminal title
+ cli.extend(["--parameter", f"{terminal}.terminal_command={terminals[terminal_choice].command}"])
+ else:
+ # Disable terminal
+ cli.extend(["--parameter", f"{terminal}.start_telnet=0"])
+
+ cli.extend(config["args"])
+
+ return cli
+
+def check_telnet():
+ # Check that telnet is present
+ if not bool(shutil.which("telnet")):
+ raise RuntimeError("Cannot find telnet, this is needed to connect to the FVP.")
+
+class FVPRunner:
+ def __init__(self, logger):
+ self._terminal_ports = {}
+ self._line_callbacks = []
+ self._logger = logger
+ self._fvp_process = None
+ self._telnets = []
+
+ def add_line_callback(self, callback):
+ self._line_callbacks.append(callback)
+
+ async def start(self, config, extra_args=[], terminal_choice="none"):
+ cli = cli_from_config(config, terminal_choice)
+ cli += extra_args
+ self._logger.debug(f"Constructed FVP call: {cli}")
+ self._fvp_process = await asyncio.create_subprocess_exec(*cli, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+ def detect_terminals(line):
+ m = re.match(r"^(\S+): Listening for serial connection on port (\d+)$", line)
+ if m:
+ terminal = m.group(1)
+ port = int(m.group(2))
+ self._terminal_ports[terminal] = port
+ self.add_line_callback(detect_terminals)
+
+ async def stop(self):
+ if self._fvp_process:
+ self._logger.debug(f"Killing FVP PID {self._fvp_process.pid}")
+ try:
+ self._fvp_process.terminate()
+ except ProcessLookupError:
+ pass
+
+ if await self._fvp_process.wait() != 0:
+ self._logger.info(f"FVP quit with code {self._fvp_process.returncode}")
+ return self._fvp_process.returncode
+ else:
+ return 0
+
+ for telnet in self._telnets:
+ await telnet.terminate()
+ await telnet.wait()
+
+ async def run(self, until=None):
+ if until and until():
+ return
+
+ async for line in self._fvp_process.stdout:
+ line = line.strip().decode("utf-8", errors="replace")
+ for callback in self._line_callbacks:
+ callback(line)
+ if until and until():
+ return
+
+ async def _get_terminal_port(self, terminal, timeout):
+ def terminal_exists():
+ return terminal in self._terminal_ports
+ await asyncio.wait_for(self.run(terminal_exists), timeout)
+ return self._terminal_ports[terminal]
+
+ async def create_telnet(self, terminal, timeout=15.0):
+ check_telnet()
+ port = await self._get_terminal_port(terminal, timeout)
+ telnet = await asyncio.create_subprocess_exec("telnet", "localhost", str(port), stdin=sys.stdin, stdout=sys.stdout)
+ self._telnets.append(telnet)
+ return telnet
+
+ def pid(self):
+ return self._fvp_process.pid
new file mode 100644
@@ -0,0 +1,59 @@
+import shutil
+import collections
+import pathlib
+import os
+
+from typing import List, Optional
+
+
+def get_config_dir() -> pathlib.Path:
+ value = os.environ.get("XDG_CONFIG_HOME")
+ if value and os.path.isabs(value):
+ return pathlib.Path(value)
+ else:
+ return pathlib.Path.home() / ".config"
+
+class Terminals:
+ Terminal = collections.namedtuple("Terminal", ["priority", "name", "command"])
+
+ def __init__(self):
+ self.terminals = []
+
+ def add_terminal(self, priority, name, command):
+ self.terminals.append(Terminals.Terminal(priority, name, command))
+ # Keep this list sorted by priority
+ self.terminals.sort(reverse=True, key=lambda t: t.priority)
+ self.name_map = {t.name: t for t in self.terminals}
+
+ def configured_terminal(self) -> Optional[str]:
+ import configparser
+
+ config = configparser.ConfigParser()
+ config.read(get_config_dir() / "runfvp.conf")
+ return config.get("RunFVP", "Terminal", fallback=None)
+
+ def preferred_terminal(self) -> str:
+ import shlex
+
+ preferred = self.configured_terminal()
+ if preferred:
+ return preferred
+
+ for t in self.terminals:
+ if t.command and shutil.which(shlex.split(t.command)[0]):
+ return t.name
+ return self.terminals[-1].name
+
+ def all_terminals(self) -> List[str]:
+ return self.name_map.keys()
+
+ def __getitem__(self, name: str):
+ return self.name_map[name]
+
+terminals = Terminals()
+# TODO: option to switch between telnet and netcat
+connect_command = "telnet localhost %port"
+terminals.add_terminal(2, "tmux", f"tmux new-window -n \"%title\" \"{connect_command}\""),
+terminals.add_terminal(2, "gnome-terminal", f"gnome-terminal --window --title \"%title\" --command \"{connect_command}\""),
+terminals.add_terminal(1, "xterm", f"xterm -title \"%title\" -e {connect_command}"),
+terminals.add_terminal(0, "none", None)
@@ -1,96 +1,23 @@
#! /usr/bin/env python3
import asyncio
-import collections
-import json
import os
-import re
-import shutil
+import pathlib
import signal
import sys
-import subprocess
-import pathlib
import logging
logger = logging.getLogger("RunFVP")
-from typing import List, Optional
-
-def get_config_dir() -> pathlib.Path:
- value = os.environ.get("XDG_CONFIG_HOME")
- if value and os.path.isabs(value):
- return pathlib.Path(value)
- else:
- return pathlib.Path.home() / ".config"
-
-class Terminals:
- Terminal = collections.namedtuple("Terminal", ["priority", "name", "command"])
-
- def __init__(self):
- self.terminals = []
-
- def add_terminal(self, priority, name, command):
- self.terminals.append(Terminals.Terminal(priority, name, command))
- # Keep this list sorted by priority
- self.terminals.sort(reverse=True, key=lambda t: t.priority)
- self.name_map = {t.name: t for t in self.terminals}
-
- def configured_terminal(self) -> Optional[str]:
- import configparser
-
- config = configparser.ConfigParser()
- config.read(get_config_dir() / "runfvp.conf")
- return config.get("RunFVP", "Terminal", fallback=None)
-
- def preferred_terminal(self) -> str:
- import shlex
-
- preferred = self.configured_terminal()
- if preferred:
- return preferred
-
- for t in self.terminals:
- if t.command and shutil.which(shlex.split(t.command)[0]):
- return t.name
- return self.terminals[-1].name
-
- def all_terminals(self) -> List[str]:
- return self.name_map.keys()
-
- def __getitem__(self, name: str):
- return self.name_map[name]
-
-terminals = Terminals()
-# TODO: option to switch between telnet and netcat
-connect_command = "telnet localhost %port"
-terminals.add_terminal(2, "tmux", f"tmux new-window -n \"%title\" \"{connect_command}\""),
-terminals.add_terminal(2, "gnome-terminal", f"gnome-terminal --window --title \"%title\" --command \"{connect_command}\""),
-terminals.add_terminal(1, "xterm", f"xterm -title \"%title\" -e {connect_command}"),
-terminals.add_terminal(0, "none", None)
-
-def get_image_directory(machine=None):
- """
- Get the DEPLOY_DIR_IMAGE for the specified machine
- (or the configured machine if not set).
- """
- try:
- import bb.tinfoil
- except ImportError:
- logger.error("Cannot connect to BitBake, did you oe-init-build-env?")
- sys.exit(1)
-
- if machine:
- os.environ["MACHINE"] = machine
-
- with bb.tinfoil.Tinfoil() as tinfoil:
- tinfoil.prepare(config_only=True)
- image_dir = tinfoil.config_data.getVar("DEPLOY_DIR_IMAGE")
- logger.debug(f"Got DEPLOY_DIR_IMAGE {image_dir}")
- return pathlib.Path(image_dir)
+# Add meta-arm/lib/ to path
+libdir = pathlib.Path(__file__).parents[1] / "meta-arm" / "lib"
+sys.path.insert(0, str(libdir))
+from fvp import terminal, runner, conffile
def parse_args(arguments):
import argparse
+ terminals = terminal.terminals
parser = argparse.ArgumentParser(description="Run images in a FVP")
parser.add_argument("config", nargs="?", help="Machine name or path to .fvpconf file")
@@ -120,148 +47,41 @@ def parse_args(arguments):
logger.debug(f"FVP arguments: {fvp_args}")
return args, fvp_args
-def find_config(args):
- if args.config and os.path.exists(args.config):
- return args.config
- else:
- image_dir = get_image_directory(args.config)
- # All .fvpconf configuration files
- configs = image_dir.glob("*.fvpconf")
- # Just the files
- configs = [p for p in configs if p.is_file() and not p.is_symlink()]
- if not configs:
- print(f"Cannot find any .fvpconf in {image_dir}")
- sys.exit(1)
- # Sorted by modification time
- configs = sorted(configs, key=lambda p: p.stat().st_mtime)
- return configs[-1]
-
-
-def load_config(config_file):
- logger.debug(f"Loading {config_file}")
- with open(config_file) as f:
- config = json.load(f)
-
- # Ensure that all expected keys are present
- def sanitise(key, value):
- if key not in config or config[key] is None:
- config[key] = value
- sanitise("fvp-bindir", "")
- sanitise("exe", "")
- sanitise("parameters", {})
- sanitise("data", {})
- sanitise("applications", {})
- sanitise("terminals", {})
- sanitise("args", [])
- sanitise("console", "")
-
- if not config["exe"]:
- logger.error("Required value FVP_EXE not set in machine configuration")
- sys.exit(1)
-
- return config
-def parse_config(args, config):
- cli = []
- if config["fvp-bindir"]:
- cli.append(os.path.join(config["fvp-bindir"], config["exe"]))
- else:
- cli.append(config["exe"])
-
- for param, value in config["parameters"].items():
- cli.extend(["--parameter", f"{param}={value}"])
-
- for value in config["data"]:
- cli.extend(["--data", value])
-
- for param, value in config["applications"].items():
- cli.extend(["--application", f"{param}={value}"])
-
- for terminal, name in config["terminals"].items():
- # If terminals are enabled and this terminal has been named
- if args.terminals != "none" and name:
- # TODO if raw mode
- # cli.extend(["--parameter", f"{terminal}.mode=raw"])
- # TODO put name into terminal title
- cli.extend(["--parameter", f"{terminal}.terminal_command={terminals[args.terminals].command}"])
- else:
- # Disable terminal
- cli.extend(["--parameter", f"{terminal}.start_telnet=0"])
-
- cli.extend(config["args"])
-
- return cli
-
-async def start_fvp(cli, console_cb):
+async def start_fvp(args, config, extra_args):
+ fvp = runner.FVPRunner(logger)
try:
- fvp_process = await asyncio.create_subprocess_exec(*cli, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-
- async for line in fvp_process.stdout:
- line = line.strip().decode("utf-8", errors="replace")
- if console_cb:
- logger.debug(f"FVP output: {line}")
- else:
- print(line)
+ await fvp.start(config, extra_args, args.terminals)
+
+ if args.console:
+ fvp.add_line_callback(lambda line: logger.debug(f"FVP output: {line}"))
+ expected_terminal = config["console"]
+ if not expected_terminal:
+ logger.error("--console used but FVP_CONSOLE not set in machine configuration")
+ return 1
+ telnet = await fvp.create_telnet(expected_terminal)
+ await telnet.wait()
+ logger.debug(f"Telnet quit, cancelling tasks")
+ else:
+ fvp.add_line_callback(lambda line: print(line))
- # Look for serial connections opening
- if console_cb:
- m = re.match(r"^(\S+): Listening for serial connection on port (\d+)$", line)
- if m:
- terminal = m.group(1)
- port = int(m.group(2))
- logger.debug(f"Console for {terminal} started on port {port}")
- # When we can assume Py3.7+, this can be create_task
- asyncio.ensure_future(console_cb(terminal, port))
+ await fvp.run()
finally:
- # If we get cancelled or throw an exception, kill the FVP
- logger.debug(f"Killing FVP PID {fvp_process.pid}")
- try:
- fvp_process.terminate()
- except ProcessLookupError:
- pass
-
- if await fvp_process.wait() != 0:
- logger.info(f"{cli[0]} quit with code {fvp_process.returncode}")
- return fvp_process.returncode
- else:
- return 0
+ await fvp.stop()
def runfvp(cli_args):
- args, fvp_args = parse_args(cli_args)
- config_file = find_config(args)
- config = load_config(config_file)
- cli = parse_config(args, config)
- cli.extend(fvp_args)
- logger.debug(f"Constructed FVP call: {cli}")
-
- # Check that telnet is present
- if not bool(shutil.which("telnet")):
- logger.error("Cannot find telnet, this is needed to connect to the FVP.")
- return 1
-
- if args.console:
- expected_terminal = config["console"]
- if not expected_terminal:
- logger.error("--console used but FVP_CONSOLE not set in machine configuration")
- return 1
+ args, extra_args = parse_args(cli_args)
+ if args.config and pathlib.Path(args.config).exists():
+ config_file = args.config
else:
- expected_terminal = None
-
- async def console_started(name, port):
- if name == expected_terminal:
- telnet = await asyncio.create_subprocess_exec("telnet", "localhost", str(port), stdin=sys.stdin, stdout=sys.stdout)
- await telnet.wait()
- logger.debug(f"Telnet quit, cancelling tasks")
- # TODO: this is 3.7+
- for t in asyncio.all_tasks():
- logger.debug(f"Cancelling {t}")
- t.cancel()
+ config_file = conffile.find(args.config)
+ logger.debug(f"Loading {config_file}")
+ config = conffile.load(config_file)
try:
# When we can assume Py3.7+, this can simply be asyncio.run()
loop = asyncio.get_event_loop()
- console_cb = expected_terminal and console_started or None
- return loop.run_until_complete(start_fvp(cli, console_cb=console_cb))
+ return loop.run_until_complete(start_fvp(args, config, extra_args))
except asyncio.CancelledError:
# This means telnet exited, which isn't an error
return 0