Federated Learning Algorithms: A Study of ModelArts "pytorch_fedamp_emnist_classification" (Part 5)

darkpard · posted 2021/04/24 18:41:16
[Abstract] In this installment we use concrete data to explore the modeling process in more depth. In the current client-side program the clients must call the server object on the server side, and the server must define the number of clients ahead of time, so federated learning cannot run across platforms. If the server and client sides were fully independent, with each client communicating with the server on its own, the approach would be far more useful in practice.

In the previous installment we got the pytorch_fedamp_emnist_classification example running. This time we combine it with the actual data and explore the model in more depth.

1. Attempting personalized federated learning with only three clients

batch_size = 100
num_epochs = 5
communication = 20
val_acc_cloud = []
val_acc_client = []
num_clients = 3
 
for rounds in range(communication):
    val_acc_total = 0
    # Train model locally
   
    print("===Starting local training===")
   
    for i in range(num_clients):
        print("Training client " + str(i), end='\r')
        model, optimizer = client_models[i]
        local_client = client_feds[i]
        data = client_data[i]
        acc = train_local_clients(model, optimizer, data, num_epochs)  
        val_acc_total += acc
        local_client.send_model(parameters=optimizer.param_groups[0]['params'])  
   
    # Average local validation accuracy across clients
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_client.append(val_acc_avg)
   
    clear_output(wait=True)
    print("\n===Local training is completed===")
    print("Communication round: {0:1d}".format(rounds+1))
    print("===Starting model aggregation===")
   
    step = list(server.backend.wait_next())[-1]
    ctx = server.backend.get_context()
    server.backend.aggregate_data(step, ctx.names)
   
    print("===Model aggregation is completed===")
   
    print("Local model average validation test accuracy: {0:5.2f}%".format(val_acc_avg))
   
    # Update local model
    val_acc_total = 0
    with torch.no_grad():
        for i in range(num_clients):
            val_samples = client_data[i].val_samples
            val_labels = client_data[i].val_labels
            local_client = client_feds[i]
            model, optimizer = client_models[i]
            new_parameters = local_client.get_model()
            local_client.clean_previous()
            local_client.next_step()
            parameters = optimizer.param_groups[0]['params']
            for p, new_p in zip(parameters, new_parameters):
                p.copy_(new_p)
            acc = evaluation(model, val_samples, val_labels)
            val_acc_total += acc
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_cloud.append(val_acc_avg)
    print("Personalized cloud model average validation accuracy: {0:5.2f}%".format(val_acc_avg))     
       
print("====Training Completed====")

INFO:root:waiting for next federation...

INFO:root:ready to do federation. <step={'0'}> <clients=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61]> <algorithms=FedAMP(alpha=0.1, mul=50.0, sim=None)>

===Local training is completed===

Communication round: 1

===Starting model aggregation===

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-8-881b59aab94f> in <module>()
     32     step = list(server.backend.wait_next())[-1]
     33     ctx = server.backend.get_context()
---> 34     server.backend.aggregate_data(step, ctx.names)
     35
     36     print("===Model aggregation is completed===")

moxing/framework/federated/fed_backend.pyx in moxing.framework.federated.fed_backend.FederatedBackendOBS.aggregate_data()

KeyboardInterrupt:

Training hung after the first communication round. My suspicion is that the number of clients the federation was initialized with does not match the number of clients actually training.
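If that suspicion is right, the mechanics look roughly like the toy barrier below: the backend was created in the previous installment with MOX_FEDERATED_SIZE set to 62 (note the <clients=[0, ..., 61]> log line above), so an aggregation step that waits for an upload from every registered client can never complete when only 3 clients train. This is a minimal simulation of that behavior, not MoXing's actual code.

def try_aggregate(uploads, expected):
    # Wait-for-all barrier: refuse to aggregate until every expected
    # client has uploaded. With expected=62 and 3 clients, this never fires.
    if len(uploads) < expected:
        return None                      # still waiting -> the notebook appears hung
    return sum(uploads) / expected

print(try_aggregate([0.1, 0.2, 0.3], expected=62))   # None: blocked forever
print(try_aggregate([0.1, 0.2, 0.3], expected=3))    # 0.2: proceeds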

2. Re-initializing the federation with 3 clients and running personalized training

num_clients = 10
dfl = DataFileLoaderHorizontal()
train_sample_filename = 'FedAMP/EMNIST/client_train_samples_'
train_label_filename = 'FedAMP/EMNIST/client_train_labels_'
val_sample_filename = 'FedAMP/EMNIST/client_test_samples_'
val_label_filename = 'FedAMP/EMNIST/client_test_labels_'
filename_sx = '.npy'
input_dim = (-1, 1, 28, 28)
CUDA_DEVICE = 0
client_data = []
 
for i in range(num_clients):
    m_data = m_Data()
    # Load training data
    dfl.load_file_binary(data_filename=train_sample_filename + str(i) + filename_sx,
                         label_filename=train_label_filename + str(i) + filename_sx)
    samples, labels = dfl.getDataToTorch()
    m_data.train_samples = samples.reshape(input_dim)
    m_data.train_labels = labels.squeeze()
       
    # Load validation data
    dfl.load_file_binary(data_filename=val_sample_filename + str(i) + filename_sx,
                         label_filename=val_label_filename + str(i) + filename_sx)
    samples, labels = dfl.getDataToTorch()
    m_data.val_samples = samples.reshape(input_dim)
    m_data.val_labels = labels.squeeze()
       
    client_data.append(m_data)

os.environ['MOX_FEDERATED_SIZE'] = str(num_clients)
os.environ['MOX_FEDERATED_BACKEND'] = 'obs'
os.environ['MOX_FEDERATED_URL'] = '/tmp/fed_workspace/'
 

stepsize = 0.001 #local training stepsize
 
alg_config = fed_algorithm.FedAMPConfig(alpha=0.1, mul=50.0, display=True) #FedAMP parameter setting
client_models = []
client_feds = []
optimizer_list = []
base_model = CNN()
 
for i in range(num_clients):
    os.environ['MOX_FEDERATED_RANK'] = str(i)
 
    backend_config = fed_backend.FederatedBackendOBSConfig(
        load_fn=torch_load,
        save_fn=torch_save,
        suffix='pth')
 
    # create client
    fed_alg = fed_algorithm.build_algorithm(alg_config)
    backend = fed_backend.build_backend(
        config=backend_config,
        fed_alg=fed_alg)
    fed_client = client.FederatedHorizontalPTClient(backend=backend)
    fed_client.connect()
 
    client_feds.append(fed_client)
    model = CNN()
       
    optimizer = torch.optim.Adam(model.parameters(), lr=stepsize)
    model.load_state_dict(base_model.state_dict())
    client_models.append((model, optimizer))
    optimizer_list.append(optimizer)
   
# create server
fed_alg = fed_algorithm.build_algorithm(alg_config)
fed_be_config = fed_backend.FederatedBackendOBSConfig(
  load_fn=torch_load, save_fn=torch_save, suffix='pth', hooks=[TorchFedHook()])
fed_be = fed_backend.build_backend(fed_be_config, fed_alg)
server = fed_server.FederatedServerHorizontal(fed_be)
server.backend.wait_ready()
 
batch_size = 100
num_epochs = 5
communication = 20
val_acc_cloud = []
val_acc_client = []
 
for rounds in range(communication):
    val_acc_total = 0
    # Train model locally
   
    print("===Starting local training===")
   
    for i in range(num_clients):
        print("Training client " + str(i), end='\r')
        model, optimizer = client_models[i]
        local_client = client_feds[i]
        data = client_data[i]
        acc = train_local_clients(model, optimizer, data, num_epochs)  
        val_acc_total += acc
        local_client.send_model(parameters=optimizer.param_groups[0]['params'])  
   
    # Average local validation accuracy across clients
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_client.append(val_acc_avg)
   
    clear_output(wait=True)
    print("\n===Local training is completed===")
    print("Communication round: {0:1d}".format(rounds+1))
    print("===Starting model aggregation===")
   
    step = list(server.backend.wait_next())[-1]
    ctx = server.backend.get_context()
    server.backend.aggregate_data(step, ctx.names)
   
    print("===Model aggregation is completed===")
   
    print("Local model average validation test accuracy: {0:5.2f}%".format(val_acc_avg))
   
    # Update local model
    val_acc_total = 0
    with torch.no_grad():
        for i in range(num_clients):
            val_samples = client_data[i].val_samples
            val_labels = client_data[i].val_labels
            local_client = client_feds[i]
            model, optimizer = client_models[i]
            new_parameters = local_client.get_model()
            local_client.clean_previous()
            local_client.next_step()
            parameters = optimizer.param_groups[0]['params']
            for p, new_p in zip(parameters, new_parameters):
                p.copy_(new_p)
            acc = evaluation(model, val_samples, val_labels)
            val_acc_total += acc
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_cloud.append(val_acc_avg)
    print("Personalized cloud model average validation accuracy: {0:5.2f}%".format(val_acc_avg))     
       
print("====Training Completed====")

INFO:root:waiting for next federation...

INFO:root:ready to do federation. <step={'19'}> <clients=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]> <algorithms=FedAMP(alpha=0.1, mul=50.0, sim=None)>

===Local training is completed===

Communication round: 20

===Starting model aggregation===

===Model aggregation is completed===

Local model average validation test accuracy: 87.50%

Personalized cloud model average validation accuracy: 88.55%

====Training Completed====

plt.figure(figsize=(8, 6))
x = list(range(communication))
plt.xticks(x[0:-1:2])
acc_client, = plt.plot(x, val_acc_client, color="red")
acc_cloud, = plt.plot(x, val_acc_cloud, color="blue")
plt.legend([acc_client, acc_cloud], ['Client Acc', 'Cloud Acc'], fontsize=20)
plt.xlabel("Communication round", fontsize=18)
plt.ylabel("Mean validation accuracy", fontsize=18)
plt.title("Mean validation accuracy vs. Communication round", fontsize=18)
plt.show()

As the figure shows, when the number of clients drops to 3, federated learning converges poorly.

Moreover, rerunning the personalized training produces large swings in the accuracy curves; the results are unstable from run to run, and it is questionable whether training converges at all. To test convergence, we raise the number of communication rounds to 50.

Even with 50 communication rounds, convergence is still unsatisfactory. This raises a natural question: at the current per-client sample size, how many clients are needed for good convergence? Next we raise the client count to 10 and try again.
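The only change that experiment needs is the round count in the training cell:

communication = 50   # was 20; more rounds to probe convergence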

3. Personalized learning with 10 clients

From the figure, convergence improves markedly once the number of clients rises to 10.

4. Attempting to separate the server side from the client side

Given how federated learning works, the server and the clients ought to be able to run separately; let's try that.

4.1. The server-side program

import os
num_clients = 62
os.environ['MOX_FEDERATED_SIZE'] = str(num_clients)
os.environ['MOX_FEDERATED_BACKEND'] = 'obs'
os.environ['MOX_FEDERATED_URL'] = '/tmp/fed_workspace/'
from moxing.framework.federated import fed_algorithm
from moxing.framework.federated import fed_backend
from moxing.framework.federated import fed_server
from moxing.pytorch.executor.federated.util import TorchFedHook
from moxing.pytorch.executor.federated.util import torch_save
from moxing.pytorch.executor.federated.util import torch_load
alg_config = fed_algorithm.FedAMPConfig(alpha=0.1, mul=50.0, display=True) #FedAMP parameter setting
# create server
fed_alg = fed_algorithm.build_algorithm(alg_config)
fed_be_config = fed_backend.FederatedBackendOBSConfig(
  load_fn=torch_load, save_fn=torch_save, suffix='pth', hooks=[TorchFedHook()])
fed_be = fed_backend.build_backend(fed_be_config, fed_alg)
server = fed_server.FederatedServerHorizontal(fed_be)
server.backend.wait_ready()

INFO:root:Using MoXing-v1.17.3-

INFO:root:Using OBS-Python-SDK-3.20.7

INFO:root:waiting for all clients ready...

The server started successfully. Notice, however, that the number of clients is already passed in when the server starts; this is probably also why personalized learning failed in section 1 after we changed the client count.
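If that is the cause, rerunning the 3-client experiment from section 1 would presumably require restarting the server with a matching size before any client connects, e.g. (an assumption, not verified against MoXing's internals):

num_clients = 3
os.environ['MOX_FEDERATED_SIZE'] = str(num_clients)   # must match the clients that actually train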

4.2. The client-side program

from IPython.display import clear_output
import torch
import numpy as np
import random
import moxing as mox
from moxing.framework.federated import fed_algorithm
from moxing.framework.federated import fed_backend
from moxing.pytorch.executor.federated.util import torch_load
from moxing.pytorch.executor.federated.util import torch_save
from moxing.pytorch.executor.federated import client
import matplotlib.pyplot as plt
%matplotlib inline
from moxing.framework.federated.fed_algorithm import FedAMP
import torch.nn.functional as F
 
if mox.file.is_directory('/tmp/fed_workspace/'):
    mox.file.remove('/tmp/fed_workspace/', recursive=True)
   
class DataFileLoaderHorizontal():
    def __init__(self, data=None, label=None):
        self.data = data
        self.label = label
           
    def getDataToTorch(self):
        return torch.FloatTensor(self.data), torch.FloatTensor(self.label)
 
 
    def load_file_binary(self, data_filename=None, label_filename=None):
        assert data_filename is not None
        assert label_filename is not None
        self.data = np.load(data_filename, allow_pickle=True)
        self.label = np.load(label_filename, allow_pickle=True)
        self.data, self.label = self.data.astype(float), self.label.astype(float)
 
 
class m_Data():
    def __init__(self):
        self.train_samples = None
        self.train_labels  = None
        self.val_samples   = None
        self.val_labels    = None
      
num_clients = 62
dfl = DataFileLoaderHorizontal()
train_sample_filename = 'FedAMP/EMNIST/client_train_samples_'
train_label_filename = 'FedAMP/EMNIST/client_train_labels_'
val_sample_filename = 'FedAMP/EMNIST/client_test_samples_'
val_label_filename = 'FedAMP/EMNIST/client_test_labels_'
filename_sx = '.npy'
input_dim = (-1, 1, 28, 28)
CUDA_DEVICE = 0
client_data = []
 
for i in range(num_clients):
    m_data = m_Data()
    # Load training data
    dfl.load_file_binary(data_filename=train_sample_filename + str(i) + filename_sx,
                         label_filename=train_label_filename + str(i) + filename_sx)
    samples, labels = dfl.getDataToTorch()
    m_data.train_samples = samples.reshape(input_dim)
    m_data.train_labels = labels.squeeze()
       
    # Load validation data
    dfl.load_file_binary(data_filename=val_sample_filename + str(i) + filename_sx,
                         label_filename=val_label_filename + str(i) + filename_sx)
    samples, labels = dfl.getDataToTorch()
    m_data.val_samples = samples.reshape(input_dim)
    m_data.val_labels = labels.squeeze()
       
    client_data.append(m_data)
   
class CNN(torch.nn.Module):
    def __init__(self, n_channel0=1, n_channel1=32, n_channel2=64,
                 kernel_size=5, stride=1, padding=2,
                 n_fc_input=3136, n_fc1=512, n_output=62):
        super(CNN, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels=n_channel0, out_channels=n_channel1,
                               kernel_size=kernel_size, stride=stride, padding=padding)
        self.conv2 = torch.nn.Conv2d(in_channels=n_channel1, out_channels=n_channel2,
                               kernel_size=kernel_size, stride=stride, padding=padding)
        self.fc1 = torch.nn.Linear(n_fc_input, n_fc1)
        self.fc2 = torch.nn.Linear(n_fc1, n_output)
 
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
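A quick shape check explains the n_fc_input=3136 default: a 5x5 convolution with stride 1 and padding 2 preserves the 28x28 input size, and each 2x2 max-pool halves it, so the tensor entering fc1 is 64 channels x 7 x 7 = 3136 features. The 62 outputs match the 62 EMNIST ByClass labels (10 digits plus upper- and lower-case letters).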
   
 


stepsize = 0.001 #local training stepsize

alg_config = fed_algorithm.FedAMPConfig(alpha=0.1, mul=50.0, display=True) #FedAMP parameter setting
client_models = []
client_feds = []
optimizer_list = []
base_model = CNN()
 
for i in range(num_clients):
    os.environ['MOX_FEDERATED_RANK'] = str(i)
 
    backend_config = fed_backend.FederatedBackendOBSConfig(
        load_fn=torch_load,
        save_fn=torch_save,
        suffix='pth')
 
    # create client
    fed_alg = fed_algorithm.build_algorithm(alg_config)
    backend = fed_backend.build_backend(
        config=backend_config,
        fed_alg=fed_alg)
    fed_client = client.FederatedHorizontalPTClient(backend=backend)
    fed_client.connect()
 
    client_feds.append(fed_client)
    model = CNN()
       
    optimizer = torch.optim.Adam(model.parameters(), lr=stepsize)
    model.load_state_dict(base_model.state_dict())
    client_models.append((model, optimizer))
    optimizer_list.append(optimizer)
   
   
 
def evaluation(model, test_samples, test_labels):
    with torch.no_grad():
        outputs = model(test_samples)
        _, preds = outputs.max(1)
        test_acc = preds.eq(test_labels).sum() / float(len(test_labels))
    return test_acc
 
def train_local_clients(model, optimizer, data, num_epochs):
    train_samples =  data.train_samples
    train_labels = data.train_labels
    val_samples = data.val_samples
    val_labels = data.val_labels
    total_num_samples = len(train_samples)
    criterion = torch.nn.CrossEntropyLoss()
    for x in range(num_epochs):
        seq = np.arange(total_num_samples)
        random.shuffle(seq)
        for begin_batch_index in range(0, total_num_samples, batch_size):
            batch_ids = seq[begin_batch_index: min(begin_batch_index + batch_size, total_num_samples)]
            inputs = train_samples[batch_ids]
            labels = train_labels[batch_ids]
            optimizer.zero_grad()
            outputs = model(inputs)   
            loss = criterion(outputs, labels.type(torch.long))
            loss.backward()
            optimizer.step()      
    test_acc = evaluation(model, val_samples, val_labels)
    return test_acc
 

batch_size = 100
num_epochs = 5
communication = 20
val_acc_cloud = []
val_acc_client = []
 
for rounds in range(communication):
    val_acc_total = 0
    # Train model locally
   
    print("===Starting local training===")
   
    for i in range(num_clients):
        print("Training client " + str(i), end='\r')
        model, optimizer = client_models[i]
        local_client = client_feds[i]
        data = client_data[i]
        acc = train_local_clients(model, optimizer, data, num_epochs)  
        val_acc_total += acc
        local_client.send_model(parameters=optimizer.param_groups[0]['params'])  
   
    # Average local validation accuracy across clients
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_client.append(val_acc_avg)
   
    clear_output(wait=True)
    print("\n===Local training is completed===")
    print("Communication round: {0:1d}".format(rounds+1))
    print("===Starting model aggregation===")
   
    step = list(server.backend.wait_next())[-1]
    ctx = server.backend.get_context()
    server.backend.aggregate_data(step, ctx.names)
   
    print("===Model aggregation is completed===")
   
    print("Local model average validation test accuracy: {0:5.2f}%".format(val_acc_avg))
   
    # Update local model
    val_acc_total = 0
    with torch.no_grad():
        for i in range(num_clients):
            val_samples = client_data[i].val_samples
            val_labels = client_data[i].val_labels
            local_client = client_feds[i]
            model, optimizer = client_models[i]
            new_parameters = local_client.get_model()
            local_client.clean_previous()
            local_client.next_step()
            parameters = optimizer.param_groups[0]['params']
            for p, new_p in zip(parameters, new_parameters):
                p.copy_(new_p)
            acc = evaluation(model, val_samples, val_labels)
            val_acc_total += acc
    val_acc_avg = val_acc_total / num_clients * 100
    val_acc_cloud.append(val_acc_avg)
    print("Personalized cloud model average validation accuracy: {0:5.2f}%".format(val_acc_avg))     
       
print("====Training Completed====")
 

plt.figure(figsize=(8, 6))
x = list(range(communication))
plt.xticks(x[0:-1:2])
acc_client, = plt.plot(x, val_acc_client, color="red")
acc_cloud, = plt.plot(x, val_acc_cloud, color="blue")
plt.legend([acc_client, acc_cloud], ['Client Acc', 'Cloud Acc'], fontsize=20)
plt.xlabel("Communication round", fontsize=18)
plt.ylabel("Mean validation accuracy", fontsize=18)
plt.title("Mean validation accuracy vs. Communication round", fontsize=18)
plt.show()

INFO:root:waiting for next federation...

INFO:root:ready to do federation. <step={'19'}> <clients=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61]> <algorithms=FedAMP(alpha=0.1, mul=50.0, sim=None)>

===Local training is completed===

Communication round: 20

===Starting model aggregation===

===Model aggregation is completed===

Local model average validation test accuracy: 76.60%

Personalized cloud model average validation accuracy: 81.45%

====Training Completed====

Both the accuracy figures and the curves essentially match the previous installment's results, which suggests that with 62 clients the model is quite stable. However, the current client-side program still has to call the server object on the server side, and the server must define the number of clients in advance, so federated learning cannot run across platforms. Going forward, the server and client sides need to be made fully independent, with each client communicating with the server on its own, before this federated learning setup can deliver real practical value.
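Purely as an illustration of that direction (nothing below exists in MoXing today), a decoupled server could discover clients dynamically from a shared workspace instead of fixing MOX_FEDERATED_SIZE at startup:

import os, time

REGISTRY = '/tmp/fed_workspace/registry/'     # hypothetical: clients drop a marker file here

def discover_clients():
    os.makedirs(REGISTRY, exist_ok=True)
    return sorted(os.listdir(REGISTRY))       # one marker file per registered client

def wait_for_round(min_clients=2, timeout=60.0):
    # Aggregate whoever has registered by the deadline instead of
    # blocking on a fixed, preconfigured client count.
    deadline = time.time() + timeout
    while time.time() < deadline:
        clients = discover_clients()
        if len(clients) >= min_clients:
            return clients
        time.sleep(1.0)
    return []                                 # no quorum: skip this round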

 
