scmkl.calculate_z

  1import numpy as np
  2import scipy
  3import anndata as ad
  4
  5from scmkl.tfidf_normalize import tfidf_train_test
  6from scmkl.estimate_sigma import est_group_sigma, get_batches
  7from scmkl.data_processing import process_data, get_group_mat, sample_cells
  8from scmkl.projections import gaussian_trans, laplacian_trans, cauchy_trans
  9
 10
 11def check_for_nan(adata: ad.AnnData):
 12    """
 13    Ensures only valid values are in training and test matrices.
 14
 15    Parameters
 16    ----------
 17    adata : ad.AnnData
 18        Object with `'Z_train'` and `'Z_test'` keys in `.uns` 
 19        attribute.
 20
 21    Returns
 22    -------
 23    None
 24    """
 25    n_nans = np.sum(np.isnan(adata.uns['Z_train']))
 26    n_nans += np.sum(np.isnan(adata.uns['Z_test']))
 27
 28    if n_nans:
 29        raise ValueError(
 30            "Some values in Z matrix are type `np.nan`. This is likely "
 31            "due to a small kernel width or invalid values in input Z matrix."
 32            )
 33    
 34    return None
 35
 36
 37def check_for_inf(adata: ad.AnnData):
 38    """
 39    Ensures only valid values are in training and test matrices.
 40
 41    Parameters
 42    ----------
 43    adata : ad.AnnData
 44        Object with `'Z_train'` and `'Z_test'` keys in `.uns` 
 45        attribute.
 46
 47    Returns
 48    -------
 49    None
 50    """
 51    n_infs = np.sum(np.isinf(adata.uns['Z_train']))
 52    n_infs += np.sum(np.isinf(adata.uns['Z_test']))
 53
 54    if n_infs:
 55        raise ValueError(
 56            "Some values in Z matrix are type `np.inf`. This is likely "
 57            "due to input matrix containing negative values."
 58            )
 59    
 60    return None
 61
 62
 63def get_z_indices(m, D):
 64    """
 65    Takes the number associated with the group as `m` and returns the 
 66    indices for cos and sin functions to be applied.
 67
 68    Parameters
 69    ----------
 70    m : int
 71        The chronological number of the group being processed.
 72
 73    D : int
 74        The number of dimensions per group.
 75
 76    Returns
 77    -------
 78    cos_idx, sin_idx : np.ndarray, np.ndarray
 79        The indices for cos and sin projections in overall Z matrix.
 80    """
 81    x_idx = np.arange(m*2*D ,(m + 1)*2*D)
 82    cos_idx = x_idx[:len(x_idx)//2]
 83    sin_idx = x_idx[len(x_idx)//2:]
 84
 85    return cos_idx, sin_idx
 86
 87
 88def calc_groupz(X_train, X_test, adata, D, sigma, proj_func):
 89    """
 90    Calculates the Z matrix for grouping.
 91
 92    Parameters
 93    ----------
 94    X_train : np.ndarray
 95        The filtered data matrix to calculate train Z mat for.
 96    
 97    X_test : np.ndarray
 98        The filtered data matrix to calculate test Z mat for.
 99
100    adata : anndata.AnnData 
101        AnnData object containing `seed_obj` in `.uns` attribute.
102
103    D : int
104        Number of dimensions per grouping.
105
106    sigma : float
107        Kernel width for grouping.
108
109    proj_func : function
110        The projection direction function to be applied to data.
111
112    Returns
113    -------
114    train_projections, test_projections : np.ndarray, np.ndarray
115        Training and testing Z matrices for group.
116    """  
117    if scipy.sparse.issparse(X_train):
118        X_train = X_train.toarray().astype(np.float16)
119        X_test = X_test.toarray().astype(np.float16)
120
121    W = proj_func(X_train, sigma, adata.uns['seed_obj'], D)
122    
123    train_projection = np.matmul(X_train, W, dtype=np.float16)
124    test_projection = np.matmul(X_test, W, dtype=np.float16)
125
126    return train_projection, test_projection
127
128
def calculate_z(adata, n_features=5000, batches=10, 
                batch_size=100) -> ad.AnnData:
    """
    Calculate the Z matrices for all groups in both training and 
    testing data.

    For every entry of `adata.uns['group_dict']` the group's train and 
    test matrices are extracted and processed, projected with the 
    projection function of the configured kernel, and the cosine and 
    sine of the projections are written into that group's columns of 
    the overall Z matrices. Both matrices are multiplied by 
    `sqrt(1/D)` before being stored.

    Parameters
    ----------
    adata : ad.AnnData
        Typically created by `scmkl.create_adata()`. Its `.uns` must 
        hold `'group_dict'`, `'D'`, `'train_indices'`, 
        `'test_indices'`, `'seed_obj'`, `'kernel_type'`, `'tfidf'`, 
        `'scale_data'` and `'transform_data'`. If `'sigma'` is 
        present it is indexed by group position and used as is; 
        otherwise a sigma is estimated per group and the results are 
        stored in `adata.uns['sigma']`.

    n_features : int
        Number of random features to use when calculating Z; used for 
        scalability.

    batches : int
        The number of batches to use for the distance calculation.
        This will average the result of `batches` distance calculations
        of `batch_size` randomly sampled cells. More batches will converge
        to population distance values at the cost of scalability.
        Only used when sigma has to be estimated.

    batch_size : int
        The number of cells to include per batch for distance
        calculations. Higher batch size will converge to population
        distance values at the cost of scalability.
        If `batches*batch_size > num_training_cells`,
        `batch_size` will be reduced to 
        `int(num_training_cells / batches)`.

    Returns
    -------
    adata : ad.AnnData
        `adata` with Z matrices accessible with `adata.uns['Z_train']` 
        and `adata.uns['Z_test']`.

    Raises
    ------
    ValueError
        If `adata.uns['kernel_type']` is not `'gaussian'`, 
        `'laplacian'` or `'cauchy'` (case-insensitive), or if the 
        resulting Z matrices contain NaN or infinite values.

    AssertionError
        If the sigma of a group is not greater than 0.

    Examples
    --------
    >>> adata = scmkl.estimate_sigma(adata)
    >>> adata = scmkl.calculate_z(adata)
    >>> 'Z_train' in adata.uns and 'Z_test' in adata.uns
    True
    """
    # Resolve the projection function first so an unsupported kernel 
    # fails with a clear message before any work is done
    kernel_funcs = {
        'gaussian': gaussian_trans,
        'laplacian': laplacian_trans,
        'cauchy': cauchy_trans,
    }
    kernel_type = adata.uns['kernel_type']
    try:
        proj_func = kernel_funcs[kernel_type.lower()]
    except KeyError:
        raise ValueError(
            f"Unknown kernel_type {kernel_type!r}; expected one of "
            "'gaussian', 'laplacian' or 'cauchy'."
        ) from None

    # Number of groupings taken from group_dict
    n_pathway = len(adata.uns['group_dict'])
    D = adata.uns['D']

    # Scaling factor applied to the finished Z matrices
    sq_i_d = np.sqrt(1/D)

    # Capturing training and testing sizes
    train_len = len(adata.uns['train_indices'])
    test_len = len(adata.uns['test_indices'])

    if batch_size * batches > train_len:
        old_batch_size = batch_size
        batch_size = int(train_len/batches)
        print("Specified batch size required too many cells for "
                "independent batches. Reduced batch size from "
                f"{old_batch_size} to {batch_size}")

    # Batches are only needed when sigma has to be estimated below
    if 'sigma' not in adata.uns:
        n_samples = np.min((2000, train_len))
        sample_range = np.arange(n_samples)
        batch_idx = get_batches(sample_range, adata.uns['seed_obj'], 
                                batches=batches, batch_size=batch_size)
        # NOTE(review): the returned indices are never used. The call 
        # is kept because it receives seed_obj, so removing it could 
        # change later random draws -- confirm before deleting.
        sample_cells(adata.uns['train_indices'], n_samples, 
                     adata.uns['seed_obj'])

    # Create arrays to store concatenated group Zs
    # Each group of features owns 2*D consecutive columns in each array
    n_cols = 2*D*n_pathway
    Z_train = np.zeros((train_len, n_cols), dtype=np.float16)
    Z_test = np.zeros((test_len, n_cols), dtype=np.float16)

    # Loop over each of the groups and create Z for each
    sigma_list = list()
    for m, group_features in enumerate(adata.uns['group_dict'].values()):

        n_group_features = len(group_features)

        X_train, X_test = get_group_mat(adata, n_features, group_features, 
                                        n_group_features, process_test=True)
        
        if adata.uns['tfidf']:
            X_train, X_test = tfidf_train_test(X_train, X_test)

        # Filtering, scaling and transformation are delegated to 
        # process_data and controlled by the .uns settings
        X_train, X_test = process_data(X_train=X_train, X_test=X_test, 
                                       scale_data=adata.uns['scale_data'], 
                                       transform_data=adata.uns['transform_data'],
                                       return_dense=True)    

        # Getting sigma: reuse a stored value or estimate one
        if 'sigma' in adata.uns:
            sigma = adata.uns['sigma'][m]
        else:
            sigma = est_group_sigma(adata, X_train, n_group_features, 
                                    n_features, batch_idx=batch_idx)
            sigma_list.append(sigma)
            
        # Kept as an assert so the exception type seen by callers is 
        # unchanged; note that asserts are skipped under `python -O`
        assert sigma > 0, "Sigma must be more than 0"
        train_projection, test_projection = calc_groupz(X_train, X_test, 
                                                        adata, D, sigma, 
                                                        proj_func)

        # Store group Z in whole-Z object
        # Preserves order to be able to extract meaningful groups
        cos_idx, sin_idx = get_z_indices(m, D)

        Z_train[:, cos_idx] = np.cos(train_projection, dtype=np.float16)
        Z_train[:, sin_idx] = np.sin(train_projection, dtype=np.float16)

        Z_test[:, cos_idx] = np.cos(test_projection, dtype=np.float16)
        Z_test[:, sin_idx] = np.sin(test_projection, dtype=np.float16)

    adata.uns['Z_train'] = Z_train*sq_i_d
    adata.uns['Z_test'] = Z_test*sq_i_d

    # Persist the estimated sigmas so later calls can reuse them
    if 'sigma' not in adata.uns:
        adata.uns['sigma'] = np.array(sigma_list)

    check_for_nan(adata)
    check_for_inf(adata)

    return adata
def check_for_nan(adata: anndata._core.anndata.AnnData):
def check_for_nan(adata: ad.AnnData):
    """
    Raise if the stored Z matrices contain any NaN entries.

    Parameters
    ----------
    adata : ad.AnnData
        Object whose `.uns` attribute holds the `'Z_train'` and 
        `'Z_test'` matrices to inspect.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If at least one entry of `'Z_train'` or `'Z_test'` is NaN.
    """
    # Count NaN entries over both matrices; one is enough to reject
    nan_total = sum(
        np.sum(np.isnan(adata.uns[key])) for key in ('Z_train', 'Z_test')
    )

    if nan_total == 0:
        return None

    raise ValueError(
        "Some values in Z matrix are type `np.nan`. This is likely "
        "due to a small kernel width or invalid values in input Z matrix."
        )

Ensures only valid values are in training and test matrices.

Parameters
  • adata (ad.AnnData): Object with 'Z_train' and 'Z_test' keys in .uns attribute.
Returns
  • None
def check_for_inf(adata: anndata._core.anndata.AnnData):
def check_for_inf(adata: ad.AnnData):
    """
    Raise if the stored Z matrices contain any infinite entries.

    Parameters
    ----------
    adata : ad.AnnData
        Object whose `.uns` attribute holds the `'Z_train'` and 
        `'Z_test'` matrices to inspect.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If at least one entry of `'Z_train'` or `'Z_test'` is 
        positive or negative infinity.
    """
    # Count infinite entries over both matrices; one is enough to reject
    inf_total = sum(
        np.sum(np.isinf(adata.uns[key])) for key in ('Z_train', 'Z_test')
    )

    if inf_total == 0:
        return None

    raise ValueError(
        "Some values in Z matrix are type `np.inf`. This is likely "
        "due to input matrix containing negative values."
        )

Ensures only valid values are in training and test matrices.

Parameters
  • adata (ad.AnnData): Object with 'Z_train' and 'Z_test' keys in .uns attribute.
Returns
  • None
def get_z_indices(m, D):
def get_z_indices(m, D):
    """
    Locate the columns of the overall Z matrix that belong to group 
    `m`, split into the cos half and the sin half.

    Parameters
    ----------
    m : int
        Position of the group being processed (0 for the first group).

    D : int
        The number of dimensions per group.

    Returns
    -------
    cos_idx, sin_idx : np.ndarray, np.ndarray
        The first and second half of the `2*D` consecutive column 
        indices starting at `m*2*D`.
    """
    # Each group occupies a contiguous run of 2*D columns
    width = 2*D
    group_cols = np.arange(m*width, (m + 1)*width)

    # First half of the run holds cos values, second half sin values
    half = group_cols.size // 2

    return group_cols[:half], group_cols[half:]

Takes the number associated with the group as m and returns the indices for cos and sin functions to be applied.

Parameters
  • m (int): The chronological number of the group being processed.
  • D (int): The number of dimensions per group.
Returns
  • cos_idx, sin_idx (np.ndarray, np.ndarray): The indices for cos and sin projections in overall Z matrix.
def calc_groupz(X_train, X_test, adata, D, sigma, proj_func):
 89def calc_groupz(X_train, X_test, adata, D, sigma, proj_func):
 90    """
 91    Calculates the Z matrix for grouping.
 92
 93    Parameters
 94    ----------
 95    X_train : np.ndarray
 96        The filtered data matrix to calculate train Z mat for.
 97    
 98    X_test : np.ndarray
 99        The filtered data matrix to calculate test Z mat for.
100
101    adata : anndata.AnnData 
102        AnnData object containing `seed_obj` in `.uns` attribute.
103
104    D : int
105        Number of dimensions per grouping.
106
107    sigma : float
108        Kernel width for grouping.
109
110    proj_func : function
111        The projection direction function to be applied to data.
112
113    Returns
114    -------
115    train_projections, test_projections : np.ndarray, np.ndarray
116        Training and testing Z matrices for group.
117    """  
118    if scipy.sparse.issparse(X_train):
119        X_train = X_train.toarray().astype(np.float16)
120        X_test = X_test.toarray().astype(np.float16)
121
122    W = proj_func(X_train, sigma, adata.uns['seed_obj'], D)
123    
124    train_projection = np.matmul(X_train, W, dtype=np.float16)
125    test_projection = np.matmul(X_test, W, dtype=np.float16)
126
127    return train_projection, test_projection

Calculates the Z matrix for grouping.

Parameters
  • X_train (np.ndarray): The filtered data matrix to calculate train Z mat for.
  • X_test (np.ndarray): The filtered data matrix to calculate test Z mat for.
  • adata (anndata.AnnData): AnnData object containing seed_obj in .uns attribute.
  • D (int): Number of dimensions per grouping.
  • sigma (float): Kernel width for grouping.
  • proj_func (function): The projection direction function to be applied to data.
Returns
  • train_projections, test_projections (np.ndarray, np.ndarray): Training and testing Z matrices for group.
def calculate_z( adata, n_features=5000, batches=10, batch_size=100) -> anndata._core.anndata.AnnData:
def calculate_z(adata, n_features=5000, batches=10, 
                batch_size=100) -> ad.AnnData:
    """
    Calculate the Z matrices for all groups in both training and 
    testing data.

    For every entry of `adata.uns['group_dict']` the group's train and 
    test matrices are extracted and processed, projected with the 
    projection function of the configured kernel, and the cosine and 
    sine of the projections are written into that group's columns of 
    the overall Z matrices. Both matrices are multiplied by 
    `sqrt(1/D)` before being stored.

    Parameters
    ----------
    adata : ad.AnnData
        Typically created by `scmkl.create_adata()`. Its `.uns` must 
        hold `'group_dict'`, `'D'`, `'train_indices'`, 
        `'test_indices'`, `'seed_obj'`, `'kernel_type'`, `'tfidf'`, 
        `'scale_data'` and `'transform_data'`. If `'sigma'` is 
        present it is indexed by group position and used as is; 
        otherwise a sigma is estimated per group and the results are 
        stored in `adata.uns['sigma']`.

    n_features : int
        Number of random features to use when calculating Z; used for 
        scalability.

    batches : int
        The number of batches to use for the distance calculation.
        This will average the result of `batches` distance calculations
        of `batch_size` randomly sampled cells. More batches will converge
        to population distance values at the cost of scalability.
        Only used when sigma has to be estimated.

    batch_size : int
        The number of cells to include per batch for distance
        calculations. Higher batch size will converge to population
        distance values at the cost of scalability.
        If `batches*batch_size > num_training_cells`,
        `batch_size` will be reduced to 
        `int(num_training_cells / batches)`.

    Returns
    -------
    adata : ad.AnnData
        `adata` with Z matrices accessible with `adata.uns['Z_train']` 
        and `adata.uns['Z_test']`.

    Raises
    ------
    ValueError
        If `adata.uns['kernel_type']` is not `'gaussian'`, 
        `'laplacian'` or `'cauchy'` (case-insensitive), or if the 
        resulting Z matrices contain NaN or infinite values.

    AssertionError
        If the sigma of a group is not greater than 0.

    Examples
    --------
    >>> adata = scmkl.estimate_sigma(adata)
    >>> adata = scmkl.calculate_z(adata)
    >>> 'Z_train' in adata.uns and 'Z_test' in adata.uns
    True
    """
    # Resolve the projection function first so an unsupported kernel 
    # fails with a clear message before any work is done
    kernel_funcs = {
        'gaussian': gaussian_trans,
        'laplacian': laplacian_trans,
        'cauchy': cauchy_trans,
    }
    kernel_type = adata.uns['kernel_type']
    try:
        proj_func = kernel_funcs[kernel_type.lower()]
    except KeyError:
        raise ValueError(
            f"Unknown kernel_type {kernel_type!r}; expected one of "
            "'gaussian', 'laplacian' or 'cauchy'."
        ) from None

    # Number of groupings taken from group_dict
    n_pathway = len(adata.uns['group_dict'])
    D = adata.uns['D']

    # Scaling factor applied to the finished Z matrices
    sq_i_d = np.sqrt(1/D)

    # Capturing training and testing sizes
    train_len = len(adata.uns['train_indices'])
    test_len = len(adata.uns['test_indices'])

    if batch_size * batches > train_len:
        old_batch_size = batch_size
        batch_size = int(train_len/batches)
        print("Specified batch size required too many cells for "
                "independent batches. Reduced batch size from "
                f"{old_batch_size} to {batch_size}")

    # Batches are only needed when sigma has to be estimated below
    if 'sigma' not in adata.uns:
        n_samples = np.min((2000, train_len))
        sample_range = np.arange(n_samples)
        batch_idx = get_batches(sample_range, adata.uns['seed_obj'], 
                                batches=batches, batch_size=batch_size)
        # NOTE(review): the returned indices are never used. The call 
        # is kept because it receives seed_obj, so removing it could 
        # change later random draws -- confirm before deleting.
        sample_cells(adata.uns['train_indices'], n_samples, 
                     adata.uns['seed_obj'])

    # Create arrays to store concatenated group Zs
    # Each group of features owns 2*D consecutive columns in each array
    n_cols = 2*D*n_pathway
    Z_train = np.zeros((train_len, n_cols), dtype=np.float16)
    Z_test = np.zeros((test_len, n_cols), dtype=np.float16)

    # Loop over each of the groups and create Z for each
    sigma_list = list()
    for m, group_features in enumerate(adata.uns['group_dict'].values()):

        n_group_features = len(group_features)

        X_train, X_test = get_group_mat(adata, n_features, group_features, 
                                        n_group_features, process_test=True)
        
        if adata.uns['tfidf']:
            X_train, X_test = tfidf_train_test(X_train, X_test)

        # Filtering, scaling and transformation are delegated to 
        # process_data and controlled by the .uns settings
        X_train, X_test = process_data(X_train=X_train, X_test=X_test, 
                                       scale_data=adata.uns['scale_data'], 
                                       transform_data=adata.uns['transform_data'],
                                       return_dense=True)    

        # Getting sigma: reuse a stored value or estimate one
        if 'sigma' in adata.uns:
            sigma = adata.uns['sigma'][m]
        else:
            sigma = est_group_sigma(adata, X_train, n_group_features, 
                                    n_features, batch_idx=batch_idx)
            sigma_list.append(sigma)
            
        # Kept as an assert so the exception type seen by callers is 
        # unchanged; note that asserts are skipped under `python -O`
        assert sigma > 0, "Sigma must be more than 0"
        train_projection, test_projection = calc_groupz(X_train, X_test, 
                                                        adata, D, sigma, 
                                                        proj_func)

        # Store group Z in whole-Z object
        # Preserves order to be able to extract meaningful groups
        cos_idx, sin_idx = get_z_indices(m, D)

        Z_train[:, cos_idx] = np.cos(train_projection, dtype=np.float16)
        Z_train[:, sin_idx] = np.sin(train_projection, dtype=np.float16)

        Z_test[:, cos_idx] = np.cos(test_projection, dtype=np.float16)
        Z_test[:, sin_idx] = np.sin(test_projection, dtype=np.float16)

    adata.uns['Z_train'] = Z_train*sq_i_d
    adata.uns['Z_test'] = Z_test*sq_i_d

    # Persist the estimated sigmas so later calls can reuse them
    if 'sigma' not in adata.uns:
        adata.uns['sigma'] = np.array(sigma_list)

    check_for_nan(adata)
    check_for_inf(adata)

    return adata

Function to calculate Z matrices for all groups in both training and testing data.

Parameters
  • adata (ad.AnnData): created by scmkl.create_adata with adata.uns.keys(): 'train_indices', and 'test_indices'.
  • n_features (int): Number of random features to use when calculating Z; used for scalability.
  • batches (int): The number of batches to use for the distance calculation. This will average the result of batches distance calculations of batch_size randomly sampled cells. More batches will converge to population distance values at the cost of scalability.
  • batch_size (int): The number of cells to include per batch for distance calculations. Higher batch size will converge to population distance values at the cost of scalability. If batches*batch_size > num_training_cells, batch_size will be reduced to int(num_training_cells / batches).
Returns
  • adata (ad.AnnData): adata with Z matrices accessible with adata.uns['Z_train'] and adata.uns['Z_test'].
Examples
>>> adata = scmkl.estimate_sigma(adata)
>>> adata = scmkl.calculate_z(adata)
>>> adata.uns.keys()
dict_keys(['Z_train', 'Z_test', 'sigma', 'train_indices', 
'test_indices'])