Source code for qepy.calculator

import numpy as np
from copy import deepcopy
from qepy.driver import Driver
from qepy.io import QEInput
from qepy import constants
from ase.calculators.calculator import Calculator
from ase.units import create_units
from ase.geometry import wrap_positions
import ase.io

units = create_units('2006').copy()
units['Bohr'] = constants.BOHR_RADIUS_SI * 1E10
units['Ry'] = constants.RYTOEV

[docs]class QEpyCalculator(Calculator): """QEpy Calculator for ASE. QEpy initialization depends on the input file. Here are some ways to generate input file : - Give a file `inputfile` or/and a dict `qe_options` will generate a temporary input file with new atomic information. - Give input file name `inputfile` and `from_file = True` will read the structures to atoms (ase.Atoms) from `inputfile` and initialize the QEpy with `inputfile`. - Give dict `ase_espresso` will generate a temporary input file with `ase.io.espresso.write_espresso_in <https://wiki.fysik.dtu.dk/ase/ase/io/formatoptions.html#ase.io.espresso.write_espresso_in>`__ (Only works for program PW). Parameters ---------- atoms : ase.Atoms ASE atoms object inputfile : str Name of QE input file from_file : str Read the structure from inputfile, and use the inputfile for the QE Initialization. wrap : bool If True, wrap the atoms back to cell before update the ions. extrapolation : str or bool If False, every step will re-run the QE from file. ase_espresso : dict A dictionary with some parameters to generate PW input. qe_options: dict A dictionary with input parameters for QE to generate QE input file. comm : object Parallel communicator ldescf : bool If True, also print the scf correction term in the iteratively mode iterative : bool Iteratively run the scf or tddft task : str Task of the driver : - 'scf' : scf task - 'optical' : Optical absorption spectrum (TDDFT) - 'nscf' : read file from scf embed : object (Fortran) embed object for QE prefix : str prefix of QE input/output outdir : str The output directory of QE logfile : str, file or bool The screen output of QE. If it is a file descriptor, it should open with 'w+'. - None : Show on the screen. - str : Save to the given file. - True : Save to the temporary file, see also :func:`qepy.driver.Driver.get_output`. .. note:: - if `from_file` is True, the qe_options will not use. - The bool `gather` parameter in functions means the data is gathered in root or distributed in all cpus. - if `ase_espresso` is set, will use ase module to write QE input file. `ase_espresso` contains : + input_data: dict + pseudopotentials: dict + kpts: (int, int, int) or dict + koffset: (int, int, int) + crystal_coordinates: bool More details, see `ase.io.espresso.write_espresso_in <https://wiki.fysik.dtu.dk/ase/ase/io/formatoptions.html#ase.io.espresso.write_espresso_in>`__. """ implemented_properties=['energy', 'forces', 'stress'] def __init__(self, atoms = None, inputfile = None, from_file = False, wrap = False, extrapolation = True, ase_espresso = None, qe_options = None, comm = None, ldescf = False, iterative = False, task = 'scf', embed = None, prefix = None, outdir = None, logfile = None, prog = 'pw', **kwargs): Calculator.__init__(self, atoms = atoms, **kwargs) # self.lstart = False self.optimizer = None # self.atoms = atoms self.inputfile = inputfile self.wrap = wrap self.extrapolation = extrapolation self.from_file = from_file self.ase_espresso = ase_espresso self.prog = prog self.qe_options = qe_options # if self.inputfile is not None and self.atoms is None : self.atoms = ase.io.read(self.inputfile, format='espresso-in') if self.atoms is not None : self.atoms.calc = self # if self.from_file : self.basefile = None else : self.basefile = self.inputfile self.inputfile = 'qepy_input_tmp.in' # self.atoms_save = None self.driver = None self.iter = 0 self.qeinput = QEInput() # self.qepy_options = { 'comm' : comm, 'ldescf' : ldescf, 'iterative' : iterative, 'task' : task, 'embed' : embed, 'logfile' : logfile, } self.qepy_options.update(kwargs) # self.parameters['qe_options'] = qe_options self.parameters['prog'] = prog self.parameters['kpts'] = None # self.restart() def set_atoms(self, atoms): atoms.pbc = [True, True, True] self.atoms = atoms @property def qe_options(self): return self._qe_options @qe_options.setter def qe_options(self, value): if value is not None : # avoid changing the input value value = deepcopy(value) self._qe_options = value def restart_driver(self, qe_options=None, inputfile=None, atoms=None): if qe_options is not None : self.from_file = False self.qe_options = qe_options if inputfile is not None : self.from_file = True self.inputfile = inputfile if atoms is not None : self.atoms_save = atoms.copy() self.atoms = atoms if self.lstart : self.stop() self.driver_initialise() def restart(self): self.results = {} def driver_initialise(self, atoms = None, **kwargs): atoms = atoms or self.atoms if self.wrap : atoms.wrap() if not self.from_file : if self.ase_espresso : ase.io.write(self.inputfile, atoms, format = 'espresso-in', **self.ase_espresso) else : self.qeinput.write_qe_input(self.inputfile, atoms=atoms, basefile=self.basefile, qe_options=self.qe_options, prog=self.prog) self.driver = Driver(self.inputfile, **self.qepy_options) self.lstart = True def update_atoms(self, atoms = None, **kwargs): atoms = atoms or self.atoms # if not self.extrapolation : self.stop() self.driver_initialise(atoms = atoms) return # if self.wrap : positions = wrap_positions(atoms.positions, atoms.cell) else : positions = atoms.positions positions = positions /units['Bohr'] # lattice = atoms.cell /units['Bohr'] lattice_old =self.driver.get_ions_lattice() if np.allclose(lattice, lattice_old): lattice = None # self.driver.update_ions(positions=positions, lattice=lattice, **kwargs) def check_restart(self, atoms = None): restart = True if atoms is None or (self.atoms_save and atoms == self.atoms_save): restart = False if self.iter == 0 : restart = True if restart : if self.atoms is None : self.atoms = atoms self.atoms.calc = self if atoms is not None : self.atoms_save = atoms.copy() else : atoms = self.atoms self.restart() if self.iter > 0 : self.update_atoms(atoms) else : self.driver_initialise(atoms = atoms) return restart def update_optimizer(self, atoms = None, **kwargs): if self.check_restart(atoms): restart = True self.iter += 1 self.driver.scf(**kwargs) else : restart = False return restart
[docs] def stop(self, what = 'all', **kwargs): """See :func:`qepy.driver.Driver.stop`""" if not self.lstart : pass else : self.lstart = False return self.driver.stop(what = 'all', **kwargs)
@property def is_root(self): """See :func:`qepy.driver.Driver.is_root`""" return self.driver.is_root
[docs] def get_density(self, gather = True): """See :func:`qepy.driver.Driver.get_density`""" return self.driver.get_density(gather=gather) / (units['Bohr'] ** 3)
[docs] def get_wave_function(self, band=None, kpt=0): """See :func:`qepy.driver.Driver.get_wave_function`""" return self.driver.get_wave_function(band=band, kpt=kpt)
[docs] def get_number_of_k_points(self): """See :func:`qepy.driver.Driver.get_number_of_k_points`""" return self.driver.get_number_of_k_points()
[docs] def get_potential_energy(self, atoms = None, **kwargs): """See :func:`qepy.driver.Driver.get_potential_energy`""" self.update_optimizer(atoms) return self.driver.get_energy(**kwargs) * units['Ry']
[docs] def get_forces(self, atoms = None, icalc = 0): """See :func:`qepy.driver.Driver.get_forces`""" if self.update_optimizer(atoms) or 'forces' not in self.results : self.results['forces'] = self.driver.get_forces(icalc=icalc)* units['Ry'] / units['Bohr'] return self.results['forces']
[docs] def get_dos(self, qe_options = None, spin = None, inputfile = None, **kwargs): """ calculate density of states (DOS) and return the tuple (energies, dos) Parameters ---------- qe_options : dict A dictionary with input parameters for QE to generate QE input file. spin : int - None : total DOS - 0 : up - 1 : down inputfile : str Directly use the inputfile to run the QE calculation. """ if qe_options or inputfile : if qe_options is not None : qe_options = deepcopy(qe_options) qe_options_dos = { '&control' : {'calculation': "'nscf'"}, '&system' : {'occupations': "'tetrahedra'"}, } qe_options = QEInput.update_options(options=qe_options_dos, qe_options=qe_options) self.restart_driver(qe_options=qe_options, inputfile=inputfile) self.driver.non_scf() from ase.dft.dos import DOS dos = DOS(self, **kwargs) energies = dos.get_energies() dos = dos.get_dos(spin=spin) return energies, dos
[docs] def get_band_structure(self, qe_options = None, kpts = None, reference = None, inputfile = None, **kwargs): """ calculate band structure and return ase BandStructure. Parameters ---------- qe_options : dict A dictionary with input parameters for QE to generate QE input file. kpts : ase.dft.kpoints.bandpath A band path reference : float Fermi level which should come from the SCF or DOS calculation. If not set, will try to use previous calculation instead of band structure calculation. inputfile : str Directly use the inputfile to run the QE calculation. """ from ase.calculators.calculator import kpts2ndarray from ase.spectrum.band_structure import get_band_structure if reference is None and self.lstart : reference = self.get_fermi_level() if qe_options is not None and kpts is not None : qe_options = deepcopy(qe_options) kpoints = [] kgrid = kpts2ndarray(kpts, atoms=self.atoms) kpoints.append(len(kgrid)) w = 1.0/len(kgrid) for k in kgrid: kpoints.append(f'{k[0]:.10f} {k[1]:.10f} {k[2]:.10f} {w}') qe_options_bands = { '&control' : {'calculation': "'bands'"}, 'k_points crystal_b' : kpoints } qe_options = QEInput.update_options(options=qe_options_bands, qe_options=qe_options) self.restart_driver(qe_options=qe_options, inputfile=inputfile) self.driver.non_scf() band = get_band_structure(calc = self, path = kpts, reference=reference) return band
[docs] def get_stress(self, atoms = None): """Return the stress tensor in the Voigt order (xx, yy, zz, yz, xz, xy)""" if self.update_optimizer(atoms) or 'stress' not in self.results : stress = self.driver.get_stress() self.results['stress'] = np.array( [stress[0, 0], stress[1, 1], stress[2, 2], stress[1, 2], stress[0, 2], stress[0, 1]]) * -1 * units['Ry'] / (units['Bohr'] ** 3) return self.results['stress']
[docs] def get_number_of_bands(self): """See :func:`qepy.driver.Driver.get_number_of_bands`""" return self.driver.get_number_of_bands()
[docs] def get_xc_functional(self): """See :func:`qepy.driver.Driver.get_xc_functional`""" return self.driver.get_xc_functional()
[docs] def get_bz_k_points(self): """See :func:`qepy.driver.Driver.get_bz_k_points`""" return self.driver.get_bz_k_points()
def get_bz_to_ibz_map(self): return None
[docs] def get_number_of_spins(self): """See :func:`qepy.driver.Driver.get_number_of_spins`""" return self.driver.get_number_of_spins()
[docs] def get_spin_polarized(self): """See :func:`qepy.driver.Driver.get_spin_polarized`""" return self.driver.get_spin_polarized()
[docs] def get_ibz_k_points(self): """See :func:`qepy.driver.Driver.get_ibz_k_points`""" return self.driver.get_ibz_k_points()
[docs] def get_k_point_weights(self): """See :func:`qepy.driver.Driver.get_k_point_weights`""" return self.driver.get_k_point_weights()
[docs] def get_pseudo_density(self, spin=None, pad=True, gather = False): """See :func:`qepy.driver.Driver.get_pseudo_density`""" return self.driver.get_pseudo_density(spin=spin, pad=pad, gather=gather) / (units['Bohr'] ** 3)
# def get_effective_potential(self, spin=0, pad=True): # """See :func:`qepy.driver.Driver.get_effective_potential`""" # return self.driver.get_effective_potential(spin=spin, pad=pad)
[docs] def get_pseudo_wave_function(self, band=None, kpt=0, spin=0, broadcast=True, pad=True): """See :func:`qepy.driver.Driver.get_pseudo_wave_function`""" return self.driver.get_pseudo_wave_function(band=band, kpt=kpt, spin=spin, broadcast=broadcast, pad=pad)
[docs] def get_eigenvalues(self, kpt=0, spin=0): """See :func:`qepy.driver.Driver.get_eigenvalues`""" return self.driver.get_eigenvalues(kpt=kpt, spin=spin) * units['Ry']
[docs] def get_occupation_numbers(self, kpt=0, spin=0): """See :func:`qepy.driver.Driver.get_occupation_numbers`""" return self.driver.get_occupation_numbers(kpt=kpt, spin=spin)
[docs] def get_fermi_level(self): """See :func:`qepy.driver.Driver.get_fermi_level`""" return self.driver.get_fermi_level() * units['Ry']
# def initial_wannier(self, initialwannier, kpointgrid, fixedstates, # edf, spin, nbands): # """See :func:`qepy.driver.Driver.initial_wannier`""" # raise NotImplementedError # def get_wannier_localization_matrix(self, nbands, dirG, kpoint, # nextkpoint, G_I, spin): # """See :func:`qepy.driver.Driver.get_wannier_localization_matrix`""" # raise NotImplementedError
[docs] def get_magnetic_moment(self, atoms=None): """See :func:`qepy.driver.Driver.get_magnetic_moment`""" return self.driver.get_magnetic_moment(atoms=atoms)
[docs] def get_number_of_grid_points(self, gather = True): """See :func:`qepy.driver.Driver.get_number_of_grid_points`""" return self.driver.get_number_of_grid_points(gather=gather)
[docs] @staticmethod def get_atoms_from_qepy(): """Return the atom.Atoms from QE.""" atoms = Driver.get_ase_atoms() return atoms