#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2018 Takashi Sakamoto
from hinawa_utils.misc.cli_kit import CliKit
from hinawa_utils.bebob.phase_go_unit import PhaseGoUnit
def handle_current_status(unit, args):
    """Print the unit's current streaming, routing, mixer and analog settings.

    The analog input and analog output sections are printed only when the
    protocol object provides ``get_analog_input_level_labels`` and
    ``get_analog_output_labels`` respectively.

    Args:
        unit: Unit object exposing ``get_property()`` and ``protocol``.
        args: Remaining command-line arguments; not used by this command.

    Returns:
        Always True.
    """
    protocol = unit.protocol
    channels = (1, 2)  # per-channel getters are queried for channels 1 and 2

    print('Current status:')
    print(' Packet Streaming:')
    print(' started: {0}'.format(unit.get_property('streaming')))
    print(' sampling-rate: {0}'.format(protocol.get_sampling_rate()))
    print(' clock-source: {0}'.format(protocol.get_clock_source()))

    print(' Routing:')
    for target in protocol.get_output_labels():
        source = protocol.get_output_source(target)
        print(' {0}: {1}'.format(target, source))

    print('Mixer inputs:')
    for target in protocol.get_mixer_input_labels():
        print(' {0}:'.format(target))
        for ch in channels:
            gain = protocol.get_mixer_input_volume(target, ch)
            mute = protocol.get_mixer_input_mute(target, ch)
            print(' gain: {0}, mute: {1}'.format(gain, mute))

    print('Mixer output:')
    for ch in channels:
        volume = protocol.get_mixer_output_volume(ch)
        mute = protocol.get_mixer_output_mute(ch)
        # Print the value just read for this channel; using the mixer-input
        # 'gain' variable here would show a stale number (or fail when no
        # mixer input exists).
        print(' volume: {0}, mute: {1}'.format(volume, mute))

    if hasattr(protocol, 'get_analog_input_level_labels'):
        print('Analog input settings:')
        level = protocol.get_analog_input_level()
        print(' level: {0}'.format(level))

    if hasattr(protocol, 'get_analog_output_labels'):
        print('Analog output settings:')
        for target in protocol.get_analog_output_labels():
            print(' {0}'.format(target))
            for ch in channels:
                volume = protocol.get_analog_output_volume(target, ch)
                mute = protocol.get_analog_output_mute(target, ch)
                print(' volume: {0}, mute: {1}'.format(volume, mute))

    return True
def handle_firmware_info(unit, args):
    """Print the contents of ``unit.firmware_info``.

    Entries whose value is a dict are printed as a category heading followed
    by one line per contained name/value pair; any other entry is printed on
    a single line.

    Args:
        unit: Unit object exposing a ``firmware_info`` mapping.
        args: Remaining command-line arguments; not used by this command.

    Returns:
        Always True.
    """
    print('BeBoB firmware information:')
    for category, values in unit.firmware_info.items():
        if not isinstance(values, dict):
            # Scalar entry: category and value share one line.
            print(' {0}: {1}'.format(category, values))
            continue
        print(' {0}:'.format(category))
        for name, value in values.items():
            print(' {0}: {1}'.format(name, value))
    return True
def handle_output_source(unit, args):
    """Get or set the source routed to an output.

    Expected arguments are ``TARGET OP [SOURCE]``. With ``get`` the current
    source of TARGET is printed; with ``set`` the given SOURCE is applied.
    Any other combination prints the usage text.

    Args:
        unit: Unit object exposing ``protocol``.
        args: Command-line arguments following the command name.

    Returns:
        True when a get or set was performed, False when usage was printed.
    """
    protocol = unit.protocol
    ops = ('set', 'get')
    targets = protocol.get_output_labels()
    sources = protocol.get_output_source_labels()

    has_valid_prefix = (
        len(args) >= 2 and args[0] in targets and args[1] in ops
    )
    if has_valid_prefix:
        target, op = args[0], args[1]
        if op == 'get':
            print(protocol.get_output_source(target))
            return True
        # op is 'set' here; it additionally needs a known SOURCE.
        if len(args) >= 3 and args[2] in sources:
            protocol.set_output_source(target, args[2])
            return True

    print('Arguments for output-source command:')
    print(' output-source TARGET OP [SOURCE]')
    print(' TARGET: [{0}]'.format('|'.join(targets)))
    print(' OP: [{0}]'.format('|'.join(ops)))
    print(' SOURCE: [{0}]'.format('|'.join(sources)))
    return False
def _handle_target_request(unit, args, cmd, items, targets):
    """Run a per-target, per-channel volume/mute get or set request.

    Expected arguments are ``ITEM TARGET CH OP [dB|ENABLE]``. A ``get``
    prints the value returned by the item's getter; a ``set`` converts the
    trailing argument (float for ``volume``, otherwise an integer turned
    into a bool) and passes it to the item's setter. Anything that does not
    parse, including a value that is not a number, prints the usage text.

    Args:
        unit: Not used by this helper.
        args: Command-line arguments following the command name.
        cmd: Command name shown in the usage text.
        items: Mapping of item name to a ``(set_func, get_func)`` pair;
            ``set_func(target, ch, val)`` and ``get_func(target, ch)``.
        targets: Collection of accepted TARGET labels.

    Returns:
        True when a get or set was performed, False when usage was printed.
    """
    chs = ('1', '2')
    ops = ('set', 'get')

    request_ok = (
        len(args) >= 4
        and args[0] in items
        and args[1] in targets
        and args[2] in chs
        and args[3] in ops
    )
    if request_ok:
        item, target, op = args[0], args[1], args[3]
        ch = int(args[2])
        set_func = items[item][0]
        get_func = items[item][1]

        if op == 'get':
            print(get_func(target, ch))
            return True

        if len(args) >= 5:
            # Only the text-to-number conversion is guarded, so errors
            # raised by the setter itself still propagate to the caller.
            try:
                if item == 'volume':
                    val = float(args[4])
                else:
                    val = bool(int(args[4]))
            except ValueError:
                pass  # malformed value: fall through to the usage text
            else:
                set_func(target, ch, val)
                return True

    print('Arguments for {0} command:'.format(cmd))
    print(' {0} ITEM TARGET CH OP [dB|ENABLE]'.format(cmd))
    print(' ITEM: [{0}]'.format('|'.join(items)))
    print(' TARGET: [{0}]'.format('|'.join(targets)))
    print(' CH: [{0}]'.format('|'.join(chs)))
    print(' OP: [{0}]'.format('|'.join(ops)))
    print(' dB: [-128.0..128.0] if ITEM=volume and OP=set')
    print(' ENABLE: [0|1] if ITEM=mute and OP=set')
    return False
def handle_mixer_input(unit, args):
    """Get or set volume/mute of a mixer input.

    Builds the setter/getter table for mixer inputs and delegates argument
    parsing and execution to ``_handle_target_request``.

    Args:
        unit: Unit object exposing ``protocol``.
        args: Command-line arguments following the command name.

    Returns:
        The result of ``_handle_target_request``.
    """
    protocol = unit.protocol
    volume_accessors = (
        protocol.set_mixer_input_volume,
        protocol.get_mixer_input_volume,
    )
    mute_accessors = (
        protocol.set_mixer_input_mute,
        protocol.get_mixer_input_mute,
    )
    items = {'volume': volume_accessors, 'mute': mute_accessors}
    targets = protocol.get_mixer_input_labels()
    return _handle_target_request(unit, args, 'mixer-input', items, targets)
def handle_mixer_output(unit, args):
    """Get or set volume/mute of a mixer output channel.

    Expected arguments are ``ITEM CH OP [dB|ENABLE]``. A ``get`` prints the
    value returned by the item's getter; a ``set`` converts the trailing
    argument (float for ``volume``, otherwise an integer turned into a
    bool) and passes it to the item's setter. Anything that does not parse,
    including a value that is not a number, prints the usage text.

    Args:
        unit: Unit object exposing ``protocol``.
        args: Command-line arguments following the command name.

    Returns:
        True when a get or set was performed, False when usage was printed.
    """
    protocol = unit.protocol
    items = {
        'volume': (protocol.set_mixer_output_volume,
                   protocol.get_mixer_output_volume),
        'mute': (protocol.set_mixer_output_mute,
                 protocol.get_mixer_output_mute),
    }
    ops = ('set', 'get')
    chs = ('1', '2')

    request_ok = (
        len(args) >= 3
        and args[0] in items
        and args[1] in chs
        and args[2] in ops
    )
    if request_ok:
        item, op = args[0], args[2]
        ch = int(args[1])
        set_func = items[item][0]
        get_func = items[item][1]

        if op == 'get':
            print(get_func(ch))
            return True

        if len(args) >= 4:
            # Only the text-to-number conversion is guarded, so errors
            # raised by the setter itself still propagate to the caller.
            try:
                if item == 'volume':
                    val = float(args[3])
                else:
                    val = bool(int(args[3]))
            except ValueError:
                pass  # malformed value: fall through to the usage text
            else:
                set_func(ch, val)
                return True

    print('Arguments for mixer-output command:')
    # The synopsis lists arguments in the order they are parsed above.
    print(' mixer-output ITEM CH OP [dB|ENABLE]')
    print(' ITEM: [{0}]'.format('|'.join(items)))
    print(' CH: [{0}]'.format('|'.join(chs)))
    print(' OP: [{0}]'.format('|'.join(ops)))
    print(' dB: [-128.0..128.0] if ITEM=volume and OP=set')
    print(' ENABLE: [0|1] if ITEM=mute and OP=set')
    return False
def handle_sampling_rate(unit, args):
    """Get or set the sampling rate.

    Expected arguments are ``OP [RATE]``. ``get`` prints the current rate.
    ``set`` applies RATE when it is one of the rates reported by the
    protocol, and is refused with a message while the unit's ``streaming``
    property is true. A RATE that is not an integer or not a reported rate
    prints the usage text.

    Args:
        unit: Unit object exposing ``get_property()`` and ``protocol``.
        args: Command-line arguments following the command name.

    Returns:
        True when a get or set was performed, False otherwise.
    """
    ops = ('set', 'get')
    rates = unit.protocol.get_sampling_rate_labels()

    op = args[0] if len(args) >= 1 else None
    if op == 'get':
        print(unit.protocol.get_sampling_rate())
        return True

    if op == 'set' and len(args) >= 2:
        # A non-numeric RATE is a usage error, not a crash.
        try:
            rate = int(args[1])
        except ValueError:
            rate = None
        if rate is not None and rate in rates:
            if unit.get_property('streaming'):
                print('Packet streaming started.')
                return False
            unit.protocol.set_sampling_rate(rate)
            return True

    print('Arguments for sampling-rate command:')
    print(' sampling-rate OP [RATE]')
    print(' OP: {0}'.format('|'.join(ops)))
    print(' RATE: {0}'.format('|'.join(map(str, rates))))
    return False
def handle_clock_source(unit, args):
    """Get or set the clock source.

    Expected arguments are ``OPERATION [SOURCE]``. ``get`` prints the
    current source. ``set`` applies SOURCE when it is one of the labels
    reported by the protocol, and is refused with a message while the
    unit's ``streaming`` property is true. Anything else prints the usage
    text.

    Args:
        unit: Unit object exposing ``get_property()`` and ``protocol``.
        args: Command-line arguments following the command name.

    Returns:
        True when a get or set was performed, False otherwise.
    """
    protocol = unit.protocol
    ops = ('set', 'get')
    sources = protocol.get_clock_source_labels()

    requested_op = args[0] if len(args) >= 1 else None

    if requested_op == 'get':
        print(protocol.get_clock_source())
        return True

    wants_valid_set = (
        requested_op == 'set' and len(args) >= 2 and args[1] in sources
    )
    if wants_valid_set:
        if unit.get_property('streaming'):
            print('Packet streaming started.')
            return False
        protocol.set_clock_source(args[1])
        return True

    print('Arguments for clock-source command:')
    print(' clock-source OPERATION [SOURCE]')
    print(' OPERATION: [{0}]'.format('|'.join(ops)))
    print(' SOURCE: [{0}]'.format('|'.join(sources)))
    return False
def handle_analog_input_level(unit, args):
    """Get or set the analog input level.

    Expected arguments are ``OP [LEVEL]``. ``get`` prints the current
    level. ``set`` applies LEVEL only when it is one of the labels reported
    by the protocol, matching how the other handlers validate their
    arguments. Anything else prints the usage text.

    Args:
        unit: Unit object exposing ``protocol``.
        args: Command-line arguments following the command name.

    Returns:
        True when a get or set was performed, False when usage was printed.
    """
    ops = ('set', 'get')
    levels = unit.protocol.get_analog_input_level_labels()

    op = args[0] if len(args) >= 1 else None
    if op == 'get':
        print(unit.protocol.get_analog_input_level())
        return True

    # Reject unknown labels here so a typo yields the usage text rather
    # than being handed to the protocol layer unchecked.
    if op == 'set' and len(args) >= 2 and args[1] in levels:
        unit.protocol.set_analog_input_level(args[1])
        return True

    print('Arguments for analog-input-level command:')
    print(' analog-input-level OP [LEVEL]')
    print(' OP: [{0}]'.format('|'.join(ops)))
    print(' LEVEL: [{0}] if OP=set'.format('|'.join(levels)))
    return False
def handle_analog_output(unit, args):
    """Get or set volume/mute of an analog output.

    Builds the setter/getter table for analog outputs and delegates
    argument parsing and execution to ``_handle_target_request``.

    Args:
        unit: Unit object exposing ``protocol``.
        args: Command-line arguments following the command name.

    Returns:
        The result of ``_handle_target_request``.
    """
    protocol = unit.protocol
    volume_accessors = (
        protocol.set_analog_output_volume,
        protocol.get_analog_output_volume,
    )
    mute_accessors = (
        protocol.set_analog_output_mute,
        protocol.get_analog_output_mute,
    )
    items = {'volume': volume_accessors, 'mute': mute_accessors}
    targets = protocol.get_analog_output_labels()
    return _handle_target_request(unit, args, 'analog-output', items, targets)
# Command handlers that are always registered, keyed by the command name
# given on the command line.
cmds = {
    'current-status': handle_current_status,
    'firmware-info': handle_firmware_info,
    'output-source': handle_output_source,
    'mixer-input': handle_mixer_input,
    'mixer-output': handle_mixer_output,
    'sampling-rate': handle_sampling_rate,
    'clock-source': handle_clock_source,
}

fullpath = CliKit.seek_snd_unit_path()
if fullpath:
    with PhaseGoUnit(fullpath) as unit:
        # Analog commands are registered only when the protocol object
        # provides the corresponding label getters.
        if hasattr(unit.protocol, 'get_analog_input_level_labels'):
            cmds['input-level'] = handle_analog_input_level
            # The handler's usage text names this command
            # 'analog-input-level'; register that spelling as well so the
            # advertised name works, while keeping 'input-level' for
            # existing callers.
            cmds['analog-input-level'] = handle_analog_input_level
        if hasattr(unit.protocol, 'get_analog_output_labels'):
            cmds['analog-output'] = handle_analog_output
        CliKit.dispatch_command(unit, cmds)