基于华为鲲鹏云服务器的分布式并行程序设计实践

举报
wxwxwxw 发表于 2023/11/14 23:48:50 2023/11/14
【摘要】 分布式系统是指在多台计算机上执行的程序并通过网络进行通信和协调的系统,它的主要目标是提高性能、可靠性以及扩展性。在云计算时代,分布式系统变得尤为重要,因为它允许资源在不同的物理位置进行分布。 并行计算是一种通过同时执行多个计算任务来提高计算速度的方法。它可以在单个计算机上实现(称为并行计算)或者在多台计算机上实现(称为分布式并行计算)。

基于华为鲲鹏云服务器的分布式并行程序设计实践

1 背景介绍

1-1 分布式系统和并行计算基础知识

  1. 分布式系统: 分布式系统是指在多台计算机上执行的程序并通过网络进行通信和协调的系统,它的主要目标是提高性能、可靠性以及扩展性。在云计算时代,分布式系统变得尤为重要,因为它允许资源在不同的物理位置进行分布。

  2. 并行计算: 并行计算是一种通过同时执行多个计算任务来提高计算速度的方法。它可以在单个计算机上实现(称为并行计算)或者在多台计算机上实现(称为分布式并行计算)。

1-2 现代分布式并行架构的发展

  1. 云计算: 云计算提供了按需访问计算资源的能力,它包括IaaS(基础设施即服务)、PaaS(平台即服务)和SaaS(软件即服务)。华为鲲鹏云服务器是云计算中的一种基础设施服务,为用户提供灵活的计算资源。

  2. 大数据和分布式存储: 随着数据量的不断增加,处理大规模数据集的需求变得迫切。分布式存储系统(如Hadoop分布式文件系统)和大数据处理框架(如Apache Spark)应运而生,使得处理大规模数据变得更加高效。

  3. 容器化和微服务架构: 容器技术(如Docker)和微服务架构允许将应用程序拆分成小的、独立的服务单元,这有助于提高可维护性和扩展性。

1-3 MPI编程方法

  1. MPI概述: MPI是一种消息传递接口,用于在分布式内存系统中实现并行计算;它定义了一组标准的通信协议,允许多个进程在不同的节点上进行通信和交换数据。

  2. MPI的基本概念:

    • 进程组:MPI程序由一组并行工作的进程组成,每个进程都有唯一的标识符(rank)。
    • 通信操作:MPI提供了一系列的通信操作,包括点对点通信和集体通信,以实现进程之间的信息交流。
    • 数据类型:MPI支持复杂的数据类型,允许用户定义自定义的数据结构。
  3. MPI实践: 在基于华为鲲鹏云服务器的实践中,主要考虑下列三个方面:

    • 利用云服务器的弹性伸缩性,根据负载自动调整计算资源。
    • 使用MPI库进行分布式并行计算,将任务分解为多个子任务,由不同的节点执行。
    • 利用云服务器提供的高速网络互连,实现节点间高效的消息传递。

2 实践内容

2-1 实验环境

  • 三台华为鲲鹏云弹性云服务器(ECS),采用鲲鹏计算计算CPU架构,kc1.larger.2架构和40G系统盘内存,弹性公网带宽为100Mbits/s
  • 使用Xshell (Windows)分别与三台机器建立ssh连接,并进行免密配置

2-2 实践任务

针对梯形积分法、“多个数组排序”的任务不均衡案例和高斯消去法解线性方程组这三个具体的问题,在三台华为云服务器上设计分布式并行程序,进行实验。

3 实验过程

3-1 MPI编程实现梯形积分法

3-1-1 实验源码

1.#include <mpi.h>  
2.#include <stdio.h>  
3.#include <math.h>  
4.#include <sys/time.h>  
5.  
6.//表示总面积  
7.double totalSize = 0.00;  
8.double gap = 0.00;  
9.// 任务总数   
10.int total_num = 1000;  
11.// 积分区间   
12.double begin_num = 2.000, end_num = 10.0000;  
13.  
14.// 用于记录时间   
15.struct timeval startTime, stopTime;  
16.  
17.int min(int i, int num) {  
18.    if (i < num) {  
19.        return i;  
20.    }  
21.    return num;  
22.}  
23.  
24.// 定义函数   
25.double f(double x) {  
26.    return x * x;  
27.}  
28.  
29.  
30.double calculate(int begin, int size) {  
31.    double temp_size = 0.00;  
32.    for (int i = begin; i < min(begin + size, total_num); ++i) {  
33.        temp_size += (f(begin_num + gap * i) + f(begin_num + gap * (i + 1))) * gap / 2;  
34.    }  
35.    return temp_size;  
36.}  
37.  
38.// 静态任务分配 (其中0号线程为主线程)   
39.int main(int argc, char *argv[]) {  
40.    double buf[10];  
41.    gap = (end_num - begin_num) / total_num;  
42.    int rank, thread_num;  
43.    gettimeofday(&startTime, NULL);  
44.    MPI_Init(NULL, NULL);  
45.    MPI_Comm_rank(MPI_COMM_WORLD, &rank);  
46.    MPI_Comm_size(MPI_COMM_WORLD, &thread_num);  
47.    int each_thread_size = total_num / (thread_num - 1) + 1;  
48.    if (rank != 0) {  
49.        buf[rank] = calculate((rank - 1) * each_thread_size, each_thread_size);  
50.        MPI_Send(buf + rank, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);  
51.    }  
52.    if (rank == 0) {  
53.        for (int i = 1; i < thread_num; ++i) {  
54.            double temp_size;  
55.            MPI_Recv(&temp_size, 1, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);  
56.            totalSize += temp_size;  
57.        }  
58.        gettimeofday(&stopTime, NULL);  
59.        double trans_mul_time =  
60.                (stopTime.tv_sec - startTime.tv_sec) * 1000 + (stopTime.tv_usec - startTime.tv_usec) * 0.001;  
61.        printf("The size is: %d\nThe result is: %lf\nThe total time is: %lfms",total_num,totalSize,trans_mul_time);  
62.    }  
63.    MPI_Finalize();  
64.    return MPI_SUCCESS;  
65.}  

3-1-2 对比分析

计算规模为1000
计算规模为2000
计算规模为5000

3-1-3 结果分析

由上述结果不难发现,随着计算规模不断增加,计算的精确度也在不断提高;但是由于我家的网络不太稳定,所以程序执行过程中在三台主机上进行免密登录等操作花费的时间较多,所以这三次执行结果在时间上的差异并不明显,但是可以对比着进行分析:问题规模为1000时,程序执行时间为2.2s左右,当问题规模提升至5000时,程序的执行时间只有2.7s左右,也即问题规模提升了4000,但是时间却仅仅增加了0.4s,从这可以看出MPI编程在性能方面还是有着较大优势的。

3-2 MPI编程实现多数组排序

3-2-1 实验源码

1.#include <mpi.h>  
2.#include <math.h>  
3.#include <sys/time.h>  
4.#include <vector>  
5.#include <iostream>  
6.#include <algorithm>  
7.  
8.using namespace std;  
9.  
10.//int arr_num[10] = {200,400,600,800,1000,1200,1400,1600,1800,2000};  
11.  
12.// 任务总数   
13.int ARR_NUM = 200;   
14.// 待排序数组的规模   
15.const int ARR_LEN = 2000;   
16.// 任务粒度大小   
17.int seg = 50;   
18.  
19.// 用于记录时间   
20.struct timeval startTime, stopTime;  
21.  
22.vector<vector<int> > arr(ARR_NUM, vector<int>(ARR_LEN));  
23.  
24.  
25.// 利用随机数初始化  
26.void init() {  
27.    for (int i = 0; i < ARR_NUM; i++) {  
28.        for (int j = 0; j < ARR_LEN; j++)  
29.            arr[i][j] = rand() % 100;  
30.    }  
31.}  
32.  
33.// 排序函数   
34.void Sort(int begin) {  
35.    for (int i = begin; i < min(begin + seg, ARR_NUM); ++i) {  
36.        sort(arr[i].begin(),  arr[i].end());  
37.    }  
38.}  
39.  
40.  
41.int main() {  
42.    init();  
43.    // 当前的任务  
44.    int current_task = 0;  
45.    MPI_Init(NULL, NULL);  
46.    int rank, thread_num;  
47.    // 获取当前线程的编号   
48.    MPI_Comm_rank(MPI_COMM_WORLD, &rank);  
49.    // 获取线程总数   
50.    MPI_Comm_size(MPI_COMM_WORLD, &thread_num);  
51.    MPI_Status status;  
52.    int ready;  
53.    bool done = false;  
54.    gettimeofday(&startTime, NULL);  
55.    // 主线程进行任务分配   
56.    if (rank == 0) {  
57.        while (current_task < ARR_NUM) {  
58.            //接受任何一个线程的状态  
59.            MPI_Recv(&ready, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);  
60.            MPI_Send(¤t_task, 1, MPI_INT, status.MPI_SOURCE, 0, MPI_COMM_WORLD);  
61.            current_task += seg;  
62.        }  
63.        gettimeofday(&stopTime, NULL);  
64.        double trans_mul_time =  
65.                (stopTime.tv_sec - startTime.tv_sec) * 1000 + (stopTime.tv_usec - startTime.tv_usec) * 0.001;  
66.        printf("The size is: %d X %d\nTotal time: %lfms.\n",ARR_NUM,ARR_LEN,trans_mul_time);  
67.        done = true;  
68.    } else {  
69.        while (!done) {  
70.            int begin;  
71.            MPI_Send(&ready, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);  
72.            MPI_Recv(&begin, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);  
73.            Sort(begin);  
74.        }  
75.    }  
76.    MPI_Finalize();  
77.    return MPI_SUCCESS;  
78.}  

3-2-2 对比分析

问题规模为200 x 2000

问题规模为500 x 2000

问题规模为1000 x 2000

3-2-3 结果分析

由上图不难发现,随着问题规模不断增大,任务执行时间的增长幅度渐渐变小,这说明MPI编程对于解决大规模问题有着较大的优势。

3-3 MPI编程实现高斯消去法

3-3-1 实验源码

1.#include <stdio.h>  
2.#include <mpi.h>  
3.#include <ctime>  
4.#include <sys/time.h>  
5.#include <algorithm>  
6.#include "sse2neon.h"
7.
8.  
9.  
10.//矩阵规模   
11.const int n = 1024;  
12.// 矩阵的最大值  
13.const int maxN = n + 1;   
14.  
15.float a[maxN][maxN];  
16.// 暂时存储矩阵的值,用于控制变量   
17.float temp[maxN][maxN];  
18.  
19.int next_task = 0;  
20.// 任务粒度大小   
21.int seg;  
22.//当前所依赖的行数  
23.int line = 0;  
24.  
25.// 用于记录时间   
26.struct timeval startTime, stopTime;  
27.  
28.// 普通算法   
29.void OMP_elimination(int i, int j)   
30.{  
31.    float temp = a[j][i] / a[i][i];  
32.    for (int k = i + 1; k <= n; ++k) {  
33.        a[j][k] -= a[i][k] * temp;  
34.    }  
35.    a[j][i] = 0.00;  
36.}  
37.  
38.// 与SSE编程结合  
39.void OMP_elimination_SSE(int i, int j)  
40.{  
41.    __m128 div, t1, t2, sub;  
42.    float temp = a[j][i] / a[i][i];  
43.    // div全部用于存储temp,方便后面计算  
44.    div = _mm_set1_ps(temp);  
45.              
46.    int k = n - 4;  
47.    //每4个一组进行计算,思想和串行类似  
48.    for(k; k>=i+1;k-=4)  
49.    {  
50.        t1 = _mm_loadu_ps(a[i] + k);  
51.        t2 = _mm_loadu_ps(a[j] + k);  
52.        sub = _mm_sub_ps(t2, _mm_mul_ps(t1, div));  
53.        _mm_store_ss(a[j] + k, sub);  
54.    }  
55.    //处理剩余部分  
56.    for (k += 3; k >= i + 1; --k) {   
57.        a[j][k] -= a[i][k] * temp;  
58.    }  
59.      
60.    a[j][i] = 0.00;  
61.}  
62.  
63.// 与AVX编程结合   
64.void OMP_elimination_AVX(int i, int j)  
65.{  
66.    __m128 div, t1, t2, sub;  
67.    float temp = a[j][i] / a[i][i];  
68.    // div全部用于存储temp,方便后面计算  
69.    div = _mm_set1_ps(temp);  
70.              
71.    int k = n - 8;  
72.    //每4个一组进行计算,思想和串行类似  
73.    for(k; k>=i+1;k-=4)  
74.    {  
75.        t1 = _mm_loadu_ps(a[i] + k);  
76.        t2 = _mm_loadu_ps(a[j] + k);  
77.        sub = _mm_sub_ps(t2, _mm_mul_ps(t1, div));  
78.        _mm_store_ss(a[j] + k, sub);  
79.    }  
80.    //处理剩余部分  
81.    for (k += 7; k >= i + 1; --k) {   
82.        a[j][k] -= a[i][k] * temp;  
83.    }  
84.      
85.    a[j][i] = 0.00;  
86.}  
87.  
88.//初始化矩阵   
89.void init() {  
90.    srand((unsigned) time(NULL));  
91.    for (int i = 0; i < n; i++) {  
92.        for (int j = 0; j <= n; j++) {  
93.            a[i][j] = (float) (rand() % 10000) / 100.00;  
94.        }  
95.    }  
96.}  
97.  
98.  
99.int main(int argc, char *argv[]) {  
100.    init();  
101.    int rank, thread_num;  
102.    gettimeofday(&startTime, NULL);  
103.    MPI_Init(&argc, &argv);  
104.    MPI_Comm_rank(MPI_COMM_WORLD, &rank);  
105.    MPI_Comm_size(MPI_COMM_WORLD, &thread_num);  
106.    MPI_Status status;  
107.    int ready;  
108.    bool done = false;  
109.    gettimeofday(&startTime, NULL);  
110.    if (rank == 0) {  
111.        printf("size : %d\n", n);  
112.        for (line = 0; line < n - 1; ++line) {  
113.            next_task = line + 1;  
114.            seg = (n - next_task) / (thread_num - 1) + 1;  
115.            for (int i = 1; i < thread_num; i++) {  
116.                int task = (i - 1) * seg + next_task;  
117.                MPI_Send(&task, 1, MPI_INT, i, 0, MPI_COMM_WORLD);  
118.            }  
119.            //等待所有线程  
120.            for (int j = 1; j < thread_num; ++j) {  
121.                MPI_Recv(&ready, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &status);  
122.            }  
123.        }  
124.        done = true;  
125.        gettimeofday(&stopTime, NULL);  
126.        double trans_mul_time =  
127.                (stopTime.tv_sec - startTime.tv_sec) * 1000 + (stopTime.tv_usec - startTime.tv_usec) * 0.001;  
128.        printf("The size is: %d\n!Total time: %lfms.\n", n,trans_mul_time);  
129.    } else {  
130.        while (!done) {  
131.            int task;  
132.            MPI_Recv(&task, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);  
133.            int min = task + seg < n ? task + seg : n;  
134.            for (int i = task; i < min; ++i) {  
135.                OMP_elimination(line, i);  
136.            }  
137.            MPI_Send(&ready, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);  
138.        }  
139.    }  
140.    MPI_Finalize();  
141.    return 0;  
142.}  

3-3-2 实验分析

由上述执行结果可知,SSE编程和AVX编程相较于普通的串行算法,在性能方面已经有了很大地提升;在本问题中,由于我设置的问题规模都比较小,因此SSE编程与AVX编程在性能上的差异并不能较明显地体现出来。
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。