From fc3d3c0a4f110a23335398c327ac0a4fc949d5cb Mon Sep 17 00:00:00 2001 From: Guenter Geiger Date: Mon, 17 Jun 2002 10:13:57 +0000 Subject: This commit was generated by cvs2svn to compensate for changes in r12, which included commits to RCS files with non-trunk default branches. svn path=/trunk/externals/ggee/; revision=13 --- signal/streamin~.c | 404 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 404 insertions(+) create mode 100755 signal/streamin~.c (limited to 'signal/streamin~.c') diff --git a/signal/streamin~.c b/signal/streamin~.c new file mode 100755 index 0000000..54ab283 --- /dev/null +++ b/signal/streamin~.c @@ -0,0 +1,404 @@ + +/* Written by Guenter Geiger (C) 1999 */ + +#include +#include "stream.h" + +#include +#include +#ifdef unix +#include +#include +#include +#include +#include +#include +#include +#include +#define SOCKET_ERROR -1 +#else +#include +#endif + + + +#ifdef NT +#pragma warning( disable : 4244 ) +#pragma warning( disable : 4305 ) +#endif + +#ifdef unix +#define NONBLOCKED +#endif +#define INBUFSIZE 8192 +#define MAXFRAMES 128 +#define MAXFRAMESIZE 256 +#define AVERAGENUM 10 + +/*#define DEBUGMESS(x) x*/ +#define DEBUGMESS(x) + +/* Utility functions */ + +/* TODO !!!! + - check udp support +*/ + + +#ifdef NT +extern int close(int); +extern void sys_rmpollfn(int fd); +extern sys_addpollfn(int fd, void* fn, void *ptr); +#endif + +static void sys_sockerror(char *s) +{ +#ifdef unix + int err = errno; +#else + int err = WSAGetLastError(); + if (err == 10054) return; +#endif + post("%s: %s (%d)\n", s, strerror(err), err); +} + + +static void sys_closesocket(int fd) +{ +#ifdef UNIX + close(fd); +#endif +#ifdef NT + closesocket(fd); +#endif +} + + +int setsocketoptions(int sockfd) +{ +#ifdef unix + int sockopt = 1; + if (setsockopt(sockfd, SOL_TCP, TCP_NODELAY, (const char*) &sockopt, sizeof(int)) < 0) + DEBUGMESS(post("setsockopt NODELAY failed\n")); + else + DEBUGMESS(post("TCP_NODELAY set")); + + /* if we don`t use REUSEADDR we have to wait under unix until the + address gets freed after a close ... this can be very annoying + when working with netsend/netreceive GG + */ + + sockopt = 1; + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof(int)) < 0) + post("setsockopt failed\n"); +#endif + return 0; +} + + + +/* ------------------------ streamin~ ----------------------------- */ + + +static t_class *streamin_class; + + + +typedef struct _streamin +{ + t_object x_obj; + int x_socket; + int x_connectsocket; + int x_nconnections; + int x_ndrops; + int x_fd; + int x_tcp; + + /* buffering */ + + int framein; + int frameout; + t_frame frames[MAXFRAMES]; + int maxframes; + + int nbytes; + int counter; + int average[AVERAGENUM]; + int averagecur; + int underflow; +} t_streamin; + + + + +static void streamin_datapoll(t_streamin *x) +{ + int ret; + int n; + t_tag tag; + int i; + + n = x->nbytes; + if (x->nbytes == 0) { /* get the new tag */ + ret = recv(x->x_socket, (char*) &x->frames[x->framein].tag,sizeof(t_tag),MSG_PEEK); + if (ret != sizeof(t_tag)) return; + ret = recv(x->x_socket, (char*) &x->frames[x->framein].tag,sizeof(t_tag),0); + + x->nbytes = n = x->frames[x->framein].tag.framesize; + } + + ret = recv(x->x_socket, (char*) x->frames[x->framein].data + x->frames[x->framein].tag.framesize - n, n, 0); + if (ret > 0) + n-=ret; + + x->nbytes = n; + if (n == 0) { + x->counter++; + x->framein++; + x->framein %= MAXFRAMES; + } +} + +static void streamin_reset(t_streamin* x,t_floatarg frames) +{ + int i; + x->counter = 0; + x->nbytes = 0; + x->framein = 0; + x->frameout = 0; + for (i=0;iaverage[i] = x->maxframes; + x->averagecur=0; + if (frames == 0.0) + x->maxframes = MAXFRAMES/2; + else + x->maxframes = frames; + x->underflow = 0; +} + + +static void streamin_connectpoll(t_streamin *x) +{ + int fd = accept(x->x_connectsocket, 0, 0); + +#ifdef NONBLOCKED + fcntl(fd,F_SETFL,O_NONBLOCK); +#endif + if (fd < 0) { + post("streamin~: accept failed"); + return; + } + + if (x->x_socket > 0) { + post("streamin~: new connection"); + close(x->x_socket); + sys_rmpollfn(x->x_socket); + } + + streamin_reset(x,0); + x->x_socket = fd; + sys_addpollfn(fd, streamin_datapoll, x); +} + + +static int streamin_createsocket(t_streamin* x, int portno,t_symbol* prot) +{ + struct sockaddr_in server; + int sockfd; + int tcp = x->x_tcp; + + /* create a socket */ + if (!tcp) + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + else + sockfd = socket(AF_INET, SOCK_STREAM, 0); + + if (sockfd < 0) + { + sys_sockerror("socket"); + return (0); + } + server.sin_family = AF_INET; + server.sin_addr.s_addr = INADDR_ANY; + + /* assign server port number */ + + server.sin_port = htons((u_short)portno); + post("listening to port number %d", portno); + + + setsocketoptions(sockfd); + + + /* name the socket */ + + if (bind(sockfd, (struct sockaddr *)&server, sizeof(server)) < 0) { + sys_sockerror("bind"); + sys_closesocket(sockfd); + return (0); + } + + + if (!tcp) { + x->x_socket = sockfd; + x->nbytes = 0; + sys_addpollfn(sockfd, streamin_datapoll, x); + } + else { + if (listen(sockfd, 5) < 0) { + sys_sockerror("listen"); + sys_closesocket(sockfd); + } + else { + x->x_connectsocket = sockfd; + sys_addpollfn(sockfd, streamin_connectpoll, x); + } + } + return 1; +} + + + +static void streamin_free(t_streamin *x) +{ + if (x->x_connectsocket > 0) { + sys_closesocket(x->x_connectsocket); + sys_rmpollfn(x->x_connectsocket); + } + sys_rmpollfn(x->x_socket); + sys_closesocket(x->x_socket); +} + + +#define QUEUESIZE ((x->framein + MAXFRAMES - x->frameout)%MAXFRAMES) + +static t_int *streamin_perform(t_int *w) +{ + t_streamin *x = (t_streamin*) (w[1]); + t_float *out = (t_float *)(w[2]); + int n = (int)(w[3]); + int ret; + int i = 0; + + if (x->counter < x->maxframes) { + return (w+4); + } + + if (x->framein == x->frameout) { + x->underflow++; + return w+4; + } + + + /* queue balancing */ + + x->average[x->averagecur] = QUEUESIZE; + x->averagecur++; + x->averagecur %= AVERAGENUM; + + switch (x->frames[x->frameout].tag.format) { + case SF_FLOAT: { + t_float* buf = (t_float*)(x->frames[x->frameout].data); + while (n--) + *out++ = *buf++; + x->frameout++; + x->frameout %= MAXFRAMES; + break; + } + case SF_16BIT: + { + short* buf = (short*)(x->frames[x->frameout].data); + + while (n--) + *out++ = (float) *buf++*3.051850e-05; + x->frameout++; + x->frameout %= MAXFRAMES; + + break; + } + case SF_8BIT: + { + unsigned char* buf = (char*)(x->frames[x->frameout].data); + + while (n--) + *out++ = (float) (0.0078125 * (*buf++)) - 1.0; + x->frameout++; + x->frameout %= MAXFRAMES; + break; + } + default: + post("unknown format %d",x->frames[x->frameout].tag.format); + break; + } + + return (w+4); +} + + + +static void streamin_dsp(t_streamin *x, t_signal **sp) +{ + dsp_add(streamin_perform, 3, x, sp[0]->s_vec, sp[0]->s_n); +} + + +static void streamin_print(t_streamin* x) +{ + int i; + int avg = 0; + for (i=0;iaverage[i]; + post("last size = %d, avg size = %d, %d underflows", + QUEUESIZE,avg,x->underflow); +} + + + + +static void *streamin_new(t_floatarg fportno, t_floatarg prot) +{ + t_streamin *x; + int i; + + post("port %f",fportno); + x = (t_streamin *)pd_new(streamin_class); + + x->x_connectsocket = -1; + x->x_socket = -1; + x->x_tcp = 1; + outlet_new(&x->x_obj, &s_signal); + x->x_nconnections = 0; + x->x_ndrops = 0; + + for (i=0;iframes[i].data = getbytes(MAXFRAMESIZE); + } + x->framein = 0; + x->frameout = 0; + x->maxframes = MAXFRAMES/2; + + if (prot) + x->x_tcp = 0; + + streamin_createsocket(x, fportno, gensym("tcp")); + + return (x); +} + + + + + +void streamin_tilde_setup(void) +{ + streamin_class = class_new(gensym("streamin~"), + (t_newmethod) streamin_new, (t_method) streamin_free, + sizeof(t_streamin), 0, A_DEFFLOAT,A_DEFFLOAT, A_NULL); + + class_addmethod(streamin_class, nullfn, gensym("signal"), 0); + class_addmethod(streamin_class, (t_method) streamin_dsp, gensym("dsp"), 0); + class_addmethod(streamin_class, (t_method) streamin_print, + gensym("print"), 0); + class_addmethod(streamin_class, (t_method) streamin_reset, + gensym("reset"),A_DEFFLOAT, 0); +} -- cgit v1.2.1