scmkl.calculate_z
```python
import numpy as np
import scipy
import anndata as ad

from scmkl.tfidf_normalize import tfidf_train_test
from scmkl.estimate_sigma import est_group_sigma, get_batches
from scmkl.data_processing import process_data, get_group_mat, sample_cells
from scmkl.projections import gaussian_trans, laplacian_trans, cauchy_trans
```
def check_for_nan(adata: ad.AnnData):
```python
def check_for_nan(adata: ad.AnnData):
    """
    Ensures only valid values are in training and test matrices.

    Parameters
    ----------
    adata : ad.AnnData
        Object with `'Z_train'` and `'Z_test'` keys in `.uns`
        attribute.

    Returns
    -------
    None
    """
    n_nans = np.sum(np.isnan(adata.uns['Z_train']))
    n_nans += np.sum(np.isnan(adata.uns['Z_test']))

    if n_nans:
        raise ValueError(
            "Some values in Z matrix are type `np.nan`. This is likely "
            "due to a small kernel width or invalid values in input Z matrix."
        )

    return None
```
Ensures only valid values are in training and test matrices.

Parameters
- adata (ad.AnnData): Object with `'Z_train'` and `'Z_test'` keys in the `.uns` attribute.

Returns
- None
def check_for_inf(adata: ad.AnnData):
```python
def check_for_inf(adata: ad.AnnData):
    """
    Ensures only valid values are in training and test matrices.

    Parameters
    ----------
    adata : ad.AnnData
        Object with `'Z_train'` and `'Z_test'` keys in `.uns`
        attribute.

    Returns
    -------
    None
    """
    n_infs = np.sum(np.isinf(adata.uns['Z_train']))
    n_infs += np.sum(np.isinf(adata.uns['Z_test']))

    if n_infs:
        raise ValueError(
            "Some values in Z matrix are type `np.inf`. This is likely "
            "due to input matrix containing negative values."
        )

    return None
```
Ensures only valid values are in training and test matrices.

Parameters
- adata (ad.AnnData): Object with `'Z_train'` and `'Z_test'` keys in the `.uns` attribute.

Returns
- None
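A minimal sketch of how both validators behave, using a toy AnnData whose `.uns` holds small, hand-built Z matrices (the shapes are arbitrary, and the import path assumes the functions are reachable from `scmkl.calculate_z` as documented here):

```python
import numpy as np
import anndata as ad

from scmkl.calculate_z import check_for_nan, check_for_inf

# Toy AnnData; only the .uns entries matter to the validators
adata = ad.AnnData(X=np.zeros((4, 2)))
adata.uns['Z_train'] = np.ones((3, 8), dtype=np.float16)
adata.uns['Z_test'] = np.ones((1, 8), dtype=np.float16)

check_for_nan(adata)  # passes silently
check_for_inf(adata)  # passes silently

# Corrupt one entry: check_for_inf now raises ValueError
adata.uns['Z_test'][0, 0] = np.inf
check_for_inf(adata)  # ValueError
```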
def get_z_indices(m, D):
```python
def get_z_indices(m, D):
    """
    Takes the number associated with the group as `m` and returns the
    indices for cos and sin functions to be applied.

    Parameters
    ----------
    m : int
        The chronological number of the group being processed.

    D : int
        The number of dimensions per group.

    Returns
    -------
    cos_idx, sin_idx : np.ndarray, np.ndarray
        The indices for cos and sin projections in overall Z matrix.
    """
    x_idx = np.arange(m*2*D, (m + 1)*2*D)
    cos_idx = x_idx[:len(x_idx)//2]
    sin_idx = x_idx[len(x_idx)//2:]

    return cos_idx, sin_idx
```
Takes the number associated with the group as `m` and returns the indices for cos and sin functions to be applied.

Parameters
- m (int): The chronological number of the group being processed.
- D (int): The number of dimensions per group.

Returns
- cos_idx, sin_idx (np.ndarray, np.ndarray): The indices for cos and sin projections in the overall Z matrix.
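For concreteness: each group owns a contiguous block of `2*D` columns in the overall Z matrix, cosine features first. With `D = 3`, group `m = 1` maps to columns 6 through 11:

```python
from scmkl.calculate_z import get_z_indices

cos_idx, sin_idx = get_z_indices(m=1, D=3)
print(cos_idx)  # [6 7 8]
print(sin_idx)  # [ 9 10 11]
```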
def calc_groupz(X_train, X_test, adata, D, sigma, proj_func):
```python
def calc_groupz(X_train, X_test, adata, D, sigma, proj_func):
    """
    Calculates the Z matrix for a grouping.

    Parameters
    ----------
    X_train : np.ndarray
        The filtered data matrix to calculate the train Z matrix for.

    X_test : np.ndarray
        The filtered data matrix to calculate the test Z matrix for.

    adata : anndata.AnnData
        AnnData object containing `seed_obj` in `.uns` attribute.

    D : int
        Number of dimensions per grouping.

    sigma : float
        Kernel width for grouping.

    proj_func : function
        The projection direction function to be applied to the data.

    Returns
    -------
    train_projection, test_projection : np.ndarray, np.ndarray
        Training and testing Z matrices for the group.
    """
    if scipy.sparse.issparse(X_train):
        X_train = X_train.toarray().astype(np.float16)
        X_test = X_test.toarray().astype(np.float16)

    W = proj_func(X_train, sigma, adata.uns['seed_obj'], D)

    train_projection = np.matmul(X_train, W, dtype=np.float16)
    test_projection = np.matmul(X_test, W, dtype=np.float16)

    return train_projection, test_projection
```
Calculates the Z matrix for a grouping.

Parameters
- X_train (np.ndarray): The filtered data matrix to calculate the train Z matrix for.
- X_test (np.ndarray): The filtered data matrix to calculate the test Z matrix for.
- adata (anndata.AnnData): AnnData object containing `seed_obj` in the `.uns` attribute.
- D (int): Number of dimensions per grouping.
- sigma (float): Kernel width for grouping.
- proj_func (function): The projection direction function to be applied to the data.

Returns
- train_projection, test_projection (np.ndarray, np.ndarray): Training and testing Z matrices for the group.
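This is the projection step of random Fourier features: `calculate_z` later maps each projection through cos and sin and scales by `sqrt(1/D)`, giving `Z_group = sqrt(1/D) * [cos(X W), sin(X W)]`. A self-contained sketch of the same arithmetic, using a stand-in Gaussian projection since the internals of `scmkl.projections` are not shown here (the `N(0, 1/sigma^2)` sampling is an assumption):

```python
import numpy as np

rng = np.random.default_rng(0)
X_train = rng.random((100, 20))  # 100 cells x 20 grouped features
X_test = rng.random((25, 20))
D, sigma = 50, 2.0

# Stand-in for proj_func: for a Gaussian kernel, random Fourier
# features draw projection directions W with entries ~ N(0, 1/sigma^2).
# This sampling scheme is an assumption; the real one lives in
# scmkl.projections.gaussian_trans.
W = rng.normal(0, 1/sigma, size=(X_train.shape[1], D))

train_projection = X_train @ W  # shape (100, D), as calc_groupz returns
test_projection = X_test @ W    # shape (25, D)

# calculate_z then applies cos/sin and the sqrt(1/D) scaling:
Z_train = np.sqrt(1/D) * np.hstack((np.cos(train_projection),
                                    np.sin(train_projection)))
print(Z_train.shape)  # (100, 100) -> 2*D columns for this group
```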
def calculate_z(adata, n_features=5000, batches=10, batch_size=100) -> ad.AnnData:
```python
def calculate_z(adata, n_features=5000, batches=10,
                batch_size=100) -> ad.AnnData:
    """
    Calculates Z matrices for all groups in both training and
    testing data.

    Parameters
    ----------
    adata : ad.AnnData
        Object created by `scmkl.create_adata()` with
        `'train_indices'` and `'test_indices'` in `adata.uns.keys()`.

    n_features : int
        Number of random features to use when calculating Z; used for
        scalability.

    batches : int
        The number of batches to use for the distance calculation.
        This will average the result of `batches` distance
        calculations of `batch_size` randomly sampled cells. More
        batches will converge to population distance values at the
        cost of scalability.

    batch_size : int
        The number of cells to include per batch for distance
        calculations. A higher batch size will converge to population
        distance values at the cost of scalability. If
        `batches*batch_size > num_training_cells`, `batch_size` will
        be reduced to `int(num_training_cells / batches)`.

    Returns
    -------
    adata : ad.AnnData
        `adata` with Z matrices accessible with `adata.uns['Z_train']`
        and `adata.uns['Z_test']`.

    Examples
    --------
    >>> adata = scmkl.estimate_sigma(adata)
    >>> adata = scmkl.calculate_z(adata)
    >>> adata.uns.keys()
    dict_keys(['Z_train', 'Z_test', 'sigma', 'train_indices',
    'test_indices'])
    """
    # Number of groupings taken from group_dict
    n_pathway = len(adata.uns['group_dict'].keys())
    D = adata.uns['D']

    sq_i_d = np.sqrt(1/D)

    # Capturing training and testing sizes
    train_len = len(adata.uns['train_indices'])
    test_len = len(adata.uns['test_indices'])

    if batch_size * batches > train_len:
        old_batch_size = batch_size
        batch_size = int(train_len/batches)
        print("Specified batch size required too many cells for "
              "independent batches. Reduced batch size from "
              f"{old_batch_size} to {batch_size}")

    if 'sigma' not in adata.uns.keys():
        n_samples = np.min((2000, train_len))
        sample_range = np.arange(n_samples)
        batch_idx = get_batches(sample_range, adata.uns['seed_obj'],
                                batches=batches, batch_size=batch_size)
        sigma_indices = sample_cells(adata.uns['train_indices'],
                                     n_samples, adata.uns['seed_obj'])

    # Create arrays to store concatenated group Zs;
    # each group of features has a corresponding entry in each array
    n_cols = 2*adata.uns['D']*n_pathway
    Z_train = np.zeros((train_len, n_cols), dtype=np.float16)
    Z_test = np.zeros((test_len, n_cols), dtype=np.float16)

    # Setting kernel function
    match adata.uns['kernel_type'].lower():
        case 'gaussian':
            proj_func = gaussian_trans
        case 'laplacian':
            proj_func = laplacian_trans
        case 'cauchy':
            proj_func = cauchy_trans

    # Loop over each of the groups, creating Z for each
    sigma_list = list()
    for m, group_features in enumerate(adata.uns['group_dict'].values()):

        n_group_features = len(group_features)

        X_train, X_test = get_group_mat(adata, n_features, group_features,
                                        n_group_features, process_test=True)

        if adata.uns['tfidf']:
            X_train, X_test = tfidf_train_test(X_train, X_test)

        # Data filtering and transformation according to given data_type;
        # removes low-variance (< 1e-5) features regardless of data_type
        # and scales/transforms data depending on .uns values
        X_train, X_test = process_data(X_train=X_train, X_test=X_test,
                                       scale_data=adata.uns['scale_data'],
                                       transform_data=adata.uns['transform_data'],
                                       return_dense=True)

        # Getting sigma
        if 'sigma' in adata.uns.keys():
            sigma = adata.uns['sigma'][m]
        else:
            sigma = est_group_sigma(adata, X_train, n_group_features,
                                    n_features, batch_idx=batch_idx)
            sigma_list.append(sigma)

        assert sigma > 0, "Sigma must be greater than 0"
        train_projection, test_projection = calc_groupz(X_train, X_test,
                                                        adata, D, sigma,
                                                        proj_func)

        # Store group Z in whole-Z object;
        # preserves order to be able to extract meaningful groups
        cos_idx, sin_idx = get_z_indices(m, D)

        Z_train[:, cos_idx] = np.cos(train_projection, dtype=np.float16)
        Z_train[:, sin_idx] = np.sin(train_projection, dtype=np.float16)

        Z_test[:, cos_idx] = np.cos(test_projection, dtype=np.float16)
        Z_test[:, sin_idx] = np.sin(test_projection, dtype=np.float16)

    adata.uns['Z_train'] = Z_train*sq_i_d
    adata.uns['Z_test'] = Z_test*sq_i_d

    if 'sigma' not in adata.uns.keys():
        adata.uns['sigma'] = np.array(sigma_list)

    check_for_nan(adata)
    check_for_inf(adata)

    return adata
```
Calculates Z matrices for all groups in both training and testing data.

Parameters
- adata (ad.AnnData): Object created by `scmkl.create_adata()` with `'train_indices'` and `'test_indices'` in `adata.uns.keys()`.
- n_features (int): Number of random features to use when calculating Z; used for scalability.
- batches (int): The number of batches to use for the distance calculation. This will average the result of `batches` distance calculations of `batch_size` randomly sampled cells. More batches will converge to population distance values at the cost of scalability.
- batch_size (int): The number of cells to include per batch for distance calculations. A higher batch size will converge to population distance values at the cost of scalability. If `batches*batch_size > num_training_cells`, `batch_size` will be reduced to `int(num_training_cells / batches)`.

Returns
- adata (ad.AnnData): `adata` with Z matrices accessible with `adata.uns['Z_train']` and `adata.uns['Z_test']`.

Examples
>>> adata = scmkl.estimate_sigma(adata)
>>> adata = scmkl.calculate_z(adata)
>>> adata.uns.keys()
dict_keys(['Z_train', 'Z_test', 'sigma', 'train_indices',
'test_indices'])
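An end-to-end sketch of where `calculate_z` sits in the pipeline. The `scmkl.create_adata()` argument names below are assumptions modeled on the keys this module reads; consult that function's documentation for the exact signature:

```python
import numpy as np
import scmkl

rng = np.random.default_rng(0)

# Hypothetical inputs: a cell x feature matrix, feature names, and a
# grouping of features (all names below are illustrative)
X = rng.random((500, 1000))
features = np.array([f'gene_{i}' for i in range(1000)])
group_dict = {'pathway_a': set(features[:100]),
              'pathway_b': set(features[100:250])}

adata = scmkl.create_adata(X=X, feature_names=features,
                           cell_labels=np.repeat(['a', 'b'], 250),
                           group_dict=group_dict)

adata = scmkl.estimate_sigma(adata)
adata = scmkl.calculate_z(adata, n_features=5000)

# One block of 2*D columns per group: cosine features, then sine features
print(adata.uns['Z_train'].shape)  # (n_train_cells, 2 * D * n_groups)
```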