213 lines
7.4 KiB
Python
213 lines
7.4 KiB
Python
import socket
|
|
import re
|
|
import logging
|
|
import threading
|
|
|
|
from pyparsing import nestedExpr, originalTextFor
|
|
|
|
# pyparsing grammar for nested, brace-delimited `{ ... }` groups; parse_tcl()
# feeds server replies through it to obtain nested Python lists.
tclparser = nestedExpr('{', '}')
|
|
|
|
# Patterns for the text replies parsed by SnowmixClient. The pattern strings
# are unchanged; only the comments are new.

# First line of a 'tcl eval SceneList <id>' reply.
# Groups: id, active flag, width, height, x, y, name.
scene_head_re = re.compile(r'Scene (\d+) active ([01])"? WxH (\d+)x(\d+) at (\d+),(\d+) name (.*)')

# Second line of the same reply (the scene background).
# Groups: type, id, width, height, x, y, shape, place.
scene_back_re = re.compile(r'- back : (\w+) (\d+) WxH (\d+)x(\d+) at ([\d.]+),([\d.]+) shape (\d+) place (\d+)')

# Every following line of the same reply (one per frame).
# Groups: id, active, width, height, x, y, front/back source type,
# front/back source id, front/back shape, front/back place.
scene_frame_re = re.compile(r'- frame (\d+) active (\d+) : (\d+)x(\d+) at (\d+),(\d+) source ([\w-]+),([\w-]+) id ([\d-]+),([\d-]+) shape (\d+),(\d+) place (\d+),(\d+)')

# One line of an 'image load' reply.
# Groups: id, source (inside <...>), width, height, bit depth, type, seqno.
image_list_re = re.compile(r'^image load (\d+) <(.*)> (\d+)x(\d+) bit depth (\d+) type (.*) seqno (\d+)$')

# One line of an 'image name' reply. Groups: id, name (inside <...>).
image_name_re = re.compile(r'^image name (\d+) <(.*)>$')

# One data line of a 'feed info' reply.
# Groups: id, state, live, oneshot, width, height, cut start x/y, cut w/h,
# offset x/y, fifo1, fifo2, good, missed, dropped, name (inside <...>).
feed_list_re = re.compile(r'^feed (\d+) : (.*) (.*) (.*) (\d+)x(\d+) (\d+),(\d+) (\d+)x(\d+) (\d+),(\d+) (\d+):(\d+) (\d+) (\d+) (\d+) <(.*)>$')
|
|
|
|
def parse_tcl(data):
    """Parse a brace-delimited Tcl-style list into nested Python lists.

    ``data`` is wrapped in one extra pair of braces so that the whole input
    parses as a single group; that group is returned as a plain (possibly
    nested) list.
    """
    wrapped = '{%s}' % (data,)
    parsed = tclparser.parseString(wrapped)
    return parsed[0].asList()
|
|
|
|
def cast(l, t, kl, default=None):
    """Convert selected entries of ``l`` in place using the callable ``t``.

    Args:
        l: Mutable container (dict or list) indexed by the entries of ``kl``.
        t: Callable applied to each selected value, e.g. ``int``.
        kl: Iterable of keys/indices whose values should be converted.
        default: Value stored when the lookup or conversion fails.

    Returns:
        The same container ``l``, to allow chaining.
    """
    for k in kl:
        try:
            l[k] = t(l[k])
        except Exception:
            # Best effort: record the failure and fall back to the default.
            # Unlike a bare ``except:``, this lets KeyboardInterrupt and
            # SystemExit propagate.
            logging.exception('Cast to %r failed on %r -> %r', t, k, l)
            l[k] = default

    return l
|
|
|
|
class SnowmixClient(object):
    """Client for a Snowmix control connection over TCP.

    Commands are written as CRLF-terminated text lines and the replies are
    read back line by line. A semaphore serialises the exchanges so that only
    one command/reply conversation uses the socket at a time.
    """

    # When true, call() tries to re-establish the connection after a failed
    # exchange before re-raising the error.
    reconnect = True

    def __init__(self, host='127.0.0.1', port=9999):
        """Connect to ``host``:``port`` and read the server greeting."""
        self.sem = threading.Semaphore()
        self.connect(host, port)

    def _release(self):
        """Close the file wrapper and socket of the current connection, if any."""
        for name in ('fd', 'sock'):
            handle = getattr(self, name, None)
            if handle is None:
                continue
            try:
                handle.close()
            except (OSError, ValueError):
                # The connection is being discarded anyway; a failing close
                # must not hide the reason we got here.
                logging.debug('Closing %s failed', name, exc_info=True)

    def connect(self, host, port):
        """(Re)connect to the server and store the version from its greeting.

        Any previous connection held by this instance is closed first. The
        version is the third space-separated token of the first line the
        server sends. If connecting or reading the greeting fails, the new
        socket is closed before the error propagates.
        """
        self.host = host
        self.port = port

        # Drop the old connection so that reconnects do not leak sockets.
        self._release()

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        fd = None
        connected = False
        try:
            sock.settimeout(1.0)
            sock.connect((host, port))
            fd = sock.makefile('rw')
            version = fd.readline().split(' ')[2]
            connected = True
        finally:
            if not connected:
                if fd is not None:
                    fd.close()
                sock.close()

        self.sock = sock
        self.fd = fd
        self.version = version

    def close(self):
        """Close the connection (both the file wrapper and the socket)."""
        self._release()

    def flush_input(self):
        """Discard any input that is already pending on the connection.

        The socket is switched to non-blocking mode for one read and is always
        put back into blocking mode with a 1 second timeout afterwards.
        """
        self.sock.setblocking(0)
        try:
            self.fd.read()
        except Exception:
            # Deliberate best effort: with nothing pending the non-blocking
            # read may fail instead of returning; either way there is nothing
            # left to discard.
            pass
        finally:
            self.sock.setblocking(1)
            self.sock.settimeout(1.0)

    def call(self, command, expect='MSG:'):
        """Send ``command`` and yield the reply lines.

        Lines starting with ``expect`` have that prefix and the surrounding
        whitespace removed; such a line that is empty afterwards ends the
        reply. Lines without the prefix are yielded unchanged.

        This is a generator: the command is only sent once iteration starts,
        and the internal lock stays held until the generator finishes or is
        closed. On any error a reconnect is attempted (if ``reconnect`` is
        set) and the original error is re-raised.

        Raises:
            Exception: if the remote end closes the connection, or on any
                socket error or timeout while talking to the server.
        """
        with self.sem:
            try:
                self.flush_input()

                self.fd.write(command + '\r\n')
                self.fd.flush()

                while True:
                    line = self.fd.readline()

                    if not line:
                        raise Exception('Remote host closed socket')

                    if expect and line.startswith(expect):
                        line = line[len(expect):].strip()
                        if not line:
                            return

                    yield line
            except Exception:
                if self.reconnect:
                    try:
                        self.connect(self.host, self.port)
                    except Exception:
                        # Keep the original failure as the one that reaches
                        # the caller; the reconnect problem is only logged.
                        logging.exception('Reconnect to %s:%s failed',
                                          self.host, self.port)
                raise

    def _first_line(self, command, *default):
        """Return the first reply line of ``command``.

        The reply generator is closed explicitly so the lock taken by call()
        is released immediately instead of whenever the generator happens to
        be garbage-collected. ``default`` (at most one value) is returned when
        the reply is empty; without it StopIteration propagates.
        """
        lines = self.call(command)
        try:
            return next(lines, *default)
        finally:
            lines.close()

    def tcl(self, code):
        """Evaluate ``code`` via 'snowmix message' and parse the first reply line."""
        return parse_tcl(
            self._first_line('tcl eval snowmix message [%s]' % code, ''))

    def scene_list(self):
        """Return the scene ids from 'tcl eval ScenesList' as a list of ints.

        The first two space-separated tokens of the reply line are skipped.
        """
        reply = self._first_line('tcl eval ScenesList')
        return [int(s) for s in reply.split(' ')[2:]]

    def scene_info(self, scene_id):
        """Return a dict describing scene ``scene_id``.

        The dict holds the scene geometry and name, the alpha values and alpha
        link flags, and a ``frames`` dict keyed by frame id (a string). Each
        frame has ``position``, ``front``, ``back``, ``id``, ``active`` and
        ``alpha_link`` entries.
        """
        lines = list(self.call('tcl eval SceneList %d' % scene_id))

        s = {'frames': {}}

        (s['id'], s['active'], s['width'], s['height'],
         s['x'], s['y'], s['name']) = scene_head_re.match(lines[0]).groups()

        # The background line is only checked for its format; none of its
        # fields are part of the result.
        scene_back_re.match(lines[1]).groups()

        for l in lines[2:]:
            (frame_id, active, width, height, x, y,
             front_kind, back_kind, front_ref, back_ref,
             _, _, _, _) = scene_frame_re.match(l).groups()

            pos = cast({'width': width, 'height': height, 'x': x, 'y': y},
                       int, ['width', 'height', 'x', 'y'])
            s['frames'][frame_id] = {
                'position': pos,
                # Sources are exposed as '<type>:<id>' strings.
                'back': {'source': ':'.join([back_kind, back_ref])},
                'front': {'source': ':'.join([front_kind, front_ref])},
                'id': frame_id,
                'active': active == '1',
            }

        alpha = self.tcl('SceneAlpha %d' % scene_id)
        s['alpha'] = float(alpha[0][0])
        s['background_alpha'] = float(alpha[0][1])
        s['text_alpha'] = float(alpha[0][2])

        for k, front_alpha, back_alpha in alpha[1:]:
            s['frames'][k]['front']['alpha'] = float(front_alpha)
            s['frames'][k]['back']['alpha'] = float(back_alpha)

        alphalink = self.tcl('SceneAlphaLink %d' % scene_id)
        s['alpha_background_link'] = alphalink[0][0] == '1'
        s['alpha_text_link'] = alphalink[0][1] == '1'
        for k, link in alphalink[1:]:
            s['frames'][k]['alpha_link'] = link == '1'

        # Note: this also turns the two boolean link flags into 0/1 ints.
        cast(s, int, ['active', 'width', 'height', 'x', 'y',
                      'alpha_background_link', 'alpha_text_link'])
        return s

    def image_list(self):
        """Return a list of dicts, one per image reported by 'image load'.

        Each dict has ``id``, ``source``, ``width``, ``height``, ``bit``,
        ``type`` and ``seqno``, plus ``name`` for images listed by
        'image name'.
        """
        images = {}
        # Replies are read completely before parsing so that a parse error
        # cannot leave the connection lock held by a suspended generator.
        for l in list(self.call('image load')):
            img = {}
            (img['id'], img['source'], img['width'], img['height'],
             img['bit'], img['type'], img['seqno']) = \
                image_list_re.match(l).groups()
            cast(img, int, ['width', 'height', 'bit', 'seqno'])

            images[img['id']] = img

        for l in list(self.call('image name')):
            img_id, name = image_name_re.match(l).groups()
            images[img_id]['name'] = name

        return list(images.values())

    def feed_list(self):
        """Return a list of dicts, one per feed line of the 'feed info' reply.

        The first four reply lines are skipped.
        """
        feeds = []
        lines = list(self.call('feed info', 'STAT:'))
        for l in lines[4:]:
            f = {}
            (f['id'], f['state'], f['live'], f['oneshot'],
             f['width'], f['height'],
             f['cutstartx'], f['cutstarty'], f['cutw'], f['cuth'],
             f['offsetx'], f['offsety'], f['fifo1'], f['fifo2'],
             f['good'], f['missed'], f['dropped'], f['name']) = \
                feed_list_re.match(l).groups()
            cast(f, int, ['width', 'height', 'cutstartx', 'cutstarty',
                          'cutw', 'cuth', 'offsetx', 'offsety',
                          'fifo1', 'fifo2', 'good', 'missed', 'dropped'])
            feeds.append(f)

        return feeds

    def scene_cut(self, scene_id):
        """Send 'SceneSetState <scene_id> 1 0'."""
        # NOTE(review): presumably the last argument (0) selects an immediate
        # switch rather than a fade; confirm against the Snowmix scene library.
        self.tcl('SceneSetState %d 1 0' % scene_id)

    def scene_fade(self, scene_id):
        """Send 'SceneSetState <scene_id> 1 1'."""
        # NOTE(review): presumably the last argument (1) requests a fade;
        # confirm against the Snowmix scene library.
        self.tcl('SceneSetState %d 1 1' % scene_id)

    def scene_alpha(self, scene_id, alpha, frame_id=-1, back=False):
        """Send a 'SceneAlpha' command for a scene/frame.

        With ``back`` set, the value is sent in the second alpha position
        (the first is sent as '-'); otherwise it is sent in the first.
        """
        # NOTE(review): the meaning of the default frame_id of -1 is not
        # visible here; presumably it addresses the scene itself.
        if back:
            self.tcl('SceneAlpha %d %d - %f' % (scene_id, frame_id, alpha))
        else:
            self.tcl('SceneAlpha %d %d %f' % (scene_id, frame_id, alpha))

    def frame_set_source(self, scene_id, frame_id, source, back=False):
        """Send 'SceneSetFrameSource' for a frame.

        ``source`` is a '<type>:<id>' string, as produced by scene_info().
        The fifth argument sent is 1 unless ``back`` is set, then 0.
        """
        source = source.split(':')
        self.tcl('SceneSetFrameSource %d %d %s %s %d 1' % (
            scene_id, frame_id, source[0], source[1], not back))

    def frame_set_active(self, scene_id, frame_id, active):
        """Send 'SceneSetFrameActive' for a frame and log the first reply line."""
        reply = self._first_line(
            'tcl eval SceneSetFrameActive %d %d %d 0' % (
                scene_id, frame_id, active))
        logging.debug('SceneSetFrameActive %r %r %r -> %r',
                      scene_id, frame_id, active, reply)

    def _audio_stat(self, audio_type, prop):
        """Query 'audio <audio_type> <prop>' and map field 2 to field 4 as ints."""
        stats = {}
        # Read the whole reply first so the lock is released before parsing.
        for line in list(self.call('audio %s %s' % (audio_type, prop), 'STAT:')):
            fields = line.split(' ')
            stats[int(fields[2])] = int(fields[4])
        return stats

    def audio_rate(self, audio_type='feed'):
        """Return a dict built from the 'audio <audio_type> rate' reply.

        The third token of each line is the key and the fifth the value,
        both converted to int.
        """
        return self._audio_stat(audio_type, 'rate')

    def audio_channels(self, audio_type='feed'):
        """Return a dict built from the 'audio <audio_type> channels' reply.

        The third token of each line is the key and the fifth the value,
        both converted to int.
        """
        return self._audio_stat(audio_type, 'channels')
|
|
|
|
if __name__ == "__main__":
    # Small manual smoke test against a server reachable as host 'snowmix'.
    import pprint

    c = SnowmixClient('snowmix')
    try:
        for l in c.call('feed info', 'STAT:'):
            print('Result: %s' % (l,))

        print(c.tcl('SceneAlpha 1'))
        pprint.pprint(c.image_list())
    finally:
        # Always release the connection, even if one of the queries fails.
        c.close()
|