diff --git a/.gitignore b/.gitignore index 7e99e36..17952ae 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -*.pyc \ No newline at end of file +*.pyc +build +dist +dynamixel.egg-info diff --git a/License.txt b/License.txt old mode 100755 new mode 100644 index 023848e..d942805 --- a/License.txt +++ b/License.txt @@ -1,12 +1,12 @@ -This is a Python version of the ForestMoon Dynamixel library originally written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) for the Pi Robot Project which -lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. +This is a Python version of the ForestMoon Dynamixel library originally written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) for the Pi Robot Project which +lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. diff --git a/dynamixel/__init__.py b/dynamixel/__init__.py old mode 100755 new mode 100644 index 3da6910..f5e4ce4 --- a/dynamixel/__init__.py +++ b/dynamixel/__init__.py @@ -1,27 +1,27 @@ -#!/usr/bin/env python - -""" - -// This software was written and developed by Scott Ferguson. -// The current version can be found at http://www.forestmoon.com/Software/. -// Comments and suggestions are encouraged and can be sent to mailto:contact@forestmoon.com?subject=Software. -// This free software is distributed under the GNU General Public License. -// See http://www.gnu.org/licenses/gpl.html for details. -// This license restricts your usage of the software in derivative works. - -The original C# version of this software was converted to Python on behalf of the -Pi Robot Project at http://www.pirobot.org - -""" - -""" -Dynamixel AX-12+ Module -""" -__version__ = '1.0.1' - -from dynamixel import Dynamixel -from dynamixel_network import DynamixelNetwork -from echo_stream import EchoStream -from serial_stream import SerialStream -from defs import ERROR_STATUS, BAUD_RATE, REGISTER, INSTRUCTION, \ - STATUS_RETURN_LEVEL +#!/usr/bin/env python + +""" + +// This software was written and developed by Scott Ferguson. +// The current version can be found at http://www.forestmoon.com/Software/. +// Comments and suggestions are encouraged and can be sent to mailto:contact@forestmoon.com?subject=Software. +// This free software is distributed under the GNU General Public License. +// See http://www.gnu.org/licenses/gpl.html for details. +// This license restricts your usage of the software in derivative works. + +The original C# version of this software was converted to Python on behalf of the +Pi Robot Project at http://www.pirobot.org + +""" + +""" +Dynamixel AX-12+ Module +""" +__version__ = '1.0.1' + +from dynamixel import Dynamixel +from dynamixel_network import DynamixelNetwork +from echo_stream import EchoStream +from serial_stream import SerialStream +from defs import ERROR_STATUS, BAUD_RATE, REGISTER, INSTRUCTION, \ + STATUS_RETURN_LEVEL diff --git a/dynamixel/defs.py b/dynamixel/defs.py old mode 100755 new mode 100644 index 3362a8e..85453c9 --- a/dynamixel/defs.py +++ b/dynamixel/defs.py @@ -1,99 +1,99 @@ -#!/usr/bin/env python2.6 - -""" -This is a Python version of the ForestMoon Dynamixel library originally -written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) -for the Pi Robot Project which lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. - -* * * * * - -Module wide definitions - -""" - -import enumeration - -ERROR_STATUS = enumeration.Enumeration( [ - ('None',0x0,"None"), - ('InputVoltage', 0x1, "Input Voltage Error"), - ('AngleLimit', 0x2, "Angle Limit Error"), - ('Overheating', 0x4, "Overheating Error"), - ('Range', 0x8,"Range Error"), - ('Checksum', 0x10,"Checksum Error"), - ('Overload', 0x20,"Overload Error"), - ('Instruction', 0x40,"Instruction Error" ) ] ) - -BAUD_RATE = enumeration.Enumeration( [ - ('Baud_1000000', 1), - ('Baud_500000', 3), - ('Baud_400000', 4), - ('Baud_250000', 7), - ('Baud_200000', 9), - ('Baud_115200', 0x10), - ('Baud_57600', 0x22), - ('Baud_19200', 0x67), - ('Baud_9600', 0xcf) - ]) - -REGISTER = enumeration.Enumeration( [ - ('ModelNumber', 0, "Model Number"), - ('FirmwareVersion', 2, "Firmware Version"), - ('Id', 3, "Id"), - ('BaudRate', 4, "Baud Rate"), - ('ReturnDelay', 5, "Return Delay"), - ('CWAngleLimit', 6, "CW Angle Limit"), - ('CCWAngleLimit', 8, "CCW Angle Limit"), - ('TemperatureLimit', 11, "Temperature Limit"), - ('LowVoltageLimit', 12, "Low Voltage Limit"), - ('HighVoltageLimit', 13, "High Voltage Limit"), - ('MaxTorque', 14, "Max Torque"), - ('StatusReturnLevel', 16, "Status Return Level"), - ('AlarmLED', 17, "Alarm Led"), - ('AlarmShutdown', 18, "Alarm Shutdown"), - ('DownCalibration', 20, "Down Calibration"), - ('UpCalibration', 22, "Up Calibration"), - ('TorqueEnable', 24, "Torque Enable"), - ('LED', 25, "LED"), - ('CWComplianceMargin', 26, "CW Compliance Margin"), - ('CCWComplianceMargin', 27, "CCW Compliance Margin"), - ('CWComplianceSlope', 28, "CW Compliance Slope"), - ('CCWComplianceSlope', 29, "CCW Compliance Slope"), - ('GoalPosition', 30, "Goal Position"), - ('MovingSpeed', 32, "Moving Speed"), - ('TorqueLimit', 34, "Torque Limit"), - ('CurrentPosition', 36, "Current Position"), - ('CurrentSpeed', 38, "Current Speed"), - ('CurrentLoad', 40, "Current Load"), - ('CurrentVoltage', 42, "Current Voltage"), - ('CurrentTemperature', 43, "Current Temperature"), - ('RegisteredInstruction', 44, "Registered Instruction"), - ('Moving', 46, "Moving"), - ('Lock', 47, "Lock"), - ('Punch', 48, "Punch" )]) - -STATUS_RETURN_LEVEL = enumeration.Enumeration( [ - ('NoResponse', 0), - ('RespondToReadData', 1), - ('RespondToAll', 2 )]) - -INSTRUCTION = enumeration.Enumeration( [ - ('Ping', 1, "Respond only with a status packet."), - ('ReadData', 2, "Read register data."), - ('WriteData', 3, "Write register data."), - ('RegWrite', 4, "Delay writing register data until an Action \ -instruction is received."), - ('Action', 5, "Perform pending RegWrite instructions."), - ('Reset', 6, "Reset all registers (including ID) to default values."), - ('SyncWrite', 0x83, "Write register data to multiple \ -Dynamixels at once. ") - ] ) +#!/usr/bin/env python2.6 + +""" +This is a Python version of the ForestMoon Dynamixel library originally +written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) +for the Pi Robot Project which lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. + +* * * * * + +Module wide definitions + +""" + +import enumeration + +ERROR_STATUS = enumeration.Enumeration( [ + ('None',0x0,"None"), + ('InputVoltage', 0x1, "Input Voltage Error"), + ('AngleLimit', 0x2, "Angle Limit Error"), + ('Overheating', 0x4, "Overheating Error"), + ('Range', 0x8,"Range Error"), + ('Checksum', 0x10,"Checksum Error"), + ('Overload', 0x20,"Overload Error"), + ('Instruction', 0x40,"Instruction Error" ) ] ) + +BAUD_RATE = enumeration.Enumeration( [ + ('Baud_1000000', 1), + ('Baud_500000', 3), + ('Baud_400000', 4), + ('Baud_250000', 7), + ('Baud_200000', 9), + ('Baud_115200', 0x10), + ('Baud_57600', 0x22), + ('Baud_19200', 0x67), + ('Baud_9600', 0xcf) + ]) + +REGISTER = enumeration.Enumeration( [ + ('ModelNumber', 0, "Model Number"), + ('FirmwareVersion', 2, "Firmware Version"), + ('Id', 3, "Id"), + ('BaudRate', 4, "Baud Rate"), + ('ReturnDelay', 5, "Return Delay"), + ('CWAngleLimit', 6, "CW Angle Limit"), + ('CCWAngleLimit', 8, "CCW Angle Limit"), + ('TemperatureLimit', 11, "Temperature Limit"), + ('LowVoltageLimit', 12, "Low Voltage Limit"), + ('HighVoltageLimit', 13, "High Voltage Limit"), + ('MaxTorque', 14, "Max Torque"), + ('StatusReturnLevel', 16, "Status Return Level"), + ('AlarmLED', 17, "Alarm Led"), + ('AlarmShutdown', 18, "Alarm Shutdown"), + ('DownCalibration', 20, "Down Calibration"), + ('UpCalibration', 22, "Up Calibration"), + ('TorqueEnable', 24, "Torque Enable"), + ('LED', 25, "LED"), + ('CWComplianceMargin', 26, "CW Compliance Margin"), + ('CCWComplianceMargin', 27, "CCW Compliance Margin"), + ('CWComplianceSlope', 28, "CW Compliance Slope"), + ('CCWComplianceSlope', 29, "CCW Compliance Slope"), + ('GoalPosition', 30, "Goal Position"), + ('MovingSpeed', 32, "Moving Speed"), + ('TorqueLimit', 34, "Torque Limit"), + ('CurrentPosition', 36, "Current Position"), + ('CurrentSpeed', 38, "Current Speed"), + ('CurrentLoad', 40, "Current Load"), + ('CurrentVoltage', 42, "Current Voltage"), + ('CurrentTemperature', 43, "Current Temperature"), + ('RegisteredInstruction', 44, "Registered Instruction"), + ('Moving', 46, "Moving"), + ('Lock', 47, "Lock"), + ('Punch', 48, "Punch" )]) + +STATUS_RETURN_LEVEL = enumeration.Enumeration( [ + ('NoResponse', 0), + ('RespondToReadData', 1), + ('RespondToAll', 2 )]) + +INSTRUCTION = enumeration.Enumeration( [ + ('Ping', 1, "Respond only with a status packet."), + ('ReadData', 2, "Read register data."), + ('WriteData', 3, "Write register data."), + ('RegWrite', 4, "Delay writing register data until an Action \ +instruction is received."), + ('Action', 5, "Perform pending RegWrite instructions."), + ('Reset', 6, "Reset all registers (including ID) to default values."), + ('SyncWrite', 0x83, "Write register data to multiple \ +Dynamixels at once. ") + ] ) diff --git a/dynamixel/dynamixel.py b/dynamixel/dynamixel.py old mode 100755 new mode 100644 index 8251a49..da4d266 --- a/dynamixel/dynamixel.py +++ b/dynamixel/dynamixel.py @@ -1,497 +1,497 @@ -#!/usr/bin/env python2.6 - -""" -This is a Python version of the ForestMoon Dynamixel library originally -written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) -for the Pi Robot Project which lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. - -* * * * * - -Dynamixel interface - -""" - -import defs -import time -import dynamixel_network - -class Dynamixel (object): - """ Dynamixel AX-12+ class """ - def __init__( self, ident, dyn_net ): - """ Constructor - ident - the id for this dynamixel - dyn_net - the parent dynamixel network - """ - self._id = ident - self._dyn_net = dyn_net - self.cache = {} - self.changed = False - self._synchronized = True - data = self._dyn_net.read_registers( ident, defs.REGISTER.GoalPosition, - defs.REGISTER.MovingSpeed ) - self[ defs.REGISTER.GoalPosition ] = data[0] - self[ defs.REGISTER.MovingSpeed ] = data[1] - - def _no_cache( self, register ): - """ deteremine if a register value should be cached - - register - register - - returns True if should not be cached - """ - return register in [defs.REGISTER.CurrentLoad, - defs.REGISTER.CurrentPosition, - defs.REGISTER.CurrentSpeed, - defs.REGISTER.CurrentTemperature, - defs.REGISTER.CurrentVoltage, - defs.REGISTER.Moving, - defs.REGISTER.TorqueEnable ] - - def __getitem__( self, register ): - """ Get a cache value - - register - register to retrieve - - returns value or -1 if not in cache - """ - data = -1 - if register in self.cache: - data = self.cache[ register ] - return data - - def __setitem__( self, register, value ): - """ Set a cache value - - register - register to retrieve - """ - - self.cache[ register ] = value - - def _get_register_value( self, register ): - """ Get a register value from the cache, if present, - or by reading the value from the Dynamixel - - reg - register to read - - return the register value""" - if register in [defs.REGISTER.GoalPosition, defs.REGISTER.MovingSpeed]: - return self[ register ] - if self._no_cache( register ): - return self._dyn_net.read_register( self._id, register ) - else: - value = self[ register ] - if value == -1: - return self._dyn_net.read_register( self._id, register ) - else: - return value - - def set_register_value( self, register, value ): - """Set a register value and record in the cache, if applicable. - - register - register - value - byte or word value - """ - if register in [defs.REGISTER.GoalPosition, defs.REGISTER.MovingSpeed]: - if self._synchronized: - if register == defs.REGISTER.MovingSpeed and value == 0: - value = 1 - print "Moving speed %d " % (value) - self[ register ] = value - self.changed = True - elif register in [defs.REGISTER.ModelNumber, - defs.REGISTER.FirmwareVersion, - defs.REGISTER.CurrentPosition, - defs.REGISTER.CurrentSpeed, - defs.REGISTER.CurrentLoad, - defs.REGISTER.CurrentVoltage, - defs.REGISTER.CurrentTemperature, - defs.REGISTER.Moving]: - raise ValueError( "Cannot set register" ) - if self._no_cache( register ): - self._dyn_net.write_register( self._id, register, value, False ) - return - if self[ register ] == value: - return - self._dyn_net.write_register( self._id, register, value, False ) - self[ register ] = value - - - def read_all( self ): - """ Read all register values into the cache """ - regs = defs.REGISTER.values() - regs.sort() - values = self._dyn_net.read_registers( self._id, - defs.REGISTER.ModelNumber, - defs.REGISTER.Punch ) - for i, reg in enumerate( regs ): - self[ reg ] = values[i] - - def reset( self, ident): - - """Resets a dynamixel - - ident - id to reset - - Note: - This function should be used carefully, if at all. - It resets the unit ID to 1, so careful planning is required to to - avoid collisions on a network with more than one Dynamixel. - """ - self._dyn_net.write_instruction( ident, defs.INSTRUCTION.Reset, None ) - self._dyn_net.await_packet( ident, 0 ) - - def reset_registers(self ): - """Reset register values to factory default""" - self._dyn_net.dynamixel_id_change( self, 1 ) - self.reset( self._id ) - self._id = 1 - time.sleep(0.3) - self.read_all() - - - def __str__( self ): - return "Dyn %d" % ( self._id ) - - - def stop( self ): - """ - Stop the Dynamixel from moving. - - There is no direct way to command a Dynamixel to stop. - And there is no way to set the speed to 0, since the value 0 is - specially interpreted to mean 'as fast as possibly'. - The best we can do is command it to move to its current position - and set the speed to 1 to slow it down as much as possible. - If there is any significant lag between receiving the CurrentPosition - and setting it and the speed, there will be some residual movement - as the Dynamixel moves to its observed but stale CurrentPosition. - If the Dynamixel is in Sychronized mode, a call to - 'DynamixelNextwork.Synchronize' will be required to complete the operation. - """ - self.goal_position = self.current_position - self.moving_speed = 1 - - def _get_synchronized( self ): - """getter""" - return self._synchronized - - def _set_synchronized( self, value ): - """ setter """ - self._synchronized = value - - synchronized = property( _get_synchronized, _set_synchronized ) - - def _get_goal_position( self ): - """getter""" - return self._get_register_value( defs.REGISTER.GoalPosition ) - def _set_goal_position( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.GoalPosition, value ) - - goal_position = property( _get_goal_position, _set_goal_position ) - - def _get_moving_speed( self ): - """getter""" - return self._get_register_value( defs.REGISTER.MovingSpeed ) - def _set_moving_speed( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.MovingSpeed, value ) - - moving_speed = property( _get_moving_speed, _set_moving_speed ) - - def _get_alarm_led( self ): - """getter""" - return self._get_register_value( defs.REGISTER.AlarmLED ) - def _set_alarm_led( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.AlarmLED, value ) - - alarm_led = property( _get_alarm_led, _set_alarm_led ) - - def _get_alarm_shutdown( self ): - """getter""" - return self._get_register_value( defs.REGISTER.AlarmShutdown ) - def _set_alarm_shutdown( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.AlarmShutdown, value ) - - alarm_shutdown = property( _get_alarm_shutdown, _set_alarm_shutdown ) - - def _get_baud_rate( self ): - """getter""" - return self._get_register_value( defs.REGISTER.BaudRate ) - def _set_baud_rate( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.BaudRate, value ) - - baud_rate = property( _get_baud_rate, _set_baud_rate ) - - def _get_cw_angle_limit( self ): - """getter""" - return self._get_register_value( defs.REGISTER.CWAngleLimit ) - def _set_cw_angle_limit( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.CWAngleLimit, value ) - - cw_angle_limit = property( _get_cw_angle_limit, _set_cw_angle_limit ) - - def _get_ccw_angle_limit( self ): - """getter""" - return self._get_register_value( defs.REGISTER.CWAngleLimit ) - def _set_ccw_angle_limit( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.CWAngleLimit, value ) - - ccw_angle_limit = property( _get_ccw_angle_limit, _set_ccw_angle_limit ) - - - def _get_ccw_compliance_margin( self ): - """getter""" - return self._get_register_value( defs.REGISTER.CCWComplianceMargin ) - def _set_ccw_compliance_margin( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.CCWComplianceMargin, value ) - - ccw_compliance_margin = property( _get_ccw_compliance_margin, - _set_ccw_compliance_margin ) - - def _get_cw_compliance_margin( self ): - """getter""" - return self._get_register_value( defs.REGISTER.CWComplianceMargin ) - def _set_cw_compliance_margin( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.CWComplianceMargin, value ) - - cw_compliance_margin = property( _get_cw_compliance_margin, - _set_cw_compliance_margin ) - - def _get_ccw_compliance_slope( self ): - """getter""" - return self._get_register_value( defs.REGISTER.CCWComplianceSlope ) - def _set_ccw_compliance_slope( self, value ): - """ setter """ - self.set_register_value( defs.REGISTER.CCWComplianceSlope, value ) - - ccw_compliance_slope = property( _get_ccw_compliance_slope, - _set_ccw_compliance_slope ) - - def _get_cw_compliance_slope( self ): - """getter""" - return self._get_register_value(defs.REGISTER.CWComplianceSlope) - def _set_cw_compliance_slope( self, value ): - """ setter """ - self.set_register_value(defs.REGISTER.CWComplianceSlope, value) - - cw_compliance_slope = property( _get_cw_compliance_slope, - _set_cw_compliance_slope ) - - def _get_current_load( self ): - """getter""" - current_load = defs.REGISTER.CurrentLoad - val = self._dyn_net.read_register( self._id, current_load ) - if (val & 0x400) != 0: - return -(val & 0x3FF) - return val - - current_load = property( _get_current_load ) - - def _get_current_position( self ): - """getter""" - return self._get_register_value(defs.REGISTER.CurrentPosition) - - current_position = property( _get_current_position ) - - def _get_current_speed( self ): - """getter""" - val = self._dyn_net.read_register( self._id, - defs.REGISTER.CurrentSpeed ) - if (val & 0x400) != 0: - return -(val & 0x3FF) - return val - - current_speed = property( _get_current_speed ) - - def _get_current_temperature( self ): - """getter""" - return self._dyn_net.read_register( self._id, - defs.REGISTER.CurrentTemperature ) - - current_temperature = property( _get_current_temperature ) - - def _get_current_voltage( self ): - """getter""" - volts = self._dyn_net.read_register( self._id, - defs.REGISTER.CurrentVoltage) - return volts / 10.0 - - current_voltage = property( _get_current_voltage ) - - def _get_torque_enable( self ): - """getter""" - return (self._get_register_value(defs.REGISTER.TorqueEnable) != 0) - def _set_torque_enable( self, value ): - """ setter """ - self.set_register_value(defs.REGISTER.TorqueEnable, 1 if value else 0 ) - - torque_enable = property( _get_torque_enable, _set_torque_enable ) - - def _get_firmware_version( self ): - """getter""" - return self._get_register_value(defs.REGISTER.FirmwareVersion) - - firmware_version = property( _get_firmware_version ) - - def _get_id( self ): - """getter""" - return self._id - def _set_id( self, value ): - """change id of the dynamixel""" - broadcast_id = dynamixel_network.DynamixelInterface.BROADCAST_ID - if value < 0 or value >= broadcast_id: - raise ValueError( "Id must be in range 0 to 253") - if value == self._id: - return - self._dyn_net.dynamixel_id_change( self, value ) - self._dyn_net.write_register( self._id, defs.REGISTER.Id, value, False ) - self._id = value - - id = property( _get_id, _set_id ) - - def _get_led( self ): - """getter""" - return (self._get_register_value(defs.REGISTER.LED) != 0) - def _set_led( self, value ): - """setter""" - self.set_register_value(defs.REGISTER.LED, 1 if value else 0) - - led = property( _get_led, _set_led ) - - def _get_lock( self ): - """getter""" - return (self._get_register_value(defs.REGISTER.Lock) != 0) - - lock = property( _get_lock ) - - def _get_temperature_limit( self ): - """getter""" - return self._get_register_value( defs.REGISTER.TemperatureLimit ) - def _set_temperature_limit( self, value ): - """setter""" - self.set_register_value( defs.REGISTER.TemperatureLimit, value ) - - temperature_limit = property( _get_temperature_limit, - _set_temperature_limit ) - - def _get_max_torque( self ): - """getter""" - return self._get_register_value(defs.REGISTER.MaxTorque) - def _set_max_torque( self, value ): - """setter""" - self.set_register_value( defs.REGISTER.MaxTorque, value ) - - max_torque = property( _get_max_torque, _set_max_torque ) - - def _get_high_voltage_limit( self ): - """getter""" - return self._get_register_value(defs.REGISTER.HighVoltageLimit)/10.0 - def _set_high_voltage_limit( self, value ): - """setter""" - adj_value = int(round(value* 10.0)) - self.set_register_value(defs.REGISTER.HighVoltageLimit, adj_value ) - - high_voltage_limit = property( _get_high_voltage_limit, - _set_high_voltage_limit ) - - def _get_low_voltage_limit( self ): - """getter""" - return self._get_register_value(defs.REGISTER.LowVoltageLimit)/10.0 - def _set_low_voltage_limit( self, value ): - """setter""" - adj_value = int(round(value * 10.0)) - self.set_register_value(defs.REGISTER.LowVoltageLimit, adj_value ) - - low_voltage_limit = property( _get_low_voltage_limit, - _set_low_voltage_limit ) - - def _get_model_number( self ): - """getter""" - return self._get_register_value(defs.REGISTER.ModelNumber) - - # property( _get_X, _set_X ) - - model_number = property( _get_model_number ) - - - def _get_moving( self ): - """getter""" - is_moving = (self._get_register_value(defs.REGISTER.Moving) != 0) - return self.changed or is_moving - - moving = property( _get_moving ) - - - - def _get_punch( self ): - """getter""" - return self._get_register_value(defs.REGISTER.Punch) - def _set_punch( self, value ): - """setter""" - self.set_register_value( defs.REGISTER.Punch, value) - - punch = property(_get_punch, _set_punch ) - - def _get_registered_instruction( self ): - """getter""" - reg_inst = defs.REGISTER.RegisteredInstruction - result = self._dyn_net.read_register( self._id, reg_inst ) - return result != 0 - - def _set_registered_instruction( self, value ): - """setter""" - self.set_register_value(defs.REGISTER.RegisteredInstruction, - 1 if value else 0 ) - - registered_instruction = property(_get_registered_instruction, - _set_registered_instruction ) - - def _get_return_delay( self ): - """getter""" - return self._get_register_value(defs.REGISTER.ReturnDelay) * 2 - def _set_return_delay( self, value ): - """setter""" - self.set_register_value(defs.REGISTER.ReturnDelay, value / 2) - - return_delay = property( _get_return_delay, _set_return_delay ) - - - def _get_status_return_level( self ): - """getter""" - return self._get_register_value(defs.REGISTER.StatusReturnLevel) - - def _set_status_return_level(self, value ): - """setter""" - self.set_register_value(defs.REGISTER.StatusReturnLevel, value) - - status_return_level = property( _get_status_return_level, - _set_status_return_level ) - - def _get_torque_limit( self ): - """getter""" - return self._get_register_value(defs.REGISTER.TorqueLimit) - - def _set_torque_limit( self, value ): - """setter""" - self.set_register_value(defs.REGISTER.TorqueLimit, value) - - torque_limit = property( _get_torque_limit, _set_torque_limit ) - \ No newline at end of file +#!/usr/bin/env python2.6 + +""" +This is a Python version of the ForestMoon Dynamixel library originally +written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) +for the Pi Robot Project which lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. + +* * * * * + +Dynamixel interface + +""" + +import defs +import time +import dynamixel_network + +class Dynamixel (object): + """ Dynamixel AX-12+ class """ + def __init__( self, ident, dyn_net ): + """ Constructor + ident - the id for this dynamixel + dyn_net - the parent dynamixel network + """ + self._id = ident + self._dyn_net = dyn_net + self.cache = {} + self.changed = False + self._synchronized = True + data = self._dyn_net.read_registers( ident, defs.REGISTER.GoalPosition, + defs.REGISTER.MovingSpeed ) + self[ defs.REGISTER.GoalPosition ] = data[0] + self[ defs.REGISTER.MovingSpeed ] = data[1] + + def _no_cache( self, register ): + """ deteremine if a register value should be cached + + register - register + + returns True if should not be cached + """ + return register in [defs.REGISTER.CurrentLoad, + defs.REGISTER.CurrentPosition, + defs.REGISTER.CurrentSpeed, + defs.REGISTER.CurrentTemperature, + defs.REGISTER.CurrentVoltage, + defs.REGISTER.Moving, + defs.REGISTER.TorqueEnable ] + + def __getitem__( self, register ): + """ Get a cache value + + register - register to retrieve + + returns value or -1 if not in cache + """ + data = -1 + if register in self.cache: + data = self.cache[ register ] + return data + + def __setitem__( self, register, value ): + """ Set a cache value + + register - register to retrieve + """ + + self.cache[ register ] = value + + def _get_register_value( self, register ): + """ Get a register value from the cache, if present, + or by reading the value from the Dynamixel + + reg - register to read + + return the register value""" + if register in [defs.REGISTER.GoalPosition, defs.REGISTER.MovingSpeed]: + return self[ register ] + if self._no_cache( register ): + return self._dyn_net.read_register( self._id, register ) + else: + value = self[ register ] + if value == -1: + return self._dyn_net.read_register( self._id, register ) + else: + return value + + def set_register_value( self, register, value ): + """Set a register value and record in the cache, if applicable. + + register - register + value - byte or word value + """ + if register in [defs.REGISTER.GoalPosition, defs.REGISTER.MovingSpeed]: + if self._synchronized: + if register == defs.REGISTER.MovingSpeed and value == 0: + value = 1 + print "Moving speed %d " % (value) + self[ register ] = value + self.changed = True + elif register in [defs.REGISTER.ModelNumber, + defs.REGISTER.FirmwareVersion, + defs.REGISTER.CurrentPosition, + defs.REGISTER.CurrentSpeed, + defs.REGISTER.CurrentLoad, + defs.REGISTER.CurrentVoltage, + defs.REGISTER.CurrentTemperature, + defs.REGISTER.Moving]: + raise ValueError( "Cannot set register" ) + if self._no_cache( register ): + self._dyn_net.write_register( self._id, register, value, False ) + return + if self[ register ] == value: + return + self._dyn_net.write_register( self._id, register, value, False ) + self[ register ] = value + + + def read_all( self ): + """ Read all register values into the cache """ + regs = defs.REGISTER.values() + regs.sort() + values = self._dyn_net.read_registers( self._id, + defs.REGISTER.ModelNumber, + defs.REGISTER.Punch ) + for i, reg in enumerate( regs ): + self[ reg ] = values[i] + + def reset( self, ident): + + """Resets a dynamixel + + ident - id to reset + + Note: + This function should be used carefully, if at all. + It resets the unit ID to 1, so careful planning is required to to + avoid collisions on a network with more than one Dynamixel. + """ + self._dyn_net.write_instruction( ident, defs.INSTRUCTION.Reset, None ) + self._dyn_net.await_packet( ident, 0 ) + + def reset_registers(self ): + """Reset register values to factory default""" + self._dyn_net.dynamixel_id_change( self, 1 ) + self.reset( self._id ) + self._id = 1 + time.sleep(0.3) + self.read_all() + + + def __str__( self ): + return "Dyn %d" % ( self._id ) + + + def stop( self ): + """ + Stop the Dynamixel from moving. + + There is no direct way to command a Dynamixel to stop. + And there is no way to set the speed to 0, since the value 0 is + specially interpreted to mean 'as fast as possibly'. + The best we can do is command it to move to its current position + and set the speed to 1 to slow it down as much as possible. + If there is any significant lag between receiving the CurrentPosition + and setting it and the speed, there will be some residual movement + as the Dynamixel moves to its observed but stale CurrentPosition. + If the Dynamixel is in Sychronized mode, a call to + 'DynamixelNextwork.Synchronize' will be required to complete the operation. + """ + self.goal_position = self.current_position + self.moving_speed = 1 + + def _get_synchronized( self ): + """getter""" + return self._synchronized + + def _set_synchronized( self, value ): + """ setter """ + self._synchronized = value + + synchronized = property( _get_synchronized, _set_synchronized ) + + def _get_goal_position( self ): + """getter""" + return self._get_register_value( defs.REGISTER.GoalPosition ) + def _set_goal_position( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.GoalPosition, value ) + + goal_position = property( _get_goal_position, _set_goal_position ) + + def _get_moving_speed( self ): + """getter""" + return self._get_register_value( defs.REGISTER.MovingSpeed ) + def _set_moving_speed( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.MovingSpeed, value ) + + moving_speed = property( _get_moving_speed, _set_moving_speed ) + + def _get_alarm_led( self ): + """getter""" + return self._get_register_value( defs.REGISTER.AlarmLED ) + def _set_alarm_led( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.AlarmLED, value ) + + alarm_led = property( _get_alarm_led, _set_alarm_led ) + + def _get_alarm_shutdown( self ): + """getter""" + return self._get_register_value( defs.REGISTER.AlarmShutdown ) + def _set_alarm_shutdown( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.AlarmShutdown, value ) + + alarm_shutdown = property( _get_alarm_shutdown, _set_alarm_shutdown ) + + def _get_baud_rate( self ): + """getter""" + return self._get_register_value( defs.REGISTER.BaudRate ) + def _set_baud_rate( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.BaudRate, value ) + + baud_rate = property( _get_baud_rate, _set_baud_rate ) + + def _get_cw_angle_limit( self ): + """getter""" + return self._get_register_value( defs.REGISTER.CWAngleLimit ) + def _set_cw_angle_limit( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.CWAngleLimit, value ) + + cw_angle_limit = property( _get_cw_angle_limit, _set_cw_angle_limit ) + + def _get_ccw_angle_limit( self ): + """getter""" + return self._get_register_value( defs.REGISTER.CCWAngleLimit ) + def _set_ccw_angle_limit( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.CCWAngleLimit, value ) + + ccw_angle_limit = property( _get_ccw_angle_limit, _set_ccw_angle_limit ) + + + def _get_ccw_compliance_margin( self ): + """getter""" + return self._get_register_value( defs.REGISTER.CCWComplianceMargin ) + def _set_ccw_compliance_margin( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.CCWComplianceMargin, value ) + + ccw_compliance_margin = property( _get_ccw_compliance_margin, + _set_ccw_compliance_margin ) + + def _get_cw_compliance_margin( self ): + """getter""" + return self._get_register_value( defs.REGISTER.CWComplianceMargin ) + def _set_cw_compliance_margin( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.CWComplianceMargin, value ) + + cw_compliance_margin = property( _get_cw_compliance_margin, + _set_cw_compliance_margin ) + + def _get_ccw_compliance_slope( self ): + """getter""" + return self._get_register_value( defs.REGISTER.CCWComplianceSlope ) + def _set_ccw_compliance_slope( self, value ): + """ setter """ + self.set_register_value( defs.REGISTER.CCWComplianceSlope, value ) + + ccw_compliance_slope = property( _get_ccw_compliance_slope, + _set_ccw_compliance_slope ) + + def _get_cw_compliance_slope( self ): + """getter""" + return self._get_register_value(defs.REGISTER.CWComplianceSlope) + def _set_cw_compliance_slope( self, value ): + """ setter """ + self.set_register_value(defs.REGISTER.CWComplianceSlope, value) + + cw_compliance_slope = property( _get_cw_compliance_slope, + _set_cw_compliance_slope ) + + def _get_current_load( self ): + """getter""" + current_load = defs.REGISTER.CurrentLoad + val = self._dyn_net.read_register( self._id, current_load ) + if (val & 0x400) != 0: + return -(val & 0x3FF) + return val + + current_load = property( _get_current_load ) + + def _get_current_position( self ): + """getter""" + return self._get_register_value(defs.REGISTER.CurrentPosition) + + current_position = property( _get_current_position ) + + def _get_current_speed( self ): + """getter""" + val = self._dyn_net.read_register( self._id, + defs.REGISTER.CurrentSpeed ) + if (val & 0x400) != 0: + return -(val & 0x3FF) + return val + + current_speed = property( _get_current_speed ) + + def _get_current_temperature( self ): + """getter""" + return self._dyn_net.read_register( self._id, + defs.REGISTER.CurrentTemperature ) + + current_temperature = property( _get_current_temperature ) + + def _get_current_voltage( self ): + """getter""" + volts = self._dyn_net.read_register( self._id, + defs.REGISTER.CurrentVoltage) + return volts / 10.0 + + current_voltage = property( _get_current_voltage ) + + def _get_torque_enable( self ): + """getter""" + return (self._get_register_value(defs.REGISTER.TorqueEnable) != 0) + def _set_torque_enable( self, value ): + """ setter """ + self.set_register_value(defs.REGISTER.TorqueEnable, 1 if value else 0 ) + + torque_enable = property( _get_torque_enable, _set_torque_enable ) + + def _get_firmware_version( self ): + """getter""" + return self._get_register_value(defs.REGISTER.FirmwareVersion) + + firmware_version = property( _get_firmware_version ) + + def _get_id( self ): + """getter""" + return self._id + def _set_id( self, value ): + """change id of the dynamixel""" + broadcast_id = dynamixel_network.DynamixelInterface.BROADCAST_ID + if value < 0 or value >= broadcast_id: + raise ValueError( "Id must be in range 0 to 253") + if value == self._id: + return + self._dyn_net.dynamixel_id_change( self, value ) + self._dyn_net.write_register( self._id, defs.REGISTER.Id, value, False ) + self._id = value + + id = property( _get_id, _set_id ) + + def _get_led( self ): + """getter""" + return (self._get_register_value(defs.REGISTER.LED) != 0) + def _set_led( self, value ): + """setter""" + self.set_register_value(defs.REGISTER.LED, 1 if value else 0) + + led = property( _get_led, _set_led ) + + def _get_lock( self ): + """getter""" + return (self._get_register_value(defs.REGISTER.Lock) != 0) + + lock = property( _get_lock ) + + def _get_temperature_limit( self ): + """getter""" + return self._get_register_value( defs.REGISTER.TemperatureLimit ) + def _set_temperature_limit( self, value ): + """setter""" + self.set_register_value( defs.REGISTER.TemperatureLimit, value ) + + temperature_limit = property( _get_temperature_limit, + _set_temperature_limit ) + + def _get_max_torque( self ): + """getter""" + return self._get_register_value(defs.REGISTER.MaxTorque) + def _set_max_torque( self, value ): + """setter""" + self.set_register_value( defs.REGISTER.MaxTorque, value ) + + max_torque = property( _get_max_torque, _set_max_torque ) + + def _get_high_voltage_limit( self ): + """getter""" + return self._get_register_value(defs.REGISTER.HighVoltageLimit)/10.0 + def _set_high_voltage_limit( self, value ): + """setter""" + adj_value = int(round(value* 10.0)) + self.set_register_value(defs.REGISTER.HighVoltageLimit, adj_value ) + + high_voltage_limit = property( _get_high_voltage_limit, + _set_high_voltage_limit ) + + def _get_low_voltage_limit( self ): + """getter""" + return self._get_register_value(defs.REGISTER.LowVoltageLimit)/10.0 + def _set_low_voltage_limit( self, value ): + """setter""" + adj_value = int(round(value * 10.0)) + self.set_register_value(defs.REGISTER.LowVoltageLimit, adj_value ) + + low_voltage_limit = property( _get_low_voltage_limit, + _set_low_voltage_limit ) + + def _get_model_number( self ): + """getter""" + return self._get_register_value(defs.REGISTER.ModelNumber) + + # property( _get_X, _set_X ) + + model_number = property( _get_model_number ) + + + def _get_moving( self ): + """getter""" + is_moving = (self._get_register_value(defs.REGISTER.Moving) != 0) + return self.changed or is_moving + + moving = property( _get_moving ) + + + + def _get_punch( self ): + """getter""" + return self._get_register_value(defs.REGISTER.Punch) + def _set_punch( self, value ): + """setter""" + self.set_register_value( defs.REGISTER.Punch, value) + + punch = property(_get_punch, _set_punch ) + + def _get_registered_instruction( self ): + """getter""" + reg_inst = defs.REGISTER.RegisteredInstruction + result = self._dyn_net.read_register( self._id, reg_inst ) + return result != 0 + + def _set_registered_instruction( self, value ): + """setter""" + self.set_register_value(defs.REGISTER.RegisteredInstruction, + 1 if value else 0 ) + + registered_instruction = property(_get_registered_instruction, + _set_registered_instruction ) + + def _get_return_delay( self ): + """getter""" + return self._get_register_value(defs.REGISTER.ReturnDelay) * 2 + def _set_return_delay( self, value ): + """setter""" + self.set_register_value(defs.REGISTER.ReturnDelay, value / 2) + + return_delay = property( _get_return_delay, _set_return_delay ) + + + def _get_status_return_level( self ): + """getter""" + return self._get_register_value(defs.REGISTER.StatusReturnLevel) + + def _set_status_return_level(self, value ): + """setter""" + self.set_register_value(defs.REGISTER.StatusReturnLevel, value) + + status_return_level = property( _get_status_return_level, + _set_status_return_level ) + + def _get_torque_limit( self ): + """getter""" + return self._get_register_value(defs.REGISTER.TorqueLimit) + + def _set_torque_limit( self, value ): + """setter""" + self.set_register_value(defs.REGISTER.TorqueLimit, value) + + torque_limit = property( _get_torque_limit, _set_torque_limit ) + diff --git a/dynamixel/dynamixel_network.py b/dynamixel/dynamixel_network.py old mode 100755 new mode 100644 index 8dbf3fe..74b156d --- a/dynamixel/dynamixel_network.py +++ b/dynamixel/dynamixel_network.py @@ -1,643 +1,643 @@ -#!/usr/bin/env python2.6 -""" -This is a Python version of the ForestMoon Dynamixel library originally -written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) -for the Pi Robot Project which lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. - -* * * * * - -Dynamixel Network module - -""" - -import defs -import stream -import event_handler -import time -import dynamixel - -class DynamixelInterface(object): - """ Interface to Dynamixel CM-5 """ - BROADCAST_ID = 254 - def __init__(self, strm): - """ Constructor - stream - an open Stream - """ - self._stream = strm - self.dynamixel_error = event_handler.EventHandler() - self._in_error_handler = False - self._error_count_1st_header_byte = 0 - self._error_count_2nd_header_byte = 0 - self._error_count_3rd_header_byte = 0 - self._error_count_invalid_length = 0 - self._error_count_unexpected_ident = 0 - self._error_count_unexpected_length = 0 - self._response_total_elapsed = 0 - self._response_max_elapsed = 0 - self._response_count = 0 - - @staticmethod - def error_text(error_type): - """ Returns a list of the textual representation - of the ERROR_STATUS value """ - text = [] - for key, value, description in defs.ERROR_STATUS.items(): - if (error_type & value) != 0: - text.append(description) - return text - - @staticmethod - def register_length(reg): - """ Returns the register length""" - if reg in [defs.REGISTER.ModelNumber, - defs.REGISTER.CWAngleLimit, defs.REGISTER.CCWAngleLimit, - defs.REGISTER.MaxTorque, defs.REGISTER.DownCalibration, - defs.REGISTER.UpCalibration, defs.REGISTER.GoalPosition, - defs.REGISTER.MovingSpeed, defs.REGISTER.TorqueLimit, - defs.REGISTER.CurrentPosition, defs.REGISTER.CurrentSpeed, - defs.REGISTER.CurrentLoad, defs.REGISTER.Punch]: - return 2 - else: - return 1 - @staticmethod - def register_reserved( addr ): - """ Test to see if a register is reserved """ - return addr in [0xA,0x13] - - - def enter_toss_mode(self): - """ Try to put the CM-5 into Toss Mode - - Returns true on success - """ - state = 0 - buf = "" - save_timeout = self._stream.read_timeout - while state < 5: - try: - if state == 0: - # send a CR and look for a response from a CM-5 - # in manage mode - self._stream.write_byte('\r') - state = 1 - elif state == 1: - # look for a response from a CM-5 - buf += self._stream.read_byte() - if len(buf) >= 3 and buf[-3:] == "[CM": - # a CM-5 that has just been put into manage mode - # will respond to the first CR with a version - # string, e.g. "[CM-5 Version 1.15]" lengthen the - # timeout because the CM-5 will scan for Dynamixels - self._stream.read_timeout = 750 - state = 2 - if len(buf) >= 3 and buf[-3:] == "[CI": - # a CM-5 in manage mode that has already scanned - # will respond with a prompt string containing the - # ID of the current Dynamixel, e.g. "[CID:001(0x01)]" - # restore the shorter timeout - self._stream.read_timeout = save_timeout - state = 3 - elif state == 2: - buf += self._stream.read_byte() - if len(buf) >= 3 and buf[-3:] == "[CI": - # a CM-5 in manage mode that has already scanned - # will respond with a prompt string containing the - # ID of the current Dynamixel, e.g. "[CID:001(0x01)]" - # restore the shorter timeout - self._stream.read_timeout = save_timeout - state = 3 - elif state == 3: - buf += self._stream.read_byte() - if len(buf) >= 2 and buf[-2:] == "] ": - # found the end of the CID prompt - # put the CM-5 in Toss Mode - self._stream.write_byte('t') - self._stream.write_byte('\r') - state = 4 - elif state == 4: - buf += self._stream.read_byte() - if len(buf) >= 9 and buf[-9:] == "Toss Mode": - # found the "Toss Mode" response verification - # we're good to go - state = 5 - except stream.TimeoutException: - self._stream.read_timeout = save_timeout - if state > 1: - raise Exception("CM-5 detected but not in Managed Mode") - break - print "%d : %d" % (self._stream.read_timeout, - self._stream.write_timeout) - return state == 5 - - def dump_statistics(self): - """ Return a list of textual statistics - - Returns a list of strings - """ - - result = [] - if self._response_count != 0: - avg_response_time = self._response_total_elapsed / \ - float(self._response_count) - result.append("Average ms per Dynamixel response: %.1F" % \ - (avg_response_time)) - result.append("Maximum ms per Dynamixel response: %d" % \ - (self._response_max_elapsed)) - if self._error_count_1st_header_byte > 0: - result.append("1st Header Byte: %d" % \ - (self._error_count_1st_header_byte)) - if self._error_count_2nd_header_byte > 0: - result.append("2nd Header Byte: %d" % \ - (self._error_count_2nd_header_byte)) - if self._error_count_3rd_header_byte > 0: - result.append("3rd Header Byte: %d" % \ - (self._error_count_3rd_header_byte)) - if self._error_count_invalid_length > 0: - result.append("Invalid Length: %d" % \ - (self._error_count_invalid_length)) - if self._error_count_unexpected_ident > 0: - result.append("Unexpected ID: %d" % \ - (self._error_count_unexpected_ident)) - if self._error_count_unexpected_length > 0: - result.append("Unexpected Length: %d" % \ - (self._error_count_unexpected_length)) - return result - - def read_packet(self): - """Read packet - - Returns tuple (packet type identifier, packet) - """ - - retry = True - # read packet is only ever called immediately following a write - # instruction (sent packet ) so we can use this opportunity to - # time the response from dynamixel - start_time = time.time() - - # set an invalid id for error return cases - ident = 0xFF - ## - ## Status packet returned from dynamixel serve - ## 0 1 2 3 4 5 5 + data-length - ## 0xFF 0xFF id len err data checksum - - # first header byte - byte = ord(self._stream.read_byte()) - - # update statistics - end_time = time.time() - elapsed_time = (end_time - start_time) - self._response_total_elapsed += elapsed_time - self._response_count += 1 - self._response_max_elapsed = max(self._response_max_elapsed, \ - elapsed_time) - - if byte != 0xFF: - self._error_count_1st_header_byte += 1 - return (ident, None) - - # 2nd header byte - byte = ord(self._stream.read_byte()) - if byte != 0xFF: - self._error_count_2nd_header_byte += 1 - return (ident, None) - - # id or third header - byte = ord(self._stream.read_byte()) - if byte == 0xFF: - self._error_count_3rd_header_byte += 1 - byte = ord(self._stream.read_byte()) - - ident = byte - - # packet length includes data-length + 2 - length = ord(self._stream.read_byte()) - - if length < 0: - self._error_count_invalid_length += 1 - - error_status = ord(self._stream.read_byte()) - data = "" - # remove error and checksum bytes - length -= 2 - if length > 0: - while length > 0: - buf = self._stream.read(length) - length -= len(buf) - data += buf - data = [ord(byte) for byte in data] - # XXX below code/comment was in the original code - # get the checksum byte - # CONSIDER: Could validate the checksum and reject the packet - checksum = ord(self._stream.read_byte()) - # let anyone listing know about any errors reported in the packet - # use the InErrorHandler flag to avoid recursion from the - # user's handler - if error_status != 0 and not self._in_error_handler: - self._in_error_handler = True - self.dynamixel_error(self, (ident, error_status)) - self._in_error_handler = False - return (ident, data) - - def await_packet(self, ident, length): - """ Read a packet validing the id and length - Retries until a valid packet is founnd - - ident - packet identifier - length - packet length - """ - if ident == DynamixelInterface.BROADCAST_ID: - return None - while True: - (pid, data) = self.read_packet() - plen = 0 - # validate the id and length - if data: - plen = len(data) - if pid == ident and plen == length: - return data - if pid != ident: - self._error_count_unexpected_ident += 1 - if plen != length: - self._error_count_unexpected_length += 1 - - def write_instruction(self, ident, instruction, params=None): - """Send a command packet instruction - - ident - the id of the destination dynamixel or BROADCAST_ID to send to all - ins - instruction to send - param - parameters to send (list of bytes) or None - - """ - cmd = [] - # - # command packet sent to Dynamixel servo: - # 0 1 2 3 4 4 + data-length - # [0xFF] [0xFF] [id] [length] [...data...] [checksum] - # - # header bytes & id - if params == None: - params = [] - if not isinstance(params, list): - raise Exception( "Params must be a list") - cmd.append(0xFF) - cmd.append(0xFF) - cmd.append(ident) - # length is data-length + 2 - cmd.append(len(params) + 2) - cmd.append(instruction) - # parameter bytes - cmd = cmd + params - # checksum - chksum = 0 - for byte in cmd[2:]: - chksum += byte - chksum &= 0xFF - chksum = (~chksum) & 0xFF - cmd.append(chksum) - # convert from bytes to a string - cmd = ''.join([chr(c & 0xFF) for c in cmd]) - # write the string - self._stream.write(cmd) - self._stream.flush() - - def ping(self, ident): - """ Check the presence of a specific dynamixel on the network - - ident - identifier of a dynamixel to ping - """ - self.write_instruction(ident, defs.INSTRUCTION.Ping) - try: - self.await_packet(ident, 0) - except stream.TimeoutException: - return False - return True - - def _read_data(self, ident, start_address, count): - """Read register data from a Dynamixel. - - id - the id of the dynamixel to read - start_address - the starting register to read from - count - the number of bytes to read - - Note: - Some logical registers are one byte long and some are two. - The count is for the number of bytes, not the number of registers. - """ - - return_packet = None - retry = True - # the start address and count from the parameters of the command - # packet - cmd = [start_address, count] - while retry: - self.write_instruction(ident, defs.INSTRUCTION.ReadData, cmd) - try: - return_packet = self.await_packet(ident, count) - retry = False - except stream.TimeoutException: - print "TIMEOUT accessing servo ID: %d" % (ident) - self._stream.flush() - return return_packet - - def read_register(self, ident, register): - """Read the value of one logical register - - ident - the id of the Dynamixel to read - reg - logical register to read - - Returns the integer value of the logical register - - Note: - this takes into account the byte length of the logical - register""" - data = self._read_data(ident, register, - DynamixelInterface.register_length(register)) - if len(data) == 1: - return data[0] - return (data[1] << 8) + data[0] - - def read_registers(self, ident, first_register, last_register): - - """ Read the values of multiple logical registers - - ident - the id of the dynamixel - first_register - first logical register to read/address - last_register - the last logical register to read/address - - returns: - a list of register values - - Note: - this function takes into account the byte length of the - logical register - """ - # this function was cleaned up for clarity - - register_length = DynamixelInterface.register_length(last_register) - # calc number of bytes as delta based on addresses - byte_count = last_register + register_length - first_register - - # read data from servo - data = self._read_data(ident, first_register, byte_count) - if len( data ) != byte_count: - raise Exception( "Data received (%d) shorter than requested data (%d)" % (len(data), byte_count)) - # resulting values - result = [] - - regs = defs.REGISTER.values() - regs.sort() - # index of first and last register - first = regs.index( first_register ) - last = regs.index( last_register ) - - # offset to read from - offset = 0 - for i in xrange( first, last + 1 ): - reg = regs[ i ] - # calc the length; note this skips reserved registers - length = DynamixelInterface.register_length( reg ) - # calc offset - offset = reg - first_register - # reconstruct the value - if length == 1: - result.append(data[ offset ]) - else: - result.append((data[ offset + 1] << 8) + \ - data[offset]) - return result - - def write_data(self, ident, start_address, params, deferred): - """Write register data - - ident - dynamixel to write to - start_address - the starting register to write to - params - list of bytes to be written - deferred - if true the dynamixel will store the request - until the action command is received - """ - if not isinstance(params, list): - raise Exception( "Params must be a list") - cmd = [] - cmd.append(start_address) - cmd = cmd + params - inst = None - if deferred: - inst = defs.INSTRUCTION.RegWrite - else: - inst = defs.INSTRUCTION.WriteData - self.write_instruction(ident, inst, cmd) - if not deferred: - self.await_packet(ident, 0) - - def write_register(self, ident, register, value, deferred): - """Write data to one logical register - - ident - dynamixel to write to - register - the register to write to - value - the integer value to write - deferred - if true the dynamixel will store the request until the action - command is received - """ - if not isinstance( value, int): - raise ValueError( "Expected value to by an integer") - if DynamixelInterface.register_length(register) == 1: - self.write_data(ident, register, [ value ], deferred) - else: - values = [ value & 0xFF, (value >> 8) & 0xFF ] - self.write_data(ident, register, values, deferred) - - def action(self): - """Broadcasta an action instruction for all dynamixels - with deferred writes pending - - """ - self.write_instruction(DynamixelInterface.BROADCAST_ID, - defs.INSTRUCTION.Action, None) - - def sync_write(self, start_address, number_of_dynamixels, params): - """ Write to multiple registers on multiple Dynamixels - using one instruction. - - start_address = starting register to write to - number_of_dynamixels - the number of dynamixels being addressed - params - the data being written including the id and data for each - dynamixel - - Note: - This function provides the most efficient way of updating the same - registers on each of many different Dynaixels with different values - at the same time. - - The length of the 'parms' data will determine the number of sequential - registers being written to. - - For each Dynamixel the 'parms' data must include the id followed - by the register data. - """ - if len(params) % number_of_dynamixels != 0: - raise ValueError("Dynamixel SyncWrite params length error") - length = len(params) / number_of_dynamixels - 1 - cmd = [] - cmd.append(start_address) - cmd.append(length) - cmd = cmd + params - self.write_instruction(DynamixelInterface.BROADCAST_ID, - defs.INSTRUCTION.SyncWrite, cmd) - - - def scan_ids(self, start_id, end_id): - """Determine which ids are present - start_id - first id to start (must be less than end_id) - end_id - last id to search (0-253) - - Returns a list of ids on the network - - Scanning for all possible IDs (0 thru 254) can be time consuming. - So if the range can be constrained to predetermined values it can - speed up the process. - - throws ValueError on arguments out of range - """ - if end_id > 253 or end_id < 0: - raise ValueError("end_id must be 0 to 233") - if start_id > end_id or start_id < 0: - raise ValueError("start_id must be 0 to end_id") - ids = [] - for ident in xrange(start_id, end_id + 1): - if self.ping(ident): - ids.append(ident) - return ids - -class DynamixelNetwork (DynamixelInterface): - """ An abstract model of a Dynamixel network represented as a - collection of Dynamixel objects. - """ - - def __init__(self, strm): - """ The Constructor - stream - the stream to exchange command and status packets with - the dynamixel network - """ - DynamixelInterface.__init__(self, strm) - self._stopped = False - self._dynamixel_map = {} - - def __getitem__(self, ident): - """ array access to the dynamixels, indexed by id - - ident - id to retrieve - - returns the dynamixel object with that id or None if none - present on the network - """ - if ident in self._dynamixel_map: - return self._dynamixel_map[ ident ] - else: - return None - - def get_dynamixels(self): - """ A list of dynamixels present on the network """ - return self._dynamixel_map.values() - - dynamixels = property(get_dynamixels) - - def scan(self, start_id, end_id): - """ - Scan the network to discover the Dynamixels present. - - start_id - the id for the start of the search - end_id - the id for the end of the search - - Note: - function builds an internal list of Dynamixels present on the network. - Typically call this function only once per DynamixelNetwork instance - it will rebuild the list and create new Dynamixel instances to fill it, - any previously retrieved Dynamixels. - for all possible IDs (0 thru 254) can be time consuming. - So if the range can be constrained to predetermined values it can - speed up the process. - """ - self._dynamixel_map = {} - ids = self.scan_ids(start_id, end_id) - for ident in ids: - self._dynamixel_map[ ident ] = dynamixel.Dynamixel(ident, self) - - - def _get_stopped(self): - """Get if the dynamixels are stopped""" - return self._stopped - - def _set_stopped(self, value): - """Stop all dynamixels and prevent further movement - activity for the dynamixels that are synchronized - """ - if value: - for (ident, servo) in self._dynamixel_map.items(): - servo.stop() - self.synchronize() - self._stopped = value - - stopped = property(_get_stopped, _set_stopped) - - def synchronize(self): - """Send GoalPosition and MovingSpeed data for all - Dynamixels in Synchronized mode. - - This function collects all the changed GoalPosition and - MovingSpeed data that has been stored for each Dynamixel - flagged as Synchronized and sends it all out at once - using a SyncWrite instruction. - - If the network is 'Stopped', no data is sent. - """ - data = None - count = 0 - for (ident, servo) in self._dynamixel_map.items(): - if servo.changed: - if not self.stopped: - count += 1 - if count == 1: - data = [] - data.append(servo.id) - data.append(servo.goal_position & 0xFF) - data.append(servo.goal_position >> 8) - data.append(servo.moving_speed & 0xFF) - data.append(servo.moving_speed >> 8) - servo.changed = False - if count != 0: - self.sync_write(defs.REGISTER.GoalPosition, count, data) - - - def broadcast_register(self, reg, value): - """Write the value of one logical register to all Dynamixels. - reg - The logical register to write. - value - The integer value to write. - - Updates the cache value of the register for all Dynamixels. - """ - for (ident, servo) in self._dynamixel_map.items(): - servo[ reg ] = value - self.write_register(DynamixelInterface.BROADCAST_ID, reg, - value, False) - - def dynamixel_id_change(self, servo, new_id): - """ Prepare for a pending change in the Id of a dynamixel - - Note: you must change the dynamixel object to new_id - """ - if new_id in self._dynamixel_map: - raise ValueError("Dynamixel Id %d already in use" % (new_id)) - del self._dynamixel_map[ servo.id ] - self._dynamixel_map[ new_id ] = servo +#!/usr/bin/env python2.6 +""" +This is a Python version of the ForestMoon Dynamixel library originally +written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) +for the Pi Robot Project which lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. + +* * * * * + +Dynamixel Network module + +""" + +import defs +import stream +import event_handler +import time +import dynamixel + +class DynamixelInterface(object): + """ Interface to Dynamixel CM-* """ + BROADCAST_ID = 254 + def __init__(self, strm): + """ Constructor + stream - an open Stream + """ + self._stream = strm + self.dynamixel_error = event_handler.EventHandler() + self._in_error_handler = False + self._error_count_1st_header_byte = 0 + self._error_count_2nd_header_byte = 0 + self._error_count_3rd_header_byte = 0 + self._error_count_invalid_length = 0 + self._error_count_unexpected_ident = 0 + self._error_count_unexpected_length = 0 + self._response_total_elapsed = 0 + self._response_max_elapsed = 0 + self._response_count = 0 + + @staticmethod + def error_text(error_type): + """ Returns a list of the textual representation + of the ERROR_STATUS value """ + text = [] + for key, value, description in defs.ERROR_STATUS.items(): + if (error_type & value) != 0: + text.append(description) + return text + + @staticmethod + def register_length(reg): + """ Returns the register length""" + if reg in [defs.REGISTER.ModelNumber, + defs.REGISTER.CWAngleLimit, defs.REGISTER.CCWAngleLimit, + defs.REGISTER.MaxTorque, defs.REGISTER.DownCalibration, + defs.REGISTER.UpCalibration, defs.REGISTER.GoalPosition, + defs.REGISTER.MovingSpeed, defs.REGISTER.TorqueLimit, + defs.REGISTER.CurrentPosition, defs.REGISTER.CurrentSpeed, + defs.REGISTER.CurrentLoad, defs.REGISTER.Punch]: + return 2 + else: + return 1 + @staticmethod + def register_reserved( addr ): + """ Test to see if a register is reserved """ + return addr in [0xA,0x13] + + + def enter_toss_mode(self): + """ Try to put the CM-* (CM-5, CM-530) into Toss Mode + + Returns true on success + """ + state = 0 + buf = "" + save_timeout = self._stream.read_timeout + while state < 5: + try: + if state == 0: + # send a CR and look for a response from a CM-* + # in manage mode + self._stream.write_byte('\r') + state = 1 + elif state == 1: + # look for a response from a CM-* + buf += self._stream.read_byte() + if len(buf) >= 3 and buf[-3:] == "[CM": + # a CM-* that has just been put into manage mode + # will respond to the first CR with a version + # string, e.g. "[CM-* Version 1.15]" lengthen the + # timeout because the CM-* will scan for Dynamixels + self._stream.read_timeout = 750 + state = 2 + if len(buf) >= 3 and buf[-3:] == "[CI": + # a CM-* in manage mode that has already scanned + # will respond with a prompt string containing the + # ID of the current Dynamixel, e.g. "[CID:001(0x01)]" + # restore the shorter timeout + self._stream.read_timeout = save_timeout + state = 3 + elif state == 2: + buf += self._stream.read_byte() + if len(buf) >= 3 and buf[-3:] == "[CI": + # a CM-* in manage mode that has already scanned + # will respond with a prompt string containing the + # ID of the current Dynamixel, e.g. "[CID:001(0x01)]" + # restore the shorter timeout + self._stream.read_timeout = save_timeout + state = 3 + elif state == 3: + buf += self._stream.read_byte() + if buf[-2:] == "] " or buf[-3:] == "]} ": + # found the end of the CID prompt + # put the CM-* in Toss Mode + self._stream.write_byte('t') + self._stream.write_byte('\r') + state = 4 + elif state == 4: + buf += self._stream.read_byte() + if len(buf) >= 9 and buf[-9:] == "Toss Mode": + # found the "Toss Mode" response verification + # we're good to go + state = 5 + except stream.TimeoutException: + self._stream.read_timeout = save_timeout + if state > 1: + raise Exception("CM-* detected but not in Managed Mode") + break + print "%s : %s" % (self._stream.read_timeout, + self._stream.write_timeout) + return state == 5 + + def dump_statistics(self): + """ Return a list of textual statistics + + Returns a list of strings + """ + + result = [] + if self._response_count != 0: + avg_response_time = self._response_total_elapsed / \ + float(self._response_count) + result.append("Average ms per Dynamixel response: %.1F" % \ + (avg_response_time)) + result.append("Maximum ms per Dynamixel response: %d" % \ + (self._response_max_elapsed)) + if self._error_count_1st_header_byte > 0: + result.append("1st Header Byte: %d" % \ + (self._error_count_1st_header_byte)) + if self._error_count_2nd_header_byte > 0: + result.append("2nd Header Byte: %d" % \ + (self._error_count_2nd_header_byte)) + if self._error_count_3rd_header_byte > 0: + result.append("3rd Header Byte: %d" % \ + (self._error_count_3rd_header_byte)) + if self._error_count_invalid_length > 0: + result.append("Invalid Length: %d" % \ + (self._error_count_invalid_length)) + if self._error_count_unexpected_ident > 0: + result.append("Unexpected ID: %d" % \ + (self._error_count_unexpected_ident)) + if self._error_count_unexpected_length > 0: + result.append("Unexpected Length: %d" % \ + (self._error_count_unexpected_length)) + return result + + def read_packet(self): + """Read packet + + Returns tuple (packet type identifier, packet) + """ + + retry = True + # read packet is only ever called immediately following a write + # instruction (sent packet ) so we can use this opportunity to + # time the response from dynamixel + start_time = time.time() + + # set an invalid id for error return cases + ident = 0xFF + ## + ## Status packet returned from dynamixel serve + ## 0 1 2 3 4 5 5 + data-length + ## 0xFF 0xFF id len err data checksum + + # first header byte + byte = ord(self._stream.read_byte()) + + # update statistics + end_time = time.time() + elapsed_time = (end_time - start_time) + self._response_total_elapsed += elapsed_time + self._response_count += 1 + self._response_max_elapsed = max(self._response_max_elapsed, \ + elapsed_time) + + if byte != 0xFF: + self._error_count_1st_header_byte += 1 + return (ident, None) + + # 2nd header byte + byte = ord(self._stream.read_byte()) + if byte != 0xFF: + self._error_count_2nd_header_byte += 1 + return (ident, None) + + # id or third header + byte = ord(self._stream.read_byte()) + if byte == 0xFF: + self._error_count_3rd_header_byte += 1 + byte = ord(self._stream.read_byte()) + + ident = byte + + # packet length includes data-length + 2 + length = ord(self._stream.read_byte()) + + if length < 0: + self._error_count_invalid_length += 1 + + error_status = ord(self._stream.read_byte()) + data = "" + # remove error and checksum bytes + length -= 2 + if length > 0: + while length > 0: + buf = self._stream.read(length) + length -= len(buf) + data += buf + data = [ord(byte) for byte in data] + # XXX below code/comment was in the original code + # get the checksum byte + # CONSIDER: Could validate the checksum and reject the packet + checksum = ord(self._stream.read_byte()) + # let anyone listing know about any errors reported in the packet + # use the InErrorHandler flag to avoid recursion from the + # user's handler + if error_status != 0 and not self._in_error_handler: + self._in_error_handler = True + self.dynamixel_error(self, (ident, error_status)) + self._in_error_handler = False + return (ident, data) + + def await_packet(self, ident, length): + """ Read a packet validing the id and length + Retries until a valid packet is founnd + + ident - packet identifier + length - packet length + """ + if ident == DynamixelInterface.BROADCAST_ID: + return None + while True: + (pid, data) = self.read_packet() + plen = 0 + # validate the id and length + if data: + plen = len(data) + if pid == ident and plen == length: + return data + if pid != ident: + self._error_count_unexpected_ident += 1 + if plen != length: + self._error_count_unexpected_length += 1 + + def write_instruction(self, ident, instruction, params=None): + """Send a command packet instruction + + ident - the id of the destination dynamixel or BROADCAST_ID to send to all + ins - instruction to send + param - parameters to send (list of bytes) or None + + """ + cmd = [] + # + # command packet sent to Dynamixel servo: + # 0 1 2 3 4 4 + data-length + # [0xFF] [0xFF] [id] [length] [...data...] [checksum] + # + # header bytes & id + if params == None: + params = [] + if not isinstance(params, list): + raise Exception( "Params must be a list") + cmd.append(0xFF) + cmd.append(0xFF) + cmd.append(ident) + # length is data-length + 2 + cmd.append(len(params) + 2) + cmd.append(instruction) + # parameter bytes + cmd = cmd + params + # checksum + chksum = 0 + for byte in cmd[2:]: + chksum += byte + chksum &= 0xFF + chksum = (~chksum) & 0xFF + cmd.append(chksum) + # convert from bytes to a string + cmd = ''.join([chr(c & 0xFF) for c in cmd]) + # write the string + self._stream.write(cmd) + self._stream.flush() + + def ping(self, ident): + """ Check the presence of a specific dynamixel on the network + + ident - identifier of a dynamixel to ping + """ + self.write_instruction(ident, defs.INSTRUCTION.Ping) + try: + self.await_packet(ident, 0) + except stream.TimeoutException: + return False + return True + + def _read_data(self, ident, start_address, count): + """Read register data from a Dynamixel. + + id - the id of the dynamixel to read + start_address - the starting register to read from + count - the number of bytes to read + + Note: + Some logical registers are one byte long and some are two. + The count is for the number of bytes, not the number of registers. + """ + + return_packet = None + retry = True + # the start address and count from the parameters of the command + # packet + cmd = [start_address, count] + while retry: + self.write_instruction(ident, defs.INSTRUCTION.ReadData, cmd) + try: + return_packet = self.await_packet(ident, count) + retry = False + except stream.TimeoutException: + print "TIMEOUT accessing servo ID: %d" % (ident) + self._stream.flush() + return return_packet + + def read_register(self, ident, register): + """Read the value of one logical register + + ident - the id of the Dynamixel to read + reg - logical register to read + + Returns the integer value of the logical register + + Note: + this takes into account the byte length of the logical + register""" + data = self._read_data(ident, register, + DynamixelInterface.register_length(register)) + if len(data) == 1: + return data[0] + return (data[1] << 8) + data[0] + + def read_registers(self, ident, first_register, last_register): + + """ Read the values of multiple logical registers + + ident - the id of the dynamixel + first_register - first logical register to read/address + last_register - the last logical register to read/address + + returns: + a list of register values + + Note: + this function takes into account the byte length of the + logical register + """ + # this function was cleaned up for clarity + + register_length = DynamixelInterface.register_length(last_register) + # calc number of bytes as delta based on addresses + byte_count = last_register + register_length - first_register + + # read data from servo + data = self._read_data(ident, first_register, byte_count) + if len( data ) != byte_count: + raise Exception( "Data received (%d) shorter than requested data (%d)" % (len(data), byte_count)) + # resulting values + result = [] + + regs = defs.REGISTER.values() + regs.sort() + # index of first and last register + first = regs.index( first_register ) + last = regs.index( last_register ) + + # offset to read from + offset = 0 + for i in xrange( first, last + 1 ): + reg = regs[ i ] + # calc the length; note this skips reserved registers + length = DynamixelInterface.register_length( reg ) + # calc offset + offset = reg - first_register + # reconstruct the value + if length == 1: + result.append(data[ offset ]) + else: + result.append((data[ offset + 1] << 8) + \ + data[offset]) + return result + + def write_data(self, ident, start_address, params, deferred): + """Write register data + + ident - dynamixel to write to + start_address - the starting register to write to + params - list of bytes to be written + deferred - if true the dynamixel will store the request + until the action command is received + """ + if not isinstance(params, list): + raise Exception( "Params must be a list") + cmd = [] + cmd.append(start_address) + cmd = cmd + params + inst = None + if deferred: + inst = defs.INSTRUCTION.RegWrite + else: + inst = defs.INSTRUCTION.WriteData + self.write_instruction(ident, inst, cmd) + if not deferred: + self.await_packet(ident, 0) + + def write_register(self, ident, register, value, deferred): + """Write data to one logical register + + ident - dynamixel to write to + register - the register to write to + value - the integer value to write + deferred - if true the dynamixel will store the request until the action + command is received + """ + if not isinstance( value, int): + raise ValueError( "Expected value to by an integer") + if DynamixelInterface.register_length(register) == 1: + self.write_data(ident, register, [ value ], deferred) + else: + values = [ value & 0xFF, (value >> 8) & 0xFF ] + self.write_data(ident, register, values, deferred) + + def action(self): + """Broadcasta an action instruction for all dynamixels + with deferred writes pending + + """ + self.write_instruction(DynamixelInterface.BROADCAST_ID, + defs.INSTRUCTION.Action, None) + + def sync_write(self, start_address, number_of_dynamixels, params): + """ Write to multiple registers on multiple Dynamixels + using one instruction. + + start_address = starting register to write to + number_of_dynamixels - the number of dynamixels being addressed + params - the data being written including the id and data for each + dynamixel + + Note: + This function provides the most efficient way of updating the same + registers on each of many different Dynaixels with different values + at the same time. + + The length of the 'parms' data will determine the number of sequential + registers being written to. + + For each Dynamixel the 'parms' data must include the id followed + by the register data. + """ + if len(params) % number_of_dynamixels != 0: + raise ValueError("Dynamixel SyncWrite params length error") + length = len(params) / number_of_dynamixels - 1 + cmd = [] + cmd.append(start_address) + cmd.append(length) + cmd = cmd + params + self.write_instruction(DynamixelInterface.BROADCAST_ID, + defs.INSTRUCTION.SyncWrite, cmd) + + + def scan_ids(self, start_id, end_id): + """Determine which ids are present + start_id - first id to start (must be less than end_id) + end_id - last id to search (0-253) + + Returns a list of ids on the network + + Scanning for all possible IDs (0 thru 254) can be time consuming. + So if the range can be constrained to predetermined values it can + speed up the process. + + throws ValueError on arguments out of range + """ + if end_id > 253 or end_id < 0: + raise ValueError("end_id must be 0 to 233") + if start_id > end_id or start_id < 0: + raise ValueError("start_id must be 0 to end_id") + ids = [] + for ident in xrange(start_id, end_id + 1): + if self.ping(ident): + ids.append(ident) + return ids + +class DynamixelNetwork (DynamixelInterface): + """ An abstract model of a Dynamixel network represented as a + collection of Dynamixel objects. + """ + + def __init__(self, strm): + """ The Constructor + stream - the stream to exchange command and status packets with + the dynamixel network + """ + DynamixelInterface.__init__(self, strm) + self._stopped = False + self._dynamixel_map = {} + + def __getitem__(self, ident): + """ array access to the dynamixels, indexed by id + + ident - id to retrieve + + returns the dynamixel object with that id or None if none + present on the network + """ + if ident in self._dynamixel_map: + return self._dynamixel_map[ ident ] + else: + return None + + def get_dynamixels(self): + """ A list of dynamixels present on the network """ + return self._dynamixel_map.values() + + dynamixels = property(get_dynamixels) + + def scan(self, start_id, end_id): + """ + Scan the network to discover the Dynamixels present. + + start_id - the id for the start of the search + end_id - the id for the end of the search + + Note: + function builds an internal list of Dynamixels present on the network. + Typically call this function only once per DynamixelNetwork instance + it will rebuild the list and create new Dynamixel instances to fill it, + any previously retrieved Dynamixels. + for all possible IDs (0 thru 254) can be time consuming. + So if the range can be constrained to predetermined values it can + speed up the process. + """ + self._dynamixel_map = {} + ids = self.scan_ids(start_id, end_id) + for ident in ids: + self._dynamixel_map[ ident ] = dynamixel.Dynamixel(ident, self) + + + def _get_stopped(self): + """Get if the dynamixels are stopped""" + return self._stopped + + def _set_stopped(self, value): + """Stop all dynamixels and prevent further movement + activity for the dynamixels that are synchronized + """ + if value: + for (ident, servo) in self._dynamixel_map.items(): + servo.stop() + self.synchronize() + self._stopped = value + + stopped = property(_get_stopped, _set_stopped) + + def synchronize(self): + """Send GoalPosition and MovingSpeed data for all + Dynamixels in Synchronized mode. + + This function collects all the changed GoalPosition and + MovingSpeed data that has been stored for each Dynamixel + flagged as Synchronized and sends it all out at once + using a SyncWrite instruction. + + If the network is 'Stopped', no data is sent. + """ + data = None + count = 0 + for (ident, servo) in self._dynamixel_map.items(): + if servo.changed: + if not self.stopped: + count += 1 + if count == 1: + data = [] + data.append(servo.id) + data.append(servo.goal_position & 0xFF) + data.append(servo.goal_position >> 8) + data.append(servo.moving_speed & 0xFF) + data.append(servo.moving_speed >> 8) + servo.changed = False + if count != 0: + self.sync_write(defs.REGISTER.GoalPosition, count, data) + + + def broadcast_register(self, reg, value): + """Write the value of one logical register to all Dynamixels. + reg - The logical register to write. + value - The integer value to write. + + Updates the cache value of the register for all Dynamixels. + """ + for (ident, servo) in self._dynamixel_map.items(): + servo[ reg ] = value + self.write_register(DynamixelInterface.BROADCAST_ID, reg, + value, False) + + def dynamixel_id_change(self, servo, new_id): + """ Prepare for a pending change in the Id of a dynamixel + + Note: you must change the dynamixel object to new_id + """ + if new_id in self._dynamixel_map: + raise ValueError("Dynamixel Id %d already in use" % (new_id)) + del self._dynamixel_map[ servo.id ] + self._dynamixel_map[ new_id ] = servo diff --git a/dynamixel/echo_stream.py b/dynamixel/echo_stream.py old mode 100755 new mode 100644 index 1754119..f00bdea --- a/dynamixel/echo_stream.py +++ b/dynamixel/echo_stream.py @@ -1,101 +1,101 @@ -#!/usr/bin/env python -""" -This is a Python version of the ForestMoon Dynamixel library originally -written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) -for the Pi Robot Project which lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. - -* * * * * - -Echo Stream module - -""" - -import stream -import event_handler - -class EchoStream( stream.Stream ): - """A wrapper for a stream class that also has a event handler for echoing - """ - def __init__( self, strm ): - """ Constructor - strm - stream to wrap - """ - stream.Stream.__init__( self ) - self.writing = False - self.echo_byte = 0 - self._stream = strm - self.echo = event_handler.EventHandler() - - def echo_write( self, byte ): - """ Call the echo event handler for a write """ - self.writing = True - self.echo_byte = byte - self.echo( self, None ) - - def echo_read( self, byte ): - """ Call the echo event handler for a read """ - self.writing = False - self.echo_byte = byte - self.echo( self, None ) - - def flush( self ): - """ Flush the wrapped stream """ - self._stream.Flush() - - def read( self, count ): - """ Read up to count bytes from the stream - - count - maximum number of bytes to read - """ - buf = self._stream.Read( count ) - for char in buf: - self.echo_read( char ) - return buf - - def write( self, buf ): - """ Write the given buffer to the wrapped stream - - buf - string or list of bytes to write """ - if isinstance( buf, list ): - buf = ''.join( [chr( c ) for c in buf] ) - for char in buf: - self.echo_write( char ) - self._stream.write( buf ) - - def read_byte( self ): - """ Read a byte from the wrapped stream - """ - return self.read( 1 ) - - def write_byte( self, byte ): - """ Write a byte to the wrapped stream - byte - one character string - """ - self.write( byte ) - - def get_read_timeout( self ): - """ Get the read timeout """ - return self._stream.read_timeout - - def set_read_timeout( self, value ): - """ Set the read timeout """ - self._stream.read_timeout = value - - def get_write_timeout( self ): - """ Get the write timeout """ - return self._stream.read_timeout - - def set_write_timeout( self, value ): - """ Set the write timeout """ - self._stream.read_timeout = value - +#!/usr/bin/env python +""" +This is a Python version of the ForestMoon Dynamixel library originally +written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) +for the Pi Robot Project which lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. + +* * * * * + +Echo Stream module + +""" + +import stream +import event_handler + +class EchoStream( stream.Stream ): + """A wrapper for a stream class that also has a event handler for echoing + """ + def __init__( self, strm ): + """ Constructor + strm - stream to wrap + """ + stream.Stream.__init__( self ) + self.writing = False + self.echo_byte = 0 + self._stream = strm + self.echo = event_handler.EventHandler() + + def echo_write( self, byte ): + """ Call the echo event handler for a write """ + self.writing = True + self.echo_byte = byte + self.echo( self, None ) + + def echo_read( self, byte ): + """ Call the echo event handler for a read """ + self.writing = False + self.echo_byte = byte + self.echo( self, None ) + + def flush( self ): + """ Flush the wrapped stream """ + self._stream.Flush() + + def read( self, count ): + """ Read up to count bytes from the stream + + count - maximum number of bytes to read + """ + buf = self._stream.Read( count ) + for char in buf: + self.echo_read( char ) + return buf + + def write( self, buf ): + """ Write the given buffer to the wrapped stream + + buf - string or list of bytes to write """ + if isinstance( buf, list ): + buf = ''.join( [chr( c ) for c in buf] ) + for char in buf: + self.echo_write( char ) + self._stream.write( buf ) + + def read_byte( self ): + """ Read a byte from the wrapped stream + """ + return self.read( 1 ) + + def write_byte( self, byte ): + """ Write a byte to the wrapped stream + byte - one character string + """ + self.write( byte ) + + def get_read_timeout( self ): + """ Get the read timeout """ + return self._stream.read_timeout + + def set_read_timeout( self, value ): + """ Set the read timeout """ + self._stream.read_timeout = value + + def get_write_timeout( self ): + """ Get the write timeout """ + return self._stream.read_timeout + + def set_write_timeout( self, value ): + """ Set the write timeout """ + self._stream.read_timeout = value + diff --git a/dynamixel/enumeration.py b/dynamixel/enumeration.py old mode 100755 new mode 100644 index 8ea9615..830a911 --- a/dynamixel/enumeration.py +++ b/dynamixel/enumeration.py @@ -1,118 +1,116 @@ -""" -This is a Python version of the ForestMoon Dynamixel library originally -written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) -for the Pi Robot Project which lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. - -* * * * * - -Enumeration - -Analog to the enum type in other languages - -""" - -class Enumeration( object ): - """ An Enumeration Class """ - def __init__( self, enums ): - """ Default Constructor - mapping -- (key, value[, description) tuple key must be a string - should start with a capital letter""" - self._values = {} - self._descriptions = {} - values = set() - for enum in enums: - key, value, description = None, None, None - # deconstruct parameters - if len( enum ) == 2: - key, value = enum - else: - key, value, description = enum - # check for duplicate data - if value in values: - raise Exception( "Duplicate Value in Enumeration" ) - # store persistant data - self._values[ key ] = value - self._descriptions[ key ] = description - # add to value set to ensure non-duplicate values - values.add( value ) - # populate the attributes of the class - for key, value in self._values.items(): - object.__setattr__( self, key, value ) - def __getitem__( self, key ): - """ Lookup an item based on a string - key -- string - - Returns the value associated with the key - Throws KeyError - """ - return self._values[ key ] - def items( self ): - """ Get items in the enumeration - - Returns all members of the Enumeration in the form of - a (key, value, description) tuple list - """ - result = [] - for key in self._values: - description = None - value = self._values[ key ] - if key in self._descriptions: - description = self._descriptions[ key ] - result.append( (key, value, description) ) - return result - def key( self, value ): - """ Get the key associated with a given value - Returns key as a string - Throws KeyError if key is not associated with a value - """ - for key, val in self._values.items(): - if val == value: - return key - raise KeyError( "Cannot find associated key in enumeration" ) - def description( self, key=None, value=None ): - """ Get the description associated with the key or value - key -- key as a string - value -- value - - Note: only key or value may be specified - - Returns description string or None - throws LookupError if neither key or value are specified - throws LookupError if both key and value are specified - throws KeyError if associated description could not be found - """ - if value == None and key == None: - raise LookupError( "Must provide either a key or a value" ) - if value != None and key != None: - raise LookupError( "Must provide either a key or a value" ) - # if value is specified lookup the associated key - if value != None: - key = self.key( value ) - # find the key - for key in self._descriptions: - return self._descriptions[ key ] - raise KeyError( "Cannot find associated description" ) - def keys( self ): - """ Return the keys in the enumeration as a list of strings """ - return self._values.keys() - def values( self ): - """ Return the values in the enumeration as a list of values """ - return self._values.values() - def __iter__( self ): - """ Return the iterator for the Enumeration """ - return iter( self._values ) - def __len__( self ): - """ Return the number of elements in the enumeration """ - return len( self._values ) - def __repr__( self ): - return "Enumeration( %r )" % (self.items()) +""" +This is a Python version of the ForestMoon Dynamixel library originally +written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) +for the Pi Robot Project which lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. + +* * * * * + +Enumeration + +Analog to the enum type in other languages + +""" + +class Enumeration( object ): + """ An Enumeration Class """ + def __init__( self, enums ): + """ Default Constructor + mapping -- (key, value[, description) tuple key must be a string + should start with a capital letter""" + self._values = {} + self._descriptions = {} + values = set() + for enum in enums: + key, value, description = None, None, None + # deconstruct parameters + if len( enum ) == 2: + key, value = enum + else: + key, value, description = enum + # check for duplicate data + if value in values: + raise Exception( "Duplicate Value in Enumeration" ) + # store persistant data + self._values[ key ] = value + self._descriptions[ key ] = description + # add to value set to ensure non-duplicate values + values.add( value ) + # populate the attributes of the class + for key, value in self._values.items(): + object.__setattr__( self, key, value ) + def __getitem__( self, key ): + """ Lookup an item based on a string + key -- string + + Returns the value associated with the key + Throws KeyError + """ + return self._values[ key ] + def items( self ): + """ Get items in the enumeration + + Returns all members of the Enumeration in the form of + a (key, value, description) tuple list + """ + result = [] + for key in self._values: + value = self._values[ key ] + description = self._descriptions.get(key) + result.append( (key, value, description) ) + return result + def key( self, value ): + """ Get the key associated with a given value + Returns key as a string + Throws KeyError if key is not associated with a value + """ + for key, val in self._values.items(): + if val == value: + return key + raise KeyError( "Cannot find associated key in enumeration" ) + def description( self, key=None, value=None ): + """ Get the description associated with the key or value + key -- key as a string + value -- value + + Note: only key or value may be specified + + Returns description string or None + throws LookupError if neither key or value are specified + throws LookupError if both key and value are specified + throws KeyError if associated description could not be found + """ + if value == None and key == None: + raise LookupError( "Must provide either a key or a value" ) + if value != None and key != None: + raise LookupError( "Must provide either a key or a value" ) + # if value is specified lookup the associated key + if value != None: + key = self.key( value ) + # find the key + for key in self._descriptions: + return self._descriptions[ key ] + raise KeyError( "Cannot find associated description" ) + def keys( self ): + """ Return the keys in the enumeration as a list of strings """ + return self._values.keys() + def values( self ): + """ Return the values in the enumeration as a list of values """ + return self._values.values() + def __iter__( self ): + """ Return the iterator for the Enumeration """ + return iter( self._values ) + def __len__( self ): + """ Return the number of elements in the enumeration """ + return len( self._values ) + def __repr__( self ): + return "Enumeration( %r )" % (self.items()) diff --git a/dynamixel/event_handler.py b/dynamixel/event_handler.py old mode 100755 new mode 100644 index 13d6157..6d8ff52 --- a/dynamixel/event_handler.py +++ b/dynamixel/event_handler.py @@ -1,43 +1,43 @@ -#!/usr/bin/env python2.6 -""" -This is a Python version of the ForestMoon Dynamixel library originally -written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) -for the Pi Robot Project which lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. - -* * * * * - -Event handler module - -""" - - -class EventHandler( object ): - """ Analog to C#'s event handler """ - def __init__( self ): - """ Default constructor """ - self._observers = set( [] ) - - def __call__( self, sender, args ): - """ Call all observering methods """ - for observer in self._observers: - observer( sender, args ) - - def __iadd__( self, listener ): - """ Add a listener """ - self._observers.add( listener ) - return self - - def __isub__( self, listener ): - """ Remove a listener """ - self._observers.remove( listener ) +#!/usr/bin/env python2.6 +""" +This is a Python version of the ForestMoon Dynamixel library originally +written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) +for the Pi Robot Project which lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. + +* * * * * + +Event handler module + +""" + + +class EventHandler( object ): + """ Analog to C#'s event handler """ + def __init__( self ): + """ Default constructor """ + self._observers = set( [] ) + + def __call__( self, sender, args ): + """ Call all observering methods """ + for observer in self._observers: + observer( sender, args ) + + def __iadd__( self, listener ): + """ Add a listener """ + self._observers.add( listener ) + return self + + def __isub__( self, listener ): + """ Remove a listener """ + self._observers.remove( listener ) return self \ No newline at end of file diff --git a/dynamixel/serial_stream.py b/dynamixel/serial_stream.py old mode 100755 new mode 100644 index 7ae8f66..5856d0e --- a/dynamixel/serial_stream.py +++ b/dynamixel/serial_stream.py @@ -1,76 +1,76 @@ -""" -This is a Python version of the ForestMoon Dynamixel library originally -written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) -for the Pi Robot Project which lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. - -* * * * * - -An implementation of the Steam interface using pyserial -""" - -try: - # PySerial Module - import serial -except: - print "This module requires the pySerial to be installed \ -to use the Serial Stream" - - -from stream import Stream, TimeoutException - -class SerialStream( Stream ): - """ A stream using the pyserial interface """ - def __init__( self, **kw ): - """ Default constructor - Creates and opens a serial port - - **kw - keyword arguments to pass into a pySerial serial port - """ - Stream.__init__( self ) - self.port = serial.Serial( **kw ) - #self.port.open() # Seems to cause "permission denied" with PySerial 2.x - def flush( self ): - """ Flush the port """ - self.port.flush() - def read( self, count ): - """ Read up to count bytes - - count - maximum number of bytes to read - throws TimeoutException if read returns empty or None - """ - buf = self.port.read( count ) - if len( buf ) == 0: - raise TimeoutException() - return buf - - def write( self, buf ): - """ Write buf to the port - buf - string or list of bytes - """ - if isinstance( buf, list ): - buf = ''.join( [chr( c ) for c in buf] ) - self.port.write( buf ) - def get_read_timeout( self ): - """ Get the read timeout """ - return self.port.timeout - def set_read_timeout( self, value ): - """ Set the read timeout """ - self.port.timeout = value - def get_write_timeout( self ): - """ Get the write timeout """ - return self.port.writeTimeout - def set_write_timeout( self, value ): - """ Set the write timeout """ - self.port.writeTimeout = value - def close(self): - self.port.close() +""" +This is a Python version of the ForestMoon Dynamixel library originally +written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) +for the Pi Robot Project which lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. + +* * * * * + +An implementation of the Steam interface using pyserial +""" + +try: + # PySerial Module + import serial +except: + print "This module requires the pySerial to be installed \ +to use the Serial Stream" + + +from stream import Stream, TimeoutException + +class SerialStream( Stream ): + """ A stream using the pyserial interface """ + def __init__( self, **kw ): + """ Default constructor + Creates and opens a serial port + + **kw - keyword arguments to pass into a pySerial serial port + """ + Stream.__init__( self ) + self.port = serial.Serial( **kw ) + #self.port.open() # Seems to cause "permission denied" with PySerial 2.x + def flush( self ): + """ Flush the port """ + self.port.flush() + def read( self, count ): + """ Read up to count bytes + + count - maximum number of bytes to read + throws TimeoutException if read returns empty or None + """ + buf = self.port.read( count ) + if len( buf ) == 0: + raise TimeoutException() + return buf + + def write( self, buf ): + """ Write buf to the port + buf - string or list of bytes + """ + if isinstance( buf, list ): + buf = ''.join( [chr( c ) for c in buf] ) + self.port.write( buf ) + def get_read_timeout( self ): + """ Get the read timeout """ + return self.port.timeout + def set_read_timeout( self, value ): + """ Set the read timeout """ + self.port.timeout = value + def get_write_timeout( self ): + """ Get the write timeout """ + return self.port.writeTimeout + def set_write_timeout( self, value ): + """ Set the write timeout """ + self.port.writeTimeout = value + def close(self): + self.port.close() diff --git a/dynamixel/stream.py b/dynamixel/stream.py old mode 100755 new mode 100644 index 23c4792..8a1276b --- a/dynamixel/stream.py +++ b/dynamixel/stream.py @@ -1,76 +1,76 @@ -""" -This is a Python version of the ForestMoon Dynamixel library originally -written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) -for the Pi Robot Project which lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. - -* * * * * -Stream interface - -""" - -class Stream( object ): - """ Stream Interface class""" - def flush( self ): - """ Flush the stream""" - raise NotImplementedError() - def read( self, count ): - """ Reads from a given stream - - count - number of bytes to read - - Returns a string of length count or less - """ - raise NotImplementedError() - def write( self, buf ): - """ Writes to a stream - - buf - a string - """ - raise NotImplementedError() - def write_byte( self, byte ): - """ Write a single byte - - byte - a string """ - return self.write( byte ) - def read_byte( self ): - """ Read a single byte - - Returns a string """ - return self.read( 1 ) - def get_read_timeout( self ): - """ Get the Read Timeout """ - raise NotImplementedError() - def set_read_timeout( self, value ): - """ Set the Read Timeout """ - raise NotImplementedError() - def get_write_timeout( self ): - """ Get the Write Timeout """ - raise NotImplementedError() - def set_write_timeout( self, value ): - """ Set the Write Timeout """ - raise NotImplementedError() - read_timeout = property( get_read_timeout, set_read_timeout ) - write_timeout = property( get_write_timeout, set_write_timeout ) - -class TimeoutException( Exception ): - """ Timeout exception """ - def __init__( self, msg=None ): - """ Default constructor """ - Exception.__init__( self ) - self.msg = msg - def __str__( self ): - """ Get the string representation of the Exception """ - return repr( self ) - def __repr__( self ): - """ Get the object representation of the Exception """ +""" +This is a Python version of the ForestMoon Dynamixel library originally +written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) +for the Pi Robot Project which lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. + +* * * * * +Stream interface + +""" + +class Stream( object ): + """ Stream Interface class""" + def flush( self ): + """ Flush the stream""" + raise NotImplementedError() + def read( self, count ): + """ Reads from a given stream + + count - number of bytes to read + + Returns a string of length count or less + """ + raise NotImplementedError() + def write( self, buf ): + """ Writes to a stream + + buf - a string + """ + raise NotImplementedError() + def write_byte( self, byte ): + """ Write a single byte + + byte - a string """ + return self.write( byte ) + def read_byte( self ): + """ Read a single byte + + Returns a string """ + return self.read( 1 ) + def get_read_timeout( self ): + """ Get the Read Timeout """ + raise NotImplementedError() + def set_read_timeout( self, value ): + """ Set the Read Timeout """ + raise NotImplementedError() + def get_write_timeout( self ): + """ Get the Write Timeout """ + raise NotImplementedError() + def set_write_timeout( self, value ): + """ Set the Write Timeout """ + raise NotImplementedError() + read_timeout = property( lambda self : self.get_read_timeout(), lambda self , v: self.set_read_timeout(v) ) + write_timeout = property( lambda self : self.get_write_timeout(), lambda self, v : self.set_write_timeout(v) ) + +class TimeoutException( Exception ): + """ Timeout exception """ + def __init__( self, msg=None ): + """ Default constructor """ + Exception.__init__( self ) + self.msg = msg + def __str__( self ): + """ Get the string representation of the Exception """ + return repr( self ) + def __repr__( self ): + """ Get the object representation of the Exception """ return "TimeoutError( %r )" % (self.msg) \ No newline at end of file diff --git a/dynamixel/tests.py b/dynamixel/tests.py old mode 100755 new mode 100644 index 983aab8..7d62ab2 --- a/dynamixel/tests.py +++ b/dynamixel/tests.py @@ -1,442 +1,442 @@ -""" -This is a Python version of the ForestMoon Dynamixel library originally -written in C# by Scott Ferguson. - -The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) -for the Pi Robot Project which lives at http://www.pirobot.org. - -The original license for the C# version is as follows: - -This software was written and developed by Scott Ferguson. -The current version can be found at http://www.forestmoon.com/Software/. -This free software is distributed under the GNU General Public License. -See http://www.gnu.org/licenses/gpl.html for details. -This license restricts your usage of the software in derivative works. - -* * * * * - -Dynamixel unit test module - -""" - -import unittest - -import dynamixel -import dynamixel_network -import event_handler -import stream -import defs - -class MockStream( stream.Stream ): - """ Mock Stream implementation """ - - def __init__( self, data = "" ): - """ Constructor """ - stream.Stream.__init__( self ) - self.ibuffer = data - self.obuffer = "" - def append(self, data ): - """ Append data to incoming stream """ - self.ibuffer += data - - def flush( self ): - """ Flush the stream""" - pass - def read( self, count ): - """ Reads from a given stream - - count - number of bytes to read - - Returns a string of length count or less - """ - if len( self.ibuffer ) > 0: - to_read = min( count, len( self.ibuffer ) ) - buf = self.ibuffer[:to_read] - self.ibuffer = self.ibuffer[to_read:] - return buf - else: - raise stream.TimeoutException() - def write( self, buf ): - """ Writes to a stream - - buf - a string - """ - self.obuffer += buf - def write_byte( self, byte ): - """ Write a single byte - - byte - a string """ - return self.write( byte ) - def read_byte( self ): - """ Read a single byte - - Returns a string """ - return self.read( 1 ) - def get_read_timeout( self ): - """ Get the Read Timeout """ - return 0 - def set_read_timeout( self, value ): - """ Set the Read Timeout """ - pass - def get_write_timeout( self ): - """ Get the Write Timeout """ - return 0 - def set_write_timeout( self, value ): - """ Set the Write Timeout """ - pass - - read_timeout = property( get_read_timeout, set_read_timeout ) - write_timeout = property( get_write_timeout, set_write_timeout ) - -def make_packet( ident, err, data, length = None ): - """ Create a packet for testing """ - ## - ## Status packet returned from dynamixel serve - ## 0 1 2 3 4 5 5 + data-length - ## 0xFF 0xFF id len err/inst data checksum - if length == None: - length = len( data ) + 2 - obuffer = '\xFF\xFF' + chr( ident ) + chr( length ) + chr( err ) - chksum = 0 - chksum = (chksum + ident) & 0xFF - chksum = (chksum + length) & 0xFF - chksum = (chksum + err) & 0xFF - for byte in data: - chksum = (chksum + byte) & 0xFF - obuffer += chr( byte ) - chksum = (~chksum) & 0xFF - obuffer += chr( chksum ) - return obuffer - - -class EventHandlerTest(unittest.TestCase): - """ Test cases for the EventHandler """ - def setUp( self ): - """ Setup """ - self.is_called = False - self.is_called = False - self.is_called_2 = False - self.obj = None - self.obj2 = None - self.sender = None - self.sender2 = None - - - def test_add( self ): - """ Test a single add and call""" - self.handler = event_handler.EventHandler() - self.handler += self.do_call - self.clear() - self.handler( self, None ) - self.assertTrue( self.is_called ) - self.assertTrue( self.sender == self) - self.assertTrue( self.obj == None ) - - def test_multi_add( self ): - """ Test Multi-add and call """ - self.handler = event_handler.EventHandler() - self.handler += self.do_call - self.handler += self.do_call2 - self.clear() - self.handler( self, None ) - self.assertTrue( self.is_called and self.is_called_2) - self.assertTrue( self.sender == self) - self.assertTrue( self.obj == None ) - self.assertTrue( self.sender2 == self) - self.assertTrue( self.obj2 == None ) - - def test_sub( self ): - """ Test Multi-add and call """ - self.handler = event_handler.EventHandler() - self.handler += self.do_call - self.handler += self.do_call2 - self.handler -= self.do_call2 - self.clear() - self.handler( self, None ) - self.assertTrue( self.is_called and not self.is_called_2) - self.assertTrue( self.sender == self) - self.assertTrue( self.obj == None ) - - def clear( self ): - """ Reset test """ - self.is_called = False - self.is_called_2 = False - self.obj = None - self.obj2 = None - self.sender = None - self.sender2 = None - - def do_call( self, sender, obj ): - """ Target method """ - self.is_called = True - self.sender = sender - self.obj = obj - - def do_call2( self, sender, obj ): - """ Target method """ - self.is_called_2 = True - self.sender2 = sender - self.obj2 = obj - -class DynamixelInterfaceTest(unittest.TestCase): - """ Test cases for the DynamixelInterface """ - def setUp( self ): - """ Reset the interface """ - self.reset() - def reset( self ): - """ Reset the interface """ - self.has_errors = False - self.error_codes = [] - - def test_read_packet( self ): - "Ensure read packet reads the appropriate data" - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - - self.reset() - # verify packet w/o errors - packet = make_packet(1, 0, [0, 1, 2, 3] ) - istream.append( packet ) - - (ident, data) = iface.read_packet() - self.assertTrue( ident == 1) - self.assertTrue( data == [0, 1, 2, 3] ) - self.assertTrue( self.has_errors == False ) - # verify packet w/ errors - self.reset() - packet = make_packet(1, defs.ERROR_STATUS.InputVoltage, [0, 1, 2, 3] ) - istream.append( packet ) - (ident, data) = iface.read_packet() - self.assertTrue( ident == 1) - self.assertTrue( data == [0, 1, 2, 3] ) - self.assertTrue( self.has_errors == True ) - - def test_read_ping( self ): - "Test ping also tests await packet and write instruction" - - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - - # test for no response - self.reset() - # verify ping failed - self.assertFalse( iface.ping( 1 ) ) - chksum = (~(1 + 2 + 1)) & 0xFF - # verify outgoing packet is well formed - self.assertEqual( istream.obuffer, "\xFF\xFF\x01\x02\x01\xfb" ) - # create mock ping response - # create an ignored packet - istream.append( make_packet( 0, 0, [])) - # create the ping response - istream.append( make_packet( 1, 0, [])) - # ping again and verify ping came back - self.assertTrue( iface.ping( 1 ) ) - - def test_scan_ids(self ): - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - - # test for no response - self.reset() - # verify ping failed - istream.append( make_packet( 1, 0, [])) - self.assertTrue( iface.scan_ids( 1, 10 ) == [1] ) - - def test_read_data( self ): - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - - # test for no response - self.reset() - # verify read data - istream.append( make_packet( 1, 0, [0x20])) - result = iface._read_data( 1, 0x2b, 1 ) - self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0x1,0x4,0x2,0x2b,0x1,0xcc])))) - self.assertEqual( result, [0x20]) - - def test_read_register( self ): - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - - # test for no response - self.reset() - # verify read data - istream.append( make_packet( 1, 0, [0x20])) - result = iface.read_register( 1, 0x2b) - self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0x1,0x4,0x2,0x2b,0x1,0xcc])))) - self.assertEqual( result, 0x20) - - def test_read_registers( self ): - """ Full range test for read registers""" - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - self.reset() - istream.append( make_packet( 1, 0, [0x20])) - # dummy packet full range of eeprom - istream.append( make_packet( 1, 0, range(0,50) )) - result = iface.read_registers( 1, defs.REGISTER.ModelNumber, defs.REGISTER.Punch) - # assert only the number of registers are returned - self.assertEqual( len( result ), len(defs.REGISTER.values()) ) - # TODO possibly add test for values of registers in future - - def test_write_data( self ): - """ Test writing data to eeprom """ - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - - self.reset() - # verify write as per Spec - iface.write_data( dynamixel_network.DynamixelInterface.BROADCAST_ID, 0x3,[0x1], False ) - self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0xfe,0x4,0x3,0x3,0x1,0xf6])))) - def test_write_register( self ): - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - - self.reset() - - # verify write as per variant of spec - iface.write_register( 0x1, 0x3,0x1, True ) - self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0x1,0x4,0x4,0x3,0x1,0xf2])))) - - def test_action( self ): - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - - self.reset() - # verify outgoing action command - iface.action( ) - self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0xfe,0x2,0x5,0xfa])))) - - def test_sync_write( self ): - istream = MockStream() - iface = dynamixel_network.DynamixelInterface( istream ) - iface.dynamixel_error += self.error_handler - - self.reset() - # verify outgoing action command - iface.sync_write(0x1e, 4, [0X00, 0X10, 0X00, 0X50, 0X01, - 0X01, 0X20, 0X02, 0X60, 0X03, 0X02, - 0X30, 0X00, 0X70, 0X01, 0X03, - 0X20, 0X02, 0X80, 0X03]) - expected = ''.join((map(chr, [0XFF, 0XFF, 0XFE, 0X18, 0X83, 0X1E, - 0X04, 0X00, 0X10, 0X00, 0X50, 0X01, - 0X01, 0X20, 0X02, 0X60, 0X03, 0X02, - 0X30, 0X00, 0X70, 0X01, 0X03, - 0X20, 0X02, 0X80, 0X03, 0X12]))) - self.assertEqual( istream.obuffer, expected) - - - - def error_handler(self, sender, (ident, error_status)): - """ Error handler hook """ - self.has_errors = True - self.error_codes.append( error_status ) - - -class DynamixelNetwork(unittest.TestCase): - """ Test cases for the DynamixelNetwork """ - def setUp( self ): - """ Reset the interface """ - self.reset() - def reset( self ): - """ Reset the interface """ - self.has_errors = False - self.error_codes = [] - def test_scan(self): - """ Test a scan of registers""" - istream = MockStream() - iface = dynamixel_network.DynamixelNetwork( istream ) - iface.dynamixel_error += self.error_handler - self.reset() - # ping response - istream.append( make_packet( 1, 0, [])) - # read register response - istream.append( make_packet( 1, 0, [1,2,3,4])) - # scan for one servo - iface.scan( 1, 1 ) - # get by id - dyn = iface[1] - # assert initial read registers are ok - self.assertEqual( dyn.moving_speed, 1027 ) - self.assertEqual( dyn.goal_position, 513 ) - self.assertFalse(self.has_errors ) - - def test_reset(self): - """ Test a reset call""" - istream = MockStream() - iface = dynamixel_network.DynamixelNetwork( istream ) - iface.dynamixel_error += self.error_handler - self.reset() - # ping response - istream.append( make_packet( 1, 0, [])) - # read register response - istream.append( make_packet( 1, 0, [1,2,3,4])) - # scan for one servo - iface.scan( 1, 1 ) - # get by id - dyn = iface[1] - istream.obuffer='' - istream.append( make_packet(0x01,0x00,[])) - dyn.reset(1) - self.assertEqual( istream.obuffer, ''.join(map(chr,[0xff,0xff,0x1,0x2,0x6,0xf6]))) - self.assertFalse(self.has_errors ) - - def test_set_attr(self): - """ Test setting an attribute""" - istream = MockStream() - iface = dynamixel_network.DynamixelNetwork( istream ) - iface.dynamixel_error += self.error_handler - self.reset() - # ping response - istream.append( make_packet( 1, 0, [])) - # read register response - istream.append( make_packet( 1, 0, [1,2,3,4])) - # scan for one servo - iface.scan( 1, 1 ) - # get by id - dyn = iface[1] - dyn.synchronized = False - istream.obuffer='' - istream.append( make_packet(0x01,0x00,[])) - dyn.goal_position = 0x3FF - self.assertEqual( istream.obuffer, ''.join(map(chr,[0xFF, 0xFF, 0x01, 0x05, 0x03, 0x1E, 0xFF, 0x03, 0xD6]))) - self.assertFalse(self.has_errors ) - - def test_get_attr(self): - """ Test getting an attribute""" - istream = MockStream() - iface = dynamixel_network.DynamixelNetwork( istream ) - iface.dynamixel_error += self.error_handler - self.reset() - # ping response - istream.append( make_packet( 1, 0, [])) - # read register response - istream.append( make_packet( 1, 0, [1,2,3,4])) - # scan for one servo - iface.scan( 1, 1 ) - # get by id - dyn = iface[1] - dyn.synchronized = False - istream.obuffer='' - istream.append( make_packet(0x01,0x00,[0x20,0x00])) - value = dyn.current_position - self.assertEqual( value, 0x20) - - - - def error_handler(self, sender, (ident, error_status)): - """ Error handler hook """ - self.has_errors = True - self.error_codes.append( error_status ) - -if __name__ == '__main__': - unittest.main() +""" +This is a Python version of the ForestMoon Dynamixel library originally +written in C# by Scott Ferguson. + +The Python version was created by Patrick Goebel (mailto:patrick@pirobot.org) +for the Pi Robot Project which lives at http://www.pirobot.org. + +The original license for the C# version is as follows: + +This software was written and developed by Scott Ferguson. +The current version can be found at http://www.forestmoon.com/Software/. +This free software is distributed under the GNU General Public License. +See http://www.gnu.org/licenses/gpl.html for details. +This license restricts your usage of the software in derivative works. + +* * * * * + +Dynamixel unit test module + +""" + +import unittest + +import dynamixel +import dynamixel_network +import event_handler +import stream +import defs + +class MockStream( stream.Stream ): + """ Mock Stream implementation """ + + def __init__( self, data = "" ): + """ Constructor """ + stream.Stream.__init__( self ) + self.ibuffer = data + self.obuffer = "" + def append(self, data ): + """ Append data to incoming stream """ + self.ibuffer += data + + def flush( self ): + """ Flush the stream""" + pass + def read( self, count ): + """ Reads from a given stream + + count - number of bytes to read + + Returns a string of length count or less + """ + if len( self.ibuffer ) > 0: + to_read = min( count, len( self.ibuffer ) ) + buf = self.ibuffer[:to_read] + self.ibuffer = self.ibuffer[to_read:] + return buf + else: + raise stream.TimeoutException() + def write( self, buf ): + """ Writes to a stream + + buf - a string + """ + self.obuffer += buf + def write_byte( self, byte ): + """ Write a single byte + + byte - a string """ + return self.write( byte ) + def read_byte( self ): + """ Read a single byte + + Returns a string """ + return self.read( 1 ) + def get_read_timeout( self ): + """ Get the Read Timeout """ + return 0 + def set_read_timeout( self, value ): + """ Set the Read Timeout """ + pass + def get_write_timeout( self ): + """ Get the Write Timeout """ + return 0 + def set_write_timeout( self, value ): + """ Set the Write Timeout """ + pass + + read_timeout = property( get_read_timeout, set_read_timeout ) + write_timeout = property( get_write_timeout, set_write_timeout ) + +def make_packet( ident, err, data, length = None ): + """ Create a packet for testing """ + ## + ## Status packet returned from dynamixel serve + ## 0 1 2 3 4 5 5 + data-length + ## 0xFF 0xFF id len err/inst data checksum + if length == None: + length = len( data ) + 2 + obuffer = '\xFF\xFF' + chr( ident ) + chr( length ) + chr( err ) + chksum = 0 + chksum = (chksum + ident) & 0xFF + chksum = (chksum + length) & 0xFF + chksum = (chksum + err) & 0xFF + for byte in data: + chksum = (chksum + byte) & 0xFF + obuffer += chr( byte ) + chksum = (~chksum) & 0xFF + obuffer += chr( chksum ) + return obuffer + + +class EventHandlerTest(unittest.TestCase): + """ Test cases for the EventHandler """ + def setUp( self ): + """ Setup """ + self.is_called = False + self.is_called = False + self.is_called_2 = False + self.obj = None + self.obj2 = None + self.sender = None + self.sender2 = None + + + def test_add( self ): + """ Test a single add and call""" + self.handler = event_handler.EventHandler() + self.handler += self.do_call + self.clear() + self.handler( self, None ) + self.assertTrue( self.is_called ) + self.assertTrue( self.sender == self) + self.assertTrue( self.obj == None ) + + def test_multi_add( self ): + """ Test Multi-add and call """ + self.handler = event_handler.EventHandler() + self.handler += self.do_call + self.handler += self.do_call2 + self.clear() + self.handler( self, None ) + self.assertTrue( self.is_called and self.is_called_2) + self.assertTrue( self.sender == self) + self.assertTrue( self.obj == None ) + self.assertTrue( self.sender2 == self) + self.assertTrue( self.obj2 == None ) + + def test_sub( self ): + """ Test Multi-add and call """ + self.handler = event_handler.EventHandler() + self.handler += self.do_call + self.handler += self.do_call2 + self.handler -= self.do_call2 + self.clear() + self.handler( self, None ) + self.assertTrue( self.is_called and not self.is_called_2) + self.assertTrue( self.sender == self) + self.assertTrue( self.obj == None ) + + def clear( self ): + """ Reset test """ + self.is_called = False + self.is_called_2 = False + self.obj = None + self.obj2 = None + self.sender = None + self.sender2 = None + + def do_call( self, sender, obj ): + """ Target method """ + self.is_called = True + self.sender = sender + self.obj = obj + + def do_call2( self, sender, obj ): + """ Target method """ + self.is_called_2 = True + self.sender2 = sender + self.obj2 = obj + +class DynamixelInterfaceTest(unittest.TestCase): + """ Test cases for the DynamixelInterface """ + def setUp( self ): + """ Reset the interface """ + self.reset() + def reset( self ): + """ Reset the interface """ + self.has_errors = False + self.error_codes = [] + + def test_read_packet( self ): + "Ensure read packet reads the appropriate data" + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + + self.reset() + # verify packet w/o errors + packet = make_packet(1, 0, [0, 1, 2, 3] ) + istream.append( packet ) + + (ident, data) = iface.read_packet() + self.assertTrue( ident == 1) + self.assertTrue( data == [0, 1, 2, 3] ) + self.assertTrue( self.has_errors == False ) + # verify packet w/ errors + self.reset() + packet = make_packet(1, defs.ERROR_STATUS.InputVoltage, [0, 1, 2, 3] ) + istream.append( packet ) + (ident, data) = iface.read_packet() + self.assertTrue( ident == 1) + self.assertTrue( data == [0, 1, 2, 3] ) + self.assertTrue( self.has_errors == True ) + + def test_read_ping( self ): + "Test ping also tests await packet and write instruction" + + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + + # test for no response + self.reset() + # verify ping failed + self.assertFalse( iface.ping( 1 ) ) + chksum = (~(1 + 2 + 1)) & 0xFF + # verify outgoing packet is well formed + self.assertEqual( istream.obuffer, "\xFF\xFF\x01\x02\x01\xfb" ) + # create mock ping response + # create an ignored packet + istream.append( make_packet( 0, 0, [])) + # create the ping response + istream.append( make_packet( 1, 0, [])) + # ping again and verify ping came back + self.assertTrue( iface.ping( 1 ) ) + + def test_scan_ids(self ): + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + + # test for no response + self.reset() + # verify ping failed + istream.append( make_packet( 1, 0, [])) + self.assertTrue( iface.scan_ids( 1, 10 ) == [1] ) + + def test_read_data( self ): + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + + # test for no response + self.reset() + # verify read data + istream.append( make_packet( 1, 0, [0x20])) + result = iface._read_data( 1, 0x2b, 1 ) + self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0x1,0x4,0x2,0x2b,0x1,0xcc])))) + self.assertEqual( result, [0x20]) + + def test_read_register( self ): + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + + # test for no response + self.reset() + # verify read data + istream.append( make_packet( 1, 0, [0x20])) + result = iface.read_register( 1, 0x2b) + self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0x1,0x4,0x2,0x2b,0x1,0xcc])))) + self.assertEqual( result, 0x20) + + def test_read_registers( self ): + """ Full range test for read registers""" + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + self.reset() + istream.append( make_packet( 1, 0, [0x20])) + # dummy packet full range of eeprom + istream.append( make_packet( 1, 0, range(0,50) )) + result = iface.read_registers( 1, defs.REGISTER.ModelNumber, defs.REGISTER.Punch) + # assert only the number of registers are returned + self.assertEqual( len( result ), len(defs.REGISTER.values()) ) + # TODO possibly add test for values of registers in future + + def test_write_data( self ): + """ Test writing data to eeprom """ + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + + self.reset() + # verify write as per Spec + iface.write_data( dynamixel_network.DynamixelInterface.BROADCAST_ID, 0x3,[0x1], False ) + self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0xfe,0x4,0x3,0x3,0x1,0xf6])))) + def test_write_register( self ): + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + + self.reset() + + # verify write as per variant of spec + iface.write_register( 0x1, 0x3,0x1, True ) + self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0x1,0x4,0x4,0x3,0x1,0xf2])))) + + def test_action( self ): + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + + self.reset() + # verify outgoing action command + iface.action( ) + self.assertEqual( istream.obuffer, ''.join((map(chr,[0xff,0xff,0xfe,0x2,0x5,0xfa])))) + + def test_sync_write( self ): + istream = MockStream() + iface = dynamixel_network.DynamixelInterface( istream ) + iface.dynamixel_error += self.error_handler + + self.reset() + # verify outgoing action command + iface.sync_write(0x1e, 4, [0X00, 0X10, 0X00, 0X50, 0X01, + 0X01, 0X20, 0X02, 0X60, 0X03, 0X02, + 0X30, 0X00, 0X70, 0X01, 0X03, + 0X20, 0X02, 0X80, 0X03]) + expected = ''.join((map(chr, [0XFF, 0XFF, 0XFE, 0X18, 0X83, 0X1E, + 0X04, 0X00, 0X10, 0X00, 0X50, 0X01, + 0X01, 0X20, 0X02, 0X60, 0X03, 0X02, + 0X30, 0X00, 0X70, 0X01, 0X03, + 0X20, 0X02, 0X80, 0X03, 0X12]))) + self.assertEqual( istream.obuffer, expected) + + + + def error_handler(self, sender, (ident, error_status)): + """ Error handler hook """ + self.has_errors = True + self.error_codes.append( error_status ) + + +class DynamixelNetwork(unittest.TestCase): + """ Test cases for the DynamixelNetwork """ + def setUp( self ): + """ Reset the interface """ + self.reset() + def reset( self ): + """ Reset the interface """ + self.has_errors = False + self.error_codes = [] + def test_scan(self): + """ Test a scan of registers""" + istream = MockStream() + iface = dynamixel_network.DynamixelNetwork( istream ) + iface.dynamixel_error += self.error_handler + self.reset() + # ping response + istream.append( make_packet( 1, 0, [])) + # read register response + istream.append( make_packet( 1, 0, [1,2,3,4])) + # scan for one servo + iface.scan( 1, 1 ) + # get by id + dyn = iface[1] + # assert initial read registers are ok + self.assertEqual( dyn.moving_speed, 1027 ) + self.assertEqual( dyn.goal_position, 513 ) + self.assertFalse(self.has_errors ) + + def test_reset(self): + """ Test a reset call""" + istream = MockStream() + iface = dynamixel_network.DynamixelNetwork( istream ) + iface.dynamixel_error += self.error_handler + self.reset() + # ping response + istream.append( make_packet( 1, 0, [])) + # read register response + istream.append( make_packet( 1, 0, [1,2,3,4])) + # scan for one servo + iface.scan( 1, 1 ) + # get by id + dyn = iface[1] + istream.obuffer='' + istream.append( make_packet(0x01,0x00,[])) + dyn.reset(1) + self.assertEqual( istream.obuffer, ''.join(map(chr,[0xff,0xff,0x1,0x2,0x6,0xf6]))) + self.assertFalse(self.has_errors ) + + def test_set_attr(self): + """ Test setting an attribute""" + istream = MockStream() + iface = dynamixel_network.DynamixelNetwork( istream ) + iface.dynamixel_error += self.error_handler + self.reset() + # ping response + istream.append( make_packet( 1, 0, [])) + # read register response + istream.append( make_packet( 1, 0, [1,2,3,4])) + # scan for one servo + iface.scan( 1, 1 ) + # get by id + dyn = iface[1] + dyn.synchronized = False + istream.obuffer='' + istream.append( make_packet(0x01,0x00,[])) + dyn.goal_position = 0x3FF + self.assertEqual( istream.obuffer, ''.join(map(chr,[0xFF, 0xFF, 0x01, 0x05, 0x03, 0x1E, 0xFF, 0x03, 0xD6]))) + self.assertFalse(self.has_errors ) + + def test_get_attr(self): + """ Test getting an attribute""" + istream = MockStream() + iface = dynamixel_network.DynamixelNetwork( istream ) + iface.dynamixel_error += self.error_handler + self.reset() + # ping response + istream.append( make_packet( 1, 0, [])) + # read register response + istream.append( make_packet( 1, 0, [1,2,3,4])) + # scan for one servo + iface.scan( 1, 1 ) + # get by id + dyn = iface[1] + dyn.synchronized = False + istream.obuffer='' + istream.append( make_packet(0x01,0x00,[0x20,0x00])) + value = dyn.current_position + self.assertEqual( value, 0x20) + + + + def error_handler(self, sender, (ident, error_status)): + """ Error handler hook """ + self.has_errors = True + self.error_codes.append( error_status ) + +if __name__ == '__main__': + unittest.main() diff --git a/example.py b/example.py index be2c86e..5ec6bec 100755 --- a/example.py +++ b/example.py @@ -1,3 +1,4 @@ +#!/usr/bin/python import os import dynamixel import time diff --git a/example2.py b/example2.py index 1d58d1b..b0ba6a7 100755 --- a/example2.py +++ b/example2.py @@ -1,3 +1,4 @@ +#!/usr/bin/python import os, dynamixel, time, random def actuators_moving(actuators): @@ -55,4 +56,4 @@ def actuators_moving(actuators): for actuator in myActuators: data = [actuator.cache[dynamixel.defs.REGISTER['Id']], actuator.cache[dynamixel.defs.REGISTER['CurrentPosition']]] time.sleep(0.05) - \ No newline at end of file + diff --git a/setup.py b/setup.py index 039312f..df17c8f 100755 --- a/setup.py +++ b/setup.py @@ -1,29 +1,29 @@ -#!/usr/bin/python - -from dynamixel import __version__ - -sdict = { - 'name' : 'dynamixel', - 'version' : __version__, - 'description' : "Dynamixel Servo Library", - 'url': 'http://github.com/iandanforth/pydynamixel', - 'download_url' : 'http://cloud.github.com/downloads/iandanforth/pydynamixel/dynamixel-%s.tar.gz' % __version__, - 'author' : 'Patrick Goebel', - 'author_email' : 'patrick@pirobot.org', - 'maintainer' : 'Ian Danforth', - 'maintainer_email' : 'iandanforth@gmail.com', - 'keywords' : ['dynamixel', 'robotis', 'ax-12'], - 'license' : 'GPL', - 'packages' : ['dynamixel'], - 'classifiers' : [ - 'Environment :: Console', - 'Intended Audience :: Developers', - 'Programming Language :: Python'], -} - -try: - from setuptools import setup -except ImportError: - from distutils.core import setup - -setup(**sdict) +#!/usr/bin/python + +from dynamixel import __version__ + +sdict = { + 'name' : 'dynamixel', + 'version' : __version__, + 'description' : "Dynamixel Servo Library", + 'url': 'http://github.com/iandanforth/pydynamixel', + 'download_url' : 'http://cloud.github.com/downloads/iandanforth/pydynamixel/dynamixel-%s.tar.gz' % __version__, + 'author' : 'Patrick Goebel', + 'author_email' : 'patrick@pirobot.org', + 'maintainer' : 'Ian Danforth', + 'maintainer_email' : 'iandanforth@gmail.com', + 'keywords' : ['dynamixel', 'robotis', 'ax-12'], + 'license' : 'GPL', + 'packages' : ['dynamixel'], + 'classifiers' : [ + 'Environment :: Console', + 'Intended Audience :: Developers', + 'Programming Language :: Python'], +} + +try: + from setuptools import setup +except ImportError: + from distutils.core import setup + +setup(**sdict)