Podręcznik
2. Współbieżność
2.2. Blokowanie zasobów
No dobrze - a jakbym chciał wyświetlać postęp sortowania? Dodajmy kod który co N porównań pozwoli na wyświetlenie aktualnej wartości:
...
const int N{1000};
...
/** Metoda sortuje pojedynczy wiersz, drukując co N porównań informację o postępie */
void sortFunction(std::deque<double>& array, const std::string visPrefix) {
auto callCount{0};
std::sort(array.begin(), array.end(), [&callCount, &visPrefix](double a, double b) {
if (++callCount % N == 0) {
std::cout << visPrefix << ": " << callCount << "\n";
}
return a > b;
});
std::cout << visPrefix << " skończony po " << callCount << " porównaniach\n" << std::flush;
}
int main() {
...
// sortowanie w jednym wątku
for (int i=0; i<toBeSorted.size(); ++i) {
sortFunction(toBeSorted[i], std::string("Wątek ")+std::to_string(i+1));
}
...
// sortowanie w wielu wątkach
for (int i=0; i<toBeSorted.size(); ++i) {
threads.emplace_back(sortFunction, std::ref(toBeSorted[i]), std::string("Wątek ")+std::to_string(i+1));
}
...
return 0;
}
Po wprowadzeniu zmian - znów mamy bałagan na konsoli ... mniejszy lub większy - to zależy od Waszego procesora, ale mamy ... Musimy się zabiezpieczyć jakoś przed wielodostępem do strumienia wyjściowego. Podobnie - jeśli oprócz sortowania chcielibyśmy np sumować wszystkie elementy do jednej zmiennej.
Do cełów blokowania zasobów wykorzystuje się tzw. mutex-y (ang. mutual exclusion). Ogólnie - ochronie powinny podlegać wszystkie zasoby które nie są zabezpieczone wewnętrznie przed wielodostępem. A zabezpieczone mogą być na różne sposoby. Zacznijmy od wprowadzenia operacji atomowych
Jeśli taka operacja będzie wykonywać zmianę jakiejś zmiennej - to oznacza że możemy ją bezpiecznie zmieniać z równolegle wykonywanych wątków. W C++ mamy wsparcie dla operacji atomowych w bibliotece standardowej. Jest tam zamieszczony szablon std::atomic który przekształca typ podany tam jako parametr na typ atomowy. Taki typ możemy bezpiecznie zmieniać w różnych wątkach:
#include "atomic"
#include "iostream"
#include "thread"
#include "vector"
std::atomic<int> acnt{0};
int cnt{0};
void f()
{
for (int n = 0; n < 10000; ++n) {
++acnt;
++cnt;
}
}
int main()
{
std::vector<std::thread> pool;
for (int n = 0; n < 10; ++n) {
pool.emplace_back(f);
}
for (auto& thr : pool) {
thr.join();
}
std::cout << "Licznik bezpieczny " << acnt << '\n';
std::cout << "Licznik bez zabezpieczeń " << cnt << '\n';
return 0;
}
Podejście z bezpośrednim wykorzystaniem szablonu atomic jest zalecane dla typów prostych. Dla skomplikowanych klas lepiej jest zapewnić własną kontrolę dostępu.
Zauważcie, że mając do dyspozycji operacje atomowe - mogę mieć zmienną, dzięki której oznaczam że korzystam z zasobu, i nikt inny nie powinien. Oczywiście tego kodu nie muszę robić sam - już jest dla mnie przygotowany, właśnie w formie mutex-ów. Sam mutex jest tworem, w którym najważniejsze są metody lock i unlock.
- Blokowanie (lock): Wątek wywołujący lock staje się właścicielem blokady. Jeśli inny wątek już posiada blokadę, to wątek wywołujący lock zostanie zablokowany (zostanie wstrzymany tok jego wykonania) do momentu, gdy blokada będzie dostępna.
#include "mutex" std::mutex myMutex; // ... myMutex.lock(); // Blokowanie mutex // ... chroniony kod ... myMutex.unlock(); // Odblokowywanie mutex ```
- Odblokowywanie (unlock): Wątek wywołujący unlock zwalnia blokadę, umożliwiając innym wątkom dostęp do chronionego zasobu.
Współpraca z std::mutex może również odbywać się za pomocą bardziej bezpiecznych konstrukcji. Są klasy unique_lock, scoped_lock czy shared_lock które automatycznie zarządzają blokadą i eliminują potrzebę ręcznego wywoływania metod lock i unlock. To pomaga w unikaniu błędów związanych z niedbalstwem w zwalnianiu blokad po zakończeniu operacji chronionej przez mutex.
- std::unique_lock: reprezentuje blokadę, która może być zarówno zablokowana, jak i odblokowana ręcznie. Dodatkowo - pozwala na ręczne zarządzanie blokadą, co jest przydatne w bardziej zaawansowanych scenariuszach, i może być kopiowany. Natomiast jego wadą jest brak możliwości obsługi równoległej wielu blokad w jednym obiekcie.
Przykład użycia std::unique_lock:
{ std::mutex myMutex; std::unique_lock<std::mutex> myLock(myMutex); // ... kod chroniony blokadą ... } // myLock jest automatycznie zwalniana po opuszczeniu zakresu
- std::scoped_lock to wygodna klasa, która umożliwia blokowanie wielu mutexów jednocześnie w sposób bezpieczny dla nich (eliminuje ryzyko zakleszczenia). Zapewnia automatyczne zarządzanie zajmowaniem i zwalnianiem wszyskich na raz. Natomiast nie daje nam możliwości ręcznego zwalniania, i nie może być kopiowana. Wykorzystuje się ją analogicznie do unique_lock
- std::shared_lock to współpracująca z std::unique_lock klasa, która pozwala na rozróżnienie typ dostępu. Zauważcie, że w przypadku odczytu wielodostęp nie jest problemem - tak więc blokowanie zasobu do odczytu nie ma większego sensu. Natomiast w momencie wystąpienia zapisu - nie powinny trwać ani żadne inne zapisy, ani jakiekolwiek odczyty. Dlatego też jeśli chcemy różnicować dostęp w zależności od tego czy będziemy zasób modyfikowali czy nie - to wykorzystujemy unique_lock przy próbach zapisu, oraz shared_lock przy odczytach. Resztę zrobi za nas kompilator.
Wróćmy zatem do naszego przykładu - zmodyfikujemy przykład, każdorazowo blokując standardowy strumień wyjściowy na czas wykonywania przez dany wątek zapisów.
#include "iostream"
#include "sstream"
#include "deque"
#include "random"
#include "algorithm"
#include "thread"
#include "mutex"
const int N{1000};
std::mutex consoleMx;
/** Metoda wypełnia tablicę przekazaną jej jako parametr liczbami losowymi.
* Tablica jest wypełniana liczbami losowymi z zakresu 1..100 */
void fillRandmCollection(std::deque<std::deque<double>>& array) {
std::random_device random_device;
std::mt19937 random_engine(random_device());
std::uniform_int_distribution<int> distribution_1_100(1, 100);
for (auto& row : array) {
for (auto& elem : row) {
elem = distribution_1_100(random_engine);
}
}
}
/** Metoda sortuje pojedynczy wiersz, drukując co N porównań informację o postępie */
void sortFunction(std::deque<double>& array, const std::string visPrefix) {
auto callCount{0};
std::sort(array.begin(), array.end(), [&callCount, &visPrefix](double a, double b) {
if (++callCount % N == 0) {
std::scoped_lock lck{consoleMx};
std::cout << visPrefix << ": " << callCount << "\n";
}
return a > b;
});
std::scoped_lock lck{consoleMx};
std::cout << visPrefix << " skończony po " << callCount << " porównaniach\n" << std::flush;
}
int main() {
// tablica do posortowania
std::deque<std::deque<double>> toBeSorted(10, std::deque<double>(1000, 0) );
// losowanie tablicy
fillRandmCollection(toBeSorted);
// sortowanie w jednym wątku
std::cout << "Sortuję w jednym wątku ... " << std::flush;
auto tstart = std::chrono::high_resolution_clock::now();
for (int i=0; i<toBeSorted.size(); ++i) {
sortFunction(toBeSorted[i], std::string("Wątek ")+std::to_string(i+1));
}
auto tstop = std::chrono::high_resolution_clock::now();
std::cout << " koniec, trwało " << std::chrono::duration_cast<std::chrono::milliseconds>(tstop-tstart).count()/1000.0 << " s\n";
// sortowanie w wielu wątkach
fillRandmCollection(toBeSorted);
std::deque<std::thread> threads;
std::cout << "Sortuję w wielu wątkach ... " << std::flush;
tstart = std::chrono::high_resolution_clock::now();
for (int i=0; i<toBeSorted.size(); ++i) {
threads.emplace_back(sortFunction, std::ref(toBeSorted[i]), std::string("Wątek ")+std::to_string(i+1));
}
for (auto& thread : threads) {
thread.join();
}
tstop = std::chrono::high_resolution_clock::now();
std::cout << " koniec, trwało " << std::chrono::duration_cast<std::chrono::milliseconds>(tstop-tstart).count()/1000.0 << " s\n";
return 0;
}
Powyższy kod wreszcie działa zgodnie z oczekiwaniami.