Controlling the iCub Simulator from Python
Author
Marek Rucinski

Goal

This tutorial shows how to control the iCub simulator via the YARP bindings for Python. Since there is no dedicated iCub simulator API available, this has to be achieved via the YARP RPC mechanism. Therefore, this tutorial should also give you a general idea of how to use RPC interfaces from Python.

It is assumed that you have already compiled or installed the Python bindings. This tutorial also assumes basic knowledge of how YARP works in C++, familiarity with the YARP RPC mechanism and with the specific RPC interface of the iCub simulator (described in the simulator README), as well as some basic knowledge of Python, its essential packages and functions.

This tutorial takes the form of a small Python module which implements a WorldController class used to communicate with the iCub simulator. The full code of this module can be found in python/python_simworld_control.py.

Module initialisation

As usual, we first import all the modules we'll need, most prominently the YARP Python bindings.

import collections
import yarp
yarp.Network.init() # Initialise YARP

Placing the call to yarp.Network.init() directly in the module code (as opposed to putting it inside some function defined later in the module) ensures that YARP is initialised as soon as the module is imported.

WorldController class

Now we're ready to define our WorldController class. At initialisation we create an RpcClient object used for communication and connect it to the world port of the simulator (therefore the simulator must be up and running when an object of the WorldController class is created!):

class WorldController:
    """Class for controlling iCub simulator via its RPC world port."""

    def __init__(self):
        self._rpc_client = yarp.RpcClient()
        self._port_name = "/WorldController-" + str(id(self)) + "/commands"
        self._rpc_client.open(self._port_name)
        self._rpc_client.addOutput("/icubSim/world")
        # A dictionary to track simulator object IDs for all types of objects
        self._sim_ids_counters = collections.defaultdict(lambda: 0)
        # A sequence to track internal object IDs. This list stores tuples (object type, simulator id)
        # so that outside one does not have to remember the type of object.
        self._objects = []

Including id(self) in the local port name allows us to use multiple instances of the WorldController class at the same time without YARP port name conflicts.
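
As a quick illustration of why this works, the sketch below builds the port name the same way the constructor does, but without opening any YARP ports. The NamedThing class is a hypothetical stand-in and is not part of the module.

```python
class NamedThing(object):
    """Hypothetical stand-in that only reproduces the port-naming scheme."""

    def __init__(self):
        # id(self) is unique among all objects that are alive at the same time
        self.port_name = "/WorldController-" + str(id(self)) + "/commands"


a = NamedThing()
b = NamedThing()
print(a.port_name)
print(b.port_name)
print(a.port_name != b.port_name)
```

Note that id() values may be reused once an object has been garbage collected. This is harmless here, since the names only need to be unique among instances that exist simultaneously.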

At the end of the constructor we prepare two data structures which will help us manage simulator object identifiers. Object identifiers in the iCub simulator are not unique: you can have two objects of different types (e.g. a 'sph' and a 'box') with the same identifier (e.g. 1). This is rather inconvenient, because one has to keep track of object types together with their identifiers. We'll use the WorldController class to maintain a single pool of identifiers, so that we can later refer to an object via its identifier only. self._sim_ids_counters is a dictionary indexed by object type ('sph', 'box', etc.) which counts the number of objects of each type created so far. self._objects is a sequence of pairs (object type, simulator object id) which translates our unique object identifiers (essentially indices into this list) to simulator identifiers.
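
The bookkeeping described above can be tried out in isolation. The following is a minimal sketch that needs neither YARP nor the simulator. The register function is a hypothetical helper introduced only for this illustration, and the per-type numbering starting from 1 follows the simulator behaviour assumed throughout this tutorial.

```python
import collections

sim_ids_counters = collections.defaultdict(int)  # per-type counters, default 0
objects = []  # position in this list is our unique (internal) object ID


def register(obj_type):
    """Record a newly created object and return its internal ID (hypothetical helper)."""
    sim_ids_counters[obj_type] += 1  # simulator IDs are per type and start from 1
    objects.append((obj_type, sim_ids_counters[obj_type]))
    return len(objects) - 1


first_sphere = register('sph')
first_box = register('box')
second_sphere = register('sph')

print([first_sphere, first_box, second_sphere])
print(objects)
```

The sphere and the box created first share the simulator identifier 1, yet each object gets a distinct internal ID, which is all the caller has to remember.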

Now we'll define two helper methods useful for RPC communication:

    def _execute(self, cmd):
        """Execute an RPC command, returning obtained answer bottle."""
        ans = yarp.Bottle()
        self._rpc_client.write(cmd, ans)
        return ans

    def _is_success(self, ans):
        """Check if RPC call answer Bottle indicates successful execution."""
        return ans.size() == 1 and ans.get(0).asVocab() == 27503  # Vocab for '[ok]'

A leading underscore in a class member name is the Python convention for indicating "protected" class members, i.e. those not intended to be used directly by the user.
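
The constant 27503 used in _is_success is not arbitrary. Assuming YARP's usual vocab packing, in which up to four characters are stored one per byte with the first character in the lowest byte, the value can be recomputed in plain Python. The encode_vocab helper below is hypothetical and only serves as an illustration.

```python
def encode_vocab(text):
    """Pack up to four ASCII characters into an integer, first character in the lowest byte."""
    if len(text) > 4:
        raise ValueError("a vocab holds at most four characters")
    value = 0
    for position, char in enumerate(text):
        value += ord(char) << (8 * position)
    return value


print(encode_vocab("ok"))  # 27503, i.e. ord('o') + ord('k') * 256
```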

The most annoying part of executing an RPC call is preparing the command Bottle. In our code we'll separate the command preparation code from the code which actually executes the command and checks if the call was successful.

As a starting point, let's take the "world del all" command which removes all objects created in the simulator. This is a simple command which takes no arguments, therefore the code is also quite simple:

    def _prepare_del_all_command(self):
        """Prepare the "world del all" command bottle."""
        result = yarp.Bottle()
        result.clear()
        # An explicit loop (rather than map()) also works on Python 3, where map() is lazy
        for word in ["world", "del", "all"]:
            result.addString(word)
        return result

    def del_all(self):
        """Delete all objects from the simulator."""
        result = self._is_success(self._execute(self._prepare_del_all_command()))
        if result:
            # Clear the counters
            self._sim_ids_counters.clear()
            del self._objects[:]
        return result

Note that upon successful deletion of objects from the simulator, we update internal structures for tracking identifiers.
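
A side benefit of keeping command preparation separate from execution is that the preparation step can be exercised without a running simulator. The following is only a sketch: FakeBottle is a hypothetical stand-in that mimics the two Bottle methods used here, and prepare_del_all_command is a free-function variant of the helper above that accepts the Bottle class as an argument.

```python
class FakeBottle(object):
    """Hypothetical stand-in for yarp.Bottle that records what is added to it."""

    def __init__(self):
        self.items = []

    def clear(self):
        del self.items[:]

    def addString(self, text):
        self.items.append(text)


def prepare_del_all_command(bottle_class=FakeBottle):
    """Build the "world del all" command using the given Bottle-like class."""
    result = bottle_class()
    result.clear()
    for word in ["world", "del", "all"]:
        result.addString(word)
    return result


cmd = prepare_del_all_command()
print(cmd.items)
```

With the real bindings installed, passing yarp.Bottle as bottle_class should yield an actual command Bottle built in exactly the same way.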

Now for something a little more interesting. The RPC interface of the simulator allows us to create various objects (spheres, cylinders, etc.) in the simulated world. Let's define a function to prepare the "world mk" command Bottle:

    def _prepare_create_command(self, obj, size, location, colour):
        """Prepare an RPC command for creating an object in the simulator environment.

        See Simulator Readme section 'Object Creation'

        Parameters:
            obj - object type string. 'sph', 'box', 'cyl', 'ssph', 'sbox' or 'scyl'.
            size - list of values specifying the size of an object. Parameters depend on object type:
                (s)box: [ x, y, z ]
                (s)sph: [ radius ]
                (s)cyl: [ radius, length ]
            location - coordinates of the object location, [ x, y, z ]
            colour - object colour in RGB (normalised), [ r, g, b ]
        Returns:
            yarp.Bottle with the command, ready to be sent to the rpc port of the simulator
        """
        result = yarp.Bottle()
        result.clear()
        # Explicit loops (rather than map()) also work on Python 3, where map() is lazy
        for word in ["world", "mk", obj]:
            result.addString(word)
        for value in size:
            result.addDouble(value)
        for value in location:
            result.addDouble(value)
        for value in colour:
            result.addDouble(value)
        return result

Note that all the dirty work, especially handling the different numbers of size parameters for different object types, is done for us automatically thanks to the excellent support for sequences in Python. In fact, since size, location and colour are all stored in the Bottle as doubles, we could have gotten away with only two loops, though this would make the code a little less readable.
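
To see how the sequence handling copes with different object types, the sketch below builds the flat token list of a "world mk" command in plain Python, using an ordinary list in place of a Bottle. The build_create_tokens function is a hypothetical helper written only for this illustration.

```python
def build_create_tokens(obj, size, location, colour):
    """Return the flat token list of a 'world mk' command (hypothetical helper)."""
    tokens = ["world", "mk", obj]
    # size may hold 1, 2 or 3 values depending on the object type;
    # concatenating the sequences handles all cases uniformly
    for value in list(size) + list(location) + list(colour):
        tokens.append(float(value))
    return tokens


sphere = build_create_tokens('sph', [0.1], [0, 1, 1], [1, 0, 0])
box = build_create_tokens('box', [0.2, 0.2, 0.2], [0, 1, 1], [0, 1, 0])

print(sphere)
print(box)
```

The sphere command ends up with 10 tokens and the box command with 12, without any type-specific branching in the code.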

Now the actual method to create objects:

    def create_object(self, obj, size, location, colour):
        """Create an object of a specified type, size, location and colour, returning internal object ID or -1 on error."""
        cmd = self._prepare_create_command(obj, size, location, colour)
        if self._is_success(self._execute(cmd)):
            obj_sim_id = self._sim_ids_counters[obj] + 1  # iCub simulator IDs start from 1
            # Update the counters
            self._sim_ids_counters[obj] += 1
            self._objects.append((obj, obj_sim_id))
            # Internal object IDs are shared among all types of objects and start from 0;
            # they are essentially indices of the self._objects sequence
            return len(self._objects) - 1
        else:
            return -1  # error

In a similar way, we can define methods to move objects around the simulator...

    def _prepare_move_command(self, obj, obj_id, location):
        """Prepare the "world set <obj> <id> <xyz>" command bottle."""
        result = yarp.Bottle()
        result.clear()
        # Explicit loops (rather than map()) also work on Python 3, where map() is lazy
        for word in ["world", "set", obj]:
            result.addString(word)
        result.addInt(obj_id)
        for value in location:
            result.addDouble(value)
        return result

    def move_object(self, obj_id, location):
        """Move an object specified by the internal id to another location."""
        obj_type, obj_sim_id = self._objects[obj_id]
        return self._is_success(self._execute(self._prepare_move_command(obj_type, obj_sim_id, location)))

... as well as to rotate them, set their colours, query their positions, etc. Let's take the last of these as the final example, as it illustrates how to handle the result Bottle returned by the RPC call:

    def _prepare_get_command(self, obj, obj_id):
        """Prepare the "world get <obj> <id>" command bottle."""
        result = yarp.Bottle()
        result.clear()
        # An explicit loop (rather than map()) also works on Python 3, where map() is lazy
        for word in ["world", "get", obj]:
            result.addString(word)
        result.addInt(obj_id)
        return result

    def get_object_location(self, obj_id):
        """Obtain the object location from the simulator. Returns None on failure."""
        obj_type, obj_sim_id = self._objects[obj_id]
        result = self._execute(self._prepare_get_command(obj_type, obj_sim_id))
        if result.size() == 3:
            return [result.get(i).asDouble() for i in range(3)]  # 3-element list with xyz coordinates
        else:
            return None  # An error occurred
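
The answer-handling logic can also be tried without a simulator. In the sketch below, FakeValue and FakeAnswer are hypothetical stand-ins which mimic only the size(), get() and asDouble() calls used above, and parse_location is a free-function version of the parsing step.

```python
class FakeValue(object):
    """Hypothetical stand-in for a single element of an answer Bottle."""

    def __init__(self, value):
        self._value = value

    def asDouble(self):
        return float(self._value)


class FakeAnswer(object):
    """Hypothetical stand-in for the answer Bottle returned by an RPC call."""

    def __init__(self, values):
        self._values = [FakeValue(v) for v in values]

    def size(self):
        return len(self._values)

    def get(self, index):
        return self._values[index]


def parse_location(answer):
    """Return [x, y, z] if the answer holds exactly three elements, None otherwise."""
    if answer.size() != 3:
        return None
    return [answer.get(i).asDouble() for i in range(3)]


print(parse_location(FakeAnswer([0, 0.5, 1])))
print(parse_location(FakeAnswer([])))
```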

Finally, we need to make sure that once the WorldController is no longer needed, it cleans up after itself:

    def __del__(self):
        try:
            if self._rpc_client is not None:
                self.del_all()
                self._rpc_client.close()
                del self._rpc_client
        except AttributeError:
            pass
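
One situation which the AttributeError guard covers is a constructor that fails before the attribute has been assigned: Python may still call __del__ on the half-built object. The sketch below demonstrates this with a hypothetical Fragile class which has nothing to do with YARP. It relies on CPython's reference counting to run the destructors promptly.

```python
class Fragile(object):
    """Hypothetical class whose constructor can fail before any attribute is set."""

    cleaned_up = []  # shared on purpose: records which resources were released

    def __init__(self, fail):
        if fail:
            raise RuntimeError("could not open port")
        self._resource = "open"

    def __del__(self):
        try:
            if self._resource is not None:
                Fragile.cleaned_up.append(self._resource)
                del self._resource
        except AttributeError:
            pass  # __init__ never got as far as creating _resource


try:
    Fragile(fail=True)  # construction fails, the destructor must cope with it
except RuntimeError:
    pass

ok = Fragile(fail=False)
del ok  # the destructor releases the resource

print(Fragile.cleaned_up)
```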

Now that the WorldController class definition is complete, we can use our module in the following way, for instance from an IPython console:

import python_simworld_control as psc # YARP is automatically initialised now
wc = psc.WorldController() # you will see YARP spitting out debugging messages as the RPC client connects to the server
# Let's create some objects...
red_sphere = wc.create_object('ssph', [ 0.1 ], [ -1, 1, 1 ], [ 1, 0, 0 ])
green_box = wc.create_object('sbox', [ 0.2, 0.2, 0.2 ], [ 0, 1, 1 ], [ 0, 1, 0 ])
blue_cylinder = wc.create_object('scyl', [ 0.1, 0.1 ], [ 1, 1, 1 ], [ 0, 0, 1 ])
# ... move them around ...
wc.move_object(red_sphere, [ 1, 0.5, 1 ])
wc.move_object(green_box, [ -1, 0.5, 1 ])
wc.move_object(blue_cylinder, [ 0, 0.5, 1 ])
# ... and ask for their positions ...
print(wc.get_object_location(green_box))
# Now let's clean up
wc.del_all()
# No, wait, I want more!
monolith = wc.create_object('sbox', [ 0.8, 1.8, 0.2 ], [ 0, 0.9, 1 ], [ 0, 0, 0 ])
# OK, this time we're done for good - delete the wc object, which will invoke its destructor
del wc
