Federated Learning Algorithms: Studying ModelArts “pytorch_fedamp_emnist_classification” (Part 5)
In the previous installment, we reproduced the pytorch_fedamp_emnist_classification example. This time, we work with the actual data and explore the model in more depth.
1. Attempting personalized federated learning with only three clients
batch_size = 100
num_epochs = 5
communication = 20
val_acc_cloud = []
val_acc_client = []
num_clients = 3

for rounds in range(communication):
    val_acc_total = 0
    # Train model locally
    print("===Starting local training===")
    for i in range(num_clients):
        print("Training client " + str(i), end='\r')
        model, optimizer = client_models[i]
        local_client = client_feds[i]
        data = client_data[i]
        acc = train_local_clients(model, optimizer, data, num_epochs)
        val_acc_total += acc
        local_client.send_model(parameters=optimizer.param_groups[0]['params'])

    # Server-side aggregation
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_client.append(val_acc_avg)
    clear_output(wait=True)
    print("\n===Local training is completed===")
    print("Communication round: {0:1d}".format(rounds + 1))
    print("===Starting model aggregation===")
    step = list(server.backend.wait_next())[-1]
    ctx = server.backend.get_context()
    server.backend.aggregate_data(step, ctx.names)
    print("===Model aggregation is completed===")
    print("Local model average validation test accuracy: {0:5.2f}%".format(val_acc_avg))

    # Update each local model with its personalized cloud model
    val_acc_total = 0
    with torch.no_grad():
        for i in range(num_clients):
            val_samples = client_data[i].val_samples
            val_labels = client_data[i].val_labels
            local_client = client_feds[i]
            model, optimizer = client_models[i]
            new_parameters = local_client.get_model()
            local_client.clean_previous()
            local_client.next_step()
            parameters = optimizer.param_groups[0]['params']
            for p, new_p in zip(parameters, new_parameters):
                p.copy_(new_p)
            acc = evaluation(model, val_samples, val_labels)
            val_acc_total += acc
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_cloud.append(val_acc_avg)
    print("Personalized cloud model average validation accuracy: {0:5.2f}%".format(val_acc_avg))

print("====Training Completed====")
INFO:root:waiting for next federation...
INFO:root:ready to do federation. <step={'0'}> <clients=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61]> <algorithms=FedAMP(alpha=0.1, mul=50.0, sim=None)>
===Local training is completed===
Communication round: 1
===Starting model aggregation===
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-8-881b59aab94f> in <module>()
32 step = list(server.backend.wait_next())[-1]
33 ctx = server.backend.get_context()
---> 34 server.backend.aggregate_data(step, ctx.names)
35
36 print("===Model aggregation is completed===")
moxing/framework/federated/fed_backend.pyx in moxing.framework.federated.fed_backend.FederatedBackendOBS.aggregate_data()
KeyboardInterrupt:
The run hung after the first communication round. I suspect a mismatch between the number of clients the federation was initialized with and the number actually used in training: the log above shows the server is still configured for 62 clients, while only 3 clients sent models, so aggregation blocks indefinitely.
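Before re-running with a different client count, the federation state from the previous 62-client run has to be cleared and MOX_FEDERATED_SIZE re-exported. A minimal sketch, reusing only calls that already appear in this notebook:
import os
import moxing as mox

num_clients = 3
# Drop the workspace left over from the 62-client run so the server does not
# keep waiting for 62 participants.
if mox.file.is_directory('/tmp/fed_workspace/'):
    mox.file.remove('/tmp/fed_workspace/', recursive=True)

# Server and clients must agree on this value before wait_ready()/connect().
os.environ['MOX_FEDERATED_SIZE'] = str(num_clients)
os.environ['MOX_FEDERATED_BACKEND'] = 'obs'
os.environ['MOX_FEDERATED_URL'] = '/tmp/fed_workspace/'
After this, the client and server objects are re-created with the new size, which is exactly what the next section does.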
2. Re-initializing with 3 clients and running personalized training
num_clients = 10

dfl = DataFileLoaderHorizontal()
train_sample_filename = 'FedAMP/EMNIST/client_train_samples_'
train_label_filename = 'FedAMP/EMNIST/client_train_labels_'
val_sample_filename = 'FedAMP/EMNIST/client_test_samples_'
val_label_filename = 'FedAMP/EMNIST/client_test_labels_'
filename_sx = '.npy'

input_dim = (-1, 1, 28, 28)
CUDA_DEVICE = 0

client_data = []
for i in range(num_clients):
    m_data = m_Data()
    # Load training data
    dfl.load_file_binary(data_filename=train_sample_filename + str(i) + filename_sx,
                         label_filename=train_label_filename + str(i) + filename_sx)
    samples, labels = dfl.getDataToTorch()
    m_data.train_samples = samples.reshape(input_dim)
    m_data.train_labels = labels.squeeze()
    # Load validation data
    dfl.load_file_binary(data_filename=val_sample_filename + str(i) + filename_sx,
                         label_filename=val_label_filename + str(i) + filename_sx)
    samples, labels = dfl.getDataToTorch()
    m_data.val_samples = samples.reshape(input_dim)
    m_data.val_labels = labels.squeeze()
    client_data.append(m_data)

os.environ['MOX_FEDERATED_SIZE'] = str(num_clients)
os.environ['MOX_FEDERATED_BACKEND'] = 'obs'
os.environ['MOX_FEDERATED_URL'] = '/tmp/fed_workspace/'

stepsize = 0.001  # local training step size
alg_config = fed_algorithm.FedAMPConfig(alpha=0.1, mul=50.0, display=True)  # FedAMP parameter setting

client_models = []
client_feds = []
optimizer_list = []
base_model = CNN()
for i in range(num_clients):
    os.environ['MOX_FEDERATED_RANK'] = str(i)
    backend_config = fed_backend.FederatedBackendOBSConfig(
        load_fn=torch_load,
        save_fn=torch_save,
        suffix='pth')
    # Create client
    fed_alg = fed_algorithm.build_algorithm(alg_config)
    backend = fed_backend.build_backend(
        config=backend_config,
        fed_alg=fed_alg)
    fed_client = client.FederatedHorizontalPTClient(backend=backend)
    fed_client.connect()
    client_feds.append(fed_client)
    model = CNN()
    optimizer = torch.optim.Adam(model.parameters(), lr=stepsize)
    model.load_state_dict(base_model.state_dict())
    client_models.append((model, optimizer))
    optimizer_list.append(optimizer)

# Create server
fed_alg = fed_algorithm.build_algorithm(alg_config)
fed_be_config = fed_backend.FederatedBackendOBSConfig(
    load_fn=torch_load, save_fn=torch_save, suffix='pth', hooks=[TorchFedHook()])
fed_be = fed_backend.build_backend(fed_be_config, fed_alg)
server = fed_server.FederatedServerHorizontal(fed_be)
server.backend.wait_ready()
batch_size = 100
num_epochs = 5
communication = 20
val_acc_cloud = []
val_acc_client = []

for rounds in range(communication):
    val_acc_total = 0
    # Train model locally
    print("===Starting local training===")
    for i in range(num_clients):
        print("Training client " + str(i), end='\r')
        model, optimizer = client_models[i]
        local_client = client_feds[i]
        data = client_data[i]
        acc = train_local_clients(model, optimizer, data, num_epochs)
        val_acc_total += acc
        local_client.send_model(parameters=optimizer.param_groups[0]['params'])

    # Server-side aggregation
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_client.append(val_acc_avg)
    clear_output(wait=True)
    print("\n===Local training is completed===")
    print("Communication round: {0:1d}".format(rounds + 1))
    print("===Starting model aggregation===")
    step = list(server.backend.wait_next())[-1]
    ctx = server.backend.get_context()
    server.backend.aggregate_data(step, ctx.names)
    print("===Model aggregation is completed===")
    print("Local model average validation test accuracy: {0:5.2f}%".format(val_acc_avg))

    # Update each local model with its personalized cloud model
    val_acc_total = 0
    with torch.no_grad():
        for i in range(num_clients):
            val_samples = client_data[i].val_samples
            val_labels = client_data[i].val_labels
            local_client = client_feds[i]
            model, optimizer = client_models[i]
            new_parameters = local_client.get_model()
            local_client.clean_previous()
            local_client.next_step()
            parameters = optimizer.param_groups[0]['params']
            for p, new_p in zip(parameters, new_parameters):
                p.copy_(new_p)
            acc = evaluation(model, val_samples, val_labels)
            val_acc_total += acc
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_cloud.append(val_acc_avg)
    print("Personalized cloud model average validation accuracy: {0:5.2f}%".format(val_acc_avg))

print("====Training Completed====")
INFO:root:waiting for next federation...
INFO:root:ready to do federation. <step={'19'}> <clients=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]> <algorithms=FedAMP(alpha=0.1, mul=50.0, sim=None)>
===Local training is completed===
Communication round: 20
===Starting model aggregation===
===Model aggregation is completed===
Local model average validation test accuracy: 87.50%
Personalized cloud model average validation accuracy: 88.55%
====Training Completed====
plt.figure(figsize=(8, 6))
x = list(range(communication))
plt.xticks(x[0:-1:2])
acc_client, = plt.plot(x, val_acc_client, color="red")
acc_cloud, = plt.plot(x, val_acc_cloud, color="blue")
plt.legend([acc_client, acc_cloud], ['Client Acc', 'Cloud Acc'], fontsize=20)
plt.xlabel("Communication round", fontsize=18)
plt.ylabel("Mean validation accuracy", fontsize=18)
plt.title("Mean validation accuracy vs. Communication round", fontsize=18)
plt.show()
As the plot shows, when the number of clients is reduced to 3, federated learning converges poorly.
Moreover, when personalized learning is run again, the accuracy curve fluctuates noticeably, the training results are unstable, and it is questionable whether the model converges at all. To check convergence, we raise the number of communication rounds to 50, as sketched below.
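For that re-run, only the round count changes; a minimal sketch, with the training loop above reused unchanged:
communication = 50                       # raise the number of communication rounds from 20 to 50
val_acc_cloud, val_acc_client = [], []   # reset the accuracy traces before re-running the loop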
Even with 50 communication rounds, convergence is still not good. This raises a natural question: given the current per-client sample size, how many clients are needed for reasonable convergence? Next, we raise the number of clients to 10 and try again.
3. Personalized learning with 10 clients
From the plot, convergence clearly improves once the number of clients rises to 10.
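To make "better convergence" slightly more concrete, one rough check is how much the recorded accuracy trace still fluctuates over the final rounds. A minimal sketch, assuming the val_acc_cloud list filled by the training loop above (tail_stability is a hypothetical helper, not part of the tutorial):
import numpy as np

def tail_stability(acc_trace, tail=5):
    """Std. dev. of the last `tail` recorded accuracies; smaller means a flatter, more converged curve."""
    tail_vals = [float(a) for a in acc_trace[-tail:]]   # entries may be 0-dim torch tensors
    return float(np.std(tail_vals))

print("Cloud accuracy spread over the last 5 rounds: {0:5.2f}".format(tail_stability(val_acc_cloud)))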
4. Attempting to run the server and the clients separately
Based on how federated learning is meant to work, the server and the clients should be able to run as separate programs. We give this a try below.
4.1. Server program
import os
num_clients = 62
os.environ['MOX_FEDERATED_SIZE'] = str(num_clients)
os.environ['MOX_FEDERATED_BACKEND'] = 'obs'
os.environ['MOX_FEDERATED_URL'] = '/tmp/fed_workspace/'
from moxing.framework.federated import fed_algorithm
from moxing.framework.federated import fed_backend
from moxing.framework.federated import fed_server
from moxing.pytorch.executor.federated.util import TorchFedHook
from moxing.pytorch.executor.federated.util import torch_save
from moxing.pytorch.executor.federated.util import torch_load
alg_config = fed_algorithm.FedAMPConfig(alpha=0.1, mul=50.0, display=True) #FedAMP parameter setting
# create server
fed_alg = fed_algorithm.build_algorithm(alg_config)
fed_be_config = fed_backend.FederatedBackendOBSConfig(
    load_fn=torch_load, save_fn=torch_save, suffix='pth', hooks=[TorchFedHook()])
fed_be = fed_backend.build_backend(fed_be_config, fed_alg)
server = fed_server.FederatedServerHorizontal(fed_be)
server.backend.wait_ready()
INFO:root:Using MoXing-v1.17.3-
INFO:root:Using OBS-Python-SDK-3.20.7
INFO:root:waiting for all clients ready...
The server started successfully. Note, however, that the number of clients is already fixed as a parameter when the server starts; this is probably also why personalized learning failed in section 1 after we changed the number of clients.
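To avoid the hang seen in section 1, it helps to verify that both sides agree on the client count before training starts. A minimal sketch of such a check, relying only on the MOX_FEDERATED_SIZE environment variable used above (the helper itself is hypothetical, not a MoXing API):
import os

def check_federated_size(expected_num_clients):
    # Compare the size the server was launched with against the size this run assumes.
    configured = int(os.environ.get('MOX_FEDERATED_SIZE', '0'))
    if configured != expected_num_clients:
        raise RuntimeError(
            "Server expects {} clients but this run uses {}; "
            "restart the server with a matching MOX_FEDERATED_SIZE".format(
                configured, expected_num_clients))

check_federated_size(62)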
4.2. Client program
from IPython.display import clear_output
import os
import torch
import numpy as np
import random
import moxing as mox
from moxing.framework.federated import fed_algorithm
from moxing.framework.federated import fed_backend
from moxing.pytorch.executor.federated.util import torch_load
from moxing.pytorch.executor.federated.util import torch_save
from moxing.pytorch.executor.federated import client
import matplotlib.pyplot as plt
%matplotlib inline
from moxing.framework.federated.fed_algorithm import FedAMP
import torch.nn.functional as F
if mox.file.is_directory('/tmp/fed_workspace/'):
    mox.file.remove('/tmp/fed_workspace/', recursive=True)
class DataFileLoaderHorizontal():
    def __init__(self, data=None, label=None):
        self.data = data
        self.label = label

    def getDataToTorch(self):
        return torch.FloatTensor(self.data), torch.FloatTensor(self.label)

    def load_file_binary(self, data_filename=None, label_filename=None):
        assert data_filename is not None
        assert label_filename is not None
        self.data = np.load(data_filename, allow_pickle=True)
        self.label = np.load(label_filename, allow_pickle=True)
        self.data, self.label = self.data.astype(float), self.label.astype(float)

class m_Data():
    def __init__(self):
        self.train_samples = None
        self.train_labels = None
        self.val_samples = None
        self.val_labels = None
num_clients = 62

dfl = DataFileLoaderHorizontal()
train_sample_filename = 'FedAMP/EMNIST/client_train_samples_'
train_label_filename = 'FedAMP/EMNIST/client_train_labels_'
val_sample_filename = 'FedAMP/EMNIST/client_test_samples_'
val_label_filename = 'FedAMP/EMNIST/client_test_labels_'
filename_sx = '.npy'

input_dim = (-1, 1, 28, 28)
CUDA_DEVICE = 0

client_data = []
for i in range(num_clients):
    m_data = m_Data()
    # Load training data
    dfl.load_file_binary(data_filename=train_sample_filename + str(i) + filename_sx,
                         label_filename=train_label_filename + str(i) + filename_sx)
    samples, labels = dfl.getDataToTorch()
    m_data.train_samples = samples.reshape(input_dim)
    m_data.train_labels = labels.squeeze()
    # Load validation data
    dfl.load_file_binary(data_filename=val_sample_filename + str(i) + filename_sx,
                         label_filename=val_label_filename + str(i) + filename_sx)
    samples, labels = dfl.getDataToTorch()
    m_data.val_samples = samples.reshape(input_dim)
    m_data.val_labels = labels.squeeze()
    client_data.append(m_data)
class CNN(torch.nn.Module):
    def __init__(self, n_channel0=1, n_channel1=32, n_channel2=64,
                 kernel_size=5, stride=1, padding=2,
                 n_fc_input=3136, n_fc1=512, n_output=62):
        super(CNN, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels=n_channel0, out_channels=n_channel1,
                                     kernel_size=kernel_size, stride=stride, padding=padding)
        self.conv2 = torch.nn.Conv2d(in_channels=n_channel1, out_channels=n_channel2,
                                     kernel_size=kernel_size, stride=stride, padding=padding)
        self.fc1 = torch.nn.Linear(n_fc_input, n_fc1)
        self.fc2 = torch.nn.Linear(n_fc1, n_output)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
stepsize = 0.001  # local training step size
alg_config = fed_algorithm.FedAMPConfig(alpha=0.1, mul=50.0, display=True)  # FedAMP parameter setting

client_models = []
client_feds = []
optimizer_list = []
base_model = CNN()
for i in range(num_clients):
    os.environ['MOX_FEDERATED_RANK'] = str(i)
    backend_config = fed_backend.FederatedBackendOBSConfig(
        load_fn=torch_load,
        save_fn=torch_save,
        suffix='pth')
    # Create client
    fed_alg = fed_algorithm.build_algorithm(alg_config)
    backend = fed_backend.build_backend(
        config=backend_config,
        fed_alg=fed_alg)
    fed_client = client.FederatedHorizontalPTClient(backend=backend)
    fed_client.connect()
    client_feds.append(fed_client)
    model = CNN()
    optimizer = torch.optim.Adam(model.parameters(), lr=stepsize)
    model.load_state_dict(base_model.state_dict())
    client_models.append((model, optimizer))
    optimizer_list.append(optimizer)
def evaluation(model, test_samples, test_labels):
    with torch.no_grad():
        outputs = model(test_samples)
        _, preds = outputs.max(1)
        test_acc = preds.eq(test_labels).sum() / float(len(test_labels))
    return test_acc

def train_local_clients(model, optimizer, data, num_epochs):
    train_samples = data.train_samples
    train_labels = data.train_labels
    val_samples = data.val_samples
    val_labels = data.val_labels
    total_num_samples = len(train_samples)
    criterion = torch.nn.CrossEntropyLoss()
    for x in range(num_epochs):
        seq = np.arange(total_num_samples)
        random.shuffle(seq)
        for begin_batch_index in range(0, total_num_samples, batch_size):
            batch_ids = seq[begin_batch_index: min(begin_batch_index + batch_size, total_num_samples)]
            inputs = train_samples[batch_ids]
            labels = train_labels[batch_ids]
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels.type(torch.long))
            loss.backward()
            optimizer.step()
    test_acc = evaluation(model, val_samples, val_labels)
    return test_acc
batch_size = 100
num_epochs = 5
communication = 20
val_acc_cloud = []
val_acc_client = []

for rounds in range(communication):
    val_acc_total = 0
    # Train model locally
    print("===Starting local training===")
    for i in range(num_clients):
        print("Training client " + str(i), end='\r')
        model, optimizer = client_models[i]
        local_client = client_feds[i]
        data = client_data[i]
        acc = train_local_clients(model, optimizer, data, num_epochs)
        val_acc_total += acc
        local_client.send_model(parameters=optimizer.param_groups[0]['params'])

    # Server-side aggregation
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_client.append(val_acc_avg)
    clear_output(wait=True)
    print("\n===Local training is completed===")
    print("Communication round: {0:1d}".format(rounds + 1))
    print("===Starting model aggregation===")
    step = list(server.backend.wait_next())[-1]
    ctx = server.backend.get_context()
    server.backend.aggregate_data(step, ctx.names)
    print("===Model aggregation is completed===")
    print("Local model average validation test accuracy: {0:5.2f}%".format(val_acc_avg))

    # Update each local model with its personalized cloud model
    val_acc_total = 0
    with torch.no_grad():
        for i in range(num_clients):
            val_samples = client_data[i].val_samples
            val_labels = client_data[i].val_labels
            local_client = client_feds[i]
            model, optimizer = client_models[i]
            new_parameters = local_client.get_model()
            local_client.clean_previous()
            local_client.next_step()
            parameters = optimizer.param_groups[0]['params']
            for p, new_p in zip(parameters, new_parameters):
                p.copy_(new_p)
            acc = evaluation(model, val_samples, val_labels)
            val_acc_total += acc
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_cloud.append(val_acc_avg)
    print("Personalized cloud model average validation accuracy: {0:5.2f}%".format(val_acc_avg))

print("====Training Completed====")
plt.figure(figsize=(8, 6))
x = list(range(communication))
plt.xticks(x[0:-1:2])
acc_client, = plt.plot(x, val_acc_client, color="red")
acc_cloud, = plt.plot(x, val_acc_cloud, color="blue")
plt.legend([acc_client, acc_cloud], ['Client Acc', 'Cloud Acc'], fontsize=20)
plt.xlabel("Communication round", fontsize=18)
plt.ylabel("Mean validation accuracy", fontsize=18)
plt.title("Mean validation accuracy vs. Communication round", fontsize=18)
plt.show()
INFO:root:waiting for next federation...
INFO:root:ready to do federation. <step={'19'}> <clients=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61]> <algorithms=FedAMP(alpha=0.1, mul=50.0, sim=None)>
===Local training is completed===
Communication round: 20
===Starting model aggregation===
===Model aggregation is completed===
Local model average validation test accuracy: 76.60%
Personalized cloud model average validation accuracy: 81.45%
====Training Completed====
Both the accuracy numbers and the plot are essentially consistent with the results from the previous installment, which suggests that with 62 clients the model is quite stable. However, the current client program still has to call the server's `server` object directly, and the server must know the number of clients in advance, so federated learning across separate platforms is not yet possible. In the future, the server and client sides need to be fully decoupled, with each client communicating with the server independently; only then will this federated learning setup be truly useful in practice.
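As a rough illustration of that decoupling, a single standalone client process might look like the sketch below, reusing only the MoXing calls that already appear in this post; the rank, size, and workspace values are placeholders that would have to match the server's configuration:
import os
from moxing.framework.federated import fed_algorithm
from moxing.framework.federated import fed_backend
from moxing.pytorch.executor.federated import client
from moxing.pytorch.executor.federated.util import torch_load, torch_save

CLIENT_RANK = 0                                           # each participant uses its own rank
os.environ['MOX_FEDERATED_SIZE'] = '62'                   # must match the server configuration
os.environ['MOX_FEDERATED_BACKEND'] = 'obs'
os.environ['MOX_FEDERATED_URL'] = '/tmp/fed_workspace/'   # shared workspace/OBS path
os.environ['MOX_FEDERATED_RANK'] = str(CLIENT_RANK)

# Build only this client's algorithm, backend, and federated client object.
alg_config = fed_algorithm.FedAMPConfig(alpha=0.1, mul=50.0, display=True)
backend_config = fed_backend.FederatedBackendOBSConfig(
    load_fn=torch_load, save_fn=torch_save, suffix='pth')
fed_alg = fed_algorithm.build_algorithm(alg_config)
backend = fed_backend.build_backend(config=backend_config, fed_alg=fed_alg)
fed_client = client.FederatedHorizontalPTClient(backend=backend)
fed_client.connect()

# From here, the client would loop: train locally (train_local_clients above),
# call fed_client.send_model(...) after each round, and fed_client.get_model()
# once the server has aggregated -- the same calls used in the combined notebook.
In that setup, the server program from section 4.1 would run unchanged on its own node, and each participant would only need its rank, the shared workspace URL, and its own data.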