scmkl.calculate_z
```python
import numpy as np
import scipy
import anndata as ad


def _sparse_var(X, axis = None):
    '''
    Function to calculate variance on a scipy sparse matrix.

    Parameters
    ----------
    X : A scipy sparse or numpy array
    axis : Determines which axis variance is calculated on. Same usage
        as Numpy.
        axis = 0 => column variances
        axis = 1 => row variances
        axis = None => total variance (calculated on all data)

    Returns
    -------
    var : Variance values calculated over the given axis
    '''

    # E[X^2] - E[X]^2
    if scipy.sparse.issparse(X):
        exp_mean = (X.power(2).mean(axis = axis))
        sq_mean = np.square(X.mean(axis = axis))
        var = np.array(exp_mean - sq_mean)
    else:
        var = np.var(X, axis = axis)

    return var.ravel()


def _process_data(X_train, X_test = None, scale_data = True,
                  return_dense = True):
    '''
    Function to preprocess a data matrix according to the type of data
    (counts, e.g. RNA, or binary, e.g. ATAC). Test data is processed
    according to parameters calculated from the training data.

    Parameters
    ----------
    X_train : A scipy sparse or numpy array
    X_test : A scipy sparse or numpy array
    scale_data : If True, log transforms and standard scales the data
        using means and standard deviations from the training data.
    return_dense : If True, sparse inputs are converted to dense numpy
        arrays before returning.

    Returns
    -------
    X_train, X_test : Numpy arrays with the processed train/test data
        respectively.
    '''
    if X_test is None:
        # Create a dummy test matrix for the sake of calculation without
        # increasing computational time
        X_test = X_train[:1, :]
        orig_test = None
    else:
        orig_test = 'given'

    # Remove features that have no variance in the training data
    # (will be uninformative)
    var = _sparse_var(X_train, axis = 0)
    variable_features = np.where(var > 1e-5)[0]

    X_train = X_train[:, variable_features]
    X_test = X_test[:, variable_features]

    # Data processing according to data type
    if scale_data:

        if scipy.sparse.issparse(X_train):
            X_train = X_train.log1p()
            X_test = X_test.log1p()
        else:
            X_train = np.log1p(X_train)
            X_test = np.log1p(X_test)

        # Center and scale count data
        train_means = np.mean(X_train, 0)
        train_sds = np.sqrt(var[variable_features])

        # Perform transformation on test data according to parameters
        # of the training data
        X_train = (X_train - train_means) / train_sds
        X_test = (X_test - train_means) / train_sds

    if return_dense and scipy.sparse.issparse(X_train):
        X_train = X_train.toarray()
        X_test = X_test.toarray()

    if orig_test is None:
        return X_train
    else:
        return X_train, X_test


def calculate_z(adata, n_features = 5000) -> ad.AnnData:
    '''
    Function to calculate Z matrix.

    Parameters
    ----------
    **adata** : *AnnData*
        > created by `create_adata()` with `adata.uns.keys()`
        `'sigma'`, `'train_indices'`, and `'test_indices'`.
        The `'sigma'` key can be added by running `estimate_sigma()`
        on adata.

    **n_features** : *int*
        > Number of random features to use when calculating Z; used
        for scalability.

    Returns
    -------
    **adata** : *AnnData*
        > adata with Z matrices accessible with `adata.uns['Z_train']`
        and `adata.uns['Z_test']`.

    Examples
    --------
    >>> adata = scmkl.estimate_sigma(adata)
    >>> adata = scmkl.calculate_z(adata)
    >>> adata.uns.keys()
    dict_keys(['Z_train', 'Z_test', 'sigmas', 'train_indices',
    'test_indices'])
    '''
    assert np.all(adata.uns['sigma'] > 0), 'Sigma must be positive'

    # Number of groupings taken from group_dict
    n_pathway = len(adata.uns['group_dict'].keys())
    D = adata.uns['D']

    # Capture training and testing indices
    train_idx = np.array(adata.uns['train_indices'], dtype = np.int_)
    test_idx = np.array(adata.uns['test_indices'], dtype = np.int_)

    # Create arrays to store concatenated group Z
    # Each group of features will have a corresponding entry in each array
    n_cols = 2 * adata.uns['D'] * n_pathway

    Z_train = np.zeros((train_idx.shape[0], n_cols))
    Z_test = np.zeros((test_idx.shape[0], n_cols))

    # Loop over each of the groups and create Z for each
    for m, group_features in enumerate(adata.uns['group_dict'].values()):

        # Extract features from the mth group
        num_group_features = len(group_features)

        # Sample up to n_features features - important for scalability
        # when using large groupings
        # Will use all features if the grouping contains fewer than n_features
        number_features = np.min([n_features, num_group_features])
        group_array = np.array(list(group_features))
        group_features = adata.uns['seed_obj'].choice(group_array,
                                                      number_features,
                                                      replace = False)

        # Create data arrays containing only features within this group
        X_train = adata[adata.uns['train_indices'], :][:, group_features].X
        X_test = adata[adata.uns['test_indices'], :][:, group_features].X

        # Data filtering and transformation
        # Removes low variance (< 1e-5) features regardless of scale_data
        # If scale_data is True, log transforms and z-scores the data
        X_train, X_test = _process_data(X_train = X_train, X_test = X_test,
                                        scale_data = adata.uns['scale_data'],
                                        return_dense = True)

        # Extract pre-calculated sigma used for approximating the kernel
        adjusted_sigma = adata.uns['sigma'][m]

        # Calculate the approximate kernel according to the chosen kernel
        # function; the sampling distribution comes from the Fourier
        # transform of the kernel function
        if adata.uns['kernel_type'].lower() == 'gaussian':

            gamma = 1 / (2 * adjusted_sigma ** 2)
            sigma_p = 0.5 * np.sqrt(2 * gamma)

            W = adata.uns['seed_obj'].normal(0, sigma_p, X_train.shape[1] * D)
            W = W.reshape((X_train.shape[1], D))

        elif adata.uns['kernel_type'].lower() == 'laplacian':

            gamma = 1 / (2 * adjusted_sigma)

            W = adata.uns['seed_obj'].standard_cauchy(X_train.shape[1] * D)
            W = gamma * W.reshape((X_train.shape[1], D))

        elif adata.uns['kernel_type'].lower() == 'cauchy':

            gamma = 1 / (2 * adjusted_sigma ** 2)
            b = 0.5 * np.sqrt(gamma)

            W = adata.uns['seed_obj'].laplace(0, b, X_train.shape[1] * D)
            W = W.reshape((X_train.shape[1], D))

        train_projection = np.matmul(X_train, W)
        test_projection = np.matmul(X_test, W)

        # Store group Z in the whole-Z object.
        # Preserves order to be able to extract meaningful groups
        x_idx = np.arange(m * 2 * D, (m + 1) * 2 * D)
        sq_i_d = np.sqrt(1 / D)

        Z_train[0:, x_idx] = sq_i_d * np.hstack((np.cos(train_projection),
                                                 np.sin(train_projection)))
        Z_test[0:, x_idx] = sq_i_d * np.hstack((np.cos(test_projection),
                                                np.sin(test_projection)))

    adata.uns['Z_train'] = Z_train
    adata.uns['Z_test'] = Z_test

    return adata
```
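The `'gaussian'` branch above is an instance of random Fourier features: the projection directions in `W` are sampled from the Fourier transform of the kernel, and `Z = sqrt(1/D) * [cos(XW), sin(XW)]` is built so that `Z @ Z.T` approximates the kernel matrix. The toy sketch below (standalone NumPy, not scmkl code; it uses the textbook scaling `W ~ N(0, 1/sigma^2)` rather than scmkl's `adjusted_sigma` parameterization) illustrates the approximation:

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(60, 8))    # toy data: 60 cells, 8 features
sigma, D = 1.5, 2000            # kernel width and number of random features

# Textbook Gaussian-kernel RFF: each projection direction w_j ~ N(0, I / sigma^2)
W = rng.normal(0, 1 / sigma, size=(X.shape[1], D))
projection = X @ W
Z = np.sqrt(1 / D) * np.hstack((np.cos(projection), np.sin(projection)))

# Exact Gaussian kernel exp(-||x - y||^2 / (2 * sigma^2)) for comparison
sq_dists = np.sum((X[:, None, :] - X[None, :, :]) ** 2, axis=-1)
K_exact = np.exp(-sq_dists / (2 * sigma ** 2))

print(np.abs(Z @ Z.T - K_exact).max())  # approximation error shrinks as D grows
```

Larger `D` tightens the approximation at the cost of wider Z matrices, which is the trade-off controlled per group by `adata.uns['D']`.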
`def calculate_z(adata, n_features=5000) -> anndata._core.anndata.AnnData:`
Function to calculate Z matrix.
Parameters
adata : AnnData
    created by `create_adata()` with `adata.uns.keys()` `'sigma'`,
    `'train_indices'`, and `'test_indices'`. The `'sigma'` key can be
    added by running `estimate_sigma()` on adata.

n_features : int
    Number of random features to use when calculating Z; used for
    scalability.
Returns
adata : AnnData
    adata with Z matrices accessible with `adata.uns['Z_train']` and
    `adata.uns['Z_test']`.
Examples
>>> adata = scmkl.estimate_sigma(adata)
>>> adata = scmkl.calculate_z(adata)
>>> adata.uns.keys()
dict_keys(['Z_train', 'Z_test', 'sigmas', 'train_indices',
'test_indices'])
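Because each grouping fills a contiguous block of `2 * D` columns of `Z_train` and `Z_test`, in the order the groups appear in `adata.uns['group_dict']` (see `x_idx` in the source above), a single group's random features can be pulled back out with index arithmetic. A minimal sketch, assuming `adata` has already been run through `calculate_z()`:

```python
import numpy as np

D = adata.uns['D']
group_names = list(adata.uns['group_dict'].keys())

# Columns of Z belonging to the m-th grouping (2 * D columns per group)
m = 0                                           # e.g., the first grouping
cols = np.arange(m * 2 * D, (m + 1) * 2 * D)

Z_group_train = adata.uns['Z_train'][:, cols]   # shape: (n_train_cells, 2 * D)
print(group_names[m], Z_group_train.shape)
```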