# Source code for ensembl.tools.anno.repeat_annotation.dust

# See the NOTICE file distributed with this work for additional information #pylint: disable=missing-module-docstring
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DustMasker is a program that identifies and masks out low complexity
parts of a genome using a new and improved DUST algorithm.

Morgulis A, Gertz EM, Schaffer AA, Agarwala R. A Fast and Symmetric
DUST Implementation to Mask Low-Complexity DNA Sequences.
"""
__all__ = ["run_dust"]

import logging
import logging.config
import multiprocessing
import os
from os import PathLike
from pathlib import Path
import re
import subprocess
import tempfile
from typing import List
import argparse


from ensembl.tools.anno.utils._utils import (
    check_exe,
    create_dir,
    check_gtf_content,
    get_seq_region_length,
    get_slice_id,
    slice_output_to_gtf,
    get_sequence,
)

logger = logging.getLogger(__name__)


def run_dust(
    genome_file: PathLike,
    output_dir: Path,
    dust_bin: Path = Path("dustmasker"),
    num_threads: int = 1,
) -> None:
    """
    Run DustMasker on genomic slices with multiprocessing.

    :param genome_file: Genome file path.
    :type genome_file: PathLike
    :param output_dir: Working directory path.
    :type output_dir: Path
    :param dust_bin: Dust software path.
    :type dust_bin: Path, default dustmasker
    :param num_threads: Number of threads.
    :type num_threads: int, default 1

    :return: None
    :rtype: None
    """
    check_exe(dust_bin)
    dust_dir = create_dir(output_dir, "dust_output")
    os.chdir(str(dust_dir))
    output_file = dust_dir / "annotation.gtf"
    # Skip the analysis entirely if a previous run already produced repeats.
    if output_file.exists():
        transcript_count = check_gtf_content(output_file, "repeat")
        if transcript_count > 0:
            logger.info("Dust gtf file exists, skipping analysis")
            return
    logger.info("Creating list of genomic slices")
    seq_region_to_length = get_seq_region_length(genome_file, 5000)
    slice_ids_per_region = get_slice_id(
        seq_region_to_length, slice_size=1000000, overlap=0, min_length=5000
    )
    dust_cmd = [dust_bin, "-in"]
    # Fan the slices out over a worker pool; the context manager guarantees
    # the pool is cleaned up even if task submission raises.
    with multiprocessing.Pool(num_threads) as pool:
        for slice_id in slice_ids_per_region:
            pool.apply_async(
                _multiprocess_dust,
                args=(
                    dust_cmd,
                    slice_id,
                    dust_dir,
                    genome_file,
                ),
            )
        pool.close()
        pool.join()
    # Merge the per-slice GTFs into the single annotation.gtf, then drop them.
    slice_output_to_gtf(dust_dir, "repeat_id", "dust", True, ".dust.gtf")
    for gtf_file in dust_dir.glob("*.dust.gtf"):
        gtf_file.unlink()
def _multiprocess_dust( # pylint: disable=too-many-locals dust_cmd: List[str], slice_id: List[str], dust_dir: Path, genome_file: Path, ) -> None: """ Run Dust on multiprocess on genomic slices Args: dust_cmd: Dust command to execute. slice_id: List of slice IDs. dust_dir : Dust output directory path. genome_file : Genome file. """ region_name, start, end = slice_id logger.info( "Processing slice to find low complexity regions with Dust: %s:%s:%s", region_name, start, end, ) seq = get_sequence(region_name, int(start), int(end), 1, genome_file, dust_dir) slice_name = f"{region_name}.rs{start}.re{end}" with tempfile.TemporaryDirectory(dir=dust_dir) as tmpdirname: slice_file = dust_dir / tmpdirname / f"{slice_name}.fa" with open(slice_file, "w+", encoding="utf8") as region_out: region_out.write(f">{region_name}\n{seq}\n") region_results = dust_dir / f"{slice_name}.dust.gtf" output_file = Path(f"{slice_file}.dust") dust_cmd.append(str(slice_file)) logger.info("dust_cmd: %s", dust_cmd) with open(output_file, "w+", encoding="utf8") as dust_out: subprocess.run(dust_cmd, stdout=dust_out, check=True) _create_dust_gtf(output_file, region_results, region_name) slice_file.unlink() output_file.unlink() def _create_dust_gtf( output_file: Path, region_results: Path, region_name: str, ) -> None: """ Read the fasta file and save the content in gtf format All the genomic slices are collected in a single gtf output Args: output_file : GTF file with final results. region_results : GTF file with the results per region. region_name :Coordinates of genomic slice. 
""" with open(output_file, "r", encoding="utf8") as dust_in, open( region_results, "w+", encoding="utf8" ) as dust_out: repeat_count = 1 for line in dust_in: result_match = re.search(r"(\d+)\ - (\d+)", line) if result_match: start = int(result_match.group(1)) + 1 end = int(result_match.group(2)) + 1 gtf_line = ( f"{region_name}\tDust\trepeat\t{start}\t" f'{end}\t.\t+\t.\trepeat_id "{repeat_count}";\n' ) dust_out.write(gtf_line) repeat_count += 1 def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="DustMasker arguments") parser.add_argument("--genome_file", required=True, help="Genome file path") parser.add_argument("--output_dir", required=True, help="Output directory path") parser.add_argument( "--dust_bin", default="dustmasker", help="Dust executable path", ) parser.add_argument( "--num_threads", type=int, default=1, help="Number of threads" ) return parser.parse_args() def main(): """Dust's entry-point.""" args = parse_args() log_file_path = create_dir(args.output_dir, "log") / "dust.log" loginipath = Path(__file__).parents[6] / "conf" / "logging.conf" logging.config.fileConfig( loginipath, defaults={"logfilename": str(log_file_path)}, disable_existing_loggers=False, ) run_dust( args.genome_file, args.output_dir, args.dust_bin, args.num_threads, ) if __name__ == "__main__": main()