diff --git a/src/python/twitter/pants/BUILD b/src/python/twitter/pants/BUILD index c4cfaac..656392d 100644 --- a/src/python/twitter/pants/BUILD +++ b/src/python/twitter/pants/BUILD @@ -27,13 +27,13 @@ python_library( name='pants-deps', dependencies=[ pants('3rdparty/python:ansicolors'), + pants('3rdparty/python:coverage'), pants('3rdparty/python:elementtree'), pants('3rdparty/python:mako'), pants('3rdparty/python:markdown'), pants('3rdparty/python:psutil'), pants('3rdparty/python:pygments'), pants('3rdparty/python:pytest'), - pants('3rdparty/python:pytest-cov'), pants('3rdparty/python:requests'), python_requirement('pylint==0.25.1', version_filter=pylint_build_filter), ] @@ -44,6 +44,7 @@ python_library( dependencies=[ pants(':pants-deps'), pants('src/python/twitter/common/collections'), + pants('src/python/twitter/common/concurrent'), pants('src/python/twitter/common/config'), pants('src/python/twitter/common/confluence'), pants('src/python/twitter/common/contextutil'), @@ -53,6 +54,7 @@ python_library( pants('src/python/twitter/common/log'), pants('src/python/twitter/common/process'), pants('src/python/twitter/common/python'), + pants('src/python/twitter/common/recordio'), pants('src/python/twitter/common/quantity'), pants('src/python/twitter/common/util'), pants('src/python/twitter/thrift/text'), @@ -77,7 +79,7 @@ python_library( # TODO(wickman) This version should be automatically pulled in from twitter.pants.version provides=setup_py( name='twitter.pants', - version='0.0.12', + version='0.0.14', description="Twitter's pants build tool.", url='https://github.com/twitter/commons', license='Apache License, Version 2.0', diff --git a/src/python/twitter/pants/base/address.py b/src/python/twitter/pants/base/address.py index 8d4a192..25178a4 100644 --- a/src/python/twitter/pants/base/address.py +++ b/src/python/twitter/pants/base/address.py @@ -93,5 +93,15 @@ class Address(object): def __ne__(self, other): return not self.__eq__(other) + def compact_repr(self, source_root=None): + def _repr(): + if os.path.basename(self.buildfile.parent_path) == self.target_name: + return os.path.dirname(self.buildfile.relpath) + else: + return '%s:%s' % (os.path.dirname(self.buildfile.relpath), self.target_name) + if source_root: + return os.path.relpath(_repr(), source_root) + return _repr() + def __repr__(self): return "%s:%s" % (self.buildfile, self.target_name) diff --git a/src/python/twitter/pants/commands/build.py b/src/python/twitter/pants/commands/build.py index b32363c..b678c82 100644 --- a/src/python/twitter/pants/commands/build.py +++ b/src/python/twitter/pants/commands/build.py @@ -95,6 +95,7 @@ class Build(Command): if not target: self.error("Target %s does not exist" % address) + self.targets.update(tgt for tgt in target.resolve() if is_concrete(tgt)) def debug(self, message): diff --git a/src/python/twitter/pants/python/pytest_runner.py b/src/python/twitter/pants/python/pytest_runner.py new file mode 100644 index 0000000..62a56a4 --- /dev/null +++ b/src/python/twitter/pants/python/pytest_runner.py @@ -0,0 +1,169 @@ +""" +Pants Python pytest driver. For more information about the pytest plugin interface, see: +http://pytest.org/latest/plugins.html#well-specified-hooks + +This plugin provides a recordio interface to test results as well as a code coverage driver. +""" + +import json +import os +import struct + +from pytest import Function + + +_RECORD_STREAM = None +_COV = None + + +class PythonTestRecord(object): + def __init__(self, json_dict): + pass + + +class PythonTestHeader(PythonTestRecord): + def __init__(self, json_dict): + self.filename, self.functions = map(json_dict.get, ('filename', 'tests')) + super(PythonTestHeader, self).__init__(json_dict) + + +class PythonTestResult(PythonTestRecord): + def __init__(self, json_dict): + self.function, self.filename, self.lineno, self.outcome = map( + json_dict.get, ('function', 'filename', 'lineno', 'outcome')) + super(PythonTestResult, self).__init__(json_dict) + + +class PythonTestFooter(PythonTestRecord): + pass + + +def write_record(fp, json_dict): + """Write one Pants pytest driver record (json_dict) to the stream (fp).""" + json_blob = json.dumps(json_dict).encode('utf-8') + header = struct.pack('>L', len(json_blob)) + fp.write(header + json_blob) + fp.flush() + + +def read_record(fp): + """Read one record Pants pytest driver record from the stream fp. + + The pytest_runner and test_builder communicate via a simple recordio-based mechanism, + framed in the following fashion: + + [4 byte int "len1"][string blob of length "len1"] + [4 byte int "len2"][string blob of length "len2"] + ... + + The return value will be an object of type PythonTestRecord or None if nothing was processed. + There are three distinct subtypes: + - PythonTestHeader* + - PythonTestResult* + - PythonTestFooter + + PythonTestResult has the following attributes: + .function + .filename + .lineno + .outcome (one of 'passed', 'failed', 'skipped') + + Messages types with '*' may be delivered multiple times. Only one footer will be delivered + and signifies that a test run has completed. + """ + header = fp.read(4) + if len(header) == 0: + fp.seek(fp.tell()) + return + blob_len = struct.unpack('>L', header)[0] + read_blob = fp.read(blob_len) + if len(read_blob) != blob_len: + raise ValueError('Failed to read stream!') + read_blob = json.loads(read_blob) + if read_blob['type'] == 'header': + return PythonTestHeader(read_blob) + elif read_blob['type'] == 'footer': + return PythonTestFooter(read_blob) + elif read_blob['type'] == 'test': + return PythonTestResult(read_blob) + raise ValueError('Unknown record type encountered: %s' % read_blob.get('type')) + + +def pytest_addoption(parser): + group = parser.getgroup("terminal reporting") + group.addoption('--pants_test_recordio', + action="store", + dest="pants_test_recordio", + metavar="PATH", + default=None, + help="Create a pants test recordio at this path.") + group.addoption('--pants_test_coverage_root', + action="store", + dest="pants_test_coverage_root", + metavar="PATH", + default=None, + help="If specified, enable coverage at this root directory.") + + +def pytest_configure(config): + global _RECORD_STREAM + global _COV + if config.option.pants_test_recordio: + _RECORD_STREAM = open(config.option.pants_test_recordio, 'ab+') + if config.option.pants_test_coverage_root: + root = config.option.pants_test_coverage_root + try: + from coverage import coverage + except ImportError: + print('Failed to import coverage. Coverage disabled.') + return + _COV = coverage( + config_file=os.path.join(root, 'config'), + auto_data=True, + ) + _COV.start() + + +def translate_function_names(report): + for result in report.result: + if not isinstance(result, Function): + continue + nodeid = report.nodeid.split('::') + if len(nodeid) == 1: + yield result.name + else: + yield '%s.%s' % (nodeid[1], result.name) + + +def pytest_collectreport(report): + write_record(_RECORD_STREAM, dict( + type='header', + filename=report.nodeid.split('::')[0], + tests=list(translate_function_names(report)) + )) + + +def pytest_runtest_logreport(report): + if _RECORD_STREAM is None or report.when != 'call': + return + _, lineno, function = report.location + filename = report.fspath + outcome = report.outcome + write_record(_RECORD_STREAM, dict( + type='test', + function=function, + filename=filename, + lineno=lineno, + outcome=outcome, + )) + + +def pytest_unconfigure(config): + if _RECORD_STREAM is None: + return + if _COV: + _COV.stop() + write_record(_RECORD_STREAM, dict( + type='footer' + )) + _RECORD_STREAM.close() diff --git a/src/python/twitter/pants/python/python_chroot.py b/src/python/twitter/pants/python/python_chroot.py index d566ec0..4588a11 100644 --- a/src/python/twitter/pants/python/python_chroot.py +++ b/src/python/twitter/pants/python/python_chroot.py @@ -61,16 +61,23 @@ class PythonChroot(object): def __init__(self, target): Exception.__init__(self, "Not a valid Python dependency! Found: %s" % target) - def __init__(self, target, root_dir, extra_targets=None, builder=None, interpreter=None, - conn_timeout=None): + def __init__(self, target, + root_dir, + extra_targets=None, + builder=None, + interpreter=None, + platforms=None, + conn_timeout=None): self._config = Config.load() self._target = target self._root = root_dir self._interpreter = interpreter or PythonInterpreter.get() - self._cache = BuildCache(os.path.join(self._config.get('python-setup', 'artifact_cache'), - '%s' % self._interpreter.identity)) + self._cache = BuildCache( + os.path.join(self._config.get('python-setup', 'artifact_cache'), + '%s' % self._interpreter.identity)) self._extra_targets = list(extra_targets) if extra_targets is not None else [] - self._resolver = MultiResolver(self._config, target, conn_timeout=conn_timeout) + self._resolver = MultiResolver( + self._config, target, platforms=platforms, conn_timeout=conn_timeout) self._builder = builder or PEXBuilder(tempfile.mkdtemp(), interpreter=self._interpreter) def __del__(self): @@ -165,15 +172,15 @@ class PythonChroot(object): for antlr in targets['antlrs']: generated_reqs.add(self._generate_antlr_requirement(antlr)) + reqs_to_build = OrderedSet() targets['reqs'] |= generated_reqs for req in targets['reqs']: if not req.should_build(self._interpreter.python, Platform.current()): self.debug('Skipping %s based upon version filter' % req) continue + reqs_to_build.add(req) self._dump_requirement(req._requirement, False, req._repository) - reqs_to_build = (req for req in targets['reqs'] - if req.should_build(self._interpreter.python, Platform.current())) for dist in self._resolver.resolve(reqs_to_build, interpreter=self._interpreter): self._dump_distribution(dist) diff --git a/src/python/twitter/pants/python/resolver.py b/src/python/twitter/pants/python/resolver.py index 0a9b604..dd56a31 100644 --- a/src/python/twitter/pants/python/resolver.py +++ b/src/python/twitter/pants/python/resolver.py @@ -40,8 +40,9 @@ class MultiResolver(ResolverBase): return Crawler(cache=config.get('python-setup', 'download_cache'), conn_timeout=conn_timeout) - def __init__(self, config, target, conn_timeout=None): - platforms = config.getlist('python-setup', 'platforms', ['current']) + def __init__(self, config, target, platforms=None, conn_timeout=None): + if platforms is None: + platforms = config.getlist('python-setup', 'platforms', ['current']) if isinstance(target, PythonBinary) and target.platforms: platforms = target.platforms self._install_cache = config.get('python-setup', 'install_cache') diff --git a/src/python/twitter/pants/python/sdist_builder.py b/src/python/twitter/pants/python/sdist_builder.py index f35f693..4472557 100644 --- a/src/python/twitter/pants/python/sdist_builder.py +++ b/src/python/twitter/pants/python/sdist_builder.py @@ -21,7 +21,6 @@ import os import subprocess import sys -from twitter.common.contextutil import pushd from twitter.common.python.installer import Packager diff --git a/src/python/twitter/pants/python/test_builder.py b/src/python/twitter/pants/python/test_builder.py index 6f7a019..aba8840 100644 --- a/src/python/twitter/pants/python/test_builder.py +++ b/src/python/twitter/pants/python/test_builder.py @@ -16,126 +16,247 @@ from __future__ import print_function -__author__ = 'Brian Wickman' - try: import configparser except ImportError: import ConfigParser as configparser +from contextlib import contextmanager import errno +import json import os -import time +import pkgutil +import shutil import signal +import subprocess import sys +from textwrap import dedent +import threading +import time from twitter.common.collections import OrderedSet -from twitter.common.contextutil import temporary_file -from twitter.common.dirutil import safe_mkdir +from twitter.common.contextutil import temporary_dir +from twitter.common.dirutil import safe_mkdir, safe_open from twitter.common.lang import Compatibility -from twitter.common.quantity import Amount, Time from twitter.common.python.interpreter import PythonInterpreter from twitter.common.python.pex import PEX from twitter.common.python.pex_builder import PEXBuilder - +from twitter.common.quantity import Amount, Time from twitter.pants.base import Config, ParseContext -from twitter.pants.python.python_chroot import PythonChroot from twitter.pants.targets import ( + PythonLibrary, PythonRequirement, - PythonTarget, PythonTestSuite, - PythonTests) + PythonTests, +) +try: + from Queue import Queue, Empty +except ImportError: + from queue import Queue, Empty -class PythonTestResult(object): - @staticmethod - def timeout(): - return PythonTestResult('TIMEOUT') +from .python_chroot import PythonChroot +from .pytest_runner import ( + read_record as read_pytest_record, + PythonTestFooter, + PythonTestResult, +) - @staticmethod - def exception(): - return PythonTestResult('EXCEPTION') +from pkg_resources import DistributionNotFound - @staticmethod - def rc(value): - return PythonTestResult('SUCCESS' if value == 0 else 'FAILURE', - rc=value) - def __init__(self, msg, rc=None): - self._rc = rc - self._msg = msg +class Display(object): + try: + import colors + except ImportError: + colors = None - def __str__(self): - return self._msg + DEFAULT_WIDTH = 100 + + @classmethod + def render(cls, string, color): + if cls.colors is None: + return string + color_fn = getattr(cls.colors, color, None) + if not callable(color_fn): + return string + return color_fn(string) + + @classmethod + def isatty(cls, filelike): + if not hasattr(filelike, 'fileno'): + return False + return os.isatty(filelike.fileno()) and filelike.fileno() in ( + sys.stdout.fileno(), sys.stderr.fileno()) + + @classmethod + def length(cls, string): + """calculate printable length of the string""" + if cls.colors is None: + return len(str(string)) + return len(cls.colors.strip_color(str(string))) + + def __init__(self, filelike, width=DEFAULT_WIDTH): + self._filelike = filelike + self._isatty = self.isatty(filelike) + self._colorful = self._isatty + self._width = width + self._width_duration = 0.5 + self._width_expiration = time.time() @property - def success(self): - return self._rc == 0 + def width(self): + """Return a guess for the terminal width.""" + if not self._isatty: + return self._width + now = time.time() + if now > self._width_expiration: + self._width_expiration = now + self._width_duration + try: + self._width = int(os.popen('stty size', 'r').read().split()[1]) + except (TypeError, ValueError): + pass + return self._width + + def center(self, string, filler='='): + filler_width = self.width - self.length(string) + if not self._colorful and self.colors: + string = self.colors.strip_color(string) + print('%s%s%s' % (filler * (filler_width // 2), string, + (filler * (filler_width // 2 + filler_width % 2))), file=self._filelike) + + def write(self, string): + if not self._colorful and self.colors: + string = self.colors.strip_color(string) + print(string, file=self._filelike) + + def left_right(self, left_string, right_string): + width = self.width + if not self._colorful and self.colors: + left_string = self.colors.strip_color(left_string) + right_string = self.colors.strip_color(right_string) + left_length = self.length(left_string) + right_length = self.length(right_string) + leftover = max(width - left_length - right_length, 1) + dots_length = 15 - right_length + dots_string = leftover * ' ' + dots_length * '.' + ' ' + print('%s%s%s' % (left_string, dots_string[-leftover:], right_string), + file=self._filelike) + + +class PythonTestOutcome(object): + SUCCESS = 0 + FAILURE = 1 + SKIPPED = 2 + TIMEOUT = 3 + CANCELLED = 4 + EXCEPTION = 5 + + NAME_MAP = { + SUCCESS: 'SUCCESS', + FAILURE: 'FAILURE', + SKIPPED: 'SKIPPED', + TIMEOUT: 'TIMEOUT', + CANCELLED: 'CANCELLED', + EXCEPTION: 'EXCEPTION', + } + + COLOR_MAP = { + SUCCESS: 'green', + FAILURE: 'red', + SKIPPED: 'yellow', + TIMEOUT: 'cyan', + CANCELLED: 'yellow', + EXCEPTION: 'red', + } + @classmethod + def timeout(cls): + return cls(cls.TIMEOUT) -DEFAULT_COVERAGE_CONFIG = """ -[run] -branch = True -timid = True + @classmethod + def exception(cls): + return cls(cls.EXCEPTION) -[report] -exclude_lines = - def __repr__ - raise NotImplementedError + @classmethod + def cancelled(cls): + return cls(cls.CANCELLED) -ignore_errors = True -""" + @classmethod + def success(cls): + return cls(cls.SUCCESS) -def generate_coverage_config(target): - cp = configparser.ConfigParser() - cp.readfp(Compatibility.StringIO(DEFAULT_COVERAGE_CONFIG)) - cp.add_section('html') - target_dir = os.path.join(Config.load().getdefault('pants_distdir'), 'coverage', - os.path.dirname(target.address.buildfile.relpath), target.name) - safe_mkdir(target_dir) - cp.set('html', 'directory', target_dir) - return cp + @classmethod + def failure(cls): + return cls(cls.FAILURE) + @classmethod + def skipped(cls): + return cls(cls.SKIPPED) -class PythonTestBuilder(object): - class InvalidDependencyException(Exception): pass - class ChrootBuildingException(Exception): pass + def __init__(self, type_): + if type_ not in self.NAME_MAP: + raise ValueError('Unknown type %s' % type_) + self._type = type_ - TESTING_TARGETS = None + def __str__(self): + return Display.render(self.NAME_MAP[self._type], self.COLOR_MAP[self._type]) - # TODO(wickman) Expose these as configuratable parameters - TEST_TIMEOUT = Amount(2, Time.MINUTES) - TEST_POLL_PERIOD = Amount(100, Time.MILLISECONDS) + @property + def succeeded(self): + return self._type in (self.SUCCESS, self.SKIPPED) - def __init__(self, targets, args, root_dir, interpreter=None, conn_timeout=None): - self.targets = targets - self.args = args - self.root_dir = root_dir - self.interpreter = interpreter or PythonInterpreter.get() - self.successes = {} - self._conn_timeout = conn_timeout + @property + def failed(self): + return self._type in (self.FAILURE, self.EXCEPTION, self.TIMEOUT) + + +DEFAULT_COVERAGE_CONFIG = dedent(""" + [run] + branch = True + timid = True + + [report] + exclude_lines = + def __repr__ + raise NotImplementedError + + ignore_errors = True +""") + + +@contextmanager +def coverage_config(path): + with safe_open(path, 'wb') as fp: + cp = configparser.ConfigParser() + cp.readfp(Compatibility.StringIO(DEFAULT_COVERAGE_CONFIG)) + yield cp + cp.write(fp) - def run(self): - self.successes = {} - rv = self._run_tests(self.targets) - for target in sorted(self.successes): - print('%-80s.....%10s' % (target, self.successes[target])) - return 0 if rv.success else 1 + +class PythonTestChroot(object): + """ + A wrapper around PythonChroot to encapsulate test-specific dependencies (e.g. pytest, + coverage) and drivers (e.g. pytest_runner.) + """ + class Error(Exception): pass + class ChrootBuildingException(Error): pass + + TESTING_TARGETS = {} @classmethod - def generate_test_targets(cls): - if cls.TESTING_TARGETS is None: + def generate_test_targets(cls, interpreter): + if interpreter.version_string not in cls.TESTING_TARGETS: with ParseContext.temp(): - cls.TESTING_TARGETS = [ + cls.TESTING_TARGETS[interpreter.version_string] = [ PythonRequirement('pytest'), - PythonRequirement('pytest-cov'), - PythonRequirement('coverage==3.6b1'), + PythonRequirement('coverage'), PythonRequirement('unittest2', version_filter=lambda py, pl: py.startswith('2')), PythonRequirement('unittest2py3k', version_filter=lambda py, pl: py.startswith('3')) ] - return cls.TESTING_TARGETS + return cls.TESTING_TARGETS[interpreter.version_string] - @staticmethod - def generate_junit_args(target): + @classmethod + def generate_junit_args(cls, target): args = [] xml_base = os.getenv('JUNIT_XML_BASE') if xml_base: @@ -146,94 +267,345 @@ class PythonTestBuilder(object): os.makedirs(os.path.dirname(xml_path)) except OSError as e: if e.errno != errno.EEXIST: - raise PythonTestBuilder.ChrootBuildingException( + raise cls.ChrootBuildingException( "Unable to establish JUnit target: %s! %s" % (target, e)) args.append('--junitxml=%s' % xml_path) return args - @staticmethod - def cov_setup(target, chroot): - cp = generate_coverage_config(target) - with temporary_file(cleanup=False) as fp: - cp.write(fp) - filename = fp.name - if target.coverage: - source = target.coverage + COVERAGE_ROOT = '.coverage' + + @classmethod + def coverage_root(cls, builder): + return os.path.join(builder.path(), cls.COVERAGE_ROOT) + + @classmethod + def write_coverage_setup(cls, target, builder): + coverage_root = cls.coverage_root(builder) + with coverage_config(os.path.join(coverage_root, 'config')) as cp: + if target.coverage: + modules = target.coverage + else: + # This technically makes the assumption that tests/python/ will be testing + # src/python/. To change to honest measurements, do target.walk() here insead, + # however this results in very useless and noisy coverage reports. + modules = set(os.path.dirname(source).replace(os.sep, '.') for source in target.sources) + + def as_list(iterable): + return '\n' + '\n'.join(iterable) + + cp.set('run', 'source', as_list(modules)) + cp.set('run', 'data_file', os.path.join(coverage_root, 'data')) + cp.set('run', 'parallel', True) + return coverage_root + + @classmethod + def write_coverage_sitecustomize(cls, builder): + coverage_root = cls.coverage_root(builder) + with safe_open(os.path.join(coverage_root, 'sitecustomize.py'), 'wb') as fp: + fp.write(dedent(''' + import os + try: + from coverage import coverage + cov = coverage(config_file=os.path.join(%r, 'config'), auto_data=True) + cov.start() + except ImportError: + pass + ''' % coverage_root)) + return coverage_root + + @classmethod + def write_pytest_runner(cls, builder): + builder.chroot().write(pkgutil.get_data(__name__, 'pytest_runner.py'), 'pytest_runner.py') + + def __init__(self, + target, + root_dir, + interpreter=None, + coverage=False, + conn_timeout=None, + redirect=False, + timeout=None): + """ + :param target: The PythonTests target to chroot + :param root_dir: The build root of the repository. + :keyword interpreter: If None, use the current interpreter, otherwise suppled + PythonInterpreter. + :keyword coverage: If True, enable coverage reporting for this test. + :keyword conn_timeout: Use this connection timeout for all downstream resolvers. + :keyword redirect: If True, redirect stdout to a file. Otherwise map to sys.stdout + :keyword timeout: The Amount of Time this test has to complete before being cancelled. + """ + if not isinstance(target, PythonTests): + raise TypeError('PythonTestChroot takes a PythonTests object.') + self.target = target + self.root_dir = root_dir + self._builder = self._chroot = None + self.coverage = coverage + self.interpreter = interpreter or PythonInterpreter.get() + self._conn_timeout = conn_timeout + self._built = False + self._recordio = None + self.results = set() + self.acknowledged_results = set() + self.state = None + self._processed_footer = False + self._redirect = redirect + self._expiration = None + self._timeout = timeout or Amount(1000, Time.HOURS) + # Pants is not threadsafe -- calling self.target.sources actually + # changes the cwd if it's never been called before. I spent almost 5 + # hours tracking down nondeterministic bugs caused by this. Moving from + # .popen to .__init__ fixes it, since the builds are serialized, but + # this seems brittle. + self._sources = [os.path.abspath(os.path.join(self.target.target_base, source)) + for source in self.target.sources] + + def build(self): + builder = PEXBuilder() + builder.info.entry_point = 'pytest' + builder.info.ignore_errors = self.target._soft_dependencies + self._chroot = PythonChroot( + self.target, + self.root_dir, + interpreter=self.interpreter, + platforms=('current',), # only build tests for the current platform + extra_targets=self.generate_test_targets(PythonInterpreter.get()), + builder=builder, + conn_timeout=self._conn_timeout) + self._builder = self._chroot.dump() + self.write_pytest_runner(self._builder) + self._builder.freeze() + self._recordio_path = os.path.join(self._builder.path(), 'tests.recordio') + self._recordio = open(self._recordio_path, 'wb+') + self._built = True + self._stdout = None + + def pytest_runner_args(self): + return ['-p', 'pytest_runner', '--pants_test_recordio', self._recordio_path] + + def popen(self, args): + if not self._built: + self.build() + test_args = self.generate_junit_args(self.target) + test_args.extend(self.pytest_runner_args()) + if self._redirect: + # If we are redirecting for the purpose of the parallel test runner, make output as quiet + # as possible but capture stdout/stderr of the processes so that they can be printed. + test_args.append('-qs') + test_args.extend(args) + env = os.environ.copy() + if self.coverage: + test_args.append( + '--pants_test_coverage=%s' % self.write_coverage_setup(self.target, self._builder)) + coverage_root = self.write_coverage_sitecustomize(self._builder) + pythonpath = [coverage_root] + list(filter(None, env.get('PYTHONPATH', '').split(':'))) + env['PYTHONPATH'] = ':'.join(pythonpath) + if self._redirect: + self._stdout = open(os.path.join(self._builder.path(), 'stdout'), 'w') + pex = PEX(self._builder.path(), interpreter=self.interpreter) + self._po = pex.run( + args=test_args + self._sources, + blocking=False, + setsid=True, + stdout=self._stdout, + env=env, + stderr=subprocess.STDOUT) + self._expiration = time.time() + self._timeout.as_(Time.SECONDS) + return self._po + + @property + def output(self): + if not self._redirect: + return '' + with open(os.path.join(self._builder.path(), 'stdout')) as fp: + return fp.read() + + @property + def finished(self): + return self.results == self.acknowledged_results and self._processed_footer + + def _process_record(self, record): + if isinstance(record, PythonTestResult): + self.results.add(record) + return record + elif isinstance(record, PythonTestFooter): + self._processed_footer = True + if self._stdout: + self._stdout.close() + self._po.wait() + + def read(self): + if not self._processed_footer and time.time() > self._expiration: + self.state = 'timeout' + self.kill() + return + record = read_pytest_record(self._recordio) + if record is not None: + return self._process_record(record) + + def acknowledge(self, result): + self.acknowledged_results.add(result) + + def mark_skipped(self): + self.kill(state='skipped') + + @property + def outcome(self): + if not self.finished and self.state is None: + return None + if self.state == 'timeout': + return PythonTestOutcome.timeout() + elif self.state == 'skipped': + return PythonTestOutcome.skipped() + elif self.state == 'cancelled': + return PythonTestOutcome.cancelled() else: - # This technically makes the assumption that tests/python/ will be testing - # src/python/. To change to honest measurements, do target.walk() here instead, - # however this results in very useless and noisy coverage reports. - source = set(os.path.dirname(source).replace(os.sep, '.') for source in target.sources) - args = ['-p', 'pytest_cov', - '--cov-config', filename, - '--cov-report', 'html', - '--cov-report', 'term'] - for module in source: - args.extend(['--cov', module]) - return filename, args - - @staticmethod - def wait_on(popen, timeout=TEST_TIMEOUT): - total_wait = Amount(0, Time.SECONDS) - while total_wait < timeout: - rc = popen.poll() - if rc is not None: - return PythonTestResult.rc(rc) - total_wait += PythonTestBuilder.TEST_POLL_PERIOD - time.sleep(PythonTestBuilder.TEST_POLL_PERIOD.as_(Time.SECONDS)) - popen.kill() - return PythonTestResult.timeout() - - def _run_python_test(self, target): - po = None - rv = PythonTestResult.exception() - coverage_rc = None - coverage_enabled = 'PANTS_PY_COVERAGE' in os.environ + success = all(result.outcome in ('skipped', 'passed') for result in self.results) + return PythonTestOutcome.success() if success else PythonTestOutcome.failure() + + def iter_coverage_data(self): + """Yield source_root, data file tuples from this chroot.""" + if self.coverage: + coverage_root = os.path.join(self._builder.path(), self.COVERAGE_ROOT) + for path in os.listdir(coverage_root): + if path.startswith('data'): + yield (self._builder.path(), os.path.join(coverage_root, path)) + + def _kill(self): + if self._po: + self._po.poll() + if self._po and self._po.returncode != 0: + try: + os.killpg(self._po.pid, signal.SIGTERM) + except OSError as e: + print('Failed to kill process group %s: %s' % (self._po.pid, e)) - try: - builder = PEXBuilder(interpreter=self.interpreter) - builder.info.entry_point = target.entry_point - builder.info.ignore_errors = target._soft_dependencies - chroot = PythonChroot( - target, - self.root_dir, - extra_targets=self.generate_test_targets(), - builder=builder, - interpreter=self.interpreter, - conn_timeout=self._conn_timeout) - builder = chroot.dump() - builder.freeze() - test_args = PythonTestBuilder.generate_junit_args(target) - test_args.extend(self.args) - if coverage_enabled: - coverage_rc, args = self.cov_setup(target, builder.chroot()) - test_args.extend(args) - sources = [os.path.join(target.target_base, source) for source in target.sources] - po = PEX(builder.path(), interpreter=self.interpreter).run( - args=test_args + sources, blocking=False, setsid=True) - # TODO(wickman) If coverage is enabled, write an intermediate .html that points to - # each of the coverage reports generated and webbrowser.open to that page. - rv = PythonTestBuilder.wait_on(po, timeout=target.timeout) - except Exception as e: - import traceback - print('Failed to run test!', file=sys.stderr) - traceback.print_exc() - rv = PythonTestResult.exception() - finally: - if coverage_rc: - os.unlink(coverage_rc) - if po and po.returncode != 0: - try: - os.killpg(po.pid, signal.SIGTERM) - except OSError as e: - if e.errno == errno.EPERM: - print("Unable to kill process group: %d" % po.pid) - elif e.errno != errno.ESRCH: - rv = PythonTestResult.exception() - self.successes[target._create_id()] = rv - return rv - - def _run_python_test_suite(self, target, fail_hard=True): + def kill(self, state='cancelled'): + if not self.finished: + self.state = state + self._kill() + + +class PythonTestBuilder(object): + class Error(Exception): pass + class InvalidDependencyException(Error): pass + + # TODO(wickman) Expose these as configurable parameters + TEST_TIMEOUT = Amount(2, Time.MINUTES) + TEST_POLL_PERIOD = Amount(100, Time.MILLISECONDS) + + def __init__(self, targets, args, root_dir, interpreter=None, conn_timeout=None): + self.targets = targets + self.args = args + self.root_dir = root_dir + self.successes = {} + self._display = Display(sys.stdout) + self.interpreter = interpreter or PythonInterpreter.get() + self.parallelism = max(1, int(os.environ.get('PANTS_PY_PARALLELISM', 1))) + self.cancelled = threading.Event() + self._producers = 0 + self._producer_lock = threading.Lock() + self.done = threading.Event() + self._conn_timeout = conn_timeout + self._queue = Queue() + self.coverage = 'PANTS_PY_COVERAGE' in os.environ + self.coverage_dir = os.path.join(Config.load().getdefault('pants_distdir'), 'coverage') + if self.coverage: + safe_mkdir(self.coverage_dir, clean=True) + self._equivalent_prefixes = OrderedSet([]) + self._data_files = [] + + def build_chroot(self, target): + chroot = PythonTestChroot( + target, + self.root_dir, + interpreter=self.interpreter, + coverage='PANTS_PY_COVERAGE' in os.environ, + redirect=self.parallelism > 1, + conn_timeout=self._conn_timeout) + chroot.build() + return chroot + + def drain_queue(self, chroot): + while True: + result = chroot.read() + if result: + self._queue.put((chroot, result)) + chroot.acknowledge(result) + else: + break + + def _collect_source_roots(self, target): + for dependency in target.dependencies: + for library in dependency.resolve(): + if isinstance(library, PythonLibrary): + yield library.target_base + + def _collect_coverage(self, chroot): + for prefix, data_file in chroot.iter_coverage_data(): + self._equivalent_prefixes.add(os.path.realpath(prefix)) + target_file = os.path.join(self.coverage_dir, os.path.basename(data_file)) + shutil.copy(data_file, target_file) + self._data_files.append(target_file) + + @classmethod + def _generate_coverage_config(cls, coverage_root, equivalent_prefixes): + with coverage_config(os.path.join(coverage_root, '.coveragerc')) as cp: + def as_list(iterable): + return '\n' + '\n'.join(iterable) + cp.set('run', 'data_file', os.path.join(coverage_root, 'data')) + cp.add_section('paths') + cp.set('paths', 'source', as_list(equivalent_prefixes)) + return os.path.join(coverage_root, '.coveragerc') + + @classmethod + @contextmanager + def _gen_cov(cls, coveragerc): + from coverage import coverage + cov = coverage(config_file=coveragerc) + cov.combine() + yield cov + + def run_report(self, source_root): + self._display.center(' coverage report source root: %s ' % source_root) + with temporary_dir() as td: + for filename in os.listdir(self.coverage_dir): + if filename.startswith('data'): + shutil.copy(os.path.join(self.coverage_dir, filename), os.path.join(td, filename)) + coveragerc = self._generate_coverage_config(td, + [source_root] + list(self._equivalent_prefixes)) + with self._gen_cov(coveragerc) as cov: + cov.report(show_missing=False) + self._display.center('') + self._display.write('HTML report: file://%s' % os.path.realpath( + os.path.join(self.coverage_dir, source_root, 'index.html'))) + cov.html_report(directory=os.path.join(self.coverage_dir, source_root)) + self._display.center('') + print('\n') + + def _run_one(self, chroot): + if self.cancelled.is_set(): + with self._producer_lock: + self._producers -= 1 + chroot.mark_skipped() + return + + chroot.popen(self.args) + + while not chroot.finished and not self.cancelled.is_set(): + self.drain_queue(chroot) + self.cancelled.wait(timeout=0.2) + self.drain_queue(chroot) + + if self.cancelled.is_set(): + chroot.kill() + + self._collect_coverage(chroot) + + with self._producer_lock: + self._producers -= 1 + + def _iter_test_suite(self, target): tests = OrderedSet([]) def _gather_deps(trg): if isinstance(trg, PythonTests): @@ -243,33 +615,112 @@ class PythonTestBuilder(object): for dep in dependency.resolve(): _gather_deps(dep) _gather_deps(target) + return iter(tests) + + def _iter_tests(self, targets): + for target in targets: + if isinstance(target, PythonTests): + yield target + elif isinstance(target, PythonTestSuite): + for subtarget in self._iter_test_suite(target): + yield subtarget + else: + raise self.InvalidDependencyException( + "Invalid dependency in python test target: %s" % target) + + def _parallel_test_driver(self, tests): + from concurrent import futures + from concurrent.futures.thread import ThreadPoolExecutor + with ThreadPoolExecutor(self.parallelism) as executor: + fs = [executor.submit(self._run_one, chroot) for test, chroot in tests.items()] + futures.wait(fs, return_when=futures.ALL_COMPLETED) + for future in fs: + if future.exception(): + print('Future %s excepted: %s' % (future, future.exception())) + self.done.set() + + def run(self): + tests = {} + failed_chroots = set() + self._display.center(' building chroots ') + for test in self._iter_tests(self.targets): + try: + tests[test] = self.build_chroot(test) + result = Display.render('finished', 'green') + except DistributionNotFound: + failed_chroots.add(test) + result = Display.render('failed', 'red') + self._display.left_right(test.address.compact_repr(), result) - failed = False - for test in tests: - rv = self._run_python_test(test) - if not rv.success: - failed = True - if fail_hard: - return rv - return PythonTestResult.rc(1 if failed else 0) + try: + self._run_tests(tests) + except KeyboardInterrupt: + self.cancelled.set() + + self.done.wait() + + self._display.center(' result by target ') + for target, chroot in sorted(tests.items()): + self._display.left_right(target.address.compact_repr(), chroot.outcome) + if chroot.outcome.failed: + output = chroot.output + if output: + self._display.center(' error output ') + self._display.write('\n'.join(chroot.output.splitlines())) + self._display.center('') + for target in failed_chroots: + self._display.left_right(target.address.compact_repr(), PythonTestOutcome.exception()) + + if self.coverage: + source_roots = set() + for target in tests: + source_roots.update(self._collect_source_roots(target)) + for source_root in source_roots: + self.run_report(source_root) + + success = all(chroot.outcome is not None and chroot.outcome.success for chroot in tests.values()) + success = False if failed_chroots else success + return 0 if success else 1 + + def _run_tests(self, tests): + if not tests: + self.done.set() + return + + self._producers = len(tests) - def _run_tests(self, targets): fail_hard = 'PANTS_PYTHON_TEST_FAILSOFT' not in os.environ + if 'PANTS_PY_COVERAGE' in os.environ: # Coverage often throws errors despite tests succeeding, so make PANTS_PY_COVERAGE # force FAILSOFT. fail_hard = False - failed = False - for target in targets: - if isinstance(target, PythonTests): - rv = self._run_python_test(target) - elif isinstance(target, PythonTestSuite): - rv = self._run_python_test_suite(target, fail_hard) - else: - raise PythonTestBuilder.InvalidDependencyException( - "Invalid dependency in python test target: %s" % target) - if not rv.success: - failed = True - if fail_hard: - return rv - return PythonTestResult.rc(1 if failed else 0) + + test_runner_thread = threading.Thread(target=lambda: self._parallel_test_driver(tests)) + test_runner_thread.start() + + if self.parallelism > 1: + self._display.center(' result by test ') + + COLOR_MAP = { + 'passed': 'green', + 'failed': 'red', + 'skipped': 'yellow', + } + + def result_str(result): + return Display.render(result.outcome, COLOR_MAP.get(result.outcome, 'yellow')) + + while self._producers > 0: + while True: + try: + chroot, result = self._queue.get(timeout=0.2) + self._queue.task_done() + except Empty: + break + # If running in parallel mode, serialize test results ourselves. + if self.parallelism > 1: + self._display.left_right( + '%s:%s: %s' % (result.filename, result.lineno, result.function), result_str(result)) + if result.outcome != 'passed' and fail_hard: + self.cancelled.set()