scmkl.estimate_sigma
```python
import scipy
import anndata as ad
import numpy as np

from scmkl.data_processing import process_data, get_group_mat, sample_cells
from scmkl.tfidf_normalize import tfidf


def get_batches(sample_range: list | np.ndarray,
                seed_obj: np.random._generator.Generator,
                batches: int,
                batch_size: int) -> np.ndarray:
    """
    Gets batch indices for estimating sigma.

    Parameters
    ----------
    sample_range : list | np.ndarray
        A 1D array whose first element is 0 and whose last element is
        (number of samples in X_train - 1).

    seed_obj : np.random._generator.Generator
        Numpy random generator object from `adata.uns['seed_obj']`.

    batches : int
        Number of batches to calculate indices for.

    batch_size : int
        Number of samples in each batch.

    Returns
    -------
    batches_idx : np.ndarray
        A 2D array with each row corresponding to the sample indices
        for each batch.
    """
    required_n = batches*batch_size
    train_n = len(sample_range)
    assert required_n <= train_n, (f"{required_n} cells required for "
                                   f"{batches} batches of {batch_size} cells; "
                                   f"only {train_n} cells present")

    # Creating a mat of batch x sample indices for estimating sigma
    batches_idx = np.zeros((batches, batch_size), dtype=np.int16)

    for i in range(batches):
        batches_idx[i] = seed_obj.choice(sample_range,
                                         batch_size,
                                         replace=False)

        # Removing selected indices from sample options
        rm_indices = np.isin(sample_range, batches_idx[i])
        sample_range = np.delete(sample_range, rm_indices)

    return batches_idx


def batch_sigma(X_train: np.ndarray,
                distance_metric: str,
                batch_idx: np.ndarray) -> float:
    """
    Calculates the kernel width (sigma) for a feature grouping through
    sample batching.

    Parameters
    ----------
    X_train : np.ndarray
        A 2D numpy array (cells x features) filtered to the features
        in the grouping and the sampled cells.

    distance_metric : str
        The pairwise distance metric used to estimate sigma. Must be
        one of the options accepted by scipy.spatial.distance.pdist.

    batch_idx : np.ndarray
        A 2D array with each row corresponding to the sample indices
        for each batch.

    Returns
    -------
    sigma : float
        The estimated group kernel width for Z projection before
        adjustments for small kernel width or large groupings.
    """
    # Calculate the mean pairwise distance within each batch
    n_batches = batch_idx.shape[0]
    batch_sigmas = np.zeros(n_batches)

    for i in np.arange(n_batches):
        idx = batch_idx[i]
        batch_dist = scipy.spatial.distance.pdist(X_train[idx, :],
                                                  distance_metric)
        batch_sigmas[i] = np.mean(batch_dist)

    sigma = np.mean(batch_sigmas)

    return sigma


def est_group_sigma(adata: ad.AnnData,
                    X_train: np.ndarray,
                    n_group_features: int,
                    n_features: int,
                    batch_idx: np.ndarray) -> float:
    """
    Processes data and calculates the kernel width (sigma) for a
    feature grouping through sample batching.

    Parameters
    ----------
    adata : anndata.AnnData
        The AnnData object X_train was derived from; its uns attribute
        must contain 'tfidf', 'scale_data', 'transform_data', and
        'distance_metric'.

    X_train : np.ndarray
        A 2D numpy array (cells x features) filtered to the features
        in the grouping and the sampled cells.

    n_group_features : int
        Number of features in feature grouping.

    n_features : int
        Maximum number of features to be used in sigma estimation.

    batch_idx : np.ndarray
        A 2D array with each row corresponding to the sample indices
        for each batch.

    Returns
    -------
    sigma : float
        The estimated group kernel width for Z projection.
    """
    if adata.uns['tfidf']:
        X_train = tfidf(X_train, mode='normalize')

    # Data filtering and transformation according to the given data_type:
    # removes low-variance (< 1e-5) features regardless of data_type;
    # if scale_data, log-transforms and z-scores the data
    X_train = process_data(X_train,
                           scale_data=adata.uns['scale_data'],
                           transform_data=adata.uns['transform_data'],
                           return_dense=True)

    if scipy.sparse.issparse(X_train):
        X_train = X_train.toarray()
    X_train = np.array(X_train, dtype=np.float16)

    # Calculates mean sigma from all batches
    sigma = batch_sigma(X_train, adata.uns['distance_metric'], batch_idx)

    # sigma = 0 is numerically unusable in later steps; such a small
    # sigma results in a wide distribution and typically a
    # non-predictive Z
    if sigma < 1e-2:
        sigma = 1e-2

    if n_features < n_group_features:
        # Heuristic we calculated to account for fewer features used in
        # the distance calculation
        sigma = sigma * n_group_features / n_features

    return sigma


def estimate_sigma(adata: ad.AnnData,
                   n_features: int = 5000,
                   batches: int = 10,
                   batch_size: int = 100) -> ad.AnnData:
    """
    Calculate kernel widths to inform the distribution used for the
    projection of Fourier features. Calculates one sigma per group of
    features.

    Parameters
    ----------
    adata : ad.AnnData
        Created by `create_adata`.

    n_features : int
        Number of random features to include when estimating sigma.
        Will be scaled for the whole pathway set according to a
        heuristic. Used for scalability.

    batches : int
        The number of batches to use for the distance calculation.
        This will average the result of `batches` distance
        calculations of `batch_size` randomly sampled cells. More
        batches will converge to population distance values at the
        cost of scalability.

    batch_size : int
        The number of cells to include per batch for distance
        calculations. Higher batch size will converge to population
        distance values at the cost of scalability.
        If `batches` * `batch_size` > # training cells, `batch_size`
        will be reduced to `int(num training cells / batches)`.

    Returns
    -------
    adata : ad.AnnData
        The input adata with key `adata.uns['sigma']` added,
        containing grouping kernel widths.

    Examples
    --------
    >>> adata = scmkl.estimate_sigma(adata)
    >>> adata.uns['sigma']
    array([10.4640895 , 10.82011454, 6.16769438, 9.86156855, ...])
    """
    assert batch_size <= len(adata.uns['train_indices']), ("Batch size must "
                                                           "not exceed the "
                                                           "number of "
                                                           "training cells.")

    if batch_size * batches > len(adata.uns['train_indices']):
        old_batch_size = batch_size
        batch_size = int(len(adata.uns['train_indices'])/batches)
        print("Specified batch size required too many cells for "
              "independent batches. Reduced batch size from "
              f"{old_batch_size} to {batch_size}", flush=True)

    if batch_size > 2000:
        print("Warning: Batch sizes over 2000 may "
              "result in long run-time.", flush=True)

    # Getting subsample indices
    sample_size = np.min((2000, adata.uns['train_indices'].shape[0]))
    indices = sample_cells(adata.uns['train_indices'],
                           sample_size=sample_size,
                           seed_obj=adata.uns['seed_obj'])

    # Getting batch indices
    sample_range = np.arange(sample_size)
    batch_idx = get_batches(sample_range, adata.uns['seed_obj'],
                            batches, batch_size)

    # Loop over every group in group_dict
    sigma_array = np.zeros(len(adata.uns['group_dict']))
    for m, group_features in enumerate(adata.uns['group_dict'].values()):

        n_group_features = len(group_features)

        # Filtering to only features in grouping using a filtered view
        # of adata
        X_train = get_group_mat(adata[indices], n_features=n_features,
                                group_features=group_features,
                                n_group_features=n_group_features)

        # TF-IDF normalization, filtering, and transformation are
        # applied inside est_group_sigma according to adata.uns, so
        # they are not repeated here
        sigma = est_group_sigma(adata, X_train, n_group_features,
                                n_features, batch_idx=batch_idx)

        sigma_array[m] = sigma

    adata.uns['sigma'] = sigma_array

    return adata
```
```python
def get_batches(sample_range: list | numpy.ndarray,
                seed_obj: numpy.random._generator.Generator,
                batches: int,
                batch_size: int) -> numpy.ndarray:
```
Gets batch indices for estimating sigma.
Parameters
- sample_range (list | np.ndarray): A 1D array whose first element is 0 and whose last element is (number of samples in X_train - 1).
- seed_obj (np.random._generator.Generator): Numpy random generator object from `adata.uns['seed_obj']`.
- batches (int): Number of batches to calculate indices for.
- batch_size (int): Number of samples in each batch.
Returns
- batches_idx (np.ndarray): A 2D array with each row corresponding to the sample indices for each batch.
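A minimal standalone sketch of how `get_batches` partitions an index range into disjoint batches; the generator and sizes here are illustrative, whereas inside scMKL the generator comes from `adata.uns['seed_obj']` and `sample_range` from `np.arange(sample_size)`:

```python
import numpy as np
from scmkl.estimate_sigma import get_batches

rng = np.random.default_rng(0)
sample_range = np.arange(500)

batch_idx = get_batches(sample_range, rng, batches=4, batch_size=100)

print(batch_idx.shape)  # (4, 100)

# Rows are disjoint: chosen indices are removed from the pool after
# each draw, so no cell appears in more than one batch
assert len(np.unique(batch_idx)) == 4 * 100
```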
```python
def batch_sigma(X_train: numpy.ndarray,
                distance_metric: str,
                batch_idx: numpy.ndarray) -> float:
```
Calculates the kernel width (sigma) for a feature grouping through sample batching.
Parameters
- X_train (np.ndarray): A 2D numpy array (cells x features) filtered to the features in the grouping and the sampled cells.
- distance_metric (str): The pairwise distance metric used to estimate sigma. Must be one of the options accepted by scipy.spatial.distance.pdist.
- batch_idx (np.ndarray): A 2D array with each row corresponding to the sample indices for each batch.
Returns
- sigma (float): The estimated group kernel width for Z projection before adjustments for small kernel width or large groupings.
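To see what `batch_sigma` returns, a small synthetic check (the data below is purely illustrative):

```python
import numpy as np
from scmkl.estimate_sigma import get_batches, batch_sigma

# Synthetic data: 500 cells x 50 features of standard normal noise
rng = np.random.default_rng(0)
X = rng.normal(size=(500, 50))

batch_idx = get_batches(np.arange(500), rng, batches=4, batch_size=100)
sigma = batch_sigma(X, 'euclidean', batch_idx)

# For i.i.d. standard normal features, the mean pairwise euclidean
# distance is close to sqrt(2 * 50) = 10, so sigma lands near 10
print(sigma)
```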
```python
def est_group_sigma(adata: anndata.AnnData,
                    X_train: numpy.ndarray,
                    n_group_features: int,
                    n_features: int,
                    batch_idx: numpy.ndarray) -> float:
```
Processes data and calculates the kernel width (sigma) for a feature grouping through sample batching.
Parameters
- adata (anndata.AnnData): The AnnData object X_train was derived from; its uns attribute must contain 'tfidf', 'scale_data', 'transform_data', and 'distance_metric'.
- X_train (np.ndarray): A 2D numpy array (cells x features) filtered to the features in the grouping and the sampled cells.
- n_group_features (int): Number of features in feature grouping.
- n_features (int): Maximum number of features to be used in sigma estimation.
- batch_idx (np.ndarray): A 2D array with each row corresponding to the sample indices for each batch.
Returns
- sigma (float): The estimated group kernel width for Z projection.
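The final scaling step is easiest to see with numbers: if a grouping has 8,000 features but only `n_features = 5000` of them were sampled for the distance calculation, the batched estimate is inflated by 8000/5000 = 1.6. A sketch of just that adjustment, using a hypothetical helper name `scale_sigma`:

```python
def scale_sigma(sigma: float, n_group_features: int,
                n_features: int) -> float:
    # Hypothetical helper mirroring the tail of est_group_sigma
    sigma = max(sigma, 1e-2)  # floor: sigma near 0 gives a non-predictive Z
    if n_features < n_group_features:
        # Inflate for group features left out of the distance calculation
        sigma *= n_group_features / n_features
    return sigma

print(scale_sigma(10.0, 8000, 5000))  # 16.0
print(scale_sigma(0.0, 120, 5000))    # 0.01 (floored)
```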
```python
def estimate_sigma(adata: anndata.AnnData,
                   n_features: int = 5000,
                   batches: int = 10,
                   batch_size: int = 100) -> anndata.AnnData:
```
Calculate kernel widths to inform the distribution used for the projection of Fourier features. Calculates one sigma per group of features.
Parameters
- adata (ad.AnnData): Created by `create_adata`.
- n_features (int): Number of random features to include when estimating sigma. Will be scaled for the whole pathway set according to a heuristic. Used for scalability.
- batches (int): The number of batches to use for the distance calculation. This will average the result of `batches` distance calculations of `batch_size` randomly sampled cells. More batches will converge to population distance values at the cost of scalability.
- batch_size (int): The number of cells to include per batch for distance calculations. Higher batch size will converge to population distance values at the cost of scalability. If `batches` * `batch_size` > # training cells, `batch_size` will be reduced to `int(num training cells / batches)`.
Returns
- adata (ad.AnnData): The input adata with key `adata.uns['sigma']` added, containing grouping kernel widths.
Examples
```python
>>> adata = scmkl.estimate_sigma(adata)
>>> adata.uns['sigma']
array([10.4640895 , 10.82011454, 6.16769438, 9.86156855, ...])
```
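In a full run, `estimate_sigma` sits between AnnData construction and the Z projection. A hedged sketch, assuming `adata` was built beforehand with `scmkl.create_adata` (arguments omitted here):

```python
import scmkl

# adata is assumed to come from scmkl.create_adata(...), which fills
# .uns with 'train_indices', 'seed_obj', 'group_dict', and the
# processing flags ('tfidf', 'scale_data', 'transform_data',
# 'distance_metric') read by estimate_sigma
adata = scmkl.estimate_sigma(adata, n_features=5000,
                             batches=10, batch_size=100)

# One kernel width per grouping, in group_dict order
for group, sigma in zip(adata.uns['group_dict'], adata.uns['sigma']):
    print(f"{group}: {sigma:.3f}")
```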