@@ -57,9 +57,16 @@ def testexport_main(d):
logger = logging.getLogger("BitBake")
+ target_kwargs = { }
+ target_kwargs['machine'] = d.getVar("MACHINE") or None
+ target_kwargs['serialcontrol_cmd'] = d.getVar("TEST_SERIALCONTROL_CMD") or None
+ target_kwargs['serialcontrol_extra_args'] = d.getVar("TEST_SERIALCONTROL_EXTRA_ARGS") or ""
+ target_kwargs['serialcontrol_ps1'] = d.getVar("TEST_SERIALCONTROL_PS1") or None
+ target_kwargs['serialcontrol_connect_timeout'] = d.getVar("TEST_SERIALCONTROL_CONNECT_TIMEOUT") or None
+
target = OERuntimeTestContextExecutor.getTarget(
d.getVar("TEST_TARGET"), None, d.getVar("TEST_TARGET_IP"),
- d.getVar("TEST_SERVER_IP"))
+ d.getVar("TEST_SERVER_IP"), **target_kwargs)
image_manifest = "%s.manifest" % image_name
image_packages = OERuntimeTestContextExecutor.readPackagesManifest(image_manifest)
@@ -239,6 +239,8 @@ def testimage_main(d):
bb.fatal('Unsupported image type built. Add a compatible image to '
'IMAGE_FSTYPES. Supported types: %s' %
', '.join(supported_fstypes))
+ elif d.getVar("TEST_TARGET") == "serial":
+ bb.fatal('Serial target is currently only supported in testexport.')
qfstype = fstypes[0]
qdeffstype = d.getVar("QB_DEFAULT_FSTYPE")
if qdeffstype:
@@ -429,7 +429,9 @@ TEST_SUITES[doc] = "An ordered list of tests (modules) to run against an image w
TEST_POWERCONTROL_CMD[doc] = "For automated hardware testing, specifies the command to use to control the power of the target machine under test"
TEST_POWERCONTROL_EXTRA_ARGS[doc] = "For automated hardware testing, specifies additional arguments to pass through to the command specified in TEST_POWERCONTROL_CMD"
TEST_SERIALCONTROL_CMD[doc] = "For automated hardware testing, specifies the command to use to connect to the serial console of the target machine under test"
+TEST_SERIALCONTROL_CONNECT_TIMEOUT[doc] = "For automated hardware testing, specifies the timeout in seconds for the initial connection to the target. Defaults to '10'."
TEST_SERIALCONTROL_EXTRA_ARGS[doc] = "For automated hardware testing, specifies additional arguments to pass through to the command specified in TEST_SERIALCONTROL_CMD"
+TEST_SERIALCONTROL_PS1[doc] = "For automated hardware testing, specifies a regex string representing an empty prompt on the target terminal. Example: 'root@target:.*#'. Defaults to 'root@${MACHINE}:.*#'."
TEST_TARGET[doc] = "For automated runtime testing, specifies the method of deploying the image and running tests on the target machine"
THISDIR[doc] = "The directory in which the file BitBake is currently parsing is located."
TIME[doc] = "The time the build was started using HMS format."
new file mode 100644
@@ -0,0 +1,315 @@
+#
+# SPDX-License-Identifier: MIT
+#
+
+import base64
+import logging
+import os
+from threading import Lock
+from . import OETarget
+
+class OESerialTarget(OETarget):
+
+ def __init__(self, logger, target_ip, server_ip, server_port=0,
+ timeout=300, serialcontrol_cmd=None, serialcontrol_extra_args=None,
+ serialcontrol_ps1=None, serialcontrol_connect_timeout=None,
+ machine=None, **kwargs):
+ if not logger:
+ logger = logging.getLogger('target')
+ logger.setLevel(logging.INFO)
+ filePath = os.path.join(os.getcwd(), 'remoteTarget.log')
+ fileHandler = logging.FileHandler(filePath, 'w', 'utf-8')
+ formatter = logging.Formatter(
+ '%(asctime)s.%(msecs)03d %(levelname)s: %(message)s',
+ '%H:%M:%S')
+ fileHandler.setFormatter(formatter)
+ logger.addHandler(fileHandler)
+
+ super(OESerialTarget, self).__init__(logger)
+
+ if serialcontrol_ps1:
+ self.target_ps1 = serialcontrol_ps1
+ elif machine:
+ # fallback to a default value which assumes root@machine
+ self.target_ps1 = f'root@{machine}:.*# '
+ else:
+ raise ValueError("Unable to determine shell command prompt (PS1) format.")
+
+ if not serialcontrol_cmd:
+ raise ValueError("Unable to determine serial control command.")
+
+ if serialcontrol_extra_args:
+ self.connection_script = f'{serialcontrol_cmd} {serialcontrol_extra_args}'
+ else:
+ self.connection_script = serialcontrol_cmd
+
+ if serialcontrol_connect_timeout:
+ self.connect_timeout = serialcontrol_connect_timeout
+ else:
+ self.connect_timeout = 10 # default to 10s connection timeout
+
+ self.default_command_timeout = timeout
+ self.ip = target_ip
+ self.server_ip = server_ip
+ self.server_port = server_port
+ self.conn = None
+ self.mutex = Lock()
+
+ def start(self, **kwargs):
+ pass
+
+ def stop(self, **kwargs):
+ pass
+
+ def get_connection(self):
+ if self.conn is None:
+ self.conn = SerialConnection(self.connection_script,
+ self.target_ps1,
+ self.connect_timeout,
+ self.default_command_timeout)
+
+ return self.conn
+
+ def run(self, cmd, timeout=None):
+ """
+ Runs command on target over the provided serial connection.
+ The first call will open the connection, and subsequent
+ calls will re-use the same connection to send new commands.
+
+ command: Command to run on target.
+ timeout: <value>: Kill command after <val> seconds.
+ None: Kill command default value seconds.
+ 0: No timeout, runs until return.
+ """
+ # Lock needed to avoid multiple threads running commands concurrently
+ # A serial connection can only be used by one caller at a time
+ with self.mutex:
+ conn = self.get_connection()
+
+ self.logger.debug(f"[Running]$ {cmd}")
+ # Run the command, then echo $? to get the command's return code
+ try:
+ output = conn.run_command(cmd, timeout)
+ status = conn.run_command("echo $?")
+ self.logger.debug(f" [stdout]: {output}")
+ self.logger.debug(f" [ret code]: {status}\n\n")
+ except SerialTimeoutException as e:
+ self.logger.debug(e)
+ output = ""
+ status = 255
+
+ # Return to $HOME after each command to simulate a stateless SSH connection
+ conn.run_command('cd "$HOME"')
+
+ return (int(status), output)
+
+ def copyTo(self, localSrc, remoteDst):
+ """
+ Copies files by converting them to base 32, then transferring
+ the ASCII text to the target, and decoding it in place on the
+ target.
+
+ On a 115k baud serial connection, this method transfers at
+ roughly 30kbps.
+ """
+ with open(localSrc, 'rb') as file:
+ data = file.read()
+
+ b32 = base64.b32encode(data).decode('utf-8')
+
+ # To avoid shell line limits, send a chunk at a time
+ SPLIT_LEN = 512
+ lines = [b32[i:i+SPLIT_LEN] for i in range(0, len(b32), SPLIT_LEN)]
+
+ with self.mutex:
+ conn = self.get_connection()
+
+ filename = os.path.basename(localSrc)
+ TEMP = f'/tmp/{filename}.b32'
+
+ # Create or empty out the temp file
+ conn.run_command(f'echo -n "" > {TEMP}')
+
+ for line in lines:
+ conn.run_command(f'echo -n {line} >> {TEMP}')
+
+ # Check to see whether the remoteDst is a directory
+ is_directory = conn.run_command(f'[[ -d {remoteDst} ]]; echo $?')
+ if int(is_directory) == 0:
+ # append the localSrc filename to the end of remoteDst
+ remoteDst = os.path.join(remoteDst, filename)
+
+ conn.run_command(f'base32 -d {TEMP} > {remoteDst}')
+ conn.run_command(f'rm {TEMP}')
+
+ return 0, 'Success'
+
+ def copyFrom(self, remoteSrc, localDst):
+ """
+ Copies files by converting them to base 32 on the target, then
+ transferring the ASCII text to the host. That text is then
+ decoded here and written out to the destination.
+
+ On a 115k baud serial connection, this method transfers at
+ roughly 30kbps.
+ """
+ with self.mutex:
+ b32 = self.get_connection().run_command(f'base32 {remoteSrc}')
+
+ data = base64.b32decode(b32.replace('\r\n', ''))
+
+ # If the local path is a directory, get the filename from
+ # the remoteSrc path and append it to localDst
+ if os.path.isdir(localDst):
+ filename = os.path.basename(remoteSrc)
+ localDst = os.path.join(localDst, filename)
+
+ with open(localDst, 'wb') as file:
+ file.write(data)
+
+ return 0, 'Success'
+
+ def copyDirTo(self, localSrc, remoteDst):
+ """
+ Copy recursively localSrc directory to remoteDst in target.
+ """
+
+ for root, dirs, files in os.walk(localSrc):
+ # Create directories in the target as needed
+ for d in dirs:
+ tmpDir = os.path.join(root, d).replace(localSrc, "")
+ newDir = os.path.join(remoteDst, tmpDir.lstrip("/"))
+ cmd = "mkdir -p %s" % newDir
+ self.run(cmd)
+
+ # Copy files into the target
+ for f in files:
+ tmpFile = os.path.join(root, f).replace(localSrc, "")
+ dstFile = os.path.join(remoteDst, tmpFile.lstrip("/"))
+ srcFile = os.path.join(root, f)
+ self.copyTo(srcFile, dstFile)
+
+ def deleteFiles(self, remotePath, files):
+ """
+ Deletes files in target's remotePath.
+ """
+
+ cmd = "rm"
+ if not isinstance(files, list):
+ files = [files]
+
+ for f in files:
+ cmd = "%s %s" % (cmd, os.path.join(remotePath, f))
+
+ self.run(cmd)
+
+ def deleteDir(self, remotePath):
+ """
+ Deletes target's remotePath directory.
+ """
+
+ cmd = "rmdir %s" % remotePath
+ self.run(cmd)
+
+ def deleteDirStructure(self, localPath, remotePath):
+ """
+ Delete recursively localPath structure directory in target's remotePath.
+
+ This function is useful to delete a package that is installed in the
+ device under test (DUT) and the host running the test has such package
+ extracted in tmp directory.
+
+ Example:
+ pwd: /home/user/tmp
+ tree: .
+ └── work
+ ├── dir1
+ │ └── file1
+ └── dir2
+
+ localpath = "/home/user/tmp" and remotepath = "/home/user"
+
+ With the above variables this function will try to delete the
+ directory in the DUT in this order:
+ /home/user/work/dir1/file1
+ /home/user/work/dir1 (if dir is empty)
+ /home/user/work/dir2 (if dir is empty)
+ /home/user/work (if dir is empty)
+ """
+
+ for root, dirs, files in os.walk(localPath, topdown=False):
+ # Delete files first
+ tmpDir = os.path.join(root).replace(localPath, "")
+ remoteDir = os.path.join(remotePath, tmpDir.lstrip("/"))
+ self.deleteFiles(remoteDir, files)
+
+ # Remove dirs if empty
+ for d in dirs:
+ tmpDir = os.path.join(root, d).replace(localPath, "")
+ remoteDir = os.path.join(remotePath, tmpDir.lstrip("/"))
+ self.deleteDir(remoteDir)
+
+class SerialTimeoutException(Exception):
+ def __init__(self, msg):
+ self.msg = msg
+ def __str__(self):
+ return self.msg
+
+class SerialConnection:
+
+ def __init__(self, script, target_prompt, connect_timeout, default_command_timeout):
+ import pexpect # limiting scope to avoid build dependency
+ self.prompt = target_prompt
+ self.connect_timeout = connect_timeout
+ self.default_command_timeout = default_command_timeout
+ self.conn = pexpect.spawn('/bin/bash', ['-c', script], encoding='utf8')
+ self._seek_to_clean_shell()
+ # Disable echo to avoid the need to parse the outgoing command
+ self.run_command('stty -echo')
+
+ def _seek_to_clean_shell(self):
+ """
+ Attempts to find a clean shell, meaning it is clear and
+ ready to accept a new command. This is necessary to ensure
+ the correct output is captured from each command.
+ """
+ import pexpect # limiting scope to avoid build dependency
+ # Look for a clean shell
+ # Wait a short amount of time for the connection to finish
+ pexpect_code = self.conn.expect([self.prompt, pexpect.TIMEOUT],
+ timeout=self.connect_timeout)
+
+ # if a timeout occurred, send an empty line and wait for a clean shell
+ if pexpect_code == 1:
+ # send a newline to clear and present the shell
+ self.conn.sendline("")
+ pexpect_code = self.conn.expect(self.prompt)
+
+ def run_command(self, cmd, timeout=None):
+ """
+ Runs command on target over the provided serial connection.
+ Returns any output on the shell while the command was run.
+
+ command: Command to run on target.
+ timeout: <value>: Kill command after <val> seconds.
+ None: Kill command default value seconds.
+ 0: No timeout, runs until return.
+ """
+ import pexpect # limiting scope to avoid build dependency
+ # Convert from the OETarget defaults to pexpect timeout values
+ if timeout is None:
+ timeout = self.default_command_timeout
+ elif timeout == 0:
+ timeout = None # passing None to pexpect is infinite timeout
+
+ self.conn.sendline(cmd)
+ pexpect_code = self.conn.expect([self.prompt, pexpect.TIMEOUT], timeout=timeout)
+
+ # check for timeout
+ if pexpect_code == 1:
+ self.conn.send('\003') # send Ctrl+C
+ self._seek_to_clean_shell()
+ raise SerialTimeoutException(f'Timeout executing: {cmd} after {timeout}s')
+
+ return self.conn.before.removesuffix('\r\n')
+
@@ -8,6 +8,7 @@ import os
import sys
from oeqa.core.context import OETestContext, OETestContextExecutor
+from oeqa.core.target.serial import OESerialTarget
from oeqa.core.target.ssh import OESSHTarget
from oeqa.core.target.qemu import OEQemuTarget
@@ -60,7 +61,7 @@ class OERuntimeTestContextExecutor(OETestContextExecutor):
runtime_group = self.parser.add_argument_group('runtime options')
runtime_group.add_argument('--target-type', action='store',
- default=self.default_target_type, choices=['simpleremote', 'qemu'],
+ default=self.default_target_type, choices=['simpleremote', 'qemu', 'serial'],
help="Target type of device under test, default: %s" \
% self.default_target_type)
runtime_group.add_argument('--target-ip', action='store',
@@ -108,6 +109,8 @@ class OERuntimeTestContextExecutor(OETestContextExecutor):
target = OESSHTarget(logger, target_ip, server_ip, **kwargs)
elif target_type == 'qemu':
target = OEQemuTarget(logger, server_ip, **kwargs)
+ elif target_type == 'serial':
+ target = OESerialTarget(logger, target_ip, server_ip, **kwargs)
else:
# XXX: This code uses the old naming convention for controllers and
# targets, the idea it is to leave just targets as the controller
@@ -203,8 +206,15 @@ class OERuntimeTestContextExecutor(OETestContextExecutor):
super(OERuntimeTestContextExecutor, self)._process_args(logger, args)
+ td = self.tc_kwargs['init']['td']
+
target_kwargs = {}
+ target_kwargs['machine'] = td.get("MACHINE") or None
target_kwargs['qemuboot'] = args.qemu_boot
+ target_kwargs['serialcontrol_cmd'] = td.get("TEST_SERIALCONTROL_CMD") or None
+ target_kwargs['serialcontrol_extra_args'] = td.get("TEST_SERIALCONTROL_EXTRA_ARGS") or ""
+ target_kwargs['serialcontrol_ps1'] = td.get("TEST_SERIALCONTROL_PS1") or None
+ target_kwargs['serialcontrol_connect_timeout'] = td.get("TEST_SERIALCONTROL_CONNECT_TIMEOUT") or None
self.tc_kwargs['init']['target'] = \
OERuntimeTestContextExecutor.getTarget(args.target_type,