Initial commit

This commit is contained in:
Your Name
2026-04-23 17:07:55 +08:00
commit b7e39e063b
16725 changed files with 1625565 additions and 0 deletions
+76
View File
@@ -0,0 +1,76 @@
= OEQA (v2) Framework =
== Introduction ==
This is version 2 of the OEQA framework. Base clases are located in the
'oeqa/core' directory and subsequent components must extend from these.
The main design consideration was to implement the needed functionality on
top of the Python unittest framework. To achieve this goal, the following
modules are used:
* oeqa/core/runner.py: Provides OETestResult and OETestRunner base
classes extending the unittest class. These classes support exporting
results to different formats; currently RAW and XML support exist.
* oeqa/core/loader.py: Provides OETestLoader extending the unittest class.
It also features a unified implementation of decorator support and
filtering test cases.
* oeqa/core/case.py: Provides OETestCase base class extending
unittest.TestCase and provides access to the Test data (td), Test context
and Logger functionality.
* oeqa/core/decorator: Provides OETestDecorator, a new class to implement
decorators for Test cases.
* oeqa/core/context: Provides OETestContext, a high-level API for
loadTests and runTests of certain Test component and
OETestContextExecutor a base class to enable oe-test to discover/use
the Test component.
Also, a new 'oe-test' runner is located under 'scripts', allowing scans for components
that supports OETestContextExecutor (see below).
== Terminology ==
* Test component: The area of testing in the Project, for example: runtime, SDK, eSDK, selftest.
* Test data: Data associated with the Test component. Currently we use bitbake datastore as
a Test data input.
* Test context: A context of what tests needs to be run and how to do it; this additionally
provides access to the Test data and could have custom methods and/or attrs.
== oe-test ==
The new tool, oe-test, has the ability to scan the code base for test components and provide
a unified way to run test cases. Internally it scans folders inside oeqa module in order to find
specific classes that implement a test component.
== Usage ==
Executing the example test component
$ source oe-init-build-env
$ oe-test core
Getting help
$ oe-test -h
== Creating new Test Component ==
Adding a new test component the developer needs to extend OETestContext/OETestContextExecutor
(from context.py) and OETestCase (from case.py)
== Selftesting the framework ==
Run all tests:
$ PATH=$PATH:../../ python3 -m unittest discover -s tests
Run some test:
$ cd tests/
$ ./test_data.py
View File
+105
View File
@@ -0,0 +1,105 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import base64
import zlib
import unittest
from oeqa.core.exception import OEQAMissingVariable
def _validate_td_vars(td, td_vars, type_msg):
if td_vars:
for v in td_vars:
if not v in td:
raise OEQAMissingVariable("Test %s need %s variable but"\
" isn't into td" % (type_msg, v))
class OETestCase(unittest.TestCase):
# TestContext and Logger instance set by OETestLoader.
tc = None
logger = None
# td has all the variables needed by the test cases
# is the same across all the test cases.
td = None
# td_vars has the variables needed by a test class
# or test case instance, if some var isn't into td a
# OEQAMissingVariable exception is raised
td_vars = None
@classmethod
def _oeSetUpClass(clss):
_validate_td_vars(clss.td, clss.td_vars, "class")
if hasattr(clss, 'setUpHooker') and callable(getattr(clss, 'setUpHooker')):
clss.setUpHooker()
clss.setUpClassMethod()
@classmethod
def _oeTearDownClass(clss):
clss.tearDownClassMethod()
def _oeSetUp(self):
try:
for d in self.decorators:
d.setUpDecorator()
except:
for d in self.decorators:
d.tearDownDecorator()
raise
self.setUpMethod()
def _oeTearDown(self):
for d in self.decorators:
d.tearDownDecorator()
self.tearDownMethod()
class OEPTestResultTestCase:
"""
Mix-in class to provide functions to make interacting with extraresults for
the purposes of storing ptestresult data.
"""
@staticmethod
def _compress_log(log):
logdata = log.encode("utf-8") if isinstance(log, str) else log
logdata = zlib.compress(logdata)
logdata = base64.b64encode(logdata).decode("utf-8")
return {"compressed" : logdata}
def ptest_rawlog(self, log):
if not hasattr(self, "extraresults"):
self.extraresults = {"ptestresult.sections" : {}}
self.extraresults["ptestresult.rawlogs"] = {"log" : self._compress_log(log)}
def ptest_section(self, section, duration = None, log = None, logfile = None, exitcode = None):
if not hasattr(self, "extraresults"):
self.extraresults = {"ptestresult.sections" : {}}
sections = self.extraresults.get("ptestresult.sections")
if section not in sections:
sections[section] = {}
if log is not None:
sections[section]["log"] = self._compress_log(log)
elif logfile is not None:
with open(logfile, "rb") as f:
sections[section]["log"] = self._compress_log(f.read())
if duration is not None:
sections[section]["duration"] = duration
if exitcode is not None:
sections[section]["exitcode"] = exitcode
def ptest_result(self, section, test, result):
if not hasattr(self, "extraresults"):
self.extraresults = {"ptestresult.sections" : {}}
sections = self.extraresults.get("ptestresult.sections")
if section not in sections:
sections[section] = {}
resultname = "ptestresult.{}.{}".format(section, test)
self.extraresults[resultname] = {"status" : result}
@@ -0,0 +1 @@
{"ARCH": "x86", "IMAGE": "core-image-minimal"}
@@ -0,0 +1,22 @@
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from oeqa.core.case import OETestCase
from oeqa.core.decorator.depends import OETestDepends
class OETestExample(OETestCase):
def test_example(self):
self.logger.info('IMAGE: %s' % self.td.get('IMAGE'))
self.assertEqual('core-image-minimal', self.td.get('IMAGE'))
self.logger.info('ARCH: %s' % self.td.get('ARCH'))
self.assertEqual('x86', self.td.get('ARCH'))
class OETestExampleDepend(OETestCase):
@OETestDepends(['OETestExample.test_example'])
def test_example_depends(self):
pass
def test_example_no_depends(self):
pass
+246
View File
@@ -0,0 +1,246 @@
## Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import os
import sys
import json
import time
import logging
import collections
import unittest
from oeqa.core.loader import OETestLoader
from oeqa.core.runner import OETestRunner
from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound
class OETestContext(object):
loaderClass = OETestLoader
runnerClass = OETestRunner
files_dir = os.path.abspath(os.path.join(os.path.dirname(
os.path.abspath(__file__)), "../files"))
def __init__(self, td=None, logger=None):
if not type(td) is dict:
raise TypeError("td isn't dictionary type")
self.td = td
self.logger = logger
self._registry = {}
self._registry['cases'] = collections.OrderedDict()
self.results = unittest.TestResult()
unittest.registerResult(self.results)
def _read_modules_from_manifest(self, manifest):
if not os.path.exists(manifest):
raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)
modules = []
for line in open(manifest).readlines():
line = line.strip()
if line and not line.startswith("#"):
modules.append(line)
return modules
def skipTests(self, skips):
if not skips:
return
def skipfuncgen(skipmsg):
def func():
raise unittest.SkipTest(skipmsg)
return func
class_ids = {}
for test in self.suites:
if test.__class__ not in class_ids:
class_ids[test.__class__] = '.'.join(test.id().split('.')[:-1])
for skip in skips:
if (test.id()+'.').startswith(skip+'.'):
setattr(test, 'setUp', skipfuncgen('Skip by the command line argument "%s"' % skip))
for tclass in class_ids:
cid = class_ids[tclass]
for skip in skips:
if (cid + '.').startswith(skip + '.'):
setattr(tclass, 'setUpHooker', skipfuncgen('Skip by the command line argument "%s"' % skip))
def loadTests(self, module_paths, modules=[], tests=[],
modules_manifest="", modules_required=[], **kwargs):
if modules_manifest:
modules = self._read_modules_from_manifest(modules_manifest)
self.loader = self.loaderClass(self, module_paths, modules, tests,
modules_required, **kwargs)
self.suites = self.loader.discover()
def prepareSuite(self, suites, processes):
return suites
def runTests(self, processes=None, skips=[]):
self.runner = self.runnerClass(self, descriptions=False, verbosity=2)
# Dynamically skip those tests specified though arguments
self.skipTests(skips)
self._run_start_time = time.time()
self._run_end_time = self._run_start_time
if not processes:
self.runner.buffer = True
result = self.runner.run(self.prepareSuite(self.suites, processes))
self._run_end_time = time.time()
return result
def listTests(self, display_type):
self.runner = self.runnerClass(self, verbosity=2)
return self.runner.list_tests(self.suites, display_type)
class OETestContextExecutor(object):
_context_class = OETestContext
_script_executor = 'oe-test'
name = 'core'
help = 'core test component example'
description = 'executes core test suite example'
datetime = time.strftime("%Y%m%d%H%M%S")
default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
'cases/example')]
default_test_data = os.path.join(default_cases[0], 'data.json')
default_tests = None
default_json_result_dir = None
def register_commands(self, logger, subparsers):
self.parser = subparsers.add_parser(self.name, help=self.help,
description=self.description, group='components')
self.default_output_log = '%s-results-%s.log' % (self.name, self.datetime)
self.parser.add_argument('--output-log', action='store',
default=self.default_output_log,
help="results output log, default: %s" % self.default_output_log)
self.parser.add_argument('--json-result-dir', action='store',
default=self.default_json_result_dir,
help="json result output dir, default: %s" % self.default_json_result_dir)
group = self.parser.add_mutually_exclusive_group()
group.add_argument('--run-tests', action='store', nargs='+',
default=self.default_tests,
help="tests to run in <module>[.<class>[.<name>]]")
group.add_argument('--list-tests', action='store',
choices=('module', 'class', 'name'),
help="lists available tests")
if self.default_test_data:
self.parser.add_argument('--test-data-file', action='store',
default=self.default_test_data,
help="data file to load, default: %s" % self.default_test_data)
else:
self.parser.add_argument('--test-data-file', action='store',
help="data file to load")
if self.default_cases:
self.parser.add_argument('CASES_PATHS', action='store',
default=self.default_cases, nargs='*',
help="paths to directories with test cases, default: %s"\
% self.default_cases)
else:
self.parser.add_argument('CASES_PATHS', action='store',
nargs='+', help="paths to directories with test cases")
self.parser.set_defaults(func=self.run)
def _setup_logger(self, logger, args):
formatter = logging.Formatter('%(asctime)s - ' + self.name + \
' - %(levelname)s - %(message)s')
sh = logger.handlers[0]
sh.setFormatter(formatter)
fh = logging.FileHandler(args.output_log)
fh.setFormatter(formatter)
logger.addHandler(fh)
if getattr(args, 'verbose', False):
logger.setLevel('DEBUG')
return logger
def _process_args(self, logger, args):
self.tc_kwargs = {}
self.tc_kwargs['init'] = {}
self.tc_kwargs['load'] = {}
self.tc_kwargs['list'] = {}
self.tc_kwargs['run'] = {}
self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
if args.test_data_file:
self.tc_kwargs['init']['td'] = json.load(
open(args.test_data_file, "r"))
else:
self.tc_kwargs['init']['td'] = {}
if args.run_tests:
self.tc_kwargs['load']['modules'] = args.run_tests
self.tc_kwargs['load']['modules_required'] = args.run_tests
else:
self.tc_kwargs['load']['modules'] = []
self.tc_kwargs['run']['skips'] = []
self.module_paths = args.CASES_PATHS
def _get_json_result_dir(self, args):
return args.json_result_dir
def _get_configuration(self):
td = self.tc_kwargs['init']['td']
configuration = {'TEST_TYPE': self.name,
'MACHINE': td.get("MACHINE"),
'DISTRO': td.get("DISTRO"),
'IMAGE_BASENAME': td.get("IMAGE_BASENAME"),
'DATETIME': td.get("DATETIME")}
return configuration
def _get_result_id(self, configuration):
return '%s_%s_%s_%s' % (configuration['TEST_TYPE'], configuration['IMAGE_BASENAME'],
configuration['MACHINE'], self.datetime)
def _pre_run(self):
pass
def run(self, logger, args):
self._process_args(logger, args)
self.tc = self._context_class(**self.tc_kwargs['init'])
try:
self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
except OEQATestNotFound as ex:
logger.error(ex)
sys.exit(1)
if args.list_tests:
rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
else:
self._pre_run()
rc = self.tc.runTests(**self.tc_kwargs['run'])
json_result_dir = self._get_json_result_dir(args)
if json_result_dir:
configuration = self._get_configuration()
rc.logDetails(json_result_dir,
configuration,
self._get_result_id(configuration))
else:
rc.logDetails()
rc.logSummary(self.name)
output_link = os.path.join(os.path.dirname(args.output_log),
"%s-results.log" % self.name)
if os.path.exists(output_link):
os.remove(output_link)
os.symlink(args.output_log, output_link)
return rc
_executor_class = OETestContextExecutor
@@ -0,0 +1,74 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from functools import wraps
from abc import ABCMeta
decoratorClasses = set()
def registerDecorator(cls):
decoratorClasses.add(cls)
return cls
class OETestDecorator(object, metaclass=ABCMeta):
case = None # Reference of OETestCase decorated
attrs = None # Attributes to be loaded by decorator implementation
def __init__(self, *args, **kwargs):
if not self.attrs:
return
for idx, attr in enumerate(self.attrs):
if attr in kwargs:
value = kwargs[attr]
else:
value = args[idx]
setattr(self, attr, value)
def __call__(self, func):
@wraps(func)
def wrapped_f(*args, **kwargs):
self.attrs = self.attrs # XXX: Enables OETestLoader discover
return func(*args, **kwargs)
return wrapped_f
# OETestLoader call it when is loading test cases.
# XXX: Most methods would change the registry for later
# processing; be aware that filtrate method needs to
# run later than bind, so there could be data (in the
# registry) of a cases that were filtered.
def bind(self, registry, case):
self.case = case
self.logger = case.tc.logger
self.case.decorators.append(self)
# OETestRunner call this method when tries to run
# the test case.
def setUpDecorator(self):
pass
# OETestRunner call it after a test method has been
# called even if the method raised an exception.
def tearDownDecorator(self):
pass
class OETestDiscover(OETestDecorator):
# OETestLoader call it after discover test cases
# needs to return the cases to be run.
@staticmethod
def discover(registry):
return registry['cases']
def OETestTag(*tags):
def decorator(item):
if hasattr(item, "__oeqa_testtags"):
# do not append, create a new list (to handle classes with inheritance)
item.__oeqa_testtags = list(item.__oeqa_testtags) + list(tags)
else:
item.__oeqa_testtags = tags
return item
return decorator
+220
View File
@@ -0,0 +1,220 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from oeqa.core.exception import OEQAMissingVariable
from . import OETestDecorator, registerDecorator
def has_feature(td, feature):
"""
Checks for feature in DISTRO_FEATURES or IMAGE_FEATURES.
"""
if (feature in td.get('DISTRO_FEATURES', '').split() or
feature in td.get('IMAGE_FEATURES', '').split()):
return True
return False
def has_machine(td, machine):
"""
Checks for MACHINE.
"""
if (machine == td.get('MACHINE', '')):
return True
return False
@registerDecorator
class skipIfDataVar(OETestDecorator):
"""
Skip test based on value of a data store's variable.
It will get the info of var from the data store and will
check it against value; if are equal it will skip the test
with msg as the reason.
"""
attrs = ('var', 'value', 'msg')
def setUpDecorator(self):
msg = ('Checking if %r value is %r to skip test' %
(self.var, self.value))
self.logger.debug(msg)
if self.case.td.get(self.var) == self.value:
self.case.skipTest(self.msg)
@registerDecorator
class skipIfNotDataVar(OETestDecorator):
"""
Skip test based on value of a data store's variable.
It will get the info of var from the data store and will
check it against value; if are not equal it will skip the
test with msg as the reason.
"""
attrs = ('var', 'value', 'msg')
def setUpDecorator(self):
msg = ('Checking if %r value is not %r to skip test' %
(self.var, self.value))
self.logger.debug(msg)
if not self.case.td.get(self.var) == self.value:
self.case.skipTest(self.msg)
@registerDecorator
class skipIfInDataVar(OETestDecorator):
"""
Skip test if value is in data store's variable.
"""
attrs = ('var', 'value', 'msg')
def setUpDecorator(self):
msg = ('Checking if %r value contains %r to skip '
'the test' % (self.var, self.value))
self.logger.debug(msg)
if self.value in (self.case.td.get(self.var)):
self.case.skipTest(self.msg)
@registerDecorator
class skipIfNotInDataVar(OETestDecorator):
"""
Skip test if value is not in data store's variable.
"""
attrs = ('var', 'value', 'msg')
def setUpDecorator(self):
msg = ('Checking if %r value contains %r to run '
'the test' % (self.var, self.value))
self.logger.debug(msg)
if not self.value in (self.case.td.get(self.var) or ""):
self.case.skipTest(self.msg)
@registerDecorator
class OETestDataDepends(OETestDecorator):
attrs = ('td_depends',)
def setUpDecorator(self):
for v in self.td_depends:
try:
value = self.case.td[v]
except KeyError:
raise OEQAMissingVariable("Test case need %s variable but"\
" isn't into td" % v)
@registerDecorator
class skipIfNotFeature(OETestDecorator):
"""
Skip test based on DISTRO_FEATURES.
value must be in distro features or it will skip the test
with msg as the reason.
"""
attrs = ('value', 'msg')
def setUpDecorator(self):
msg = ('Checking if %s is in DISTRO_FEATURES '
'or IMAGE_FEATURES' % (self.value))
self.logger.debug(msg)
if not has_feature(self.case.td, self.value):
self.case.skipTest(self.msg)
@registerDecorator
class skipIfFeature(OETestDecorator):
"""
Skip test based on DISTRO_FEATURES.
value must not be in distro features or it will skip the test
with msg as the reason.
"""
attrs = ('value', 'msg')
def setUpDecorator(self):
msg = ('Checking if %s is not in DISTRO_FEATURES '
'or IMAGE_FEATURES' % (self.value))
self.logger.debug(msg)
if has_feature(self.case.td, self.value):
self.case.skipTest(self.msg)
@registerDecorator
class skipIfNotMachine(OETestDecorator):
"""
Skip test based on MACHINE.
value must be match MACHINE or it will skip the test
with msg as the reason.
"""
attrs = ('value', 'msg')
def setUpDecorator(self):
msg = ('Checking if %s is not this MACHINE' % self.value)
self.logger.debug(msg)
if not has_machine(self.case.td, self.value):
self.case.skipTest(self.msg)
@registerDecorator
class skipIfMachine(OETestDecorator):
"""
Skip test based on Machine.
value must not be this machine or it will skip the test
with msg as the reason.
"""
attrs = ('value', 'msg')
def setUpDecorator(self):
msg = ('Checking if %s is this MACHINE' % self.value)
self.logger.debug(msg)
if has_machine(self.case.td, self.value):
self.case.skipTest(self.msg)
@registerDecorator
class skipIfNotQemu(OETestDecorator):
"""
Skip test if MACHINE is not qemu*
"""
def setUpDecorator(self):
self.logger.debug("Checking if not qemu MACHINE")
if not self.case.td.get('MACHINE', '').startswith('qemu'):
self.case.skipTest('Test only runs on qemu machines')
@registerDecorator
class skipIfQemu(OETestDecorator):
"""
Skip test if MACHINE is qemu*
"""
def setUpDecorator(self):
self.logger.debug("Checking if qemu MACHINE")
if self.case.td.get('MACHINE', '').startswith('qemu'):
self.case.skipTest('Test only runs on real hardware')
@registerDecorator
class skipIfArch(OETestDecorator):
"""
Skip test if HOST_ARCH is present in the tuple specified.
"""
attrs = ('archs',)
def setUpDecorator(self):
arch = self.case.td['HOST_ARCH']
if arch in self.archs:
self.case.skipTest('Test skipped on %s' % arch)
@registerDecorator
class skipIfNotArch(OETestDecorator):
"""
Skip test if HOST_ARCH is not present in the tuple specified.
"""
attrs = ('archs',)
def setUpDecorator(self):
arch = self.case.td['HOST_ARCH']
if arch not in self.archs:
self.case.skipTest('Test skipped on %s' % arch)
@@ -0,0 +1,98 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from unittest import SkipTest
from oeqa.core.exception import OEQADependency
from . import OETestDiscover, registerDecorator
def _add_depends(registry, case, depends):
module_name = case.__module__
class_name = case.__class__.__name__
case_id = case.id()
for depend in depends:
dparts = depend.split('.')
if len(dparts) == 1:
depend_id = ".".join((module_name, class_name, dparts[0]))
elif len(dparts) == 2:
depend_id = ".".join((module_name, dparts[0], dparts[1]))
else:
depend_id = depend
if not case_id in registry:
registry[case_id] = []
if not depend_id in registry[case_id]:
registry[case_id].append(depend_id)
def _validate_test_case_depends(cases, depends):
for case in depends:
if not case in cases:
continue
for dep in depends[case]:
if not dep in cases:
raise OEQADependency("TestCase %s depends on %s and isn't available"\
", cases available %s." % (case, dep, str(cases.keys())))
def _order_test_case_by_depends(cases, depends):
def _dep_resolve(graph, node, resolved, seen):
seen.append(node)
for edge in graph[node]:
if edge not in resolved:
if edge in seen:
raise OEQADependency("Test cases %s and %s have a circular" \
" dependency." % (node, edge))
_dep_resolve(graph, edge, resolved, seen)
resolved.append(node)
dep_graph = {}
dep_graph['__root__'] = cases.keys()
for case in cases:
if case in depends:
dep_graph[case] = depends[case]
else:
dep_graph[case] = []
cases_ordered = []
_dep_resolve(dep_graph, '__root__', cases_ordered, [])
cases_ordered.remove('__root__')
return [cases[case_id] for case_id in cases_ordered]
def _skipTestDependency(case, depends):
for dep in depends:
found = False
for test, _ in case.tc.results.successes:
if test.id() == dep:
found = True
break
if not found:
raise SkipTest("Test case %s depends on %s but it didn't pass/run." \
% (case.id(), dep))
@registerDecorator
class OETestDepends(OETestDiscover):
attrs = ('depends',)
def bind(self, registry, case):
super(OETestDepends, self).bind(registry, case)
if not registry.get('depends'):
registry['depends'] = {}
_add_depends(registry['depends'], case, self.depends)
@staticmethod
def discover(registry):
if registry.get('depends'):
_validate_test_case_depends(registry['cases'], registry['depends'])
return _order_test_case_by_depends(registry['cases'], registry['depends'])
else:
return [registry['cases'][case_id] for case_id in registry['cases']]
def setUpDecorator(self):
_skipTestDependency(self.case, self.depends)
@@ -0,0 +1,29 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import signal
from . import OETestDecorator, registerDecorator
from oeqa.core.exception import OEQATimeoutError
@registerDecorator
class OETimeout(OETestDecorator):
attrs = ('oetimeout',)
def setUpDecorator(self):
timeout = self.oetimeout
def _timeoutHandler(signum, frame):
raise OEQATimeoutError("Timed out after %s "
"seconds of execution" % timeout)
self.logger.debug("Setting up a %d second(s) timeout" % self.oetimeout)
self.alarmSignal = signal.signal(signal.SIGALRM, _timeoutHandler)
signal.alarm(self.oetimeout)
def tearDownDecorator(self):
signal.alarm(0)
if hasattr(self, 'alarmSignal'):
signal.signal(signal.SIGALRM, self.alarmSignal)
self.logger.debug("Removed SIGALRM handler")
+26
View File
@@ -0,0 +1,26 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
class OEQAException(Exception):
pass
class OEQATimeoutError(OEQAException):
pass
class OEQAMissingVariable(OEQAException):
pass
class OEQADependency(OEQAException):
pass
class OEQAMissingManifest(OEQAException):
pass
class OEQAPreRun(OEQAException):
pass
class OEQATestNotFound(OEQAException):
pass
+350
View File
@@ -0,0 +1,350 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import os
import re
import sys
import unittest
import inspect
from oeqa.core.utils.path import findFile
from oeqa.core.utils.test import getSuiteModules, getCaseID
from oeqa.core.exception import OEQATestNotFound
from oeqa.core.case import OETestCase
from oeqa.core.decorator import decoratorClasses, OETestDecorator, \
OETestDiscover
# When loading tests, the unittest framework stores any exceptions and
# displays them only when the run method is called.
#
# For our purposes, it is better to raise the exceptions in the loading
# step rather than waiting to run the test suite.
#
# Generate the function definition because this differ across python versions
# Python >= 3.4.4 uses tree parameters instead four but for example Python 3.5.3
# ueses four parameters so isn't incremental.
_failed_test_args = inspect.getfullargspec(unittest.loader._make_failed_test).args
exec("""def _make_failed_test(%s): raise exception""" % ', '.join(_failed_test_args))
unittest.loader._make_failed_test = _make_failed_test
def _find_duplicated_modules(suite, directory):
for module in getSuiteModules(suite):
path = findFile('%s.py' % module, directory)
if path:
raise ImportError("Duplicated %s module found in %s" % (module, path))
def _built_modules_dict(modules, logger):
modules_dict = {}
if modules == None:
return modules_dict
for module in modules:
# Assumption: package and module names do not contain upper case
# characters, whereas class names do
m = re.match(r'^([0-9a-z_.]+)(?:\.(\w[^.]*)(?:\.([^.]+))?)?$', module, flags=re.ASCII)
if not m:
logger.warn("module '%s' was skipped from selected modules, "\
"because it doesn't match with module name assumptions: "\
"package and module names do not contain upper case characters, whereas class names do" % module)
continue
module_name, class_name, test_name = m.groups()
if module_name and module_name not in modules_dict:
modules_dict[module_name] = {}
if class_name and class_name not in modules_dict[module_name]:
modules_dict[module_name][class_name] = []
if test_name and test_name not in modules_dict[module_name][class_name]:
modules_dict[module_name][class_name].append(test_name)
if modules and not modules_dict:
raise OEQATestNotFound("All selected modules were skipped, this would trigger selftest with all tests and -r ignored.")
return modules_dict
class OETestLoader(unittest.TestLoader):
caseClass = OETestCase
kwargs_names = ['testMethodPrefix', 'sortTestMethodUsing', 'suiteClass',
'_top_level_dir']
def __init__(self, tc, module_paths, modules, tests, modules_required,
*args, **kwargs):
self.tc = tc
self.modules = _built_modules_dict(modules, tc.logger)
self.tests = tests
self.modules_required = modules_required
self.tags_filter = kwargs.get("tags_filter", None)
if isinstance(module_paths, str):
module_paths = [module_paths]
elif not isinstance(module_paths, list):
raise TypeError('module_paths must be a str or a list of str')
self.module_paths = module_paths
for kwname in self.kwargs_names:
if kwname in kwargs:
setattr(self, kwname, kwargs[kwname])
self._patchCaseClass(self.caseClass)
super(OETestLoader, self).__init__()
def _patchCaseClass(self, testCaseClass):
# Adds custom attributes to the OETestCase class
setattr(testCaseClass, 'tc', self.tc)
setattr(testCaseClass, 'td', self.tc.td)
setattr(testCaseClass, 'logger', self.tc.logger)
def _registerTestCase(self, case):
case_id = case.id()
self.tc._registry['cases'][case_id] = case
def _handleTestCaseDecorators(self, case):
def _handle(obj):
if isinstance(obj, OETestDecorator):
if not obj.__class__ in decoratorClasses:
raise Exception("Decorator %s isn't registered" \
" in decoratorClasses." % obj.__name__)
obj.bind(self.tc._registry, case)
def _walk_closure(obj):
if hasattr(obj, '__closure__') and obj.__closure__:
for f in obj.__closure__:
obj = f.cell_contents
_handle(obj)
_walk_closure(obj)
method = getattr(case, case._testMethodName, None)
_walk_closure(method)
def _filterTest(self, case):
"""
Returns True if test case must be filtered, False otherwise.
"""
# XXX; If the module has more than one namespace only use
# the first to support run the whole module specifying the
# <module_name>.[test_class].[test_name]
module_name_small = case.__module__.split('.')[0]
module_name = case.__module__
class_name = case.__class__.__name__
test_name = case._testMethodName
# 'auto' is a reserved key word to run test cases automatically
# warn users if their test case belong to a module named 'auto'
if module_name_small == "auto":
bb.warn("'auto' is a reserved key word for TEST_SUITES. "
"But test case '%s' is detected to belong to auto module. "
"Please condier using a new name for your module." % str(case))
# check if case belongs to any specified module
# if 'auto' is specified, such check is skipped
if self.modules and not 'auto' in self.modules:
module = None
try:
module = self.modules[module_name_small]
except KeyError:
try:
module = self.modules[module_name]
except KeyError:
return True
if module:
if not class_name in module:
return True
if module[class_name]:
if test_name not in module[class_name]:
return True
# Decorator filters
if self.tags_filter is not None and callable(self.tags_filter):
alltags = set()
# pull tags from the case class
if hasattr(case, "__oeqa_testtags"):
for t in getattr(case, "__oeqa_testtags"):
alltags.add(t)
# pull tags from the method itself
if hasattr(case, test_name):
method = getattr(case, test_name)
if hasattr(method, "__oeqa_testtags"):
for t in getattr(method, "__oeqa_testtags"):
alltags.add(t)
if self.tags_filter(alltags):
return True
return False
def _getTestCase(self, testCaseClass, tcName):
if not hasattr(testCaseClass, '__oeqa_loader') and \
issubclass(testCaseClass, OETestCase):
# In order to support data_vars validation
# monkey patch the default setUp/tearDown{Class} to use
# the ones provided by OETestCase
setattr(testCaseClass, 'setUpClassMethod',
getattr(testCaseClass, 'setUpClass'))
setattr(testCaseClass, 'tearDownClassMethod',
getattr(testCaseClass, 'tearDownClass'))
setattr(testCaseClass, 'setUpClass',
testCaseClass._oeSetUpClass)
setattr(testCaseClass, 'tearDownClass',
testCaseClass._oeTearDownClass)
# In order to support decorators initialization
# monkey patch the default setUp/tearDown to use
# a setUpDecorators/tearDownDecorators that methods
# will call setUp/tearDown original methods.
setattr(testCaseClass, 'setUpMethod',
getattr(testCaseClass, 'setUp'))
setattr(testCaseClass, 'tearDownMethod',
getattr(testCaseClass, 'tearDown'))
setattr(testCaseClass, 'setUp', testCaseClass._oeSetUp)
setattr(testCaseClass, 'tearDown', testCaseClass._oeTearDown)
setattr(testCaseClass, '__oeqa_loader', True)
case = testCaseClass(tcName)
if isinstance(case, OETestCase):
setattr(case, 'decorators', [])
return case
def loadTestsFromTestCase(self, testCaseClass):
"""
Returns a suite of all tests cases contained in testCaseClass.
"""
if issubclass(testCaseClass, unittest.suite.TestSuite):
raise TypeError("Test cases should not be derived from TestSuite." \
" Maybe you meant to derive %s from TestCase?" \
% testCaseClass.__name__)
if not issubclass(testCaseClass, unittest.case.TestCase):
raise TypeError("Test %s is not derived from %s" % \
(testCaseClass.__name__, unittest.case.TestCase.__name__))
testCaseNames = self.getTestCaseNames(testCaseClass)
if not testCaseNames and hasattr(testCaseClass, 'runTest'):
testCaseNames = ['runTest']
suite = []
for tcName in testCaseNames:
case = self._getTestCase(testCaseClass, tcName)
# Filer by case id
if not (self.tests and not 'auto' in self.tests
and not getCaseID(case) in self.tests):
self._handleTestCaseDecorators(case)
# Filter by decorators
if not self._filterTest(case):
self._registerTestCase(case)
suite.append(case)
return self.suiteClass(suite)
def _required_modules_validation(self):
"""
Search in Test context registry if a required
test is found, raise an exception when not found.
"""
for module in self.modules_required:
found = False
# The module name is splitted to only compare the
# first part of a test case id.
comp_len = len(module.split('.'))
for case in self.tc._registry['cases']:
case_comp = '.'.join(case.split('.')[0:comp_len])
if module == case_comp:
found = True
break
if not found:
raise OEQATestNotFound("Not found %s in loaded test cases" % \
module)
def discover(self):
big_suite = self.suiteClass()
for path in self.module_paths:
_find_duplicated_modules(big_suite, path)
suite = super(OETestLoader, self).discover(path,
pattern='*.py', top_level_dir=path)
big_suite.addTests(suite)
cases = None
discover_classes = [clss for clss in decoratorClasses
if issubclass(clss, OETestDiscover)]
for clss in discover_classes:
cases = clss.discover(self.tc._registry)
if self.modules_required:
self._required_modules_validation()
return self.suiteClass(cases) if cases else big_suite
def _filterModule(self, module):
if module.__name__ in sys.builtin_module_names:
msg = 'Tried to import %s test module but is a built-in'
raise ImportError(msg % module.__name__)
# XXX; If the module has more than one namespace only use
# the first to support run the whole module specifying the
# <module_name>.[test_class].[test_name]
module_name_small = module.__name__.split('.')[0]
module_name = module.__name__
# Normal test modules are loaded if no modules were specified,
# if module is in the specified module list or if 'auto' is in
# module list.
# Underscore modules are loaded only if specified in module list.
load_module = True if not module_name.startswith('_') \
and (not self.modules \
or module_name in self.modules \
or module_name_small in self.modules \
or 'auto' in self.modules) \
else False
load_underscore = True if module_name.startswith('_') \
and (module_name in self.modules or \
module_name_small in self.modules) \
else False
if any(c.isupper() for c in module.__name__):
raise SystemExit("Module '%s' contains uppercase characters and this isn't supported. Please fix the module name." % module.__name__)
return (load_module, load_underscore)
# XXX After Python 3.5, remove backward compatibility hacks for
# use_load_tests deprecation via *args and **kws. See issue 16662.
if sys.version_info >= (3,5):
def loadTestsFromModule(self, module, *args, pattern=None, **kws):
"""
Returns a suite of all tests cases contained in module.
"""
load_module, load_underscore = self._filterModule(module)
if load_module or load_underscore:
return super(OETestLoader, self).loadTestsFromModule(
module, *args, pattern=pattern, **kws)
else:
return self.suiteClass()
else:
def loadTestsFromModule(self, module, use_load_tests=True):
"""
Returns a suite of all tests cases contained in module.
"""
load_module, load_underscore = self._filterModule(module)
if load_module or load_underscore:
return super(OETestLoader, self).loadTestsFromModule(
module, use_load_tests)
else:
return self.suiteClass()
+357
View File
@@ -0,0 +1,357 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import os
import time
import unittest
import logging
import re
import json
import sys
from unittest import TextTestResult as _TestResult
from unittest import TextTestRunner as _TestRunner
class OEStreamLogger(object):
def __init__(self, logger):
self.logger = logger
self.buffer = ""
def write(self, msg):
if len(msg) > 1 and msg[0] != '\n':
if '...' in msg:
self.buffer += msg
elif self.buffer:
self.buffer += msg
self.logger.log(logging.INFO, self.buffer)
self.buffer = ""
else:
self.logger.log(logging.INFO, msg)
def flush(self):
for handler in self.logger.handlers:
handler.flush()
class OETestResult(_TestResult):
def __init__(self, tc, *args, **kwargs):
super(OETestResult, self).__init__(*args, **kwargs)
self.successes = []
self.starttime = {}
self.endtime = {}
self.progressinfo = {}
self.extraresults = {}
# Inject into tc so that TestDepends decorator can see results
tc.results = self
self.tc = tc
# stdout and stderr for each test case
self.logged_output = {}
def startTest(self, test):
# May have been set by concurrencytest
if test.id() not in self.starttime:
self.starttime[test.id()] = time.time()
super(OETestResult, self).startTest(test)
def stopTest(self, test):
self.endtime[test.id()] = time.time()
if self.buffer:
self.logged_output[test.id()] = (
sys.stdout.getvalue(), sys.stderr.getvalue())
super(OETestResult, self).stopTest(test)
if test.id() in self.progressinfo:
self.tc.logger.info(self.progressinfo[test.id()])
# Print the errors/failures early to aid/speed debugging, its a pain
# to wait until selftest finishes to see them.
for t in ['failures', 'errors', 'skipped', 'expectedFailures']:
for (scase, msg) in getattr(self, t):
if test.id() == scase.id():
self.tc.logger.info(str(msg))
break
def logSummary(self, component, context_msg=''):
elapsed_time = self.tc._run_end_time - self.tc._run_start_time
self.tc.logger.info("SUMMARY:")
self.tc.logger.info("%s (%s) - Ran %d test%s in %.3fs" % (component,
context_msg, self.testsRun, self.testsRun != 1 and "s" or "",
elapsed_time))
if self.wasSuccessful():
msg = "%s - OK - All required tests passed" % component
else:
msg = "%s - FAIL - Required tests failed" % component
msg += " (successes=%d, skipped=%d, failures=%d, errors=%d)" % (len(self.successes), len(self.skipped), len(self.failures), len(self.errors))
self.tc.logger.info(msg)
def _getTestResultDetails(self, case):
result_types = {'failures': 'FAILED', 'errors': 'ERROR', 'skipped': 'SKIPPED',
'expectedFailures': 'EXPECTEDFAIL', 'successes': 'PASSED',
'unexpectedSuccesses' : 'PASSED'}
for rtype in result_types:
found = False
for resultclass in getattr(self, rtype):
# unexpectedSuccesses are just lists, not lists of tuples
if isinstance(resultclass, tuple):
scase, msg = resultclass
else:
scase, msg = resultclass, None
if case.id() == scase.id():
found = True
break
scase_str = str(scase.id())
# When fails at module or class level the class name is passed as string
# so figure out to see if match
m = re.search(r"^setUpModule \((?P<module_name>.*)\).*$", scase_str)
if m:
if case.__class__.__module__ == m.group('module_name'):
found = True
break
m = re.search(r"^setUpClass \((?P<class_name>.*)\).*$", scase_str)
if m:
class_name = "%s.%s" % (case.__class__.__module__,
case.__class__.__name__)
if class_name == m.group('class_name'):
found = True
break
if found:
return result_types[rtype], msg
return 'UNKNOWN', None
def extractExtraResults(self, test, details = None):
extraresults = None
if details is not None and "extraresults" in details:
extraresults = details.get("extraresults", {})
elif hasattr(test, "extraresults"):
extraresults = test.extraresults
if extraresults is not None:
for k, v in extraresults.items():
# handle updating already existing entries (e.g. ptestresults.sections)
if k in self.extraresults:
self.extraresults[k].update(v)
else:
self.extraresults[k] = v
def addError(self, test, *args, details = None):
self.extractExtraResults(test, details = details)
return super(OETestResult, self).addError(test, *args)
def addFailure(self, test, *args, details = None):
self.extractExtraResults(test, details = details)
return super(OETestResult, self).addFailure(test, *args)
def addSuccess(self, test, details = None):
#Added so we can keep track of successes too
self.successes.append((test, None))
self.extractExtraResults(test, details = details)
return super(OETestResult, self).addSuccess(test)
def addExpectedFailure(self, test, *args, details = None):
self.extractExtraResults(test, details = details)
return super(OETestResult, self).addExpectedFailure(test, *args)
def addUnexpectedSuccess(self, test, details = None):
self.extractExtraResults(test, details = details)
return super(OETestResult, self).addUnexpectedSuccess(test)
def logDetails(self, json_file_dir=None, configuration=None, result_id=None,
dump_streams=False):
self.tc.logger.info("RESULTS:")
result = self.extraresults
logs = {}
if hasattr(self.tc, "extraresults"):
result.update(self.tc.extraresults)
for case_name in self.tc._registry['cases']:
case = self.tc._registry['cases'][case_name]
(status, log) = self._getTestResultDetails(case)
t = ""
duration = 0
if case.id() in self.starttime and case.id() in self.endtime:
duration = self.endtime[case.id()] - self.starttime[case.id()]
t = " (" + "{0:.2f}".format(duration) + "s)"
if status not in logs:
logs[status] = []
logs[status].append("RESULTS - %s: %s%s" % (case.id(), status, t))
report = {'status': status}
if log:
report['log'] = log
if duration:
report['duration'] = duration
alltags = []
# pull tags from the case class
if hasattr(case, "__oeqa_testtags"):
alltags.extend(getattr(case, "__oeqa_testtags"))
# pull tags from the method itself
test_name = case._testMethodName
if hasattr(case, test_name):
method = getattr(case, test_name)
if hasattr(method, "__oeqa_testtags"):
alltags.extend(getattr(method, "__oeqa_testtags"))
if alltags:
report['oetags'] = alltags
if dump_streams and case.id() in self.logged_output:
(stdout, stderr) = self.logged_output[case.id()]
report['stdout'] = stdout
report['stderr'] = stderr
result[case.id()] = report
for i in ['PASSED', 'SKIPPED', 'EXPECTEDFAIL', 'ERROR', 'FAILED', 'UNKNOWN']:
if i not in logs:
continue
for l in logs[i]:
self.tc.logger.info(l)
if json_file_dir:
tresultjsonhelper = OETestResultJSONHelper()
tresultjsonhelper.dump_testresult_file(json_file_dir, configuration, result_id, result)
def wasSuccessful(self):
# Override as we unexpected successes aren't failures for us
return (len(self.failures) == len(self.errors) == 0)
def hasAnyFailingTest(self):
# Account for expected failures
return not self.wasSuccessful() or len(self.expectedFailures)
class OEListTestsResult(object):
def wasSuccessful(self):
return True
class OETestRunner(_TestRunner):
streamLoggerClass = OEStreamLogger
def __init__(self, tc, *args, **kwargs):
kwargs['stream'] = self.streamLoggerClass(tc.logger)
super(OETestRunner, self).__init__(*args, **kwargs)
self.tc = tc
self.resultclass = OETestResult
def _makeResult(self):
return self.resultclass(self.tc, self.stream, self.descriptions,
self.verbosity)
def _walk_suite(self, suite, func):
for obj in suite:
if isinstance(obj, unittest.suite.TestSuite):
if len(obj._tests):
self._walk_suite(obj, func)
elif isinstance(obj, unittest.case.TestCase):
func(self.tc.logger, obj)
self._walked_cases = self._walked_cases + 1
def _list_tests_name(self, suite):
self._walked_cases = 0
def _list_cases(logger, case):
oetags = []
if hasattr(case, '__oeqa_testtags'):
oetags = getattr(case, '__oeqa_testtags')
if oetags:
logger.info("%s (%s)" % (case.id(), ",".join(oetags)))
else:
logger.info("%s" % (case.id()))
self.tc.logger.info("Listing all available tests:")
self._walked_cases = 0
self.tc.logger.info("test (tags)")
self.tc.logger.info("-" * 80)
self._walk_suite(suite, _list_cases)
self.tc.logger.info("-" * 80)
self.tc.logger.info("Total found:\t%s" % self._walked_cases)
def _list_tests_class(self, suite):
self._walked_cases = 0
curr = {}
def _list_classes(logger, case):
if not 'module' in curr or curr['module'] != case.__module__:
curr['module'] = case.__module__
logger.info(curr['module'])
if not 'class' in curr or curr['class'] != \
case.__class__.__name__:
curr['class'] = case.__class__.__name__
logger.info(" -- %s" % curr['class'])
logger.info(" -- -- %s" % case._testMethodName)
self.tc.logger.info("Listing all available test classes:")
self._walk_suite(suite, _list_classes)
def _list_tests_module(self, suite):
self._walked_cases = 0
listed = []
def _list_modules(logger, case):
if not case.__module__ in listed:
if case.__module__.startswith('_'):
logger.info("%s (hidden)" % case.__module__)
else:
logger.info(case.__module__)
listed.append(case.__module__)
self.tc.logger.info("Listing all available test modules:")
self._walk_suite(suite, _list_modules)
def list_tests(self, suite, display_type):
if display_type == 'name':
self._list_tests_name(suite)
elif display_type == 'class':
self._list_tests_class(suite)
elif display_type == 'module':
self._list_tests_module(suite)
return OEListTestsResult()
class OETestResultJSONHelper(object):
testresult_filename = 'testresults.json'
def _get_existing_testresults_if_available(self, write_dir):
testresults = {}
file = os.path.join(write_dir, self.testresult_filename)
if os.path.exists(file):
with open(file, "r") as f:
testresults = json.load(f)
return testresults
def _write_file(self, write_dir, file_name, file_content):
file_path = os.path.join(write_dir, file_name)
with open(file_path, 'w') as the_file:
the_file.write(file_content)
def dump_testresult_file(self, write_dir, configuration, result_id, test_result):
try:
import bb
has_bb = True
bb.utils.mkdirhier(write_dir)
lf = bb.utils.lockfile(os.path.join(write_dir, 'jsontestresult.lock'))
except ImportError:
has_bb = False
os.makedirs(write_dir, exist_ok=True)
test_results = self._get_existing_testresults_if_available(write_dir)
test_results[result_id] = {'configuration': configuration, 'result': test_result}
json_testresults = json.dumps(test_results, sort_keys=True, indent=4)
self._write_file(write_dir, self.testresult_filename, json_testresults)
if has_bb:
bb.utils.unlockfile(lf)
@@ -0,0 +1,36 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from abc import abstractmethod
class OETarget(object):
def __init__(self, logger, *args, **kwargs):
self.logger = logger
@abstractmethod
def start(self):
pass
@abstractmethod
def stop(self):
pass
@abstractmethod
def run(self, cmd, timeout=None):
pass
@abstractmethod
def copyTo(self, localSrc, remoteDst):
pass
@abstractmethod
def copyFrom(self, remoteSrc, localDst):
pass
@abstractmethod
def copyDirTo(self, localSrc, remoteDst):
pass
+104
View File
@@ -0,0 +1,104 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import os
import sys
import signal
import time
import glob
import subprocess
from collections import defaultdict
from .ssh import OESSHTarget
from oeqa.utils.qemurunner import QemuRunner
from oeqa.utils.dump import MonitorDumper
from oeqa.utils.dump import TargetDumper
supported_fstypes = ['ext3', 'ext4', 'cpio.gz', 'wic']
class OEQemuTarget(OESSHTarget):
def __init__(self, logger, server_ip, timeout=300, user='root',
port=None, machine='', rootfs='', kernel='', kvm=False, slirp=False,
dump_dir='', display='', bootlog='',
tmpdir='', dir_image='', boottime=60, serial_ports=2,
boot_patterns = defaultdict(str), ovmf=False, tmpfsdir=None, **kwargs):
super(OEQemuTarget, self).__init__(logger, None, server_ip, timeout,
user, port)
self.server_ip = server_ip
self.server_port = 0
self.machine = machine
self.rootfs = rootfs
self.kernel = kernel
self.kvm = kvm
self.ovmf = ovmf
self.use_slirp = slirp
self.boot_patterns = boot_patterns
self.dump_dir = dump_dir
self.bootlog = bootlog
self.runner = QemuRunner(machine=machine, rootfs=rootfs, tmpdir=tmpdir,
deploy_dir_image=dir_image, display=display,
logfile=bootlog, boottime=boottime,
use_kvm=kvm, use_slirp=slirp, dump_dir=dump_dir, logger=logger,
serial_ports=serial_ports, boot_patterns = boot_patterns,
use_ovmf=ovmf, tmpfsdir=tmpfsdir)
dump_monitor_cmds = kwargs.get("testimage_dump_monitor")
self.monitor_dumper = MonitorDumper(dump_monitor_cmds, dump_dir, self.runner)
if self.monitor_dumper:
self.monitor_dumper.create_dir("qmp")
dump_target_cmds = kwargs.get("testimage_dump_target")
self.target_dumper = TargetDumper(dump_target_cmds, dump_dir, self.runner)
self.target_dumper.create_dir("qemu")
def start(self, params=None, extra_bootparams=None, runqemuparams=''):
if self.use_slirp and not self.server_ip:
self.logger.error("Could not start qemu with slirp without server ip - provide 'TEST_SERVER_IP'")
raise RuntimeError("FAILED to start qemu - check the task log and the boot log")
if self.runner.start(params, extra_bootparams=extra_bootparams, runqemuparams=runqemuparams):
self.ip = self.runner.ip
if self.use_slirp:
target_ip_port = self.runner.ip.split(':')
if len(target_ip_port) == 2:
target_ip = target_ip_port[0]
port = target_ip_port[1]
self.ip = target_ip
self.ssh = self.ssh + ['-p', port]
self.scp = self.scp + ['-P', port]
else:
self.logger.error("Could not get host machine port to connect qemu with slirp, ssh will not be "
"able to connect to qemu with slirp")
if self.runner.server_ip:
self.server_ip = self.runner.server_ip
else:
self.stop()
# Display the first 20 lines of top and
# last 20 lines of the bootlog when the
# target is not being booted up.
topfile = glob.glob(self.dump_dir + "/*_qemu/host_*_top")
msg = "\n\n===== start: snippet =====\n\n"
for f in topfile:
msg += "file: %s\n\n" % f
with open(f) as tf:
for x in range(20):
msg += next(tf)
msg += "\n\n===== end: snippet =====\n\n"
blcmd = ["tail", "-20", self.bootlog]
msg += "===== start: snippet =====\n\n"
try:
out = subprocess.check_output(blcmd, stderr=subprocess.STDOUT, timeout=1).decode('utf-8')
msg += "file: %s\n\n" % self.bootlog
msg += out
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError) as err:
msg += "Error running command: %s\n%s\n" % (blcmd, err)
msg += "\n\n===== end: snippet =====\n"
raise RuntimeError("FAILED to start qemu - check the task log and the boot log %s" % (msg))
def stop(self):
self.runner.stop()
+319
View File
@@ -0,0 +1,319 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import os
import time
import select
import logging
import subprocess
import codecs
from . import OETarget
class OESSHTarget(OETarget):
def __init__(self, logger, ip, server_ip, timeout=300, user='root',
port=None, server_port=0, **kwargs):
if not logger:
logger = logging.getLogger('target')
logger.setLevel(logging.INFO)
filePath = os.path.join(os.getcwd(), 'remoteTarget.log')
fileHandler = logging.FileHandler(filePath, 'w', 'utf-8')
formatter = logging.Formatter(
'%(asctime)s.%(msecs)03d %(levelname)s: %(message)s',
'%H:%M:%S')
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
super(OESSHTarget, self).__init__(logger)
self.ip = ip
self.server_ip = server_ip
self.server_port = server_port
self.timeout = timeout
self.user = user
ssh_options = [
'-o', 'ServerAliveCountMax=2',
'-o', 'ServerAliveInterval=30',
'-o', 'UserKnownHostsFile=/dev/null',
'-o', 'StrictHostKeyChecking=no',
'-o', 'LogLevel=ERROR'
]
scp_options = [
'-r'
]
self.ssh = ['ssh', '-l', self.user ] + ssh_options
self.scp = ['scp'] + ssh_options + scp_options
if port:
self.ssh = self.ssh + [ '-p', port ]
self.scp = self.scp + [ '-P', port ]
self._monitor_dumper = None
self.target_dumper = None
def start(self, **kwargs):
pass
def stop(self, **kwargs):
pass
@property
def monitor_dumper(self):
return self._monitor_dumper
@monitor_dumper.setter
def monitor_dumper(self, dumper):
self._monitor_dumper = dumper
self.monitor_dumper.dump_monitor()
def _run(self, command, timeout=None, ignore_status=True):
"""
Runs command in target using SSHProcess.
"""
self.logger.debug("[Running]$ %s" % " ".join(command))
starttime = time.time()
status, output = SSHCall(command, self.logger, timeout)
self.logger.debug("[Command returned '%d' after %.2f seconds]"
"" % (status, time.time() - starttime))
if status and not ignore_status:
raise AssertionError("Command '%s' returned non-zero exit "
"status %d:\n%s" % (command, status, output))
return (status, output)
def run(self, command, timeout=None, ignore_status=True):
"""
Runs command in target.
command: Command to run on target.
timeout: <value>: Kill command after <val> seconds.
None: Kill command default value seconds.
0: No timeout, runs until return.
"""
targetCmd = 'export PATH=/usr/sbin:/sbin:/usr/bin:/bin; %s' % command
sshCmd = self.ssh + [self.ip, targetCmd]
if timeout:
processTimeout = timeout
elif timeout==0:
processTimeout = None
else:
processTimeout = self.timeout
status, output = self._run(sshCmd, processTimeout, ignore_status)
self.logger.debug('Command: %s\nStatus: %d Output: %s\n' % (command, status, output))
if (status == 255) and (('No route to host') in output):
if self.monitor_dumper:
self.monitor_dumper.dump_monitor()
if status == 255:
if self.target_dumper:
self.target_dumper.dump_target()
if self.monitor_dumper:
self.monitor_dumper.dump_monitor()
return (status, output)
def copyTo(self, localSrc, remoteDst):
"""
Copy file to target.
If local file is symlink, recreate symlink in target.
"""
if os.path.islink(localSrc):
link = os.readlink(localSrc)
dstDir, dstBase = os.path.split(remoteDst)
sshCmd = 'cd %s; ln -s %s %s' % (dstDir, link, dstBase)
return self.run(sshCmd)
else:
remotePath = '%s@%s:%s' % (self.user, self.ip, remoteDst)
scpCmd = self.scp + [localSrc, remotePath]
return self._run(scpCmd, ignore_status=False)
def copyFrom(self, remoteSrc, localDst, warn_on_failure=False):
"""
Copy file from target.
"""
remotePath = '%s@%s:%s' % (self.user, self.ip, remoteSrc)
scpCmd = self.scp + [remotePath, localDst]
(status, output) = self._run(scpCmd, ignore_status=warn_on_failure)
if warn_on_failure and status:
self.logger.warning("Copy returned non-zero exit status %d:\n%s" % (status, output))
return (status, output)
def copyDirTo(self, localSrc, remoteDst):
"""
Copy recursively localSrc directory to remoteDst in target.
"""
for root, dirs, files in os.walk(localSrc):
# Create directories in the target as needed
for d in dirs:
tmpDir = os.path.join(root, d).replace(localSrc, "")
newDir = os.path.join(remoteDst, tmpDir.lstrip("/"))
cmd = "mkdir -p %s" % newDir
self.run(cmd)
# Copy files into the target
for f in files:
tmpFile = os.path.join(root, f).replace(localSrc, "")
dstFile = os.path.join(remoteDst, tmpFile.lstrip("/"))
srcFile = os.path.join(root, f)
self.copyTo(srcFile, dstFile)
def deleteFiles(self, remotePath, files):
"""
Deletes files in target's remotePath.
"""
cmd = "rm"
if not isinstance(files, list):
files = [files]
for f in files:
cmd = "%s %s" % (cmd, os.path.join(remotePath, f))
self.run(cmd)
def deleteDir(self, remotePath):
"""
Deletes target's remotePath directory.
"""
cmd = "rmdir %s" % remotePath
self.run(cmd)
def deleteDirStructure(self, localPath, remotePath):
"""
Delete recursively localPath structure directory in target's remotePath.
This function is very usefult to delete a package that is installed in
the DUT and the host running the test has such package extracted in tmp
directory.
Example:
pwd: /home/user/tmp
tree: .
└── work
├── dir1
│   └── file1
└── dir2
localpath = "/home/user/tmp" and remotepath = "/home/user"
With the above variables this function will try to delete the
directory in the DUT in this order:
/home/user/work/dir1/file1
/home/user/work/dir1 (if dir is empty)
/home/user/work/dir2 (if dir is empty)
/home/user/work (if dir is empty)
"""
for root, dirs, files in os.walk(localPath, topdown=False):
# Delete files first
tmpDir = os.path.join(root).replace(localPath, "")
remoteDir = os.path.join(remotePath, tmpDir.lstrip("/"))
self.deleteFiles(remoteDir, files)
# Remove dirs if empty
for d in dirs:
tmpDir = os.path.join(root, d).replace(localPath, "")
remoteDir = os.path.join(remotePath, tmpDir.lstrip("/"))
self.deleteDir(remoteDir)
def SSHCall(command, logger, timeout=None, **opts):
def run():
nonlocal output
nonlocal process
output_raw = b''
starttime = time.time()
process = subprocess.Popen(command, **options)
if timeout:
endtime = starttime + timeout
eof = False
os.set_blocking(process.stdout.fileno(), False)
while time.time() < endtime and not eof:
try:
logger.debug('Waiting for process output: time: %s, endtime: %s' % (time.time(), endtime))
if select.select([process.stdout], [], [], 5)[0] != []:
# wait a bit for more data, tries to avoid reading single characters
time.sleep(0.2)
data = process.stdout.read()
if not data:
eof = True
else:
output_raw += data
# ignore errors to capture as much as possible
logger.debug('Partial data from SSH call:\n%s' % data.decode('utf-8', errors='ignore'))
endtime = time.time() + timeout
except InterruptedError:
logger.debug('InterruptedError')
continue
process.stdout.close()
# process hasn't returned yet
if not eof:
process.terminate()
time.sleep(5)
try:
process.kill()
except OSError:
logger.debug('OSError when killing process')
pass
endtime = time.time() - starttime
lastline = ("\nProcess killed - no output for %d seconds. Total"
" running time: %d seconds." % (timeout, endtime))
logger.debug('Received data from SSH call:\n%s ' % lastline)
output += lastline
else:
output_raw = process.communicate()[0]
output = output_raw.decode('utf-8', errors='ignore')
logger.debug('Data from SSH call:\n%s' % output.rstrip())
# timout or not, make sure process exits and is not hanging
if process.returncode == None:
try:
process.wait(timeout=5)
except TimeoutExpired:
try:
process.kill()
except OSError:
logger.debug('OSError')
pass
options = {
"stdout": subprocess.PIPE,
"stderr": subprocess.STDOUT,
"stdin": None,
"shell": False,
"bufsize": -1,
"start_new_session": True,
}
options.update(opts)
output = ''
process = None
# Unset DISPLAY which means we won't trigger SSH_ASKPASS
env = os.environ.copy()
if "DISPLAY" in env:
del env['DISPLAY']
options['env'] = env
try:
run()
except:
# Need to guard against a SystemExit or other exception ocurring
# whilst running and ensure we don't leave a process behind.
if process.poll() is None:
process.kill()
logger.debug('Something went wrong, killing SSH process')
raise
return (process.returncode, output.rstrip())
@@ -0,0 +1,23 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from oeqa.core.case import OETestCase
from oeqa.core.decorator import OETestTag
from oeqa.core.decorator.data import OETestDataDepends
class DataTest(OETestCase):
data_vars = ['IMAGE', 'ARCH']
@OETestDataDepends(['MACHINE',])
@OETestTag('dataTestOk')
def testDataOk(self):
self.assertEqual(self.td.get('IMAGE'), 'core-image-minimal')
self.assertEqual(self.td.get('ARCH'), 'x86')
self.assertEqual(self.td.get('MACHINE'), 'qemuarm')
@OETestTag('dataTestFail')
def testDataFail(self):
pass
@@ -0,0 +1,41 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from oeqa.core.case import OETestCase
from oeqa.core.decorator.depends import OETestDepends
class DependsTest(OETestCase):
def testDependsFirst(self):
self.assertTrue(True, msg='How is this possible?')
@OETestDepends(['testDependsFirst'])
def testDependsSecond(self):
self.assertTrue(True, msg='How is this possible?')
@OETestDepends(['testDependsSecond'])
def testDependsThird(self):
self.assertTrue(True, msg='How is this possible?')
@OETestDepends(['testDependsSecond'])
def testDependsFourth(self):
self.assertTrue(True, msg='How is this possible?')
@OETestDepends(['testDependsThird', 'testDependsFourth'])
def testDependsFifth(self):
self.assertTrue(True, msg='How is this possible?')
@OETestDepends(['testDependsCircular3'])
def testDependsCircular1(self):
self.assertTrue(True, msg='How is this possible?')
@OETestDepends(['testDependsCircular1'])
def testDependsCircular2(self):
self.assertTrue(True, msg='How is this possible?')
@OETestDepends(['testDependsCircular2'])
def testDependsCircular3(self):
self.assertTrue(True, msg='How is this possible?')
@@ -0,0 +1,12 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from oeqa.core.case import OETestCase
class AnotherTest(OETestCase):
def testAnother(self):
self.assertTrue(True, msg='How is this possible?')
@@ -0,0 +1,38 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from oeqa.core.case import OETestCase
from oeqa.core.decorator import OETestTag
class TagTest(OETestCase):
@OETestTag('goodTag')
def testTagGood(self):
self.assertTrue(True, msg='How is this possible?')
@OETestTag('otherTag')
def testTagOther(self):
self.assertTrue(True, msg='How is this possible?')
@OETestTag('otherTag', 'multiTag')
def testTagOtherMulti(self):
self.assertTrue(True, msg='How is this possible?')
def testTagNone(self):
self.assertTrue(True, msg='How is this possible?')
@OETestTag('classTag')
class TagClassTest(OETestCase):
@OETestTag('otherTag')
def testTagOther(self):
self.assertTrue(True, msg='How is this possible?')
@OETestTag('otherTag', 'multiTag')
def testTagOtherMulti(self):
self.assertTrue(True, msg='How is this possible?')
def testTagNone(self):
self.assertTrue(True, msg='How is this possible?')
@@ -0,0 +1,34 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
from time import sleep
from oeqa.core.case import OETestCase
from oeqa.core.decorator.oetimeout import OETimeout
from oeqa.core.decorator.depends import OETestDepends
class TimeoutTest(OETestCase):
@OETimeout(1)
def testTimeoutPass(self):
self.assertTrue(True, msg='How is this possible?')
@OETimeout(1)
def testTimeoutFail(self):
sleep(2)
self.assertTrue(True, msg='How is this possible?')
def testTimeoutSkip(self):
self.skipTest("This test needs to be skipped, so that testTimeoutDepends()'s OETestDepends kicks in")
@OETestDepends(["timeout.TimeoutTest.testTimeoutSkip"])
@OETimeout(3)
def testTimeoutDepends(self):
self.assertTrue(False, msg='How is this possible?')
def testTimeoutUnrelated(self):
sleep(6)
+38
View File
@@ -0,0 +1,38 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import sys
import os
import unittest
import logging
import os
logger = logging.getLogger("oeqa")
logger.setLevel(logging.INFO)
consoleHandler = logging.StreamHandler()
formatter = logging.Formatter('OEQATest: %(message)s')
consoleHandler.setFormatter(formatter)
logger.addHandler(consoleHandler)
def setup_sys_path():
directory = os.path.dirname(os.path.abspath(__file__))
oeqa_lib = os.path.realpath(os.path.join(directory, '../../../'))
if not oeqa_lib in sys.path:
sys.path.insert(0, oeqa_lib)
class TestBase(unittest.TestCase):
def setUp(self):
self.logger = logger
directory = os.path.dirname(os.path.abspath(__file__))
self.cases_path = os.path.join(directory, 'cases')
def _testLoader(self, d={}, modules=[], tests=[], **kwargs):
from oeqa.core.context import OETestContext
tc = OETestContext(d, self.logger)
tc.loadTests(self.cases_path, modules=modules, tests=tests,
**kwargs)
return tc
+55
View File
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import unittest
import logging
import os
from common import setup_sys_path, TestBase
setup_sys_path()
from oeqa.core.exception import OEQAMissingVariable
from oeqa.core.utils.test import getCaseMethod, getSuiteCasesNames
class TestData(TestBase):
modules = ['data']
def test_data_fail_missing_variable(self):
expectedException = "oeqa.core.exception.OEQAMissingVariable"
tc = self._testLoader(modules=self.modules)
results = tc.runTests()
self.assertFalse(results.wasSuccessful())
for test, data in results.errors:
expect = False
if expectedException in data:
expect = True
self.assertTrue(expect)
def test_data_fail_wrong_variable(self):
expectedError = 'AssertionError'
d = {'IMAGE' : 'core-image-weston', 'ARCH' : 'arm'}
tc = self._testLoader(d=d, modules=self.modules)
results = tc.runTests()
self.assertFalse(results.wasSuccessful())
for test, data in results.failures:
expect = False
if expectedError in data:
expect = True
self.assertTrue(expect)
def test_data_ok(self):
d = {'IMAGE' : 'core-image-minimal', 'ARCH' : 'x86', 'MACHINE' : 'qemuarm'}
tc = self._testLoader(d=d, modules=self.modules)
self.assertEqual(True, tc.runTests().wasSuccessful())
if __name__ == '__main__':
unittest.main()
+143
View File
@@ -0,0 +1,143 @@
#!/usr/bin/env python3
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import signal
import unittest
from common import setup_sys_path, TestBase
setup_sys_path()
from oeqa.core.exception import OEQADependency
from oeqa.core.utils.test import getCaseMethod, getSuiteCasesNames, getSuiteCasesIDs
class TestTagDecorator(TestBase):
def _runTest(self, modules, filterfn, expect):
tc = self._testLoader(modules = modules, tags_filter = filterfn)
test_loaded = set(getSuiteCasesIDs(tc.suites))
self.assertEqual(expect, test_loaded)
def test_oetag(self):
# get all cases without any filtering
self._runTest(['oetag'], None, {
'oetag.TagTest.testTagGood',
'oetag.TagTest.testTagOther',
'oetag.TagTest.testTagOtherMulti',
'oetag.TagTest.testTagNone',
'oetag.TagClassTest.testTagOther',
'oetag.TagClassTest.testTagOtherMulti',
'oetag.TagClassTest.testTagNone',
})
# exclude any case with tags
self._runTest(['oetag'], lambda tags: tags, {
'oetag.TagTest.testTagNone',
})
# exclude any case with otherTag
self._runTest(['oetag'], lambda tags: "otherTag" in tags, {
'oetag.TagTest.testTagGood',
'oetag.TagTest.testTagNone',
'oetag.TagClassTest.testTagNone',
})
# exclude any case with classTag
self._runTest(['oetag'], lambda tags: "classTag" in tags, {
'oetag.TagTest.testTagGood',
'oetag.TagTest.testTagOther',
'oetag.TagTest.testTagOtherMulti',
'oetag.TagTest.testTagNone',
})
# include any case with classTag
self._runTest(['oetag'], lambda tags: "classTag" not in tags, {
'oetag.TagClassTest.testTagOther',
'oetag.TagClassTest.testTagOtherMulti',
'oetag.TagClassTest.testTagNone',
})
# include any case with classTag or no tags
self._runTest(['oetag'], lambda tags: tags and "classTag" not in tags, {
'oetag.TagTest.testTagNone',
'oetag.TagClassTest.testTagOther',
'oetag.TagClassTest.testTagOtherMulti',
'oetag.TagClassTest.testTagNone',
})
class TestDependsDecorator(TestBase):
modules = ['depends']
def test_depends_order(self):
tests = ['depends.DependsTest.testDependsFirst',
'depends.DependsTest.testDependsSecond',
'depends.DependsTest.testDependsThird',
'depends.DependsTest.testDependsFourth',
'depends.DependsTest.testDependsFifth']
tests2 = list(tests)
tests2[2], tests2[3] = tests[3], tests[2]
tc = self._testLoader(modules=self.modules, tests=tests)
test_loaded = getSuiteCasesIDs(tc.suites)
result = True if test_loaded == tests or test_loaded == tests2 else False
msg = 'Failed to order tests using OETestDepends decorator.\nTest order:'\
' %s.\nExpected: %s\nOr: %s' % (test_loaded, tests, tests2)
self.assertTrue(result, msg=msg)
def test_depends_fail_missing_dependency(self):
expect = "TestCase depends.DependsTest.testDependsSecond depends on "\
"depends.DependsTest.testDependsFirst and isn't available"
tests = ['depends.DependsTest.testDependsSecond']
try:
# Must throw OEQADependency because missing 'testDependsFirst'
tc = self._testLoader(modules=self.modules, tests=tests)
self.fail('Expected OEQADependency exception')
except OEQADependency as e:
result = True if expect in str(e) else False
msg = 'Expected OEQADependency exception missing testDependsFirst test'
self.assertTrue(result, msg=msg)
def test_depends_fail_circular_dependency(self):
expect = 'have a circular dependency'
tests = ['depends.DependsTest.testDependsCircular1',
'depends.DependsTest.testDependsCircular2',
'depends.DependsTest.testDependsCircular3']
try:
# Must throw OEQADependency because circular dependency
tc = self._testLoader(modules=self.modules, tests=tests)
self.fail('Expected OEQADependency exception')
except OEQADependency as e:
result = True if expect in str(e) else False
msg = 'Expected OEQADependency exception having a circular dependency'
self.assertTrue(result, msg=msg)
class TestTimeoutDecorator(TestBase):
modules = ['timeout']
def test_timeout(self):
tests = ['timeout.TimeoutTest.testTimeoutPass']
msg = 'Failed to run test using OETestTimeout'
alarm_signal = signal.getsignal(signal.SIGALRM)
tc = self._testLoader(modules=self.modules, tests=tests)
self.assertTrue(tc.runTests().wasSuccessful(), msg=msg)
msg = "OETestTimeout didn't restore SIGALRM"
self.assertIs(alarm_signal, signal.getsignal(signal.SIGALRM), msg=msg)
def test_timeout_fail(self):
tests = ['timeout.TimeoutTest.testTimeoutFail']
msg = "OETestTimeout test didn't timeout as expected"
alarm_signal = signal.getsignal(signal.SIGALRM)
tc = self._testLoader(modules=self.modules, tests=tests)
self.assertFalse(tc.runTests().wasSuccessful(), msg=msg)
msg = "OETestTimeout didn't restore SIGALRM"
self.assertIs(alarm_signal, signal.getsignal(signal.SIGALRM), msg=msg)
def test_timeout_cancel(self):
tests = ['timeout.TimeoutTest.testTimeoutSkip', 'timeout.TimeoutTest.testTimeoutDepends', 'timeout.TimeoutTest.testTimeoutUnrelated']
msg = 'Unrelated test failed to complete'
tc = self._testLoader(modules=self.modules, tests=tests)
self.assertTrue(tc.runTests().wasSuccessful(), msg=msg)
if __name__ == '__main__':
unittest.main()
+63
View File
@@ -0,0 +1,63 @@
#!/usr/bin/env python3
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import os
import unittest
from common import setup_sys_path, TestBase
setup_sys_path()
from oeqa.core.exception import OEQADependency
from oeqa.core.utils.test import getSuiteModules, getSuiteCasesIDs
class TestLoader(TestBase):
@unittest.skip("invalid directory is missing oetag.py")
def test_fail_duplicated_module(self):
cases_path = self.cases_path
invalid_path = os.path.join(cases_path, 'loader', 'invalid')
self.cases_path = [self.cases_path, invalid_path]
expect = 'Duplicated oetag module found in'
msg = 'Expected ImportError exception for having duplicated module'
try:
# Must throw ImportEror because duplicated module
tc = self._testLoader()
self.fail(msg)
except ImportError as e:
result = True if expect in str(e) else False
self.assertTrue(result, msg=msg)
finally:
self.cases_path = cases_path
def test_filter_modules(self):
expected_modules = {'oetag'}
tc = self._testLoader(modules=expected_modules)
modules = getSuiteModules(tc.suites)
msg = 'Expected just %s modules' % ', '.join(expected_modules)
self.assertEqual(modules, expected_modules, msg=msg)
def test_filter_cases(self):
modules = ['oetag', 'data']
expected_cases = {'data.DataTest.testDataOk',
'oetag.TagTest.testTagGood'}
tc = self._testLoader(modules=modules, tests=expected_cases)
cases = set(getSuiteCasesIDs(tc.suites))
msg = 'Expected just %s cases' % ', '.join(expected_cases)
self.assertEqual(cases, expected_cases, msg=msg)
def test_import_from_paths(self):
cases_path = self.cases_path
cases2_path = os.path.join(cases_path, 'loader', 'valid')
expected_modules = {'another'}
self.cases_path = [self.cases_path, cases2_path]
tc = self._testLoader(modules=expected_modules)
modules = getSuiteModules(tc.suites)
self.cases_path = cases_path
msg = 'Expected modules from two different paths'
self.assertEqual(modules, expected_modules, msg=msg)
if __name__ == '__main__':
unittest.main()
+40
View File
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import unittest
import logging
import tempfile
from common import setup_sys_path, TestBase
setup_sys_path()
from oeqa.core.runner import OEStreamLogger
class TestRunner(TestBase):
def test_stream_logger(self):
fp = tempfile.TemporaryFile(mode='w+')
logging.basicConfig(format='%(message)s', stream=fp)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
oeSL = OEStreamLogger(logger)
lines = ['init', 'bigline_' * 65535, 'morebigline_' * 65535 * 4, 'end']
for line in lines:
oeSL.write(line)
fp.seek(0)
fp_lines = fp.readlines()
for i, fp_line in enumerate(fp_lines):
fp_line = fp_line.strip()
self.assertEqual(lines[i], fp_line)
fp.close()
if __name__ == '__main__':
unittest.main()
@@ -0,0 +1,335 @@
#!/usr/bin/env python3
#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Modified for use in OE by Richard Purdie, 2018
#
# Modified by: Corey Goldberg, 2013
# License: GPLv2+
#
# Original code from:
# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
# Copyright (C) 2005-2011 Canonical Ltd
# License: GPLv2+
import os
import sys
import traceback
import unittest
import subprocess
import testtools
import threading
import time
import io
import json
import subunit
from queue import Queue
from itertools import cycle
from subunit import ProtocolTestCase, TestProtocolClient
from subunit.test_results import AutoTimingTestResultDecorator
from testtools import ThreadsafeForwardingResult, iterate_tests
from testtools.content import Content
from testtools.content_type import ContentType
from oeqa.utils.commands import get_test_layer
import bb.utils
import oe.path
_all__ = [
'ConcurrentTestSuite',
'fork_for_tests',
'partition_tests',
]
#
# Patch the version from testtools to allow access to _test_start and allow
# computation of timing information and threading progress
#
class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult):
super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
self.threadnum = threadnum
self.totalinprocess = totalinprocess
self.totaltests = totaltests
self.buffer = True
self.outputbuf = output
self.finalresult = finalresult
self.finalresult.buffer = True
self.target = target
def _add_result_with_semaphore(self, method, test, *args, **kwargs):
self.semaphore.acquire()
try:
if self._test_start:
self.result.starttime[test.id()] = self._test_start.timestamp()
self.result.threadprogress[self.threadnum].append(test.id())
totalprogress = sum(len(x) for x in self.result.threadprogress.values())
self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s failed) (%s)" % (
self.threadnum,
len(self.result.threadprogress[self.threadnum]),
self.totalinprocess,
totalprogress,
self.totaltests,
"{0:.2f}".format(time.time()-self._test_start.timestamp()),
self.target.failed_tests,
test.id())
finally:
self.semaphore.release()
self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8"))
self.finalresult._stdout_buffer = io.StringIO()
super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
class ProxyTestResult:
# a very basic TestResult proxy, in order to modify add* calls
def __init__(self, target):
self.result = target
self.failed_tests = 0
def _addResult(self, method, test, *args, exception = False, **kwargs):
return method(test, *args, **kwargs)
def addError(self, test, err = None, **kwargs):
self.failed_tests += 1
self._addResult(self.result.addError, test, err, exception = True, **kwargs)
def addFailure(self, test, err = None, **kwargs):
self.failed_tests += 1
self._addResult(self.result.addFailure, test, err, exception = True, **kwargs)
def addSuccess(self, test, **kwargs):
self._addResult(self.result.addSuccess, test, **kwargs)
def addExpectedFailure(self, test, err = None, **kwargs):
self._addResult(self.result.addExpectedFailure, test, err, exception = True, **kwargs)
def addUnexpectedSuccess(self, test, **kwargs):
self._addResult(self.result.addUnexpectedSuccess, test, **kwargs)
def wasSuccessful(self):
return self.failed_tests == 0
def __getattr__(self, attr):
return getattr(self.result, attr)
class ExtraResultsDecoderTestResult(ProxyTestResult):
def _addResult(self, method, test, *args, exception = False, **kwargs):
if "details" in kwargs and "extraresults" in kwargs["details"]:
if isinstance(kwargs["details"]["extraresults"], Content):
kwargs = kwargs.copy()
kwargs["details"] = kwargs["details"].copy()
extraresults = kwargs["details"]["extraresults"]
data = bytearray()
for b in extraresults.iter_bytes():
data += b
extraresults = json.loads(data.decode())
kwargs["details"]["extraresults"] = extraresults
return method(test, *args, **kwargs)
class ExtraResultsEncoderTestResult(ProxyTestResult):
def _addResult(self, method, test, *args, exception = False, **kwargs):
if hasattr(test, "extraresults"):
extras = lambda : [json.dumps(test.extraresults).encode()]
kwargs = kwargs.copy()
if "details" not in kwargs:
kwargs["details"] = {}
else:
kwargs["details"] = kwargs["details"].copy()
kwargs["details"]["extraresults"] = Content(ContentType("application", "json", {'charset': 'utf8'}), extras)
# if using details, need to encode any exceptions into the details obj,
# testtools does not handle "err" and "details" together.
if "details" in kwargs and exception and (len(args) >= 1 and args[0] is not None):
kwargs["details"]["traceback"] = testtools.content.TracebackContent(args[0], test)
args = []
return method(test, *args, **kwargs)
#
# We have to patch subunit since it doesn't understand how to handle addError
# outside of a running test case. This can happen if classSetUp() fails
# for a class of tests. This unfortunately has horrible internal knowledge.
#
def outSideTestaddError(self, offset, line):
"""An 'error:' directive has been read."""
test_name = line[offset:-1].decode('utf8')
self.parser._current_test = subunit.RemotedTestCase(test_name)
self.parser.current_test_description = test_name
self.parser._state = self.parser._reading_error_details
self.parser._reading_error_details.set_simple()
self.parser.subunitLineReceived(line)
subunit._OutSideTest.addError = outSideTestaddError
# Like outSideTestaddError above, we need an equivalent for skips
# happening at the setUpClass() level, otherwise we will see "UNKNOWN"
# as a result for concurrent tests
#
def outSideTestaddSkip(self, offset, line):
"""A 'skip:' directive has been read."""
test_name = line[offset:-1].decode('utf8')
self.parser._current_test = subunit.RemotedTestCase(test_name)
self.parser.current_test_description = test_name
self.parser._state = self.parser._reading_skip_details
self.parser._reading_skip_details.set_simple()
self.parser.subunitLineReceived(line)
subunit._OutSideTest.addSkip = outSideTestaddSkip
#
# A dummy structure to add to io.StringIO so that the .buffer object
# is available and accepts writes. This allows unittest with buffer=True
# to interact ok with subunit which wants to access sys.stdout.buffer.
#
class dummybuf(object):
def __init__(self, parent):
self.p = parent
def write(self, data):
self.p.write(data.decode("utf-8"))
#
# Taken from testtools.ConncurrencyTestSuite but modified for OE use
#
class ConcurrentTestSuite(unittest.TestSuite):
def __init__(self, suite, processes, setupfunc, removefunc):
super(ConcurrentTestSuite, self).__init__([suite])
self.processes = processes
self.setupfunc = setupfunc
self.removefunc = removefunc
def run(self, result):
testservers, totaltests = fork_for_tests(self.processes, self)
try:
threads = {}
queue = Queue()
semaphore = threading.Semaphore(1)
result.threadprogress = {}
for i, (testserver, testnum, output) in enumerate(testservers):
result.threadprogress[i] = []
process_result = BBThreadsafeForwardingResult(
ExtraResultsDecoderTestResult(result),
semaphore, i, testnum, totaltests, output, result)
reader_thread = threading.Thread(
target=self._run_test, args=(testserver, process_result, queue))
threads[testserver] = reader_thread, process_result
reader_thread.start()
while threads:
finished_test = queue.get()
threads[finished_test][0].join()
del threads[finished_test]
except:
for thread, process_result in threads.values():
process_result.stop()
raise
finally:
for testserver in testservers:
testserver[0]._stream.close()
def _run_test(self, testserver, process_result, queue):
try:
try:
testserver.run(process_result)
except Exception:
# The run logic itself failed
case = testtools.ErrorHolder(
"broken-runner",
error=sys.exc_info())
case.run(process_result)
finally:
queue.put(testserver)
def fork_for_tests(concurrency_num, suite):
testservers = []
if 'BUILDDIR' in os.environ:
selftestdir = get_test_layer()
test_blocks = partition_tests(suite, concurrency_num)
# Clear the tests from the original suite so it doesn't keep them alive
suite._tests[:] = []
totaltests = sum(len(x) for x in test_blocks)
for process_tests in test_blocks:
numtests = len(process_tests)
process_suite = unittest.TestSuite(process_tests)
# Also clear each split list so new suite has only reference
process_tests[:] = []
c2pread, c2pwrite = os.pipe()
# Clear buffers before fork to avoid duplicate output
sys.stdout.flush()
sys.stderr.flush()
pid = os.fork()
if pid == 0:
ourpid = os.getpid()
try:
newbuilddir = None
stream = os.fdopen(c2pwrite, 'wb', 1)
os.close(c2pread)
(builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite)
# Leave stderr and stdout open so we can see test noise
# Close stdin so that the child goes away if it decides to
# read from stdin (otherwise its a roulette to see what
# child actually gets keystrokes for pdb etc).
newsi = os.open(os.devnull, os.O_RDWR)
os.dup2(newsi, sys.stdin.fileno())
# Send stdout/stderr over the stream
os.dup2(c2pwrite, sys.stdout.fileno())
os.dup2(c2pwrite, sys.stderr.fileno())
subunit_client = TestProtocolClient(stream)
subunit_result = AutoTimingTestResultDecorator(subunit_client)
unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result))
if ourpid != os.getpid():
os._exit(0)
if newbuilddir and unittest_result.wasSuccessful():
suite.removefunc(newbuilddir)
except:
# Don't do anything with process children
if ourpid != os.getpid():
os._exit(1)
# Try and report traceback on stream, but exit with error
# even if stream couldn't be created or something else
# goes wrong. The traceback is formatted to a string and
# written in one go to avoid interleaving lines from
# multiple failing children.
try:
stream.write(traceback.format_exc().encode('utf-8'))
except:
sys.stderr.write(traceback.format_exc())
finally:
if newbuilddir:
suite.removefunc(newbuilddir)
stream.flush()
os._exit(1)
stream.flush()
os._exit(0)
else:
os.close(c2pwrite)
stream = os.fdopen(c2pread, 'rb', 1)
# Collect stdout/stderr into an io buffer
output = io.BytesIO()
testserver = ProtocolTestCase(stream, passthrough=output)
testservers.append((testserver, numtests, output))
return testservers, totaltests
def partition_tests(suite, count):
# Keep tests from the same class together but allow tests from modules
# to go to different processes to aid parallelisation.
modules = {}
for test in iterate_tests(suite):
m = test.__module__ + "." + test.__class__.__name__
if m not in modules:
modules[m] = []
modules[m].append(test)
# Simply divide the test blocks between the available processes
partitions = [list() for _ in range(count)]
for partition, m in zip(cycle(partitions), modules):
partition.extend(modules[m])
# No point in empty threads so drop them
return [p for p in partitions if p]
+22
View File
@@ -0,0 +1,22 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import os
import sys
def findFile(file_name, directory):
"""
Search for a file in directory and returns its complete path.
"""
for r, d, f in os.walk(directory):
if file_name in f:
return os.path.join(r, file_name)
return None
def remove_safe(path):
if os.path.exists(path):
os.remove(path)
+89
View File
@@ -0,0 +1,89 @@
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
import os
import inspect
import unittest
def getSuiteCases(suite):
"""
Returns individual test from a test suite.
"""
tests = []
if isinstance(suite, unittest.TestCase):
tests.append(suite)
elif isinstance(suite, unittest.suite.TestSuite):
for item in suite:
tests.extend(getSuiteCases(item))
return tests
def getSuiteModules(suite):
"""
Returns modules in a test suite.
"""
modules = set()
for test in getSuiteCases(suite):
modules.add(getCaseModule(test))
return modules
def getSuiteCasesInfo(suite, func):
"""
Returns test case info from suite. Info is fetched from func.
"""
tests = []
for test in getSuiteCases(suite):
tests.append(func(test))
return tests
def getSuiteCasesNames(suite):
"""
Returns test case names from suite.
"""
return getSuiteCasesInfo(suite, getCaseMethod)
def getSuiteCasesIDs(suite):
"""
Returns test case ids from suite.
"""
return getSuiteCasesInfo(suite, getCaseID)
def getSuiteCasesFiles(suite):
"""
Returns test case files paths from suite.
"""
return getSuiteCasesInfo(suite, getCaseFile)
def getCaseModule(test_case):
"""
Returns test case module name.
"""
return test_case.__module__
def getCaseClass(test_case):
"""
Returns test case class name.
"""
return test_case.__class__.__name__
def getCaseID(test_case):
"""
Returns test case complete id.
"""
return test_case.id()
def getCaseFile(test_case):
"""
Returns test case file path.
"""
return inspect.getsourcefile(test_case.__class__)
def getCaseMethod(test_case):
"""
Returns test case method name.
"""
return getCaseID(test_case).split('.')[-1]