From c50ce0e0217ea07e2d450add2ab29cecea66fa96 Mon Sep 17 00:00:00 2001 From: Hans-Christoph Steiner Date: Mon, 28 Nov 2005 01:07:25 +0000 Subject: This commit was generated by cvs2svn to compensate for changes in r4059, which included commits to RCS files with non-trunk default branches. svn path=/trunk/externals/pdp/; revision=4060 --- modules/generic/pdp_rawin.c | 299 +++++++++++++++++++++++++++++++++ modules/generic/pdp_rawout.c | 320 ++++++++++++++++++++++++++++++++++++ modules/generic/pdp_udp_receive.c | 203 +++++++++++++++++++++++ modules/generic/pdp_udp_send.c | 336 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 1158 insertions(+) create mode 100644 modules/generic/pdp_rawin.c create mode 100644 modules/generic/pdp_rawout.c create mode 100644 modules/generic/pdp_udp_receive.c create mode 100644 modules/generic/pdp_udp_send.c (limited to 'modules') diff --git a/modules/generic/pdp_rawin.c b/modules/generic/pdp_rawin.c new file mode 100644 index 0000000..28ef8fb --- /dev/null +++ b/modules/generic/pdp_rawin.c @@ -0,0 +1,299 @@ +/* + * Pure Data Packet module. packet forth console + * Copyright (c) by Tom Schouten + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "pdp_pd.h" +#include "pdp_debug.h" +#include "pdp_list.h" +#include "pdp_comm.h" +#include "pdp_post.h" +#include "pdp_packet.h" + + +#define PERIOD 1.0f +#define D if (1) + + + + +/* raw input from a unix pipe */ + +typedef struct rawin_struct +{ + /* pd */ + t_object x_obj; + t_outlet *x_outlet; + t_outlet *x_sync_outlet; + t_clock *x_clock; + + /* comm */ + t_pdp_list *x_queue; // packet queue + + /* thread */ + pthread_mutex_t x_mut; + pthread_attr_t x_attr; + pthread_t x_thread; + + /* sync */ + int x_giveup; // 1-> terminate reader thread + int x_active; // 1-> reader thread is launched + int x_done; // 1-> reader thread has exited + + /* config */ + t_symbol *x_pipe; + t_pdp_symbol *x_type; + +} t_rawin; + + +static inline void lock(t_rawin *x){pthread_mutex_lock(&x->x_mut);} +static inline void unlock(t_rawin *x){pthread_mutex_unlock(&x->x_mut);} + +static void rawin_close(t_rawin *x); +static void tick(t_rawin *x) +{ + /* send all packets in queue to outlet */ + lock(x); + while (x->x_queue->elements){ + outlet_pdp_atom(x->x_outlet, x->x_queue->first); + pdp_list_pop(x->x_queue); // pop stale reference + } + unlock(x); + clock_delay(x->x_clock, PERIOD); + + /* check if thread is done */ + if (x->x_done) rawin_close(x); + +} + +static void move_current_to_queue(t_rawin *x, int packet) +{ + lock(x); + pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)packet); + unlock(x); +} + +static void *rawin_thread(void *y) +{ + int pipe; + int packet = -1; + t_rawin *x = (t_rawin *)y; + int period_sec; + int period_usec; + + + //D pdp_post("pipe: %s", x->x_pipe->s_name); + //D pdp_post("type: %s", x->x_type->s_name); + + /* open pipe */ + if (-1 == (pipe = open(x->x_pipe->s_name, O_RDONLY|O_NONBLOCK))){ + perror(x->x_pipe->s_name); + goto exit; + } + + /* main loop (packets) */ + while(1){ + void *data = 0; + int left = -1; + + /* create packet */ + if (-1 != packet){ + pdp_post("WARNING: deleting stale packet"); + pdp_packet_mark_unused(packet); + } + packet = pdp_factory_newpacket(x->x_type); + if (-1 == packet){ + pdp_post("ERROR: can't create packet. type = %s", x->x_type->s_name); + goto exit; + } + + /* fill packet */ + data = pdp_packet_data(packet); + left = pdp_packet_data_size(packet); + // D pdp_post("packet %d, data %x, size %d", packet, data, left); + + /* inner loop: pipe reads */ + while(left){ + + fd_set inset; + struct timeval tv = {0,10000}; + + /* check if we need to stop */ + if (x->x_giveup){ + pdp_packet_mark_unused(packet); + goto close; + } + /* select, with timeout */ + FD_ZERO(&inset); + FD_SET(pipe, &inset); + if (-1 == select(pipe+1, &inset, NULL,NULL, &tv)){ + pdp_post("select error"); + goto close; + } + + /* if ready, read, else retry */ + if (FD_ISSET(pipe, &inset)){ + int bytes = read(pipe, data, left); + if (!bytes){ + /* if no bytes are read, pipe is closed */ + goto close; + } + data += bytes; + left -= bytes; + } + } + + /* move to queue */ + move_current_to_queue(x, packet); + packet = -1; + + + + } + + close: + /* close pipe */ + close(pipe); + + + exit: + x->x_done = 1; + return 0; +} + + + +static void rawin_type(t_rawin *x, t_symbol *type) +{ + x->x_type = pdp_gensym(type->s_name); +} + +static void rawin_open(t_rawin *x, t_symbol *pipe) +{ + /* save pipe name if not empty */ + if (pipe->s_name[0]) {x->x_pipe = pipe;} + + if (x->x_active) { + pdp_post("already open"); + return; + } + /* start thread */ + x->x_giveup = 0; + x->x_done = 0; + pthread_create(&x->x_thread, &x->x_attr, rawin_thread , x); + x->x_active = 1; +} + +static void rawin_close(t_rawin *x) +{ + + if (!x->x_active) return; + + /* stop thread: set giveup + wait */ + x->x_giveup = 1; + pthread_join(x->x_thread, NULL); + x->x_active = 0; + + /* notify */ + outlet_bang(x->x_sync_outlet); + pdp_post("connection to %s closed", x->x_pipe->s_name); + + + + + +} + +static void rawin_free(t_rawin *x) +{ + rawin_close(x); + clock_free(x->x_clock); + pdp_tree_strip_packets(x->x_queue); + pdp_tree_free(x->x_queue); +} + +t_class *rawin_class; + + +static void *rawin_new(t_symbol *pipe, t_symbol *type) +{ + t_rawin *x; + + pdp_post("%s %s", pipe->s_name, type->s_name); + + /* allocate & init */ + x = (t_rawin *)pd_new(rawin_class); + x->x_outlet = outlet_new(&x->x_obj, &s_anything); + x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything); + x->x_clock = clock_new(x, (t_method)tick); + x->x_queue = pdp_list_new(0); + x->x_active = 0; + x->x_giveup = 0; + x->x_done = 0; + x->x_type = pdp_gensym("image/YCrCb/320x240"); //default + x->x_pipe = gensym("/tmp/pdpraw"); // default + pthread_attr_init(&x->x_attr); + pthread_mutex_init(&x->x_mut, NULL); + clock_delay(x->x_clock, PERIOD); + + /* args */ + rawin_type(x, type); + if (pipe->s_name[0]) x->x_pipe = pipe; + + return (void *)x; + +} + + + +#ifdef __cplusplus +extern "C" +{ +#endif + + +void pdp_rawin_setup(void) +{ + int i; + + /* create a standard pd class: [pdp_rawin pipe type] */ + rawin_class = class_new(gensym("pdp_rawin"), (t_newmethod)rawin_new, + (t_method)rawin_free, sizeof(t_rawin), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL); + + /* add global message handler */ + class_addmethod(rawin_class, (t_method)rawin_type, gensym("type"), A_SYMBOL, A_NULL); + class_addmethod(rawin_class, (t_method)rawin_open, gensym("open"), A_DEFSYMBOL, A_NULL); + class_addmethod(rawin_class, (t_method)rawin_close, gensym("close"), A_NULL); + + +} + +#ifdef __cplusplus +} +#endif diff --git a/modules/generic/pdp_rawout.c b/modules/generic/pdp_rawout.c new file mode 100644 index 0000000..e1e9edf --- /dev/null +++ b/modules/generic/pdp_rawout.c @@ -0,0 +1,320 @@ +/* + * Pure Data Packet module. packet forth console + * Copyright (c) by Tom Schouten + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "pdp_pd.h" +#include "pdp_debug.h" +#include "pdp_list.h" +#include "pdp_comm.h" +#include "pdp_post.h" +#include "pdp_packet.h" + + +#define D if (1) +#define MAX_QUEUESIZE 4 +#define PIPE_BLOCKSIZE 4096 + + + + +/* raw input from a unix pipe */ + +typedef struct rawout_struct +{ + /* pd */ + t_object x_obj; + //t_outlet *x_outlet; + t_outlet *x_sync_outlet; + + /* comm */ + t_pdp_list *x_queue; // packet queue + + /* thread */ + pthread_mutex_t x_mut; + pthread_attr_t x_attr; + pthread_t x_thread; + + /* sync */ + int x_giveup; // 1-> terminate writer thread + int x_active; // 1-> writer thread is launched + int x_done; // 1-> writer thread has exited + + /* config */ + t_symbol *x_pipe; + t_pdp_symbol *x_type; + +} t_rawout; + + +static inline void lock(t_rawout *x){pthread_mutex_lock(&x->x_mut);} +static inline void unlock(t_rawout *x){pthread_mutex_unlock(&x->x_mut);} + +static void rawout_close(t_rawout *x); +static void pdp_in(t_rawout *x, t_symbol *s, t_float f) +{ + /* save packet to pdp queue, if size is smaller than maxsize */ + if (s == S_REGISTER_RO){ + if (x->x_queue->elements < MAX_QUEUESIZE){ + int p = (int)f; + p = pdp_packet_copy_ro(p); + if (p != -1){ + lock(x); + pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p); + unlock(x); + } + } + else { + pdp_post("pdp_rawout: dropping packet: (queue full)", MAX_QUEUESIZE); + } + + } + + /* check if thread is done */ + if (x->x_done) rawout_close(x); + +} + + + +static void *rawout_thread(void *y) +{ + int pipe; + int packet = -1; + t_rawout *x = (t_rawout *)y; + int period_sec; + int period_usec; + sigset_t sigvec; /* signal handling */ + + /* ignore pipe signal */ + sigemptyset(&sigvec); + sigaddset(&sigvec,SIGPIPE); + pthread_sigmask(SIG_BLOCK, &sigvec, 0); + + //D pdp_post("pipe: %s", x->x_pipe->s_name); + //D pdp_post("type: %s", x->x_type->s_name); + + /* open pipe */ + if (-1 == (pipe = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK))){ + perror(x->x_pipe->s_name); + goto exit; + } + + /* main loop (packets) */ + while(1){ + void *data = 0; + int left = -1; + + /* try again if queue is empty */ + if (!x->x_queue->elements){ + /* check if we need to stop */ + if (x->x_giveup){ + goto close; + } + else { + usleep(1000.0f); // sleep before polling again + continue; + } + } + /* get packet from queue */ + lock(x); + packet = pdp_list_pop(x->x_queue).w_packet; + unlock(x); + + /* send packet */ + data = pdp_packet_data(packet); + left = pdp_packet_data_size(packet); + + /* inner loop: pipe reads */ + while(left){ + + fd_set outset; + struct timeval tv = {0,10000}; + + /* check if we need to stop */ + if (x->x_giveup){ + pdp_packet_mark_unused(packet); + goto close; + } + + /* select, with timeout */ + FD_ZERO(&outset); + FD_SET(pipe, &outset); + if (-1 == select(pipe+1, NULL, &outset, NULL, &tv)){ + pdp_post("select error"); + goto close; + } + + /* if ready, read, else retry */ + if (FD_ISSET(pipe, &outset)){ + int bytes = write(pipe, data, left); + /* handle errors */ + if (bytes <= 0){ + perror(x->x_pipe->s_name); + if (bytes != EAGAIN) goto close; + } + /* or update pointers */ + else{ + data += bytes; + left -= bytes; + //pdp_post("left %d", left); + } + } + else { + //pdp_post("retrying write"); + } + } + + /* discard packet */ + pdp_packet_mark_unused(packet); + + + } + + close: + /* close pipe */ + close(pipe); + + + exit: + x->x_done = 1; + return 0; +} + + + +static void rawout_type(t_rawout *x, t_symbol *type) +{ + x->x_type = pdp_gensym(type->s_name); +} + +static void rawout_open(t_rawout *x, t_symbol *pipe) +{ + /* save pipe name if not empty */ + if (pipe->s_name[0]) {x->x_pipe = pipe;} + + if (x->x_active) { + pdp_post("already open"); + return; + } + /* start thread */ + x->x_giveup = 0; + x->x_done = 0; + pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x); + x->x_active = 1; +} + +static void rawout_close(t_rawout *x) +{ + + if (!x->x_active) return; + + /* stop thread: set giveup + wait */ + x->x_giveup = 1; + pthread_join(x->x_thread, NULL); + x->x_active = 0; + + /* notify */ + outlet_bang(x->x_sync_outlet); + pdp_post("connection to %s closed", x->x_pipe->s_name); + + + + + +} + +static void rawout_free(t_rawout *x) +{ + rawout_close(x); + pdp_tree_strip_packets(x->x_queue); + pdp_tree_free(x->x_queue); +} + +t_class *rawout_class; + + +static void *rawout_new(t_symbol *pipe, t_symbol *type) +{ + t_rawout *x; + + pdp_post("%s %s", pipe->s_name, type->s_name); + + /* allocate & init */ + x = (t_rawout *)pd_new(rawout_class); + //x->x_outlet = outlet_new(&x->x_obj, &s_anything); + x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything); + x->x_queue = pdp_list_new(0); + x->x_active = 0; + x->x_giveup = 0; + x->x_done = 0; + x->x_type = pdp_gensym("image/YCrCb/320x240"); //default + x->x_pipe = gensym("/tmp/pdpraw"); // default + pthread_attr_init(&x->x_attr); + pthread_mutex_init(&x->x_mut, NULL); + + /* args */ + rawout_type(x, type); + if (pipe->s_name[0]) x->x_pipe = pipe; + + return (void *)x; + +} + + + +#ifdef __cplusplus +extern "C" +{ +#endif + + +void pdp_rawout_setup(void) +{ + int i; + + /* create a standard pd class: [pdp_rawout pipe type] */ + rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new, + (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL); + + /* add global message handler */ + class_addmethod(rawout_class, (t_method)pdp_in, + gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL); + + class_addmethod(rawout_class, (t_method)rawout_type, gensym("type"), A_SYMBOL, A_NULL); + class_addmethod(rawout_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL); + class_addmethod(rawout_class, (t_method)rawout_close, gensym("close"), A_NULL); + + +} + +#ifdef __cplusplus +} +#endif diff --git a/modules/generic/pdp_udp_receive.c b/modules/generic/pdp_udp_receive.c new file mode 100644 index 0000000..3d42466 --- /dev/null +++ b/modules/generic/pdp_udp_receive.c @@ -0,0 +1,203 @@ +/* + * Pure Data Packet module. + * Copyright (c) by Tom Schouten + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + + +/* this module sends receives an udp packet stream and converts to pdp packet */ + +#include "pdp_net.h" +#include "pdp.h" +#include "pdp_resample.h" + +#define D if(0) + +typedef struct pdp_udp_receive_struct +{ + + t_object x_obj; + t_float x_f; + + /* receiver object */ + t_pdp_udp_receiver *x_receiver; + + + /* thread vars */ + pthread_attr_t x_attr; + pthread_t x_thread; + int x_exit_thread; + + /* packet queue */ + int x_index; + int x_packet[2]; + + /* polling clock */ + t_clock *x_clock; + /* outlet */ + t_outlet *x_outlet0; + +} t_pdp_udp_receive; + + +static void clock_tick(t_pdp_udp_receive *x) +{ + /* poll for new packet */ + + pdp_pass_if_valid(x->x_outlet0, &x->x_packet[!x->x_index]); + clock_delay(x->x_clock, 1.0f); +} + + + + +static void *receive_thread(void *threaddata) +{ + t_pdp_udp_receive *x = (t_pdp_udp_receive *)threaddata; + t_pdp *pdp_header = 0; + void *pdp_data = 0; + int tmp_packet = -1; + char *type = 0; + unsigned int size = 0; + + /* listen for packets */ + while (!x->x_exit_thread){ + + + switch(pdp_udp_receiver_receive(x->x_receiver, 100)){ + case -1: + /* error */ + goto exit; + case 0: + /* timeout */ + continue; + case 1: + /* data ready */ + break; + } + + /* create a new packet */ + type = pdp_udp_receiver_type(x->x_receiver); + tmp_packet = pdp_factory_newpacket(pdp_gensym(type)); + pdp_header = pdp_packet_header(tmp_packet); + pdp_data = pdp_packet_data(tmp_packet); + + /* check if we were able to create the pdp packet */ + if (!(pdp_header && pdp_data)){ + post("pdp_netreceive: can't create packet (type %s)", type); + pdp_udp_receiver_reset(x->x_receiver); + continue; + } + + /* check size */ + size = pdp_udp_receiver_size(x->x_receiver); + if ((pdp_header->size - PDP_HEADER_SIZE) != size){ + pdp_packet_mark_unused(tmp_packet); + tmp_packet = -1; + post("pdp_netreceive: invalid packet size %d (pdp packet size = %d)", + size, pdp_header->size - PDP_HEADER_SIZE); + continue; + } + + /* copy the data */ + memcpy(pdp_data, pdp_udp_receiver_data(x->x_receiver), size); + + /* copy the packet into queue */ + x->x_index ^= 1; + pdp_packet_mark_unused(x->x_packet[x->x_index]); + x->x_packet[x->x_index] = tmp_packet; + + + } + + exit: + post("thread exiting"); + return 0; +} + + +static void pdp_udp_receive_free(t_pdp_udp_receive *x) +{ + int i; + void* retval; + x->x_exit_thread = 1; // wait for thread to finish + pthread_join(x->x_thread, &retval); + + pdp_udp_receiver_free(x->x_receiver); + + pdp_packet_mark_unused(x->x_packet[0]); + pdp_packet_mark_unused(x->x_packet[1]); + +} + +t_class *pdp_udp_receive_class; + + + +void *pdp_udp_receive_new(t_floatarg fport) +{ + int i; + int port; + struct hostent *hp; + + t_pdp_udp_receive *x = (t_pdp_udp_receive *)pd_new(pdp_udp_receive_class); + + x->x_outlet0 = outlet_new(&x->x_obj, &s_anything); + + x->x_packet[0] = -1; + x->x_packet[1] = -1; + x->x_index = 0; + + port = (fport == 0.0f) ? 7777 : fport; + x->x_receiver = pdp_udp_receiver_new(port); + + /* setup thread stuff & create thread */ + x->x_exit_thread = 0; + pthread_attr_init(&x->x_attr); + pthread_attr_setschedpolicy(&x->x_attr, SCHED_OTHER); + pthread_create(&x->x_thread, &x->x_attr, receive_thread, x); + + + /* setup the clock */ + x->x_clock = clock_new(x, (t_method)clock_tick); + clock_delay(x->x_clock, 0); + + post("pdp_netreceive: WARNING: experimental object"); + + return (void *)x; +} + + +#ifdef __cplusplus +extern "C" +{ +#endif + + +void pdp_udp_receive_setup(void) +{ + + + pdp_udp_receive_class = class_new(gensym("pdp_netreceive"), (t_newmethod)pdp_udp_receive_new, + (t_method)pdp_udp_receive_free, sizeof(t_pdp_udp_receive), 0, A_DEFFLOAT, A_NULL); + + +} + +#ifdef __cplusplus +} +#endif diff --git a/modules/generic/pdp_udp_send.c b/modules/generic/pdp_udp_send.c new file mode 100644 index 0000000..cb55ad1 --- /dev/null +++ b/modules/generic/pdp_udp_send.c @@ -0,0 +1,336 @@ +/* + * Pure Data Packet module. + * Copyright (c) by Tom Schouten + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + + + +/* this module sends a pure packet out as an udp packet stream */ + +#include "pdp_net.h" +#include "pdp.h" +#include "pdp_resample.h" + +#include +#include +#include +#include +#include +#include + +#define DD if(0) // print DROP debug info +#define D if(0) // print extra connection debug info +#define V if(0) // be verbose (parameter setting feedback) + +typedef struct pdp_udp_send_struct +{ + + t_object x_obj; + t_float x_f; + + /* sender object */ + t_pdp_udp_sender *x_sender; + + /* pthread vars */ + pthread_mutex_t x_mut; + pthread_cond_t x_cond_data_ready; + pthread_cond_t x_cond_send_done; + pthread_t x_thread; + int x_exit_thread; + + // drop info + unsigned int x_drop; + + t_outlet *x_outlet0; + + // packet queue + int x_nb_packets; + int x_read_packet; + int x_write_packet; + int *x_packet; + + +} t_pdp_udp_send; + + + + + +/* some synchro code */ + +static int _wait_for_feeder(t_pdp_udp_send *x) +{ + + /* only use locking when there is no data */ + if (x->x_packet[x->x_read_packet] == -1){ + + /* signal sending is done */ + pthread_mutex_lock(&x->x_mut); + pthread_cond_signal(&x->x_cond_send_done); + + /* wait until there is an item in the queue */ + while((x->x_packet[x->x_read_packet] == -1) && (!x->x_exit_thread)){ + pthread_cond_wait(&x->x_cond_data_ready, &x->x_mut); + } + pthread_mutex_unlock(&x->x_mut); + + /* check if we need to stop the thread */ + if (x->x_exit_thread) return 0; + + } + + return !x->x_exit_thread; +} + +static void _signal_sender(t_pdp_udp_send *x) +{ + + pthread_mutex_lock(&x->x_mut); + pthread_cond_signal(&x->x_cond_data_ready); + pthread_mutex_unlock(&x->x_mut); +} + +static void _wait_until_done(t_pdp_udp_send *x) +{ + pthread_mutex_lock(&x->x_mut); + while (x->x_packet[x->x_read_packet] != -1){ + pthread_cond_wait(&x->x_cond_send_done, &x->x_mut); + } + pthread_mutex_unlock(&x->x_mut); +} + + +static void _remove_packet_from_queue(t_pdp_udp_send *x) +{ + +} + + + + + +static void *send_thread(void *threaddata) +{ + t_pdp_udp_send *x = (t_pdp_udp_send *)threaddata; + + /* main thread loop */ + + /* get a pdp packet from queue */ + /* send header packet and make sure it has arrived */ + /* send a chunk burst */ + /* send done packet and get the resend list */ + /* repeat until send list is empty */ + + while (_wait_for_feeder(x)){ + t_pdp *header; + void *data; + + /* check if we have a valid pdp packet */ + if ((!(header = pdp_packet_header(x->x_packet[x->x_read_packet]))) + ||(!(data = pdp_packet_data(x->x_packet[x->x_read_packet]))) + ||(0 == header->desc)) goto remove; /* nothing to transmit */ + + /* send it */ + pdp_udp_sender_send(x->x_sender, + header->desc->s_name, + header->size - PDP_HEADER_SIZE, data); + + + remove: + /* remove packet from queue */ + pdp_packet_mark_unused(x->x_packet[x->x_read_packet]); + x->x_packet[x->x_read_packet] = -1; + x->x_read_packet++; + x->x_read_packet %= x->x_nb_packets; + + } + return 0; +} + + +static void pdp_udp_send_input_0(t_pdp_udp_send *x, t_symbol *s, t_floatarg f) +{ + + int p = (int)f; + int my_p; + int transferred = 0; + + if (s== gensym("register_ro")){ + + + // check if packet can be stored in the queue + // this is possible if the current write location does not contain a packet + + if (x->x_packet[x->x_write_packet] == -1){ + + // get the packet outside of the lock + my_p = pdp_packet_copy_ro(p); + + + // add to queue (do we really need to lock here?> + //pthread_mutex_lock(&x->x_mut); // LOCK + x->x_packet[x->x_write_packet] = my_p; + x->x_write_packet++; + x->x_write_packet %= x->x_nb_packets; + transferred = 1; + //pthread_mutex_unlock(&x->x_mut); // UNLOCK + } + + // signal sender if transfer succeded + if (transferred) _signal_sender(x); + + // else send a float indicating the number of drops so far + else{ + x->x_drop++; + //outlet_float(x->x_outlet0, (float)x->x_drop); + + DD post ("pdp_netsend: DROP: queue full"); + } + } +} + + + +/* some flow control hacks */ + +static void pdp_udp_send_timeout(t_pdp_udp_send *x, float f) +{ + if (f < 0.0f) f = 0.0f; + pdp_udp_sender_timeout_us(x->x_sender, 1000.0f * f); +} + + +static void pdp_udp_send_sleepgrain(t_pdp_udp_send *x, float f) +{ + if (f < 0.0f) f = 0.0f; + pdp_udp_sender_sleepgrain_us(x->x_sender, 1000.0f * f); +} + +static void pdp_udp_send_sleepperiod(t_pdp_udp_send *x, float f) +{ + if (f < 0.0f) f = 0.0f; + pdp_udp_sender_sleepperiod(x->x_sender, f); +} + + +static void pdp_udp_send_udpsize(t_pdp_udp_send *x, float f) +{ + if (f < 0.0f) f = 0.0f; + pdp_udp_sender_udp_packet_size(x->x_sender, f); +} + +static void pdp_udp_send_connect(t_pdp_udp_send *x, t_symbol *shost, t_float fport) +{ + unsigned int port; + struct hostent *hp; + + /* suspend until sending thread is finished */ + _wait_until_done(x); + + /* set target address */ + port = (fport == 0.0f) ? 7777 : fport; + if (shost == gensym("")) shost = gensym("127.0.0.1"); + + /* connect */ + pdp_udp_sender_connect(x->x_sender, shost->s_name, port); + +} + + +static void pdp_udp_send_free(t_pdp_udp_send *x) +{ + int i; + void* retval; + _wait_until_done(x); // send all remaining packets + x->x_exit_thread = 1; // .. and wait for thread to finish + _signal_sender(x); + pthread_join(x->x_thread, &retval); + + pdp_udp_sender_free(x->x_sender); + + + for (i=0; ix_nb_packets; i++) pdp_packet_mark_unused(x->x_packet[i]); + pdp_dealloc(x->x_packet); + +} + +t_class *pdp_udp_send_class; + + + +void *pdp_udp_send_new(void) +{ + int i; + pthread_attr_t attr; + + t_pdp_udp_send *x = (t_pdp_udp_send *)pd_new(pdp_udp_send_class); + + x->x_sender = pdp_udp_sender_new(); + + //x->x_outlet0 = outlet_new(&x->x_obj, &s_anything); + + x->x_nb_packets = 4; + x->x_packet = malloc(sizeof(int)*x->x_nb_packets); + for (i=0; ix_nb_packets; i++) x->x_packet[i] = -1; + x->x_read_packet = 0; + x->x_write_packet = 0; + + x->x_drop = 0; + + + + /* setup thread stuff & create thread */ + x->x_exit_thread = 0; + pthread_mutex_init(&x->x_mut, NULL); + pthread_cond_init(&x->x_cond_data_ready, NULL); + pthread_cond_init(&x->x_cond_send_done, NULL); + pthread_attr_init(&attr); + //pthread_attr_setschedpolicy(&attr, SCHED_OTHER); + pthread_create(&x->x_thread, &attr, send_thread, x); + post("pdp_netsend: WARNING: experimental object"); + + + return (void *)x; +} + + +#ifdef __cplusplus +extern "C" +{ +#endif + + +void pdp_udp_send_setup(void) +{ + + pdp_udp_send_class = class_new(gensym("pdp_netsend"), (t_newmethod)pdp_udp_send_new, + (t_method)pdp_udp_send_free, sizeof(t_pdp_udp_send), 0, A_NULL); + + + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_input_0, gensym("pdp"), A_SYMBOL, A_DEFFLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_sleepgrain, gensym("sleepgrain"), A_FLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_sleepperiod, gensym("sleepperiod"), A_FLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_udpsize, gensym("udpsize"), A_FLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_timeout, gensym("timeout"), A_FLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_connect, gensym("connect"), A_SYMBOL, A_FLOAT, A_NULL); + +} + +#ifdef __cplusplus +} +#endif -- cgit v1.2.1