算法公平性统计指标:机会均等与校准误差的不可兼容性
算法公平性统计指标:机会均等与校准误差的不可兼容性
引言:公平性悖论与统计指标的冲突
在算法公平性研究的核心,存在着一个深刻的数学悖论:我们能否同时实现统计上的机会均等和完美的概率校准?2016年,Kleinberg等人从理论上证明了,在某些条件下,这两个被广泛追求的公平性目标是本质上不可兼容的。这一发现不仅动摇了早期公平性研究的乐观假设,也为算法公平性的实践带来了根本性挑战。
本文将从理论基础、数学证明、实际影响和解决方案等多个维度,深入探讨机会均等与校准误差之间的不可兼容性问题,并通过详细的代码实例展示这一理论矛盾在实际机器学习系统中的具体表现。
理论基础:三大公平性统计指标
机会均等(Equalized Odds)的严格定义
机会均等要求算法预测在不同受保护群体中具有相同的错误率。数学上,对于二元分类器Ŷ和真实标签Y,给定受保护属性A(如性别、种族),机会均等要求:
这意味着:
- 平等真正率:两个群体中真正例的比例相同
- 平等假正率:两个群体中假正例的比例相同
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
from typing import Dict, Tuple
class EqualizedOddsAnalyzer:
"""机会均等性分析器"""
def __init__(self, y_true: np.ndarray, y_pred: np.ndarray,
sensitive_attribute: np.ndarray):
"""
初始化分析器
参数:
y_true: 真实标签 (0或1)
y_pred: 预测标签 (0或1)
sensitive_attribute: 受保护属性 (0或1表示不同群体)
"""
self.y_true = y_true
self.y_pred = y_pred
self.sensitive = sensitive_attribute
# 分离两个群体的数据
self.mask_group_0 = (sensitive_attribute == 0)
self.mask_group_1 = (sensitive_attribute == 1)
def calculate_equalized_odds(self) -> Dict[str, Dict[str, float]]:
"""
计算机会均等各项指标
返回:
包含TPR、FPR、TNR、FNR的字典
"""
results = {}
for group_name, mask in [("group_0", self.mask_group_0),
("group_1", self.mask_group_1)]:
if np.sum(mask) == 0:
continue
y_true_group = self.y_true[mask]
y_pred_group = self.y_pred[mask]
# 计算混淆矩阵
tn, fp, fn, tp = confusion_matrix(y_true_group, y_pred_group).ravel()
# 计算各项比率
tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
tnr = tn / (tn + fp) if (tn + fp) > 0 else 0
fnr = fn / (fn + tp) if (fn + tp) > 0 else 0
results[group_name] = {
'TPR': tpr,
'FPR': fpr,
'TNR': tnr,
'FNR': fnr,
'sample_size': len(y_true_group)
}
return results
def check_equalized_odds(self, tolerance: float = 0.05) -> Tuple[bool, Dict]:
"""
检查是否满足机会均等
参数:
tolerance: 允许的差异容忍度
返回:
(是否满足, 详细差异报告)
"""
metrics = self.calculate_equalized_odds()
if len(metrics) < 2:
return False, {"error": "需要两个群体的数据"}
# 计算差异
tpr_diff = abs(metrics['group_0']['TPR'] - metrics['group_1']['TPR'])
fpr_diff = abs(metrics['group_0']['FPR'] - metrics['group_1']['FPR'])
# 判断是否满足机会均等
satisfies_tpr = tpr_diff <= tolerance
satisfies_fpr = fpr_diff <= tolerance
satisfies_eo = satisfies_tpr and satisfies_fpr
report = {
'satisfies_equalized_odds': satisfies_eo,
'TPR_difference': tpr_diff,
'FPR_difference': fpr_diff,
'satisfies_TPR_equality': satisfies_tpr,
'satisfies_FPR_equality': satisfies_fpr,
'detailed_metrics': metrics
}
return satisfies_eo, report
完美校准(Perfect Calibration)的数学定义
校准性要求预测概率能够准确反映实际概率。对于概率预测器Ŝ,完美校准要求:
这意味着对于所有给出预测概率s的个体,其真实为正例的比例应该恰好是s。
class CalibrationAnalyzer:
"""校准性分析器"""
def __init__(self, y_true: np.ndarray, y_prob: np.ndarray,
sensitive_attribute: np.ndarray = None):
"""
初始化校准分析器
参数:
y_true: 真实标签
y_prob: 预测概率 (0到1之间)
sensitive_attribute: 可选,受保护属性
"""
self.y_true = y_true
self.y_prob = y_prob
self.sensitive = sensitive_attribute
def calculate_calibration_error(self, n_bins: int = 10) -> Dict:
"""
计算校准误差
参数:
n_bins: 分箱数量
返回:
包含总体校准误差和分群体误差的字典
"""
# 总体校准误差
overall_ece = self._expected_calibration_error(self.y_true, self.y_prob, n_bins)
results = {
'overall': {
'ECE': overall_ece,
'calibration_curve': self._calculate_calibration_curve(self.y_true, self.y_prob, n_bins)
}
}
# 如果提供了受保护属性,计算各群体的校准误差
if self.sensitive is not None:
unique_groups = np.unique(self.sensitive)
for group in unique_groups:
mask = (self.sensitive == group)
if np.sum(mask) > 0:
y_true_group = self.y_true[mask]
y_prob_group = self.y_prob[mask]
group_ece = self._expected_calibration_error(y_true_group, y_prob_group, n_bins)
results[f'group_{group}'] = {
'ECE': group_ece,
'calibration_curve': self._calculate_calibration_curve(y_true_group, y_prob_group, n_bins),
'sample_size': len(y_true_group)
}
return results
def _expected_calibration_error(self, y_true: np.ndarray, y_prob: np.ndarray, n_bins: int) -> float:
"""
计算期望校准误差 (ECE)
ECE = Σ |accuracy(bin_i) - confidence(bin_i)| * (n_i / n_total)
"""
# 等宽分箱
bins = np.linspace(0, 1, n_bins + 1)
bin_indices = np.digitize(y_prob, bins) - 1
bin_indices = np.clip(bin_indices, 0, n_bins - 1)
ece = 0.0
n_total = len(y_true)
for i in range(n_bins):
mask = (bin_indices == i)
if np.sum(mask) > 0:
bin_prob = y_prob[mask]
bin_true = y_true[mask]
# 计算箱内的平均预测概率
avg_confidence = np.mean(bin_prob)
# 计算箱内的准确率
accuracy = np.mean(bin_true)
# 计算权重
weight = len(bin_true) / n_total
# 累加校准误差
ece += weight * abs(accuracy - avg_confidence)
return ece
def _calculate_calibration_curve(self, y_true: np.ndarray, y_prob: np.ndarray, n_bins: int) -> Dict:
"""计算校准曲线数据"""
bins = np.linspace(0, 1, n_bins + 1)
bin_indices = np.digitize(y_prob, bins) - 1
bin_indices = np.clip(bin_indices, 0, n_bins - 1)
calibration_data = []
for i in range(n_bins):
mask = (bin_indices == i)
if np.sum(mask) > 0:
bin_prob = y_prob[mask]
bin_true = y_true[mask]
avg_confidence = np.mean(bin_prob)
accuracy = np.mean(bin_true)
count = len(bin_true)
calibration_data.append({
'bin_index': i,
'avg_confidence': avg_confidence,
'accuracy': accuracy,
'count': count,
'calibration_error': abs(accuracy - avg_confidence)
})
return calibration_data
def check_perfect_calibration(self, tolerance: float = 0.05) -> Tuple[bool, Dict]:
"""
检查是否满足完美校准
参数:
tolerance: 允许的校准误差容忍度
返回:
(是否满足, 详细报告)
"""
calibration_results = self.calculate_calibration_error()
# 检查总体校准误差
overall_satisfies = calibration_results['overall']['ECE'] <= tolerance
# 检查各群体校准误差(如果存在)
group_satisfies = True
group_details = {}
for key in calibration_results:
if key != 'overall':
group_ece = calibration_results[key]['ECE']
group_satisfies = group_satisfies and (group_ece <= tolerance)
group_details[key] = {
'ECE': group_ece,
'satisfies': group_ece <= tolerance
}
# 完美校准要求所有群体都满足
perfect_calibration = overall_satisfies and group_satisfies
report = {
'perfect_calibration': perfect_calibration,
'overall_ECE': calibration_results['overall']['ECE'],
'group_details': group_details,
'full_calibration_results': calibration_results
}
return perfect_calibration, report
不可能性定理:数学证明与直观解释
Kleinberg不可能性定理的严格表述
定理:假设以下条件成立:
- 预测器是非平凡的(对两个群体都有预测能力)
- 两个群体有不同的基础比率:P(Y=1|A=0) ≠ P(Y=1|A=1)
- 预测器满足完美校准
那么,预测器不可能同时满足机会均等。
数学证明的核心思路
class ImpossibilityTheoremDemonstrator:
"""不可能性定理演示器"""
def __init__(self, base_rate_0: float, base_rate_1: float):
"""
初始化演示器
参数:
base_rate_0: 群体0的基础比率 P(Y=1|A=0)
base_rate_1: 群体1的基础比率 P(Y=1|A=1)
"""
self.base_rate_0 = base_rate_0
self.base_rate_1 = base_rate_1
def demonstrate_impossibility(self) -> Dict:
"""
演示不可能性定理
返回:
包含定理证明步骤和结论的字典
"""
print("=" * 60)
print("不可能性定理演示")
print("=" * 60)
print(f"群体0基础比率: {self.base_rate_0:.3f}")
print(f"群体1基础比率: {self.base_rate_1:.3f}")
print()
# 假设我们有完美校准的预测器
# 根据完美校准的定义,预测概率等于真实概率
# 步骤1: 计算完美校准下的预测分布
# 由于完美校准,预测概率的期望等于基础比率
expected_pred_0 = self.base_rate_0
expected_pred_1 = self.base_rate_1
print("步骤1: 完美校准下的期望预测")
print(f"群体0的期望预测: E[Ŝ|A=0] = {expected_pred_0:.3f}")
print(f"群体1的期望预测: E[Ŝ|A=1] = {expected_pred_1:.3f}")
print()
# 步骤2: 机会均等的要求
# 机会均等要求: TPR和FPR在两个群体中相等
# TPR = P(Ŷ=1|Y=1, A) = P(Ŝ>τ|Y=1, A)
# FPR = P(Ŷ=1|Y=0, A) = P(Ŝ>τ|Y=0, A)
# 步骤3: 使用贝叶斯定理推导矛盾
# 根据完美校准,我们可以写出条件分布
print("步骤2: 推导矛盾")
print("-" * 40)
# 假设存在阈值τ使得预测器满足机会均等
# 我们可以证明这会与基础比率差异矛盾
contradiction_proof = self._derive_contradiction()
return {
'base_rate_0': self.base_rate_0,
'base_rate_1': self.base_rate_1,
'expected_predictions': {
'group_0': expected_pred_0,
'group_1': expected_pred_1
},
'contradiction_proof': contradiction_proof,
'theorem_statement': "在非平凡预测器、不同基础比率和完美校准的条件下,机会均等不可能实现"
}
def _derive_contradiction(self) -> Dict:
"""推导数学矛盾"""
# 令α = P(Ŝ>τ|Y=1, A=0) = P(Ŝ>τ|Y=1, A=1) [机会均等的TPR相等]
# 令β = P(Ŝ>τ|Y=0, A=0) = P(Ŝ>τ|Y=0, A=1) [机会均等的FPR相等]
# 使用全概率公式计算每个群体的预测为正的比例
# P(Ŝ>τ|A=0) = α * base_rate_0 + β * (1 - base_rate_0)
# P(Ŝ>τ|A=1) = α * base_rate_1 + β * (1 - base_rate_1)
# 但由于完美校准,预测为正的比例应该反映真实概率
# 这与基础比率差异矛盾
print("假设:")
print(" 1. 完美校准: P(Y=1|Ŝ=s, A=a) = s")
print(" 2. 机会均等: TPR和FPR在两个群体中相等")
print(" 3. 不同基础比率: P(Y=1|A=0) ≠ P(Y=1|A=1)")
print()
print("推导:")
print(" 设 α = TPR (两个群体相等)")
print(" 设 β = FPR (两个群体相等)")
print()
print(" 群体0预测为正的比例:")
print(f" P(Ŷ=1|A=0) = α × {self.base_rate_0:.3f} + β × {1-self.base_rate_0:.3f}")
print()
print(" 群体1预测为正的比例:")
print(f" P(Ŷ=1|A=1) = α × {self.base_rate_1:.3f} + β × {1-self.base_rate_1:.3f}")
print()
print(" 完美校准要求预测比例反映真实基础比率")
print(" 但这两个表达式只有在基础比率相等时才能同时满足完美校准")
print()
print("结论: 矛盾!")
return {
'assumptions': [
'完美校准',
'机会均等',
'不同基础比率'
],
'contradiction': '完美校准要求预测分布反映真实基础比率差异,但机会均等强制两个群体的错误率相同,这要求预测分布相同,形成矛盾'
}
直观解释:天气预报的比喻
想象两个城市:
- 城市A(沙漠城市):下雨的基础概率是10%
- 城市B(雨林城市):下雨的基础概率是50%
如果一个天气预报系统是完美校准的:
- 在城市A,当它说"10%概率下雨"时,实际下雨的比例确实是10%
- 在城市B,当它说"50%概率下雨"时,实际下雨的比例确实是50%
现在要求机会均等:
- 真正率相等:在两个城市中,当下雨时预报说有雨的比例相同
- 假正率相等:在两个城市中,当不下雨时预报说有雨的比例相同
由于两个城市的基础天气模式不同,满足完美校准的预报系统在两个城市的预报行为必然不同。强制要求相同的错误率(机会均等)会破坏这种适应性,从而导致校准失效。
实际影响与案例研究
信贷审批系统中的公平性困境
class CreditScoringCaseStudy:
"""信贷审批案例研究"""
def __init__(self):
"""初始化案例数据"""
# 模拟数据:两个不同信用基础的群体
np.random.seed(42)
# 群体A:传统银行客户(基础违约率10%)
n_group_a = 5000
self.group_a_data = self._generate_group_data(
n=n_group_a,
base_default_rate=0.10,
group_id=0
)
# 群体B:新兴市场客户(基础违约率20%)
n_group_b = 5000
self.group_b_data = self._generate_group_data(
n=n_group_b,
base_default_rate=0.20,
group_id=1
)
# 合并数据
self.data = pd.concat([self.group_a_data, self.group_b_data], ignore_index=True)
def _generate_group_data(self, n: int, base_default_rate: float, group_id: int) -> pd.DataFrame:
"""生成群体数据"""
# 特征:收入、信用历史、负债比
income = np.random.lognormal(10.5, 0.3, n)
credit_history = np.random.exponential(5, n)
debt_ratio = np.random.beta(2, 5, n)
# 真实违约概率(逻辑回归模型)
log_odds = (
-2.0 +
0.5 * (income - income.mean()) / income.std() +
-1.0 * (credit_history - credit_history.mean()) / credit_history.std() +
2.0 * (debt_ratio - debt_ratio.mean()) / debt_ratio.std()
)
true_prob = 1 / (1 + np.exp(-log_odds))
# 调整到目标基础违约率
current_rate = true_prob.mean()
adjustment = np.log(base_default_rate/(1-base_default_rate)) - np.log(current_rate/(1-current_rate))
true_prob_adjusted = 1 / (1 + np.exp(-(log_odds + adjustment)))
# 生成真实标签
true_default = np.random.binomial(1, true_prob_adjusted)
# 模型预测概率(加入一些噪声)
predicted_prob = true_prob_adjusted + np.random.normal(0, 0.1, n)
predicted_prob = np.clip(predicted_prob, 0, 1)
# 预测标签(使用阈值0.5)
predicted_default = (predicted_prob > 0.5).astype(int)
return pd.DataFrame({
'income': income,
'credit_history': credit_history,
'debt_ratio': debt_ratio,
'true_default': true_default,
'predicted_prob': predicted_prob,
'predicted_default': predicted_default,
'group': group_id
})
def analyze_fairness_tradeoffs(self):
"""分析公平性权衡"""
print("=" * 60)
print("信贷审批案例:公平性权衡分析")
print("=" * 60)
# 分离两个群体的数据
data_a = self.data[self.data['group'] == 0]
data_b = self.data[self.data['group'] == 1]
# 分析1:原始模型的性能
print("\n1. 原始模型性能分析")
print("-" * 40)
# 机会均等分析
eo_analyzer = EqualizedOddsAnalyzer(
y_true=self.data['true_default'].values,
y_pred=self.data['predicted_default'].values,
sensitive_attribute=self.data['group'].values
)
satisfies_eo, eo_report = eo_analyzer.check_equalized_odds(tolerance=0.03)
print(f"是否满足机会均等: {satisfies_eo}")
print(f"TPR差异: {eo_report['TPR_difference']:.4f}")
print(f"FPR差异: {eo_report['FPR_difference']:.4f}")
# 校准性分析
cal_analyzer = CalibrationAnalyzer(
y_true=self.data['true_default'].values,
y_prob=self.data['predicted_prob'].values,
sensitive_attribute=self.data['group'].values
)
perfect_cal, cal_report = cal_analyzer.check_perfect_calibration(tolerance=0.03)
print(f"\n是否满足完美校准: {perfect_cal}")
print(f"总体校准误差 (ECE): {cal_report['overall_ECE']:.4f}")
# 分析2:基础比率差异
print("\n2. 基础比率分析")
print("-" * 40)
base_rate_a = data_a['true_default'].mean()
base_rate_b = data_b['true_default'].mean()
print(f"群体A(传统客户)违约率: {base_rate_a:.4f}")
print(f"群体B(新兴客户)违约率: {base_rate_b:.4f}")
print(f"基础比率差异: {abs(base_rate_a - base_rate_b):.4f}")
# 分析3:不可能性定理验证
print("\n3. 不可能性定理验证")
print("-" * 40)
if abs(base_rate_a - base_rate_b) > 0.01 and not perfect_cal and not satisfies_eo:
print("情况符合不可能性定理条件:")
print(" ✓ 不同基础比率")
print(" ✗ 不满足完美校准")
print(" ✗ 不满足机会均等")
print("\n符合定理预期:三个条件不能同时满足")
elif satisfies_eo and perfect_cal:
print("警告:如果同时满足机会均等和完美校准,则基础比率必须相同")
print(f"但实际基础比率: A={base_rate_a:.4f}, B={base_rate_b:.4f}")
return {
'equalized_odds_report': eo_report,
'calibration_report': cal_report,
'base_rates': {'A': base_rate_a, 'B': base_rate_b}
}
医疗诊断中的群体公平性
class HealthcareDiagnosticCase:
"""医疗诊断案例研究"""
def __init__(self):
"""模拟医疗诊断场景"""
np.random.seed(123)
# 两个群体:不同疾病患病率
# 群体0:低风险人群(患病率5%)
# 群体1:高风险人群(患病率15%)
self.n_samples = 10000
self.y_true, self.y_score, self.groups = self._generate_medical_data()
def _generate_medical_data(self):
"""生成医疗诊断数据"""
y_true = []
y_score = []
groups = []
for group_id, prevalence in [(0, 0.05), (1, 0.15)]:
n_group = self.n_samples // 2
# 生成真实患病状态
true_disease = np.random.binomial(1, prevalence, n_group)
# 生成预测分数(基于真实状态加上噪声)
scores = []
for i in range(n_group):
if true_disease[i] == 1:
# 患病者的分数较高但有一定重叠
base_score = np.random.beta(8, 2) # 偏向高值
else:
# 非患病者的分数较低但有一定重叠
base_score = np.random.beta(2, 8) # 偏向低值
# 添加噪声
score = base_score + np.random.normal(0, 0.1)
score = np.clip(score, 0, 1)
scores.append(score)
y_true.extend(true_disease)
y_score.extend(scores)
groups.extend([group_id] * n_group)
return np.array(y_true), np.array(y_score), np.array(groups)
def analyze_fairness_metrics(self, threshold: float = 0.5):
"""分析不同公平性指标"""
# 二值化预测
y_pred = (self.y_score >= threshold).astype(int)
print("\n医疗诊断公平性分析")
print("=" * 60)
# 1. 机会均等分析
eo_analyzer = EqualizedOddsAnalyzer(self.y_true, y_pred, self.groups)
eo_metrics = eo_analyzer.calculate_equalized_odds()
print("\n机会均等指标:")
for group, metrics in eo_metrics.items():
print(f"{group}: TPR={metrics['TPR']:.3f}, FPR={metrics['FPR']:.3f}")
tpr_diff = abs(eo_metrics['group_0']['TPR'] - eo_metrics['group_1']['TPR'])
fpr_diff = abs(eo_metrics['group_0']['FPR'] - eo_metrics['group_1']['FPR'])
print(f"TPR差异: {tpr_diff:.3f}, FPR差异: {fpr_diff:.3f}")
# 2. 校准性分析
cal_analyzer = CalibrationAnalyzer(self.y_true, self.y_score, self.groups)
cal_metrics = cal_analyzer.calculate_calibration_error()
print("\n校准误差:")
print(f"总体ECE: {cal_metrics['overall']['ECE']:.3f}")
if 'group_0' in cal_metrics:
print(f"群体0 ECE: {cal_metrics['group_0']['ECE']:.3f}")
if 'group_1' in cal_metrics:
print(f"群体1 ECE: {cal_metrics['group_1']['ECE']:.3f}")
# 3. 基础比率分析
mask_0 = (self.groups == 0)
mask_1 = (self.groups == 1)
base_rate_0 = self.y_true[mask_0].mean()
base_rate_1 = self.y_true[mask_1].mean()
print(f"\n基础比率: 群体0={base_rate_0:.3f}, 群体1={base_rate_1:.3f}")
print(f"基础比率差异: {abs(base_rate_0 - base_rate_1):.3f}")
# 4. 展示权衡关系
print("\n公平性权衡分析:")
print("-" * 40)
# 计算不同的权衡点
thresholds = np.linspace(0.3, 0.7, 5)
tradeoff_data = []
for t in thresholds:
y_pred_t = (self.y_score >= t).astype(int)
# 机会均等差异
eo_temp = EqualizedOddsAnalyzer(self.y_true, y_pred_t, self.groups)
eo_metrics_t = eo_temp.calculate_equalized_odds()
tpr_diff_t = abs(eo_metrics_t['group_0']['TPR'] - eo_metrics_t['group_1']['TPR'])
# 校准误差
cal_temp = CalibrationAnalyzer(self.y_true, self.y_score, self.groups)
cal_metrics_t = cal_temp.calculate_calibration_error()
ece_t = cal_metrics_t['overall']['ECE']
tradeoff_data.append({
'threshold': t,
'tpr_difference': tpr_diff_t,
'ece': ece_t
})
# 显示权衡表
print("阈值 | TPR差异 | 校准误差")
print("-" * 30)
for data in tradeoff_data:
print(f"{data['threshold']:.2f} | {data['tpr_difference']:.3f} | {data['ece']:.3f}")
return {
'equalized_odds': eo_metrics,
'calibration': cal_metrics,
'base_rates': {'group_0': base_rate_0, 'group_1': base_rate_1},
'tradeoff_analysis': tradeoff_data
}
解决方案与缓解策略
权衡管理框架
class FairnessTradeoffManager:
"""公平性权衡管理器"""
def __init__(self, y_true: np.ndarray, y_score: np.ndarray,
sensitive_attribute: np.ndarray):
"""
初始化权衡管理器
参数:
y_true: 真实标签
y_score: 预测分数
sensitive_attribute: 受保护属性
"""
self.y_true = y_true
self.y_score = y_score
self.sensitive = sensitive_attribute
def find_optimal_tradeoff(self, metric_weights: Dict[str, float] = None):
"""
寻找最优权衡点
参数:
metric_weights: 各指标的权重
- 'equalized_odds': 机会均等权重
- 'calibration': 校准权重
- 'accuracy': 准确率权重
返回:
最优阈值和对应的指标
"""
if metric_weights is None:
metric_weights = {
'equalized_odds': 0.4,
'calibration': 0.4,
'accuracy': 0.2
}
thresholds = np.linspace(0.1, 0.9, 50)
results = []
for threshold in thresholds:
# 二值化预测
y_pred = (self.y_score >= threshold).astype(int)
# 计算各项指标
metrics = self._calculate_all_metrics(y_pred, threshold)
# 计算加权分数(分数越低越好)
weighted_score = (
metric_weights['equalized_odds'] * metrics['equalized_odds_score'] +
metric_weights['calibration'] * metrics['calibration_score'] +
metric_weights['accuracy'] * (1 - metrics['accuracy'])
)
results.append({
'threshold': threshold,
'weighted_score': weighted_score,
**metrics
})
# 找到最优阈值(最小化加权分数)
results_df = pd.DataFrame(results)
optimal_idx = results_df['weighted_score'].idxmin()
optimal_result = results_df.iloc[optimal_idx].to_dict()
return optimal_result, results_df
def _calculate_all_metrics(self, y_pred: np.ndarray, threshold: float) -> Dict:
"""计算所有相关指标"""
metrics = {}
# 1. 机会均等差异
eo_analyzer = EqualizedOddsAnalyzer(self.y_true, y_pred, self.sensitive)
eo_metrics = eo_analyzer.calculate_equalized_odds()
if len(eo_metrics) >= 2:
tpr_diff = abs(eo_metrics['group_0']['TPR'] - eo_metrics['group_1']['TPR'])
fpr_diff = abs(eo_metrics['group_0']['FPR'] - eo_metrics['group_1']['FPR'])
metrics['tpr_difference'] = tpr_diff
metrics['fpr_difference'] = fpr_diff
metrics['equalized_odds_score'] = (tpr_diff + fpr_diff) / 2
else:
metrics['equalized_odds_score'] = 1.0 # 最差值
# 2. 校准误差
cal_analyzer = CalibrationAnalyzer(self.y_true, self.y_score, self.sensitive)
cal_metrics = cal_analyzer.calculate_calibration_error()
metrics['calibration_error'] = cal_metrics['overall']['ECE']
metrics['calibration_score'] = cal_metrics['overall']['ECE']
# 3. 准确率
metrics['accuracy'] = np.mean(self.y_true == y_pred)
# 4. 各群体指标
unique_groups = np.unique(self.sensitive)
group_metrics = {}
for group in unique_groups:
mask = (self.sensitive == group)
if np.sum(mask) > 0:
group_accuracy = np.mean(self.y_true[mask] == y_pred[mask])
group_tpr = np.mean(y_pred[mask][self.y_true[mask] == 1] == 1) if np.sum(self.y_true[mask] == 1) > 0 else 0
group_fpr = np.mean(y_pred[mask][self.y_true[mask] == 0] == 1) if np.sum(self.y_true[mask] == 0) > 0 else 0
group_metrics[f'group_{group}'] = {
'accuracy': group_accuracy,
'tpr': group_tpr,
'fpr': group_fpr,
'sample_size': np.sum(mask)
}
metrics['group_metrics'] = group_metrics
return metrics
def visualize_tradeoff_pareto(self, results_df: pd.DataFrame):
"""可视化帕累托前沿"""
import matplotlib.pyplot as plt
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# 1. 机会均等 vs 校准误差
ax = axes[0, 0]
ax.scatter(results_df['equalized_odds_score'],
results_df['calibration_score'],
c=results_df['threshold'], cmap='viridis')
ax.set_xlabel('机会均等差异')
ax.set_ylabel('校准误差')
ax.set_title('机会均等 vs 校准误差权衡')
# 2. 阈值 vs 加权分数
ax = axes[0, 1]
ax.plot(results_df['threshold'], results_df['weighted_score'], 'b-')
ax.set_xlabel('阈值')
ax.set_ylabel('加权分数')
ax.set_title('阈值选择优化')
ax.grid(True, alpha=0.3)
# 3. 准确率 vs 公平性
ax = axes[1, 0]
ax.scatter(results_df['accuracy'],
results_df['equalized_odds_score'],
c=results_df['threshold'], cmap='plasma')
ax.set_xlabel('准确率')
ax.set_ylabel('机会均等差异')
ax.set_title('准确率 vs 机会均等')
# 4. 帕累托前沿
ax = axes[1, 1]
# 找到帕累托最优解
pareto_mask = self._find_pareto_front(results_df[['equalized_odds_score', 'calibration_score']].values)
pareto_points = results_df[pareto_mask]
ax.scatter(results_df['equalized_odds_score'],
results_df['calibration_score'],
alpha=0.5, label='所有解')
ax.scatter(pareto_points['equalized_odds_score'],
pareto_points['calibration_score'],
color='red', s=100, label='帕累托最优')
ax.set_xlabel('机会均等差异')
ax.set_ylabel('校准误差')
ax.set_title('帕累托前沿')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
return fig
def _find_pareto_front(self, points: np.ndarray) -> np.ndarray:
"""找到帕累托前沿(最小化两个目标)"""
n_points = points.shape[0]
is_pareto = np.ones(n_points, dtype=bool)
for i in range(n_points):
for j in range(n_points):
if i != j:
# 如果点j在两个目标上都优于点i
if (points[j, 0] <= points[i, 0] and points[j, 1] <= points[i, 1] and
(points[j, 0] < points[i, 0] or points[j, 1] < points[i, 1])):
is_pareto[i] = False
break
return is_pareto
后处理调整方法
class PostProcessingFairnessAdjuster:
"""后处理公平性调整器"""
def __init__(self, strategy: str = "threshold_adjustment"):
"""
初始化调整器
参数:
strategy: 调整策略
- 'threshold_adjustment': 阈值调整
- 'probability_calibration': 概率校准
- 'rejection_option': 拒绝选项
"""
self.strategy = strategy
self.group_thresholds = {}
self.calibration_models = {}
def fit(self, y_true: np.ndarray, y_score: np.ndarray,
sensitive_attribute: np.ndarray):
"""拟合调整模型"""
unique_groups = np.unique(sensitive_attribute)
if self.strategy == "threshold_adjustment":
self._fit_threshold_adjustment(y_true, y_score, sensitive_attribute, unique_groups)
elif self.strategy == "probability_calibration":
self._fit_probability_calibration(y_true, y_score, sensitive_attribute, unique_groups)
return self
def _fit_threshold_adjustment(self, y_true, y_score, sensitive_attribute, unique_groups):
"""拟合阈值调整模型"""
from sklearn.metrics import roc_curve
for group in unique_groups:
mask = (sensitive_attribute == group)
if np.sum(mask) == 0:
continue
y_true_group = y_true[mask]
y_score_group = y_score[mask]
# 寻找满足目标FPR的阈值
fpr, tpr, thresholds = roc_curve(y_true_group, y_score_group)
# 目标:统一所有群体的FPR
target_fpr = 0.1 # 可以根据需求调整
# 找到最接近目标FPR的阈值
idx = np.argmin(np.abs(fpr - target_fpr))
optimal_threshold = thresholds[idx] if idx < len(thresholds) else 0.5
self.group_thresholds[group] = optimal_threshold
def _fit_probability_calibration(self, y_true, y_score, sensitive_attribute, unique_groups):
"""拟合概率校准模型"""
from sklearn.isotonic import IsotonicRegression
for group in unique_groups:
mask = (sensitive_attribute == group)
if np.sum(mask) == 0:
continue
y_true_group = y_true[mask]
y_score_group = y_score[mask]
# 训练等渗回归校准器
calibrator = IsotonicRegression(out_of_bounds='clip')
calibrator.fit(y_score_group, y_true_group)
self.calibration_models[group] = calibrator
def predict(self, y_score: np.ndarray, sensitive_attribute: np.ndarray) -> np.ndarray:
"""应用调整后的预测"""
n_samples = len(y_score)
y_pred_adjusted = np.zeros(n_samples)
if self.strategy == "threshold_adjustment":
for group, threshold in self.group_thresholds.items():
mask = (sensitive_attribute == group)
y_pred_adjusted[mask] = (y_score[mask] >= threshold).astype(int)
elif self.strategy == "probability_calibration":
# 首先校准概率,然后使用统一阈值
y_score_calibrated = np.copy(y_score)
for group, calibrator in self.calibration_models.items():
mask = (sensitive_attribute == group)
if np.sum(mask) > 0:
y_score_calibrated[mask] = calibrator.predict(y_score[mask])
# 使用统一阈值
unified_threshold = 0.5
y_pred_adjusted = (y_score_calibrated >= unified_threshold).astype(int)
return y_pred_adjusted
def predict_proba(self, y_score: np.ndarray, sensitive_attribute: np.ndarray) -> np.ndarray:
"""返回调整后的概率"""
if self.strategy == "probability_calibration":
y_score_calibrated = np.copy(y_score)
for group, calibrator in self.calibration_models.items():
mask = (sensitive_attribute == group)
if np.sum(mask) > 0:
y_score_calibrated[mask] = calibrator.predict(y_score[mask])
return y_score_calibrated
else:
# 对于阈值调整,返回原始分数(但可以添加解释)
return y_score
理论扩展与前沿研究
条件机会均等与部分校准
由于完全的机会均等和完美校准不可兼得,研究者提出了折中方案:
-
条件机会均等:只在某些特征子集上要求机会均等
-
群体校准:允许不同群体有不同的校准函数
贝叶斯公平性框架
从贝叶斯视角看,不可能性定理反映了先验分布(基础比率)与似然函数(模型预测)之间的内在冲突。解决方案包括:
- 分层贝叶斯模型:显式建模群体差异
- 正则化先验:在群体公平性目标上添加先验约束
class BayesianFairnessModel:
"""贝叶斯公平性模型"""
def __init__(self):
"""初始化模型参数"""
import pymc3 as pm
self.model = None
self.trace = None
def build_hierarchical_model(self, X, y, groups):
"""构建分层贝叶斯模型"""
n_groups = len(np.unique(groups))
n_features = X.shape[1]
with pm.Model() as hierarchical_model:
# 群体级别的超先验
mu_alpha = pm.Normal('mu_alpha', mu=0, sigma=1)
sigma_alpha = pm.HalfNormal('sigma_alpha', sigma=1)
mu_beta = pm.Normal('mu_beta', mu=0, sigma=1, shape=n_features)
sigma_beta = pm.HalfNormal('sigma_beta', sigma=1, shape=n_features)
# 群体特定参数
alpha = pm.Normal('alpha', mu=mu_alpha, sigma=sigma_alpha, shape=n_groups)
beta = pm.Normal('beta', mu=mu_beta, sigma=sigma_beta, shape=(n_groups, n_features))
# 线性预测
logits = alpha[groups] + pm.math.dot(X, beta[groups].T)
# 似然
y_obs = pm.Bernoulli('y_obs', logit_p=logits, observed=y)
# 公平性约束(作为先验)
# 可以添加群体间参数差异的约束
self.model = hierarchical_model
return hierarchical_model
def add_fairness_constraint(self, constraint_type: str = "equal_odds_approx"):
"""添加公平性约束"""
# 在贝叶斯框架中,公平性可以作为先验或正则化项加入
pass
实践指南与政策建议
实际应用中的建议
- 情境化分析:理解具体应用场景中不同公平性指标的重要性
- 利益相关者参与:让受影响的群体参与公平性标准的制定
- 透明化权衡:明确告知决策中的公平性-准确性权衡
- 持续监控:建立长期监控机制,检测公平性漂移
政策制定建议
- 避免单一指标:不要立法强制单一公平性指标
- 允许情境化调整:为不同应用场景留出调整空间
- 促进算法透明:要求披露使用的公平性指标和权衡选择
- 建立申诉机制:为受到不公平对待的个体提供申诉渠道
结论
机会均等与校准误差的不可兼容性揭示了算法公平性本质上的复杂性。这一数学事实告诉我们,追求完美的算法公平性可能是一个不可能的任务,至少在使用当前主流统计框架的情况下是如此。
然而,这并不意味着我们应该放弃公平性的追求。相反,这一理论结果强调了:
- 需要明确的价值判断:在不同公平性目标之间进行权衡需要明确的价值选择
- 情境的重要性:不同应用场景可能需要不同的公平性标准
- 持续研究的必要性:需要开发新的数学框架来超越当前限制
在实践中,算法公平性不是一个可以"解决"的技术问题,而是一个需要持续管理的社会-技术挑战。通过理解这些根本性的限制,我们可以更明智地设计和部署人工智能系统,在技术可能性与社会价值之间找到适当的平衡。
- 点赞
- 收藏
- 关注作者
评论(0)