"""\
==================
TrajectoryEnsemble
==================
The :class:`~ENPMDA.preprocessing.TrajectoryEnsemble` class both store
the information of simulations in the ensemble and preload it by
serialization. It can also apply on-the-fly transformations to the
trajectories and extract selected components (e.g. only protein)
to seperated files.
A ``TrajectoryEnsemble`` is created from files::
from ENPMDA.preprocessing import TrajectoryEnsemble
traj_ensemble = TrajectoryEnsemble(ensemble_name='ensemble',
topology_list=ensemble_top,
trajectory_list=ensemble_traj)
traj_ensemble.load_ensemble()
In order to add transformations e.g. wrap/unwrap, extra ``bonded_topology_list``
is required as input to provide bonded information. Note the speed of
on-the-fly transformations is really slow for large systems and I
recommend patching https://github.com/MDAnalysis/mdanalysis/pull/3169
to your MDAnalysis installation. Alternatively, you can do trajectory
preprocessing in advance (with e.g. ``gmx trjconv``) and use the output
trajectory while setting ``bonded_topology_list=None`` and ``only_raw=True``.
Classes
=======
.. autoclass:: TrajectoryEnsemble
:members:
"""
import os.path
import warnings
from datetime import datetime
import gc
import pickle
import os
import MDAnalysis as mda
import MDAnalysis.transformations as trans
import dask
import numpy as np
from typing import Optional
from ENPMDA.utils import GroupHug
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
[docs]class TrajectoryEnsemble(object):
r"""Class to store an ensemble of simulations.
It can also be used to apply transformations to
the trajectory and either return the raw trajectory or
the processed trajectory for protein and system.
Note
----
Each trajectory should be stored in its own folder.
Warning
-------
If `only_raw=False`, `protein.pdb`, `system.pdb`,
`protein.xtc`, and `system.xtc` files will be generated
in the same folder as the loading trajectory.
"""
[docs] def __init__(
self,
ensemble_name: str,
topology_list: list,
trajectory_list: list,
# bonded_topology_list is an optional list of str
bonded_topology_list: Optional[list] = None,
skip: int = 1,
timestamp: str = timestamp,
updating: bool = True,
only_raw: bool = False,
wrapping: bool = True,
protein_selection: str = "protein",
):
r"""
Parameters
----------
ensemble_name: str
The name of the ensemble. It will be used as the folder
to save the pickled Universes.
It can also be the absolute path to the folder.
topology_list: list
List of topology files, e.g. gro, pdb, etc.
trajectory_list: list
List of trajectory files, e.g. xtc, trr, etc.
bonded_topology_list: list, optional
List of tpr files. For providing extra bonded information.
skip: int, optional
The number of frame interval to skip.
This number only applies to the processed trajectory.
If you set `only_raw=True`, this skip number will be ignored.
timestamp: str, optional
The timestamp of creating the ensemble
It will be set to the current time if not provided.
updating: bool, optional
If True, the trajectory will be updated
even if the trajectory was processed before.
only_raw: bool, optional
If True, only the raw trajectory will be returned.
Otherwise, on-the-fly transformation will be applied
to trajectories and processed trajectories
for protein and system will be returned.
wrapping: bool, optional
If True, all the atoms will be wrapped inside the box.
protein_selection: str, optional
The selection string for `protein.pdb`. Default is "protein".
Can also be any selection string supported by MDAnalysis.
"""
if len(topology_list) != len(trajectory_list):
raise ValueError(
"topology_list and trajectory_list must have the same length."
)
self.ensemble_name = ensemble_name
self.topology_list = topology_list
self.trajectory_list = trajectory_list
self.bonded_topology_list = bonded_topology_list
self.skip = skip
self.timestamp = timestamp
self.updating = updating
self.only_raw = only_raw
self.wrapping = wrapping
self.protein_selection = protein_selection
if self.bonded_topology_list is None:
self.fix_chain = False
print(
"No bonded_topology_list provided. \n", "PBC and chain cannot be fixed."
)
else:
if len(bonded_topology_list) != len(trajectory_list):
raise ValueError(
"bonded_topology_list and trajectory_list must have the same length."
)
self.fix_chain = True
if not os.path.isabs(self.ensemble_name):
self.working_dir = os.getcwd() + "/"
else:
self.working_dir = ""
# store meta information
self.trajectory_dt = np.zeros(len(self.trajectory_list))
self.trajectory_time = np.zeros(len(self.trajectory_list))
self.trajectory_frame = np.zeros(len(self.trajectory_list))
os.makedirs(self.filename, exist_ok=True)
[docs] def load_ensemble(self):
r"""Load the ensemble of trajectories."""
if self.updating or not os.path.isfile(self.filename + "raw_traj.pickle"):
self._processing_ensemble()
else:
self.trajectory_files = pickle.load(
open(self.filename + "raw_traj.pickle", "rb")
)
if not self.only_raw:
if self.updating or not os.path.isfile(self.filename + "protein.pickle"):
self._processing_protein()
else:
self.protein_trajectory_files = pickle.load(
open(self.filename + "protein.pickle", "rb")
)
if self.updating or not os.path.isfile(self.filename + "system.pickle"):
self._processing_system()
else:
self.system_trajectory_files = pickle.load(
open(self.filename + "system.pickle", "rb")
)
else:
self.system_trajectory_files = None
self.protein_trajectory_files = None
# check if dt is the same
if not len(set(self.trajectory_dt)) <= 1:
warnings.warn("dt is not the same for all trajectories.", stacklevel=2)
def _processing_ensemble(self):
load_job_list = []
for ind, (topology, trajectory) in enumerate(
zip(self.topology_list, self.trajectory_list)
):
output_pdb = (
os.path.dirname(trajectory) + "/skip" + str(self.skip) + "/system.pdb"
)
if not os.path.isfile(output_pdb):
print(trajectory + " new")
load_job_list.append(
dask.delayed(self._preprocessing_raw_trajectory)(
topology, trajectory, ind, self.protein_selection
)
)
elif os.path.getmtime(trajectory) > os.path.getmtime(output_pdb):
print(trajectory + " modified.")
load_job_list.append(
dask.delayed(self._preprocessing_raw_trajectory)(
topology, trajectory, ind, self.protein_selection
)
)
else:
print(trajectory + " on hold.")
load_job_list.append(
dask.delayed(self._load_preprocessing_trajectory)(trajectory)
)
self.trajectory_files = dask.compute(load_job_list)[0]
print("dask finished")
with open(self.filename + "raw_traj.pickle", "wb") as f:
pickle.dump(self.trajectory_files, f)
print("pickle raw_traj universe done")
def _processing_protein(self):
load_job_list = []
for trajectory in self.trajectory_list:
traj_path = os.path.dirname(trajectory)
if os.path.isfile(traj_path + "/skip" + str(self.skip) + "/protein.xtc"):
load_job_list.append(dask.delayed(self._load_protein)(trajectory))
self.protein_trajectory_files = dask.compute(load_job_list)[0]
print("dask finished")
with open(self.filename + "protein.pickle", "wb") as f:
pickle.dump(self.protein_trajectory_files, f)
print("pickle traj protein universe done")
def _processing_system(self):
load_job_list = []
for trajectory in self.trajectory_list:
traj_path = os.path.dirname(trajectory)
os.makedirs(traj_path + "/skip" + str(self.skip), exist_ok=True)
if os.path.isfile(traj_path + "/skip" + str(self.skip) + "/system.xtc"):
load_job_list.append(dask.delayed(self._load_system)(trajectory))
self.system_trajectory_files = dask.compute(load_job_list)[0]
print("dask finished")
with open(self.filename + "system.pickle", "wb") as f:
pickle.dump(self.system_trajectory_files, f)
print("pickle traj system universe done")
def _preprocessing_raw_trajectory(self, topology, trajectory, ind,
protein_selection="protein"):
# print(trajectory)
traj_path = os.path.dirname(trajectory)
os.makedirs(traj_path + "/skip" + str(self.skip), exist_ok=True)
# to ignore most unnecessary warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
u = mda.Universe(topology, trajectory)
u_prot = u.select_atoms(protein_selection)
# only work in the presence of bonded information
if self.fix_chain:
u_bond = mda.Universe(self.bonded_topology_list[ind])
u.add_bonds(u_bond.bonds.to_indices())
prot_chain_list = []
# group all the protein chains
for chain in u_prot.segments:
prot_chain_list.append(chain.atoms)
prot_group = GroupHug(*prot_chain_list)
unwrap = trans.unwrap(u.atoms)
center_in_box = trans.center_in_box(u_prot)
rot_fit_trans = trans.fit_rot_trans(
u.select_atoms("name CA"), u.select_atoms("name CA")
)
if self.wrapping:
non_prot = u.select_atoms(f"not {protein_selection}")
wrap = trans.wrap(non_prot)
u.trajectory.add_transformations(
*[unwrap, prot_group, center_in_box, wrap, rot_fit_trans]
)
else:
u.trajectory.add_transformations(
*[unwrap, prot_group, center_in_box, rot_fit_trans]
)
if not self.only_raw:
with mda.Writer(
traj_path + "/skip" + str(self.skip) + "/protein.xtc",
u_prot.n_atoms,
) as W_prot, mda.Writer(
traj_path + "/skip" + str(self.skip) + "/system.xtc",
u.atoms.n_atoms,
) as W_sys:
for time, ts in enumerate(u.trajectory[:: self.skip]):
W_prot.write(u.select_atoms(protein_selection))
W_sys.write(u.atoms)
u_prot.write(
traj_path + "/skip" + str(self.skip) + "/protein.pdb", bonds=None
)
u.atoms.write(
traj_path + "/skip" + str(self.skip) + "/system.pdb", bonds=None
)
with open(
self.filename + "_".join(trajectory.split("/")) + ".pickle", "wb"
) as f:
pickle.dump(u, f)
if self.only_raw:
self.trajectory_dt[ind] = u.trajectory.dt
self.trajectory_frame[ind] = u.trajectory.n_frames
else:
self.trajectory_dt[ind] = u.trajectory.dt * self.skip
self.trajectory_frame[ind] = int(u.trajectory.n_frames // self.skip)
self.trajectory_time[ind] = u.trajectory.totaltime
# clean-up memory
del u
if self.fix_chain:
del u_bond
gc.collect()
return self.filename + "_".join(trajectory.split("/")) + ".pickle"
def _load_preprocessing_trajectory(self, trajectory):
return self.filename + "_".join(trajectory.split("/")) + ".pickle"
def _load_protein(self, trajectory):
traj_path = os.path.dirname(trajectory)
u = mda.Universe(
traj_path + "/skip" + str(self.skip) + "/protein.pdb",
traj_path + "/skip" + str(self.skip) + "/protein.xtc",
)
with open(
self.filename + "_".join(trajectory.split("/")) + "_prot.pickle", "wb"
) as f:
pickle.dump(u, f)
return self.filename + "_".join(trajectory.split("/")) + "_prot.pickle"
def _load_system(self, trajectory):
traj_path = os.path.dirname(trajectory)
u = mda.Universe(
traj_path + "/skip" + str(self.skip) + "/system.pdb",
traj_path + "/skip" + str(self.skip) + "/system.xtc",
)
with open(
self.filename + "_".join(trajectory.split("/")) + "_sys.pickle", "wb"
) as f:
pickle.dump(u, f)
return self.filename + "_".join(trajectory.split("/")) + "_sys.pickle"
@property
def filename(self):
"""
The saving location of all the pickled files.
"""
return (
os.path.abspath(self.working_dir + self.ensemble_name)
+ "/skip"
+ str(self.skip)
+ "/"
)