diff options
Diffstat (limited to 'supercollider/python/OSC.py')
-rwxr-xr-x | supercollider/python/OSC.py | 230 |
1 files changed, 0 insertions, 230 deletions
diff --git a/supercollider/python/OSC.py b/supercollider/python/OSC.py deleted file mode 100755 index 7c3c20f5..00000000 --- a/supercollider/python/OSC.py +++ /dev/null @@ -1,230 +0,0 @@ -#!/usr/bin/env python -# ====================================================================== -# file: OSC.py -# author: stefan kersten <steve@k-hornz.de> -# contents: OSC client module for python -# license: public domain -# ====================================================================== -# $Id: OSC.py,v 1.1 2004-01-20 16:54:44 ksvalast Exp $ -# ====================================================================== -# copyright (c) 2000 stefan kersten -# ====================================================================== -# this module provides simple OSC client functionality -# usage examples down at the end of the file -# ====================================================================== - -__revision__ = "$Revision: 1.1 $" - -# ====================================================================== -# imports - -import cStringIO, exceptions, math, socket, struct, time, types - -# ====================================================================== -# constants - -SECONDS_UTC_TO_UNIX_EPOCH = 2208988800.0 -FLOAT_TO_INT_SCALE = pow(2.0, 32.0) - -# ====================================================================== -# types - -class Value: - """Abstract OSC value.""" - def __init__(self, value): - self.value = value - - def binary_value(self): - pass - - def type_tag(self): - pass - -class Int(Value): - """32 bit integer value.""" - def __init__(self, value): - Value.__init__(self, long(value)) - - def binary_value(self): - return struct.pack('!l', self.value) - - def type_tag(self): - return 'i' - -class Float(Value): - """32 bit floating point value.""" - def __init__(self, value): - Value.__init__(self, float(value)) - - def binary_value(self): - return struct.pack('!f', self.value) - - def type_tag(self): - return 'f' - -class String(Value): - """Null-terminated string padded to multiples of 4 byte.""" - def __init__(self, value): - Value.__init__(self, str(value)) - - def binary_value(self): - v = self.value - l = len(v) - return struct.pack('%ds%dx' % (l, self.pad_amount(l)), v) - - def type_tag(self): - return 's' - - def pad_amount(self, len): - return 4 - (len % 4) - -class Time(Value): - """64 bit timetag in NTP format.""" - def __init__(self, value): - Value.__init__(self, float(value)) - - def __add__(self, time): - return Time(float(self.value + time.value)) - - def binary_value(self): - t = self.value - # FIXME: how to convert without overflows? - s = long(t) - f = long(math.fmod(t, 1.0)*FLOAT_TO_INT_SCALE) - return struct.pack('!LL', s, f) - -# ====================================================================== -# utilities - -time_module = time -def time(): - """Return current time as float in OSC format.""" - return SECONDS_UTC_TO_UNIX_EPOCH + time_module.time() - -# ====================================================================== -# classes - -class Packet: - """Abstract base class for all OSC-related containers. - - Has methods for retrieving the proper binary representation - and its size. - """ - def __init__(self, packets): - stream = cStringIO.StringIO() - self._write_contents(packets, stream) - self._data = stream.getvalue() - - def get_packet(self): - """Return the binary representation of the receiver's contents. - - This data is in the proper OSC format and can be sent over a - socket. - """ - return self._data - - def get_size(self): - """Return the size of the receiver's binary data.""" - return len(self._data) - - def _write_contents(self, packets, stream): - """Write packets on stream. - - Private. - - Override in subclasses for specific behavior. - """ - pass - - def __repr__(self): - return '<' + \ - str(self.__class__.__name__) + \ - ' instance, size=' + \ - str(self.get_size()) + \ - '>' - - def sendto(self, host, port): - """Send the receiver's data through a UDP socket.""" - s = socket.socket(socket.SOCK_DGRAM, socket.AF_INET) - packet = self.get_packet() - s.sendto(packet, (host, port)) - s.close() - - def sendlocal(self, port): - """Send the receiver's data through a UDP socket locally.""" - self.sendto('localhost', port) - -def _value(x): - """Convert x(int, float or string) to an OSC object.""" - t = type(x) - if t == types.FloatType: - return Float(x) - if t == types.IntType or t == types.LongType: - return Int(x) - # return string representation as default - return String(str(x)) - -class Message(Packet): - """Single OSC message with arguments. - - Message(address, *args) -> Message - - address -- OSC address string - *args -- message argument list - """ - def __init__(self, address, args=[]): - Packet.__init__(self, [String(address)] + map(lambda x: _value(x), args)) - - def _write_contents(self, args, stream): - t_stream = cStringIO.StringIO() # tag stream - v_stream = cStringIO.StringIO() # value stream - # open signature string - t_stream.write(',') - # collect tags and arguments - for v in args[1:]: - t_stream.write(v.type_tag()) - v_stream.write(v.binary_value()) - # write address - stream.write(args[0].binary_value()) - # write signature - stream.write(String(t_stream.getvalue()).binary_value()) - # write arguments - stream.write(v_stream.getvalue()) - -class Bundle(Packet): - """OSC container type with timing information. - - Bundle(time, packets) -> Bundle - - time -- floating point timetag in OSC units - packets -- array of Packet(s) - """ - def __init__(self, time, packets): - Packet.__init__(self, [Time(time)] + packets) - - def _write_contents(self, args, stream): - # write '#bundle' preamble - stream.write(String('#bundle').binary_value()) - # write timetag - stream.write(args[0].binary_value()) - # write packets, prefixed with a byte count - for packet in args[1:]: - data = packet.get_packet() - size = len(data) - stream.write(Int(size).binary_value()) - stream.write(data) - -def test(port): - """Some example messages and bundles, sent to port.""" - Message("/filter/cutoff", [145.1232]).sendlocal(port) - Message("/http", ["www dot k-hornz dot de", 12, 3.41, "bulb"]).sendlocal(port) - #print Int(len(Message("/msg").get_packet())).binary_value() - Bundle(0.1, [Message("/fubar")]).sendlocal(port) - Bundle(time(), [Message("/msg", [1.0, "+", 1, 61, "0"]), Message("/bang!")]).sendlocal(port) - -if __name__ == "__main__": - """Run dumpOSC on port 10000.""" - test(10000) - -# EOF -# ====================================================================== |