# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
# pylint: disable=no-name-in-module
"""Tests to run with a running daemon."""
import subprocess
import sys
import time

from aiida.common import exceptions
from aiida.engine import run, submit
from aiida.engine.daemon.client import get_daemon_client
from aiida.engine.persistence import ObjectLoader
from aiida.manage.caching import enable_caching
from aiida.orm import CalcJobNode, load_node, Int, Str, List, Dict, load_code
from aiida.plugins import CalculationFactory, WorkflowFactory
from workchains import (
    NestedWorkChain, DynamicNonDbInput, DynamicDbInput, DynamicMixedInput, ListEcho, CalcFunctionRunnerWorkChain,
    WorkFunctionRunnerWorkChain, NestedInputNamespace, SerializeWorkChain, ArithmeticAddBaseWorkChain
)

CODENAME_ADD = 'add@localhost'
CODENAME_DOUBLER = 'doubler'
TIMEOUTSECS = 4 * 60  # 4 minutes
NUMBER_CALCULATIONS = 10  # Number of calculations to submit
NUMBER_WORKCHAINS = 20  # Number of workchains to submit


def _print_command_output(command):
    """Run ``command`` and print its combined stdout/stderr.

    A non-zero exit status is reported but not raised, since this output is purely diagnostic.

    :param command: the command and its arguments as a list of strings (no shell involved).
    """
    try:
        output = subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as exception:
        print(f'Note: the command failed, message: {exception}')
    else:
        # Decode so the log shows real line breaks instead of a ``b'...'`` repr.
        print(output.decode(errors='replace'))


def print_daemon_log():
    """Print the contents of the daemon log file."""
    daemon_client = get_daemon_client()
    daemon_log = daemon_client.daemon_log_file
    print(f"Output of 'cat {daemon_log}':")
    _print_command_output(['cat', f'{daemon_log}'])


def jobs_have_finished(pks):
    """Return whether every process with the given pks has terminated.

    Prints the pk and process state of each process that has not terminated yet, followed by a
    ``<finished>/<total> finished`` summary line.

    :param pks: iterable of process node pks.
    :return: True if all processes are terminated (also True for an empty ``pks``).
    """
    # Load each node once so the count and the per-node report are based on the same state.
    nodes = [load_node(pk) for pk in pks]
    num_finished = 0
    for node in nodes:
        if node.is_terminated:
            num_finished += 1
        else:
            print(f'not terminated: {node.pk} [{node.process_state}]')
    print(f'{num_finished}/{len(nodes)} finished')
    return num_finished == len(nodes)


def print_report(pk):
    """Print the output of ``verdi process report`` for the given pk."""
    print(f"Output of 'verdi process report {pk}':")
    _print_command_output(['verdi', 'process', 'report', f'{pk}'])


def validate_process_functions(expected_results):
    """Validate the calcfunction and workfunction results.

    :param expected_results: mapping of process pk to the expected value of its ``result`` output.
    :return: True if every process finished ok and its ``result`` output equals the expected value.
    """
    valid = True
    for pk, expected_result in expected_results.items():
        calc = load_node(pk)
        if not calc.is_finished_ok:
            print(f'Calc<{pk}> not finished ok: process_state<{calc.process_state}> exit_status<{calc.exit_status}>')
            print_report(pk)
            valid = False

        try:
            actual_result = calc.outputs.result
        except exceptions.NotExistent:
            print(f'Could not retrieve `result` output for process<{pk}>')
            print_report(pk)
            valid = False
            # Nothing to compare; falling through would read an unbound (or stale) ``actual_result``.
            continue

        if actual_result != expected_result:
            print(f'* UNEXPECTED VALUE {actual_result} for calc pk={pk}: I expected {expected_result}')
            valid = False

    return valid


def validate_calculations(expected_results):
    """Validate the parsed outputs of the submitted calculations.

    :param expected_results: mapping of calculation pk to the expected ``output_parameters`` dictionary.
    :return: True if every calculation finished ok and its ``output_parameters`` match the expectation.
    """
    valid = True
    for pk, expected_dict in expected_results.items():
        calc = load_node(pk)
        if not calc.is_finished_ok:
            print(f'Calc<{pk}> not finished ok: process_state<{calc.process_state}> exit_status<{calc.exit_status}>')
            print_report(pk)
            valid = False

        try:
            actual_dict = calc.outputs.output_parameters.get_dict()
        except exceptions.NotExistent:
            print(f'Could not retrieve `output_parameters` node for Calculation<{pk}>')
            print_report(pk)
            valid = False
            # Skip the comparison rather than reusing the dictionary of a previous calculation.
            continue

        try:
            # The expected result stores the retrieved temporary files as a dict; ``dict()`` normalises the parsed
            # value (presumably a sequence of key/value pairs -- confirm against the parser) so the two compare.
            actual_dict['retrieved_temporary_files'] = dict(actual_dict['retrieved_temporary_files'])
        except KeyError:
            # If the retrieval fails we simply pass as the following check of the actual value will fail anyway
            pass

        if actual_dict != expected_dict:
            print(f'* UNEXPECTED VALUE {actual_dict} for calc pk={pk}: I expected {expected_dict}')
            valid = False

    return valid


def validate_workchains(expected_results):
    """Validate the ``output`` output of the submitted workchains.

    :param expected_results: mapping of workchain pk to the expected value of its ``output`` output.
    :return: True if every workchain finished ok and its ``output`` equals the expected value.
    """
    valid = True
    for pk, expected_value in expected_results.items():
        try:
            calc = load_node(pk)
            actual_value = calc.outputs.output
        except (exceptions.NotExistent, AttributeError) as exception:
            print(
                '* UNABLE TO RETRIEVE VALUE for workchain pk={}: I expected {}, I got {}: {}'.format(
                    pk, expected_value, type(exception), exception
                )
            )
            valid = False
            # The node or its output could not be loaded, so none of the checks below can run.
            continue

        if not calc.is_finished_ok:
            print(
                'Calculation<{}> not finished ok: process_state<{}> exit_status<{}>'.format(
                    pk, calc.process_state, calc.exit_status
                )
            )
            print_report(pk)
            valid = False
            continue

        if actual_value != expected_value:
            print(
                '* UNEXPECTED VALUE {}, type {} for workchain pk={}: I expected {}, type {}'.format(
                    actual_value, type(actual_value), pk, expected_value, type(expected_value)
                )
            )
            valid = False

    return valid


def validate_cached(cached_calcs):
    """Check that the calculations created with caching enabled are indeed cached.

    :param cached_calcs: iterable of process nodes expected to have been created from the cache.
    :return: True if every node finished ok, carries a valid cache hash and (for calculation jobs) has the same
        repository file names as the node it was cached from.
    """
    valid = True
    for calc in cached_calcs:
        if not calc.is_finished_ok:
            print(
                'Cached calculation<{}> not finished ok: process_state<{}> exit_status<{}>'.format(
                    calc.pk, calc.process_state, calc.exit_status
                )
            )
            print_report(calc.pk)
            valid = False

        extras = calc.extras
        is_cached = '_aiida_cached_from' in extras
        # ``.get`` so a missing hash extra is reported as invalid instead of raising.
        if not is_cached or calc.get_hash() != extras.get('_aiida_hash'):
            print(f'Cached calculation<{calc.pk}> has invalid hash')
            print_report(calc.pk)
            valid = False

        if not is_cached:
            # There is no source node to compare against; looking it up would raise.
            continue

        if isinstance(calc, CalcJobNode):
            original_calc = load_node(extras['_aiida_cached_from'])
            files_original = original_calc.list_object_names()
            files_cached = calc.list_object_names()

            if not files_cached:
                print(f'Cached calculation <{calc.pk}> does not have any raw inputs files')
                print_report(calc.pk)
                valid = False
            if not files_original:
                print(
                    'Original calculation <{}> does not have any raw inputs files after being cached from.'.format(
                        original_calc.pk
                    )
                )
                valid = False

            if set(files_original) != set(files_cached):
                print(
                    'different raw input files [{}] vs [{}] for original<{}> and cached<{}> calculation'.format(
                        set(files_original), set(files_cached), original_calc.pk, calc.pk
                    )
                )
                valid = False

    return valid


def launch_calculation(code, counter, inputval):
    """Submit a templatereplacer calculation to the daemon through the Process layer.

    :return: tuple of the submitted node and the expected ``output_parameters`` dictionary.
    """
    process, inputs, expected_result = create_calculation_process(code=code, inputval=inputval)
    calc = submit(process, **inputs)
    print(f'[{counter}] launched calculation {calc.uuid}, pk={calc.pk}')
    return calc, expected_result


def run_calculation(code, counter, inputval):
    """Run a templatereplacer calculation in the current interpreter through the Process layer.

    :return: tuple of the process node and the expected ``output_parameters`` dictionary.
    """
    process, inputs, expected_result = create_calculation_process(code=code, inputval=inputval)
    _, calc = run.get_node(process, **inputs)
    print(f'[{counter}] ran calculation {calc.uuid}, pk={calc.pk}')
    return calc, expected_result


def create_calculation_process(code, inputval):
    """Create the process class and inputs for submitting / running a templatereplacer doubler calculation.

    :param code: the code to run.
    :param inputval: value written to the input file.
    :return: tuple of the process class, its inputs and the expected ``output_parameters`` dictionary.
    """
    TemplatereplacerCalculation = CalculationFactory('templatereplacer')
    parameters = Dict(dict={'value': inputval})
    template = Dict(
        dict={
            # The command-line parameter adds a sleep time. It is set to 1 second to speed up the tests, but kept
            # non-zero so that AiiDA can find some calculations in a queued state.
            'cmdline_params': ['1'],
            'input_file_template': '{value}',  # File just contains the value to double
            'input_file_name': 'value_to_double.txt',
            'output_file_name': 'output.txt',
            'retrieve_temporary_files': ['triple_value.tmp']
        }
    )
    options = {
        'resources': {
            'num_machines': 1
        },
        'max_wallclock_seconds': 5 * 60,
        'withmpi': False,
        'parser_name': 'templatereplacer.doubler',
    }

    expected_result = {'value': 2 * inputval, 'retrieved_temporary_files': {'triple_value.tmp': str(inputval * 3)}}

    inputs = {
        'code': code,
        'parameters': parameters,
        'template': template,
        'metadata': {
            'options': options,
        }
    }
    return TemplatereplacerCalculation, inputs, expected_result


def run_arithmetic_add():
    """Run the `ArithmeticAddCalculation` and assert it computes 1 + 2 = 3."""
    ArithmeticAddCalculation = CalculationFactory('arithmetic.add')

    code = load_code(CODENAME_ADD)
    inputs = {
        'x': Int(1),
        'y': Int(2),
        'code': code,
    }

    # Normal inputs should run just fine
    results, node = run.get_node(ArithmeticAddCalculation, **inputs)
    assert node.is_finished_ok, node.exit_status
    assert results['sum'] == 3


def run_base_restart_workchain():
    """Run the `AddArithmeticBaseWorkChain` a few times for various inputs and assert the outcome of each."""
    code = load_code(CODENAME_ADD)
    inputs = {
        'add': {
            'x': Int(1),
            'y': Int(2),
            'code': code,
        }
    }

    # Normal inputs should run just fine
    results, node = run.get_node(ArithmeticAddBaseWorkChain, **inputs)
    assert node.is_finished_ok, node.exit_status
    assert len(node.called) == 1
    assert 'sum' in results
    assert results['sum'].value == 3

    # With one input negative, the sum will be negative which will fail the calculation, but the error handler should
    # fix it, so the second calculation should finish successfully
    inputs['add']['y'] = Int(-4)
    results, node = run.get_node(ArithmeticAddBaseWorkChain, **inputs)
    assert node.is_finished_ok, node.exit_status
    assert len(node.called) == 2
    assert 'sum' in results
    assert results['sum'].value == 5

    # The silly sanity check aborts the workchain if the sum is bigger than 10
    inputs['add']['y'] = Int(10)
    _, node = run.get_node(ArithmeticAddBaseWorkChain, **inputs)
    assert not node.is_finished_ok, node.process_state
    assert node.exit_status == ArithmeticAddBaseWorkChain.exit_codes.ERROR_TOO_BIG.status, node.exit_status  # pylint: disable=no-member
    assert len(node.called) == 1

    # Check that overriding default handler enabled status works
    inputs['add']['y'] = Int(1)
    inputs['handler_overrides'] = Dict(dict={'disabled_handler': True})
    _, node = run.get_node(ArithmeticAddBaseWorkChain, **inputs)
    assert not node.is_finished_ok, node.process_state
    assert node.exit_status == ArithmeticAddBaseWorkChain.exit_codes.ERROR_ENABLED_DOOM.status, node.exit_status  # pylint: disable=no-member
    assert len(node.called) == 1


def run_multiply_add_workchain():
    """Run the `MultiplyAddWorkChain` and assert it computes 1 * 2 + 3 = 5 in two steps."""
    MultiplyAddWorkChain = WorkflowFactory('arithmetic.multiply_add')

    code = load_code(CODENAME_ADD)
    inputs = {
        'x': Int(1),
        'y': Int(2),
        'z': Int(3),
        'code': code,
    }

    # Normal inputs should run just fine
    results, node = run.get_node(MultiplyAddWorkChain, **inputs)
    assert node.is_finished_ok, node.exit_status
    assert len(node.called) == 2
    assert 'result' in results
    assert results['result'].value == 5


def _submit_workchains():
    """Submit the test workchains to the daemon.

    :return: mapping of workchain pk to the expected value of its ``output`` output, for the workchains whose result
        is checked by ``validate_workchains``.
    """
    expected_results = {}

    # The results of these workchains are not recorded for validation.
    print(f'Submitting {NUMBER_WORKCHAINS} workchains to the daemon')
    for index in range(NUMBER_WORKCHAINS):
        submit(NestedWorkChain, inp=Int(index))

    print("Submitting a workchain with 'submit'.")
    builder = NestedWorkChain.get_builder()
    input_val = 4
    builder.inp = Int(input_val)
    proc = submit(builder)
    expected_results[proc.pk] = input_val

    # NOTE(review): this workchain's result is not recorded for validation -- confirm whether that is intended.
    print('Submitting a workchain with a nested input namespace.')
    submit(NestedInputNamespace, foo={'bar': {'baz': Int(-12)}})

    print('Submitting a workchain with a dynamic non-db input.')
    value = [4, 2, 3]
    pk = submit(DynamicNonDbInput, namespace={'input': value}).pk
    expected_results[pk] = value

    print('Submitting a workchain with a dynamic db input.')
    value = 9
    pk = submit(DynamicDbInput, namespace={'input': Int(value)}).pk
    expected_results[pk] = value

    print('Submitting a workchain with a mixed (db / non-db) dynamic input.')
    value_non_db = 3
    value_db = Int(2)
    pk = submit(DynamicMixedInput, namespace={'inputs': {'input_non_db': value_non_db, 'input_db': value_db}}).pk
    expected_results[pk] = value_non_db + value_db

    print('Submitting the serializing workchain')
    pk = submit(SerializeWorkChain, test=Int).pk
    expected_results[pk] = ObjectLoader().identify_object(Int)

    print('Submitting the ListEcho workchain.')
    list_value = List()
    list_value.extend([1, 2, 3])
    pk = submit(ListEcho, list=list_value).pk
    expected_results[pk] = list_value

    print('Submitting a WorkChain which contains a workfunction.')
    value = Str('workfunction test string')
    pk = submit(WorkFunctionRunnerWorkChain, input=value).pk
    expected_results[pk] = value

    print('Submitting a WorkChain which contains a calcfunction.')
    value = Int(1)
    pk = submit(CalcFunctionRunnerWorkChain, input=value).pk
    expected_results[pk] = Int(2)

    return expected_results


def _wait_for_processes(pks):
    """Poll every 15 seconds until all processes in ``pks`` terminate or ``TIMEOUTSECS`` elapses.

    Each poll prints the process list and daemon status, both for debugging and so that the test machine keeps
    seeing output and does not shut the job down.

    :return: True if all processes terminated before the timeout, False otherwise.
    """
    start_time = time.time()
    while time.time() - start_time < TIMEOUTSECS:
        time.sleep(15)  # Wait a few seconds

        print('#' * 78)
        print(f'####### TIME ELAPSED: {time.time() - start_time} s')
        print('#' * 78)
        print("Output of 'verdi process list -a':")
        _print_command_output(['verdi', 'process', 'list', '-a'])

        print("Output of 'verdi daemon status':")
        _print_command_output(['verdi', 'daemon', 'status'])

        if jobs_have_finished(pks):
            print('Calculation terminated its execution')
            return True

    return False


def main():
    """Launch a bunch of calculation jobs and workchains, wait for them to finish and validate their results.

    Exits with status 2 if not all processes terminate within ``TIMEOUTSECS`` and with status 3 if any of them
    did not produce the expected result.
    """
    # NOTE(review): `run_calculation`, `validate_cached`, `run_arithmetic_add`, `run_base_restart_workchain`,
    # `run_multiply_add_workchain` and the `enable_caching` import are not used by `main` -- confirm whether the
    # caching and arithmetic checks should be wired in here.
    expected_results_process_functions = {}
    expected_results_calculations = {}
    code_doubler = load_code(CODENAME_DOUBLER)

    # Submitting the Calculations the new way directly through the launchers
    print(f'Submitting {NUMBER_CALCULATIONS} calculations to the daemon')
    for counter in range(1, NUMBER_CALCULATIONS + 1):
        inputval = counter
        calc, expected_result = launch_calculation(code=code_doubler, counter=counter, inputval=inputval)
        expected_results_calculations[calc.pk] = expected_result

    expected_results_workchains = _submit_workchains()

    calculation_pks = sorted(expected_results_calculations)
    workchains_pks = sorted(expected_results_workchains)
    process_functions_pks = sorted(expected_results_process_functions)
    pks = calculation_pks + workchains_pks + process_functions_pks

    print('Waiting for end of execution...')
    if not _wait_for_processes(pks):
        print_daemon_log()
        print('')
        print(f'Timeout!! Calculation did not complete after {TIMEOUTSECS} seconds')
        sys.exit(2)

    # Run every validator (no short-circuit) so all mismatches are reported in a single run.
    validations = [
        validate_calculations(expected_results_calculations),
        validate_workchains(expected_results_workchains),
        validate_process_functions(expected_results_process_functions),
    ]
    if not all(validations):
        print_daemon_log()
        print('')
        print('ERROR! Some return values are different from the expected value')
        sys.exit(3)

    print('OK, all calculations have the expected parsed result')


if __name__ == '__main__':
    main()