#! /usr/bin/env python
#
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
from __future__ import print_function
NAME='dynparam'
import roslib; roslib.load_manifest('dynamic_reconfigure')
import rospy
import optparse
import sys
import yaml
import dynamic_reconfigure.client
def do_list():
connect()
list = dynamic_reconfigure.find_reconfigure_services()
for s in list:
print(s)
def do_set_from_parameters():
usage = """Usage: %prog set_from_parameters [options] node
Example command line:
dynparam set_from_parameters wge100_camera _camera_url:=foo
Example launch file:
<launch>
<node name="$(anon adjust-wge100_camera)" pkg="dynamic_reconfigure" type="dynparam" args="set_from_parameters wge100_camera">
<param name="camera_url" value="foo" />
<param name="brightness" value="58" />
</node>
</launch>"""
parser = optparse.OptionParser(usage=usage, prog=NAME)
add_timeout_option(parser)
options, args = parser.parse_args(myargv[2:])
if len(args) == 0:
parser.error("invalid arguments. Please specify a node name")
elif len(args) > 1:
parser.error("too many arguments")
node = args[0]
connect()
try:
params = rospy.get_param("~")
except KeyError:
print('error updating parameters: no parameters found on parameter server', file=sys.stderr)
return
set_params(node, params, timeout=options.timeout)
def do_set():
usage = """Usage: %prog set [options] node parameter value
or: %prog set [options] node values
Examples:
dynparam set wge100_camera camera_url foo
dynparam set wge100_camera "{'camera_url':'foo', 'brightness':58}" """
args, optparse_args = [], []
for s in myargv[2:]:
if s.startswith('-'):
if len(s) > 1 and ord(s[1]) >= ord('0') and ord(s[1]) <= ord('9'):
args.append(s)
else:
optparse_args.append(s)
else:
args.append(s)
parser = optparse.OptionParser(usage=usage, prog=NAME)
add_timeout_option(parser)
options, _ = parser.parse_args(optparse_args)
if len(args) > 3:
parser.error("too many arguments")
elif len(args) < 2:
parser.error("invalid arguments. Please specify either a node name, parameter name and parameter value, or a node name and a YAML dictionary")
node = args[0]
if len(args) == 2:
node, value = args[0], args[1]
values_dict = yaml.load(value)
if type(values_dict) != dict:
parser.error('invalid arguments. Please specify either a node name, parameter name and parameter value, or a node name and a YAML dictionary')
elif len(args) == 3:
node, parameter, value = args[0], args[1], args[2]
values_dict = { parameter : value }
connect()
try:
set_params(node, values_dict, timeout=options.timeout)
except rospy.service.ServiceException:
print('couldn\'t set parameters at node %s' % node)
except rospy.exceptions.ROSException:
print('couldn\'t set parameters at node %s' % node)
def do_get():
usage = "Usage: %prog get [options] node"
parser = optparse.OptionParser(usage=usage, prog=NAME)
add_timeout_option(parser)
options, args = parser.parse_args(myargv[2:])
if len(args) == 0:
parser.error("invalid arguments. Please specify a node name")
elif len(args) > 1:
parser.error("too many arguments")
node = args[0]
connect()
params = get_params(node, timeout=options.timeout)
if params is not None:
print(params)
def do_load():
usage = "Usage: %prog load [options] node file"
parser = optparse.OptionParser(usage=usage, prog=NAME)
add_timeout_option(parser)
options, args = parser.parse_args(myargv[2:])
if len(args) == 0:
parser.error("invalid arguments. Please specify a node name")
elif len(args) == 1:
parser.error("invalid arguments. Please specify an input file")
elif len(args) > 2:
parser.error("too many arguments")
node, path = args[0], args[1]
f = open(path, 'r')
try:
params = {}
for doc in yaml.load_all(f.read()):
params.update(doc)
finally:
f.close()
connect()
set_params(node, params, timeout=options.timeout)
def do_dump():
usage = "Usage: %prog dump [options] node file"
parser = optparse.OptionParser(usage=usage, prog=NAME)
add_timeout_option(parser)
options, args = parser.parse_args(myargv[2:])
if len(args) == 0:
parser.error("invalid arguments. Please specify a node name")
elif len(args) == 1:
parser.error("invalid arguments. Please specify an output file")
elif len(args) > 2:
parser.error("too many arguments")
node, path = args[0], args[1]
connect()
params = get_params(node, timeout=options.timeout)
if params is not None:
f = open(path, 'w')
try:
yaml.dump(params, f)
return
finally:
f.close()
print("couldn't get parameters from node %s" % node)
def get_params(node, timeout=None):
client = dynamic_reconfigure.client.Client(node, timeout=timeout)
return client.get_configuration(timeout=timeout)
def set_params(node, params, timeout=None):
client = dynamic_reconfigure.client.Client(node, timeout=timeout)
try:
client.update_configuration(params)
except dynamic_reconfigure.DynamicReconfigureParameterException as e:
print('error updating parameters: ' + str(e))
def add_timeout_option(parser):
parser.add_option('-t', '--timeout', action='store', type='float', default=None, help='timeout in secs')
def print_usage():
print("""dynparam is a command-line tool for getting, setting, and
deleting parameters of a dynamically configurable node.
Commands:
\tdynparam set configure node
\tdynparam set_from_parameters copy configuration from parameter server
\tdynparam get get node configuration
\tdynparam load load configuration from file
\tdynparam dump dump configuration to file
\tdynparam list list configurable nodes
Type dynparam <command> -h for more detailed usage, e.g. 'dynparam get -h'
""")
sys.exit(1)
def connect():
rospy.init_node('dynparam', anonymous=True)
if __name__ == '__main__':
myargv = rospy.myargv()
if len(myargv) == 1:
print_usage()
else:
cmd = myargv[1]
try:
if cmd == 'list': do_list()
elif cmd == 'set_from_parameters': do_set_from_parameters()
elif cmd == 'set': do_set()
elif cmd == 'get': do_get()
elif cmd == 'load': do_load()
elif cmd == 'dump': do_dump()
else: print_usage()
except rospy.exceptions.ROSInterruptException:
pass