I tried to make get_bs_geometry.py swarm, but it fails

cgpenguin
Member
Posts: 35
Joined: Wed Aug 05, 2020 3:49 pm

Post by cgpenguin »

Code:

import argparse
import logging
import math
import time
import numpy as np
import cv2 as cv

import cflib.crtp
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.swarm import CachedCfFactory
from cflib.crazyflie.swarm import Swarm
from cflib.crazyflie.syncLogger import SyncLogger
from cflib.crazyflie.mem import LighthouseBsGeometry
from cflib.crazyflie.mem import MemoryElement

# TODO get this list from a configuration file (maybe CSV?)
URI1 = 'radio://0/70/2M/E7E7E7E701'
URI2 = 'radio://0/70/2M/E7E7E7E702'
URI3 = 'radio://0/70/2M/E7E7E7E703'

uris = {
    URI1,
    URI2,
    URI3,
}

def wait_for_param_download(scf):
    while not scf.cf.param.is_updated:
        time.sleep(1.0)
    print('Parameters downloaded for', scf.cf.link_uri)

def read_sensors(scf):
    logs = {
        'lighthouse.angle0x':   [0, 0, 0],
        'lighthouse.angle0y':   [0, 0, 1],
        'lighthouse.angle0x_1': [0, 1, 0],
        'lighthouse.angle0y_1': [0, 1, 1],
        'lighthouse.angle0x_2': [0, 2, 0],
        'lighthouse.angle0y_2': [0, 2, 1],
        'lighthouse.angle0x_3': [0, 3, 0],
        'lighthouse.angle0y_3': [0, 3, 1],
        'lighthouse.angle1x':   [1, 0, 0],
        'lighthouse.angle1y':   [1, 0, 1],
        'lighthouse.angle1x_1': [1, 1, 0],
        'lighthouse.angle1y_1': [1, 1, 1],
        'lighthouse.angle1x_2': [1, 2, 0],
        'lighthouse.angle1y_2': [1, 2, 1],
        'lighthouse.angle1x_3': [1, 3, 0],
        'lighthouse.angle1y_3': [1, 3, 1],
    }

    storage = {}

    confs = [
        LogConfig(name='lh1', period_in_ms=10),
        LogConfig(name='lh2', period_in_ms=10),
        LogConfig(name='lh3', period_in_ms=10),
    ]

    # Set up log blocks
    count = 0
    floats_per_block = 6
    for name in logs.keys():
        confs[int(count / floats_per_block)].add_variable(name, 'float')
        count += 1

    # Read log data
    with SyncLogger(scf, confs) as logger:
        end_time = time.time() + 1.0
        for log_entry in logger:
            data = log_entry[1]
            for log_name in data:
                if log_name not in storage:
                    storage[log_name] = []
                storage[log_name].append(data[log_name])
            if time.time() > end_time:
                break

    # Average sensor data
    sensor_sweeps = np.zeros([2, 4, 2])
    for log_name, data in storage.items():
        sensor_info = logs[log_name]
        sensor_sweeps[sensor_info[0]][sensor_info[1]][sensor_info[2]] = np.average(data)

    return sensor_sweeps

# Create a hash from a vector with sensor ids
def hash_sensor_order(order):
    # Named order_hash to avoid shadowing the built-in hash()
    order_hash = 0
    for i in range(4):
        order_hash += order[i] * 4 ** i
    return order_hash

def estimate_yaw_to_base_station(sensor_sweeps):
    direction = {
        hash_sensor_order([2, 0, 1, 3]): math.radians(0),
        hash_sensor_order([2, 0, 3, 1]): math.radians(25),
        hash_sensor_order([2, 3, 0, 1]): math.radians(65),
        hash_sensor_order([3, 2, 0, 1]): math.radians(90),
        hash_sensor_order([3, 2, 1, 0]): math.radians(115),
        hash_sensor_order([3, 1, 2, 0]): math.radians(155),
        hash_sensor_order([1, 3, 2, 0]): math.radians(180),
        hash_sensor_order([1, 3, 0, 2]): math.radians(205),
        hash_sensor_order([1, 0, 3, 2]): math.radians(245),
        hash_sensor_order([0, 1, 3, 2]): math.radians(270),
        hash_sensor_order([0, 1, 2, 3]): math.radians(295),
        hash_sensor_order([0, 2, 1, 3]): math.radians(335),
    }

    # Assume bs is facing slightly downwards and fairly horizontal
    # Sort sensors in the order they are hit by the horizontal sweep
    # and use the order to figure out roughly the direction to the
    # base station
    sweeps_x = {0: sensor_sweeps[0][0], 1: sensor_sweeps[1][0], 2: sensor_sweeps[2][0], 3: sensor_sweeps[3][0]}
    ordered_map = {k: v for k, v in sorted(sweeps_x.items(), key=lambda item: item[1])}
    sensor_order = list(ordered_map.keys())

    # The base station is roughly in this direction, in CF (world) coordinates
    return direction[hash_sensor_order(sensor_order)]


def generate_initial_estimate(bs_direction):
    # Base station height
    bs_h = 2
    # Distance to base station along the floor
    bs_fd = 3
    # Distance to base station
    bs_dist = math.sqrt(bs_h ** 2 + bs_fd ** 2)
    elevation = math.atan2(bs_h, bs_fd)

    # Initial position of the CF in camera coordinate system, open cv style
    tvec_start = np.array([0, 0, bs_dist])

    # Calculate rotation matrix
    d_c = math.cos(-bs_direction + math.pi)
    d_s = math.sin(-bs_direction + math.pi)
    R_rot_y = np.array([
        [d_c, 0.0, d_s],
        [0.0, 1.0, 0.0],
        [-d_s, 0.0, d_c],
    ])

    e_c = math.cos(elevation)
    e_s = math.sin(elevation)
    R_rot_x = np.array([
        [1.0, 0.0, 0.0],
        [0.0, e_c, -e_s],
        [0.0, e_s, e_c],
    ])

    R = np.dot(R_rot_x, R_rot_y)
    rvec_start, _ = cv.Rodrigues(R)

    return rvec_start, tvec_start


def calc_initial_estimate(sensor_sweeps):
    yaw = estimate_yaw_to_base_station(sensor_sweeps)
    return generate_initial_estimate(yaw)


def cam_to_world(rvec_c, tvec_c):
    R_c, _ = cv.Rodrigues(rvec_c)
    R_w = np.linalg.inv(R_c)
    tvec_w = -np.matmul(R_w, tvec_c)
    return R_w, tvec_w


def opencv_to_cf(R_cv, t_cv):
    R_opencv_to_cf = np.array([
        [0.0, 0.0, 1.0],
        [-1.0, 0.0, 0.0],
        [0.0, -1.0, 0.0],
    ])

    R_cf_to_opencv = np.array([
        [0.0, -1.0, 0.0],
        [0.0, 0.0, -1.0],
        [1.0, 0.0, 0.0],
    ])

    t_cf = np.dot(R_opencv_to_cf, t_cv)
    R_cf = np.dot(R_opencv_to_cf, np.dot(R_cv, R_cf_to_opencv))

    return R_cf, t_cf


def estimate_geometry(sensor_sweeps, rvec_start, tvec_start):
    sensor_distance_width = 0.015
    sensor_distance_length = 0.03

    # Sensor positions in world coordinates, open cv style
    lighthouse_3d = np.float32(
        [
            [-sensor_distance_width / 2, 0, -sensor_distance_length / 2],
            [sensor_distance_width / 2, 0, -sensor_distance_length / 2],
            [-sensor_distance_width / 2, 0, sensor_distance_length / 2],
            [sensor_distance_width / 2, 0, sensor_distance_length / 2]
        ])

    # Sensors as seen by the "camera"
    lighthouse_image_projection = np.float32(
        [
            [-math.tan(sensor_sweeps[0][0]), -math.tan(sensor_sweeps[0][1])],
            [-math.tan(sensor_sweeps[1][0]), -math.tan(sensor_sweeps[1][1])],
            [-math.tan(sensor_sweeps[2][0]), -math.tan(sensor_sweeps[2][1])],
            [-math.tan(sensor_sweeps[3][0]), -math.tan(sensor_sweeps[3][1])]
        ])

    # Camera matrix
    K = np.float64(
        [
            [1.0, 0.0, 0.0],
            [0.0, 1.0, 0.0],
            [0.0, 0.0, 1.0]
        ])

    dist_coef = np.zeros(4)

    _ret, rvec_est, tvec_est = cv.solvePnP(
        lighthouse_3d,
        lighthouse_image_projection,
        K,
        dist_coef,
        flags=cv.SOLVEPNP_ITERATIVE,
        rvec=rvec_start,
        tvec=tvec_start,
        useExtrinsicGuess=True)

    if not _ret:
        raise Exception("No solution found")

    Rw_ocv, Tw_ocv = cam_to_world(rvec_est, tvec_est)
    return Rw_ocv, Tw_ocv

def print_geo(rotation_cf, position_cf):
    print("{.origin = {", end='')
    for i in position_cf:
        print("{:0.6f}, ".format(i), end='')

    print("}, .mat = {", end='')

    for i in rotation_cf:
        print("{", end='')
        for j in i:
            print("{:0.6f}, ".format(j), end='')
        print("}, ", end='')

    print("}},")

class WriteMem:
    def __init__(self, scf, bs1, bs2):
        self.data_written = False

        mems = scf.cf.mem.get_mems(MemoryElement.TYPE_LH)

        count = len(mems)
        if count != 1:
            raise Exception('Unexpected nr of memories found:', count)

        mems[0].geometry_data = [bs1, bs2]
        mems[0].write_data(self._data_written)

        while not self.data_written:
            time.sleep(1)

    def _data_written(self, mem, addr):
        self.data_written = True
        print('Data written')


def upload_geo_data(scf, geometries):
    bs1 = LighthouseBsGeometry()
    bs1.rotation_matrix = geometries[0][0]
    bs1.origin = geometries[0][1]

    bs2 = LighthouseBsGeometry()
    bs2.rotation_matrix = geometries[1][0]
    bs2.origin = geometries[1][1]

    WriteMem(scf, bs1, bs2)


def sanity_check(position_cf):
    max_pos = 10.0
    for coord in position_cf:
        if (abs(coord) > max_pos):
            return False
    return True

def calculate_lh_bs_data(scf):
    print("Reading sensor data...")
    sensor_sweeps_all = read_sensors(scf)
    print("Estimating position of base stations...")

    geometries = []
    for bs in range(2):
        sensor_sweeps = sensor_sweeps_all[bs]
        rvec_start, tvec_start = calc_initial_estimate(sensor_sweeps)
        geometry = estimate_geometry(sensor_sweeps, rvec_start, tvec_start)
        rotation_cf, position_cf = opencv_to_cf(geometry[0], geometry[1])

        if not sanity_check(position_cf):
            position_cf = [0, 0, 0]
            rotation_cf = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
            print("Could not find valid solution for base station ", bs)

        print_geo(rotation_cf, position_cf)
        geometries.append([rotation_cf, position_cf])

    # Do we need this to be optional?
    if args.write:
        print("Uploading geo data")
        upload_geo_data(scf, geometries)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--write", help="upload the calculated geo data to the Crazyflie", action="store_true")
    args = parser.parse_args()

    logging.basicConfig(level=logging.ERROR)
    cflib.crtp.init_drivers(enable_debug_driver=False)

    factory = CachedCfFactory(rw_cache='./cache')
    with Swarm(uris, factory=factory) as swarm:
        # The current values of all parameters are downloaded as a part of the
        # connections sequence. Since this clogs up connection and takes a while
        # we do it in this script rather than the flight script
        print('Waiting for parameters to be downloaded...')
        swarm.parallel(wait_for_param_download)

        # Here we do the actual LH geometry calculation
        swarm.parallel(calculate_lh_bs_data)

I tried modifying the get_bs_geometry.py script to work with a swarm, to streamline the process and possibly speed it up for larger numbers of drones. However, although the output indicates that everything succeeded, parts of the expected console output are missing: "Data written" is printed fewer times than expected, and the number of base station position entries is less than twice the drone count (usually 4). The position data stored in the drones also seems quite far off.
So here are my questions:
  • What am I doing wrong here?
  • Is this even a good idea in the first place?
kimberly
Bitcraze
Posts: 1050
Joined: Fri Jul 06, 2018 11:13 am

Re: I tried to make get_bs_geometry.py swarm, but it fails

Post by kimberly »

Hmmm... is this handy to do though? The script is meant to be used on just one Crazyflie (to get one world origin), and the outcome is then flashed to all the Crazyflies in the swarm. If you run this script on all the Crazyflies, each of them will think it is at coordinate (0,0,0), but with a swarm you generally want all of them to share the same coordinate system.

Moreover, the outcome of this script needs to be double-checked, because sometimes the solution is flipped along an axis, and you don't want to repeat that check for each Crazyflie.
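
A rough sketch of what such a check could look like, extending the sanity_check idea from the script above. The function name, the tolerance and the rule that a base station must sit above the floor (z > 0 in the Crazyflie frame) are assumptions for illustration, not something the original script does. It will not catch every flipped solution, so a look at the printed numbers is still worthwhile.

```python
import numpy as np


def geometry_looks_plausible(rotation, position, max_pos=10.0, tol=1e-3):
    """Heuristic check of one base station estimate (hypothetical helper).

    rotation: 3x3 matrix, position: 3 values in metres, both assumed to be
    expressed in the Crazyflie (world) frame.
    """
    R = np.asarray(rotation, dtype=float)
    # Accept both (3,) and (3, 1) shaped positions
    p = np.asarray(position, dtype=float).ravel()
    if R.shape != (3, 3) or p.size != 3:
        return False

    # A valid rotation matrix is orthonormal with determinant +1
    if not np.allclose(R @ R.T, np.eye(3), atol=tol):
        return False
    if not np.isclose(np.linalg.det(R), 1.0, atol=tol):
        return False

    # Assumption: base stations are mounted above the floor the
    # Crazyflie is standing on, so a solution at or below z = 0 is suspect
    if p[2] <= 0.0:
        return False

    # Same range limit as sanity_check() in the script
    return bool(np.all(np.abs(p) <= max_pos))
```

Note that the fallback values the script uses when no solution is found (origin at [0, 0, 0]) are rejected by the height rule, which is intentional here.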

So I don't think it's practical to do it like this. It would be better to first connect to one Crazyflie, get the geometry, check it, and then write that to the memory of the others (see upload_geo_data).

Just remember that writing it to memory is not permanent, so you need to do the same thing every time a Crazyflie restarts. With a swarm it might be handier to hardcode it, like in the lighthouse app API.
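
The workflow described above (estimate once, check, then write the same result to every Crazyflie) could be sketched roughly like this. The function share_geometry and its estimate, check and upload parameters are hypothetical and not part of cflib. In practice, estimate might wrap a variant of calculate_lh_bs_data that returns the geometries, and upload might wrap upload_geo_data from the script above.

```python
def share_geometry(uris, reference_uri, estimate, check, upload):
    """Estimate the geometry on one Crazyflie and upload it to all of them.

    estimate(uri) -> geometries, check(geometries) -> bool and
    upload(uri, geometries) are caller-supplied callables (hypothetical,
    not cflib functions).
    """
    if reference_uri not in uris:
        raise ValueError("reference_uri must be one of uris")

    # Only the reference Crazyflie defines the world origin
    geometries = estimate(reference_uri)

    # Stop before anything is written if the estimate looks wrong
    if not check(geometries):
        raise RuntimeError("Geometry rejected, re-run the estimate or inspect it by hand")

    # Every Crazyflie, including the reference one, gets the same data
    uploaded = []
    for uri in sorted(uris):
        upload(uri, geometries)
        uploaded.append(uri)
    return geometries, uploaded


if __name__ == '__main__':
    # Stand-in callables so the sketch runs without any hardware
    demo_uris = {'radio://0/70/2M/E7E7E7E701', 'radio://0/70/2M/E7E7E7E702'}
    share_geometry(
        demo_uris,
        'radio://0/70/2M/E7E7E7E701',
        estimate=lambda uri: ['bs0 geometry', 'bs1 geometry'],
        check=lambda geometries: len(geometries) == 2,
        upload=lambda uri, geometries: print('Would upload to', uri),
    )
```

Since the data does not survive a restart, a helper like this would have to run again after each power cycle.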