commit b6e33b8f8296b42f8243ad0ff9ee6b4a12627b75
parent 49cb585d7e5802be9df8db13fbb7f21cc542e0d1
Author: fundamental <mark.d.mccurry@gmail.com>
Date: Tue, 3 Nov 2015 21:07:22 -0500
Fixup Multi Threaded Queue Implementation
Diffstat:
4 files changed, 153 insertions(+), 63 deletions(-)
diff --git a/src/Containers/MultiPseudoStack.cpp b/src/Containers/MultiPseudoStack.cpp
@@ -1,47 +1,100 @@
#include "MultiPseudoStack.h"
#include <cassert>
-
-LockFreeStack::LockFreeStack(void)
+#include <cstdio>
+
+#define INVALID ((int32_t)0xffffffff)
+#define MAX ((int32_t)0x7fffffff)
+QueueListItem::QueueListItem(void)
+ :memory(0), size(0)
+{
+}
+
+LockFreeQueue::LockFreeQueue(qli_t *data_, int n)
+ :data(data_), elms(n), next_r(0), next_w(0), avail(0)
{
- head.next = &head;
+ tag = new std::atomic<uint32_t>[n];
+ for(int i=0; i<n; ++i)
+ tag[i] = INVALID;
}
-//Aquire node R := head
-//Update head := next(R)
-iql_t *LockFreeStack::read(void) {
+
+
+qli_t *LockFreeQueue::read(void) {
retry:
- iql_t *R = head.next.load();
- if(R == &head)
+ int8_t free_elms = avail.load();
+ if(free_elms <= 0)
return 0;
- iql_t *N = R->next.load();
- //Stack H-R-N-...-H
- //when head.next is still the old next 'r'
- //set head.next to r->next
- if(!head.next.compare_exchange_strong(R, N))
- goto retry;
+ int32_t next_tag = next_r.load();
+ int32_t next_next_tag = (next_tag+1)&MAX;
+
+ assert(next_tag != INVALID);
+
+ for(int i=0; i<elms; ++i) {
+ uint32_t elm_tag = tag[i].load();
+
+ //attempt to remove tagged element
+ //if and only if it's next
+ if(((uint32_t)next_tag) == elm_tag) {
+ if(!tag[i].compare_exchange_strong(elm_tag, INVALID))
+ goto retry;
+
+ //Ok, now there is no element that can be removed from the list
+ //Effectively there's mutual exclusion over other readers here
+
+ //Set the next element
+ int sane_read = next_r.compare_exchange_strong(next_tag, next_next_tag);
+ assert(sane_read && "No double read on a single tag");
- return R;
+ //Decrement available elements
+ int32_t free_elms_next = avail.load();
+ while(!avail.compare_exchange_strong(free_elms_next, free_elms_next-1));
+
+ //printf("r%d ", free_elms_next-1);
+
+ return &data[i];
+ }
+ }
+ goto retry;
}
//Insert Node Q
-void LockFreeStack::write(iql_t *Q) {
+void LockFreeQueue::write(qli_t *Q) {
+retry:
if(!Q)
return;
-retry:
- iql_t *old_next = head.next.load();
-
- Q->next = old_next;
- if(!head.next.compare_exchange_strong(old_next, Q))
+ int32_t write_tag = next_w.load();
+ int32_t next_write_tag = (write_tag+1)&MAX;
+ if(!next_w.compare_exchange_strong(write_tag, next_write_tag))
goto retry;
+
+ uint32_t invalid_tag = INVALID;
+
+ //Update tag
+ int sane_write = tag[Q-data].compare_exchange_strong(invalid_tag, write_tag);
+ assert(sane_write);
+
+ //Increment available elements
+ int32_t free_elms = avail.load();
+ while(!avail.compare_exchange_strong(free_elms, free_elms+1))
+ assert(free_elms <= 32);
+ //printf("w%d ", free_elms+1);
}
-MultiPseudoStack::MultiPseudoStack(void)
+MultiQueue::MultiQueue(void)
+ :pool(new qli_t[32]), m_free(pool, 32), m_msgs(pool, 32)
{
//32 instances of 2kBi memory chunks
for(int i=0; i<32; ++i) {
- iql_t *ptr = new iql_t;
- ptr->size = 2048;
- ptr->memory = new char[2048];
- free(ptr);
+ qli_t &ptr = pool[i];
+ ptr.size = 2048;
+ ptr.memory = new char[2048];
+ free(&ptr);
}
}
+
+MultiQueue::~MultiQueue(void)
+{
+ for(int i=0; i<32; ++i)
+ delete [] pool[i].memory;
+ delete [] pool;
+}
diff --git a/src/Containers/MultiPseudoStack.h b/src/Containers/MultiPseudoStack.h
@@ -1,31 +1,30 @@
#pragma once
#include <atomic>
+#include <cassert>
-struct IntrusiveQueueList
+//XXX rename this thing
+typedef struct QueueListItem qli_t;
+struct QueueListItem
{
- typedef IntrusiveQueueList iql_t;
- IntrusiveQueueList(void)
- :memory(0), next(0), size(0)
- {}
-
- char *memory;
- std::atomic<iql_t*> next;
- uint32_t size;
+ QueueListItem(void);
+ char *memory;
+ uint32_t size;
};
-static_assert(sizeof(IntrusiveQueueList) <= 3*sizeof(void*),
- "Atomic Types Must Not Add Overhead To Intrusive Queue");
-
-typedef IntrusiveQueueList iql_t;
//Many reader many writer
-class LockFreeStack
+class LockFreeQueue
{
- iql_t head;
+ qli_t *const data;
+ const int elms;
+ std::atomic<uint32_t> *tag;
+ std::atomic<int32_t> next_r;
+ std::atomic<int32_t> next_w;
+ std::atomic<int32_t> avail;
public:
- LockFreeStack(void);
- iql_t *read(void);
- void write(iql_t *Q);
+ LockFreeQueue(qli_t *data_, int n);
+ qli_t *read(void);
+ void write(qli_t *Q);
};
@@ -34,15 +33,18 @@ class LockFreeStack
* - lock free
* - allocation free (post initialization)
*/
-class MultiPseudoStack
+class MultiQueue
{
- LockFreeStack m_free;
- LockFreeStack m_msgs;
+ qli_t *pool;
+ LockFreeQueue m_free;
+ LockFreeQueue m_msgs;
public:
- MultiPseudoStack(void);
- iql_t *alloc(void) { return m_free.read(); }
- void free(iql_t *q) { m_free.write(q); }
- void write(iql_t *q) { m_msgs.write(q); }
- iql_t *read(void) { return m_msgs.read(); }
+ MultiQueue(void);
+ ~MultiQueue(void);
+ void dump(void);
+ qli_t *alloc(void) { return m_free.read(); }
+ void free(qli_t *q) { m_free.write(q); }
+ void write(qli_t *q) { m_msgs.write(q); }
+ qli_t *read(void) { return m_msgs.read(); }
};
diff --git a/src/Misc/MiddleWare.cpp b/src/Misc/MiddleWare.cpp
@@ -579,7 +579,7 @@ public:
const char *rtmsg = bToU->read();
bToUhandle(rtmsg);
}
- while(iql_t *m = multi_thread_source.read()) {
+ while(auto *m = multi_thread_source.read()) {
handleMsg(m->memory);
multi_thread_source.free(m);
}
@@ -648,7 +648,7 @@ public:
rtosc::ThreadLink *uToB;
//Link to the unknown
- MultiPseudoStack multi_thread_source;
+ MultiQueue multi_thread_source;
//LIBLO
lo_server server;
diff --git a/src/Tests/MqTest.h b/src/Tests/MqTest.h
@@ -36,9 +36,9 @@ char *instance_name=(char*)"";
class MessageTest:public CxxTest::TestSuite
{
public:
- MultiPseudoStack *s;
+ MultiQueue *s;
void setUp() {
- s = new MultiPseudoStack;
+ s = new MultiQueue;
}
void tearDown() {
@@ -56,13 +56,29 @@ class MessageTest:public CxxTest::TestSuite
TS_ASSERT_EQUALS(mem, mem2);
s->free(mem2);
}
+ void testMed(void)
+ {
+ for(int i=0; i<100; ++i) {
+ auto *mem = s->alloc();
+ TS_ASSERT(mem);
+ TS_ASSERT(mem->memory);
+ TS_ASSERT(!s->read());
+ s->write(mem);
+ auto *mem2 = s->read();
+ TS_ASSERT_EQUALS(mem, mem2);
+ s->free(mem2);
+ }
+ }
-#define OPS 10000
+#define OPS 1000
+#define THREADS 8
void testThreads(void)
{
- std::thread *t[32];
- for(int i=0; i<32; ++i) {
- t[i] = new std::thread([this,i](){
+ uint8_t messages[OPS*THREADS];
+ memset(messages, 0, sizeof(messages));
+ std::thread *t[THREADS];
+ for(int i=0; i<THREADS; ++i) {
+ t[i] = new std::thread([this,i,&messages](){
int op=0;
while(op<OPS) {
int read = rand()%2;
@@ -71,13 +87,13 @@ class MessageTest:public CxxTest::TestSuite
if(mem) {
//printf("r%d",i%10);
//printf("got: <%s>\n", mem->memory);
- op++;
+ messages[atoi(mem->memory)]++;
}
s->free(mem);
} else {
auto *mem = s->alloc();
if(mem) {
- sprintf(mem->memory,"written by %d@op%d", i,op);
+ sprintf(mem->memory,"%d written by %d@op%d", i*OPS+op,i,op);
//printf("w%d",i%10);
op++;
}
@@ -88,10 +104,29 @@ class MessageTest:public CxxTest::TestSuite
}
printf("thread started...\n");
- for(int i=0; i<32; ++i) {
+ for(int i=0; i<THREADS; ++i) {
t[i]->join();
delete t[i];
}
printf("thread stopped...\n");
+ //read the last few
+ while(1) {
+ auto *mem = s->read();
+ if(mem) {
+ printf("got: <%s>\n", mem->memory);
+ messages[atoi(mem->memory)]++;
+ } else
+ break;
+ s->free(mem);
+ }
+
+ int good = 1;
+ for(int i=0; i<OPS*THREADS; ++i) {
+ if(messages[i] != 1) {
+ assert(false);
+ good = 0;
+ }
+ }
+ TS_ASSERT(good);
}
};