JunkVision/junkvision.py

279 lines
10 KiB
Python

import cv2
import cv2.cv
import numpy as np
from functools import partial
class AreaSelector(object):
    """Mouse-driven selector for up to ``max_areas`` rectangles on an image.

    An instance is meant to be passed as the user parameter of
    ``cv2.setMouseCallback(window, AreaSelector.areasSelection, instance)``.

    Interaction implemented by ``areasSelection``:
      * left click  -- start a rectangle for the current area; a second
                       left click finishes it,
      * mouse move  -- while a rectangle is being drawn, stretch it from the
                       first click to the cursor,
      * right click -- advance to the next area (wrapping around) and end
                       any rectangle that is being drawn.

    Attributes:
        rects: one entry per area; ``[]`` while unset, otherwise a
            ``(x_min, y_min, x_max, y_max)`` tuple.
        colors: BGR colour tuples used for drawing, indexed by area number.
    """

    def __init__(self, window_name, orig_img, max_areas=4):
        """Store the target window/image and create ``max_areas`` empty slots.

        Args:
            window_name: name of the OpenCV window to draw into.
            orig_img: image the rectangles are drawn over (never modified;
                a copy is annotated on every redraw).
            max_areas: number of selectable areas.
        """
        self.window_name = window_name
        self.orig_img = orig_img
        self.selection = False
        self.area = 0
        self.max_areas = max_areas
        # One independent empty placeholder per area.
        self.rects = [[] for _ in range(max_areas)]
        # Anchor corner of the rectangle being drawn; overwritten by the
        # first left click of every selection.
        self.first_x = 0
        self.first_y = 0
        self.colors = [
            (0, 255, 0),
            (255, 0, 127),
            (0, 127, 255),
            (255, 255, 0),
        ]

    def _color_for(self, index):
        """Return the drawing colour for area ``index``, cycling the palette.

        Cycling keeps ``max_areas`` values larger than ``len(self.colors)``
        from raising IndexError while drawing.
        """
        return self.colors[index % len(self.colors)]

    @staticmethod
    def areasSelection(event, x, y, flags, self):
        """OpenCV mouse callback; ``self`` arrives as the user parameter.

        Args:
            event: one of the ``cv2.EVENT_*`` mouse event codes.
            x, y: cursor position in window coordinates.
            flags: modifier/button bitmask supplied by OpenCV (unused).
            self: the ``AreaSelector`` instance registered with the callback.
        """
        # NOTE: compare against the event code EVENT_RBUTTONDOWN; the
        # EVENT_FLAG_* constants are bit flags for the ``flags`` argument and
        # only happen to share the same numeric value.
        if event == cv2.EVENT_RBUTTONDOWN:
            self.area = (self.area + 1) % self.max_areas
            if self.selection:
                self.selection = False
                self.redrawRects()
        elif event == cv2.EVENT_LBUTTONDOWN:
            if self.selection:
                # Second click: freeze the rectangle as it currently is.
                self.selection = False
                self.redrawRects()
            else:
                # First click: remember the anchor corner.
                self.selection = True
                self.first_x = x
                self.first_y = y
        elif event == cv2.EVENT_MOUSEMOVE and self.selection:
            # Normalise so the stored tuple is always (min, min, max, max)
            # regardless of the drag direction.
            self.rects[self.area] = (
                min(self.first_x, x),
                min(self.first_y, y),
                max(self.first_x, x),
                max(self.first_y, y),
            )
            self.redrawRects()

    def redrawRects(self):
        """Show a copy of the image with every selected rectangle drawn on it."""
        canvas = self.orig_img.copy()
        for i, r in enumerate(self.rects):
            if not r:
                continue
            cv2.rectangle(canvas, (r[0], r[1]), (r[2], r[3]), self._color_for(i), 4)
        cv2.imshow(self.window_name, canvas)
class CorrectionSelector(object):
    """Interactive helper that derives a perspective-correction matrix.

    The user first drags out an axis-aligned target rectangle, then marks
    four points on the image. ``calc_transform`` pairs each rectangle corner
    with the nearest marked point and asks OpenCV for the perspective
    transform that maps the marked points onto the rectangle corners.

    An instance is meant to be passed as the user parameter of
    ``cv2.setMouseCallback(window, CorrectionSelector.correctionSelection,
    instance)``.

    Attributes:
        rect: ``()`` until a rectangle exists, then
            ``(x_min, y_min, x_max, y_max)``.
        points: up to four ``(x, y)`` tuples marked by the user.
        transform: ``[]`` until ``calc_transform`` succeeds, then the matrix
            returned by ``cv2.getPerspectiveTransform``.
    """

    def __init__(self, window_name, orig_img):
        """Remember the window/image and start in rectangle-selection mode."""
        self.selecting_rect = True
        self.points = []
        self.cur_point = 0
        self.orig_img = orig_img
        self.window_name = window_name
        self.colors = [
            (0, 255, 127),
            (255, 0, 127),
            (0, 127, 255),
            (255, 255, 0),
        ]
        self.transform = []
        self.rect_x = 0
        self.rect_y = 0
        # Initialised here so redraw()/calc_transform() can be called before
        # the first mouse move without hitting an AttributeError.
        self.rect = ()
        self.rect_points = []

    @staticmethod
    def correctionSelection(event, x, y, flags, self):
        """OpenCV mouse callback; ``self`` arrives as the user parameter.

        Rectangle mode (``selecting_rect`` is True):
          * left click  -- set the rectangle's anchor corner,
          * mouse move  -- stretch the rectangle from the anchor to the cursor,
          * right click -- switch to point mode.
        Point mode:
          * left click  -- add a point; once four exist, overwrite them in
                           round-robin order,
          * right click -- compute the transform and return to rectangle mode.
        """
        # NOTE: the EVENT_*BUTTONDOWN event codes are used here; the
        # EVENT_FLAG_* constants describe the ``flags`` bitmask and only
        # happen to share the same numeric values.
        if self.selecting_rect:
            if event == cv2.EVENT_RBUTTONDOWN:
                self.selecting_rect = False
                self.redraw()
            elif event == cv2.EVENT_LBUTTONDOWN:
                self.rect_x = x
                self.rect_y = y
            elif event == cv2.EVENT_MOUSEMOVE:
                self.rect = (
                    min(self.rect_x, x),
                    min(self.rect_y, y),
                    max(self.rect_x, x),
                    max(self.rect_y, y),
                )
                self.redraw()
        else:
            if event == cv2.EVENT_RBUTTONDOWN:
                self.redraw()
                self.calc_transform()
                self.selecting_rect = True
            elif event == cv2.EVENT_LBUTTONDOWN:
                if len(self.points) < 4:
                    self.points.append((x, y))
                else:
                    self.points[self.cur_point] = (x, y)
                    self.cur_point = (self.cur_point + 1) % 4
                self.redraw()

    def redraw(self):
        """Show a copy of the image with the rectangle and marked points."""
        canvas = self.orig_img.copy()
        if self.rect:
            cv2.rectangle(
                canvas,
                (self.rect[0], self.rect[1]),
                (self.rect[2], self.rect[3]),
                (0, 255, 0),
                4,
            )
        # Sorting in place means colours follow the points' sorted order,
        # not the order in which they were clicked.
        self.points.sort()
        for i, p in enumerate(self.points):
            cv2.circle(canvas, p, 4, self.colors[i], -1)
        cv2.imshow(self.window_name, canvas)

    def _match_points_to_corners(self):
        """Pair each rectangle corner with its nearest marked point.

        Fills ``self.rect_points`` with the corners in the order top-left,
        bottom-left, top-right, bottom-right and returns a list holding, for
        each corner, the marked point with the smallest Manhattan distance
        to it (the first such point on ties). Only the first four entries of
        ``self.points`` are considered. The same point may be chosen for more
        than one corner if the clicks are badly placed.
        """
        self.rect_points = [
            (self.rect[0], self.rect[1]),
            (self.rect[0], self.rect[3]),
            (self.rect[2], self.rect[1]),
            (self.rect[2], self.rect[3]),
        ]
        sorted_points = []
        for i, corner in enumerate(self.rect_points):
            best_match = 0
            # Any real distance beats infinity, so no frame-size constant
            # from outside the class is needed as a starting value.
            min_delta = float("inf")
            for j in range(4):
                delta = (
                    abs(corner[0] - self.points[j][0]),
                    abs(corner[1] - self.points[j][1]),
                )
                print("%s %s %s" % (i, j, delta))
                if delta[0] + delta[1] < min_delta:
                    best_match = j
                    min_delta = delta[0] + delta[1]
            print(best_match)
            sorted_points.append(self.points[best_match])
        return sorted_points

    def calc_transform(self):
        """Compute ``self.transform`` from the marked points and rectangle.

        Raises:
            ValueError: if no rectangle has been drawn yet or fewer than
                four points have been marked.
        """
        if not self.rect:
            raise ValueError("No target rectangle selected yet")
        if len(self.points) < 4:
            raise ValueError(
                "Need 4 correction points, got %d" % len(self.points)
            )
        sorted_points = self._match_points_to_corners()
        print("%s %s %s" % (
            np.array(self.points),
            np.array(self.rect_points),
            np.array(sorted_points),
        ))
        self.transform = cv2.getPerspectiveTransform(
            np.array(sorted_points, dtype=np.float32),
            np.array(self.rect_points, dtype=np.float32),
        )
        print(self.transform)

    def get_transform(self):
        """Return the most recently computed transform (``[]`` if none)."""
        return self.transform
def check_mess_fun(i, r):
    """Report that desk area ``i`` (rectangle ``r``) is due for a mess check.

    ``main`` binds ``i`` and ``r`` with ``functools.partial`` and hands the
    result to a ``threading.Timer``, so this runs on the timer's thread.

    Args:
        i: index of the desk area.
        r: the area's rectangle as stored in ``AreaSelector.rects``.
    """
    # A single pre-joined string prints identically under the Python 2
    # print statement semantics and the Python 3 print function.
    print(" ".join(["Will check for mess on desk:", str(i), "rect:", str(r)]))
def _read_frame(vc, error_message):
    """Grab one frame from capture ``vc`` or raise RuntimeError(error_message)."""
    ret, frame = vc.read()
    if not ret:
        raise RuntimeError(error_message)
    return frame


def main():
    """Watch fixed desk areas on camera 1 and schedule a check when they go quiet.

    Steps:
      1. Open camera 1, discard 100 frames, then take a reference frame.
      2. Blur, perspective-correct and grey-scale the reference frame, show
         it in the "test" window and wait for a key press.
      3. Start one 5-second timer per desk area. In the main loop every
         frame goes through a MOG background subtractor; any foreground
         pixel inside an area restarts that area's timer, so
         ``check_mess_fun`` fires only after 5 seconds without foreground
         in the area.
      4. ESC leaves the loop; timers, camera and windows are always cleaned
         up, including when an error is raised.

    Raises:
        RuntimeError: if the camera cannot be opened or a frame cannot be
            read.
    """
    from threading import Timer

    vc = cv2.VideoCapture(1)
    if not vc.isOpened():
        raise RuntimeError("Failed to open camera!")

    timers = []
    try:
        _read_frame(vc, "Failed to initialize camera!")
        cap_width = int(vc.get(cv2.cv.CV_CAP_PROP_FRAME_WIDTH))
        cap_height = int(vc.get(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT))
        # CV_CAP_PROP_FPS not supported...
        fps = 60.0

        # Throw away the first 100 frames before taking the reference image
        # (presumably to let the camera settle -- TODO confirm).
        for _ in range(100):
            _read_frame(vc, "Couldn't get non-mess image!")
        orig_img = _read_frame(vc, "Couldn't get non-mess image!")
        orig_img = cv2.medianBlur(orig_img, 3)

        cv2.namedWindow("detection_areas")
        # Interactive calibration is disabled; the matrix below is
        # hard-coded instead. To recalibrate, re-enable:
        #   cv2.imshow("detection_areas", orig_img)
        #   correction_selector = CorrectionSelector("detection_areas", orig_img)
        #   cv2.setMouseCallback("detection_areas", CorrectionSelector.correctionSelection, correction_selector)
        #   cv2.waitKey()
        #   perspective = correction_selector.get_transform()
        perspective = np.array(
            [[1.33036171e+00, 3.18020707e-01, -1.38751879e+02],
             [-1.71116647e-01, 1.55350072e+00, -8.06609130e+00],
             [-3.52454848e-04, 1.09892154e-03, 1.00000000e+00]]
        )
        orig_img = cv2.warpPerspective(
            orig_img, perspective, (orig_img.shape[1], orig_img.shape[0])
        )

        area_selector = AreaSelector("detection_areas", orig_img)
        # Interactive area selection is disabled as well. To re-enable:
        #   cv2.imshow("detection_areas", orig_img)
        #   cv2.setMouseCallback("detection_areas", AreaSelector.areasSelection, area_selector)
        #   cv2.waitKey()
        #   print area_selector.rects
        area_selector.rects = [
            (107, 68, 282, 209),
            (313, 67, 493, 308),
            (316, 323, 489, 475),
            (105, 209, 288, 476),
        ]
        cv2.destroyWindow("detection_areas")

        cv2.namedWindow("test")
        orig_img_bw = cv2.cvtColor(orig_img, cv2.cv.CV_BGR2GRAY)
        cv2.imshow("test", orig_img_bw)
        cv2.waitKey()

        timeout = 5.0
        check_mess = []
        for i, r in enumerate(area_selector.rects):
            fun = partial(check_mess_fun, i, r)
            check_mess.append(fun)
            if not r:
                # Keep the list index-aligned with the areas.
                timers.append(None)
            else:
                t = Timer(timeout, fun)
                timers.append(t)
                t.start()
        for i, f in enumerate(check_mess):
            print("%s %s" % (i, id(f)))

        bgsub = cv2.BackgroundSubtractorMOG(int(1 * fps), 2, 0.05)
        bgsub.apply(orig_img_bw)
        fg_mask = np.zeros((cap_height, cap_width), dtype=np.uint8)

        while True:
            img = _read_frame(vc, "Failed to read image!")
            img = cv2.medianBlur(img, 3)
            img = cv2.warpPerspective(img, perspective, (img.shape[1], img.shape[0]))
            img_bw = cv2.cvtColor(img, cv2.cv.CV_BGR2GRAY)
            # Use the returned mask instead of relying on the output
            # argument being filled in place.
            fg_mask = bgsub.apply(img_bw, fg_mask, 0.03)
            fg_mask2 = cv2.cvtColor(fg_mask, cv2.cv.CV_GRAY2RGB)

            for i, rect in enumerate(area_selector.rects):
                if not rect:
                    continue
                active = np.any(fg_mask[rect[1]:rect[3], rect[0]:rect[2]])
                if active:
                    # Foreground seen: push the area's deadline back.
                    timers[i].cancel()
                    timers[i] = Timer(timeout, check_mess[i])
                    timers[i].start()
                # Thick outline marks an area with foreground in this frame.
                cv2.rectangle(
                    fg_mask2,
                    (rect[0], rect[1]),
                    (rect[2], rect[3]),
                    area_selector.colors[i],
                    4 if active else 1,
                )

            cv2.imshow("test", fg_mask2)
            # Mask to the low byte: some backends return the key code with
            # extra modifier bits set.
            if cv2.waitKey(20) & 0xFF == 27:
                break
    finally:
        # Pending timers are non-daemon threads and would keep the process
        # alive; entries for empty areas are None and must be skipped.
        for t in timers:
            if t is not None:
                t.cancel()
        vc.release()
        cv2.destroyAllWindows()
# Start the detector only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()