import numpy as np
from scipy.linalg import inv
from scipy.stats import norm
from decimal import Decimal
from tabulate import tabulate
from .core import core
[docs]class OLS(object):
"""OLS estimator.
Parameters
----------
Y : numpy float array
n-dimensional array of outcomes.
X : numpy float array
n x k array of regressors (not including intercept) or n-dimensional array.
A : NetworkX graph
Graph on n nodes. NOTE: Assumes nodes are labeled as integers 0, ..., n-1 in A, so that the outcome of node i is given by the ith component of Y. Network can be weighted or directed, although weights and directions are ignored when computing network SEs. Argument not used for dependence robust test or CI. Default value: None.
Attributes
----------
data : dictionary
Stores all input data, adding a column of ones to X.
summands : numpy array
n-dimensional array of intermediate products used to compute OLS estimator.
estimate : float
OLS estimator.
resid : numpy array
Regression residuals.
invhessian : numpy array
Inverse hessian matrix.
scores : numpy array
Regression scores.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, A = FakeData.ols()
>>> ols_model = ni.OLS(Y, X, A)
>>> print(ols_model.estimate)
"""
def __init__(self, Y, X, A=None):
"""Stores inputs, computes estimator.
"""
if X.ndim == 1:
Xp = np.vstack([np.ones(X.size), X]).T # n x 2
elif X.ndim == 2:
Xp = np.hstack([np.ones(X.shape[0])[:,np.newaxis], X]) # n x (k+1)
self.invhessian = inv(Xp.T.dot(Xp)) # (k+1) x (k+1), (Xp'Xp)^{-1} matrix
self.summands = Xp * Y[:,np.newaxis] # n x (k+1), mean of this is Xp'Y matrix
self.estimate = self.invhessian.dot(self.summands.sum(axis=0)) # (k+1) dimensional, OLS estimate
self.resid = Y - Xp.dot(self.estimate) # residuals
self.scores = Xp * self.resid[:,np.newaxis]
self.data = {'Y':Y, 'X':Xp, 'network':A}
[docs] def network_se(self, b=None, decimals=3, verbose=True, PD_alert=False):
"""Returns standard errors derived from network HAC variance estimator due to [1]_ using bandwidth suggested by [2]_. Setting b=0 outputs the conventional heteroskedasticity-robust variance estimator for i.i.d. data. Network is converted to an unweighted, undirected version by dropping edge weights and directionality of links.
The default output uses a uniform kernel. If the result is not positive definite, the output is an estimator guaranteed to be positive definite due to the first working paper version of [2]_.
Parameters
----------
b : float
HAC bandwidth. Recommend keeping b=None, which uses the bandwidth choice recommended by [2]_. Default value: None.
decimals : int
Number of decimals to which to round the output table.
verbose : boolean
If True, calling this method prints out the results. Default value: True.
PD_alert : boolean
If True, method will print an alert whenever the default estimator is not positive definite.
Attributes
----------
network_se_vcov : float
Estimate of variance-covariance matrix.
network_se_result : float
Standard errors.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, A = FakeData.ols()
>>> ols_model = ni.OLS(Y, X, A)
>>> ols.network_se()
References
----------
.. [1] Kojevnikov, D., V. Marmer, and K. Song, "Limit Theorems for Network Dependent Random Variables," Journal of Econometrics, 2021, 222 (2), 882-908.
.. [2] Leung, M. "Causal Inference Under Approximate Neighborhood Interference," Econometrica (forthcoming), 2021.
"""
PD_failure = False
if isinstance(self.invhessian, np.ndarray):
if PD_alert:
V,_,_,PD_failure = core.network_hac(self.scores, self.data['network'], b, disp=True)
else:
V = core.network_hac(self.scores, self.data['network'], b)
self.network_se_vcov = self.data['Y'].size * self.invhessian.dot(V).dot(self.invhessian)
self.network_se_result = np.sqrt(np.diag(self.network_se_vcov))
else:
if PD_alert:
self.network_se_vcov,_,_,PD_failure = core.network_hac(self.summands, self.data['network'], b, disp=True)
else:
self.network_se_vcov = core.network_hac(self.summands, self.data['network'], b)
self.network_se_result = np.sqrt(self.network_se_vcov / self.summands.size)
if PD_alert and PD_failure: print('Estimator not positive definite. Correction used.')
if verbose:
CV = norm.ppf(1-0.05/2)
if self.estimate.size == 1:
est = np.array([self.estimate])
se = np.array([self.network_se_result])
else:
est = self.estimate
se = self.network_se_result
fmat = '%.' + str(decimals) + 'f'
table = []
for k in range(est.size):
CI = [est[k] - CV * se[k], est[k] + CV * se[k]]
CI = [float(Decimal(fmat % CI[0])), float(Decimal(fmat % CI[1]))]
table.append([est[k], se[k], CI])
print(tabulate(table, headers=['Estimate', 'SE', '95% CI'], floatfmt='.' + str(decimals) + 'f'))
[docs] def drobust_test(self, mu, dimension=0, alpha=0.05, beta=0.01, R=None, L=1000, seed=None, verbose=True):
"""Returns conclusion of dependence-robust test due to [1]_. Note that the output of the test is random by nature. L is the number of simulation draws, and larger values reduce the random variation of the test.
Test is implemented using the U-type statistic and randomized confidence function approach due to [2]_ discussed in Remark 2 of [1]_.
Parameters
----------
mu : float
Null value of the estimand in the specified dimension.
dimension : int
Dimension of the estimand being tested. Ignored if estimand is scalar. Default value: 0.
alpha : float
Significance level. Default value: 0.05.
beta : float
beta in Remark 2 of Leung (2021). The closer this is to alpha, the more conservative the test. Default value: 0.01.
R : int
Number of resampling draws for test statistic. Uses default if R=None. Default value: None.
L : int
Number of resampling draws for randomized confidence function. The larger the value, the less random the output. Default value: 1000.
seed : int
seed for resampling draws. Set to None to not set a seed. Default value: None.
verbose : boolean
If True, calling this method prints out the results. Default value: True.
Attributes
----------
drobust_test_result : string
Reject or not reject.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, A = FakeData.ols()
>>> ols_model = ni.OLS(Y, X, A)
>>> ols.drobust_test(1, dimension=1)
References
----------
.. [1] Leung, M. "Dependence-Robust Inference Using Resampled Statistics," Journal of Applied Econometrics (forthcoming), 2021.
.. [2] Song, K. "Ordering-Free Inference from Locally Dependent Data," UBC working paper, 2016.
"""
if isinstance(self.invhessian, np.ndarray):
dat = self.summands.shape[0] * self.summands.dot(self.invhessian)[:,dimension]
else:
dat = self.summands
self.drobust_test_result = core.drobust_test(dat, mu, alpha, beta, R, L, seed)
if verbose: print(f'Conclusion of dependence-robust test: {self.drobust_test_result}')
[docs] def drobust_ci(self, grid_start, grid_stop, dimension=None, grid_size=151, coverage=0.95, \
beta=0.01, R=None, L=1000, seed=None, decimals=3, verbose=True):
"""Returns confidence interval (CI) derived from the dependence-robust test due to [1]_. Note that the output of the test is random by nature. L is the number of simulation draws, and larger values reduce the random variation of the test. If the result is a trivial interval, try increasing grid_size.
Test is implemented using the U-type statistic and randomized confidence function approach due to [2]_ discussed in Remark 2 of [1]_.
Parameters
----------
grid_start : float
Need to specify a grid of values to test for inclusion in the CI. This is the leftmost point of the grid.
grid_stop : float
Rightmost point of the grid.
dimension : int
Dimension of the estimand for which you want the CI. Ignored if estimand is scalar. To generate a table of CIs for all dimensions, set dimension=None. Default value: None.
grid_size : int
Number of points in the grid. Default value: 151.
coverage : float
Desired coverage. Default value: 0.95.
beta : float
beta in Remark 2 of Leung (2021). The closer this is to 1-coverage, the more conservative the CI. Default value: 0.01.
R : int
Number of resampling draws for test statistic. Uses default if R=None. Default value: None.
L : int
Number of resampling draws for randomized confidence function. The larger the value, the less random the output. Default value: 1000.
seed : int
seed for resampling draws. Set to None to not set a seed. Default value: None.
decimals : int
Number of decimals to which to round the output table.
verbose : boolean
If True, calling this method prints out the results. Default value: True.
Attributes
----------
drobust_ci_result : list
Confidence interval.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, A = FakeData.ols()
>>> ols_model = ni.OLS(Y, X, A)
>>> ols.drobust_ci(-5, 5)
References
----------
.. [1] Leung, M. "Dependence-Robust Inference Using Resampled Statistics," Journal of Applied Econometrics (forthcoming), 2021.
.. [2] Song, K. "Ordering-Free Inference from Locally Dependent Data," UBC working paper, 2016.
"""
if isinstance(self.invhessian, np.ndarray):
dims = range(self.estimate.size) if dimension == None else [dimension]
else:
dims = [0]
fmat = '%.' + str(decimals) + 'f'
table = []
self.drobust_ci_result = []
for dim in dims:
if isinstance(self.invhessian, np.ndarray):
dat = self.summands.shape[0] * self.summands.dot(self.invhessian)[:,dim]
else:
dat = self.summands
CI = core.drobust_ci(dat, grid_start, grid_stop, grid_size, coverage, beta, R, L, seed)
CI = [np.around(CI[0],6), np.around(CI[1],6)] # dealing with floating point error
self.drobust_ci_result.append(CI)
if verbose:
CI = [float(Decimal(fmat % CI[0])), float(Decimal(fmat % CI[1]))]
table.append([dat.mean(), CI])
if len(self.drobust_ci_result) == 1: self.drobust_ci_result = self.drobust_ci_result[0]
if verbose: print(tabulate(table, headers=['Estimate', 'CI'], floatfmt='.' + str(decimals) + 'f'))
[docs] def get_clusters(self, num_clusters, clusters=None, seed=None, weight=None, verbose=True):
"""Returns network clusters obtained from normalized spectral clustering algorithm due to [2]_ (also see [3]_). Returns maximal conductance of clusters, a [0,1]-measure of cluster quality that should be at most 0.1 for cluster-robust methods to have good performance (see [1]_). All nodes not in the giant component are grouped into a single cluster.
Parameters
----------
num_clusters : int
Number of desired clusters in the giant component.
seed : int
Seed for k-means clustering initialization. Set to None to not set a seed. Default value: None.
clusters : numpy array
Optional array of cluster memberships obtained from the output of this method or spectral_clustering() in the core class. The only purpose of this argument is to load clusters obtained elsewhere into the current object.
weight : string
Specifies how edge weights are labeled in A, if A is a weighted graph. Default value: None.
verbose : boolean
When set to True, the method prints the maximal conductance of the clusters. Default value: True.
Attributes
----------
clusters : numpy array
n-dimensional array of cluster labels from 0 to num_clusters-1, where n is the number of nodes.
conductance : float
Maximal conductance of the clusters.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, A = FakeData.ols(network='RGG')
>>> ols_model = ni.OLS(Y, X, A)
>>> ols.get_clusters(10)
References
----------
.. [1] Leung, M., "Network Cluster-Robust Inference," arXiv preprint arXiv:2103.01470, 2021.
.. [2] Ng, A., M. Jordan, Y. Weiss, "On Spectral Clustering: Analysis and an Algorithm." Advances in Neural Information Processing Systems, 2002, 849-856.
.. [3] von Luxburg, U., "A Tutorial on Spectral Clustering," Statistics and Computing, 2007, 17 (4), 395-416.
"""
if isinstance(clusters, np.ndarray):
self.clusters = clusters
else:
self.clusters = core.spectral_clustering(num_clusters, self.data['network'], seed)
self.conductance = core.conductance(self.clusters, self.data['network'], weight)
if verbose: print(f'Maximal conductance: {self.conductance}')
[docs] def est_by_cluster(self, dimension):
"""Returns array of OLS estimates, one for each cluster. This is a helper method used by arand_test() and arand_ci().
Parameters
----------
dimension : int
Dimension of estimand being tested. Ignore if estimand is scalar. Default value: 0.
Returns
-------
thetahat : numpy array
L-dimensional array of OLS estimates, one for each of the L clusters.
"""
thetahat = []
for C in np.unique(self.clusters):
members = np.where(self.clusters==C)[0]
Yp = self.data['Y'][members]
Xp = self.data['X'][members,:]
thetahat.append( np.linalg.pinv(Xp.T.dot(Xp)).dot(Xp.T.dot(Yp[:,np.newaxis]))[dimension,0] )
if len(thetahat) == 1:
thetahat = thetahat[0]
else:
thetahat = np.array(thetahat)
return thetahat
[docs] def trobust_ci(self, dimension=None, num_clusters=5, coverage=0.95, decimals=3, verbose=True):
"""Returns confidence interval (CI) from the t-statistic based cluster-robust procedure due to [1]_. The more clusters, the more powerful the test. However, since the test computes estimates cluster by cluster, the output can be more unstable with a larger number of clusters since the sample size within each cluster can be small.
Parameters
----------
dimension : int
Dimension of the estimand for which you want the CI. Ignored if estimand is scalar. To generate a table of CIs for all dimensions, set dimension=None. Default value: None.
num_clusters : int
Ignored if get_clusters() was already run on this object. If it wasn't, this calls the get_cluster() method, asking for this many clusters. Default value: 5.
coverage : float
Desired coverage. Default value: 0.95.
decimals : int
Number of decimals to which to round the output table.
verbose : boolean
If True, calling this method prints out the results. Default value: True.
Attributes
----------
trobust_ci_result : list
Confidence interval.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, A = FakeData.ols(network='RGG')
>>> ols_model = ni.OLS(Y, X, A)
>>> ols.get_clusters(10)
>>> ols.trobust_ci()
References
----------
.. [1] Ibragimov, R. and U. Mueller, "t-Statistic Based Correlation and Heterogeneity Robust Inference," Journal of Business and Economic Statistics, 2010, 28 (4), 453-468.
"""
if not hasattr(self,'clusters'): self.get_clusters(num_clusters)
if isinstance(self.invhessian, np.ndarray):
dims = range(self.estimate.size) if dimension == None else [dimension]
else:
dims = [0]
fmat = '%.' + str(decimals) + 'f'
table = []
self.trobust_ci_result = []
for dim in dims:
if isinstance(self.invhessian, np.ndarray):
est = self.estimate[dim]
else:
est = self.estimate
thetahat = self.est_by_cluster(dim)
CI = core.trobust_ci(thetahat, coverage, False)
self.trobust_ci_result.append(CI)
if verbose:
CI = [float(Decimal(fmat % CI[0])), float(Decimal(fmat % CI[1]))]
table.append([est, CI])
if len(self.trobust_ci_result) == 1: self.trobust_ci_result = self.trobust_ci_result[0]
if verbose: print(tabulate(table, headers=['Estimate', 'CI'], floatfmt='.' + str(decimals) + 'f'))
[docs] def arand_test(self, mu, dimension=0, num_clusters=5, seed=None, verbose=True):
"""Returns p-value of approximate randomization test [1]_. The test is more powerful with more clusters. However, since the test computes estimates cluster by cluster, the output can be more unstable with a larger number of clusters since the sample size within each cluster can be small.
Parameters
----------
dimension : int
Dimension of estimand being tested. Ignored if estimand is scalar. Default value: 0.
mu : float
Null value of the estimand in the specified dimension.
num_clusters : int
Ignored if get_clusters() was already run on this object. If it wasn't, this calls the get_cluster() method, asking for this many clusters. Default value: 5.
seed : int
Seed for drawing permutations, which is only relevant when there are more than 12 clusters. Set to None to not set a seed. Default value: None.
verbose : boolean
If True, calling this method prints out the results. Default value: True.
Attributes
----------
arand_test_result : float
P-value.
arand_test_stat : float
Test statistic.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, A = FakeData.ols(network='RGG')
>>> ols_model = ni.OLS(Y, X, A)
>>> ols.get_clusters(10)
>>> ols.arand_test(1, dimension=1)
References
----------
.. [1] Canay, I., J. Romano, and A. Shaikh, "Randomization Tests Under an Approximate Symmetry Assumption," Econometrica, 2017, 85 (3), 1013-1030.
"""
if not hasattr(self,'clusters'): self.get_clusters(num_clusters)
thetahat = self.est_by_cluster(dimension)
self.arand_test_result, self.arand_test_stat = core.arand_test(thetahat, mu, seed)
if verbose: print(f'P-value of randomization test: {self.arand_test_result}')
[docs] def arand_ci(self, grid_start, grid_stop, dimension=None, grid_size=151, coverage=0.95, \
num_clusters=5, decimals=3, seed=None, verbose=True):
"""Returns confidence interval (CI) obtained by inverting an approximate randomization test [1]_. If the result is a trivial interval, try increasing grid_size. The CI is narrower with more clusters. However, since the test computes estimates cluster by cluster, the output can be more unstable with a larger number of clusters since the sample size within each cluster can be small.
Parameters
----------
grid_start : float
Need to specify a grid of values over which to invert the test. This is the leftmost point of the grid.
grid_stop : float
Rightmost point of the grid.
dimension : int
Dimension of the estimand for which you want the CI. To generate a table of CIs for all dimensions, set dimension=None. Ignored if estimand is scalar. Default value: None.
grid_size : int
Number of points in the grid. Default value: 151.
coverage : float
Desired coverage. Default value: 0.95.
num_clusters : int
Ignored if get_clusters() was already run on this object. If it wasn't, this calls the get_cluster() method, asking for this many clusters. Default value: 5.
decimals : int
Number of decimals to which to round the output table.
seed : int
Seed for drawing permutations, which is only relevant when there are more than 12 clusters. Set to None to not set a seed. Default value: None.
verbose : boolean
If True, calling this method prints out the results. Default value: True.
Attributes
----------
arand_ci_result : list
Confidence interval.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, A = FakeData.ols(network='RGG')
>>> ols_model = ni.OLS(Y, X, A)
>>> ols.get_clusters(10)
>>> ols.arand_ci(-5, 5)
References
----------
.. [1] Canay, I., J. Romano, and A. Shaikh, "Randomization Tests Under an Approximate Symmetry Assumption," Econometrica, 2017, 85 (3), 1013-1030.
"""
if not hasattr(self,'clusters'): self.get_clusters(num_clusters)
if isinstance(self.invhessian, np.ndarray):
dims = range(self.estimate.size) if dimension == None else [dimension]
else:
dims = [0]
fmat = '%.' + str(decimals) + 'f'
table = []
self.arand_ci_result = []
for dim in dims:
if isinstance(self.invhessian, np.ndarray):
est = self.estimate[dim]
else:
est = self.estimate
thetahat = self.est_by_cluster(dim)
CI = core.arand_ci(thetahat, grid_start, grid_stop, grid_size, coverage, seed)
CI = [np.around(CI[0],6), np.around(CI[1],6)] # dealing with floating point error
self.arand_ci_result.append(CI)
if verbose:
CI = [float(Decimal(fmat % CI[0])), float(Decimal(fmat % CI[1]))]
table.append([est, CI])
if len(self.arand_ci_result) == 1: self.arand_ci_result = self.arand_ci_result[0]
if verbose: print(tabulate(table, headers=['Estimate', 'CI'], floatfmt='.' + str(decimals) + 'f'))
[docs] def cluster_se(self, num_clusters=30, decimals=3, verbose=True):
"""Returns clustered standard errors.
Parameters
----------
num_clusters : int
Ignored if get_clusters() was already run on this object. If it wasn't, this calls the get_cluster() method, asking for this many clusters. Default value: 30.
decimals : int
Number of decimals to which to round the output table.
verbose : boolean
If True, calling this method prints out the results. Default value: True.
Attributes
----------
cluster_se_vcov : float
Cluster-robust variance estimate.
cluster_se_result : float
Clustered standard errors.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, A = FakeData.ols(network='RGG')
>>> ols_model = ni.OLS(Y, X, A)
>>> ols.get_clusters(30)
>>> ols.cluster_se()
"""
if not hasattr(self,'clusters'): self.get_clusters(num_clusters)
if isinstance(self.invhessian, np.ndarray):
self.cluster_se_vcov = self.data['Y'].size * self.invhessian.dot(core.cluster_var(self.scores, self.clusters)).dot(self.invhessian)
self.cluster_se_result = np.sqrt(np.diag(self.cluster_se_vcov))
else:
self.cluster_se_vcov = core.cluster_var(self.summands, self.clusters)
self.cluster_se_result = np.sqrt(self.cluster_se_vcov / self.summands.size)
if self.estimate.size == 1:
est = np.array([self.estimate])
se = np.array([self.cluster_se_result])
else:
est = self.estimate
se = self.cluster_se_result
if verbose:
CV = norm.ppf(1-0.05/2)
fmat = '%.' + str(decimals) + 'f'
table = []
for k in range(est.size):
CI = [est[k] - CV * se[k], est[k] + CV * se[k]]
CI = [float(Decimal(fmat % CI[0])), float(Decimal(fmat % CI[1]))]
table.append([est[k], se[k], CI])
print(tabulate(table, headers=['Estimate', 'SE', '95% CI'], floatfmt='.' + str(decimals) + 'f'))
[docs]class TSLS(OLS):
"""2SLS estimator.
Parameters
----------
Y : numpy float array
n-dimensional array of outcomes.
X : numpy float array
n x k array of regressors (not including intercept) or n-dimensional array.
W : numpy float array
n x d array of instruments for d >= k (not including intercept) or n-dimensional array.
A : NetworkX undirected graph
Graph on n nodes. NOTE: Assumes nodes are labeled as integers 0, ..., n-1 in A, so that the outcome of node i is given by the ith component of Y. Network can be weighted or directed, although weights and directions are ignored when computing network SEs. Argument not used for dependence robust test or CI. Default value: None.
Attributes
----------
data : dictionary
Stores all input data, adding a column of ones to X and W.
summands : numpy array
n-dimensional array of intermediate products used to compute 2SLS estimator.
estimate : float
2SLS estimator.
resid : numpy array
Regression residuals.
invhessian : numpy array
Inverse hessian matrix.
scores : numpy array
Regression scores.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, X, W, A = FakeData.tsls()
>>> tsls_model = ni.TSLS(Y, X, W, A)
>>> print(tsls_model.estimate)
"""
def __init__(self, Y, X, W, A=None):
"""Stores inputs, computes estimator.
"""
n = Y.size
if X.ndim == 1:
Xp = np.vstack([np.ones(n), X]).T
elif X.ndim == 2:
Xp = np.hstack([np.ones(n)[:,np.newaxis], X])
if W.ndim == 1:
Wp = np.vstack([np.ones(n), W]).T
elif W.ndim == 2:
Wp = np.hstack([np.ones(n)[:,np.newaxis], W])
S = Wp.T.dot(Xp) # (d+1) x (k+1)
P = inv(Wp.T.dot(Wp)) # (d+1) x (d+1)
self.invhessian = inv(S.T.dot(P).dot(S)) # (k+1) x (k+1), Xp'Wp(Wp'Wp)^{-1}Wp'Xp matrix
self.summands = Wp.dot(P).dot(S) * Y[:,np.newaxis] # n x (k+1), mean of this is Xp'Wp(Wp'Wp)^{-1}Wp'Y
self.estimate = self.invhessian.dot(self.summands.sum(axis=0)) # (k+1) dimensional, 2SLS estimate
self.resid = Y - Xp.dot(self.estimate) # residuals
self.scores = Wp.dot(P).dot(S) * self.resid[:,np.newaxis]
self.data = {'Y':Y, 'X':Xp, 'W':Wp, 'network':A}
[docs] def est_by_cluster(self, dimension):
"""Returns array of 2SLS estimates, one for each cluster. This is a helper method used by arand_test() and arand_ci().
Parameters
----------
dimension : int
Dimension of estimand being tested. Ignored if estimand is scalar. Default value: 0.
Returns
-------
thetahat : numpy array
L-dimensional array of OLS estimates, one for each of the L clusters.
"""
thetahat = []
for C in np.unique(self.clusters):
members = np.where(self.clusters==C)[0]
Yp = self.data['Y'][members]
Xp = self.data['X'][members,:]
Wp = self.data['W'][members,:]
S = Wp.T.dot(Xp)
P = np.linalg.pinv(Wp.T.dot(Wp))
thetahat.append( np.linalg.pinv(S.T.dot(P).dot(S)).dot(S.T.dot(P).dot(Wp.T.dot(Yp[:,np.newaxis])))[dimension,0] )
if len(thetahat) == 1:
thetahat = thetahat[0]
else:
thetahat = np.array(thetahat)
return thetahat
[docs]class IPW(OLS):
"""Horovitz-Thompson estimator (inverse probability weighting with known propensity scores). See e.g. [1]_ Formula:
.. math::
\\frac{1}{n} \sum_{i=1}^n \left( \\frac{\\text{ind1}_i}{\\text{pscores1}_i} - \\frac{\\text{ind2}_i}{\\text{pscores2}_i} \\right) Y_i.
Parameters
----------
Y : numpy float array
n-dimensional array of outcomes.
ind1 : numpy int array
n-dimensional array of indicators for first exposure mapping.
ind2 : numpy int array
n-dimensional array of indicators for second exposure mapping.
pscores1 : numpy float array
n-dimensional array of propensity scores corresponding to first exposure mapping ind1. The ith component is node i's probability of exposure.
pscores2 : numpy float array
n-dimensional array of propensity scores corresponding to second exposure mapping ind2. The ith component is node i's probability of exposure.
A : NetworkX undirected graph
Graph on n nodes. NOTE: Assumes nodes are labeled as integers 0, ..., n-1 in A, so that the outcome for node i is given by the ith component of Y. Network can be weighted or directed, although weights and directions are ignored when computing network SEs. Argument not used for dependence robust test or CI. Default value: None.
Attributes
----------
data : dictionary
Stores all input data.
summands : numpy array
n-dimensional array whose mean is the IPW estimator.
estimate : float
IPW estimator.
Examples
--------
>>> import networkinference as ni
>>> from networkinference.utils import FakeData
>>> Y, ind1, ind2, pscores1, pscores2, A = FakeData.ipw()
>>> ipw_model = ni.IPW(Y, ind1, ind2, pscores1, pscores2, A)
>>> print(ipw_model.estimate)
References
----------
.. [1] Leung, M. "Causal Inference Under Approximate Neighborhood Interference," Econometrica (forthcoming), 2021.
"""
def __init__(self, Y, ind1, ind2, pscores1, pscores2, A=None):
"""Stores inputs, computes estimator.
"""
self.data = {'Y':Y, 'ind1':ind1, 'ind2':ind2, 'pscores1':pscores1, 'pscores2':pscores2, 'network':A}
weight1 = self.data['ind1'].copy().astype('float')
weight2 = self.data['ind2'].copy().astype('float')
weight1[weight1 == 1] = self.data['ind1'][weight1 == 1] / self.data['pscores1'][weight1 == 1]
weight2[weight2 == 1] = self.data['ind2'][weight2 == 1] / self.data['pscores2'][weight2 == 1]
self.summands = self.data['Y'] * (weight1 - weight2)
self.estimate = self.summands.mean() # IPW estimate
self.invhessian = 1
[docs] def est_by_cluster(self, dimension):
"""Returns array of IPW estimators, one for each cluster. This is a helper method used by arand_test() and arand_ci().
Parameters
----------
dimension : int
Argument ignored.
Returns
-------
thetahat : numpy array
L-dimensional array of means, one for each of the L clusters.
"""
thetahat = []
for C in np.unique(self.clusters):
members = np.where(self.clusters==C)[0]
Z = self.summands[members]
thetahat.append( Z.mean() )
if len(thetahat) == 1:
thetahat = thetahat[0]
else:
thetahat = np.array(thetahat)
return thetahat