Source code for qepy.driver

import numpy as np
import tempfile
from functools import wraps
import qepy
import qepy.qepy_modules # import the QE MPI first
from qepy.core import env, qepy_clean_saved, QEpyLibs
from qepy.io import QEInput

def gathered(function):
    @wraps(function)
    def wrapper(self, **kwargs):
        out = kwargs.get('out', None)
        gather = kwargs.get('gather', True)
        if out is None : out = self.create_array(gather=gather, kind='rho')
        if gather and self.nproc > 1 :
            arr = self.create_array(gather=False, kind='rho')
        else :
            arr = out
        #
        kwargs['out'] = arr
        results = function(self, **kwargs)
        #
        if gather and self.nproc > 1 :
            self.qepy_pw.qepy_mod.qepy_get_value(arr, out, gather = True)
            if isinstance(results, (tuple, list)):
                results = out, *results[1:]
            else :
                results = out
        return results
    return wrapper

[docs] class Driver(metaclass=QEpyLibs): """ The driver of QEpy. Parameters ---------- inputfile : str Name of QE input file comm : object Parallel communicator ldescf : bool If True, also print the SCF correction term for each SCF cycle iterative : bool Run SCF or real-time TDDFT one iteration at a time task : str Task to be performed by the driver : - 'scf' : Self consistent field - 'nscf' : initialization of Driver from previous SCF calculation - 'optical' : Optical absorption spectrum (real-time TDDFT with ce-tddft of Davide Ceresoli) - 'tddfpt_davidson' : TDDFPT with davidson algorithm embed : object (Fortran) embed object itialized in the Fortran side of QEpy needed to communicate with QE. prefix : str prefix of QE input/output outdir : str The output directory of QE logfile : str, file or bool The screen output of QE. If it is a file descriptor, it should open with 'w+'. - None : Show on the screen. - str : Save to the given file. - True : Save to the temporary file, see also :func:`qepy.driver.Driver.get_output`. outdir : str The output directory of QE qe_options : dict A dictionary with input parameters for QE to generate QE input file. See class QEInput prog : str The name of QE program, default is `pw` which is pw.x in QE. progress : bool If True, will continue run the QE without cleaning the workspace. Most times used for running TDDFT after scf. atoms : object An ase Atoms to generate the input file. needwf : bool If False, will not read wavefunctions and skip wavefunction-related initialization. kwargs : dict Other options .. note:: - The bool `gather` parameter in functions means the data is gathered on root or distributed on all cpus. - Units in QEpy are Bohr and Rydberg. These differ from ASE's units. Please, be careful! + positions : Bohr + lattice : Bohr + energy : Ry + forces : Ry/Bohr + stress : Ry/Bohr**3 + potential : Ry + density : 1.0/Bohr**3 """ POTNAMES = {'external' : 0, 'localpp' : 1, 'hartree' : 2, 'xc' : 4} FORCENAMES = {'ewald' : 1, 'localpp' : 2, 'nlcc' : 4} def __init__(self, inputfile = None, comm = None, ldescf = True, iterative = False, task = 'scf', embed = None, prefix = None, outdir = None, logfile = None, qe_options = None, prog = 'pw', progress = False, atoms = None, needwf = True, **kwargs): self.task = task self.embed = embed self.comm = comm self.inputfile = inputfile self.iterative = iterative self.prefix = prefix self.outdir = outdir self.logfile = logfile self.qe_options = qe_options self.prog = prog self.progress = progress self.atoms = atoms self.needwf = needwf # if embed is None: self.embed.ldescf = ldescf self.embed.iterative = iterative self.embed.tddft.iterative = iterative # self.comm = comm self.qepy = qepy self.qeinput = QEInput() # self.driver_initialize() def __getattr__(self, attr): return getattr(type(self), attr) # return _getattr_qepylibs(self, attr) def _init_log(self): """_initialize the QE output.""" self.fileobj_interact = False if self.logfile in [None, False]: self.fileobj = None else : # self.qepy_pw.qepy_mod.qepy_set_stdout(self.logfile) if self.logfile is True: self.fileobj_interact = True self.fileobj = tempfile.NamedTemporaryFile('w+') elif hasattr(self.logfile, 'write'): self.fileobj = self.logfile else : self.fileobj = open(self.logfile, 'w+') env['STDOUT'] = self.fileobj return self.fileobj @property def embed(self): if self._embed is None : self._embed = self.qepy_pw.qepy_common.embed_base() return self._embed @embed.setter def embed(self, value): self._embed = value @property def comm(self): return self._comm @comm.setter def comm(self, value): """Sets the MPI communicator with value given by mpi4py""" if value is True: try: from mpi4py import MPI value = MPI.COMM_WORLD except Exception: value = None self._comm = value if hasattr(self.comm, 'py2f') : self._commf = self.comm.py2f() else : self._commf = self.comm @property def commf(self): return self._commf @property def is_root(self): """Whether it is the root. If the comm is set, root is rank == 0. If comm not set, root is ionode of QE. """ if hasattr(self.comm, 'rank'): return self.comm.rank == 0 else : return self.qepy_modules.io_global.get_ionode() @property def nproc(self): if hasattr(self.comm, 'size'): return self.comm.size else : return self.qepy_modules.mp_world.get_nproc() @property def qe_is_mpi(self): return bool(self.qepy_pw.qepy_common.get_is_mpi()) @property def qe_is_openmp(self): return bool(self.qepy_pw.qepy_common.get_is_openmp())
[docs] def restart(self, prog=None, **kwargs): """Restart the driver losing all information about the previous driver""" prog = prog or self.prog self.driver_initialize()
[docs] def driver_initialize(self, **kwargs): """ Initialize the driver Parameters ---------- inputfile : str Name of QE input file commf : object mpi4py parallel communicator to be sent to Fortran task : str Task to be performed by the driver : - 'scf' : Self consistent field - 'nscf' : initialization of Driver from previous SCF calculation - 'optical' : Optical absorption spectrum (real-time TDDFT with ce-tddft of Davide Ceresoli) qe_options: dict A dictionary with input parameters for QE to generate QE input file. """ # stop the last driver and save new driver if not self.progress : if hasattr(env['DRIVER'], 'stop'): env['DRIVER'].stop() env['DRIVER'] = self # self._init_log() self.qepy_pw.qepy_common.set_embed(self.embed) # inputfile=self.inputfile commf=self.commf task=self.task qe_options = self.qe_options prog = self.prog # if qe_options : inputfile, basefile = 'input_tmp.in', inputfile self.qeinput.write_qe_input(inputfile, atoms = self.atoms, basefile=basefile, qe_options=qe_options, prog=prog) # if task == 'optical' : self.tddft_initialize(inputfile=inputfile, commf = commf, **kwargs) elif task.startswith('tddfpt_'): self.tddfpt_initialize(inputfile=inputfile, commf = commf, **kwargs) elif task == 'nscf' : inputobj = self.qepy_pw.qepy_common.input_base() if self.prefix : inputobj.prefix = self.prefix if self.outdir : inputobj.tmp_dir = str(self.outdir) + '/' if commf : inputobj.my_world_comm = commf self.qepy_pw.qepy_initial(inputobj) # tmpdir = inputobj.tmp_dir.decode().strip() + inputobj.prefix.decode().strip() + '.save' + '/' if self.needwf : self.qepy_pw.read_file() else : self.qepy_pw.read_file_new(False) if self.needwf : self.qepy_pw.qepy_mod.qepy_open_files() else : self.qepy_pw.qepy_pwscf(inputfile, commf) if self.embed.iterative : self.qepy_modules.control_flags.set_niter(1) # self.density = np.zeros((1, 1)) self.iter = 0
[docs] def tddft_initialize(self, inputfile = None, commf = None, **kwargs): """ Initialize the tddft Parameters ---------- inputfile : str Name of QE input file, which also contains `&inputtddft` section. commf : object mpi4py parallel communicator to be sent to Fortran """ if inputfile is None : inputfile = self.inputfile if commf is None : commf = self.commf # if self.progress : self.qepy_pw.wvfct.get_array_g2kin() self.qepy_cetddft.qepy_tddft_readin(inputfile) else : self.qepy_cetddft.qepy_tddft_main_initial(inputfile, commf) self.qepy_pw.read_file() self.qepy_cetddft.qepy_tddft_main_setup()
[docs] def tddfpt_initialize(self, inputfile = None, commf = None, **kwargs): """ Initialize the tddft Parameters ---------- inputfile : str Name of QE input file, which also contains `&inputtddft` section. commf : object mpi4py parallel communicator to be sent to Fortran """ if inputfile is None : inputfile = self.inputfile if commf is None : commf = self.commf # if self.progress : raise AttributeError("Not support 'progress' now") # self.qepy_pw.wvfct.get_array_g2kin() # self.qepy_cetddft.qepy_tddft_readin(inputfile) else : if self.task != 'tddfpt_davidson' : raise ValueError("Only support 'davidson' algorithm now.") # Fix the io problem if the job directly started after a PW calculation self.qepy_modules.control_flags.set_io_level(1) # self.qepy_tddfpt.qepy_lr_dav_main_initial(inputfile)
[docs] def diagonalize(self, print_level = 2, nscf = False, **kwargs): """Diagonalize the Hamiltonian Parameters ---------- print_level : The level of output of QE """ self.embed.lmovecell = False self.iter += 1 if self.task == 'optical' : self.qepy_cetddft.qepy_molecule_optical_absorption() elif self.task == 'tddfpt_davidson' : if self.qepy_tddfpt.lr_dav_variables.get_if_check_orth(): self.qepy_tddfpt.lr_dav_debug.check_orth() self.qepy_tddfpt.lr_dav_routines.one_dav_step() self.qepy_tddfpt.lr_dav_routines.dav_calc_residue() self.qepy_tddfpt.lr_dav_routines.dav_expan_basis() elif nscf : self.embed.task = 'nscf' self.qepy_pw.qepy_electrons_scf(print_level, 0) else : self.embed.mix_coef = -1.0 self.qepy_pw.qepy_electrons_scf(print_level, 0)
def propagate(self, **kwargs): return self.diagonalize(**kwargs)
[docs] def mix(self, mix_coef = 0.7, print_level = 2): """Mix the density with the QE density mixing Parameters ---------- print_level : The level of output of QE """ if self.task == 'optical' : return self.embed.mix_coef = mix_coef self.qepy_pw.qepy_electrons_scf(print_level, 0)
[docs] def check_convergence(self, **kwargs): """Check the convergence of the SCF""" if self.task == 'scf' : converged = bool(self.qepy_modules.control_flags.get_conv_elec()) if converged and not self.embed.initial : self.end_scf() elif self.task == 'tddfpt_davidson' : converged = bool(self.qepy_tddfpt.lr_dav_variables.get_dav_conv()) else: converged = False return converged
[docs] def get_scf_error(self, **kwargs): """Return the error of the SCF compared to previous cycle""" if self.embed.iterative : return self.embed.dnorm else : return self.qepy_modules.control_flags.get_scf_error()
[docs] def get_scf_steps(self, **kwargs): """Return the number of SCF steps""" if self.embed.iterative : return self.iter else : return self.qepy_modules.control_flags.get_n_scf_steps()
[docs] def scf(self, print_level = 2, maxiter = None, original = False, nscf = False, **kwargs): """Run the scf/tddft until converged or reached the maximum number of iterations""" if maxiter is not None and not self.embed.iterative : self.qepy_modules.control_flags.set_niter(maxiter) if self.task == 'optical' : self.qepy_cetddft.qepy_molecule_optical_absorption() elif self.task=='tddfpt_davidson' : self.tddfpt_davidson_scf() elif nscf : self.embed.task = 'nscf' self.qepy_pw.qepy_electrons_scf(print_level, 0) elif not self.embed.iterative and self.embed.exttype < 2 : # Use electrons to support hybrid xc functional return self.electrons(original=original) else : self.qepy_pw.qepy_electrons_scf(print_level, 0) return self.embed.etotal
def optical_absorption(self, **kwargs): return self.scf(**kwargs) def tddfpt_davidson_scf(self, **kwargs): max_iter = self.qepy_tddfpt.lr_dav_variables.get_max_iter() self.scf_conv_type = 'maxiter' for dav_iter in range(max_iter): # if self.qepy_tddfpt.lr_dav_variables.get_if_check_orth(): # self.qepy_tddfpt.lr_dav_debug.check_orth() # self.qepy_tddfpt.lr_dav_routines.one_dav_step() # self.qepy_tddfpt.lr_dav_routines.dav_calc_residue() # self.qepy_tddfpt.lr_dav_routines.dav_expan_basis() self.diagonalize(**kwargs) if self.qepy_tddfpt.lr_dav_variables.get_dav_conv(): self.scf_conv_type = 'conv' break if self.qepy_modules.check_stop.check_stop_now(): self.qepy_tddfpt.lr_dav_routines.lr_write_restart_dav() self.scf_conv_type = 'maxtime' break self.tddfpt_davidson_interpret(**kwargs) def tddfpt_davidson_interpret(self, **kwargs): self.qepy_tddfpt.lr_dav_routines.interpret_eign('END') lplot_drho = self.qepy_tddfpt.lr_dav_variables.get_lplot_drho() if lplot_drho: self.qepy_tddfpt.lr_dav_routines.plot_drho()
[docs] def non_scf(self, **kwargs): """Single "non-selfconsistent" calculation""" # fix some saved variables from last scf calculations self.qepy_modules.control_flags.set_lscf(0) self.qepy_modules.control_flags.set_lbfgs(0) self.qepy_modules.control_flags.set_lmd(0) self.qepy_modules.control_flags.set_lwf(0) # self.qepy_pw.non_scf() return self.qepy_pw.ener.get_etot()
[docs] def electrons(self, original = False, **kwargs): """Execute original QE routine "electrons" """ if original : self.qepy_pw.electrons() else : self.qepy_pw.qepy_electrons() return self.qepy_pw.ener.get_etot()
[docs] def end_scf(self, nscf = False, **kwargs): """Ends the SCF and cleans the SCF workspace. Only run when in iterative mode (i.e., one cycle at a time)""" if self.embed.iterative : if self.task == 'optical' : self.embed.tddft.finish = True self.qepy_cetddft.qepy_molecule_optical_absorption() else : self.embed.finish = True self.qepy_pw.qepy_electrons_scf(0, 0)
[docs] def stop(self, exit_status = 0, what = 'all', print_flag = 0, **kwargs): """Stop the driver. This must be done anytime a new driver is created. This method is invoked automatically if a running driver is detected. Only one driver can run at any given time. Parameters ---------- exit_status : int 0 : QE will remove it temporary files print_flag : int 0 : Not print time informations what : str see :func:`qepy.driver.Driver.save`. """ if self.task == 'optical' : self.tddft_stop(exit_status, print_flag = print_flag, what = what, **kwargs) elif self.task == 'tddfpt_davidson' : self.tddfpt_davidson_stop(exit_status, print_flag = print_flag, what = what, **kwargs) else : if not self.embed.initial : self.end_scf() self.qepy_pw.qepy_stop_run(exit_status, print_flag = print_flag, what = what, finalize = False) if hasattr(self.fileobj, 'close'): self.fileobj.close() qepy_clean_saved() # qepy_clean_saved(self.qepy_modules) # qepy_clean_saved(self.qepy_pw) # if self.task == 'optical' : # qepy_clean_saved(self.qepy_cetddft) # env['DRIVER'] = None env['STDOUT'] = None
[docs] def tddft_restart(self, istep=None, **kwargs): """Restarts the TDDFT from previous interrupted run. Parameters ---------- istep : int Start number of steps, just for output. """ self.qepy_cetddft.qepy_tddft_mod.qepy_cetddft_wfc2rho() if istep is not None : self.embed.tddft.istep = istep
[docs] def tddft_stop(self, exit_status = 0, what = 'no', print_flag = 0, **kwargs): """Stops TDDFT run""" if not self.embed.tddft.initial : self.end_scf() #! Do not save the PW files, otherwise the initial wfcs will be overwritten. self.qepy_pw.qepy_stop_run(exit_status, print_flag = print_flag, what = 'no', finalize = False) self.qepy_cetddft.qepy_stop_tddft(exit_status)
[docs] def tddfpt_davidson_stop(self, exit_status = 0, what = 'no', print_flag = 0, **kwargs): """Stops TDDFPT run""" self.qepy_tddfpt.qepy_lr_dav_main_finalise()
[docs] def save(self, what = 'all', **kwargs): """ Save the QE data to the disk in original QE files that can be accessed via post-processing or read with a QEpy driver Parameters ---------- what : str * what = 'all' : write xml data file, charge density, wavefunctions (for final data); * what = 'config' : write xml data file and charge density; also, for nks=1, wavefunctions in plain binary format (see why in comments below). For intermediate or incomplete results; * what = 'config-nowf' : write xml data file and charge density only; (save density); * what = 'config-init' : write xml data file only excluding final results (for dry run, can be called at early stages). see PW/src/punch.f90 """ self.qepy_pw.punch(what)
# self.qepy_pw.close_files(False) def update_run_options(self, qe_options = {}, **kwargs): pass
[docs] def get_energy(self, **kwargs): """Return the total energy. Nota bene: Only use for regular QE runs (i.e., not when doing embedding).""" if abs(self.qepy_pw.ener.get_etot()) > 1E-16 : energy = self.qepy_pw.ener.get_etot() elif abs(self.embed.etotal) > 1E-16 : energy = self.embed.etotal else : energy = self.calc_energy(**kwargs) return energy
[docs] def calc_energy(self, **kwargs): """Calculate the energy with PW2CASINO of QE. Use this only if you have a good reason to use it! For example embedding calculations.""" self.qepy_pw.qepy_calc_energies() return self.embed.etotal
[docs] def update_ions(self, positions = None, lattice = None, update = 0, **kwargs): """update the ions of QE. This is used for dynamic runs. Parameters ---------- positions : np.ndarray (n, 3) positions of ions lattice : 3x3 matrix lattice of ions update : int - 0 : update everything (default) - 1 : only update atomic information, not update potential """ positions = positions.T if lattice is not None : lattice = lattice.T if not self.qepy_pw.cellmd.get_lmovecell(): raise ValueError(" Lattice update only works for variable-cell simulations.\n Please restart the QEpy with calculation= 'vc-relax' or 'vc-md'") self.qepy_pw.qepy_mod.qepy_update_ions(positions, update, lattice) else : self.qepy_pw.qepy_mod.qepy_update_ions(positions, update)
[docs] def pwscf_restart(self, starting_pot='file', starting_wfc='file'): """Read PW ouput/restart files. Parameters ---------- """ self.qepy_pw.qepy_mod.qepy_restart_from_xml() if self.qepy_pw.basis.get_starting_pot().strip() != starting_pot : self.qepy_pw.basis.set_starting_pot(starting_pot) self.qepy_pw.potinit() if self.qepy_pw.basis.get_starting_wfc().strip() != starting_wfc : self.qepy_pw.basis.set_starting_wfc(starting_wfc) self.qepy_pw.wfcinit()
[docs] def create_array(self, gather = True, kind = 'rho'): """Returns an empty array in real space. Nota bene: this is for real-space arrays like the density (rho) and potentials.""" if kind == 'rho' : nspin = self.qepy_pw.lsda_mod.get_nspin() if gather and self.nproc > 1 : nr = self.get_number_of_grid_points(gather = gather) if self.embed.dfftp.mype == 0: out = np.zeros((np.prod(nr), nspin), order = 'F') else : out = np.zeros((1, nspin), order = 'F') else : nnr = self.embed.dfftp.nnr out = np.zeros((nnr, nspin), order = 'F') else : raise ValueError('Only support "rho"') return out
[docs] def get_density(self, gather = True, out = None): """Returns valence pseudodensity array in real space.""" if out is None : out = self.create_array(gather=gather, kind='rho') self.qepy_pw.qepy_mod.qepy_get_rho(out, gather = gather) return out
[docs] def get_core_density(self, gather = True, out = None): """Returns core density array in real space.""" if out is None : out = self.create_array(gather=gather, kind='rho') self.qepy_pw.qepy_mod.qepy_get_rho_core(out, gather = gather) return out
[docs] def get_kinetic_energy_density(self, gather = True, out = None): """Returns KS kinetic energy density array in real space.""" if out is None : out = self.create_array(gather=gather, kind='rho') self.qepy_pw.qepy_mod.qepy_get_tau(out, gather = gather) return out
[docs] def get_wave_function(self, band=None, kpt=0): """Returns wave-function array in real space.""" self.qepy_pw.qepy_mod.qepy_get_evc(kpt + 1) nrs = np.zeros(3, dtype = 'int32') self.qepy_pw.qepy_mod.qepy_get_grid_smooth(nrs) if self.is_root : wf = np.empty(np.prod(nrs), order = 'F', dtype = np.complex128) else : wf = np.empty(1, order = 'F', dtype = np.complex128) if band is None : band = np.arange(self.get_number_of_bands()) else : band = np.asarray(band) if band.ndim == 0 : band = [band] wfs = [] for ibnd in band : self.qepy_pw.qepy_mod.qepy_get_wf(kpt + 1, ibnd + 1, wf) wfs.append(wf.copy()) return wfs
[docs] def get_dipole_tddft(self): """Return the total dipole computed by a TDDFT run. Nota bene: only available during TDDFT runs.""" # dipole = self.qepy_cetddft.qepy_tddft_common.get_array_dipole().copy() dipole = self.embed.tddft.dipole return dipole
[docs] def set_density(self, density, gather = True, **kwargs): """Set density array in real space.""" # if density is None : density = np.zeros((1, 1)) if density.ndim != 2 : raise ValueError("The array should be 2-d.") # if gather and self.nproc > 1 : self.qepy_pw.qepy_mod._mp_bcast_group_real_2(density) self.qepy_pw.qepy_mod.qepy_set_rho(density, gather = gather)
[docs] def set_external_potential(self, potential, exttype = None, gather = True, extene = None, **kwargs): """Set an external potential in addition to the ones already included in the QEpy run according to the logic enumerated below. Parameters ---------- potential : (nnr, nspin) The external potential exttype : list or int The type of external potential. It can be a list of name or a integer. e.g. `exttype = ('localpp', 'xc')` or `exttype = 5`. `external` stands for an additional external potential. ==== ============================== === type potential bin ==== ============================== === 0 external 000 1 localpp 001 2 hartree 010 4 xc 100 ==== ============================== === """ if exttype is not None : if not isinstance(exttype, int): exttype = self.potname2type(exttype) self.embed.exttype = exttype if extene is not None : self.embed.extene = extene else : self.embed.extene = 0.0 # if potential is None : potential = np.zeros((1, 1)) if potential.ndim != 2 : raise ValueError("The array should be 2-d.") # if gather and self.nproc > 1 : self.qepy_pw.qepy_mod._mp_bcast_group_real_2(potential) self.qepy_pw.qepy_mod.qepy_set_extpot(potential, gather = gather)
[docs] def get_output(self): """Return the output of QE. It depends on the `logfile` of the initialization of the driver : - None : return None. - str : return all outputs. - True : It will return the output from last time. """ if self.fileobj is not None : if self.fileobj_interact : self.fileobj.flush() self.fileobj.seek(0) lines = self.fileobj.readlines() self.fileobj.close() # self._init_log() # return lines else : self.fileobj.seek(0) return self.fileobj.readlines() else : return None
[docs] @gathered def get_elf(self, gather = True, out = None, **kwargs): """Return electron localization function.""" self.qepy_pp.do_elf(out) return out
[docs] @gathered def get_rdg(self, gather = True, out = None, **kwargs): """Return reduced density gradient.""" self.qepy_pp.do_rdg(out) return out
#-----------------------------------------------------------------------
[docs] @gathered def get_local_pp(self, gather = True, out = None, **kwargs): """Return local component of the pseudopotential.""" for i in range(out.shape[1]): out[:, i] = self.qepy_pw.scf.get_array_vltot() return out
[docs] @gathered def get_hartree(self, gather = True, out = None, add=False, **kwargs): """Return information about Hartree energy: tuple (potential, energy, total charge) """ if not add : out[:] = 0.0 ehart, charge = self.qepy_pw.v_h(self.embed.rho.of_g[:,0], out) return out, ehart, charge
[docs] @gathered def get_exchange_correlation(self, gather = True, out = None, tau=None, **kwargs): """Return information about the exchange-correlation: LDA, GGA: tuple (potential, energy, v*rho) MGGA: tuple (potential, energy, v*rho, tau) TODO : The interface will changed in the new version of QE! Note : For metaGGA, only return scattered tau """ etxc = vtxc = 0.0 rho_obj = self.embed.rho rho_core = self.qepy_pw.scf.get_array_rho_core() rhog_core = self.qepy_pw.scf.get_array_rhog_core() is_meta = self.qepy_xclib.dft_setting_routines.xclib_dft_is('meta') if is_meta: if tau is None : tau = out*0.0 self.qepy_pw.v_xc_meta(rho_obj, rho_core, rhog_core, etxc, vtxc, out, tau) return out, etxc, vtxc, tau else : etxc, vtxc = self.qepy_pw.v_xc(rho_obj, rho_core, rhog_core, out) return out, etxc, vtxc
[docs] @gathered def get_density_functional(self, gather = True, out = None, add = False, **kwargs): """Return effective potential information. Note : Then final potential is saved in the v_obj """ rho_obj = self.embed.rho rho_core = self.qepy_pw.scf.get_array_rho_core() rhog_core = self.qepy_pw.scf.get_array_rhog_core() v_obj = self.embed.v etotefield = 0.0 # v_obj.of_r[:] = 0.0 ehart, etxc, vtxc, eth, charge = self.qepy_pw.qepy_v_of_rho(rho_obj, rho_core, rhog_core, etotefield, v_obj) info = [ehart, etxc, vtxc, eth, etotefield, charge] if add : out += v_obj.of_r else : out[:] = v_obj.of_r return out, info
def get_hartree_potential(self, gather = True, out = None, **kwargs): return self.get_hartree(gather=gather, out=out, **kwargs)[0] def get_exchange_correlation_potential(self, gather = True, out = None, **kwargs): return self.get_exchange_correlation(gather=gather, out=out, **kwargs)[0] def get_density_functional_potential(self, gather = True, out = None, **kwargs): return self.get_density_functional(gather=gather, out=out, **kwargs)[0] def get_effective_potential(self, gather = True, out = None, **kwargs): out = self.get_local_pp(gather=gather, out=out, **kwargs) out = self.get_density_functional_potential(gather=gather, out=out, add = True, **kwargs) return out #----------------------------------------------------------------------- #----------------------------------------------------------------------- #ASE Calculator
[docs] def get_potential_energy(self, **kwargs): """Return the potential energy.""" return self.get_energy()
[docs] def get_forces(self, icalc = 0, ignore = (), **kwargs): """Return the total forces. (n, 3) Parameters ---------- icalc : int ===== ============================= === icalc without bin ===== ============================= === 0 all 000 1 no ewald 001 2 no localpp 010 4 no nlcc 100 ===== ============================= === ignore : list ignore some forces, which does same job as `icalc`. - ewald (1) - localpp (2) - nlcc (4) """ if len(ignore) > 0 : icalc = self.forcename2type(ignore) self.qepy_pw.qepy_forces(icalc) forces = self.qepy_pw.force_mod.get_array_force().T return forces
[docs] @classmethod def get_stress(cls, **kwargs): """Return the stress (3, 3).""" stress = np.zeros((3, 3), order='F') cls.qepy_pw.stress(stress) return stress
#ASE DFTCalculator
[docs] @classmethod def get_number_of_bands(cls): """Return the number of bands.""" return cls.qepy_pw.wvfct.get_nbnd()
[docs] @classmethod def get_xc_functional(cls): """Return the XC-functional identifier. 'LDA', 'PBE', ...""" return cls.qepy_modules.funct.get_dft_short().decode("utf-8")
[docs] @classmethod def get_bz_k_points(cls): """Return all the k-points in the 1. Brillouin zone. The coordinates are relative to reciprocal latice vectors.""" return cls.qepy_pw.klist.get_array_xk()
[docs] @classmethod def get_number_of_spins(cls): """Return the number of spins in the calculation. Spin-paired calculations: 1, spin-polarized calculation: 2.""" return cls.qepy_pw.lsda_mod.get_nspin()
[docs] @classmethod def get_spin_polarized(cls): """Is it a spin-polarized calculation?""" return bool(cls.qepy_pw.lsda_mod.get_lsda())
[docs] @classmethod def get_ibz_k_points(cls): """Return k-points in the irreducible part of the Brillouin zone. The coordinates are relative to reciprocal latice vectors.""" xk = cls.qepy_pw.klist.get_array_xk()[:, :cls.get_number_of_k_points()].T return xk @ cls.qepy_modules.cell_base.get_array_at()
[docs] @classmethod def get_k_point_weights(cls): """Weights of the k-points. The sum of all weights is one.""" return cls.qepy_pw.klist.get_array_wk()[:cls.get_number_of_k_points()]
[docs] def get_pseudo_density(self, spin=None, pad=True, gather = True): """Return pseudo-density array. If *spin* is not given, then the total density is returned. Otherwise, the spin up or down density is returned (spin=0 or 1).""" density = self.get_density(gather = gather) if spin is None : return density.sum(axis=1) else : return density[:, spin]
[docs] @classmethod def get_pseudo_wave_function(cls, band=None, kpt=0, spin=0, broadcast=True, pad=True): """Return pseudo-wave-function array.""" cls.qepy_pw.qepy_mod.qepy_get_evc(kpt + 1) evc = cls.qepy_modules.wavefunctions.get_array_evc() if band is None : return evc else : return evc[:, band]
[docs] @classmethod def get_eigenvalues(cls, kpt=0, spin=0): """Return eigenvalue array.""" return cls.qepy_pw.wvfct.get_array_et().T[kpt]
[docs] @classmethod def get_occupation_numbers(cls, kpt=0, spin=0): """Return occupation number array.""" return cls.qepy_pw.wvfct.get_array_wg().T[kpt]
[docs] @classmethod def get_fermi_level(cls): """Return the Fermi level.""" return cls.qepy_pw.ener.get_ef()
# def initial_wannier(self, initialwannier, kpointgrid, fixedstates, # edf, spin, nbands): # """Initial guess for the shape of wannier functions. # Use initial guess for wannier orbitals to determine rotation # matrices U and C. # """ # raise NotImplementedError # def get_wannier_localization_matrix(self, nbands, dirG, kpoint, # nextkpoint, G_I, spin): # """Calculate integrals for maximally localized Wannier functions.""" # raise NotImplementedError
[docs] @classmethod def get_magnetic_moment(cls, **kwargs): """Return the total magnetic moment.""" return cls.qepy_pw.lsda_mod.get_magtot()
[docs] @classmethod def get_number_of_grid_points(cls, gather = True): """Return the shape of arrays.""" nr = np.zeros(3, dtype = 'int32') cls.qepy_pw.qepy_mod.qepy_get_grid(nr, gather) return nr
#ASE DFTCalculator END #-----------------------------------------------------------------------
[docs] @classmethod def get_number_of_k_points(cls): """Return the number of kpoints.""" return cls.qepy_pw.klist.get_nkstot()
[docs] @classmethod def get_volume(cls): """Return the volume.""" return cls.qepy_modules.cell_base.get_omega()
[docs] @classmethod def get_ecutrho(cls): """Return the cutoff for density.""" return cls.qepy_modules.gvect.get_ecutrho()
[docs] @classmethod def get_ions_lattice(cls): """Return the matrix of the lattice vectors in Bohrs.""" alat = cls.qepy_modules.cell_base.get_alat() lattice = cls.qepy_modules.cell_base.get_array_at() * alat return lattice.T
[docs] @classmethod def get_ions_positions(cls): """Return the cartesian positions of ions in Bohrs.""" alat = cls.qepy_modules.cell_base.get_alat() pos = cls.qepy_modules.ions_base.get_array_tau().T * alat return pos
[docs] @classmethod def get_ions_symbols(cls): """Return the symbols of ions.""" ityp = cls.qepy_modules.ions_base.get_array_ityp() - 1 nat = cls.qepy_modules.ions_base.get_nat() label = cls.qepy_modules.ions_base.get_array_atm().T.view('S3')[:,0].astype('U3') label = [x.strip() for x in label] symbols = [] for i in range(nat): symbols.append(label[ityp[i]]) return symbols
[docs] @classmethod def data2field(cls, data, cell = None, grid = None, rank = None, cplx=False): """QE data to DFTpy DirectField. If data is None or small temporary array will return np.zeros(1). """ from dftpy.field import DirectField from dftpy.grid import DirectGrid # if data is None or data.size < 8 : return np.zeros(1) # if cell is None : cell = cls.get_ions_lattice() if grid is None : grid = DirectGrid(lattice=cell, nr=cls.get_number_of_grid_points(), ecut=cls.get_ecutrho(), cplx=cplx) if not rank : rank = data.shape[1] if data.ndim == 2 else 1 # if data.size != grid.nnrR*rank : raise ValueError('The size of data not match the grid,\ please check the data or set another grid', data.size, grid.nnrR*rank) # field = DirectField(grid=grid, data=data.ravel(order='C'), order='F', rank=rank, cplx=cplx) return field
[docs] @classmethod def field2data(cls, field, data = None): """DFTpy DirectField to QE data. If the input field is not 3d array, will return np.zeros((1, 1)). """ # if field is None or field.ndim < 3 : return np.zeros((1, 1)) # ns = field.shape[0] if field.ndim == 4 else 1 if data is None : nnrR = np.prod(cls.get_number_of_grid_points()) data = np.empty((nnrR, ns)) # if data.size != field.size : raise ValueError('The size of field is different with data,\ please check the field or set another data', field.size, data.size) # if ns == 1 : field = [field] for i in range(ns): data[:, i] = field[i].ravel(order = 'F') return data
[docs] @classmethod def get_dftpy_grid(cls, nr = None, cell = None, mp = None, **kwargs): """Return the DFTpy DirectGrid from QE.""" from dftpy.grid import DirectGrid if cell is None : cell = cls.get_ions_lattice() if nr is None : nr = cls.get_number_of_grid_points() ecut = cls.get_ecutrho() else : ecut = None grid = DirectGrid(lattice=cell, nr=nr, ecut=ecut, mp=mp, **kwargs) return grid
[docs] @classmethod def get_ase_atoms(cls): """Return the atom.Atoms from QE to ASE (using ASE units).""" from ase.atoms import Atoms units_Bohr = cls.qepy_modules.constants.BOHR_RADIUS_SI * 1E10 # symbols = cls.get_ions_symbols() positions = cls.get_ions_positions() * units_Bohr lattice = cls.get_ions_lattice() * units_Bohr atoms = Atoms(symbols = symbols, positions = positions, cell = lattice) return atoms
[docs] @classmethod def get_dftpy_ions(cls): """Return the DFTpy.Ions from QE. (with DFTpy units)""" from dftpy.ions import Ions atoms = cls.get_ase_atoms() return Ions.from_ase(atoms)
[docs] @classmethod def switch_nlpp(cls, nhm=0, nbetam=0, nkb=0, nh=None, **kwargs): """Switch on/off the nonlocal part of the pseudopotential""" nhm_ = cls.qepy_upflib.uspp_param.get_nhm() nbetam_ = cls.qepy_upflib.uspp_param.get_nbetam() nh_ = cls.qepy_upflib.uspp_param.get_array_nh().copy() nkb_ = cls.qepy_upflib.uspp.get_nkb() if nh is None: nh = nh_ * 0 cls.qepy_upflib.uspp_param.set_nhm(nhm) cls.qepy_upflib.uspp_param.set_nbetam(nbetam) cls.qepy_upflib.uspp.set_nkb(nkb) cls.qepy_upflib.uspp_param.set_array_nh(nh) pp_options = { 'nhm' : nhm_, 'nbetam' : nbetam_, 'nh': nh_, 'nkb': nkb_, } return pp_options
[docs] @classmethod def update_exchange_correlation(cls, xc=None, libxc=None, **kwargs): """Switch XC on the fly""" if libxc : xc = None cls.qepy_pw.qepy_mod.qepy_set_dft(xc)
[docs] @classmethod def sum_band(cls, occupations = None, **kwargs): """Same as sum_band of QE with input occupations: Parameters ---------- occupations: np.ndarray (nbnd, nk) occupation numbers """ cls.qepy_pw.qepy_mod.qepy_sum_band(occupations)
@staticmethod def name2type(dictionary, name): if isinstance(name, int): value = [] for k, v in dictionary.items(): if name & v == v : value.append(k) else : if isinstance(name, str): name = [name] value = 0 for key in name : i = dictionary.get(key, None) if i is None : raise AttributeError(f"The key '{key}' not in the given dictionary.") value += i return value return value @classmethod def potname2type(cls, name): return cls.name2type(cls.POTNAMES, name) @classmethod def forcename2type(cls, name = None): return cls.name2type(cls.FORCENAMES, name)