Changed message queue... but still not happy with that...

master
Gina Häußge 2013-01-10 21:02:47 +01:00
parent d9f31fccfd
commit e51d7dc7a0
3 changed files with 115 additions and 47 deletions

View File

@ -5,6 +5,7 @@ __license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agp
import time
from threading import Thread
import Queue
import collections
import printer_webui.util.comm as comm
from printer_webui.util import gcodeInterpreter
@ -69,7 +70,16 @@ class Printer():
self._callbacks = []
self._lastProgressReport = None
self._updateQueue = Queue.Queue()
self._updateQueue = MessageQueue()
self._updateQueue.registerMessageType("zchange", self._sendZChangeCallbacks, overwrite=True)
self._updateQueue.registerMessageType("state", self._sendStateCallbacks)
self._updateQueue.registerMessageType("temperature", self._sendTemperatureCallbacks, mergeFunction=(lambda x,y: x + y))
self._updateQueue.registerMessageType("log", self._sendLogCallbacks, mergeFunction=(lambda x, y: x + y))
self._updateQueue.registerMessageType("message", self._sendMessageCallbacks, mergeFunction=(lambda x, y: x + y))
self._updateQueue.registerMessageType("progress", self._sendProgressCallbacks, overwrite=True)
self._updateQueue.registerMessageType("job", self._sendJobCallbacks, throttling=0.5)
self._updateQueue.registerMessageType("gcode", self._sendGcodeCallbacks, throttling=0.5)
self._updateQueueWorker = Thread(target=self._processQueue)
self._updateQueueWorker.start()
@ -95,17 +105,17 @@ class Printer():
def _sendTemperatureCallbacks(self, data):
for callback in self._callbacks:
try: callback.temperatureChangeCB(data["currentTime"], data["temp"], data["bedTemp"], data["targetTemp"], data["targetBedTemp"])
try: callback.temperatureChangeCB(data)
except: pass
def _sendLogCallbacks(self, data):
for callback in self._callbacks:
try: callback.logChangeCB(data["log"])
try: callback.logChangeCB(data)
except: pass
def _sendMessageCallbacks(self, data):
for callback in self._callbacks:
try: callback.messageChangeCB(data["message"])
try: callback.messageChangeCB(data)
except: pass
def _sendProgressCallbacks(self, data):
@ -124,12 +134,9 @@ class Printer():
except:
pass
def _addUpdate(self, target, data):
self._updateQueue.put((target, data))
def _processQueue(self):
while True:
(target, data) = self._updateQueue.get()
(target, data) = self._updateQueue.read()
target(data)
self._updateQueue.task_done()
@ -234,11 +241,11 @@ class Printer():
def _setCurrentZ(self, currentZ):
self._currentZ = currentZ
self._addUpdate(self._sendZChangeCallbacks, {"currentZ": self._currentZ})
self._updateQueue.message("zchange", {"currentZ": self._currentZ})
def _setState(self, state):
self._state = state
self._addUpdate(self._sendStateCallbacks, {"state": self._state, "stateString": self.getStateString(), "stateFlags": self._getStateFlags()})
self._updateQueue.message("state", {"state": self._state, "stateString": self.getStateString(), "stateFlags": self._getStateFlags()})
def _addLog(self, log):
"""
@ -247,13 +254,13 @@ class Printer():
self._latestLog = log
self._log.append(log)
self._log = self._log[-300:]
self._addUpdate(self._sendLogCallbacks, {"log": self._latestLog})
self._updateQueue.message("log", [self._latestLog])
def _addMessage(self, message):
self._latestMessage = message
self._messages.append(message)
self._messages = self._messages[-300:]
self._addUpdate(self._sendLogCallbacks, {"message": self._latestLog})
self._updateQueue.message("message", [self._latestLog])
def _setProgressData(self, progress, printTime, printTimeLeft):
self._progress = progress
@ -261,7 +268,7 @@ class Printer():
self._printTimeLeft = printTimeLeft
#if not self._lastProgressReport or self._lastProgressReport + 0.5 <= time.time():
self._addUpdate(self._sendProgressCallbacks, {"progress": self._progress, "printTime": self._printTime, "printTimeLeft": self._printTimeLeft})
self._updateQueue.message("progress", {"progress": self._progress, "printTime": self._printTime, "printTimeLeft": self._printTimeLeft})
# self._lastProgressReport = time.time()
def _addTemperatureData(self, temp, bedTemp, targetTemp, bedTargetTemp):
@ -288,7 +295,7 @@ class Printer():
self._targetTemp = targetTemp
self._targetBedTemp = bedTargetTemp
self._addUpdate(self._sendTemperatureCallbacks, {"currentTime": currentTime, "temp": self._temp, "bedTemp": self._bedTemp, "targetTemp": self._targetTemp, "targetBedTemp": self._targetBedTemp, "history": self._temps})
self._updateQueue.message("temperature", [{"currentTime": currentTime, "temp": self._temp, "bedTemp": self._bedTemp, "targetTemp": self._targetTemp, "targetBedTemp": self._targetBedTemp, "history": self._temps}])
def _setJobData(self, filename, gcode, gcodeList):
self._filename = filename
@ -305,7 +312,7 @@ class Printer():
estimatedPrintTime = self._gcode.totalMoveTimeMinute
filament = self._gcode.extrusionAmount
self._addUpdate(self._sendJobCallbacks, {"filename": self._filename, "lines": lines, "estimatedPrintTime": estimatedPrintTime, "filament": filament})
self._updateQueue.message("job", {"filename": self._filename, "lines": lines, "estimatedPrintTime": estimatedPrintTime, "filament": filament})
def _sendInitialStateUpdate(self, callback):
lines = None
@ -321,10 +328,8 @@ class Printer():
try:
callback.zChangeCB(self._currentZ)
callback.stateChangeCB(self._state, self.getStateString(), self._getStateFlags())
callback.logChangeCB(self._latestLog)
callback.messageChangeCB(self._latestMessage)
callback.progressChangeCB(self._progress, self._printTime, self._printTimeLeft)
callback.temperatureChangeCB(time.time() * 1000, self._temp, self._bedTemp, self._targetTemp, self._targetBedTemp)
callback.temperatureChangeCB([{"currentTime": time.time() * 1000, "temp": self._temp, "bedTemp": self._bedTemp, "targetTemp": self._targetTemp, "bedTargetTemp": self._targetBedTemp}])
callback.jobDataChangeCB(self._filename, lines, estimatedPrintTime, filament)
callback.sendHistoryData(self._temps, self._log, self._messages)
except Exception, err:
@ -402,7 +407,7 @@ class Printer():
#~~ callbacks triggered by gcodeLoader
def onGcodeLoadingProgress(self, progress):
self._addUpdate(self._sendGcodeCallbacks, {"filename": self._gcodeLoader._filename, "progress": progress})
self._updateQueue.message("gcode", {"filename": self._gcodeLoader._filename, "progress": progress})
def onGcodeLoaded(self):
self._setJobData(self._gcodeLoader._filename, self._gcodeLoader._gcode, self._gcodeLoader._gcodeList)
@ -410,7 +415,7 @@ class Printer():
self._setProgressData(None, None, None)
self._gcodeLoader = None
self._addUpdate(self._sendStateCallbacks, {"state": self._state, "stateString": self.getStateString(), "stateFlags": self._getStateFlags()})
self._updateQueue.message("state", {"state": self._state, "stateString": self.getStateString(), "stateFlags": self._getStateFlags()})
#~~ state reports
@ -540,4 +545,61 @@ class PrinterCallback(object):
pass
def sendHistoryData(self, tempHistory, logHistory, messageHistory):
pass
pass
class MessageQueue(Queue.Queue):
def __init__(self, maxsize=0):
Queue.Queue.__init__(self, maxsize)
self._messageTypes = dict()
self._lastSends = dict()
def registerMessageType(self, messageType, callback, overwrite=False, throttling=None, mergeFunction=None):
self._messageTypes[messageType] = (callback, overwrite, throttling, mergeFunction)
if throttling is not None:
self._lastSends[messageType] = time.time()
def message(self, messageType, data, timestamp=time.time()):
if not self._messageTypes.has_key(messageType):
return
(callback, overwrite, throttling, merger) = self._messageTypes[messageType]
updated = False
try:
self.mutex.acquire()
if overwrite or throttling is not None or merger is not None:
for item in self.queue:
if item.type == messageType and ((throttling is not None and item.timestamp + throttling < time.time()) or overwrite or merger is not None):
if merger is not None:
item.payload = merger(item.payload, data)
else:
item.payload = data
updated = True
break
finally:
self.mutex.release()
if not updated:
item = MessageQueueItem(messageType, timestamp, data)
self.put(item)
def read(self):
item = None
while item is None:
item = self.get()
if not self._messageTypes.has_key(item.type):
self.task_done()
item = None
(callback, overwrite, throttling, merger) = self._messageTypes[item.type]
if throttling and self._lastSends[item.type] + throttling > time.time():
self.message(item.type, item.payload, item.timestamp)
item = None
self._lastSends[item.type] = time.time()
return (callback, item.payload)
class MessageQueueItem(object):
def __init__(self, type, timestamp, payload):
self.type = type
self.timestamp = timestamp
self.payload = payload

View File

@ -68,31 +68,25 @@ class PrinterStateConnection(tornadio2.SocketConnection, PrinterCallback):
"printTimeLeft": formattedPrintTimeLeft
})
def temperatureChangeCB(self, currentTime, temp, bedTemp, targetTemp, targetBedTemp):
def temperatureChangeCB(self, temperatures):
print("Sending temperatureChange...")
self.emit("temperature", {
"currentTime": currentTime,
"temp": temp,
"bedTemp": bedTemp,
"targetTemp": targetTemp,
"targetBedTemp": targetBedTemp
})
self.emit("temperatures", temperatures)
def stateChangeCB(self, state, stateString, booleanStates):
print("Sending stateChange...")
self.emit("state", {"currentState": stateString, "flags": booleanStates})
def logChangeCB(self, line):
def logChangeCB(self, lines):
print("Sending logChange...")
self.emit("log", {"line": line})
self.emit("log", {"lines": lines})
def messageChangeCB(self, line):
def messageChangeCB(self, lines):
print("Sending messageChange...")
self.emit("message", {"line": line})
self.emit("message", {"lines": lines})
def gcodeChangeCB(self, filename, progress):
print("Sending gcodeChange...")
self.emit("jobData", {"filename": "Loading... (%d%%)" % (round(progress * 100)), "lineCount": None, "estimatedPrintTime": None, "filament": None})
self.emit("gcode", {"filename": filename, "progress": progress})
def jobDataChangeCB(self, filename, lines, estimatedPrintTimeInMinutes, filamentLengthInMillimeters):
formattedPrintTimeEstimation = None

View File

@ -157,6 +157,12 @@ function PrinterStateViewModel() {
self.filament(data.filament);
}
self.fromGcodeEvent = function(data) {
if (self.isLoading()) {
self.filename("Loading... (" + Math.round(data.progress * 100) + ")");
}
}
self.fromProgressEvent = function(data) {
self.currentLine(data.currentLine);
self.printTime(data.printTime);
@ -246,12 +252,14 @@ function TemperatureViewModel() {
}
self.fromTemperatureEvent = function(data) {
self.temp(data.temp);
self.bedTemp(data.bedTemp);
self.targetTemp(data.targetTemp);
self.bedTargetTemp(data.bedTargetTemp);
if (data.length == 0)
return;
self.temp(data[data.length - 1].temp);
self.bedTemp(data[data.length - 1].bedTemp);
self.targetTemp(data[data.length - 1].targetTemp);
self.bedTargetTemp(data[data.length - 1].bedTargetTemp);
// plot
if (!self.temperatures)
self.temperatures = [];
if (!self.temperatures.actual)
@ -263,11 +271,12 @@ function TemperatureViewModel() {
if (!self.temperatures.targetBed)
self.temperatures.targetBed = [];
self.temperatures.actual.push([data.currentTime, data.temp])
self.temperatures.target.push([data.currentTime, data.targetTemp])
self.temperatures.actualBed.push([data.currentTime, data.bedTemp])
self.temperatures.targetBed.push([data.currentTime, data.bedTargetTemp])
for (var i = 0; i < data.length; i++) {
self.temperatures.actual.push([data[i].currentTime, data[i].temp])
self.temperatures.target.push([data[i].currentTime, data[i].targetTemp])
self.temperatures.actualBed.push([data[i].currentTime, data[i].bedTemp])
self.temperatures.targetBed.push([data[i].currentTime, data[i].bedTargetTemp])
}
self.temperatures.actual = self.temperatures.actual.slice(-300);
self.temperatures.target = self.temperatures.target.slice(-300);
self.temperatures.actualBed = self.temperatures.actualBed.slice(-300);
@ -361,7 +370,7 @@ function TerminalViewModel() {
self.fromLogEvent = function(data) {
if (!self.log)
self.log = []
self.log.push(data.line)
self.log.concat(data.line)
self.updateOutput();
}
@ -547,13 +556,16 @@ function DataUpdater(connectionViewModel, printerStateViewModel, temperatureView
self.speedViewModel.fromStateEvent(data);
self.webcamViewModel.fromStateEvent(data);
})
self.socket.on("temperature", function(data) {
self.socket.on("temperatures", function(data) {
self.temperatureViewModel.fromTemperatureEvent(data);
})
self.socket.on("jobData", function(data) {
self.printerStateViewModel.fromJobEvent(data);
})
self.socket.on("log", function(data) {
self.socket.on("gcode", function(data) {
self.printerStateViewModel.fromGcodeEvent(data);
})
self.socket.on("logs", function(data) {
self.terminalViewModel.fromLogEvent(data);
})
self.socket.on("printProgress", function(data) {