98 lines
2.7 KiB
Python
98 lines
2.7 KiB
Python
import flask
|
|
import threading
|
|
import paho.mqtt as mqtt
|
|
import paho.mqtt.client
|
|
import paho.mqtt.publish
|
|
from pathlib import Path
|
|
import os
|
|
import yaml
|
|
import re
|
|
import json
|
|
|
|
secrets_path = Path(os.environ.get("LIGHTS_WEB_SECRETS", 'secrets.yaml'))
|
|
config_path = Path(os.environ.get("LIGHTS_WEB_CONFIG", 'config.yaml'))
|
|
config = yaml.safe_load(config_path.read_text())
|
|
config.update(yaml.safe_load(secrets_path.read_text()))
|
|
|
|
c = threading.Condition()
|
|
application = flask.Flask(__name__)
|
|
application.msg="ab"
|
|
application.lights = [False, False, False, False]
|
|
|
|
#def on_message(client,userdata,message):
|
|
# c.acquire()
|
|
# application.msg = str(message.payload)
|
|
# print(message.payload)
|
|
# light_status = re.compile('devices/light_[^/]+/light_([0-9]+)/on')
|
|
# m = light_status.search(message.topic)
|
|
# if m:
|
|
# n = int(m.group(1))
|
|
# if n > 0 and n < 5:
|
|
# if message.payload == "true":
|
|
# application.lights[n-1] = True
|
|
# else:
|
|
# application.lights[n-1] = False
|
|
#
|
|
#
|
|
# c.release()
|
|
# print("MQTT Thread: " + str(threading.current_thread()))
|
|
#
|
|
#mc = mqtt.client.Client()
|
|
#mc.username_pw_set(config['mqtt_rw_username'], config['mqtt_rw_password'])
|
|
#mc.connect("10.8.1.16")
|
|
#mc.on_message = on_message
|
|
#mc.subscribe("devices/#")
|
|
#mc.loop_start()
|
|
|
|
|
|
@application.route("/")
|
|
def main():
|
|
return flask.send_file('static/index.html')
|
|
|
|
@application.route("/status")
|
|
def status():
|
|
print("Main Thread: " + str(threading.current_thread()))
|
|
c.acquire()
|
|
st = {"lights" : application.lights}
|
|
c.release()
|
|
return json.dumps(st)
|
|
|
|
|
|
@application.route("/config")
|
|
def get_config():
|
|
return flask.jsonify({
|
|
"mqtt_server": config["mqtt_server"],
|
|
"mqtt_port": config["mqtt_port"],
|
|
"mqtt_username": config["mqtt_ro_username"],
|
|
"mqtt_password": config["mqtt_ro_password"],
|
|
"lights": config['lights']
|
|
})
|
|
|
|
def publish(path, payload):
|
|
mqtt.publish.single(
|
|
path,
|
|
payload,
|
|
qos=1,
|
|
hostname=config["mqtt_server"],
|
|
auth = {
|
|
'username':config["mqtt_rw_username"],
|
|
'password':config["mqtt_rw_password"]
|
|
}
|
|
)
|
|
|
|
@application.route('/light/<lid>/on/set', methods=['POST'])
|
|
def set_light(lid):
|
|
if lid in config["lights"]:
|
|
publish(config["lights"][lid]["mqtt_path"] + "/set", "true")
|
|
return flask.jsonify({"ok": True})
|
|
|
|
|
|
@application.route('/light/<lid>/on/toggle', methods=['POST'])
|
|
def toggle_light(lid):
|
|
if lid in config["lights"]:
|
|
publish(config["lights"][lid]["mqtt_path"] + "/toggle", "true")
|
|
return flask.jsonify({"ok": True})
|
|
|
|
if __name__ == "__main__":
|
|
application.run(host='127.0.0.1',port=8000,debug=True)
|
|
|