From c50ce0e0217ea07e2d450add2ab29cecea66fa96 Mon Sep 17 00:00:00 2001 From: Hans-Christoph Steiner Date: Mon, 28 Nov 2005 01:07:25 +0000 Subject: This commit was generated by cvs2svn to compensate for changes in r4059, which included commits to RCS files with non-trunk default branches. svn path=/trunk/externals/pdp/; revision=4060 --- modules/generic/pdp_udp_send.c | 336 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 336 insertions(+) create mode 100644 modules/generic/pdp_udp_send.c (limited to 'modules/generic/pdp_udp_send.c') diff --git a/modules/generic/pdp_udp_send.c b/modules/generic/pdp_udp_send.c new file mode 100644 index 0000000..cb55ad1 --- /dev/null +++ b/modules/generic/pdp_udp_send.c @@ -0,0 +1,336 @@ +/* + * Pure Data Packet module. + * Copyright (c) by Tom Schouten + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + + + +/* this module sends a pure packet out as an udp packet stream */ + +#include "pdp_net.h" +#include "pdp.h" +#include "pdp_resample.h" + +#include +#include +#include +#include +#include +#include + +#define DD if(0) // print DROP debug info +#define D if(0) // print extra connection debug info +#define V if(0) // be verbose (parameter setting feedback) + +typedef struct pdp_udp_send_struct +{ + + t_object x_obj; + t_float x_f; + + /* sender object */ + t_pdp_udp_sender *x_sender; + + /* pthread vars */ + pthread_mutex_t x_mut; + pthread_cond_t x_cond_data_ready; + pthread_cond_t x_cond_send_done; + pthread_t x_thread; + int x_exit_thread; + + // drop info + unsigned int x_drop; + + t_outlet *x_outlet0; + + // packet queue + int x_nb_packets; + int x_read_packet; + int x_write_packet; + int *x_packet; + + +} t_pdp_udp_send; + + + + + +/* some synchro code */ + +static int _wait_for_feeder(t_pdp_udp_send *x) +{ + + /* only use locking when there is no data */ + if (x->x_packet[x->x_read_packet] == -1){ + + /* signal sending is done */ + pthread_mutex_lock(&x->x_mut); + pthread_cond_signal(&x->x_cond_send_done); + + /* wait until there is an item in the queue */ + while((x->x_packet[x->x_read_packet] == -1) && (!x->x_exit_thread)){ + pthread_cond_wait(&x->x_cond_data_ready, &x->x_mut); + } + pthread_mutex_unlock(&x->x_mut); + + /* check if we need to stop the thread */ + if (x->x_exit_thread) return 0; + + } + + return !x->x_exit_thread; +} + +static void _signal_sender(t_pdp_udp_send *x) +{ + + pthread_mutex_lock(&x->x_mut); + pthread_cond_signal(&x->x_cond_data_ready); + pthread_mutex_unlock(&x->x_mut); +} + +static void _wait_until_done(t_pdp_udp_send *x) +{ + pthread_mutex_lock(&x->x_mut); + while (x->x_packet[x->x_read_packet] != -1){ + pthread_cond_wait(&x->x_cond_send_done, &x->x_mut); + } + pthread_mutex_unlock(&x->x_mut); +} + + +static void _remove_packet_from_queue(t_pdp_udp_send *x) +{ + +} + + + + + +static void *send_thread(void *threaddata) +{ + t_pdp_udp_send *x = (t_pdp_udp_send *)threaddata; + + /* main thread loop */ + + /* get a pdp packet from queue */ + /* send header packet and make sure it has arrived */ + /* send a chunk burst */ + /* send done packet and get the resend list */ + /* repeat until send list is empty */ + + while (_wait_for_feeder(x)){ + t_pdp *header; + void *data; + + /* check if we have a valid pdp packet */ + if ((!(header = pdp_packet_header(x->x_packet[x->x_read_packet]))) + ||(!(data = pdp_packet_data(x->x_packet[x->x_read_packet]))) + ||(0 == header->desc)) goto remove; /* nothing to transmit */ + + /* send it */ + pdp_udp_sender_send(x->x_sender, + header->desc->s_name, + header->size - PDP_HEADER_SIZE, data); + + + remove: + /* remove packet from queue */ + pdp_packet_mark_unused(x->x_packet[x->x_read_packet]); + x->x_packet[x->x_read_packet] = -1; + x->x_read_packet++; + x->x_read_packet %= x->x_nb_packets; + + } + return 0; +} + + +static void pdp_udp_send_input_0(t_pdp_udp_send *x, t_symbol *s, t_floatarg f) +{ + + int p = (int)f; + int my_p; + int transferred = 0; + + if (s== gensym("register_ro")){ + + + // check if packet can be stored in the queue + // this is possible if the current write location does not contain a packet + + if (x->x_packet[x->x_write_packet] == -1){ + + // get the packet outside of the lock + my_p = pdp_packet_copy_ro(p); + + + // add to queue (do we really need to lock here?> + //pthread_mutex_lock(&x->x_mut); // LOCK + x->x_packet[x->x_write_packet] = my_p; + x->x_write_packet++; + x->x_write_packet %= x->x_nb_packets; + transferred = 1; + //pthread_mutex_unlock(&x->x_mut); // UNLOCK + } + + // signal sender if transfer succeded + if (transferred) _signal_sender(x); + + // else send a float indicating the number of drops so far + else{ + x->x_drop++; + //outlet_float(x->x_outlet0, (float)x->x_drop); + + DD post ("pdp_netsend: DROP: queue full"); + } + } +} + + + +/* some flow control hacks */ + +static void pdp_udp_send_timeout(t_pdp_udp_send *x, float f) +{ + if (f < 0.0f) f = 0.0f; + pdp_udp_sender_timeout_us(x->x_sender, 1000.0f * f); +} + + +static void pdp_udp_send_sleepgrain(t_pdp_udp_send *x, float f) +{ + if (f < 0.0f) f = 0.0f; + pdp_udp_sender_sleepgrain_us(x->x_sender, 1000.0f * f); +} + +static void pdp_udp_send_sleepperiod(t_pdp_udp_send *x, float f) +{ + if (f < 0.0f) f = 0.0f; + pdp_udp_sender_sleepperiod(x->x_sender, f); +} + + +static void pdp_udp_send_udpsize(t_pdp_udp_send *x, float f) +{ + if (f < 0.0f) f = 0.0f; + pdp_udp_sender_udp_packet_size(x->x_sender, f); +} + +static void pdp_udp_send_connect(t_pdp_udp_send *x, t_symbol *shost, t_float fport) +{ + unsigned int port; + struct hostent *hp; + + /* suspend until sending thread is finished */ + _wait_until_done(x); + + /* set target address */ + port = (fport == 0.0f) ? 7777 : fport; + if (shost == gensym("")) shost = gensym("127.0.0.1"); + + /* connect */ + pdp_udp_sender_connect(x->x_sender, shost->s_name, port); + +} + + +static void pdp_udp_send_free(t_pdp_udp_send *x) +{ + int i; + void* retval; + _wait_until_done(x); // send all remaining packets + x->x_exit_thread = 1; // .. and wait for thread to finish + _signal_sender(x); + pthread_join(x->x_thread, &retval); + + pdp_udp_sender_free(x->x_sender); + + + for (i=0; ix_nb_packets; i++) pdp_packet_mark_unused(x->x_packet[i]); + pdp_dealloc(x->x_packet); + +} + +t_class *pdp_udp_send_class; + + + +void *pdp_udp_send_new(void) +{ + int i; + pthread_attr_t attr; + + t_pdp_udp_send *x = (t_pdp_udp_send *)pd_new(pdp_udp_send_class); + + x->x_sender = pdp_udp_sender_new(); + + //x->x_outlet0 = outlet_new(&x->x_obj, &s_anything); + + x->x_nb_packets = 4; + x->x_packet = malloc(sizeof(int)*x->x_nb_packets); + for (i=0; ix_nb_packets; i++) x->x_packet[i] = -1; + x->x_read_packet = 0; + x->x_write_packet = 0; + + x->x_drop = 0; + + + + /* setup thread stuff & create thread */ + x->x_exit_thread = 0; + pthread_mutex_init(&x->x_mut, NULL); + pthread_cond_init(&x->x_cond_data_ready, NULL); + pthread_cond_init(&x->x_cond_send_done, NULL); + pthread_attr_init(&attr); + //pthread_attr_setschedpolicy(&attr, SCHED_OTHER); + pthread_create(&x->x_thread, &attr, send_thread, x); + post("pdp_netsend: WARNING: experimental object"); + + + return (void *)x; +} + + +#ifdef __cplusplus +extern "C" +{ +#endif + + +void pdp_udp_send_setup(void) +{ + + pdp_udp_send_class = class_new(gensym("pdp_netsend"), (t_newmethod)pdp_udp_send_new, + (t_method)pdp_udp_send_free, sizeof(t_pdp_udp_send), 0, A_NULL); + + + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_input_0, gensym("pdp"), A_SYMBOL, A_DEFFLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_sleepgrain, gensym("sleepgrain"), A_FLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_sleepperiod, gensym("sleepperiod"), A_FLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_udpsize, gensym("udpsize"), A_FLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_timeout, gensym("timeout"), A_FLOAT, A_NULL); + class_addmethod(pdp_udp_send_class, (t_method)pdp_udp_send_connect, gensym("connect"), A_SYMBOL, A_FLOAT, A_NULL); + +} + +#ifdef __cplusplus +} +#endif -- cgit v1.2.1