scmkl.estimate_sigma

  1import scipy
  2import anndata as ad
  3import numpy as np
  4
  5from scmkl.data_processing import process_data, get_group_mat, sample_cells
  6from scmkl.tfidf_normalize import tfidf
  7
  8
  9def get_batches(sample_range: list | np.ndarray, 
 10                seed_obj: np.random._generator.Generator, 
 11                batches: int,
 12                batch_size: int) -> np.ndarray:
 13    """
 14    Gets batch indices for estimating sigma.
 15
 16    Parameters
 17    ----------
 18    sample_range : list | np.ndarray
 19        A 1D array with first element being 0 and last element being 
 20        (1 - number of samples from X_train).
 21
 22    seed_obj : np.random._generator.Generator
 23        Numpy random generator object from `adata.uns['seed_obj']`.
 24
 25    batches : int
 26        Number of batches to calculate indices for.
 27
 28    batch_size : int
 29        Number of samples in each batch.
 30
 31    Returns
 32    -------
 33    batches_idx : np.ndarray
 34        A 2D array with each row cooresponding to the sample indices 
 35        for each batch.
 36    """
 37    required_n = batches*batch_size
 38    train_n = len(sample_range)
 39    assert required_n <= train_n, (f"{required_n} cells required for "
 40                                   f"{batches} batches of {batch_size} cells; "
 41                                   f"only {train_n} cells present")
 42
 43    # Creating a mat of batch x sample indices for estimating sigma
 44    batches_idx = np.zeros((batches, batch_size), dtype = np.int16)
 45
 46    for i in range(batches):
 47        batches_idx[i] = seed_obj.choice(sample_range, 
 48                                         batch_size, 
 49                                         replace = False)
 50        
 51        # Removing selected indices from sample options
 52        rm_indices = np.isin(sample_range, batches_idx[i])
 53        sample_range = np.delete(sample_range, rm_indices)
 54
 55    return batches_idx
 56
 57
 58def batch_sigma(X_train: np.ndarray,
 59                distance_metric: str,
 60                batch_idx: np.ndarray) -> float:
 61    """
 62    Calculates the kernel width (sigma) for a feature grouping through 
 63    sample batching.
 64
 65    Parameters
 66    ----------
 67    X_train : np.ndarray
 68        A 2D numpy array with cells x features with features filtered 
 69        to features in grouping and sampled cells.
 70
 71    distance_metric: str
 72        The pairwise distance metric used to estimate sigma. Must
 73        be one of the options used in scipy.spatial.distance.cdist.
 74
 75    batch_idx: np.ndarray
 76        A 2D array with each row cooresponding to the sample indices 
 77        for each batch.    
 78
 79    Returns
 80    -------
 81    sigma : float
 82        The estimated group kernel with for Z projection before 
 83        adjustments for small kernel width or large groupings.
 84    """
 85    # Calculate Distance Matrix with specified metric
 86    n_batches = batch_idx.shape[0]
 87    batch_sigmas = np.zeros(n_batches)
 88
 89    for i in np.arange(n_batches):
 90        idx = batch_idx[i]
 91        batch_sigma = scipy.spatial.distance.pdist(X_train[idx,:], 
 92                                                   distance_metric)
 93        batch_sigmas[i] = np.mean(batch_sigma)
 94
 95    sigma = np.mean(batch_sigmas)
 96
 97    return sigma
 98
 99
100def est_group_sigma(adata: ad.AnnData,
101                    X_train: np.ndarray,
102                    n_group_features: int,
103                    n_features: int, 
104                    batch_idx: np.ndarray) -> float:
105    """
106    Processes data and calculates the kernel width (sigma) for a 
107    feature grouping through sample batching.
108
109    Parameters
110    ----------
111    X_train : np.ndarray
112        A 2D numpy array with cells x features with features filtered 
113        to features in grouping and sampled cells.
114
115    adata : anndata.AnnData
116        adata used to derive X_train containing 'seed_obj' in uns 
117        attribute.
118
119    n_group_features : int
120        Number of features in feature grouping.
121
122    n_features : int
123        Maximum number of features to be used in sigma estimation.
124
125    batch_idx
126        A 2D array with each row cooresponding to the sample indices 
127        for each batch.
128
129    Returns
130    -------
131    sigma : float
132        The estimated group kernel with for Z projection.
133
134    """   
135    if adata.uns['tfidf']:
136        X_train = tfidf(X_train, mode = 'normalize')
137
138    X_train = process_data(X_train, 
139                        scale_data = adata.uns['scale_data'], 
140                        return_dense = True)
141
142    if scipy.sparse.issparse(X_train):
143        X_train = X_train.toarray()
144        X_train = np.array(X_train, dtype = np.float32)
145
146    # Calculates mean sigma from all batches
147    sigma = batch_sigma(X_train, adata.uns['distance_metric'], batch_idx)
148
149    # sigma = 0 is numerically unusable in later steps
150    # Using such a small sigma will result in wide distribution, and 
151    # typically a non-predictive Z
152    if sigma == 0:
153        sigma += 1e-5
154
155    if n_features < n_group_features:
156        # Heuristic we calculated to account for fewer features used in 
157        # distance calculation
158        sigma = sigma * n_group_features / n_features 
159
160    return sigma
161
162
163def estimate_sigma(adata: ad.AnnData,
164                   n_features: int = 5000,
165                   batches: int = 10, 
166                   batch_size: int = 100) -> ad.AnnData:
167    """
168    Calculate kernel widths to inform distribution for projection of 
169    Fourier Features. Calculates one sigma per group of features.
170
171    Parameters
172    ----------
173    adata : ad.AnnData
174        Created by `create_adata`.
175    
176    n_features : int
177        Number of random features to include when estimating sigma. 
178        Will be scaled for the whole pathway set according to a 
179        heuristic. Used for scalability.
180
181    batches : int
182        The number of batches to use for the distance calculation.
183        This will average the result of `batches` distance calculations
184        of `batch_size` randomly sampled cells. More batches will converge
185        to population distance values at the cost of scalability.
186
187    batch_size : int
188        The number of cells to include per batch for distance
189        calculations. Higher batch size will converge to population
190        distance values at the cost of scalability.
191        If `batches` * `batch_size` > # training cells,
192        `batch_size` will be reduced to `int(num training cells / 
193        batches)`.
194        
195    Returns
196    -------
197    adata : ad.AnnData
198        Key added `adata.uns['sigma']` with grouping kernel widths.
199
200    Examples
201    --------
202    >>> adata = scmkl.estimate_sigma(adata)
203    >>> adata.uns['sigma']
204    array([10.4640895 , 10.82011454,  6.16769438,  9.86156855, ...])
205    """
206    assert batch_size <= len(adata.uns['train_indices']), ("Batch size must be "
207                                                          "smaller than the "
208                                                          "training set.")
209
210    if batch_size * batches > len(adata.uns['train_indices']):
211        old_batch_size = batch_size
212        batch_size = int(len(adata.uns['train_indices'])/batches)
213        print("Specified batch size required too many cells for "
214                "independent batches. Reduced batch size from "
215                f"{old_batch_size} to {batch_size}")
216
217    if batch_size > 2000:
218        print("Warning: Batch sizes over 2000 may "
219               "result in long run-time.")
220        
221    # Getting subsample indices
222    sample_size = np.min((2000, adata.uns['train_indices'].shape[0]))
223    indices = sample_cells(adata.uns['train_indices'], sample_size=sample_size, 
224                           seed_obj=adata.uns['seed_obj'])
225
226    # Getting batch indices
227    sample_range = np.arange(sample_size)
228    batch_idx = get_batches(sample_range, adata.uns['seed_obj'], 
229                            batches, batch_size)
230
231    # Loop over every group in group_dict
232    sigma_array = np.zeros((len(adata.uns['group_dict'])))
233    for m, group_features in enumerate(adata.uns['group_dict'].values()):
234
235        n_group_features = len(group_features)
236
237        # Filtering to only features in grouping using filtered view of adata
238        X_train = get_group_mat(adata[indices], n_features=n_features, 
239                            group_features=group_features, 
240                            n_group_features=n_group_features)
241        
242        if adata.uns['tfidf']:
243            X_train = tfidf(X_train, mode='normalize')
244
245        # Data filtering, and transformation according to given data_type
246        # Will remove low variance (< 1e5) features regardless of data_type
247        # If scale_data will log scale and z-score the data
248        X_train = process_data(X_train=X_train,
249                               scale_data=adata.uns['scale_data'], 
250                               return_dense=True)    
251
252        # Estimating sigma
253        sigma = est_group_sigma(adata, X_train, n_group_features, 
254                                n_features, batch_idx=batch_idx)
255
256        sigma_array[m] = sigma
257    
258    adata.uns['sigma'] = sigma_array
259        
260    return adata
def get_batches( sample_range: list | numpy.ndarray, seed_obj: numpy.random._generator.Generator, batches: int, batch_size: int) -> numpy.ndarray:
10def get_batches(sample_range: list | np.ndarray, 
11                seed_obj: np.random._generator.Generator, 
12                batches: int,
13                batch_size: int) -> np.ndarray:
14    """
15    Gets batch indices for estimating sigma.
16
17    Parameters
18    ----------
19    sample_range : list | np.ndarray
20        A 1D array with first element being 0 and last element being 
21        (1 - number of samples from X_train).
22
23    seed_obj : np.random._generator.Generator
24        Numpy random generator object from `adata.uns['seed_obj']`.
25
26    batches : int
27        Number of batches to calculate indices for.
28
29    batch_size : int
30        Number of samples in each batch.
31
32    Returns
33    -------
34    batches_idx : np.ndarray
35        A 2D array with each row cooresponding to the sample indices 
36        for each batch.
37    """
38    required_n = batches*batch_size
39    train_n = len(sample_range)
40    assert required_n <= train_n, (f"{required_n} cells required for "
41                                   f"{batches} batches of {batch_size} cells; "
42                                   f"only {train_n} cells present")
43
44    # Creating a mat of batch x sample indices for estimating sigma
45    batches_idx = np.zeros((batches, batch_size), dtype = np.int16)
46
47    for i in range(batches):
48        batches_idx[i] = seed_obj.choice(sample_range, 
49                                         batch_size, 
50                                         replace = False)
51        
52        # Removing selected indices from sample options
53        rm_indices = np.isin(sample_range, batches_idx[i])
54        sample_range = np.delete(sample_range, rm_indices)
55
56    return batches_idx

Gets batch indices for estimating sigma.

Parameters
  • sample_range (list | np.ndarray): A 1D array with first element being 0 and last element being (1 - number of samples from X_train).
  • seed_obj (np.random._generator.Generator): Numpy random generator object from adata.uns['seed_obj'].
  • batches (int): Number of batches to calculate indices for.
  • batch_size (int): Number of samples in each batch.
Returns
  • batches_idx (np.ndarray): A 2D array with each row cooresponding to the sample indices for each batch.
def batch_sigma( X_train: numpy.ndarray, distance_metric: str, batch_idx: numpy.ndarray) -> float:
59def batch_sigma(X_train: np.ndarray,
60                distance_metric: str,
61                batch_idx: np.ndarray) -> float:
62    """
63    Calculates the kernel width (sigma) for a feature grouping through 
64    sample batching.
65
66    Parameters
67    ----------
68    X_train : np.ndarray
69        A 2D numpy array with cells x features with features filtered 
70        to features in grouping and sampled cells.
71
72    distance_metric: str
73        The pairwise distance metric used to estimate sigma. Must
74        be one of the options used in scipy.spatial.distance.cdist.
75
76    batch_idx: np.ndarray
77        A 2D array with each row cooresponding to the sample indices 
78        for each batch.    
79
80    Returns
81    -------
82    sigma : float
83        The estimated group kernel with for Z projection before 
84        adjustments for small kernel width or large groupings.
85    """
86    # Calculate Distance Matrix with specified metric
87    n_batches = batch_idx.shape[0]
88    batch_sigmas = np.zeros(n_batches)
89
90    for i in np.arange(n_batches):
91        idx = batch_idx[i]
92        batch_sigma = scipy.spatial.distance.pdist(X_train[idx,:], 
93                                                   distance_metric)
94        batch_sigmas[i] = np.mean(batch_sigma)
95
96    sigma = np.mean(batch_sigmas)
97
98    return sigma

Calculates the kernel width (sigma) for a feature grouping through sample batching.

Parameters
  • X_train (np.ndarray): A 2D numpy array with cells x features with features filtered to features in grouping and sampled cells.
  • distance_metric (str): The pairwise distance metric used to estimate sigma. Must be one of the options used in scipy.spatial.distance.cdist.
  • batch_idx (np.ndarray): A 2D array with each row cooresponding to the sample indices for each batch.
Returns
  • sigma (float): The estimated group kernel with for Z projection before adjustments for small kernel width or large groupings.
def est_group_sigma( adata: anndata._core.anndata.AnnData, X_train: numpy.ndarray, n_group_features: int, n_features: int, batch_idx: numpy.ndarray) -> float:
101def est_group_sigma(adata: ad.AnnData,
102                    X_train: np.ndarray,
103                    n_group_features: int,
104                    n_features: int, 
105                    batch_idx: np.ndarray) -> float:
106    """
107    Processes data and calculates the kernel width (sigma) for a 
108    feature grouping through sample batching.
109
110    Parameters
111    ----------
112    X_train : np.ndarray
113        A 2D numpy array with cells x features with features filtered 
114        to features in grouping and sampled cells.
115
116    adata : anndata.AnnData
117        adata used to derive X_train containing 'seed_obj' in uns 
118        attribute.
119
120    n_group_features : int
121        Number of features in feature grouping.
122
123    n_features : int
124        Maximum number of features to be used in sigma estimation.
125
126    batch_idx
127        A 2D array with each row cooresponding to the sample indices 
128        for each batch.
129
130    Returns
131    -------
132    sigma : float
133        The estimated group kernel with for Z projection.
134
135    """   
136    if adata.uns['tfidf']:
137        X_train = tfidf(X_train, mode = 'normalize')
138
139    X_train = process_data(X_train, 
140                        scale_data = adata.uns['scale_data'], 
141                        return_dense = True)
142
143    if scipy.sparse.issparse(X_train):
144        X_train = X_train.toarray()
145        X_train = np.array(X_train, dtype = np.float32)
146
147    # Calculates mean sigma from all batches
148    sigma = batch_sigma(X_train, adata.uns['distance_metric'], batch_idx)
149
150    # sigma = 0 is numerically unusable in later steps
151    # Using such a small sigma will result in wide distribution, and 
152    # typically a non-predictive Z
153    if sigma == 0:
154        sigma += 1e-5
155
156    if n_features < n_group_features:
157        # Heuristic we calculated to account for fewer features used in 
158        # distance calculation
159        sigma = sigma * n_group_features / n_features 
160
161    return sigma

Processes data and calculates the kernel width (sigma) for a feature grouping through sample batching.

Parameters
  • X_train (np.ndarray): A 2D numpy array with cells x features with features filtered to features in grouping and sampled cells.
  • adata (anndata.AnnData): adata used to derive X_train containing 'seed_obj' in uns attribute.
  • n_group_features (int): Number of features in feature grouping.
  • n_features (int): Maximum number of features to be used in sigma estimation.
  • batch_idx: A 2D array with each row cooresponding to the sample indices for each batch.
Returns
  • sigma (float): The estimated group kernel with for Z projection.
def estimate_sigma( adata: anndata._core.anndata.AnnData, n_features: int = 5000, batches: int = 10, batch_size: int = 100) -> anndata._core.anndata.AnnData:
164def estimate_sigma(adata: ad.AnnData,
165                   n_features: int = 5000,
166                   batches: int = 10, 
167                   batch_size: int = 100) -> ad.AnnData:
168    """
169    Calculate kernel widths to inform distribution for projection of 
170    Fourier Features. Calculates one sigma per group of features.
171
172    Parameters
173    ----------
174    adata : ad.AnnData
175        Created by `create_adata`.
176    
177    n_features : int
178        Number of random features to include when estimating sigma. 
179        Will be scaled for the whole pathway set according to a 
180        heuristic. Used for scalability.
181
182    batches : int
183        The number of batches to use for the distance calculation.
184        This will average the result of `batches` distance calculations
185        of `batch_size` randomly sampled cells. More batches will converge
186        to population distance values at the cost of scalability.
187
188    batch_size : int
189        The number of cells to include per batch for distance
190        calculations. Higher batch size will converge to population
191        distance values at the cost of scalability.
192        If `batches` * `batch_size` > # training cells,
193        `batch_size` will be reduced to `int(num training cells / 
194        batches)`.
195        
196    Returns
197    -------
198    adata : ad.AnnData
199        Key added `adata.uns['sigma']` with grouping kernel widths.
200
201    Examples
202    --------
203    >>> adata = scmkl.estimate_sigma(adata)
204    >>> adata.uns['sigma']
205    array([10.4640895 , 10.82011454,  6.16769438,  9.86156855, ...])
206    """
207    assert batch_size <= len(adata.uns['train_indices']), ("Batch size must be "
208                                                          "smaller than the "
209                                                          "training set.")
210
211    if batch_size * batches > len(adata.uns['train_indices']):
212        old_batch_size = batch_size
213        batch_size = int(len(adata.uns['train_indices'])/batches)
214        print("Specified batch size required too many cells for "
215                "independent batches. Reduced batch size from "
216                f"{old_batch_size} to {batch_size}")
217
218    if batch_size > 2000:
219        print("Warning: Batch sizes over 2000 may "
220               "result in long run-time.")
221        
222    # Getting subsample indices
223    sample_size = np.min((2000, adata.uns['train_indices'].shape[0]))
224    indices = sample_cells(adata.uns['train_indices'], sample_size=sample_size, 
225                           seed_obj=adata.uns['seed_obj'])
226
227    # Getting batch indices
228    sample_range = np.arange(sample_size)
229    batch_idx = get_batches(sample_range, adata.uns['seed_obj'], 
230                            batches, batch_size)
231
232    # Loop over every group in group_dict
233    sigma_array = np.zeros((len(adata.uns['group_dict'])))
234    for m, group_features in enumerate(adata.uns['group_dict'].values()):
235
236        n_group_features = len(group_features)
237
238        # Filtering to only features in grouping using filtered view of adata
239        X_train = get_group_mat(adata[indices], n_features=n_features, 
240                            group_features=group_features, 
241                            n_group_features=n_group_features)
242        
243        if adata.uns['tfidf']:
244            X_train = tfidf(X_train, mode='normalize')
245
246        # Data filtering, and transformation according to given data_type
247        # Will remove low variance (< 1e5) features regardless of data_type
248        # If scale_data will log scale and z-score the data
249        X_train = process_data(X_train=X_train,
250                               scale_data=adata.uns['scale_data'], 
251                               return_dense=True)    
252
253        # Estimating sigma
254        sigma = est_group_sigma(adata, X_train, n_group_features, 
255                                n_features, batch_idx=batch_idx)
256
257        sigma_array[m] = sigma
258    
259    adata.uns['sigma'] = sigma_array
260        
261    return adata

Calculate kernel widths to inform distribution for projection of Fourier Features. Calculates one sigma per group of features.

Parameters
  • adata (ad.AnnData): Created by create_adata.
  • n_features (int): Number of random features to include when estimating sigma. Will be scaled for the whole pathway set according to a heuristic. Used for scalability.
  • batches (int): The number of batches to use for the distance calculation. This will average the result of batches distance calculations of batch_size randomly sampled cells. More batches will converge to population distance values at the cost of scalability.
  • batch_size (int): The number of cells to include per batch for distance calculations. Higher batch size will converge to population distance values at the cost of scalability. If batches * batch_size > # training cells, batch_size will be reduced to int(num training cells / batches).
Returns
  • adata (ad.AnnData): Key added adata.uns['sigma'] with grouping kernel widths.
Examples
>>> adata = scmkl.estimate_sigma(adata)
>>> adata.uns['sigma']
array([10.4640895 , 10.82011454,  6.16769438,  9.86156855, ...])