import numpy as np, networkx as nx, matplotlib.pyplot as plt, seaborn as sns, itertools
from tabulate import tabulate
from scipy.stats import chi2, t
from scipy.sparse import csgraph
from scipy.linalg import block_diag
from scipy.sparse.linalg import eigsh
from scipy.linalg import eigh
from sklearn.cluster import KMeans
[docs]class core:
"""
"""
####### Resampled statistics #######
[docs] @staticmethod
def drobust_test(Z, mu, alpha=0.05, beta=0.01, R=None, L=1000, seed=None):
"""Returns conclusion of dependence-robust test due to [1]_. Note that the output of the test is random by nature. L is the number of simulation draws, and larger values reduce the random variation of the test.
Test is implemented using the U-type statistic and randomized confidence function approach due to [2]_ discussed in Remark 2 of [1]_.
Parameters
----------
Z : numpy array
n-dimensional array of scalar observations.
mu : float
Null hypothesis, e.g. the hypothesized mean of Z.
alpha : float
Significance level. Default value: 0.05.
beta : float
beta in Remark 2 of Leung (2021). The closer this is to alpha, the more conservative the test. Default value: 0.01.
R : int
Number of resampling draws for test statistic. Uses default if R=None. Default value: None.
L : int
Number of resampling draws for randomized confidence function. The larger the value, the less random the output. Default value: 1000.
seed : int
Seed for resampling draws. Set to None to not set a seed. Default value: None.
Returns
-------
string
Reject or not reject.
Examples
--------
>>> import networkinference as ni
>>> import numpy as np
>>> Z = np.random.normal(size=500)
>>> ni.core.drobust_test(Z, 0)
References
----------
.. [1] Leung, M. "Dependence-Robust Inference Using Resampled Statistics," Journal of Applied Econometrics (forthcoming), 2021.
.. [2] Song, K. "Ordering-Free Inference from Locally Dependent Data," UBC working paper, 2016.
"""
if beta <= 0 or beta > alpha:
raise ValueError('Must specify a value for beta in (0,alpha].')
if R==None: R = Z.size
np.random.seed(seed=seed)
Zc = (Z - mu) / Z.std()
BC = np.sqrt(R) / Z.size
allPairs = np.array(list(itertools.combinations(Zc, 2))) # array of all pairs of Zc
draws = np.random.choice(allPairs.prod(axis=1), R*L).reshape(L,R).sum(axis=1) / np.sqrt(R) - BC
RCF = (np.power(draws,2) <= chi2.ppf(1-alpha+beta,1)).mean()
concl = 'Not reject' if RCF >= 1-alpha else 'Reject'
return concl
[docs] @staticmethod
def drobust_ci(Z, grid_start, grid_stop, grid_size=151, coverage=0.95, beta=0.01, R=None, L=1000, seed=None):
"""Returns confidence interval (CI) derived from the dependence-robust test due to [1]_. Note that the output of the test is random by nature. L is the number of simulation draws, and larger values reduce the random variation of the test. If the result is a trivial interval, try increasing grid_size.
Test is implemented using the U-type statistic and randomized confidence function approach due to [2]_ discussed in Remark 2 of [1]_.
Parameters
----------
Z : numpy array
n-dimensional array of scalar observations.
grid_start : float
Need to specify a grid of values to test for inclusion in the CI. This is the leftmost point of the grid.
grid_stop : float
Rightmost point of the grid.
grid_size : int
Number of points in the grid. Default value: 151.
coverage : float
Desired coverage. Default value: 0.95.
beta : float
beta in Remark 2 of Leung (2021). The closer this is to 1-coverage, the more conservative the CI. Default value: 0.01.
R : int
Number of resampling draws for test statistic. Uses default if R=None. Default value: None.
L : int
Number of resampling draws for randomized confidence function. The larger the value, the less random the output. Default value: 1000.
seed : int
Seed for resampling draws. Set to None to not set a seed. Default value: None.
Returns
-------
list
Confidence interval.
Examples
--------
>>> import networkinference as ni
>>> import numpy as np
>>> Z = np.random.normal(size=500)
>>> ni.core.drobust_ci(Z, -2, 2)
References
----------
.. [1] Leung, M. "Dependence-Robust Inference Using Resampled Statistics," Journal of Applied Econometrics (forthcoming), 2021.
.. [2] Song, K. "Ordering-Free Inference from Locally Dependent Data," UBC working paper, 2016.
"""
if beta <= 0 or beta > 1-coverage:
raise ValueError('Must specify a value for beta in (0,1-coverage].')
if grid_stop < grid_start: raise IndexError('Grid start point must be smaller than grid endpoint.')
np.random.seed(seed=seed)
n = Z.size
if R==None: R = Z.size
BC = np.sqrt(R) / n
sigma = Z.std()
allPairs = np.array(list(itertools.combinations(Z/sigma, 2)))
indices = np.random.choice(range(allPairs.shape[0]), R*L).reshape(L,R)
CV = chi2.ppf(coverage+beta,1)
CI_L = grid_start # left endpoint of CI
grid = np.linspace(grid_start, grid_stop, grid_size)
for b in range(1,grid_size):
mu = grid[b]
ZProd = (allPairs - mu/sigma).prod(axis=1)
draws = ZProd[indices].sum(axis=1) / np.sqrt(R) - BC
RCF = (np.power(draws,2) <= CV).mean()
if RCF >= coverage: # if not reject
CI_L = mu
break
CI_R = grid_stop # right endpoint of CI
for b in range(grid_size-1):
mu = grid[grid_size - 1 - b]
if mu == CI_L:
CI_R = mu
break
ZProd = (allPairs - mu/sigma).prod(axis=1)
draws = ZProd[indices].sum(axis=1) / np.sqrt(R) - BC
RCF = (np.power(draws,2) <= CV).mean()
if RCF >= coverage:
CI_R = mu
break
return [CI_L,CI_R]
####### HAC #######
[docs] @staticmethod
def network_hac(Z, A, b=None, disp=False):
"""Returns network HAC variance estimator due to [1]_ (also see [2]_). Setting b=0 and A = any value (e.g. None) outputs the conventional heteroskedasticity-robust variance estimator for i.i.d. data. Network is converted to an unweighted, undirected version by dropping edge weights and directionality of links.
Parameters
----------
Z : numpy array
n-dimensional array of scalar observations or n x k matrix of n k-dimensional observations.
A : NetworkX graph
Graph on n nodes. NOTE: Assumes nodes are labeled as integers 0, ..., n-1 in A, so that the data for node i is given by the ith component of Z.
b : float
HAC bandwidth. Recommend keeping b=None, which uses the bandwidth choice recommended by [2]_. Default value: None.
disp : boolean
If False, the function only returns HAC. If True, the function returns (HAC, APL, b, PD_failure). Default value: False.
Returns
-------
HAC : numpy array
Estimate of variance-covariance matrix.
APL : float
Average path length of A.
b : int
Bandwidth.
PD_failure : boolean
True if substitute positive definite variance estimator needed to be used.
References
----------
.. [1] Kojevnikov, D., V. Marmer, and K. Song, "Limit Theorems for Network Dependent Random Variables," Journal of Econometrics, 2021, 222 (2), 882-908.
.. [2] Leung, M. "Causal Inference Under Approximate Neighborhood Interference," Econometrica (forthcoming), 2021.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> import numpy as np
>>> Z = np.random.normal(size=500)
>>> A = FakeData.erdos_renyi(500)
>>> HAC = ni.core.network_hac(Z, A)
"""
n = Z.shape[0]
PD_failure = False
if b == 0: # iid SE
weights = np.eye(n)
APL = 0
else:
G = nx.to_scipy_sparse_matrix(A.to_undirected(as_view=True), nodelist=range(n), weight=None, format='csr') # sparse matrix representation
dist_matrix = csgraph.dijkstra(csgraph=G, directed=False, unweighted=True) # path distance matrix
Gcc = [A.subgraph(c).copy() for c in sorted(nx.connected_components(A), key=len, reverse=True)]
giant = [i for i in Gcc[0]] # set of nodes in giant component
APL = dist_matrix[np.ix_(giant,giant)].sum() / len(giant) / (len(giant)-1) # average path length
if b==None:
avg_deg = G.dot(np.ones(n)[:,None]).mean()
exp_nbhd = APL < 2 * np.log(n) / np.log(avg_deg)
b = round(APL/2) if exp_nbhd else round(APL**(1/3)) # default bandwidth
weights = dist_matrix <= b
# default variance estimator (not guaranteed PD)
Zc = Z - Z.mean(axis=0)
HAC = Zc.T.dot(weights.dot(Zc)) / n
# PD variance estimator from the first (v1) arXiv draft of Leung (2019), "Causal Inference Under Approximate Neighborhood Interference"
if Z.ndim == 1:
PD_failure = HAC <= 0
elif Z.ndim == 2:
PD_failure = np.any(np.linalg.eigvals(HAC) <= 0)
if PD_failure:
if b==None:
avg_deg = np.array([i[1] for i in A.degree]).mean()
exp_nbhd = APL < 2 * np.log(n) / np.log(avg_deg)
b = round(APL/4) if exp_nbhd else round(APL**(1/3)) # default bandwidth
b_neighbors = dist_matrix <= b
row_sums = np.squeeze(b_neighbors.dot(np.ones(Z.shape[0])[:,None]))
b_norm = b_neighbors / np.sqrt(row_sums)[:,None]
weights = b_norm.dot(b_norm.T)
HAC = Zc.T.dot(weights.dot(Zc)) / n
if disp:
return HAC, APL, b, PD_failure
else:
return HAC
####### Clustering #######
[docs] @staticmethod
def sumstats(A, decimals=3):
"""Prints table of network summary statistics.
Parameters
----------
A : NetworkX undirected, unweighted graph
decimals : int
Number of decimals to which to round the output.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> A = FakeData.erdos_renyi(500)
>>> sumstats(A)
"""
numnodes = A.number_of_nodes()
numedges = A.number_of_edges()
clustering = nx.average_clustering(A)
Gcc = [A.subgraph(c).copy() for c in sorted(nx.connected_components(A), key=len, reverse=True)]
giant = Gcc[0].to_undirected()
diam = nx.diameter(giant)
APL = nx.average_shortest_path_length(giant)
giant_size = len(giant)
ccount = len(Gcc)
deg_seq = np.array([i[1] for i in A.degree])
num_isos = (deg_seq == 0).sum()
max_deg = deg_seq.max()
avg_deg = deg_seq.mean()
labels = ['# Units', '# Links', 'Average Degree', 'Max Degree', '# Isolates', 'Giant Size', 'Diameter', 'Average Path Length', '# Components', 'Clustering']
numbers = [numnodes, numedges, avg_deg, max_deg, num_isos, giant_size, diam, APL, ccount, clustering]
table = np.vstack([labels, numbers]).T
print(tabulate(table, headers=['Summary Statistics', ''], floatfmt='.' + str(decimals) + 'f'))
[docs] @staticmethod
def plot_spectrum(A, giant=True, weight=None, xlim_scat_buffer=0.03, ylim_scat_buffer=0.03, \
xticks_scat=3, yticks_scat=3, xticks_hist=3, binwidth=None, binrange=None, figsize=(10, 4), \
title_hist='Histogram', title_scat='Scatterplot', title_y='Eigenvalues', sns_style='dark'):
"""Plots spectrum of the normalized Laplacian in a scatterplot and histogram.
Parameters
----------
A : NetworkX graph
Can be weighted or directed.
giant : boolean
Set to True to plot spectrum of the giant component. Set to False to plot spectrum of the full graph. Default value: True.
weight : string
Specifies how edge weights are labeled in A, if A is a weighted graph. Default value: None.
xlim_scat_buffer : float [0,1]
Larger value adds more whitespace before and after the leftmost and rightmost points of the scatterplot.
ylim_scat_buffer : float [0,1]
Larger value adds more whitespace below and above the bottommost and topmost points of the scatterplot.
xticks_scat : int
Number of tick marks on x-axis of scatterplot.
yticks_scat : int
Number of tick marks of y-axis of scatterplot.
xticks_hist : int
Number of tick marks on x-axis of histogram.
binwidth : int
Width of histogram bins.
binrange : int
Range of histogram bins.
figsize : tuple of ints
Size of figure.
title_hist : string
Title of histogram.
title_scat : string
Title of scatterplot.
title_y : string
Title of scatterplot y-axis.
sns_style : string
Seaborn style of figures.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> A = FakeData.erdos_renyi(500)
>>> plot_spectrum(A)
"""
if giant:
Gcc = [A.subgraph(c).copy() for c in sorted(nx.connected_components(A), key=len, reverse=True)]
G = Gcc[0]
else:
G = A
n = G.number_of_nodes()
L = csgraph.laplacian(nx.to_scipy_sparse_matrix(G, weight=weight, format='csr'), normed=True)
ivals = eigh(L.todense(), eigvals_only=True)
sns.set_theme(style='dark')
fig, axes = plt.subplots(1, 2, figsize=figsize)
axes[0].set_title(title_scat)
axes[0].set(ylabel=title_y)
yscat_buffer = (np.max(ivals)-np.min(ivals))*ylim_scat_buffer
axes[0].set(ylim=(np.min(ivals)-yscat_buffer, np.max(ivals)+yscat_buffer))
axes[0].set_yticks(np.linspace(np.min(ivals),np.max(ivals),yticks_scat))
xscat_buffer = n*xlim_scat_buffer
axes[0].set(xlim=(-xscat_buffer,n+xscat_buffer))
axes[0].set_xticks(np.linspace(0,n,xticks_scat))
sns.scatterplot(x=np.arange(n), y=ivals, linewidth=0, s=10, ax=axes[0])
maxcount = np.max(np.histogram(ivals)[0])
axes[1].set(xlim=(np.min(ivals), np.max(ivals)))
axes[1].set_xticks(np.linspace(np.min(ivals),np.max(ivals),xticks_hist))
axes[1].set_title(title_hist)
sns.histplot(data=ivals, binwidth=binwidth, binrange=binrange, ax=axes[1])
plt.tight_layout()
plt.show()
[docs] @staticmethod
def spectrum(A, giant=True, weight=None):
"""Returns spectrum of the normalized Laplacian.
Parameters
----------
A : NetworkX graph
Can be weighted or directed.
giant : boolean
Set to True to only return spectrum of the giant component. Set to False to return spectrum of the full graph. Default value: True.
weight : string
Specifies how edge weights are labeled in A, if A is a weighted graph. Default value: None.
Returns
-------
numpy array
Eigenvalues.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> A = FakeData.erdos_renyi(500)
>>> ivals = spectrum(A)
"""
if giant:
Gcc = [A.subgraph(c).copy() for c in sorted(nx.connected_components(A), key=len, reverse=True)]
G = Gcc[0]
else:
G = A
L = csgraph.laplacian(nx.to_scipy_sparse_matrix(G, weight=weight, format='csr'), normed=True)
return eigh(L.todense(), eigvals_only=True)
[docs] @staticmethod
def spectral_clustering(num_clusters, A, seed=None):
"""Returns network clusters obtained from normalized spectral clustering algorithm due to [1]_ (also see [2]_). All nodes not in the giant component are grouped into a single cluster. NOTE: Assumes nodes are labeled as integers 0, ..., n-1 in A.
Parameters
----------
num_clusters : int
Number of desired clusters in the giant component.
A : NetworkX graph
Graph on n nodes. Can be weighted or directed.
seed : int
Seed for k-means clustering initialization. Set to None to not set a seed. Default value: None.
Returns
-------
numpy array
n-dimensional array of cluster labels from 0 to num_clusters-1
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> A = FakeData.erdos_renyi(500)
>>> clusters = ni.core.spectral_clustering(10, A)
References
----------
.. [1] Ng, A., M. Jordan, Y. Weiss, "On Spectral Clustering: Analysis and an Algorithm." Advances in Neural Information Processing Systems, 2002, 849-856.
.. [2] von Luxburg, U., "A Tutorial on Spectral Clustering," Statistics and Computing, 2007, 17 (4), 395-416.
"""
A_components = [A.subgraph(c).copy() for c in sorted(nx.connected_components(A), key=len, reverse=True)]
A_giant = A_components[0]
L = csgraph.laplacian(nx.to_scipy_sparse_matrix(A_giant, format='csr'), normed=True) # sparse laplacian matrix of the network restricted to the giant component
ivals, ivecs = eigsh(L, k=num_clusters, which='SM')
ivecs /= np.sqrt( (ivecs**2).sum(axis=1) )[:,None] # row normalize by row norm
kmeans = KMeans(num_clusters, n_init=30, random_state=seed).fit(ivecs)
clusters = kmeans.labels_ # spectral clustering only the giant component
# assign labels to nodes not in the giant
all_clusters = np.ones(A.number_of_nodes()).astype('float') * clusters.max()+1
all_clusters[list(A_giant.nodes)] = clusters
return all_clusters
[docs] @staticmethod
def conductance(clusters, A, weight=None):
"""Returns maximal conductance of a set of clusters. For cluster-robust methods to work, conductance should be at most 0.1, as recommended by [1]_.
Parameters
----------
clusters : numpy array
n-dimensional array of cluster labels for all n nodes, assumed to be 0, ..., L-1 where L is the number of clusters.
A : NetworkX graph
Graph on n nodes. Can be weighted or directed.
weight : string
Specifies how edge weights are labeled in A, if A is a weighted graph. Default value: None.
Returns
-------
float
Maximal conductance of the clusters.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> A = FakeData.erdos_renyi(500)
>>> clusters = ni.core.spectral_clustering(10, A)
>>> ni.core.conductance(clusters, A)
References
----------
.. [1] Leung, M., "Network Cluster-Robust Inference," arXiv preprint arXiv:2103.01470, 2021.
"""
num_clusters = int(np.max(clusters) + 1)
conductances = np.zeros(num_clusters) # record conductance of each cluster
for i in range(num_clusters):
S = np.where(clusters==i)[0]
conductances[i] = nx.cut_size(A, S, weight) / np.maximum(nx.volume(A, S, weight), 1)
return np.max(conductances)
[docs] @staticmethod
def cluster_var(Z, clusters):
"""Returns conventional cluster-robust variance estimator.
Parameters
----------
Z : numpy array
n-dimensional array of scalar observations or n x k matrix of n k-dimensional observations.
clusters : numpy array
n-dimensional array of cluster labels for all n nodes, assumed to be 0, ..., L-1 where L is the number of clusters.
Returns
-------
numpy array
Variance-covariance matrix.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> import numpy as np
>>> Z = np.random.normal(size=500)
>>> A = FakeData.erdos_renyi(500)
>>> clusters = ni.core.spectral_clustering(10, A)
>>> var = ni.core.cluster_var(Z, clusters)
"""
n = Z.shape[0]
Zc = Z - Z.mean(axis=0)
cluster_sizes = np.array([(clusters==i).sum() for i in np.unique(clusters)])
blocks = [np.ones((i,i)) for i in cluster_sizes]
weights = block_diag(*blocks)
return Zc.T.dot(weights.dot(Zc)) / n
[docs] @staticmethod
def trobust_ci(Z, coverage=0.95, verbose=True):
"""Returns CI from the t-statistic based cluster-robust procedure due to [1]_. The larger the dimension of Z (i.e. more clusters), the more powerful the test. However, since the test computes estimates cluster by cluster, the output can be more unstable with a larger number of clusters since the sample size within each cluster can be small.
Parameters
----------
Z : numpy array
q-dimensional array of estimates, one for each of the q clusters.
coverage : float
Desired coverage. Default value: 0.95.
verbose : boolean
If True, calling this function prints out the results. Default value: True.
Returns
-------
CI : list
Confidence interval.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> import numpy as np
>>> Z = np.random.normal(size=10)
>>> ni.core.trobust(Z)
References
----------
.. [1] Ibragimov, R. and U. Mueller, "t-Statistic Based Correlation and Heterogeneity Robust Inference," Journal of Business and Economic Statistics, 2010, 28 (4), 453-468.
"""
if 1-coverage > 0.08326:
raise ValueError('alpha must be less than 0.08326.')
mean, SE = Z.mean(), Z.std() / np.sqrt(Z.size)
CV = t.ppf(1-(1-coverage)/2, Z.size-1)
CI = [mean - CV * SE, mean + CV * SE]
if verbose:
fmat = '%.' + str(decimals) + 'f'
CI_formatted = [float(Decimal(fmat % CI[0])), float(Decimal(fmat % CI[1]))]
print(tabulate([mean, SE, CI_formatted], headers=['Mean', 'SE', '95% CI'], floatfmt='.' + str(decimals) + 'f'))
return CI
[docs] @staticmethod
def arand_test(Z, mu, seed=None):
"""Returns p-value and test statistic of approximate randomization test [1]_. The larger the dimension of Z (i.e. more clusters), the more powerful the test. However, since the test computes estimates cluster by cluster, the output can be more unstable with a larger number of clusters since the sample size within each cluster can be small.
Parameters
----------
Z : numpy array
q-dimensional array of estimates, one for each of the q clusters.
mu : float
Scalar null-hypothesized value of the mean of Z.
seed : int
Seed for drawing permutations, which is only relevant when the dimension of Z exceeds 12. Set to None to not set a seed. Default value: None.
Returns
-------
p_value : float
P-value of test.
T_wald : float
Test statistic.
Examples
--------
>>> import networkinference as ni
>>> import numpy as np
>>> Z = np.random.normal(size=10)
>>> ni.core.arand_test(Z, 0)
References
----------
.. [1] Canay, I., J. Romano, and A. Shaikh, "Randomization Tests Under an Approximate Symmetry Assumption," Econometrica, 2017, 85 (3), 1013-1030.
"""
q = Z.size
if q <= 12: # if there are less than 12 clusters, use all possible permutations
ph = q*[1]
G = [tuple(ph), tuple(q*[-1])] # list of permutations
for j in range(q-1):
ph[j] = -1
G += list(perm_unique(ph))
G = np.array(G)
else: # if there are more than 12 clusters, sample permutations at random
np.random.seed(seed=seed)
G = np.random.binomial(1,0.5,size=(5000,q))
G[G == 0] = -1
Zc = Z - mu
rand_dist = np.array([test_stat(G[g] * Zc) for g in range(G.shape[0])])
T_wald = test_stat(Zc)
p_value = (rand_dist >= T_wald).mean()
return p_value, T_wald
[docs] @staticmethod
def arand_ci(Z, grid_start, grid_stop, grid_size=151, coverage=0.95, seed=None):
"""Returns confidence interval (CI) obtained by inverting an approximate randomization test [1]_. If the result is a trivial interval, try increasing grid_size. The larger the dimension of Z (i.e. more clusters), the narrower the CI. However, since the test computes estimates cluster by cluster, the output can be more unstable with a larger number of clusters since the sample size within each cluster can be small.
Parameters
----------
Z : numpy array
q-dimensional array containing estimator for each of the q clusters.
grid_start : float
Need to specify a grid of values over which to invert the test. This is the leftmost point of the grid.
grid_stop : float
Rightmost point of the grid.
grid_size : int
Number of points in the grid. Default value: 151.
coverage : float
Desired coverage. Default value: 0.95.
seed : int
Seed for drawing permutations, which is only relevant when the dimension of Z exceeds 12. Set to None to not set a seed. Default value: None.
Returns
-------
list
Confidence interval.
Examples
--------
>>> import networkinference as ni
>>> import numpy as np
>>> Z = np.random.normal(size=10)
>>> ni.core.arand_ci(Z, -2, 2)
References
----------
.. [1] Canay, I., J. Romano, and A. Shaikh, "Randomization Tests Under an Approximate Symmetry Assumption," Econometrica, 2017, 85 (3), 1013-1030.
"""
if grid_stop < grid_start: raise IndexError('Grid start point must be smaller than grid endpoint.')
q = Z.size
if q <= 12: # if there are less than 12 clusters, use all possible permutations
ph = q*[1]
G = [tuple(ph), tuple(q*[-1])] # list of permutations
for j in range(q-1):
ph[j] = -1
G += list(perm_unique(ph))
G = np.array(G)
else: # if there are more than 12 clusters, sample permutations at random
np.random.seed(seed=seed)
G = np.random.binomial(1,0.5,size=(5000,q))
G[G == 0] = -1
# TO DO: improve computational speed using bisection search
CI_L = grid_start # left endpoint of CI
grid = np.linspace(grid_start, grid_stop, grid_size)
for b in range(1,grid_size):
mu = grid[b]
Zc = Z - mu
rand_dist = np.array([test_stat(G[g] * Zc) for g in range(G.shape[0])])
T_wald = test_stat(Zc)
pval = (rand_dist >= T_wald).mean()
if pval > 1-coverage: # if not reject
CI_L = mu
break
CI_R = grid_stop # right endpoint of CI
for b in range(grid_size-1):
mu = grid[grid_size - 1 - b]
if mu == CI_L:
CI_R = mu
break
Zc = Z - mu
rand_dist = np.array([test_stat(G[g] * Zc) for g in range(G.shape[0])])
T_wald = test_stat(Zc)
pval = (rand_dist >= T_wald).mean()
if pval > 1-coverage: # if not reject
CI_R = mu
break
return [CI_L,CI_R]
def test_stat(Z):
# Test statistic for core.arand_test() and core.arand_ci().
return np.abs(Z.mean())
class UniqueElement:
def __init__(self,value,occurrences):
self.value = value
self.occurrences = occurrences
def perm_unique(elements):
# Function used in core.arand_test() and core.arand_ci().
# Code from https://stackoverflow.com/questions/6284396/permutations-with-unique-values
eset=set(elements)
listunique = [UniqueElement(i,elements.count(i)) for i in eset]
u=len(elements)
return perm_unique_helper(listunique,[0]*u,u-1)
def perm_unique_helper(listunique,result_list,d):
if d < 0:
yield tuple(result_list)
else:
for i in listunique:
if i.occurrences > 0:
result_list[d]=i.value
i.occurrences-=1
for g in perm_unique_helper(listunique,result_list,d-1):
yield g
i.occurrences+=1