6.1.2. Tools for Turtlesim¶
Note
VulcanAI is currently in active development, and new features and improvements are being added regularly. Current version is in Beta stage.
The first step when creating new tools for VulcanAI agents is to decide:
What does the tool need to accomplish.
What are the inputs that the tool is going to receive.
What should be the output that the tool will generate.
By answering these questions, we can define the name, description, input_schema and output_schema of the tool, which are essential for the agent to understand how to use it.
For this tutorial, we will create a tool for most basic functionalities of TurtleSim, including services for resetting the environment or spawning new turtles and classic pub-sub topology to move the turtles.
First, create a ROS 2 package inside your workspace (make sure to replace <your_workspace> with the path to your ROS 2 workspace):
cd ~/<your_workspace>/src && \
ros2 pkg create --build-type ament_python vulcanai_turtlesim_demo && \
cd vulcanai_turtlesim_demo
6.1.2.2. Define the tools¶
TurtleSim exposes multiple services and topics that can be used to control the turtles and interact with the environment. For this tutorial, tools can be obtained directly from Vulcanexus repository. We will only explain the most relevant parts of the code, illustrating each type of tool:
An AtomicTool that calls a service to spawn a turtle.
An AtomicTool that creates a publisher to move a turtle.
An AtomicTool that creates a subscription to get the current state of a turtle.
A CompositeTool that combines multiple AtomicTools to draw a rectangle.
Run the following commands to download the tools (make sure to replace <your_workspace> with the path to your ROS 2 workspace):
cd ~/<your_workspace>/src/vulcanai_turtlesim_demo/vulcanai_turtlesim_demo && \
wget https://https://raw.githubusercontent.com/eProsima/vulcanexus/refs/heads/main/code/vulcanai_turtlesim/vulcanai_turtlesim_demo/vulcanai_turtlesim_demo/turtlesim_tools.py
6.1.2.2.1. Examining a tool with a service client¶
To explain how to implement a tool that calls a service, we will use the SpawnTurtleTool as an example.
This tool will call the /spawn service to create a new turtle in the environment, which uses the turtlesim_msgs/srv/Spawn service type.
The tool is marked with the @vulcanai_tool decorator, which is needed to register the tool in VulcanAI.
It inherits from the AtomicTool class, as it only performs a single action and does not have any dependencies on other tools.
@vulcanai_tool
class SpawnTurtleTool(AtomicTool):
name = "spawn_turtle"
description = "Spawn a new turtle in turtlesim. 'spawned' indicates success."
tags = ["turtlesim", "spawn", "add", "create", "service"]
input_schema = [
("x", "float"),
("y", "float"),
("theta", "float"),
("name", "string"),
]
output_schema = {"name": "string", "spawned": "bool"}
The name and description attributes are used by the agent to understand the purpose of the tool.
The tags attribute is optional, but here it is used to categorize the tool and also add keywords that can help the agent to find it when needed.
The input_schema and output_schema attributes define the inputs and outputs of the tool.
In this case, the tool requires the name of the turtle to be spawned, as well as its initial position and orientation.
The output is the name of the spawned turtle and a boolean indicating if the operation was successful.
The run() method contains the logic of the tool.
It first retrieves the shared ROS 2 node from the blackboard, and then calls the already explained get_client() method of the shared node for the /spawn service.
def run(self, **kwargs):
node = self.bb.get("main_node", None)
if node is None:
raise Exception("Could not find shared node, aborting...")
client = node.get_client(Spawn, "spawn")
if not client.wait_for_service(timeout_sec=5.0):
raise Exception("Service not available, aborting...")
Note that if the shared_node is not found or the service is not available, the tool will raise an exception and abort the operation. Exceptions are handled by VulcanAI, automatically assigning a null output to the tool and informing the agent about the error.
Then, a request object is created and filled with the input data, which is retrieved from the kwargs dictionary. Finally, the service is called and the future is awaited until the response is received before returning the output.
req = Spawn.Request()
req.x = float(kwargs.get("x", 5.54))
req.y = float(kwargs.get("y", 5.54))
req.theta = float(kwargs.get("theta", 0.0))
req.name = kwargs.get("name", "")
with node.node_lock:
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=5.0)
result = future.result()
if result is None:
raise Exception("Service call failed timeout, aborting...")
return {"name": result.name, "spawned": True}
The output is returned as a dictionary, matching the defined output schema.
The same pattern is applied to other tools that call services, such as the KillTurtleTool, ResetTurtlesimTool, ClearTurtlesimTool, AbsoluteTeleportTurtleTool and RelativeTeleportTurtleTool.
6.1.2.2.2. Examining a tool with a publisher¶
The MoveTurtleTool uses a publisher to send velocity commands to a turtle.
The tool is defined in the same way as the previous one, with the necessary basic attributes to inform the agent about its purpose and how to use it.
@vulcanai_tool
class MoveTurtleTool(AtomicTool):
name = "move_turtle"
description = "Move the turtle 'name' with 'linear' and 'angular' velocity by publishing the message 'duration' times (seconds). Use zero velocity to stop. 'success' indicates if the command was sent correctly."
tags = ["turtlesim", "move", "velocity", "cmd_vel", "stop", "draw"]
input_schema = [
("name", "string"),
("linear", "float"),
("angular", "float"),
("duration", "int"),
]
output_schema = [{"success": "bool"}]
The run method differs from the previous one, as it needs to create a publisher instead of a service client.
First, the input parameters are retrieved from the kwargs dictionary.
The name is used to create the topic name, which follows the pattern /<turtle_name>/cmd_vel.
Linear and angular velocities are used to create a Twist message that will be published to the topic.
def run(self, **kwargs):
node = self.bb.get("main_node", None)
if node is None:
raise Exception("Could not find shared node, aborting...")
if not kwargs.get("name", ""):
print("No turtle name provided, aborting...")
return {"success": False}
name = kwargs.get('name', "")
pub = node.get_publisher(Twist, f"/{name}/cmd_vel")
msg = Twist()
msg.linear.x = float(kwargs.get("linear", 0.0))
msg.angular.z = float(kwargs.get("angular", 0.0))
Then, the generated message is published to the topic.
A for loop is used to publish the message multiple times, based on the duration input parameter.
for idx in range(int(kwargs.get("duration", 1.0))):
node.get_logger().info(f"Publishing message {idx + 1} to topic /{name}/cmd_vel: linear={msg.linear.x}, angular={msg.angular.z}")
pub.publish(msg)
time.sleep(1)
return {"success": True}
6.1.2.2.3. Examining a tool with a subscriber¶
The GetTurtlePoseTool uses a subscription to get the current pose of a turtle.
The tool is defined in the same way as the previous ones, with the necessary basic attributes to inform the agent about its purpose and how to use it.
@vulcanai_tool
class GetTurtlePose(AtomicTool):
name = "get_turtle_pose"
description = "Get the current pose of a turtle 'name' in turtlesim. Fails if something goes wrong."
tags = ["turtlesim", "pose", "position", "location"]
input_schema = [
("name", "string"),
]
output_schema = {
"name": "string",
"x": "float",
"y": "float",
"theta": "float",
}
In this case, the run() method needs to call the wait_for_message(), which creates a subscription to the topic and waits for a single message to be received.
Note that this method is the one responsible for the spinning of the node until a message is received, so we don’t need to call rclpy.spin() or similar methods.
The output of the method is the received message, which is then used to fill the output dictionary.
def run(self, name: str = ""):
node = self.bb.get("main_node", None)
if node is None:
raise Exception("Could not find shared node, aborting...")
name = name or "turtle1"
topic = f"/{name}/pose"
msg = node.wait_for_message(Pose, topic, timeout_sec=5.0)
if msg is None:
print(f"Could not get pose for turtle '{name}', aborting...")
raise Exception("No pose message received")
return {"name": name, "x": msg.x, "y": msg.y, "theta": msg.theta}
Take a look at the definition of the method run(self, name: str = "").
The input parameter is defined as a regular method parameter instead of using the kwargs dictionary, which is also valid and more convenient when there are few input parameters.
6.1.2.2.4. Examining a tool with a CompositeTool¶
The last tool we will examine is the DrawRectangleTool, which is a composite tool that combines multiple atomic tools to draw a rectangle with a turtle.
The definition of the tool remains almost the same as the previous ones, adding only one new attribute: dependencies.
@vulcanai_tool
class DrawRectangleTool(CompositeTool):
name = "draw_rectangle"
description = "Move the turtle 'name' in a rectangular shape. 'success' indicates if rectangle was drawn successfully."
tags = ["turtlesim", "draw", "rectangle", "move", "cmd_vel"]
input_schema = [
("name", "string"),
("size", "float"),
]
output_schema = {"success": "bool"}
dependencies = ["move_turtle", "relative_teleport_turtle"]
The dependencies attribute is a list of tool names that will be used during the execution of the CompositeTool.
In this case, the tool depends on the MoveTurtleTool and RelativeTeleportTurtleTool to move the turtle and teleport it to the starting position.
Dependencies are indicated by using the exact name of the tool, as defined in the name attribute of the tool.
In this example, dependencies are move_turtle and relative_teleport_turtle.
The definition of the run() is similar to the previous tools, as well as the retrieval of the shared node from the blackboard.
def run(self, name: str = "", size: float = 2.0):
node = self.bb.get("main_node", None)
if node is None:
raise Exception("Could not find shared node, aborting...")
if not name:
print("No turtle name provided, aborting...")
return {"success": False}
# Access the instances of the dependent tools and set their blackboards
move_tool = self.resolved_deps.get("move_turtle", None)
tp_relative_tool = self.resolved_deps.get("relative_teleport_turtle", None)
move_tool.bb = self.bb
tp_relative_tool.bb = self.bb
However, the run() method of a CompositeTool adds new behavior, as it needs to reuse the functionality of the dependent tools.
To do so, CompositeTools owns an attribute called self.resolved_deps, which is a dictionary that contains a reference to the dependent tools, indexed by their name.
This dictionary is automatically filled by VulcanAI when the tool is instantiated, so we can directly use it in the run() method.
Note lines:
move_tool.bb = self.bb
tp_relative_tool.bb = self.bb
It is important to remark how we are manually assigning the current blackboard to the dependent tools. This step is usually done by the VulcanAI manager when tools are called, but in this case we need to do it manually, as the CompositeTool is the one being called by the agent, and not the dependent tools.
The Rectangle will be drawn by moving the turtle in a straight line and then turning 90 degrees, repeating this process four times but with different distances for the sides.
Therefore, we create 3 sets of arguments that will be passed to the move_tool to move the turtle the desired distance and to tp_relative_tool to teleport it a rotation of 90 degrees.
name = name or "turtle1"
size = size
linear_speed = 1.0
angular_turn = pi / 2
# Arguments are passed as dictionaries when calling directly other tools
side_1_args = {
"name": name,
"linear": linear_speed,
"angular": 0.0,
"duration": size / linear_speed,
}
side_2_args = {
"name": name,
"linear": linear_speed,
"angular": 0.0,
"duration": (size - 1) / linear_speed,
}
turn_args = {
"name": name,
"linear": 0.0,
"angular": angular_turn,
}
Finally, we call the run() method of the dependent tools with the corresponding arguments, and return a success output.
We could check the output of each tool call to ensure that the operation was successful, but for simplicity we will assume that everything works as expected.
# Pass arguments as kwargs
move_tool.run(**side_1_args)
tp_relative_tool.run(**turn_args)
move_tool.run(**side_2_args)
tp_relative_tool.run(**turn_args)
move_tool.run(**side_1_args)
tp_relative_tool.run(**turn_args)
move_tool.run(**side_2_args)
tp_relative_tool.run(**turn_args)
return {"success": True}
6.1.2.3. Next steps¶
In this tutorial we have learned how to create custom tools for VulcanAI to control TurtleSim, using a shared ROS 2 node to create and reuse service clients and publishers. We have examined different types of tools, including AtomicTools that call services, create publishers and subscriptions, and a CompositeTool that combines multiple AtomicTools to accomplish a more complex task.
Now, we are missing the main script that will be used to run the VulcanAI console and manage the interaction between the user, the agent, and the tools. Continue with the VulcanAI Console and manager tutorial to learn how to create the main script and run the console.