Codebase list gfapy / debian/1.2.1+dfsg-1 gfapy / gfa.py
debian/1.2.1+dfsg-1

Tree @debian/1.2.1+dfsg-1 (Download .tar.gz)

gfa.py @debian/1.2.1+dfsg-1raw · history · blame

import gfapy
from .lines import Lines
from .graph_operations import GraphOperations
from collections import defaultdict
from .rgfa import RGFA
import sys

class Gfa(Lines,GraphOperations,RGFA):
  """Representation of the data in a GFA file.

  Parameters:
    data (str or list): optional, string content of a GFA file, or
       the same as a list (splitted on newlines); default: create an empty Gfa
       instance
    vlevel (int): validation level (default: 1)
    version (str): GFA version ('gfa1' or 'gfa2';
        default: automatic recognition)
    dialect (str): dialect ('standard' or 'rgfa';
        default: standard)

  Raises:
    ~gfapy.error.ArgumentError: if the vlevel or version are invalid
    ~gfapy.error.FormatError: if data is provided, which is invalid
    ~gfapy.error.VersionError: if an unknown version is specified, or data is
        provided, which is not compatible with the specified version
    ~gfapy.error.VersionError: if an unknown dialect is specified
  """

  def __init__(self, *args, vlevel = 1, version = None, dialect = "standard"):
    if not isinstance(vlevel, int):
      raise gfapy.ArgumentError("vlevel is not an integer ({})".format(vlevel))
    if vlevel < 0:
      raise gfapy.ArgumentError(
          "vlevel is not a positive integer ({})".format(vlevel))
    if not version in ['gfa1', 'gfa2', None]:
      raise gfapy.VersionError("GFA version unknown ({})".format(version))
    if not dialect in ['standard', 'rgfa', None]:
      raise gfapy.VersionError("GFA dialect unknown ({})".format(dialect))
    self._vlevel = vlevel
    self._max_int_name = 0
    self._records = defaultdict(dict)
    self._records["H"] = gfapy.line.Header(["H"], vlevel = vlevel)
    self._records["H"].connect(self)
    self._records["S"] = {}
    self._records["P"] = {}
    self._records["F"] = {}
    self._records["\n"] = {}
    self._records["E"] = {}
    self._records["U"] = {}
    self._records["G"] = {}
    self._records["O"] = {}
    self._records["C"] = {}
    self._records["L"] = {}
    self._records["#"] = {}
    self._segments_first_order = False
    self._progress = None
    self._default = {"count_tag": "RC", "unit_length": 1}
    self._line_queue = []
    if version is None:
      self._version = None
      self._version_explanation = None
      self._version_guess = "gfa2"
    else:
      self._version = version
      self._version_explanation = "set during initialization"
      self._version_guess = version
      self._validate_version()
    self._dialect = dialect.lower()
    if len(args) == 1:
      lst = None
      if isinstance(args[0], str):
        lst = args[0].split("\n")
      elif isinstance(args[0], list):
        lst = args[0]
      else:
        raise gfapy.ArgumentError("Cannot create a Gfa"+
            " instance from an object of type {}".format(type(args[0])))
      for line in lst:
          self.add_line(line)
      self.process_line_queue()
      if vlevel >= 1:
        self.validate()
    elif len(args) > 1:
      raise gfapy.ArgumentError("Wrong number of arguments for Gfa()"+
          "({})".format(len(args)))

  @property
  def version(self):
    """GFA version ('gfa1' or 'gfa2')"""
    return self._version

  @version.setter
  def version(self,value):
    self._version=value

  @property
  def dialect(self):
    """GFA dialect ('standard' or 'rgfa')"""
    return self._dialect

  @dialect.setter
  def dialect(self, value):
    self._dialect = value.lower()

  @property
  def vlevel(self):
    """Level of validation"""
    return self._vlevel

  @vlevel.setter
  def vlevel(self,value):
    self._vlevel=value

  def validate(self):
    """Validate the GFA instance

    Checks if all references are solved correctly.
    """
    self.__validate_segment_references()
    self.__validate_path_links()
    self.__validate_group_items()
    self.__validate_gfa2_positions()
    if self._dialect == "rgfa":
      self.validate_rgfa()

  def __str__(self):
    return "\n".join([str(line) for line in self.lines])

  def to_gfa1_s(self):
    """Create a GFA1 string representation for the GFA data

    If the Gfa has version 'gfa1', its string representation is
    returned. Otherwise a conversion from GFA2 is performed.
    """
    if self.version == "gfa1":
      return str(self)
    else:
      lines = []
      for line in self.lines:
        converted = line.to_gfa1_s()
        if converted:
          lines.append(converted)
      return "\n".join(lines)

  def to_gfa1(self):
    """Create a GFA1 Gfa instance for the GFA data

    If the Gfa has version 'gfa1', it is
    returned. Otherwise a conversion from GFA2 is performed.
    """
    if self.version == "gfa1":
      return self
    else:
      gfa1 = gfapy.Gfa(version="gfa1", vlevel=self.vlevel)
      for line in self.lines:
        gfa1.add_line(line.to_gfa1(raise_on_failure=False))
      return gfa1

  def to_gfa2_s(self):
    """Create a GFA2 string representation for the GFA data

    If the Gfa has version 'gfa2', its string representation is
    returned. Otherwise a conversion from GFA1 is performed.
    """
    if self.version == "gfa2":
      return str(self)
    else:
      lines = []
      for line in self.lines:
        converted = line.to_gfa2_s()
        if converted:
          lines.append(converted)
      return "\n".join(lines)

  def to_gfa2(self):
    """Create a GFA2 Gfa instance for the GFA data.

    If the Gfa has version 'gfa2', it is
    returned. Otherwise a conversion from GFA1 is performed.
    """
    if self.version == "gfa2":
      return self
    else:
      gfa2 = gfapy.Gfa(version="gfa2", vlevel=self.vlevel)
      for line in self.lines:
        gfa2.add_line(line.to_gfa2(raise_on_failure=False))
      return gfa2

  # TODO: implement clone (see how clone for lines was implemented)

  def read_file(self, filename):
    """Read GFA data from a file and load it into the Gfa instance.

    Parameters:
      filename (str)
    """
    if self._progress:
      linecount = 0
      with open(filename) as f:
        for line in f:
          linecount += 1
      # TODO: better implementation of linecount
      self._progress_log_init("read_file", "lines", linecount,
                              "Parsing file {}".format(filename)+
                              " containing {} lines".format(linecount))
    with open(filename) as f:
      for line in f:
        self.add_line(line.rstrip('\r\n'))
        if self._progress:
          self._progress_log("read_file")
    if self._line_queue:
      self._version = self._version_guess
      self.process_line_queue()
    if self._progress:
      self._progress_log_end("read_file")
    if self._vlevel >= 1:
      self.validate()
    return self

  @classmethod
  def from_file(cls, filename, vlevel = 1, version = None, dialect="standard"):
    """Create a Gfa instance from the contents of a GFA file.

    Parameters:
      filename (str)
      vlevel (int) : the validation level
      version (str) : the GFA version ('gfa1' or 'gfa2'; default:
          determine version automatically)

    Returns:
      gfapy.Gfa
    """
    gfa = cls(vlevel = vlevel, version = version, dialect = dialect)
    gfa.read_file(filename)
    return gfa

  def to_file(self, filename):
    """Write the content of the instance to a GFA file

    Parameters:
      filename (str)
    """
    with open(filename, "w") as f:
      for line in self.lines:
        f.write(str(line)+"\n")

  def __eq__(self, other):
    self.lines == other.lines

  def __lenstats(self):
    sln = [ s.try_length for s in self.segments ]
    sln = sorted(sln)
    n = len(sln)
    tlen = 0
    for l in sln:
      tlen += l
    n50_target = tlen//2
    n50 = None
    curr_sum = 0
    for l in reversed(sln):
      curr_sum += l
      if curr_sum >= n50_target:
        n50 = l
        break
    q = (sln[0], sln[(n//4)-2], sln[(n//2)-1], sln[((n*3)//4)-1], sln[-1])
    return (q, n50, tlen)

  def __validate_segment_references(self):
    for s in self.segments:
      if s.virtual:
        raise gfapy.NotFoundError("Segment {} ".format(s.name)+
            "does not exist\nReferences to {} ".format(s.name)+
            "were found in the following lines:\n"+s.refstr())

  def __validate_path_links(self):
    for pt in self._gfa1_paths:
      for ol in pt.links:
        l = ol.line
        if l.virtual:
          raise gfapy.NotFoundError("A link equivalent to:\n{}\n".format(\
                  l.to_str(add_virtual_commentary=False))+
              "does not exist, but is required by the following paths:\n"+
              l.refstr())

  def __validate_group_items(self):
    if self.version == "gfa1":
      return
    for group in self.sets + self.paths:
      for item in group.items:
        if isinstance(item, gfapy.OrientedLine):
          item = item.line
        if item.virtual:
          raise gfapy.NotFoundError("A line with identifier {}\n".format(\
                  item.name)+
              "does not exist, but is required by the following groups:\n"+
              item.refstr())

  def __validate_gfa2_positions(self):
    if self.version == "gfa1":
      return
    for line in self.edges + self.fragments:
      line.validate_positions()

  def _validate_version(self):
    if (self._version != None) and (self._version not in gfapy.VERSIONS):
      raise gfapy.VersionError("GFA specification version {} not supported".
              format(self._version))

  # Progress logging related-methods:

  def enable_progress_logging(self, part=0.1, channel=sys.stderr):
    '''Activate logging of progress for some graph operations.

    Parameters:
      part (float) : report when every specified portion of the computation
          is completed (default: 0.1)
      channel : output channel (default: standard error)
    '''
    self._progress = gfapy.Logger(channel=channel)
    self._progress.enable_progress(part=part)

  def _progress_log_init(self, symbol, units, total, initmsg = None):
    if self._progress is not None:
      self._progress.progress_init(symbol, units, total, initmsg)

  def _progress_log(self, symbol, progress=1, **keyargs):
    if self._progress is not None:
      self._progress.progress_log(symbol, progress)

  def _progress_log_end(self, symbol, **keyargs):
    if self._progress is not None:
      self._progress.progress_end(symbol)