奇技淫巧:非对称 MPI + OpenMP 并行

MPI + OpenMP/multi-threading 是如今大部分高性能计算程序的并行方式。一般情况下,各 MPI 进程地位对等(对称),可用的 CPU 资源和并行出来的线程数也相同。今天讲一个奇技淫巧:如何进行非对称 MPI + OpenMP 并行。

最近干活的时候遇到一个情况:一个纯 MPI 程序中有一个 eigen solver 只在 rank 0 上计算,因为矩阵尺寸不大(<2000),用 ScaLAPACK 或者其他 MPI 库并行得不偿失,然而单线程求解又有点慢。因此甲方想在这一步的时候让 rank 0 用多线程来做 eigen solver, 其他 MPI 进程休眠等待 rank 0 完成计算。

乍一看这下,这个需求似乎很好实现,只要在 rank 0 上手动将线程数设大一点然后调 LAPACK 就行了。这就是我碰到的第一个钉子:有些高性能的 MPI 库默认做了自动绑核以提高性能(比如 Intel MPI 默认 I_MPI_PIN=1,MVAPICH2 默认 MV2_ENABLE_AFFINITY=1),每个 MPI 进程只能在指定的几个核心上执行。不关掉 MPI 的自动绑核,rank 0 就没办法用属于其他 MPI 进程的核心。但是关掉了自动绑核,其他部分的性能可能会严重下降。因此,我们首先需要手动进行绑核:rank 0 绑到所有核心上,其他进程只绑到一个物理核心上。

现在的 CPU 大部分有超线程,手动绑核要求不能把两个 MPI 进程绑到同一个物理核心上。Linux 上同一个物理核心的超线程核心编号一般都是不连续的,相差的值就是总的物理核心数;但有些时候同一个物理核心的超线程核心编号也可能是连续的。我不想引入别的库来解决这个问题,所以自己撸了一个简单粗暴的检测方法:读 /sys/devices/system/cpu/cpu0/topology/thread_siblings_list/sys/devices/system/cpu/cpu1/topology/thread_siblings_list 这两个文件,看一下 CPU0 和 CPU1 的第一个核心编号差了多少。有个物理核心编号的距离以后,我们就可以通过 CPU_SET_S()sched_setaffinity() 来手动绑核了。

最后一个小坑是 rank 0 以外的 MPI 进程的休眠。MPI_Barrier() 或者 MPI_Ibarrier() + MPI_Wait() 的组合都会阻塞 CPU,不断查询其他进程是否到达同步点。一个奇技淫巧是每隔一段时间用 MPI_Test() 代替 MPI_Wait() 来检测同步是否完成,如果没完成就用 usleep() 继续休眠一段时间。这样,非 rank 0 进程休眠的时候就不会阻碍 rank 0 使用属于它们的核心了。

下面是样例测试脚本:

1
2
3
4
5
6
7
8
9
10
11
# Compile the program
mpiicc -Wall -g -O3 -xHost -mkl -qopenmp -std=c99 -o test_dsyev_omp test_dsyev_omp.c
# Disable Intel MPI auto pinning
export I_MPI_PIN=0
# Test the program with different available threads on rank 0
for i in 1 2 4 8; do
export NTHREADS=$i
mpirun -np 64 ./test_dsyev_omp
done

下面是样例测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#define _XOPEN_SOURCE 500 // For srand48(), drand48(), usleep()
// For sched_setaffinity
#define _GNU_SOURCE
#include <sched.h>
#include <unistd.h> // Also for usleep()
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <omp.h>
#include <mkl.h>
#include <mpi.h>
void test_dgemm(const int rank, const int n)
{
mkl_set_dynamic(0);
mkl_set_num_threads(1);
size_t mat_msize = sizeof(double) * n * n;
double *A = (double*) malloc(mat_msize);
double *B = (double*) malloc(mat_msize);
double *C = (double*) malloc(mat_msize);
for (int i = 0; i < n * n; i++)
{
A[i] = drand48();
B[i] = drand48();
}
cblas_dgemm(
CblasRowMajor, CblasNoTrans, CblasNoTrans, n, n, n,
1.0, A, n, B, n, 0.0, C, n
);
MPI_Barrier(MPI_COMM_WORLD);
double st = omp_get_wtime();
for (int i = 0; i < 5; i++)
{
cblas_dgemm(
CblasRowMajor, CblasNoTrans, CblasNoTrans, n, n, n,
1.0, A, n, B, n, 0.0, C, n
);
}
double ut = (omp_get_wtime() - st) * 1000.0 / 5.0;
printf("Rank %2d single thread %d*%d*%d dgemm used %.3lf ms\n", rank, n, n, n, ut);
mkl_set_dynamic(1);
}
void test_dsyev(const int rank, const int n)
{
int my_nthreads = 1;
char *env_ntheads = getenv("NTHREADS");
if (env_ntheads != NULL) my_nthreads = atoi(env_ntheads);
if (my_nthreads < 1) my_nthreads = 1;
int save = mkl_get_max_threads();
mkl_set_dynamic(0);
mkl_set_num_threads(my_nthreads);
size_t mat_msize = sizeof(double) * n * n;
double *A = (double*) malloc(mat_msize);
double *B = (double*) malloc(mat_msize);
double *A0 = (double*) malloc(mat_msize);
for (int i = 0; i < n; i++)
{
for (int j = 0; j < i; j++)
{
double val = drand48();
A[i * n + j] = val;
A[j * n + i] = val;
}
}
memcpy(A, A0, mat_msize);
LAPACKE_dsyev(LAPACK_ROW_MAJOR, 'V', 'U', n, A, n, B);
for (int i = 0; i < 5; i++)
{
memcpy(A, A0, mat_msize);
double st = omp_get_wtime();
LAPACKE_dsyev(LAPACK_ROW_MAJOR, 'V', 'U', n, A, n, B);
double ut = (omp_get_wtime() - st) * 1000.0;
printf("Rank %d LAPACKE_dsyev use %.3lf ms\n", rank, ut);
}
printf("\n");
free(A);
free(B);
free(A0);
mkl_set_dynamic(1);
mkl_set_num_threads(1);
}
int get_phys_core_id_dist()
{
int core0_id, core1_id;
FILE *inf0 = fopen("/sys/devices/system/cpu/cpu0/topology/thread_siblings_list", "r");
FILE *inf1 = fopen("/sys/devices/system/cpu/cpu1/topology/thread_siblings_list", "r");
fscanf(inf0, "%d,", &core0_id);
fscanf(inf1, "%d,", &core1_id);
fclose(inf0);
fclose(inf1);
return (core1_id - core0_id);
}
// This function is based on Kent Milfeld's <[email protected]> code
int set_core_affinity(const int ncore, const int ntarget, const int *cores)
{
cpu_set_t *mask = CPU_ALLOC(ncore);
size_t size = CPU_ALLOC_SIZE(ncore);
CPU_ZERO_S(size, mask);
if (ntarget < 0)
{
// Allow to run on all cores
for (int core_id = 0; core_id < ncore; core_id++)
CPU_SET_S(core_id, size, mask);
} else {
for (int i = 0; i < ntarget; i++)
{
int core_id = cores[i];
CPU_SET_S(core_id, size, mask);
}
}
return sched_setaffinity((pid_t) 0, size, mask);
}
void bind_rank0_to_all_cores(const int nproc, const int rank)
{
int phys_core_id_dist = get_phys_core_id_dist();
if (rank == 0)
{
int *cores = (int*) malloc(sizeof(int) * nproc);
for (int i = 0; i < nproc; i++)
cores[i] = i * phys_core_id_dist;
set_core_affinity(nproc, nproc, cores);
free(cores);
} else {
int core_id = rank * phys_core_id_dist;
set_core_affinity(nproc, 1, &core_id);
}
}
void MPI_Wait_nonblocking(MPI_Request *req, const int microseconds)
{
int flag;
MPI_Status status;
MPI_Test(req, &flag, &status);
while (!flag)
{
usleep(microseconds);
MPI_Test(req, &flag, &status);
}
}
int main(int argc, char **argv)
{
MPI_Init(&argc, &argv);
int rank, nproc;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
MPI_Barrier(MPI_COMM_WORLD);
int n = 1000;
if (argc >= 2) n = atoi(argv[1]);
if (rank == 0) printf("Matrix size = %d\n", n);
srand48(114514 + rank);
bind_rank0_to_all_cores(nproc, rank);
test_dgemm(rank, 1000);
MPI_Barrier(MPI_COMM_WORLD);
if (rank == 0) test_dsyev(rank, n);
MPI_Request req;
MPI_Ibarrier(MPI_COMM_WORLD, &req);
MPI_Wait_nonblocking(&req, 10000);
MPI_Finalize();
}

顺带吐槽一句,LAPACKE_dsyev() 的并行扩展性真的不行……