Source code for ensembl.tools.anno.repeat_annotation.trf

# See the NOTICE file distributed with this work for additional information #pylint: disable=missing-module-docstring
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
    Tandem Repeats Finder is a program to locate and display tandem repeats in DNA sequences.
    Benson G. Tandem repeats finder: a program to analyze DNA sequences.
    Nucleic Acids Res. 1999; 27(2):573–580. doi:10.1093/nar/27.2.573
"""
__all__ = ["run_trf"]

import argparse
import logging
import logging.config
import multiprocessing
import os
from os import PathLike
from pathlib import Path
import re
import subprocess
import tempfile
from typing import List


from ensembl.tools.anno.utils._utils import (
    check_exe,
    create_dir,
    check_gtf_content,
    get_seq_region_length,
    get_slice_id,
    slice_output_to_gtf,
    get_sequence,
)

logger = logging.getLogger(__name__)


[docs]def run_trf( genome_file: PathLike, output_dir: Path, num_threads: int = 1, trf_bin: Path = Path("trf"), match_score: int = 2, mismatch_score: int = 5, delta: int = 7, pm: int = 80, pi: int = 10, minscore: int = 40, maxperiod: int = 500, ) -> None: """ Executes TRF on genomic slices :param genome_file: Genome file path. :type genome_file: PathLike :param output_dir: Working directory path. :type output_dir: Path :param num_threads: int, number of threads. :type num_threads: int, default 1 :param trf_bin: TRF software path. :type trf_bin: Path, default trf :param match_score: Matching weight. :type match_score: int, default 2 :param mismatch_score: Mismatching penalty. :type mismatch_score: int, default 5 :param delta: Indel penalty. :type delta: int, default 7 :param pm: Match probability (whole number). :type pm: int, default 80 :param pi: Indel probability (whole number). :type pi: int, default 10 :param minscore: Minimum alignment score to report. :type minscore: int, default 40 :param maxperiod: Maximum period size to report. :type maxperiod: int, default 500 :return: None :rtype: None """ check_exe(trf_bin) trf_dir = create_dir(output_dir, "trf_output") os.chdir(str(trf_dir)) output_file = trf_dir / "annotation.gtf" if output_file.exists(): transcript_count = check_gtf_content(output_file, "repeat") if transcript_count > 0: logger.info("Trf gtf file exists, skipping analysis") return logger.info("Creating list of genomic slices") seq_region_to_length = get_seq_region_length(genome_file, 5000) slice_ids_per_region = get_slice_id( seq_region_to_length, slice_size=1000000, overlap=0, min_length=5000 ) trf_output_extension = ( f".{match_score}.{mismatch_score}.{delta}." f"{pm}.{pi}.{minscore}.{maxperiod}.dat" ) trf_cmd = [ trf_bin, None, str(match_score), str(mismatch_score), str(delta), str(pm), str(pi), str(minscore), str(maxperiod), "-d", "-h", ] logger.info("Running TRF") pool = multiprocessing.Pool(num_threads)#pylint:disable=consider-using-with for slice_id in slice_ids_per_region: pool.apply_async( _multiprocess_trf, args=( trf_cmd, slice_id, trf_dir, trf_output_extension, genome_file, ), ) pool.close() pool.join() slice_output_to_gtf(trf_dir, "repeat_id", "trf", True, ".trf.gtf") for gtf_file in trf_dir.glob("*.trf.gtf"): gtf_file.unlink()
def _multiprocess_trf( trf_cmd: List[str], slice_id: List[str], trf_dir: Path, trf_output_extension: Path, genome_file:Path, ) -> None: """ Run TRF on multiprocess on genomic slices Args: trf_cmd: TRF command to execute. slice_id: Slice Id to run TRF on. trf_dir : TRF output dir. trf_output_extension: TRF file output extension. genome_file : Genome file. """ region_name, start, end = slice_id logger.info( "Processing slice to find tandem repeats with TRF:%s:%s:%s", region_name, start, end, ) seq = get_sequence(region_name, int(start), int(end), 1, genome_file, trf_dir) slice_name = f"{region_name}.rs{start}.re{end}" with tempfile.TemporaryDirectory(dir=trf_dir) as tmpdirname: slice_file = trf_dir / tmpdirname / f"{slice_name}.fa" with open(slice_file, "w+", encoding="utf8") as region_out: region_out.write(f">{region_name}\n{seq}\n") region_results = trf_dir / f"{slice_name}.trf.gtf" # TRF writes to the current dir, so swtich to the output dir for it # os.chdir(str(trf_output_dir)) output_file = Path(f"{slice_file}{trf_output_extension}") trf_cmd = trf_cmd.copy() trf_cmd[1] = str(slice_file) logger.info("trf_cmd: %s", trf_cmd) # with open(trf_output_file_path, "w+") as trf_out: subprocess.run(trf_cmd, cwd=trf_dir / tmpdirname)#pylint:disable=subprocess-run-check _create_trf_gtf(output_file, region_results, region_name) slice_file.unlink() output_file.unlink() def _create_trf_gtf( output_file: Path, region_results: Path, region_name: str, ) -> None: """ Read the fasta file and save the content in gtf format TRF output format: cols 1+2: Indices of the repeat relative to the start of the sequence col 3: Period size of the repeat col 4: Number of copies aligned with the consensus pattern col 5: Size of consensus pattern (may differ slightly from the period size) col 6: Percent of matches between adjacent copies overall col 7: Percent of indels between adjacent copies overall col 8: Alignment score cols 9-12: Percent composition for each of the four nucleotides col 13: Entropy measure based on percent composition col 14: Consensus sequence col 15: Repeat sequence Args: output_file : GTF file with final results. region_results : GTF file with results per region. region_name : Coordinates of genomic slice. """ with open(output_file, "r", encoding="utf8") as trf_in, open( region_results, "w+", encoding="utf8" ) as trf_out: repeat_count = 1 for line in trf_in: result_match = re.search(r"^\d+", line) if result_match: results = line.split() if len(results) != 15: continue start = results[0] end = results[1] period = float(results[2]) copy_number = float(results[3]) percent_matches = float(results[5]) score = float(results[7]) repeat_consensus = results[13] if ( # pylint: disable=too-many-boolean-expressions score < 50 and percent_matches >= 80 and copy_number > 2 and period < 10 ) or (copy_number >= 2 and percent_matches >= 70 and score >= 50): gtf_line = ( f"{region_name}\tTRF\trepeat\t{start}\t{end}\t.\t+\t.\t" f'repeat_id "{repeat_count}"; score "{score}"; ' f'repeat_consensus "{repeat_consensus}";\n' ) trf_out.write(gtf_line) repeat_count += 1 def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="TRF's arguments") parser.add_argument("--genome_file", required=True, help="Genome file path") parser.add_argument("--output_dir", required=True, help="Output directory path") parser.add_argument("--trf_bin", default="trf", help="TRF executable path") parser.add_argument("--match_score", type=int, default=2, help="Matching weight") parser.add_argument("--mismatch_score", type=int, default=5, help="Mismatching penalty") parser.add_argument("--delta", type=int, default=7, help="Indel penalty") parser.add_argument("--pm", type=int, default=80, help="Match probability") parser.add_argument("--pi", type=int, default=10, help="Indel probability") parser.add_argument("--minscore", type=int, default=40, help="Minimum alignment score to report") parser.add_argument("--maxperiod", type=int, default=500, help="Maximum period size to report") parser.add_argument("--num_threads", type=int, default=1, help="Number of threads") return parser.parse_args() def main(): """TRF's entry-point.""" args = parse_args() log_file_path = create_dir(args.output_dir, "log") / "trf.log" loginipath = Path(__file__).parents[6] / "conf" / "logging.conf" logging.config.fileConfig( loginipath, defaults={"logfilename": str(log_file_path)}, disable_existing_loggers=False, ) run_trf( args.genome_file, args.output_dir, args.num_threads, args.trf_bin, args.match_score, args.mismatch_score, args.delta, args.pm, args.pi, args.minscore, args.maxperiod, ) if __name__ == "__main__": main()