import paho.mqtt.client as mqtt import threading import re import datetime import copy con = mqtt.Client() con.connect('10.8.1.16') con.subscribe('iot/+/+/energy') class Meters: class Meter: def __init__(self, _id, history_size = 10): self.id = _id self.energy = [] self.history_size = 10 def add_energy_measurement(self, energy): self.energy.append((datetime.datetime.now(), energy)) self.energy = self.energy[-self.history_size:] def get_metrics(self): if len(self.energy) > 0: last = self.energy[-1] s = ('energy{{node="esp8266-{mid:s}"}} {energy:f} ' '{epoch:d}\n').format(mid=self.id, energy=last[1], epoch=int(last[0].timestamp() * 1000)) return s else: return '' def __init__(self): self._meters = {} self._lock = threading.Lock() def _get_meter_by_id(self, _id): if _id not in self._meters: self._meters[_id] = self.Meter(_id) return self._meters[_id] def add_energy_measurement(self, meter_id, energy): with self._lock: meter = self._get_meter_by_id(meter_id) meter.add_energy_measurement(energy) def get_metrics(self): with self._lock: s = '' for meter in self._meters.values(): s += meter.get_metrics() return s meters = Meters() def on_message(client, userdata, message): m = re.match('iot/(?P[^/]+)/[^/]+/energy', message.topic) if m is not None: try: energy = int(re.match('^(?P[0-9]*) J', message.payload.decode('ASCII')).group('joules')) except AttributeError: pass else: if message.retain == 0: mid = m.group('id') meters.add_energy_measurement(mid, energy) con.on_message = on_message con.loop_start() import flask app = flask.Flask(__name__) @app.route('/metrics') def metrics(): r = "" r += "# HELP energy energy in joules\n" r += "# TYPE energy counter\n" r += meters.get_metrics() return flask.Response(r, mimetype='text/plain; version=0.0.4') app.run('0.0.0.0')