Concurrency with nothing, Part 1
Concurrency is when several execution threads exist within a program and interract with a shared state.
An execution thread is not necessarily an operating system thread. It can be a lightweight thread, a coroutine, or even another process.
Semaphores
Semaphores are accessibility signaling devices. They are made of a counter that can be incremended and decremented up to a predetermined value.
/**
* @brief A fast semaphore exclusion handler WITHOUT deadlock detection or yielding
*/
template<std::integral T, T default_increment, T maximum_increment>
class fast_semaphore {
std::atomic<T> flag; //< This is our counter
public:
fast_semaphore() = default;
fast_semaphore(fast_semaphore&) = delete;
fast_semaphore(fast_semaphore&&) = delete;
/// 3 things may happen when trying to unlock
enum class tristate {
success = 1, //< The unlocking was successful
timing = 0, //< Someone interfered with the unlocking
error = -1 //< The unlocking would over-unlock the semaphore
};
/// We try locking until we succeed
template<T increment = default_increment>
void lock() {
while(not try_lock<increment>());
}
/// For locking, we try to atomically increment the counter while maintaining it below the set limit
template<T increment = default_increment>
[[nodiscard]] bool try_lock() {
T expect = flag.load(std::memory_order::acquire);
T target = expect + increment;
if(target > maximum_increment) return false;
return flag.compare_exchange_strong(expect,target,std::memory_order::release);
}
/// Similarly to locking, we try unlocking until we succeed (or reach an invalid state)
template<T increment = default_increment>
void unlock() {
tristate v;
do{ v = try_unlock<increment>(); }
while(v == tristate::timing);
if(v != tristate::success) {
throw std::runtime_error("Over unlocking may have happened: potential double unlocking issue");
}
}
/// Unlocking is the reverse of locking, we have to ensure to return an error if we try to go below zero
template<T increment = default_increment>
[[nodiscard]] tristate try_unlock() {
T expect = flag.load(std::memory_order::relaxed);
T target = expect - increment;
if(target < 0) return tristate::error;
return flag.compare_exchange_strong(expect,target,std::memory_order::release) ? tristate::success : tristate::timing;
}
};
}
A semaphore blocks when trying to exceed its counter maximum value.
The above semaphore is a spinlock: if you try to lock or unlock, it will continue trying without letting another thread execute. This can result in deadlocks on systems with very low thread count, and in jitter[1] with very contested resources in other systems.
It is possible to improve the above semaphore into a reader/writer lock with a "readers wait for writers" scheme with the following method:
void lock_super() {
auto value = flag.fetch_add(maximum_increment, std::memory_order::relaxed);
while(value > maximum_increment) { value = flag.load(std::memory_order::release); }
}
This prevents other readers from locking while a writer is waiting.
Yielding
Concurrency on modern computers, particularly modern operating systems, makes it important to advertise to the system when a thread of execution reaches a state where it will wait and need to stop consuming resources waiting.
This goes with yielding execution to another subsystem.
In C++, there exist a std::this_thread::yield()
, it makes the current thread
yield execution to another thread
with identical priority. It guarrantees minimal CPU downtime but may not yield execution to another process.
C++ also offers other tools for making your process inactive. Lets replace the above lock
function for the semaphore:
/// We try locking until we succeed
template<T increment = default_increment>
void lock() {
constexpr std::chrono::milliseconds max{1};
std::chrono::nanoseconds wait{256};
while(not try_lock<increment>()) {
std::this_thread::sleep_for(wait);
wait += wait < max ? wait/2 : 0;
}
}
This is a more reasonable implementation of a semaphore[2]: we wait more and more everytime we fail locking[3].
But there is more we can do to our concurrency primitives: we can sometimes detect when they fail. This however will sacrifice some performance when failing to lock.
Deadlocks and Deadlock detection
"if we named it Bottleneck instead of Mutex, people may actually think twice before using them" — Anthony Williams
Let's make two binary semaphores[4]
using bottleneck = fast_semaphore<int, 1, 1>;
bottleneck A;
bottleneck B;
int A_value = 5;
int B_value = 4;
int main() {
std::jthread A_first{[](){
A.lock();
std::this_thread::sleep_for(15ms);
B.lock();
A_value += B_value;
B.unlock();
A.unlock();
}};
std::jthread B_first{[](){
B.lock();
std::this_thread::sleep_for(15ms);
A.lock();
B_value *= A_value;
A.unlock();
B.unlock();
}};
return EXIT_SUCCESS;
}
While the standard doesn't say if this code should terminate or not on the abstract machine, but in all likelhood, this C++20 program would not terminate with our implementation of the binary semaphore: there is a deadlock situation: both threads require A and B to be locked, but try to lock them in a different order and not atomically.
Before trying to remedy to the problem, let's try detecting it.
A more simple bottleneck
using mutex_handle = size_t;
enum class thread_state {
locking,
waiting,
unlocking
};
enum class mutex_state {
remove = 0,
create = 1
};
using namespace std::chrono_literals;
class fast_bottleneck {
/** This is a secret tool that will help us later **/
static std::atomic<size_t> counter;
const mutex_handle handle;
std::atomic_bool flag;
[[nodiscard]] bool try_lock() {
bool f = false;
bool t = true;
return flag.compare_exchange_strong(f,t,std::memory_order::acquire);
}
[[nodiscard]] bool try_unlock() {
bool f = false;
bool t = true;
return flag.compare_exchange_strong(t,f,std::memory_order::release);
}
public:
fast_bottleneck()
: flag()
, handle(counter.fetch_add(1)) //< We have to initialize that
{
mutex_state_update(handle, mutex_state::create);
}
fast_bottleneck(fast_bottleneck&) = delete;
fast_bottleneck(fast_bottleneck&&) = delete;
fast_bottleneck& operator=(fast_bottleneck&) = delete;
fast_bottleneck& operator=(fast_bottleneck&&) = delete;
~fast_bottleneck() {
mutex_state_update(handle, mutex_state::remove);
}
void lock() {
/// The exponential backing variables
constexpr std::chrono::milliseconds max{1};
std::chrono::nanoseconds wait{256};
while(not try_lock()) {
/// The implementation of our little trick when waiting
signal_locking(thread_state::waiting, handle, std::this_thread::get_id());
/// The exponential backing
std::this_thread::sleep_for(wait);
wait += wait < max ? std::chrono::nanoseconds(wait.count()/2) : 0ns;
}
/// The implementation of our little trick when locking
signal_locking(thread_state::locking, handle, std::this_thread::get_id());
}
void unlock() {
if(!try_unlock()) throw std::runtime_error("Unlocking failed in fast_bottleneck: potential double unlocking issue");
/// The implementation of our little trick when unlocking
signal_locking(thread_state::unlocking, handle, std::this_thread::get_id());
}
};
Detecting deadlocks
void signal_locking(thread_state state, mutex_handle mtx, std::thread::id thrd) noexcept(false);
This is the signature of the function that will handle all the detection of bad locking.
void signal_locking(thread_state state, mutex_handle mtx, std::thread::id thrd) noexcept(false) {
bool bad = false;
checker_lock.lock();
{
switch(state) {
case thread_state::locking: {
waiting_locks[thrd].reset();
owned_locks[thrd].push_back(mtx);
} break;
case thread_state::unlocking: {
auto it = std::find(owned_locks[thrd].begin(), owned_locks[thrd].end(), mtx);
if(it != owned_locks[thrd].end()) {
owned_locks[thrd].erase(it);
}
} break;
case thread_state::waiting: {
waiting_locks[thrd] = mtx;
bad = build_dependency_graph(mtx, thrd, owned_locks, waiting_locks);
} break;
}
}
checker_lock.unlock();
if(bad) throw std::runtime_error("Deadlock detected");
}
It mostly manages data about existing locks, but when waiting occurs, it checks if the dependency graph made by waiting locks depending on locks locked by the same thread. If that graph is acyclic, there is no deadlock, else we ave a bad graph and throw an exception.
bool build_dependency_graph (
const mutex_handle mtx,
const std::thread::id thrd,
std::map<std::thread::id, std::vector<mutex_handle>>& owned_locks,
std::map<std::thread::id, std::optional<mutex_handle>>& waiting_locks
) {
std::map<mutex_handle, std::set<mutex_handle>> graph;
for(auto& elem : waiting_locks) {
if(elem.second.has_value()) {
for(auto& n : owned_locks[elem.first]) {
graph[n].insert(elem.second.value());
}
}
}
lister_lock.lock();
auto nodes = locks_that_exist;
lister_lock.unlock();
bool happened = true;
while(happened) {
happened = false;
for(auto& n : nodes) {
if(graph[n].size() == 0)
{
happened = true;
for(auto v : graph) {
v.second.erase(n);
}
nodes.erase(n);
break;
}
}
}
return nodes.size();
}
Note that we keep a list of all existing nodes via lock constructors and destructors:
void mutex_state_update(mutex_handle mtx, mutex_state state) {
lister_lock.lock();
switch(state) {
case mutex_state::create: {
locks_that_exist.insert(mtx);
}break;
case mutex_state::remove: {
locks_that_exist.erase(mtx);
}break;
}
lister_lock.unlock();
}
Also note that this implementation uses an additional spinlock that is unguarded, The way it is used is guaranteed not to deadlock.
class dirty_bottleneck {
std::atomic_bool flag;
[[nodiscard]] bool try_lock() {
bool f = false;
bool t = true;
return flag.compare_exchange_strong(f,t,std::memory_order::acquire);
}
[[nodiscard]] bool try_unlock() {
bool f = false;
bool t = true;
return flag.compare_exchange_strong(t,f,std::memory_order::release);
}
public:
dirty_bottleneck() = default;
dirty_bottleneck(dirty_bottleneck&) = delete;
dirty_bottleneck(dirty_bottleneck&&) = delete;
void lock() {
while(not try_lock());
}
void unlock() {
if(!try_unlock()) throw std::runtime_error("Unlocking failed in dirty_bottleneck: potential double unlocking issue");
}
};
This is the most constrained those two locks can be:
Why this works?
Deadlocks are in essence cyclic dependencies: to take the lock that you are waiting for, you depend on other nodes, the ones you locked beforehands. To make the detections of those cycles, we "named" the locks with a counter. That way we can build a dependency graph and use Kahn algorithm to reduce the graph and remove any node that doesn't depend on any node recursively, this leaves only the nodes that form a cycle in the list.
In theory we could even point out the "names" of the faulty locks, in a system where locks can afford to be individually refered, this would allow debugging to effectively handle those very difficult to address error. We will however leave that as an exercise to the reader.
Conclusion
For now we have some very basic exclusion methods for concurency, in the next part, we will explore the reverse: systems for enabling threads to run when an event happens such as cooperative yields and condition variables.
Jitter is a way to describe inconsistent latency in the system, with more or less noticeable spikes of delay ↩︎
We would not change the
unlock
function: unlocking as fast as possible is worth contention on the semaphore nd the hurdle on the CPU, caches and memory ↩︎This is called exponential backing, the
n-th
time we wait, we wait for256·cn-1
withc
a coefficient (here it isc = 1.5
) ↩︎A binary semaphore has only two states, meaning it represent mutual exclusion. They are often named MutEx because of that. ↩︎
If you like my content and want more, please donate. If everyone that finds my content useful paid $1 every week, I would be able to produce content for 1 week a month without relying on other sources of income for that week.