scmkl.estimate_sigma

import scipy
import anndata as ad
import numpy as np

from scmkl.data_processing import process_data, get_group_mat, sample_cells
from scmkl.tfidf_normalize import tfidf


def get_batches(sample_range: list | np.ndarray, 
                seed_obj: np.random.Generator, 
                batches: int,
                batch_size: int) -> np.ndarray:
    """
    Gets batch indices for estimating sigma.

    Parameters
    ----------
    sample_range : list | np.ndarray
        A 1D array of sample indices with first element 0 and last 
        element (number of samples in X_train - 1).

    seed_obj : np.random.Generator
        Numpy random generator object from `adata.uns['seed_obj']`.

    batches : int
        Number of batches to calculate indices for.

    batch_size : int
        Number of samples in each batch.

    Returns
    -------
    batches_idx : np.ndarray
        A 2D array with each row corresponding to the sample indices 
        for one batch.
    """
    required_n = batches*batch_size
    train_n = len(sample_range)
    assert required_n <= train_n, (f"{required_n} cells required for "
                                   f"{batches} batches of {batch_size} cells; "
                                   f"only {train_n} cells present")

    # Creating a mat of batch x sample indices for estimating sigma
    batches_idx = np.zeros((batches, batch_size), dtype=np.int16)

    for i in range(batches):
        batches_idx[i] = seed_obj.choice(sample_range, 
                                         batch_size, 
                                         replace=False)

        # Removing selected indices so batches are disjoint
        rm_indices = np.isin(sample_range, batches_idx[i])
        sample_range = np.delete(sample_range, rm_indices)

    return batches_idx

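A minimal sketch of `get_batches` on toy inputs (hypothetical values; in practice it is called internally by `estimate_sigma`):

>>> import numpy as np
>>> rng = np.random.default_rng(100)
>>> batch_idx = get_batches(np.arange(10), rng, batches=2, batch_size=5)
>>> batch_idx.shape
(2, 5)
>>> np.intersect1d(batch_idx[0], batch_idx[1]).size  # batches are disjoint
0
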
def batch_sigma(X_train: np.ndarray,
                distance_metric: str,
                batch_idx: np.ndarray) -> float:
    """
    Calculates the kernel width (sigma) for a feature grouping through 
    sample batching.

    Parameters
    ----------
    X_train : np.ndarray
        A 2D numpy array of cells x features, filtered to the features 
        in the grouping and the sampled cells.

    distance_metric : str
        The pairwise distance metric used to estimate sigma. Must be 
        one of the metrics accepted by scipy.spatial.distance.pdist.

    batch_idx : np.ndarray
        A 2D array with each row corresponding to the sample indices 
        for one batch.

    Returns
    -------
    sigma : float
        The estimated group kernel width for Z projection, before 
        adjustments for small kernel widths or large groupings.
    """
    # Calculate the mean pairwise distance within each batch
    n_batches = batch_idx.shape[0]
    batch_sigmas = np.zeros(n_batches)

    for i in np.arange(n_batches):
        idx = batch_idx[i]
        pairwise_dists = scipy.spatial.distance.pdist(X_train[idx, :], 
                                                      distance_metric)
        batch_sigmas[i] = np.mean(pairwise_dists)

    # Average the per-batch estimates into a single sigma
    sigma = np.mean(batch_sigmas)

    return sigma

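For illustration, a small self-contained run of `batch_sigma` (hypothetical data; 'euclidean' is one of the metrics pdist accepts):

>>> rng = np.random.default_rng(0)
>>> X = rng.normal(size=(10, 4))
>>> batch_idx = get_batches(np.arange(10), rng, batches=2, batch_size=5)
>>> batch_sigma(X, 'euclidean', batch_idx) > 0
True
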
def est_group_sigma(adata: ad.AnnData,
                    X_train: np.ndarray,
                    n_group_features: int,
                    n_features: int, 
                    batch_idx: np.ndarray) -> float:
    """
    Processes data and calculates the kernel width (sigma) for a 
    feature grouping through sample batching.

    Parameters
    ----------
    adata : anndata.AnnData
        adata used to derive X_train, containing 'seed_obj' in its 
        uns attribute.

    X_train : np.ndarray
        A 2D numpy array of cells x features, filtered to the features 
        in the grouping and the sampled cells.

    n_group_features : int
        Number of features in the feature grouping.

    n_features : int
        Maximum number of features to be used in sigma estimation.

    batch_idx : np.ndarray
        A 2D array with each row corresponding to the sample indices 
        for one batch.

    Returns
    -------
    sigma : float
        The estimated group kernel width for Z projection.
    """
    if adata.uns['tfidf']:
        X_train = tfidf(X_train, mode='normalize')

    # Filters and transforms the data according to the given data_type; 
    # removes low-variance (< 1e-5) features regardless of data_type 
    # and, if scale_data, log-transforms and z-scores the data
    X_train = process_data(X_train, 
                           scale_data=adata.uns['scale_data'], 
                           transform_data=adata.uns['transform_data'],
                           return_dense=True)

    if scipy.sparse.issparse(X_train):
        X_train = X_train.toarray()
        X_train = np.array(X_train, dtype=np.float16)

    # Calculates mean sigma across all batches
    sigma = batch_sigma(X_train, adata.uns['distance_metric'], batch_idx)

    # sigma = 0 is numerically unusable in later steps; a very small 
    # sigma yields a wide sampling distribution and typically a 
    # non-predictive Z, so it is floored at 1e-2
    if sigma < 1e-2:
        sigma = 1e-2

    if n_features < n_group_features:
        # Heuristic to account for using fewer features than the 
        # grouping contains in the distance calculation
        sigma = sigma * n_group_features / n_features

    return sigma

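The two post-hoc adjustments are easy to verify by hand. With hypothetical numbers, a raw estimate of 0.5 for a grouping of 8000 features subsampled to n_features=5000 is scaled proportionally:

>>> 0.5 * 8000 / 5000
0.8

Estimates below 1e-2 are floored at 1e-2 before this scaling is applied.
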
def estimate_sigma(adata: ad.AnnData,
                   n_features: int = 5000,
                   batches: int = 10, 
                   batch_size: int = 100) -> ad.AnnData:
    """
    Calculates kernel widths to inform the sampling distribution for 
    the projection of Fourier features. Calculates one sigma per group 
    of features.

    Parameters
    ----------
    adata : ad.AnnData
        Created by `create_adata`.
    
    n_features : int
        Number of random features to include when estimating sigma. 
        The estimate is scaled to the whole grouping according to a 
        heuristic. Used for scalability.

    batches : int
        The number of batches to use for the distance calculation. 
        The results of `batches` distance calculations over 
        `batch_size` randomly sampled cells are averaged. More batches 
        converge toward population distance values at the cost of 
        scalability.

    batch_size : int
        The number of cells to include per batch for distance 
        calculations. A larger batch size converges toward population 
        distance values at the cost of scalability. If 
        `batches` * `batch_size` > # training cells, `batch_size` will 
        be reduced to `int(num training cells / batches)`.
        
    Returns
    -------
    adata : ad.AnnData
        Key `adata.uns['sigma']` added with grouping kernel widths.

    Examples
    --------
    >>> adata = scmkl.estimate_sigma(adata)
    >>> adata.uns['sigma']
    array([10.4640895 , 10.82011454,  6.16769438,  9.86156855, ...])
    """
    assert batch_size <= len(adata.uns['train_indices']), ("Batch size must "
                                                           "not exceed the "
                                                           "training set "
                                                           "size.")

    if batch_size * batches > len(adata.uns['train_indices']):
        old_batch_size = batch_size
        batch_size = int(len(adata.uns['train_indices'])/batches)
        print("Specified batch size required too many cells for "
              "independent batches. Reduced batch size from "
              f"{old_batch_size} to {batch_size}", flush=True)

    if batch_size > 2000:
        print("Warning: Batch sizes over 2000 may "
              "result in long run-time.", flush=True)

    # Getting subsample indices
    sample_size = np.min((2000, adata.uns['train_indices'].shape[0]))
    indices = sample_cells(adata.uns['train_indices'], 
                           sample_size=sample_size, 
                           seed_obj=adata.uns['seed_obj'])

    # Getting batch indices
    sample_range = np.arange(sample_size)
    batch_idx = get_batches(sample_range, adata.uns['seed_obj'], 
                            batches, batch_size)

    # Loop over every group in group_dict
    sigma_array = np.zeros((len(adata.uns['group_dict'])))
    for m, group_features in enumerate(adata.uns['group_dict'].values()):

        n_group_features = len(group_features)

        # Filtering to features in the grouping using a view of adata
        X_train = get_group_mat(adata[indices], n_features=n_features, 
                                group_features=group_features, 
                                n_group_features=n_group_features)

        # TF-IDF normalization and data processing are applied inside 
        # est_group_sigma, so X_train is passed through unprocessed here
        sigma = est_group_sigma(adata, X_train, n_group_features, 
                                n_features, batch_idx=batch_idx)

        sigma_array[m] = sigma

    adata.uns['sigma'] = sigma_array

    return adata
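
Putting it together, a typical call with the default parameters (a sketch assuming `adata` was created by `scmkl.create_adata` and therefore carries the required `uns` keys such as 'train_indices', 'group_dict', and 'seed_obj'):

>>> adata = scmkl.estimate_sigma(adata, n_features=5000, batches=10,
...                              batch_size=100)
>>> len(adata.uns['sigma']) == len(adata.uns['group_dict'])
True

One sigma is estimated per grouping, so `adata.uns['sigma']` aligns with the order of `adata.uns['group_dict']`.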