scmkl.data_processing

  1import numpy as np
  2import scipy
  3from sklearn.decomposition import TruncatedSVD, PCA
  4import anndata as ad
  5
  6
  7def sparse_var(X: scipy.sparse._csc.csc_matrix | np.ndarray, axis: int | None=None):
  8    """
  9    Function to calculate variance on a scipy sparse matrix.
 10    
 11    Parameters
 12    ----------
 13    X : scipy.sparse._csc.csc_matrix | np.ndarray
 14        A scipy sparse or numpy array
 15        
 16    axis : int | None
 17        Determines which axis variance is calculated on. Same usage 
 18        as Numpy.
 19    
 20    Returns
 21    -------
 22    var : np.ndarray | float
 23        Variance values calculated over the given axis.
 24    """
 25    # E[X^2] - E[X]^2
 26    if scipy.sparse.issparse(X):
 27        exp_mean = np.asarray(X.power(2).mean(axis = axis)).flatten()
 28        sq_mean = np.asarray(np.square(X.mean(axis = axis))).flatten()
 29        var = np.array(exp_mean - sq_mean)
 30    else:
 31        var = np.asarray(np.var(X, axis = axis)).flatten()
 32
 33    return var.ravel()
 34
 35
 36def process_data(X_train: np.ndarray | scipy.sparse._csc.csc_matrix,
 37                 X_test: np.ndarray | scipy.sparse._csc.csc_matrix | None=None,
 38                 scale_data: bool=True, transform_data: bool=False,
 39                 return_dense: bool=True):
 40    """
 41    Function to preprocess data matrix according to type of data 
 42    (e.g. counts/rna, or binary/atac). Will process test data 
 43    according to parameters calculated from test data.
 44    
 45    Parameters
 46    ----------
 47    X_train : np.ndarray | scipy.sparse._csc.csc_matrix
 48        A scipy sparse or numpy array of cells x features in the 
 49        training data.
 50
 51    X_test : np.ndarray | scipy.sparse._csc.csc_matrix
 52        A scipy sparse or numpy array of cells x features in the 
 53        testing data.
 54
 55    scale_data : bool
 56        If `True`, data will be logarithmized then z-score 
 57        transformed.
 58
 59    transform_data : bool
 60        If `True`, data will be log1p transformed (recommended for 
 61        counts data). Default is `False`.
 62
 63    return_dense: bool
 64        If `True`, a np.ndarray will be returned as opposed to a 
 65        scipy.sparse object.
 66    
 67    Returns
 68    -------
 69    X_train, X_test : np.ndarray, np.ndarray
 70        Numpy arrays with the process train/test data 
 71        respectively. If X_test is `None`, only X_train is returned.
 72    """
 73    if X_test is None:
 74        # Creates dummy matrix to for the sake of calculation without 
 75        # increasing computational time
 76        X_test = X_train[:1,:] 
 77        orig_test = None
 78    else:
 79        orig_test = 'given'
 80
 81    # Remove features that have no variance in the training data 
 82    # (will be uniformative)
 83    var = sparse_var(X_train, axis = 0)
 84    variable_features = np.where(var > 1e-5)[0]
 85
 86    X_train = X_train[:,variable_features]
 87    X_test = X_test[:, variable_features]
 88
 89    # Data processing according to data type
 90    if transform_data:
 91
 92        if scipy.sparse.issparse(X_train):
 93            X_train = X_train.log1p()
 94            X_test = X_test.log1p()
 95        else:
 96            X_train = np.log1p(X_train)
 97            X_test = np.log1p(X_test)
 98    
 99    if scale_data:
100        #Center and scale count data
101        train_means = np.mean(X_train, 0)
102        train_sds = np.sqrt(var[variable_features])
103
104        # Perform transformation on test data according to parameters 
105        # of the training data
106        X_train = (X_train - train_means) / train_sds
107        X_test = (X_test - train_means) / train_sds
108
109
110    if return_dense and scipy.sparse.issparse(X_train):
111        X_train = X_train.toarray()
112        X_test = X_test.toarray()
113
114
115    if orig_test is None:
116        return X_train
117    else:
118        return X_train, X_test
119    
120
def svd_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | 
                       np.ndarray | None=None):
    """
    Reduces matrices with a truncated SVD fit on the training data.

    Up to 50 components are computed (fewer if there are fewer 
    features) and the first component is discarded.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices. `X_test` is returned as `None` when no 
        testing data is given.
    """
    n_feats = X_train.shape[1]
    reducer = TruncatedSVD(n_components = np.min([50, n_feats]), 
                           random_state = 1)

    # csr_array is used because the SVD function is faster on this 
    # matrix type
    train_csr = scipy.sparse.csr_array(X_train)

    # The first component corresponds with sequencing depth, drop it
    X_train = reducer.fit_transform(train_csr)[:, 1:]

    if X_test is None:
        return X_train, None

    test_csr = scipy.sparse.csr_array(X_test)

    return X_train, reducer.transform(test_csr)[:, 1:]
156
157
def sample_cells(train_indices: np.ndarray,
                 sample_size: int,
                 seed_obj: np.random._generator.Generator):
    """
    Draws cell indices without replacement from the training indices.

    Parameters
    ----------
    train_indices : np.ndarray
        An array of indices to sample from.

    sample_size : int
        Number of samples to take from `train_indices`. Capped at the 
        length of `train_indices`.

    seed_obj : np.random._generator.Generator
        Random generator used to draw the sample.

    Returns
    -------
    indices : np.ndarray
        The sampled indices from `train_indices`.
    """
    # Never request more cells than are available
    n_draw = np.minimum(train_indices.shape[0], sample_size)

    return seed_obj.choice(train_indices, n_draw, replace = False)
182
183
def pca_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Returns matrices with PCA reduction fit on the training data.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices. `X_test` is returned as `None` when no 
        testing data is given.
    """
    def to_dense(mat):
        # np.asarray on a scipy sparse object yields a 0-d object 
        # array, so sparse input must be densified explicitly.
        if scipy.sparse.issparse(mat):
            return mat.toarray()
        return np.asarray(mat)

    X_train = to_dense(X_train)

    # PCA cannot return more components than min(n_cells, n_features)
    n_components = int(min(50, X_train.shape[0], X_train.shape[1]))
    PCA_func = PCA(n_components = n_components, random_state = 1)

    X_train = PCA_func.fit_transform(X_train)

    if X_test is not None:
        X_test = PCA_func.transform(to_dense(X_test))

    return X_train, X_test
215
216
217def _no_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
218                      X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
219    """
220    Dummy function used to return mat inputs.
221    """
222    return X_train, X_test
223
224
225def get_reduction(reduction: str):
226    """
227    Function used to identify reduction type and return function to 
228    apply to data matrices.
229
230    Parameters
231    ----------
232    reduction : str
233        The reduction for data transformation. Options are `['pca', 
234        'svd', 'None']`.
235
236    Returns
237    -------
238    red_func : function
239        The function to reduce the data.
240    """
241    match reduction:
242        case 'pca':
243            red_func = pca_transformation
244        case 'svd':
245            red_func = svd_transformation
246        case 'None':
247            red_func = _no_transformation
248
249    return red_func
250
251
def get_group_mat(adata: ad.AnnData, n_features: int,
                  group_features: np.ndarray,
                  n_group_features: int, 
                  process_test: bool=False
                  ) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
    """
    Filters to only features in group. Will sample features if 
    `n_features < n_group_features`.

    Parameters
    ----------
    adata : anndata.AnnData
        anndata object with `'seed_obj'` and `'reduction'` in `.uns`. 
        `'train_indices'` and `'test_indices'` are also required in 
        `.uns` when `process_test` is `True`.

    n_features : int
        Maximum number of features to keep in matrix. Only 
        impacts mat if `n_features < n_group_features`.

    group_features : list | tuple | np.ndarray
        Feature names in group to filter matrices to.

    n_group_features : int
        Number of features in group.

    process_test : bool
        If `True`, the data is split with `adata.uns['train_indices']` 
        and `adata.uns['test_indices']` and the reduction named in 
        `adata.uns['reduction']` is applied. If `False`, the matrix 
        for all cells is returned without reduction.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Filtered matrices. If `process_test` is `False`, only 
        `X_train` is returned. If `process_test` is `True` and 
        `adata.uns['reduction']` is `'pca'` or `'svd'` the matrices 
        are transformed before being returned.
    """
    # Getting reduction function
    reduction_func = get_reduction(adata.uns['reduction'])

    # Sample up to n_features features- important for scalability if 
    # using large groupings
    # Will use all features if the grouping contains fewer than n_features
    group_array = np.array(list(group_features))

    # Also capped by the real group size: sampling without replacement 
    # fails if more items are requested than exist.
    number_features = int(np.min([n_features, n_group_features, 
                                  group_array.size]))
    sampled_features = adata.uns['seed_obj'].choice(group_array, 
                                                    number_features, 
                                                    replace = False) 

    # Create data arrays containing only features within this group
    if not process_test:
        return adata[:, sampled_features].X

    X_train = adata[adata.uns['train_indices'],:][:, sampled_features].X
    X_test = adata[adata.uns['test_indices'],:][:, sampled_features].X

    return reduction_func(X_train, X_test)
def sparse_var( X: scipy.sparse._csc.csc_matrix | numpy.ndarray, axis: int | None = None):
 8def sparse_var(X: scipy.sparse._csc.csc_matrix | np.ndarray, axis: int | None=None):
 9    """
10    Function to calculate variance on a scipy sparse matrix.
11    
12    Parameters
13    ----------
14    X : scipy.sparse._csc.csc_matrix | np.ndarray
15        A scipy sparse or numpy array
16        
17    axis : int | None
18        Determines which axis variance is calculated on. Same usage 
19        as Numpy.
20    
21    Returns
22    -------
23    var : np.ndarray | float
24        Variance values calculated over the given axis.
25    """
26    # E[X^2] - E[X]^2
27    if scipy.sparse.issparse(X):
28        exp_mean = np.asarray(X.power(2).mean(axis = axis)).flatten()
29        sq_mean = np.asarray(np.square(X.mean(axis = axis))).flatten()
30        var = np.array(exp_mean - sq_mean)
31    else:
32        var = np.asarray(np.var(X, axis = axis)).flatten()
33
34    return var.ravel()

Function to calculate variance on a scipy sparse matrix.

Parameters
  • X (scipy.sparse._csc.csc_matrix | np.ndarray): A scipy sparse or numpy array
  • axis (int | None): Determines which axis variance is calculated on. Same usage as Numpy.
Returns
  • var (np.ndarray | float): Variance values calculated over the given axis.
def process_data( X_train: numpy.ndarray | scipy.sparse._csc.csc_matrix, X_test: numpy.ndarray | scipy.sparse._csc.csc_matrix | None = None, scale_data: bool = True, transform_data: bool = False, return_dense: bool = True):
 37def process_data(X_train: np.ndarray | scipy.sparse._csc.csc_matrix,
 38                 X_test: np.ndarray | scipy.sparse._csc.csc_matrix | None=None,
 39                 scale_data: bool=True, transform_data: bool=False,
 40                 return_dense: bool=True):
 41    """
 42    Function to preprocess data matrix according to type of data 
 43    (e.g. counts/rna, or binary/atac). Will process test data 
 44    according to parameters calculated from test data.
 45    
 46    Parameters
 47    ----------
 48    X_train : np.ndarray | scipy.sparse._csc.csc_matrix
 49        A scipy sparse or numpy array of cells x features in the 
 50        training data.
 51
 52    X_test : np.ndarray | scipy.sparse._csc.csc_matrix
 53        A scipy sparse or numpy array of cells x features in the 
 54        testing data.
 55
 56    scale_data : bool
 57        If `True`, data will be logarithmized then z-score 
 58        transformed.
 59
 60    transform_data : bool
 61        If `True`, data will be log1p transformed (recommended for 
 62        counts data). Default is `False`.
 63
 64    return_dense: bool
 65        If `True`, a np.ndarray will be returned as opposed to a 
 66        scipy.sparse object.
 67    
 68    Returns
 69    -------
 70    X_train, X_test : np.ndarray, np.ndarray
 71        Numpy arrays with the process train/test data 
 72        respectively. If X_test is `None`, only X_train is returned.
 73    """
 74    if X_test is None:
 75        # Creates dummy matrix to for the sake of calculation without 
 76        # increasing computational time
 77        X_test = X_train[:1,:] 
 78        orig_test = None
 79    else:
 80        orig_test = 'given'
 81
 82    # Remove features that have no variance in the training data 
 83    # (will be uniformative)
 84    var = sparse_var(X_train, axis = 0)
 85    variable_features = np.where(var > 1e-5)[0]
 86
 87    X_train = X_train[:,variable_features]
 88    X_test = X_test[:, variable_features]
 89
 90    # Data processing according to data type
 91    if transform_data:
 92
 93        if scipy.sparse.issparse(X_train):
 94            X_train = X_train.log1p()
 95            X_test = X_test.log1p()
 96        else:
 97            X_train = np.log1p(X_train)
 98            X_test = np.log1p(X_test)
 99    
100    if scale_data:
101        #Center and scale count data
102        train_means = np.mean(X_train, 0)
103        train_sds = np.sqrt(var[variable_features])
104
105        # Perform transformation on test data according to parameters 
106        # of the training data
107        X_train = (X_train - train_means) / train_sds
108        X_test = (X_test - train_means) / train_sds
109
110
111    if return_dense and scipy.sparse.issparse(X_train):
112        X_train = X_train.toarray()
113        X_test = X_test.toarray()
114
115
116    if orig_test is None:
117        return X_train
118    else:
119        return X_train, X_test

Function to preprocess data matrix according to type of data (e.g. counts/rna, or binary/atac). Will process test data according to parameters calculated from the training data.

Parameters
  • X_train (np.ndarray | scipy.sparse._csc.csc_matrix): A scipy sparse or numpy array of cells x features in the training data.
  • X_test (np.ndarray | scipy.sparse._csc.csc_matrix): A scipy sparse or numpy array of cells x features in the testing data.
  • scale_data (bool): If True, data will be z-score transformed (centered and scaled) using statistics from the training data.
  • transform_data (bool): If True, data will be log1p transformed (recommended for counts data). Default is False.
  • return_dense (bool): If True, a np.ndarray will be returned as opposed to a scipy.sparse object.
Returns
  • X_train, X_test (np.ndarray, np.ndarray): Numpy arrays with the processed train/test data respectively. If X_test is None, only X_train is returned.
def svd_transformation( X_train: scipy.sparse._csc.csc_matrix | numpy.ndarray, X_test: scipy.sparse._csc.csc_matrix | numpy.ndarray | None = None):
def svd_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | 
                       np.ndarray | None=None):
    """
    Reduces matrices with a truncated SVD fit on the training data.

    Up to 50 components are computed (fewer if there are fewer 
    features) and the first component is discarded.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices. `X_test` is returned as `None` when no 
        testing data is given.
    """
    n_feats = X_train.shape[1]
    reducer = TruncatedSVD(n_components = np.min([50, n_feats]), 
                           random_state = 1)

    # csr_array is used because the SVD function is faster on this 
    # matrix type
    train_csr = scipy.sparse.csr_array(X_train)

    # The first component corresponds with sequencing depth, drop it
    X_train = reducer.fit_transform(train_csr)[:, 1:]

    if X_test is None:
        return X_train, None

    test_csr = scipy.sparse.csr_array(X_test)

    return X_train, reducer.transform(test_csr)[:, 1:]

Returns matrices with SVD reduction. If X_test is None, the second returned value is None.

Parameters
  • X_train (np.ndarray): A 2D array of cells x features filtered to desired features for training data.
  • X_test (np.ndarray | None): A 2D array of cells x features filtered to desired features for testing data.
Returns
  • X_train, X_test (np.ndarray, np.ndarray | None): Transformed matrices. X_test is returned as None if the input X_test is None.
def sample_cells( train_indices: numpy.ndarray, sample_size: int, seed_obj: numpy.random._generator.Generator):
def sample_cells(train_indices: np.ndarray,
                 sample_size: int,
                 seed_obj: np.random._generator.Generator):
    """
    Draws cell indices without replacement from the training indices.

    Parameters
    ----------
    train_indices : np.ndarray
        An array of indices to sample from.

    sample_size : int
        Number of samples to take from `train_indices`. Capped at the 
        length of `train_indices`.

    seed_obj : np.random._generator.Generator
        Random generator used to draw the sample.

    Returns
    -------
    indices : np.ndarray
        The sampled indices from `train_indices`.
    """
    # Never request more cells than are available
    n_draw = np.minimum(train_indices.shape[0], sample_size)

    return seed_obj.choice(train_indices, n_draw, replace = False)

Samples cells indices from training indices for calculations.

Parameters
  • train_indices (np.ndarray): An array of indices to sample from.
  • sample_size (int): Number of samples to take from train_indices. If it exceeds the length of train_indices, all indices are sampled.
Returns
  • indices (np.ndarray): The sampled indices from train_indices.
def pca_transformation( X_train: scipy.sparse._csc.csc_matrix | numpy.ndarray, X_test: scipy.sparse._csc.csc_matrix | numpy.ndarray | None = None):
def pca_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Returns matrices with PCA reduction fit on the training data.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices. `X_test` is returned as `None` when no 
        testing data is given.
    """
    def to_dense(mat):
        # np.asarray on a scipy sparse object yields a 0-d object 
        # array, so sparse input must be densified explicitly.
        if scipy.sparse.issparse(mat):
            return mat.toarray()
        return np.asarray(mat)

    X_train = to_dense(X_train)

    # PCA cannot return more components than min(n_cells, n_features)
    n_components = int(min(50, X_train.shape[0], X_train.shape[1]))
    PCA_func = PCA(n_components = n_components, random_state = 1)

    X_train = PCA_func.fit_transform(X_train)

    if X_test is not None:
        X_test = PCA_func.transform(to_dense(X_test))

    return X_train, X_test

Returns matrices with PCA reduction. If X_test is None, the second returned value is None.

Parameters
  • X_train (scipy.sparse._csc.csc_matrix | np.ndarray): A 2D array of cells x features filtered to desired features for training data.
  • X_test (scipy.sparse._csc.csc_matrix | np.ndarray | None): A 2D array of cells x features filtered to desired features for testing data.
Returns
  • X_train, X_test (np.ndarray, np.ndarray | None): Transformed matrices. X_test is returned as None if the input X_test is None.
def get_reduction(reduction: str):
226def get_reduction(reduction: str):
227    """
228    Function used to identify reduction type and return function to 
229    apply to data matrices.
230
231    Parameters
232    ----------
233    reduction : str
234        The reduction for data transformation. Options are `['pca', 
235        'svd', 'None']`.
236
237    Returns
238    -------
239    red_func : function
240        The function to reduce the data.
241    """
242    match reduction:
243        case 'pca':
244            red_func = pca_transformation
245        case 'svd':
246            red_func = svd_transformation
247        case 'None':
248            red_func = _no_transformation
249
250    return red_func

Function used to identify reduction type and return function to apply to data matrices.

Parameters
  • reduction (str): The reduction for data transformation. Options are ['pca', 'svd', 'None'].
Returns
  • red_func (function): The function to reduce the data.
def get_group_mat( adata: anndata._core.anndata.AnnData, n_features: int, group_features: numpy.ndarray, n_group_features: int, process_test: bool = False) -> numpy.ndarray:
def get_group_mat(adata: ad.AnnData, n_features: int,
                  group_features: np.ndarray,
                  n_group_features: int, 
                  process_test: bool=False
                  ) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
    """
    Filters to only features in group. Will sample features if 
    `n_features < n_group_features`.

    Parameters
    ----------
    adata : anndata.AnnData
        anndata object with `'seed_obj'` and `'reduction'` in `.uns`. 
        `'train_indices'` and `'test_indices'` are also required in 
        `.uns` when `process_test` is `True`.

    n_features : int
        Maximum number of features to keep in matrix. Only 
        impacts mat if `n_features < n_group_features`.

    group_features : list | tuple | np.ndarray
        Feature names in group to filter matrices to.

    n_group_features : int
        Number of features in group.

    process_test : bool
        If `True`, the data is split with `adata.uns['train_indices']` 
        and `adata.uns['test_indices']` and the reduction named in 
        `adata.uns['reduction']` is applied. If `False`, the matrix 
        for all cells is returned without reduction.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Filtered matrices. If `process_test` is `False`, only 
        `X_train` is returned. If `process_test` is `True` and 
        `adata.uns['reduction']` is `'pca'` or `'svd'` the matrices 
        are transformed before being returned.
    """
    # Getting reduction function
    reduction_func = get_reduction(adata.uns['reduction'])

    # Sample up to n_features features- important for scalability if 
    # using large groupings
    # Will use all features if the grouping contains fewer than n_features
    group_array = np.array(list(group_features))

    # Also capped by the real group size: sampling without replacement 
    # fails if more items are requested than exist.
    number_features = int(np.min([n_features, n_group_features, 
                                  group_array.size]))
    sampled_features = adata.uns['seed_obj'].choice(group_array, 
                                                    number_features, 
                                                    replace = False) 

    # Create data arrays containing only features within this group
    if not process_test:
        return adata[:, sampled_features].X

    X_train = adata[adata.uns['train_indices'],:][:, sampled_features].X
    X_test = adata[adata.uns['test_indices'],:][:, sampled_features].X

    return reduction_func(X_train, X_test)

Filters to only features in group. Will sample features if n_features < n_group_features.

Parameters
  • adata (anndata.AnnData): anndata object with 'seed_obj', 'train_indices', and 'test_indices' in .uns.
  • n_features (int): Maximum number of features to keep in matrix. Only impacts mat if n_features < n_group_features.
  • group_features (list | tuple | np.ndarray): Feature names in group to filter matrices to.
  • n_group_features (int): Number of features in group.
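  • process_test (bool): If True, the data is split using 'train_indices' and 'test_indices' from .uns and the reduction is applied. If False, the matrix for all cells is returned without reduction.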
Returns
  • X_train, X_test (np.ndarray, np.ndarray): Filtered matrices. If process_test is False, only X_train is returned. If process_test is True and adata.uns['reduction'] is 'pca' or 'svd' the matrices are transformed before being returned.