chunk

ensembl.io.genomio.fasta.chunk

Split a set of nucleotide sequence(s) (.fasta, .gz) into smaller chunks.

check_chunk_size_and_tolerance(chunk_size, chunk_tolerance, error_f=_on_value_error)

Check that the chunk size and the tolerance are positive and that the chunk size is not too small.

Parameters:

    chunk_size (int, required): Chunk size to check.
    chunk_tolerance (int, required): Chunk tolerance to check.

Dies:

    If checks fail, dies with parser.error.

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
def check_chunk_size_and_tolerance(
    chunk_size: int,
    chunk_tolerance: int,
    error_f: Callable[[str], None] = _on_value_error,
) -> None:
    """Check the chunk size and the tolerance are positive and chunk size is not too small

    Args:
        chunk_size: Chunk size to check
        chunk_tolerance: Chunk tolerance to check

    Dies:
        If checks fail, dies with `parser.error`
    """
    if chunk_size < 50_000:
        error_f(f"wrong '--chunk_size' value: '{chunk_size}'. should be greater then 50_000. exiting...")
    if chunk_tolerance < 0:
        error_f(f"wrong '--chunk_tolerance' value: '{chunk_tolerance}'. can't be less then 0. exiting...")

chunk_fasta(input_fasta_file, chunk_size, chunk_size_tolerated, out_file_name, individual_file_prefix, *, agp_output_file=None, n_sequence_len=0, chunk_sfx='ens_chunk', append_offset_to_chunk_name=None)

Open input_fasta_file and split it into smaller chunks, first at stretches of "N"s and then according to chunk_size_tolerated. The chunks are stored either in out_file_name, if no individual_file_prefix is provided, or in one file per chunk whose name starts with the non-empty individual_file_prefix.

Parameters:

    input_fasta_file (str, required): Input FASTA.
    chunk_size (int, required): Size of the chunks to split into.
    chunk_size_tolerated (int, required): If more flexibility is allowed, use this as the maximum size of a chunk.
    out_file_name (str, required): Output FASTA to store the chunks in if no individual_file_prefix is provided.
    individual_file_prefix (Optional[Path], required): A file path prefix (directories and the filename part) to use as the first part of each chunk file name.
    agp_output_file (Optional[str], default None): Output AGP file to store the map of the chunking procedure in, if present and non-empty.
    n_sequence_len (int, default 0): Length of the stretch of Ns to split at; no splitting on Ns if 0.
    chunk_sfx (str, default 'ens_chunk'): A string to put between the original sequence name and the chunk suffix.
    append_offset_to_chunk_name (Optional[bool], default None): Append the 0-based offset in the form _off_{offset} to the chunk name.
Source code in src/python/ensembl/io/genomio/fasta/chunk.py
def chunk_fasta(
    input_fasta_file: str,
    chunk_size: int,
    chunk_size_tolerated: int,
    out_file_name: str,
    individual_file_prefix: Optional[Path],
    *,
    agp_output_file: Optional[str] = None,
    n_sequence_len: int = 0,
    chunk_sfx: str = "ens_chunk",
    append_offset_to_chunk_name: Optional[bool] = None,
) -> None:
    """Open `input_fasta_file` and split into a smaller chunks based on
    stretches of "N"s and then based on chunk_size_tolerated and store either to
    the `out_file_name` if no `individual_file_prefix` is provided or
    store each individual chunk to a file starting with non-empty `individual_file_prefix`.

    Args:
        input_fasta_file: Input FASTA
        chunk_size: Size of the chunks to split into.
        chunk_size_tolerated: If more flexibility allowed, use this as the maximum size of a chunk.
        out_file_name: Output FASTA to store the chunks into if no `individual_file_prefix` is provided.
        individual_file_prefix: A file path prefix including dirs and filenames part to use as a
                first part of the chunk file name.
        agp_output_file: Output AGP file to store the map for the chunking procedure if present and non-empty.
        n_sequence_len: Length of the stretch of `N`s to split at; no splitting on `N`s if 0.
        chunk_sfx: A string to put between the original sequence name and the chunk suffix.
        append_offset_to_chunk_name: Append 0-based offset in the form of `_off_{offset}` to the chunk name.
    """

    # process input fasta
    with open_gz_file(input_fasta_file) as fasta:
        logging.info(
            f"splitting sequences from '{input_fasta_file}', chunk size {chunk_size:_}, \
                splitting on {n_sequence_len} Ns (0 -- disabled)"
        )
        # do not open a joined file if you plan to open many individual ones
        with (
            individual_file_prefix
            and nullcontext(None)
            or open(out_file_name, "wt", encoding="utf-8") as out_file_joined
        ):
            agp_lines = chunk_fasta_stream(
                fasta,
                chunk_size,
                chunk_size_tolerated,
                out_file_joined,
                individual_file_prefix,
                n_sequence_len=n_sequence_len,
                chunk_sfx=chunk_sfx,
                append_offset_to_chunk_name=append_offset_to_chunk_name,
            )

        # dump AGP
        if agp_output_file:
            with open(agp_output_file, "w", encoding="utf-8") as agp_out:
                agp_out.write("\n".join(agp_lines) + "\n")
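
A usage sketch writing all chunks to a single joined FASTA plus an AGP map (file names here are hypothetical):

from ensembl.io.genomio.fasta.chunk import chunk_fasta, get_tolerated_size

chunk_size = 100_000_000
chunk_fasta(
    "genome.fa.gz",                     # raw or gzipped input FASTA
    chunk_size,
    get_tolerated_size(chunk_size, 5),  # allow chunks up to 5% longer
    "chunks.fna",                       # joined output (no individual prefix given)
    None,                               # individual_file_prefix
    agp_output_file="chunks.agp",       # chunk-to-contig map
    n_sequence_len=100,                 # also split at runs of >= 100 Ns
)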

chunk_fasta_stream(input_fasta, chunk_size, chunk_size_tolerated, output_fasta, individual_file_prefix, *, n_sequence_len=0, chunk_sfx='ens_chunk', append_offset_to_chunk_name=None, open_individual=_individual_file_opener)

Split an input TextIOWrapper FASTA stream into smaller chunks, first at stretches of "N"s and then according to chunk_size_tolerated, and store them either in the output_fasta stream (if valid) or in the files created by invoking the open_individual callable.

Parameters:

    input_fasta (TextIOWrapper, required): Input FASTA as a TextIOWrapper stream.
    chunk_size (int, required): Size of the chunks to split into.
    chunk_size_tolerated (int, required): If more flexibility is allowed, use this as the maximum size of a chunk.
    output_fasta (Optional[TextIOWrapper] | nullcontext[Any], required): Output FASTA TextIOWrapper stream to store the chunks in; if None or otherwise falsy, open_individual is used (see below).
    individual_file_prefix (Optional[Path], required): A file path prefix (directories and the filename part) to use as the first part of each chunk file name.
    n_sequence_len (int, default 0): Length of the stretch of Ns to split at; no splitting on Ns if 0.
    chunk_sfx (str, default 'ens_chunk'): A string to put between the original sequence name and the chunk suffix.
    append_offset_to_chunk_name (Optional[bool], default None): A flag to append the 0-based offset (_off_{offset}) to the chunk name.
    open_individual (Callable[[str], ContextManager[Any]], default _individual_file_opener): A callable taking a filename as input, used to create the output file for an individual contig when the output FASTA is falsy or None; folders must pre-exist.
Source code in src/python/ensembl/io/genomio/fasta/chunk.py
def chunk_fasta_stream(
    input_fasta: TextIOWrapper,
    chunk_size: int,
    chunk_size_tolerated: int,
    output_fasta: Optional[TextIOWrapper] | nullcontext[Any],
    individual_file_prefix: Optional[Path],
    *,
    n_sequence_len: int = 0,
    chunk_sfx: str = "ens_chunk",
    append_offset_to_chunk_name: Optional[bool] = None,
    open_individual: Callable[[str], ContextManager[Any]] = _individual_file_opener,
) -> list[str]:
    """Split input TextIOWrapper stream with fasta into a smaller chunks based on
    stretches of "N"s and then based on chunk_size_tolerated and store either to
    the output_fasta stream (if valid) or to the files created by
    invocation of the `open_individual` callable.

    Args:
        input_fasta: Input FASTA as the TextIOWrapper stream.
        chunk_size: Size of the chunks to split into.
        chunk_size_tolerated: If more flexibility allowed, use this as the maximum size of a chunk.
        output_fasta: Output FASTA TextIOWrapper stream to store the chunks into,
                if none or False, `open_individual` is used (see below).
        individual_file_prefix: A file path prefix including dirs and filenames part to use as a
                first part of the chunk file name.
        n_sequence_len: Length of the stretch of `N`s to split at; no splitting on `N`s if 0.
        chunk_sfx: A string to put between the original sequence name and the chunk suffix.
        append_offset_to_chunk_name: A flag to append 0-based offset (`_off_{offset}`) to the chunk name.
        open_individual: A callable taking a filename as input, used to create the output file for
                an individual contig when the output FASTA is falsy or `None`; folders must pre-exist.
    """

    chunk_size_tolerated = max(chunk_size, chunk_size_tolerated)
    # output agp_lines list
    agp_lines = []

    # make sure not used for n_seq <= 0
    n_split_regex = None
    if n_sequence_len > 0:
        pattern = f"(N{{{n_sequence_len},}})"
        n_split_regex = re.compile(pattern)

    # process stream
    fasta_parser = SeqIO.parse(input_fasta, "fasta")
    for rec_count, rec in enumerate(fasta_parser, start=1):
        rec_name = str(rec.name)

        ends = split_seq_by_n(str(rec.seq), n_split_regex)
        ends = split_seq_by_chunk_size(ends, chunk_size, chunk_size_tolerated)

        offset = 0
        for chunk, chunk_end in enumerate(ends, start=1):
            chunk_name = f"{rec_name}_{chunk_sfx}_{chunk:03d}"
            chunk_file_name = ""
            if individual_file_prefix:
                chunk_file_name = f"{individual_file_prefix}.{rec_count:03d}.{chunk:03d}.fa"
            if append_offset_to_chunk_name:
                chunk_name += f"_off_{offset}"

            rec_from = offset + 1
            rec_to = chunk_end
            chunk_len = chunk_end - offset

            # form agp lines
            agp_line = f"{rec_name}\t{rec_from}\t{rec_to}\t{chunk}\tW\t{chunk_name}\t1\t{chunk_len}\t+"
            agp_lines.append(agp_line)

            # use agp lines as fasta description
            agp_line = agp_line.replace("\t", " ")
            logging.info(f"Dumping {chunk_name} AGP {agp_line}")

            # get slice and put it out
            tmp_record = SeqRecord(
                Seq(rec.seq[offset:chunk_end]),
                id=chunk_name,
                description=f"AGP {agp_line}",
                name="",
            )

            # if user specified chunks -- store each chunk in an individual output file
            with output_fasta and nullcontext(output_fasta) or open_individual(chunk_file_name) as out_file:
                out_file.write(tmp_record.format("fasta"))  # type: ignore[union-attr]

            del tmp_record
            offset = chunk_end

    return agp_lines
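
Because the function works on streams, it can be exercised in memory. A sketch using io.StringIO (the annotation says TextIOWrapper, but any text stream that SeqIO.parse accepts and that supports write() should do):

import io

from ensembl.io.genomio.fasta.chunk import chunk_fasta_stream

fasta_in = io.StringIO(">seq1\n" + "A" * 120 + "\n")
fasta_out = io.StringIO()

# 120 bp with chunk_size 50 and no extra tolerance -> chunks ending at 50, 100, 120
agp_lines = chunk_fasta_stream(fasta_in, 50, 50, fasta_out, None)
print(agp_lines[0])  # tab-separated: seq1 1 50 1 W seq1_ens_chunk_001 1 50 +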

get_tolerated_size(size, tolerance)

Calculate max tolerated size of the chunk based on initial size and percent of allowed deviation.

Parameters:

    size (int, required): Base chunk size.
    tolerance (int, required): Percentage of allowed deviation as an integer.

Returns:

    int: Maximum tolerated chunk size.

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
def get_tolerated_size(size: int, tolerance: int) -> int:
    """Calculate max tolerated size of the chunk based on initial size and percent of allowed deviation.

    Args:
        size: Base chunk size
        tolerance: Percentage of allowed deviation as an integer.

    Returns:
        Maximum tolerated chunk size.
    """
    tolerance = max(tolerance, 0)

    tolerated_size = size
    tolerated_size += size * tolerance // 100
    return tolerated_size
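
For example, a 10% tolerance on the default 100 Mb chunk size:

from ensembl.io.genomio.fasta.chunk import get_tolerated_size

print(get_tolerated_size(100_000_000, 10))  # 110_000_000
print(get_tolerated_size(100_000_000, 0))   # 100_000_000 (no slack)
print(get_tolerated_size(100_000_000, -5))  # 100_000_000 (negative tolerance clamped to 0)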

main()

Module entry-point.

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
def main() -> None:
    """Module entry-point."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument_src_path(
        "--fasta_dna",
        required=True,
        metavar="input[.fa | .gz]",
        help="Raw or compressed (.gz) FASTA file with DNA sequences to be split",
    )
    parser.add_argument(
        "--out",
        required=False,
        default="chunks.fna",
        help="Chunks output file",
    )
    parser.add_argument(
        "--individual_out_dir",
        required=False,
        default=None,
        type=Path,
        help="Output directory for writing files with individual chunks to. \
            If provided,`--out` value used as a filename prefix",
    )
    parser.add_argument_dst_path(
        "--agp_output_file",
        required=False,
        default=None,
        help="AGP file with chunks to contigs mapping.",
    )
    parser.add_argument(
        "--chunk_size",
        required=False,
        default=100_000_000,
        metavar="100_000_000",
        type=int,
        help="Maximum chunk size (should be greater then 50k).",
    )
    parser.add_argument(
        "--chunk_sfx",
        required=False,
        default="ens_chunk",
        type=str,
        help="Added to contig ID before chunk number.",
    )
    parser.add_argument(
        "--chunk_tolerance",
        required=False,
        default=0,
        type=int,
        help="Chunk size tolerance percentage. If the to-be-written chunk is longer \
            than the defined chunk size by less than the specified tolerance percentage,\
            it will not be split.",
    )
    parser.add_argument(
        "--n_seq",
        required=False,
        default=0,
        type=int,
        help="Split into chunks at positions of at least this number of N characters.",
    )
    parser.add_argument(
        "--add_offset",
        required=False,
        action="store_true",
        help="Append zero-based offset to chunk name ('_off_{offset}').",
    )
    parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    parser.add_log_arguments(add_log_file=True)
    args = parser.parse_args()
    init_logging_with_args(args)

    check_chunk_size_and_tolerance(args.chunk_size, args.chunk_tolerance, error_f=parser.error)

    chunk_size_tolerated = get_tolerated_size(args.chunk_size, args.chunk_tolerance)

    file_prefix = prepare_out_dir_for_individuals(args.individual_out_dir, args.out or args.fasta_dna)

    chunk_fasta(
        args.fasta_dna,
        args.chunk_size,
        chunk_size_tolerated,
        args.out,
        individual_file_prefix=file_prefix,
        agp_output_file=args.agp_output_file,
        n_sequence_len=args.n_seq,
        chunk_sfx=args.chunk_sfx,
        append_offset_to_chunk_name=args.add_offset,
    )
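
For reference, a sketch of the same pipeline wired up programmatically, mirroring what main() does after argument parsing (paths are hypothetical); here one file per chunk is written under chunks_dir/:

from pathlib import Path

from ensembl.io.genomio.fasta.chunk import (
    check_chunk_size_and_tolerance,
    chunk_fasta,
    get_tolerated_size,
    prepare_out_dir_for_individuals,
)

chunk_size, chunk_tolerance = 100_000_000, 0
check_chunk_size_and_tolerance(chunk_size, chunk_tolerance)

# one file per chunk, named chunks_dir/chunks.fna.<rec>.<chunk>.fa
file_prefix = prepare_out_dir_for_individuals(Path("chunks_dir"), "chunks.fna")

chunk_fasta(
    "input.fa.gz",
    chunk_size,
    get_tolerated_size(chunk_size, chunk_tolerance),
    "chunks.fna",  # unused when an individual file prefix is set
    individual_file_prefix=file_prefix,
    agp_output_file="chunks.agp",
)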

prepare_out_dir_for_individuals(dir_name, file_part)

Creates dir_name (including upstream dirs) and returns its path with file_part appended.

Parameters:

    dir_name (Path, required): Directory to create.
    file_part (str, required): File part to append.

Returns:

    Optional[Path]: dir_name with file_part appended.

Throws:

    An exception if the directory cannot be created.

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
def prepare_out_dir_for_individuals(dir_name: Path, file_part: str) -> Optional[Path]:
    """Creates `dir_name` (including upstream dirs) and returns its paths with the `file_part` appended.

    Args:
        dir_name: Directory to create.
        file_part: File part to append.

    Returns:
        `dir_name` with appended `file_part`.

    Throws:
        An exception if the directory cannot be created.
    """
    file_prefix = None
    if dir_name:
        dir_name.mkdir(parents=True, exist_ok=True)
        file_prefix = Path(dir_name, file_part)
    return file_prefix
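
A quick sketch (paths are hypothetical):

from pathlib import Path

from ensembl.io.genomio.fasta.chunk import prepare_out_dir_for_individuals

prefix = prepare_out_dir_for_individuals(Path("out/chunks"), "chunks.fna")
print(prefix)  # out/chunks/chunks.fna -- ready to use as individual_file_prefix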

split_seq_by_chunk_size(ends, chunk_size, tolerated_size=None)

Split a list of end coordinates to form chunks not longer than chunk_size.

Parameters:

    ends (list[int], required): List of end coordinates of one or more chunks of a sequence.
    chunk_size (int, required): Size of the chunks to split a sequence into.
    tolerated_size (Optional[int], default None): Threshold to use instead of chunk_size to decide when to split a sequence.

Returns:

    list[int]: List with the open (Python slice style) end coordinates of the chunks (or with only a single sequence length).

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
def split_seq_by_chunk_size(
    ends: list[int], chunk_size: int, tolerated_size: Optional[int] = None
) -> list[int]:
    """Split list of end coordinates, to form chunks not longer then
    chunk_size.

    Args:
        ends: List of end coordinates of one or more chunks of a sequence.
        chunk_size: Size of chunks to split a sequence into.
        tolerated_size: Threshold to use instead of `chunk_size` to determine when to split a sequence.

    Returns:
        List with the open (Python slice style) end coordinates of the chunks (or with only a single sequence length).
    """
    if not ends or chunk_size < 1:
        return ends

    if tolerated_size is None or tolerated_size < chunk_size:
        tolerated_size = chunk_size
    result = []
    offset = 0
    for chunk_end in ends:
        chunk_len = chunk_end - offset
        if chunk_len > tolerated_size:
            # exclude starts, as they are 0 or pushed as previous chunk_ends
            result += list(range(offset, chunk_end, chunk_size))[1:]
        result.append(chunk_end)
        offset = chunk_end
    return result
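
Two illustrative calls showing the effect of the tolerated size:

from ensembl.io.genomio.fasta.chunk import split_seq_by_chunk_size

# One 25 bp chunk with maximum size 10 -> extra split points at 10 and 20
print(split_seq_by_chunk_size([25], 10))      # [10, 20, 25]

# A 12 bp chunk within the tolerated size of 12 is kept whole
print(split_seq_by_chunk_size([12], 10, 12))  # [12]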

split_seq_by_n(seq, split_pattern)

Split a string into chunks at the positions where the pattern is found. Ns (pattern) are appended to the chunk on the left.

The end point of each chunk will correspond to the end of the matching part.

Parameters:

    seq (str, required): Sequence to be split into chunks.
    split_pattern (Optional[Pattern], required): Pattern to search for in the sequence.

Returns:

    list[int]: List with the open (Python slice style) end coordinates of the chunks (or with only a single sequence length).

Source code in src/python/ensembl/io/genomio/fasta/chunk.py
def split_seq_by_n(seq: str, split_pattern: Optional[re.Pattern]) -> list[int]:
    """Split a string into chunks at the positions where the
    pattern is found. `N`s (pattern) are appended to the chunk on the left.

    The end point of each chunk will correspond to the end
    of the matching part.

    Args:
        seq: Sequence to be split into chunks.
        split_pattern: Pattern to search in the sequence.

    Returns:
        List with the open (Python slice style) end coordinates of the chunks (or with only a single sequence length).
    """
    seq_len = len(seq)
    if not split_pattern:
        return [seq_len]
    split_points = [m.end() for m in split_pattern.finditer(seq)]
    if not split_points or split_points[-1] != seq_len:
        split_points.append(seq_len)
    return split_points
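
For example, with a pattern matching runs of five or more Ns (the same form chunk_fasta_stream builds from n_sequence_len):

import re

from ensembl.io.genomio.fasta.chunk import split_seq_by_n

seq = "ACGT" + "N" * 5 + "ACGTACGT"        # 17 bp in total
n_split_regex = re.compile("(N{5,})")

print(split_seq_by_n(seq, n_split_regex))  # [9, 17] -- the Ns end the left chunk
print(split_seq_by_n(seq, None))           # [17] -- no pattern, a single chunk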