# lights-web/app.py (99 lines, 2.7 KiB, Python)

import flask
import threading
import paho.mqtt as mqtt
import paho.mqtt.client
import paho.mqtt.publish
from pathlib import Path
import os
import yaml
import re
import json
# Locate the two YAML files. Each path can be overridden through an
# environment variable; otherwise a relative default file name is used.
config_path = Path(os.environ.get("LIGHTS_WEB_CONFIG", 'config.yaml'))
secrets_path = Path(os.environ.get("LIGHTS_WEB_SECRETS", 'secrets.yaml'))

# The secrets are merged on top of the plain config, so a key present in
# both files takes its value from the secrets file.
config = yaml.safe_load(config_path.read_text())
config.update(yaml.safe_load(secrets_path.read_text()))

# Held by the /status handler while it reads application.lights.
c = threading.Condition()

application = flask.Flask(__name__)
application.msg = "ab"
# State list served by /status: four entries, all initially False.
application.lights = [False] * 4
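# NOTE(review): the MQTT subscriber below is commented out; nothing else in
# the visible code writes application.lights, so /status presumably keeps
# reporting the initial values until this is re-enabled -- confirm.
#def on_message(client,userdata,message):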
# c.acquire()
# application.msg = str(message.payload)
# print(message.payload)
# light_status = re.compile('devices/light_[^/]+/light_([0-9]+)/on')
# m = light_status.search(message.topic)
# if m:
# n = int(m.group(1))
# if n > 0 and n < 5:
# if message.payload == "true":
# application.lights[n-1] = True
# else:
# application.lights[n-1] = False
#
#
# c.release()
# print("MQTT Thread: " + str(threading.current_thread()))
#
#mc = mqtt.client.Client()
#mc.username_pw_set(config['mqtt_rw_username'], config['mqtt_rw_password'])
#mc.connect("10.8.1.16")
#mc.on_message = on_message
#mc.subscribe("devices/#")
#mc.loop_start()
@application.route("/")
def main():
    """Serve the front-end page ``static/index.html`` for the site root."""
    index_page = 'static/index.html'
    return flask.send_file(index_page)
@application.route("/status")
def status():
    """Return the current light states as a JSON-encoded string.

    The body has the form ``{"lights": [...]}`` and is built from
    ``application.lights`` while the module-level condition ``c`` is held.
    The name of the thread handling the request is printed for debugging.
    """
    print(f"Main Thread: {threading.current_thread()!s}")
    # The context manager acquires ``c`` on entry and releases it on exit.
    with c:
        snapshot = {"lights": application.lights}
    return json.dumps(snapshot)
@application.route("/config")
def get_config():
    """Return the client-facing MQTT settings and light definitions as JSON.

    Only the ``mqtt_ro_*`` credentials are included in the response; the
    ``mqtt_rw_*`` credentials used by ``publish`` are not.
    NOTE(review): "ro" presumably means read-only -- confirm against the
    broker's ACL configuration.
    """
    client_settings = {
        "mqtt_server": config["mqtt_server"],
        "mqtt_port": config["mqtt_port"],
        "mqtt_username": config["mqtt_ro_username"],
        "mqtt_password": config["mqtt_ro_password"],
        "lights": config['lights'],
    }
    return flask.jsonify(client_settings)
def publish(path, payload):
    """Publish one MQTT message to the configured broker.

    Args:
        path: MQTT topic to publish to.
        payload: Message payload to send.

    The message is sent with QoS 1 to ``config["mqtt_server"]`` and
    authenticated with the ``mqtt_rw_*`` credentials from the config.
    """
    broker = config["mqtt_server"]
    credentials = {
        'username': config["mqtt_rw_username"],
        'password': config["mqtt_rw_password"],
    }
    mqtt.publish.single(path, payload, qos=1, hostname=broker, auth=credentials)
@application.route('/light/<lid>/on/set', methods=['POST'])
def set_light(lid):
    """Publish ``"true"`` to the ``/set`` topic of the light ``lid``.

    Args:
        lid: Light identifier taken from the URL; it must be a key of
            ``config["lights"]``.

    Returns:
        ``{"ok": true}`` after the message has been published, or
        ``{"ok": false, "error": ...}`` with HTTP status 404 when ``lid``
        is not a configured light.
    """
    lights = config["lights"]
    if lid not in lights:
        # Report unknown ids instead of claiming success without having
        # published anything.
        return flask.jsonify({"ok": False, "error": "unknown light"}), 404
    # NOTE(review): the payload is always "true", so this endpoint can only
    # switch a light on -- confirm that this is intended.
    publish(lights[lid]["mqtt_path"] + "/set", "true")
    return flask.jsonify({"ok": True})
@application.route('/light/<lid>/on/toggle', methods=['POST'])
def toggle_light(lid):
    """Publish ``"true"`` to the ``/toggle`` topic of the light ``lid``.

    Args:
        lid: Light identifier taken from the URL; it must be a key of
            ``config["lights"]``.

    Returns:
        ``{"ok": true}`` after the message has been published, or
        ``{"ok": false, "error": ...}`` with HTTP status 404 when ``lid``
        is not a configured light.
    """
    lights = config["lights"]
    if lid not in lights:
        # Report unknown ids instead of claiming success without having
        # published anything.
        return flask.jsonify({"ok": False, "error": "unknown light"}), 404
    publish(lights[lid]["mqtt_path"] + "/toggle", "true")
    return flask.jsonify({"ok": True})
if __name__ == "__main__":
    # Run Flask's built-in server when the file is executed directly; it
    # listens on the loopback interface only, with debug mode switched on.
    run_options = {"host": '127.0.0.1', "port": 8000, "debug": True}
    application.run(**run_options)