Codebase list ariba / debian/2.14.4+ds-2.1 ariba / assembly_variants.py
debian/2.14.4+ds-2.1

Tree @debian/2.14.4+ds-2.1 (Download .tar.gz)

assembly_variants.py @debian/2.14.4+ds-2.1raw · history · blame

import operator
import pyfastaq
import pymummer
from ariba import sequence_variant
from pyfastaq import intervals


class Error (Exception): pass

class AssemblyVariants:
    def __init__(self,
      refdata,
      nucmer_snp_file,
    ):
        self.refdata = refdata
        self.nucmer_snp_file = nucmer_snp_file


    @classmethod
    def _get_codon_start(cls, gene_start, position):
        assert position >= gene_start
        while  (position - gene_start) % 3 != 0:
            position -= 1
        return position


    @classmethod
    def _get_mummer_variants(cls, snp_file):
        variants = pymummer.snp_file.get_all_variants(snp_file)
        mummer_variants = {}

        if len(variants) == 0:
            return {}

        variants.sort(key=operator.attrgetter('qry_name'))
        variants.sort(key=operator.attrgetter('ref_start'))

        for v in variants:
            if v.qry_name not in mummer_variants:
                mummer_variants[v.qry_name] = []
            mummer_variants[v.qry_name].append(v)

        for contig in mummer_variants:
            l = mummer_variants[contig]
            if len(l) > 1:
                new_l = [[l[0]]]
                previous_codon_start = AssemblyVariants._get_codon_start(0, l[0].ref_start)
                for variant in l[1:]:
                    codon_start = AssemblyVariants._get_codon_start(0, variant.ref_start)
                    if codon_start == previous_codon_start:
                        new_l[-1].append(variant)
                    else:
                        new_l.append([variant])
                        previous_codon_start = codon_start
                mummer_variants[contig] = new_l
            else:
                mummer_variants[contig] = [l]

        return mummer_variants


    @classmethod
    def _get_variant_effect(cls, variants, ref_sequence):
        '''variants = list of variants in the same codon.
           returns type of variant (cannot handle more than one indel in the same codon).'''
        assert len(variants) != 0

        var_types = [x.var_type for x in variants]
        if len(set(var_types)) != 1:
            return 'MULTIPLE', '.', '.'

        var_type = var_types[0]

        assert set([x.ref_name for x in variants]) == set([ref_sequence.id])
        codon_starts = [AssemblyVariants._get_codon_start(0, x.ref_start) for x in variants]
        assert len(set(codon_starts)) == 1
        codon_start = codon_starts[0]
        aa_start = codon_start // 3
        ref_codon = pyfastaq.sequences.Fasta('codon', ref_sequence[codon_start:codon_start+3])
        ref_aa = ref_codon.translate()

        if var_type == pymummer.variant.SNP:
            new_codon = list(ref_codon.seq)
            for v in variants:
                new_codon[v.ref_start - codon_start] = v.qry_base
            new_codon = pyfastaq.sequences.Fasta('new', ''.join(new_codon))
            qry_aa = new_codon.translate()

            if ref_aa.seq == qry_aa.seq:
                return ('SYN', '.', aa_start)
            elif qry_aa.seq == '*':
                return ('TRUNC', ref_aa.seq + str(aa_start + 1) + 'trunc', aa_start)
            else:
                return ('NONSYN', ref_aa.seq + str(aa_start + 1) + qry_aa.seq, aa_start)
        elif var_type in [pymummer.variant.INS, pymummer.variant.DEL]:
            if len(variants) > 1:
                return 'INDELS', '.', aa_start

            var = variants[0]

            if var_type == pymummer.variant.INS:
                new_seq = pyfastaq.sequences.Fasta('seq', var.qry_base)
            else:
                new_seq = pyfastaq.sequences.Fasta('seq', var.ref_base)

            if len(new_seq) % 3 != 0:
                return ('FSHIFT', ref_aa.seq + str(aa_start + 1) + 'fs', aa_start)

            new_seq_aa = new_seq.translate()
            if '*' in new_seq_aa.seq:
                return ('TRUNC', ref_aa.seq + str(aa_start + 1) + 'trunc', aa_start)
            elif var_type == pymummer.variant.INS:
                ref_codon_after_ins = pyfastaq.sequences.Fasta('codon', ref_sequence[codon_start+3:codon_start+6])
                aa_after_ins = ref_codon_after_ins.translate()
                return ('INS', ref_aa.seq + str(aa_start + 1) + '_' + aa_after_ins.seq + str(aa_start + 2) + 'ins' + new_seq_aa.seq , aa_start)
            else:
                if len(new_seq) == 3:
                    return ('DEL', ref_aa.seq + str(aa_start + 1) + 'del', aa_start)
                else:
                    assert len(new_seq) % 3 == 0
                    ref_codon_after_ins = pyfastaq.sequences.Fasta('codon', ref_sequence[codon_start+3:codon_start+6])
                    aa_after_ins = ref_codon_after_ins.translate()
                    return ('DEL', ref_aa.seq + str(aa_start + 1)+ '_' + aa_after_ins.seq + str(aa_start + 2) + 'del', aa_start)

        else:
            return ('UNKNOWN', '.', aa_start)


    @staticmethod
    def _filter_mummer_variants(mummer_variants, ref_sequence):
        if len(mummer_variants) == 0:
            return

        for contig in mummer_variants:
            variants = mummer_variants[contig]
            for i in range(len(variants)):
                t = AssemblyVariants._get_variant_effect(variants[i], ref_sequence)
                if t is not None and t[0] in ['TRUNC', 'FSHIFT']:
                    break
            mummer_variants[contig] = variants[:i+1]


    @staticmethod
    def _get_one_variant_for_one_contig_non_coding(refdata_var_dict, mummer_variant):
        var_tuple = None
        used_known_variants = set()

        # if the variant is at the same position as a known variant in the reference
        if refdata_var_dict is not None and mummer_variant.ref_start in refdata_var_dict['n']:
            if mummer_variant.var_type == pymummer.variant.SNP:
                variants_at_this_position = {x for x in refdata_var_dict['n'][mummer_variant.ref_start]}
                matching_variants = {x for x in variants_at_this_position if mummer_variant.qry_base == x.variant.variant_value}
                not_interesting_variants = {x for x in variants_at_this_position if mummer_variant.qry_base == x.variant.wild_value}
                variants_at_this_position = variants_at_this_position.difference(matching_variants)
            else:
                matching_variants = set()
                variants_at_this_position = refdata_var_dict['n'][mummer_variant.ref_start]
                not_interesting_variants = set()

            if len(not_interesting_variants) == 0:
                var_tuple = (
                    mummer_variant.ref_start,
                    'n',
                    mummer_variant.ref_base + str(mummer_variant.ref_start + 1) + mummer_variant.qry_base,
                    pymummer.variant.var_types[mummer_variant.var_type],
                    [mummer_variant],
                    matching_variants,
                    variants_at_this_position
                )

            used_known_variants.update(matching_variants, variants_at_this_position)
        else: # not at a known variant position in the reference
            var_tuple = (
                mummer_variant.ref_start,
                'n',
                mummer_variant.ref_base + str(mummer_variant.ref_start + 1) + mummer_variant.qry_base,
                pymummer.variant.var_types[mummer_variant.var_type],
                [mummer_variant],
                set(),
                set()
            )

        return var_tuple, used_known_variants


    @staticmethod
    def _get_one_variant_for_one_contig_coding(ref_sequence, refdata_var_dict, mummer_variants_list):
        aa_var_effect, aa_var_string, aa_var_position = AssemblyVariants._get_variant_effect(mummer_variants_list, ref_sequence)
        var_tuple = None
        used_known_variants = set()

        # if this variant is at the same position as a known variant in the reference
        if refdata_var_dict is not None and aa_var_position in refdata_var_dict['p']:
            if aa_var_effect == 'NONSYN':
                aa_variant = sequence_variant.Variant('p', aa_var_string, '.')
                variants_at_this_position = {x for x in refdata_var_dict['p'][aa_variant.position]}
                matching_variants = {x for x in variants_at_this_position if aa_variant.variant_value == x.variant.variant_value}
                not_interesting_variants = {x for x in variants_at_this_position if aa_variant.variant_value == x.variant.wild_value}
                variants_at_this_position = variants_at_this_position.difference(matching_variants)
            else:
                matching_variants = set()
                variants_at_this_position = refdata_var_dict['p'][aa_var_position]
                not_interesting_variants = set()

            if len(not_interesting_variants) == 0:
                var_tuple = (
                    aa_var_position,
                    'p',
                    aa_var_string,
                    aa_var_effect,
                    mummer_variants_list,
                    matching_variants,
                    variants_at_this_position
                )

            used_known_variants.update(matching_variants, variants_at_this_position)
        else: # this variant is not at a known position in the reference
            var_tuple = (
                aa_var_position,
                'p',
                aa_var_string,
                aa_var_effect,
                mummer_variants_list,
                set(),
                set()
            )

        return var_tuple, used_known_variants


    @staticmethod
    def _get_remaining_known_ref_variants(known_ref_variants, used_ref_variants, nucmer_coords):
        '''Finds variants where ref has the variant and so does the contig. Which means
           that there was no mummer call to flag it up so need to look through the known
           ref variants. Also need to check that the variant is in a nucmer match to an
           assembly contig.'''
        variants = []

        for ref_variant_pos, ref_variants_set in sorted(known_ref_variants.items()):
            for known_ref_variant in ref_variants_set:
                if known_ref_variant not in used_ref_variants:
                    variant_pos_matches_contig = False
                    pos = known_ref_variant.variant.position

                    if known_ref_variant.seq_type == 'n':
                        ref_interval = intervals.Interval(pos, pos)
                    elif known_ref_variant.seq_type == 'p':
                        ref_interval = intervals.Interval(3 * pos, 3 * pos + 2)
                    else:
                        raise Error('Unexpected variant type "' + known_ref_variant.variant_type + '" in _get_remaining_known_ref_variants. Cannot continue')

                    for interval in nucmer_coords:
                        if ref_interval.intersects(interval):
                            variant_pos_matches_contig = True
                            break

                    if variant_pos_matches_contig:
                        variants.append((None, known_ref_variant.seq_type, None, None, None, {known_ref_variant}, set()))

        return variants


    def get_variants(self, ref_sequence_name, allowed_ctg_coords, allowed_ref_coords, nucmer_matches=None):
        '''Nucmr coords = dict. Key=contig name. Value = list of intervals of ref coords that match the contig.
           Made by assembly_compare.AssemblyCompare.nucmer_hits_to_ref_coords
           Returns dictionary. Key=contig name. Value = list of variants. Each variant
           is a tuple: (
               0 = position,
               1 = type in ['n', 'p']
               2 = Variant string, eg 'D2E',
               3 = variant effect (as returned by _get_variant_effect)
               4 = list of pymummer.variant.Variant that made up this variant (could be more than one because of
                   variants in the same codon)
               5 = set {matching known variants from metadata (=sequence_metadata.SequenceMetadata)}
               6 = set {known ref metadata (=sequence_metadata.SequenceMetadata)  at same position as SNP}, excluding those from 4
           )
        '''
        mummer_variants = self._get_mummer_variants(self.nucmer_snp_file)
        variants = {}
        seq_type, is_variant_only = self.refdata.sequence_type(ref_sequence_name)
        ref_sequence = self.refdata.sequence(ref_sequence_name)

        if ref_sequence_name in self.refdata.metadata:
            refdata_var_dict = self.refdata.metadata[ref_sequence_name]
        else:
            refdata_var_dict = None

        known_non_wild_variants_in_ref = self.refdata.all_non_wild_type_variants(ref_sequence_name)

        for contig in allowed_ctg_coords:
            if contig not in allowed_ref_coords:
                continue

            used_known_variants = set()
            variants[contig] = []

            if contig in mummer_variants:
                for mummer_variant_list in mummer_variants[contig]:
                    ref_start = min([x.ref_start for x in mummer_variant_list])
                    ref_end = max([x.ref_end for x in mummer_variant_list])
                    ctg_start = min([x.qry_start for x in mummer_variant_list])
                    ctg_end = min([x.qry_end for x in mummer_variant_list])
                    ref_interval = intervals.Interval(ref_start, ref_end)
                    ctg_interval = intervals.Interval(ctg_start, ctg_end)
                    ref_ok = True in {x.intersects(ref_interval) for x in allowed_ref_coords[contig]}
                    qry_ok = True in {x.intersects(ctg_interval) for x in allowed_ctg_coords[contig]}

                    if not (ref_ok and qry_ok):
                        continue

                    if seq_type == 'p':
                        new_variant, used_variants = self._get_one_variant_for_one_contig_coding(ref_sequence, refdata_var_dict, mummer_variant_list)
                    else:
                        for mummer_variant in mummer_variant_list:
                            new_variant, used_variants = self._get_one_variant_for_one_contig_non_coding(refdata_var_dict, mummer_variant)

                    if new_variant is not None:
                            variants[contig].append(new_variant)
                    used_known_variants.update(used_variants)

            # for this contig, need to know all the ref sequence and coords it maps to.
            # Then report just the unused known variants, as the contig also has these variants
            if seq_type == 'p':
                new_variants = self._get_remaining_known_ref_variants(known_non_wild_variants_in_ref['p'], used_known_variants, allowed_ref_coords[contig])

            else:
                new_variants = self._get_remaining_known_ref_variants(known_non_wild_variants_in_ref['n'], used_known_variants, allowed_ref_coords[contig])

            if is_variant_only:
                new_variants = [x for x in new_variants if len(x[5]) > 0]

            variants[contig].extend(new_variants)
            if len(variants[contig]) == 0:
                del variants[contig]

        return variants