Concurrency with nothing, Part 2
Green threads (introducing concurrency to the nothing)
To understand concurrency, it is very useful to have access to a form of concurrency that we can tinker with. Here is one: green thread, also called stackful coroutines (which is not entirely adequate but is a close approximation).
This code probably only works on x86_64 GNU Extensions C++
// Build with: clang++ --std=c++20 -g -O1 schedtest.cpp
#include <array>
#include <span>
#include <memory>
#include <list>
#include <functional>
#include <iostream>
/**
* This stores the registers that must be saved and restored between context switches
*/
struct platform_data {
platform_data() = default;
platform_data(std::span<char> stack_str)
: stack_ptr(&*(stack_str.end()-16))
, base_ptr(stack_ptr)
{}
uint64_t rbx, r12, r13, r14, r15;
void* stack_ptr;
void* base_ptr;
void pull() __attribute__((always_inline))
{
__asm__ __volatile__(
"movq %%rsp, %0\n"
"movq %%rbp, %1\n"
"movq %%rbx, %2\n"
"movq %%r12, %3\n"
"movq %%r13, %4\n"
"movq %%r14, %5\n"
"movq %%r15, %6\n"
: "=m"(stack_ptr)
, "=m"(base_ptr)
, "=m"(rbx)
, "=m"(r12)
, "=m"(r13)
, "=m"(r14)
, "=m"(r15)
);
}
void* push(void* location) __attribute__((always_inline))
{
volatile void* volatile tmp = static_cast<char*>(stack_ptr) - sizeof(void*);
*static_cast<volatile void* volatile * volatile>(tmp) = location;
__asm__ __volatile__(
"movq %1, %%rsp\n"
"movq %2, %%rbp\n"
"movq %3, %%rbx\n"
"movq %4, %%r12\n"
"movq %5, %%r13\n"
"movq %6, %%r14\n"
"movq %7, %%r15\n"
"popq %0\n"
: "+r"(location)
: "m"(tmp)
, "m"(base_ptr)
, "m"(rbx)
, "m"(r12)
, "m"(r13)
, "m"(r14)
, "m"(r15)
: "memory"
);
return location;
}
};
/**
* Each green thread must be in one of those states
*/
enum class process_status {
inactive = 0, //< it was never started
running = 1, //< it is ongoing
waiting = 2, //< unused, for now...
finished = 3, //< For cleaning when done
zombie = 4 //< it is in an invalid state (cannot be deleted, but cannot proceed either)
};
struct process {
char* stack; //< Each process has its own stack
size_t sz; //< stack which has a size
platform_data scheduling_swapper; //< it has an area to put its information when paused
process_status state = process_status::inactive; //< a status
std::function<void()> fn; //< and probably a function to execute (except the main thread)
process(std::function<void()> _fn, size_t _sz = 16384)
: stack(new char[_sz])
, sz(_sz)
, scheduling_swapper(std::span<char>(stack, sz))
, fn(_fn)
{}
process(char* stack_base)
: stack(stack_base)
, sz(0)
{}
process(const process&) = delete;
~process() {
if(sz) delete[] stack;
}
};
/**
* We have to pre-declare that function; it is responsible for making the first stack frame of every process
*/
__attribute__((noinline)) struct system* spawner (struct system* sys);
struct system {
static system sys; // it is a singleton-ish (we do not enforce it for the sake of shortness of code)
std::list<std::unique_ptr<process>>
running,
waiting,
naughty;
std::unique_ptr<process> previous;
std::unique_ptr<process> current;
/**
* Grabs a process that can execute
*/
std::unique_ptr<process> one() {
auto v = std::move(running.back());
running.pop_back();
return v;
}
/**
* Handles disposal of a process that has yet to start
*/
void rid(std::unique_ptr<process> current) {
switch(current->state) {
case process_status::inactive:
case process_status::running:
running.push_front(std::move(current));
break;
case process_status::finished:
clean(std::move(current));
break;
case process_status::zombie:
naughty.push_front(std::move(current));
break;
case process_status::waiting:
waiting.push_front(std::move(current));
break;
}
}
/**
* Handles disposal of a finished process
*/
void clean(std::unique_ptr<process>) {}
/**
* Yields to a target process
*/
void yield_to(std::unique_ptr<process> target) noexcept {
current->scheduling_swapper.pull();
sys.rid(std::move(current));
current = std::move(target);
current->scheduling_swapper.push(this);
spawner(&sys);
}
/**
* Yields to a process in the list
*/
void yield() noexcept {
current->scheduling_swapper.pull();
sys.rid(std::move(current));
current = one();
current->scheduling_swapper.push(this);
spawner(&sys);
}
};
/**
* Executes the frocess in the current slot
*/
__attribute__((noinline)) struct system* spawner (struct system* sys) {
auto& proc = *system::sys.current;
if(proc.state == process_status::inactive) {
proc.state = process_status::running;
proc.fn();
proc.state = process_status::finished;
sys->current->scheduling_swapper.pull();
sys->yield();
}
return sys;
}
/**
* Evil global
*/
struct system system::sys;
All of this code is extremely finnicky and fragile, and relies on codegen not being an insufferable prick.
The ideas are the following:
To make a green thread
- Make a
process
space for it- A stack
- Storage for saving the execution state
- Any global also goes there
- Any threadlocal goes there too
- Yield to the new
process
- Save the currently executing
process
(for themain
one, create one artificially) - Swap to the new
process
registers - Execute the
spawner
- When
spawner
reaches its end:- put your process as
finished
- Yield to another thread
- put your process as
- Save the currently executing
Here you can see a very simple example:
int main() {
char c;
system::sys.current = std::make_unique<process>(&c);
std::cout << "1";
system::sys.current->state = process_status::running;
system::sys.yield_to(std::make_unique<process>([](){
std::cout << "2";
}));
std::cout << "3\n";
return 0;
}
The code is accessible here.
Thread-waiting mutexes
We can now combine our fast_bottleneck
from Part 1 with our scheduling of userspace threads:
// Build with: clang++ --std=c++20 -g -O1 schedtest.cpp
#include <array>
#include <span>
#include <memory>
#include <list>
#include <functional>
#include <iostream>
#include <map>
#include <chrono>
#include <exception>
#include <atomic>
#include <set>
#include <vector>
#include <optional>
struct platform_data {...}; //< No changes
enum class process_status {
inactive = 0,
running = 1,
waiting = 2,
finished = 3,
zombie = 4
};
struct process {
static int64_t counter; //< The process gets a global counter
char* stack;
size_t sz;
platform_data scheduling_swapper;
process_status state = process_status::inactive;
std::function<void()> fn;
int64_t t_id; //< And it also gets a "thread_id"
process(std::function<void()> _fn, size_t _sz = 16384) {...} // No changes
process(char* stack_base) {...} // No changes
process(const process&) = delete;
~process() {...} // No changes
};
int64_t process::counter = 0;
__attribute__((noinline)) struct system* spawner (struct system* sys);
struct system {
static system sys;
std::list<std::unique_ptr<process>>
running,
waiting,
naughty;
std::unique_ptr<process> previous;
std::unique_ptr<process> current;
std::unique_ptr<process> one() {...} // No changes
void rid(std::unique_ptr<process> current) {...} // No changes
void clean(std::unique_ptr<process>) {}
void yield_to(std::unique_ptr<process> target) noexcept {...} // No changes
void yield() noexcept {...} // No changes
/**
* A yield with a callback that receives the current thread to yank it before the scheduler runs it
*/
template<typename fn>
void steal_and_yield(fn func) noexcept {
current->scheduling_swapper.pull();
func(std::move(current));
current = one();
current->scheduling_swapper.push(this);
spawner(&sys);
}
};
__attribute__((noinline)) struct system* spawner (struct system* sys) {...} //< No changes
struct system system::sys;
/********************* ***********************/
class dirty_bottleneck {...}; // See part 1
/********************* ***********************/
template<typename T>
class lock_guard {
T& ref;
public:
lock_guard(T& _ref)
: ref(_ref)
{
ref.lock();
}
~lock_guard() {
ref.unlock();
}
};
/********************* ***********************/
using mutex_handle = size_t;
enum class thread_state {
locking,
waiting,
unlocking
};
enum class mutex_state {
remove = 0,
create = 1
};
using namespace std::chrono_literals;
void mutex_state_update(mutex_handle, mutex_state);
void signal_locking(thread_state state, mutex_handle mtx, int64_t thrd);
class fast_bottleneck {
static std::atomic<size_t> counter;
const mutex_handle handle;
dirty_bottleneck trigger_lock; //< A lock for the list of processes
std::list<std::unique_ptr<process>> waiting; //< The list of processes that failed locking and are waiting
std::atomic_bool flag;
[[nodiscard]] bool try_lock() {
bool f = false;
bool t = true;
return flag.compare_exchange_strong(f,t,std::memory_order::acquire);
}
[[nodiscard]] bool try_unlock() {
bool f = false;
bool t = true;
return flag.compare_exchange_strong(t,f,std::memory_order::release);
}
public:
fast_bottleneck()
: flag()
, handle(counter.fetch_add(1)) //< We have to initialize that
{
mutex_state_update(handle, mutex_state::create);
}
fast_bottleneck(fast_bottleneck&) = delete;
fast_bottleneck(fast_bottleneck&&) = delete;
fast_bottleneck& operator=(fast_bottleneck&) = delete;
fast_bottleneck& operator=(fast_bottleneck&&) = delete;
~fast_bottleneck() {
mutex_state_update(handle, mutex_state::remove);
}
void lock() {
while(not try_lock()) {
signal_locking(thread_state::waiting, handle, system::sys.current->t_id);
// We yank the process and put it in hiatus within the mutex
system::sys.steal_and_yield([&](std::unique_ptr<process> p){
lock_guard triggers(trigger_lock);
p->state = process_status::waiting;
waiting.push_front(std::move(p));
});
}
signal_locking(thread_state::locking, handle, system::sys.current->t_id);
}
void unlock() {
if(!try_unlock()) throw std::runtime_error("Unlocking failed in fast_bottleneck: potential double unlocking issue");
signal_locking(thread_state::unlocking, handle, system::sys.current->t_id);
{ // If we were waiting for anyone while unlocking, pop back one waiter
lock_guard triggers(trigger_lock);
if(waiting.size()) {
system::sys.running.push_front(std::move(waiting.back()));
waiting.pop_back();
}
}
}
};
dirty_bottleneck checker_lock;
dirty_bottleneck lister_lock;
std::map<int64_t, std::vector<mutex_handle>> owned_locks;
std::map<int64_t, std::optional<mutex_handle>> waiting_locks;
std::set<mutex_handle> locks_that_exist;
void mutex_state_update(mutex_handle mtx, mutex_state state) {...} // No changes
bool build_dependency_graph (
const mutex_handle mtx,
const int64_t thrd,
std::map<int64_t, std::vector<mutex_handle>>& owned_locks,
std::map<int64_t, std::optional<mutex_handle>>& waiting_locks
) {...} // No changes
void signal_locking(thread_state state, mutex_handle mtx, int64_t thrd) {...} // No changes
std::atomic<size_t> fast_bottleneck::counter;
Here we use the mutex to store the waiting processes and resume one of them on unlock, which will try locking it once more.
This can be used to implement any form of event waiting in a concurrent context, this is very efficient in terms of lost computation time and throughput, but increases latency significantly in real world systems due to the cost of context switching, deadlock detection and scheduling of intersticial processes.
This implementation is simplistic and is provided as is. It is probably not any flavour of production ready
Example:
fast_bottleneck A;
int main() {
char c;
system::sys.current = std::make_unique<process>(&c);
std::cout << "1" << std::endl;
A.lock();
system::sys.current->state = process_status::running;
system::sys.yield_to(std::make_unique<process>([](){
A.lock();
std::cout << "A" << std::endl;
A.unlock();
}));
A.unlock();
system::sys.yield();
system::sys.yield_to(std::make_unique<process>([](){
A.lock();
std::cout << "B" << std::endl;
A.unlock();
}));
std::cout << "3" << std::endl;
return 0;
}
The code is accessible here.
To give a train analogy:
- a spinlock would be an area for a train to wait while keeping its speed: a loop in the tracks, using lots of fuel
- a spinlock with exponential backing would be an equivalent, but the train slows down more and more, wasting a bit less fuel
- this condition waiting we saw today would be akin to a train stopping at a triage station until the way is free, wasting no fuel but requiring a complete stop
If you like my content and want more, please donate. If everyone that finds my content useful paid $1 every week, I would be able to produce content for 1 week a month without relying on other sources of income for that week.