Source code for ENPMDA.ENPMDA

"""\
===========
MDDataFrame
===========
The :class:`~ENPMDA.MDDataFrame` class both store
the metadata of simulations in the ensemble and functions as
a dask dataframe to add, compute, and store analysis.

A ``MDDataFrame`` is created from files::

    from ENPMDA import MDDataFrame
    md_dataframe = MDDataFrame()
    md_dataframe.add_traj_ensemble(traj_ensemble, npartitions=16)


Classes
=======
.. autoclass:: MDDataFrame
   :members:
"""


from datetime import datetime
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import numpy as np
#import awkward as ak

import dask.dataframe as dd
import dask
import pandas as pd
import MDAnalysis as mda
import os
import pickle
import shutil
import gc
from tqdm import tqdm
from sklearn import preprocessing


from ENPMDA.analysis.base import AnalysisResult
from ENPMDA.preprocessing import TrajectoryEnsemble

timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
meta_data_list = [
    "universe_protein",
    "universe_system",
    "system",
    "traj_name",
    "frame",
    "traj_time",
    "stride",
]


[docs]class MDDataFrame(object): r""" Class to store the metadata and analysis results of the ensemble simulations. It uses pandas.DataFrame to store metadata and dask.DataFrame to distribute computation jobs so that the parallel analysis can be performed not only for one trajectory but also across simulations and analyses. """
[docs] def __init__( self, dataframe_name, meta_data_list=meta_data_list, timestamp=timestamp ): """ Parameters ---------- dataframe_name: str The name of the dataframe It will be used as the folder to save all the analysis results. It can also be the absolute path to the folder. meta_data_list: list, optional List of metadata in the dataframe. In default, the locations of pickled universes of protein and system, the system index, the trajectory filename, the frame index, the trajectory time, and the stride are stored. timestamp: str, optional The timestamp of creating the ensemble It will be set to the current time if not provided. """ self.dataframe_name = dataframe_name self.dataframe = pd.DataFrame(columns=meta_data_list) self.computed = False self.sorted = False # set working dir to absolute directory if not os.path.isabs(self.dataframe_name): self.working_dir = os.getcwd() + "/" else: self.working_dir = os.path.relpath(self.dataframe, os.getcwd()) self.timestamp = timestamp self.trajectory_ensemble = None self.analysis_list = []
[docs] def add_traj_ensemble( self, trajectory_ensemble: TrajectoryEnsemble, npartitions, stride=1 ): """ Parameters ---------- trajectory_ensemble: ENPMDA.TrajectoryEnsemble The trajectory ensemble to be added to the dataframe. npartitions: int The number of partitions to be used in the dask dataframe. stride: int, optional The stride to be used in the dask dataframe. It is used to skip frames in the trajectory. """ if self.trajectory_ensemble is not None: raise ValueError("Trajectory ensemble already added") self.trajectory_ensemble = trajectory_ensemble if trajectory_ensemble.protein_trajectory_files is None: warnings.warn( "The provided trajectory ensemble " "only contain raw trajectories " "all analysis will be performed on the raw trajectories", stacklevel=2, ) self.trajectory_files = trajectory_ensemble.trajectory_files self.protein_trajectory_files = trajectory_ensemble.trajectory_files self.system_trajectory_files = trajectory_ensemble.trajectory_files else: self.trajectory_files = trajectory_ensemble.trajectory_files self.protein_trajectory_files = trajectory_ensemble.protein_trajectory_files self.system_trajectory_files = trajectory_ensemble.system_trajectory_files self.npartitions = npartitions self.stride = stride meta_data_jobs = [] for ind, trajectory in enumerate(self.protein_trajectory_files): meta_data_jobs.append( dask.delayed(self._append_metadata)(trajectory, system=ind) ) meta_data = dask.compute(meta_data_jobs)[0] for i, trajectory in enumerate(self.protein_trajectory_files): self.dataframe = pd.concat( [ self.dataframe, pd.DataFrame(meta_data[i], columns=self.dataframe.columns), ], ignore_index=True, ) self.dataframe.frame = self.dataframe.frame.apply(int) self.dataframe.traj_time = self.dataframe.traj_time.apply(float) self._init_dd_dataframe()
def _append_metadata(self, universe, system): universe_system = self.system_trajectory_files[system] u = pickle.load(open(universe, "rb")) u_sys = pickle.load(open(universe_system, "rb")) if u.trajectory.n_frames != u_sys.trajectory.n_frames: raise ValueError( f"In system {system}, number of frames in protein and system trajectories are different!" ) rep_data = [] md_name = u.trajectory.filename timestep = u.trajectory.dt for i in range(0, u.trajectory.n_frames, self.stride): rep_data.append( [ universe, universe_system, system, md_name, i, i * timestep, self.stride, ] ) del u return rep_data def _init_dd_dataframe(self): self.dd_dataframe = dd.from_pandas(self.dataframe, npartitions=self.npartitions) print("Requested number of partitions: ", self.npartitions) if self.dd_dataframe.npartitions != self.npartitions: print("Actual {} partitions".format(self.dd_dataframe.npartitions)) self.npartitions = self.dd_dataframe.npartitions self.analysis_results = AnalysisResult( self.dd_dataframe, self.dataframe, working_dir=self.filename, timestamp=self.timestamp, )
[docs] def add_analysis(self, analysis, overwrite=False, **kwargs): """ Add an analysis to the dataframe. Parameters ---------- analysis: ENPMDA.analysis.base.DaskChunkMdanalysis The analysis to be added to the dataframe. overwrite: bool, optional Whether to overwrite the analysis if it is already in the dataframe. **kwargs: dict, optional Keyword arguments to be passed to the analysis. """ self.computed = False self.sorted = False if analysis.name in self.analysis_list and not overwrite: warnings.warn( f"Analysis {analysis.name} already added, add overwrite=True to overwrite", stacklevel=2, ) elif analysis.name in self.analysis_list and overwrite: warnings.warn(f"Analysis {analysis.name} overwrites!", stacklevel=2) self.analysis_list.remove(analysis.name) self.analysis_list.append(analysis.name) self.analysis_results.add_column_to_results(analysis, **kwargs) print(f"Analysis {analysis.name} overwritten") else: self.analysis_list.append(analysis.name) self.analysis_results.add_column_to_results(analysis, **kwargs) print(f"Analysis {analysis.name} added")
[docs] def compute(self): """ Compute the analysis results. It will be append the analysis results to the dataframe. """ if not self.computed: self.analysis_results.compute() self.analysis_results.append_to_dataframe(self.dataframe) self.computed = True # reinstantiate the dask dataframe self._init_dd_dataframe()
[docs] def get_feature_info(self, feature_name): """ Get the information about a feature. Parameters ---------- feature_name: str The name of the feature. """ feat_info = np.load( self.analysis_results.filename + feature_name + "_feature_info.npy" ) return feat_info
[docs] def get_feature(self, feature_list, extra_metadata=[], in_memory=True): """ Get the features from the dataframe. Parameters ---------- feature_list: list of str The list of features to be extracted. extra_metadata: list of str, optional The list of extra metadata to be extracted. in_memory: bool, optional Whether to load the features in memory. """ meta_data = ["system", "traj_name", "frame", "traj_time"] + extra_metadata if not self.computed: self.compute() if isinstance(feature_list, str): feature_list = [feature_list] for feature in feature_list: if feature not in self.analysis_list: raise ValueError(f"Feature {feature} not in analysis list") if in_memory: feature_dataframe = self.dataframe[meta_data].copy() for feature in tqdm(feature_list, desc="Loading features"): raw_data = np.concatenate( [ np.load(location, allow_pickle=True) for location, df in self.dataframe.groupby(feature, sort=False) ] ) feat_info = np.load( self.analysis_results.filename + feature + "_feature_info.npy" ) col_names = [feature + "_" + feat for feat in feat_info] if raw_data.ndim == 1 and len(feat_info) != 1: raw_data_con = [] for raw_data_single in raw_data: raw_data_con.append(list(raw_data_single)) raw_data_concat = pd.DataFrame(raw_data_con, columns=col_names) else: raw_data = raw_data.reshape(raw_data.shape[0], -1) raw_data_concat = pd.DataFrame(raw_data, columns=col_names) feature_dataframe = pd.concat( [feature_dataframe, raw_data_concat], axis=1 ) return feature_dataframe.reset_index(drop=True) else: if not self.sorted: self.sort_analysis_result() feature_dataframe = pd.DataFrame(columns=meta_data + feature_list) for ind, (system, df) in tqdm( enumerate(self.dataframe.groupby("system", sort=False)), desc="Loading features", total=len(self.dataframe.system.unique()), ): feature_dataframe = pd.concat( [ feature_dataframe, pd.DataFrame( [ [ system, df.traj_name.values[-1], df.frame.values[-1], df.traj_time.values[-1], ] + [df[feat].values[-1] for feat in feature_list] ], columns=feature_dataframe.columns, ), ] ) return feature_dataframe.reset_index(drop=True)
[docs] def save(self, name="dataframe", overwrite=False): """ Compute the analysis results and save the dataframe to a pickle file. Parameters ---------- name: str, optional The name of the pickle file. It will be saved in the working directory. overwrite: bool, optional Whether to overwrite the file if it exists. """ if not self.computed: self.compute() self.save_name = name if overwrite: self.dump(name) return if not os.path.exists(f"{self.filename}{name}.pickle"): self.dump(name) else: md_dataframe_old = pickle.load( open(f"{self.filename}{name}_md_dataframe.pickle", "rb") ) md_data_old = md_dataframe_old.dataframe if set(md_data_old.universe_protein) != set( self.dataframe.universe_protein ): print("Seeds changed") self.dump(name) if md_data_old.shape[0] != self.dataframe.shape[0]: print("Trajectory length changed") self.dump(name) elif set(md_data_old.columns) != set(self.dataframe.columns): print("# features changed") old_cols = md_data_old.columns new_cols = self.dataframe.columns print("New: " + np.setdiff1d(new_cols, old_cols)) old_extra_cols = np.setdiff1d(old_cols, new_cols) for old_extra_col in old_extra_cols: self.analysis_list.append(old_extra_col) shutil.copyfile( f"{md_dataframe_old.analysis_results.filename}{old_extra_col}_feature_info.npy", f"{self.analysis_results.filename}{old_extra_col}_feature_info.npy", ) extra_cols = np.setdiff1d(new_cols, old_cols) for extra_col in extra_cols: md_data_old[extra_col] = self.dataframe[extra_col] print("Common: " + np.intersect1d(new_cols, old_cols)) common_cols = np.intersect1d(new_cols, old_cols) for common_col in common_cols: md_data_old[common_col] = self.dataframe[common_col] self.dataframe = md_data_old self.dump(name, backup=True) else: print("No changes") self.dump(name)
def dump(self, filename, backup=False): if backup: try: shutil.copyfile( f"{self.filename}{filename}.pickle", f"{self.filename}{filename}_{self.timestamp}.pickle", ) except FileNotFoundError: pass try: shutil.copyfile( f"{self.filename}{filename}_md_dataframe.pickle", f"{self.filename}{filename}_md_dataframe_{self.timestamp}.pickle", ) except FileNotFoundError: pass with open(f"{self.filename}{filename}.pickle", "wb") as f: pickle.dump(self.dataframe, f) with open(f"{self.filename}{filename}_md_dataframe.pickle", "wb") as f: pickle.dump(self, f) def sort_analysis_result(self): if not self.computed: self.compute() if not self.sorted: for feature in self.analysis_list: if self.dataframe[feature][0].split("_")[-1] == "0.npy": print(f"{feature} already sorted") continue print(f"start to sort {feature}.") # builder = ak.ArrayBuilder() # for location, df in self.dataframe.groupby(feature, sort=False): # builder.append(np.load(location, allow_pickle=True)) old_locations = [ location for location, df in self.dataframe.groupby(feature, sort=False) ] raw_data = np.concatenate( [ np.load(location, allow_pickle=True) for location in old_locations ], axis=0, ) reordered_feat_loc = [] for sys, df in self.dataframe.groupby(["system"]): sys_data = raw_data[df.index[0] : df.index[-1] + 1] np.save( f"{self.analysis_results.filename}{feature}_{sys}.npy", sys_data ) reordered_feat_loc.append( [f"{self.analysis_results.filename}{feature}_{sys}.npy"] * len(df) ) self.dataframe[feature] = np.concatenate(reordered_feat_loc) print(f"{feature} sorted.") del raw_data gc.collect() _ = [os.remove(location) for location in old_locations] self.sorted = True # update the analysis results self._init_dd_dataframe() if hasattr(self, "save_name"): print(f"Saving sorted results to {self.save_name}") self.save(self.save_name, overwrite=True) else: print("Already sorted") def add_analysis_result_from_data(self, data, feature_name, feature_info): if data.shape[0] != self.dataframe.shape[0]: print( f"Data shape {data.shape[0]} does not match the dataframe shape {self.dataframe.shape[0]}." ) return if feature_name in self.analysis_list: print(f"{feature_name} already exists.") return feat_locs = [] for sys, df in tqdm( self.dataframe.groupby(["system"]), total=self.dataframe.system.nunique() ): sys_data = data[df.index[0] : df.index[-1] + 1] np.save( f"{self.analysis_results.filename}{feature_name}_{sys}.npy", sys_data ) feat_locs.append( [f"{self.analysis_results.filename}{feature_name}_{sys}.npy"] * len(df) ) self.dataframe[f"{feature_name}"] = np.concatenate(feat_locs) self.analysis_list.append(f"{feature_name}") np.save( f"{self.analysis_results.filename}{feature_name}_feature_info.npy", feature_info, ) if hasattr(self, "save_name"): self.save(self.save_name, overwrite=True) def transform_to_logistic(self, feature_name, logistic): raw_data = np.concatenate( [ np.load(location, allow_pickle=True) for location, df in self.dataframe.groupby(feature_name, sort=False) ] ) # if raw_data.shape[1] == 1: # raw_data = np.hstack(raw_data).T scaler = preprocessing.MinMaxScaler(feature_range=(-logistic, logistic)) scaled_data = scaler.fit_transform(raw_data) log_data = 1 / (1 + np.exp(-scaled_data)) feat_locs = [] for sys, df in tqdm( self.dataframe.groupby(["system"]), total=self.dataframe.system.nunique() ): sys_data = log_data[df.index[0] : df.index[-1] + 1] np.save( f"{self.analysis_results.filename}{feature_name}_log{logistic}_{sys}.npy", sys_data, ) feat_locs.append( [ f"{self.analysis_results.filename}{feature_name}_log{logistic}_{sys}.npy" ] * len(df) ) self.dataframe[f"{feature_name}_log{logistic}"] = np.concatenate(feat_locs) self.analysis_list.append(f"{feature_name}_log{logistic}") # TODO rename features shutil.copyfile( f"{self.analysis_results.filename}{feature_name}_feature_info.npy", f"{self.analysis_results.filename}{feature_name}_log{logistic}_feature_info.npy", ) print("Finish transforming to logistic.") del raw_data gc.collect() if hasattr(self, "save_name"): self.save(self.save_name, overwrite=True) def transform_to_logistic_with_minmax( self, feature_name, logistic, min_arr, max_arr ): raw_data = np.concatenate( [ np.load(location, allow_pickle=True) for location, df in self.dataframe.groupby(feature_name, sort=False) ] ) scaled_data = (raw_data - min_arr) / (max_arr - min_arr) scaled_data = scaled_data * (2 * logistic) - logistic log_data = 1 / (1 + np.exp(-scaled_data)) feat_locs = [] for sys, df in tqdm( self.dataframe.groupby(["system"]), total=self.dataframe.system.nunique() ): sys_data = log_data[df.index[0] : df.index[-1] + 1] np.save( f"{self.analysis_results.filename}{feature_name}_logminmax{logistic}_{sys}.npy", sys_data, ) feat_locs.append( [ f"{self.analysis_results.filename}{feature_name}_logminmax{logistic}_{sys}.npy" ] * len(df) ) self.dataframe[f"{feature_name}_logminmax{logistic}"] = np.concatenate( feat_locs ) self.analysis_list.append(f"{feature_name}_logminmax{logistic}") # TODO rename features shutil.copyfile( f"{self.analysis_results.filename}{feature_name}_feature_info.npy", f"{self.analysis_results.filename}{feature_name}_logminmax{logistic}_feature_info.npy", ) print("Finish transforming to logistic.") del raw_data gc.collect() if hasattr(self, "save_name"): self.save(self.save_name, overwrite=True) def transform_to_reciprocal(self, feature_name): raw_data = np.concatenate( [ np.load(location, allow_pickle=True) for location, df in self.dataframe.groupby(feature_name, sort=False) ] ) # if raw_data.shape[1] == 1: # raw_data = np.hstack(raw_data).T _ = np.reciprocal( raw_data.astype(np.float64), out=raw_data, where=raw_data != 0 ) feat_locs = [] for sys, df in tqdm( self.dataframe.groupby(["system"]), total=self.dataframe.system.nunique() ): sys_data = raw_data[df.index[0] : df.index[-1] + 1] np.save( f"{self.analysis_results.filename}{feature_name}_reciprocal_{sys}.npy", sys_data, ) feat_locs.append( [f"{self.analysis_results.filename}{feature_name}_reciprocal_{sys}.npy"] * len(df) ) self.dataframe[f"{feature_name}_reciprocal"] = np.concatenate(feat_locs) self.analysis_list.append(f"{feature_name}_reciprocal") # TODO rename features shutil.copyfile( f"{self.analysis_results.filename}{feature_name}_feature_info.npy", f"{self.analysis_results.filename}{feature_name}_reciprocal_feature_info.npy", ) print("Finish transforming to reciprocal.") del raw_data gc.collect() if hasattr(self, "save_name"): self.save(self.save_name, overwrite=True)
[docs] @classmethod def load_dataframe(cls, filename) -> "MDDataFrame": """ Load the dataframe from a pickle file. Parameters ---------- filename: str, optional The name of the pickle file. """ if os.path.isfile(f"{filename}.pickle"): print(f"Loading {filename}.pickle") with open(f"{filename}.pickle", "rb") as f: md_data = pickle.load(f) else: print(f"Loading {filename}/{filename}_md_dataframe.pickle") with open(f"{filename}/{filename}_md_dataframe.pickle", "rb") as f: md_data = pickle.load(f) return md_data
@property def filename(self): """ The saving location of all the pickled files. """ return os.path.abspath(self.working_dir + self.dataframe_name) + "/"