streaming-docker/gst-snowmix/python/srcelement.py

136 lines
4.7 KiB
Python

#!/usr/bin/env python
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
# sinkelement.py
# (c) 2005 Edward Hervey <edward@fluendo.com>
# (c) 2007 Jan Schmidt <jan@fluendo.com>
# Licensed under LGPL
#
# Small test application to show how to write a sink element
# in 20 lines in python and place into the gstreamer registry
# so it can be autoplugged or used from parse_launch.
#
# You can run the example from the source doing from gst-python/:
#
# $ export GST_PLUGIN_PATH=$GST_PLUGIN_PATH:$PWD/plugin:$PWD/examples/plugins
# $ GST_DEBUG=python:4 gst-launch-1.0 fakesrc num-buffers=10 ! mysink
# inf notes 20190221
# https://gist.github.com/jackersson/9d3b0c578c1e625b6b79ea04e2cebd15
# https://mathieuduponchelle.github.io/2018-02-01-Python-Elements.html?gi-language=undefined
import os
from snowmix import SnowmixClient
from gi.repository import Gst, GObject, GstBase, Gio
Gst.init(None)
#
# Simple Sink element created entirely in python
#
class SnowmixAudioSource(Gst.Bin):
__gstmetadata__ = ('CustomSource','Source', \
'Custom test source element', 'Edward Hervey')
__gsttemplates__ = Gst.PadTemplate.new("src",
Gst.PadDirection.SRC,
Gst.PadPresence.ALWAYS,
Gst.Caps.new_any())
__gproperties__ = {
"mixer": (int,
"Mixer",
"Source mixer number",
1,
255,
1,
GObject.ParamFlags.READWRITE
),
#"host": (str,
# "Snowmix Host",
# "Target snowmix host",
# '',
# GObject.ParamFlags.READWRITE
# ),
"port": (int,
"Snowmix Port",
"Target snowmix port",
0,
65535,
9999,
GObject.ParamFlags.READWRITE
),
# [...]
}
def __init__(self, *args, **kwargs):
Gst.info('init(%r, %r)' % (args, kwargs))
super(SnowmixAudioSource, self).__init__(*args, **kwargs)
self.caps = caps = Gst.ElementFactory.make("capsfilter", None)
self.source = source = Gst.ElementFactory.make("socketsrc", None)
self.add(source, caps)
# Link from left to right
#source.link(caps)
#caps.link(sink)
# Expose first/last
self.add_pad(Gst.GhostPad.new("src", source.get_static_pad("src")))
#print(dir(self))
#print(self.list_properties())
self._propstorage = {e.name: e.default_value for e in self.list_properties()}
#self.props = {k: e[5] for k, e in } #SnowmixAudioSource.__gproperties__.entries()}
def do_change_state(self, state):
Gst.info('do_change_state(%r)' % (state))
if state == Gst.StateChange.NULL_TO_READY:
ip = '' or os.getenv('SNOWMIX_IP') or '127.0.0.1'
port = self.props.port or os.getenv('SNOWMIX_PORT') or 9999
client = SnowmixClient(ip, port)
rate = client.audio_rate('sink')[self.props.mixer]
channels = client.audio_channels('sink')[self.props.mixer]
c = Gst.Caps.from_string(
"audio/x-raw,rate=%d,channels=%d,format=S16LE,layout=interleaved" % (rate, channels))
self.source.set_property('do-timestamp', True)
self.source.set_property("caps", c)
# self.caps.set_property("caps", c)
client.close()
self.socket = Gio.Socket.new(
Gio.SocketFamily.IPV4, Gio.SocketType.STREAM,
Gio.SocketProtocol.DEFAULT)
self.socket.connect(Gio.InetSocketAddress.new_from_string(
ip, port))
self.socket.send(b'audio sink ctr isaudio %d\n' % (self.props.mixer))
self.source.set_property('socket', self.socket)
return Gst.Bin.do_change_state(self, state)
def do_set_property(self, prop, value):
Gst.info('do_set_property(%r, %r)' % (prop, value))
if prop.name not in self._propstorage:
raise AttributeError('unknown property %s' % prop.name)
self._propstorage[prop.name] = value
raise AttributeError('unknown property %s' % prop.name)
#return Gst.Bin.do_set_property(self, prop, value)
def do_get_property(self, prop):
if prop.name in self._propstorage:
return self._propstorage[prop.name]
def do_render(self, buffer):
Gst.info("timestamp(buffer):%s" % (Gst.TIME_ARGS(buffer.pts)))
return Gst.FlowReturn.OK
GObject.type_register(SnowmixAudioSource)
__gstelementfactory__ = ("snowmixaudiosrc", Gst.Rank.NONE, SnowmixAudioSource)