Source code for mdance.inputs.preprocess

import glob
from itertools import chain
import re

import MDAnalysis as mda
import numpy as np


def gen_traj_numpy(prmtopFileName, trajFileName, atomSel, verbose=True):
    """Load a trajectory and return the selected atoms' coordinates as a 2D array.

    Parameters
    ----------
    prmtopFileName : str
        The file path of the topology file.
    trajFileName : str
        The file path of the trajectory file.
    atomSel : str
        The atom selection string, e.g. ``resid 3:12 and name N H CA C O``.
        See the `MDAnalysis documentation`_ for the selection syntax.
    verbose : bool, optional
        When True, print the atom count of the system, the frame count of
        the trajectory and the atom count of the selection. Defaults to True.

    Returns
    -------
    traj_numpy : np.ndarray
        Array of shape (n_frames, n_atoms*3) holding, for every frame, the
        flattened coordinates of the selected atoms.

    Examples
    --------
    >>> from mdance.inputs.preprocess import gen_traj_numpy
    >>> traj_numpy = gen_traj_numpy('aligned_tau.pdb', 'aligned_tau.dcd', 'resid 3:12 and name N CA C')

    .. _MDAnalysis documentation: https://docs.mdanalysis.org/stable/documentation_pages/selections.html
    """
    universe = mda.Universe(prmtopFileName, trajFileName)
    selection = universe.select_atoms(atomSel)

    if verbose:
        print('Number of atoms in trajectory:', universe.atoms.n_atoms)
        print('Number of frames in trajectory:', universe.trajectory.n_frames)
        print('Number of atoms in selection:', selection.n_atoms)

    # One (n_atoms, 3) slab per frame, filled in while walking the trajectory.
    frame_shape = (universe.trajectory.n_frames, selection.n_atoms, 3)
    coords = np.empty(frame_shape, dtype=float)
    for ts in universe.trajectory:
        # The selection's positions refer to the frame currently loaded.
        coords[ts.frame] = selection.positions

    # Collapse the (atom, xyz) axes so that each row is a single frame.
    return coords.reshape(coords.shape[0], -1)
class Normalizer:
    """Min-max normalize a data matrix and derive its ``v2``/``v3`` forms.

    Parameters
    ----------
    file_path : str, optional
        The file path of the input data. If provided (non-empty), the data
        is read from this file with ``numpy.genfromtxt`` and ``data`` is
        ignored. Defaults to None.
    data : array_like of shape (n_samples, n_features), optional
        The input data. Only used when ``file_path`` is not provided.
        Defaults to None.
    custom_min : float or None, optional
        The minimum value to use for normalization. If not provided, the
        minimum value of the input data is used. Defaults to None.
    custom_max : float or None, optional
        The maximum value to use for normalization. If not provided, the
        maximum value of the input data is used. Defaults to None.
    custom_avg : float or None, optional
        The average value to use for the ``v2`` normalization. If not
        provided, the column-wise mean of the ``v3`` normalized data is
        used. Defaults to None.

    Attributes
    ----------
    file_path : str
        The file path of the input data (only set when it was provided).
    data : np.ndarray
        The input data.
    min : float
        The minimum value used for normalization.
    max : float
        The maximum value used for normalization.
    avg : float or np.ndarray
        The average used for the ``v2`` normalization.
    v3_norm : np.ndarray
        ``(data - min) / (max - min)``.
    v2_norm : np.ndarray
        ``1 - abs(v3_norm - avg)``.
    c_total : np.ndarray
        Column sums of ``1 - abs(v3_norm - mean(v3_norm, axis=0))``.

    Raises
    ------
    ValueError
        If neither ``file_path`` nor ``data`` is provided.

    Notes
    -----
    Used for non-Molecular Dynamics data.
    Please use ``gen_traj_numpy`` for all Molecular Dynamics data.
    """

    def __init__(self, file_path=None, data=None, custom_min=None,
                 custom_max=None, custom_avg=None):
        if file_path:
            self.file_path = file_path
            self.data = np.genfromtxt(self.file_path)
        elif data is not None:
            # Accept any array_like (lists, tuples, ...), as documented.
            self.data = np.asarray(data)
        else:
            raise ValueError("Either `file_path` or `data` must be provided.")

        # Compare against None rather than relying on truthiness: 0 is a
        # perfectly valid normalization bound and must not be discarded.
        self.min = custom_min if custom_min is not None else np.min(self.data)
        self.max = custom_max if custom_max is not None else np.max(self.data)
        self.v3_norm = (self.data - self.min) / (self.max - self.min)

        if custom_avg is not None:
            self.avg = custom_avg
        else:
            self.avg = np.mean(self.v3_norm, axis=0)
        self.v2_norm = 1 - np.abs(self.v3_norm - self.avg)

        # NOTE(review): c_total is always based on the column means of the
        # data itself, even when ``custom_avg`` is supplied -- kept as in
        # the original implementation; confirm this is intended.
        column_means = np.mean(self.v3_norm, axis=0)
        self.c_total = np.sum(1 - np.abs(self.v3_norm - column_means), axis=0)

    def get_min_max(self):
        """Returns the minimum, maximum and average used for normalization."""
        return self.min, self.max, self.avg

    def get_v2_norm(self):
        """Returns the ``v2`` normalized data."""
        return self.v2_norm

    def get_v3_norm(self):
        """Returns the ``v3`` normalized data."""
        return self.v3_norm

    def get_c_total(self):
        """Returns the ``c_total`` values."""
        return self.c_total


def _read_crd_frames(file, break_line):
    """Parse one fixed-width CRD text file into a 2D ``float32`` array."""
    with open(file, 'r') as infile:
        # The first line of the file is skipped.
        lines = [line.rstrip() for line in infile][1:]
    # Every value occupies a fixed-width field of 8 characters.
    sep_lines = [[line[i:i+8] for i in range(0, len(line), 8)] for line in lines]
    # ``break_line`` consecutive lines are merged into a single frame.
    chunks = [sep_lines[i:i+break_line] for i in range(0, len(sep_lines), break_line)]
    str_frames = [' '.join(chain.from_iterable(chunk)) for chunk in chunks]
    return np.array([np.fromstring(frame, dtype='float32', sep=' ')
                     for frame in str_frames])


def read_cpptraj(break_line=None, norm_type=None, min=None, max=None, avg=None,
                 normalize=False):
    """Read the ``clusttraj.c*`` CRD files of the current directory.

    The files are processed in the order of the first integer found in
    their names. Without normalization their frames are concatenated and
    returned; with normalization each file is normalized on its own and
    written to ``normed_<file>`` with ``numpy.savetxt``.

    Parameters
    ----------
    break_line : int
        The number of text lines (after the skipped first line) that are
        merged into one frame.
    norm_type : str
        The type of normalization to use. Can be ``v2`` or ``v3``.
        Only used when ``normalize`` is True.
    min : float or None, optional
        The minimum value to use for normalization. If not provided,
        the minimum value of the input data is used. Defaults to None.
    max : float or None, optional
        The maximum value to use for normalization. If not provided,
        the maximum value of the input data is used. Defaults to None.
    avg : float or None, optional
        The average value to use for normalization. If not provided,
        the average value of the input data is used. Defaults to None.
    normalize : bool, optional
        Whether to normalize the input data. Defaults to False.

    Returns
    -------
    np.ndarray or None
        The concatenated frames of all input files when ``normalize`` is
        False; None when ``normalize`` is True (results go to files).

    Raises
    ------
    ValueError
        If ``normalize`` is True and ``norm_type`` is neither ``v2`` nor
        ``v3``, or if ``normalize`` is False and no input file is found.

    Notes
    -----
    Not recommended due to inefficiency and 3-decimal precision loss.
    Please use ``gen_traj_numpy`` for all Molecular Dynamics data.
    """
    if normalize and norm_type not in ("v2", "v3"):
        raise ValueError(
            f"norm_type must be 'v2' or 'v3' when normalize is True, got {norm_type!r}."
        )

    input_files = sorted(glob.glob("clusttraj.c*"),
                         key=lambda x: int(re.findall(r"\d+", x)[0]))
    frames_list = []
    for file in input_files:
        frames = _read_crd_frames(file, break_line)
        if normalize:
            norm = Normalizer(data=frames, custom_min=min, custom_max=max,
                              custom_avg=avg)
            if norm_type == "v2":
                normed_frame = norm.get_v2_norm()
            else:
                normed_frame = norm.get_v3_norm()
            np.savetxt(f"normed_{file}", normed_frame)
        else:
            frames_list.append(frames)

    if normalize:
        return None
    if not frames_list:
        raise ValueError('No files matching "clusttraj.c*" were found.')
    return np.concatenate(frames_list, axis=0)
def normalize_file(file, break_line=None, norm_type=None):
    """Normalize a single CRD file, or an in-memory array, and return the result.

    Parameters
    ----------
    file : str or array_like
        Either the file path of a fixed-width CRD text file (requires
        ``break_line``) or the data itself as an array of shape
        (n_samples, n_features).
    break_line : int, optional
        The number of text lines (after the skipped first line) that are
        merged into one frame. When given, ``file`` is opened and parsed
        as a CRD file. Defaults to None.
    norm_type : str
        The type of normalization to use. Can be ``v2`` or ``v3``.

    Returns
    -------
    normed_frame : np.ndarray
        The normalized data.
    min : float
        The minimum value used for normalization.
    max : float
        The maximum value used for normalization.
    avg : float or np.ndarray
        The average used for the ``v2`` normalization.

    Raises
    ------
    ValueError
        If ``norm_type`` is neither ``v2`` nor ``v3``, or if ``file`` is a
        file path and ``break_line`` is not given.
    """
    if norm_type not in ("v2", "v3"):
        raise ValueError(f"norm_type must be 'v2' or 'v3', got {norm_type!r}.")

    if break_line:
        with open(file, 'r') as infile:
            # The first line of the file is skipped.
            lines = [line.rstrip() for line in infile][1:]
        # Every value occupies a fixed-width field of 8 characters.
        sep_lines = [[line[i:i+8] for i in range(0, len(line), 8)] for line in lines]
        # ``break_line`` consecutive lines are merged into a single frame.
        chunks = [sep_lines[i:i+break_line] for i in range(0, len(sep_lines), break_line)]
        str_frames = [' '.join(chain.from_iterable(chunk)) for chunk in chunks]
        frames = np.array([np.fromstring(frame, dtype='float32', sep=' ')
                           for frame in str_frames])
    elif isinstance(file, str):
        # A path can only be parsed when the frame layout is known; passing
        # the path string on as "data" would fail obscurely further down.
        raise ValueError("break_line is required when `file` is a file path.")
    else:
        frames = file

    norm = Normalizer(data=frames)
    if norm_type == "v2":
        normed_frame = norm.get_v2_norm()
    else:
        normed_frame = norm.get_v3_norm()
    data_min, data_max, data_avg = norm.get_min_max()
    return normed_frame, data_min, data_max, data_avg