streaming-docker/api/snowmix.py

165 lines
5.8 KiB
Python

import socket
import re
import logging
import threading
from pyparsing import nestedExpr, originalTextFor
tclparser = nestedExpr('{', '}')
scene_head_re = re.compile(r'Scene (\d+) active ([01])"? WxH (\d+)x(\d+) at (\d+),(\d+) name (.*)')
scene_back_re = re.compile(r'- back : (\w+) (\d+) WxH (\d+)x(\d+) at ([\d.]+),([\d.]+) shape (\d+) place (\d+)')
scene_frame_re = re.compile(r'- frame (\d+) active (\d+) : (\d+)x(\d+) at (\d+),(\d+) source ([\w-]+),([\w-]+) id ([\d-]+),([\d-]+) shape (\d+),(\d+) place (\d+),(\d+)')
image_list_re = re.compile('^image load (\d+) <(.*)> (\d+)x(\d+) bit depth (\d+) type (.*) seqno (\d+)$')
feed_list_re = re.compile(r'^feed (\d+) : (.*) (.*) (.*) (\d+)x(\d+) (\d+),(\d+) (\d+)x(\d+) (\d+),(\d+) (\d+):(\d+) (\d+) (\d+) (\d+) <(.*)>$')
def parse_tcl(data):
return tclparser.parseString('{%s}' % (data,))[0].asList()
def cast(l, t, kl, default=None):
for k in kl:
try:
l[k] = t(l[k])
except:
logging.exception('Cast to %r failed on %r -> %r', t, k, l)
l[k] = default
return l
class SnowmixClient(object):
def __init__(self, host='127.0.0.1', port=9999):
self.connect(host, port)
self.sem = threading.Semaphore()
def connect(self, host, port):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(1.0)
self.sock.connect((host, port))
self.fd = self.sock.makefile('rw')
self.version = self.fd.readline().split(' ')[2]
def flush_input(self):
self.sock.setblocking(0)
try:
self.fd.read()
except:
pass
self.sock.setblocking(1)
self.sock.settimeout(1.0)
def call(self, command, expect='MSG:'):
with self.sem:
self.flush_input()
self.fd.write(command + '\r\n')
self.fd.flush()
while True:
line = self.fd.readline()
print('< ' + line.strip())
if expect and line.startswith(expect):
line = line[len(expect):].strip()
if not line:
return
yield line
def tcl(self, code):
return parse_tcl(next(
self.call('tcl eval snowmix message [%s]' % code), ''))
def scene_list(self):
return [int(s) for s in next(self.call('tcl eval ScenesList')).split(' ')[2:]]
def scene_info(self, scene_id):
lines = list(self.call('tcl eval SceneList %d' % scene_id))
s = {'frames': {}}
s['id'], s['active'], s['width'], s['height'], s['x'], s['y'], s['name'] = \
scene_head_re.match(lines[0]).groups()
bg_type, bg_id, _, _, _, _ ,_ ,_ = \
scene_back_re.match(lines[1]).groups()
for l in lines[2:]:
pos = {}
front = {'source': [None, None]}
back = {'source': [None, None]}
frame = {'position': pos, 'back': back, 'front': front}
frame['id'], frame['active'], pos['width'], pos['height'], \
pos['x'], pos['y'], front['source'][0], back['source'][0], \
front['source'][1], back['source'][1], _, _, _, _ = \
scene_frame_re.match(l).groups()
s['frames'][frame['id']] = frame
frame['active'] = frame['active'] == '1'
cast(pos, int, ['width', 'height', 'x', 'y'])
back['source'] = ':'.join(back['source'])
front['source'] = ':'.join(front['source'])
alpha = self.tcl('SceneAlpha %d' % scene_id)
s['alpha'] = float(alpha[0][0])
s['background_alpha'] = float(alpha[0][1])
s['text_alpha'] = float(alpha[0][2])
for k, front_alpha, back_alpha in alpha[1:]:
s['frames'][k]['front']['alpha'] = float(front_alpha)
s['frames'][k]['back']['alpha'] = float(back_alpha)
alphalink = self.tcl('SceneAlphaLink %d' % scene_id)
s['alpha_background_link'] = alphalink[0][0] == '1'
s['alpha_text_link'] = alphalink[0][1] == '1'
for k, link in alphalink[1:]:
s['frames'][k]['alpha_link'] = link == '1'
cast(s, int, ['active', 'width', 'height', 'x', 'y',
'alpha_background_link', 'alpha_text_link'])
return s
def image_list(self):
images = []
for l in self.call('image load'):
img = {}
img['id'], img['source'], img['width'], img['height'], \
img['bit'], img['type'], img['seqno'] = \
image_list_re.match(l).groups()
cast(img, int, ['width', 'height', 'bit', 'seqno'])
images.append(img)
return images
def feed_list(self):
feeds = []
lines = list(self.call('feed info', 'STAT:'))
for l in lines[4:]:
f = {}
f['id'], f['state'], f['live'], f['oneshot'], f['width'], f['height'], \
f['cutstartx'], f['cutstarty'], f['cutw'], f['cuth'], \
f['offsetx'], f['offsety'], f['fifo1'], f['fifo2'], \
f['good'], f['missed'], f['dropped'], f['name'] = \
feed_list_re.match(l).groups()
cast(f, int, ['width', 'height', 'cutstartx', 'cutstarty',
'cutw', 'cuth', 'offsetx', 'offsety',
'fifo1', 'fifo2', 'good', 'missed', 'dropped'])
feeds.append(f)
return feeds
def scene_cut(self, scene_id):
self.tcl('SceneSetState %d 1 0' % scene_id)
def scene_fade(self, scene_id):
self.tcl('SceneSetState %d 1 1' % scene_id)
def scene_alpha(self, scene_id, alpha, frame_id=-1):
self.tcl('SceneAlpha %d %d %f' % (scene_id, frame_id, alpha))
if __name__ == "__main__":
c = SnowmixClient('127.0.0.1')
for l in c.call('feed info', 'STAT:'):
print('Result: %s' % (l,))
print(c.tcl('SceneAlpha 1'))