6.1.2. Tools for Turtlesim

Note

VulcanAI is currently in active development, and new features and improvements are being added regularly. The current version is in the Beta stage.

The first step when creating new tools for VulcanAI agents is to decide:

  • What does the tool need to accomplish?

  • What inputs will the tool receive?

  • What output should the tool generate?

By answering these questions, we can define the name, description, input_schema and output_schema of the tool, which are essential for the agent to understand how to use it.

For this tutorial, we will create tools for the most basic functionalities of TurtleSim, including services to reset the environment or spawn new turtles, and the classic pub-sub topology to move the turtles. First, create a ROS 2 package inside your workspace (make sure to replace <your_workspace> with the path to your ROS 2 workspace):

cd ~/<your_workspace>/src && \
ros2 pkg create --build-type ament_python vulcanai_turtlesim_demo && \
cd vulcanai_turtlesim_demo

6.1.2.1. Sharing a single ROS 2 Node between tools

In order to interact with the ROS 2 environment, the tools need access to a ROS 2 node, as they will need to create publishers, subscribers, or service clients. Creating a new node for each tool would be highly inefficient, since every tool would require the discovery of a new entity in the ROS 2 network. Instead, we will create a single ROS 2 node and share it among all the tools.

In this way, all the tools will be able to reuse the same entities created by other calls, and it will also exemplify how to share data between tools by means of the blackboard. The blackboard is a shared memory space where tools can write and read data, allowing them to share information without the need for the LLM to reason about how to pass data between tools. To get a better understanding of this concept, please refer to the Sharing data between tools section in the VulcanAI Tools page.
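As a minimal illustration of the blackboard idea (using a plain dictionary as a stand-in for VulcanAI's actual blackboard object, so the real API may differ), two tools can share a resource like this:

```python
# Minimal sketch of the blackboard pattern: a plain dict acts as the
# shared memory (a stand-in for VulcanAI's actual blackboard object).
blackboard = {}


class SetupTool:
    """Writes a shared resource to the blackboard."""
    def __init__(self, bb):
        self.bb = bb

    def run(self):
        # In the tutorial, this would be the shared ROS 2 node.
        self.bb["main_node"] = "shared_resource"


class ConsumerTool:
    """Reads the shared resource without the LLM passing it explicitly."""
    def __init__(self, bb):
        self.bb = bb

    def run(self):
        resource = self.bb.get("main_node", None)
        if resource is None:
            raise RuntimeError("Could not find shared resource")
        return resource


SetupTool(blackboard).run()
print(ConsumerTool(blackboard).run())  # both tools see the same object
```

The agent never has to reason about passing the node from one tool to another: both tools simply read from and write to the same blackboard.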

To create the shared ROS 2 node, create a new file called ros2_node.py inside the vulcanai_turtlesim_demo/vulcanai_turtlesim_demo folder with the following content:

import threading

import rclpy
from rclpy.node import Node
from rclpy.task import Future


class SharedNode(Node):
    def __init__(self, name: str = "vulcanai_shared_node"):
        super().__init__(name)
        # Dictionary to store created clients
        self._vulcan_clients = {}
        # Dictionary to store created publishers
        self._vulcan_publishers = {}

        # Ensure entities creation is thread-safe.
        self.node_lock = threading.Lock()

    def get_client(self, srv_type, srv_name):
        """
        Get a cached client for the specified service type and name or
        create a new one if it doesn't exist.
        """
        key = (srv_type, srv_name)
        with self.node_lock:
            if key not in self._vulcan_clients:
                client = self.create_client(srv_type, srv_name)
                self._vulcan_clients[key] = client
                self.get_logger().info(f"Created new client for {srv_name}")
            return self._vulcan_clients[key]

    def get_publisher(self, msg_type, topic_name):
        """
        Get a cached publisher for the specified message type and topic name or
        create a new one if it doesn't exist.
        """
        key = (msg_type, topic_name)
        with self.node_lock:
            if key not in self._vulcan_publishers:
                publisher = self.create_publisher(msg_type, topic_name, 10)
                self._vulcan_publishers[key] = publisher
                self.get_logger().info(f"Created new publisher for {topic_name}")
            return self._vulcan_publishers[key]

    def wait_for_message(self, msg_type, topic: str, timeout_sec: float = None):
        """
        Block until a message is received or timeout expires.
        Subscriptions are created on demand and destroyed after use to avoid
        handling spins and callbacks in a separate thread.
        """
        future = Future()

        def callback(msg):
            if not future.done():
                future.set_result(msg)

        sub = self.create_subscription(msg_type, topic, callback, 10)

        rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec)
        self.destroy_subscription(sub)

        if future.done():
            return future.result()
        return None

6.1.2.1.1. Examine the code

This code creates a ROS 2 node that can be shared between multiple tools, exposing methods to provide and reuse service clients and publishers.

import threading

import rclpy
from rclpy.node import Node
from rclpy.task import Future

We import the threading module to create a lock that ensures the creation of new entities is thread-safe. If you are not familiar with locks, think of them as a protection mechanism that prevents errors when multiple threads access the same shared resource simultaneously. Here, the lock prevents the creation of duplicate entities when multiple tools try to create them at the same time.
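The hazard the lock prevents can be reproduced with plain Python, no ROS 2 involved: without the with lock: block, two threads checking the cache at the same time could both miss and both create an entity. A small sketch of the same get-or-create pattern used by SharedNode:

```python
import threading

cache = {}
lock = threading.Lock()
creations = 0  # counts how many times an "entity" is actually built


def get_or_create(key):
    """Thread-safe get-or-create, mirroring SharedNode.get_client()."""
    global creations
    with lock:
        if key not in cache:
            creations += 1  # expensive entity creation happens only once
            cache[key] = f"entity-for-{key}"
        return cache[key]


# Eight threads race to create the same entity; the lock serializes them.
threads = [threading.Thread(target=get_or_create, args=("spawn",)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(creations)  # 1 — the entity was created exactly once
```

Whichever thread acquires the lock first creates the entity; every later thread finds the cached one and reuses it.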

The other imports are related to ROS 2, including the rclpy library, the Node class and a Future object that will be used to wait for messages.

class SharedNode(Node):
    def __init__(self, name: str = "vulcanai_shared_node"):
        super().__init__(name)
        # Dictionary to store created clients
        self._vulcan_clients = {}
        # Dictionary to store created publishers
        self._vulcan_publishers = {}

        # Ensure entities creation is thread-safe.
        self.node_lock = threading.Lock()

The SharedNode class inherits from the Node class, and it initializes the node with the name vulcanai_shared_node. Then, two dictionaries are created to store a reference to created service clients and publishers, so they can be reused later. Finally, a lock from the threading module is created to ensure thread-safety.

    def get_client(self, srv_type, srv_name):
        """
        Get a cached client for the specified service type and name or
        create a new one if it doesn't exist.
        """
        key = (srv_type, srv_name)
        with self.node_lock:
            if key not in self._vulcan_clients:
                client = self.create_client(srv_type, srv_name)
                self._vulcan_clients[key] = client
                self.get_logger().info(f"Created new client for {srv_name}")
            return self._vulcan_clients[key]

The method get_client creates a new service client if it doesn’t exist yet, or returns the existing one if it has already been created. Everything under the with self.node_lock: statement is protected by the lock, ensuring that no duplicate service clients are created.

    def get_publisher(self, msg_type, topic_name):
        """
        Get a cached publisher for the specified message type and topic name or
        create a new one if it doesn't exist.
        """
        key = (msg_type, topic_name)
        with self.node_lock:
            if key not in self._vulcan_publishers:
                publisher = self.create_publisher(msg_type, topic_name, 10)
                self._vulcan_publishers[key] = publisher
                self.get_logger().info(f"Created new publisher for {topic_name}")
            return self._vulcan_publishers[key]

The method get_publisher works in a similar way, creating or returning an existing publisher.

    def wait_for_message(self, msg_type, topic: str, timeout_sec: float = None):
        """
        Block until a message is received or timeout expires.
        Subscriptions are created on demand and destroyed after use to avoid
        handling spins and callbacks in a separate thread.
        """
        future = Future()

        def callback(msg):
            if not future.done():
                future.set_result(msg)

        sub = self.create_subscription(msg_type, topic, callback, 10)

        rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec)
        self.destroy_subscription(sub)

        if future.done():
            return future.result()
        return None

Finally, the method wait_for_message is a utility function that allows waiting for a single message from a topic, which is needed for tools that will use subscriptions. Subscriptions, unlike publishers or service clients, will not be reused.

To control TurtleSim, we do not need to keep a subscription alive and handle its callbacks between tool calls, as we only need the current state of the turtles when the user requests it. Therefore, we create a new subscription every time we need a message from a topic, spin the node until a message arrives, and destroy the subscription after the first message is received. The Future object is resolved in the subscriber callback, signaling that the message has arrived.
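The same wait-on-a-future pattern can be sketched with Python's standard library alone: a Future is resolved from a callback, while the waiting side blocks with a timeout. Here a timer thread stands in for the ROS 2 executor delivering a message, so this is an analogy rather than actual rclpy code:

```python
import threading
from concurrent.futures import Future, TimeoutError as FutureTimeout


def wait_for_message_sketch(timeout_sec):
    """Plain-Python analogue of SharedNode.wait_for_message()."""
    future = Future()

    def callback(msg):
        # Mirrors the subscription callback: only the first message counts.
        if not future.done():
            future.set_result(msg)

    # A timer stands in for the executor invoking the callback when a
    # message arrives on the topic.
    threading.Timer(0.1, callback, args=({"x": 5.54},)).start()

    try:
        # Blocks like rclpy.spin_until_future_complete() with a timeout.
        return future.result(timeout=timeout_sec)
    except FutureTimeout:
        return None  # no message arrived within the timeout


print(wait_for_message_sketch(timeout_sec=2.0))  # {'x': 5.54}
```

If the callback never fires before the timeout, the function returns None, exactly as wait_for_message does in the shared node.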

6.1.2.2. Define the tools

TurtleSim exposes multiple services and topics that can be used to control the turtles and interact with the environment. For this tutorial, the tools can be downloaded directly from the Vulcanexus repository. We will only explain the most relevant parts of the code, illustrating each type of tool:

  • An AtomicTool that calls a service to spawn a turtle.

  • An AtomicTool that creates a publisher to move a turtle.

  • An AtomicTool that creates a subscription to get the current state of a turtle.

  • A CompositeTool that combines multiple AtomicTools to draw a rectangle.

Run the following commands to download the tools (make sure to replace <your_workspace> with the path to your ROS 2 workspace):

cd ~/<your_workspace>/src/vulcanai_turtlesim_demo/vulcanai_turtlesim_demo && \
wget https://raw.githubusercontent.com/eProsima/vulcanexus/refs/heads/main/code/vulcanai_turtlesim/vulcanai_turtlesim_demo/vulcanai_turtlesim_demo/turtlesim_tools.py

6.1.2.2.1. Examining a tool with a service client

To explain how to implement a tool that calls a service, we will use the SpawnTurtleTool as an example. This tool will call the /spawn service to create a new turtle in the environment, which uses the turtlesim_msgs/srv/Spawn service type.

The tool is marked with the @vulcanai_tool decorator, which is needed to register the tool in VulcanAI. It inherits from the AtomicTool class, as it only performs a single action and does not have any dependencies on other tools.

@vulcanai_tool
class SpawnTurtleTool(AtomicTool):
    name = "spawn_turtle"
    description = "Spawn a new turtle in turtlesim. 'spawned' indicates success."
    tags = ["turtlesim", "spawn", "add", "create", "service"]
    input_schema = [
        ("x", "float"),
        ("y", "float"),
        ("theta", "float"),
        ("name", "string"),
    ]
    output_schema = {"name": "string", "spawned": "bool"}

The name and description attributes are used by the agent to understand the purpose of the tool. The tags attribute is optional, but here it is used to categorize the tool and also add keywords that can help the agent to find it when needed. The input_schema and output_schema attributes define the inputs and outputs of the tool. In this case, the tool requires the name of the turtle to be spawned, as well as its initial position and orientation. The output is the name of the spawned turtle and a boolean indicating if the operation was successful.
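To make the role of the schemas concrete, the sketch below shows how a tool call's arguments could be checked against the declared input_schema. The check_args helper is hypothetical, written for illustration only; it is not VulcanAI's actual validation logic:

```python
# Hypothetical helper (not VulcanAI's actual code) that checks a tool
# call's arguments against the declared input_schema.
input_schema = [
    ("x", "float"),
    ("y", "float"),
    ("theta", "float"),
    ("name", "string"),
]

# Map the schema's type names to Python types.
PY_TYPES = {"float": float, "int": int, "string": str, "bool": bool}


def check_args(schema, kwargs):
    """Return the subset of kwargs whose names and types match the schema."""
    checked = {}
    for field, type_name in schema:
        if field in kwargs and isinstance(kwargs[field], PY_TYPES[type_name]):
            checked[field] = kwargs[field]
    return checked


args = check_args(input_schema, {"x": 2.0, "y": 3.0, "theta": 0.0, "name": "leo"})
print(args)  # {'x': 2.0, 'y': 3.0, 'theta': 0.0, 'name': 'leo'}
```

Arguments that are missing or of the wrong type are simply dropped here; the tool's run() method then falls back to its own defaults via kwargs.get().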

The run() method contains the logic of the tool. It first retrieves the shared ROS 2 node from the blackboard, and then calls the already explained get_client() method of the shared node for the /spawn service.

    def run(self, **kwargs):
        node = self.bb.get("main_node", None)
        if node is None:
            raise Exception("Could not find shared node, aborting...")

        client = node.get_client(Spawn, "spawn")
        if not client.wait_for_service(timeout_sec=5.0):
            raise Exception("Service not available, aborting...")

Note that if the shared node is not found or the service is not available, the tool raises an exception and aborts the operation. Exceptions are handled by VulcanAI, which automatically assigns a null output to the tool and informs the agent about the error.
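Conceptually, that error handling could look like the sketch below: the tool call is wrapped in try/except, a null output is produced, and the error text is kept so the agent can be informed. This is an illustration of the concept only, not VulcanAI's internal code:

```python
def safe_run(tool_run, **kwargs):
    """Illustrative wrapper (not VulcanAI's actual code): convert a tool
    exception into a null output plus an error message for the agent."""
    try:
        return tool_run(**kwargs), None
    except Exception as exc:
        return None, str(exc)


def failing_tool(**kwargs):
    # Simulates a tool aborting because its service is unavailable.
    raise Exception("Service not available, aborting...")


output, error = safe_run(failing_tool)
print(output, error)  # None Service not available, aborting...
```

The agent receives the null output together with the error description, allowing it to retry, pick another tool, or report the failure to the user.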

Then, a request object is created and filled with the input data, which is retrieved from the kwargs dictionary. Finally, the service is called and the future is awaited until the response is received before returning the output.

        req = Spawn.Request()
        req.x = float(kwargs.get("x", 5.54))
        req.y = float(kwargs.get("y", 5.54))
        req.theta = float(kwargs.get("theta", 0.0))
        req.name = kwargs.get("name", "")

        with node.node_lock:
            future = client.call_async(req)
            rclpy.spin_until_future_complete(node, future, timeout_sec=5.0)
            result = future.result()
            if result is None:
                raise Exception("Service call timed out, aborting...")

        return {"name": result.name, "spawned": True}

The output is returned as a dictionary, matching the defined output schema.

The same pattern is applied to other tools that call services, such as the KillTurtleTool, ResetTurtlesimTool, ClearTurtlesimTool, AbsoluteTeleportTurtleTool and RelativeTeleportTurtleTool.

6.1.2.2.2. Examining a tool with a publisher

The MoveTurtleTool uses a publisher to send velocity commands to a turtle. The tool is defined in the same way as the previous one, with the necessary basic attributes to inform the agent about its purpose and how to use it.

@vulcanai_tool
class MoveTurtleTool(AtomicTool):
    name = "move_turtle"
    description = "Move the turtle 'name' with 'linear' and 'angular' velocity by publishing the message 'duration' times (seconds). Use zero velocity to stop. 'success' indicates if the command was sent correctly."
    tags = ["turtlesim", "move", "velocity", "cmd_vel", "stop", "draw"]
    input_schema = [
        ("name", "string"),
        ("linear", "float"),
        ("angular", "float"),
        ("duration", "int"),
    ]
    output_schema = {"success": "bool"}

The run method differs from the previous one, as it needs to create a publisher instead of a service client. First, the input parameters are retrieved from the kwargs dictionary. The name is used to create the topic name, which follows the pattern /<turtle_name>/cmd_vel. Linear and angular velocities are used to create a Twist message that will be published to the topic.

    def run(self, **kwargs):
        node = self.bb.get("main_node", None)
        if node is None:
            raise Exception("Could not find shared node, aborting...")
        if not kwargs.get("name", ""):
            print("No turtle name provided, aborting...")
            return {"success": False}

        name = kwargs.get('name', "")
        pub = node.get_publisher(Twist, f"/{name}/cmd_vel")
        msg = Twist()
        msg.linear.x = float(kwargs.get("linear", 0.0))
        msg.angular.z = float(kwargs.get("angular", 0.0))

Then, the generated message is published to the topic. A for loop publishes the message once per second, as many times as indicated by the duration input parameter.

        for idx in range(int(kwargs.get("duration", 1.0))):
            node.get_logger().info(f"Publishing message {idx + 1} to topic /{name}/cmd_vel: linear={msg.linear.x}, angular={msg.angular.z}")
            pub.publish(msg)
            time.sleep(1)
        return {"success": True}

6.1.2.2.3. Examining a tool with a subscriber

The GetTurtlePose tool uses a subscription to get the current pose of a turtle. The tool is defined in the same way as the previous ones, with the necessary basic attributes to inform the agent about its purpose and how to use it.

@vulcanai_tool
class GetTurtlePose(AtomicTool):
    name = "get_turtle_pose"
    description = "Get the current pose of a turtle 'name' in turtlesim. Fails if something goes wrong."
    tags = ["turtlesim", "pose", "position", "location"]
    input_schema = [
        ("name", "string"),
    ]
    output_schema = {
        "name": "string",
        "x": "float",
        "y": "float",
        "theta": "float",
    }

In this case, the run() method calls wait_for_message(), which creates a subscription to the topic and waits for a single message to be received. Note that this method is responsible for spinning the node until a message arrives, so we do not need to call rclpy.spin() or similar methods ourselves. The received message is then used to fill the output dictionary.

    def run(self, name: str = ""):
        node = self.bb.get("main_node", None)
        if node is None:
            raise Exception("Could not find shared node, aborting...")
        name = name or "turtle1"
        topic = f"/{name}/pose"

        msg = node.wait_for_message(Pose, topic, timeout_sec=5.0)
        if msg is None:
            print(f"Could not get pose for turtle '{name}', aborting...")
            raise Exception("No pose message received")

        return {"name": name, "x": msg.x, "y": msg.y, "theta": msg.theta}

Take a look at the definition of the method run(self, name: str = ""). The input parameter is defined as a regular method parameter instead of using the kwargs dictionary, which is also valid and more convenient when there are few input parameters.
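Both signature styles behave the same from the caller's perspective, since tools are invoked with keyword arguments; which one to choose is mostly a readability question. A minimal side-by-side sketch (with simplified stand-in classes, not the actual tool base classes):

```python
# Two equivalent ways to declare a tool's run() inputs.
class KwargsStyle:
    def run(self, **kwargs):
        # Generic: defaults are handled inside the body with kwargs.get().
        name = kwargs.get("name", "") or "turtle1"
        return {"name": name}


class ExplicitStyle:
    def run(self, name: str = ""):
        # Explicit: convenient when there are only a few parameters.
        name = name or "turtle1"
        return {"name": name}


# Both are called the same way, with keyword arguments:
print(KwargsStyle().run(name="leo"))  # {'name': 'leo'}
print(ExplicitStyle().run())          # {'name': 'turtle1'}
```

The explicit form documents the expected inputs in the signature itself, while the **kwargs form scales better when the schema is long or changes often.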

6.1.2.2.4. Examining a tool with a CompositeTool

The last tool we will examine is the DrawRectangleTool, which is a composite tool that combines multiple atomic tools to draw a rectangle with a turtle. The definition of the tool remains almost the same as the previous ones, adding only one new attribute: dependencies.

@vulcanai_tool
class DrawRectangleTool(CompositeTool):
    name = "draw_rectangle"
    description = "Move the turtle 'name' in a rectangular shape. 'success' indicates if rectangle was drawn successfully."
    tags = ["turtlesim", "draw", "rectangle", "move", "cmd_vel"]
    input_schema = [
        ("name", "string"),
        ("size", "float"),
    ]
    output_schema = {"success": "bool"}
    dependencies = ["move_turtle", "relative_teleport_turtle"]

The dependencies attribute is a list of tool names that will be used during the execution of the CompositeTool. In this case, the tool depends on the MoveTurtleTool to move the turtle and on the RelativeTeleportTurtleTool to rotate it in place. Dependencies are referenced by the exact name of each tool, as defined in its name attribute: here, move_turtle and relative_teleport_turtle.
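The resolution mechanism can be sketched without ROS 2: a registry maps tool names to instances, and a composite's dependencies list is looked up to build a resolved_deps dictionary before run() is called. This is an illustration of the concept only; VulcanAI performs this wiring automatically when the tool is instantiated:

```python
# Sketch of dependency resolution by tool name (illustrative only).
registry = {}


class MoveTurtle:
    name = "move_turtle"
    def run(self, **kwargs):
        return {"success": True}


class RelativeTeleport:
    name = "relative_teleport_turtle"
    def run(self, **kwargs):
        return {"success": True}


# Register one instance of each tool under its declared name.
for tool_cls in (MoveTurtle, RelativeTeleport):
    registry[tool_cls.name] = tool_cls()


class DrawRectangle:
    name = "draw_rectangle"
    dependencies = ["move_turtle", "relative_teleport_turtle"]

    def __init__(self):
        # Resolve each dependency name to the registered instance.
        self.resolved_deps = {dep: registry[dep] for dep in self.dependencies}

    def run(self, **kwargs):
        move = self.resolved_deps["move_turtle"]
        turn = self.resolved_deps["relative_teleport_turtle"]
        move.run(linear=1.0)
        turn.run(angular=1.5708)
        return {"success": True}


print(DrawRectangle().run())  # {'success': True}
```

A mismatch between a name in dependencies and a tool's name attribute would make the lookup fail, which is why the names must match exactly.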

The definition of the run() is similar to the previous tools, as well as the retrieval of the shared node from the blackboard.

    def run(self, name: str = "", size: float = 2.0):
        node = self.bb.get("main_node", None)
        if node is None:
            raise Exception("Could not find shared node, aborting...")
        if not name:
            print("No turtle name provided, aborting...")
            return {"success": False}

        # Access the instances of the dependent tools and set their blackboards
        move_tool = self.resolved_deps.get("move_turtle", None)
        tp_relative_tool = self.resolved_deps.get("relative_teleport_turtle", None)
        move_tool.bb = self.bb
        tp_relative_tool.bb = self.bb

However, the run() method of a CompositeTool adds new behavior, as it needs to reuse the functionality of the dependent tools. To do so, every CompositeTool owns an attribute called self.resolved_deps, a dictionary containing references to the dependent tools, indexed by their names. This dictionary is automatically filled by VulcanAI when the tool is instantiated, so it can be used directly in the run() method.

Note the following lines:

        move_tool.bb = self.bb
        tp_relative_tool.bb = self.bb

Note how we manually assign the current blackboard to the dependent tools. This step is usually performed by the VulcanAI manager when a tool is called, but here it must be done manually, as the agent calls the CompositeTool rather than the dependent tools.

The rectangle is drawn by moving the turtle in a straight line and then turning 90 degrees, repeating this process four times with two different side lengths. Therefore, we create three sets of arguments: two that will be passed to move_tool to advance the turtle along each side, and one for tp_relative_tool to rotate it 90 degrees.

        name = name or "turtle1"
        linear_speed = 1.0
        angular_turn = pi / 2

        # Arguments are passed as dictionaries when calling directly other tools
        side_1_args = {
            "name": name,
            "linear": linear_speed,
            "angular": 0.0,
            "duration": size / linear_speed,
        }
        side_2_args = {
            "name": name,
            "linear": linear_speed,
            "angular": 0.0,
            "duration": (size - 1) / linear_speed,
        }
        turn_args = {
            "name": name,
            "linear": 0.0,
            "angular": angular_turn,
        }

Finally, we call the run() method of the dependent tools with the corresponding arguments, and return a success output. We could check the output of each tool call to ensure that the operation was successful, but for simplicity we will assume that everything works as expected.

        # Pass arguments as kwargs
        move_tool.run(**side_1_args)
        tp_relative_tool.run(**turn_args)
        move_tool.run(**side_2_args)
        tp_relative_tool.run(**turn_args)
        move_tool.run(**side_1_args)
        tp_relative_tool.run(**turn_args)
        move_tool.run(**side_2_args)
        tp_relative_tool.run(**turn_args)

        return {"success": True}

6.1.2.3. Next steps

In this tutorial we have learned how to create custom tools for VulcanAI to control TurtleSim, using a shared ROS 2 node to create and reuse service clients and publishers. We have examined different types of tools, including AtomicTools that call services, create publishers and subscriptions, and a CompositeTool that combines multiple AtomicTools to accomplish a more complex task.

We are still missing the main script that runs the VulcanAI console and manages the interaction between the user, the agent, and the tools. Continue with the VulcanAI Console and manager tutorial to learn how to create the main script and run the console.