kivy / oscpy

An efficient OSC implementation compatible with python2.7 and 3.5+

OSCPy

CI is done by GitHub Checks; see the current commit for build status.

A modern implementation of OSC for python2/3.

What is OSC?

OpenSoundControl is a UDP-based network protocol designed for fast dispatching of time-sensitive messages. As the name suggests, it was designed as a replacement for MIDI, but it applies well to other situations. The protocol is simple to use: OSC addresses look like HTTP URLs, and messages accept various basic types, such as string, float, and int. You can think of it as an HTTP POST with less overhead.

You can learn more about OSC on OpenSoundControl.org.
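To make the "address plus basic types" idea concrete, here is a minimal sketch of how a single OSC 1.0 message is laid out on the wire, using only the standard library. The helper names `osc_string` and `encode_message` and the address `/synth/volume` are hypothetical and chosen for illustration; this is not how OSCPy builds packets internally, and it covers only the int, float, and string types.

```python
import struct


def osc_string(text):
    """Encode text as an OSC string: ASCII, null-terminated, padded to a multiple of 4 bytes."""
    raw = text.encode("ascii") + b"\x00"
    return raw + b"\x00" * (-len(raw) % 4)


def encode_message(address, values):
    """Build one OSC message: address, type tag string, then the arguments."""
    type_tags = ","  # the type tag string always starts with a comma
    payload = b""
    for value in values:
        if isinstance(value, int):
            type_tags += "i"
            payload += struct.pack(">i", value)  # 32-bit big-endian integer
        elif isinstance(value, float):
            type_tags += "f"
            payload += struct.pack(">f", value)  # 32-bit big-endian float
        elif isinstance(value, str):
            type_tags += "s"
            payload += osc_string(value)
        else:
            raise TypeError("unsupported type: {!r}".format(type(value)))
    return osc_string(address) + osc_string(type_tags) + payload


# Hypothetical address and values, for illustration only
message = encode_message("/synth/volume", [3, 0.5, "on"])
print(len(message), message)
```

Every field is aligned on a 4-byte boundary, so the whole message length is always a multiple of 4. In practice the library does this encoding for you; the sketch only shows why the format carries so little overhead.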

Goals

Features

Install

pip install oscpy

Usage

Server (thread)

from oscpy.server import OSCThreadServer
from time import sleep

def callback(*values):
    print("got values: {}".format(values))

osc = OSCThreadServer()  # See sources for all the arguments

# You can also use a *nix socket path here
sock = osc.listen(address='0.0.0.0', port=8000, default=True)
osc.bind(b'/address', callback)
sleep(1000)
osc.stop()  # Stop the default socket

osc.stop_all()  # Stop all sockets

# Here the server is still alive, one might call osc.listen() again

osc.terminate_server()  # Request the handler thread to stop looping

osc.join_server()  # Wait for the handler thread to finish pending tasks and exit

Or you can use the decorator API:

Server (thread)

from oscpy.server import OSCThreadServer
from time import sleep

osc = OSCThreadServer()
sock = osc.listen(address='0.0.0.0', port=8000, default=True)

@osc.address(b'/address')
def callback(*values):
    print("got values: {}".format(values))

sleep(1000)
osc.stop()

Servers are also clients, in the sense that they can send messages and answer messages from other servers:

from oscpy.server import OSCThreadServer
from time import sleep, time

# Filled by the ping callback once the round trip has completed
cont = []

osc_1 = OSCThreadServer()
osc_1.listen(default=True)

@osc_1.address(b'/ping')
def ping(*values):
    print("ping called")
    if True in values:
        cont.append(True)
    else:
        osc_1.answer(b'/pong')

osc_2 = OSCThreadServer()
osc_2.listen(default=True)

@osc_2.address(b'/pong')
def pong(*values):
    print("pong called")
    osc_2.answer(b'/ping', [True])

osc_2.send_message(b'/ping', [], *osc_1.getaddress())

timeout = time() + 1
while not cont:
    if time() > timeout:
        raise OSError('timeout while waiting for success message.')
    sleep(0.01)  # yield between checks instead of spinning at full CPU
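At the UDP level, answering a message only requires the sender's address, which arrives with every datagram. The following sketch shows that idea with two plain sockets on the loopback interface. It is an illustration of the underlying mechanism under that assumption, not OSCPy's implementation, and the payloads are raw placeholders rather than well-formed OSC packets.

```python
import socket

# Two UDP endpoints on loopback; port 0 lets the OS pick free ports.
node_a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
node_b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for node in (node_a, node_b):
    node.bind(("127.0.0.1", 0))
    node.settimeout(2.0)  # avoid blocking forever if a datagram is lost

addr_a = node_a.getsockname()
addr_b = node_b.getsockname()

# A sends a datagram to B's address.
node_a.sendto(b"/ping", addr_b)

# recvfrom() returns the payload together with the sender's address,
# which is all B needs in order to answer.
data, sender = node_b.recvfrom(1024)
node_b.sendto(b"/pong", sender)

reply, replier = node_a.recvfrom(1024)
print(data, reply)

node_a.close()
node_b.close()
```

Because each endpoint both listens and sends, neither one is purely a "server" or a "client", which is the same symmetry the ping/pong example relies on.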

Server (async) (TODO!)

# This section is marked TODO: OSCAsyncServer is a planned API, shown as a sketch
from oscpy.server import OSCAsyncServer

with OSCAsyncServer(port=8000) as OSC:
    for address, values in OSC.listen():
        if address == b'/example':
            print("got {} on /example".format(values))
        else:
            print("unknown address {}".format(address))

Client

from oscpy.client import OSCClient

address = "127.0.0.1"
port = 8000

osc = OSCClient(address, port)
for i in range(10):
    osc.send_message(b'/ping', [i])

Unicode

By default, the server and client take bytes (encoded strings), not unicode strings, for OSC addresses as well as OSC strings. However, you can pass an encoding parameter to have your strings automatically encoded and decoded, so your callbacks will get unicode strings (unicode in Python 2, str in Python 3).

from oscpy.client import send_message
from oscpy.server import OSCThreadServer

osc = OSCThreadServer(encoding='utf8')
osc.listen(default=True)

values = []

@osc.address(u'/encoded')
def encoded(*val):
    for v in val:
        assert not isinstance(v, bytes)
    values.append(val)

send_message(
    u'/encoded',
    [u'hello world', u'ééééé ààààà'],
    *osc.getaddress(), encoding='utf8')

(u literals added here for clarity).
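One reason the library works on bytes by default is that OSC sizes and padding are computed on encoded bytes, not on characters. The sketch below illustrates this with the standard library only. The helper `pad_osc_string` is hypothetical and not part of OSCPy. Note also that OSC 1.0 defines strings as ASCII, so sending UTF-8 assumes both ends agree on the encoding.

```python
def pad_osc_string(raw):
    """Null-terminate encoded bytes and pad them to a multiple of 4."""
    raw += b"\x00"
    return raw + b"\x00" * (-len(raw) % 4)


# Same text as the example above, written with escapes so the
# byte count does not depend on how this file is saved.
text = u"\xe9" * 5 + u" " + u"\xe0" * 5

encoded = text.encode("utf8")      # what goes on the wire
packed = pad_osc_string(encoded)   # padded OSC string argument

# The receiving side strips the padding and decodes back to text.
decoded = packed.rstrip(b"\x00").decode("utf8")

print(len(text), len(encoded), len(packed))
```

Each accented character takes two bytes in UTF-8, so the 11-character string occupies 21 bytes before padding and 24 after. Passing `encoding='utf8'` lets the server and client handle this conversion for you.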

CLI

OSCPy provides an "oscli" utility to help with debugging. See oscli -h for more information.

GOTCHAS

TODO

Contributing

Check out our contribution guide and feel free to improve OSCPy.

License

OSCPy is released under the terms of the MIT License. Please see the LICENSE.txt file.