commit c942e1ebf0d8f7e942851149d6ef775c330a5a9d
parent 70ae04636f723a5e639f750d5e367841f3c854ef
Author: fundamental <mark.d.mccurry@gmail.com>
Date: Mon, 2 Nov 2015 13:45:22 -0500
Middleware: Add Thread Agnostic OSC Handler
https://sourceforge.net/p/zynaddsubfx/feature-requests/57/
Diffstat:
7 files changed, 224 insertions(+), 1 deletion(-)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
@@ -363,6 +363,7 @@ add_library(zynaddsubfx_core STATIC
globals.cpp
../tlsf/tlsf.c
Containers/NotePool.cpp
+ Containers/MultiPseudoStack.cpp
${zynaddsubfx_dsp_SRCS}
${zynaddsubfx_effect_SRCS}
${zynaddsubfx_misc_SRCS}
diff --git a/src/Containers/MultiPseudoStack.cpp b/src/Containers/MultiPseudoStack.cpp
@@ -0,0 +1,47 @@
+#include "MultiPseudoStack.h"
+#include <cassert>
+
+LockFreeStack::LockFreeStack(void)
+{
+ head.next = &head;
+}
+//Aquire node R := head
+//Update head := next(R)
+iql_t *LockFreeStack::read(void) {
+retry:
+ iql_t *R = head.next.load();
+ if(R == &head)
+ return 0;
+ iql_t *N = R->next.load();
+
+ //Stack H-R-N-...-H
+ //when head.next is still the old next 'r'
+ //set head.next to r->next
+ if(!head.next.compare_exchange_strong(R, N))
+ goto retry;
+
+ return R;
+}
+
+//Insert Node Q
+void LockFreeStack::write(iql_t *Q) {
+ if(!Q)
+ return;
+retry:
+ iql_t *old_next = head.next.load();
+
+ Q->next = old_next;
+ if(!head.next.compare_exchange_strong(old_next, Q))
+ goto retry;
+}
+
+MultiPseudoStack::MultiPseudoStack(void)
+{
+ //32 instances of 2kBi memory chunks
+ for(int i=0; i<32; ++i) {
+ iql_t *ptr = new iql_t;
+ ptr->size = 2048;
+ ptr->memory = new char[2048];
+ free(ptr);
+ }
+}
diff --git a/src/Containers/MultiPseudoStack.h b/src/Containers/MultiPseudoStack.h
@@ -0,0 +1,48 @@
+#pragma once
+#include <atomic>
+
+struct IntrusiveQueueList
+{
+ typedef IntrusiveQueueList iql_t;
+ IntrusiveQueueList(void)
+ :memory(0), next(0), size(0)
+ {}
+
+ char *memory;
+ std::atomic<iql_t*> next;
+ uint32_t size;
+};
+
+
+static_assert(sizeof(IntrusiveQueueList) <= 3*sizeof(void*),
+ "Atomic Types Must Not Add Overhead To Intrusive Queue");
+
+typedef IntrusiveQueueList iql_t;
+//Many reader many writer
+class LockFreeStack
+{
+ iql_t head;
+ public:
+ LockFreeStack(void);
+ iql_t *read(void);
+ void write(iql_t *Q);
+};
+
+
+/*
+ * Many reader Many writer capiable queue
+ * - lock free
+ * - allocation free (post initialization)
+ */
+class MultiPseudoStack
+{
+ LockFreeStack m_free;
+ LockFreeStack m_msgs;
+
+ public:
+ MultiPseudoStack(void);
+ iql_t *alloc(void) { return m_free.read(); }
+ void free(iql_t *q) { m_free.write(q); }
+ void write(iql_t *q) { m_msgs.write(q); }
+ iql_t *read(void) { return m_msgs.read(); }
+};
diff --git a/src/Misc/MiddleWare.cpp b/src/Misc/MiddleWare.cpp
@@ -22,6 +22,7 @@
#include "Master.h"
#include "Part.h"
#include "PresetExtractor.h"
+#include "../Containers/MultiPseudoStack.h"
#include "../Params/PresetsStore.h"
#include "../Params/ADnoteParameters.h"
#include "../Params/SUBnoteParameters.h"
@@ -578,6 +579,10 @@ public:
const char *rtmsg = bToU->read();
bToUhandle(rtmsg);
}
+ while(iql_t *m = multi_thread_source.read()) {
+ handleMsg(m->memory);
+ multi_thread_source.free(m);
+ }
}
@@ -642,6 +647,9 @@ public:
rtosc::ThreadLink *bToU;
rtosc::ThreadLink *uToB;
+ //Link to the unknown
+ MultiPseudoStack multi_thread_source;
+
//LIBLO
lo_server server;
string last_url, curr_url;
@@ -701,7 +709,7 @@ class MwDataObj:public rtosc::RtData
};
//virtual void broadcast(const char *path, const char *args, ...){(void)path;(void)args;};
//virtual void broadcast(const char *msg){(void)msg;};
-
+
virtual void chain(const char *msg) override
{
assert(msg);
@@ -1443,6 +1451,23 @@ void MiddleWare::transmitMsg_va(const char *path, const char *args, va_list va)
fprintf(stderr, "Error in transmitMsg(va)n");
}
+void MiddleWare::messageAnywhere(const char *path, const char *args, ...)
+{
+ auto *mem = impl->multi_thread_source.alloc();
+ if(!mem)
+ fprintf(stderr, "Middleware::messageAnywhere memory pool out of memory...\n");
+
+ va_list va;
+ va_start(va,args);
+ if(rtosc_vmessage(mem->memory,mem->size,path,args,va))
+ impl->multi_thread_source.write(mem);
+ else {
+ fprintf(stderr, "Middleware::messageAnywhere message too big...\n");
+ impl->multi_thread_source.free(mem);
+ }
+}
+
+
void MiddleWare::pendingSetBank(int bank)
{
impl->bToU->write("/setbank", "c", bank);
diff --git a/src/Misc/MiddleWare.h b/src/Misc/MiddleWare.h
@@ -34,6 +34,9 @@ class MiddleWare
//Handle a rtosc Message uToB
void transmitMsg_va(const char *, const char *args, va_list va);
+ //Send a message to middleware from an arbitrary thread
+ void messageAnywhere(const char *msg, const char *args, ...);
+
//Indicate that a bank will be loaded
//NOTE: Can only be called by realtime thread
void pendingSetBank(int bank);
diff --git a/src/Tests/CMakeLists.txt b/src/Tests/CMakeLists.txt
@@ -16,6 +16,7 @@ CXXTEST_ADD_TEST(PluginTest PluginTest.cpp ${CMAKE_CURRENT_SOURCE_DIR}/PluginTes
CXXTEST_ADD_TEST(MiddlewareTest MiddlewareTest.cpp ${CMAKE_CURRENT_SOURCE_DIR}/MiddlewareTest.h)
CXXTEST_ADD_TEST(MessageTest MessageTest.cpp ${CMAKE_CURRENT_SOURCE_DIR}/MessageTest.h)
CXXTEST_ADD_TEST(UnisonTest UnisonTest.cpp ${CMAKE_CURRENT_SOURCE_DIR}/UnisonTest.h)
+CXXTEST_ADD_TEST(MqTest MqTest.cpp ${CMAKE_CURRENT_SOURCE_DIR}/MqTest.h)
#CXXTEST_ADD_TEST(RtAllocTest RtAllocTest.cpp ${CMAKE_CURRENT_SOURCE_DIR}/RtAllocTest.h)
CXXTEST_ADD_TEST(AllocatorTest AllocatorTest.cpp
${CMAKE_CURRENT_SOURCE_DIR}/AllocatorTest.h)
@@ -36,6 +37,7 @@ target_link_libraries(OscilGenTest ${test_lib})
target_link_libraries(XMLwrapperTest ${test_lib})
target_link_libraries(RandTest ${test_lib})
target_link_libraries(PADnoteTest ${test_lib})
+target_link_libraries(MqTest ${test_lib})
target_link_libraries(PluginTest zynaddsubfx_core zynaddsubfx_nio
zynaddsubfx_gui_bridge
${GUI_LIBRARIES} ${NIO_LIBRARIES} ${AUDIO_LIBRARIES})
diff --git a/src/Tests/MqTest.h b/src/Tests/MqTest.h
@@ -0,0 +1,97 @@
+/*
+ ZynAddSubFX - a software synthesizer
+
+ PluginTest.h - CxxTest for embedding zyn
+ Copyright (C) 2013-2013 Mark McCurry
+ Authors: Mark McCurry
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of version 2 of the GNU General Public License
+ as published by the Free Software Foundation.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License (version 2 or later) for more details.
+
+ You should have received a copy of the GNU General Public License (version 2)
+ along with this program; if not, write to the Free Software Foundation,
+ Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+*/
+#include <cxxtest/TestSuite.h>
+#include <cmath>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+#include <string>
+#include <thread>
+#include <rtosc/thread-link.h>
+#include <unistd.h>
+#include "../Containers/MultiPseudoStack.h"
+using namespace std;
+
+char *instance_name=(char*)"";
+
+class MessageTest:public CxxTest::TestSuite
+{
+ public:
+ MultiPseudoStack *s;
+ void setUp() {
+ s = new MultiPseudoStack;
+ }
+
+ void tearDown() {
+ delete s;
+ }
+
+ void testBasic(void)
+ {
+ auto *mem = s->alloc();
+ TS_ASSERT(mem);
+ TS_ASSERT(mem->memory);
+ TS_ASSERT(!s->read());
+ s->write(mem);
+ auto *mem2 = s->read();
+ TS_ASSERT_EQUALS(mem, mem2);
+ s->free(mem2);
+ }
+
+#define OPS 10000
+ void testThreads(void)
+ {
+ std::thread *t[32];
+ for(int i=0; i<32; ++i) {
+ t[i] = new std::thread([this,i](){
+ int op=0;
+ while(op<OPS) {
+ int read = rand()%2;
+ if(read) {
+ auto *mem = s->read();
+ if(mem) {
+ //printf("r%d",i%10);
+ //printf("got: <%s>\n", mem->memory);
+ op++;
+ }
+ s->free(mem);
+ } else {
+ auto *mem = s->alloc();
+ if(mem) {
+ sprintf(mem->memory,"written by %d@op%d", i,op);
+ //printf("w%d",i%10);
+ op++;
+ }
+ s->write(mem);
+ }
+ }
+ });
+ }
+
+ printf("thread started...\n");
+ for(int i=0; i<32; ++i) {
+ t[i]->join();
+ delete t[i];
+ }
+ printf("thread stopped...\n");
+ }
+};