Podręcznik
2. Współbieżność
Współczesne procesory praktycznie zawsze są wyposażone w więcej niż jedną jednostkę wykonawczą. W takim wypadku – by w pełni wykorzystać moc takiego procesora, przygotowuje się dla niego aplikacje, które mają więcej niż jeden tok wykonania. Takie toki wykonania nazywa się wątkami. Pamiętajcie:
- Każda uruchomiona instancja programu to proces
- Każdy proces ma co najmniej jeden wątek
- Każdy wątek musi być przypisany do jakiegoś procesu
W ogromnym skrócie – różnica pomiędzy wątkiem a procesem polega głównie na dostępie do pamięci. Każdy proces ma przydzielany własny obszar pamięci na stosie i na stercie, który nie jest dostępny dla innych procesów. W przypadku wątków sytuacja jest inna – każdy wątek posiada własny stos, lecz stertę współdzieli z innymi wątkami w ramach tego samego procesu. Skoro współdzieli – to zawsze może pojawić się sytuacja, w której dwa wątki próbują zapisać coś do tego samego obszaru pamięci. Konsekwencje tego – jak się zapewne domyślacie – są tragiczne ... dla programu oczywiście ;>
Język C, oraz C++ w pierwszych wersjach nie wspierały bezpośrednio mechanizmów programowania współbieżnego. Jedyne co mogliśmy - to korzystać z API systemu operacyjnego, i wykonywać współbieżne zadania bez użycia blokad i mechanizmów komunikacji wspieranych przez język.
Od wersji C++11 standardu wprowadzono model pamięci, wątki, mutex-y, zmienne typu conditional variables i inne niskopoziomowe mechanizmy współbieżności. Standard C++14 rozszerza jeszcze tą listę o nowe mechanizmy blokowania, w C++17 wprowadzono mechanizmy umożliwiające równoległe wykonanie wybranych alogrytmów STL.
Następna duża zmiana została wprowadzona wraz ze standardem C++20 - gdzie pojawiły się atomowe inteligentne wskaźniki, obiekty typu future i premise, pakiety zadań, i kilka innych konstrukcji pozwalających na bardziej wysokopoziomową obsługę wielowątkowości znaną z innych języków jak choćby JS czy C#
My w tym podręczniku ograniczymy się jednak do zagadnień niskopoziomowych.
Wątki w C++
To, co na najniższym poziomie podlega zrównoleglaniu - to podprogram, w C++ reprezentowany przez funkcję. Zwyczajowo taką funkcję przeznaczoną do wykonania w innym wątku nazywa się zadaniem.
Sama funkcja może być reprezentowana bezpośrednio (nie ma jakiś specjalnych wymagań odnośnie jej sygnatury, poza faktem, iż nie powinna zwracać bezpośrednio wartości), można też wykorzystać obiekt funkcyjny (funktor). Aby natomiast wykonać ją w niezależnym toku - równolegle z innymi funkcjami w systemie, wykorzystujemy obiekt typu std::thread. Utworzenie tego obiektu pozwala na uruchomienie funkcji lub funktora równolegle do toku funkcji / metody w której został pierwotnie utworzony. Formalnie nie ma wymogu czekania na zakończenie wykonania równolegle utworzonej funkcji - niemniej jednak często chcemy wiedzieć kiedy utworzone wątki się zakończą. By wykryć ten moment wykorzystuje się metodę join.
void fKlasyczna();
class fFunktor {
public:
void operator()();
};
int main() {
std::thread watekA{fKlasyczna};
std::thread watekB{fFunktor()};
// od tego momentu wątki wykonują się równolegle.
// jak chcemy na nie poczekać:
watekA.join();
watekB.join();
std::cout << "Zakończono wykonanie obu wątków";
return 0;
}
Powyższy kod nie pokazuje jeszcze problemów związanych ze współbieżnością - wątki nie korzystają ze wspólnych zasobów (a przynajmniej tego w nich nie widać).
Wystarczy jednak uzupełnić go o dodatkowe elementy związane choćby z drukowaniem na standardowym wyjściu korzystając z cout - który nie jest w pełni zabezpieczony przed wielodostępem. Jedyne co mamy gwarantowane - to fakt że pojedyncza operacja zapisu nie będzie przerwana (jest operacją atomową). Natomiast jeśli zmodyfikujemy powyższy przykład dodając implementację metod:
#include "iostream"
#include "thread"
void fKlasyczna() {
for (int i{0}; i<100; ++i) {
std::cout << "Ala " << "ma " << "kota " << "\n";
}
}
class fFunktor {
public:
void operator()() {
for (int i{0}; i<100; ++i) {
std::cout << "Mateusz " << "uprawia " << "fiołki " << "\n";
}
}
};
int main() {
std::thread watekA{fKlasyczna};
std::thread watekB{fFunktor()};
watekA.join();
watekB.join();
std::cout << "Zakończono wykonanie obu wątków";
return 0;
}
Uruchamiając ten kod możecie się dowiedzieć, że Mateusz uprawia kota ... Programując współbieżnie pamiętajcie o tym, by zadania wykonywane równolegle były od siebie kompletnie rozdzielone. A jeśli już muszą się wymienić jakimiś informacjami - to niech to odbywa się pod kontrolą.
Zazwyczaj wywołujemy funkcję w niezależnym wątku w pewnym celu - żeby coś zrobiła, co też zazwyczaj wymaga podania jej pewnych parametrów (argumentów). W przypadku std::thread funkcja przekazana jako argument w konstrukotrze sama może przyjąć dowolną listę argumentów, przekazywanych zarówno poprzez wartość, jak i referencję.
Programując wspólbieżnie staramy się nie przekazywać argumentów przez referencję. W przypadku przekazania przez referencję - nie mamy gwarancji, że przekazane dane są do wyłącznej dyspozycji funkcji (bo ma ona ich kopię) - i inny równolegle wykonywany wątek może także je zmienić. Dlatego też preferuje się przekazywanie przez wartość.
Przygotujmy więc program, który wykorzysta wiele wątków do czegoś innego niż proste uruchamianie funcji. Będziemy chcieli zrobić aplikację, która równolegle w kilku wątkach zsumuje wiersze w dwuwymiarowym wektorze. Przy czym najpierw kod wykonamy sekwencyjnie, potem równolegle, a na koniec porównamy czas w jakim te zadania zostały wykonane.
#include "iostream"
#include "deque"
#include "random"
#include "algorithm"
#include "thread"
/** Metoda wypełnia tablicę przekazaną jej jako parametr liczbami losowymi.
* Tablica jest wypełniana liczbami losowymi z zakresu 1..100 */
void fillRandmCollection(std::deque<std::deque<double>>& array) {
std::random_device random_device;
std::mt19937 random_engine(random_device());
std::uniform_int_distribution<int> distribution_1_100(1, 100);
for (auto& row : array) {
for (auto& elem : row) {
elem = distribution_1_100(random_engine);
}
}
}
/** Metoda sumuje pojedynczy wiersz */
void sortFunction(std::deque<double> array, double *result) {
*result = 0;
// celowo robimy przeciągnięte w czasie sumowanie
for (auto item : array) {
*result += item;
std::this_thread::sleep_for(std::chrono::nanoseconds (50));
}
}
int main() {
// tablica do posortowania
std::deque<std::deque<double>> toBeSorted(10, std::deque<double>(10000, 0) );
// tablica na sumy częściowe
std::vector<double> results{10, 0.0};
// losowanie tablicy
fillRandmCollection(toBeSorted);
// sumowanie w jednym wątku
std::cout << "Sumuję w jednym wątku ... " << std::flush;
auto tstart = std::chrono::high_resolution_clock::now();
for (int i=0; i<toBeSorted.size(); ++i) {
sortFunction(toBeSorted[i], &results[i]);
}
auto tstop = std::chrono::high_resolution_clock::now();
std::cout << " koniec, trwało " << std::chrono::duration_cast<std::chrono::milliseconds>(tstop-tstart).count()/1000.0 << " s\n";
// sumowanie w wielu wątkach
std::deque<std::thread> threads;
std::cout << "Sumuję w wielu wątkach ... " << std::flush;
tstart = std::chrono::high_resolution_clock::now();
for (int i=0; i<toBeSorted.size(); ++i) {
threads.emplace_back(sortFunction, toBeSorted[i], &results[i]);
}
for (auto& thread : threads) {
thread.join();
}
tstop = std::chrono::high_resolution_clock::now();
std::cout << " koniec, trwało " << std::chrono::duration_cast<std::chrono::milliseconds>(tstop-tstart).count()/1000.0 << " s\n";
return 0;
}
W kodzie powyżej mamy małe oszustwo - sumowanie jest sztucznie przedłużane. Jeśli z tego przedłużania zrezygnujecie - może się okazać, że czas wykonana obu wersji jest podobny. No cóż - pamiętajcie że uruchomienie wątku, a potem końcowa synchronizacja (oczekiwanie na ukończenie) - trwa ... czasem dłużej niż czas który jest potrzebny do wykonania zadania. Nie zawsze wielowątkowość jest najlepszym rozwiązaniem.