Набор Алгоритмов

Статья посвящена распараллеливанию пузырьковой сортировки. В статье содержутся: описание алгоритма; примеры на C#, JavaScript; примеры сравнения с алгоритмами, описанными в статье Алгоритмы сортировки #1, на языке C#; премимущества и недостатки алгоритма. Если у вас доступно больше чем одно ядро процессора, то данная сортировка МЕДЛЕННЕЕ сортировок описанных в статье Алгоритмы сортировки #1. Почему? Рассмотрим далее...

ОПАСНОСТЬ: Быстрая пузырьковая сортировка не быстрая, она МЕДЛЕННАЯ на практике. Дело в том, что она даже математически она не быстрая, если каждое ядро не зависимо, и не нужно создавать контекст потока для ядер. Если изобрели быструю сортировку быстрее, чем в статье Алгоритмы сортировки #1 проверьте её скорость на практике.

Алгоритмы-Заблуждения - Алгоритмы сортировки #2, Быстрая пузырьковая сортировка или Всегда тестируйте.

1. Описание алгоритма.

Для представления об алгоритмах сортировки прочитайте статью: Алгоритмы сортировки #1.

Анимация 1. Пример быстрой пузырьковой сортировки от меньшего к большему.

Давайте разделим входной массив на части и поручим потоку обрабатывать независимо каждую часть массива. Для бесконечного числа ядер, элемента выделяется два для одного потока, как показано на Анимации 1; один элемент в массиве остаётся лишним - для взаимодействия потоков (один элемент массива делается лишним для всех потоков, а не для каждого). В реальности потоков столько сколько ядер процессора.

Дадим описание этих двух алгоритмов:

Описание алгоритма для бесконечного числа ядер, с двумя элементами на поток (далее будем называть Sort):

Входные данные:

Алгоритм:

Алгоритм Ak(k):

Описание алгоритма для числа ядер для конкретного процессора, с количеством потоков равным суммарному числу ядер процессоров (далее будем называть KernelSort):

Входные данные:

Алгоритм:

Дополнительный входной параметр:

Условие: ОКР(Количество_элементов_в_массиве / kernelCount) >= 2

Алгоритм Ak2(k):

ОКР_В_БОЛ(Дробное) — если число Дробное содержит дробь, то возвражаем (Целое_количество_дробей + 1). Т.е. ОКР_В_БОЛ(1.0001) = 2.

ОКР(Дробное) — функция округления числа. Т.е. ОКР(0.1) = 0; ОКР(2.5) = 3.

2. Примеры.

Рассмотрим пример на C#, как реализацию вышепоставленного алгоритма. Так же рассмотрим пример алгоритма Sort (вышеописанного) на JavaScript, с учётом того, что в JavaScript нет многопоточности (сейчас), а только очередь обработки функций.

2.1. Примеры на C#.

Приведу пример реализации алгоритма "Быстрой" пузырьковой сортировки на языке программирования C#.

Основной класс для сортировки двумя вышеописанными алгоритмами (FastBubleSorter.cs):

using System;
using System.Collections.Generic;
using System.Threading;

namespace Sorting
{
    /// <summary>
    /// Класс для многопоточной пузырьковой сортировки.
    /// </summary>
    public class FastBubleSorter
    {
        private class ArrayForSorting
        {
            public List<int> Array;
            public bool SortOrder;
            public object LockObject = new object();
            public int PreparedElementsM;

            public ArrayForSorting(bool sortOrder, List<int> array)
            {
                SortOrder = sortOrder;
                Array = array;
            }

            public void SortTwoDigit(int firstIndex)
            {
                if(SortOrder)
                {
                    if(Array[firstIndex] > Array[firstIndex + 1])
                    {
                        var temp = Array[firstIndex];
                        Array[firstIndex] = Array[firstIndex + 1];
                        Array[firstIndex + 1] = temp;
                    }
                }
                else
                {
                    if (Array[firstIndex] < Array[firstIndex + 1])
                    {
                        var temp = Array[firstIndex];
                        Array[firstIndex] = Array[firstIndex + 1];
                        Array[firstIndex + 1] = temp;
                    }
                }

                lock(LockObject)
                {
                    PreparedElementsM = PreparedElementsM + 2;
                }
            }

            public void BlockSort(int firstIndex, int kernelCount)
            {
                int blockSize = (int)Math.Round((double)(Array.Count / kernelCount));

                if (SortOrder)
                {
                    for (int k = firstIndex; (k < (Array.Count - 1)) && ( (k - firstIndex) <= blockSize); k = k + 2)
                    {
                        if (Array[k] > Array[k + 1])
                        {
                            var temp = Array[k];
                            Array[k] = Array[k + 1];
                            Array[k + 1] = temp;
                        }
                    }

                }
                else
                {
                    for (int k = firstIndex; (k < (Array.Count - 1)) && ((k - firstIndex) <= blockSize); k = k + 2)
                    {
                        if (Array[k] < Array[k + 1])
                        {
                            var temp = Array[k];
                            Array[k] = Array[k + 1];
                            Array[k + 1] = temp;
                        }
                    }
                }

                lock (LockObject)
                {
                    PreparedElementsM = PreparedElementsM + blockSize;
                }
            }
        }

        ArrayForSorting arr;

        public FastBubleSorter(bool sortOrder, List<int> array)
        {
            arr = new ArrayForSorting(sortOrder, array);
        }

        public List<int> Sort()
        {
            for (var i = 0; i < arr.Array.Count; i++)
            {
                lock (arr.LockObject)
                {
                    arr.PreparedElementsM = 0;
                }
                var j = 0;

                if (i % 2 != 0)
                {
                    j = 1;
                }

                for (int k = j; k < arr.Array.Count - 1; k = k + 2)
                {
                    int ind = k;
                    Thread t = new Thread(() => { arr.SortTwoDigit(ind); });
                    t.Start();
                }

            _startCheck:
                lock(arr.LockObject)
                {
                    if(arr.PreparedElementsM < arr.Array.Count - 1)
                    {
                        goto _startCheck;
                    }
                }

            }

            return arr.Array;
        }

        public List<int> KernelSort(int kernelCount)
        {
            int maxSortingIterations = (int) Math.Floor((double)(arr.Array.Count * (1 + kernelCount / arr.Array.Count)));

            for (var i = 0; i < maxSortingIterations; i++)
            {
                lock (arr.LockObject)
                {
                    arr.PreparedElementsM = 0;
                }
                var j = 0;

                if (i % 2 != 0)
                {
                    j = 1;
                }

                int blockSize = (int) Math.Round((double)(arr.Array.Count / kernelCount));

                for (int k = j; k < arr.Array.Count - 1; k = k + blockSize)
                {
                    int ind = k;
                    Thread t = new Thread(() => { arr.BlockSort(ind, kernelCount); });
                    t.Start();
                }

            _startCheck:
                lock (arr.LockObject)
                {
                    if (arr.PreparedElementsM < arr.Array.Count - 1)
                    {
                        goto _startCheck;
                    }
                }

            }

            return arr.Array;
        }

        static public string ToString(List<int> array)
        {

            string result = "[";

            for (var i = 0; i < array.Count; i++)
            {

                result += array[i];

                if (i < (array.Count - 1))
                {
                    result += ", ";
                }
            }

            result += "]";

            return result;
        }
    }
}

Метод Main программы для запуска сортировки по алгоритму Sort:

static void Main(string[] args)
{
    FastBubleSorter fbs = new FastBubleSorter(false, new List<int> { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 });
    Console.WriteLine(FastBubleSorter.ToString(fbs.Sort(4, false)));
    Console.ReadLine();
}

Метод Main программы для запуска сортировки по алгоритму KernelSort (для четырёх ядер процессора):

static void Main(string[] args)
{
    FastBubleSorter fbs = new FastBubleSorter(false, new List<int> { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 });
    Console.WriteLine(FastBubleSorter.ToString(fbs.KernelSort(4)));
    Console.ReadLine();
}

2.2. Примеры на JavaScript.

На данный момент в JavaScript нет многопоточности, только создание очереди обработки на основе async и await.

Рассмотрим пример реализации быстрой пузырьковой сортировки с реализацией на JavaScript:

В примере не реализована сортировка KernelSort, реализуйте её самостоятельно.

function FastBubleSorter(array, sortOrder, callback) {

    var self = this;

    self._private = {
        array: array,
        sortOrder: sortOrder,
        callback: callback,
        sortTwoDigit: async function (firstIndex) {
            if(self._private.sortOrder) {
                if(self._private.array[firstIndex] > self._private.array[firstIndex + 1]) {
                    var temp = self._private.array[firstIndex];
                    self._private.array[firstIndex] = self._private.array[firstIndex + 1];
                    self._private.array[firstIndex + 1] = temp;
                }
            }
            else {
                if(self._private.array[firstIndex] < self._private.array[firstIndex + 1]) {
                    var temp = self._private.array[firstIndex];
                    self._private.array[firstIndex] = self._private.array[firstIndex + 1];
                    self._private.array[firstIndex + 1] = temp;
                }
            }
        }
    };

    self.sort = async function () {
        for(var i = 0; i < self._private.array.length; i++) {

            var j = 0;

            if(i % 2 != 0) {
                j = 1;
            }

            for(j; j < self._private.array.length - 1; j = j + 2) {
                await self._private.sortTwoDigit(j);
            }
            
        }

        self._private.callback();
    };

    self.getArray = function () {
        return self._private.array;
    };
}

Рассмотрим параметры конструктора класса FastBubleSorter(array, sortOrder, callback):

Пример использования:

var arr = new FastBubleSorter(
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], false,
    function () {
        alert(arr.getArray());
    }
);
arr.sort();

3. Сравнение алгоритма.

3.1. Примеры на C#.

Для тестирования данных алгоритмов напишем вспомогательный класс с классической однопоточной пузырьковой сортировкой.

Класс-помощник для тестирования (FBSTester.cs):

using System.Collections.Generic;

namespace FastBubleSorting
{
    public class FBSTester
    {

        public static List<int> NewInverseArray(int lenght, bool order)
        {
            List<int> result = new List<int>();

            if (order)
            {
                for (int k = 0; k < lenght; k++)
                {
                    result.Add(k);
                }

            }
            else
            {
                for (int k = lenght - 1; k >= 0; k--)
                {
                    result.Add(k);
                }
            }
            return result;
        }

        public static bool TestArray(List<int> arr, bool order)
        {
            var result = true;

            for (var i = 0; (i < (arr.Count - 2)) && result; i++)
            {

                if (order)
                {
                    if (arr[i] > arr[i + 1])
                    {
                        result = false;
                    }
                }
                else
                {
                    if (arr[i] < arr[i + 1])
                    {
                        result = false;
                    }
                }
            }

            return result;
        }


        public static List<int> OneThreadSort(bool sortOrder, List<int> arr)
        {
            if (sortOrder)
            {
                for (int i = 0; i < arr.Count; i++)
                {

                    int k = 0;

                    if (i % 2 != 0)
                    {
                        k = 1;
                    }

                    for (int j = k; j < arr.Count - 1; j = j + 2)
                        if (arr[j] > arr[j + 1])
                        {
                            var temp = arr[j];
                            arr[j] = arr[j + 1];
                            arr[j + 1] = temp;
                        }
                }

            }
            else
            {
                for (var i = 0; i < arr.Count; i++)
                {

                    var k = 0;

                    if (i % 2 != 0)
                    {
                        k = 1;
                    }

                    for (int j = k; j < arr.Count - 1; j = j + 2)
                        if (arr[j] < arr[j + 1])
                        {
                            var temp = arr[j];
                            arr[j] = arr[j + 1];
                            arr[j + 1] = temp;
                        }
                }
            }

            return arr;
        }
    }
}

Описание методов класса:

Метод выполнения теста:

static void Main(string[] args)
{
    int arrNum = 1000;
    int arrLenght = 1000000;

    Console.WriteLine(String.Format("Начало тестирования производительности.\r\nТестирование производительности производится на {0} массивах длинной {1} элементов.", arrNum, arrLenght));

    Console.WriteLine("Начало тестирования обычной однопоточной пузырьковой сортировки.");

    DateTime dtbs = new DateTime(0);

    for(int i = 0; i < arrNum; i++)
    {
        Console.WriteLine(String.Format("Тест № {0}", i));

        // Генерация массива для тестированития
        var arr = FBSTester.NewInverseArray(100, true);

        // Время начала теста
        DateTime start = DateTime.Now;
        // Сортировка
        arr = FBSTester.OneThreadSort(false, arr);
        // Время окончания теста
        DateTime end = DateTime.Now;

        // Добавляем время в общее время сортировки.
        dtbs += end - start;
    }
    Console.WriteLine("Начало тестирования обычной многопоточной пузырьковой сортировки.");

    DateTime dtfbs = new DateTime(0);

    for (int i = 0; i < arrNum; i++)
    {
        Console.WriteLine(String.Format("Тест № {0}", i));

        // Генерация массива для тестированития
        var arr = FBSTester.NewInverseArray(100, true);

        // Время начала теста
        DateTime start = DateTime.Now;
        // Сортировка
        FastBubleSorter fbs = new FastBubleSorter(false, arr);
        arr = fbs.KernelSort(4);
        // Время окончания теста
        DateTime end = DateTime.Now;

        // Добавляем время в общее время сортировки.
        dtfbs += end - start;
    }

    Console.WriteLine(String.Format("\r\nИтоги тестирования сортировок:\r\nВремя выполнения пузырьковой сортировки: {0}\r\nВремя выполнения быстрой пузырьковой сортировки: {1}.\r\nВремя пузырьковой сортировки делённое на время быстрой пузырьковой сортировки: {2}.\r\nБыстрая пузырьковая сортировка медленнее в {3} раз(а).", dtbs.Ticks, dtfbs.Ticks, dtbs.Ticks / dtfbs.Ticks, dtfbs.Ticks / dtbs.Ticks));

    Console.ReadLine();
}

Как видно сложность алгоритма KernelSort: N x N x (1 + kernelCount / N), для одного потока (ядра). Хотя сложность обычной однопоточной пузырьковой сортировки: N x N,- что значительно меньше, чем выполняет сумма потоков и один поток. Что делает алгоритм KernelSort бессмысленным.

Вывод метода Main:

Начало тестирования производительности.
Тестирование производительности производится на 1000 массивах длинной 1000000 элементов.
Начало тестирования обычной однопоточной пузырьковой сортировки.
Тест № 0
Тест № 1
...
Тест № 998
Тест № 999
Начало тестирования обычной многопоточной пузырьковой сортировки.
Тест № 0
Тест № 1
...
Тест № 998
Тест № 999

Итоги тестирования сортировок:
Время выполнения пузырьковой сортировки: 1768892
Время выполнения быстрой пузырьковой сортировки: 82822977549.
Время пузырьковой сортировки делённое на время быстрой пузырьковой сортировки: 0.
Быстрая пузырьковая сортировка медвленнее в 46821 раз(а).

Как видно алгоритм работает Медленнее, это связано с тем, что суммарно надо выполнить больше команд, чем в классической пузырьковой сортировки.

3.2. Примеры на JavaScript.

На данный момент в JavaScript нет многопоточности, только создание очереди обработки на основе async и await. Поэтому сейчас мы сравнить не можем, возможно, в будующем в JavaScript для браузеров появится многопоточность и объекты синхронизации.

4. Блок схема алгоритма.

Блок-схема не преведена, т.к. алгоритм сложнее, чем классическая пузырьковая сортировка. Количество команд выполняемое всеми потоками больше, чем для однопоточной пузырьковой сортировки.

5. Преимущества и Недостатки.

Недостатки:

Преимущества: