callbacks: Moved global_cv_display_callback to callbacks file. Added gpu-like function_display_callback. Removed need for wp.display_callback when using .display(). Added tests and examples.

This commit is contained in:
SimLeek
2019-02-24 01:28:48 -07:00
parent 3e09a8a196
commit 767ebc4381
5 changed files with 126 additions and 20 deletions
+21
View File
@@ -88,6 +88,27 @@ Python 2.7/3.5+ and PyPy.
t1.join()
t1.join()
#### Run a function on each pixel
from cvpubsubs.webcam_pub import VideoHandlerThread
from cvpubsubs.webcam_pub.callbacks import function_display_callback
img = np.zeros((50, 50, 1))
img[0:5, 0:5, :] = 1
def conway_game_of_life(array, coords, finished):
neighbors = np.sum(array[max(coords[0] - 1, 0):min(coords[0] + 2, 50),
max(coords[1] - 1, 0):min(coords[1] + 2, 50)])
neighbors = max(neighbors - np.sum(array[coords[0:2]]), 0.0)
if array[coords] == 1.0:
if neighbors < 2 or neighbors > 3:
array[coords] = 0.0
elif 2 <= neighbors <= 3:
array[coords] = 1.0
else:
if neighbors == 3:
array[coords] = 1.0
VideoHandlerThread(video_source=img, callbacks=function_display_callback(conway_game_of_life)).display()
## License
+63
View File
@@ -0,0 +1,63 @@
from cvpubsubs.window_sub.winctrl import WinCtrl
import numpy as np
if False:
from typing import Union
def global_cv_display_callback(frame, # type: np.ndarray
cam_id # type: Union[int, str]
):
from cvpubsubs.window_sub import SubscriberWindows
"""Default callback for sending frames to the global frame dictionary.
:param frame: The video or image frame
:type frame: np.ndarray
:param cam_id: The video or image source
:type cam_id: Union[int, str]
"""
SubscriberWindows.frame_dict[str(cam_id) + "frame"] = frame
class function_display_callback(object):
def __init__(self, display_function, finish_function=None):
"""Used for running arbitrary functions on pixels.
>>> import random
>>> from cvpubsubs.webcam_pub import VideoHandlerThread
>>> img = np.zeros((300, 300, 3))
>>> def fun(array, coords, finished):
... r,g,b = random.random()/20.0, random.random()/20.0, random.random()/20.0
... array[coords[0:2]] = (array[coords[0:2]] + [r,g,b])%1.0
>>> VideoHandlerThread(video_source=img, callbacks=function_display_callback(fun)).display()
:param display_function:
:param finish_function:
"""
self.looping = True
self.first_call = True
def _display_internal(self, frame, cam_id, *args, **kwargs):
finished = True
if self.first_call:
self.first_call = False
return
if self.looping:
it = np.nditer(frame, flags=['multi_index'])
while not it.finished:
x, y, c = it.multi_index
finished = display_function(frame, (x, y, c), finished, *args, **kwargs)
it.iternext()
if finished:
self.looping = False
if not callable(finish_function):
WinCtrl.quit()
else:
finished = finish_function(frame, Ellipsis, finished, *args, **kwargs)
if finished:
WinCtrl.quit()
self.inner_function = _display_internal
def __call__(self, *args, **kwargs):
return self.inner_function(self, *args, **kwargs)
+19 -18
View File
@@ -2,33 +2,24 @@ import threading
import numpy as np
from .pub_cam import pub_cam_thread
from cvpubsubs.webcam_pub.pub_cam import pub_cam_thread
from cvpubsubs.webcam_pub.camctrl import CamCtrl
if False:
from typing import Union, Tuple, Any, Callable, List
from typing import Union, Tuple, Any, Callable, List, Optional
from cvpubsubs.window_sub import SubscriberWindows
FrameCallable = Callable[[np.ndarray, int], Optional[np.ndarray]]
def global_cv_display_callback(frame, # type: np.ndarray
cam_id # type: Union[int, str]
):
"""Default callback for sending frames to the global frame dictionary.
:param frame: The video or image frame
:type frame: np.ndarray
:param cam_id: The video or image source
:type cam_id: Union[int, str]
"""
SubscriberWindows.frame_dict[str(cam_id) + "frame"] = frame
from cvpubsubs.webcam_pub.callbacks import global_cv_display_callback
display_callbacks = [global_cv_display_callback]
class VideoHandlerThread(threading.Thread):
"Thread for publishing frames from a video source."
def __init__(self, video_source=0, # type: Union[int, str]
callbacks=(global_cv_display_callback,), # type: List[Callable[[np.ndarray, int], Any]]
def __init__(self, video_source=0, # type: Union[int, str, np.ndarray]
callbacks=(global_cv_display_callback,), # type: Union[List[FrameCallable], FrameCallable]
request_size=(-1, -1), # type: Tuple[int, int]
high_speed=True, # type: bool
fps_limit=240 # type: float
@@ -55,7 +46,10 @@ class VideoHandlerThread(threading.Thread):
raise TypeError(
"Only strings or ints representing cameras, or numpy arrays representing pictures supported.")
self.video_source = video_source
self.callbacks = callbacks
if callable(callbacks):
self.callbacks = [callbacks]
else:
self.callbacks = callbacks
self.request_size = request_size
self.high_speed = high_speed
self.fps_limit = fps_limit
@@ -85,11 +79,18 @@ class VideoHandlerThread(threading.Thread):
def display(self,
callbacks=() # type: List[Callable[[List[np.ndarray]], Any]]
):
from cvpubsubs.window_sub import SubscriberWindows
"""Default display operation. For multiple video sources, please use something outside of this class.
:param callbacks: List of callbacks to be run on frames before displaying to the screen.
:type callbacks: List[Callable[[List[np.ndarray]], Any]]
"""
if global_cv_display_callback not in self.callbacks:
if isinstance(self.callbacks, tuple):
self.callbacks = self.callbacks + (global_cv_display_callback,)
else:
self.callbacks.append(global_cv_display_callback)
self.start()
SubscriberWindows(video_sources=[self.cam_id], callbacks=callbacks).loop()
self.join()
+1 -1
View File
@@ -4,7 +4,7 @@ import cv2
import numpy as np
from .winctrl import WinCtrl
from ..webcam_pub.camctrl import CamCtrl
from cvpubsubs.webcam_pub.camctrl import CamCtrl
from localpubsub import NoData
if False:
+22 -1
View File
@@ -55,7 +55,7 @@ class TestSubWin(ut.TestCase):
frame[:, :, 0] = 0
frame[:, :, 2] = 0
w.VideoHandlerThread(callbacks=[redden_frame_print_spam] + w.display_callbacks).display()
w.VideoHandlerThread(callbacks=redden_frame_print_spam).display()
def test_multi_cams_one_source(self):
def cam_handler(frame, cam_id):
@@ -103,3 +103,24 @@ class TestSubWin(ut.TestCase):
).loop()
v.join()
def test_conway_life(self):
from cvpubsubs.webcam_pub import VideoHandlerThread
from cvpubsubs.webcam_pub.callbacks import function_display_callback
img = np.zeros((50, 50, 1))
img[0:5, 0:5, :] = 1
def conway(array, coords, finished):
neighbors = np.sum(array[max(coords[0] - 1, 0):min(coords[0] + 2, 50),
max(coords[1] - 1, 0):min(coords[1] + 2, 50)])
neighbors = max(neighbors - np.sum(array[coords[0:2]]), 0.0)
if array[coords] == 1.0:
if neighbors < 2 or neighbors > 3:
array[coords] = 0.0
elif 2 <= neighbors <= 3:
array[coords] = 1.0
else:
if neighbors == 3:
array[coords] = 1.0
VideoHandlerThread(video_source=img, callbacks=function_display_callback(conway)).display()