Added examples for newer API.

Former-commit-id: 7895f083379c6935d11e8ed164ee9a9eae75bc7e
Author: simleek
Date: 2019-10-03 00:17:38 -07:00
parent 295ffd4c43
commit 31eb98c925
11 changed files with 264 additions and 237 deletions

README.md

@@ -1,132 +1,91 @@
# displayarray

A threaded PubSub OpenCV interface. Webcam and video feeds to multiple windows are supported.

## Display arrays, and any updates to those arrays

![](https://i.imgur.com/UEt6iR6.gif)

```python
from displayarray import display
import numpy as np

arr = np.random.normal(0.5, 0.1, (100, 100, 3))

with display(arr) as d:
    while d:
        arr[:] += np.random.normal(0.001, 0.0005, (100, 100, 3))
        arr %= 1.0
```
## Get webcams and videos at 60fps, and run functions on the data:

[![](https://thumbs.gfycat.com/AbsoluteEarnestEelelephant-size_restricted.gif)](https://gfycat.com/absoluteearnesteelelephant)

```python
from displayarray import display
import math as m

def forest_color(arr):
    forest_color.i += 1
    arr[..., 0] = (m.sin(forest_color.i * (2 * m.pi) * 4 / 360) * 255 + arr[..., 0]) % 255
    arr[..., 1] = (m.sin((forest_color.i * (2 * m.pi) * 5 + 45) / 360) * 255 + arr[..., 1]) % 255
    arr[..., 2] = (m.cos(forest_color.i * (2 * m.pi) * 3 / 360) * 255 + arr[..., 2]) % 255

forest_color.i = 0

display("fractal test.mp4", callbacks=forest_color, blocking=True, fps_limit=120)
```
## Display tensors as they're running in TensorFlow, like this denoising autoencoder:

![](https://i.imgur.com/TejCpIP.png)

```python
# See test_display_tensorflow in test_simple_api for the full code.
...

autoencoder.compile(loss="mse", optimizer="adam")

while displayer:
    grab = tf.convert_to_tensor(
        displayer.FRAME_DICT["fractal test.mp4frame"][np.newaxis, ...].astype(np.float32)
        / 255.0
    )
    grab_noise = tf.convert_to_tensor(
        (((displayer.FRAME_DICT["fractal test.mp4frame"][np.newaxis, ...].astype(np.float32)
           + np.random.uniform(0, 255, grab.shape)) / 2) % 255)
        / 255.0
    )
    displayer.update((grab_noise.numpy()[0] * 255.0).astype(np.uint8), "uid for grab noise")
    autoencoder.fit(grab_noise, grab, steps_per_epoch=1, epochs=1)
    output_image = autoencoder.predict(grab, steps=1)
    displayer.update((output_image[0] * 255.0).astype(np.uint8), "uid for autoencoder output")
```
## Handle input

Mouse events are captured whenever the mouse moves over the window:

```
event:0
x,y:133,387
flags:0
param:None
```
Code:

```python
from displayarray.input import mouse_loop
from displayarray import display

@mouse_loop
def print_mouse_thread(mouse_event):
    print(mouse_event)

display("fractal test.mp4", blocking=True)
```
## Installation

displayarray is distributed on [PyPI](https://pypi.org) as a universal wheel in Python 3.6+ and PyPy.

```
$ pip install displayarray
```
## Usage

See the tests for more example code. API documentation will be generated soon.

### Video Editing and Publishing
#### Display your webcam

```python
import displayarray.webcam_pub as w

w.VideoHandlerThread().display()
```
#### Change Display Arguments

```python
import displayarray.webcam_pub as w

video_thread = w.VideoHandlerThread(video_source=0,
                                    callbacks=w.display_callbacks,
                                    request_size=(800, 600),
                                    high_speed=False,
                                    fps_limit=8
                                    )
```
#### Handle mouse input

```python
import displayarray.webcam_pub as w
from displayarray.input import mouse_loop

@mouse_loop
def print_mouse(mouse_event):
    print(mouse_event)

w.VideoHandlerThread().display()
```
#### Take in key input

```python
import displayarray.webcam_pub as w
from displayarray.input import key_loop

@key_loop
def print_key_thread(key_chr):
    print("key pressed: " + str(key_chr))

w.VideoHandlerThread().display()
```
#### Run your own functions on the frames

```python
import displayarray.webcam_pub as w

def redden_frame_print_spam(frame, cam_id):
    frame[:, :, 0] = 0  # zero the blue channel (OpenCV frames are BGR)
    frame[:, :, 1] = 0  # zero the green channel, leaving only red
    print("Spam!")

w.VideoHandlerThread(callbacks=[redden_frame_print_spam] + w.display_callbacks).display()
```
#### Display a tensor

```python
import displayarray.webcam_pub as w

def tensor_from_image(frame, cam_id):
    # tensor_from_pytorch_or_tensorflow stands in for your own conversion.
    ten = tensor_from_pytorch_or_tensorflow(frame)
    return ten

t = w.VideoHandlerThread(video_source=0, callbacks=[tensor_from_image] + w.display_callbacks)
t.display()
```
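`tensor_from_pytorch_or_tensorflow` above is only a placeholder. A minimal sketch of one such conversion, assuming PyTorch is the framework in use:

```python
import torch

def tensor_from_image(frame, cam_id):
    # Illustrative only: scale the uint8 BGR frame into a float32 tensor.
    ten = torch.from_numpy(frame).float() / 255.0
    return ten
```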
#### Display multiple windows from one source

```python
import displayarray.webcam_pub as w
from displayarray.window_sub import SubscriberWindows

def cam_handler(frame, cam_id):
    SubscriberWindows.set_global_frame_dict(cam_id, frame, frame)

t = w.VideoHandlerThread(0, [cam_handler],
                         request_size=(1280, 720),
                         high_speed=True,
                         fps_limit=240
                         )
t.start()

SubscriberWindows(window_names=['cammy', 'cammy2'],
                  video_sources=[str(0)]
                  ).loop()

t.join()
```
#### Display multiple windows from multiple sources
iport displayarray.webcam_pub as w
from displayarray.window_sub import SubscriberWindows
t1 = w.VideoHandlerThread(0)
t2 = w.VideoHandlerThread(1)
t1.start()
t2.start()
SubscriberWindows(window_names=['cammy', 'cammy2'],
video_sources=[0,1]
).loop()
t1.join()
t1.join()
#### Run a function on each pixel

```python
import numpy as np
from displayarray.webcam_pub import VideoHandlerThread
from displayarray.webcam_pub.callbacks import function_display_callback

img = np.zeros((50, 50, 1))
img[0:5, 0:5, :] = 1

def conway_game_of_life(array, coords, finished):
    neighbors = np.sum(array[max(coords[0] - 1, 0):min(coords[0] + 2, 50),
                             max(coords[1] - 1, 0):min(coords[1] + 2, 50)])
    neighbors = max(neighbors - np.sum(array[coords[0:2]]), 0.0)
    if array[coords] == 1.0:
        if neighbors < 2 or neighbors > 3:
            array[coords] = 0.0
        elif 2 <= neighbors <= 3:
            array[coords] = 1.0
    else:
        if neighbors == 3:
            array[coords] = 1.0

VideoHandlerThread(video_source=img, callbacks=function_display_callback(conway_game_of_life)).display()
```
## License

displayarray/frame_publising/frame_update_thread.py

@@ -19,7 +19,7 @@ class VideoHandlerThread(threading.Thread):
         self,
         video_source: Union[int, str, np.ndarray] = 0,
         callbacks: Optional[Union[List[FrameCallable], FrameCallable]] = None,
-        request_size: Tuple[int, int] = (99999, 99999),
+        request_size: Tuple[int, int] = (-1, -1),
         high_speed: bool = True,
         fps_limit: float = 240,
     ):
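The default `request_size` sentinel changes from an absurdly large resolution to `(-1, -1)`. A minimal sketch of overriding it, assuming `(-1, -1)` means "keep the source's native size":

```python
import displayarray.webcam_pub as w

# Explicitly request 1280x720; leaving request_size at the new default
# (-1, -1) presumably keeps the camera's native resolution.
video_thread = w.VideoHandlerThread(video_source=0, request_size=(1280, 720))
video_thread.display()
```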

displayarray/frame_publising/subscriber_dictionary.py

@@ -42,8 +42,10 @@ def register_cam(cam_id):

 def stop_cam(cam_id: Union[int, str]):
     """Tell camera "cam_id" to end its main loop."""
-    CV_CAMS_DICT[str(cam_id)].cmd_pub.publish("quit", blocking=True)
-    CV_CAM_HANDLERS_DICT[str(cam_id)].cmd_pub.publish("quit", blocking=True)
+    if str(cam_id) in CV_CAMS_DICT:
+        CV_CAMS_DICT[str(cam_id)].cmd_pub.publish("quit", blocking=True)
+    if str(cam_id) in CV_CAM_HANDLERS_DICT:
+        CV_CAM_HANDLERS_DICT[str(cam_id)].cmd_pub.publish("quit", blocking=True)

 def cam_cmd_sub(cam_id, blocking=True):
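With the new membership checks, stopping an unregistered or already-stopped camera becomes a no-op instead of a `KeyError`. A minimal usage sketch, assuming the module path implied by the imports below:

```python
from displayarray.frame_publising import subscriber_dictionary

subscriber_dictionary.stop_cam(0)   # quits cam 0's loop if it is registered
subscriber_dictionary.stop_cam(42)  # unknown id: now silently ignored
```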

displayarray/subscriber_window/subscriber_windows.py

@@ -13,6 +13,20 @@
 from displayarray.frame_publising.frame_update_thread import FrameCallable
 from displayarray.frame_publising.frame_update_thread import VideoHandlerThread
 from displayarray.input import MouseEvent
 from displayarray.subscriber_window import window_commands
+import weakref
+
+
+class WeakMethod(weakref.WeakMethod):
+    """Pass any method to OpenCV without it keeping a reference forever."""
+
+    def __call__(self, *args, **kwargs):
+        """Call the actual method this object was made with."""
+        obj = super().__call__()
+        func = self._func_ref()
+        if obj is None or func is None:
+            return None
+        meth = self._meth_type(func, obj)
+        meth(*args, **kwargs)
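`cv2.setMouseCallback` stores its callback for the window's lifetime, so passing a bound method directly would keep the whole `SubscriberWindows` object alive. Wrapping the method in a weak reference lets the window object be garbage collected. A minimal sketch of the underlying `weakref.WeakMethod` behavior from the standard library:

```python
import gc
import weakref

class Window:
    def handle_mouse(self, *args):
        print("mouse event:", args)

w = Window()
m = weakref.WeakMethod(w.handle_mouse)
m()("move", 10, 20)  # resolves the weak ref, then calls the bound method

del w
gc.collect()
print(m())  # None: the Window instance was free to be collected
```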
class SubscriberWindows(object):
@@ -22,10 +36,10 @@ class SubscriberWindows(object):
     ESC_KEY_CODES = [27]  # ESC key on most keyboards

     def __init__(
         self,
         window_names: Iterable[str] = ("displayarray",),
         video_sources: Iterable[Union[str, int]] = (0,),
         callbacks: Optional[List[Callable[[np.ndarray], Any]]] = None,
     ):
         self.source_names: List[Union[str, int]] = []
         self.close_threads: Optional[List[Thread]] = None
@@ -33,6 +47,7 @@ class SubscriberWindows(object):
         self.input_vid_global_names: List[str] = []
         self.window_names: List[str] = []
         self.input_cams: List[str] = []
+        self.exited = False

         if callbacks is None:
             callbacks = []
@@ -42,6 +57,12 @@ class SubscriberWindows(object):
         for name in window_names:
             self.add_window(name)
+        self.update()
+
+    def __bool__(self):
+        self.update()
+        return not self.exited
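This `__bool__` is what drives the `while d:` idiom in the README: each truth test runs one `update()` and turns False once ESC has closed the windows. A minimal sketch:

```python
from displayarray import display
import numpy as np

arr = np.zeros((100, 100, 3))

d = display(arr)  # non-blocking; returns the SubscriberWindows instance
while d:          # each check runs d.update() and stops after ESC
    arr[:] += 0.01
    arr %= 1.0
```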
     def add_source(self, name):
         """Add another source for this class to display."""
         uid = uid_for_source(name)
@@ -53,7 +74,11 @@ class SubscriberWindows(object):
         """Add another window for this class to display sources with. The name will be the title."""
         self.window_names.append(name)
         cv2.namedWindow(name + " (press ESC to quit)")
-        cv2.setMouseCallback(name + " (press ESC to quit)", self.handle_mouse)
+        m = WeakMethod(self.handle_mouse)
+        cv2.setMouseCallback(name + " (press ESC to quit)", m)
+
+    def del_window(self, name):
+        cv2.setMouseCallback(name + " (press ESC to quit)", lambda *args: None)
     def add_callback(self, callback):
         """Add a callback for this class to apply to videos."""
@@ -64,12 +89,13 @@ class SubscriberWindows(object):
             subscriber_dictionary.stop_cam(c)

     def handle_keys(
         self, key_input  # type: int
     ):
         """Capture key input for the escape function and pass it on to key-control subscriber threads."""
         if key_input in self.ESC_KEY_CODES:
             for name in self.window_names:
                 cv2.destroyWindow(name + " (press ESC to quit)")
+            self.exited = True
             window_commands.quit()
             self.__stop_all_cams()
             return "quit"
@@ -96,9 +122,9 @@ class SubscriberWindows(object):
         for f in range(len(frames)):
             # detect nested:
             if (
                 isinstance(frames[f], (list, tuple))
                 or frames[f].dtype.num == 17
                 or len(frames[f].shape) > 3
             ):
                 win_num = self._display_frames(frames[f], win_num, ids)
             else:
@@ -115,11 +141,11 @@ class SubscriberWindows(object):
         win_num = 0
         for i in range(len(self.input_vid_global_names)):
             if self.input_vid_global_names[i] in self.FRAME_DICT and not isinstance(
                 self.FRAME_DICT[self.input_vid_global_names[i]], NoData
             ):
                 if (
                     len(self.callbacks) > 0
                     and self.callbacks[i % len(self.callbacks)] is not None
                 ):
                     self.frames = self.callbacks[i % len(self.callbacks)](
                         self.FRAME_DICT[self.input_vid_global_names[i]]
@@ -158,9 +184,19 @@ class SubscriberWindows(object):
             for t in self.close_threads:
                 t.join()

+    def __enter__(self):
+        return self
+
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.end()

+    def __del__(self):
+        self.end()
+
+    def __delete__(self, instance):
+        del self.handle_mouse
+        self.end()
+
     def loop(self):
         """Continually update window frame. OpenCV only allows this in the main thread."""
         sub_cmd = window_commands.win_cmd_sub()
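Because OpenCV's GUI calls are only safe from the main thread, `loop()` is meant to run there while `VideoHandlerThread`s feed frames from the background, mirroring the multi-window example in the README. A minimal sketch, with the class's import path assumed from the imports above:

```python
import displayarray.webcam_pub as w
from displayarray.subscriber_window import SubscriberWindows  # path assumed

t = w.VideoHandlerThread(0)
t.start()

# loop() blocks the main thread, which is where OpenCV must draw.
SubscriberWindows(window_names=["cam"], video_sources=[0]).loop()

t.join()
```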
@@ -174,7 +210,8 @@ class SubscriberWindows(object):

 def _get_video_callback_dict_threads(
-    *vids, callbacks: Optional[Dict[Any, FrameCallable]] = None
+    *vids, callbacks: Optional[Dict[Any, FrameCallable]] = None,
+    fps=240, size=(-1, -1)
 ):
     assert callbacks is not None
     vid_threads = []
@@ -185,39 +222,42 @@ def _get_video_callback_dict_threads(
             v_callbacks.append(callbacks[v_name])
         if v in callbacks:
             v_callbacks.append(callbacks[v])
-        vid_threads.append(VideoHandlerThread(v, callbacks=v_callbacks))
+        vid_threads.append(VideoHandlerThread(v, callbacks=v_callbacks, fps_limit=fps, request_size=size))
     return vid_threads
 def _get_video_threads(
     *vids,
     callbacks: Optional[
         Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
-    ] = None
+    ] = None,
+    fps=240, size=(-1, -1)
 ):
     vid_threads: List[Thread] = []
     if isinstance(callbacks, Dict):
-        vid_threads = _get_video_callback_dict_threads(*vids, callbacks=callbacks)
+        vid_threads = _get_video_callback_dict_threads(*vids, callbacks=callbacks, fps=fps, size=size)
     elif isinstance(callbacks, List):
         for v in vids:
-            vid_threads.append(VideoHandlerThread(v, callbacks=callbacks))
+            vid_threads.append(VideoHandlerThread(v, callbacks=callbacks, fps_limit=fps, request_size=size))
     elif callable(callbacks):
         for v in vids:
-            vid_threads.append(VideoHandlerThread(v, callbacks=[callbacks]))
+            vid_threads.append(VideoHandlerThread(v, callbacks=[callbacks], fps_limit=fps, request_size=size))
     else:
         for v in vids:
             if v is not None:
-                vid_threads.append(VideoHandlerThread(v))
+                vid_threads.append(VideoHandlerThread(v, fps_limit=fps, request_size=size))
     return vid_threads
 def display(
     *vids,
     callbacks: Optional[
         Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
     ] = None,
     window_names=None,
-    blocking=False
+    blocking=False,
+    fps_limit=240,
+    size=(-1, -1)
 ):
""" """
Display all the arrays, cameras, and videos passed in. Display all the arrays, cameras, and videos passed in.
@@ -226,20 +266,16 @@ def display(
     data before displaying.

     Window names end up becoming the title of the windows
     """
-    vid_threads = _get_video_threads(*vids, callbacks=callbacks)
+    vid_threads = _get_video_threads(*vids, callbacks=callbacks, fps=fps_limit, size=size)
     for v in vid_threads:
         v.start()
     if window_names is None:
         window_names = ["window {}".format(i) for i in range(len(vids))]
     if blocking:
         SubscriberWindows(window_names=window_names, video_sources=vids).loop()
-        for v in vid_threads:
-            v.join()
+        for vt in vid_threads:
+            vt.join()
     else:
         s = SubscriberWindows(window_names=window_names, video_sources=vids)
         s.close_threads = vid_threads
-        v_names = []
-        for v in vids:
-            v_name = uid_for_source(v)
-            v_names.append(v_name)
-        return s, v_names
+        return s
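Non-blocking `display` now returns just the `SubscriberWindows` instance instead of a `(windows, source_names)` tuple, and gains the `fps_limit` and `size` arguments. A minimal before/after sketch:

```python
from displayarray import display
import numpy as np

arr = np.random.normal(0.5, 0.1, (100, 100, 3))

# Old API: s, v_names = display(arr)
# New API: the windows object alone, with the new keyword arguments.
d = display(arr, fps_limit=60, size=(-1, -1))
d.end()
```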

examples/__init__.py (new file)

@@ -0,0 +1 @@
+# Fractal test is from: https://www.youtube.com/watch?v=WgXQ59rg0GM

(new file)

@@ -0,0 +1 @@
+12bab3f6f28175747f7fb95d70619fe672fe3555

tests/test_simple_api.py (new file)

@@ -0,0 +1,97 @@
```python
import unittest as ut


class TestSubWin(ut.TestCase):
    def test_display_numpy(self):
        from displayarray import display
        import numpy as np

        display(np.random.normal(0.5, 0.1, (500, 500, 3)))

    def test_display_numpy_callback(self):
        from displayarray import display
        import numpy as np

        arr = np.random.normal(0.5, 0.1, (500, 500, 3))

        def fix_arr_cv(arr_in):
            arr_in[:] += np.random.normal(0.01, 0.005, (500, 500, 3))
            arr_in %= 1.0

        display(arr, callbacks=fix_arr_cv, blocking=True)

    def test_display_numpy_loop(self):
        from displayarray import display
        import numpy as np

        arr = np.random.normal(0.5, 0.1, (100, 100, 3))

        with display(arr) as displayer:
            while displayer:
                arr[:] += np.random.normal(0.001, 0.0005, (100, 100, 3))
                arr %= 1.0

    def test_display_camera(self):
        from displayarray import display
        import numpy as np

        def black_and_white(arr):
            return (np.sum(arr, axis=-1) / 3).astype(np.uint8)

        display(0, callbacks=black_and_white, blocking=True)

    def test_display_video(self):
        from displayarray import display
        import math as m

        def forest_color(arr):
            forest_color.i += 1
            arr[..., 0] = (m.sin(forest_color.i * (2 * m.pi) * 4 / 360) * 255 + arr[..., 0]) % 255
            arr[..., 1] = (m.sin((forest_color.i * (2 * m.pi) * 5 + 45) / 360) * 255 + arr[..., 1]) % 255
            arr[..., 2] = (m.cos(forest_color.i * (2 * m.pi) * 3 / 360) * 255 + arr[..., 2]) % 255

        forest_color.i = 0

        display("fractal test.mp4", callbacks=forest_color, blocking=True, fps_limit=120)

    def test_display_tensorflow(self):
        from displayarray import display
        import numpy as np
        from tensorflow.keras import layers, models
        import tensorflow as tf

        for gpu in tf.config.experimental.list_physical_devices("GPU"):
            tf.compat.v2.config.experimental.set_memory_growth(gpu, True)

        displayer = display("fractal test.mp4")
        displayer.wait_for_init()

        autoencoder = models.Sequential()
        autoencoder.add(
            layers.Conv2D(
                20, (3, 3), activation="sigmoid", input_shape=displayer.frames[0].shape
            )
        )
        autoencoder.add(
            layers.Conv2D(
                20, (3, 3), activation="sigmoid", input_shape=displayer.frames[0].shape
            )
        )
        autoencoder.add(layers.Conv2DTranspose(3, (3, 3), activation="sigmoid"))
        autoencoder.add(layers.Conv2DTranspose(3, (3, 3), activation="sigmoid"))
        autoencoder.compile(loss="mse", optimizer="adam")

        while displayer:
            grab = tf.convert_to_tensor(
                displayer.FRAME_DICT["fractal test.mp4frame"][np.newaxis, ...].astype(np.float32)
                / 255.0
            )
            grab_noise = tf.convert_to_tensor(
                (((displayer.FRAME_DICT["fractal test.mp4frame"][np.newaxis, ...].astype(np.float32)
                   + np.random.uniform(0, 255, grab.shape)) / 2) % 255)
                / 255.0
            )
            displayer.update((grab_noise.numpy()[0] * 255.0).astype(np.uint8), "uid for grab noise")
            autoencoder.fit(grab_noise, grab, steps_per_epoch=1, epochs=1)
            output_image = autoencoder.predict(grab, steps=1)
            displayer.update((output_image[0] * 255.0).astype(np.uint8), "uid for autoencoder output")
```

tests/test_sub_win.py

@@ -14,7 +14,7 @@ class TestSubWin(ut.TestCase):
         def print_mouse_thread(mouse_event):
             print(mouse_event)

-        w.VideoHandlerThread().display()
+        display("fractal test.mp4", blocking=True)
     def test_key_loop(self):
         @key_loop
@@ -99,8 +99,8 @@ class TestSubWin(ut.TestCase):
         def conway(array, coords, finished):
             neighbors = np.sum(
                 array[
-                    max(coords[0] - 1, 0) : min(coords[0] + 2, 50),
-                    max(coords[1] - 1, 0) : min(coords[1] + 2, 50),
+                    max(coords[0] - 1, 0): min(coords[0] + 2, 50),
+                    max(coords[1] - 1, 0): min(coords[1] + 2, 50),
                 ]
             )
             neighbors = max(neighbors - np.sum(array[coords[0:2]]), 0.0)
@ -115,20 +115,20 @@ class TestSubWin(ut.TestCase):
@mouse_loop @mouse_loop
def conway_add( def conway_add(
mouse_event # type:MouseEvent mouse_event # type:MouseEvent
): ):
if 0 <= mouse_event.x < 50 and 0 <= mouse_event.y < 50: if 0 <= mouse_event.x < 50 and 0 <= mouse_event.y < 50:
if mouse_event.flags == cv2.EVENT_FLAG_LBUTTON: if mouse_event.flags == cv2.EVENT_FLAG_LBUTTON:
img[ img[
mouse_event.y - 5 : mouse_event.y + 10, mouse_event.y - 5: mouse_event.y + 10,
mouse_event.x - 5 : mouse_event.x + 10, mouse_event.x - 5: mouse_event.x + 10,
:, :,
] = 0.0 ] = 0.0
elif mouse_event.flags == cv2.EVENT_FLAG_RBUTTON: elif mouse_event.flags == cv2.EVENT_FLAG_RBUTTON:
img[ img[
mouse_event.y - 5 : mouse_event.y + 10, mouse_event.y - 5: mouse_event.y + 10,
mouse_event.x - 5 : mouse_event.x + 10, mouse_event.x - 5: mouse_event.x + 10,
:, :,
] = 1.0 ] = 1.0
VideoHandlerThread( VideoHandlerThread(

(deleted file)

@@ -1,69 +0,0 @@
```python
import unittest as ut


class TestSubWin(ut.TestCase):
    def test_display_numpy(self):
        from displayarray import display
        import numpy as np

        s, vids = display(np.random.normal(0.5, 0.1, (500, 500, 3)))
        s.end()
        print("ended")

    def test_display_numpy_callback(self):
        from displayarray import display
        import numpy as np

        arr = np.random.normal(0.5, 0.1, (500, 500, 3))

        def fix_arr_cv(arr_in):
            arr_in[:] += np.random.normal(0.01, 0.005, (500, 500, 3))
            arr_in %= 1.0

        display(arr, callbacks=fix_arr_cv, blocking=True)

    def test_display_numpy_loop(self):
        from displayarray import display
        import numpy as np

        arr = np.random.normal(0.5, 0.1, (500, 500, 3))

        displayer, ids = display(arr, blocking=False)
        while True:
            arr[:] += np.random.normal(0.01, 0.005, (500, 500, 3))
            arr %= 1.0
            displayer.update(arr, ids[0])
        displayer.end()

    def test_display_tensorflow(self):
        from displayarray import display
        import numpy as np
        from tensorflow.keras import layers, models
        import tensorflow as tf

        for gpu in tf.config.experimental.list_physical_devices("GPU"):
            tf.compat.v2.config.experimental.set_memory_growth(gpu, True)

        displayer, ids = display(0, blocking=False)
        displayer.wait_for_init()

        autoencoder = models.Sequential()
        autoencoder.add(
            layers.Conv2D(
                20, (3, 3), activation="sigmoid", input_shape=displayer.frames[0].shape
            )
        )
        autoencoder.add(layers.Conv2DTranspose(3, (3, 3), activation="sigmoid"))
        autoencoder.compile(loss="mse", optimizer="adam")

        while True:
            grab = tf.convert_to_tensor(
                displayer.FRAME_DICT["0frame"][np.newaxis, ...].astype(np.float32)
                / 255.0
            )
            autoencoder.fit(grab, grab, steps_per_epoch=1, epochs=1)
            output_image = autoencoder.predict(grab, steps=1)
            displayer.update(
                (output_image[0] * 255.0).astype(np.uint8), "uid for autoencoder output"
            )
```