scmkl.data_processing
import numpy as np
import scipy
from sklearn.decomposition import TruncatedSVD, PCA
import anndata as ad


def sparse_var(X: scipy.sparse._csc.csc_matrix | np.ndarray, axis: int | None=None):
    """
    Computes the variance of a scipy sparse matrix or numpy array.

    Parameters
    ----------
    X : scipy.sparse._csc.csc_matrix | np.ndarray
        A scipy sparse or numpy array.

    axis : int | None
        Axis the variance is calculated over. Same usage as Numpy;
        `None` computes the variance over all elements.

    Returns
    -------
    var : np.ndarray
        Flattened 1D array of variance values over the given axis
        (a single element when `axis` is `None`).
    """
    if not scipy.sparse.issparse(X):
        return np.asarray(np.var(X, axis = axis)).flatten().ravel()

    # Var[X] = E[X^2] - E[X]^2, computed from the sparse means
    mean_of_squares = np.asarray(X.power(2).mean(axis = axis)).flatten()
    square_of_mean = np.asarray(np.square(X.mean(axis = axis))).flatten()

    return np.array(mean_of_squares - square_of_mean).ravel()


def process_data(X_train: np.ndarray | scipy.sparse._csc.csc_matrix,
                 X_test: np.ndarray | scipy.sparse._csc.csc_matrix | None=None,
                 scale_data: bool=True,
                 return_dense: bool=True):
    """
    Function to preprocess data matrix according to type of data
    (e.g. counts/rna, or binary/atac). Will process test data
    according to parameters calculated from the training data.

    Parameters
    ----------
    X_train : np.ndarray | scipy.sparse._csc.csc_matrix
        A scipy sparse or numpy array of cells x features in the
        training data.

    X_test : np.ndarray | scipy.sparse._csc.csc_matrix | None
        A scipy sparse or numpy array of cells x features in the
        testing data.

    scale_data : bool
        If `True`, data will be logarithmized then z-score
        transformed using the mean and standard deviation of the
        log-transformed training data.

    return_dense: bool
        If `True`, a np.ndarray will be returned as opposed to a
        scipy.sparse object. Scaled data is always dense.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Numpy arrays with the processed train/test data
        respectively. If X_test is `None`, only X_train is returned.
    """
    test_given = X_test is not None
    if not test_given:
        # One-row stand-in so the same code path runs without a
        # noticeable increase in computation
        X_test = X_train[:1, :]

    # Remove features that have no variance in the training data
    # (will be uninformative)
    var = sparse_var(X_train, axis = 0)
    variable_features = np.where(var > 1e-5)[0]

    X_train = X_train[:, variable_features]
    X_test = X_test[:, variable_features]

    if scale_data:

        if scipy.sparse.issparse(X_train):
            X_train = X_train.log1p()
            X_test = X_test.log1p()
        else:
            X_train = np.log1p(X_train)
            X_test = np.log1p(X_test)

        # Center and scale with statistics of the *log-transformed*
        # training data; the variance used for filtering above was
        # computed on the raw values and is not a valid z-score scale
        train_means = np.asarray(X_train.mean(axis = 0)).ravel()
        log_var = sparse_var(X_train, axis = 0)

        # E[X^2] - E[X]^2 can round to a tiny negative number
        train_sds = np.sqrt(np.clip(log_var, 0, None))
        # Avoid dividing by zero for numerically constant features
        train_sds[train_sds == 0] = 1.0

        # Test data is transformed with the training parameters;
        # np.asarray turns the np.matrix produced by sparse inputs
        # into a regular ndarray
        X_train = np.asarray((X_train - train_means) / train_sds)
        X_test = np.asarray((X_test - train_means) / train_sds)

    if return_dense:
        if scipy.sparse.issparse(X_train):
            X_train = X_train.toarray()
        if scipy.sparse.issparse(X_test):
            X_test = X_test.toarray()

    if test_given:
        return X_train, X_test

    return X_train


def svd_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix |
                       np.ndarray | None=None):
    """
    Returns matrices with SVD reduction. A tuple is always returned;
    if `X_test is None`, its second element is `None`.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices with the first component dropped.
        `X_test` is `None` if no test matrix was given.
    """
    # At most 50 components, never more than the number of features
    svd = TruncatedSVD(n_components = np.min([50, X_train.shape[1]]),
                       random_state = 1)

    # Input is converted to a csr_array because the SVD function is
    # faster on this matrix type
    train_csr = scipy.sparse.csr_array(X_train)

    # Remove first component as it corresponds with sequencing depth
    train_reduced = svd.fit_transform(train_csr)[:, 1:]

    if X_test is None:
        return train_reduced, X_test

    test_reduced = svd.transform(scipy.sparse.csr_array(X_test))[:, 1:]

    return train_reduced, test_reduced


def sample_cells(train_indices: np.ndarray,
                 sample_size: int,
                 seed_obj: np.random._generator.Generator):
    """
    Samples cells indices from training indices for calculations.

    Parameters
    ----------
    train_indices : np.ndarray
        An array of indices to sample from.

    sample_size : int
        Number of samples to take from `train_indices`. Values
        larger than the length of `train_indices` are reduced to
        that length.

    seed_obj : np.random._generator.Generator
        Random generator used to draw the sample.

    Returns
    -------
    indices : np.ndarray
        The indices sampled without replacement from
        `train_indices`.
    """
    n_to_draw = min(train_indices.shape[0], sample_size)

    return seed_obj.choice(train_indices, n_to_draw, replace = False)


def pca_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Returns matrices with PCA reduction. A tuple is always returned;
    if `X_test is None`, its second element is `None`.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices. `X_test` is `None` if no test matrix
        was given.
    """
    # np.asarray() wraps a scipy sparse matrix in a 0-d object array
    # instead of densifying it, so sparse inputs need .toarray()
    if scipy.sparse.issparse(X_train):
        X_train = X_train.toarray()
    else:
        X_train = np.asarray(X_train)

    # PCA cannot extract more components than
    # min(n_samples, n_features)
    n_components = min(50, *X_train.shape)
    PCA_func = PCA(n_components = n_components, random_state = 1)

    X_train = PCA_func.fit_transform(X_train)

    if X_test is not None:
        if scipy.sparse.issparse(X_test):
            X_test = X_test.toarray()
        X_test = PCA_func.transform(np.asarray(X_test))

    return X_train, X_test


def _no_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Dummy function used to return mat inputs unchanged.
    """
    return X_train, X_test


def get_reduction(reduction: str):
    """
    Function used to identify reduction type and return function to
    apply to data matrices.

    Parameters
    ----------
    reduction : str
        The reduction for data transformation. Options are `['pca',
        'svd', 'None']`. The `None` object is accepted as well.

    Returns
    -------
    red_func : function
        The function to reduce the data.

    Raises
    ------
    ValueError
        If `reduction` is not one of the supported options.
    """
    match reduction:
        case 'pca':
            return pca_transformation
        case 'svd':
            return svd_transformation
        case 'None' | None:
            return _no_transformation
        case _:
            # Without this branch an unknown value surfaced as an
            # UnboundLocalError on the return statement
            raise ValueError(f"Unknown reduction {reduction!r}; "
                             "options are 'pca', 'svd', or 'None'")


def get_group_mat(adata: ad.AnnData, n_features: int,
                  group_features: np.ndarray,
                  n_group_features: int,
                  process_test: bool=False
                  ) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
    """
    Filters to only features in group. Will sample features if
    `n_features < n_group_features`.

    Parameters
    ----------
    adata : anndata.AnnData
        anndata object with `'seed_obj'` and `'reduction'` in
        `.uns`; `'train_indices'` and `'test_indices'` are read as
        well when `process_test` is `True`.

    n_features : int
        Maximum number of features to keep in matrix. Only
        impacts mat if `n_features < n_group_features`.

    group_features : list | tuple | np.ndarray
        Feature names in group to filter matrices to.

    n_group_features : int
        Number of features in group.

    process_test : bool
        If `True`, train and test matrices are extracted and the
        reduction in `adata.uns['reduction']` is applied to them.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Filtered matrices. If `process_test` is `False`, only
        `X_train` (all cells, no reduction applied) is returned.
        Otherwise, if `adata.uns['reduction']` is `'pca'` or
        `'svd'` the matrices are transformed before being returned.
    """
    # Getting reduction function
    reduction_func = get_reduction(adata.uns['reduction'])

    # Sample up to n_features features - important for scalability if
    # using large groupings. Will use all features if the grouping
    # contains fewer than n_features.
    group_array = np.array(list(group_features))

    # Also bounded by the real group length so sampling without
    # replacement cannot request more names than exist
    number_features = min(n_features, n_group_features, group_array.shape[0])
    group_features = adata.uns['seed_obj'].choice(group_array,
                                                  number_features,
                                                  replace = False)

    if not process_test:
        return adata[:, group_features].X

    # Create data arrays containing only features within this group
    X_train = adata[adata.uns['train_indices'],:][:, group_features].X
    X_test = adata[adata.uns['test_indices'],:][:, group_features].X

    return reduction_func(X_train, X_test)
def
sparse_var( X: scipy.sparse._csc.csc_matrix | numpy.ndarray, axis: int | None = None):
def sparse_var(X: scipy.sparse._csc.csc_matrix | np.ndarray, axis: int | None=None):
    """
    Computes the variance of a scipy sparse matrix or numpy array.

    Parameters
    ----------
    X : scipy.sparse._csc.csc_matrix | np.ndarray
        A scipy sparse or numpy array.

    axis : int | None
        Axis the variance is calculated over. Same usage as Numpy;
        `None` computes the variance over all elements.

    Returns
    -------
    var : np.ndarray
        Flattened 1D array of variance values over the given axis
        (a single element when `axis` is `None`).
    """
    if not scipy.sparse.issparse(X):
        return np.asarray(np.var(X, axis = axis)).flatten().ravel()

    # Var[X] = E[X^2] - E[X]^2, computed from the sparse means
    mean_of_squares = np.asarray(X.power(2).mean(axis = axis)).flatten()
    square_of_mean = np.asarray(np.square(X.mean(axis = axis))).flatten()

    return np.array(mean_of_squares - square_of_mean).ravel()
Function to calculate variance on a scipy sparse matrix.
Parameters
- X (scipy.sparse._csc.csc_matrix | np.ndarray): A scipy sparse or numpy array
- axis (int | None): Determines which axis variance is calculated on. Same usage as Numpy.
Returns
- var (np.ndarray | float): Variance values calculated over the given axis.
def
process_data( X_train: numpy.ndarray | scipy.sparse._csc.csc_matrix, X_test: numpy.ndarray | scipy.sparse._csc.csc_matrix | None = None, scale_data: bool = True, return_dense: bool = True):
def process_data(X_train: np.ndarray | scipy.sparse._csc.csc_matrix,
                 X_test: np.ndarray | scipy.sparse._csc.csc_matrix | None=None,
                 scale_data: bool=True,
                 return_dense: bool=True):
    """
    Function to preprocess data matrix according to type of data
    (e.g. counts/rna, or binary/atac). Will process test data
    according to parameters calculated from the training data.

    Parameters
    ----------
    X_train : np.ndarray | scipy.sparse._csc.csc_matrix
        A scipy sparse or numpy array of cells x features in the
        training data.

    X_test : np.ndarray | scipy.sparse._csc.csc_matrix | None
        A scipy sparse or numpy array of cells x features in the
        testing data.

    scale_data : bool
        If `True`, data will be logarithmized then z-score
        transformed using the mean and standard deviation of the
        log-transformed training data.

    return_dense: bool
        If `True`, a np.ndarray will be returned as opposed to a
        scipy.sparse object. Scaled data is always dense.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Numpy arrays with the processed train/test data
        respectively. If X_test is `None`, only X_train is returned.
    """
    test_given = X_test is not None
    if not test_given:
        # One-row stand-in so the same code path runs without a
        # noticeable increase in computation
        X_test = X_train[:1, :]

    # Remove features that have no variance in the training data
    # (will be uninformative)
    var = sparse_var(X_train, axis = 0)
    variable_features = np.where(var > 1e-5)[0]

    X_train = X_train[:, variable_features]
    X_test = X_test[:, variable_features]

    if scale_data:

        if scipy.sparse.issparse(X_train):
            X_train = X_train.log1p()
            X_test = X_test.log1p()
        else:
            X_train = np.log1p(X_train)
            X_test = np.log1p(X_test)

        # Center and scale with statistics of the *log-transformed*
        # training data; the variance used for filtering above was
        # computed on the raw values and is not a valid z-score scale
        train_means = np.asarray(X_train.mean(axis = 0)).ravel()
        log_var = sparse_var(X_train, axis = 0)

        # E[X^2] - E[X]^2 can round to a tiny negative number
        train_sds = np.sqrt(np.clip(log_var, 0, None))
        # Avoid dividing by zero for numerically constant features
        train_sds[train_sds == 0] = 1.0

        # Test data is transformed with the training parameters;
        # np.asarray turns the np.matrix produced by sparse inputs
        # into a regular ndarray
        X_train = np.asarray((X_train - train_means) / train_sds)
        X_test = np.asarray((X_test - train_means) / train_sds)

    if return_dense:
        if scipy.sparse.issparse(X_train):
            X_train = X_train.toarray()
        if scipy.sparse.issparse(X_test):
            X_test = X_test.toarray()

    if test_given:
        return X_train, X_test

    return X_train
Function to preprocess data matrix according to type of data (e.g. counts/rna, or binary/atac). Will process test data according to parameters calculated from the training data.
Parameters
- X_train (np.ndarray | scipy.sparse._csc.csc_matrix): A scipy sparse or numpy array of cells x features in the training data.
- X_test (np.ndarray | scipy.sparse._csc.csc_matrix): A scipy sparse or numpy array of cells x features in the testing data.
- scale_data (bool): If `True`, data will be logarithmized then z-score transformed.
- return_dense (bool): If `True`, a np.ndarray will be returned as opposed to a scipy.sparse object.
Returns
- X_train, X_test (np.ndarray, np.ndarray): Numpy arrays with the processed train/test data respectively. If X_test is `None`, only X_train is returned.
def
svd_transformation( X_train: scipy.sparse._csc.csc_matrix | numpy.ndarray, X_test: scipy.sparse._csc.csc_matrix | numpy.ndarray | None = None):
def svd_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix |
                       np.ndarray | None=None):
    """
    Returns matrices with SVD reduction. A tuple is always returned;
    if `X_test is None`, its second element is `None`.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices with the first component dropped.
        `X_test` is `None` if no test matrix was given.
    """
    # At most 50 components, never more than the number of features
    svd = TruncatedSVD(n_components = np.min([50, X_train.shape[1]]),
                       random_state = 1)

    # Input is converted to a csr_array because the SVD function is
    # faster on this matrix type
    train_csr = scipy.sparse.csr_array(X_train)

    # Remove first component as it corresponds with sequencing depth
    train_reduced = svd.fit_transform(train_csr)[:, 1:]

    if X_test is None:
        return train_reduced, X_test

    test_reduced = svd.transform(scipy.sparse.csr_array(X_test))[:, 1:]

    return train_reduced, test_reduced
Returns matrices with SVD reduction. If `X_test is None`, `None` is returned in place of the transformed X_test.
Parameters
- X_train (np.ndarray): A 2D array of cells x features filtered to desired features for training data.
- X_test (np.ndarray | None): A 2D array of cells x features filtered to desired features for testing data.
Returns
- X_train, X_test (np.ndarray, np.ndarray | None): Transformed matrices. X_test is returned as `None` if `X_test is None`.
def
sample_cells( train_indices: numpy.ndarray, sample_size: int, seed_obj: numpy.random._generator.Generator):
def sample_cells(train_indices: np.ndarray,
                 sample_size: int,
                 seed_obj: np.random._generator.Generator):
    """
    Samples cells indices from training indices for calculations.

    Parameters
    ----------
    train_indices : np.ndarray
        An array of indices to sample from.

    sample_size : int
        Number of samples to take from `train_indices`. Values
        larger than the length of `train_indices` are reduced to
        that length.

    seed_obj : np.random._generator.Generator
        Random generator used to draw the sample.

    Returns
    -------
    indices : np.ndarray
        The indices sampled without replacement from
        `train_indices`.
    """
    n_to_draw = min(train_indices.shape[0], sample_size)

    return seed_obj.choice(train_indices, n_to_draw, replace = False)
Samples cells indices from training indices for calculations.
Parameters
- train_indices (np.ndarray): An array of indices to sample from.
- sample_size (int): Number of samples to take from `train_indices`. If larger than the length of `train_indices`, all indices are sampled.
Returns
- indices (np.ndarray): The sampled indices from `train_indices`.
def
pca_transformation( X_train: scipy.sparse._csc.csc_matrix | numpy.ndarray, X_test: scipy.sparse._csc.csc_matrix | numpy.ndarray | None = None):
def pca_transformation(X_train: scipy.sparse._csc.csc_matrix | np.ndarray,
                       X_test: scipy.sparse._csc.csc_matrix | np.ndarray | None=None):
    """
    Returns matrices with PCA reduction. A tuple is always returned;
    if `X_test is None`, its second element is `None`.

    Parameters
    ----------
    X_train : scipy.sparse._csc.csc_matrix | np.ndarray
        A 2D array of cells x features filtered to desired features
        for training data.

    X_test : scipy.sparse._csc.csc_matrix | np.ndarray | None
        A 2D array of cells x features filtered to desired features
        for testing data.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray | None
        Transformed matrices. `X_test` is `None` if no test matrix
        was given.
    """
    # np.asarray() wraps a scipy sparse matrix in a 0-d object array
    # instead of densifying it, so sparse inputs need .toarray()
    if scipy.sparse.issparse(X_train):
        X_train = X_train.toarray()
    else:
        X_train = np.asarray(X_train)

    # PCA cannot extract more components than
    # min(n_samples, n_features)
    n_components = min(50, *X_train.shape)
    PCA_func = PCA(n_components = n_components, random_state = 1)

    X_train = PCA_func.fit_transform(X_train)

    if X_test is not None:
        if scipy.sparse.issparse(X_test):
            X_test = X_test.toarray()
        X_test = PCA_func.transform(np.asarray(X_test))

    return X_train, X_test
Returns matrices with PCA reduction. If `X_test is None`, `None` is returned in place of the transformed X_test.
Parameters
- X_train (scipy.sparse._csc.csc_matrix | np.ndarray): A 2D array of cells x features filtered to desired features for training data.
- X_test (scipy.sparse._csc.csc_matrix | np.ndarray | None): A 2D array of cells x features filtered to desired features for testing data.
Returns
- X_train, X_test (np.ndarray, np.ndarray | None): Transformed matrices. X_test is returned as `None` if `X_test is None`.
def
get_reduction(reduction: str):
221def get_reduction(reduction: str): 222 """ 223 Function used to identify reduction type and return function to 224 apply to data matrices. 225 226 Parameters 227 ---------- 228 reduction : str 229 The reduction for data transformation. Options are `['pca', 230 'svd', 'None']`. 231 232 Returns 233 ------- 234 red_func : function 235 The function to reduce the data. 236 """ 237 match reduction: 238 case 'pca': 239 red_func = pca_transformation 240 case 'svd': 241 red_func = svd_transformation 242 case 'None': 243 red_func = _no_transformation 244 245 return red_func
Function used to identify reduction type and return function to apply to data matrices.
Parameters
- reduction (str): The reduction for data transformation. Options are `['pca', 'svd', 'None']`.
Returns
- red_func (function): The function to reduce the data.
def
get_group_mat( adata: anndata._core.anndata.AnnData, n_features: int, group_features: numpy.ndarray, n_group_features: int, process_test: bool = False) -> numpy.ndarray:
def get_group_mat(adata: ad.AnnData, n_features: int,
                  group_features: np.ndarray,
                  n_group_features: int,
                  process_test: bool=False
                  ) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
    """
    Filters to only features in group. Will sample features if
    `n_features < n_group_features`.

    Parameters
    ----------
    adata : anndata.AnnData
        anndata object with `'seed_obj'` and `'reduction'` in
        `.uns`; `'train_indices'` and `'test_indices'` are read as
        well when `process_test` is `True`.

    n_features : int
        Maximum number of features to keep in matrix. Only
        impacts mat if `n_features < n_group_features`.

    group_features : list | tuple | np.ndarray
        Feature names in group to filter matrices to.

    n_group_features : int
        Number of features in group.

    process_test : bool
        If `True`, train and test matrices are extracted and the
        reduction in `adata.uns['reduction']` is applied to them.

    Returns
    -------
    X_train, X_test : np.ndarray, np.ndarray
        Filtered matrices. If `process_test` is `False`, only
        `X_train` (all cells, no reduction applied) is returned.
        Otherwise, if `adata.uns['reduction']` is `'pca'` or
        `'svd'` the matrices are transformed before being returned.
    """
    # Getting reduction function
    reduction_func = get_reduction(adata.uns['reduction'])

    # Sample up to n_features features - important for scalability if
    # using large groupings. Will use all features if the grouping
    # contains fewer than n_features.
    group_array = np.array(list(group_features))

    # Also bounded by the real group length so sampling without
    # replacement cannot request more names than exist
    number_features = min(n_features, n_group_features, group_array.shape[0])
    group_features = adata.uns['seed_obj'].choice(group_array,
                                                  number_features,
                                                  replace = False)

    if not process_test:
        return adata[:, group_features].X

    # Create data arrays containing only features within this group
    X_train = adata[adata.uns['train_indices'],:][:, group_features].X
    X_test = adata[adata.uns['test_indices'],:][:, group_features].X

    return reduction_func(X_train, X_test)
Filters to only features in group. Will sample features if `n_features < n_group_features`.
Parameters
- adata (anndata.AnnData): anndata object with `'seed_obj'`, `'train_indices'`, and `'test_indices'` in `.uns`.
- n_features (int): Maximum number of features to keep in matrix. Only impacts mat if `n_features < n_group_features`.
- group_features (list | tuple | np.ndarray): Feature names in group to filter matrices to.
- n_group_features (int): Number of features in group.
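- process_test (bool): If `True`, train and test matrices are extracted using `adata.uns['train_indices']` and `adata.uns['test_indices']` and the reduction is applied. (The signature has no `n_samples` parameter.)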
Returns
- X_train, X_test (np.ndarray, np.ndarray): Filtered matrices. If `process_test` is `False`, only `X_train` (all cells, without reduction) is returned. If `process_test` is `True` and `adata.uns['reduction']` is `'pca'` or `'svd'`, the matrices are transformed before being returned.