Source code for ENPMDA.analysis.base

"""\
========
Analysis
========
The :class:`~ENPMDA.analysis.base.DaskChunkMdanalysis` class is
the base class to define multi-frame parallel analysis for
the MD trajectories. It functions as a wrapper of the
MDAnalysis analysis functions to map the analysis to the Dask dataframe.
This class takes care of loading the right universe and dumping the
results as a `npy` file to avoid huge memory footprint
and dask scheduler clogging. 

To define a new analysis,
:class:`~ENPMDA.analysis.base.DaskChunkMdanalysis`
needs to be subclassed.
``set_feature_info(self, universe)`` and ``run_analysis(self, universe, start, stop, step)`` need to be defined. ``set_feature_info`` should return
a list of feature name e.g. the name of each torsion angle. ``run_analysis`` should return a list of analysis results.

``name`` will be the feature name appending to the dataframe.
In default, only protein universe file will be used to run analysis.
It can be overridden by defining ``universe_file=system``::

    from ENPMDA.analysis import DaskChunkMdanalysis
    class NewAnalysis(DaskChunkMdanalysis):
        name = 'new_analysis'
        universe_file = 'protein'

        def set_feature_info(self, universe):
            return ['some_info']

        def run_analysis(self, universe, start, stop, step):
            result = []
            for ts in universe.trajectory[start:stop:step]:
                result.append(some_analysis(universe.atoms))
            return result

Classes
=======
.. autoclass:: DaskChunkMdanalysis
   :members:
"""
import os
import dask
import numpy as np
import uuid
import pickle
import itertools
import warnings
import dask.dataframe as dd


class AnalysisResult(dict):
    """
    This class is used internally to store the results of the analysis.
    """

    def __init__(self, dd_dataframe, dataframe, working_dir, timestamp):
        super().__init__()

        self.dd_dataframe = dd_dataframe
        self.dataframe = dataframe
        self.working_dir = working_dir

        self.timestamp = timestamp

        os.makedirs(self.filename, exist_ok=True)

    def compute(self, item=None):
        if item is None:
            for item, df in self.items():
                if isinstance(df, dd.core.DataFrame):
                    self[item] = df.compute()[["system", item]]
        elif item in self.keys():
            if isinstance(df, dd.core.DataFrame):
                self[item] = df.compute()[["system", item]]
        else:
            raise ValueError(item + " not in dict")

    def add_column_to_results(self, analysis_function, **kwargs):
        # sanity check and get feature info
        check_analysis_function = analysis_function(filename=self.filename, **kwargs)
        if check_analysis_function.universe_file == "protein":
            universe = pickle.load(open(self.dataframe.iloc[0].universe_protein, "rb"))
        else:
            universe = pickle.load(open(self.dataframe.iloc[0].universe_system, "rb"))

        feature_info = check_analysis_function.set_feature_info(universe)
        if check_analysis_function.output == "array":
            check_result = np.asarray(
                check_analysis_function.run_analysis(universe, 0, 2, 1)
            )
        else:
            check_result = np.asarray(
                check_analysis_function.run_analysis(universe, 0, 2, 1), dtype=object
            )
        if check_result.ndim > 2:
            warnings.warn(
                f"The result of the analysis function is not 2D."
                f"Make sure the shape of feature info ({len(feature_info)}) "
                f"does match the shape of analysis ({check_result.shape[1]}).",
                stacklevel=2,
            )
            check_result = check_result.reshape(
                check_result.shape[0], len(feature_info), -1
            )
        else:
            check_result = check_result.reshape(check_result.shape[0], -1)
            if check_result.shape[1] != len(feature_info):
                raise ValueError(
                    f"The shape of feature info ({len(feature_info)}) "
                    f"does not match the shape of analysis ({check_result.shape[1]})."
                )
        if check_result.shape[0] != 2:
            raise ValueError(
                "The len of the result"
                "does not match the number of the frame."
                "Hint: run_analysis should return a list"
                "with the shape as the trajectory frames."
            )
        np.save(check_analysis_function.feature_info_loc, feature_info)
        check_analysis_function._feature_info = feature_info

        item_name = analysis_function.name
        meta = dict(self.dd_dataframe.dtypes)
        meta[item_name] = "f8"

        # add analysis to dataframe
        kwargs.update(check_analysis_function.__dict__)
        self[item_name] = self.dd_dataframe.map_partitions(
            lambda df: df.assign(**{item_name: analysis_function(**kwargs)(df)}),
            meta=meta,
            token=item_name,
        ).persist()

    def append_to_dataframe(self, dataframe):
        for item in self.keys():
            dataframe[item] = self[item].iloc[:, -1]

    @property
    def filename(self):
        return self.working_dir + "/analysis_results/" + self.timestamp + "/"


[docs]class DaskChunkMdanalysis(object): """ This class is the base class for all analysis classes. The analysis results will be dumped as a `npy` file with unique uuid for each partition. """ name = "analysis" universe_file = "protein" output = "array" def __init__(self, filename, **kwargs): self._feature_info = [] self.filename = filename self.__dict__.update(kwargs) # get unique uuid for each partition self._partition = uuid.uuid4().hex def __call__(self, df): return self.run_df(df) def run_df(self, df): result = [] # run analysis for system, df_sys in df.groupby(["system"]): # if system information has to be used set `universe_file = # 'system'` if self.universe_file == "protein": universe = pickle.load(open(df_sys.universe_protein.iloc[0], "rb")) else: universe = pickle.load(open(df_sys.universe_system.iloc[0], "rb")) start = df_sys.frame.iloc[0] stop = df_sys.frame.iloc[-1] + 1 step = df_sys.stride.iloc[0] start, stop, step = universe.trajectory.check_slice_indices( start, stop, step ) result.append(self.run_analysis(universe, start, stop, step)) del universe result_ordered = list(itertools.chain.from_iterable(result)) # make sure the result is 2D with the shape of the trajectory frames and # the number of features if self.output == "array": result_ordered_arr = np.asarray(result_ordered) else: result_ordered_arr = np.empty(len(result_ordered), dtype=object) result_ordered_arr[...] = [ np.asarray(x, dtype=object).reshape(len(self.feature_info), -1) for x in result_ordered ] # result_ordered = result_ordered.reshape(result_ordered.shape[0], len(self.feature_info), -1) # store results and only return the location of the files np.save(self.location, result_ordered_arr) n_result = len(result_ordered_arr) del result_ordered_arr return n_result * [self.location] # not returning the actual array # return list(itertools.chain.from_iterable(result))
[docs] def set_feature_info(self, universe): """ This function is used to set the feature information. Shold return a list of features. """ raise NotImplementedError("Only for inheritance.")
[docs] def run_analysis(self, universe, start, stop, step): """ The function to be overwritten by the analysis class. """ raise NotImplementedError("Only for inheritance.")
[docs] @classmethod def test_on_universe(cls, universe, start=0, stop=2, step=1): """ This function is used to test the analysis function on a universe. """ return cls(filename='test').run_analysis(universe, start, stop, step)
@property def partition(self): return str(self._partition) @property def location(self): return self.filename + self.name + "_" + self.partition + ".npy" @property def feature_info_loc(self): return self.filename + self.name + "_feature_info.npy" @property def feature_info(self): return self._feature_info