Codebase list hinawa-utils / debian/0.2.0-3 hinawa-yamaha-terratec-cli
debian/0.2.0-3

Tree @debian/0.2.0-3 (Download .tar.gz)

hinawa-yamaha-terratec-cli @debian/0.2.0-3raw · history · blame

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2018 Takashi Sakamoto

from hinawa_utils.misc.cli_kit import CliKit
from hinawa_utils.bebob.phase_go_unit import PhaseGoUnit


def handle_current_status(unit, args):
    print('Current status:')
    print('  Packet Streaming:')
    print('    started:          {0}'.format(unit.get_property('streaming')))
    rate = unit.protocol.get_sampling_rate()
    print('    sampling-rate:    {0}'.format(rate))
    source = unit.protocol.get_clock_source()
    print('    clock-source:   {0}'.format(source))

    print('  Routing:')
    for target in unit.protocol.get_output_labels():
        source = unit.protocol.get_output_source(target)
        print('    {0}: {1}'.format(target, source))

    print('Mixer inputs:')
    for target in unit.protocol.get_mixer_input_labels():
        print('  {0}:'.format(target))
        for i in range(1, 3):
            gain = unit.protocol.get_mixer_input_volume(target, i)
            mute = unit.protocol.get_mixer_input_mute(target, i)
            print('    gain: {0}, mute: {1}'.format(gain, mute))

    print('Mixer output:')
    for i in range(1, 3):
        volume = unit.protocol.get_mixer_output_volume(i)
        mute = unit.protocol.get_mixer_output_mute(i)
        print('  volume: {0}, mute: {1}'.format(gain, mute))

    if hasattr(unit.protocol, 'get_analog_input_level_labels'):
        print('Analog input settings:')
        level = unit.protocol.get_analog_input_level()
        print('  level: {0}'.format(level))

    if hasattr(unit.protocol, 'get_analog_output_labels'):
        print('Analog output settings:')
        for target in unit.protocol.get_analog_output_labels():
            print('  {0}'.format(target))
            for i in range(1, 3):
                volume = unit.protocol.get_analog_output_volume(target, i)
                mute = unit.protocol.get_analog_output_mute(target, i)
                print('    volume: {0}, mute: {1}'.format(volume, mute))

    return True


def handle_firmware_info(unit, args):
    print('BeBoB firmware information:')
    for category, values in unit.firmware_info.items():
        if isinstance(values, dict):
            print('  {0}:'.format(category))
            for name, value in values.items():
                print('    {0}: {1}'.format(name, value))
        else:
            print('  {0}: {1}'.format(category, values))
    return True


def handle_output_source(unit, args):
    ops = ('set', 'get')
    targets = unit.protocol.get_output_labels()
    sources = unit.protocol.get_output_source_labels()
    if len(args) >= 1 and args[0] in targets:
        target = args[0]
        if len(args) >= 2 and args[1] in ops:
            op = args[1]
            if len(args) >= 3 and op == 'set' and args[2] in sources:
                source = args[2]
                unit.protocol.set_output_source(target, source)
                return True
            elif op == 'get':
                print(unit.protocol.get_output_source(target))
                return True
    print('Arguments for output-source command:')
    print('  output-source TARGET OP [SOURCE]')
    print('    TARGET: [{0}]'.format('|'.join(targets)))
    print('    OP:     [{0}]'.format('|'.join(ops)))
    print('    SOURCE: [{0}]'.format('|'.join(sources)))
    return False


def _handle_target_request(unit, args, cmd, items, targets):
    chs = ('1', '2')
    ops = ('set', 'get')
    if len(args) >= 1 and args[0] in items:
        item = args[0]
        set_func = items[item][0]
        get_func = items[item][1]
        if len(args) >= 2 and args[1] in targets:
            target = args[1]
            if len(args) >= 3 and args[2] in chs:
                ch = int(args[2])
                if len(args) >= 4 and args[3] in ops:
                    op = args[3]
                    if len(args) >= 5 and op == 'set':
                        if item == 'volume':
                            val = float(args[4])
                        else:
                            val = bool(int(args[4]))
                        set_func(target, ch, val)
                        return True
                    elif op == 'get':
                        print(get_func(target, ch))
                        return True
    print('Arguments for {0} command:'.format(cmd))
    print('  {0} ITEM TARGET CH OP [dB|ENABLE]'.format(cmd))
    print('    ITEM:   [{0}]'.format('|'.join(items)))
    print('    TARGET: [{0}]'.format('|'.join(targets)))
    print('    CH:     [{0}]'.format('|'.join(chs)))
    print('    OP:     [{0}]'.format('|'.join(ops)))
    print('    dB:     [-128.0..128.0] if ITEM=volume and OP=set')
    print('    ENABLE: [0|1]           if ITEM=mute   and OP=set')
    return False


def handle_mixer_input(unit, args):
    items = {
        'volume':   (unit.protocol.set_mixer_input_volume,
                     unit.protocol.get_mixer_input_volume),
        'mute':     (unit.protocol.set_mixer_input_mute,
                     unit.protocol.get_mixer_input_mute),
    }
    return _handle_target_request(unit, args, 'mixer-input', items,
                                  unit.protocol.get_mixer_input_labels())


def handle_mixer_output(unit, args):
    items = {
        'volume':   (unit.protocol.set_mixer_output_volume,
                     unit.protocol.get_mixer_output_volume),
        'mute':     (unit.protocol.set_mixer_output_mute,
                     unit.protocol.get_mixer_output_mute),
    }
    ops = ('set', 'get')
    chs = ('1', '2')
    if len(args) >= 1 and args[0] in items:
        item = args[0]
        set_func = items[item][0]
        get_func = items[item][1]
        if len(args) >= 2 and args[1] in chs:
            ch = int(args[1])
            if len(args) >= 3 and args[2] in ops:
                op = args[2]
                if len(args) >= 4 and op == 'set':
                    if item == 'volume':
                        val = float(args[3])
                    else:
                        val = bool(int(args[3]))
                    set_func(ch, val)
                    return True
                elif op == 'get':
                    print(get_func(ch))
                    return True
    print('Arguments for mixer-output command:')
    print('  mixer-output ITEM OP CH [dB|ENABLE]')
    print('    ITEM:   [{0}]'.format('|'.join(items)))
    print('    CH:     [{0}]'.format('|'.join(chs)))
    print('    OP:     [{0}]'.format('|'.join(ops)))
    print('    dB:     [-128.0..128.0] if ITEM=volume and OP=set')
    print('    ENABLE: [0|1]           if ITEM=mute   and OP=set')
    return False


def handle_sampling_rate(unit, args):
    ops = ('set', 'get')
    rates = unit.protocol.get_sampling_rate_labels()
    if len(args) >= 1 and args[0] in ops:
        op = args[0]
        if len(args) >= 2 and op == 'set' and int(args[1]) in rates:
            rate = int(args[1])
            if unit.get_property('streaming'):
                print('Packet streaming started.')
                return False
            unit.protocol.set_sampling_rate(rate)
            return True
        elif op == 'get':
            print(unit.protocol.get_sampling_rate())
            return True
    print('Arguments for sampling-rate command:')
    print('  sampling-rate OP [RATE]')
    print('    OP:     {0}'.format('|'.join(ops)))
    print('    RATE:   {0}'.format('|'.join(map(str, rates))))
    return False


def handle_clock_source(unit, args):
    ops = ('set', 'get')
    sources = unit.protocol.get_clock_source_labels()
    if len(args) >= 1 and args[0] in ops:
        op = args[0]
        if len(args) >= 2 and op == 'set' and args[1] in sources:
            if unit.get_property('streaming'):
                print('Packet streaming started.')
                return False
            source = args[1]
            unit.protocol.set_clock_source(source)
            return True
        elif op == 'get':
            print(unit.protocol.get_clock_source())
            return True
    print('Arguments for clock-source command:')
    print('  clock-source OPERATION [SOURCE]')
    print('    OPERATION: [{0}]'.format('|'.join(ops)))
    print('    SOURCE:    [{0}]'.format('|'.join(sources)))
    return False


def handle_analog_input_level(unit, args):
    ops = ('set', 'get')
    levels = unit.protocol.get_analog_input_level_labels()
    if len(args) >= 1 and args[0] in ops:
        op = args[0]
        if len(args) >= 2 and op == 'set':
            unit.protocol.set_analog_input_level(args[1])
            return True
        elif op == 'get':
            print(unit.protocol.get_analog_input_level())
            return True
    print('Arguments for analog-input-level command:')
    print('  analog-input-level OP [LEVEL]')
    print('    OP:     [{0}]'.format('|'.join(ops)))
    print('    LEVEL:  [{0}] if OP=set'.format('|'.join(levels)))
    return False


def handle_analog_output(unit, args):
    items = {
        'volume':   (unit.protocol.set_analog_output_volume,
                     unit.protocol.get_analog_output_volume),
        'mute':     (unit.protocol.set_analog_output_mute,
                     unit.protocol.get_analog_output_mute),
    }
    return _handle_target_request(unit, args, 'analog-output', items,
                                  unit.protocol.get_analog_output_labels())


cmds = {
    'current-status':   handle_current_status,
    'firmware-info':    handle_firmware_info,
    'output-source':    handle_output_source,
    'mixer-input':      handle_mixer_input,
    'mixer-output':     handle_mixer_output,
    'sampling-rate':    handle_sampling_rate,
    'clock-source':     handle_clock_source,
}

fullpath = CliKit.seek_snd_unit_path()
if fullpath:
    with PhaseGoUnit(fullpath) as unit:
        if hasattr(unit.protocol, 'get_analog_input_level_labels'):
            cmds['input-level'] = handle_analog_input_level
        if hasattr(unit.protocol, 'get_analog_output_labels'):
            cmds['analog-output'] = handle_analog_output
        CliKit.dispatch_command(unit, cmds)