/ English

Concurrency with nothing, Part 2

To read part 1, click here.

Green threads (introducing concurrency to the nothing)

To understand concurrency, it is very useful to have access to a form of concurrency that we can tinker with. Here is one: green threads, also called stackful coroutines (a name that is not entirely accurate but is a close approximation).

This code probably only works on x86_64, with a C++ compiler that supports the GNU extensions (inline assembly and `__attribute__`).

// Build with: clang++ --std=c++20 -g -O1 schedtest.cpp

#include <array>
#include <span>
#include <memory>
#include <list>
#include <functional>
#include <iostream>

/**
 * This stores the registers that must be saved and restored between context switches:
 * the stack pointer (rsp), the base pointer (rbp) and rbx, r12, r13, r14, r15.
 */
struct platform_data {
    // Leaves every member uninitialized; pull() fills them in.
    platform_data() = default;
    
    /**
     * Prepares the register set for a process that has not run yet.
     * The stack pointer is placed 16 bytes before the end of the given span and the
     * base pointer starts at the same address. The general purpose register slots
     * are left uninitialized.
     */
    platform_data(std::span<char> stack_str)
    : stack_ptr(&*(stack_str.end()-16))
    , base_ptr(stack_ptr) // valid because stack_ptr is declared (and thus initialized) before base_ptr
    {}

    uint64_t rbx, r12, r13, r14, r15; //< saved general purpose registers

    void* stack_ptr; //< saved rsp
    void* base_ptr; //< saved rbp

    /**
     * Copies the current rsp, rbp, rbx, r12, r13, r14 and r15 into this object.
     * Forced inline, so the values captured are those of the calling function's frame.
     */
    void pull() __attribute__((always_inline))
    {
        __asm__ __volatile__(
            "movq %%rsp, %0\n"
            "movq %%rbp, %1\n" 
            "movq %%rbx, %2\n" 
            "movq %%r12, %3\n" 
            "movq %%r13, %4\n" 
            "movq %%r14, %5\n" 
            "movq %%r15, %6\n" 
            : "=m"(stack_ptr)
            , "=m"(base_ptr)
            , "=m"(rbx)
            , "=m"(r12)
            , "=m"(r13)
            , "=m"(r14)
            , "=m"(r15)
        );
    }

    /**
     * Loads the registers stored in this object into the CPU.
     *
     * `location` is first written into the slot just below the saved stack pointer.
     * rsp is then pointed at that slot, the other registers are loaded, and the final
     * popq reads `location` back, which leaves rsp equal to the saved stack_ptr.
     *
     * @param location value carried across the switch through the target stack
     * @return the value popped from the target stack
     *
     * NOTE(review): rsp, rbp, rbx and r12-r15 are overwritten but are not listed as
     * clobbers (only "memory" is), and the "m" operands are read after rsp has already
     * been replaced. This presumably only works when the compiler does not address
     * those operands relative to rsp -- confirm against the generated assembly.
     */
    void* push(void* location)  __attribute__((always_inline))
    {
        // Address of the slot just below the saved stack pointer
        volatile void* volatile tmp = static_cast<char*>(stack_ptr) - sizeof(void*);
        // Store `location` in that slot so the popq below can read it back
        *static_cast<volatile void* volatile * volatile>(tmp) = location;
        __asm__ __volatile__(
            "movq %1, %%rsp\n"
            "movq %2, %%rbp\n"
            "movq %3, %%rbx\n"
            "movq %4, %%r12\n"
            "movq %5, %%r13\n"
            "movq %6, %%r14\n"
            "movq %7, %%r15\n"
            "popq %0\n"
            : "+r"(location)
            : "m"(tmp)
            , "m"(base_ptr)
            , "m"(rbx)
            , "m"(r12)
            , "m"(r13)
            , "m"(r14)
            , "m"(r15)
            : "memory"
        );
        return location;
    }
};

/**
 * Lifecycle states of a green thread; a process is always in exactly one of them.
 * The underlying values follow declaration order, from 0 (inactive) to 4 (zombie).
 */
enum class process_status {
    inactive, //< never started
    running,  //< started and still ongoing
    waiting,  //< unused, for now...
    finished, //< done, ready to be cleaned up
    zombie    //< invalid state (cannot be deleted, but cannot proceed either)
};

/**
 * A green thread: a stack, the saved registers used to resume it, a status and
 * (except for the main thread) the function it executes.
 *
 * A process is neither copyable nor copy-assignable: when `sz` is non-zero it owns
 * `stack`, and a copy would make the destructor free the same buffer twice.
 */
struct process {
    char* stack; //< Each process has its own stack
    size_t sz; //< size of the stack; 0 means the stack is borrowed and must not be freed
    platform_data scheduling_swapper; //< it has an area to put its information when paused
    process_status state = process_status::inactive; //< a status
    std::function<void()> fn; //< and probably a function to execute (except the main thread)

    /**
     * Creates a process that will run `_fn` on a freshly allocated stack of `_sz` bytes.
     * The saved stack pointer is set up near the end of that stack.
     */
    process(std::function<void()> _fn, size_t _sz = 16384)
    : stack(new char[_sz])
    , sz(_sz)
    , scheduling_swapper(std::span<char>(stack, sz)) // stack and sz are declared first, so they are ready here
    , fn(std::move(_fn)) // _fn is a by-value sink parameter: move it instead of copying it again
    {}

    /**
     * Wraps a flow of execution that already has a stack (such as main).
     * The stack is not owned (sz == 0) and the saved registers stay uninitialized
     * until scheduling_swapper.pull() is called.
     */
    process(char* stack_base)
    : stack(stack_base)
    , sz(0)
    {}

    process(const process&) = delete;
    // Declaring only the copy constructor leaves copy assignment implicitly available,
    // which would duplicate the owning `stack` pointer; forbid it as well.
    process& operator=(const process&) = delete;

    ~process() {
        if(sz) delete[] stack; // only free stacks allocated by the first constructor
    }
};

/**
 * Forward declaration: struct system calls spawner right after every register swap,
 * and spawner in turn needs the full definition of struct system.
 * It is responsible for making the first stack frame of every process.
 * It is kept out of line (noinline) so that it always executes as a real call.
 */
__attribute__((noinline)) struct system* spawner (struct system* sys);

/**
 * The scheduler: it owns every process that is not currently executing and
 * performs the switches between them.
 */
struct system {
    static system sys; // it is a singleton-ish (we do not enforce it for the sake of shortness of code)

    std::list<std::unique_ptr<process>>
        running, //< processes that can be resumed (inactive or running)
        waiting, //< processes in the waiting state
        naughty; //< processes in the zombie state
    
    std::unique_ptr<process> previous; //< not used by the code in this listing
    std::unique_ptr<process> current; //< the process that is executing right now
    
    
    /**
     * Grabs a process that can execute: the one at the back of the running list.
     *
     * NOTE(review): there is no emptiness check; calling this while `running` is
     * empty is undefined behavior (std::list::back on an empty list).
     */
    std::unique_ptr<process> one() {
        auto v = std::move(running.back());
        running.pop_back();
        return v;
    }

    /**
     * Files away a process that stops being the current one, according to its state:
     * inactive and running processes go to the front of `running`, waiting ones to
     * `waiting`, zombies to `naughty`, and finished ones are handed to clean().
     */
    void rid(std::unique_ptr<process> current) {
        switch(current->state) {
            case process_status::inactive:
            case process_status::running:
                running.push_front(std::move(current));
                break;
            case process_status::finished:
                clean(std::move(current));
                break;
            case process_status::zombie:
                naughty.push_front(std::move(current));
                break;
            case process_status::waiting:
                waiting.push_front(std::move(current));
                break;
        }
    }

    /**
     * Handles disposal of a finished process.
     * The body is empty: the process is destroyed when the parameter goes out of scope.
     *
     * NOTE(review): when reached from a finished process calling yield(), this
     * presumably frees the stack that is still in use until the next push() -- confirm.
     */
    void clean(std::unique_ptr<process>) {}

    /**
     * Yields to a target process: saves the registers of the current process, files it
     * away with rid(), makes `target` the current process, loads its registers and
     * calls spawner, which starts the process if it has never run.
     * Note that rid() and spawner() are invoked on the static `sys` instance, not on `this`.
     */
    void yield_to(std::unique_ptr<process> target) noexcept {
        current->scheduling_swapper.pull();
        sys.rid(std::move(current));
        current = std::move(target);
        current->scheduling_swapper.push(this);
        spawner(&sys);
    }

    /**
     * Yields to a process in the list: same sequence as yield_to(), except that the
     * next process is the one returned by one().
     */
    void yield() noexcept {
        current->scheduling_swapper.pull();
        sys.rid(std::move(current));
        current = one();
        current->scheduling_swapper.push(this);
        spawner(&sys);
    }
};


/**
 * Executes the process in the current slot.
 *
 * If the current process has never run, it is marked as running, its function is
 * executed to completion, it is marked as finished and the scheduler yields to
 * another process. For a process in any other state this does nothing.
 *
 * The process is looked up through the static system::sys, not through the `sys`
 * parameter.
 *
 * @param sys the scheduler used to yield once the process function has returned
 * @return the `sys` pointer it was given
 */
__attribute__((noinline)) struct system* spawner (struct system* sys) {
    auto& proc = *system::sys.current;
    if(proc.state == process_status::inactive) {
        proc.state = process_status::running;
        proc.fn();
        proc.state = process_status::finished;
        // yield() saves the registers again as its first step
        sys->current->scheduling_swapper.pull();
        sys->yield();
    }
    return sys;
}

/**
 * Evil global: the definition of the static scheduler instance declared in
 * struct system, which yield(), yield_to() and spawner() all go through.
 */
struct system system::sys;

All of this code is extremely finicky and fragile, and relies on codegen not being an insufferable prick.

The ideas are the following:

To make a green thread

  • Make a process space for it
    • A stack
    • Storage for saving the execution state
    • Any global also goes there
    • Any threadlocal goes there too
  • Yield to the new process
    • Save the currently executing process (for the main one, create one artificially)
    • Swap to the new process registers
    • Execute the spawner
    • When spawner reaches its end:
      • mark your process as finished
      • Yield to another thread

Here you can see a very simple example:

// Minimal demonstration: the main flow yields to one green thread and resumes once
// that thread has finished.
// Presumably prints "123" followed by a newline -- TODO confirm by running it.
int main() {
    char c;
    // Wrap the already executing main flow in a process. Its sz is 0, so the
    // process destructor will not try to free this stack.
    system::sys.current = std::make_unique<process>(&c);
    std::cout << "1";
    // Mark main as running so that rid() puts it back in the running list when it yields
    system::sys.current->state = process_status::running;
    // Create a green thread and switch to it right away
    system::sys.yield_to(std::make_unique<process>([](){
        std::cout << "2";
    }));
    std::cout << "3\n";
    return 0;
}

The code is accessible here.

Thread-waiting mutexes

We can now combine our fast_bottleneck from Part 1 with our scheduling of userspace threads:

// Build with: clang++ --std=c++20 -g -O1 schedtest.cpp

#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <span>
#include <stdexcept>
#include <vector>

struct platform_data {...}; //< No changes

/**
 * Lifecycle states of a green thread.
 * The underlying values follow declaration order, from 0 (inactive) to 4 (zombie).
 */
enum class process_status {
    inactive, //< never started
    running,  //< started and still ongoing
    waiting,  //< parked until something makes it runnable again
    finished, //< done, ready to be cleaned up
    zombie    //< invalid state
};

/**
 * A green thread, as in the first listing, extended with an identifier.
 *
 * NOTE(review): the constructors are elided here and marked "No changes", so this
 * listing does not show where `t_id` is assigned or where `counter` is advanced --
 * confirm that each process actually receives a distinct t_id.
 */
struct process {
    static int64_t counter; //< The process gets a global counter

    char* stack; //< the stack of the process
    size_t sz; //< size of that stack
    platform_data scheduling_swapper; //< saved registers while the process is paused
    process_status state = process_status::inactive; //< current status
    std::function<void()> fn; //< function to execute
    int64_t t_id; //< And it also gets a "thread_id"

    process(std::function<void()> _fn, size_t _sz = 16384) {...} // No changes

    process(char* stack_base) {...} // No changes

    process(const process&) = delete;

    ~process()  {...} // No changes
};
// Definition of the static counter, starting at 0
int64_t process::counter = 0;


__attribute__((noinline)) struct system* spawner (struct system* sys);

/**
 * The scheduler from the first listing, extended with steal_and_yield().
 */
struct system {
    static system sys; //< the single scheduler instance

    std::list<std::unique_ptr<process>>
        running,
        waiting,
        naughty;
    
    std::unique_ptr<process> previous;
    std::unique_ptr<process> current; //< the process that is executing right now
    
    std::unique_ptr<process> one()  {...} // No changes

    void rid(std::unique_ptr<process> current)  {...} // No changes

    void clean(std::unique_ptr<process>) {}

    void yield_to(std::unique_ptr<process> target) noexcept {...} // No changes

    void yield() noexcept {...} // No changes
    
    /**
     * A yield with a callback that receives the current thread to yank it before the scheduler runs it.
     *
     * The registers of the current process are saved, then ownership of that process
     * is handed to `func` (rid() is not called, so the scheduler no longer holds it).
     * The next process is taken with one(), its registers are loaded and spawner is called.
     *
     * @param func callable invoked with the std::unique_ptr<process> of the yielding process
     */
    template<typename fn>
    void steal_and_yield(fn func) noexcept {
        current->scheduling_swapper.pull();
        func(std::move(current));
        current = one();
        current->scheduling_swapper.push(this);
        spawner(&sys);
    }
};


__attribute__((noinline)) struct system* spawner (struct system* sys) {...} //< No changes

struct system system::sys;

/*********************       ***********************/

class dirty_bottleneck {...}; // See part 1

/*********************       ***********************/

/**
 * Scoped lock: calls lock() on the referenced object when constructed and unlock()
 * when destroyed, so the lock is released on every path out of the scope.
 *
 * T must provide lock() and unlock(). The guard only keeps a reference: the locked
 * object must outlive it.
 */
template<typename T>
class lock_guard {
    T& ref;
public:
    /**
     * Locks `_ref` and keeps it locked for the lifetime of the guard.
     */
    explicit lock_guard(T& _ref)
    : ref(_ref)
    {
        ref.lock();
    }

    // A copy would call unlock() a second time for a single lock(); forbid copies.
    lock_guard(const lock_guard&) = delete;
    lock_guard& operator=(const lock_guard&) = delete;

    ~lock_guard() {
        ref.unlock();
    }
};

/*********************       ***********************/

using mutex_handle = size_t;

/**
 * What a thread is doing with a mutex, as reported to signal_locking().
 */
enum class thread_state {
    locking = 0,   //< reported by fast_bottleneck::lock() once the mutex has been acquired
    waiting = 1,   //< reported by fast_bottleneck::lock() when an acquisition attempt failed
    unlocking = 2  //< reported by fast_bottleneck::unlock() once the mutex has been released
};

/**
 * Lifetime events of a mutex, as reported to mutex_state_update().
 * The underlying values follow declaration order: remove is 0, create is 1.
 */
enum class mutex_state {
    remove, //< reported by the fast_bottleneck destructor
    create  //< reported by the fast_bottleneck constructor
};

using namespace std::chrono_literals;

void mutex_state_update(mutex_handle, mutex_state);
void signal_locking(thread_state state, mutex_handle mtx, int64_t thrd);

class fast_bottleneck {
    static std::atomic<size_t> counter;
    const mutex_handle handle;
    
    dirty_bottleneck trigger_lock; //< A lock for the list of processes
    std::list<std::unique_ptr<process>> waiting; //< The list of processes that failed locking and are waiting
    
    std::atomic_bool flag;

    [[nodiscard]] bool try_lock() {
        bool f = false;
        bool t = true;
        return flag.compare_exchange_strong(f,t,std::memory_order::acquire);
    }

    [[nodiscard]] bool try_unlock() {
        bool f = false;
        bool t = true;
        return flag.compare_exchange_strong(t,f,std::memory_order::release);
    }
public:
    fast_bottleneck()
    : flag()
    , handle(counter.fetch_add(1)) //< We have to initialize that
    {
        mutex_state_update(handle, mutex_state::create);
    }
    fast_bottleneck(fast_bottleneck&) = delete;
    fast_bottleneck(fast_bottleneck&&) = delete;
    fast_bottleneck& operator=(fast_bottleneck&) = delete;
    fast_bottleneck& operator=(fast_bottleneck&&) = delete;
    
    ~fast_bottleneck() {
        mutex_state_update(handle, mutex_state::remove);
    }


    void lock() {
        while(not try_lock()) {
            signal_locking(thread_state::waiting, handle, system::sys.current->t_id);
            
            // We yank the process and put it in hiatus within the mutex
            system::sys.steal_and_yield([&](std::unique_ptr<process> p){
                lock_guard triggers(trigger_lock);
                p->state = process_status::waiting;
                waiting.push_front(std::move(p));
            });
        }
        
        signal_locking(thread_state::locking, handle, system::sys.current->t_id);
    }

    void unlock() {
        if(!try_unlock()) throw std::runtime_error("Unlocking failed in fast_bottleneck: potential double unlocking issue");
        signal_locking(thread_state::unlocking, handle, system::sys.current->t_id);
        
        { // If we were waiting for anyone while unlocking, pop back one waiter
            lock_guard triggers(trigger_lock);
            if(waiting.size()) {
                system::sys.running.push_front(std::move(waiting.back()));
                waiting.pop_back();
            }
        }
    }
};

dirty_bottleneck checker_lock;
dirty_bottleneck lister_lock;
std::map<int64_t, std::vector<mutex_handle>> owned_locks;
std::map<int64_t, std::optional<mutex_handle>> waiting_locks;
std::set<mutex_handle> locks_that_exist;

void mutex_state_update(mutex_handle mtx, mutex_state state)  {...} // No changes

bool build_dependency_graph (
    const mutex_handle mtx,
    const int64_t thrd, 
    std::map<int64_t, std::vector<mutex_handle>>& owned_locks, 
    std::map<int64_t, std::optional<mutex_handle>>& waiting_locks
)  {...} // No changes

void signal_locking(thread_state state, mutex_handle mtx, int64_t thrd) {...} // No changes

std::atomic<size_t> fast_bottleneck::counter;

Here we use the mutex itself to store the waiting processes. On unlock, one of them is resumed and will try to lock the mutex once more.

This can be used to implement any form of event waiting in a concurrent context. It is very efficient in terms of lost computation time and throughput, but it significantly increases latency in real-world systems because of the cost of context switching, deadlock detection and the scheduling of interstitial processes.

This implementation is simplistic and is provided as is. It is probably not production-ready in any way.

Example:

// The mutex shared by the main flow and the two green threads below
fast_bottleneck A;

// Demonstration of a green thread being parked in a mutex and resumed on unlock.
// Presumably prints "1", "A", "B" and "3", each on its own line -- TODO confirm by running it.
int main() {
    char c;
    // Wrap the already executing main flow in a process (its stack is not owned)
    system::sys.current = std::make_unique<process>(&c);

    std::cout << "1" << std::endl;
    A.lock(); // main takes the mutex first
    // Mark main as running so that rid() puts it back in the running list when it yields
    system::sys.current->state = process_status::running;
    // This thread cannot take A while main holds it, so it gets parked inside the mutex
    system::sys.yield_to(std::make_unique<process>([](){
        A.lock();
        std::cout << "A" << std::endl;
        A.unlock();
    }));
    A.unlock(); // moves the parked thread back to the scheduler's running list
    system::sys.yield(); // gives that thread a chance to run
    // A is free at this point, so this thread is expected to take it without waiting
    system::sys.yield_to(std::make_unique<process>([](){
        A.lock();
        std::cout << "B" << std::endl;
        A.unlock();
    }));
    std::cout << "3" << std::endl;
    return 0;
}

The code is accessible here.

To give a train analogy:

  • a spinlock would be an area for a train to wait while keeping its speed: a loop in the tracks, using lots of fuel
  • a spinlock with exponential backoff would be an equivalent, but the train slows down more and more, wasting a bit less fuel
  • the condition waiting we saw today would be akin to a train stopping in a marshalling yard until the way is free, wasting no fuel but requiring a complete stop
Concurrency with nothing, Part 2
Share this