scmkl.data_processing
import numpy as np
import scipy
import scipy.sparse
from sklearn.decomposition import TruncatedSVD, PCA
import anndata as ad


def _log1p(mat):
    """
    Applies log1p to a scipy sparse or numpy matrix and returns the 
    result in the same kind of container.
    """
    return mat.log1p() if scipy.sparse.issparse(mat) else np.log1p(mat)


def _as_dense(mat):
    """
    Returns `mat` as a plain np.ndarray, densifying scipy sparse 
    input.
    """
    return mat.toarray() if scipy.sparse.issparse(mat) else np.asarray(mat)


def sparse_var(X: scipy.sparse._csc.csc_matrix | np.ndarray, axis: int | None=None):
    """
    Function to calculate variance on a scipy sparse matrix.

    Parameters
    ----------
    X : scipy.sparse._csc.csc_matrix | np.ndarray
        A scipy sparse or numpy array

    axis : int | None
        Determines which axis variance is calculated on. Same usage 
        as Numpy.

    Returns
    -------
    var : np.ndarray
        1D array of population variances calculated over the given 
        axis. When `axis is None` the array holds a single value.
    """
    if not scipy.sparse.issparse(X):
        return np.asarray(np.var(X, axis = axis)).ravel()

    # Squaring is done in the dtype of the matrix, so small integer 
    # dtypes would silently wrap around; promote them to float first
    if X.dtype.kind in 'biu':
        X = X.astype(np.float64)

    # Var[X] = E[X^2] - E[X]^2
    mean_of_squares = np.asarray(X.power(2).mean(axis = axis)).ravel()
    squared_mean = np.square(np.asarray(X.mean(axis = axis))).ravel()

    # Floating point cancellation can push a (near) zero variance 
    # slightly below zero; a variance is never negative
    return np.maximum(mean_of_squares - squared_mean, 0)


def process_data(X_train: np.ndarray | scipy.sparse._csc.csc_matrix,
                 X_test: np.ndarray | scipy.sparse._csc.csc_matrix | None=None,
                 scale_data: bool=True, transform_data: bool=False,
                 return_dense: bool=True):
    """
    Function to preprocess data matrix according to type of data 
    (e.g. counts/rna, or binary/atac). Will process test data 
    according to parameters calculated from the training data.

    Parameters
    ----------
    X_train : np.ndarray | scipy.sparse._csc.csc_matrix
        A scipy sparse or numpy array of cells x features in the 
        training data.

    X_test : np.ndarray | scipy.sparse._csc.csc_matrix | None
        A scipy sparse or numpy array of cells x features in the 
        testing data.

    scale_data : bool
        If `True`, data will be z-score transformed (after the 
        optional log1p transformation) with the training means and 
        standard deviations. The result of scaling is always dense.

    transform_data : bool
        If `True`, data will be log1p transformed (recommended for 
        counts data). Default is `False`.

    return_dense: bool
        If `True`, a np.ndarray will be returned as opposed to a 
        scipy.sparse object.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Numpy arrays with the processed train/test data 
        respectively. If X_test is `None`, only X_train is returned.
    """
    has_test = X_test is not None

    # Remove features that have no variance in the training data 
    # (will be uninformative)
    var = sparse_var(X_train, axis = 0)
    variable_features = np.where(var > 1e-5)[0]

    X_train = X_train[:, variable_features]
    if has_test:
        X_test = X_test[:, variable_features]

    # Data processing according to data type
    if transform_data:
        X_train = _log1p(X_train)
        if has_test:
            X_test = _log1p(X_test)

    if scale_data:
        # Scaling parameters must describe the matrix that is being 
        # scaled, so the variance is recomputed after a log1p 
        # transformation instead of reusing the raw variance
        train_means = np.asarray(X_train.mean(axis = 0)).ravel()
        if transform_data:
            train_sds = np.sqrt(sparse_var(X_train, axis = 0))
        else:
            train_sds = np.sqrt(var[variable_features])

        # Centering destroys sparsity, so work on plain ndarrays; 
        # test data is scaled with the parameters of the training 
        # data
        X_train = (_as_dense(X_train) - train_means) / train_sds
        if has_test:
            X_test = (_as_dense(X_test) - train_means) / train_sds

    if return_dense:
        X_train = _as_dense(X_train)
        if has_test:
            X_test = _as_dense(X_test)

    if has_test:
        return X_train, X_test

    return X_train


def svd_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | 
                       np.ndarray | None=None):
    """
    Reduces matrices with a truncated SVD fit on the training data.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices without the first component. A pair is 
        always returned; the second element is `None` if 
        `X_test is None`.
    """
    # At most 50 components, fewer if there are fewer features
    n_components = min(50, X_train.shape[1])
    reducer = TruncatedSVD(n_components = n_components, random_state = 1)

    # We convert to a csr_array because the SVD function is faster on 
    # this matrix type
    train_embedding = reducer.fit_transform(scipy.sparse.csr_array(X_train))

    # Remove first component as it corresponds with sequencing depth
    X_train = train_embedding[:, 1:]

    if X_test is None:
        return X_train, X_test

    test_embedding = reducer.transform(scipy.sparse.csr_array(X_test))

    return X_train, test_embedding[:, 1:]


def sample_cells(train_indices: np.ndarray,
                 sample_size: int,
                 seed_obj: np.random._generator.Generator):
    """
    Samples cells indices from training indices for calculations.

    Parameters
    ----------
    train_indices : np.ndarray
        An array of indices to sample from.

    sample_size : int
        Number of samples to take from `train_indices`. Capped at 
        the length of `train_indices`.

    seed_obj : np.random._generator.Generator
        Random generator used to draw the sample.

    Returns
    -------
    indices : np.ndarray
        The sampled indices from `train_indices`, drawn without 
        replacement.
    """
    # Never request more cells than are available
    n_available = train_indices.shape[0]
    n_draws = sample_size if sample_size < n_available else n_available

    return seed_obj.choice(train_indices, n_draws, replace = False)


def pca_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Reduces matrices with a PCA fit on the training data.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices. A pair is always returned; the second 
        element is `None` if `X_test is None`.
    """
    # PCA cannot extract more components than min(n_cells, n_features)
    n_components = min(50, *X_train.shape)
    PCA_func = PCA(n_components = n_components, random_state = 1)

    # np.asarray() on a scipy sparse matrix yields a 0-d object array, 
    # so sparse input is densified explicitly
    X_train = PCA_func.fit_transform(_as_dense(X_train))

    if X_test is not None:
        X_test = PCA_func.transform(_as_dense(X_test))

    return X_train, X_test


def _no_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Dummy function used to return mat inputs.
    """
    return X_train, X_test


def get_reduction(reduction: str):
    """
    Function used to identify reduction type and return function to 
    apply to data matrices.

    Parameters
    ----------
    reduction : str
        The reduction for data transformation. Options are `['pca', 
        'svd', 'None']`. The `None` object is accepted as well and 
        treated like `'None'`.

    Returns
    -------
    red_func : function
        The function to reduce the data.

    Raises
    ------
    ValueError
        If `reduction` is not one of the supported options.
    """
    reductions = {
        'pca': pca_transformation,
        'svd': svd_transformation,
        'None': _no_transformation,
        None: _no_transformation,
    }

    try:
        return reductions[reduction]
    except (KeyError, TypeError):
        raise ValueError(
            f"Unknown reduction {reduction!r}; "
            "options are 'pca', 'svd', or 'None'"
        ) from None


def get_group_mat(adata: ad.AnnData, n_features: int, 
                  group_features: np.ndarray, 
                  n_group_features: int, 
                  process_test: bool=False) -> np.ndarray:
    """
    Filters to only features in group. Will sample features if 
    `n_features < n_group_features`.

    Parameters
    ----------
    adata : anndata.AnnData
        anndata object with `'seed_obj'`, `'reduction'`, 
        `'train_indices'`, and `'test_indices'` in `.uns`.

    n_features : int
        Maximum number of features to keep in matrix. Only 
        impacts mat if `n_features < n_group_features`.

    group_features : list | tuple | np.ndarray
        Feature names in group to filter matrices to.

    n_group_features : int
        Number of features in group.

    process_test : bool
        If `True`, train and test matrices are extracted with 
        `adata.uns['train_indices']` and `adata.uns['test_indices']` 
        and passed through the reduction. If `False`, one matrix 
        with all cells is returned without reduction.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Filtered matrices. If `process_test` is `False`, only 
        `X_train` is returned. If `process_test` is `True` and 
        `adata.uns['reduction']` is `'pca'` or `'svd'` the matrices 
        are transformed before being returned.
    """
    # Getting reduction function
    reduce_mats = get_reduction(adata.uns['reduction'])

    # Sample up to n_features features- important for scalability if 
    # using large groupings
    # Will use all features if the grouping contains fewer than 
    # n_features
    n_keep = np.min([n_features, n_group_features])
    candidates = np.array(list(group_features))
    group_features = adata.uns['seed_obj'].choice(candidates, n_keep, 
                                                  replace = False)

    if not process_test:
        return adata[:, group_features].X

    # Create data arrays containing only features within this group
    train_view = adata[adata.uns['train_indices'], :]
    test_view = adata[adata.uns['test_indices'], :]

    X_train = train_view[:, group_features].X
    X_test = test_view[:, group_features].X
    X_train, X_test = reduce_mats(X_train, X_test)

    return X_train, X_test
def
sparse_var( X: scipy.sparse._csc.csc_matrix | numpy.ndarray, axis: int | None = None):
8def sparse_var(X: scipy.sparse._csc.csc_matrix | np.ndarray, axis: int | None=None): 9 """ 10 Function to calculate variance on a scipy sparse matrix. 11 12 Parameters 13 ---------- 14 X : scipy.sparse._csc.csc_matrix | np.ndarray 15 A scipy sparse or numpy array 16 17 axis : int | None 18 Determines which axis variance is calculated on. Same usage 19 as Numpy. 20 21 Returns 22 ------- 23 var : np.ndarray | float 24 Variance values calculated over the given axis. 25 """ 26 # E[X^2] - E[X]^2 27 if scipy.sparse.issparse(X): 28 exp_mean = np.asarray(X.power(2).mean(axis = axis)).flatten() 29 sq_mean = np.asarray(np.square(X.mean(axis = axis))).flatten() 30 var = np.array(exp_mean - sq_mean) 31 else: 32 var = np.asarray(np.var(X, axis = axis)).flatten() 33 34 return var.ravel()
Function to calculate variance on a scipy sparse matrix.
Parameters
- X (scipy.sparse._csc.csc_matrix | np.ndarray): A scipy sparse or numpy array
- axis (int | None): Determines which axis variance is calculated on. Same usage as Numpy.
Returns
- var (np.ndarray | float): Variance values calculated over the given axis.
def
process_data( X_train: numpy.ndarray | scipy.sparse._csc.csc_matrix, X_test: numpy.ndarray | scipy.sparse._csc.csc_matrix | None = None, scale_data: bool = True, transform_data: bool = False, return_dense: bool = True):
def _log1p(mat):
    """
    Applies log1p to a scipy sparse or numpy matrix and returns the 
    result in the same kind of container.
    """
    return mat.log1p() if scipy.sparse.issparse(mat) else np.log1p(mat)


def _as_dense(mat):
    """
    Returns `mat` as a plain np.ndarray, densifying scipy sparse 
    input.
    """
    return mat.toarray() if scipy.sparse.issparse(mat) else np.asarray(mat)


def process_data(X_train: np.ndarray | scipy.sparse._csc.csc_matrix,
                 X_test: np.ndarray | scipy.sparse._csc.csc_matrix | None=None,
                 scale_data: bool=True, transform_data: bool=False,
                 return_dense: bool=True):
    """
    Function to preprocess data matrix according to type of data 
    (e.g. counts/rna, or binary/atac). Will process test data 
    according to parameters calculated from the training data.

    Parameters
    ----------
    X_train : np.ndarray | scipy.sparse._csc.csc_matrix
        A scipy sparse or numpy array of cells x features in the 
        training data.

    X_test : np.ndarray | scipy.sparse._csc.csc_matrix | None
        A scipy sparse or numpy array of cells x features in the 
        testing data.

    scale_data : bool
        If `True`, data will be z-score transformed (after the 
        optional log1p transformation) with the training means and 
        standard deviations. The result of scaling is always dense.

    transform_data : bool
        If `True`, data will be log1p transformed (recommended for 
        counts data). Default is `False`.

    return_dense: bool
        If `True`, a np.ndarray will be returned as opposed to a 
        scipy.sparse object.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Numpy arrays with the processed train/test data 
        respectively. If X_test is `None`, only X_train is returned.
    """
    has_test = X_test is not None

    # Remove features that have no variance in the training data 
    # (will be uninformative)
    var = sparse_var(X_train, axis = 0)
    variable_features = np.where(var > 1e-5)[0]

    X_train = X_train[:, variable_features]
    if has_test:
        X_test = X_test[:, variable_features]

    # Data processing according to data type
    if transform_data:
        X_train = _log1p(X_train)
        if has_test:
            X_test = _log1p(X_test)

    if scale_data:
        # Scaling parameters must describe the matrix that is being 
        # scaled, so the variance is recomputed after a log1p 
        # transformation instead of reusing the raw variance
        train_means = np.asarray(X_train.mean(axis = 0)).ravel()
        if transform_data:
            train_sds = np.sqrt(sparse_var(X_train, axis = 0))
        else:
            train_sds = np.sqrt(var[variable_features])

        # Centering destroys sparsity, so work on plain ndarrays; 
        # test data is scaled with the parameters of the training 
        # data
        X_train = (_as_dense(X_train) - train_means) / train_sds
        if has_test:
            X_test = (_as_dense(X_test) - train_means) / train_sds

    if return_dense:
        X_train = _as_dense(X_train)
        if has_test:
            X_test = _as_dense(X_test)

    if has_test:
        return X_train, X_test

    return X_train
Function to preprocess data matrix according to type of data (e.g. counts/rna, or binary/atac). Will process test data according to parameters calculated from the training data.
Parameters
- X_train (np.ndarray | scipy.sparse._csc.csc_matrix): A scipy sparse or numpy array of cells x features in the training data.
- X_test (np.ndarray | scipy.sparse._csc.csc_matrix): A scipy sparse or numpy array of cells x features in the testing data.
- scale_data (bool): If `True`, data will be z-score transformed (after the optional log1p transformation).
- transform_data (bool): If `True`, data will be log1p transformed (recommended for counts data). Default is `False`.
- return_dense (bool): If `True`, a np.ndarray will be returned as opposed to a scipy.sparse object.
Returns
- X_train, X_test (np.ndarray, np.ndarray): Numpy arrays with the processed train/test data respectively. If X_test is `None`, only X_train is returned.
def
svd_transformation( X_train: scipy.sparse._csc.csc_matrix | numpy.ndarray, X_test: scipy.sparse._csc.csc_matrix | numpy.ndarray | None = None):
def svd_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | 
                       np.ndarray | None=None):
    """
    Reduces matrices with a truncated SVD fit on the training data.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices without the first component. A pair is 
        always returned; the second element is `None` if 
        `X_test is None`.
    """
    # At most 50 components, fewer if there are fewer features
    n_components = min(50, X_train.shape[1])
    reducer = TruncatedSVD(n_components = n_components, random_state = 1)

    # We convert to a csr_array because the SVD function is faster on 
    # this matrix type
    train_embedding = reducer.fit_transform(scipy.sparse.csr_array(X_train))

    # Remove first component as it corresponds with sequencing depth
    X_train = train_embedding[:, 1:]

    if X_test is None:
        return X_train, X_test

    test_embedding = reducer.transform(scipy.sparse.csr_array(X_test))

    return X_train, test_embedding[:, 1:]
Returns matrices with SVD reduction. If `X_test is None`, `None` is returned in its place.
Parameters
- X_train (np.ndarray): A 2D array of cells x features filtered to desired features for training data.
- X_test (np.ndarray | None): A 2D array of cells x features filtered to desired features for testing data.
Returns
- X_train, X_test (np.ndarray, np.ndarray | None): Transformed matrices. The second element is `None` if `X_test is None`.
def
sample_cells( train_indices: numpy.ndarray, sample_size: int, seed_obj: numpy.random._generator.Generator):
def sample_cells(train_indices: np.ndarray,
                 sample_size: int,
                 seed_obj: np.random._generator.Generator):
    """
    Samples cells indices from training indices for calculations.

    Parameters
    ----------
    train_indices : np.ndarray
        An array of indices to sample from.

    sample_size : int
        Number of samples to take from `train_indices`. Capped at 
        the length of `train_indices`.

    seed_obj : np.random._generator.Generator
        Random generator used to draw the sample.

    Returns
    -------
    indices : np.ndarray
        The sampled indices from `train_indices`, drawn without 
        replacement.
    """
    # Never request more cells than are available
    n_available = train_indices.shape[0]
    n_draws = sample_size if sample_size < n_available else n_available

    return seed_obj.choice(train_indices, n_draws, replace = False)
Samples cells indices from training indices for calculations.
Parameters
- train_indices (np.ndarray): An array of indices to sample from.
- sample_size (int): Number of samples to take from `train_indices`. Capped at the length of `train_indices`.
Returns
- indices (np.ndarray): The sampled indices from `train_indices`.
def
pca_transformation( X_train: scipy.sparse._csc.csc_matrix | numpy.ndarray, X_test: scipy.sparse._csc.csc_matrix | numpy.ndarray | None = None):
def pca_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Reduces matrices with a PCA fit on the training data.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features 
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features 
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices. A pair is always returned; the second 
        element is `None` if `X_test is None`.
    """
    def to_dense(mat):
        # np.asarray() on a scipy sparse matrix yields a 0-d object 
        # array, so sparse input is densified explicitly
        if scipy.sparse.issparse(mat):
            return mat.toarray()
        return np.asarray(mat)

    # PCA cannot extract more components than min(n_cells, n_features)
    n_components = min(50, *X_train.shape)
    PCA_func = PCA(n_components = n_components, random_state = 1)

    X_train = PCA_func.fit_transform(to_dense(X_train))

    if X_test is not None:
        X_test = PCA_func.transform(to_dense(X_test))

    return X_train, X_test
Returns matrices with PCA reduction. If `X_test is None`, `None` is returned in its place.
Parameters
- X_train (scipy.sparse._csc.csc_matrix | np.ndarray): A 2D array of cells x features filtered to desired features for training data.
- X_test (scipy.sparse._csc.csc_matrix | np.ndarray | None): A 2D array of cells x features filtered to desired features for testing data.
Returns
- X_train, X_test (np.ndarray, np.ndarray | None): Transformed matrices. The second element is `None` if `X_test is None`.
def
get_reduction(reduction: str):
def get_reduction(reduction: str):
    """
    Function used to identify reduction type and return function to 
    apply to data matrices.

    Parameters
    ----------
    reduction : str
        The reduction for data transformation. Options are `['pca', 
        'svd', 'None']`. The `None` object is accepted as well and 
        treated like `'None'`.

    Returns
    -------
    red_func : function
        The function to reduce the data.

    Raises
    ------
    ValueError
        If `reduction` is not one of the supported options.
    """
    reductions = {
        'pca': pca_transformation,
        'svd': svd_transformation,
        'None': _no_transformation,
        None: _no_transformation,
    }

    try:
        return reductions[reduction]
    except (KeyError, TypeError):
        # Without this an unknown option surfaced as an 
        # UnboundLocalError that did not name the offending value
        raise ValueError(
            f"Unknown reduction {reduction!r}; "
            "options are 'pca', 'svd', or 'None'"
        ) from None
Function used to identify reduction type and return function to apply to data matrices.
Parameters
- reduction (str): The reduction for data transformation. Options are `['pca', 'svd', 'None']`.
Returns
- red_func (function): The function to reduce the data.
def
get_group_mat( adata: anndata._core.anndata.AnnData, n_features: int, group_features: numpy.ndarray, n_group_features: int, process_test: bool = False) -> numpy.ndarray:
def get_group_mat(adata: ad.AnnData, n_features: int, 
                  group_features: np.ndarray, 
                  n_group_features: int, 
                  process_test: bool=False) -> np.ndarray:
    """
    Filters to only features in group. Will sample features if 
    `n_features < n_group_features`.

    Parameters
    ----------
    adata : anndata.AnnData
        anndata object with `'seed_obj'`, `'reduction'`, 
        `'train_indices'`, and `'test_indices'` in `.uns`.

    n_features : int
        Maximum number of features to keep in matrix. Only 
        impacts mat if `n_features < n_group_features`.

    group_features : list | tuple | np.ndarray
        Feature names in group to filter matrices to.

    n_group_features : int
        Number of features in group.

    process_test : bool
        If `True`, train and test matrices are extracted with 
        `adata.uns['train_indices']` and `adata.uns['test_indices']` 
        and passed through the reduction. If `False`, one matrix 
        with all cells is returned without reduction.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Filtered matrices. If `process_test` is `False`, only 
        `X_train` is returned. If `process_test` is `True` and 
        `adata.uns['reduction']` is `'pca'` or `'svd'` the matrices 
        are transformed before being returned.
    """
    # Getting reduction function
    reduce_mats = get_reduction(adata.uns['reduction'])

    # Sample up to n_features features- important for scalability if 
    # using large groupings
    # Will use all features if the grouping contains fewer than 
    # n_features
    n_keep = np.min([n_features, n_group_features])
    candidates = np.array(list(group_features))
    group_features = adata.uns['seed_obj'].choice(candidates, n_keep, 
                                                  replace = False)

    if not process_test:
        return adata[:, group_features].X

    # Create data arrays containing only features within this group
    train_view = adata[adata.uns['train_indices'], :]
    test_view = adata[adata.uns['test_indices'], :]

    X_train = train_view[:, group_features].X
    X_test = test_view[:, group_features].X
    X_train, X_test = reduce_mats(X_train, X_test)

    return X_train, X_test
Filters to only features in group. Will sample features if
n_features < n_group_features.
Parameters
- adata (anndata.AnnData): anndata object with `'seed_obj'`, `'train_indices'`, and `'test_indices'` in `.uns`.
- n_features (int): Maximum number of features to keep in matrix. Only impacts mat if `n_features < n_group_features`.
- group_features (list | tuple | np.ndarray): Feature names in group to filter matrices to.
- n_group_features (int): Number of features in group.
- process_test (bool): If `True`, train and test matrices are both extracted and passed through the reduction; if `False`, a single matrix with all cells is returned.
Returns
- X_train, X_test (np.ndarray, np.ndarray): Filtered matrices. If `process_test` is `False`, only `X_train` is returned. If `process_test` is `True` and `adata.uns['reduction']` is `'pca'` or `'svd'`, the matrices are transformed before being returned.