diff options
Diffstat (limited to 'system')
-rw-r--r-- | system/X11/pdp_xvideo.c | 201 | ||||
-rw-r--r-- | system/kernel/CONTENTS | 7 | ||||
-rw-r--r-- | system/kernel/pdp_debug.c | 21 | ||||
-rw-r--r-- | system/kernel/pdp_mem.c | 129 | ||||
-rw-r--r-- | system/kernel/pdp_packet2.c | 623 | ||||
-rw-r--r-- | system/kernel/pdp_post.c | 48 | ||||
-rw-r--r-- | system/kernel/pdp_symbol.c | 196 | ||||
-rw-r--r-- | system/net/Makefile | 11 | ||||
-rw-r--r-- | system/net/pdp_net.c | 685 |
9 files changed, 1921 insertions, 0 deletions
diff --git a/system/X11/pdp_xvideo.c b/system/X11/pdp_xvideo.c new file mode 100644 index 0000000..dab060d --- /dev/null +++ b/system/X11/pdp_xvideo.c @@ -0,0 +1,201 @@ +/* + * Pure Data Packet system module. - x window glue code (fairly tied to pd and pdp) + * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + + +// this code is fairly tied to pd and pdp. serves mainly as reusable glue code +// for pdp_xv, pdp_glx, pdp_3d_windowcontext, ... + +#include <string.h> + +#include "pdp_xwindow.h" +#include "pdp_xvideo.h" +#include "pdp_post.h" +#include "pdp_packet.h" + +#define D if(0) + + + + +/************************************* PDP_XVIDEO ************************************/ + +static void pdp_xvideo_create_xvimage(t_pdp_xvideo *xvid, int width, int height) +{ + int i; + long size; + + //post("pdp_xvideo_create_xvimage"); + + xvid->width = width; + xvid->height = height; + size = (xvid->width * xvid->height + (((xvid->width>>1)*(xvid->height>>1))<<1)); + //post("create xvimage %d %d", xvid->width, xvid->height); + xvid->data = (unsigned char *)pdp_alloc(size); + for (i=0; i<size; i++) xvid->data[i] = i; + xvid->xvi = XvCreateImage(xvid->xdpy->dpy, xvid->xv_port, xvid->xv_format, (char *)xvid->data, xvid->width, xvid->height); + xvid->last_encoding = -1; + if ((!xvid->xvi) || (!xvid->data)) pdp_post ("ERROR CREATING XVIMAGE"); + //pdp_post("created xvimag data:%x xvi:%x",xvid->data,xvid->xvi); + +} + +static void pdp_xvideo_destroy_xvimage(t_pdp_xvideo *xvid) +{ + if(xvid->data) pdp_dealloc(xvid->data); + if (xvid->xvi) XFree(xvid->xvi); + xvid->xvi = 0; + xvid->data = 0; +} + +void pdp_xvideo_display_packet(t_pdp_xvideo *xvid, t_pdp_xwindow *xwin, int packet) +{ + t_pdp *header = pdp_packet_header(packet); + void *data = pdp_packet_data(packet); + t_bitmap * bm = pdp_packet_bitmap_info(packet); + unsigned int width, height, encoding, size, nbpixels; + + /* some checks: only display when initialized and when pacet is bitmap YV12 */ + if (!xvid->initialized) return; + if (!header) return; + if (!bm) return; + + width = bm->width; + height = bm->height; + encoding = bm->encoding; + size = (width * height + (((width>>1)*(height>>1))<<1)); + nbpixels = width * height; + + if (PDP_BITMAP != header->type) return; + if (PDP_BITMAP_YV12 != encoding) return; + + /* check if xvimage needs to be recreated */ + if ((width != xvid->width) || (height != xvid->height)){ + //pdp_post("pdp_xv: replace image"); + pdp_xvideo_destroy_xvimage(xvid); + pdp_xvideo_create_xvimage(xvid, width, height); + } + + /* copy the data to the XvImage buffer */ + memcpy(xvid->data, data, size); + + /* display */ + XvPutImage(xvid->xdpy->dpy,xvid->xv_port, xwin->win,xwin->gc,xvid->xvi, + 0,0,xvid->width,xvid->height, 0,0,xwin->winwidth,xwin->winheight); + XFlush(xvid->xdpy->dpy); + + + +} + + + +void pdp_xvideo_close(t_pdp_xvideo* xvid) +{ + if (xvid->initialized){ + if (xvid->xvi) pdp_xvideo_destroy_xvimage(xvid); + XvUngrabPort(xvid->xdpy->dpy, xvid->xv_port, CurrentTime); + xvid->xv_port = 0; + xvid->xdpy = 0; + xvid->last_encoding = -1; + xvid->initialized = false; + } +} + +void pdp_xvideo_cleanup(t_pdp_xvideo* xvid) +{ + // close xvideo port (and delete XvImage) + pdp_xvideo_close(xvid); + + // no more dynamic data to free + +} + +void pdp_xvideo_free(t_pdp_xvideo* xvid){ + pdp_xvideo_cleanup(xvid); + pdp_dealloc(xvid); +} + +void pdp_xvideo_init(t_pdp_xvideo *xvid) +{ + + xvid->xdpy = 0; + + xvid->xv_format = FOURCC_YV12; + xvid->xv_port = 0; + + xvid->width = 320; + xvid->height = 240; + + xvid->data = 0; + xvid->xvi = 0; + + xvid->initialized = 0; + xvid->last_encoding = -1; + +} +t_pdp_xvideo *pdp_xvideo_new(void) +{ + t_pdp_xvideo *xvid = pdp_alloc(sizeof(*xvid)); + pdp_xvideo_init(xvid); + return xvid; +} + +int pdp_xvideo_open_on_display(t_pdp_xvideo *xvid, t_pdp_xdisplay *d) +{ + unsigned int ver, rel, req, ev, err, i, j; + unsigned int adaptors; + int formats; + XvAdaptorInfo *ai; + + if (xvid->initialized) return 1; + if (!d) return 0; + xvid->xdpy = d; + + if (Success != XvQueryExtension(xvid->xdpy->dpy,&ver,&rel,&req,&ev,&err)) return 0; + + /* find + lock port */ + if (Success != XvQueryAdaptors(xvid->xdpy->dpy,DefaultRootWindow(xvid->xdpy->dpy),&adaptors,&ai)) + return 0; + for (i = 0; i < adaptors; i++) { + if ((ai[i].type & XvInputMask) && (ai[i].type & XvImageMask)) { + for (j=0; j < ai[i].num_ports; j++){ + if (Success != XvGrabPort(xvid->xdpy->dpy,ai[i].base_id+j,CurrentTime)) { + //fprintf(stderr,"INFO: Xvideo port %ld on adapter %d: is busy, skipping\n",ai[i].base_id+j, i); + } + else { + xvid->xv_port = ai[i].base_id + j; + goto breakout; + } + } + } + } + + + breakout: + + XFree(ai); + if (0 == xvid->xv_port) return 0; + pdp_post("pdp_xvideo: grabbed port %d on adaptor %d", xvid->xv_port, i); + xvid->initialized = 1; + pdp_xvideo_create_xvimage(xvid, xvid->width, xvid->height); + return 1; +} + + diff --git a/system/kernel/CONTENTS b/system/kernel/CONTENTS new file mode 100644 index 0000000..c2f7c8c --- /dev/null +++ b/system/kernel/CONTENTS @@ -0,0 +1,7 @@ +debug debug stuff +forth the forth system +list the list implementation +mem memory allocation stuf +packet the packet memory manager +type the type handling and conversion system +symbol symbol implementation, with namespaces for forth, types, classes, ... diff --git a/system/kernel/pdp_debug.c b/system/kernel/pdp_debug.c new file mode 100644 index 0000000..07f6541 --- /dev/null +++ b/system/kernel/pdp_debug.c @@ -0,0 +1,21 @@ +#include <sys/types.h> +#include <signal.h> +#include <unistd.h> +#include "pdp_post.h" + +int pdp_debug_sigtrap_on_assert; + + +void pdp_assert_hook (char *condition, char *file, int line) +{ + pdp_post("PDP_ASSERT (%s) failed in file %s, line %u. ", condition, file, line); + pdp_post("%s.\n", pdp_debug_sigtrap_on_assert ? "sending SIGTRAP" : "continuing"); + + if (pdp_debug_sigtrap_on_assert) kill(getpid(), SIGTRAP); +} + + +void pdp_debug_setup(void) +{ + pdp_debug_sigtrap_on_assert = 1; +} diff --git a/system/kernel/pdp_mem.c b/system/kernel/pdp_mem.c new file mode 100644 index 0000000..33822ef --- /dev/null +++ b/system/kernel/pdp_mem.c @@ -0,0 +1,129 @@ +/* + * Pure Data Packet system file: memory allocation + * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#include <stdlib.h> +#include "pdp_mem.h" +#include "pdp_debug.h" + + +/* malloc wrapper that calls garbage collector */ +void *pdp_alloc(int size) +{ + void *ptr = malloc(size); + + PDP_ASSERT(ptr); + + return ptr; + + //TODO: REPAIR THIS + //post ("malloc failed in a pdp module: running garbage collector."); + //pdp_pool_collect_garbage(); + //return malloc(size); +} + + +void pdp_dealloc(void *stuff) +{ + free (stuff); +} + + +/* fast atom allocation object + well, this is not too fast yet, but will be later + when it suports linux futexes or atomic operations */ + +//#include <pthread.h> + +/* private linked list struct */ +typedef struct _fastalloc +{ + struct _fastalloc * next; +} t_fastalloc; + + + + +static void _pdp_fastalloc_lock(t_pdp_fastalloc *x){pthread_mutex_lock(&x->mut);} +static void _pdp_fastalloc_unlock(t_pdp_fastalloc *x){pthread_mutex_unlock(&x->mut);} + +static void _pdp_fastalloc_refill_freelist(t_pdp_fastalloc *x) +{ + t_fastalloc *atom; + unsigned int i; + + PDP_ASSERT(x->freelist == 0); + + /* get a new block + there is no means of freeing the data afterwards, + this is a fast implementation with the tradeoff of data + fragmentation "memory leaks".. */ + + x->freelist = pdp_alloc(x->block_elements * x->atom_size); + + /* link all atoms together */ + atom = x->freelist; + for (i=0; i<x->block_elements-1; i++){ + atom->next = (t_fastalloc *)(((char *)atom) + x->atom_size); + atom = atom->next; + } + atom->next = 0; + +} + +void *pdp_fastalloc_new_atom(t_pdp_fastalloc *x) +{ + t_fastalloc *atom; + + _pdp_fastalloc_lock(x); + + /* get an atom from the freelist + or refill it and try again */ + while (!(atom = x->freelist)){ + _pdp_fastalloc_refill_freelist(x); + } + + /* delete the element from the freelist */ + x->freelist = x->freelist->next; + atom->next = 0; + + _pdp_fastalloc_unlock(x); + + return (void *)atom; + +} +void pdp_fastalloc_save_atom(t_pdp_fastalloc *x, void *atom) +{ + _pdp_fastalloc_lock(x); + ((t_fastalloc *)atom)->next = x->freelist; + x->freelist = (t_fastalloc *)atom; + _pdp_fastalloc_unlock(x); +} + +t_pdp_fastalloc *pdp_fastalloc_new(unsigned int size) +{ + t_pdp_fastalloc *x = pdp_alloc(sizeof(*x)); + if (size < sizeof(t_fastalloc)) size = sizeof(t_fastalloc); + x->freelist = 0; + x->atom_size = size; + x->block_elements = PDP_FASTALLOC_BLOCK_ELEMENTS; + pthread_mutex_init(&x->mut, NULL); + return x; +} + diff --git a/system/kernel/pdp_packet2.c b/system/kernel/pdp_packet2.c new file mode 100644 index 0000000..3717a77 --- /dev/null +++ b/system/kernel/pdp_packet2.c @@ -0,0 +1,623 @@ +/* + * Pure Data Packet system implementation: Packet Manager + * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + + + +#include <stdio.h> +#include <pthread.h> +#include <unistd.h> +#include <string.h> +#include "pdp_post.h" +#include "pdp_packet.h" +#include "pdp_mem.h" +#include "pdp_list.h" +#include "pdp_type.h" +#include "pdp_debug.h" + + +/* packet implementation. contains class and packet (instance) handling + + some notes on packet operations. + copy ro/rw and unregister are relatively straightforward + packet creation can be done in 2 ways in this interface: + create + reuse + however, these methods should only be called by specific factory + methods, so the user should only create packets using pdp_factory_newpacket + + reuse or create is thus the responsability of the factory methods for + each packet type (class) implementation + + +*/ + + +/* NOTE: + the packet pool methods are called within the pool locks. this probably + needs to change, because it will cause deadlocks for container packets (fobs) */ + + +/* new implementation: probably just a minor adjustment: add the reuse fifo attached + to type desc symbol name + need to check and possibly eliminate hacks for non-pure packets + + pdp_packet_new: + LOCK + 1. check reuse fifo + 2. empty -> create packet+return (search array) + 3. element -> check if type is correct, yes->pop+return, no->goto 1. + UNLOCK + 4. wakeup + + pdp_packet_mark_unused + + 1. check refcount. if > 1 dec + exit + 2. if 1 put packet to sleep + 3. dec refcount + 4. add to reuse fifo (no fifo -> create) + + pdp_packet_delete: analogous to mark_unused + pdp_packet_copy_ro/rw: analogous to new + +*/ + + +/* the pool */ +#define PDP_INITIAL_POOL_SIZE 64 +static int pdp_pool_size; +static t_pdp** pdp_pool; + +/* mutex: protects the pool and reuse lists attached to symbols */ +static pthread_mutex_t pdp_pool_mutex; +#define LOCK pthread_mutex_lock (&pdp_pool_mutex) +#define UNLOCK pthread_mutex_unlock (&pdp_pool_mutex) + +/* the list of classes */ +static t_pdp_list *class_list; + +/* debug */ +void +pdp_packet_print_debug(int packet) +{ + t_pdp *h = pdp_packet_header(packet); + pdp_post("debug info for packet %d", packet); + if (!h){ + pdp_post("invalid packet"); + } + else{ + pdp_post ("\ttype: %d", h->type); + pdp_post ("\tdesc: %s", h->desc ? h->desc->s_name : "unknown"); + pdp_post ("\tsize: %d", h->size); + pdp_post ("\tflags: %x", h->flags); + pdp_post ("\tusers: %d", h->users); + pdp_post ("\tclass: %x", h->theclass); + } +} + + + +/* setup methods */ + +void +pdp_packet_setup(void) +{ + + pdp_pool_size = PDP_INITIAL_POOL_SIZE; + pdp_pool = (t_pdp **)pdp_alloc(PDP_INITIAL_POOL_SIZE * sizeof(t_pdp *)); + bzero(pdp_pool, pdp_pool_size * sizeof(t_pdp *)); + class_list = pdp_list_new(0); + pthread_mutex_init(&pdp_pool_mutex, NULL); +} + +/* class methods */ +t_pdp_class *pdp_class_new(t_pdp_symbol *type, t_pdp_factory_method create){ + t_pdp_class *c = (t_pdp_class *)pdp_alloc(sizeof(t_pdp_class)); + memset(c, 0, sizeof(t_pdp_class)); + c->create = create; + c->type = type; // set type + pdp_list_add(class_list, a_pointer, (t_pdp_word)((void *)c)); + return c; +} + +/* the packet factory */ +int pdp_factory_newpacket(t_pdp_symbol *type) +{ + int p; + t_pdp_class *c; + t_pdp_atom *a = class_list->first; + + /* try to reuse first + THINK: should this be the responsability of the type specific constructors, + or should a packet allways be reusable (solution: depends on what the cleanup method returns??) + */ + p = pdp_packet_reuse(type); + if (-1 != p) return p; + + + /* call class constructor */ + while(a){ + c = (t_pdp_class *)(a->w.w_pointer); + if (c->type && pdp_type_description_match(type, c->type)){ + //pdp_post("method %x, type %s", c->create, type->s_name); + return (c->create) ? (*c->create)(type) : -1; + } + a = a->next; + } + return -1; +} + +static void +_pdp_pool_expand_nolock(void){ + int i; + + /* double the size */ + int new_pool_size = pdp_pool_size << 1; + t_pdp **new_pool = (t_pdp **)pdp_alloc(new_pool_size * sizeof(t_pdp *)); + bzero(new_pool, new_pool_size * sizeof(t_pdp *)); + memcpy(new_pool, pdp_pool, pdp_pool_size * sizeof(t_pdp *)); + pdp_dealloc(pdp_pool); + pdp_pool = new_pool; + pdp_pool_size = new_pool_size; +} + + + + +/* private _pdp_packet methods */ + +/* packets can only be created and destroyed using these 2 methods */ +/* it updates the mem usage and total packet count */ + +static void +_pdp_packet_dealloc_nolock(t_pdp *p) +{ + /* free memory */ + pdp_dealloc (p); +} + +static t_pdp* +_pdp_packet_alloc_nolock(unsigned int datatype, unsigned int datasize) +{ + unsigned int totalsize = datasize + PDP_HEADER_SIZE; + t_pdp *p = (t_pdp *)pdp_alloc(totalsize); + if (p){ + memset(p, 0, PDP_HEADER_SIZE); //initialize header to 0 + p->type = datatype; + p->size = totalsize; + p->users = 1; + } + return p; +} + + +/* create a new packet and expand pool if necessary */ +static int +_pdp_packet_create_nolock(unsigned int datatype, unsigned int datasize) +{ + int p = 0; + while(1){ + for (; p < pdp_pool_size; p++){ + if (!pdp_pool[p]){ + /* found slot to store packet*/ + t_pdp *header = _pdp_packet_alloc_nolock(datatype, datasize); + if (!header) return -1; // error allocating packet + pdp_pool[p] = header; + return p; + } + } + /* no slot found, expand pool */ + _pdp_pool_expand_nolock(); + } +} + + +void +pdp_packet_destroy(void) +{ + int i = 0; + /* dealloc all the data in object stack */ + pdp_post("DEBUG: pdp_packet_destroy: clearing object pool."); + while ((i < pdp_pool_size) && (pdp_pool[i])) _pdp_packet_dealloc_nolock(pdp_pool[i++]); +} + + + + + + + + +/* public pool operations: have to be thread safe so each entry point + locks the mutex */ + + +/* create a new packet. + this should only be used by type specific factory methods, and only if the + reuse method fails, since it will always create a new packet */ +int +pdp_packet_create(unsigned int datatype, unsigned int datasize /*without header*/) +{ + int packet; + LOCK; + packet = _pdp_packet_create_nolock(datatype, datasize); + UNLOCK; + return packet; +} + + +/* return a new packet. + it tries to reuse a packet based on + 1. matching data size + 2. abscence of destructor (which SHOULD mean there are no enclosed references) + + it obviously can't use the reuse fifo tagged to a symbolic type description + + ALWAYS USE pdp_packet_reuse BEFORE calling pdp_packet_new if possible + use both ONLY IN CONSTRUCTORS !!! + + use pdp_packet_factory to create packets as a "user" + + this is a summary of all internal packet creation mechanisms: + + -> pdp_packet_reuse, which uses symbolic type descriptions, and should work for all packet types + it returns an initialized container (meta = correct, data = garbage) + + -> pdp_packet_new, which only works for non-pure packets, and reuses packets based on data type + it returns a pure packet (meta + data = garbage) + + -> pdp_packet_create, like pdp_packet_new, only it always creates a new packet + + + +*/ + +int +pdp_packet_new(unsigned int datatype, unsigned int datasize) +{ + t_pdp *header; + int packet; + LOCK; + for (packet = 0; packet < pdp_pool_size; packet++){ + header = pdp_pool[packet]; + /* check data size */ + if (header + && header->users == 0 + && header->size == datasize + PDP_HEADER_SIZE + && !(header->theclass && header->theclass->cleanup)){ + + /* ok, got one. initialize */ + memset(header, 0, PDP_HEADER_SIZE); + header->users = 1; + header->type = datatype; + header->size = datasize + PDP_HEADER_SIZE; + + UNLOCK; //EXIT1 + return packet; + } + } + + /* no usable non-pure packet found, create a new one */ + + UNLOCK; //EXIT2 + return pdp_packet_create(datatype, datasize); + + + +} + + +/* internal method to add a packet to a packet type + description symbol's unused packet fifo */ +void +_pdp_packet_save_nolock(int packet) +{ + t_pdp *header = pdp_packet_header(packet); + t_pdp_symbol *s; + PDP_ASSERT(header); + PDP_ASSERT(header->users == 0); + PDP_ASSERT(header->desc); + s = header->desc; + if (!s->s_reusefifo) s->s_reusefifo = pdp_list_new(0); + pdp_list_add(s->s_reusefifo, a_packet, (t_pdp_word)packet); +} + +/* this will revive a packet matching a certain type description + no wildcards are allowed */ +int +pdp_packet_reuse(t_pdp_symbol *type_description) +{ + int packet = -1; + t_pdp *header = 0; + t_pdp_list *l = 0; + LOCK; + if (!type_description || !(l = type_description->s_reusefifo)) goto exit; + while(l->elements){ + packet = pdp_list_pop(l).w_packet; + header = pdp_packet_header(packet); + + /* check if reuse fifo is consistent (packet unused + correct type) + packet could be deleted and replaced with another one, or + revived without the index updated (it's a "hint cache") */ + + if (header->users == 0){ + /* check if type matches */ + if (pdp_type_description_match(header->desc, type_description)){ + header->users++; // revive + goto exit; + } + /* if not, add the packet to the correct reuse fifo */ + else{ + _pdp_packet_save_nolock(packet); + } + } + + /* remove dangling refs */ + header = 0; + packet = -1; + } + + exit: + UNLOCK; + if (header && header->theclass && header->theclass->wakeup){ + header->theclass->wakeup(header); // revive if necessary + } + return packet; +} + +/* find all unused packets in pool, marked as used (to protect from other reapers) + and return them as a list. non-pure packets are not revived */ + + + + + +/* this returns a copy of a packet for read only access. + (increases refcount of the packet -> packet will become readonly if it was + writable, i.e. had rc=1 */ + +int +pdp_packet_copy_ro(int handle) +{ + t_pdp* header; + + if (header = pdp_packet_header(handle)){ + PDP_ASSERT(header->users); // consistency check + LOCK; + header->users++; // increment reference count + UNLOCK; + } + else handle = -1; + return handle; +} + +/* clone a packet: create a new packet with the same + type as the source packet */ + +int +pdp_packet_clone_rw(int handle) +{ + t_pdp* header; + int new_handle = -1; + + + if (header = pdp_packet_header(handle)){ + /* consistency checks */ + PDP_ASSERT(header->users); + PDP_ASSERT(header->desc); + + /* first try to reuse old packet */ + new_handle = pdp_packet_reuse(header->desc); + + /* if this failed, create a new one using the central packet factory method */ + if (-1 == new_handle) new_handle = pdp_factory_newpacket(header->desc); + } + + return new_handle; +} + +/* return a copy of a packet (clone + copy data) */ +int +pdp_packet_copy_rw(int handle) +{ + t_pdp *header, *new_header; + int new_handle = -1; + + if (!(header = pdp_packet_header(handle))) return -1; + + /* check if we are allowed to copy */ + if (header->flags & PDP_FLAG_DONOTCOPY) return -1; + + /* get target packet */ + new_handle = pdp_packet_clone_rw(handle); + if (-1 == new_handle) return -1; + new_header = pdp_packet_header(new_handle); + + /* if there is a copy method, use that one */ + if (header->theclass && header->theclass->copy){ + header->theclass->copy(header, new_header); + } + + /* otherwize copy the data verbatim */ + else { + memcpy(pdp_packet_data(new_handle), + pdp_packet_data(handle), + pdp_packet_data_size(handle)); + } + + return new_handle; + +} + + +/* decrement refcount */ +void pdp_packet_mark_unused(int handle) +{ + t_pdp *header; + if (!(header = pdp_packet_header(handle))) return; + + PDP_ASSERT(header->users); // consistency check + + LOCK; + + /* just decrement refcount */ + if (header->users > 1){ + header->users--; + } + + /* put packet to sleep if refcount 1->0 */ + else { + if (header->theclass && header->theclass->sleep){ + /* call sleep method (if any) outside of lock + while the packet is still alive, so it won't be + acclaimed by another thread */ + UNLOCK; + header->theclass->sleep(header); + LOCK; + } + /* clear refcount & save in fifo for later use */ + header->users = 0; + if (header->desc) // sleep could have destructed packet.. + _pdp_packet_save_nolock(handle); + } + + UNLOCK; +} + + + +/* delete a packet. rc needs to be == 1 */ +void pdp_packet_delete(int handle) +{ + t_pdp *header; + header = pdp_packet_header(handle); + PDP_ASSERT(header); + PDP_ASSERT(header->users == 1); // consistency check + + LOCK; + + if (header->theclass && header->theclass->cleanup){ + /* call cleanup method (if any) outside of lock + while the packet is still alive, so it won't be + acclaimed by another thread */ + UNLOCK; + header->theclass->cleanup(header); + LOCK; + } + + /* delete the packet */ + pdp_pool[handle] = 0; + _pdp_packet_dealloc_nolock(header); + + + UNLOCK; +} + + + + + + + +/* public data access methods */ + +t_pdp* +pdp_packet_header(int handle) +{ + if ((handle >= 0) && (handle < pdp_pool_size)) return pdp_pool[handle]; + else return 0; +} + +void* +pdp_packet_subheader(int handle) +{ + t_pdp* header = pdp_packet_header(handle); + if (!header) return 0; + return (void *)(&header->info.raw); +} + +void* +pdp_packet_data(int handle) +{ + t_pdp *h; + if ((handle >= 0) && (handle < pdp_pool_size)) + { + h = pdp_pool[handle]; + if (!h) return 0; + return (char *)(h) + PDP_HEADER_SIZE; + } + else return 0; +} + +int +pdp_packet_data_size(int handle) +{ + t_pdp *h; + if ((handle >= 0) && (handle < pdp_pool_size)) + { + h = pdp_pool[handle]; + if (!h) return 0; + return h->size - PDP_HEADER_SIZE; + } + else return 0; +} + + + + +int pdp_packet_writable(int packet) /* returns true if packet is writable */ +{ + t_pdp *h = pdp_packet_header(packet); + if (!h) return 0; + return (h->users == 1); +} + +void pdp_packet_replace_with_writable(int *packet) /* replaces a packet with a writable copy */ +{ + int new_p; + if (!pdp_packet_writable(*packet)){ + new_p = pdp_packet_copy_rw(*packet); + pdp_packet_mark_unused(*packet); + *packet = new_p; + } + +} + +/* pool stuff */ + +int +pdp_pool_collect_garbage(void) +{ + pdp_post("ERROR: garbage collector not implemented"); + return 0; +} + +void +pdp_pool_set_max_mem_usage(int max) +{ + pdp_post("ERROR: mem limit not implemented"); +} + + + + + + +#ifdef __cplusplus +} +#endif diff --git a/system/kernel/pdp_post.c b/system/kernel/pdp_post.c new file mode 100644 index 0000000..fb761d0 --- /dev/null +++ b/system/kernel/pdp_post.c @@ -0,0 +1,48 @@ + +/* + * Pure Data Packet system file. pdp logging. + * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#include <stdlib.h> +#include <stdio.h> +#include <stdarg.h> +#include <string.h> +#include <errno.h> +#include "pdp_post.h" + +/* list printing should be moved here too */ + +/* write a message to log (console) */ +void pdp_post_n(char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); +} +void pdp_post(char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + putc('\n', stderr); +} + + diff --git a/system/kernel/pdp_symbol.c b/system/kernel/pdp_symbol.c new file mode 100644 index 0000000..32e9e33 --- /dev/null +++ b/system/kernel/pdp_symbol.c @@ -0,0 +1,196 @@ +/* + * Pure Data Packet system implementation. : code implementing pdp's namespace (symbols) + * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#include <string.h> +#include <pthread.h> +#include "pdp_symbol.h" +#include "pdp_list.h" +#include "pdp_debug.h" + +// some extra prototypes +void *pdp_alloc(int size); +void pdp_dealloc(void *data); + +// the symbol hash mutex +static pthread_mutex_t pdp_hash_mutex; + +#define HASHSIZE 1024 +static t_pdp_symbol *pdp_symhash[HASHSIZE]; + + +#define LOCK pthread_mutex_lock(&pdp_hash_mutex) +#define UNLOCK pthread_mutex_unlock(&pdp_hash_mutex) + + +static void _pdp_symbol_init(t_pdp_symbol *s) +{ + memset(s, 0, sizeof(*s)); + s->s_forth.t = a_undef; +} + + +/* shamelessly copied from pd src and made thread safe */ +t_pdp_symbol *_pdp_dogensym(char *s, t_pdp_symbol *oldsym) +{ + t_pdp_symbol **sym1, *sym2; + unsigned int hash1 = 0, hash2 = 0; + int length = 0; + char *s2 = s; + while (*s2) + { + hash1 += *s2; + hash2 += hash1; + length++; + s2++; + } + sym1 = pdp_symhash + (hash2 & (HASHSIZE-1)); + + /* lock hash */ + LOCK; + + while (sym2 = *sym1) + { + if (!strcmp(sym2->s_name, s)) goto gotit; + sym1 = &sym2->s_next; + } + if (oldsym){ + sym2 = oldsym; + } + else + { + sym2 = (t_pdp_symbol *)pdp_alloc(sizeof(*sym2)); + _pdp_symbol_init(sym2); + sym2->s_name = pdp_alloc(length+1); + sym2->s_next = 0; + strcpy(sym2->s_name, s); + } + *sym1 = sym2; + + gotit: + + /* unlock hash */ + UNLOCK; + return (sym2); +} + +t_pdp_symbol *pdp_gensym(char *s) +{ + return(_pdp_dogensym(s, 0)); +} + + +/* connect a parsed typelist to a symbol type name + 1 = succes, 0 = error (symbol already connected) */ +int pdp_symbol_set_typelist(t_pdp_symbol *s, t_pdp_list *typelist) +{ + int status = 0; + LOCK; + if (!s->s_type){ + s->s_type = typelist; + status = 1; + } + UNLOCK; + return status; +} + + +void pdp_symbol_apply_all(t_pdp_symbol_iterator it) +{ + int i; + for (i=0; i<HASHSIZE; i++){ + t_pdp_symbol *s; + for (s = pdp_symhash[i]; s; s=s->s_next){ + it(s); + } + + } +} + +t_pdp_symbol _pdp_sym_wildcard; +t_pdp_symbol _pdp_sym_float; +t_pdp_symbol _pdp_sym_int; +t_pdp_symbol _pdp_sym_symbol; +t_pdp_symbol _pdp_sym_packet; +t_pdp_symbol _pdp_sym_pointer; +t_pdp_symbol _pdp_sym_invalid; +t_pdp_symbol _pdp_sym_list; +t_pdp_symbol _pdp_sym_question_mark; +t_pdp_symbol _pdp_sym_atom; +t_pdp_symbol _pdp_sym_null; +t_pdp_symbol _pdp_sym_quote_start; +t_pdp_symbol _pdp_sym_quote_end; +t_pdp_symbol _pdp_sym_return; +t_pdp_symbol _pdp_sym_nreturn; +t_pdp_symbol _pdp_sym_defstart; +t_pdp_symbol _pdp_sym_defend; +t_pdp_symbol _pdp_sym_if; +t_pdp_symbol _pdp_sym_then; +t_pdp_symbol _pdp_sym_local; +t_pdp_symbol _pdp_sym_forth; +t_pdp_symbol _pdp_sym_call; +t_pdp_symbol _pdp_sym_push; +t_pdp_symbol _pdp_sym_pop; + +static void _sym(char *name, t_pdp_symbol *s) +{ + t_pdp_symbol *realsym; + _pdp_symbol_init(s); + s->s_name = name; + realsym = _pdp_dogensym(name, s); + PDP_ASSERT(realsym == s); // if this fails, the symbol was already defined +} + +void pdp_symbol_setup(void) +{ + // create mutexes + pthread_mutex_init(&pdp_hash_mutex, NULL); + + // init symbol hash + memset(pdp_symhash, 0, HASHSIZE * sizeof(t_pdp_symbol *)); + + // setup predefined symbols (those that have direct pointer access for speedup) + _sym("*", &_pdp_sym_wildcard); + _sym("float", &_pdp_sym_float); + _sym("int", &_pdp_sym_int); + _sym("symbol", &_pdp_sym_symbol); + _sym("packet", &_pdp_sym_packet); + _sym("pointer", &_pdp_sym_pointer); + _sym("invalid", &_pdp_sym_invalid); + _sym("list", &_pdp_sym_list); + _sym("?", &_pdp_sym_question_mark); + _sym("atom", &_pdp_sym_atom); + _sym("null", &_pdp_sym_null); + _sym("[", &_pdp_sym_quote_start); + _sym("]", &_pdp_sym_quote_end); + _sym("ret", &_pdp_sym_return); + _sym("nret", &_pdp_sym_nreturn); + _sym(":", &_pdp_sym_defstart); + _sym(";", &_pdp_sym_defend); + _sym("if", &_pdp_sym_if); + _sym("then", &_pdp_sym_then); + _sym("local", &_pdp_sym_local); + _sym("forth", &_pdp_sym_forth); + _sym("call", &_pdp_sym_call); + _sym("push", &_pdp_sym_push); + _sym("pop", &_pdp_sym_pop); + +} + + diff --git a/system/net/Makefile b/system/net/Makefile new file mode 100644 index 0000000..53d1d61 --- /dev/null +++ b/system/net/Makefile @@ -0,0 +1,11 @@ + +OBJECTS = pdp_net.o + + +include ../../Makefile.config + +all: $(OBJECTS) + +clean: + rm -f *~ + rm -f *.o diff --git a/system/net/pdp_net.c b/system/net/pdp_net.c new file mode 100644 index 0000000..04c97d6 --- /dev/null +++ b/system/net/pdp_net.c @@ -0,0 +1,685 @@ + +#include "pdp_net.h" +#include "pdp_debug.h" +#include "pdp_post.h" +#include "pdp_mem.h" + +#define D if (0) // DEBUG MSG +#define DD if (0) // DROP DEBUG MSG + +/* shared internals */ + +static int _is_udp_header(t_pdp_udp_header *header, unsigned int size) +{ + if (size < sizeof(t_pdp_udp_header)) return 0; + if (strcmp(header->signature, "PDP")) return 0; + if (PDP_UDP_VERSION != header->version) return 0; + return 1; +} + +static void _make_udp_header(t_pdp_udp_header *header) +{ + strcpy(header->signature, "PDP"); + header->version = PDP_UDP_VERSION; +} + + + + +/* R E C E I V E R */ + + +/* INTERNALS */ + +static void _send_packet(t_pdp_udp_receiver *x) +{ + _make_udp_header(&x->x_resend_header); + PDP_ASSERT(x->x_resend_udp_packet_size <= sizeof(t_pdp_udp_header) + sizeof(x->x_resend_chunks)); + + /* send the packet */ + if (-1 == sendto (x->x_socket, &x->x_resend_header, x->x_resend_udp_packet_size, 0, + (struct sockaddr *)&x->x_source_socket, x->x_sslen)){ + pdp_post("pdp_netreceive: send failed"); + } +} + +static void _send_ack_new(t_pdp_udp_receiver *x) +{ + /* setup resend header */ + x->x_resend_header.connection_id = x->x_connection_id; + x->x_resend_header.sequence_number = PDP_UDP_ACKNEW; + x->x_resend_udp_packet_size = sizeof(t_pdp_udp_header); + + _send_packet(x); + +} + + +static int _handle_PDP_UDP_NEW(t_pdp_udp_receiver *x) +{ + /* we've got a PDP_UDP_NEW packet, so prepare to receive the data */ + t_pdp_udp_newpacket *np = (t_pdp_udp_newpacket *)x->x_buf; + + + //pdp_post("conn_id = %x", x->x_header.connection_id); + //pdp_post("size = %d", np->data_size); + //pdp_post("nb_chunks = %d", np->nb_chunks); + //pdp_post("chunk_size = %d", np->chunk_size); + //pdp_post("type = %s", np->type); + + /* check if it is a resend of the PDP_UDP_NEW packet (if NEW_ACK didn't get through) + if not, prepare for reception */ + + if (x->x_connection_id != x->x_header.connection_id){ + + + /* prepare for reception : TODO add some more checks here */ + + // setup type info + if (x->x_data_type) pdp_dealloc (x->x_data_type); + x->x_data_type = pdp_alloc(1 + strlen(np->type)); + strcpy(x->x_data_type, np->type); + + // setup data buffer + x->x_data_size = np->data_size; + if (x->x_data) pdp_dealloc (x->x_data); + x->x_data = pdp_alloc(x->x_data_size); + memset(x->x_data, 0, x->x_data_size); // clear for debug + + // setup connection info + x->x_connection_id = x->x_header.connection_id; + x->x_nb_chunks = np->nb_chunks; + x->x_chunk_size = np->chunk_size; + + /* setup chunk list */ + if (x->x_chunk_list) pdp_dealloc(x->x_chunk_list); + x->x_chunk_list = pdp_alloc(sizeof(unsigned int)*x->x_nb_chunks); + memset(x->x_chunk_list, 0, sizeof(unsigned int)*x->x_nb_chunks); + + x->x_receive_finished = 0; // we're in a receiving state + x->x_packet_transferred = 0; // we didn't pass the packet yet + } + + /* send ACK */ + _send_ack_new(x); + + + + return 1; +} + +static void _handle_PDP_UDP_DONE(t_pdp_udp_receiver *x) +{ + unsigned int chunk; + unsigned int missing; + unsigned int i; + unsigned int resend_packet_size; + + + /* check the connection id */ + if (x->x_connection_id != x->x_header.connection_id) return; + + /* determine how many packets are missing */ + missing = 0; + for (i=0; i<x->x_nb_chunks; i++) + if (!x->x_chunk_list[i]) missing++; + + D pdp_post ("last packet %x had %d/%d dropped chunks", x->x_connection_id, missing, x->x_nb_chunks); + + + /* build the resend request (chunk list )*/ + if (missing > RESEND_MAX_CHUNKS) missing = RESEND_MAX_CHUNKS; + chunk = 0; + i = missing; + while(i--){ + while (x->x_chunk_list[chunk]) chunk++; // find next missing chunk + x->x_resend_chunks[i] = chunk++; // store it in list + } + + /* set the packet size to include the list */ + x->x_resend_udp_packet_size = sizeof(t_pdp_udp_header) + + missing * sizeof(unsigned int); + + /* setup resend header */ + strcpy((char *)&x->x_resend_header, "PDP"); + x->x_resend_header.version = PDP_UDP_VERSION; + x->x_resend_header.connection_id = x->x_connection_id; + x->x_resend_header.sequence_number = PDP_UDP_RESEND; + + D pdp_post("pdp_netreceive: sending RESEND response for %u chunks", missing); + + /* send out */ + _send_packet(x); + + /* indicate we're done if there's no chunks missing */ + if (!missing) x->x_receive_finished = 1; + +} + + +static int _handle_UDP_DATA(t_pdp_udp_receiver *x) +{ + unsigned int seq = x->x_header.sequence_number; + unsigned int offset = x->x_chunk_size * seq; + + /* ignore the packet if we're not expecting it */ + if ((!x->x_connection_id) || (x->x_connection_id != x->x_header.connection_id)){ + //pdp_post("pdp_netreceive: got invalid data packet: transmission id %x is not part of current transmisson %x", + // x->x_header.connection_id, x->x_connection_id); + return 0; + } + + /* check if it is valid */ + if (seq >= x->x_nb_chunks){ + pdp_post("pdp_netreceive: got invalid data packet: sequence number %u out of bound (nb_chunks=%u)", + seq, x->x_nb_chunks); + return 0; + } + + /* final check */ + PDP_ASSERT(offset + x->x_buf_size <= x->x_data_size); + + /* write & log it */ + memcpy(x->x_data + offset, x->x_buf, x->x_buf_size); + x->x_chunk_list[seq] = 1; + return 1; + +} + +/* INTERFACE */ + +/* setup */ +t_pdp_udp_receiver *pdp_udp_receiver_new(int port) +{ + t_pdp_udp_receiver *x = pdp_alloc(sizeof(*x)); + memset(x, 0, sizeof(*x)); + + /* init */ + x->x_data = 0; + x->x_data_type = 0; + x->x_data_size = 0; + x->x_chunk_list = 0; + x->x_receive_finished = 0; + x->x_packet_transferred = 0; + x->x_zero_terminator = 0; + + x->x_socket = socket(PF_INET, SOCK_DGRAM, 0); + x->x_connection_id = 0; /* zero for bootstrap (0 == an invalid id) */ + x->x_sslen = sizeof(struct sockaddr_in); + + /* bind socket */ + x->x_sa.sin_port = htons(port); + x->x_sa.sin_addr.s_addr = 0; + if (-1 != bind (x->x_socket, (struct sockaddr *)&x->x_sa, + sizeof(struct sockaddr_in))) return x; + + /* suicide if find failed */ + else { + pdp_dealloc(x); + return 0; + } +} +void pdp_udp_receiver_free(t_pdp_udp_receiver *x) +{ + if (!x) return; + if (x->x_socket != 1) close (x->x_socket); + if (x->x_data) pdp_dealloc(x->x_data); + if (x->x_data_type) pdp_dealloc (x->x_data_type); + if (x->x_chunk_list) pdp_dealloc (x->x_chunk_list); +} + +void pdp_udp_receiver_reset(t_pdp_udp_receiver *x) +{ + x->x_connection_id = 0; +} + + +/* receive loop, returns 1 on success, -1 on error, 0 on timeout */ +int pdp_udp_receiver_receive(t_pdp_udp_receiver *x, unsigned int timeout_ms) +{ + /* listen for packets */ + + unsigned int size; + struct timeval tv = {0,1000 * timeout_ms}; + fd_set inset; + FD_ZERO(&inset); + FD_SET(x->x_socket, &inset); + switch(select (x->x_socket+1, &inset, NULL, NULL, &tv)){ + case -1: + return -1; /* select error */ + case 0: + return 0; /* select time out */ + default: + break; /* data ready */ + } + + /* this won't block, since there's data available */ + if (-1 == (int)(size = recvfrom(x->x_socket, (void *)&x->x_header, + PDP_UDP_BUFSIZE+sizeof(x->x_header), 0, + (struct sockaddr *)&x->x_source_socket, &x->x_sslen))) return -1; + + /* store the data size of the packet */ + x->x_buf_size = size - sizeof(t_pdp_udp_header); + + /* parse the udp packet */ + if (_is_udp_header(&x->x_header, size)){ + + /* it is a control packet */ + if ((int)x->x_header.sequence_number < 0){ + + switch (x->x_header.sequence_number){ + case PDP_UDP_NEW: + _handle_PDP_UDP_NEW(x); + break; + + case PDP_UDP_DONE: + _handle_PDP_UDP_DONE(x); + + /* check if we got a complete packet + and signal arrival if we haven't done this already */ + if (x->x_receive_finished && !x->x_packet_transferred){ + x->x_packet_transferred = 1; + return 1; // data complete, please receive + } + break; + + default: + pdp_post("got unknown msg"); + break; + } + } + + /* it is a data packet */ + else { + _handle_UDP_DATA(x); + } + + + } + + else { + pdp_post("pdp_netreceive: got invalid UDP packet (size = %d)", size); + } + + return 0; //no major event, please poll again + +} + +/* get meta & data */ +char *pdp_udp_receiver_type(t_pdp_udp_receiver *x){return x->x_data_type;} +unsigned int pdp_udp_receiver_size(t_pdp_udp_receiver *x){return x->x_data_size;} +void *pdp_udp_receiver_data(t_pdp_udp_receiver *x){return x->x_data;} + + +/* S E N D E R */ + +/* INTERNALS */ + +static void _sleep(t_pdp_udp_sender *x) +{ + int sleep_period = x->x_sleep_period; + + if (sleep_period) { + if (!x->x_sleep_count++) usleep(x->x_sleepgrain_us); + x->x_sleep_count %= sleep_period; + } +} + +static void _send(t_pdp_udp_sender *x) +{ + //post("sending %u data bytes", x->x_buf_size); + + _make_udp_header(&x->x_header); + + PDP_ASSERT (x->x_buf_size <= PDP_UDP_BUFSIZE); + + if (-1 == sendto (x->x_socket, &x->x_header, x->x_buf_size + sizeof(t_pdp_udp_header), + 0, (struct sockaddr *)&x->x_sa, sizeof(struct sockaddr_in))) + pdp_post("pdp_netsend: send FAILED"); + + _sleep(x); + +} + + +static void _prepare_for_new_transmission(t_pdp_udp_sender *x, char *type, unsigned int size, void *data) +{ + unsigned int i; + + /* setup data for transmission */ + x->x_data_type = type; + x->x_data_size = size; + x->x_data = data; + x->x_chunk_size = x->x_udp_payload_size; + x->x_nb_chunks = (x->x_data_size - 1) / x->x_chunk_size + 1; + + /* generate a connection id (non-zero) */ + while (!(x->x_connection_id = rand())); + + /* setup chunk list to contain all chunks */ + if (x->x_chunk_list) free (x->x_chunk_list); + x->x_chunk_list_size = x->x_nb_chunks; + x->x_chunk_list = malloc(sizeof(unsigned int)*x->x_chunk_list_size); + for (i=0; i<x->x_chunk_list_size; i++) x->x_chunk_list[i] = i; + +} + +static void _send_header_packet(t_pdp_udp_sender *x) +{ + t_pdp_udp_newpacket *np = (t_pdp_udp_newpacket *)x->x_buf; /* buf contains the PDP_UDP_NEW body */ + + /* init packet */ + x->x_header.sequence_number = PDP_UDP_NEW; + x->x_header.connection_id = x->x_connection_id; + np->data_size = x->x_data_size; + np->nb_chunks = x->x_nb_chunks; + np->chunk_size = x->x_chunk_size; + strcpy(np->type, x->x_data_type); + x->x_buf_size = sizeof(*np) + strlen(np->type) + 1; + PDP_ASSERT(x->x_buf_size <= PDP_UDP_BUFSIZE); + + /* send the packet */ + _send(x); +} + +/* saend the chunks in the chunk list */ +static void _send_chunks(t_pdp_udp_sender *x){ + unsigned int i; + unsigned int count = 0; + + /* send chunks: this requires header is setup ok (sig,ver,connid)*/ + for (i=0; i<x->x_chunk_list_size; i++){ + unsigned int offset; + unsigned int current_chunk_size; + unsigned int seq = x->x_chunk_list[i]; + + PDP_ASSERT(seq < x->x_nb_chunks); + x->x_header.sequence_number = seq; // store chunk number + + /* get current chunk offset */ + offset = seq * x->x_chunk_size; + PDP_ASSERT(offset < x->x_data_size); + + + /* get current chunk size */ + current_chunk_size = (offset + x->x_chunk_size > x->x_data_size) ? + (x->x_data_size - offset) : x->x_chunk_size; + x->x_buf_size = current_chunk_size; + PDP_ASSERT(x->x_buf_size <= PDP_UDP_BUFSIZE); + + /* copy chunk to transmission buffer & send */ + PDP_ASSERT(offset + current_chunk_size <= x->x_data_size); + memcpy(x->x_buf, x->x_data + offset, current_chunk_size); + + + /* send the chunk */ + _send(x); + count++; + + } + D pdp_post("sent %d chunks, id=%x", count,x->x_connection_id); +} + +/* send a DONE packet */ +static void _send_done(t_pdp_udp_sender *x){ + x->x_header.sequence_number = PDP_UDP_DONE; + x->x_buf_size = 0; + _send(x); +} +static int _receive_packet(t_pdp_udp_sender *x, int desired_type) +/* 0 == timeout, -1 == error, 1 == got packet */ +{ + unsigned int size; + int type; + + struct timeval tv; + fd_set inset; + int sr; + + + while (1){ + int retval; + + /* wait for incoming */ + tv.tv_sec = 0; + tv.tv_usec = x->x_timeout_us; + FD_ZERO(&inset); + FD_SET(x->x_socket, &inset); + switch (select (x->x_socket+1, &inset, NULL, NULL, &tv)){ + case -1: + return -1; /* select error */ + case 0: + return 0; /* select time out */ + default: + break; /* data ready */ + } + + /* read packet */ + if (-1 == (int)(size = recv(x->x_socket, (void *)&x->x_resend_header, MAX_UDP_PACKET, 0))){ + pdp_post("pdp_netsend: error while reading from socket"); + return -1; + } + + /* check if it is a valid PDP_UDP packet */ + if (!_is_udp_header(&x->x_resend_header, size)){ + pdp_post("pdp_netsend: ignoring invalid UDP packet (size = %u)", size); + continue; + } + + + /* check connection id */ + if (x->x_connection_id != x->x_resend_header.connection_id){ + D pdp_post("pdp_netsend: ignoring ghost packet id=%x, current id=%x", + x->x_resend_header.connection_id, x->x_connection_id); + continue; + } + + /* check type */ + type = x->x_resend_header.sequence_number; + if (type != desired_type) continue; + + + /* setup data buffer for known packets */ + switch(type){ + case PDP_UDP_RESEND: + x->x_resend_items = (size - sizeof(t_pdp_udp_header)) / sizeof(unsigned int); + break; + default: + break; + } + + return 1; + } + +} + +/* get the resend list */ +static int _need_resend(t_pdp_udp_sender *x) { + + int retries = 3; + int retval; + while (retries--){ + + /* send a DONE msg */ + _send_done(x); + + /* wait for ACK */ + switch(_receive_packet(x, PDP_UDP_RESEND)){ + case 0: + /* timeout, retry */ + continue; + case -1: + /* error */ + goto move_on; + + default: + /* got PDP_UDP_RESEND packet: setup resend list */ + if (x->x_resend_items > x->x_nb_chunks){ + pdp_post("pdp_netsend: WARNING: chunk list size (%d) is too big, ignoring RESEND request", + x->x_resend_items); + x->x_resend_items = 0; + continue; + } + x->x_chunk_list_size = x->x_resend_items; + + memcpy(x->x_chunk_list, x->x_resend_chunks, sizeof(unsigned int) * x->x_resend_items); + D pdp_post("got RESEND request for %d chunks (id %x)", x->x_resend_items,x->x_connection_id); + + return x->x_chunk_list_size > 0; + } + + } + + /* timeout */ + move_on: + x->x_chunk_list_size = 0; + return 0; + + +} + + +/* INTERFACE */ + + +/* some flow control hacks */ + +void pdp_udp_sender_timeout_us(t_pdp_udp_sender *x, unsigned int timeout_us) +{ + x->x_timeout_us = timeout_us; +} + + +void pdp_udp_sender_sleepgrain_us(t_pdp_udp_sender *x, unsigned int sleepgrain_us) +{ + x->x_sleepgrain_us = sleepgrain_us; +} + +void pdp_udp_sender_sleepperiod(t_pdp_udp_sender *x, unsigned int sleepperiod) +{ + x->x_sleep_period = sleepperiod; +} + + +void pdp_udp_sender_udp_packet_size(t_pdp_udp_sender *x, unsigned int udp_packet_size) +{ + int i = (int)udp_packet_size - sizeof(t_pdp_udp_header); + if (i < 1024) i = 1024; + if (i > PDP_UDP_BUFSIZE) i = PDP_UDP_BUFSIZE; + x->x_udp_payload_size = i; +} + +void pdp_udp_sender_connect(t_pdp_udp_sender *x, char *host, unsigned int port) +{ + struct hostent *hp; + + hp = gethostbyname(host); + if (!hp){ + pdp_post("pdp_udp_sender: host %s not found", host); + } + else{ + /* host ok, setup address */ + x->x_sa.sin_family = AF_INET; + x->x_sa.sin_port = htons(port); + memcpy((char *)&x->x_sa.sin_addr, (char *)hp->h_addr, hp->h_length); + + /* create the a socket if necessary */ + if (x->x_socket == -1){ + if (-1 == (x->x_socket = socket(PF_INET, SOCK_DGRAM, 0))){ + pdp_post("pdp_udp_sender: can't create socket"); + } + if (1){ + int on = 1; + if (setsockopt(x->x_socket,SOL_SOCKET,SO_BROADCAST,(char *)&on,sizeof(on))<0) + pdp_post("pdp_udp_sender: can't set broadcast flag"); + } + } + } +} + +/* setup */ +t_pdp_udp_sender *pdp_udp_sender_new(void) +{ + t_pdp_udp_sender *x = pdp_alloc(sizeof(*x)); + memset(x,0,sizeof(*x)); + + x->x_chunk_list = 0; + + /* no connection */ + x->x_socket = -1; + + + /* set flow control */ + pdp_udp_sender_timeout_us(x, 50000); + x->x_sleep_count = 0; + pdp_udp_sender_sleepgrain_us(x, 0); + pdp_udp_sender_sleepperiod(x, 50); + pdp_udp_sender_udp_packet_size(x, 1472); //optimal udp packet size (ip: 1500 = 28 + 1472) + + + return x; +} + +void pdp_udp_sender_free(t_pdp_udp_sender *x) +{ + int i; + void* retval; + if (x->x_socket != -1) close(x->x_socket); + if (x->x_chunk_list) free (x->x_chunk_list); +} + +/* send, returns 1 on success, 0 on error */ +int pdp_udp_sender_send(t_pdp_udp_sender *x, char* type, unsigned int size, void *data) +{ + + /* SEND A PACKET */ + + /* get the type and data from caller */ + /* send header packet and make sure it has arrived */ + /* send a chunk burst */ + /* send done packet and get the resend list */ + /* repeat until send list is empty */ + + + int hs_retry = 5; // retry count for initial handshake + int rs_retry = 5; // retry count for resends + + /* check if we have a target */ + if (-1 == x->x_socket) goto transerror; + + /* setup internal state */ + _prepare_for_new_transmission(x,type,size,data); + + /* handshake a new transmission */ + do { + if (!(hs_retry--)) break; + // pdp_post("handshake retry %d for packet %x", hscount, x->x_connection_id); + _send_header_packet(x); + } while (!_receive_packet(x, PDP_UDP_ACKNEW)); + + + /* exit if no handshake was possible */ + if (hs_retry < 0){ + DD pdp_post("pdp_netsend: DROP: receiver does not accept new transmission"); + goto transerror; + } + + /* transmission loop */ + do { + if (!(rs_retry--)) break; + _send_chunks(x); + } while (_need_resend(x)); + + /* exit if transmission was not successful */ + if (rs_retry < 0){ + DD pdp_post("pdp_netsend: DROP: receiver did not confirm reception"); + goto transerror; + } + + /* send successful */ + return 1; + + transerror: + /* transmission error */ + return 0; +} |