from __future__ import absolute_import, division, print_function
from os.path import dirname, join
import pandas as pd
from rdkit import RDConfig
from rdkit.Chem import FragmentCatalog
from rdkit.Chem import AllChem as Chem
from rdkit.Chem.Fingerprints import FingerprintMols
from rdkit import DataStructs
import pickle
import os
import statistics
import time
import random
import sys
__all__ = ["get_best", "molecular_similarity", "suppress_rdkit_sanity",
"generate_geneset", "load_data", "Chromosome",
"GeneSet", "Benchmark"]
"""
This GA uses RDKit to search molecular structure
"""
[docs]def get_best(get_fitness, optimalFitness, geneSet, display,
show_ion, target, parent_candidates):
"""
the primary public function of the engine
Parameters
----------
get_fitness : function
the fitness function. Usually based on a molecular property.
An example can be found in the salt_generator module
optimalFitness : float
0-1 the user specifies how close the engine should get to
the target (1 = exact)
geneSet : object
consists of atomtypes (by periodic number), rdkit molecular
fragments and custom fragments (that are currently hard
coded into the engine). These are the building blocks that
the engine can use to mutate the molecular candidate via the
_mutate() function
display : function
for printing results to the screen. Display is called for
every accepted mutation
show_ion : function
for printing results to the screen. show_ion is called when
a candidate has achieved the desired fitness score and is
returned by the engine
target : array, float, or int
the desired property value to be achieved by the engine.
If an array, a model containing multi-output targets must
be supplied to the engine
parent_candidates : array
an array of smiles strings that the engine uses to choose
a starting atomic configuration
Returns
----------
child : Chromosome object
the accepted molecular configuration. See Chromosome class
for details
"""
mutation_attempts = 0
attempts_since_last_adoption = 0
random.seed()
bestParent = _generate_parent(parent_candidates, get_fitness)
display(bestParent, "starting structure")
if bestParent.Fitness >= optimalFitness:
return bestParent
while True:
with suppress_rdkit_sanity():
child, mutation = _mutate(bestParent, geneSet, get_fitness, target)
mutation_attempts += 1
attempts_since_last_adoption += 1
if attempts_since_last_adoption > 1100:
child = _generate_parent(parent_candidates, get_fitness)
attempts_since_last_adoption = 0
print("starting from new parent")
elif bestParent.Fitness >= child.Fitness:
continue
display(child, mutation)
attempts_since_last_adoption = 0
if child.Fitness >= optimalFitness:
sim_score, sim_index = molecular_similarity(child,
parent_candidates)
molecular_relative = parent_candidates[sim_index]
show_ion(child.Genes, target, mutation_attempts, sim_score,
molecular_relative)
return child
bestParent = child
[docs]def molecular_similarity(best, parent_candidates, all=False):
"""
returns a similarity score (0-1) of best with the
closest molecular relative in parent_candidates
Parameters
----------
best : object
Chromosome object, the current
mutated candidate
parent_candidates : array
parent pool of molecules to compare with best.
These are represented by SMILES
all : boolean, optional, default = False
default behavior is false and the tanimoto
similarity score is returned. If True
tanimoto, dice, cosine, sokal, kulczynski,
and mcconnaughey similarities are returned
Returns
----------
similarity_score : float
similarity_index : int
if all=False the best tanimoto similarity score
as well as the index of the closest molecular
relative are returned
if all=True an array of best scores and indeces
of the closest molecular relative are returned
"""
scores = []
if all:
indices = []
metrics = [DataStructs.TanimotoSimilarity,
DataStructs.DiceSimilarity,
DataStructs.CosineSimilarity,
DataStructs.SokalSimilarity,
DataStructs.KulczynskiSimilarity,
DataStructs.McConnaugheySimilarity]
for j in range(len(metrics)):
scores_micro = []
for i in range(len(parent_candidates)):
ms = [best.Mol, Chem.MolFromSmiles(parent_candidates[i])]
fps = [FingerprintMols.FingerprintMol(x) for x in ms]
score = DataStructs.FingerprintSimilarity(fps[0], fps[1],
metric=metrics[j])
scores_micro.append(score)
scores.append(max(scores_micro))
indices.append(scores_micro.index(max(scores_micro)))
return scores, indices
else:
for i in range(len(parent_candidates)):
ms = [best.Mol, Chem.MolFromSmiles(parent_candidates[i])]
fps = [FingerprintMols.FingerprintMol(x) for x in ms]
score = DataStructs.FingerprintSimilarity(fps[0], fps[1])
scores.append(score)
return max(scores), scores.index(max(scores))
[docs]def load_data(data_file_name, pickleFile=False, simpleList=False):
"""
Loads data from module_path/data/data_file_name.
Parameters
----------
data_file_name : string
name of csv file to be loaded from module_path/data/
data_file_name.
pickleFile : boolean, optional, default = False
if True opens pickled file
simpleList : boolean, optional, default = False
if true will open the saved list and
properly handle split lines
Returns
-------
data : Pandas DataFrame
"""
module_path = dirname(__file__)
if pickleFile:
with open(join(module_path, 'data', data_file_name), 'rb') as \
pickle_file:
data = pickle.load(pickle_file, encoding='latin1')
elif simpleList:
with open(join(module_path, 'data', data_file_name)) as csv_file:
data = csv_file.read().splitlines()
else:
with open(join(module_path, 'data', data_file_name), 'rb') as csv_file:
data = pd.read_csv(csv_file, encoding='latin1')
return data
[docs]class suppress_rdkit_sanity(object):
"""
Context manager for doing a "deep suppression" of stdout and stderr
during certain calls to RDKit.
"""
def __init__(self):
# Open a pair of null files
self.null_fds = [os.open(os.devnull, os.O_RDWR) for x in range(2)]
# Save the actual stdout (1) and stderr (2) file descriptors.
self.save_fds = [os.dup(1), os.dup(2)]
def __enter__(self):
# Assign the null pointers to stdout and stderr.
os.dup2(self.null_fds[0], 1)
os.dup2(self.null_fds[1], 2)
def __exit__(self, *_):
# Re-assign the real stdout/stderr back to (1) and (2)
os.dup2(self.save_fds[0], 1)
os.dup2(self.save_fds[1], 2)
# Close all file descriptors
for fd in self.null_fds + self.save_fds:
os.close(fd)
[docs]class Benchmark:
"""
benchmark method used by the unittests
"""
[docs] @staticmethod
def run(function):
timings = []
stdout = sys.stdout
for i in range(5):
sys.stdout = None
startTime = time.time()
function()
seconds = time.time() - startTime
sys.stdout = stdout
timings.append(seconds)
mean = statistics.mean(timings)
print("{} {:3.2f} {:3.2f}".format(
1 + i, mean,
statistics.stdev(timings, mean) if i > 1 else 0))
[docs]class GeneSet():
"""
Consists of atomtypes (by periodic number), rdkit molecular
fragments and custom fragments (that are currently hard
coded into the engine). These are the building blocks that
the engine can use to mutate the molecular candidate via the
_mutate() function
"""
def __init__(self, atoms, rdkitFrags, customFrags):
self.Atoms = atoms
self.RdkitFrags = rdkitFrags
self.CustomFrags = customFrags
[docs]class Chromosome(Chem.rdchem.Mol):
"""
The main object handled by the engine. The Chromosome object
inherits the RWMol and Mol attributes from rdkit. Two additional
attributes are added: genes and fitness. Genes is the SMILES
encoding of the molecule, fitness is the score (0-1) returned
by the fitness function
"""
def __init__(self, genes, fitness):
Chem.rdchem.Mol.__init__(self)
self.Genes = genes
self.Fitness = fitness
self.Mol = Chem.MolFromSmiles(genes)
self.RWMol = Chem.MolFromSmiles(genes)
self.RWMol = Chem.RWMol(Chem.MolFromSmiles(genes))
[docs]def generate_geneset():
"""
Populates the GeneSet class with atoms and fragments to be used
by the engine. As it stands these are hardcoded into the engine
but will probably be adapted in future versions
Parameters
----------
None
Returns
----------
GeneSet : object
returns an instance of the GeneSet class containing atoms,
rdkit fragments, and custom fragments
"""
atoms = [6, 7, 8, 9, 5, 15, 16, 17]
fName = os.path.join(RDConfig.RDDataDir, 'FunctionalGroups.txt')
rdkitFrags = FragmentCatalog.FragCatParams(1, 5, fName)
customFrags = FragmentCatalog.FragCatalog(rdkitFrags)
fcgen = FragmentCatalog.FragCatGenerator()
m = Chem.MolFromSmiles('CCCC')
fcgen.AddFragsFromMol(m, customFrags)
return GeneSet(atoms, rdkitFrags, customFrags)
def _generate_parent(parent_candidates, get_fitness):
df = parent_candidates
ohPickMe = random.sample(range(df.shape[0]), 1)
genes = df[ohPickMe[0]]
fitness, prediction = get_fitness(genes)
return Chromosome(genes, fitness)
def _mutate(parent, geneSet, get_fitness, target):
def replace_atom(childGenes, GeneSet, oldGene):
geneSet = GeneSet.Atoms
if childGenes.RWMol.GetAtomWithIdx(oldGene).IsInRing():
genes = Chem.MolToSmiles(parent.Mol)
return Chromosome(genes, 0)
newGene = random.sample(geneSet, 1)[0]
childGenes.RWMol.GetAtomWithIdx(oldGene).SetAtomicNum(newGene)
return childGenes
def add_atom(childGenes, GeneSet, oldGene):
geneSet = GeneSet.Atoms
newGeneNumber = childGenes.RWMol.GetNumAtoms()
newGene = random.sample(geneSet, 1)[0]
childGenes.RWMol.AddAtom(Chem.Atom(newGene))
childGenes.RWMol.AddBond(newGeneNumber, oldGene, Chem.BondType.SINGLE)
return childGenes
def remove_atom(childGenes, GeneSet, oldGene):
if childGenes.RWMol.GetAtomWithIdx(oldGene).GetExplicitValence() != 1:
genes = Chem.MolToSmiles(parent.Mol)
return Chromosome(genes, 0)
childGenes.RWMol.RemoveAtom(oldGene)
return childGenes
def add_custom_fragment(childGenes, GeneSet, oldGene):
geneSet = GeneSet.CustomFrags
newGene = Chromosome(geneSet.GetEntryDescription(
random.sample(range(geneSet.GetNumEntries()), 1)[0]), 0)
oldGene = oldGene + newGene.Mol.GetNumAtoms()
combined = Chem.EditableMol(Chem.CombineMols(newGene.Mol,
childGenes.Mol))
combined.AddBond(0, oldGene, order=Chem.rdchem.BondType.SINGLE)
childGenes = combined.GetMol()
try:
childGenes = Chromosome(Chem.MolToSmiles(childGenes), 0)
return childGenes
except BaseException:
return 0
def add_rdkit_fragment(childGenes, GeneSet, oldGene):
geneSet = GeneSet.RdkitFrags
try:
newGene = Chromosome(Chem.MolToSmiles(geneSet.GetFuncGroup(
random.sample(range(geneSet.GetNumFuncGroups()), 1)[0])), 0)
except BaseException:
return 0
oldGene = oldGene + newGene.Mol.GetNumAtoms()
combined = Chem.EditableMol(Chem.CombineMols(newGene.Mol,
childGenes.Mol))
combined.AddBond(1, oldGene, order=Chem.rdchem.BondType.SINGLE)
combined.RemoveAtom(0)
try:
childGenes = Chromosome(Chem.MolToSmiles(combined.GetMol()), 0)
return childGenes
except BaseException:
return 0
def remove_custom_fragment(childGenes, GeneSet, oldGene):
geneSet = GeneSet.CustomFrags
newGene = Chromosome(geneSet.GetEntryDescription(
random.sample(range(geneSet.GetNumEntries()), 1)[0]), 0)
try:
truncate = Chem.DeleteSubstructs(childGenes.Mol, newGene.Mol)
childGenes = truncate
childGenes = Chromosome(Chem.MolToSmiles(childGenes), 0)
return childGenes
except BaseException:
return 0
def remove_rdkit_fragment(childGenes, GeneSet, oldGene):
geneSet = GeneSet.RdkitFrags
try:
newGene = Chromosome(Chem.MolToSmiles(geneSet.GetFuncGroup(
random.sample(range(geneSet.GetNumFuncGroups()), 1)[0])), 0)
except BaseException:
return 0
try:
truncate = Chem.DeleteSubstructs(childGenes.Mol, newGene.Mol)
childGenes = truncate
childGenes = Chromosome(Chem.MolToSmiles(childGenes), 0)
return childGenes
except BaseException:
return 0
childGenes = Chromosome(parent.Genes, 0)
mutate_operations = [add_atom, remove_atom, remove_custom_fragment,
replace_atom, add_rdkit_fragment, add_custom_fragment,
remove_rdkit_fragment]
i = random.choice(range(len(mutate_operations)))
mutation = mutate_operations[i].__name__
try:
oldGene = random.sample(range(childGenes.RWMol.GetNumAtoms()), 1)[0]
except BaseException:
return Chromosome(parent.Genes, 0), mutation
childGenes = mutate_operations[i](childGenes, geneSet, oldGene)
try:
childGenes.RWMol.UpdatePropertyCache(strict=True)
Chem.SanitizeMol(childGenes.RWMol)
genes = Chem.MolToSmiles(childGenes.RWMol)
if "." in genes:
raise
fitness, prediction = get_fitness(genes)
return Chromosome(genes, fitness), mutation
except BaseException:
return Chromosome(parent.Genes, 0), mutation