#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <malloc.h>
#define NON_DATA -2
#define KILL -1
// Количество задач в каждом списке
#define NUMBER_OF_TASKS 5000
//#define DEBUG
int rank, size;
int prinimau = 0;
// 2 объекта типа "описатель потока"
pthread_t thrs[2];
// Атрибуты потока
pthread_attr_t attrs;
// Мьютекс
pthread_mutex_t mutex;
// Счетчик для выполнения заданий
double global_res = 0;
class CTaskList
{
private:
struct Task
{
int repeat_num;
Task *next;
} *m_head_list;
int count_task;
public:
CTaskList();
~CTaskList();
void AddTask(int);
int DelTask();
int GetCountTask();
};
// Создание списка заданий
CTaskList::CTaskList()
{
// Заголовок списка
m_head_list = NULL;
// Количество задач в списке
count_task = 0;
}
// Уничтожение списка
CTaskList::~CTaskList()
{
while(count_task > 0)
DelTask();
}
// Добавление задания
void CTaskList::AddTask(int x)
{
Task *p;
if (!m_head_list)
{ // Создаем первый элемент в списке
m_head_list = new Task;
p = m_head_list;
p->repeat_num = x;
p->next = NULL;
}
else
{
p = m_head_list;
m_head_list = new Task;
m_head_list->next = p;
m_head_list->repeat_num = x;
}
count_task++;
}
// Удаление задания, возвращает удаленное задание
int CTaskList::DelTask()
{
int x = m_head_list->repeat_num;
if(m_head_list->next == NULL)
{// одна задача в списке
delete m_head_list;
m_head_list = NULL;
}
else
{
Task *p;
p = m_head_list->next;
delete m_head_list;
m_head_list = p;
}
count_task--;
return x;
}
// Получение количества задач в списке
int CTaskList::GetCountTask()
{
return count_task;
}
CTaskList task_list;
// Функция обработчика и закачевальщика заданий
void* worker(void* me)
{
int task, i;
MPI_Status st;
// Счетчики для теста балансировки
int count_work = 0;
int work_sum = 0;
double time_work = 0;
double time_wait = 0;
double timeStart, timeFin;
double t1, t2;
MPI_Request *reqs1 = new MPI_Request[size], *reqs2 = new MPI_Request[size];
int *tasks_temp = new int[size];
#ifdef DEBUG
printf(">>>Поток 0 процесса %d\tСТАРТОВАЛ\n", rank);
#endif
timeStart = MPI_Wtime();
t2 = timeStart;
for(;;)
{
// Захват мьютекса
pthread_mutex_lock(&mutex);
while (task_list.GetCountTask() > 0)
{
task = task_list.DelTask(); // Взяли задание
#ifdef DEBUG
printf("Процесс %d\tвыполняет задание %d,\tосталось %d заданий\n", rank, task, task_list.GetCountTask());
#endif
// Освобождение
pthread_mutex_unlock(&mutex);
count_work++;
work_sum += task;
t1 = MPI_Wtime();
time_wait += t1 - t2;
for(i = 0; i < task; i++)// выполняем задачу
global_res += sqrt(double(i));
t2 = MPI_Wtime();
time_work += t2 - t1;
// Захват мьютекса
pthread_mutex_lock(&mutex);
}
#ifdef DEBUG
printf("!!!Процесс %d:\tосталось %d заданий\n", rank, task_list.GetCountTask());
#endif
// Освобождение
pthread_mutex_unlock(&mutex);
// Просим задание у других процессов
for(i = size-1; i >= 0; i--)
if(i!=rank)// Свои задания уже выполнили
{
#ifdef DEBUG
printf("Процесс %d\tПРОСИТ ЗАДАНИЕ у процесса %d\n", rank, i);
#endif
MPI_Isend(&rank, 1, MPI_INT, i, 17, MPI_COMM_WORLD, &reqs1[i]);
MPI_Irecv(&tasks_temp[i], 1, MPI_INT, i, 18, MPI_COMM_WORLD, &reqs2[i]);
}
pthread_mutex_lock(&mutex);
prinimau = 1;
pthread_mutex_unlock(&mutex);
// Принимаем задания
for(i = size-1; i >= 0; i--)
if(i!=rank)
{
MPI_Wait(&reqs2[i], &st);
#ifdef DEBUG
printf("Процесс %d\tПОЛУЧИЛ ОТ %d\tзадание %d\n", rank, i, tasks_temp[i]);
#endif
if(tasks_temp[i] != NON_DATA)
{
pthread_mutex_lock(&mutex);
task_list.AddTask(tasks_temp[i]);
pthread_mutex_unlock(&mutex);
}
}
pthread_mutex_lock(&mutex);
prinimau = 0;
if(task_list.GetCountTask() > 0)
{
#ifdef DEBUG
printf("!!!Процесс %d\t получил еще %d заданий\n", rank, task_list.GetCountTask());
#endif
pthread_mutex_unlock(&mutex);
}
else
{
pthread_mutex_unlock(&mutex);
// Нет заданий - завершаем работу потока
break;
}
}
task = KILL;
for (i = 0; i < size; i++)
if(i != rank)
MPI_Send(&task, 1, MPI_INT, i, 17, MPI_COMM_WORLD);
#ifdef DEBUG
printf(">>>Процесс %d\tвыполнил %d задания и ЗАВЕРШИЛ РАБОТУ\n\n", rank, count_work);
#endif
timeFin = MPI_Wtime();
time_wait += MPI_Wtime() - t2;
//printf("Процесс %d\tTime = %lf\tcount_work = %d\twork_sum = %d\ttime_work = %lf\ttime_wait = %lf\t (delta = %lf)\n", rank, timeFin - timeStart, count_work, work_sum, time_work, time_wait, (timeFin - timeStart) - time_work - time_wait);
printf("%d\n%lf\n%lf\n%lf\n%d\n%d\n", rank, timeFin - timeStart, time_work, time_wait, count_work, work_sum);
return NULL;
}
// Функция потока-слушателя
void* lister(void* me)
{
MPI_Status st;
int count_of_workers = size - 1, task, num;
#ifdef DEBUG
printf(">>>Слушатель процесса %d\tСТАРТОВАЛ\n", rank);
#endif
while (count_of_workers > 0)
{
#ifdef DEBUG
printf("Слушатель процесса %d\tОЖИДАЕТ...\n", rank);
#endif
// Принимаем запрос от потока обработчика
MPI_Recv(&num, 1, MPI_INT, MPI_ANY_SOURCE, 17, MPI_COMM_WORLD, &st);
if (num != KILL)
{
// Запрос на получение задачи
#ifdef DEBUG
printf("Слушатель процесса %d\tПОЛУЧИЛ ЗАПРОС ЗАДАНИЯ ОТ %d\n", rank, num);
#endif
pthread_mutex_lock(&mutex);
if(task_list.GetCountTask() > 0 && prinimau == 0)
{
task = task_list.DelTask();
pthread_mutex_unlock(&mutex);
MPI_Send(&task, 1, MPI_INT, num, 18, MPI_COMM_WORLD);
#ifdef DEBUG
printf("Слушатель процесса %d\tОТПРАВИЛ ЗАДАНИЕ %d\tПРОЦЕССУ %d\n", rank, task, num);
#endif
// Задание отправлено
}
else
{
pthread_mutex_unlock(&mutex);
task = NON_DATA;
MPI_Send(&task, 1, MPI_INT, num, 18, MPI_COMM_WORLD);
#ifdef DEBUG
printf("Слушатель процесса %d\tСООБЩИЛ процессу %d\t 'НЕТ ЗАДАНИЙ!'\n", rank, num);
#endif
// Вместо задачи отплавлено сообщение "нет заданий"
}
}
else
{
// Некоторый поток-обработчик завершил работу
count_of_workers--;
#ifdef DEBUG
printf("Слушатель процесса %d\tОПОВЕЩЕН о гибели процесса %d\n", rank, st.MPI_SOURCE);
#endif
}
}
#ifdef DEBUG
printf(">>>Слушатель процесса %d\tЗАВЕРШИЛ РАБОТУ\n", rank);
#endif
MPI_Barrier( MPI_COMM_WORLD);
return NULL;
}
int main(int argc, char **argv)
{
double timeStart, timeFin;
int i;
int provide;
// Инициализация параллельной части приложения
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provide);
// Определение номера процесса в группе
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
// Определение общего числа параллельных процессов
MPI_Comm_size(MPI_COMM_WORLD, &size);
if(rank == 0)
{
#ifdef DEBUG
printf("\n\n\tMPI_THREAD_MULTIPLE = %d\n", MPI_THREAD_MULTIPLE);
printf("\tMPI_THREAD_SERIALIZED = %d\n", MPI_THREAD_SERIALIZED);
printf("\tMPI_THREAD_FUNNELED = %d\n", MPI_THREAD_FUNNELED);
printf("\tMPI_THREAD_SINGLE = %d\n", MPI_THREAD_SINGLE);
printf("\tprovide = %d\n\n\n",provide);
#endif
}
if(provide < MPI_THREAD_MULTIPLE)
{
if(rank == 0)
perror("LOW PROVIDE!!\n");
abort();
}
// Инициализация атрибутов потока
if(0 != pthread_attr_init(&attrs))
{
perror("Cannot initialize attributes");
abort();
}
// Установка атрибута "присоединенный"
if(0 != pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_JOINABLE))
{
perror("Error in setting attributes");
abort();
}
// Инициализация mutex
if(0 != pthread_mutex_init(&mutex, NULL))
{
perror("Cannot initialize mutex");
abort();
}
// Инициализация списков задач для каждого процесса
for (i = 0; i < NUMBER_OF_TASKS; i++)
task_list.AddTask(rank*33333);
MPI_Barrier( MPI_COMM_WORLD);
if(rank==0)
timeStart = MPI_Wtime();
// Порождение 2 потоков
if(0 != pthread_create(&thrs[0], &attrs, worker, NULL))
{
perror("Cannot create a thread for work");
abort();
}
if(0 != pthread_create(&thrs[1], &attrs, lister, NULL))
{
perror("Cannot create a thread for listing");
abort();
}
for(i = 0; i < 2; i++)
if(0 != pthread_join(thrs[i], NULL))
{
perror("Cannot join a thread\n");
abort();
}
MPI_Barrier(MPI_COMM_WORLD);
if(rank==0)
{
timeFin = MPI_Wtime();
printf("Time: %lf\n", timeFin - timeStart);
}
// Освобождение ресурсов
pthread_mutex_destroy(&mutex);
pthread_attr_destroy(&attrs);
MPI_Finalize();
return 0;
}