Skip to content

Examples

Code examples below are located in examples directory.

Basic usage

import asyncio
from odrive_can import ODriveCAN, CanMsg
from odrive_can.tools import UDP_Client

AXIS_ID = 0
INTERFACE = "vcan0"
SETPOINT = 50

udp = UDP_Client()  # send data to UDP server for plotting


def feedback_callback_fcn(msg: CanMsg, caller: ODriveCAN):
    """called on position estimate"""
    print(msg)
    udp.send(msg.data)


async def main():
    """connect to odrive"""
    drv = ODriveCAN(axis_id=AXIS_ID, interface=INTERFACE)

    # set up callback (optional)
    drv.feedback_callback = feedback_callback_fcn

    # start
    await drv.start()

    # check errors (raises exception if any)
    drv.check_errors()

    # set controller mode
    drv.set_controller_mode("POSITION_CONTROL", "POS_FILTER")

    # reset encoder
    drv.set_linear_count(0)

    # set axis state
    await drv.set_axis_state("CLOSED_LOOP_CONTROL")

    # set position gain
    drv.set_pos_gain(3.0)

    for _ in range(2):
        # setpoint
        drv.set_input_pos(SETPOINT)
        await asyncio.sleep(5.0)
        drv.set_input_pos(-SETPOINT)
        await asyncio.sleep(5.0)

    drv.set_input_pos(0.0)


asyncio.run(main())

Driver demo

#!/usr/bin/env python3
"""
 demonstrate usage of OdriveCAN class

 Copyright (c) 2023 ROX Automation - Jev Kuznetsov
"""

import asyncio
import logging

import coloredlogs  # type: ignore

from odrive_can import LOG_FORMAT, TIME_FORMAT
from odrive_can.odrive import CommandId, ODriveCAN
from odrive_can.tools import UDP_Client
from odrive_can.timer import timeit

log = logging.getLogger()
coloredlogs.install(level="DEBUG", fmt=LOG_FORMAT, datefmt=TIME_FORMAT)


AXIS_ID = 1
INTERFACE = "slcan0"

udp = UDP_Client()


def position_callback(data):
    """position callback, send data to UDP client"""
    udp.send(data)


async def get_bus_voltage_current(drv: ODriveCAN):
    """request bus voltage and current"""

    @timeit
    async def request(drv: ODriveCAN):
        data = await drv.get_bus_voltage_current()
        log.info(f"{data=}")

    log.info("Requesting bus voltage and current")
    # request bus voltage and current
    for _ in range(4):
        log.info("------------------------------")
        try:
            await request(drv)

        except Exception as e:  # pylint: disable=broad-except
            log.warning(e)
        await asyncio.sleep(0.1)


async def position_control(drv: ODriveCAN):
    """simple position control loop"""

    log.info("-----------Running position control-----------------")

    # set positiion to zero
    drv.set_linear_count(0)

    drv.set_controller_mode("POSITION_CONTROL", "POS_FILTER")
    drv.set_pos_gain(3.0)

    # setpoint
    setpoint = 20
    duration = 5

    for _ in range(4):
        try:
            drv.check_errors()
            log.info(f"Setting position setpoint to {setpoint}")
            drv.set_input_pos(setpoint)
            await asyncio.sleep(duration)
            log.info(f"Setting position setpoint to {-setpoint}")
            drv.set_input_pos(-setpoint)
            await asyncio.sleep(duration)

        except Exception as e:  # pylint: disable=broad-except
            log.warning(e)


async def main():
    drv = ODriveCAN(axis_id=AXIS_ID, interface=INTERFACE)
    drv.feedback_callback = position_callback
    await drv.start()

    # log some messages
    await asyncio.sleep(1.0)

    await drv.set_axis_state("CLOSED_LOOP_CONTROL")

    # set log level to INFO
    coloredlogs.set_level(logging.INFO)

    # ignore encoder estimate messages
    drv.ignore_message(CommandId.ENCODER_ESTIMATE)

    # request bus voltage and current
    await get_bus_voltage_current(drv)

    # enable encoder feedback
    drv.allow_message(CommandId.ENCODER_ESTIMATE)

    # reset encoder
    drv.set_linear_count(0)
    await asyncio.sleep(1.0)

    # run position control
    await position_control(drv)

    # shutdown
    drv.stop()
    await asyncio.sleep(0.5)


if __name__ == "__main__":
    asyncio.run(main())

Position control

#!/usr/bin/env python3
"""
 Demonstration of position control using CAN interface

 Copyright (c) 2023 ROX Automation - Jev Kuznetsov
"""

import asyncio
import time
import logging
from odrive_can.odrive import ODriveCAN, CanMsg
from odrive_can.tools import UDP_Client
from odrive_can.utils import run_main_async

SETTLE_TIME = 5.0  # settle time in [s]


log = logging.getLogger("pos_ctl")
udp = UDP_Client()


def feedback_callback(msg: CanMsg, caller: ODriveCAN):
    """position callback, send data to UDP client"""
    data = msg.data
    data["setpoint"] = caller.setpoint
    data["ts"] = time.time()
    udp.send({f"axis_{msg.axis_id}": data}, add_timestamp=False)


async def configure_controller(
    drv: ODriveCAN, input_mode: str = "POS_FILTER", accel: float = 120.0
):
    """setup control parameters"""
    log.info("Configuring controller")

    drv.clear_errors()

    # set parameters
    drv.set_pos_gain(5.0)

    drv.set_traj_vel_limit(40.0)
    drv.set_traj_accel_limits(accel, accel)

    # reset encoder
    drv.set_linear_count(0)

    drv.set_controller_mode("POSITION_CONTROL", input_mode)

    # set position control mode
    await drv.set_axis_state("CLOSED_LOOP_CONTROL")
    drv.check_errors()


async def main_loop(
    drv: ODriveCAN, input_mode: str = "POS_FILTER", amplitude: float = 40.0
):
    """position demo"""

    log.info("-----------Running position control-----------------")

    # start running

    drv.set_input_pos(amplitude)
    await asyncio.sleep(2)

    idx = 0
    try:
        while True:
            drv.check_errors()

            drv.set_input_pos(amplitude)
            idx += 1
            await asyncio.sleep(SETTLE_TIME)
            amplitude = -amplitude

    except KeyboardInterrupt:
        log.info("Stopping")
    finally:
        drv.stop()
        await asyncio.sleep(0.5)


async def main(
    axis_id: int,
    interface: str,
    input_mode: str = "POS_FILTER",
    amplitude: float = 40.0,
):
    print("Starting position control demo, press CTRL+C to exit")
    log.info(f"{interface=} {axis_id=} {input_mode=} {amplitude=}")
    drv = ODriveCAN(axis_id, interface)
    drv.feedback_callback = feedback_callback

    await drv.start()
    await asyncio.sleep(0.5)
    # drv.reboot()
    # await asyncio.sleep(1.0)

    drv.check_alive()
    log.info("Clearing errors")
    drv.clear_errors()
    drv.check_errors()

    await configure_controller(drv, input_mode)
    await main_loop(drv, input_mode, amplitude)  # Added back the main loop


if __name__ == "__main__":
    run_main_async(main(1, "can0"))

Velocity control

#!/usr/bin/env python3
"""
 Demonstration of velocity control using CAN interface

 Copyright (c) 2023 ROX Automation - Jev Kuznetsov
"""

import asyncio
import logging

from odrive_can.odrive import ODriveCAN
from odrive_can.setpoints import sawtooth_generator
from odrive_can.tools import UDP_Client
from odrive_can.utils import run_main_async

from odrive_can.examples.position_control import feedback_callback

VERSION = "2024.02.17"

SETPOINT_DELAY = 0.1


log = logging.getLogger("pos_ctl")
udp = UDP_Client()


async def configure_controller(drv: ODriveCAN):
    """setup control parameters"""

    # reset encoder
    drv.set_linear_count(0)

    drv.set_controller_mode("VELOCITY_CONTROL", "VEL_RAMP")

    # set position control mode
    await drv.set_axis_state("CLOSED_LOOP_CONTROL")
    await drv.wait_for_heartbeat()
    drv.check_errors()


async def main(axis_id: int, interface: str, amplitude: float = 40.0):
    """velocity control demo"""

    log.info(f"Running velocity control demo on axis {axis_id} using {interface}")

    drv = ODriveCAN(axis_id, interface)

    drv.feedback_callback = feedback_callback
    await drv.start()

    await asyncio.sleep(0.5)
    drv.check_alive()
    log.info("Clearing errors")
    drv.clear_errors()
    await drv.wait_for_heartbeat()
    drv.check_errors()

    await configure_controller(drv)

    # make setpoint generator
    setpoint_gen = sawtooth_generator(roc=10.0, max_val=40.0)

    log.info("Running velocity control")
    try:
        while True:
            drv.check_errors()
            setpoint = next(setpoint_gen)

            drv.set_input_vel(setpoint)
            await asyncio.sleep(SETPOINT_DELAY)

    finally:
        drv.stop()
        await asyncio.sleep(0.5)


if __name__ == "__main__":
    run_main_async(main(1, "can0"))