import paho.mqtt.client as mqtt import threading import re import datetime import copy con = mqtt.Client() con.connect('10.8.1.16') lock = threading.Lock() measurements = {} con.subscribe('iot/+/+/energy') class Measurements: class Meter: def __init__(self, _id): self.id = _id self.energy = [] def add_energy_measurement(energy): self.energy.append((datetime.datetime.now(), energy)) self.energy = self.energy[-10:] def get_metrics(): if len(self.energy) > 0: last = self.energy[-1] s = ('energy{{node="esp8266-{mid:s}"}} {energy:f} ' '{epoch:d}\n').format(mid=self.id, energy=last[1], epoch=int(last[0].timestamp() * 1000)) return s else: return '' def __init__(self): self._meters = [] self._lock = threading.Lock() def _get_meter_by_id(self, _id): if _id not in self._meters: self._meters[_id] = Meter(_id) return self._meters[_id] def add_energy_measurement(meter_id, energy): with self._lock: meter = self._get_meter_by_id(meter_id) meter.energy.append((datetime.datetime.now(), energy)) def on_message(client, userdata, message): m = re.match('iot/(?P[^/]+)/[^/]+/energy', message.topic) if m is not None: try: energy = int(re.match('^(?P[0-9]*) J', message.payload.decode('ASCII')).group('joules')) except AttributeError: pass else: if message.retain == 0: mid = m.group('id') with lock: if mid not in measurements: measurements[mid] = [] ms = measurements[mid] ms.append((datetime.datetime.now(), energy)) measurements[mid] = ms[-10:] con.on_message = on_message con.loop_start() import flask app = flask.Flask(__name__) @app.route('/metrics') def metrics(): r = "" r += "# HELP energy energy in joules\n" r += "# TYPE energy counter\n" with lock: m = copy.deepcopy(measurements) for mid in m: ms = [m[mid][-1]] for s in ms: r += 'energy{{node="esp8266-{mid:s}"}} {energy:f} {epoch:d}\n'.format(mid=mid, energy=s[1], epoch=int(s[0].timestamp() * 1000) ) return flask.Response(r, mimetype='text/plain; version=0.0.4') app.run('0.0.0.0')