Activated Learning

The activated learning recipe actively samples a potential energy surface. The workflow is controlled by several subworkflows, including the training, sampling, and labelling processes. The overall workflow is shown in the following diagram:

[Figure: workflow diagram]

The workflow can be used either as an entrypoint or as a subworkflow. Some parameters, namely those that set up the initial datasets and models, are taken by the entrypoint only. See below for usage and tables of parameters.

Entrypoint

Convert initial dataset and geometry from an ASE trajectory

As an example, consider a trajectory file readable by ASE as input. The tips convert CLI tool can be used to generate the initial dataset and geometries:

# generating my_ds.{yml,tfr} 
tips convert -f asetraj data.traj -of pinn my_ds
# generating the initial geometry
tips convert -f asetraj data.traj --subsample uniform -of idx.xyz

To run the workflow as an entrypoint (single quotes are necessary):

nextflow run main.nf -entry acle --init_ds 'my_ds.{yml,tfr}' --init_geo '*.xyz' ...

It is possible to restart a project from a certain generation while keeping the folder structure:

nextflow run main.nf -entry acle --restart_from 30 --restart_conv true

When restarted this way, the init_* parameters are ignored. This method is mainly intended for small changes to the sampled ensemble, e.g., an ad hoc change of temperature. If that is not enough, it is advisable to rerun the workflow under a different --proj.
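Judging from the entry workflow in the source listing at the end of this page, a restart from generation N appears to pick up its models, geometries, and dataset from the published folders of that generation and then continue at generation N+1. The Python sketch below only spells out those glob patterns. The function name `restart_inputs` is hypothetical and not part of the workflow, and the folder name `acle` is just an example value.

```python
def restart_inputs(publish: str, restart_from: int) -> dict:
    """Glob patterns that a restart from `restart_from` would read.

    Hypothetical helper: the patterns are copied from the source listing,
    the function itself does not exist in the workflow.
    """
    gen = str(restart_from)
    return {
        # one model folder per ensemble member
        "models": f"{publish}/models/gen{gen}/*/model",
        # geometries written by the convergence check
        "geo": f"{publish}/check/gen{gen}/*/*.xyz",
        # mixed dataset of that generation (no 'gen' prefix in the listing)
        "ds": f"{publish}/dsmix/{gen}/mix-ds.{{yml,tfr}}",
        # the loop continues with the next generation
        "next_gen": str(restart_from + 1),
    }


for key, value in restart_inputs("acle", 30).items():
    print(f"{key}: {value}")
```

If these folders are missing or were moved, a restart would presumably find no inputs, which is why the folder structure has to be kept.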

Parameters

| Parameter | Description | Default |
| --- | --- | --- |
| init_geo | initial geometries for sampling | input/geo/*.xyz |
| init_model | initial model or model parameters | input/pinn/pinet-adam.yml |
| init_ds | initial dataset | input/ds/init-ds.{yml,tfr} |
| init_time | sampling time scale in ps | 1.0 |
| init_steps | training steps for initial model | 100000 |
| restart_from | restart from a given generation | false |
| restart_conv | restart from a converged model (model will be retrained if false) | false |

Subworkflow

Input/Output Channels

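| Channel | I/O[idx] | Type | Description |
| --- | --- | --- | --- |
| gen | in[0] | val | generation of the model |
| geo | in[1] | file | initial geometry |
| ds | in[2] | file | training dataset |
| models | in[3] | file | model (or ensemble of models) |
| steps | in[4] | val | training steps |
| time | in[5] | val | sampling timescale |
| converge | in[6] | val | whether the input model is deemed converged |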

The AcLe subworkflow is a recursive workflow: its input and output share the same data structure, so the output of one generation can be fed back as the input of the next.
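To illustrate what a shared input/output structure buys, here is a simplified Python sketch of such a recursion. `AcleState`, `run_acle`, and `toy_generation` are hypothetical names. The field order follows the tuple built in the source listing on this page, and the stop condition mirrors its `until` clause, but the sketch does not reproduce Nextflow's channel semantics.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class AcleState:
    """One item of the recursive channel (hypothetical container)."""
    gen: int
    geo: Tuple[str, ...]
    ds: str
    models: Tuple[str, ...]
    steps: int
    time: float
    converge: bool


def run_acle(state, one_generation, max_gen, max_time, exit_at_max_time=False):
    """Apply `one_generation` repeatedly; each output becomes the next input."""
    while True:
        state = one_generation(state)
        past_last_gen = state.gen > max_gen
        hit_max_time = exit_at_max_time and state.time >= max_time
        if past_last_gen or hit_max_time:
            return state


def toy_generation(state):
    # Stand-in for train -> sample -> label -> check; only bumps the counter.
    return replace(state, gen=state.gen + 1)


start = AcleState(0, ("a.xyz",), "init-ds", ("model1",), 100000, 1.0, False)
final = run_acle(start, toy_generation, max_gen=3, max_time=1000.0)
print(final.gen)  # 4
```

Because the step function returns the same type it receives, the driver needs no knowledge of what happens inside a generation.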

Parameters

| Parameter | Description | Default |
| --- | --- | --- |
| proj | folder for storing results | acle |
| ref | reference calculation module | dftb |
| ref_inp | reference input file | input/dftb/xtb.py |
| mpl | machine learning potential module | pinn |
| train_flags | MLP training flags | --log-every 10000 --ckpt-every 100000 --batch 1 --max-ckpts 1 --shuffle-buffer 3000 |
| train_init | extra MLP training flags for initialization | --init |
| max_gen | maximal number of generations | 40 |
| min_time | minimal timescale for sampling | 1.0 |
| max_time | maximal timescale for sampling | 1000.0 |
| md_flags | flags for MD sampling, see ase module for details | --ensemble nvt --dt 0.5 --log-every 100 --T 340 |
| collect_flags | collection flags for the data to label | -f asetraj --subsample uniform --nsample 10 -of idx.xyz -o ds |
| sp_points | number of single points for each sampled trajectory | 10 |
| old_flag | selection rule for the old dataset | --nsample 2700 |
| new_flag | selection rule for the new dataset | --nsample 300 |
| emaxtol | tolerance for max energy error | 0.020 |
| ermsetol | tolerance for energy RMSE | 0.005 |
| fmaxtol | tolerance for max force (component) error | 0.800 |
| frmsetol | tolerance for force (component) RMSE | 0.200 |
| retrain_step | number of retrain steps per generation | 100000 |
| acc_fac | factor to accelerate the sampling | 2.0 |
| brake_fac | factor to slow down the sampling | 1.0 |
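The interplay of acc_fac, brake_fac, min_time, max_time, and retrain_step can be sketched in Python as follows. The rule is transcribed from the final `map` step of the source listing on this page; the function name `next_schedule` is hypothetical and the numbers are illustrative only.

```python
def next_schedule(steps, time, converged, *, acc_fac, brake_fac,
                  min_time, max_time, retrain_step):
    """Return (training steps, sampling time in ps) for the next generation."""
    if converged:
        # Converged: keep the model, sample longer (capped at max_time).
        return steps, min(time * acc_fac, max_time)
    # Not converged: schedule more training, apply the brake (floored at min_time).
    return steps + retrain_step, max(time * brake_fac, min_time)


cfg = dict(acc_fac=2.0, brake_fac=1.0, min_time=1.0,
           max_time=1000.0, retrain_step=100000)

print(next_schedule(100000, 1.0, True, **cfg))   # (100000, 2.0)
print(next_schedule(100000, 2.0, False, **cfg))  # (200000, 2.0)
```

With brake_fac set to 1.0 the time scale simply stays put after a failed check; a value below 1.0 would shorten the next sampling run instead.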

Source Code: nextflow/acle.nf
#!/usr/bin/env nextflow

// The activated learning workflow  ======================================================
//
// The '--proj' parameter controls the output directory. See the parameters
// sections below for other parameters that can be tuned for the workflow.
//
//                                         written by  Yunqi Shao, first ver.: 2022.Aug.29
//                                                 adapted as PiNNAcLe recipe: 2023.Apr.24
//========================================================================================

nextflow.enable.dsl = 2
nextflow.preview.recursion = true

def logger (msg) {
  logfile = file("$params.publish/pinnacle.log")
  if (!logfile.getParent().exists()) {logfile.getParent().mkdirs()}
  logfile.append("$msg \n")
}

// entrypoint parameters ==================================================================
params.publish       = 'acle'
params.init_geo      = 'input/geo/*.xyz'
params.init_model    = 'input/pinn/pinet-adam.yml'
params.init_ds       = 'input/dataset/init-ds.{yml,tfr}'
params.init_time     = 0.5
params.init_steps    = 200000
params.ens_size      = 1
params.restart_from  = false
params.restart_conv  = false
//========================================================================================

// acle parameters =======================================================================
params.ref           = 'dftb' // reference (module name)
params.ref_inp       = 'input/dftb/xtb.py'
params.mpl           = 'pinn' // machine learning potential (module name)
params.train_flags   = '--log-every 10000 --ckpt-every 100000 --batch 1 --max-ckpts 1 --shuffle-buffer 3000'
params.train_init    = '--init'
params.exit_at_max_time = false
params.max_gen       = 40
params.min_time      = 0.5
params.max_time      = 1000.0
params.md_flags      = '--ensemble nvt --dt 0.5 --log-every 100 --T 340'
params.collect_flags = '-f asetraj --subsample uniform --nsample 10 -of idx.xyz -o ds'
params.sp_points     = 10
params.merge_flags   = '-f asetraj'
params.old_flag      = '--nsample 240'
params.new_flag      = '--psample 100'
params.frmsetol      = 0.150
params.ermsetol      = 0.005
params.fmaxtol       = 2.000
params.emaxtol       = 0.020
params.retrain_step  = 100000
params.acc_fac       = 4.0
params.brake_fac     = 1.0
//========================================================================================

// Imports (publish directories are set here) ============================================
include { convert} from './module/tips.nf' addParams(publish: "$params.publish/collect")
include { dsmix } from './module/tips.nf' addParams(publish: "$params.publish/dsmix")
include { merge } from './module/tips.nf' addParams(publish: "$params.publish/merge")
include { check } from './module/tips.nf' addParams(publish: "$params.publish/check")
include { train } from "./module/${params.mpl}.nf" addParams(publish: "$params.publish/models")
include { md } from "./module/${params.mpl}.nf" addParams(publish: "$params.publish/md")
include { sp } from "./module/${params.ref}.nf" addParams(publish: "$params.publish/label")
//========================================================================================

// Entry point
workflow entry {
  logger('Starting an AcLe Loop')
  init_ds = file(params.init_ds)
  init_geo = file(params.init_geo)
  params.geo_size = init_geo.size
  ens_size = params.ens_size.toInteger()
  logger("Initial dataset: ${init_ds.name};")
  logger("Initial geometries ($params.geo_size) in ${params.init_geo}")

  if (params.restart_from) {
    init_gen = params.restart_from.toString()
    init_models = file("${params.publish}/models/gen${init_gen}/*/model", type:'dir')
    init_geo = file("${params.publish}/check/gen${init_gen}/*/*.xyz")
    init_ds = file("${params.publish}/dsmix/${init_gen}/mix-ds.{yml,tfr}")
    logger("restarting from gen$init_gen ensemble of size $ens_size;")
    init_gen = (init_gen.toInteger()+1).toString()
  } else{
    init_gen = '0'
    init_models = file(params.init_model, type:'any')
    if (!(init_models instanceof Path)) {
      logger("restarting from an ensemble of size $ens_size;")
    } else {
      init_models = [init_models] * ens_size
      logger("starting from scratch with the input $init_models.name of size $ens_size;")
    }
  }
  assert ens_size == init_models.size : "ens_size ($ens_size) does not match input ($init_models.size)"

  steps = params.init_steps.toInteger()
  time = params.init_time.toFloat()
  converge = params.restart_conv.toBoolean()

  init_inp = [init_gen, init_geo, init_ds, init_models, steps, time, converge]
  ch_inp = Channel.value(init_inp)
  acle(ch_inp)
}

// Main Iteration and Loops ==============================================================
workflow acle {
  take:
    ch_init

  main:
  loop.recurse(ch_init)
    .until{ it[0].toInteger()>params.max_gen || (it[5]>=params.max_time.toFloat() && params.exit_at_max_time) }
}

// Loop for each iteration =================================================================
workflow loop {
  take: ch_inp

  main:
  // retrain or keep the model ============================================================
  ch_inp \
    | branch {gen, geo, ds, models, step, time, converge -> \
              keep: converge
                return [gen, models]
              retrain: !converge
                return [gen, models, ds, (1..params.ens_size).toList(), step]} \
    | set {ch_model}


  ch_model.retrain.transpose(by:[1,3]) \
    | map {gen, model, ds, seed, steps -> \
           ["gen$gen/model$seed", ds, model,
            params.train_flags+
            " --seed $seed --train-steps $steps"+
            (gen.toInteger()==1?" $params.train_init":'')]}\
    | train

  train.out.model \
    | map {name, model -> (name=~/gen(\d+)\/model(\d+)/)[0][1,2]+[model]} \
    | map {gen, seed, model -> [gen, model]} \
    | mix (ch_model.keep.transpose()) \
    | groupTuple(size:params.ens_size) \
    | set {nx_models}
  //=======================================================================================

  // sampling with ensemble NN ============================================================
  ch_inp | map {[it[0], it[1], it[5]]} | transpose | set {ch_init_t} // init and time

  nx_models \
    | combine (ch_init_t, by:0)  \
    | map {gen, models, init, t -> \
           ["gen$gen/$init.baseName", models, init, params.md_flags+" --t $t"]} \
    | md
  md.out.traj.set {ch_trajs}
  //=======================================================================================

  // relabel with reference ===============================================================
  ref_inp = file(params.ref_inp)
  ch_trajs \
    | map {name, traj -> [name, traj, params.collect_flags]} \
    | convert \
    | flatMap {name, inps -> inps.collect {["$name/$it.baseName", it]}} \
    | map {name, geo -> [name, ref_inp, geo]} \
    | sp

  sp.out \
    | map {name, logs -> (name=~/(gen\d+\/.+)\/(\d+)/)[0][1,2]+[logs]} \
    | map {name, idx, logs -> [name, idx.toInteger(), logs]} \
    | groupTuple(size:params.sp_points) \
    | map {name, idx, logs -> [name, idx, logs, params.merge_flags]} \
    | merge \
    | set {ch_new_ds}
  //=======================================================================================

  // check convergence ====================================================================
  ch_new_ds \
    | join(ch_trajs) \
    | check \

  check.out \
    | map{name,geo,msg-> \
          [(name=~/gen(\d+)\/.+/)[0][1], geo, msg.contains('Converged')]} \
    | groupTuple(size:params.geo_size.toInteger()) \
    | map {gen, geo, conv -> [gen, geo, conv.every()]}
    | set {nx_geo_converge}

  //=======================================================================================

  // mix the new dataset ==================================================================
  ch_inp.map {[it[0], it[2]]}.set{ ch_old_ds }
  ch_new_ds \
    | map {name, idx, ds -> [(name=~/gen(\d+)\/.+/)[0][1], ds]} \
    | groupTuple(size:params.geo_size.toInteger()) \
    | join(ch_old_ds) \
    | map {it+[params.new_flag, params.old_flag]} \
    | dsmix \
    | set {nx_ds}
  //=======================================================================================

  // combine everything for new inputs ====================================================
  ch_inp.map{[it[0], it[4]]}.set {nx_step}
  ch_inp.map{[it[0], it[5]]}.set {nx_time}

  acc_fac = params.acc_fac.toFloat()
  brake_fac = params.brake_fac.toFloat()
  min_time = params.min_time.toFloat()
  max_time = params.max_time.toFloat()
  retrain_step = params.retrain_step.toInteger()

  nx_geo_converge | join(nx_models) | join(nx_ds) | join(nx_time) | join (nx_step) \
    | map {gen, geo, converge, models, ds, time, step -> \
           [(gen.toInteger()+1).toString(),
            geo, ds, models, \
            converge ? step : step+retrain_step, \
            converge ? Math.min(time*acc_fac, max_time) : Math.max(time*brake_fac, min_time), \
            converge]} \
    | set {nx_inp}
  //=======================================================================================

  ch_inp.subscribe {logger("[gen${it[0]}] ${it[-1]? 'not training': 'training'} the models.")}
  check.out.subscribe {name, geo, msg -> logger("[$name] ${msg.trim()}")}
  nx_inp.subscribe {logger('-'*80+'\n'+"[gen${it[0]}] next time scale ${it[5]} ps, ${it[6] ? 'no training planned' : 'next training step '+it[4]}.") } \

  emit:
  nx_inp
}