Skip to content

fasta

ensembl.io.genomio.fasta

Fasta files processing module.

CompareFasta

Read and compare the FASTA sequences.

Source code in src/python/ensembl/io/genomio/fasta/compare.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class CompareFasta:
    """Read and compare the FASTA sequences."""

    def __init__(self, fasta_ext: Path, fasta_core: Path, output_dir: Path) -> None:
        """
        Initialize the `CompareFasta` with input fasta files and output directory.

        Args:
            fasta_ext: Path to INSDC fasta file.
            fasta_core: Path to core db fasta file.
            output_dir: Directory where comparison logs will be stored.

        """
        self.fasta_ext = Path(fasta_ext)
        self.fasta_core = Path(fasta_core)
        self.output_dir = Path(output_dir)
        self.comp: list[str] = []

    def compare_seqs(self) -> None:
        """Compare two FASTA files for common, unique, and differing sequences.
        Use `write_results()` to generate a report.
        """
        seq_ext = self.read_fasta(self.fasta_ext)
        seq_core = self.read_fasta(self.fasta_core)

        # Compare sequences
        seq_ext_dict = self.build_seq_dict(seq_ext)
        seq_core_dict = self.build_seq_dict(seq_core)

        # Compare number of sequences
        if len(seq_ext) != len(seq_core):
            self.comp.append(
                "WARNING: Different number of sequences: "
                f"fasta_ext [ n = {len(seq_ext)} ] -Vs- fasta_core [ n = {len(seq_core)} ]"
            )
            logging.warning("Different number of sequences: fasta_ext compared to fasta_core")

        common = self.find_common_groups(seq_ext_dict, seq_core_dict)

        # Sequences that are not common
        only1 = {seq: group for seq, group in seq_ext_dict.items() if not seq in seq_core_dict}

        only2 = {seq: group for seq, group in seq_core_dict.items() if not seq in seq_ext_dict}

        if only1:
            self.comp.append(f"Sequences only in Fasta_1: {', '.join([str(ids) for ids in only1.values()])}")
        if only2:
            self.comp.append(f"Sequences only in Fasta_2: {', '.join([str(ids) for ids in only2.values()])}")

        if common:
            self.comp.append(f"Common ids: {', '.join([str(common_ids) for common_ids in common.items()])}")

        # Check for sequences with extra Ns
        if only1 and only2:
            self.compare_seq_for_Ns(only1, only2)

        # Write results to file
        self.write_results()

    def read_fasta(self, fasta_path: Path) -> dict[str, str]:
        """Reads a FASTA file and returns a dictionary mapping sequence IDs to sequences.

        Args:
            fasta_path: Path to the FASTA file. Supports gzipped files.

        Returns:
            A dictionary where keys are sequence IDs and values are sequences with all non-CGTA
                characters replaced by "N".
        """
        logging.info(f"Read fasta file {fasta_path}")
        sequences = {}
        with open_gz_file(fasta_path) as fasta_fh:
            for rec in SeqIO.parse(fasta_fh, "fasta"):
                name = rec.id
                sequences[name] = re.sub(r"[^CGTA]", "N", str(rec.seq.upper()))
        return sequences

    def build_seq_dict(self, seqs: dict[str, str]) -> dict[str, SeqGroup]:
        """Builds a dictionary of unique sequences and their associated IDs, accounting for duplicates.

        Args:
            seqs: A dictionary where keys are sequence IDs and values are sequences.

        Returns:
            A dictionary where keys are unique sequences and values are `SeqGroup` objects that
                group sequence IDs sharing the same sequence.
        """
        seqs_dict: dict[str, SeqGroup] = {}
        for name, seq in seqs.items():
            if seq in seqs_dict:
                seqs_dict[seq].add_id(name)
            else:
                seqs_dict[seq] = SeqGroup(name)

        return seqs_dict

    def find_common_groups(
        self, seq_ext_dict: dict[str, SeqGroup], seq_core_dict: dict[str, SeqGroup]
    ) -> dict[str, str]:
        """Find common sequences between two dictionaries and group them.

        Args:
            seq_ext_dict: Dictionary of sequences from the first dataset.
            seq_core_dict: Dictionary of sequences from the second dataset.

        Returns:
            A dictionary of common sequence mappings and a list of comparison results.

        """
        common = {}

        for seq1, group1 in seq_ext_dict.items():
            if seq1 in seq_core_dict:
                group2 = seq_core_dict[seq1]
                # Check that the 2 groups have the same number of sequences
                if group1.count == group2.count:
                    if group1.count == 1:
                        common[group1.ids[0]] = group2.ids[0]
                    else:
                        self.comp.append(f"Matched 2 identical groups of sequences: {group1} and {group2}")
                        # Map each ID in group1 to a possible group2 ID
                        possible_id2 = " OR ".join(group2.ids)
                        common.update({id1: possible_id2 for id1 in group1.ids})
                else:
                    self.comp.append(
                        "Matched 2 different groups of sequences"
                        f" ({group1.count} vs {group2.count}): {group1} and {group2}"
                    )

        return common

    def write_results(self) -> None:
        """Write the comparison results to a file in the output directory."""
        output_file = self.output_dir / "compare.log"
        observed_compare = set()

        logging.info(f"Writing results to {output_file}")
        with open(output_file, "w") as out_fh:
            for line in self.comp:
                if line not in observed_compare:
                    observed_compare.add(line)
                    out_fh.write(str(line) + "\n")

    def compare_seq_for_Ns(self, only1: dict[str, SeqGroup], only2: dict[str, SeqGroup]) -> None:
        """Compare sequences in `only1` and `only2` for differences in `N` content and length.

        Args:
            only1: Sequences unique to the first dataset, mapping sequence to group/identifier.
            only2: Sequences unique to the second dataset, mapping sequence to group/identifier.

        """
        # sequences which have extra N at the end
        for seq_1, name1 in only1.items():
            len1 = len(seq_1)
            seq1_N = seq_1.count("N")

            for seq_2, name2 in only2.items():
                len2 = len(seq_2)
                seq2_N = seq_2.count("N")

                if abs(seq1_N - seq2_N) == abs(len1 - len2):
                    self.comp.append(f"Please check extra Ns added in the accessions {name1} and {name2}")
                else:
                    self.comp.append(
                        f"ALERT INSERTIONS at the end or diff assembly level {name1} and {name2}"
                    )

                if len1 == len2:
                    if seq2_N > seq1_N:
                        self.comp.append(f"your fasta_core has more Ns, check {name1} and {name2}")
                    else:
                        self.comp.append(f"sequences have the same length, check {name1} and {name2}")

comp = [] instance-attribute

fasta_core = Path(fasta_core) instance-attribute

fasta_ext = Path(fasta_ext) instance-attribute

output_dir = Path(output_dir) instance-attribute

build_seq_dict(seqs)

Builds a dictionary of unique sequences and their associated IDs, accounting for duplicates.

Parameters:

Name Type Description Default
seqs dict[str, str]

A dictionary where keys are sequence IDs and values are sequences.

required

Returns:

Type Description
dict[str, SeqGroup]

A dictionary where keys are unique sequences and values are SeqGroup objects that group sequence IDs sharing the same sequence.

Source code in src/python/ensembl/io/genomio/fasta/compare.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def build_seq_dict(self, seqs: dict[str, str]) -> dict[str, SeqGroup]:
    """Builds a dictionary of unique sequences and their associated IDs, accounting for duplicates.

    Args:
        seqs: A dictionary where keys are sequence IDs and values are sequences.

    Returns:
        A dictionary where keys are unique sequences and values are `SeqGroup` objects that
            group sequence IDs sharing the same sequence.
    """
    seqs_dict: dict[str, SeqGroup] = {}
    for name, seq in seqs.items():
        if seq in seqs_dict:
            seqs_dict[seq].add_id(name)
        else:
            seqs_dict[seq] = SeqGroup(name)

    return seqs_dict

compare_seq_for_Ns(only1, only2)

Compare sequences in only1 and only2 for differences in N content and length.

Parameters:

Name Type Description Default
only1 dict[str, SeqGroup]

Sequences unique to the first dataset, mapping sequence to group/identifier.

required
only2 dict[str, SeqGroup]

Sequences unique to the second dataset, mapping sequence to group/identifier.

required
Source code in src/python/ensembl/io/genomio/fasta/compare.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def compare_seq_for_Ns(self, only1: dict[str, SeqGroup], only2: dict[str, SeqGroup]) -> None:
    """Compare sequences in `only1` and `only2` for differences in `N` content and length.

    Args:
        only1: Sequences unique to the first dataset, mapping sequence to group/identifier.
        only2: Sequences unique to the second dataset, mapping sequence to group/identifier.

    """
    # sequences which have extra N at the end
    for seq_1, name1 in only1.items():
        len1 = len(seq_1)
        seq1_N = seq_1.count("N")

        for seq_2, name2 in only2.items():
            len2 = len(seq_2)
            seq2_N = seq_2.count("N")

            if abs(seq1_N - seq2_N) == abs(len1 - len2):
                self.comp.append(f"Please check extra Ns added in the accessions {name1} and {name2}")
            else:
                self.comp.append(
                    f"ALERT INSERTIONS at the end or diff assembly level {name1} and {name2}"
                )

            if len1 == len2:
                if seq2_N > seq1_N:
                    self.comp.append(f"your fasta_core has more Ns, check {name1} and {name2}")
                else:
                    self.comp.append(f"sequences have the same length, check {name1} and {name2}")

compare_seqs()

Compare two FASTA files for common, unique, and differing sequences. Use write_results() to generate a report.

Source code in src/python/ensembl/io/genomio/fasta/compare.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def compare_seqs(self) -> None:
    """Compare two FASTA files for common, unique, and differing sequences.
    Use `write_results()` to generate a report.
    """
    seq_ext = self.read_fasta(self.fasta_ext)
    seq_core = self.read_fasta(self.fasta_core)

    # Compare sequences
    seq_ext_dict = self.build_seq_dict(seq_ext)
    seq_core_dict = self.build_seq_dict(seq_core)

    # Compare number of sequences
    if len(seq_ext) != len(seq_core):
        self.comp.append(
            "WARNING: Different number of sequences: "
            f"fasta_ext [ n = {len(seq_ext)} ] -Vs- fasta_core [ n = {len(seq_core)} ]"
        )
        logging.warning("Different number of sequences: fasta_ext compared to fasta_core")

    common = self.find_common_groups(seq_ext_dict, seq_core_dict)

    # Sequences that are not common
    only1 = {seq: group for seq, group in seq_ext_dict.items() if not seq in seq_core_dict}

    only2 = {seq: group for seq, group in seq_core_dict.items() if not seq in seq_ext_dict}

    if only1:
        self.comp.append(f"Sequences only in Fasta_1: {', '.join([str(ids) for ids in only1.values()])}")
    if only2:
        self.comp.append(f"Sequences only in Fasta_2: {', '.join([str(ids) for ids in only2.values()])}")

    if common:
        self.comp.append(f"Common ids: {', '.join([str(common_ids) for common_ids in common.items()])}")

    # Check for sequences with extra Ns
    if only1 and only2:
        self.compare_seq_for_Ns(only1, only2)

    # Write results to file
    self.write_results()

find_common_groups(seq_ext_dict, seq_core_dict)

Find common sequences between two dictionaries and group them.

Parameters:

Name Type Description Default
seq_ext_dict dict[str, SeqGroup]

Dictionary of sequences from the first dataset.

required
seq_core_dict dict[str, SeqGroup]

Dictionary of sequences from the second dataset.

required

Returns:

Type Description
dict[str, str]

A dictionary of common sequence mappings and a list of comparison results.

Source code in src/python/ensembl/io/genomio/fasta/compare.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def find_common_groups(
    self, seq_ext_dict: dict[str, SeqGroup], seq_core_dict: dict[str, SeqGroup]
) -> dict[str, str]:
    """Find common sequences between two dictionaries and group them.

    Args:
        seq_ext_dict: Dictionary of sequences from the first dataset.
        seq_core_dict: Dictionary of sequences from the second dataset.

    Returns:
        A dictionary of common sequence mappings and a list of comparison results.

    """
    common = {}

    for seq1, group1 in seq_ext_dict.items():
        if seq1 in seq_core_dict:
            group2 = seq_core_dict[seq1]
            # Check that the 2 groups have the same number of sequences
            if group1.count == group2.count:
                if group1.count == 1:
                    common[group1.ids[0]] = group2.ids[0]
                else:
                    self.comp.append(f"Matched 2 identical groups of sequences: {group1} and {group2}")
                    # Map each ID in group1 to a possible group2 ID
                    possible_id2 = " OR ".join(group2.ids)
                    common.update({id1: possible_id2 for id1 in group1.ids})
            else:
                self.comp.append(
                    "Matched 2 different groups of sequences"
                    f" ({group1.count} vs {group2.count}): {group1} and {group2}"
                )

    return common

read_fasta(fasta_path)

Reads a FASTA file and returns a dictionary mapping sequence IDs to sequences.

Parameters:

Name Type Description Default
fasta_path Path

Path to the FASTA file. Supports gzipped files.

required

Returns:

Type Description
dict[str, str]

A dictionary where keys are sequence IDs and values are sequences with all non-CGTA characters replaced by "N".

Source code in src/python/ensembl/io/genomio/fasta/compare.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def read_fasta(self, fasta_path: Path) -> dict[str, str]:
    """Reads a FASTA file and returns a dictionary mapping sequence IDs to sequences.

    Args:
        fasta_path: Path to the FASTA file. Supports gzipped files.

    Returns:
        A dictionary where keys are sequence IDs and values are sequences with all non-CGTA
            characters replaced by "N".
    """
    logging.info(f"Read fasta file {fasta_path}")
    sequences = {}
    with open_gz_file(fasta_path) as fasta_fh:
        for rec in SeqIO.parse(fasta_fh, "fasta"):
            name = rec.id
            sequences[name] = re.sub(r"[^CGTA]", "N", str(rec.seq.upper()))
    return sequences

write_results()

Write the comparison results to a file in the output directory.

Source code in src/python/ensembl/io/genomio/fasta/compare.py
194
195
196
197
198
199
200
201
202
203
204
def write_results(self) -> None:
    """Write the comparison results to a file in the output directory."""
    output_file = self.output_dir / "compare.log"
    observed_compare = set()

    logging.info(f"Writing results to {output_file}")
    with open(output_file, "w") as out_fh:
        for line in self.comp:
            if line not in observed_compare:
                observed_compare.add(line)
                out_fh.write(str(line) + "\n")

FastaParserError

Bases: Exception

Error while parsing a FASTA file.

Source code in src/python/ensembl/io/genomio/fasta/process.py
35
36
class FastaParserError(Exception):
    """Error while parsing a FASTA file."""

SeqGroup

Represents a group of sequence identifiers and maintains a count of them.

Source code in src/python/ensembl/io/genomio/fasta/compare.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class SeqGroup:
    """Represents a group of sequence identifiers and maintains a count of them."""

    def __init__(self, identifier: str | None = None) -> None:
        """Initializes a `SeqGroup` instance.

        Args:
            identifier: The first identifier to add to the group. If `None`, adds "None" as the identifier.

        """
        self.ids: list[str] = [str(identifier)]
        self.count = 1

    def __str__(self) -> str:
        """Return a comma-separated string of sequence identifiers."""
        return ", ".join(self.ids)

    def add_id(self, identifier: str | None = None) -> None:
        """Add a new identifier to the group and updates the count.

        Args:
            identifier: The identifier to add. If `None`, "None" is added instead.

        """
        self.ids.append(str(identifier))
        self.count += 1

count = 1 instance-attribute

ids = [str(identifier)] instance-attribute

add_id(identifier=None)

Add a new identifier to the group and updates the count.

Parameters:

Name Type Description Default
identifier str | None

The identifier to add. If None, "None" is added instead.

None
Source code in src/python/ensembl/io/genomio/fasta/compare.py
52
53
54
55
56
57
58
59
60
def add_id(self, identifier: str | None = None) -> None:
    """Add a new identifier to the group and updates the count.

    Args:
        identifier: The identifier to add. If `None`, "None" is added instead.

    """
    self.ids.append(str(identifier))
    self.count += 1

check_chunk_size_and_tolerance(chunk_size, chunk_tolerance, error_f=_on_value_error)

Check the chunk size and the tolerance are positive and chunk size is not too small

Parameters:

Name Type Description Default
chunk_size int

Chunk size to check

required
chunk_tolerance int

Chunk tolerance to check

required
Dies

If checks failed dies with parser.error

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def check_chunk_size_and_tolerance(
    chunk_size: int,
    chunk_tolerance: int,
    error_f: Callable[[str], None] = _on_value_error,
) -> None:
    """Check the chunk size and the tolerance are positive and chunk size is not too small

    Args:
        chunk_size: Chunk size to check
        chunk_tolerance: Chunk tolerance to check

    Dies:
        If checks failed dies with `parser.error`
    """
    if chunk_size < 50_000:
        error_f(f"wrong '--chunk_size' value: '{chunk_size}'. should be greater then 50_000. exiting...")
    if chunk_tolerance < 0:
        error_f(f"wrong '--chunk_tolerance' value: '{chunk_tolerance}'. can't be less then 0. exiting...")

chunk_fasta(input_fasta_file, chunk_size, chunk_size_tolerated, out_file_name, individual_file_prefix, *, agp_output_file=None, n_sequence_len=0, chunk_sfx='ens_chunk', append_offset_to_chunk_name=None)

Open input_fasta_file and split into a smaller chunks based on stretches of "N"s and then based on chunk_size_tolerated and store either to the out_file_name if no individual_file_prefix is provided or store each individual chunk to a file starting with non-empty individual_file_prefix.

Parameters:

Name Type Description Default
input_fasta_file str

Input FASTA

required
chunk_size int

Size of the chunks to split into.

required
chunk_size_tolerated int

If more flexibility allowed, use this as the maximum size of a chunk.

required
out_file_name str

Output FASTA to store the chunks into if no individual_file_prefix is provided.

required
individual_file_prefix Optional[Path]

A file path prefix including dirs and filenames part to use as a first part of the chunk file name.

required
agp_output_file Optional[str]

Output AGP file to store the map for the chunking procedure if present and non-empty.

None
n_sequence_len int

Length of the stretch of Ns to split at; not slitting on Ns if 0.

0
chunk_sfx str

A string to put between the original sequence name and the chunk suffix.

'ens_chunk'
append_offset_to_chunk_name Optional[bool]

Append 0-based offset in the form of _off_{offset} to the chunk name.

None
Source code in src/python/ensembl/io/genomio/fasta/chunk.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def chunk_fasta(
    input_fasta_file: str,
    chunk_size: int,
    chunk_size_tolerated: int,
    out_file_name: str,
    individual_file_prefix: Optional[Path],
    *,
    agp_output_file: Optional[str] = None,
    n_sequence_len: int = 0,
    chunk_sfx: str = "ens_chunk",
    append_offset_to_chunk_name: Optional[bool] = None,
) -> None:
    """Open `input_fasta_file` and split into a smaller chunks based on
    stretches of "N"s and then based on chunk_size_tolerated and store either to
    the `out_file_name` if no `individual_file_prefix` is provided or
    store each individual chunk to a file starting with non-empty `individual_file_prefix`.

    Args:
        input_fasta_file: Input FASTA
        chunk_size: Size of the chunks to split into.
        chunk_size_tolerated: If more flexibility allowed, use this as the maximum size of a chunk.
        out_file_name: Output FASTA to store the chunks into if no `individual_file_prefix` is provided.
        individual_file_prefix: A file path prefix including dirs and filenames part to use as a
                first part of the chunk file name.
        agp_output_file: Output AGP file to store the map for the chunking procedure if present and non-empty.
        n_sequence_len: Length of the stretch of `N`s to split at; not slitting on `N`s if 0.
        chunk_sfx: A string to put between the original sequence name and the chunk suffix.
        append_offset_to_chunk_name: Append 0-based offset in the form of `_off_{offset}` to the chunk name.
    """

    # process input fasta
    with open_gz_file(input_fasta_file) as fasta:
        logging.info(
            f"splitting sequences from '{input_fasta_file}', chunk size {chunk_size:_}, \
                splitting on {n_sequence_len} Ns (0 -- disabled)"
        )
        # do not open a joined file if you plan to open many individual ones
        with (
            individual_file_prefix
            and nullcontext(None)
            or open(out_file_name, "wt", encoding="utf-8") as out_file_joined
        ):
            agp_lines = chunk_fasta_stream(
                fasta,
                chunk_size,
                chunk_size_tolerated,
                out_file_joined,
                individual_file_prefix,
                n_sequence_len=n_sequence_len,
                chunk_sfx=chunk_sfx,
                append_offset_to_chunk_name=append_offset_to_chunk_name,
            )

        # dump AGP
        if agp_output_file:
            with open(agp_output_file, "w", encoding="utf-8") as agp_out:
                agp_out.write("\n".join(agp_lines) + "\n")

chunk_fasta_stream(input_fasta, chunk_size, chunk_size_tolerated, output_fasta, individual_file_prefix, *, n_sequence_len=0, chunk_sfx='ens_chunk', append_offset_to_chunk_name=None, open_individual=_individual_file_opener)

Split input TextIOWrapper stream with fasta into a smaller chunks based on stretches of "N"s and then based on chunk_size_tolerated and store either to the output_fasta stream (if valid) or to the files created by invocation of the open_individual callable.

Parameters:

Name Type Description Default
input_fasta TextIOWrapper

Input FASTA as the TextIOWrapper stream.

required
chunk_size int

Size of the chunks to split into.

required
chunk_size_tolerated int

If more flexibility allowed, use this as the maximum size of a chunk.

required
output_fasta Optional[TextIOWrapper] | nullcontext[Any]

Output FASTA TextIOWrapper stream to store the chunks into, if none or False, open_individual is used (see below).

required
individual_file_prefix Optional[Path]

A file path prefix including dirs and filenames part to use as a first part of the chunk file name.

required
n_sequence_len int

Length of the stretch of Ns to split at; not slitting on Ns if 0.

0
chunk_sfx str

A string to put between the original sequence name and the chunk suffix.

'ens_chunk'
append_offset_to_chunk_name Optional[bool]

A flag to append 0-based offset (_off_{offset}) to the chunk name.

None
open_individual Callable[[str], ContextManager[Any]]

A callable taking filename as an input to generate the output file for individual contig if out_put FASTA is false of None, folders should be preexisting.

_individual_file_opener
Source code in src/python/ensembl/io/genomio/fasta/chunk.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def chunk_fasta_stream(
    input_fasta: TextIOWrapper,
    chunk_size: int,
    chunk_size_tolerated: int,
    output_fasta: Optional[TextIOWrapper] | nullcontext[Any],
    individual_file_prefix: Optional[Path],
    *,
    n_sequence_len: int = 0,
    chunk_sfx: str = "ens_chunk",
    append_offset_to_chunk_name: Optional[bool] = None,
    open_individual: Callable[[str], ContextManager[Any]] = _individual_file_opener,
) -> list[str]:
    """Split input TextIOWrapper stream with fasta into a smaller chunks based on
    stretches of "N"s and then based on chunk_size_tolerated and store either to
    the output_fasta stream (if valid) or to the files created by
    invocation of the `open_individual` callable.

    Args:
        input_fasta: Input FASTA as the TextIOWrapper stream.
        chunk_size: Size of the chunks to split into.
        chunk_size_tolerated: If more flexibility allowed, use this as the maximum size of a chunk.
        output_fasta: Output FASTA TextIOWrapper stream to store the chunks into,
                if none or False, `open_individual` is used (see below).
        individual_file_prefix: A file path prefix including dirs and filenames part to use as a
                first part of the chunk file name.
        n_sequence_len: Length of the stretch of `N`s to split at; not slitting on `N`s if 0.
        chunk_sfx: A string to put between the original sequence name and the chunk suffix.
        append_offset_to_chunk_name: A flag to append 0-based offset (`_off_{offset}`) to the chunk name.
        open_individual: A callable taking filename as an input to generate the output file for
                individual contig if out_put FASTA is `false` of `None`, folders should be preexisting.
    """

    chunk_size_tolerated = max(chunk_size, chunk_size_tolerated)
    # output agp_lines list
    agp_lines = []

    # make sure not used for n_seq <= 0
    n_split_regex = None
    if n_sequence_len > 0:
        pattern = f"(N{{{n_sequence_len},}})"
        n_split_regex = re.compile(pattern)

    # process stream
    fasta_parser = SeqIO.parse(input_fasta, "fasta")
    for rec_count, rec in enumerate(fasta_parser, start=1):
        rec_name = str(rec.name)

        ends = split_seq_by_n(str(rec.seq), n_split_regex)
        ends = split_seq_by_chunk_size(ends, chunk_size, chunk_size_tolerated)

        offset = 0
        for chunk, chunk_end in enumerate(ends, start=1):
            chunk_name = f"{rec_name}_{chunk_sfx}_{chunk:03d}"
            chunk_file_name = ""
            if individual_file_prefix:
                chunk_file_name = f"{individual_file_prefix}.{rec_count:03d}.{chunk:03d}.fa"
            if append_offset_to_chunk_name:
                chunk_name += f"_off_{offset}"

            rec_from = offset + 1
            rec_to = chunk_end
            chunk_len = chunk_end - offset

            # form agp lines
            agp_line = f"{rec_name}\t{rec_from}\t{rec_to}\t{chunk}\tW\t{chunk_name}\t1\t{chunk_len}\t+"
            agp_lines.append(agp_line)

            # use agp lines as fasta description
            agp_line = agp_line.replace("\t", " ")
            logging.info(f"Dumping {chunk_name} AGP {agp_line}")

            # get slice and put it out
            tmp_record = SeqRecord(
                Seq(rec.seq[offset:chunk_end]),
                id=chunk_name,
                description=f"AGP {agp_line}",
                name="",
            )

            # if user specified chunks -- store each chunk in an individual output file
            with output_fasta and nullcontext(output_fasta) or open_individual(chunk_file_name) as out_file:
                out_file.write(tmp_record.format("fasta"))  # type: ignore[union-attr]

            del tmp_record
            offset = chunk_end

    return agp_lines

get_peptides_to_exclude(genbank_path, seqr_to_exclude)

Extract peptide IDs from a genbank file that are in a given list of seq regions

Source code in src/python/ensembl/io/genomio/fasta/process.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def get_peptides_to_exclude(genbank_path: PathLike, seqr_to_exclude: Set[str]) -> Set[str]:
    """
    Extract peptide IDs from a genbank file that are in a given list of seq regions
    """
    peptides_to_exclude: Set[str] = set()
    with open_gz_file(genbank_path) as in_genbank:
        for record in SeqIO.parse(in_genbank, "genbank"):
            if record.id in seqr_to_exclude:
                logging.info(f"Skip sequence {record.id}")
                for feat in record.features:
                    if feat.type == "CDS":
                        if "protein_id" in feat.qualifiers:
                            feat_id = feat.qualifiers["protein_id"]
                            peptides_to_exclude.add(feat_id[0])
                        else:
                            raise FastaParserError(f"Peptide without peptide ID ${feat}")
    return peptides_to_exclude

get_tolerated_size(size, tolerance)

Calculate max tolerated size of the chunk based on initial size and percent of allowed deviation.

Parameters:

Name Type Description Default
size int

Base chunk size

required
tolerance int

Percent of allowed deviance as integer.

required

Returns:

Type Description
int

Maximum tolerated chunk size.

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def get_tolerated_size(size: int, tolerance: int) -> int:
    """Calculate max tolerated size of the chunk based on initial size and percent of allowed deviation.

    Args:
        size: Base chunk size
        tolerance: Percent of allowed deviance as integer.

    Returns:
        Maximum tolerated chunk size.
    """
    tolerance = max(tolerance, 0)

    tolerated_size = size
    tolerated_size += size * tolerance // 100
    return tolerated_size

prep_fasta_data(fasta_infile, genbank_infile, fasta_outfile, peptide_mode=False)

Parameters:

Name Type Description Default
fasta_file

Input FASTA file - DNA / Protein

required
genbank_infile Optional[PathLike]

Input GenBank GBFF file (Optional)

required
fasta_outfile PathLike

Output FASTA sequence file.

required
peptide_mode bool

Process proteins instead of DNA

False
Source code in src/python/ensembl/io/genomio/fasta/process.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def prep_fasta_data(
    fasta_infile: PathLike,
    genbank_infile: Optional[PathLike],
    fasta_outfile: PathLike,
    peptide_mode: bool = False,
) -> None:
    """
    Args:
        fasta_file: Input FASTA file - DNA / Protein
        genbank_infile: Input GenBank GBFF file (Optional)
        fasta_outfile: Output FASTA sequence file.
        peptide_mode: Process proteins instead of DNA
    """
    file_path = Path(fasta_infile)

    to_exclude = set()
    seqr_to_exclude = set(exclude_seq_regions)
    if peptide_mode:
        if genbank_infile is not None:
            genbank_path = Path(genbank_infile)
            to_exclude = get_peptides_to_exclude(genbank_path, seqr_to_exclude)
    else:
        to_exclude = seqr_to_exclude

    # Copy and filter
    records = []

    # Final path
    with open_gz_file(file_path) as in_fasta:
        for record in SeqIO.parse(in_fasta, "fasta"):
            if record.id in to_exclude:
                logging.info(f"Skip record ${record.id}")
            else:
                records.append(record)
    with Path(fasta_outfile).open("w") as out_fasta:
        SeqIO.write(records, out_fasta, "fasta")

prepare_out_dir_for_individuals(dir_name, file_part)

Creates dir_name (including upstream dirs) and returns its paths with the file_part appended.

Parameters:

Name Type Description Default
dir_name Path

Directory to create.

required
file_part str

File part to append.

required

Returns:

Type Description
Optional[Path]

dir_name with appended file_part.

Throws

exception if not able to create directory.

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def prepare_out_dir_for_individuals(dir_name: Path, file_part: str) -> Optional[Path]:
    """Creates `dir_name` (including upstream dirs) and returns its paths with the `file_part` appended.

    Args:
        dir_name: Directory to create.
        file_part: File part to append.

    Returns:
        `dir_name` with appended `file_part`.

    Throws:
        exception if not able to create directory.
    """
    file_prefix = None
    if dir_name:
        dir_name.mkdir(parents=True, exist_ok=True)
        file_prefix = Path(dir_name, file_part)
    return file_prefix

split_seq_by_chunk_size(ends, chunk_size, tolerated_size=None)

Split list of end coordinates, to form chunks not longer then chunk_size.

Parameters:

Name Type Description Default
ends list[int]

List of one or more chunk(s) to split a sequence.

required
chunk_size int

Size of chunks to split a sequence into.

required
tolerated_size Optional[int]

Threshold to use instead of chunk_size to determine when to split a sequence.

None

Returns:

Type Description
list[int]

List with open coordinates of the chunks ends (or with only a single sequence length).

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def split_seq_by_chunk_size(
    ends: list[int], chunk_size: int, tolerated_size: Optional[int] = None
) -> list[int]:
    """Split list of end coordinates, to form chunks not longer then
    chunk_size.

    Args:
        ends: List of one or more chunk(s) to split a sequence.
        chunk_size: Size of chunks to split a sequence into.
        tolerated_size: Threshold to use instead of `chunk_size` to determine when to split a sequence.

    Returns:
        List with open coordinates of the chunks ends (or with only a single sequence length).
    """
    if not ends or chunk_size < 1:
        return ends

    if tolerated_size is None or tolerated_size < chunk_size:
        tolerated_size = chunk_size
    result = []
    offset = 0
    for chunk_end in ends:
        chunk_len = chunk_end - offset
        if chunk_len > tolerated_size:
            # exclude starts, as they are 0 or pushed as previous chunk_ends
            result += list(range(offset, chunk_end, chunk_size))[1:]
        result.append(chunk_end)
        offset = chunk_end
    return result

split_seq_by_n(seq, split_pattern)

Split a string into chunks at the positions where the pattern is found. Ns (pattern) are appended to the chunk on the left.

The end point of each chunk will correspond to the end of the matching part.

Parameters:

Name Type Description Default
seq str

Sequence to be split into chunks.

required
split_pattern Optional[Pattern]

Pattern to search in the sequence.

required

Returns:

Type Description
list[int]

List with open coordinates of the chunks ends (or with only a single sequence length).

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def split_seq_by_n(seq: str, split_pattern: Optional[re.Pattern]) -> list[int]:
    """Split a string into chunks at the positions where the
    pattern is found. `N`s (pattern) are appended to the chunk on the left.

    The end point of each chunk will correspond to the end
    of the matching part.

    Args:
        seq: Sequence to be split into chunks.
        split_pattern: Pattern to search in the sequence.

    Returns:
        List with open coordinates of the chunks ends (or with only a single sequence length).
    """
    seq_len = len(seq)
    if not split_pattern:
        return [seq_len]
    split_points = [m.end() for m in split_pattern.finditer(seq)]
    if not split_points or split_points[-1] != seq_len:
        split_points.append(seq_len)
    return split_points