Source code for ccs_fit.fitting.objective

# ------------------------------------------------------------------------------#
#  CCS: Curvature Constrained Splines                                          #
#  Copyright (C) 2019 - 2023  CCS developers group                             #
#                                                                              #
#  See the LICENSE file for terms of usage and distribution.                   #
# ------------------------------------------------------------------------------#


"""This module constructs and solves the spline objective."""


import logging
from tqdm import tqdm
import itertools
import json
import bisect
from collections import OrderedDict
import numpy as np
from cvxopt import matrix, solvers
from scipy.linalg import block_diag
from math import isnan

logger = logging.getLogger(__name__)


[docs]class Objective: """Objective function for the ccs method.""" def __init__( self, l_twb, l_one, sto, energy_ref, force_ref, gen_params, energy_ewald=[], force_ewald=[], ): """Generates Objective class object. Args: l_twb (list): list of Twobody class objects l_one (list): list of Onebody class objects sto (ndarray): array containing number of atoms of each type energy_ref (ndarray): reference energies ge_params (dict) : optionself.sto, s ewald (list, optional) : ewald energy values for CCS+Q """ self.l_twb = l_twb self.l_one = l_one self.sto = sto self.sto_full = self.sto # print(self.sto, self.sto_full) self.energy_ref = energy_ref self.force_ref_x = [x[0] for x in force_ref] self.force_ref_y = [y[1] for y in force_ref] self.force_ref_z = [z[2] for z in force_ref] self.force_ref = np.array( [*self.force_ref_x, *self.force_ref_y, *self.force_ref_z] ) self.ref = np.hstack((self.energy_ref, self.force_ref)) # If you want to change the weighting of the forces, you can do so here! [TJAMS] # WHY DO I NEED TO FLATTEN? self.ewald_energy = np.array(energy_ewald).reshape(-1, 1).flatten() self.force_ewald_x = [x[0] for x in force_ewald] self.force_ewald_y = [y[1] for y in force_ewald] self.force_ewald_z = [z[2] for z in force_ewald] self.force_ewald = np.array( [*self.force_ewald_x, *self.force_ewald_y, *self.force_ewald_z] ) # WHY DO I NEED TO FLATTEN? self.ewald = np.hstack((self.ewald_energy, self.force_ewald)).flatten() self.charge_scaling = 0.0 for kk, vv in gen_params.items(): setattr(self, kk, vv) self.cols_sto = self.sto.shape[1] self.np = len(l_twb) self.no = len(l_one) self.cparams = [self.l_twb[i].N for i in range(self.np)] self.ns = len(energy_ref) logger.debug( "The reference energy : \n %s \n Number of pairs:%s", self.energy_ref, self.np, )
[docs] def reduce_stoichiometry(self): reduce = True n_redundant = 0 while reduce: check = 0 for ci in range(np.shape(self.sto)[1]): if np.linalg.matrix_rank(self.sto[:, 0: ci + 1]) < (ci + 1): print(" There is linear dependence in stochiometry matrix!") print( " Removing onebody term: " + self.l_one[ci + n_redundant].name ) self.sto = np.delete(self.sto, ci, 1) self.l_one[ci + n_redundant].epsilon_supported = False check = 1 n_redundant += 1 break if check == 0: reduce = False assert self.sto.shape[1] == np.linalg.matrix_rank( self.sto ), "Linear dependence in stochiometry matrix" self.cols_sto = self.sto.shape[1]
[docs] def solution(self): """Function to solve the objective with constraints.""" # COMMENTS MERGING THE INTERVALS try: if self.merging == "True": self.merge_intervals() except: pass # Reduce stoichiometry self.reduce_stoichiometry() self.mm = self.get_m() logger.debug("\n Shape of M matrix is : %s", self.mm.shape) pp = matrix(np.transpose(self.mm).dot(self.mm)) eigvals = np.linalg.eigvals(pp) qq = -1 * matrix(np.transpose(self.mm).dot(self.ref)) nswitch_list = self.list_iterator() obj = [] sol_list = [] logger.info("positive definite:%s", np.all((eigvals > 0))) logger.info("Condition number:%f", np.linalg.cond(pp)) #Evaluting the fittnes mm_trimmed=self.mm mm_trimmed=np.delete(mm_trimmed,0,1) pp_trimmed=matrix(np.transpose(mm_trimmed).dot(mm_trimmed)) eigvals_trimmed = np.linalg.eigvals(pp_trimmed) # print(f" Condition number is: {np.linalg.cond(pp_trimmed)} ( {len(eigvals_trimmed)} {np.abs(max(eigvals_trimmed))} {np.abs(min(eigvals_trimmed))})") if self.do_unconstrained_fit == "True": self.unconstrained_fit() if self.do_ridge_regression == "True": self.ridge_regresssion() for n_switch_id in tqdm( nswitch_list, desc=" Finding optimum switch", colour="#800080" ): [gg, aa] = self.get_g(n_switch_id) hh = np.zeros(gg.shape[0]) bb = np.zeros(aa.shape[0]) sol = self.solver(pp, qq, matrix(gg), matrix(hh), matrix(aa), matrix(bb)) obj.append(float(self.eval_obj(sol["x"]))) obj = np.asarray(obj) mse = np.min(obj) opt_sol_index = int(np.ravel(np.argwhere(obj == mse)[0])) # logger.info( # "\n The best switch is : %s with mse: %s", * # nswitch_list[opt_sol_index], mse # ) best_switch_r = np.around([nswitch_list[opt_sol_index][elem]*self.l_twb[elem].res+self.l_twb[elem].Rmin for elem in range(self.np)], decimals=2) elem_pairs = [self.l_twb[elem].name for elem in range(self.np)] best_switch_dict = {} for i, elem_pair, in enumerate(elem_pairs): best_switch_dict[elem_pair] = best_switch_r[i] results_dict = {"rmse": mse**0.5, "best_switches": best_switch_dict} with open("rmse.json", "w") as outfile: json.dump(results_dict, outfile) print( f" The best switch is {nswitch_list[opt_sol_index][:]} with rmse: {mse**0.5}, corresponding to distances of {best_switch_r} Å for element pairs {elem_pairs[:]}.") # [{' '.join(['{:2f}'.format(best_switch_r[elem]) for elem in range(self.np)])}] [g_opt, aa] = self.get_g(nswitch_list[opt_sol_index]) bb = np.zeros(aa.shape[0]) opt_sol = self.solver(pp, qq, matrix( g_opt), matrix(hh), matrix(aa), matrix(bb)) xx = np.array(opt_sol["x"]) self.assign_parameter_values(xx) self.model_energies = np.ravel( self.mm[0: self.l_twb[0].Nconfs, :].dot(xx)) if self.l_twb[0].Nconfs_forces > 0: model_forces = np.ravel( self.mm[-3*self.l_twb[0].Nconfs_forces:, :].dot(xx) ) self.write_error_forces(model_forces, self.force_ref) self.write_error() # COMMENT: Unfold the spline to an equidistant grid try: if self.merging == "True": self.unfold_intervals() except: pass x_unfolded = [] for ii in range(self.np): self.l_twb[ii].get_spline_coeffs() self.l_twb[ii].get_expcoeffs() x_unfolded = np.hstack( (x_unfolded, np.array(self.l_twb[ii].curvatures).flatten()) ) for onb in self.l_one: if onb.epsilon_supported: x_unfolded = np.hstack((x_unfolded, np.array(onb.epsilon))) else: x_unfolded = np.hstack((x_unfolded, 0.0)) xx = x_unfolded self.write_CCS_params() return self.model_energies, mse, xx
[docs] def predict(self, xx): """Predict results. Args: xx (ndarrray): Solution array from training. Needs to be updated to handle merging and dissolving intervals """ self.sto = self.sto_full self.mm = self.get_m() try: self.model_energies = np.ravel( self.mm[0: self.l_twb[0].Nconfs, :].dot(xx)) error = self.model_energies - self.energy_ref mse = ((error) ** 2).mean() except: self.model_energies = [] error = [] mse = 0 self.write_error(fname="error_test.out") return self.model_energies, error
[docs] @ staticmethod def solver(pp, qq, gg, hh, aa, bb, maxiter=300, tol=(1e-10, 1e-10, 1e-10)): """The solver for the objective. Args: pp (matrix): P matrix as per standard Quadratic Programming(QP) notation. qq (matrix): q matrix as per standard QP notation. gg (matrix): G matrix as per standard QP notation. hh (matrix): h matrix as per standard QP notation aa (matrix): A matrix as per standard QP notation. bb (matrix): b matrix as per standard QP notation maxiter (int, optional): maximum iteration steps (default: 300). tol (tuple, optional): tolerance value of the solution (default: (1e-10, 1e-10, 1e-10)). Returns: sol (dict): dictionary containing solution details """ solvers.options["show_progress"] = False solvers.options["maxiters"] = maxiter solvers.options["feastol"] = tol[0] solvers.options["abstol"] = tol[1] solvers.options["reltol"] = tol[2] if aa: sol = solvers.qp(pp, qq, gg, hh, aa, bb) else: sol = solvers.qp(pp, qq, gg, hh) return sol
[docs] def merge_intervals(self): # Merge intervals for i in range(self.np): self.l_twb[i].merge_intervals() self.cparams = [self.l_twb[i].N for i in range(self.np)]
[docs] def unfold_intervals(self): for ii in range(self.np): self.l_twb[ii].dissolve_interval()
[docs] def eval_obj(self, xx): """Mean squared error function. Args: xx (ndarray): the solution for the objective Returns: float: mean square error """ return np.format_float_scientific( np.sum((self.ref - (np.ravel(self.mm.dot(xx)))) ** 2) / self.ns, precision=4 )
[docs] def assign_parameter_values(self, xx): # Onebodies counter = -1 if self.interface == "CCS+Q": counter = 0 self.charge_scaling = xx[-1] ** 0.5 for k in range(self.no): i = self.no - k - 1 if self.l_one[i].epsilon_supported: counter += 1 self.l_one[i].epsilon = float(xx[-1 - counter]) # Two-bodies ind = 0 for ii in range(self.np): self.l_twb[ii].curvatures = np.asarray( xx[ind: ind + self.cparams[ii]]) ind = ind + self.cparams[ii]
# Unfold the spline to an equdistant grid # self.l_twb[ii].dissolve_interval() # self.l_twb[ii].get_spline_coeffs() # self.l_twb[ii].get_expcoeffs()
[docs] def list_iterator(self): """Iterates over the self.np attribute.""" tmp = [] for elem in range(self.np): if self.l_twb[elem].Swtype == "rep": tmp.append([self.l_twb[elem].N]) if self.l_twb[elem].Swtype == "att": tmp.append([0]) if self.l_twb[elem].Swtype == "sw": if self.l_twb[elem].search_mode.lower() == "full": tmp.append(self.l_twb[elem].indices) elif self.l_twb[elem].search_mode.lower() == "range": range_center = self.l_twb[elem].range_center range_width = self.l_twb[elem].range_width Rmin = self.l_twb[elem].Rmin Rcut = self.l_twb[elem].Rcut res = self.l_twb[elem].res range_min = max(0, bisect.bisect_left(self.l_twb[elem].rn, (range_center - range_width/2))) range_max = min(self.l_twb[elem].N, bisect.bisect_left(self.l_twb[elem].rn, (range_center + range_width/2))) tmp.append(self.l_twb[elem].indices[range_min:range_max]) print(" Range search turned on for element pair {}; {} possible switch indices in range of {:.2f}-{:.2f} Å.".format(self.l_twb[elem].name, len(self.l_twb[elem].indices[range_min:range_max]), max(Rmin, int((range_center - range_width - Rmin)/res)*res + Rmin), min(Rcut, int((range_center + range_width - Rmin)/res)*res + Rmin))) elif self.l_twb[elem].search_mode.lower() == "point": search_indices = [bisect.bisect_left(self.l_twb[elem].rn, search_point) for search_point in self.l_twb[elem].search_points] search_indices = np.unique(search_indices).tolist() print(" Switch points located at {} to for element pair {} based on point search.".format('[' + ', '.join(["{:.2f}".format(self.l_twb[elem].rn[search_index]) for search_index in search_indices]) + '] Å', self.l_twb[elem].name)) tmp.append([self.l_twb[elem].indices[search_index] for search_index in search_indices]) else: raise SyntaxError("Error: search mode not recognized! Please use one of the following recognized options; [\"full\", \"range\", \"point\"]") n_list = list(itertools.product(*tmp)) return n_list
[docs] def get_m(self): """Returns the M matrix. Returns: ndarray: The M matrix. """ # Add energy data tmp = [] for ii in range(self.np): tmp.append(self.l_twb[ii].vv) vv = np.hstack([*tmp]) mm = np.hstack((vv, self.sto)) # Add force data tmp = [] for ii in range(self.np): tmp.append(self.l_twb[ii].fvv_x) fvv_x = np.hstack([*tmp]) fvv_x = np.hstack( (fvv_x, np.zeros( (self.l_twb[ii].Nconfs_forces, np.shape(self.sto)[1]))) ) ### This is where one could scale the forces by a scalar, do for x, y and z! Don't forget to also scale the target forces! [TJAMS] mm = np.vstack((mm, fvv_x)) tmp = [] for ii in range(self.np): tmp.append(self.l_twb[ii].fvv_y) fvv_y = np.hstack([*tmp]) fvv_y = np.hstack( (fvv_y, np.zeros( (self.l_twb[ii].Nconfs_forces, np.shape(self.sto)[1]))) ) mm = np.vstack((mm, fvv_y)) tmp = [] for ii in range(self.np): tmp.append(self.l_twb[ii].fvv_z) fvv_z = np.hstack([*tmp]) fvv_z = np.hstack( (fvv_z, np.zeros( (self.l_twb[ii].Nconfs_forces, np.shape(self.sto)[1]))) ) mm = np.vstack((mm, fvv_z)) if self.interface == "CCS+Q": # THIS IS A BIT AKWARD CAN IT BE FIXED? mm = np.hstack((mm, np.atleast_2d(self.ewald).T)) return mm
[docs] def get_g(self, n_switch): """Returns constraints matrix. Args: n_switch (int): switching point to change signs of curvatures. Returns: ndarray: returns G and A matrix """ aa = np.zeros(0) tmp = [] for elem in range(self.np): tmp.append(self.l_twb[elem].switch_const(n_switch[elem])) gg = block_diag(*tmp) gg = block_diag(gg, np.zeros_like(np.eye(self.cols_sto))) if self.interface == "CCS+Q": gg = block_diag(gg, -1) return gg, aa
[docs] def write_error(self, fname="CCS_error.out"): """Prints the errors in a file. Args: mdl_eng (ndarray): Energy prediction values from splines. ref_eng (ndarray): Reference energy values. mse (float): Mean square error. fname (str, optional): Output filename (default: 'error.out'). """ header = "{:<15}{:<15}{:<15}{:<15}".format( "Reference", "Predicted", "Error", "#atoms" ) error = abs(self.energy_ref - self.model_energies) maxerror = max(abs(error)) mse = ((error) ** 2).mean() Natoms = self.l_one[0].stomat for i in range(1, self.no): Natoms = Natoms + self.l_one[i].stomat footer = "MSE = {:2.5E}\nMaxerror = {:2.5E}".format(mse, maxerror) np.savetxt( fname, np.transpose( [self.energy_ref, self.model_energies, error, Natoms]), header=header, footer=footer, fmt="%-15.5f", )
# print(" Final root mean square error in energy: ", (np.square(error/Natoms)).mean() # ** 0.5, " (eV/atoms) [NOTE: Only elements specified in Onebody are considered in atom count!]")
[docs] def write_error_forces(self, mdl_for, ref_for, fname="CCS_error_forces.out"): """Prints the errors in a file. Args: mdl_eng (ndarray): Energy prediction values from splines. ref_eng (ndarray): Reference energy values. mse (float): Mean square error. fname (str, optional): Output filename (default: 'error.out'). """ header = "{:<15}{:<15}{:<15}".format("Reference", "Predicted", "Error") error = abs(ref_for - mdl_for) maxerror = max(abs(error)) mse = ((error) ** 2).mean() footer = "MSE = {:2.5E}\nMaxerror = {:2.5E}".format(mse, maxerror) np.savetxt(fname, np.transpose([ref_for, mdl_for, error]), header=header, footer=footer, fmt='%-15.5f')
[docs] def write_CCS_params(self,fname="CCS_params.json"): CCS_params = OrderedDict() CCS_params["Charge scaling factor"] = float(self.charge_scaling) eps_params = OrderedDict() for k in range(self.no): if self.l_one[k].epsilon_supported: eps_params[self.l_one[k].name] = self.l_one[k].epsilon CCS_params["One_body"] = eps_params two_bodies_dict = OrderedDict() for k in range(self.np): two_body_dict = OrderedDict() two_body_dict["r_min"] = self.l_twb[k].rn[0] two_body_dict["r_cut"] = self.l_twb[k].Rcut two_body_dict["dr"] = self.l_twb[k].res r_values = list(np.array(self.l_twb[k].rn)) two_body_dict["r"] = list(r_values) two_body_dict["exp_a"] = self.l_twb[k].expcoeffs[0] if not isnan(self.l_twb[k].expcoeffs[1]): two_body_dict["exp_b"] = self.l_twb[k].expcoeffs[1] else: # print("WARNING: THE EXPONENTIAL FOR PAIR {} IS POORLY RESOLVED, PROCEED WITH CAUTION!".format(self.l_twb[k].name)) two_body_dict["exp_b"] = 0 if not isnan(self.l_twb[k].expcoeffs[2]): two_body_dict["exp_c"] = self.l_twb[k].expcoeffs[2] else: two_body_dict["exp_c"] = 0 # if two_body_dict["exp_a"]<0: # print("STRONG WARNING: THE EXPONENTIAL WALL IS ACTUALLY ATTRACTIVE!!!!!!!") a_values = list(self.l_twb[k].splcoeffs[:, 0]) a_values.append(0) two_body_dict["spl_a"] = a_values b_values = list(self.l_twb[k].splcoeffs[:, 1]) # if b_values[0]>0: # print("WARNING: THE PAIR {} IS ONLY SAMPLED IN A REGION WHERE IT IS STILL REPULSIVE. THIS INDICATES THAT THE INTERACTION IS MOST LIKELY POORLY RESOLVED, PROCEED WITH CAUTION!".format(self.l_twb[k].name)) b_values.append(0) two_body_dict["spl_b"] = b_values c_values = list(self.l_twb[k].splcoeffs[:, 2]) c_values.append(0) two_body_dict["spl_c"] = c_values d_values = list(self.l_twb[k].splcoeffs[:, 3]) d_values.append(0) two_body_dict["spl_d"] = d_values two_bodies_dict[self.l_twb[k].name] = two_body_dict CCS_params["Two_body"] = two_bodies_dict with open(fname, "w") as f: json.dump(CCS_params, f, indent=8)
[docs] def gen_Buckingham(self): print("Getting to generate a Buckingham potential from the spline data!")
[docs] def unconstrained_fit(self): #Solving unconstrained problem xx=np.linalg.lstsq(self.mm,self.ref,rcond=None) xx=xx[0] print(" MSE of unconstrained problem is: ", ((self.mm.dot(xx) - self.ref)**2).mean() ) xx=xx.reshape(len(xx),1) self.assign_parameter_values(xx) self.model_energies = np.ravel( self.mm[0: self.l_twb[0].Nconfs, :].dot(xx)) self.write_error(fname="UNC_error.out") if self.l_twb[0].Nconfs_forces > 0: model_forces = np.ravel( self.mm[-3*self.l_twb[0].Nconfs_forces:, :].dot(xx) ) self.write_error_forces(model_forces, self.force_ref,fname="UNC_error_forces.out") try: if self.merging == "True": self.unfold_intervals() except: pass x_unfolded = [] for ii in range(self.np): self.l_twb[ii].get_spline_coeffs() self.l_twb[ii].get_expcoeffs() x_unfolded = np.hstack( (x_unfolded, np.array(self.l_twb[ii].curvatures).flatten()) ) for onb in self.l_one: if onb.epsilon_supported: x_unfolded = np.hstack((x_unfolded, np.array(onb.epsilon))) else: x_unfolded = np.hstack((x_unfolded, 0.0)) xx = x_unfolded self.write_CCS_params(fname="UNC_params.json") try: if self.merging == "True": self.merge_intervals() except: pass
[docs] def ridge_regresssion(self): #Solving ridge regression problem from sklearn import linear_model ridge=linear_model.Ridge(alpha=self.ridge_lambda,fit_intercept=False) ridge.fit(self.mm,self.ref) ridge_pred=ridge.predict(self.mm) print(" MSE from ridge regression: ", ((ridge_pred - self.ref)**2).mean(), "Regularization (alpha): ",self.ridge_lambda) xx=ridge.coef_ self.assign_parameter_values(xx) self.model_energies = np.ravel( self.mm[0: self.l_twb[0].Nconfs, :].dot(xx)) self.write_error(fname="RIDGE_error.out") if self.l_twb[0].Nconfs_forces > 0: model_forces = np.ravel( self.mm[-3*self.l_twb[0].Nconfs_forces:, :].dot(xx) ) self.write_error_forces(model_forces, self.force_ref,fname="RIDGE_error_forces.out") try: if self.merging == "True": self.unfold_intervals() except: pass x_unfolded = [] for ii in range(self.np): self.l_twb[ii].get_spline_coeffs() self.l_twb[ii].get_expcoeffs() x_unfolded = np.hstack( (x_unfolded, np.array(self.l_twb[ii].curvatures).flatten()) ) for onb in self.l_one: if onb.epsilon_supported: x_unfolded = np.hstack((x_unfolded, np.array(onb.epsilon))) else: x_unfolded = np.hstack((x_unfolded, 0.0)) xx = x_unfolded self.write_CCS_params(fname="RIDGE_params.json") try: if self.merging == "True": self.merge_intervals() except: pass