Skip to content

gene_merger

ensembl.io.genomio.gff3.gene_merger

Merge split genes in a GFF file.

GFFGeneMerger

Specialized class to merge split genes in a GFF3 file, prior to further parsing.

Source code in src/python/ensembl/io/genomio/gff3/gene_merger.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class GFFGeneMerger:
    """Specialized class to merge split genes in a GFF3 file, prior to further parsing."""

    def __init__(self) -> None:
        source = files(ensembl.io.genomio.data.gff3).joinpath("biotypes.json")
        with as_file(source) as biotypes_json:
            self._biotypes = get_json(biotypes_json)

    def merge(self, in_gff_path: PathLike, out_gff_path: PathLike) -> List[str]:
        """
        Merge genes in a gff that are split in multiple lines.

        Args:
            in_gff_path: Input GFF3 that may have split merge.
            out_gff_path: Output GFF3 with those genes merged.

        Returns:
            List of all merged genes, each represented as a string of the GFF3 lines of all their parts.
        """
        to_merge = []
        merged: List[str] = []

        with Path(in_gff_path).open("r") as in_gff_fh, Path(out_gff_path).open("w") as out_gff_fh:
            for line in in_gff_fh:
                # Skip comments
                if line.startswith("#"):
                    if line.startswith("##FASTA"):
                        logging.warning("This GFF3 file contains FASTA sequences")
                        break
                    out_gff_fh.write(line)
                else:
                    # Parse one line
                    line = line.rstrip()
                    fields = line.split("\t")
                    attr_fields = fields[8].split(";")
                    attrs = {}
                    for a in attr_fields:
                        (key, value) = a.split("=")
                        attrs[key] = value

                    # Check this is a gene to merge; cache it then
                    if fields[2] in self._biotypes["gene"]["supported"] and (
                        "part" in attrs or "is_ordered" in attrs
                    ):
                        to_merge.append(fields)

                    # If not, merge previous gene if needed, and print the line
                    else:
                        if to_merge:
                            merged_str = []
                            for line_to_merge in to_merge:
                                merged_str.append("\t".join(line_to_merge))
                            merged.append("\n".join(merged_str) + "\n")

                            new_line = self._merge_genes(to_merge)
                            out_gff_fh.write(new_line)
                            to_merge = []
                        out_gff_fh.write(line + "\n")

            # Print last merged gene if there is one
            if to_merge:
                merged_str = []
                for line_to_merge in to_merge:
                    merged_str.append("\t".join(line_to_merge))
                merged.append("\n".join(merged_str) + "\n")

                new_line = self._merge_genes(to_merge)
                out_gff_fh.write(new_line)

        logging.debug(f"Merged lines: {len(merged)}")
        return merged

    def _merge_genes(self, to_merge: List) -> str:
        """Returns a single gene gff3 line merged from separate parts.

        Args:
            to_merge: List of gff3 lines with gene parts.

        """
        min_start = -1
        max_end = -1
        for gene in to_merge:
            start = int(gene[3])
            end = int(gene[4])

            if start < min_start or min_start < 0:
                min_start = start
            if end > max_end or max_end < 0:
                max_end = end

        # Take the first line as template and replace things
        new_gene = to_merge[0]
        new_gene[3] = str(min_start)
        new_gene[4] = str(max_end)

        attrs = new_gene[8]
        attrs = attrs.replace(";is_ordered=true", "")
        attrs = re.sub(r";part=\d+/\d+", "", attrs)
        new_gene[8] = attrs

        return "\t".join(new_gene) + "\n"

merge(in_gff_path, out_gff_path)

Merge genes in a gff that are split in multiple lines.

Parameters:

Name Type Description Default
in_gff_path PathLike

Input GFF3 that may have split merge.

required
out_gff_path PathLike

Output GFF3 with those genes merged.

required

Returns:

Type Description
List[str]

List of all merged genes, each represented as a string of the GFF3 lines of all their parts.

Source code in src/python/ensembl/io/genomio/gff3/gene_merger.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def merge(self, in_gff_path: PathLike, out_gff_path: PathLike) -> List[str]:
    """
    Merge genes in a gff that are split in multiple lines.

    Args:
        in_gff_path: Input GFF3 that may have split merge.
        out_gff_path: Output GFF3 with those genes merged.

    Returns:
        List of all merged genes, each represented as a string of the GFF3 lines of all their parts.
    """
    to_merge = []
    merged: List[str] = []

    with Path(in_gff_path).open("r") as in_gff_fh, Path(out_gff_path).open("w") as out_gff_fh:
        for line in in_gff_fh:
            # Skip comments
            if line.startswith("#"):
                if line.startswith("##FASTA"):
                    logging.warning("This GFF3 file contains FASTA sequences")
                    break
                out_gff_fh.write(line)
            else:
                # Parse one line
                line = line.rstrip()
                fields = line.split("\t")
                attr_fields = fields[8].split(";")
                attrs = {}
                for a in attr_fields:
                    (key, value) = a.split("=")
                    attrs[key] = value

                # Check this is a gene to merge; cache it then
                if fields[2] in self._biotypes["gene"]["supported"] and (
                    "part" in attrs or "is_ordered" in attrs
                ):
                    to_merge.append(fields)

                # If not, merge previous gene if needed, and print the line
                else:
                    if to_merge:
                        merged_str = []
                        for line_to_merge in to_merge:
                            merged_str.append("\t".join(line_to_merge))
                        merged.append("\n".join(merged_str) + "\n")

                        new_line = self._merge_genes(to_merge)
                        out_gff_fh.write(new_line)
                        to_merge = []
                    out_gff_fh.write(line + "\n")

        # Print last merged gene if there is one
        if to_merge:
            merged_str = []
            for line_to_merge in to_merge:
                merged_str.append("\t".join(line_to_merge))
            merged.append("\n".join(merged_str) + "\n")

            new_line = self._merge_genes(to_merge)
            out_gff_fh.write(new_line)

    logging.debug(f"Merged lines: {len(merged)}")
    return merged