Created
July 26, 2023 07:15
-
-
Save cvramanan/1a51a7e84ec72215b3fa6c9733ec6a83 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # ----------------------------------------------------------------------------- | |
| # Copyright (c) 2022, Lucid Vision Labs, Inc. | |
| # | |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
| # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES | |
| # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND | |
| # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS | |
| # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN | |
| # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN | |
| # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
| # THE SOFTWARE. | |
| # ----------------------------------------------------------------------------- | |
| import time | |
| from arena_api.system import system | |
| import time | |
| from datetime import datetime | |
| from arena_api.enums import PixelFormat | |
| from arena_api.__future__.save import Writer | |
| from arena_api.system import system | |
| from arena_api.buffer import BufferFactory | |
| import queue | |
| import threading | |
# Trigger: Introduction
#    This example introduces basic trigger configuration and use. In order to
#    configure trigger, enable trigger mode and set the source and selector.
#    The trigger must be armed before it is ready to execute. Once the
#    trigger is armed, execute the trigger and retrieve an image.

# Prefixes used to indent console output at two nesting levels.
# NOTE(review): both are a single space here; confirm that is the intended
# width and not an artifact of how this file was copied.
TAB1 = " "
TAB2 = " "

# Pixel format every acquired buffer is converted to before it is written.
pixel_format = PixelFormat.BGR8

# Running number of saved images; the save routines increment it and embed
# it in the output file name.
count = 0
def create_devices_with_tries():
    """
    Poll for connected devices, retrying a fixed number of times.

    Calls ``system.create_device()`` up to 6 times. After every attempt
    that yields an empty result it waits 10 seconds, printing a progress
    line once per second.

    Returns:
        The first truthy result of ``system.create_device()``.

    Raises:
        Exception: if all attempts came back empty.
    """
    max_attempts = 6
    wait_secs = 10

    for attempt in range(max_attempts):
        found = system.create_device()
        if found:
            return found

        print(
            f'Try {attempt+1} of {max_attempts}: waiting for {wait_secs} '
            f'secs for a device to be connected!')
        # Count the wait down on a single console line ('\r' rewinds it).
        for elapsed in range(wait_secs):
            time.sleep(1)
            print(f'{elapsed + 1 } seconds passed ',
                  '.' * elapsed, end='\r')

    # Every attempt was used up without finding a device.
    raise Exception(f'No device found! Please connect a device and run '
                    f'the example again.')
def store_initial(nodemap):
    """
    Fetch the trigger/exposure nodes and snapshot the trigger settings.

    Side effect: sets 'ExposureAuto' to 'Off' and 'ExposureTime' to 1000.0
    on the fetched nodes.

    Args:
        nodemap: object whose ``get_node`` accepts a list of node names and
            returns a mapping from each name to a node with a ``value``
            attribute.

    Returns:
        A ``(nodes, initial_vals)`` tuple: the node mapping, and a list with
        the values of 'TriggerSelector', 'TriggerMode' and 'TriggerSource'
        (in that order) as read before anything was modified.
    """
    trigger_names = ['TriggerSelector', 'TriggerMode', 'TriggerSource']
    nodes = nodemap.get_node(trigger_names + ['ExposureAuto', 'ExposureTime'])

    # Snapshot the trigger settings so a caller can restore them later.
    initial_vals = [nodes[name].value for name in trigger_names]

    # Fixed manual exposure for the acquisition.
    nodes['ExposureAuto'].value = 'Off'
    nodes['ExposureTime'].value = 1000.0

    return nodes, initial_vals
def save(buffer):
    """
    Convert a buffer to ``pixel_format`` and write it out as a JPEG.

    Steps:
        (1) converts the image to a displayable pixel format
        (2) prepares the image writer and the file name
        (3) saves the image
        (4) destroys the converted image

    It is worth keeping in mind the best pixel and file formats for your
    application; this example converts the image so that it is displayable
    by the operating system.

    Side effect: increments the module-level ``count`` and uses the new
    value in the file name ``images/image_<count>.jpg``.

    Args:
        buffer: image buffer accepted by ``BufferFactory.convert``.
    """
    global count

    converted = BufferFactory.convert(buffer, pixel_format)
    try:
        print(f"{TAB1}Converted image to {pixel_format.name}")

        # The writer needs two things to save an image: the buffer and a
        # file name (or pattern).
        print(f'{TAB1}Prepare Image Writer')
        writer = Writer()
        count = count + 1
        writer.pattern = f'images/image_{count}.jpg'

        # Save converted buffer
        writer.save(converted)
        print(f'{TAB1}Image saved')
    finally:
        # Destroy the converted buffer even when saving fails, so an error
        # does not leak it.
        BufferFactory.destroy(converted)
# Hand-off queue between the acquisition code and the saving worker.
bufferQueuewe = queue.Queue(maxsize=100)


def save_with_thread():
    """
    Worker loop: take buffers from ``bufferQueuewe`` and save each one.

    For every buffer received it
        (1) converts the image to ``pixel_format``
        (2) prepares the image writer and the file name
        (3) saves the image
        (4) destroys the converted image

    The loop blocks on the queue instead of polling it. Putting ``None`` on
    the queue makes the worker return, which gives callers a way to stop it.

    Side effect: increments the module-level ``count`` for every saved
    image and uses it in the file name ``images/image_<count>.jpg``.

    NOTE(review): ``count`` is also modified by ``save``; if both run
    concurrently the increment is not synchronized.
    """
    global count

    while True:
        # Blocks until an item is available; no sleep/poll cycle needed.
        buffer = bufferQueuewe.get()
        if buffer is None:
            # Stop sentinel.
            break

        converted = BufferFactory.convert(buffer, pixel_format)
        try:
            print(f"{TAB1}Converted image to {pixel_format.name}")

            # The writer needs two things to save an image: the buffer and
            # a file name (or pattern).
            print(f'{TAB1}Prepare Image Writer')
            writer = Writer()
            count = count + 1
            writer.pattern = f'images/image_{count}.jpg'

            # Save converted buffer
            writer.save(converted)
            print(f'{TAB1}Image saved')
        finally:
            # Destroy the converted buffer even when saving fails, so an
            # error does not leak it.
            BufferFactory.destroy(converted)
def configure_trigger_acquire_image(device, nodemap, nodes, initial_vals):
    """
    Fire one software trigger, fetch the resulting image and save it.

    Steps:
        (1) waits until the trigger is armed
        (2) executes the software trigger
        (3) gets the image buffer from the device
        (4) saves it via ``save``
        (5) requeues the buffer

    Args:
        device: device providing ``get_buffer`` and ``requeue_buffer``.
        nodemap: node map providing 'TriggerArmed' and 'TriggerSoftware'.
        nodes: unused here; kept so existing callers keep working.
        initial_vals: unused here; kept so existing callers keep working.
    """
    started = time.time()

    # Trigger Armed
    #    Continually check until the trigger is armed. Once the trigger is
    #    armed, it is ready to be executed.
    print(f'{TAB2}Wait until trigger is armed')
    while not bool(nodemap['TriggerArmed'].value):
        pass

    # Trigger an image
    #    Trigger an image manually, since trigger mode is enabled. This
    #    triggers the camera to acquire a single image. A buffer is then
    #    filled and moved to the output queue, where it waits to be
    #    retrieved.
    print(f'{TAB2}Trigger Image')
    nodemap['TriggerSoftware'].execute()

    # Get image
    #    Once an image has been triggered, it can be retrieved. If no image
    #    has been triggered, trying to retrieve an image will hang for the
    #    duration of the timeout and then throw an exception.
    buffer = device.get_buffer()
    try:
        print("Time taken for get_buffer: ", time.time() - started)
        save(buffer)
        print("Time taken for save: ", time.time() - started)

        # Print some info about the image in the buffer
        print(f'{TAB2}Buffer received | ['
              f'Width = {buffer.width} pxl, '
              f'Height = {buffer.height} pxl]')
    finally:
        # Hand the buffer back even if saving failed, so it is not lost.
        print(f'{TAB2}Requeue Buffer')
        device.requeue_buffer(buffer)
def example_entry_point():
    """
    Run the software-trigger acquisition loop until interrupted.

    Demonstrates basic trigger configuration and use:
        (1) sets trigger selector, mode and source
        (2) starts stream
        (3) repeatedly: waits until trigger is armed, triggers an image,
            gets and saves it, requeues the buffer
        (4) on Ctrl+C: stops stream, restores the trigger nodes to their
            initial values and destroys the device

    The saving worker is started as a daemon thread so that it cannot keep
    the process alive after the main loop has ended.
    """
    # Prepare example
    devices = create_devices_with_tries()
    device = devices[0]

    try:
        nodemap = device.nodemap
        nodes, initial_vals = store_initial(nodemap)

        print("Example Started\n")
        last_trigger = time.time()

        threading.Thread(target=save_with_thread, daemon=True).start()

        # Set trigger selector
        #    Set the trigger selector to FrameStart. When triggered, the
        #    device will start acquiring a single frame. This can also be
        #    set to AcquisitionStart or FrameBurstStart.
        print(f'{TAB1}Set trigger selector to FrameStart')
        nodes['TriggerSelector'].value = 'FrameStart'

        # Set trigger mode
        #    Enable trigger mode before setting the source and before
        #    starting the stream. Trigger mode cannot be turned on and off
        #    while the device is streaming.
        print(f'{TAB1}Enable trigger mode')
        nodes['TriggerMode'].value = 'On'

        # Set trigger source
        #    Software means the trigger is fired from code.
        print(f'{TAB1}Set trigger source to software')
        nodes['TriggerSource'].value = 'Software'

        # Setup stream values
        tl_stream_nodemap = device.tl_stream_nodemap
        tl_stream_nodemap['StreamAutoNegotiatePacketSize'].value = True
        tl_stream_nodemap['StreamPacketResendEnable'].value = True

        # Start stream
        #    When trigger mode is off and the acquisition mode is set to
        #    stream continuously, starting the stream will have the camera
        #    begin acquiring a steady stream of images. However, with
        #    trigger mode enabled, the device will wait for the trigger
        #    before acquiring any.
        print(f'{TAB1}Start stream')
        device.start_stream()

        try:
            last_done = time.time()
            while True:
                if time.time() - last_trigger > 0.1:
                    last_trigger = time.time()
                    configure_trigger_acquire_image(
                        device, nodemap, nodes, initial_vals)
                    print(f'{TAB1}Time taken-------------------------------------------: {time.time() - last_done}')
                    last_done = time.time()
                else:
                    # Yield the CPU instead of spinning until the next
                    # trigger is due.
                    time.sleep(0.001)
        except KeyboardInterrupt:
            print(f'\n{TAB1}Interrupted by user')
        finally:
            # Stop stream
            print(f'{TAB1}Stop stream')
            device.stop_stream()

            # Return nodes to their initial values
            nodes['TriggerSelector'].value = initial_vals[0]
            nodes['TriggerMode'].value = initial_vals[1]
            nodes['TriggerSource'].value = initial_vals[2]
    finally:
        # Cleanup: destroy the device
        system.destroy_device(device)

    print("\nExample Completed")


if __name__ == "__main__":
    example_entry_point()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment