keymouse: Created input loop decorators. Added tests. Added mouse events and mouse event handling. Added mouse interaction to conway test.
This commit is contained in:
@@ -0,0 +1,90 @@
|
||||
from cvpubsubs.window_sub.winctrl import WinCtrl
|
||||
import threading
|
||||
|
||||
if False:
|
||||
from typing import Callable
|
||||
from cvpubsubs.window_sub.mouse_event import MouseEvent
|
||||
|
||||
|
||||
class mouse_thread(object): # NOSONAR
|
||||
|
||||
def __init__(self, f):
|
||||
self.f = f
|
||||
self.sub_mouse = WinCtrl.mouse_pub.make_sub()
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
self.f(self.sub_mouse, *args, **kwargs)
|
||||
|
||||
|
||||
class mouse_loop_thread(object): # NOSONAR
|
||||
|
||||
def __init__(self, f, run_when_no_events=False):
|
||||
self.f = f
|
||||
self.sub_mouse = WinCtrl.mouse_pub.make_sub()
|
||||
self.sub_cmd = WinCtrl.win_cmd_pub.make_sub()
|
||||
self.sub_cmd.return_on_no_data = ''
|
||||
self.run_when_no_events = run_when_no_events
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
msg_cmd = ''
|
||||
while msg_cmd != 'quit':
|
||||
mouse_xyzclick = self.sub_mouse.get() # type: MouseEvent
|
||||
if mouse_xyzclick is not self.sub_mouse.return_on_no_data:
|
||||
self.f(mouse_xyzclick, *args, **kwargs)
|
||||
elif self.run_when_no_events:
|
||||
self.f(None, *args, **kwargs)
|
||||
msg_cmd = self.sub_cmd.get()
|
||||
WinCtrl.quit(force_all_read=False)
|
||||
|
||||
|
||||
class mouse_loop(object): # NOSONAR
|
||||
|
||||
def __init__(self, f, run_when_no_events=False):
|
||||
self.t = threading.Thread(target=mouse_loop_thread(f, run_when_no_events))
|
||||
self.t.start()
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.t
|
||||
|
||||
|
||||
class key_thread(object): # NOSONAR
|
||||
|
||||
def __init__(self, f):
|
||||
self.f = f
|
||||
self.sub_key = WinCtrl.key_pub.make_sub()
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
self.f(self.sub_key, *args, **kwargs)
|
||||
|
||||
|
||||
class key_loop_thread(object): # NOSONAR
|
||||
|
||||
def __init__(self, f, run_when_no_events=False):
|
||||
self.f = f
|
||||
self.sub_key = WinCtrl.key_pub.make_sub()
|
||||
self.sub_cmd = WinCtrl.win_cmd_pub.make_sub()
|
||||
self.sub_cmd.return_on_no_data = ''
|
||||
self.run_when_no_events = run_when_no_events
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
msg_cmd = ''
|
||||
while msg_cmd != 'quit':
|
||||
key_chr = self.sub_key.get() # type: chr
|
||||
if key_chr is not self.sub_key.return_on_no_data:
|
||||
self.f(key_chr, *args, **kwargs)
|
||||
elif self.run_when_no_events:
|
||||
self.f(None, *args, **kwargs)
|
||||
msg_cmd = self.sub_cmd.get()
|
||||
WinCtrl.quit(force_all_read=False)
|
||||
|
||||
|
||||
class key_loop(object): # NOSONAR
|
||||
|
||||
def __init__(self,
|
||||
f, # type: Callable[[chr],None]
|
||||
run_when_no_events=False):
|
||||
self.t = threading.Thread(target=key_loop_thread(f, run_when_no_events))
|
||||
self.t.start()
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.t
|
||||
@@ -6,6 +6,7 @@ import numpy as np
|
||||
from .winctrl import WinCtrl
|
||||
from cvpubsubs.webcam_pub.camctrl import CamCtrl
|
||||
from localpubsub import NoData
|
||||
from cvpubsubs.window_sub.mouse_event import MouseEvent
|
||||
|
||||
if False:
|
||||
from typing import List, Union, Callable, Any
|
||||
@@ -36,6 +37,9 @@ class SubscriberWindows(object):
|
||||
|
||||
self.callbacks = callbacks
|
||||
self.input_cams = video_sources
|
||||
for name in self.window_names:
|
||||
cv2.namedWindow(name + " (press ESC to quit)")
|
||||
cv2.setMouseCallback(name + " (press ESC to quit)", self.handle_mouse)
|
||||
|
||||
@staticmethod
|
||||
def set_global_frame_dict(name, *args):
|
||||
@@ -67,6 +71,10 @@ class SubscriberWindows(object):
|
||||
RuntimeWarning("Unknown key code: [{}]. Please report to cv_pubsubs issue page.".format(key_input))
|
||||
)
|
||||
|
||||
def handle_mouse(self, event, x, y, flags, param):
|
||||
mousey = MouseEvent(event, x, y, flags, param)
|
||||
WinCtrl.mouse_pub.publish(mousey)
|
||||
|
||||
def _display_frames(self, frames, win_num):
|
||||
if isinstance(frames, Exception):
|
||||
raise frames
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
class MouseEvent(object):
|
||||
def __init__(self, event, x, y, flags, param):
|
||||
self.event = event
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.flags = flags
|
||||
self.param = param
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
def __str__(self):
|
||||
return "event:{}\nx,y:{},{}\nflags:{}\nparam:{}\n".format(self.event, self.x, self.y, self.flags, self.param)
|
||||
@@ -6,6 +6,7 @@ from localpubsub import VariablePub, VariableSub
|
||||
|
||||
class WinCtrl(object):
|
||||
key_pub = VariablePub()
|
||||
mouse_pub = VariablePub()
|
||||
win_cmd_pub = VariablePub()
|
||||
|
||||
@staticmethod
|
||||
|
||||
+26
-20
@@ -4,33 +4,29 @@ import unittest as ut
|
||||
import cvpubsubs.webcam_pub as w
|
||||
from cvpubsubs.window_sub import SubscriberWindows
|
||||
from cvpubsubs.window_sub.winctrl import WinCtrl
|
||||
from cvpubsubs.input import mouse_loop, key_loop
|
||||
|
||||
if False:
|
||||
import numpy as np
|
||||
|
||||
|
||||
def print_keys_thread():
|
||||
sub_key = WinCtrl.key_pub.make_sub()
|
||||
sub_cmd = WinCtrl.win_cmd_pub.make_sub()
|
||||
sub_cmd.return_on_no_data = ''
|
||||
msg_cmd = ''
|
||||
while msg_cmd != 'quit':
|
||||
key_chr = sub_key.get(sub_key) # type: np.ndarray
|
||||
WinCtrl.key_pub.publish(None) # consume data
|
||||
if key_chr is not None:
|
||||
print("key pressed: " + str(key_chr))
|
||||
msg_cmd = sub_cmd.get()
|
||||
WinCtrl.quit(force_all_read=False)
|
||||
|
||||
|
||||
def start_print_keys_thread(): # type: (...) -> threading.Thread
|
||||
t = threading.Thread(target=print_keys_thread, args=())
|
||||
t.start()
|
||||
return t
|
||||
from cvpubsubs.window_sub.mouse_event import MouseEvent
|
||||
|
||||
|
||||
class TestSubWin(ut.TestCase):
|
||||
|
||||
def test_mouse_loop(self):
|
||||
@mouse_loop
|
||||
def print_mouse_thread(mouse_event):
|
||||
print(mouse_event)
|
||||
|
||||
w.VideoHandlerThread().display()
|
||||
|
||||
def test_key_loop(self):
|
||||
@key_loop
|
||||
def print_key_thread(key_chr):
|
||||
print("key pressed: " + str(key_chr))
|
||||
|
||||
w.VideoHandlerThread().display()
|
||||
|
||||
def test_sub(self):
|
||||
w.VideoHandlerThread().display()
|
||||
|
||||
@@ -136,6 +132,7 @@ class TestSubWin(ut.TestCase):
|
||||
from cvpubsubs.webcam_pub import VideoHandlerThread
|
||||
from cvpubsubs.callbacks import function_display_callback
|
||||
import numpy as np
|
||||
import cv2
|
||||
img = np.zeros((50, 50, 1))
|
||||
img[0:5, 0:5, :] = 1
|
||||
|
||||
@@ -152,4 +149,13 @@ class TestSubWin(ut.TestCase):
|
||||
if neighbors == 3:
|
||||
array[coords] = 1.0
|
||||
|
||||
@mouse_loop
|
||||
def conway_add(mouse_event # type:MouseEvent
|
||||
):
|
||||
if 0 <= mouse_event.x < 50 and 0 <= mouse_event.y < 50:
|
||||
if mouse_event.flags == cv2.EVENT_FLAG_LBUTTON:
|
||||
img[mouse_event.y - 5:mouse_event.y + 10, mouse_event.x - 5:mouse_event.x + 10, :] = 0.0
|
||||
elif mouse_event.flags == cv2.EVENT_FLAG_RBUTTON:
|
||||
img[mouse_event.y - 5:mouse_event.y + 10, mouse_event.x - 5:mouse_event.x + 10, :] = 1.0
|
||||
|
||||
VideoHandlerThread(video_source=img, callbacks=function_display_callback(conway)).display()
|
||||
|
||||
Reference in New Issue
Block a user