Source code for ensembl.tools.anno.snc_rna_annotation.trnascan

# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
tRNAscan-SE identifies 99-100% of transfer RNA genes in DNA sequence while
giving less than one false positive per 15 gigabases.
Lowe TM, Eddy SR: tRNAscan-SE: a program for improved detection of transfer
RNA genes in genomic sequence.
Nucleic Acids Res. 1997, 25(5):955-64. [PMID: 9023104]
"""
__all__ = ["run_trnascan"]

import argparse
import logging
import logging.config
import multiprocessing
from os import PathLike
from pathlib import Path
import re
import subprocess
from typing import List

from ensembl.tools.anno.utils._utils import (
    check_exe,
    check_file,
    create_dir,
    check_gtf_content,
    get_seq_region_length,
    get_slice_id,
    slice_output_to_gtf,
    get_sequence,
)

logger = logging.getLogger(__name__)


[docs]def run_trnascan( genome_file: PathLike, output_dir: Path, trnascan_bin: Path = Path("tRNAscan-SE"), trnascan_filter: Path = Path("EukHighConfidenceFilter"), num_threads: int = 1, ) -> None: """ Executes tRNAscan-SE on genomic slices :param genome_file: Genome file path. :type genome_file: PathLike :param output_dir: working directory path. :type output_dir: Path :param trnascan_bin: tRNAscan-SE software path. :type trnascan_bin: Path, default tRNAscan-SE :param trnascan_filter: tRNAscan-SE filter set path. :type trnascan_filter: Path, default EukHighConfidenceFilter :param num_threads: int, number of threads. :type num_threads: int, default 1 :return: None :rtype: None """ check_exe(trnascan_bin) check_file(trnascan_filter) trnascan_dir = create_dir(output_dir, "trnascan_output") output_file = trnascan_dir / "annotation.gtf" if output_file.exists(): transcript_count = check_gtf_content(output_file, "transcript") if transcript_count > 0: logger.info("Trnascan gtf file exists, skipping analysis") return logger.info("Creating list of genomic slices") seq_region_to_length = get_seq_region_length(genome_file, 5000) slice_ids_per_region = get_slice_id(seq_region_to_length, 1000000, 0, 5000) trnascan_cmd = [ str(trnascan_bin), None, "-o", None, "-f", None, "-H", # show both primary and secondary structure components to covariance model bit scores "-q", # quiet mode "--detail", "-Q", ] logger.info("Running tRNAscan-SE") pool = multiprocessing.Pool(num_threads) # pylint: disable=consider-using-with for slice_id in slice_ids_per_region: pool.apply_async( _multiprocess_trnascan, args=( trnascan_cmd, slice_id, genome_file, trnascan_filter, trnascan_dir, ), ) pool.close() pool.join() slice_output_to_gtf(output_dir=trnascan_dir, unique_ids=True, file_extension=".trna.gtf") for gtf_file in trnascan_dir.glob("*.trna.gtf"): gtf_file.unlink()
def _multiprocess_trnascan( trnascan_cmd: List[str], slice_id: List[str], genome_file: Path, trnascan_filter: Path, trnascan_dir: Path, ) -> None: """ Run tRNAscan-SE on multiprocess on genomic slices Args: trnascan_cmd: tRNAscan-SE command to execute. slice_id: Slice Id to run tRNAscan-SE on. genome_file : Genome file. trnascan_dir : tRNAscan-SE output dir. trnascan_filter: tRNAscan-SE filter set. """ region_name, start, end = slice_id logger.info( "Processing slice to find tRNAs using tRNAscan-SE:%s:%s:%s", region_name, start, end, ) seq = get_sequence(region_name, int(start), int(end), 1, genome_file, trnascan_dir) slice_name = f"{region_name}.rs{start}.re{end}" slice_file = trnascan_dir / f"{slice_name}.fa" with open(slice_file, "w+", encoding="utf8") as region_out: region_out.write(f">{region_name}\n{seq}\n") # trnscan output region_results = trnascan_dir / f"{slice_name}.trna.gtf" output_file = Path(f"{slice_file}.trna") ss_output_file = Path(f"{output_file}.ss") # filtering filter_prefix_file = f"{slice_name}.filt" filter_output_file = trnascan_dir / f"{filter_prefix_file}.out" filter_log_file = trnascan_dir / f"{filter_prefix_file}.log" filter_ss_file = trnascan_dir / f"{filter_prefix_file}.ss" # trnascan_cmd = generic_trnascan_cmd.copy() trnascan_cmd[1], trnascan_cmd[3], trnascan_cmd[5] = ( str(slice_file), str(output_file), str(ss_output_file), ) logger.info("tRNAscan-SE command: %s", " ".join(trnascan_cmd)) subprocess.run(trnascan_cmd, check=True) # If the trnascan output is empty there is no need to go on with filtering if output_file.stat().st_size == 0: output_file.unlink() slice_file.unlink() ss_output_file.unlink(missing_ok=True) return filter_cmd = [ str(trnascan_filter), "--result", # tRNAscan-SE output file used as input str(output_file), "--ss", # tRNAscan-SE secondary structure file used as input str(ss_output_file), "--output", str(trnascan_dir), "--prefix", str(filter_prefix_file), ] logger.info("tRNAscan-SE filter command: %s", " ".join(str(item) for item in filter_cmd)) subprocess.run(filter_cmd) # pylint:disable=subprocess-run-check _create_trnascan_gtf(region_results, filter_output_file, region_name) output_file.unlink(missing_ok=True) slice_file.unlink(missing_ok=True) ss_output_file.unlink(missing_ok=True) Path(filter_prefix_file).unlink(missing_ok=True) filter_log_file.unlink(missing_ok=True) filter_ss_file.unlink(missing_ok=True) filter_output_file.unlink(missing_ok=True) def _create_trnascan_gtf(region_results: Path, filter_output_file: Path, region_name: str) -> None: """ Read the fasta file and save the content in gtf format All the genomic slices are collected in a single gtf output Args: region_results : GTF file with the results per region. filter_file : GTF file with the filtered results per region. region_name :Coordinates of genomic slice. tRNAscan-SE output format: col0: GtRNAdb Gene Symbol - gene ID in corresponding genome col1: tRNAscan-SE ID - tRNA ID in tRNAscan-SE prediction results col2-3: Locus - Genomic coordinates of predicted gene col4: Isotype (from Anticodon) - tRNA isotype determined by anticodon col5: Anticodon - anticodon of predicted tRNA gene col6-7: Intron boundaries col8: General tRNA Model Score - covariance model bit score from tRNAscan-SE results col9: Best Isotype Model - best matching (highest scoring) isotype determined by isotype-specific covariance model classification col10-11-12: Anticodon and Isotype Model Agreement - consistency between anticodon from predicted gene sequence and best isotype model col13: Features - special gene features that may include gene set categorization, number of introns, possible pseudogenes, possible truncation, or base-pair mismatches """ with open(filter_output_file, "r", encoding="utf8") as trna_in, open( region_results, "w+", encoding="utf8" ) as trna_out: gene_counter = 1 for line in trna_in: result_match = re.search(r"^" + region_name, line) if result_match: results = line.split() start = int(results[2]) end = int(results[3]) strand = "+" if start > end: strand = "-" start, end = end, start biotype = "tRNA" if re.search(r"high confidence set", line) else "tRNA_pseudogene" transcript_string = ( f"{region_name}\ttRNAscan\ttranscript\t{start}\t{end}\t.\t" f'{strand}\t.\tgene_id "{gene_counter}"; transcript_id ' f'"{gene_counter}"; biotype "{biotype}";\n' ) exon_string = ( f"{region_name}\ttRNAscan\texon\t{start}\t{end}\t.\t" f'{strand}\t.\tgene_id "{gene_counter}"; transcript_id ' f'"{gene_counter}"; exon_number "1"; biotype "{biotype}";\n' ) trna_out.write(transcript_string) trna_out.write(exon_string) trna_out.flush() gene_counter += 1 def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="tRNAscan-SE's arguments") parser.add_argument("--genome_file", required=True, help="Genome file path") parser.add_argument("--trnascan_bin", default="tRNAscan-SE", help="tRNAscan-SE executable path") parser.add_argument( "--trnascan_filter", default="/hps/software/users/ensembl/ensw/C8-MAR21-sandybridge/linuxbrew/bin/EukHighConfidenceFilter", help="tRNAscan-SE filter path", ) parser.add_argument("--output_dir", required=True, help="Output directory path") parser.add_argument("--num_threads", type=int, default=1, help="Number of threads") return parser.parse_args() def main(): """tRNAscan-SE's entry-point.""" args = parse_args() log_file_path = create_dir(args.output_dir, "log") / "trnascan.log" loginipath = Path(__file__).parents[6] / "conf" / "logging.conf" logging.config.fileConfig( loginipath, defaults={"logfilename": str(log_file_path)}, disable_existing_loggers=False, ) run_trnascan( args.genome_file, args.output_dir, args.trnascan_bin, Path(args.trnascan_filter), args.num_threads, ) if __name__ == "__main__": main()