scmkl.estimate_sigma
1import scipy 2import anndata as ad 3import numpy as np 4 5from scmkl.data_processing import process_data, get_group_mat, sample_cells 6from scmkl.tfidf_normalize import tfidf 7 8 9def get_batches(sample_range: list | np.ndarray, 10 seed_obj: np.random._generator.Generator, 11 batches: int, 12 batch_size: int) -> np.ndarray: 13 """ 14 Gets batch indices for estimating sigma. 15 16 Parameters 17 ---------- 18 sample_range : list | np.ndarray 19 A 1D array with first element being 0 and last element being 20 (1 - number of samples from X_train). 21 22 seed_obj : np.random._generator.Generator 23 Numpy random generator object from `adata.uns['seed_obj']`. 24 25 batches : int 26 Number of batches to calculate indices for. 27 28 batch_size : int 29 Number of samples in each batch. 30 31 Returns 32 ------- 33 batches_idx : np.ndarray 34 A 2D array with each row cooresponding to the sample indices 35 for each batch. 36 """ 37 required_n = batches*batch_size 38 train_n = len(sample_range) 39 assert required_n <= train_n, (f"{required_n} cells required for " 40 f"{batches} batches of {batch_size} cells; " 41 f"only {train_n} cells present") 42 43 # Creating a mat of batch x sample indices for estimating sigma 44 batches_idx = np.zeros((batches, batch_size), dtype = np.int16) 45 46 for i in range(batches): 47 batches_idx[i] = seed_obj.choice(sample_range, 48 batch_size, 49 replace = False) 50 51 # Removing selected indices from sample options 52 rm_indices = np.isin(sample_range, batches_idx[i]) 53 sample_range = np.delete(sample_range, rm_indices) 54 55 return batches_idx 56 57 58def batch_sigma(X_train: np.ndarray, 59 distance_metric: str, 60 batch_idx: np.ndarray) -> float: 61 """ 62 Calculates the kernel width (sigma) for a feature grouping through 63 sample batching. 64 65 Parameters 66 ---------- 67 X_train : np.ndarray 68 A 2D numpy array with cells x features with features filtered 69 to features in grouping and sampled cells. 70 71 distance_metric: str 72 The pairwise distance metric used to estimate sigma. Must 73 be one of the options used in scipy.spatial.distance.cdist. 74 75 batch_idx: np.ndarray 76 A 2D array with each row cooresponding to the sample indices 77 for each batch. 78 79 Returns 80 ------- 81 sigma : float 82 The estimated group kernel with for Z projection before 83 adjustments for small kernel width or large groupings. 84 """ 85 # Calculate Distance Matrix with specified metric 86 n_batches = batch_idx.shape[0] 87 batch_sigmas = np.zeros(n_batches) 88 89 for i in np.arange(n_batches): 90 idx = batch_idx[i] 91 batch_sigma = scipy.spatial.distance.pdist(X_train[idx,:], 92 distance_metric) 93 batch_sigmas[i] = np.mean(batch_sigma) 94 95 sigma = np.mean(batch_sigmas) 96 97 return sigma 98 99 100def est_group_sigma(adata: ad.AnnData, 101 X_train: np.ndarray, 102 n_group_features: int, 103 n_features: int, 104 batch_idx: np.ndarray) -> float: 105 """ 106 Processes data and calculates the kernel width (sigma) for a 107 feature grouping through sample batching. 108 109 Parameters 110 ---------- 111 X_train : np.ndarray 112 A 2D numpy array with cells x features with features filtered 113 to features in grouping and sampled cells. 114 115 adata : anndata.AnnData 116 adata used to derive X_train containing 'seed_obj' in uns 117 attribute. 118 119 n_group_features : int 120 Number of features in feature grouping. 121 122 n_features : int 123 Maximum number of features to be used in sigma estimation. 124 125 batch_idx 126 A 2D array with each row cooresponding to the sample indices 127 for each batch. 128 129 Returns 130 ------- 131 sigma : float 132 The estimated group kernel with for Z projection. 133 134 """ 135 if adata.uns['tfidf']: 136 X_train = tfidf(X_train, mode = 'normalize') 137 138 X_train = process_data(X_train, 139 scale_data = adata.uns['scale_data'], 140 return_dense = True) 141 142 if scipy.sparse.issparse(X_train): 143 X_train = X_train.toarray() 144 X_train = np.array(X_train, dtype = np.float32) 145 146 # Calculates mean sigma from all batches 147 sigma = batch_sigma(X_train, adata.uns['distance_metric'], batch_idx) 148 149 # sigma = 0 is numerically unusable in later steps 150 # Using such a small sigma will result in wide distribution, and 151 # typically a non-predictive Z 152 if sigma == 0: 153 sigma += 1e-5 154 155 if n_features < n_group_features: 156 # Heuristic we calculated to account for fewer features used in 157 # distance calculation 158 sigma = sigma * n_group_features / n_features 159 160 return sigma 161 162 163def estimate_sigma(adata: ad.AnnData, 164 n_features: int = 5000, 165 batches: int = 10, 166 batch_size: int = 100) -> ad.AnnData: 167 """ 168 Calculate kernel widths to inform distribution for projection of 169 Fourier Features. Calculates one sigma per group of features. 170 171 Parameters 172 ---------- 173 adata : ad.AnnData 174 Created by `create_adata`. 175 176 n_features : int 177 Number of random features to include when estimating sigma. 178 Will be scaled for the whole pathway set according to a 179 heuristic. Used for scalability. 180 181 batches : int 182 The number of batches to use for the distance calculation. 183 This will average the result of `batches` distance calculations 184 of `batch_size` randomly sampled cells. More batches will converge 185 to population distance values at the cost of scalability. 186 187 batch_size : int 188 The number of cells to include per batch for distance 189 calculations. Higher batch size will converge to population 190 distance values at the cost of scalability. 191 If `batches` * `batch_size` > # training cells, 192 `batch_size` will be reduced to `int(num training cells / 193 batches)`. 194 195 Returns 196 ------- 197 adata : ad.AnnData 198 Key added `adata.uns['sigma']` with grouping kernel widths. 199 200 Examples 201 -------- 202 >>> adata = scmkl.estimate_sigma(adata) 203 >>> adata.uns['sigma'] 204 array([10.4640895 , 10.82011454, 6.16769438, 9.86156855, ...]) 205 """ 206 assert batch_size <= len(adata.uns['train_indices']), ("Batch size must be " 207 "smaller than the " 208 "training set.") 209 210 if batch_size * batches > len(adata.uns['train_indices']): 211 old_batch_size = batch_size 212 batch_size = int(len(adata.uns['train_indices'])/batches) 213 print("Specified batch size required too many cells for " 214 "independent batches. Reduced batch size from " 215 f"{old_batch_size} to {batch_size}") 216 217 if batch_size > 2000: 218 print("Warning: Batch sizes over 2000 may " 219 "result in long run-time.") 220 221 # Getting subsample indices 222 sample_size = np.min((2000, adata.uns['train_indices'].shape[0])) 223 indices = sample_cells(adata.uns['train_indices'], sample_size=sample_size, 224 seed_obj=adata.uns['seed_obj']) 225 226 # Getting batch indices 227 sample_range = np.arange(sample_size) 228 batch_idx = get_batches(sample_range, adata.uns['seed_obj'], 229 batches, batch_size) 230 231 # Loop over every group in group_dict 232 sigma_array = np.zeros((len(adata.uns['group_dict']))) 233 for m, group_features in enumerate(adata.uns['group_dict'].values()): 234 235 n_group_features = len(group_features) 236 237 # Filtering to only features in grouping using filtered view of adata 238 X_train = get_group_mat(adata[indices], n_features=n_features, 239 group_features=group_features, 240 n_group_features=n_group_features) 241 242 if adata.uns['tfidf']: 243 X_train = tfidf(X_train, mode='normalize') 244 245 # Data filtering, and transformation according to given data_type 246 # Will remove low variance (< 1e5) features regardless of data_type 247 # If scale_data will log scale and z-score the data 248 X_train = process_data(X_train=X_train, 249 scale_data=adata.uns['scale_data'], 250 return_dense=True) 251 252 # Estimating sigma 253 sigma = est_group_sigma(adata, X_train, n_group_features, 254 n_features, batch_idx=batch_idx) 255 256 sigma_array[m] = sigma 257 258 adata.uns['sigma'] = sigma_array 259 260 return adata
def
get_batches( sample_range: list | numpy.ndarray, seed_obj: numpy.random._generator.Generator, batches: int, batch_size: int) -> numpy.ndarray:
10def get_batches(sample_range: list | np.ndarray, 11 seed_obj: np.random._generator.Generator, 12 batches: int, 13 batch_size: int) -> np.ndarray: 14 """ 15 Gets batch indices for estimating sigma. 16 17 Parameters 18 ---------- 19 sample_range : list | np.ndarray 20 A 1D array with first element being 0 and last element being 21 (1 - number of samples from X_train). 22 23 seed_obj : np.random._generator.Generator 24 Numpy random generator object from `adata.uns['seed_obj']`. 25 26 batches : int 27 Number of batches to calculate indices for. 28 29 batch_size : int 30 Number of samples in each batch. 31 32 Returns 33 ------- 34 batches_idx : np.ndarray 35 A 2D array with each row cooresponding to the sample indices 36 for each batch. 37 """ 38 required_n = batches*batch_size 39 train_n = len(sample_range) 40 assert required_n <= train_n, (f"{required_n} cells required for " 41 f"{batches} batches of {batch_size} cells; " 42 f"only {train_n} cells present") 43 44 # Creating a mat of batch x sample indices for estimating sigma 45 batches_idx = np.zeros((batches, batch_size), dtype = np.int16) 46 47 for i in range(batches): 48 batches_idx[i] = seed_obj.choice(sample_range, 49 batch_size, 50 replace = False) 51 52 # Removing selected indices from sample options 53 rm_indices = np.isin(sample_range, batches_idx[i]) 54 sample_range = np.delete(sample_range, rm_indices) 55 56 return batches_idx
Gets batch indices for estimating sigma.
Parameters
- sample_range (list | np.ndarray): A 1D array with first element being 0 and last element being (1 - number of samples from X_train).
- seed_obj (np.random._generator.Generator):
Numpy random generator object from
adata.uns['seed_obj']
. - batches (int): Number of batches to calculate indices for.
- batch_size (int): Number of samples in each batch.
Returns
- batches_idx (np.ndarray): A 2D array with each row cooresponding to the sample indices for each batch.
def
batch_sigma( X_train: numpy.ndarray, distance_metric: str, batch_idx: numpy.ndarray) -> float:
59def batch_sigma(X_train: np.ndarray, 60 distance_metric: str, 61 batch_idx: np.ndarray) -> float: 62 """ 63 Calculates the kernel width (sigma) for a feature grouping through 64 sample batching. 65 66 Parameters 67 ---------- 68 X_train : np.ndarray 69 A 2D numpy array with cells x features with features filtered 70 to features in grouping and sampled cells. 71 72 distance_metric: str 73 The pairwise distance metric used to estimate sigma. Must 74 be one of the options used in scipy.spatial.distance.cdist. 75 76 batch_idx: np.ndarray 77 A 2D array with each row cooresponding to the sample indices 78 for each batch. 79 80 Returns 81 ------- 82 sigma : float 83 The estimated group kernel with for Z projection before 84 adjustments for small kernel width or large groupings. 85 """ 86 # Calculate Distance Matrix with specified metric 87 n_batches = batch_idx.shape[0] 88 batch_sigmas = np.zeros(n_batches) 89 90 for i in np.arange(n_batches): 91 idx = batch_idx[i] 92 batch_sigma = scipy.spatial.distance.pdist(X_train[idx,:], 93 distance_metric) 94 batch_sigmas[i] = np.mean(batch_sigma) 95 96 sigma = np.mean(batch_sigmas) 97 98 return sigma
Calculates the kernel width (sigma) for a feature grouping through sample batching.
Parameters
- X_train (np.ndarray): A 2D numpy array with cells x features with features filtered to features in grouping and sampled cells.
- distance_metric (str): The pairwise distance metric used to estimate sigma. Must be one of the options used in scipy.spatial.distance.cdist.
- batch_idx (np.ndarray): A 2D array with each row cooresponding to the sample indices for each batch.
Returns
- sigma (float): The estimated group kernel with for Z projection before adjustments for small kernel width or large groupings.
def
est_group_sigma( adata: anndata._core.anndata.AnnData, X_train: numpy.ndarray, n_group_features: int, n_features: int, batch_idx: numpy.ndarray) -> float:
101def est_group_sigma(adata: ad.AnnData, 102 X_train: np.ndarray, 103 n_group_features: int, 104 n_features: int, 105 batch_idx: np.ndarray) -> float: 106 """ 107 Processes data and calculates the kernel width (sigma) for a 108 feature grouping through sample batching. 109 110 Parameters 111 ---------- 112 X_train : np.ndarray 113 A 2D numpy array with cells x features with features filtered 114 to features in grouping and sampled cells. 115 116 adata : anndata.AnnData 117 adata used to derive X_train containing 'seed_obj' in uns 118 attribute. 119 120 n_group_features : int 121 Number of features in feature grouping. 122 123 n_features : int 124 Maximum number of features to be used in sigma estimation. 125 126 batch_idx 127 A 2D array with each row cooresponding to the sample indices 128 for each batch. 129 130 Returns 131 ------- 132 sigma : float 133 The estimated group kernel with for Z projection. 134 135 """ 136 if adata.uns['tfidf']: 137 X_train = tfidf(X_train, mode = 'normalize') 138 139 X_train = process_data(X_train, 140 scale_data = adata.uns['scale_data'], 141 return_dense = True) 142 143 if scipy.sparse.issparse(X_train): 144 X_train = X_train.toarray() 145 X_train = np.array(X_train, dtype = np.float32) 146 147 # Calculates mean sigma from all batches 148 sigma = batch_sigma(X_train, adata.uns['distance_metric'], batch_idx) 149 150 # sigma = 0 is numerically unusable in later steps 151 # Using such a small sigma will result in wide distribution, and 152 # typically a non-predictive Z 153 if sigma == 0: 154 sigma += 1e-5 155 156 if n_features < n_group_features: 157 # Heuristic we calculated to account for fewer features used in 158 # distance calculation 159 sigma = sigma * n_group_features / n_features 160 161 return sigma
Processes data and calculates the kernel width (sigma) for a feature grouping through sample batching.
Parameters
- X_train (np.ndarray): A 2D numpy array with cells x features with features filtered to features in grouping and sampled cells.
- adata (anndata.AnnData): adata used to derive X_train containing 'seed_obj' in uns attribute.
- n_group_features (int): Number of features in feature grouping.
- n_features (int): Maximum number of features to be used in sigma estimation.
- batch_idx: A 2D array with each row cooresponding to the sample indices for each batch.
Returns
- sigma (float): The estimated group kernel with for Z projection.
def
estimate_sigma( adata: anndata._core.anndata.AnnData, n_features: int = 5000, batches: int = 10, batch_size: int = 100) -> anndata._core.anndata.AnnData:
164def estimate_sigma(adata: ad.AnnData, 165 n_features: int = 5000, 166 batches: int = 10, 167 batch_size: int = 100) -> ad.AnnData: 168 """ 169 Calculate kernel widths to inform distribution for projection of 170 Fourier Features. Calculates one sigma per group of features. 171 172 Parameters 173 ---------- 174 adata : ad.AnnData 175 Created by `create_adata`. 176 177 n_features : int 178 Number of random features to include when estimating sigma. 179 Will be scaled for the whole pathway set according to a 180 heuristic. Used for scalability. 181 182 batches : int 183 The number of batches to use for the distance calculation. 184 This will average the result of `batches` distance calculations 185 of `batch_size` randomly sampled cells. More batches will converge 186 to population distance values at the cost of scalability. 187 188 batch_size : int 189 The number of cells to include per batch for distance 190 calculations. Higher batch size will converge to population 191 distance values at the cost of scalability. 192 If `batches` * `batch_size` > # training cells, 193 `batch_size` will be reduced to `int(num training cells / 194 batches)`. 195 196 Returns 197 ------- 198 adata : ad.AnnData 199 Key added `adata.uns['sigma']` with grouping kernel widths. 200 201 Examples 202 -------- 203 >>> adata = scmkl.estimate_sigma(adata) 204 >>> adata.uns['sigma'] 205 array([10.4640895 , 10.82011454, 6.16769438, 9.86156855, ...]) 206 """ 207 assert batch_size <= len(adata.uns['train_indices']), ("Batch size must be " 208 "smaller than the " 209 "training set.") 210 211 if batch_size * batches > len(adata.uns['train_indices']): 212 old_batch_size = batch_size 213 batch_size = int(len(adata.uns['train_indices'])/batches) 214 print("Specified batch size required too many cells for " 215 "independent batches. Reduced batch size from " 216 f"{old_batch_size} to {batch_size}") 217 218 if batch_size > 2000: 219 print("Warning: Batch sizes over 2000 may " 220 "result in long run-time.") 221 222 # Getting subsample indices 223 sample_size = np.min((2000, adata.uns['train_indices'].shape[0])) 224 indices = sample_cells(adata.uns['train_indices'], sample_size=sample_size, 225 seed_obj=adata.uns['seed_obj']) 226 227 # Getting batch indices 228 sample_range = np.arange(sample_size) 229 batch_idx = get_batches(sample_range, adata.uns['seed_obj'], 230 batches, batch_size) 231 232 # Loop over every group in group_dict 233 sigma_array = np.zeros((len(adata.uns['group_dict']))) 234 for m, group_features in enumerate(adata.uns['group_dict'].values()): 235 236 n_group_features = len(group_features) 237 238 # Filtering to only features in grouping using filtered view of adata 239 X_train = get_group_mat(adata[indices], n_features=n_features, 240 group_features=group_features, 241 n_group_features=n_group_features) 242 243 if adata.uns['tfidf']: 244 X_train = tfidf(X_train, mode='normalize') 245 246 # Data filtering, and transformation according to given data_type 247 # Will remove low variance (< 1e5) features regardless of data_type 248 # If scale_data will log scale and z-score the data 249 X_train = process_data(X_train=X_train, 250 scale_data=adata.uns['scale_data'], 251 return_dense=True) 252 253 # Estimating sigma 254 sigma = est_group_sigma(adata, X_train, n_group_features, 255 n_features, batch_idx=batch_idx) 256 257 sigma_array[m] = sigma 258 259 adata.uns['sigma'] = sigma_array 260 261 return adata
Calculate kernel widths to inform distribution for projection of Fourier Features. Calculates one sigma per group of features.
Parameters
- adata (ad.AnnData):
Created by
create_adata
. - n_features (int): Number of random features to include when estimating sigma. Will be scaled for the whole pathway set according to a heuristic. Used for scalability.
- batches (int):
The number of batches to use for the distance calculation.
This will average the result of
batches
distance calculations ofbatch_size
randomly sampled cells. More batches will converge to population distance values at the cost of scalability. - batch_size (int):
The number of cells to include per batch for distance
calculations. Higher batch size will converge to population
distance values at the cost of scalability.
If
batches
*batch_size
> # training cells,batch_size
will be reduced toint(num training cells / batches)
.
Returns
- adata (ad.AnnData):
Key added
adata.uns['sigma']
with grouping kernel widths.
Examples
>>> adata = scmkl.estimate_sigma(adata)
>>> adata.uns['sigma']
array([10.4640895 , 10.82011454, 6.16769438, 9.86156855, ...])