Source code for qepy.calculator

import numpy as np
from copy import deepcopy
from qepy.driver import Driver
from qepy_modules import constants
from qepy.io import QEInput
from ase.calculators.calculator import Calculator
from ase.units import create_units
from ase.geometry import wrap_positions
import ase.io

units = create_units('2006').copy()
units['Bohr'] = constants.BOHR_RADIUS_SI * 1E10
units['Ry'] = constants.RYTOEV

[docs] class QEpyCalculator(Calculator): """QEpy Calculator for ASE. QEpy initialization depends on the input file. Here are some ways to generate input file : - Give a file `inputfile` or/and a dict `qe_options` will generate a temporary input file with new atomic information. - Give input file name `inputfile` and `from_file = True` will read the structures to atoms (ase.Atoms) from `inputfile` and initialize the QEpy with `inputfile`. - Give dict `ase_espresso` will generate a temporary input file with `ase.io.espresso.write_espresso_in <https://wiki.fysik.dtu.dk/ase/ase/io/formatoptions.html#ase.io.espresso.write_espresso_in>`__ (Only works for program PW). Parameters ---------- atoms : ase.Atoms ASE atoms object inputfile : str Name of QE input file from_file : str Read the structure from inputfile, and use the inputfile for the QE Initialization. wrap : bool If True, wrap the atoms back to cell before update the ions. extrapolation : str or bool If False, every step will re-run the QE from file. ase_espresso : dict A dictionary with some parameters to generate PW input. qe_options: dict A dictionary with input parameters for QE to generate QE input file. comm : object Parallel communicator ldescf : bool If True, also print the scf correction term in the iteratively mode iterative : bool Iteratively run the scf or tddft task : str Task of the driver : - 'scf' : scf task - 'optical' : Optical absorption spectrum (TDDFT) - 'nscf' : read file from scf embed : object (Fortran) embed object for QE prefix : str prefix of QE input/output outdir : str The output directory of QE logfile : str, file or bool The screen output of QE. If it is a file descriptor, it should open with 'w+'. - None : Show on the screen. - str : Save to the given file. - True : Save to the temporary file, see also :func:`qepy.driver.Driver.get_output`. .. note:: - if `from_file` is True, the qe_options will not use. - The bool `gather` parameter in functions means the data is gathered in root or distributed in all cpus. - if `ase_espresso` is set, will use ase module to write QE input file. `ase_espresso` contains : + input_data: dict + pseudopotentials: dict + kpts: (int, int, int) or dict + koffset: (int, int, int) + crystal_coordinates: bool More details, see `ase.io.espresso.write_espresso_in <https://wiki.fysik.dtu.dk/ase/ase/io/formatoptions.html#ase.io.espresso.write_espresso_in>`__. """ implemented_properties=['energy', 'forces', 'stress'] def __init__(self, atoms = None, inputfile = None, from_file = False, wrap = False, extrapolation = True, ase_espresso = None, qe_options = None, comm = None, ldescf = False, iterative = False, task = 'scf', embed = None, prefix = None, outdir = None, logfile = None, prog = 'pw', **kwargs): Calculator.__init__(self, atoms = atoms, **kwargs) # self.lstart = False self.optimizer = None # self.atoms = atoms self.inputfile = inputfile self.wrap = wrap self.extrapolation = extrapolation self.from_file = from_file self.ase_espresso = ase_espresso self.prog = prog self.qe_options = qe_options # if self.inputfile is not None and self.atoms is None : self.atoms = ase.io.read(self.inputfile, format='espresso-in') if self.atoms is not None : self.atoms.calc = self # if self.from_file : self.basefile = None else : self.basefile = self.inputfile self.inputfile = 'qepy_input_tmp.in' # self.atoms_save = None self.driver = None self.iter = 0 self.qeinput = QEInput() # self.qepy_options = { 'comm' : comm, 'ldescf' : ldescf, 'iterative' : iterative, 'task' : task, 'embed' : embed, 'logfile' : logfile, } self.qepy_options.update(kwargs) # self.parameters['qe_options'] = qe_options self.parameters['prog'] = prog self.parameters['kpts'] = None # self.restart() def set_atoms(self, atoms): atoms.pbc = [True, True, True] self.atoms = atoms @property def qe_options(self): return self._qe_options @qe_options.setter def qe_options(self, value): if value is not None : # avoid changing the input value value = deepcopy(value) self._qe_options = value def restart_driver(self, qe_options=None, inputfile=None, atoms=None): if qe_options is not None : self.from_file = False self.qe_options = qe_options if inputfile is not None : self.from_file = True self.inputfile = inputfile if atoms is not None : self.atoms_save = atoms.copy() self.atoms = atoms if self.lstart : self.stop() self.driver_initialise() def restart(self): self.results = {} def driver_initialise(self, atoms = None, **kwargs): atoms = atoms or self.atoms if self.wrap : atoms.wrap() if not self.from_file : if self.ase_espresso : ase.io.write(self.inputfile, atoms, format = 'espresso-in', **self.ase_espresso) else : self.qeinput.write_qe_input(self.inputfile, atoms=atoms, basefile=self.basefile, qe_options=self.qe_options, prog=self.prog) self.driver = Driver(self.inputfile, **self.qepy_options) self.lstart = True def update_atoms(self, atoms = None, **kwargs): atoms = atoms or self.atoms # if not self.extrapolation : self.stop() self.driver_initialise(atoms = atoms) return # if self.wrap : positions = wrap_positions(atoms.positions, atoms.cell) else : positions = atoms.positions positions = positions /units['Bohr'] # lattice = atoms.cell /units['Bohr'] lattice_old =self.driver.get_ions_lattice() if np.allclose(lattice, lattice_old): lattice = None # self.driver.update_ions(positions=positions, lattice=lattice, **kwargs) def check_restart(self, atoms = None): restart = True if atoms is None or (self.atoms_save and atoms == self.atoms_save): restart = False if self.iter == 0 : restart = True if restart : if self.atoms is None : self.atoms = atoms self.atoms.calc = self if atoms is not None : self.atoms_save = atoms.copy() else : atoms = self.atoms self.restart() if self.iter > 0 : self.update_atoms(atoms) else : self.driver_initialise(atoms = atoms) return restart def update_optimizer(self, atoms = None, **kwargs): if self.check_restart(atoms): restart = True self.iter += 1 self.driver.scf(**kwargs) else : restart = False return restart
[docs] def stop(self, what = 'all', **kwargs): """See :func:`qepy.driver.Driver.stop`""" if not self.lstart : pass else : self.lstart = False return self.driver.stop(what = 'all', **kwargs)
@property def is_root(self): """See :func:`qepy.driver.Driver.is_root`""" return self.driver.is_root
[docs] def get_density(self, gather = True): """See :func:`qepy.driver.Driver.get_density`""" return self.driver.get_density(gather=gather) / (units['Bohr'] ** 3)
[docs] def get_wave_function(self, band=None, kpt=0): """See :func:`qepy.driver.Driver.get_wave_function`""" return self.driver.get_wave_function(band=band, kpt=kpt)
[docs] def get_number_of_k_points(self): """See :func:`qepy.driver.Driver.get_number_of_k_points`""" return self.driver.get_number_of_k_points()
[docs] def get_potential_energy(self, atoms = None, **kwargs): """See :func:`qepy.driver.Driver.get_potential_energy`""" self.update_optimizer(atoms) return self.driver.get_energy(**kwargs) * units['Ry']
[docs] def get_forces(self, atoms = None, icalc = 0): """See :func:`qepy.driver.Driver.get_forces`""" if self.update_optimizer(atoms) or 'forces' not in self.results : self.results['forces'] = self.driver.get_forces(icalc=icalc)* units['Ry'] / units['Bohr'] return self.results['forces']
[docs] def get_dos(self, qe_options = None, spin = None, inputfile = None, **kwargs): """ calculate density of states (DOS) and return the tuple (energies, dos) Parameters ---------- qe_options : dict A dictionary with input parameters for QE to generate QE input file. spin : int - None : total DOS - 0 : up - 1 : down inputfile : str Directly use the inputfile to run the QE calculation. """ if qe_options or inputfile : if qe_options is not None : qe_options = deepcopy(qe_options) qe_options_dos = { '&control' : {'calculation': "'nscf'"}, '&system' : {'occupations': "'tetrahedra'"}, } qe_options = QEInput.update_options(options=qe_options_dos, qe_options=qe_options) self.restart_driver(qe_options=qe_options, inputfile=inputfile) self.driver.non_scf() from ase.dft.dos import DOS dos = DOS(self, **kwargs) energies = dos.get_energies() dos = dos.get_dos(spin=spin) return energies, dos
[docs] def get_band_structure(self, qe_options = None, kpts = None, reference = None, inputfile = None, **kwargs): """ calculate band structure and return ase BandStructure. Parameters ---------- qe_options : dict A dictionary with input parameters for QE to generate QE input file. kpts : ase.dft.kpoints.bandpath A band path reference : float Fermi level which should come from the SCF or DOS calculation. If not set, will try to use previous calculation instead of band structure calculation. inputfile : str Directly use the inputfile to run the QE calculation. """ from ase.calculators.calculator import kpts2ndarray from ase.spectrum.band_structure import get_band_structure if reference is None and self.lstart : reference = self.get_fermi_level() if qe_options is not None and kpts is not None : qe_options = deepcopy(qe_options) kpoints = [] kgrid = kpts2ndarray(kpts, atoms=self.atoms) kpoints.append(len(kgrid)) w = 1.0/len(kgrid) for k in kgrid: kpoints.append(f'{k[0]:.10f} {k[1]:.10f} {k[2]:.10f} {w}') qe_options_bands = { '&control' : {'calculation': "'bands'"}, 'k_points crystal_b' : kpoints } qe_options = QEInput.update_options(options=qe_options_bands, qe_options=qe_options) self.restart_driver(qe_options=qe_options, inputfile=inputfile) self.driver.non_scf() band = get_band_structure(calc = self, path = kpts, reference=reference) return band
[docs] def get_stress(self, atoms = None): """Return the stress tensor in the Voigt order (xx, yy, zz, yz, xz, xy)""" if self.update_optimizer(atoms) or 'stress' not in self.results : stress = self.driver.get_stress() self.results['stress'] = np.array( [stress[0, 0], stress[1, 1], stress[2, 2], stress[1, 2], stress[0, 2], stress[0, 1]]) * -1 * units['Ry'] / (units['Bohr'] ** 3) return self.results['stress']
[docs] def get_number_of_bands(self): """See :func:`qepy.driver.Driver.get_number_of_bands`""" return self.driver.get_number_of_bands()
[docs] def get_xc_functional(self): """See :func:`qepy.driver.Driver.get_xc_functional`""" return self.driver.get_xc_functional()
[docs] def get_bz_k_points(self): """See :func:`qepy.driver.Driver.get_bz_k_points`""" return self.driver.get_bz_k_points()
def get_bz_to_ibz_map(self): return None
[docs] def get_number_of_spins(self): """See :func:`qepy.driver.Driver.get_number_of_spins`""" return self.driver.get_number_of_spins()
[docs] def get_spin_polarized(self): """See :func:`qepy.driver.Driver.get_spin_polarized`""" return self.driver.get_spin_polarized()
[docs] def get_ibz_k_points(self): """See :func:`qepy.driver.Driver.get_ibz_k_points`""" return self.driver.get_ibz_k_points()
[docs] def get_k_point_weights(self): """See :func:`qepy.driver.Driver.get_k_point_weights`""" return self.driver.get_k_point_weights()
[docs] def get_pseudo_density(self, spin=None, pad=True, gather = False): """See :func:`qepy.driver.Driver.get_pseudo_density`""" return self.driver.get_pseudo_density(spin=spin, pad=pad, gather=gather) / (units['Bohr'] ** 3)
# def get_effective_potential(self, spin=0, pad=True): # """See :func:`qepy.driver.Driver.get_effective_potential`""" # return self.driver.get_effective_potential(spin=spin, pad=pad)
[docs] def get_pseudo_wave_function(self, band=None, kpt=0, spin=0, broadcast=True, pad=True): """See :func:`qepy.driver.Driver.get_pseudo_wave_function`""" return self.driver.get_pseudo_wave_function(band=band, kpt=kpt, spin=spin, broadcast=broadcast, pad=pad)
[docs] def get_eigenvalues(self, kpt=0, spin=0): """See :func:`qepy.driver.Driver.get_eigenvalues`""" return self.driver.get_eigenvalues(kpt=kpt, spin=spin) * units['Ry']
[docs] def get_occupation_numbers(self, kpt=0, spin=0): """See :func:`qepy.driver.Driver.get_occupation_numbers`""" return self.driver.get_occupation_numbers(kpt=kpt, spin=spin)
[docs] def get_fermi_level(self): """See :func:`qepy.driver.Driver.get_fermi_level`""" return self.driver.get_fermi_level() * units['Ry']
# def initial_wannier(self, initialwannier, kpointgrid, fixedstates, # edf, spin, nbands): # """See :func:`qepy.driver.Driver.initial_wannier`""" # raise NotImplementedError # def get_wannier_localization_matrix(self, nbands, dirG, kpoint, # nextkpoint, G_I, spin): # """See :func:`qepy.driver.Driver.get_wannier_localization_matrix`""" # raise NotImplementedError
[docs] def get_magnetic_moment(self, atoms=None): """See :func:`qepy.driver.Driver.get_magnetic_moment`""" return self.driver.get_magnetic_moment(atoms=atoms)
[docs] def get_number_of_grid_points(self, gather = True): """See :func:`qepy.driver.Driver.get_number_of_grid_points`""" return self.driver.get_number_of_grid_points(gather=gather)
[docs] @staticmethod def get_atoms_from_qepy(): """Return the atom.Atoms from QE.""" atoms = Driver.get_ase_atoms() return atoms