# coding=utf-8
__author__ = "Gina Häußge "
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'

import os
import Queue
import threading
import datetime
import yaml
import time

import octoprint.util as util
import octoprint.util.gcodeInterpreter as gcodeInterpreter

from octoprint.settings import settings
from werkzeug.utils import secure_filename


class GcodeManager:
    """
    Manages the gcode files in the upload folder together with their metadata
    (print statistics and gcode analysis results), which is persisted in a
    "metadata.yaml" file inside that folder.
    """

    def __init__(self):
        self._uploadFolder = settings().getBaseFolder("uploads")

        self._metadata = {}
        self._metadataDirty = False
        self._metadataFile = os.path.join(self._uploadFolder, "metadata.yaml")
        self._metadataFileAccessMutex = threading.Lock()

        # the analyzer starts its worker thread right away and reports results
        # back through _onMetadataAnalysisFinished
        self._metadataAnalyzer = MetadataAnalyzer(getPathCallback=self.getAbsolutePath, loadedCallback=self._onMetadataAnalysisFinished)

        self._loadMetadata()

    def _onMetadataAnalysisFinished(self, filename, gcode):
        """
        Callback for the metadata analyzer: stores the analysis result of the
        given file in its metadata and persists it.

        @param filename the name of the analyzed file
        @param gcode the analyzed gcode object, read for its totalMoveTimeMinute
               and extrusionAmount attributes
        """
        print("Got gcode analysis for file %s: %r" % (filename, gcode))
        if filename is None or gcode is None:
            return

        basename = os.path.basename(filename)

        # ignore results for files that are not (or no longer) valid uploads
        if self.getAbsolutePath(basename) is None:
            return

        analysisResult = {}
        if gcode.totalMoveTimeMinute:
            analysisResult["estimatedPrintTime"] = util.getFormattedTimeDelta(datetime.timedelta(minutes=gcode.totalMoveTimeMinute))
        if gcode.extrusionAmount:
            # divide by a float so an integer amount is not truncated by
            # Python 2 integer division
            analysisResult["filament"] = "%.2fm" % (gcode.extrusionAmount / 1000.0)

        # only touch the metadata if the analysis yielded at least one value
        if analysisResult:
            metadata = self.getFileMetadata(basename)
            metadata["gcodeAnalysis"] = analysisResult
            self._metadata[basename] = metadata
            self._metadataDirty = True
            self._saveMetadata()

    def _loadMetadata(self):
        """
        Loads the metadata from the metadata file, if that file exists.
        """
        if os.path.exists(self._metadataFile) and os.path.isfile(self._metadataFile):
            with self._metadataFileAccessMutex:
                with open(self._metadataFile, "r") as f:
                    loaded = yaml.safe_load(f)
                # an empty file is parsed to None; keep a dict in that case so
                # that later membership tests and lookups keep working
                self._metadata = loaded if loaded is not None else {}

    def _saveMetadata(self, force=False):
        """
        Writes the metadata to the metadata file and reloads it afterwards.

        @param force if set to true, the metadata is written even if it is not
               marked as dirty
        """
        if not self._metadataDirty and not force:
            return

        with self._metadataFileAccessMutex:
            with open(self._metadataFile, "wb") as f:
                # NOTE(review): indent is passed as a string here; PyYAML
                # documents it as an integer -- confirm the intended width.
                yaml.safe_dump(self._metadata, f, default_flow_style=False, indent=" ", allow_unicode=True)
                self._metadataDirty = False

        # must happen after the mutex has been released: _loadMetadata acquires
        # the same lock and threading.Lock is not reentrant
        self._loadMetadata()

    def _getBasicFilename(self, filename):
        """
        Strips the upload folder prefix (plus path separator) from the given
        filename, if present.

        @param filename the filename to strip
        @return the filename relative to the upload folder, or the unchanged
                filename if it does not start with the upload folder
        """
        if filename.startswith(self._uploadFolder):
            return filename[len(self._uploadFolder + os.path.sep):]
        else:
            return filename

    def addFile(self, file):
        """
        Saves the given uploaded file into the upload folder and queues it for
        gcode analysis.

        @param file the uploaded file object, providing a filename attribute
               and a save(path) method
        @return the absolute path the file was saved to, or None if no file was
                given or its name is not valid
        """
        if file:
            absolutePath = self.getAbsolutePath(file.filename, mustExist=False)
            if absolutePath is not None:
                file.save(absolutePath)
                self._metadataAnalyzer.addFileToQueue(os.path.basename(absolutePath))
                return absolutePath
        return None

    def removeFile(self, filename):
        """
        Deletes the given file from the upload folder, if it exists there.

        @param filename the name of the file to remove
        """
        filename = self._getBasicFilename(filename)
        absolutePath = self.getAbsolutePath(filename)
        if absolutePath is not None:
            os.remove(absolutePath)

    def getAbsolutePath(self, filename, mustExist=True):
        """
        Returns the absolute path of the given filename in the gcode upload folder.

        Ensures that the file has an allowed extension ("gcode") and that its
        name is passed through secure_filename before it is joined with the
        upload folder.

        @param filename the name of the file for which to determine the absolute path
        @param mustExist if set to true, the method also checks if the file exists and is a file
        @return the absolute path of the file or None if the file is not valid
        """
        filename = self._getBasicFilename(filename)

        if not util.isAllowedFile(filename, set(["gcode"])):
            return None

        secure = os.path.join(self._uploadFolder, secure_filename(self._getBasicFilename(filename)))
        if mustExist and (not os.path.exists(secure) or not os.path.isfile(secure)):
            return None

        return secure

    def getAllFileData(self):
        """
        Collects the file data of all valid files in the upload folder.

        @return a list of file data dictionaries as returned by getFileData
        """
        files = []
        for osFile in os.listdir(self._uploadFolder):
            fileData = self.getFileData(osFile)
            if fileData is not None:
                files.append(fileData)
        return files

    def getFileData(self, filename):
        """
        Returns name, formatted size and formatted date of the given file,
        enriched with whatever metadata is stored for it.

        @param filename the name of the file to retrieve the data for
        @return a dictionary with the file data or None if the file is not valid
        """
        filename = self._getBasicFilename(filename)
        absolutePath = self.getAbsolutePath(filename)
        if absolutePath is None:
            return None

        statResult = os.stat(absolutePath)
        fileData = {
            "name": filename,
            "size": util.getFormattedSize(statResult.st_size),
            "date": util.getFormattedDateTime(datetime.datetime.fromtimestamp(statResult.st_ctime))
        }

        # enrich with additional metadata from analysis if available
        if filename in self._metadata:
            for key, val in self._metadata[filename].items():
                if key == "prints":
                    # the stored timestamp of the last print gets formatted,
                    # everything else is copied over as is
                    formattedLast = None
                    if val["last"] is not None:
                        formattedLast = {
                            "date": util.getFormattedDateTime(datetime.datetime.fromtimestamp(val["last"]["date"])),
                            "success": val["last"]["success"]
                        }
                    fileData["prints"] = {
                        "success": val["success"],
                        "failure": val["failure"],
                        "last": formattedLast
                    }
                else:
                    fileData[key] = val

        return fileData

    def getFileMetadata(self, filename):
        """
        Returns the metadata stored for the given file.

        @param filename the name of the file to retrieve the metadata for
        @return the stored metadata dictionary, or a fresh dictionary with
                zeroed print statistics if nothing is stored for the file
        """
        filename = self._getBasicFilename(filename)
        if filename in self._metadata:
            return self._metadata[filename]
        else:
            return {
                "prints": {
                    "success": 0,
                    "failure": 0,
                    "last": None
                }
            }

    def setFileMetadata(self, filename, metadata):
        """
        Sets the metadata of the given file and marks the metadata as dirty.
        Does not write the metadata file itself.

        @param filename the name of the file to set the metadata for
        @param metadata the metadata dictionary to store
        """
        filename = self._getBasicFilename(filename)
        self._metadata[filename] = metadata
        self._metadataDirty = True

    def _recordPrintResult(self, filename, success):
        """
        Updates the print statistics of the given file with the outcome of a
        print job and persists the metadata. Does nothing if the file is not a
        valid, existing upload.

        @param filename the name of the file that was printed
        @param success True if the print succeeded, False if it failed
        """
        filename = self._getBasicFilename(filename)
        if self.getAbsolutePath(filename) is None:
            return

        metadata = self.getFileMetadata(filename)
        metadata["prints"]["success" if success else "failure"] += 1
        metadata["prints"]["last"] = {
            "date": time.time(),
            "success": success
        }
        self.setFileMetadata(filename, metadata)
        self._saveMetadata()

    def printSucceeded(self, filename):
        """
        Records a successful print of the given file.

        @param filename the name of the file that was printed
        """
        self._recordPrintResult(filename, True)

    def printFailed(self, filename):
        """
        Records a failed print of the given file.

        @param filename the name of the file that was printed
        """
        self._recordPrintResult(filename, False)


class MetadataAnalyzer:
    """
    Analyzes queued gcode files one after the other on a daemon worker thread
    and reports each result through a callback.
    """

    def __init__(self, getPathCallback, loadedCallback):
        """
        @param getPathCallback called with a filename, returns the path to load
               or None if the file should be skipped
        @param loadedCallback called with filename and gcode object once a file
               has been loaded
        """
        self._getPathCallback = getPathCallback
        self._loadedCallback = loadedCallback

        # set while the analyzer may work, cleared while it is paused
        self._active = threading.Event()
        self._active.set()

        self._currentFile = None
        self._currentProgress = None

        self._queue = Queue.Queue()
        self._gcode = None

        self._worker = threading.Thread(target=self._work)
        self._worker.daemon = True
        self._worker.start()

    def addFileToQueue(self, filename):
        """
        Queues the given file for analysis.

        @param filename the name of the file to analyze
        """
        self._queue.put(filename)

    def working(self):
        """
        @return True if the analyzer is active and has a file in progress or
                files left in the queue
        """
        return self.isActive() and not (self._queue.empty() and self._currentFile is None)

    def isActive(self):
        """
        @return True if the analyzer is not paused
        """
        return self._active.is_set()

    def pause(self):
        """
        Pauses the analyzer and aborts the analysis currently in progress, if any.
        """
        self._active.clear()
        # read the attribute only once: the worker thread resets it to None
        # when an analysis ends, which could happen between a check and a
        # second read
        gcode = self._gcode
        if gcode is not None:
            gcode.abort()

    def resume(self):
        """
        Resumes a paused analyzer.
        """
        self._active.set()

    def _work(self):
        """
        Worker loop: waits until the analyzer is active, then analyzes the next
        file. A file whose analysis was aborted is retried before any further
        file is taken from the queue.
        """
        aborted = None
        while True:
            self._active.wait()

            if aborted is not None:
                filename = aborted
                aborted = None
            else:
                filename = self._queue.get()

            try:
                self._analyzeGcode(filename)
                self._queue.task_done()
            except gcodeInterpreter.AnalysisAborted:
                # remember the file so it gets analyzed again on the next pass
                aborted = filename

    def _analyzeGcode(self, filename):
        """
        Loads the given file into a gcode interpreter and hands the result to
        the loaded callback. Files for which the path callback returns None
        are skipped.

        @param filename the name of the file to analyze
        """
        path = self._getPathCallback(filename)
        if path is None:
            return

        self._currentFile = filename
        self._currentProgress = 0

        try:
            self._gcode = gcodeInterpreter.gcode()
            self._gcode.progressCallback = self._onParsingProgress
            self._gcode.load(path)
            self._loadedCallback(self._currentFile, self._gcode)
        finally:
            # always reset the state, also when the analysis was aborted
            self._gcode = None
            self._currentProgress = None
            self._currentFile = None

    def _onParsingProgress(self, progress):
        """
        Progress callback of the gcode interpreter.

        @param progress the progress value reported by the interpreter
        """
        self._currentProgress = progress