Created
December 22, 2020 08:41
-
-
Save KevinLADLee/a4f239f166ddb5267fc6df7dc1761f66 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import glob | |
| import os | |
| import sys | |
| import carla | |
| import argparse | |
| import logging | |
| import numpy as np | |
| import cv2 as cv | |
| import yaml | |
| import collections | |
| # 10 cm/pix | |
| # PIXELS_PER_METER = 10 | |
| PIXELS_PER_METER = 2 | |
| class CarlaGrid(object): | |
| def __init__(self, name, args, timeout): | |
| self.client = None | |
| self.name = name | |
| self.args = args | |
| self.timeout = timeout | |
| self.world = None | |
| self.town_map = None | |
| self._pixels_per_meter = PIXELS_PER_METER | |
| self._world_offset = (0, 0) | |
| self.waypoint_precision = 0.05 | |
| self.patches = [] | |
| self.max_x = 0 | |
| self.min_x = 0 | |
| self.max_y = 0 | |
| self.min_y = 0 | |
| self.height = 0 | |
| self.width = 0 | |
| def carla_transform_to_ros(self, location): | |
| location.y = -location.y | |
| return location | |
| def _get_data_from_carla(self): | |
| """Retrieves the data from the server side""" | |
| try: | |
| self.client = carla.Client(self.args.host, self.args.port) | |
| self.client.set_timeout(self.timeout) | |
| if self.args.map is None: | |
| world = self.client.get_world() | |
| else: | |
| world = self.client.load_world(self.args.map) | |
| print(self.args.map) | |
| town_map = world.get_map() | |
| return (world, town_map) | |
| except RuntimeError as ex: | |
| logging.error(ex) | |
| def start(self): | |
| self.world, self.town_map = self._get_data_from_carla() | |
| settings = self.world.get_settings() | |
| settings.no_rendering_mode = self.args.no_rendering | |
| self.world.apply_settings(settings) | |
| waypoints = self.town_map.generate_waypoints(2) | |
| margin = 50 | |
| self.max_x = max(waypoints, key=lambda x: x.transform.location.x).transform.location.x + margin | |
| self.max_y = max(waypoints, key=lambda x: x.transform.location.y).transform.location.y + margin | |
| self.min_x = min(waypoints, key=lambda x: x.transform.location.x).transform.location.x - margin | |
| self.min_y = min(waypoints, key=lambda x: x.transform.location.y).transform.location.y - margin | |
| self._world_offset = (self.min_x, self.min_y) | |
| topology = self.town_map.get_topology() | |
| self.draw_topology(topology) | |
| self.generate_yaml() | |
| def world_to_pixel(self, location_x, location_y): | |
| """Converts the world coordinates to pixel coordinates""" | |
| x = self._pixels_per_meter * (location_x - self._world_offset[0]) | |
| y = self._pixels_per_meter * (location_y - self._world_offset[1]) | |
| return [int(x), int(y)] | |
| def pixel_to_world(self, x, y): | |
| location_x = (x / self._pixels_per_meter) + self._world_offset[0] | |
| location_y = (y / self._pixels_per_meter) + self._world_offset[1] | |
| return [location_x, location_y] | |
| def lateral_shift(self, transform, shift): | |
| """Makes a lateral shift of the forward vector of a transform""" | |
| transform.rotation.yaw += 90 | |
| return transform.location + shift * transform.get_forward_vector() | |
| def draw_topology(self, carla_topology): | |
| """ Draws traffic signs and the roads network with sidewalks, parking and shoulders by generating waypoints""" | |
| topology = [x[0] for x in carla_topology] | |
| topology = sorted(topology, key=lambda w: w.transform.location.z) | |
| set_waypoints = [] | |
| for waypoint in topology: | |
| waypoints = [waypoint] | |
| # Generate waypoints of a road id. Stop when road id differs | |
| nxt = waypoint.next(self.waypoint_precision) | |
| if len(nxt) > 0: | |
| nxt = nxt[0] | |
| while nxt.road_id == waypoint.road_id: | |
| waypoints.append(nxt) | |
| nxt = nxt.next(self.waypoint_precision) | |
| if len(nxt) > 0: | |
| nxt = nxt[0] | |
| else: | |
| break | |
| set_waypoints.append(waypoints) | |
| for w in waypoints: | |
| # Classify lane types until there are no waypoints by going left | |
| l = w.get_left_lane() | |
| while l and l.lane_type != carla.LaneType.Driving: | |
| l = l.get_left_lane() | |
| # Classify lane types until there are no waypoints by going right | |
| r = w.get_right_lane() | |
| while r and r.lane_type != carla.LaneType.Driving: | |
| r = r.get_right_lane() | |
| # Draw Roads | |
| self.width = self._pixels_per_meter * int(self.max_x-self.min_x) | |
| self.height = self._pixels_per_meter * int(self.max_y-self.min_y) | |
| img = np.zeros((self.height, self.width,1),np.uint8) | |
| for waypoints in set_waypoints: | |
| # waypoint = waypoints[0] | |
| road_left_side = [self.lateral_shift(w.transform, -w.lane_width * 0.5) for w in waypoints] | |
| road_right_side = [self.lateral_shift(w.transform, w.lane_width * 0.5) for w in waypoints] | |
| polygon = road_left_side + [x for x in reversed(road_right_side)] | |
| polygon = [self.world_to_pixel(x.x, x.y) for x in polygon] | |
| if len(polygon) > 2: | |
| poly_array = np.array(polygon, np.int32) | |
| poly_array.reshape((-1,1,2)) | |
| cv.fillPoly(img, [poly_array], (255), lineType=cv.LINE_AA) | |
| # cv.polylines(img,[poly_array],True,(0,0,255)) | |
| # init = self.world_to_pixel(0,0) | |
| # cv.circle(img, (init[0],init[1]), 5, (100), -1) | |
| cv.imwrite("%s.png" % self.args.map, img) | |
| # gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY) | |
| # cv.imwrite("%s_gray.png" % self.args.map, gray) | |
| def generate_yaml(self): | |
| origin = self.pixel_to_world(0, self.height) | |
| print(origin) | |
| dict_file = { | |
| 'image' : '%s.png' % self.args.map, | |
| 'resolution' : 1.0 / PIXELS_PER_METER, | |
| 'origin' : [origin[0], -origin[1],0], | |
| 'negate': 0, | |
| 'occupied_thresh': 0.65, | |
| 'free_thresh': 0.2 | |
| } | |
| with open('%s.yaml' % self.args.map, 'w') as file: | |
| yaml.dump(dict_file, file) | |
| # For python yaml 5.0 and later | |
| # documents = yaml.dump(dict_file, file, sort_keys=False) | |
| def main(): | |
| argparser = argparse.ArgumentParser( | |
| description='CARLA Map to Grid') | |
| argparser.add_argument( | |
| '-v', '--verbose', | |
| action='store_true', | |
| dest='debug', | |
| help='print debug information') | |
| argparser.add_argument( | |
| '--host', | |
| metavar='H', | |
| default='127.0.0.1', | |
| help='IP of the host server (default: 127.0.0.1)') | |
| argparser.add_argument( | |
| '-p', '--port', | |
| metavar='P', | |
| default=2000, | |
| type=int, | |
| help='TCP port to listen to (default: 2000)') | |
| argparser.add_argument( | |
| '--map', | |
| metavar='TOWN', | |
| default=None, | |
| help='start a new episode at the given TOWN') | |
| argparser.add_argument( | |
| '--no-rendering', | |
| action='store_true', | |
| help='switch off server rendering') | |
| args = argparser.parse_args() | |
| args.description = argparser.description | |
| log_level = logging.DEBUG if args.debug else logging.INFO | |
| logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level) | |
| logging.info('listening to server %s:%s', args.host, args.port) | |
| print(__doc__) | |
| world = CarlaGrid("CarlaGrid", args, timeout=2.0) | |
| world.start() | |
| if __name__ == '__main__': | |
| main() |
Author
KevinLADLee
commented
Dec 22, 2020
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment