Podręcznik

2. Współbieżność

2.2. Blokowanie zasobów

No dobrze - a jakbym chciał wyświetlać postęp sortowania? Dodajmy kod który co N porównań pozwoli na wyświetlenie aktualnej wartości:


...
const int N{1000};
...
/** Metoda sortuje pojedynczy wiersz, drukując co N porównań informację o postępie */
void sortFunction(std::deque<double>& array, const std::string visPrefix) {
    auto callCount{0};
    std::sort(array.begin(), array.end(), [&callCount,  &visPrefix](double a, double b) {
        if (++callCount % N == 0) {
            std::cout << visPrefix << ": " <<  callCount << "\n";
        }
        return a > b;
    });
    std::cout << visPrefix << " skończony po " << callCount << " porównaniach\n" << std::flush;
}

int main() {
    ...
    // sortowanie  w jednym wątku
    for (int i=0; i<toBeSorted.size(); ++i) {
        sortFunction(toBeSorted[i], std::string("Wątek ")+std::to_string(i+1));
    }
    ...
    // sortowanie w wielu wątkach
    for (int i=0; i<toBeSorted.size(); ++i) {
        threads.emplace_back(sortFunction, std::ref(toBeSorted[i]), std::string("Wątek ")+std::to_string(i+1));
    }
    ...
    return 0;
}

Po wprowadzeniu zmian - znów mamy bałagan na konsoli ... mniejszy lub większy - to zależy od Waszego procesora, ale mamy ... Musimy się zabiezpieczyć jakoś przed wielodostępem do strumienia wyjściowego. Podobnie - jeśli oprócz sortowania chcielibyśmy np sumować wszystkie elementy do jednej zmiennej.

Do cełów blokowania zasobów wykorzystuje się tzw. mutex-y (ang. mutual exclusion). Ogólnie - ochronie powinny podlegać wszystkie zasoby które nie są zabezpieczone wewnętrznie przed wielodostępem. A zabezpieczone mogą być na różne sposoby. Zacznijmy od wprowadzenia operacji atomowych

Operacja atomowa to operacja, która jest niepodzielna i wykonuje się jako całość, bez możliwości przerwania przez inne operacje. Oznacza to, że operacja atomowa zostanie wykonana w całości lub w ogóle, a żadne inne operacje nie będą ingerować w jej przebieg.

Jeśli taka operacja będzie wykonywać zmianę jakiejś zmiennej - to oznacza że możemy ją bezpiecznie zmieniać z równolegle wykonywanych wątków. W C++ mamy wsparcie dla operacji atomowych w bibliotece standardowej. Jest tam zamieszczony szablon std::atomic który przekształca typ podany tam jako parametr na typ atomowy. Taki typ możemy bezpiecznie zmieniać w różnych wątkach:


#include "atomic"
#include "iostream"
#include "thread"
#include "vector"

std::atomic<int> acnt{0};
int cnt{0};

void f()
{
    for (int n = 0; n < 10000; ++n) {
        ++acnt;
        ++cnt;
    }
}

int main()
{
    std::vector<std::thread> pool;
    for (int n = 0; n < 10; ++n) {
        pool.emplace_back(f);
    }
    for (auto& thr : pool) {
        thr.join();
    }

    std::cout << "Licznik bezpieczny " << acnt << '\n';
    std::cout << "Licznik bez zabezpieczeń " << cnt << '\n';
    return 0;
}

Podejście z bezpośrednim wykorzystaniem szablonu atomic jest zalecane dla typów prostych. Dla skomplikowanych klas lepiej jest zapewnić własną kontrolę dostępu.

Zauważcie, że mając do dyspozycji operacje atomowe - mogę mieć zmienną, dzięki której oznaczam że korzystam z zasobu, i nikt inny nie powinien. Oczywiście tego kodu nie muszę robić sam - już jest dla mnie przygotowany, właśnie w formie mutex-ów. Sam mutex jest tworem, w którym najważniejsze są metody lock i unlock.

  • Blokowanie (lock): Wątek wywołujący lock staje się właścicielem blokady. Jeśli inny wątek już posiada blokadę, to wątek wywołujący lock zostanie zablokowany (zostanie wstrzymany tok jego wykonania) do momentu, gdy blokada będzie dostępna.
    
        #include "mutex"
    
        std::mutex myMutex;
    
        // ...
    
        myMutex.lock();  // Blokowanie mutex
        // ... chroniony kod ...
        myMutex.unlock();  // Odblokowywanie mutex
        ```
            
  • Odblokowywanie (unlock): Wątek wywołujący unlock zwalnia blokadę, umożliwiając innym wątkom dostęp do chronionego zasobu.

Współpraca z std::mutex może również odbywać się za pomocą bardziej bezpiecznych konstrukcji. Są klasy unique_lock, scoped_lock czy shared_lock które automatycznie zarządzają blokadą i eliminują potrzebę ręcznego wywoływania metod lock i unlock. To pomaga w unikaniu błędów związanych z niedbalstwem w zwalnianiu blokad po zakończeniu operacji chronionej przez mutex.

  • std::unique_lock: reprezentuje blokadę, która może być zarówno zablokowana, jak i odblokowana ręcznie. Dodatkowo - pozwala na ręczne zarządzanie blokadą, co jest przydatne w bardziej zaawansowanych scenariuszach, i może być kopiowany. Natomiast jego wadą jest brak możliwości obsługi równoległej wielu blokad w jednym obiekcie. Przykład użycia std::unique_lock:
    
    {
        std::mutex myMutex;
        std::unique_lock<std::mutex> myLock(myMutex);
        // ... kod chroniony blokadą ...
    }
    // myLock jest automatycznie zwalniana po opuszczeniu zakresu
            
  • std::scoped_lock to wygodna klasa, która umożliwia blokowanie wielu mutexów jednocześnie w sposób bezpieczny dla nich (eliminuje ryzyko zakleszczenia). Zapewnia automatyczne zarządzanie zajmowaniem i zwalnianiem wszyskich na raz. Natomiast nie daje nam możliwości ręcznego zwalniania, i nie może być kopiowana. Wykorzystuje się ją analogicznie do unique_lock
  • std::shared_lock to współpracująca z std::unique_lock klasa, która pozwala na rozróżnienie typ dostępu. Zauważcie, że w przypadku odczytu wielodostęp nie jest problemem - tak więc blokowanie zasobu do odczytu nie ma większego sensu. Natomiast w momencie wystąpienia zapisu - nie powinny trwać ani żadne inne zapisy, ani jakiekolwiek odczyty. Dlatego też jeśli chcemy różnicować dostęp w zależności od tego czy będziemy zasób modyfikowali czy nie - to wykorzystujemy unique_lock przy próbach zapisu, oraz shared_lock przy odczytach. Resztę zrobi za nas kompilator.

Wróćmy zatem do naszego przykładu - zmodyfikujemy przykład, każdorazowo blokując standardowy strumień wyjściowy na czas wykonywania przez dany wątek zapisów.


#include "iostream"
#include "sstream"
#include "deque"
#include "random"
#include "algorithm"
#include "thread"
#include "mutex"

const int N{1000};

std::mutex consoleMx;

/** Metoda wypełnia tablicę przekazaną jej jako parametr liczbami losowymi.
 * Tablica jest wypełniana liczbami losowymi z zakresu 1..100 */
void fillRandmCollection(std::deque<std::deque<double>>& array) {
    std::random_device random_device;
    std::mt19937 random_engine(random_device());
    std::uniform_int_distribution<int> distribution_1_100(1, 100);

    for (auto& row : array) {
        for (auto& elem : row) {
            elem = distribution_1_100(random_engine);
        }
    }
}

/** Metoda sortuje pojedynczy wiersz, drukując co N porównań informację o postępie */
void sortFunction(std::deque<double>& array, const std::string visPrefix) {
    auto callCount{0};
    std::sort(array.begin(), array.end(), [&callCount,  &visPrefix](double a, double b) {
        if (++callCount % N == 0) {
            std::scoped_lock lck{consoleMx};
            std::cout << visPrefix << ": " <<  callCount << "\n";
        }
        return a > b;
    });
    std::scoped_lock lck{consoleMx};
    std::cout << visPrefix << " skończony po " << callCount << " porównaniach\n" << std::flush;
}

int main() {
    // tablica do posortowania
    std::deque<std::deque<double>> toBeSorted(10, std::deque<double>(1000, 0) );

    // losowanie tablicy
    fillRandmCollection(toBeSorted);

    // sortowanie  w jednym wątku
    std::cout << "Sortuję w jednym wątku ... " << std::flush;
    auto tstart = std::chrono::high_resolution_clock::now();
    for (int i=0; i<toBeSorted.size(); ++i) {
        sortFunction(toBeSorted[i], std::string("Wątek ")+std::to_string(i+1));
    }
    auto tstop = std::chrono::high_resolution_clock::now();
    std::cout << " koniec, trwało " << std::chrono::duration_cast<std::chrono::milliseconds>(tstop-tstart).count()/1000.0 << " s\n";

    // sortowanie w wielu wątkach
    fillRandmCollection(toBeSorted);
    std::deque<std::thread> threads;
    std::cout << "Sortuję w wielu wątkach ... " << std::flush;
    tstart = std::chrono::high_resolution_clock::now();
    for (int i=0; i<toBeSorted.size(); ++i) {
        threads.emplace_back(sortFunction, std::ref(toBeSorted[i]), std::string("Wątek ")+std::to_string(i+1));
    }
    for (auto& thread : threads) {
        thread.join();
    }
    tstop = std::chrono::high_resolution_clock::now();
    std::cout << " koniec, trwało " << std::chrono::duration_cast<std::chrono::milliseconds>(tstop-tstart).count()/1000.0 << " s\n";

    return 0;
}

Powyższy kod wreszcie działa zgodnie z oczekiwaniami.