Specialized class to merge split genes in a GFF3 file, prior to further parsing.
Source code in src/python/ensembl/io/genomio/gff3/gene_merger.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132 | class GFFGeneMerger:
"""Specialized class to merge split genes in a GFF3 file, prior to further parsing."""
def __init__(self) -> None:
source = files(ensembl.io.genomio.data.gff3).joinpath("biotypes.json")
with as_file(source) as biotypes_json:
self._biotypes = get_json(biotypes_json)
def merge(self, in_gff_path: PathLike, out_gff_path: PathLike) -> List[str]:
"""
Merge genes in a gff that are split in multiple lines.
Args:
in_gff_path: Input GFF3 that may have split merge.
out_gff_path: Output GFF3 with those genes merged.
Returns:
List of all merged genes, each represented as a string of the GFF3 lines of all their parts.
"""
to_merge = []
merged: List[str] = []
with Path(in_gff_path).open("r") as in_gff_fh, Path(out_gff_path).open("w") as out_gff_fh:
for line in in_gff_fh:
# Skip comments
if line.startswith("#"):
if line.startswith("##FASTA"):
logging.warning("This GFF3 file contains FASTA sequences")
break
out_gff_fh.write(line)
else:
# Parse one line
line = line.rstrip()
fields = line.split("\t")
attr_fields = fields[8].split(";")
attrs = {}
for a in attr_fields:
(key, value) = a.split("=")
attrs[key] = value
# Check this is a gene to merge; cache it then
if fields[2] in self._biotypes["gene"]["supported"] and (
"part" in attrs or "is_ordered" in attrs
):
to_merge.append(fields)
# If not, merge previous gene if needed, and print the line
else:
if to_merge:
merged_str = []
for line_to_merge in to_merge:
merged_str.append("\t".join(line_to_merge))
merged.append("\n".join(merged_str) + "\n")
new_line = self._merge_genes(to_merge)
out_gff_fh.write(new_line)
to_merge = []
out_gff_fh.write(line + "\n")
# Print last merged gene if there is one
if to_merge:
merged_str = []
for line_to_merge in to_merge:
merged_str.append("\t".join(line_to_merge))
merged.append("\n".join(merged_str) + "\n")
new_line = self._merge_genes(to_merge)
out_gff_fh.write(new_line)
logging.debug(f"Merged lines: {len(merged)}")
return merged
def _merge_genes(self, to_merge: List) -> str:
"""Returns a single gene gff3 line merged from separate parts.
Args:
to_merge: List of gff3 lines with gene parts.
"""
min_start = -1
max_end = -1
for gene in to_merge:
start = int(gene[3])
end = int(gene[4])
if start < min_start or min_start < 0:
min_start = start
if end > max_end or max_end < 0:
max_end = end
# Take the first line as template and replace things
new_gene = to_merge[0]
new_gene[3] = str(min_start)
new_gene[4] = str(max_end)
attrs = new_gene[8]
attrs = attrs.replace(";is_ordered=true", "")
attrs = re.sub(r";part=\d+/\d+", "", attrs)
new_gene[8] = attrs
return "\t".join(new_gene) + "\n"
|
merge(in_gff_path, out_gff_path)
Merge genes in a gff that are split in multiple lines.
Parameters:
Name |
Type |
Description |
Default |
in_gff_path
|
PathLike
|
Input GFF3 that may have split merge.
|
required
|
out_gff_path
|
PathLike
|
Output GFF3 with those genes merged.
|
required
|
Returns:
Type |
Description |
List[str]
|
List of all merged genes, each represented as a string of the GFF3 lines of all their parts.
|
Source code in src/python/ensembl/io/genomio/gff3/gene_merger.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 | def merge(self, in_gff_path: PathLike, out_gff_path: PathLike) -> List[str]:
"""
Merge genes in a gff that are split in multiple lines.
Args:
in_gff_path: Input GFF3 that may have split merge.
out_gff_path: Output GFF3 with those genes merged.
Returns:
List of all merged genes, each represented as a string of the GFF3 lines of all their parts.
"""
to_merge = []
merged: List[str] = []
with Path(in_gff_path).open("r") as in_gff_fh, Path(out_gff_path).open("w") as out_gff_fh:
for line in in_gff_fh:
# Skip comments
if line.startswith("#"):
if line.startswith("##FASTA"):
logging.warning("This GFF3 file contains FASTA sequences")
break
out_gff_fh.write(line)
else:
# Parse one line
line = line.rstrip()
fields = line.split("\t")
attr_fields = fields[8].split(";")
attrs = {}
for a in attr_fields:
(key, value) = a.split("=")
attrs[key] = value
# Check this is a gene to merge; cache it then
if fields[2] in self._biotypes["gene"]["supported"] and (
"part" in attrs or "is_ordered" in attrs
):
to_merge.append(fields)
# If not, merge previous gene if needed, and print the line
else:
if to_merge:
merged_str = []
for line_to_merge in to_merge:
merged_str.append("\t".join(line_to_merge))
merged.append("\n".join(merged_str) + "\n")
new_line = self._merge_genes(to_merge)
out_gff_fh.write(new_line)
to_merge = []
out_gff_fh.write(line + "\n")
# Print last merged gene if there is one
if to_merge:
merged_str = []
for line_to_merge in to_merge:
merged_str.append("\t".join(line_to_merge))
merged.append("\n".join(merged_str) + "\n")
new_line = self._merge_genes(to_merge)
out_gff_fh.write(new_line)
logging.debug(f"Merged lines: {len(merged)}")
return merged
|