2014 dxdy logo

Научный форум dxdy

Математика, Физика, Computer Science, Machine Learning, LaTeX, Механика и Техника, Химия,
Биология и Медицина, Экономика и Финансовая Математика, Гуманитарные науки




На страницу Пред.  1 ... 10, 11, 12, 13, 14
 
 Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 06:11 
Я тоже так понимаю, операция выделения памяти потокоопасна в принципе. Но обычно, когда пытаются ускорить программу, выделяют память в самом начале.

 i  GAA:
Замена индексации арифметикой указателей выделена в ветку «Арифметика указателей vs индексация»

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 06:17 
Andrey_Kireew в сообщении #1435575 писал(а):
Если Вы намекаете,
Я даже не намекаю, я прямо говорю, что эти ваши 10% — это накладные расходы на запуск и завершение потоков. В моём коде потоки создаются однажды и однажды завершаются и всё, больше никаких обращений к ОС скорее всего не будет (критические секции уходят в ядро ОС только если ресурс занят достаточно долго, у меня такого не бывает). Так что у меня этих ваших 10% быть не должно. Вообще. Хоть на 4 потоках, хоть на 8.

SergeCpp в сообщении #1435577 писал(а):
Задача -- порция обрабатываемой информации. "Очень ориентировочно минимальный объём полезной работы на задачу должен быть порядка 10 000 тактов." После этого задача кончается и выполнявший её поток выбирает из очереди следующую задачу.
Ок, я именно это и предлагал. Только не 10000 тактов, а сильно больше, вплоть до 0.1с на кусок/(под)задачу, т.е. сотню миллионов тактов. Согласен даже что 10000 тактов по ихнему минимум, хотя почему не очень ясно (да читал, читал почему), ведь CriticalSection контекст не переключают при грамотном использовании. Т.е. в общем советы правильные, а в частных случаях можно и оптимальнее.
Не ругайтесь, я плохо владею терминологией, а задачу=task можно использовать и как кусок работы, и как выполняемый процессором процесс.

Andrey_Kireew в сообщении #1435578 писал(а):
Если вычисления идут пару недель, то до windows дела никакого нет, тут дотронуться до компа лишний раз боишься, не то что посторонние задачи на нём запускать.
Это плохо написанная программа. У меня программы годами считают, при этом я и компом пользуюсь, и перегружаю его (бывает и свет вырубается), и приостанавливаю счёт когда ресурсы срочно нужны, и т.д. Т.е. надо предусматривать контрольные точки и возможность перезапуска счёта с них. Для себя решил что примерно раз в час-три (смотря какой объём контрольной точки) достаточно сохранять состояние, уж столько времени потерять даже раз в неделю не страшно.
Если Вас волнуют 5% на недельных интервалах и более, читайте книги по оптимизации, изучайте SSE/AVX/OpenMP/GPGPU, ищите кластеры в аренду или подключайтесь к BOINC — всё это даст намного больше выигрыш. Выбор разумеется за Вами.

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 06:28 
Dmitriy40 в сообщении #1435581 писал(а):
Т.е. надо предусматривать контрольные точки и возможность перезапуска счёта с них

Вы правы, действительно неплохо так и сделать, но я пока до такого не дошел. Свет вырубился - и всё ... Но это всё уже лирика.

У меня есть в Вам Dmitriy40 ещё один вопрос. Правильно ли я понимаю, что такая реализация, которую я сейчас сделал самая быстрая и никакие многопоточные библиотеки, типа OpenMP не нужны? или наоборот, с ними будут какие то преимущества?

-- 17.01.2020, 07:33 --

Mihaylo в сообщении #1435579 писал(а):
надо вместо обращения к массиву по индексу A[i], использовать указатели и разыменовывание указателей

Это всё уже давно сделано, я хоть и не программист, но не до такой же степени ...

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 07:32 
Andrey_Kireew в сообщении #1435583 писал(а):
Правильно ли я понимаю, что такая реализация, которую я сейчас сделал самая быстрая и никакие многопоточные библиотеки, типа OpenMP не нужны?
На одном процессоре — да, не нужны, они точно не ускорят. У них другие преимущества: более простой способ распараллелить программу, запускать не на одном компьютере, практически независимость от количества задействованных компьютеров/процессоров, на одном же раскидать циклы по потокам. Последнее Вы уже сделали.
Но вот к примеру задействовав OpenCL можно запустить расчёты одновременно на CPU и GPU ... И если вся задача допускает деление на сотни и тысячи потоков (подзадач), то GPU может быть быстрее (и даже на порядки, а если ещё и трафик памяти огромный и регулярный, то ...).
Если же у Вас в коде используются более-менее стандартные преобразования (типа умножения матриц или векторов и т.п.), то Intel даёт библиотеку для их быстрого вычисления. Очень быстрого. Очень оптимизированную. Под все возможности конкретного процессора. Даже банальное копирование гигабайта данных лучше делать ей (memcpy в библиотеке компилятора, хотя и тоже умнее банального копирования байтов/слов, но всё же не оптимизирована под конкретный процессор, тем более в старых компиляторах).

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 07:39 
Dmitriy40 в сообщении #1435589 писал(а):
Intel даёт библиотеку для их быстрого вычисления. Очень быстрого

intel mkl ?

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 08:56 
Еще вариант - выполнить вычисления в облаке типа Амазона. Условно за деньги.

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 09:06 
Andrey_Kireew в сообщении #1435563 писал(а):
У меня нет отладчика,
Есть, но вы про него не догадываетесь.

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 09:11 
Andrey_Kireew в сообщении #1435592 писал(а):
intel mkl ?
Думаю да, Вы вроде бы с ней уже работали, так что мой совет излишен, увы. :-(

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 09:12 
SergeCpp

(Оффтоп)

SergeCpp в сообщении #1435572 писал(а):
Сейчас не буду, по упомянутому выше "нет компьютера" и "годы".

52? Да ладно!
Без компа, конечно, сложнее, но какие же это годы?


-- 17.01.2020, 09:17 --

SergeCpp в сообщении #1435555 писал(а):
Потому что new использует межпоточную синхронизацию
А ТС в статистике учитывает время на запуск расчётой нити?

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 09:22 
Если задача и железо позволяют, грех не воспользоваться CUDA, - там извратов с потоками (блоками, гридами, нитями и т.п.) гораздо меньше, а выигрыш, как правило, существенный...

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 09:40 
SergeCpp в сообщении #1433892 писал(а):
где написано: "общее количество задач должно быть не меньше чем, ну скажем, 16*количество_ядер (для обеспечения балансировки нагрузки)
Совет спорный. Кэши же будут сбрасываться при частом переключении задач. Плюс возрастают требования к суммарному размеру стеков всех нитей, и другие накладные расходы на нить.

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 12:03 
Аватара пользователя

(Оффтоп)

Mihaylo в сообщении #1435579 писал(а):
операция выделения памяти потокоопасна в принципе
Что вы тут понимаете под операцией выделения памяти - соответствующий системный вызов? Его естественно можно делать без всяких синхронизаций (иначе пришлось бы процессам приходилось синхронизироваться друг с другом). Другое дело что обычно стараются не делать системный вызов на каждую аллокацию, а выделяют заранее большой кусок и дальше берут память из него - тут уже да, нужно следить.
Dmitriy40 в сообщении #1435589 писал(а):
Даже банальное копирование гигабайта данных лучше делать ей
А бенчмарки есть? В основном MKL конечно шустрая, но по крайней мере 5 лет назад nrm2 почему-то работал раз в 10 медленнее чем dot + sqrt, так что странности в ней тоже бывают

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 12:15 
mihaild в сообщении #1435618 писал(а):
Его естественно можно делать без всяких синхронизаций
Менеджер виртуальной памяти в ядре синхронизирует подобные запросы сам.

Речь идёт про общую на все нити глобальную кучу процесса, на которой, по дефолту аллокирует память malloc, будучи вызван с произвольной нити. Обращения к этой куче сериализируются CRTL. В плюсах есть средства, чтобы этого избежать, используя для различных нитей различные аллокаторы. Но это высший пилотаж, так как последствия бывают неочевидными с самого начала. Проще память распределять до запуска расчетной нити, и внутри нити аллокациями не заниматься, если мизерное подтормаживание недопустимо.

 
 
 
 Распараллеливание программы (ядра/потоки)
Сообщение17.01.2020, 19:00 
realeugene в сообщении #1435605 писал(а):
SergeCpp в сообщении #1433892 писал(а):
где написано: "общее количество задач должно быть не меньше чем, ну скажем, 16*количество_ядер (для обеспечения балансировки нагрузки)
Совет спорный. Кэши же будут сбрасываться при частом переключении задач. Плюс возрастают требования к суммарному размеру стеков всех нитей, и другие накладные расходы на нить.
Вы тоже попались в ту же ловушку что и я: под задачами там понимаются не потоки и не процессы, а просто куски исходного объёма работы. :facepalm: Т.е. предлагают делить весь объём работы минимум на 16*Ncpu кусков, а количество потоков не оговаривают. Впрочем это тоже спорно, я обычно делю намного сильнее.
mihaild в сообщении #1435618 писал(а):
Dmitriy40 в сообщении #1435589 писал(а):
Даже банальное копирование гигабайта данных лучше делать ей
А бенчмарки есть? В основном MKL конечно шустрая, но по крайней мере 5 лет назад nrm2 почему-то работал раз в 10 медленнее чем dot + sqrt, так что странности в ней тоже бывают
У меня нет. Я даже саму MKL не щупал. И не уверен что там есть аналог memcpy. ;-) Просто читал историю как эта самая memcpy (под каким-то Linux, возможно и gcc) то тормозила, то ускорялась, каждые несколько лет, с выходом новых процессоров и компиляторов. А MKL должна сама оптимизироваться под конкретный процессор, а не в среднем как memcpy, что и должно быть всегда не медленнее. Но тут я с чужих слов.

 
 
 
 Re: Распараллеливание программы (ядра/потоки)
Сообщение11.02.2020, 12:29 
Andrey_Kireew в сообщении #1435583 писал(а):
OpenMP не нужны

OpenMP - это наиболее простой способ распараллеливания "для самых маленьких", с него можно было начинать, и, убедившись в его тормознутости, выкинуть на помойку (или наоборот)

Наиболее православный инструментарий для распараллеливания с разделяемой или общей памятью это MPI $+$ POSIX Threads (как нас учили в шараге). С общей памятью достаточно потоков, больше ничего не нужно.

Иногда удобно для каждого вычислительного потока в пару добавить "слушающий" поток, который ожидает и принимает данные от других потоков. Некоторые задержки могут уменьшиться. У вас судя по цифрам эффективность распараллеливания около 90%. Если данные между потоками не пересылаются и упор скорее в вычисления, чем в кэш и память, то это маловато для 4-х ядер.

Иллюстрация worker$+$listener (это просто лаба по параллельному программированию):
код: [ скачать ] [ спрятать ]
Используется синтаксис C++
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <malloc.h>

#define NON_DATA -2
#define KILL -1
// Количество задач в каждом списке
#define NUMBER_OF_TASKS 5000
//#define DEBUG

int rank, size;
int prinimau = 0;
// 2 объекта типа "описатель потока"
pthread_t thrs[2];
// Атрибуты потока
pthread_attr_t attrs;
// Мьютекс
pthread_mutex_t mutex;
// Счетчик для выполнения заданий
double global_res = 0;



class CTaskList
{
private:
        struct Task
        {
                int repeat_num;
                Task *next;
        } *m_head_list;
        int count_task;
public:
        CTaskList();
        ~CTaskList();
        void AddTask(int);
        int DelTask();
        int GetCountTask();
};

// Создание списка заданий
CTaskList::CTaskList()
{
        // Заголовок списка
        m_head_list = NULL;
        // Количество задач в списке
        count_task = 0;
}

// Уничтожение списка
CTaskList::~CTaskList()
{
        while(count_task > 0)
                DelTask();
}

// Добавление задания
void CTaskList::AddTask(int x)
{
        Task *p;
        if (!m_head_list)
        {       // Создаем первый элемент в списке
                m_head_list = new Task;
                p = m_head_list;
                p->repeat_num = x;
                p->next = NULL;
        }
        else
        {
                p = m_head_list;
                m_head_list = new Task;
                m_head_list->next = p;
                m_head_list->repeat_num = x;
        }
        count_task++;
}

// Удаление задания, возвращает удаленное задание
int CTaskList::DelTask()
{
        int x = m_head_list->repeat_num;
        if(m_head_list->next == NULL)
        {// одна задача в списке
                delete m_head_list;
                m_head_list = NULL;
        }
        else
        {
                Task *p;
                p = m_head_list->next;
                delete m_head_list;
                m_head_list = p;
        }
        count_task--;
        return x;
}

// Получение количества задач в списке
int CTaskList::GetCountTask()
{
        return count_task;
}



CTaskList task_list;

// Функция обработчика и закачевальщика заданий
void* worker(void* me)
{
        int task, i;
        MPI_Status st;
        // Счетчики для теста балансировки
        int count_work = 0;
        int work_sum = 0;
        double time_work = 0;
        double time_wait = 0;
        double timeStart, timeFin;
        double t1, t2;
        MPI_Request *reqs1 = new MPI_Request[size], *reqs2 = new MPI_Request[size];
        int *tasks_temp = new int[size];
#ifdef DEBUG
        printf(">>>Поток 0 процесса %d\tСТАРТОВАЛ\n", rank);
#endif
       
        timeStart = MPI_Wtime();
        t2 = timeStart;
        for(;;)
        {
                // Захват мьютекса
                pthread_mutex_lock(&mutex);
                while (task_list.GetCountTask() > 0)
                {
                        task = task_list.DelTask(); // Взяли задание
#ifdef DEBUG
                        printf("Процесс %d\tвыполняет задание %d,\tосталось %d заданий\n", rank, task, task_list.GetCountTask());
#endif
                        // Освобождение
                        pthread_mutex_unlock(&mutex);
                       
                        count_work++;
                        work_sum += task;
                        t1 = MPI_Wtime();
                        time_wait += t1 - t2;
                        for(i = 0; i < task; i++)// выполняем задачу  
                                global_res += sqrt(double(i));
                        t2 = MPI_Wtime();
                        time_work += t2 - t1;

                        // Захват мьютекса
                        pthread_mutex_lock(&mutex);
                }
#ifdef DEBUG
                printf("!!!Процесс %d:\tосталось %d заданий\n", rank, task_list.GetCountTask());
#endif
                // Освобождение
                pthread_mutex_unlock(&mutex);
                // Просим задание у других процессов
                for(i = size-1; i >= 0; i--)
                if(i!=rank)// Свои задания уже выполнили
                {
#ifdef DEBUG
                        printf("Процесс %d\tПРОСИТ ЗАДАНИЕ у процесса %d\n", rank, i);
#endif
                        MPI_Isend(&rank, 1, MPI_INT, i, 17, MPI_COMM_WORLD, &reqs1[i]);
                        MPI_Irecv(&tasks_temp[i], 1, MPI_INT, i, 18, MPI_COMM_WORLD, &reqs2[i]);
                }
                pthread_mutex_lock(&mutex);
                prinimau = 1;
                pthread_mutex_unlock(&mutex);
                // Принимаем задания
                for(i = size-1; i >= 0; i--)
                if(i!=rank)
                {
                        MPI_Wait(&reqs2[i], &st);
#ifdef DEBUG
                        printf("Процесс %d\tПОЛУЧИЛ ОТ %d\tзадание %d\n", rank, i, tasks_temp[i]);
#endif
                        if(tasks_temp[i] != NON_DATA)
                        {
                                pthread_mutex_lock(&mutex);
                                task_list.AddTask(tasks_temp[i]);
                                pthread_mutex_unlock(&mutex);
                        }
                }
                pthread_mutex_lock(&mutex);
                prinimau = 0;
                if(task_list.GetCountTask() > 0)
                {
#ifdef DEBUG
                        printf("!!!Процесс %d\t получил еще %d заданий\n", rank, task_list.GetCountTask());
#endif
                        pthread_mutex_unlock(&mutex);
                }
                else
                {
                        pthread_mutex_unlock(&mutex);
                        // Нет заданий - завершаем работу потока
                        break;
                }
        }
        task = KILL;
       
        for (i = 0; i < size; i++)
                if(i != rank)
                        MPI_Send(&task, 1, MPI_INT, i, 17, MPI_COMM_WORLD);
#ifdef DEBUG
        printf(">>>Процесс %d\tвыполнил %d задания и ЗАВЕРШИЛ РАБОТУ\n\n", rank, count_work);
#endif
        timeFin = MPI_Wtime();
        time_wait += MPI_Wtime() - t2;
        //printf("Процесс %d\tTime = %lf\tcount_work = %d\twork_sum = %d\ttime_work = %lf\ttime_wait = %lf\t (delta = %lf)\n", rank, timeFin - timeStart, count_work, work_sum, time_work, time_wait, (timeFin - timeStart) - time_work - time_wait);
        printf("%d\n%lf\n%lf\n%lf\n%d\n%d\n", rank, timeFin - timeStart, time_work, time_wait, count_work, work_sum);
        return NULL;
}


// Функция потока-слушателя
void* lister(void* me)
{
        MPI_Status st;
        int count_of_workers = size - 1, task, num;

#ifdef DEBUG
        printf(">>>Слушатель процесса %d\tСТАРТОВАЛ\n", rank);
#endif
        while (count_of_workers > 0)
        {
#ifdef DEBUG
                printf("Слушатель процесса %d\tОЖИДАЕТ...\n", rank);
#endif
                // Принимаем запрос от потока обработчика
                MPI_Recv(&num, 1, MPI_INT, MPI_ANY_SOURCE, 17, MPI_COMM_WORLD, &st);
                if (num != KILL)
                {
                        // Запрос на получение задачи
#ifdef DEBUG
                        printf("Слушатель процесса %d\tПОЛУЧИЛ ЗАПРОС ЗАДАНИЯ ОТ %d\n", rank, num);
#endif
                        pthread_mutex_lock(&mutex);
                        if(task_list.GetCountTask() > 0 && prinimau == 0)
                        {
                                task = task_list.DelTask();
                                pthread_mutex_unlock(&mutex);
                                MPI_Send(&task, 1, MPI_INT, num, 18, MPI_COMM_WORLD);
#ifdef DEBUG
                                printf("Слушатель процесса %d\tОТПРАВИЛ ЗАДАНИЕ %d\tПРОЦЕССУ %d\n", rank, task, num);
#endif
                                // Задание отправлено
                        }
                        else
                        {
                                pthread_mutex_unlock(&mutex);
                                task = NON_DATA;
                                MPI_Send(&task, 1, MPI_INT, num, 18, MPI_COMM_WORLD);
#ifdef DEBUG
                                printf("Слушатель процесса %d\tСООБЩИЛ процессу %d\t 'НЕТ ЗАДАНИЙ!'\n", rank, num);
#endif
                                // Вместо задачи отплавлено сообщение "нет заданий"
                        }
                }
                else
                {
                        // Некоторый поток-обработчик завершил работу
                        count_of_workers--;
#ifdef DEBUG
                        printf("Слушатель процесса %d\tОПОВЕЩЕН о гибели процесса %d\n", rank, st.MPI_SOURCE);
#endif
                }
        }
#ifdef DEBUG
        printf(">>>Слушатель процесса %d\tЗАВЕРШИЛ РАБОТУ\n", rank);
#endif
        MPI_Barrier( MPI_COMM_WORLD);
        return NULL;
}

int main(int argc, char **argv)
{
        double timeStart, timeFin;
    int i;
    int provide;
        // Инициализация параллельной части приложения
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provide);
        // Определение номера процесса в группе
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        // Определение общего числа параллельных процессов
    MPI_Comm_size(MPI_COMM_WORLD, &size);
        if(rank == 0)
        {
#ifdef DEBUG
                printf("\n\n\tMPI_THREAD_MULTIPLE = %d\n", MPI_THREAD_MULTIPLE);
                printf("\tMPI_THREAD_SERIALIZED = %d\n", MPI_THREAD_SERIALIZED);
                printf("\tMPI_THREAD_FUNNELED = %d\n", MPI_THREAD_FUNNELED);
                printf("\tMPI_THREAD_SINGLE = %d\n", MPI_THREAD_SINGLE);
                printf("\tprovide = %d\n\n\n",provide);
#endif
        }
        if(provide < MPI_THREAD_MULTIPLE)
    {
                if(rank == 0)
                        perror("LOW PROVIDE!!\n");
                abort();
        }
        // Инициализация атрибутов потока
    if(0 != pthread_attr_init(&attrs))
    {
        perror("Cannot initialize attributes");
        abort();
    }
        // Установка атрибута "присоединенный"
    if(0 != pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_JOINABLE))
    {
        perror("Error in setting attributes");
        abort();
    }
        // Инициализация mutex
        if(0 != pthread_mutex_init(&mutex, NULL))
        {
                perror("Cannot initialize mutex");
                abort();       
        }
        // Инициализация списков задач для каждого процесса
        for (i = 0; i < NUMBER_OF_TASKS; i++)
                task_list.AddTask(rank*33333);
        MPI_Barrier( MPI_COMM_WORLD);
        if(rank==0)
        timeStart = MPI_Wtime();
        // Порождение 2 потоков
        if(0 != pthread_create(&thrs[0], &attrs, worker, NULL))
        {
                perror("Cannot create a thread for work");
                abort();
        }
        if(0 != pthread_create(&thrs[1], &attrs, lister, NULL))
        {
                perror("Cannot create a thread for listing");
                abort();
        }

        for(i = 0; i < 2; i++)
    if(0 != pthread_join(thrs[i], NULL))
    {
                perror("Cannot join a thread\n");
                abort();
        }
        MPI_Barrier(MPI_COMM_WORLD);
        if(rank==0)
        {      
                timeFin = MPI_Wtime();
                printf("Time: %lf\n", timeFin - timeStart);
        }
        // Освобождение ресурсов
        pthread_mutex_destroy(&mutex);
        pthread_attr_destroy(&attrs);
    MPI_Finalize();
    return 0;
}
 

 
 
 [ Сообщений: 210 ]  На страницу Пред.  1 ... 10, 11, 12, 13, 14


Powered by phpBB © 2000, 2002, 2005, 2007 phpBB Group