Source code for aiida_hubbard.workflows.hp.base

# -*- coding: utf-8 -*-
"""Workchain to run a Quantum ESPRESSO hp.x calculation with automated error handling and restarts."""
from aiida import orm
from aiida.common import AttributeDict
from aiida.common.lang import type_check
from aiida.engine import BaseRestartWorkChain, ProcessHandlerReport, process_handler, while_
from aiida.plugins import CalculationFactory
from aiida_quantumespresso.workflows.protocols.utils import ProtocolMixin

[docs]HpCalculation = CalculationFactory('quantumespresso.hp')
[docs]class HpBaseWorkChain(BaseRestartWorkChain, ProtocolMixin): """Workchain to run a Quantum ESPRESSO hp.x calculation with automated error handling and restarts."""
[docs] _process_class = HpCalculation
[docs] defaults = AttributeDict({ 'delta_factor_alpha_mix': 0.5, 'delta_factor_niter_max': 2, 'delta_factor_max_seconds': 0.95, })
@classmethod
[docs] def define(cls, spec): """Define the process specification.""" # yapf: disable super().define(spec) spec.expose_inputs(HpCalculation, namespace='hp') spec.input('only_initialization', valid_type=orm.Bool, default=lambda: orm.Bool(False)) spec.input('clean_workdir', valid_type=orm.Bool, default=lambda: orm.Bool(False), help='If `True`, work directories of all called calculation will be cleaned at the end of execution.') spec.outline( cls.setup, cls.validate_parameters, while_(cls.should_run_process)( cls.prepare_process, cls.run_process, cls.inspect_process, ), cls.results, ) spec.expose_outputs(HpCalculation) spec.exit_code(300, 'ERROR_UNRECOVERABLE_FAILURE', message='The calculation failed with an unrecoverable error.')
@classmethod
[docs] def get_protocol_filepath(cls): """Return ``pathlib.Path`` to the ``.yaml`` file that defines the protocols.""" from importlib_resources import files from ..protocols import hp as hp_protocols return files(hp_protocols) / 'base.yaml'
@classmethod
[docs] def get_builder_from_protocol( cls, code, protocol=None, parent_scf_folder=None, parent_hp_folders: dict = None, overrides=None, options=None, **_ ): """Return a builder prepopulated with inputs selected according to the chosen protocol. :param code: the ``Code`` instance configured for the ``quantumespresso.hp`` plugin. :param protocol: protocol to use, if not specified, the default will be used. :param parent_scf_folder: the parent ``RemoteData`` of the respective SCF calcualtion. :param parent_hp_folders: the parent ``FolderData`` of the respective single atoms HP calcualtions. :param overrides: optional dictionary of inputs to override the defaults of the protocol. :param options: A dictionary of options that will be recursively set for the ``metadata.options`` input of all the ``CalcJobs`` that are nested in this work chain. :return: a process builder instance with all inputs defined ready for launch. """ from aiida_quantumespresso.workflows.protocols.utils import recursive_merge if isinstance(code, str): code = orm.load_code(code) type_check(code, orm.AbstractCode) inputs = cls.get_protocol_inputs(protocol, overrides) # Update the parameters based on the protocol inputs parameters = inputs['hp']['parameters'] metadata = inputs['hp']['metadata'] qpoints_mesh = inputs['hp'].pop('qpoints') qpoints = orm.KpointsData() qpoints.set_kpoints_mesh(qpoints_mesh) # If overrides are provided, they are considered absolute if overrides: parameter_overrides = overrides.get('hp', {}).get('parameters', {}) parameters = recursive_merge(parameters, parameter_overrides) if options: metadata['options'] = recursive_merge(inputs['hp']['metadata']['options'], options) hubbard_structure = inputs['hp'].pop('hubbard_structure', None) parent_scf = parent_scf_folder if not 'parent_scf' in inputs['hp'] else inputs['hp']['parent_scf'] parent_hp = parent_hp_folders if not 'parent_scf' in inputs['hp'] else inputs['hp']['parent_scf'] # pylint: disable=no-member builder = cls.get_builder() builder.hp['code'] = code builder.hp['qpoints'] = qpoints builder.hp['parameters'] = orm.Dict(parameters) builder.hp['metadata'] = metadata if 'settings' in inputs['hp']: builder.hp['settings'] = orm.Dict(inputs['hp']['settings']) if hubbard_structure: builder.hp['hubbard_structure'] = hubbard_structure if parent_scf: builder.hp['parent_scf'] = parent_scf if parent_hp: builder.hp['parent_hp'] = parent_hp builder.only_initialization = orm.Bool(inputs['only_initialization']) builder.clean_workdir = orm.Bool(inputs['clean_workdir']) # pylint: enable=no-member return builder
[docs] def setup(self): """Call the `setup` of the `BaseRestartWorkChain` and then create the inputs dictionary in `self.ctx.inputs`. This `self.ctx.inputs` dictionary will be used by the `BaseRestartWorkChain` to submit the calculations in the internal loop. """ super().setup() self.ctx.restart_calc = None self.ctx.inputs = AttributeDict(self.exposed_inputs(HpCalculation, 'hp'))
[docs] def validate_parameters(self): """Validate inputs that might depend on each other and cannot be validated by the spec.""" self.ctx.inputs.parameters = self.ctx.inputs.parameters.get_dict() self.ctx.inputs.parameters.setdefault('INPUTHP', {}) if self.inputs.only_initialization.value: self.ctx.inputs.parameters['INPUTHP']['determine_num_pert_only'] = True self.ctx.inputs.settings = self.ctx.inputs.settings.get_dict() if 'settings' in self.ctx.inputs else {}
[docs] def set_max_seconds(self, max_wallclock_seconds): """Set the `max_seconds` to a fraction of `max_wallclock_seconds` option to prevent out-of-walltime problems. :param max_wallclock_seconds: the maximum wallclock time that will be set in the scheduler settings. """ max_seconds_factor = self.defaults.delta_factor_max_seconds max_seconds = max_wallclock_seconds * max_seconds_factor self.ctx.inputs.parameters['INPUTHP']['max_seconds'] = max_seconds
[docs] def prepare_process(self): """Prepare the inputs for the next calculation.""" max_wallclock_seconds = self.ctx.inputs.metadata.options.get('max_wallclock_seconds', None) if max_wallclock_seconds is not None and 'max_seconds' not in self.ctx.inputs.parameters['INPUTHP']: self.set_max_seconds(max_wallclock_seconds)
[docs] def report_error_handled(self, node, action): """Report an action taken for a calculation that has failed. This should be called in a registered error handler if its condition is met and an action was taken. :param node: the failed calculation node :param action: a string message with the action taken """ self.report(f'{node.process_label}<{node.pk}> failed with exit status {node.exit_status}: {node.exit_message}') self.report(f'Action taken: {action}')
@process_handler(priority=600)
[docs] def handle_unrecoverable_failure(self, node): """Handle calculations with an exit status below 400 which are unrecoverable, so abort the work chain.""" if node.is_failed and node.exit_status < 400: self.report_error_handled(node, 'unrecoverable error, aborting...') return ProcessHandlerReport(True, self.exit_codes.ERROR_UNRECOVERABLE_FAILURE)
@process_handler(priority=460, exit_codes=HpCalculation.exit_codes.ERROR_COMPUTING_CHOLESKY)
[docs] def handle_computing_cholesky(self, _): """Handle `ERROR_COMPUTING_CHOLESKY`: set parallel diagonalization to 1 and restart. Parallelization of diagonalization may produce in some cases too much numerical noise, giving rise to Cholesky factorization issues. As other diagonalization algorithms are not available in `hp.x`, we try to set the diagonalization flag to 1, if not already set. """ settings = self.ctx.inputs.settings cmdline = settings.get('cmdline', []) for key in ['-ndiag', '-northo', '-nd']: if key in cmdline: index = cmdline.index(key) if int(cmdline[index+1]) == 1: self.report('diagonalization flag already to 1, stopping') return ProcessHandlerReport(False) cmdline[index+1] = '1' # enforce to be 1 break else: cmdline += ['-nd', '1'] settings['cmdline'] = cmdline self.report('set parallelization flag for diagonalization to 1, restarting') return ProcessHandlerReport(True)
@process_handler(priority=410, exit_codes=HpCalculation.exit_codes.ERROR_CONVERGENCE_NOT_REACHED)
[docs] def handle_convergence_not_reached(self, _): """Handle `ERROR_CONVERGENCE_NOT_REACHED`: decrease `alpha_mix` and restart. Since `hp.x` does not support restarting from incomplete calculations, the entire calculation will have to be restarted from scratch. By decreasing `alpha_mix` there is a chance that the next run will converge. If these keys are present in the input parameters, they will be scaled by a default factor, otherwise, a hardcoded default value will be set that is lower/higher than that of the code's default. """ parameters = self.ctx.inputs.parameters['INPUTHP'] changes = [] # The `alpha_mix` parameter is an array and so all keys matching `alpha_mix(i)` with `i` some integer should # be corrected accordingly. If no such key exists, the default `alpha_mix(1)` is set. if any(parameter.startswith('alpha_mix(') for parameter in parameters.keys()): for parameter in parameters.keys(): if parameter.startswith('alpha_mix('): parameters[parameter] *= self.defaults.delta_factor_alpha_mix changes.append(f'changed `{parameter}` to {parameters[parameter]}') else: parameter = 'alpha_mix(1)' parameters[parameter] = 0.20 changes.append(f'set `{parameter}` to {parameters[parameter]}') if changes: self.report(f"convergence not reached: {', '.join(changes)}") else: self.report('convergence not reached, restarting') return ProcessHandlerReport(True)
[docs] def on_terminated(self): """Clean the working directories of all child calculations if `clean_workdir=True` in the inputs.""" super().on_terminated() if self.inputs.clean_workdir.value is False: self.report('remote folders will not be cleaned') return cleaned_calcs = [] for called_descendant in self.node.called_descendants: if isinstance(called_descendant, orm.CalcJobNode): try: called_descendant.outputs.remote_folder._clean() # pylint: disable=protected-access cleaned_calcs.append(called_descendant.pk) except (IOError, OSError, KeyError): pass if cleaned_calcs: self.report(f"cleaned remote folders of calculations: {' '.join(map(str, cleaned_calcs))}")