Codebase list ctdconverter / e37a3fe8-4096-4dae-8a5a-a84f56cc74c4/main common / utils.py
e37a3fe8-4096-4dae-8a5a-a84f56cc74c4/main

Tree @e37a3fe8-4096-4dae-8a5a-a84f56cc74c4/main (Download .tar.gz)

utils.py @e37a3fe8-4096-4dae-8a5a-a84f56cc74c4/mainraw · history · blame

#!/usr/bin/env python
# encoding: utf-8
import ntpath
import os

from lxml import etree
from string import strip
from logger import info, error, warning

from common.exceptions import ApplicationException
from CTDopts.CTDopts import CTDModel, ParameterGroup


MESSAGE_INDENTATION_INCREMENT = 2


# simple struct-class containing a tuple with input/output location and the in-memory CTDModel
class ParsedCTD:
    def __init__(self, ctd_model=None, input_file=None, suggested_output_file=None):
        self.ctd_model = ctd_model
        self.input_file = input_file
        self.suggested_output_file = suggested_output_file


class ParameterHardcoder:
    def __init__(self):
        # map whose keys are the composite names of tools and parameters in the following pattern:
        # [ToolName][separator][ParameterName] -> HardcodedValue
        # if the parameter applies to all tools, then the following pattern is used:
        # [ParameterName] -> HardcodedValue

        # examples (assuming separator is '#'):
        # threads -> 24
        # XtandemAdapter#adapter -> xtandem.exe
        # adapter -> adapter.exe
        self.separator = "!"
        self.parameter_map = {}

    # the most specific value will be returned in case of overlap
    def get_hardcoded_value(self, parameter_name, tool_name):
        # look for the value that would apply for all tools
        generic_value = self.parameter_map.get(parameter_name, None)
        specific_value = self.parameter_map.get(self.build_key(parameter_name, tool_name), None)
        if specific_value is not None:
            return specific_value

        return generic_value

    def register_parameter(self, parameter_name, parameter_value, tool_name=None):
        self.parameter_map[self.build_key(parameter_name, tool_name)] = parameter_value

    def build_key(self, parameter_name, tool_name):
        if tool_name is None:
            return parameter_name
        return "%s%s%s" % (parameter_name, self.separator, tool_name)


def validate_path_exists(path):
    if not os.path.isfile(path) or not os.path.exists(path):
        raise ApplicationException("The provided path (%s) does not exist or is not a valid file path." % path)


def validate_argument_is_directory(args, argument_name):
    file_name = getattr(args, argument_name)
    if file_name is not None and os.path.isdir(file_name):
        raise ApplicationException("The provided output file name (%s) points to a directory." % file_name)


def validate_argument_is_valid_path(args, argument_name):
    paths_to_check = []
    # check if we are handling a single file or a list of files
    member_value = getattr(args, argument_name)
    if member_value is not None:
        if isinstance(member_value, list):
            for file_name in member_value:
                paths_to_check.append(strip(str(file_name)))
        else:
            paths_to_check.append(strip(str(member_value)))

        for path_to_check in paths_to_check:
            validate_path_exists(path_to_check)


# taken from
# http://stackoverflow.com/questions/8384737/python-extract-file-name-from-path-no-matter-what-the-os-path-format
def get_filename(path):
    head, tail = ntpath.split(path)
    return tail or ntpath.basename(head)


def get_filename_without_suffix(path):
    root, ext = os.path.splitext(os.path.basename(path))
    return root


def parse_input_ctds(xsd_location, input_ctds, output_destination, output_file_extension):
    is_converting_multiple_ctds = len(input_ctds) > 1
    parsed_ctds = []
    schema = None
    if xsd_location is not None:
        try:
            info("Loading validation schema from %s" % xsd_location, 0)
            schema = etree.XMLSchema(etree.parse(xsd_location))
        except Exception, e:
            error("Could not load validation schema %s. Reason: %s" % (xsd_location, str(e)), 0)
    else:
        warning("Validation against a schema has not been enabled.", 0)

    for input_ctd in input_ctds:
        if schema is not None:
            validate_against_schema(input_ctd, schema)

        output_file = output_destination
        # if multiple inputs are being converted, we need to generate a different output_file for each input
        if is_converting_multiple_ctds:
            output_file = os.path.join(output_file, get_filename_without_suffix(input_ctd) + "." + output_file_extension)
        info("Parsing %s" % input_ctd)
        parsed_ctds.append(ParsedCTD(CTDModel(from_file=input_ctd), input_ctd, output_file))

    return parsed_ctds


def flatten_list_of_lists(args, list_name):
    setattr(args, list_name, [item for sub_list in getattr(args, list_name) for item in sub_list])


def validate_against_schema(ctd_file, schema):
    try:
        parser = etree.XMLParser(schema=schema)
        etree.parse(ctd_file, parser=parser)
    except etree.XMLSyntaxError, e:
        raise ApplicationException("Invalid CTD file %s. Reason: %s" % (ctd_file, str(e)))


def add_common_parameters(parser, version, last_updated):
    parser.add_argument("FORMAT", default=None, help="Output format (mandatory). Can be one of: cwl, galaxy.")
    parser.add_argument("-i", "--input", dest="input_files", default=[], required=True, nargs="+", action="append",
                        help="List of CTD files to convert.")
    parser.add_argument("-o", "--output-destination", dest="output_destination", required=True,
                        help="If multiple input files are given, then a folder in which all converted "
                             "files will be generated is expected; "
                             "if a single input file is given, then a destination file is expected.")
    parser.add_argument("-x", "--default-executable-path", dest="default_executable_path",
                        help="Use this executable path when <executablePath> is not present in the CTD",
                        default=None, required=False)
    parser.add_argument("-b", "--blacklist-parameters", dest="blacklisted_parameters", default=[], nargs="+",
                        action="append",
                        help="List of parameters that will be ignored and won't appear on the galaxy stub",
                        required=False)
    parser.add_argument("-p", "--hardcoded-parameters", dest="hardcoded_parameters", default=None, required=False,
                        help="File containing hardcoded values for the given parameters. Run with '-h' or '--help' "
                             "to see a brief example on the format of this file.")
    parser.add_argument("-V", "--validation-schema", dest="xsd_location", default=None, required=False,
                        help="Location of the schema to use to validate CTDs. If not provided, no schema validation "
                             "will take place.")

    # TODO: add verbosity, maybe?
    program_version = "v%s" % version
    program_build_date = str(last_updated)
    program_version_message = "%%(prog)s %s (%s)" % (program_version, program_build_date)
    parser.add_argument("-v", "--version", action="version", version=program_version_message)


def parse_hardcoded_parameters(hardcoded_parameters_file):
    parameter_hardcoder = ParameterHardcoder()
    if hardcoded_parameters_file is not None:
        line_number = 0
        with open(hardcoded_parameters_file) as f:
            for line in f:
                line_number += 1
                if line is None or not line.strip() or line.strip().startswith("#"):
                    pass
                else:
                    # the third column must not be obtained as a whole, and not split
                    parsed_hardcoded_parameter = line.strip().split(None, 2)
                    # valid lines contain two or three columns
                    if len(parsed_hardcoded_parameter) != 2 and len(parsed_hardcoded_parameter) != 3:
                        warning("Invalid line at line number %d of the given hardcoded parameters file. Line will be"
                                "ignored:\n%s" % (line_number, line), 0)
                        continue

                    parameter_name = parsed_hardcoded_parameter[0]
                    hardcoded_value = parsed_hardcoded_parameter[1]
                    tool_names = None
                    if len(parsed_hardcoded_parameter) == 3:
                        tool_names = parsed_hardcoded_parameter[2].split(',')
                    if tool_names:
                        for tool_name in tool_names:
                            parameter_hardcoder.register_parameter(parameter_name, hardcoded_value, tool_name.strip())
                    else:
                        parameter_hardcoder.register_parameter(parameter_name, hardcoded_value)

    return parameter_hardcoder


def extract_tool_help_text(ctd_model):
    manual = ""
    doc_url = None
    if "manual" in ctd_model.opt_attribs.keys():
        manual += "%s\n\n" % ctd_model.opt_attribs["manual"]
    if "docurl" in ctd_model.opt_attribs.keys():
        doc_url = ctd_model.opt_attribs["docurl"]

    help_text = "No help available"
    if manual is not None:
        help_text = manual
    if doc_url is not None:
        help_text = ("" if manual is None else manual) + "\nFor more information, visit %s" % doc_url

    return help_text


def extract_tool_executable_path(model, default_executable_path):
    # rules to build the executable path:
    # if executablePath is null, then use default_executable_path
    # if executablePath is null and executableName is null, then the name of the tool will be used
    # if executablePath is null and executableName is not null, then executableName will be used
    # if executablePath is not null and executableName is null,
    #   then executablePath and the name of the tool will be used
    # if executablePath is not null and executableName is not null, then both will be used

    # first, check if the model has executablePath / executableName defined
    executable_path = model.opt_attribs.get("executablePath", None)
    executable_name = model.opt_attribs.get("executableName", None)

    # check if we need to use the default_executable_path
    if executable_path is None:
        executable_path = default_executable_path

    # fix the executablePath to make sure that there is a '/' in the end
    if executable_path is not None:
        executable_path = executable_path.strip()
        if not executable_path.endswith("/"):
            executable_path += "/"

    # assume that we have all information present
    command = str(executable_path) + str(executable_name)
    if executable_path is None:
        if executable_name is None:
            command = model.name
        else:
            command = executable_name
    else:
        if executable_name is None:
            command = executable_path + model.name
    return command


def extract_and_flatten_parameters(ctd_model):
    parameters = []
    if len(ctd_model.parameters.parameters) > 0:
        # use this to put parameters that are to be processed
        # we know that CTDModel has one parent ParameterGroup
        pending = [ctd_model.parameters]
        while len(pending) > 0:
            # take one element from 'pending'
            parameter = pending.pop()
            if type(parameter) is not ParameterGroup:
                parameters.append(parameter)
            else:
                # append the first-level children of this ParameterGroup
                pending.extend(parameter.parameters.values())
    # returned the reversed list of parameters (as it is now,
    # we have the last parameter in the CTD as first in the list)
    return reversed(parameters)


# some parameters are mapped to command line options, this method helps resolve those mappings, if any
def resolve_param_mapping(param, ctd_model):
    # go through all mappings and find if the given param appears as a reference name in a mapping element
    param_mapping = None
    for cli_element in ctd_model.cli:
        for mapping_element in cli_element.mappings:
            if mapping_element.reference_name == param.name:
                if param_mapping is not None:
                    warning("The parameter %s has more than one mapping in the <cli> section. "
                            "The first found mapping, %s, will be used." % (param.name, param_mapping), 1)
                else:
                    param_mapping = cli_element.option_identifier

    return param_mapping if param_mapping is not None else param.name


def _extract_param_cli_name(param, ctd_model):
    # we generate parameters with colons for subgroups, but not for the two topmost parents (OpenMS legacy)
    if type(param.parent) == ParameterGroup:
        if not hasattr(param.parent.parent, 'parent'):
            return resolve_param_mapping(param, ctd_model)
        elif not hasattr(param.parent.parent.parent, 'parent'):
            return resolve_param_mapping(param, ctd_model)
        else:
            if ctd_model.cli:
                warning("Using nested parameter sections (NODE elements) is not compatible with <cli>", 1)
            return extract_param_name(param.parent) + ":" + resolve_param_mapping(param, ctd_model)
    else:
        return resolve_param_mapping(param, ctd_model)


def extract_param_name(param):
    # we generate parameters with colons for subgroups, but not for the two topmost parents (OpenMS legacy)
    if type(param.parent) == ParameterGroup:
        if not hasattr(param.parent.parent, "parent"):
            return param.name
        elif not hasattr(param.parent.parent.parent, "parent"):
            return param.name
        else:
            return extract_param_name(param.parent) + ":" + param.name
    else:
        return param.name


def extract_command_line_prefix(param, ctd_model):
    param_name = extract_param_name(param)
    param_cli_name = _extract_param_cli_name(param, ctd_model)
    if param_name == param_cli_name:
        # there was no mapping, so for the cli name we will use a '-' in the prefix
        param_cli_name = "-" + param_name
    return param_cli_name