Page 1 of 1

Pyobjc script to control CF with BLE

Posted: Sat Feb 14, 2015 1:30 pm
by phil
I want to control my CF with BLE on my macbook. I had a look at the iOS client and the Python API and wrote the following script:

Code: Select all

import time
import struct
from threading import Timer
from binascii import *
import objc
from PyObjCTools import AppHelper
import array

# Load the CoreBluetooth framework bundle into this module's globals so the
# framework classes used below (CBUUID, CBCentralManager, ...) can be
# referenced by their bare names.
objc.loadBundle("CoreBluetooth", globals(),
    bundle_path=objc.pathForFramework(u'/System/Library/Frameworks/IOBluetooth.framework/Versions/A/Frameworks/CoreBluetooth.framework'))

# UUID of the service that is requested from the peripheral after connecting.
crazyflie_service = CBUUID.UUIDWithString_(u'00000201-1C7F-4F9E-947B-43B7C00A9A08')
# UUID of the characteristic that packets are written to and notifications read from.
crtp_characteristic = CBUUID.UUIDWithString_(u'00000202-1C7F-4F9E-947B-43B7C00A9A08')

def main():
	"""Create the BLE delegate, start a central manager and run the event loop.

	The BLECrazyFlie instance is passed to the central manager as its
	delegate; hover() is registered on it as a callback before the console
	event loop is started.
	"""
	cf = BLECrazyFlie()
	# add methods that the crazyflie executes
	cf.add_callback(hover)
	#cf.send_setpoint(0, 0, 0, 45000)
	# Two-step creation: allocate, then initialise with cf as the delegate
	# (queue and options are left as None).
	manager = CBCentralManager.alloc()
	manager.initWithDelegate_queue_options_(cf, None, None)
	
	# Blocks here; the delegate methods on cf are invoked from this loop.
	AppHelper.runConsoleEventLoop(None, True, 'NSDefaultRunLoopMode')

def hover(cf):
	"""Call cf.send_setpoint(50, 100, 250, 45000) every 0.5 seconds, forever.

	Registered as a callback in main(). The loop has no exit condition, so
	this function never returns to its caller.
	"""
	# send thrust 45000
	while 1:
		cf.send_setpoint(50, 100, 250, 45000)
		time.sleep(0.5)

	# Everything below is disabled; it sketches a later switch to altitude hold.
	# stop thrust, start hover
	#cf.set_param('flightmode.althold', 'True')
	#cf.commander.send_setpoint(0, 0, 0, 32767)
	#while 1:
	#	cf.commander.send_setpoint(0,0,0,32767)
	#	time.sleep(0.5)

class BLECrazyFlie():
    """Delegate object for a CBCentralManager and the peripheral it connects to.

    The methods whose names end in an underscore are delegate callbacks: they
    scan, connect to the peripheral named 'Crazyflie', discover the service
    and characteristic identified by the module-level UUIDs, enable
    notifications, and then run the callbacks registered with add_callback().
    """

    # Start with nothing discovered, not connected, and no callbacks registered.
    def __init__(self):
        self.manager = None
        self.peripheral = None
        self.service = None
        self.crtp_characteristic = None
        self.connected = False
        self.callbacks = []
    
    # Write a packet to the discovered characteristic.
    # NOTE: roll, pitch, yaw and thrust are not used here; a fixed test packet
    # is sent instead (the struct.pack line is commented out).
    # NOTE(review): test_pk is the hex *text* of a packet and is handed to
    # NSData as-is, not decoded to bytes first — presumably not what the
    # Crazyflie expects; confirm.
    def send_setpoint(self, roll, pitch, yaw, thrust):
		#data = struct.pack('<fffH', roll, -pitch, yaw, thrust)
		test_pk = '300000000000000000008000000000ea60'
		#print struct.unpack('<idddH', test_pk)
		#bytes = bytearray.fromhex(test_pk)
		bytes = NSData.dataWithBytes_length_(test_pk, len(test_pk))
		print bytes
		# The literal 1 is the write type — presumably "write without
		# response"; TODO confirm against the CoreBluetooth constants.
		self.peripheral.writeValue_forCharacteristic_type_(bytes, self.crtp_characteristic, 1)
    	
    # Register cb unless it is already registered.
    def add_callback(self, cb):
        if ((cb in self.callbacks) is False):
            self.callbacks.append(cb)

    # Unregister cb; list.remove raises ValueError if cb was never registered.
    def remove_callback(self, cb):
        self.callbacks.remove(cb)

    # Invoke every registered callback, in registration order, with *args.
    def call(self, *args):
        for cb in self.callbacks:
            cb(*args)
		
    # Central-manager state changed: remember the manager and start scanning
    # (for any service) unless a connection is already established.
    def centralManagerDidUpdateState_(self, manager):
		if self.connected == False:
			self.manager = manager
			manager.scanForPeripheralsWithServices_options_(None, None)

    # A peripheral was seen while scanning: print its name and, if it is
    # called 'Crazyflie', stop scanning and connect to it.
    def centralManager_didDiscoverPeripheral_advertisementData_RSSI_(self, manager, peripheral, data, rssi):
		print peripheral.name()
		if peripheral.name() == 'Crazyflie':
			manager.stopScan()
			# Stored on self so later callbacks can use self.peripheral.
			self.peripheral = peripheral
			manager.connectPeripheral_options_(peripheral, None)

    # Connection established: become the peripheral's delegate and ask it for
    # the Crazyflie service.
    def centralManager_didConnectPeripheral_(self, manager, peripheral):
		print 'Connected to ' + peripheral.name()
		self.connected = True
		self.peripheral.setDelegate_(self)
		self.peripheral.discoverServices_([crazyflie_service])

    # Connection attempt failed: just print the error.
    def centralManager_didFailToConnectPeripheral_error_(self, manager, peripheral, error):
        print repr(error)

    # Peripheral disconnected: print the error and stop the event loop.
    def centralManager_didDisconnectPeripheral_error_(self, manager, peripheral, error):
		self.connected = False
		print repr(error)
		AppHelper.stopEventLoop()

    # Service discovery finished: take the first service the peripheral
    # reports and look for the CRTP characteristic on it.
    # NOTE(review): the second argument is named `services` but is not used;
    # the services are read from self.peripheral instead.
    def peripheral_didDiscoverServices_(self, peripheral, services):
		self.service = self.peripheral.services()[0]
		self.peripheral.discoverCharacteristics_forService_([crtp_characteristic], self.service)
        
    # Characteristic discovery finished: keep the one whose UUID matches
    # crtp_characteristic and enable notifications on it. `error` is not checked.
    def peripheral_didDiscoverCharacteristicsForService_error_(self, peripheral, service, error):
		for characteristic in self.service.characteristics():
			if characteristic.UUID().UUIDString() == crtp_characteristic.UUIDString():
				self.crtp_characteristic = characteristic
				self.peripheral.setNotifyValue_forCharacteristic_(True, self.crtp_characteristic)

    # Result of a characteristic write: print the error, or 'Sent!' if there is none.
    def peripheral_didWriteValueForCharacteristic_error_(self, peripheral, characteristic, error):
		if error != None:
			print repr(error)
		else:
			print 'Sent!'

    # Notification state changed: run the registered callbacks, passing this
    # object as their only argument. `error` is not checked.
    def peripheral_didUpdateNotificationStateForCharacteristic_error_(self, peripheral, characteristic, error):
        print 'Receiving notifications'
        self.call(self)

    # A notification arrived: print the raw bytes of the characteristic's value.
    def peripheral_didUpdateValueForCharacteristic_error_(self, peripheral, characteristic, error):
        print repr(characteristic.value().bytes().tobytes())

# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
	main()

It is able to connect to the CF and send it packets. The led on the left front arm of the CF flashes with each packet sent.
Now I'm trying to write the logic for assembling the commander packets, but the CF doesn't respond at all.
I've copied a valid commander packet from the iOS client (300000000000000000008000000000ea60, see send_setpoint), which sets the thrust to a certain value, but the CF doesn't spin the engines.
Am I missing something obvious here? I'm guessing I don't need the 0xaa, nor the checksum. Or do I?
Any help will be greatly appreciated.

Re: Pyobjc script to control CF with BLE

Posted: Mon Feb 16, 2015 1:13 pm
by arnaud
Hi,

Great job getting this to run from Python over Bluetooth! Once this works, it could be used to add Bluetooth support to the client on Mac :).

I modified a couple of things and got it to send commands to the copter:

Code: Select all

import time
import struct
from threading import Timer
from binascii import *
import objc
from PyObjCTools import AppHelper
import array

# Load the CoreBluetooth framework bundle into this module's globals so the
# framework classes used below (CBUUID, CBCentralManager, ...) can be
# referenced by their bare names.
objc.loadBundle("CoreBluetooth", globals(),
    bundle_path=objc.pathForFramework(u'/System/Library/Frameworks/IOBluetooth.framework/Versions/A/Frameworks/CoreBluetooth.framework'))

# UUID of the service that is requested from the peripheral after connecting.
crazyflie_service = CBUUID.UUIDWithString_(u'00000201-1C7F-4F9E-947B-43B7C00A9A08')
# UUID of the characteristic that packets are written to and notifications read from.
crtp_characteristic = CBUUID.UUIDWithString_(u'00000202-1C7F-4F9E-947B-43B7C00A9A08')

def main():
    """Create the BLE delegate, start a central manager and run the event loop.

    hover() is registered on the delegate and is invoked by it once the
    notification state of the CRTP characteristic has been reported.
    This call blocks until the event loop is stopped.
    """
    cf = BLECrazyFlie()
    # Callbacks are what the Crazyflie "executes" once the link is ready.
    cf.add_callback(hover)

    # Objective-C two-stage creation: always keep the object returned by
    # init..., not the raw alloc() result. The local also keeps the manager
    # referenced for as long as the event loop below is running.
    manager = CBCentralManager.alloc().initWithDelegate_queue_options_(cf, None, None)

    # Positional arguments: argv=None, install the interrupt handler, run
    # loop mode name.
    AppHelper.runConsoleEventLoop(None, True, 'NSDefaultRunLoopMode')

def hover(cf, thrust=15000, repeats=10, interval=0.5):
    """Unlock the thrust, then send a fixed-thrust setpoint a number of times.

    Args:
        cf: object providing ``send_setpoint(roll, pitch, yaw, thrust)``.
        thrust: thrust value sent with every setpoint (roll/pitch/yaw are 0).
        repeats: how many setpoints to send after the unlock packet.
        interval: pause in seconds after each setpoint.

    The defaults reproduce the original hard-coded behaviour, so the function
    can still be registered as a one-argument callback.
    """
    # An all-zero setpoint has to come first: newer firmware keeps the
    # thrust locked until it has received one.
    cf.send_setpoint(0, 0, 0, 0)
    for _ in range(repeats):
        cf.send_setpoint(0, 0, 0, thrust)
        # NOTE: this sleep blocks whatever thread invoked the callback.
        time.sleep(interval)

    # Planned next step (disabled): stop thrust and switch to altitude hold.
    #cf.set_param('flightmode.althold', 'True')
    #cf.commander.send_setpoint(0, 0, 0, 32767)
    #while 1:
    #   cf.commander.send_setpoint(0,0,0,32767)
    #   time.sleep(0.5)

class BLECrazyFlie():
    """Delegate for a CBCentralManager and the Crazyflie peripheral it finds.

    The methods whose names end in an underscore are CoreBluetooth delegate
    callbacks. Together they scan, connect to the peripheral named
    'Crazyflie', discover the service and characteristic identified by the
    module-level UUIDs, enable notifications, and finally run every callback
    registered through add_callback() with this object as the argument.
    """

    # Numeric value of CBCentralManagerStatePoweredOn. The manager must be in
    # this state before any scan/connect call is issued.
    _STATE_POWERED_ON = 5
    # Write type passed to writeValue:forCharacteristic:type: (the original
    # literal 1, i.e. write without response).
    _WRITE_TYPE = 1
    # First byte of a commander setpoint packet.
    _COMMANDER_HEADER = 0x30

    def __init__(self):
        """Start with nothing discovered, not connected, no callbacks."""
        self.manager = None
        self.peripheral = None
        self.service = None
        self.crtp_characteristic = None
        self.connected = False
        self.callbacks = []

        # Kept for backward compatibility; not read anywhere in this class.
        self.init = False

    def send_setpoint(self, roll, pitch, yaw, thrust):
        """Pack a commander setpoint and write it to the CRTP characteristic.

        The packet is ``<BfffH``: header byte, roll, negated pitch, yaw
        (floats) and thrust (unsigned 16-bit), all little-endian.

        Raises:
            RuntimeError: if called before the characteristic was discovered.
            struct.error: if a value does not fit its field.
        """
        if self.peripheral is None or self.crtp_characteristic is None:
            raise RuntimeError('CRTP characteristic not discovered yet; cannot send setpoint')
        packet = struct.pack('<BfffH', self._COMMANDER_HEADER, roll, -pitch, yaw, thrust)
        print(struct.unpack('<BfffH', packet))
        payload = NSData.dataWithBytes_length_(packet, len(packet))
        print(payload)
        self.peripheral.writeValue_forCharacteristic_type_(
            payload, self.crtp_characteristic, self._WRITE_TYPE)

    def add_callback(self, cb):
        """Register cb unless it is already registered."""
        if cb not in self.callbacks:
            self.callbacks.append(cb)

    def remove_callback(self, cb):
        """Unregister cb; raises ValueError if it was never registered."""
        self.callbacks.remove(cb)

    def call(self, *args):
        """Invoke every registered callback, in registration order."""
        for cb in self.callbacks:
            cb(*args)

    def centralManagerDidUpdateState_(self, manager):
        """Start scanning once the central manager reports it is powered on."""
        if self.connected:
            return
        self.manager = manager
        # Scanning before the manager is powered on is an API misuse; wait
        # for the next state update instead.
        if manager.state() != self._STATE_POWERED_ON:
            print('Bluetooth is not powered on (state %s); waiting' % manager.state())
            return
        manager.scanForPeripheralsWithServices_options_(None, None)

    def centralManager_didDiscoverPeripheral_advertisementData_RSSI_(self, manager, peripheral, data, rssi):
        """Connect to the first discovered peripheral named 'Crazyflie'."""
        print(peripheral.name())
        if peripheral.name() == 'Crazyflie':
            manager.stopScan()
            # Stored on self so the later callbacks can use self.peripheral.
            self.peripheral = peripheral
            manager.connectPeripheral_options_(peripheral, None)

    def centralManager_didConnectPeripheral_(self, manager, peripheral):
        """Become the peripheral's delegate and request the Crazyflie service."""
        print('Connected to ' + peripheral.name())
        self.connected = True
        self.peripheral.setDelegate_(self)
        self.peripheral.discoverServices_([crazyflie_service])

    def centralManager_didFailToConnectPeripheral_error_(self, manager, peripheral, error):
        """Report a failed connection attempt."""
        print(repr(error))

    def centralManager_didDisconnectPeripheral_error_(self, manager, peripheral, error):
        """Report the disconnect and stop the event loop."""
        self.connected = False
        print(repr(error))
        AppHelper.stopEventLoop()

    def peripheral_didDiscoverServices_(self, peripheral, services):
        """Pick the first discovered service and look for the CRTP characteristic.

        For the peripheral:didDiscoverServices: selector CoreBluetooth passes
        the NSError (or None) as the second argument; the parameter keeps its
        original name ``services`` for compatibility.
        """
        error = services
        if error is not None:
            print(repr(error))
            return
        found = self.peripheral.services()
        if not found:
            print('No matching service found on peripheral')
            return
        self.service = found[0]
        self.peripheral.discoverCharacteristics_forService_([crtp_characteristic], self.service)

    def peripheral_didDiscoverCharacteristicsForService_error_(self, peripheral, service, error):
        """Store the CRTP characteristic and enable notifications on it."""
        if error is not None:
            print(repr(error))
            return
        wanted = crtp_characteristic.UUIDString()
        for characteristic in (self.service.characteristics() or []):
            if characteristic.UUID().UUIDString() == wanted:
                self.crtp_characteristic = characteristic
                self.peripheral.setNotifyValue_forCharacteristic_(True, self.crtp_characteristic)

    def peripheral_didWriteValueForCharacteristic_error_(self, peripheral, characteristic, error):
        """Report the outcome of a characteristic write."""
        if error is not None:
            print(repr(error))
        else:
            print('Sent!')

    def peripheral_didUpdateNotificationStateForCharacteristic_error_(self, peripheral, characteristic, error):
        """Run the registered callbacks once the notification state is known.

        A failure to enable notifications is reported, but the callbacks
        still run, as they did before.
        """
        if error is not None:
            print(repr(error))
        else:
            print('Receiving notifications')
        self.call(self)

    def peripheral_didUpdateValueForCharacteristic_error_(self, peripheral, characteristic, error):
        """Print the raw bytes of an incoming notification."""
        if error is not None:
            print(repr(error))
            return
        value = characteristic.value()
        if value is None:
            return
        print(repr(value.bytes().tobytes()))

# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
   main()

The main problem did seem to come from the packet encoding: it was trying to send the hex string and ended up sending nothing (I could verify with a Bluetooth sniffer that nothing was passing through). I fixed that and re-enabled the real commander packet packing. I also added a cf.send_setpoint(0, 0, 0, 0), which is required with newer firmware to unlock the thrust.

It now spins the motors, but it still has a couple of bugs (for example, it does not quit, and the "packet sent" callbacks are called only at the end).

Re: Pyobjc script to control CF with BLE

Posted: Mon Feb 16, 2015 6:06 pm
by phil
Thank you for solving this! :-) :-)
It was driving me crazy. I didn't know about the necessity to send the zero setpoint, so you saved my day. Also thanks for modifying the packet handling.
I intend to work on this to implement scripted flight in a first phase and then of course this could be integrated to provide BLE support on the Mac. I'm trying to make it a self-containing script without any need for the API, but this can be easily changed.
I've put it on github (https://github.com/ppossemiers/BLECrazyflie), so everyone can fork it. In the meantime I will try to improve the script.

Re: Pyobjc script to control CF with BLE

Posted: Mon Feb 23, 2015 5:15 pm
by phil
Ok, I cleaned up the code a bit and started work on a set_param method.
This is what I have:

Code: Select all

	
def set_param(self, ident, pytype, value):
    """Write one parameter value to the Crazyflie over the CRTP characteristic.

    Args:
        ident: numeric id of the parameter (packed as one unsigned byte).
        pytype: ``struct`` format code for the value, e.g. ``'?'`` or ``'f'``;
            a leading ``'<'`` is tolerated.
        value: the value itself, or its Python literal as text
            (``'True'``, ``'42'``, ``'1.5'``).

    Raises:
        ValueError / SyntaxError: if a text value is not a Python literal.
        struct.error: if the value does not match ``pytype``.
    """
    import ast  # local import: this snippet has no import block of its own

    # Header byte layout used by the original expression: port in the high
    # nibble, bits 2-3 set, channel in the low two bits.
    param_port = 0x02       # PARAM
    write_channel = 2       # WRITE_CHANNEL
    header = (param_port & 0x0f) << 4 | 3 << 2 | (write_channel & 0x03)

    # Text is parsed with literal_eval rather than eval(): it accepts the same
    # literals but cannot execute arbitrary expressions.
    if isinstance(value, (str, type(u''))):
        value = ast.literal_eval(value)

    fmt = '<BB' + pytype.lstrip('<')
    packet = struct.pack(fmt, header, ident, value)
    print(struct.unpack(fmt, packet))
    payload = NSData.dataWithBytes_length_(packet, len(packet))
    self.peripheral.writeValue_forCharacteristic_type_(payload, self.crtp_characteristic, 1)
And I call it like this :

Code: Select all

	# stop thrust, start hover
	print 'Now hovering'
	# ident for flightmode.althold is 10
	# https://github.com/bitcraze/crazyflie-clients-python/blob/master/lib/cflib/cache/E8BC7DAD.json
	cf.set_param(10, '?', 'True')
	cf.send_setpoint(0, 0, 0, 32767)
	while 1:
		cf.send_setpoint(0, 0, 0, 32767)
		time.sleep(0.5)
But it doesn't work :-(
Help will -as always- be very much appreciated.

Re: Pyobjc script to control CF with BLE

Posted: Sat Mar 28, 2015 7:13 am
by phil
Ok, figured it out.
I've implemented a set_param method and also the altHold mode in the script : https://github.com/ppossemiers/BLECrazyflie

Re: Pyobjc script to control CF with BLE

Posted: Tue Apr 14, 2015 1:11 am
by chad
This is great phil! I'm looking forward to making some time to try your script out!!