Skip to content

Instantly share code, notes, and snippets.

@cvramanan
Created July 26, 2023 07:15
Show Gist options
  • Save cvramanan/1a51a7e84ec72215b3fa6c9733ec6a83 to your computer and use it in GitHub Desktop.
Save cvramanan/1a51a7e84ec72215b3fa6c9733ec6a83 to your computer and use it in GitHub Desktop.
# -----------------------------------------------------------------------------
# Copyright (c) 2022, Lucid Vision Labs, Inc.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# -----------------------------------------------------------------------------
import time
from arena_api.system import system
import time
from datetime import datetime
from arena_api.enums import PixelFormat
from arena_api.__future__.save import Writer
from arena_api.system import system
from arena_api.buffer import BufferFactory
import queue
import threading
# Prefix prepended to first-level console messages.
TAB1 = " "
# Pixel format that captured buffers are converted to before being saved.
pixel_format = PixelFormat.BGR8
# Running number of saved images; used to build each output file name.
count = 0
'''
Trigger: Introduction
This example introduces basic trigger configuration and use. In order to
configure trigger, enable trigger mode and set the source and selector. The
trigger must be armed before it is prepared to execute. Once the trigger is
armed, execute the trigger and retrieve an image.
'''
# NOTE(review): TAB1 is assigned a second time here with the same value, and
# TAB2 is identical to TAB1 as shown. The original literals may have had
# different widths -- confirm against the upstream example.
TAB1 = " "
TAB2 = " "
def create_devices_with_tries():
    """
    Poll for a connected device, giving up after a fixed number of attempts.

    Six attempts are made. Every unsuccessful attempt is followed by a ten
    second countdown that is printed on a single console line, so the whole
    wait lasts about 60 seconds.

    Returns:
        The non-empty result of ``system.create_device()``.

    Raises:
        Exception: if every attempt comes back empty.
    """
    max_attempts = 6
    wait_secs = 10

    for attempt in range(max_attempts):
        found = system.create_device()
        if found:
            return found

        print(
            f'Try {attempt+1} of {max_attempts}: waiting for {wait_secs} '
            f'secs for a device to be connected!')
        # end='\r' rewinds the cursor so the countdown overwrites itself.
        for elapsed in range(wait_secs):
            time.sleep(1)
            print(f'{elapsed + 1 } seconds passed ',
                  '.' * elapsed, end='\r')

    # Only reached when no attempt produced a device.
    raise Exception(f'No device found! Please connect a device and run '
                    f'the example again.')
def store_initial(nodemap, exposure_time=1000.0):
    """
    Record the current trigger settings and switch to manual exposure.

    Besides reading the trigger nodes, this function also WRITES to the
    device: ``ExposureAuto`` is set to ``'Off'`` and ``ExposureTime`` is set
    to ``exposure_time``. The previous exposure values are not saved, so
    they cannot be restored from the returned list.

    Args:
        nodemap: object whose ``get_node(list_of_names)`` returns a mapping
            from node name to a node exposing a ``value`` attribute.
        exposure_time: value written to the ``ExposureTime`` node. Defaults
            to ``1000.0``, the value this example has always used.
            NOTE(review): presumably microseconds, as in the GenICam SFNC;
            confirm for the camera in use.

    Returns:
        A ``(nodes, initial_vals)`` tuple. ``nodes`` is the mapping returned
        by ``nodemap.get_node``; ``initial_vals`` is the list
        ``[TriggerSelector, TriggerMode, TriggerSource]`` as read before any
        node was modified.
    """
    nodes = nodemap.get_node(['TriggerSelector', 'TriggerMode',
                              'TriggerSource', 'ExposureAuto',
                              'ExposureTime'])

    # Snapshot the trigger nodes first, in the order callers index them.
    initial_vals = [nodes[name].value
                    for name in ('TriggerSelector', 'TriggerMode',
                                 'TriggerSource')]

    # Auto exposure is switched off before the fixed exposure is written.
    nodes['ExposureAuto'].value = 'Off'
    nodes['ExposureTime'].value = exposure_time

    return nodes, initial_vals
def save(buffer):
    """
    Convert a buffer to ``pixel_format`` and write it out as a JPEG.

    Steps:
        (1) converts the image to the module-level ``pixel_format``
        (2) creates an image writer
        (3) increments the global ``count`` and sets the file name to
            ``images/image_<count>.jpg``
        (4) saves the converted image
        (5) destroys the converted image, even when a previous step failed

    Args:
        buffer: image buffer to save, e.g. the result of
            ``device.get_buffer()``. It is neither requeued nor destroyed
            here; that remains the caller's job.

    NOTE(review): whether ``Writer`` creates the ``images`` directory when
    it is missing is not visible from this file -- confirm.
    """
    global count

    # The converted copy is a separate buffer that this function owns.
    converted = BufferFactory.convert(buffer, pixel_format)
    try:
        print(f"{TAB1}Converted image to {pixel_format.name}")

        print(f'{TAB1}Prepare Image Writer')
        writer = Writer()
        count += 1
        writer.pattern = 'images/image_' + str(count) + '.jpg'

        # Save converted buffer
        writer.save(converted)
        print(f'{TAB1}Image saved')
    finally:
        # Destroy the converted buffer on every path so that a failed save
        # does not leak it.
        BufferFactory.destroy(converted)
# Bounded hand-off queue (at most 100 items) that save_with_thread() reads
# the buffers to be saved from.
bufferQueuewe = queue.Queue(maxsize=100)
def save_with_thread():
    """
    Worker loop: save every buffer that is put on ``bufferQueuewe``.

    Intended to be used as a thread target. The loop never returns; each
    item taken from the queue is handed to ``save()``, which converts it,
    writes it to ``images/image_<count>.jpg`` and destroys the converted
    copy.

    An exception raised by ``save()`` is not caught here, so it ends the
    worker.

    NOTE(review): a queued buffer presumably has to stay valid (i.e. not be
    requeued to the device) until this worker has converted it -- confirm
    against the producer before enabling the queue.
    """
    while True:
        # Queue.get() blocks until an item is available, so no
        # empty()/sleep polling is required.
        save(bufferQueuewe.get())
def configure_trigger_acquire_image(device, nodemap, nodes, initial_vals):
    """
    Trigger, retrieve, save and requeue a single image.

    Despite its name this function does not configure anything: trigger
    mode and the stream must already be set up by the caller. It is called
    repeatedly, which is why the stream is not stopped and the trigger
    nodes are not restored here.

    Steps:
        (1) polls ``TriggerArmed`` until it is true
        (2) executes ``TriggerSoftware``
        (3) retrieves the resulting buffer from the device
        (4) saves it synchronously with ``save()``
        (5) requeues the buffer, even when saving failed

    Args:
        device: streaming device; provides ``get_buffer`` and
            ``requeue_buffer``.
        nodemap: device nodemap; read for ``TriggerArmed`` and
            ``TriggerSoftware``.
        nodes: unused; kept so existing callers keep working.
        initial_vals: unused; kept so existing callers keep working.
    """
    sT = time.time()

    # Trigger Armed
    # The trigger can only be executed once the device reports it as armed.
    # There is no timeout: this spins until the node becomes true.
    print(f'{TAB2}Wait until trigger is armed')
    while not bool(nodemap['TriggerArmed'].value):
        pass

    # Trigger an image
    # Trigger mode is enabled, so the camera acquires exactly one image per
    # software trigger. The filled buffer then waits to be retrieved.
    print(f'{TAB2}Trigger Image')
    nodemap['TriggerSoftware'].execute()

    # Get image
    # If no image has been triggered, retrieving one hangs for the duration
    # of the timeout and then throws an exception.
    buffer = device.get_buffer()
    try:
        print("Time taken for get_buffer: ", time.time() - sT)

        # Saving happens on this thread. Both timings are measured from the
        # start of this call, so the second one includes the first.
        save(buffer)
        print("Time taken for save: ", time.time() - sT)

        # Print some info about the image in the buffer
        print(f'{TAB2}Buffer received | ['
              f'Width = {buffer.width} pxl, '
              f'Height = {buffer.height} pxl]')
    finally:
        # Requeue on every path so that a failed save does not keep the
        # buffer away from the device.
        print(f'{TAB2}Requeue Buffer')
        device.requeue_buffer(buffer)
def example_entry_point():
    """
    Run the software-trigger capture loop until interrupted with Ctrl+C.

    Steps:
        (1) connects to the first device and records its trigger settings
        (2) starts the background image-saving worker
        (3) sets trigger selector, mode and source
        (4) configures and starts the stream
        (5) roughly every 0.1 s: waits until the trigger is armed, triggers,
            gets, saves and requeues one image
        (6) on exit: stops the stream, restores the trigger nodes and
            destroys the device
    """
    # Prepare example
    devices = create_devices_with_tries()
    device = devices[0]

    nodes = None
    initial_vals = None
    streaming = False
    try:
        nodemap = device.nodemap
        nodes, initial_vals = store_initial(nodemap)
        print("Example Started\n")
        sT = time.time()

        # Daemon thread: the worker loops forever, so a non-daemon thread
        # would keep the process alive after the main loop has ended.
        threading.Thread(target=save_with_thread, daemon=True).start()

        # Set trigger selector
        # Set the trigger selector to FrameStart. When triggered, the device
        # will start acquiring a single frame. This can also be set to
        # AcquisitionStart or FrameBurstStart.
        print(f'{TAB1}Set trigger selector to FrameStart')
        nodes['TriggerSelector'].value = 'FrameStart'

        # Set trigger mode
        # Trigger mode cannot be turned on and off while the device is
        # streaming, so it is enabled before the stream is started.
        print(f'{TAB1}Enable trigger mode')
        nodes['TriggerMode'].value = 'On'

        # Set trigger source
        # Images are triggered from software in this example.
        print(f'{TAB1}Set trigger source to software')
        nodes['TriggerSource'].value = 'Software'

        # Setup stream values
        tl_stream_nodemap = device.tl_stream_nodemap
        tl_stream_nodemap['StreamAutoNegotiatePacketSize'].value = True
        tl_stream_nodemap['StreamPacketResendEnable'].value = True

        # Start stream
        # With trigger mode enabled the device waits for a trigger before
        # acquiring any image.
        print(f'{TAB1}Start stream')
        device.start_stream()
        streaming = True

        pT = time.time()
        while True:
            if time.time() - sT > 0.1:
                sT = time.time()
                configure_trigger_acquire_image(device, nodemap, nodes,
                                                initial_vals)
                print(f'{TAB1}Time taken-------------------------------------------: {time.time() - pT}')
                pT = time.time()
            else:
                # Yield the CPU between checks instead of spinning.
                time.sleep(0.001)
    except KeyboardInterrupt:
        # Ctrl+C is the only way out of the capture loop.
        print(f'\n{TAB1}Interrupted by user')
    finally:
        # Cleanup
        try:
            if streaming:
                print(f'{TAB1}Stop stream')
                device.stop_stream()
            if nodes is not None:
                # Return nodes to their initial values (stream is stopped
                # first because trigger mode cannot change while streaming).
                nodes['TriggerSelector'].value = initial_vals[0]
                nodes['TriggerMode'].value = initial_vals[1]
                nodes['TriggerSource'].value = initial_vals[2]
        finally:
            # Destroy the device even if stopping or restoring failed.
            system.destroy_device(device)

    print("\nExample Completed")


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    example_entry_point()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment