streaming-docker/api/snowmix.py

213 lines
7.4 KiB
Python

import socket
import re
import logging
import threading
from pyparsing import nestedExpr, originalTextFor
# Parser for brace-delimited, arbitrarily nested expressions; parse_tcl()
# uses it to turn a Tcl-style list reply into nested Python lists.
tclparser = nestedExpr('{', '}')
# First line of a "tcl eval SceneList <id>" reply:
# scene id, active flag (0/1), width x height, x,y position and name.
scene_head_re = re.compile(r'Scene (\d+) active ([01])"? WxH (\d+)x(\d+) at (\d+),(\d+) name (.*)')
# Second line of the same reply ("- back : ..."): a word and a number
# followed by size, position, shape and place.
scene_back_re = re.compile(r'- back : (\w+) (\d+) WxH (\d+)x(\d+) at ([\d.]+),([\d.]+) shape (\d+) place (\d+)')
# Remaining lines of the same reply, one per frame ("- frame ..."):
# frame id, active flag, size, position, then comma-separated pairs for
# source, id, shape and place.
scene_frame_re = re.compile(r'- frame (\d+) active (\d+) : (\d+)x(\d+) at (\d+),(\d+) source ([\w-]+),([\w-]+) id ([\d-]+),([\d-]+) shape (\d+),(\d+) place (\d+),(\d+)')
# One line of the "image load" reply:
# image id, <source>, width x height, bit depth, type and sequence number.
image_list_re = re.compile(r'^image load (\d+) <(.*)> (\d+)x(\d+) bit depth (\d+) type (.*) seqno (\d+)$')
# One line of the "image name" reply: image id and <name>.
image_name_re = re.compile(r'^image name (\d+) <(.*)>$')
# One line of the "feed info" reply (18 groups); feed_list() maps them to
# id, state, live, oneshot, size, cut start, cut size, offset, the two
# colon-separated fifo values, good/missed/dropped counters and <name>.
feed_list_re = re.compile(r'^feed (\d+) : (.*) (.*) (.*) (\d+)x(\d+) (\d+),(\d+) (\d+)x(\d+) (\d+),(\d+) (\d+):(\d+) (\d+) (\d+) (\d+) <(.*)>$')
def parse_tcl(data):
    """Parse a brace-nested (Tcl-style) string into nested Python lists.

    ``data`` is wrapped in one extra pair of braces so that the whole input
    forms a single nested expression for ``tclparser``; the contents of that
    outer expression are returned as a plain list.
    """
    wrapped = '{%s}' % (data,)
    parsed = tclparser.parseString(wrapped)
    outer = parsed[0]
    return outer.asList()
def cast(l, t, kl, default=None):
    """Convert selected entries of a mapping in place.

    For every key in ``kl`` the value ``l[key]`` is replaced by
    ``t(l[key])``. If the conversion raises (this includes a missing key),
    the failure is logged with its traceback and the entry is set to
    ``default`` instead.

    Args:
        l: Mutable mapping to update in place.
        t: Callable applied to each selected value (e.g. ``int``).
        kl: Iterable of keys whose values should be converted.
        default: Value stored when a conversion fails.

    Returns:
        The same mapping object ``l``, after modification.
    """
    for k in kl:
        try:
            l[k] = t(l[k])
        except Exception:
            # Deliberately not a bare ``except``: SystemExit and
            # KeyboardInterrupt must propagate instead of being turned
            # into a default value.
            logging.exception('Cast to %r failed on %r -> %r', t, k, l)
            l[k] = default
    return l
class SnowmixClient(object):
    """Client for the line-based control socket of a Snowmix instance.

    Commands are sent as CRLF-terminated lines. Replies are read line by
    line and filtered by a prefix (``MSG:`` by default, ``STAT:`` for the
    status queries); a reply line that is empty after its prefix ends the
    response.

    Attributes:
        reconnect: When true, a failed command re-establishes the
            connection before the error is re-raised.
        host, port: Address used by the most recent ``connect()``.
        sock: The TCP socket (1 second timeout).
        fd: Text file object wrapping ``sock``.
        version: Third space-separated token of the first line received
            after connecting.
        sem: Semaphore serialising commands on the shared socket.
    """

    reconnect = True

    def __init__(self, host='127.0.0.1', port=9999):
        """Connect to ``host``:``port``; raises if the connection fails."""
        # Set up all attributes before connecting so the instance is never
        # left half-initialised when connect() is entered.
        self.sem = threading.Semaphore()
        self.sock = None
        self.fd = None
        self.connect(host, port)

    def connect(self, host, port):
        """Open a new connection, replacing any existing one.

        Stores ``host``/``port`` for later reconnects and reads the first
        line sent by the server to fill ``self.version``.
        """
        self.host = host
        self.port = port
        # Release the previous connection first; otherwise every reconnect
        # would leak a socket and its file object.
        self.close()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.settimeout(1.0)
            self.sock.connect((host, port))
            self.fd = self.sock.makefile('rw')
            self.version = self.fd.readline().split(' ')[2]
        except Exception:
            # Do not leave a dangling descriptor behind a failed attempt.
            self.close()
            raise

    def close(self):
        """Close the file object and the socket (safe to call repeatedly)."""
        # The makefile() object keeps the underlying connection alive, so
        # it has to be closed as well, and before the socket itself.
        for handle in (getattr(self, 'fd', None), getattr(self, 'sock', None)):
            if handle is None:
                continue
            try:
                handle.close()
            except (OSError, ValueError):
                logging.debug('Ignoring error while closing %r', handle,
                              exc_info=True)

    def flush_input(self):
        """Best-effort discard of any reply data that is already pending."""
        self.sock.setblocking(0)
        try:
            self.fd.read()
        except Exception:
            # A non-blocking read with nothing (or only partial data)
            # pending is expected to fail; there is nothing to recover.
            pass
        finally:
            # Always return to blocking mode with the usual 1s timeout.
            self.sock.setblocking(1)
            self.sock.settimeout(1.0)

    def call(self, command, expect='MSG:'):
        """Send ``command`` and yield the reply lines, prefix removed.

        This is a generator: nothing is sent until iteration starts, and
        the semaphore is held until the generator finishes or is closed.

        Args:
            command: Command line without the trailing CRLF.
            expect: Prefix a line must have to count as part of the
                reply. Lines without it are skipped.

        Yields:
            Each reply line with the prefix and surrounding whitespace
            stripped. A line that is empty after stripping ends the reply.

        Raises:
            Exception: If the remote side closed the connection, or on any
                I/O error. When ``reconnect`` is true a new connection is
                opened before the error is re-raised.
        """
        with self.sem:
            try:
                self.flush_input()
                self.fd.write(command + '\r\n')
                self.fd.flush()
                while True:
                    line = self.fd.readline()
                    if not line:
                        raise Exception('Remote host closed socket')
                    if expect and line.startswith(expect):
                        line = line[len(expect):].strip()
                        if not line:
                            return
                        yield line
            except Exception:
                if self.reconnect:
                    self.connect(self.host, self.port)
                raise

    def tcl(self, code):
        """Evaluate ``code`` via ``snowmix message`` and parse the reply.

        Only the first reply line is used; an empty reply parses to ``[]``.
        """
        return parse_tcl(next(
            self.call('tcl eval snowmix message [%s]' % code), ''))

    def scene_list(self):
        """Return the scene ids as a list of ints.

        The ids are taken from the third space-separated token onwards of
        the first reply line. An empty reply yields an empty list.
        """
        # Default to '' (as tcl() does) so an empty reply does not leak a
        # StopIteration to the caller.
        reply = next(self.call('tcl eval ScenesList'), '')
        return [int(s) for s in reply.split(' ')[2:]]

    def scene_info(self, scene_id):
        """Return a dict describing scene ``scene_id`` and its frames.

        The result has the keys ``id``, ``active``, ``width``, ``height``,
        ``x``, ``y``, ``name``, ``alpha``, ``background_alpha``,
        ``text_alpha``, ``alpha_background_link``, ``alpha_text_link`` and
        ``frames``. ``frames`` maps the frame id (a string) to a dict with
        ``position``, ``back``, ``front``, ``id``, ``active`` and
        ``alpha_link``.

        Raises:
            AttributeError: If a reply line does not match its pattern.
            IndexError: If the reply has fewer than two lines.
        """
        lines = list(self.call('tcl eval SceneList %d' % scene_id))
        s = {'frames': {}}
        (s['id'], s['active'], s['width'], s['height'],
         s['x'], s['y'], s['name']) = scene_head_re.match(lines[0]).groups()
        # The background line is still required to be present and
        # well-formed, but none of its fields are part of the result.
        scene_back_re.match(lines[1]).groups()
        for l in lines[2:]:
            (frame_id, active, width, height, x, y,
             front_type, back_type, front_id, back_id,
             _, _, _, _) = scene_frame_re.match(l).groups()
            pos = cast({'width': width, 'height': height, 'x': x, 'y': y},
                       int, ['width', 'height', 'x', 'y'])
            # Sources are reported as separate type and id values; expose
            # them as "type:id", the form frame_set_source() accepts.
            s['frames'][frame_id] = {
                'position': pos,
                'back': {'source': ':'.join([back_type, back_id])},
                'front': {'source': ':'.join([front_type, front_id])},
                'id': frame_id,
                'active': active == '1',
            }
        # First element: scene-wide alphas; remaining elements: per frame.
        alpha = self.tcl('SceneAlpha %d' % scene_id)
        s['alpha'] = float(alpha[0][0])
        s['background_alpha'] = float(alpha[0][1])
        s['text_alpha'] = float(alpha[0][2])
        for k, front_alpha, back_alpha in alpha[1:]:
            s['frames'][k]['front']['alpha'] = float(front_alpha)
            s['frames'][k]['back']['alpha'] = float(back_alpha)
        alphalink = self.tcl('SceneAlphaLink %d' % scene_id)
        s['alpha_background_link'] = alphalink[0][0] == '1'
        s['alpha_text_link'] = alphalink[0][1] == '1'
        for k, link in alphalink[1:]:
            s['frames'][k]['alpha_link'] = link == '1'
        cast(s, int, ['active', 'width', 'height', 'x', 'y',
                      'alpha_background_link', 'alpha_text_link'])
        return s

    def image_list(self):
        """Return a list of dicts, one per image reported by ``image load``.

        Each dict has ``id`` and ``source`` (strings), ``width``,
        ``height``, ``bit``, ``seqno`` (ints) and ``type``; images listed
        by ``image name`` additionally get a ``name``.
        """
        images = {}
        for l in self.call('image load'):
            img = {}
            (img['id'], img['source'], img['width'], img['height'],
             img['bit'], img['type'], img['seqno']) = \
                image_list_re.match(l).groups()
            cast(img, int, ['width', 'height', 'bit', 'seqno'])
            images[img['id']] = img
        for l in self.call('image name'):
            img_id, name = image_name_re.match(l).groups()
            images[img_id]['name'] = name
        return list(images.values())

    def feed_list(self):
        """Return a list of dicts, one per feed reported by ``feed info``."""
        feeds = []
        lines = list(self.call('feed info', 'STAT:'))
        # The first four reply lines are skipped; feed entries follow.
        for l in lines[4:]:
            f = {}
            (f['id'], f['state'], f['live'], f['oneshot'],
             f['width'], f['height'],
             f['cutstartx'], f['cutstarty'], f['cutw'], f['cuth'],
             f['offsetx'], f['offsety'], f['fifo1'], f['fifo2'],
             f['good'], f['missed'], f['dropped'], f['name']) = \
                feed_list_re.match(l).groups()
            cast(f, int, ['width', 'height', 'cutstartx', 'cutstarty',
                          'cutw', 'cuth', 'offsetx', 'offsety',
                          'fifo1', 'fifo2', 'good', 'missed', 'dropped'])
            feeds.append(f)
        return feeds

    def scene_cut(self, scene_id):
        """Run ``SceneSetState <scene_id> 1 0``."""
        self.tcl('SceneSetState %d 1 0' % scene_id)

    def scene_fade(self, scene_id):
        """Run ``SceneSetState <scene_id> 1 1``."""
        self.tcl('SceneSetState %d 1 1' % scene_id)

    def scene_alpha(self, scene_id, alpha, frame_id=-1, back=False):
        """Run ``SceneAlpha`` for a scene/frame with the given alpha.

        With ``back`` true the value is sent after a ``-`` placeholder,
        otherwise it is sent directly after the frame id.
        """
        if back:
            self.tcl('SceneAlpha %d %d - %f' % (scene_id, frame_id, alpha))
        else:
            self.tcl('SceneAlpha %d %d %f' % (scene_id, frame_id, alpha))

    def frame_set_source(self, scene_id, frame_id, source, back=False):
        """Run ``SceneSetFrameSource`` with ``source`` given as "type:id".

        The fifth argument sent is 1 when ``back`` is false and 0 when it
        is true.
        """
        source = source.split(':')
        self.tcl('SceneSetFrameSource %d %d %s %s %d 1' % (
            scene_id, frame_id, source[0], source[1], not back))

    def frame_set_active(self, scene_id, frame_id, active):
        """Run ``SceneSetFrameActive`` and log the first reply line."""
        reply = next(
            self.call('tcl eval SceneSetFrameActive %d %d %d 0' % (
                scene_id, frame_id, active)),
            None)
        # Diagnostics go to the logging module rather than stdout.
        logging.debug('SceneSetFrameActive %r %r %r -> %r',
                      scene_id, frame_id, active, reply)

    def _audio_stat(self, audio_type, field):
        """Map token 2 to token 4 (both as int) for each STAT reply line."""
        values = {}
        for line in self.call('audio %s %s' % (audio_type, field), 'STAT:'):
            parts = line.split(' ')
            values[int(parts[2])] = int(parts[4])
        return values

    def audio_rate(self, audio_type='feed'):
        """Return ``{id: rate}`` parsed from ``audio <audio_type> rate``."""
        return self._audio_stat(audio_type, 'rate')

    def audio_channels(self, audio_type='feed'):
        """Return ``{id: channels}`` from ``audio <audio_type> channels``."""
        return self._audio_stat(audio_type, 'channels')
if __name__ == "__main__":
    # Manual smoke test against a host named "snowmix": dump the feed
    # status lines, one parsed Tcl reply and the image list.
    import pprint

    client = SnowmixClient('snowmix')
    for feed_line in client.call('feed info', 'STAT:'):
        print('Result: %s' % (feed_line,))
    scene_alpha = client.tcl('SceneAlpha 1')
    print(scene_alpha)
    pprint.pprint(client.image_list())