W jaki sposób zmodyfikować metodę Push, która jest single producer tak, żeby, bez blokowania, działała jako multi producer?

0

Mam klasę kolejki, która działa super gdy jest tylko jeden wytwórca. Kolejka to prosty ring buffer. Mam już metodę Pop zaimplementowaną dla kilku czytających i działa jak natura chciała. Nie mogę jednak poprawnie napisać metody puszującej. Próbowałem pozycję do której chcemy pusznąć element wziąć w pętlę CAS, ale apka się po prostu zawiesza i nie chce wyjść z tej pętli.

template<typename T, uint size = 1>
struct mpWriter
{
    bool push(const T& element)
    {
        while (true) {
            uint oldWritePosition = writePosition.load();
            uint newWritePosition = getPositionAfter(oldWritePosition);

            // jeśli pozycja wpisania jest starą pozycją
            // zamień ją na nową 
            // i dodaj dane na starej pozycji
            // i zwróć true
            // jeśli to w ogóle ma jakiś sens
            if (writePosition.compare_exchange_strong(oldWritePosition, newWritePosition)) {
                ringBuffer[oldWritePosition].store(element);
                return true;
            }
        }
    }

    static constexpr uint getPositionAfter(uint position) noexcept
    {
        return ++position == ringBufferSize ? 0 : position;
    }

    static constexpr uint ringBufferSize = size + 1;
    std::atomic<T> ringBuffer[ringBufferSize];
    std::atomic<uint> writePosition = 0;
};
int main()
{
    mpWriter<int, 50> ints;
    std::vector<std::thread> producers;

    for(int i = 0; i < 50; ++i) {
        producers.push_back(std::thread([&](){
            ints.push(0);
        }));
    }

    for (auto& producer : producers) {
        producer.join();
    }

    return 0;
}

Co jest nie tak z tym kodem? Jak poprawnie to napisać? Nie chce używać już napisanych kolejek. Chce zrozumieć o co w tym chodzi.

1

No skoro Ci mieli while() w nieskończoność to chyba warto się przyjrzeć jakie warunki doprowadzają do tego, że nigdy nie jest wykonywany return.

1

ileś tam wątków czyta writePosition, wykonuje coś innego i myśli że nadal writePosition jest taka sama? skoro inny wątek ją może zmienić?

0

No właśnie pytanie ile wątków wisi na tej pętli. Nie jest tak, że jeden chodzi dobrze, a kręcą się pozostałe?

0

Tutaj jest przykładowa implementacja Lock Free Ring Buffer i jak widać jest to zadanie o dużym poziomie skomplikowania (porównaj sobie ze swoim kodem).
W Twoim przypadku ( jeżeli musiałbym wykorzystać technikę lock-free ) użył bym zwykłego stosu.

template<typename T>
class stack
{
private:
    struct node
    {
        std::shared_ptr<T> data;
        node* next;
        node(T const& data_) :  data {std::make_shared<T>(data_)}   {}
    };
    std::atomic<node*> head;
public:
    void push(T const& data)
    {
        node* const new_node=new node(data);
        new_node->next=head.load();
        while( !head.compare_exchange_weak(new_node->next,new_node) );
    }
    std::shared_ptr<T> pop()
    {
        node* old_head=head.load();
        while( old_head && !head.compare_exchange_weak(old_head,old_head->next) );
        return old_head ? old_head->data : std::shared_ptr<T>();
    }
};

Inną opcją jest zastosowanie mutexów lub użycie zmiennej atomic<bool> w trybie memory_order_seq_cst jak w poniższym przykładzie

template<typename T, int size = 1>
struct mpWriter
{
    void push( const T& element )
    {
        while(mIsPushing.load(std::memory_order_seq_cst));
        mIsPushing.store(true,std::memory_order_seq_cst);
        ringBuffer[writePosition++%ringBufferSize] = element;
        mIsPushing.store(false,std::memory_order_seq_cst);
    }

    static constexpr int ringBufferSize = size + 1;
    T ringBuffer[ringBufferSize];
    int writePosition {0};
    std::atomic<bool> mIsPushing {false};
};

To wymaga jeszcze gruntownego przetestowania, bo nawet jeśli u mnie działa to bez zarzutów, to nie oznacza to, że dany kod nie posiada błędów.
Tak to już jest, że aplikacja wykorzystująca wątki może działać poprawnie w trybie debug, a w release już się wywalać.

2

Ja rozumiem, że wątki są teraz modne, ale ich opanowanie jest bardzo trudne.
Największa trudność polega na tym, że nieprawidłowy kod zawierający błędy wielowątkowości w 99% przypadków będzie wydawał się działać poprawnie.
Losowe wpisanie std::atomic nie czyni kodu poprawnym wielowątkowo.

Sam wiem jak duże braki mam w tym temacie. Wiem na tyle, by nie wciskać wątków na siłę do swojego kodu, a jak już muszę to z wielką ostrożnością.

Moja wersja bez gwarancji poprawności:

template <typename T, size_t SIZE>
class RingBuffer {
public:

    template<typename U>
    void push(U &&x) {
        std::unique_lock<std::mutex> locker{m};
        notFull.wait(locker, [this]() { return queue.size() < SIZE; });
        queue.push(std::forward<T>(x));
        notEmpty.notify_one();
    }

    T take_next() {
        std::unique_lock<std::mutex> locker{m};
        notEmpty.wait(locker, [this]() { return queue.size() > 0; });
        T value = queue.front();
        queue.pop();
        notFull.notify_one();
        return value;
    }
    
    size_t size() const {
        std::lock_guard<std::mutex> locker{m};
        return queue.size();
    }

private:
    std::condition_variable notEmpty;
    std::condition_variable notFull;
    mutable std::mutex m;
    std::queue<T> queue;
};

https://wandbox.org/permlink/LTUmfUZ7As75N1eT

1

@TomaszLiMoon: Twoja implementacja CAS jest nieprawidłowa:

while(mIsPushing.load(std::memory_order_seq_cst));
// aktualny wątek może zostać w tym miejscu wywłaszczony i wznowiony za moment, gdy `mIsPushing` zostanie np. przez inny wątek ustawione znów na `true`
mIsPushing.store(true,std::memory_order_seq_cst);

1 użytkowników online, w tym zalogowanych: 0, gości: 1