commit c422065cdb03adec7c8ec04fcc6aeb63872ff7fc Author: Piotr Dobrowolski Date: Sat Sep 23 21:33:32 2017 +0200 Initial vortex spejsiot adapter diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..df81b2c --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.pyc +config.py diff --git a/config.py.dist b/config.py.dist new file mode 100644 index 0000000..c5a523e --- /dev/null +++ b/config.py.dist @@ -0,0 +1,10 @@ +INPUTS = { + 'mic1': [5], + 'aux1': [1, 2], + 'aux2': [3, 4], + } + +OUTPUTS = { + 'main': ['A', 'B'], + 'recording': ['C', 'D'], + } diff --git a/vortex.py b/vortex.py new file mode 100644 index 0000000..849fc3c --- /dev/null +++ b/vortex.py @@ -0,0 +1,131 @@ +import socket +import threading +import logging +import queue +import contextlib +import time + +DEFAULT_TIMEOUT = 0.5 + +class TimeoutException(Exception): pass + +class VortexConnection(object): + def __init__(self, ip, port): + self.logger = logging.getLogger(self.__class__.__name__) + self.handlers = [ + self.on_message + ] + + self.queues = [] + + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((ip, port)) + self.fd = self.socket.makefile('rw') + + def loop_start(self): + self.th = threading.Thread(target=self.loop) + self.th.daemon = True + self.th.start() + + def loop(self): + print('Looping...') + + for line in self.readlines(): + for h in self.handlers: + h(line.strip()) + + def send(self, cmd): + self.logger.debug('>> %r', cmd) + self.fd.write(cmd + '\r') + self.fd.flush() + + def readlines(self): + buf = '' + data = True + + while data: + data = self.socket.recv(4096).decode('utf-8') + buf += data + + while buf.find('\r') != -1: + line, buf = buf.split('\r', 1) + yield line + + + def on_message(self, msg): + self.logger.debug('<< %r', msg) + for q in self.queues: + q.put_nowait(msg) + + @contextlib.contextmanager + def client(self): + q = queue.Queue() + self.queues.append(q) + try: + yield q + finally: + self.queues.remove(q) + + def call(self, cmd, timeout=DEFAULT_TIMEOUT, wait=True): + with self.client() as q: + self.send(cmd) + if not wait: + return + + start = time.time() + while time.time()-start < timeout: + try: + m = q.get(timeout=timeout-(time.time()-start)) + except queue.Empty: + return + + yield m + + def discover(self, timeout=DEFAULT_TIMEOUT): + return { + resp[:3]: VortexDevice(self, resp[:3]) + for resp in self.call('***PING', timeout=timeout) + if resp[3:] == 'PONG' + } + + def __getitem__(self, key): + return VortexDevice(self, key) + +class VortexDevice(object): + def __init__(self, connection, device_id): + self.conn = connection + self.device_id = device_id + + def __repr__(self): + return ''.format(self.device_id) + + def mute(self): + self.call_single('GMUTEO1') + + def unmute(self): + self.call_single('GMUTEO0') + + def call_single(self, *args, **kwargs): + return next(self.call(*args, **kwargs), None) + + def call(self, cmd, expect=None, *args, **kwargs): + if expect is None: + expect = cmd.replace('?', '') + + for resp in self.conn.call(self.device_id + cmd, *args, **kwargs): + if not resp.startswith(self.device_id): + continue + + resp = resp[len(self.device_id):] + + if expect is None or resp.startswith(expect): + yield resp + +if __name__ == "__main__": + c = VortexConnection('127.0.0.1', 10001) + c.loop_start() + print(c.discover()) + + #for n in range(1000): + # c['T00'].call_single('SSTEXT0,1,XD {}'.format(n), expect=False, timeout=0) + # time.sleep(0.1) diff --git a/vortexiot.py b/vortexiot.py new file mode 100644 index 0000000..14d4f70 --- /dev/null +++ b/vortexiot.py @@ -0,0 +1,135 @@ +import paho.mqtt.client as mqtt +import vortex +import logging +import itertools + +import config + +logging.basicConfig(level=logging.INFO) + +class VortexSpejsIOTClient(mqtt.Client): + topic_prefix = 'iot/polycom/' + + def __init__(self, *args, **kwargs): + super(VortexSpejsIOTClient, self).__init__(*args, **kwargs) + + self.logger = logging.getLogger(self.__class__.__name__) + + self.vortex = vortex.VortexConnection('localhost', 10001) + self.vortex.handlers.append(self.on_vortex_message) + self.device_id = '' + + def on_connect(self, client, userdata, flags, rc): + self.subscribe(self.topic_prefix + '+/+/set') + self.logger.info('Connected') + self.device_id = self.vortex.discover().keys()[0] + + def run(self): + self.loop_start() + self.vortex.loop() + + def multi_call(self, cmd, channels, args): + for ch in channels: + self.vortex[self.device_id].call_single('%s%s%s' % ( + cmd, ch, args + ), wait=False) + + def matrix_call(self, cmd, pairs, args): + for s, t in pairs: + self.vortex[self.device_id].call_single('%s%s,%s,%s' % ( + cmd, s, t, args + ), wait=False) + + def notify(self, node, attribute, value): + if isinstance(value, bool): + value = str(value).lower() + else: + value = str(value) + + self.publish('%s%s/%s' % (self.topic_prefix, node, attribute), value) + + def on_message(self, client, userdata, msg): + topic = msg.topic[len(self.topic_prefix):] + self.logger.info('mqtt -> %r %r', topic, msg.payload) + + node, attrib, _ = topic.split('/') + + if node in config.INPUTS: + if attrib == 'gain': + self.multi_call('GAINI', config.INPUTS[node], msg.payload) + elif attrib == 'mute': + self.multi_call('MUTEI', config.INPUTS[node], '1' if msg.payload == 'true' else '0') + elif node in config.OUTPUTS: + if attrib == 'gain': + self.multi_call('GAINO', config.OUTPUTS[node], msg.payload) + elif attrib == 'mute': + self.multi_call('MUTEO', config.OUTPUTS[node], '1' if msg.payload == 'true' else '0') + elif ':' in node: + # This is matrix operation... + inp, _, out = node.partition(':') + + if inp not in config.INPUTS or out not in config.OUTPUTS: + self.logger.warning('Invalid route: %r', node) + return + + inp_chs = config.INPUTS[inp] + out_chs = config.OUTPUTS[out] + + pairs = zip(itertools.cycle(inp_chs), out_chs) + + if attrib == 'mute': + self.matrix_call('MMUTE', pairs, '1' if msg.payload == 'true' else '0') + elif attrib == 'gain': + self.matrix_call('MGAIN', pairs, msg.payload) + + def on_vortex_message(self, msg): + self.logger.info('vortex -> %r', msg) + device_id, msg = msg[:3], msg[3:] + + if self.device_id != device_id: + self.logger.debug('%r/%r: Invalid device id', device_id, self.device_id) + return + + if msg.startswith('MMUTE'): + inp, out, muted = msg[5:].split(',') + inp_label = self.input_from_channel(inp) + out_label = self.output_from_channel(out) + self.notify('%s:%s' % (inp_label, out_label), 'mute', muted == '1') + + elif msg.startswith('MGAIN'): + inp, out, gain = msg[5:].split(',') + inp_label = self.input_from_channel(inp) + out_label = self.output_from_channel(out) + self.notify('%s:%s' % (inp_label, out_label), 'gain', gain) + + elif msg.startswith('GAINI'): + channel_id, gain = msg[5], msg[6:] + label = self.input_from_channel(channel_id) + self.notify(label, 'gain', gain) + + elif msg.startswith('MUTEI'): + channel_id, muted = msg[5], msg[6:] + label = self.input_from_channel(channel_id) + self.notify(label, 'mute', muted == '1') + + def input_from_channel(self, channel): + return self.find_label(config.INPUTS, channel) + + def output_from_channel(self, channel): + return self.find_label(config.OUTPUTS, channel) + + def find_label(self, labels, channel): + for label, channels in labels.items(): + if channel in map(str, channels): + return label + + return None + + +def main(): + client = VortexSpejsIOTClient() + client.connect('localhost') + client.run() + +if __name__ == "__main__": + main()