Page 1 of 1

How is asynchronous logging realized?

Posted: Mon Jun 28, 2021 8:44 pm
by DarkKnight
Hello,

When I try to learn how to use the logging function in your website(https://www.bitcraze.io/documentation/r ... log_param/). I especially feel interested about asynchronous logging. here are some parts that I feel confused:
Q1: What is the principle behind the log callback function? why asynchronous logging can be achieved in log callback function? Is there a new thread created here?(this is the question that I care most)

Q2:I tried to use LogConfig() class to create two instances since I have too many variables to log and stored them into corresponding csv files. For example, the frequency of logging of both instances is set 100Hz, and the logging time is 10s. Ideally, there should be 1000 data points in each csv file. However, the data points of each file is much less than 1000. is there anything that I do it wrong?

Here is the main function

Code: Select all

if __name__ == '__main__':
    cflib.crtp.init_drivers()

    # This will create a new NatNet client
    streamingClient = NatNetClient()

    # Configure the streaming client to call our rigid body handler on the emulator to send data out.
    streamingClient.newFrameListener = receiveNewFrame
    streamingClient.rigidBodyListener = receiveRigidBodyFrame

    with open("IEKF_pose.csv","a",newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow(["time","x","y","z","q0","q1","q2","q3"])
    csvfile.close()

    with open("IEKF_stateEstimate.csv","a",newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow(["time","stateEstimate.x","stateEstimate.y","stateEstimate.z","stateEstimate.qw","stateEstimate.qx","stateEstimate.qy","stateEstimate.qz",\
                        "stateEstimate.roll","stateEstimate.pitch", "stateEstimate.yaw"])
    csvfile.close()
    
    global scf
    with SyncCrazyflie(URI, cf=Crazyflie(rw_cache='./cache')) as scf:

        scf.cf.param.add_update_callback(group='deck', name='bcFlow2',
                                         cb=param_deck_flow)
        time.sleep(1)

        with open("IEKF_imu.csv","a",newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            writer.writerow(["time","acc.x","acc.y","acc.z","gyro.x","gyro.y","gyro.z","stabilizer.thrust","stabilizer.yaw"])
        csvfile.close()

        logconf = LogConfig(name='Acc_Gyro_Thrust_Yaw', period_in_ms=10)
        logconf.add_variable('acc.x', 'FP16')
        logconf.add_variable('acc.y', 'FP16')
        logconf.add_variable('acc.z', 'FP16')
        logconf.add_variable('gyro.x', 'FP16')
        logconf.add_variable('gyro.y', 'FP16')
        logconf.add_variable('gyro.z', 'FP16')
        logconf.add_variable('stabilizer.thrust', 'FP16')
        logconf.add_variable('stabilizer.yaw', 'FP16')

        scf.cf.log.add_config(logconf)
        logconf.data_received_cb.add_callback(log_IEKF_imu_callback)

        logconf1 = LogConfig(name='stateEstimate', period_in_ms=10)
        logconf1.add_variable('stateEstimate.x', 'FP16')
        logconf1.add_variable('stateEstimate.y', 'FP16')
        logconf1.add_variable('stateEstimate.z', 'FP16')
        logconf1.add_variable('stateEstimate.qw', 'FP16')
        logconf1.add_variable('stateEstimate.qx', 'FP16')
        logconf1.add_variable('stateEstimate.qy', 'FP16')
        logconf1.add_variable('stateEstimate.qz', 'FP16')
        logconf1.add_variable('stateEstimate.roll', 'FP16')
        logconf1.add_variable('stateEstimate.pitch', 'FP16')
        logconf1.add_variable('stateEstimate.yaw', 'FP16')
        scf.cf.log.add_config(logconf1)
        logconf1.data_received_cb.add_callback(log_stateEstimate_callback)

        activate_kalman_estimator(scf.cf)
        activate_high_level_commander(scf.cf)
        reset_estimator(scf.cf)

        # Start up the streaming client now that the callbacks are set up.
        # This will run perpetually, and operate on a separate thread.
        streamingClient.run()
        
        logconf.start()
        logconf1.start()
        time.sleep(10)
        logconf.stop()
        logconf1.stop()
        
Here is the log callback function:

Code: Select all

def log_IEKF_imu_callback(timestamp, data, logconf):
    #print(data)
    global IEKF_imu
    IEKF_imu[0] = data['acc.x']
    IEKF_imu[1] = data['acc.y']
    IEKF_imu[2] = data['acc.z']
    IEKF_imu[3] = data['gyro.x']
    IEKF_imu[4] = data['gyro.y']
    IEKF_imu[5] = data['gyro.z']
    IEKF_imu[6] = data['stabilizer.thrust']
    IEKF_imu[7] = data['stabilizer.yaw']
    #the following process is used to convert the float data type to Half‑precision floating data type 
    hexVal0=hex(IEKF_imu[0])
    hexVal1=hex(IEKF_imu[1])
    hexVal2=hex(IEKF_imu[2])
    hexVal3=hex(IEKF_imu[3])
    hexVal4=hex(IEKF_imu[4])
    hexVal5=hex(IEKF_imu[5])
    hexVal6=hex(IEKF_imu[6])
    hexVal7=hex(IEKF_imu[7])

    y0 = struct.pack("h",int(hexVal0,16))
    y1 = struct.pack("h",int(hexVal1,16))
    y2 = struct.pack("h",int(hexVal2,16))
    y3 = struct.pack("h",int(hexVal3,16))
    y4 = struct.pack("h",int(hexVal4,16))
    y5 = struct.pack("h",int(hexVal5,16))
    y6 = struct.pack("h",int(hexVal6,16))
    y7 = struct.pack("h",int(hexVal7,16))

    IEKF_imu[0] = np.frombuffer(y0,dtype=np.float16)[0]
    IEKF_imu[1] = np.frombuffer(y1,dtype=np.float16)[0]
    IEKF_imu[2] = np.frombuffer(y2,dtype=np.float16)[0]
    IEKF_imu[3] = np.frombuffer(y3,dtype=np.float16)[0]
    IEKF_imu[4] = np.frombuffer(y4,dtype=np.float16)[0]
    IEKF_imu[5] = np.frombuffer(y5,dtype=np.float16)[0]
    IEKF_imu[6] = np.frombuffer(y6,dtype=np.float16)[0]
    IEKF_imu[7] = np.frombuffer(y7,dtype=np.float16)[0]

    print('IEKF_imu:', IEKF_imu)

    with open("IEKF_imu.csv","a",newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow([timestamp, IEKF_imu[0], IEKF_imu[1], IEKF_imu[2], IEKF_imu[3], IEKF_imu[4], IEKF_imu[5], IEKF_imu[6], IEKF_imu[7]])
        #writer.writerow([timestamp, IEKF_imu[0], IEKF_imu[1], IEKF_imu[2]])
    csvfile.close()

def log_stateEstimate_callback(timestamp, data, logconf):
    
    poseStateEstimate[0] = data['stateEstimate.x']
    poseStateEstimate[1] = data['stateEstimate.y']
    poseStateEstimate[2] = data['stateEstimate.z']
    poseStateEstimate[3] = data['stateEstimate.qw']
    poseStateEstimate[4] = data['stateEstimate.qx']
    poseStateEstimate[5] = data['stateEstimate.qy']
    poseStateEstimate[6] = data['stateEstimate.qz']
    poseStateEstimate[7] = data['stateEstimate.roll']
    poseStateEstimate[8] = data['stateEstimate.pitch']
    poseStateEstimate[9] = data['stateEstimate.yaw']
    #the following process is used to convert the float data type to Half‑precision floating data type 
    hexVal0=hex(poseStateEstimate[0])
    hexVal1=hex(poseStateEstimate[1])
    hexVal2=hex(poseStateEstimate[2])
    hexVal3=hex(poseStateEstimate[3])
    hexVal4=hex(poseStateEstimate[4])
    hexVal5=hex(poseStateEstimate[5])
    hexVal6=hex(poseStateEstimate[6])
    hexVal7=hex(poseStateEstimate[7])
    hexVal8=hex(poseStateEstimate[8])
    hexVal9=hex(poseStateEstimate[9])

    y0 = struct.pack("h",int(hexVal0,16))
    y1 = struct.pack("h",int(hexVal1,16))
    y2 = struct.pack("h",int(hexVal2,16))
    y3 = struct.pack("h",int(hexVal3,16))
    y4 = struct.pack("h",int(hexVal4,16))
    y5 = struct.pack("h",int(hexVal5,16))
    y6 = struct.pack("h",int(hexVal6,16))
    y7 = struct.pack("h",int(hexVal7,16))
    y8 = struct.pack("h",int(hexVal8,16))
    y9 = struct.pack("h",int(hexVal9,16))

    poseStateEstimate[0] = np.frombuffer(y0,dtype=np.float16)[0]
    poseStateEstimate[1] = np.frombuffer(y1,dtype=np.float16)[0]
    poseStateEstimate[2] = np.frombuffer(y2,dtype=np.float16)[0]
    poseStateEstimate[3] = np.frombuffer(y3,dtype=np.float16)[0]
    poseStateEstimate[4] = np.frombuffer(y4,dtype=np.float16)[0]
    poseStateEstimate[5] = np.frombuffer(y5,dtype=np.float16)[0]
    poseStateEstimate[6] = np.frombuffer(y6,dtype=np.float16)[0]
    poseStateEstimate[7] = np.frombuffer(y7,dtype=np.float16)[0]
    poseStateEstimate[8] = np.frombuffer(y8,dtype=np.float16)[0]
    poseStateEstimate[9] = np.frombuffer(y9,dtype=np.float16)[0]

    print('poseStateEstimate:', poseStateEstimate)

    with open("IEKF_stateEstimate.csv","a",newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow([timestamp,poseStateEstimate[0],poseStateEstimate[1],poseStateEstimate[2],poseStateEstimate[3],poseStateEstimate[4],\
                        poseStateEstimate[5],poseStateEstimate[6],poseStateEstimate[7],poseStateEstimate[8],poseStateEstimate[9]])
    csvfile.close()

Re: How is asynchronous logging realized?

Posted: Tue Jun 29, 2021 4:32 am
by jonasdn
Hi DarkKnight!

Q1:
There is a thread for incoming packet that sends logging data to the correct handler/callback.
https://github.com/bitcraze/crazyflie-l ... .py#L357:7

Q2

I modified your example to get it to run:

Code: Select all

import time
import struct

import cflib
import csv
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.utils import uri_helper
from cflib.crazyflie import Crazyflie



URI = uri_helper.uri_from_env(default='radio://0/56/2M/E7E7E7E7E7')

def log_IEKF_imu_callback(timestamp, data, logconf):
    #print(data)
    IEKF_imu = [0.0] * 8
    IEKF_imu[0] = data['acc.x']
    IEKF_imu[1] = data['acc.y']
    IEKF_imu[2] = data['acc.z']
    IEKF_imu[3] = data['gyro.x']
    IEKF_imu[4] = data['gyro.y']
    IEKF_imu[5] = data['gyro.z']
    IEKF_imu[6] = data['stabilizer.thrust']
    IEKF_imu[7] = data['stabilizer.yaw']
    #the following process is used to convert the float data type to Half‑precision floating data type
    # hexVal0=hex(IEKF_imu[0])
    # hexVal1=hex(IEKF_imu[1])
    # hexVal2=hex(IEKF_imu[2])
    # hexVal3=hex(IEKF_imu[3])
    # hexVal4=hex(IEKF_imu[4])
    # hexVal5=hex(IEKF_imu[5])
    # hexVal6=hex(IEKF_imu[6])
    # hexVal7=hex(IEKF_imu[7])

    # y0 = struct.pack("h",int(hexVal0,16))
    # y1 = struct.pack("h",int(hexVal1,16))
    # y2 = struct.pack("h",int(hexVal2,16))
    # y3 = struct.pack("h",int(hexVal3,16))
    # y4 = struct.pack("h",int(hexVal4,16))
    # y5 = struct.pack("h",int(hexVal5,16))
    # y6 = struct.pack("h",int(hexVal6,16))
    # y7 = struct.pack("h",int(hexVal7,16))

    # IEKF_imu[0] = np.frombuffer(y0,dtype=np.float16)[0]
    # IEKF_imu[1] = np.frombuffer(y1,dtype=np.float16)[0]
    # IEKF_imu[2] = np.frombuffer(y2,dtype=np.float16)[0]
    # IEKF_imu[3] = np.frombuffer(y3,dtype=np.float16)[0]
    # IEKF_imu[4] = np.frombuffer(y4,dtype=np.float16)[0]
    # IEKF_imu[5] = np.frombuffer(y5,dtype=np.float16)[0]
    # IEKF_imu[6] = np.frombuffer(y6,dtype=np.float16)[0]
    # IEKF_imu[7] = np.frombuffer(y7,dtype=np.float16)[0]

    print('IEKF_imu:', IEKF_imu)

    with open("IEKF_imu.csv","a",newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow([timestamp, IEKF_imu[0], IEKF_imu[1], IEKF_imu[2], IEKF_imu[3], IEKF_imu[4], IEKF_imu[5], IEKF_imu[6], IEKF_imu[7]])
        #writer.writerow([timestamp, IEKF_imu[0], IEKF_imu[1], IEKF_imu[2]])
    csvfile.close()

def log_stateEstimate_callback(timestamp, data, logconf):
    poseStateEstimate = [0] * 10

    poseStateEstimate[0] = data['stateEstimate.x']
    poseStateEstimate[1] = data['stateEstimate.y']
    poseStateEstimate[2] = data['stateEstimate.z']
    poseStateEstimate[3] = data['stateEstimate.qw']
    poseStateEstimate[4] = data['stateEstimate.qx']
    poseStateEstimate[5] = data['stateEstimate.qy']
    poseStateEstimate[6] = data['stateEstimate.qz']
    poseStateEstimate[7] = data['stateEstimate.roll']
    poseStateEstimate[8] = data['stateEstimate.pitch']
    poseStateEstimate[9] = data['stateEstimate.yaw']
    #the following process is used to convert the float data type to Half‑precision floating data type
    # hexVal0=hex(poseStateEstimate[0])
    # hexVal1=hex(poseStateEstimate[1])
    # hexVal2=hex(poseStateEstimate[2])
    # hexVal3=hex(poseStateEstimate[3])
    # hexVal4=hex(poseStateEstimate[4])
    # hexVal5=hex(poseStateEstimate[5])
    # hexVal6=hex(poseStateEstimate[6])
    # hexVal7=hex(poseStateEstimate[7])
    # hexVal8=hex(poseStateEstimate[8])
    # hexVal9=hex(poseStateEstimate[9])

    # y0 = struct.pack("h",int(hexVal0,16))
    # y1 = struct.pack("h",int(hexVal1,16))
    # y2 = struct.pack("h",int(hexVal2,16))
    # y3 = struct.pack("h",int(hexVal3,16))
    # y4 = struct.pack("h",int(hexVal4,16))
    # y5 = struct.pack("h",int(hexVal5,16))
    # y6 = struct.pack("h",int(hexVal6,16))
    # y7 = struct.pack("h",int(hexVal7,16))
    # y8 = struct.pack("h",int(hexVal8,16))
    # y9 = struct.pack("h",int(hexVal9,16))

    # poseStateEstimate[0] = np.frombuffer(y0,dtype=np.float16)[0]
    # poseStateEstimate[1] = np.frombuffer(y1,dtype=np.float16)[0]
    # poseStateEstimate[2] = np.frombuffer(y2,dtype=np.float16)[0]
    # poseStateEstimate[3] = np.frombuffer(y3,dtype=np.float16)[0]
    # poseStateEstimate[4] = np.frombuffer(y4,dtype=np.float16)[0]
    # poseStateEstimate[5] = np.frombuffer(y5,dtype=np.float16)[0]
    # poseStateEstimate[6] = np.frombuffer(y6,dtype=np.float16)[0]
    # poseStateEstimate[7] = np.frombuffer(y7,dtype=np.float16)[0]
    # poseStateEstimate[8] = np.frombuffer(y8,dtype=np.float16)[0]
    # poseStateEstimate[9] = np.frombuffer(y9,dtype=np.float16)[0]

    print('poseStateEstimate:', poseStateEstimate)

    with open("IEKF_stateEstimate.csv","a",newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow([timestamp,poseStateEstimate[0],poseStateEstimate[1],poseStateEstimate[2],poseStateEstimate[3],poseStateEstimate[4],\
                        poseStateEstimate[5],poseStateEstimate[6],poseStateEstimate[7],poseStateEstimate[8],poseStateEstimate[9]])


if __name__ == '__main__':
    cflib.crtp.init_drivers()

    # This will create a new NatNet client
    # streamingClient = NatNetClient()

    # # Configure the streaming client to call our rigid body handler on the emulator to send data out.
    # streamingClient.newFrameListener = receiveNewFrame
    # streamingClient.rigidBodyListener = receiveRigidBodyFrame

    with open("IEKF_pose.csv","a",newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow(["time","x","y","z","q0","q1","q2","q3"])

    with open("IEKF_stateEstimate.csv","a",newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow(["time","stateEstimate.x","stateEstimate.y","stateEstimate.z","stateEstimate.qw","stateEstimate.qx","stateEstimate.qy","stateEstimate.qz",\
                        "stateEstimate.roll","stateEstimate.pitch", "stateEstimate.yaw"])

    global scf
    with SyncCrazyflie(URI, cf=Crazyflie(rw_cache='./cache')) as scf:

        with open("IEKF_imu.csv","a",newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            writer.writerow(["time","acc.x","acc.y","acc.z","gyro.x","gyro.y","gyro.z","stabilizer.thrust","stabilizer.yaw"])
        csvfile.close()

        logconf = LogConfig(name='Acc_Gyro_Thrust_Yaw', period_in_ms=10)
        logconf.add_variable('acc.x', 'FP16')
        logconf.add_variable('acc.y', 'FP16')
        logconf.add_variable('acc.z', 'FP16')
        logconf.add_variable('gyro.x', 'FP16')
        logconf.add_variable('gyro.y', 'FP16')
        logconf.add_variable('gyro.z', 'FP16')
        logconf.add_variable('stabilizer.thrust', 'FP16')
        logconf.add_variable('stabilizer.yaw', 'FP16')

        scf.cf.log.add_config(logconf)
        logconf.data_received_cb.add_callback(log_IEKF_imu_callback)

        logconf1 = LogConfig(name='stateEstimate', period_in_ms=10)
        logconf1.add_variable('stateEstimate.x', 'FP16')
        logconf1.add_variable('stateEstimate.y', 'FP16')
        logconf1.add_variable('stateEstimate.z', 'FP16')
        logconf1.add_variable('stateEstimate.qw', 'FP16')
        logconf1.add_variable('stateEstimate.qx', 'FP16')
        logconf1.add_variable('stateEstimate.qy', 'FP16')
        logconf1.add_variable('stateEstimate.qz', 'FP16')
        logconf1.add_variable('stateEstimate.roll', 'FP16')
        logconf1.add_variable('stateEstimate.pitch', 'FP16')
        logconf1.add_variable('stateEstimate.yaw', 'FP16')
        scf.cf.log.add_config(logconf1)
        logconf1.data_received_cb.add_callback(log_stateEstimate_callback)

        # Start up the streaming client now that the callbacks are set up.
        # This will run perpetually, and operate on a separate thread.
        # streamingClient.run()

        logconf.start()
        logconf1.start()
        time.sleep(10)
        logconf.stop()
        logconf1.stop()

And after running this:

Code: Select all

$ wc -l IEKF_imu.csv 
1011 IEKF_imu.csv
$ wc -l IEKF_stateEstimate.csv 
1013 IEKF_stateEstimate.csv
So it seems wer can get and write in 100hz, maybe you have something that takes a lot of CPU in your handlers if this does not work for you?

Re: How is asynchronous logging realized?

Posted: Wed Jun 30, 2021 9:35 pm
by DarkKnight
Hey,Jonasdn,

Thank you for your reply. For Q1: is that means if I use LogConfig() class to create two instances and there are two corresponding callback functions. Then two threads are also created?

For Q2, I also tried your modified example, but the data and number of data are both wrong(attached here). What do you mean for the" I have something that takes a lot of CPU in my handlers"?

Re: How is asynchronous logging realized?

Posted: Tue Jul 06, 2021 4:45 am
by jonasdn
Hi!

Q1: No there is only one thread that can call multiple callbacks.

Q2: If you have something that takes a lot of CPU in your loop then you might miss some packets.

Are you in a VM, then the additional radio latency it might be hard for you to receive all packets, would you have the option to try without a VM?

Re: How is asynchronous logging realized?

Posted: Wed Jul 21, 2021 3:00 pm
by DarkKnight
Yes. I am using a VM, and I believe that is the reason why the data is missing. I have done some tests without a VM, your modified example works well and the data quality is perfect.

Thank you very much!