scmkl.data_processing

  1import numpy as np
  2import scipy
  3from sklearn.decomposition import TruncatedSVD, PCA
  4import anndata as ad
  5
  6
  7def sparse_var(X: scipy.sparse._csc.csc_matrix | np.ndarray, axis: int | None=None):
  8    """
  9    Function to calculate variance on a scipy sparse matrix.
 10    
 11    Parameters
 12    ----------
 13    X : scipy.sparse._csc.csc_matrix | np.ndarray
 14        A scipy sparse or numpy array
 15        
 16    axis : int | None
 17        Determines which axis variance is calculated on. Same usage 
 18        as Numpy.
 19    
 20    Returns
 21    -------
 22    var : np.ndarray | float
 23        Variance values calculated over the given axis.
 24    """
 25    # E[X^2] - E[X]^2
 26    if scipy.sparse.issparse(X):
 27        exp_mean = np.asarray(X.power(2).mean(axis = axis)).flatten()
 28        sq_mean = np.asarray(np.square(X.mean(axis = axis))).flatten()
 29        var = np.array(exp_mean - sq_mean)
 30    else:
 31        var = np.asarray(np.var(X, axis = axis)).flatten()
 32
 33    return var.ravel()
 34
 35
 36def process_data(X_train: np.ndarray | scipy.sparse._csc.csc_matrix,
 37                 X_test: np.ndarray | scipy.sparse._csc.csc_matrix | None=None,
 38                 scale_data: bool=True, 
 39                 return_dense: bool=True):
 40    """
 41    Function to preprocess data matrix according to type of data 
 42    (e.g. counts/rna, or binary/atac). Will process test data 
 43    according to parameters calculated from test data.
 44    
 45    Parameters
 46    ----------
 47    X_train : np.ndarray | scipy.sparse._csc.csc_matrix
 48        A scipy sparse or numpy array of cells x features in the 
 49        training data.
 50
 51    X_test : np.ndarray | scipy.sparse._csc.csc_matrix
 52        A scipy sparse or numpy array of cells x features in the 
 53        testing data.
 54
 55    scale_data : bool
 56        If `True`, data will be logarithmized then z-score 
 57        transformed.
 58
 59    return_dense: bool
 60        If `True`, a np.ndarray will be returned as opposed to a 
 61        scipy.sparse object.
 62    
 63    Returns
 64    -------
 65    X_train, X_test : np.ndarray, np.ndarray
 66        Numpy arrays with the process train/test data 
 67        respectively. If X_test is `None`, only X_train is returned.
 68    """
 69    if X_test is None:
 70        # Creates dummy matrix to for the sake of calculation without 
 71        # increasing computational time
 72        X_test = X_train[:1,:] 
 73        orig_test = None
 74    else:
 75        orig_test = 'given'
 76
 77    # Remove features that have no variance in the training data 
 78    # (will be uniformative)
 79    var = sparse_var(X_train, axis = 0)
 80    variable_features = np.where(var > 1e-5)[0]
 81
 82    X_train = X_train[:,variable_features]
 83    X_test = X_test[:, variable_features]
 84
 85    # Data processing according to data type
 86    if scale_data:
 87
 88        if scipy.sparse.issparse(X_train):
 89            X_train = X_train.log1p()
 90            X_test = X_test.log1p()
 91        else:
 92            X_train = np.log1p(X_train)
 93            X_test = np.log1p(X_test)
 94            
 95        #Center and scale count data
 96        train_means = np.mean(X_train, 0)
 97        train_sds = np.sqrt(var[variable_features])
 98
 99        # Perform transformation on test data according to parameters 
100        # of the training data
101        X_train = (X_train - train_means) / train_sds
102        X_test = (X_test - train_means) / train_sds
103
104
105    if return_dense and scipy.sparse.issparse(X_train):
106        X_train = X_train.toarray()
107        X_test = X_test.toarray()
108
109
110    if orig_test is None:
111        return X_train
112    else:
113        return X_train, X_test
114    
115
def svd_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | 
                       np.ndarray | None=None):
    """
    Reduce matrices with a truncated SVD fit on the training data.

    At most 50 components are computed (fewer if there are fewer 
    features) and the first component is dropped from the output.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices, always returned as a 2-tuple. The 
        second element is `None` if `X_test is None`.
    """
    n_keep = min(50, X_train.shape[1])
    reducer = TruncatedSVD(n_components = n_keep, random_state = 1)

    # The SVD function is faster on csr input, hence the conversion.
    train_scores = reducer.fit_transform(scipy.sparse.csr_array(X_train))

    # Drop the first component as it corresponds with sequencing depth
    X_train = train_scores[:, 1:]

    if X_test is None:
        return X_train, X_test

    test_scores = reducer.transform(scipy.sparse.csr_array(X_test))

    return X_train, test_scores[:, 1:]
151
152
def sample_cells(train_indices: np.ndarray,
                 sample_size: int,
                 seed_obj: np.random._generator.Generator):
    """
    Draw cell indices from the training indices without replacement.

    Parameters
    ----------
    train_indices : np.ndarray
        An array of indices to sample from.

    sample_size : int
        Number of samples to take from `train_indices`. Capped at 
        the length of `train_indices`.

    seed_obj : np.random._generator.Generator
        Random number generator used to draw the sample.

    Returns
    -------
    indices : np.ndarray
        The sampled indices from `train_indices`.
    """
    # Never request more cells than are available
    n_draw = min(train_indices.shape[0], sample_size)

    return seed_obj.choice(train_indices, n_draw, replace = False)
177
178
def pca_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Reduce matrices with a PCA fit on the training data.

    At most 50 components are computed; fewer are used when the 
    training data has fewer cells or features than that.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices, always returned as a 2-tuple. The 
        second element is `None` if `X_test is None`.
    """
    def _as_dense(mat):
        # `np.asarray` on a scipy sparse matrix yields a 0-d object 
        # array rather than the data, so sparse input needs `toarray`.
        if scipy.sparse.issparse(mat):
            return mat.toarray()
        return np.asarray(mat)

    X_train = _as_dense(X_train)

    # PCA cannot extract more components than min(n_cells, n_features)
    n_components = min(50, *X_train.shape)
    PCA_func = PCA(n_components = n_components, random_state = 1)

    X_train = PCA_func.fit_transform(X_train)

    if X_test is not None:
        X_test = PCA_func.transform(_as_dense(X_test))

    return X_train, X_test
210
211
212def _no_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
213                      X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
214    """
215    Dummy function used to return mat inputs.
216    """
217    return X_train, X_test
218
219
220def get_reduction(reduction: str):
221    """
222    Function used to identify reduction type and return function to 
223    apply to data matrices.
224
225    Parameters
226    ----------
227    reduction : str
228        The reduction for data transformation. Options are `['pca', 
229        'svd', 'None']`.
230
231    Returns
232    -------
233    red_func : function
234        The function to reduce the data.
235    """
236    match reduction:
237        case 'pca':
238            red_func = pca_transformation
239        case 'svd':
240            red_func = svd_transformation
241        case 'None':
242            red_func = _no_transformation
243
244    return red_func
245
246
def get_group_mat(adata: ad.AnnData, n_features: int,
                  group_features: np.ndarray,
                  n_group_features: int, 
                  process_test: bool=False) -> np.ndarray:
    """
    Filters to only features in group. Features are drawn without 
    replacement with `adata.uns['seed_obj']`, keeping at most 
    `n_features` of them.

    Parameters
    ----------
    adata : anndata.AnnData
        anndata object with `'seed_obj'` and `'reduction'` in `.uns`, 
        plus `'train_indices'` and `'test_indices'` when 
        `process_test` is `True`.

    n_features : int
        Maximum number of features to keep in matrix. Only 
        impacts mat if `n_features < n_group_features`.

    group_features : list | tuple | np.ndarray
        Feature names in group to filter matrices to.

    n_group_features : int
        Number of features in group.

    process_test : bool
        If `True`, train and test matrices are extracted separately 
        and passed through the reduction named in 
        `adata.uns['reduction']`. If `False`, the matrix for all 
        cells is returned with no reduction applied.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Filtered matrices. Only `X_train` is returned when 
        `process_test` is `False`. When `process_test` is `True` and 
        `adata.uns['reduction']` is `'pca'` or `'svd'` the matrices 
        are transformed before being returned.
    """
    # Resolve the reduction first so an invalid setting fails before 
    # any random numbers are consumed
    reduction_func = get_reduction(adata.uns['reduction'])

    # Cap the number of sampled features at the group size; sampling 
    # keeps large groupings scalable
    n_keep = np.min([n_features, n_group_features])
    candidates = np.array(list(group_features))
    rng = adata.uns['seed_obj']
    group_features = rng.choice(candidates, n_keep, replace = False)

    if not process_test:
        X_train = adata[:, group_features].X
        return X_train

    # Restrict to the cells of each split, then to the sampled features
    train_view = adata[adata.uns['train_indices'], :]
    X_train = train_view[:, group_features].X

    test_view = adata[adata.uns['test_indices'], :]
    X_test = test_view[:, group_features].X

    X_train, X_test = reduction_func(X_train, X_test)

    return X_train, X_test
def sparse_var( X: scipy.sparse._csc.csc_matrix | numpy.ndarray, axis: int | None = None):
 8def sparse_var(X: scipy.sparse._csc.csc_matrix | np.ndarray, axis: int | None=None):
 9    """
10    Function to calculate variance on a scipy sparse matrix.
11    
12    Parameters
13    ----------
14    X : scipy.sparse._csc.csc_matrix | np.ndarray
15        A scipy sparse or numpy array
16        
17    axis : int | None
18        Determines which axis variance is calculated on. Same usage 
19        as Numpy.
20    
21    Returns
22    -------
23    var : np.ndarray | float
24        Variance values calculated over the given axis.
25    """
26    # E[X^2] - E[X]^2
27    if scipy.sparse.issparse(X):
28        exp_mean = np.asarray(X.power(2).mean(axis = axis)).flatten()
29        sq_mean = np.asarray(np.square(X.mean(axis = axis))).flatten()
30        var = np.array(exp_mean - sq_mean)
31    else:
32        var = np.asarray(np.var(X, axis = axis)).flatten()
33
34    return var.ravel()

Function to calculate variance on a scipy sparse matrix.

Parameters
  • X (scipy.sparse._csc.csc_matrix | np.ndarray): A scipy sparse or numpy array
  • axis (int | None): Determines which axis variance is calculated on. Same usage as Numpy.
Returns
  • var (np.ndarray | float): Variance values calculated over the given axis.
def process_data( X_train: numpy.ndarray | scipy.sparse._csc.csc_matrix, X_test: numpy.ndarray | scipy.sparse._csc.csc_matrix | None = None, scale_data: bool = True, return_dense: bool = True):
 37def process_data(X_train: np.ndarray | scipy.sparse._csc.csc_matrix,
 38                 X_test: np.ndarray | scipy.sparse._csc.csc_matrix | None=None,
 39                 scale_data: bool=True, 
 40                 return_dense: bool=True):
 41    """
 42    Function to preprocess data matrix according to type of data 
 43    (e.g. counts/rna, or binary/atac). Will process test data 
 44    according to parameters calculated from test data.
 45    
 46    Parameters
 47    ----------
 48    X_train : np.ndarray | scipy.sparse._csc.csc_matrix
 49        A scipy sparse or numpy array of cells x features in the 
 50        training data.
 51
 52    X_test : np.ndarray | scipy.sparse._csc.csc_matrix
 53        A scipy sparse or numpy array of cells x features in the 
 54        testing data.
 55
 56    scale_data : bool
 57        If `True`, data will be logarithmized then z-score 
 58        transformed.
 59
 60    return_dense: bool
 61        If `True`, a np.ndarray will be returned as opposed to a 
 62        scipy.sparse object.
 63    
 64    Returns
 65    -------
 66    X_train, X_test : np.ndarray, np.ndarray
 67        Numpy arrays with the process train/test data 
 68        respectively. If X_test is `None`, only X_train is returned.
 69    """
 70    if X_test is None:
 71        # Creates dummy matrix to for the sake of calculation without 
 72        # increasing computational time
 73        X_test = X_train[:1,:] 
 74        orig_test = None
 75    else:
 76        orig_test = 'given'
 77
 78    # Remove features that have no variance in the training data 
 79    # (will be uniformative)
 80    var = sparse_var(X_train, axis = 0)
 81    variable_features = np.where(var > 1e-5)[0]
 82
 83    X_train = X_train[:,variable_features]
 84    X_test = X_test[:, variable_features]
 85
 86    # Data processing according to data type
 87    if scale_data:
 88
 89        if scipy.sparse.issparse(X_train):
 90            X_train = X_train.log1p()
 91            X_test = X_test.log1p()
 92        else:
 93            X_train = np.log1p(X_train)
 94            X_test = np.log1p(X_test)
 95            
 96        #Center and scale count data
 97        train_means = np.mean(X_train, 0)
 98        train_sds = np.sqrt(var[variable_features])
 99
100        # Perform transformation on test data according to parameters 
101        # of the training data
102        X_train = (X_train - train_means) / train_sds
103        X_test = (X_test - train_means) / train_sds
104
105
106    if return_dense and scipy.sparse.issparse(X_train):
107        X_train = X_train.toarray()
108        X_test = X_test.toarray()
109
110
111    if orig_test is None:
112        return X_train
113    else:
114        return X_train, X_test

Function to preprocess data matrix according to type of data (e.g. counts/rna, or binary/atac). Will process test data according to parameters calculated from the training data.

Parameters
  • X_train (np.ndarray | scipy.sparse._csc.csc_matrix): A scipy sparse or numpy array of cells x features in the training data.
  • X_test (np.ndarray | scipy.sparse._csc.csc_matrix): A scipy sparse or numpy array of cells x features in the testing data.
  • scale_data (bool): If True, data will be logarithmized then z-score transformed.
  • return_dense (bool): If True, a np.ndarray will be returned as opposed to a scipy.sparse object.
Returns
  • X_train, X_test (np.ndarray, np.ndarray): Numpy arrays with the process train/test data respectively. If X_test is None, only X_train is returned.
def svd_transformation( X_train: scipy.sparse._csc.csc_matrix | numpy.ndarray, X_test: scipy.sparse._csc.csc_matrix | numpy.ndarray | None = None):
def svd_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | 
                       np.ndarray | None=None):
    """
    Reduce matrices with a truncated SVD fit on the training data.

    At most 50 components are computed (fewer if there are fewer 
    features) and the first component is dropped from the output.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices, always returned as a 2-tuple. The 
        second element is `None` if `X_test is None`.
    """
    n_keep = min(50, X_train.shape[1])
    reducer = TruncatedSVD(n_components = n_keep, random_state = 1)

    # The SVD function is faster on csr input, hence the conversion.
    train_scores = reducer.fit_transform(scipy.sparse.csr_array(X_train))

    # Drop the first component as it corresponds with sequencing depth
    X_train = train_scores[:, 1:]

    if X_test is None:
        return X_train, X_test

    test_scores = reducer.transform(scipy.sparse.csr_array(X_test))

    return X_train, test_scores[:, 1:]

Returns matrices with SVD reduction. A (X_train, X_test) tuple is always returned; X_test is None in the result if no test matrix was given.

Parameters
  • X_train (np.ndarray): A 2D array of cells x features filtered to desired features for training data.
  • X_test (np.ndarray | None): A 2D array of cells x features filtered to desired features for testing data.
Returns
  • X_train, X_test (np.ndarray, np.ndarray | None): Transformed matrices, always returned as a 2-tuple. The second element is None if X_test is None.
def sample_cells( train_indices: numpy.ndarray, sample_size: int, seed_obj: numpy.random._generator.Generator):
def sample_cells(train_indices: np.ndarray,
                 sample_size: int,
                 seed_obj: np.random._generator.Generator):
    """
    Draw cell indices from the training indices without replacement.

    Parameters
    ----------
    train_indices : np.ndarray
        An array of indices to sample from.

    sample_size : int
        Number of samples to take from `train_indices`. Capped at 
        the length of `train_indices`.

    seed_obj : np.random._generator.Generator
        Random number generator used to draw the sample.

    Returns
    -------
    indices : np.ndarray
        The sampled indices from `train_indices`.
    """
    # Never request more cells than are available
    n_draw = min(train_indices.shape[0], sample_size)

    return seed_obj.choice(train_indices, n_draw, replace = False)

Samples cells indices from training indices for calculations.

Parameters
  • train_indices (np.ndarray): An array of indices to sample from.
  • sample_size (int): Number of samples to take from train_indices. If it exceeds the length of train_indices, it is capped at that length.
Returns
  • indices (np.ndarray): The sampled indices from train_indices.
def pca_transformation( X_train: scipy.sparse._csc.csc_matrix | numpy.ndarray, X_test: scipy.sparse._csc.csc_matrix | numpy.ndarray | None = None):
def pca_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Reduce matrices with a PCA fit on the training data.

    At most 50 components are computed; fewer are used when the 
    training data has fewer cells or features than that.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices, always returned as a 2-tuple. The 
        second element is `None` if `X_test is None`.
    """
    def _as_dense(mat):
        # `np.asarray` on a scipy sparse matrix yields a 0-d object 
        # array rather than the data, so sparse input needs `toarray`.
        if scipy.sparse.issparse(mat):
            return mat.toarray()
        return np.asarray(mat)

    X_train = _as_dense(X_train)

    # PCA cannot extract more components than min(n_cells, n_features)
    n_components = min(50, *X_train.shape)
    PCA_func = PCA(n_components = n_components, random_state = 1)

    X_train = PCA_func.fit_transform(X_train)

    if X_test is not None:
        X_test = PCA_func.transform(_as_dense(X_test))

    return X_train, X_test

Returns matrices with PCA reduction. A (X_train, X_test) tuple is always returned; X_test is None in the result if no test matrix was given.

Parameters
  • X_train (scipy.sparse._csc.csc_matrix | np.ndarray): A 2D array of cells x features filtered to desired features for training data.
  • X_test (scipy.sparse._csc.csc_matrix | np.ndarray | None): A 2D array of cells x features filtered to desired features for testing data.
Returns
  • X_train, X_test (np.ndarray, np.ndarray | None): Transformed matrices, always returned as a 2-tuple. The second element is None if X_test is None.
def get_reduction(reduction: str):
221def get_reduction(reduction: str):
222    """
223    Function used to identify reduction type and return function to 
224    apply to data matrices.
225
226    Parameters
227    ----------
228    reduction : str
229        The reduction for data transformation. Options are `['pca', 
230        'svd', 'None']`.
231
232    Returns
233    -------
234    red_func : function
235        The function to reduce the data.
236    """
237    match reduction:
238        case 'pca':
239            red_func = pca_transformation
240        case 'svd':
241            red_func = svd_transformation
242        case 'None':
243            red_func = _no_transformation
244
245    return red_func

Function used to identify reduction type and return function to apply to data matrices.

Parameters
  • reduction (str): The reduction for data transformation. Options are ['pca', 'svd', 'None'].
Returns
  • red_func (function): The function to reduce the data.
def get_group_mat( adata: anndata._core.anndata.AnnData, n_features: int, group_features: numpy.ndarray, n_group_features: int, process_test: bool = False) -> numpy.ndarray:
def get_group_mat(adata: ad.AnnData, n_features: int,
                  group_features: np.ndarray,
                  n_group_features: int, 
                  process_test: bool=False) -> np.ndarray:
    """
    Filters to only features in group. Features are drawn without 
    replacement with `adata.uns['seed_obj']`, keeping at most 
    `n_features` of them.

    Parameters
    ----------
    adata : anndata.AnnData
        anndata object with `'seed_obj'` and `'reduction'` in `.uns`, 
        plus `'train_indices'` and `'test_indices'` when 
        `process_test` is `True`.

    n_features : int
        Maximum number of features to keep in matrix. Only 
        impacts mat if `n_features < n_group_features`.

    group_features : list | tuple | np.ndarray
        Feature names in group to filter matrices to.

    n_group_features : int
        Number of features in group.

    process_test : bool
        If `True`, train and test matrices are extracted separately 
        and passed through the reduction named in 
        `adata.uns['reduction']`. If `False`, the matrix for all 
        cells is returned with no reduction applied.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Filtered matrices. Only `X_train` is returned when 
        `process_test` is `False`. When `process_test` is `True` and 
        `adata.uns['reduction']` is `'pca'` or `'svd'` the matrices 
        are transformed before being returned.
    """
    # Resolve the reduction first so an invalid setting fails before 
    # any random numbers are consumed
    reduction_func = get_reduction(adata.uns['reduction'])

    # Cap the number of sampled features at the group size; sampling 
    # keeps large groupings scalable
    n_keep = np.min([n_features, n_group_features])
    candidates = np.array(list(group_features))
    rng = adata.uns['seed_obj']
    group_features = rng.choice(candidates, n_keep, replace = False)

    if not process_test:
        X_train = adata[:, group_features].X
        return X_train

    # Restrict to the cells of each split, then to the sampled features
    train_view = adata[adata.uns['train_indices'], :]
    X_train = train_view[:, group_features].X

    test_view = adata[adata.uns['test_indices'], :]
    X_test = test_view[:, group_features].X

    X_train, X_test = reduction_func(X_train, X_test)

    return X_train, X_test

Filters to only features in group. Will sample features if n_features < n_group_features.

Parameters
  • adata (anndata.AnnData): anndata object with 'seed_obj', 'train_indices', and 'test_indices' in .uns.
  • n_features (int): Maximum number of features to keep in matrix. Only impacts mat if n_features < n_group_features.
  • group_features (list | tuple | np.ndarray): Feature names in group to filter matrices to.
  • n_group_features (int): Number of features in group.
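  • process_test (bool): If True, train and test matrices are extracted using adata.uns['train_indices'] and adata.uns['test_indices'] and the reduction is applied. If False, only the feature-filtered matrix for all cells is returned.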
Returns
  • X_train, X_test (np.ndarray, np.ndarray): Filtered matrices. If process_test is False, only X_train is returned and no reduction is applied. If process_test is True and adata.uns['reduction'] is 'pca' or 'svd', the matrices are transformed before being returned.