commit 13b0f975482722ac8802755349359bf2f64b6610
parent bf04e520c6142471ffe4e8f740ec007a007a245f
Author: fundamental <mark.d.mccurry@gmail.com>
Date: Tue, 11 May 2010 08:46:00 -0400
Nio: Thread Safety
Converting SafeQueue to use semaphores
Diffstat:
3 files changed, 42 insertions(+), 33 deletions(-)
diff --git a/src/Nio/SafeQueue.cpp b/src/Nio/SafeQueue.cpp
@@ -3,12 +3,16 @@ template<class T>
SafeQueue<T>::SafeQueue(size_t maxlen)
:writePtr(0),readPtr(0),bufSize(maxlen)
{
+ sem_init(&w_space, PTHREAD_PROCESS_PRIVATE, maxlen-1);
+ sem_init(&r_space, PTHREAD_PROCESS_PRIVATE, 0);
buffer = new T[maxlen];
}
template<class T>
SafeQueue<T>::~SafeQueue()
{
+ sem_destroy(&w_space);
+ sem_destroy(&r_space);
delete[] buffer;
}
@@ -21,33 +25,17 @@ unsigned int SafeQueue<T>::size() const
template<class T>
unsigned int SafeQueue<T>::rSpace() const
{
- size_t w, r;
-
- w = writePtr;
- r = readPtr;
-
- if (w > r) {
- return w - r;
- }
- else {
- return (w - r + bufSize) % bufSize;
- }
+ int space = 0;
+ sem_getvalue(&r_space, &space);
+ return space;
}
template<class T>
unsigned int SafeQueue<T>::wSpace() const
{
- size_t w, r;
-
- w = writePtr;
- r = readPtr - 1;
-
- if (r > w) {
- return r - w;
- }
- else {
- return (r - w + bufSize) % bufSize;
- }
+ int space = 0;
+ sem_getvalue(&w_space, &space);
+ return space;
}
template<class T>
@@ -60,6 +48,10 @@ int SafeQueue<T>::push(const T &in)
size_t w = (writePtr + 1) % bufSize;
buffer[w] = in;
writePtr = w;
+
+ //adjust ranges
+ sem_wait(&w_space);//guaranteed not to wait
+ sem_post(&r_space);
return 0;
}
@@ -73,11 +65,18 @@ int SafeQueue<T>::pop(T &out)
size_t r = (readPtr + 1) % bufSize;
out = buffer[r];
readPtr = r;
+
+ //adjust ranges
+ sem_wait(&r_space);//guaranteed not to wait
+ sem_post(&w_space);
return 0;
}
template<class T>
void SafeQueue<T>::clear()
{
+ //thread unsafe
+ while(!sem_trywait(&r_space))
+ sem_post(&w_space);
readPtr = writePtr;
}
diff --git a/src/Nio/SafeQueue.h b/src/Nio/SafeQueue.h
@@ -2,6 +2,7 @@
#ifndef SAFEQUEUE_H
#define SAFEQUEUE_H
#include <cstdlib>
+#include <semaphore.h>
/**
* C++ thread safe lockless queue
@@ -28,10 +29,15 @@ class SafeQueue
unsigned int wSpace() const;
unsigned int rSpace() const;
- //next writting spot
- volatile size_t writePtr;
+ //write space
+ mutable sem_t w_space;
+ //read space
+ mutable sem_t r_space;
+
+ //next writing spot
+ size_t writePtr;
//next reading spot
- volatile size_t readPtr;
+ size_t readPtr;
const size_t bufSize;
T *buffer;
};
diff --git a/src/Nio/WavEngine.cpp b/src/Nio/WavEngine.cpp
@@ -80,8 +80,8 @@ void WavEngine::push(Stereo<REALTYPE *> smps, size_t len)
for(size_t i = 0; i < len; ++i) {
buffer.push(*smps.l()++);
buffer.push(*smps.r()++);
- sem_post(&work);
}
+ sem_post(&work);
}
void WavEngine::newFile(WavFile *_file)
@@ -104,17 +104,21 @@ void *WavEngine::_AudioThread(void *arg)
void *WavEngine::AudioThread()
{
- short int recordbuf_16bit[2];
+ short *recordbuf_16bit = new short[2*SOUND_BUFFER_SIZE];
while(!sem_wait(&work) && pThread)
{
- float left=0.0f, right=0.0f;
- buffer.pop(left);
- buffer.pop(right);
- recordbuf_16bit[0] = limit((int)(left * 32767.0), -32768, 32767);
- recordbuf_16bit[1] = limit((int)(right * 32767.0), -32768, 32767);
- file->writeStereoSamples(1, recordbuf_16bit);
+ for(int i = 0; i < SOUND_BUFFER_SIZE; ++i) {
+ float left=0.0f, right=0.0f;
+ buffer.pop(left);
+ buffer.pop(right);
+ recordbuf_16bit[2*i] = limit((int)(left * 32767.0), -32768, 32767);
+ recordbuf_16bit[2*i+1] = limit((int)(right * 32767.0), -32768, 32767);
+ file->writeStereoSamples(SOUND_BUFFER_SIZE, recordbuf_16bit);
+ }
}
+
+ delete[] recordbuf_16bit;
pthread_exit(NULL);
}