Codebase list gfapy / debian/1.2.1+dfsg-1 gfapy / rgfa.py
debian/1.2.1+dfsg-1

Tree @debian/1.2.1+dfsg-1 (Download .tar.gz)

rgfa.py @debian/1.2.1+dfsg-1raw · history · blame

import gfapy

class RGFA():
  """
  Add support of rGFA format.
  A dialect-specific validation method is added, as well as convenience
  methods to handle the stable sequence names.
  """

  def is_rgfa(self):
    """
    Indicate that rGFA dialect of GFA1 shall be used
    """
    return self._dialect == "rgfa"

  @property
  def stable_sequence_names(self):
    """Stable sequence names from rGFA SN tags"""
    if self._dialect != "rgfa":
      return []
    stable_seqs = set()
    for s in self.segments:
      stable_seqs.add(s.SN)
    return list(stable_seqs)

  def validate_rgfa(self):
    """
    Validate rGFA

    - version must be 1.0
    - no H, P, C lines are present
    - S lines have rGFA-specific predefined tags
    - if L lines have rGFA-specific tags, they have the correct type
    - overlaps must be 0M
    """
    self._validate_rgfa_version()
    self._validate_rgfa_no_headers()
    self._validate_rgfa_no_containments()
    self._validate_rgfa_no_paths()
    self._validate_rgfa_tags_in_lines(self.segments)
    self._validate_rgfa_tags_in_lines(self.dovetails)
    self._validate_rgfa_link_overlaps()

  def _validate_rgfa_version(self):
    """Validate version of rGFA (it must be gfa1)"""
    if self.version != "gfa1":
      raise gfapy.VersionError("rGFA format only supports GFA version 1")

  def _validate_rgfa_no_headers(self):
    """Validate the absence of H lines in rGFA"""
    if self.headers:
      raise gfapy.ValueError("rGFA does not support header lines")

  def _validate_rgfa_no_containments(self):
    """Validate the absence of C lines in rGFA"""
    if self.containments:
      raise gfapy.ValueError("rGFA does not support containment lines")

  def _validate_rgfa_no_paths(self):
    """Validate the absence of P lines in rGFA"""
    if self.paths:
      raise gfapy.ValueError("rGFA does not support path lines")

  RGFA_TAGS = {
    "mandatory": {
        "S": {"SN": "Z", "SO": "i", "SR": "i"},
        "L": {},
      },
    "optional": {
      "S": {},
      "L": {"SR": "i", "L1": "i", "L2": "i"},
      },
    }

  def _validate_rgfa_tags_in_lines(self, lines):
    """
    Validate rGFA tags for a group of lines
    """
    for line in lines:
      rt = line.record_type
      tags_check_presence = gfapy.Gfa.RGFA_TAGS["mandatory"].get(rt, {})
      tags_check_datatype = tags_check_presence.copy()
      tags_check_datatype.update(gfapy.Gfa.RGFA_TAGS["optional"].get(rt,{}))
      for tag, datatype in tags_check_presence.items():
        if tag not in line.tagnames:
          raise gfapy.NotFoundError(
            "rGFA {} lines must have a {} tag\n".format(rt, tag)+
            "offending line:\n{}".format(str(line)))
      for tag, datatype in tags_check_datatype.items():
        if tag in line.tagnames:
          if line.get_datatype(tag) != datatype:
            raise gfapy.ValueError(
              "rGFA {} tags in {} lines must have datatype {}\n".format(
                tag, rt, datatype)+
              "offending line:\n{}".format(str(line)))

  def _validate_rgfa_link_overlaps(self):
    for link in self.dovetails:
      if link.field_to_s("overlap") != "0M":
        raise gfapy.ValueError("rGFA CIGARs must be 0M\n",
              "offending line:\n{}".format(str(link)))