Pyobjc script to control CF with BLE
Posted: Sat Feb 14, 2015 1:30 pm
I want to control my CF with BLE on my macbook. I had a look at the iOS client and the Python API and wrote following script :
It is able to connect to the CF and send it packets. The led on the left front arm of the CF flashes with each packet sent.
Now I'm trying to write the logic for assembling the commander packets, but the CF doesn't respond at all.
I've copied a valid commander packet from the iOS client (300000000000000000008000000000ea60, see send_setpoint), which sets the thrust to a certain value, but the CF doesn't spin the engines.
Am I missing something obvious here? I'm guessing I don't need the 0xaa, nor the checksum. Or do I?
Any help will be greatly appreciated.
Code: Select all
import time
import struct
from threading import Timer
from binascii import *
import objc
from PyObjCTools import AppHelper
import array
objc.loadBundle("CoreBluetooth", globals(),
bundle_path=objc.pathForFramework(u'/System/Library/Frameworks/IOBluetooth.framework/Versions/A/Frameworks/CoreBluetooth.framework'))
crazyflie_service = CBUUID.UUIDWithString_(u'00000201-1C7F-4F9E-947B-43B7C00A9A08')
crtp_characteristic = CBUUID.UUIDWithString_(u'00000202-1C7F-4F9E-947B-43B7C00A9A08')
def main():
cf = BLECrazyFlie()
# add methods that the crazyflie executes
cf.add_callback(hover)
#cf.send_setpoint(0, 0, 0, 45000)
manager = CBCentralManager.alloc()
manager.initWithDelegate_queue_options_(cf, None, None)
AppHelper.runConsoleEventLoop(None, True, 'NSDefaultRunLoopMode')
def hover(cf):
# send thrust 45000
while 1:
cf.send_setpoint(50, 100, 250, 45000)
time.sleep(0.5)
# stop thrust, start hover
#cf.set_param('flightmode.althold', 'True')
#cf.commander.send_setpoint(0, 0, 0, 32767)
#while 1:
# cf.commander.send_setpoint(0,0,0,32767)
# time.sleep(0.5)
class BLECrazyFlie():
def __init__(self):
self.manager = None
self.peripheral = None
self.service = None
self.crtp_characteristic = None
self.connected = False
self.callbacks = []
def send_setpoint(self, roll, pitch, yaw, thrust):
#data = struct.pack('<fffH', roll, -pitch, yaw, thrust)
test_pk = '300000000000000000008000000000ea60'
#print struct.unpack('<idddH', test_pk)
#bytes = bytearray.fromhex(test_pk)
bytes = NSData.dataWithBytes_length_(test_pk, len(test_pk))
print bytes
self.peripheral.writeValue_forCharacteristic_type_(bytes, self.crtp_characteristic, 1)
def add_callback(self, cb):
if ((cb in self.callbacks) is False):
self.callbacks.append(cb)
def remove_callback(self, cb):
self.callbacks.remove(cb)
def call(self, *args):
for cb in self.callbacks:
cb(*args)
def centralManagerDidUpdateState_(self, manager):
if self.connected == False:
self.manager = manager
manager.scanForPeripheralsWithServices_options_(None, None)
def centralManager_didDiscoverPeripheral_advertisementData_RSSI_(self, manager, peripheral, data, rssi):
print peripheral.name()
if peripheral.name() == 'Crazyflie':
manager.stopScan()
self.peripheral = peripheral
manager.connectPeripheral_options_(peripheral, None)
def centralManager_didConnectPeripheral_(self, manager, peripheral):
print 'Connected to ' + peripheral.name()
self.connected = True
self.peripheral.setDelegate_(self)
self.peripheral.discoverServices_([crazyflie_service])
def centralManager_didFailToConnectPeripheral_error_(self, manager, peripheral, error):
print repr(error)
def centralManager_didDisconnectPeripheral_error_(self, manager, peripheral, error):
self.connected = False
print repr(error)
AppHelper.stopEventLoop()
def peripheral_didDiscoverServices_(self, peripheral, services):
self.service = self.peripheral.services()[0]
self.peripheral.discoverCharacteristics_forService_([crtp_characteristic], self.service)
def peripheral_didDiscoverCharacteristicsForService_error_(self, peripheral, service, error):
for characteristic in self.service.characteristics():
if characteristic.UUID().UUIDString() == crtp_characteristic.UUIDString():
self.crtp_characteristic = characteristic
self.peripheral.setNotifyValue_forCharacteristic_(True, self.crtp_characteristic)
def peripheral_didWriteValueForCharacteristic_error_(self, peripheral, characteristic, error):
if error != None:
print repr(error)
else:
print 'Sent!'
def peripheral_didUpdateNotificationStateForCharacteristic_error_(self, peripheral, characteristic, error):
print 'Receiving notifications'
self.call(self)
def peripheral_didUpdateValueForCharacteristic_error_(self, peripheral, characteristic, error):
print repr(characteristic.value().bytes().tobytes())
if __name__ == "__main__":
main()
Now I'm trying to write the logic for assembling the commander packets, but the CF doesn't respond at all.
I've copied a valid commander packet from the iOS client (300000000000000000008000000000ea60, see send_setpoint), which sets the thrust to a certain value, but the CF doesn't spin the engines.
Am I missing something obvious here? I'm guessing I don't need the 0xaa, nor the checksum. Or do I?
Any help will be greatly appreciated.