From f3f3f15d90ae9c0a8fec183efdfd0dcb8bd054eb Mon Sep 17 00:00:00 2001 From: Winfried Ritsch Date: Tue, 3 May 2005 07:18:08 +0000 Subject: This commit was generated by cvs2svn to compensate for changes in r2884, which included commits to RCS files with non-trunk default branches. svn path=/trunk/externals/iem/iemstream/; revision=2885 --- stream.cpp | 557 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 557 insertions(+) create mode 100644 stream.cpp (limited to 'stream.cpp') diff --git a/stream.cpp b/stream.cpp new file mode 100644 index 0000000..e684443 --- /dev/null +++ b/stream.cpp @@ -0,0 +1,557 @@ +/************************************************************* + * + * streaming external for PD + * + * File: stream.cpp + * + * Description: Implementation of the streamer class + * + * Author: Thomas Grill (t.grill@gmx.net) + * + *************************************************************/ + +#ifdef _WIN32 + #include +#else + #include + #include + #define Sleep(ms) usleep((ms)*1000) +#endif + +#include "stream.h" + + +// default size of encoded data fifo +#define ENCSIZE 10000 +// default size for encoding chunk +#define ENCCHUNK 500 +// default ratio for fifo filling +#define ENCTHRESH 0.95f + +// additional buffer frames for resampling algorithm +#define DECMORE 100 + + +// relative thread priority (-2...0) +#define THRPRIOR -1 + +// default time grain to wait on error (ms) +#define WAITGRAIN 100 +// default time until reconnecting (ms) +#define WAITRECONNECT 3000 + + +// explicit definition of report functions +extern "C" { + extern void post(char *fmt, ...); + extern void error(char *fmt, ...); +} + +Stream::Stream(): + encoded(ENCSIZE),encchunk(ENCCHUNK),encthresh(ENCTHRESH), + waitgrain(WAITGRAIN),waitreconnect(WAITRECONNECT), //waitthread(WAITTHREAD), + file(-1), + exit(false),state(ST_IDLE),debug(false), + bufch(0),bufs(NULL),decoded(NULL), + src_channels(0),src_factor(1),src_state(NULL) +{ + pthread_mutex_init(&mutex,NULL); + pthread_cond_init(&cond,NULL); + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE); + pthread_create(&thrid,&attr,thr_func,this); + pthread_attr_destroy(&attr); +} + +Stream::~Stream() +{ + // cause thread to exit + exit = true; + pthread_cond_signal(&cond); + if(pthread_join(thrid,NULL) != 0) post("join failed"); + + pthread_cond_destroy(&cond); + pthread_mutex_destroy(&mutex); + + Reset(); + + if(bufs) delete[] bufs; + if(decoded) delete[] decoded; + if(src_state) { + for(int i = 0; i < bufch; ++i) src_delete(src_state[i]); + delete[] src_state; + } +} + +void Stream::Reset() +{ + if(file >= 0) { Disconnect(file); file = -1; } + encoded.Clear(); + + if(src_state) { + for(int i = 0; i < bufch; ++i) { + src_reset(src_state[i]); + decoded[i].Clear(); + } + } +} + +void Stream::ResetHost() +{ + hostname = mountpt = ""; port = -1; +} + +bool Stream::doInit(const char *url) +{ + if(isInitializing()) { + // this is not completely clean (should be within the lock) but otherwise the + // caller would have to wait for the thread lock + + post("Still initializing %s/%s:%i",hostname.c_str(),mountpt.c_str(),port); + return false; + } + + bool ok = true; + + pthread_mutex_lock(&mutex); + + // close open file + Reset(); + + // try to set host name, mount point, port number + if(ok) { + char *err = "Invalid URL"; + try { ok = SetURL(url); } + catch(char *tx) { err = tx; ok = false; } + catch(...) { ok = false; } + + if(!ok) { post(err); ResetHost(); } + } + + if(ok) { + state = ST_INIT; + pthread_cond_signal(&cond); + } + + pthread_mutex_unlock(&mutex); + + // let the thread worker do the rest + + return ok; +} + +bool Stream::doExit() +{ + pthread_mutex_lock(&mutex); + + Reset(); + ResetHost(); + state = ST_IDLE; + + pthread_mutex_unlock(&mutex); + + return true; +} + +/*! Get sample frames + \param buf pointer array to channel buffers + \param frames number of maximum frames to get + \return frames put into buffers +*/ +int Stream::doGet(int ch,float *const *buf,int frames,float sr) +{ + ASSERT(ch > 0 && frames >= 0 && sr > 0); + + if(isOk() && !isInitializing()) { + // signal thread worker + pthread_cond_signal(&cond); + + // check/(re)allocate buffers + + int strch = getChannels(); + if(bufs && bufch < strch) { + delete[] decoded; + for(int i = 0; i < bufch; ++i) src_delete(src_state[i]); + delete[] src_state; + delete[] bufs; bufs = NULL; + } + + if(!bufs) { + if(bufch < strch) bufch = strch; + bufs = new float *[bufch]; + decoded = new Fifo[bufch]; + + src_state = new SRC_STATE *[bufch]; + for(int i = 0; i < bufch; ++i) { + int error; + src_state[i] = src_new(SRC_ZERO_ORDER_HOLD,1,&error); + if(!src_state[i]) post("src init error %i",error); + } + } + + // get frames + + float ratio = sr/getSamplerate(); + int frneed = (int)(frames/ratio)+DECMORE; // number of frames to read from decoder fifo + + if(decoded[0].Size() < frneed) { + // fifos are too small -> resize them (while keeping their contents) + for(int i = 0; i < bufch; ++i) decoded[i].Resize(frneed,true); + } + + // how many frames do we need to get from decoder? + int frread = frneed-decoded[0].Have(); + + int ret = state == ST_WAIT?0:DataRead(frread); + + if(ret < 0) { + if(debug) post("read error"); + // clear output + for(int c = 0; c < ch; ++c) + memset(buf[c],0,frames*sizeof *buf[c]); + return 0; + } + else { + // how many channels do we really need for output? + // this should be set elsewhere, because we can't change anyway!!! + // (SRC_STATE for dangling channels would be incorrect) + int cmin = strch; + if(ch < cmin) cmin = ch; + + // write data to fifo + for(int i = 0; i < cmin; ++i) { + int wr = decoded[i].Write(ret,bufs[i]); + if(wr < ret) post("fifo overflow"); + } + +// state = ST_PROCESS; + + if(ratio == 1) { + // no resampling necessary + + // hopefully all channel fifos advance uniformly..... + for(int i = 0; i < cmin; ++i) { + + for(int got = 0; got < frames; ) { + int cnt = frames-got; + + if(decoded[i].Have()) { + got += decoded[i].Read(cnt,buf[i]+got); + } + else { + state = ST_WAIT; + if(debug) post("fifo underrun"); + + // Buffer underrun!! -> zero output buffer + memset(buf[i]+got,0,cnt*sizeof(*buf[i])); + got += cnt; + } + } + } + } + else + { + SRC_DATA src_data; + src_data.src_ratio = ratio; + src_data.end_of_input = 0; + + // hopefully all channel fifos advance uniformly..... + for(int i = 0; i < cmin; ++i) { + src_set_ratio(src_state[i],ratio); + + for(int got = 0; got < frames; ) { + src_data.data_out = buf[i]+got; + src_data.output_frames = frames-got; + + if(decoded[i].Have()) { + src_data.data_in = decoded[i].ReadPtr(); + src_data.input_frames = decoded[i].ReadSamples(); + + int err = src_process(src_state[i],&src_data); + if(err) post("src_process error %i",err); + + // advance buffer + decoded[i].Read(src_data.input_frames_used,NULL); + } + else { + state = ST_WAIT; + if(debug) post("fifo underrun"); + + // Buffer underrun!! -> zero output buffer + memset(src_data.data_out,0,src_data.output_frames*sizeof(*src_data.data_out)); + src_data.output_frames_gen = src_data.output_frames; + } + got += src_data.output_frames_gen; + } + } + } + + // zero remaining channels + for(int c = cmin; c < ch; ++c) + memset(buf[c],0,frames*sizeof *buf[c]); + + return ret; + } + } + else { + for(int c = 0; c < ch; ++c) + memset(buf[c],0,frames*sizeof *buf[c]); + return 0; + } +} + +#define MAXZEROES 5 + +/*! + \param chunk amount of data to read + \param unlock unlock mutex +*/ +int Stream::ReadChunk(int chunk,bool unlock) +{ + if(chunk <= 0) return 0; + + bool ok = true; + char tmp[1024]; + int n = 0,errcnt = 0; + while(ok) { + int c = chunk-n; + if(c <= 0) break; // read enough data + if(c > sizeof tmp) c = sizeof tmp; + SOCKET fd = file; + + if(unlock) pthread_mutex_unlock(&mutex); + + int ret = Read(fd, tmp, c); + + if(unlock) pthread_mutex_lock(&mutex); + + if(ret < 0 || (!ret && ++errcnt == MAXZEROES)) { + if(debug) post("Receive error"); + ok = false; + } + else if(ret > 0) { + if(debug) post("read %i bytes",ret); + errcnt = 0; + encoded.Write(ret,tmp); + n += ret; + } + } + return n; +} + +/*! + \param buf data buffer + \param chunk amount of data to read + \param unlock unlock mutex +*/ +int Stream::ReadChunk(char *buf,int chunk,bool unlock) +{ + if(chunk <= 0) return 0; + + bool ok = true; + int n = 0,errcnt = 0; + while(ok) { + int c = chunk-n; + if(c <= 0) break; // read enough data + SOCKET fd = file; + + if(unlock) pthread_mutex_unlock(&mutex); + + int ret = Read(fd, buf+n, c); + + if(unlock) pthread_mutex_lock(&mutex); + + if(ret < 0 || (!ret && ++errcnt == MAXZEROES)) { + if(debug) post("Receive error"); + ok = false; + } + else if(ret > 0) { + if(debug) post("read %i bytes",ret); + errcnt = 0; + n += ret; + } + } + return n; +} + +#define MAXINITTRIES 5 + +/*! static pthreads thread function */ +void *Stream::thr_func(void *th) +{ + ((Stream *)th)->Work(); + return NULL; +} + +/*! Thread worker - fill the fifo with socket data */ +void Stream::Work() +{ + int waittime = 0; + + // lower thread priority + { + struct sched_param parm; + int policy; + if(pthread_getschedparam(pthread_self(),&policy,&parm) >= 0) { + int minprio = sched_get_priority_min(policy); + + if(debug) post("priority was %i (min = %i)",parm.sched_priority,minprio); + + parm.sched_priority += THRPRIOR; + + if(parm.sched_priority < minprio) parm.sched_priority = minprio; + pthread_setschedparam(pthread_self(),policy,&parm); + } + + if(pthread_getschedparam(pthread_self(),&policy,&parm) >= 0) { + if(debug) post("priority set to %i",parm.sched_priority); + } + } + + while(!exit) { + pthread_mutex_lock(&mutex); + + bool wait = true; + + if(!hostname.length() || !mountpt.length() || port < 0) {} + else + if(state == ST_INIT || state == ST_RECONNECT) { + // initialize! + + bool ok = true; + + try { + file = Connect( hostname.c_str(),mountpt.c_str(),port); + } + catch(char *str) { + if(state != ST_RECONNECT) post(str); + ok = false; + } + catch(...) { + post("Unknown error while connecting"); + ok = false; + } + + // initialize decoder + if(ok) ok = WorkInit(); + + // try to fill buffer + if(ok) { + int i,lim = (int)(encoded.Size()*encthresh); + for(i = MAXINITTRIES; i > 0 && encoded.Have() < lim; ) { + int n = ReadChunk(encoded.Free(),true); + if(!n) --i; + } + if(!i) ok = false; + } + + if(!ok) { + Reset(); + + if(state == ST_INIT) state = ST_IDLE; + // if reconnecting keep on doing that... + } + else { + state = ST_PROCESS; + waittime = 0; + } + } + else if(isOk()) { + SOCKET fd = file; + int chunk = encoded.Free(); + if(chunk > encchunk) chunk = encchunk; + + if(chunk) { + int n = ReadChunk(chunk,true); + + if(n == 0) { + if(debug) post("error receiving data"); + state = ST_WAIT; + } + else + // reset error state + state = ST_PROCESS; + } + + if(encoded.Have() < encoded.Size()*encthresh) + // immediately get the next chunk + wait = false; + } + + if(debug && encoded.Free()) { + post("fifo: sz/fill = %5i/%3.0f%%",encoded.Size(),(float)encoded.Have()/encoded.Size()*100); + } + + if(state == ST_WAIT) { + if(debug) post("Wait for data"); + Sleep(waitgrain); + waittime += waitgrain; + if(waittime > waitreconnect) { + if(debug) post("do reconnect"); + state = ST_RECONNECT; + } + wait = false; + } + else if(state == ST_RECONNECT) { + if(debug) post("Reconnecting again"); + Sleep(waitgrain); + wait = false; + } + + + if(wait) pthread_cond_wait(&cond,&mutex); + + pthread_mutex_unlock(&mutex); + } + + state = ST_FINISHED; +} + +bool Stream::SetURL(const char *url) +{ + char *p = (char *)url; + + // strip prefixes + if(!strncmp(p, "http://", 7)) p += 7; + if(!strncmp(p, "ftp://", 6)) p += 6; + + char *hostptr = p; // points to host name + + char *pathptr = strchr(hostptr,'/'); + if(pathptr) + // / found -> skip / + ++pathptr; + else + // no / found!! ILLEGAL + throw "URL path not found"; + + // get port number + int portno; + char *portptr = strchr(hostptr,':'); + if(portptr && portptr < pathptr) { + portptr++; + int sl = (int)(pathptr-portptr-1); + char *p0 = new char[sl+1]; + ASSERT(p0); + strncpy(p0,portptr,sl); + p0[sl] = 0; + + for(p = p0; *p && isdigit(*p); p++) ; + *p = 0; + + // convert port from string to int + portno = (int)strtol(p0, NULL, 10); + delete[] p0; + } + else + portno = 8000; + + // assign found things to function parameters + hostname = std::string(hostptr,(portptr?portptr:pathptr)-1-hostptr); + mountpt = pathptr; + port = portno; + + return true; +} -- cgit v1.2.1