# See the NOTICE file distributed with this work for additional information #pylint: disable=missing-module-docstring
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Infernal and its "cmsearch" tool are used for detecting sncRNAs in sequence databases.
sncRNA diversity: Small non-coding RNAs (sncRNAs) constitute a diverse group of RNA
molecules that play critical roles in various cellular processes, including gene regulation,
RNA interference, and post-transcriptional modifications. There are different types of sncRNAs,
such as microRNAs (miRNAs), small interfering RNAs (siRNAs), and small nucleolar RNAs (snoRNAs).
Despite their small size, many sncRNAs exhibit conserved structural features or sequence motifs
across species, essential to identify and study them.
Covariance models (CMs) can represent conserved RNA secondary structures as well as conserved
sequence patterns. This makes them well-suited for detecting sncRNAs in sequence databases.
Nawrocki, E. P., Kolbe, D. L., & Eddy, S. R. (2009). Infernal 1.0: inference of RNA alignments.
Bioinformatics, 25(10), 1335-1337.
"""
__all__ = ["run_cmsearch"]
import argparse
import logging
import logging.config
import multiprocessing
import os
from os import PathLike
from pathlib import Path
import re
import subprocess
import tempfile
from typing import List, Dict, Any, Union
import psutil
from ensembl.tools.anno.utils._utils import (
check_exe,
create_dir,
check_gtf_content,
get_seq_region_length,
get_slice_id,
slice_output_to_gtf,
get_sequence,
)
logger = logging.getLogger(__name__)
[docs]def run_cmsearch(
genome_file: PathLike,
output_dir: Path,
rfam_accession_file: Path,
rfam_cm_db: Path = Path("/hps/nobackup/flicek/ensembl/genebuild/blastdb/ncrna/Rfam_14.0/Rfam.cm"),
rfam_seeds_file: Path = Path("/hps/nobackup/flicek/ensembl/genebuild/blastdb/ncrna/Rfam_14.0/Rfam.seed"),
cmsearch_bin: Path = Path("cmsearch"),
rnafold_bin: Path = Path("RNAfold"),
num_threads: int = 1,
) -> None:
"""
Search CM(s) against a Rfam database
:param genome_file: Genome file path.
:type genome_file: PathLike
:param output_dir: Working directory path.
:type output_dir: Path
:param rfam_accessions: List of Rfam accessions.
:type rfam_accessions: Path
:param rfam_cm_db: Rfam database with cm models.
:type rfam_cm_db: Path
:param rfam_seed: Rfam seeds file.
:type rfam_seed: Path
:param cmsearch_bin: cmsearch software path.
:type cmsearch_bin: Path
:param rnafold_bin: RNAfold software path.
:type rnafold_bin: Path
:param num_threads: Number of threads.
:type num_threads: int
:return: None
:rtype: None
"""
check_exe(cmsearch_bin)
rfam_dir = create_dir(output_dir, "rfam_output")
os.chdir(rfam_dir)
output_file = rfam_dir / "annotation.gtf"
if output_file.exists():
transcript_count = check_gtf_content(output_file, "transcript")
if transcript_count > 0:
logger.info("Cmsearch gtf file exists, skipping analysis")
return
else:
logger.info("No gtf file, go on with the analysis")
rfam_selected_models_file = rfam_dir / "rfam_models.cm"
with open(rfam_accession_file) as rfam_accessions_in:
rfam_accessions = rfam_accessions_in.read().splitlines()
with open(rfam_cm_db, "r") as rfam_cm_in:
rfam_data = rfam_cm_in.read()
rfam_models = rfam_data.split("//\n") # get a chunck containing a model
with open(rfam_selected_models_file, "w+") as rfam_cm_out:
for model in rfam_models:
# The Rfam.cm file has INFERNAL and HMMR models, both are needed at this point
# Later we just want the INFERNAL ones for looking at thresholds
match = re.search(r"(RF\d+)", model)
if match:
model_accession = match.group(1)
if model_accession in rfam_accessions:
rfam_cm_out.write(model + "//\n")
seed_descriptions = _get_rfam_seed_descriptions(rfam_seeds_file)
cm_models = _extract_rfam_metrics(rfam_selected_models_file)
logger.info("Creating list of genomic slices")
seq_region_to_length = get_seq_region_length(genome_file, 5000)
slice_ids_per_region = get_slice_id(seq_region_to_length, slice_size=100000, overlap=0, min_length=5000)
cmsearch_cmd = [
str(cmsearch_bin),
"--rfam",
"--cpu",
str(num_threads),
"--nohmmonly",
"--cut_ga",
"--tblout",
]
logger.info("Running Rfam")
pool = multiprocessing.Pool(int(num_threads)) # pylint: disable=consider-using-with
for slice_id in slice_ids_per_region:
pool.apply_async(
_multiprocess_cmsearch,
args=(
cmsearch_cmd,
slice_id,
genome_file,
rfam_dir,
rfam_selected_models_file,
cm_models,
seed_descriptions,
rnafold_bin,
),
)
pool.close()
pool.join()
# Retry failed runs
_handle_failed_jobs(
rfam_dir,
cmsearch_cmd,
genome_file,
rfam_selected_models_file,
cm_models,
seed_descriptions,
)
slice_output_to_gtf(output_dir=rfam_dir, unique_ids=True, file_extension=".rfam.gtf")
for gtf_file in rfam_dir.glob("*.rfam.gtf"):
gtf_file.unlink()
def _handle_failed_jobs(
rfam_dir: Path,
cmsearch_cmd: List,
genome_file: PathLike,
rfam_selected_models_file: Path,
cm_models: Dict[str, Dict[str, Any]],
seed_descriptions: Dict[str, Dict[str, Any]],
rnafold_bin: Path = Path("RNAfold"),
) -> None:
"""Retry Rfam failed jobs using available cores and memory
Args:
rfam_dir (Path): Rfam output dir.
cmsearch_cmd (List): Cmsearch command.
genome_file (PathLike): Genome softmasked file.
rfam_selected_models_file (Path): Rfam model file.
cm_models (dict): Metrics of Rfam models.
seed_descriptions (dict): Rfam seed models file.
rnafold_bin: RNAfold software path.
"""
exception_file_list = rfam_dir.glob("*.rfam.except")
# Get the available system memory and CPU cores
available_memory = psutil.virtual_memory().available
# psutil.cpu_count(logical=False): number of physical CPU cores.
# psutil.cpu_count(logical=True): total number of logical CPU cores (physical cores and virtual cores)
# For computationally intensive tasks, using physical cores might be more appropriate,
# while for tasks that can benefit from parallelism, using logical cores might be beneficial.
available_cores = psutil.cpu_count(logical=False)
# Calculate the optimal memory per core and the number of cores to use
memory_per_core = available_memory // available_cores
num_cores = min(available_cores, available_memory // memory_per_core)
pool = multiprocessing.Pool(num_cores) # pylint: disable=consider-using-with
for exception_file in exception_file_list:
logger.info("Running himem job for failed region:%s\n", exception_file)
match = re.search(r"(.+)\.rs(\d+)\.re(\d+)\.", exception_file.name)
if match:
except_region = match.group(1)
except_start = match.group(2)
except_end = match.group(3)
except_slice_id = [except_region, except_start, except_end]
pool.apply_async(
_multiprocess_cmsearch,
args=(
cmsearch_cmd,
except_slice_id,
genome_file,
rfam_dir,
rfam_selected_models_file,
cm_models,
seed_descriptions,
memory_per_core,
rnafold_bin,
),
)
pool.close()
pool.join()
def _get_rfam_seed_descriptions(rfam_seeds_file: PathLike) -> Dict[str, Dict[str, Any]]:
"""Get Rfam seed description
Args:
rfam_seeds_file (PathLike): File of Rfam seeds
Returns:
dict: List of Rfam seeds with description,name, type
"""
descriptions: Dict[str, Dict[str, Any]] = {}
rfam_seeds = []
domain = ""
# NOTE: for some reason the decoder breaks on the seeds file,
# so I have made this ignore errors
with open(rfam_seeds_file, encoding="utf-8", errors="ignore") as rfam_seeds_in:
rfam_seeds = rfam_seeds_in.read().splitlines()
for seed in rfam_seeds:
matches = re.findall(r"^\#=GF (AC|DE|ID|TP)\s+(.+)", seed)
if matches:
key, value = matches[0]
# logger.info("rfam seed %s %s %s", key,value, matches)
if key == "AC":
domain = value
descriptions[domain] = {}
elif key == "DE":
assert domain is not None, "Domain should not be None at this point."
descriptions[domain]["description"] = value
elif key == "ID":
assert domain is not None, "Domain should not be None at this point."
descriptions[domain]["name"] = value
elif key == "TP":
assert domain is not None, "Domain should not be None at this point."
descriptions[domain]["type"] = value
return descriptions
def _extract_rfam_metrics(rfam_selected_models: PathLike) -> Dict[str, Dict[str, Any]]:
"""Get name, description, length, max length, threshold of each Rfam model.
Args:
rfam_selected_models_file : Path for Rfam models.
Returns:
parsed_cm_data: Rfam metrics.
"""
with open(rfam_selected_models, "r") as rfam_cm_in:
rfam_models = rfam_cm_in.read().split("//\n")
parsed_cm_data: Dict[str, Dict[str, Any]] = {}
for model in rfam_models:
model_name_match = re.search(r"NAME\s+(\S+)", model)
match_infernal = re.search(r"INFERNAL", model)
if model_name_match and match_infernal:
model_name = model_name_match.group(1)
parsed_cm_data[model_name] = {}
parse_regex = {
r"^NAME\s+(\S+)": "-name",
r"^DESC\s+(\S+)": "-description",
r"^CLEN\s+(\d+)": "-length",
r"^W\s+(\d+)": "-maxlength",
r"^ACC\s+(\S+)": "-accession",
r"^GA\s+(\d+)": "-threshold",
}
for line in model.split("\n"):
for pattern, value_type in parse_regex.items():
match = re.search(pattern, line)
if match:
parsed_cm_data[model_name][value_type] = match.group(1)
continue
return parsed_cm_data
def _multiprocess_cmsearch(
cmsearch_cmd: List[str],
slice_id: List[str],
genome_file: Path,
rfam_dir: Path,
rfam_selected_models_file: Path,
cm_models: Dict[str, Dict[str, Any]],
seed_descriptions: Dict[str, Dict[str, Any]],
rnafold_bin: Path = Path("RNAfold"),
) -> None:
"""Run cmsearch on multiprocess on genomic slices
Args:
cmsearch_cmd : Cmsearch command to execute.
slice_id: List of slice IDs.
genome_file : Genome softamsked file.
rfam_dir : Rfam output dir.
rfam_selected_models_file : Rfam model file.
cm_models : Metrics of Rfam models.
seed_descriptions : Rfam seed models file.
rnafold_bin : RNAfold software path.
"""
region_name, start, end = slice_id
logger.info(
"Processing Rfam data using cmsearch against slice: %s:%s:%s",
region_name,
start,
end,
)
seq = get_sequence(region_name, int(start), int(end), 1, genome_file, rfam_dir)
slice_name = f"{region_name}.rs{start}.re{end}"
slice_file = rfam_dir / f"{slice_name}.fa"
with open(slice_file, "w+", encoding="utf8") as region_out:
region_out.write(f">{region_name}\n{seq}\n")
region_tblout = rfam_dir / f"{slice_name}.tblout"
region_results = rfam_dir / f"{slice_name}.rfam.gtf"
exception_results = rfam_dir / f"{slice_name}.rfam.except"
cmsearch_cmd.append(str(region_tblout))
cmsearch_cmd.append(str(rfam_selected_models_file))
cmsearch_cmd.append(str(slice_file))
logger.info(" ".join(cmsearch_cmd))
# if memory_limit is not None:
# cmsearch_cmd = prlimit_command(cmsearch_cmd, memory_limit)
return_value = None
try:
return_value = subprocess.check_output(
cmsearch_cmd,
stderr=subprocess.STDOUT,
text=True,
)
except subprocess.CalledProcessError as ex:
# Note that writing to file was the only option here.
# If return_value was passed back, eventually it would clog the
# tiny pipe that is used by the workers to send info back.
# That would mean that it would eventually just be one
# worked running at a time
logger.error(
"Issue processing the following region with cmsearch: %s %s-%s ",
region_name,
start,
end,
)
logger.error("Return value: %s", return_value)
with open(exception_results, "w+") as exception_out:
exception_out.write(f"{region_name} {start} {end}\n")
region_results.unlink()
region_tblout.unlink()
raise ex # Re-raise the exception to allow caller to handle it further if needed
initial_table_results = _parse_rfam_tblout(region_tblout, region_name)
unique_table_results = _remove_rfam_overlap(initial_table_results)
filtered_table_results = _filter_rfam_results(unique_table_results, cm_models)
_create_rfam_gtf(
filtered_table_results,
cm_models,
seed_descriptions,
region_name,
region_results,
genome_file,
rfam_dir,
rnafold_bin,
)
region_results.unlink()
region_tblout.unlink()
def _parse_rfam_tblout(region_tblout: Path, region_name: str) -> List[Dict[str, Any]]:
"""Parse cmsearch output
col 0 Target Name : This is the name of the target sequence or sequence region that matched the query.
col 2 Query name : This is the name of the query sequence or model that was used for the search.
col 3 Accession : This usually refers to a unique identifier for the target sequence.
col 5 Query Start : The position where the match starts on the query sequence.
col 6 Query End : The position where the match ends on the query sequence.
col 7 Target Start : The position where the match starts on the target sequence.
col 8 Target End : The position where the match ends on the target sequence.
col 9 Strand : Indicates the orientation of the match on the target sequence.
It could be + for the forward strand or - for the reverse strand.
col 14 Hit Score : The score assigned to this match. Higher scores generally indicate better matches.
col 15 E-value : This is a statistical measure of the number of hits one can expect to see
when searching a database of a particular size.
Args:
region_tblout : Cmsearch output for the region name.
region_name : Region name.
Returns:
Formatted cmsearch output
"""
with open(region_tblout, "r") as rfam_tbl_in:
rfam_tbl_data = rfam_tbl_in.read()
tbl_results = rfam_tbl_data.split("\n")
all_parsed_results: List[Dict[str, Any]] = []
for result in tbl_results:
parsed_tbl_data: Dict[str, Any] = {}
if re.compile(rf"^{region_name}").match(result):
hit = result.split()
parsed_tbl_data = {
"accession": str(hit[3]),
"start": str(hit[7]),
"end": str(hit[8]),
"strand": 1 if hit[9] == "+" else -1,
"query_name": str(hit[2]),
"score": str(hit[14]),
}
all_parsed_results.append(parsed_tbl_data)
return all_parsed_results
def _remove_rfam_overlap(parsed_tbl_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Remove Rfam mdoels overlapping and with a lower score.
Args:
parsed_tbl_data : Cmsearch output
Returns:
Final Rfam models
"""
excluded_structures = {}
chosen_structures: List[Dict[str, Any]] = []
for structure_x in parsed_tbl_data:
chosen_structure = structure_x
structure_x_start = int(structure_x["start"])
structure_x_end = int(structure_x["end"])
structure_x_score = float(structure_x["score"])
structure_x_accession = structure_x["accession"]
structure_x_string = (
f"{structure_x_start}:{structure_x_end}:{structure_x_score}:{structure_x_accession}"
)
for structure_y in parsed_tbl_data:
structure_y_start = int(structure_y["start"])
structure_y_end = int(structure_y["end"])
structure_y_score = float(structure_y["score"])
structure_y_accession = structure_y["accession"]
structure_y_string = (
f"{structure_y_start}:{structure_y_end}:{structure_y_score}:{structure_y_accession}"
)
if structure_y_string in excluded_structures:
continue
if structure_x_start <= structure_y_end and structure_x_end >= structure_y_start:
if structure_x_score < structure_y_score:
chosen_structure = structure_y
excluded_structures[structure_x_string] = 1
else:
excluded_structures[structure_y_string] = 1
chosen_structures.append(chosen_structure)
return chosen_structures
def _filter_rfam_results(
unfiltered_tbl_data: List[Dict[str, Any]], cm_models: Dict[str, Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Filter Rfam models according to type and a set of thresholds.
Args:
unfiltered_tbl_data : Unfiltered Rfam output.
cm_models : Rfam models.
Returns:
filtered_results: List of filtered models
"""
filtered_results: List[Dict[str, Any]] = []
thresholds = {
"LSU_rRNA_eukarya": 1700,
"SSU_rRNA_eukarya": 1600,
"5_8S_rRNA": 85,
"5S_rRNA": 75,
}
for structure in unfiltered_tbl_data:
query = structure["query_name"]
if query in ["LSU_rRNA_archaea", "LSU_rRNA_bacteria"]:
threshold = cm_models.get(str(query), {}).get("-threshold")
else:
threshold = thresholds.get(str(query), cm_models.get(str(query), {}).get("-threshold"))
if threshold is not None and float(structure["score"]) >= float(threshold):
logger.info("filtera rfam results %s", structure)
filtered_results.append(structure)
return filtered_results
# NOTE: The below are notes from the perl code (which has extra code)
# about possible improvements that are not implemented there
# Although not included in RefSeq filters, additional filters that
# consider sizes and score_to_size ratios can be applied
# in future work to further exclude FPs
#
# my $is_valid_size = $mapping_length > $min_length && $mapping_length < $max_length ? 1 : 0;
# my $score_size_ratio = $result->{'score'} / $mapping_length;
def _create_rfam_gtf(
filtered_results: List[Dict[str, Any]],
cm_models: Dict[str, Dict[str, Any]],
seed_descriptions: Dict[str, Dict[str, Any]],
region_name: str,
region_results: Path,
genome_file: Path,
rfam_dir: Path,
rnafold_bin: Path = Path("RNAfold"),
) -> None:
"""Convert RFam output per single region in gtf format
Args:
filtered_results : Filtered Rfam results without overlapping.
cm_models : Rfam database.
seed_descriptions : Rfam seed file.
region_name : Slice name.
region_results : Rfam output file.
genome_file : Genome file.
rfam_dir : Output file.
rnafold_bin: RNAfold software path.
"""
if not filtered_results:
return
biotype_type_mapping = {
r"snRNA; snoRNA; scaRNA": "scaRNA",
r"snRNA; snoRNA": "snoRNA",
r"snRNA": "snRNA",
r"rRNA;": "rRNA",
r"antisense;": "antisense",
r"antitoxin;": "antitoxin",
r"ribozyme;": "ribozyme",
}
biotype_name_mapping = {
r"Vault": "Vault_RNA",
r"Y_RNA": "Y_RNA",
r"^RNaseP": "RNase_P_RNA",
r"^RNase_M": "RNase_MRP_RNA",
}
with open(region_results, "w+") as rfam_gtf_out:
gene_counter = 1
for structure in filtered_results:
query = structure["query_name"]
accession = structure["accession"]
if query in cm_models:
model = cm_models[query] # pylint: disable=unused-variable
description = seed_descriptions.get(accession, {})
rfam_type = description.get("type", "misc_RNA")
domain = structure["query_name"]
# padding = model["-length"]
gtf_strand = structure["strand"]
rnafold_strand = structure["strand"]
if gtf_strand == 1:
start = structure["start"]
end = structure["end"]
gtf_strand = "+"
else:
start = structure["end"]
end = structure["start"]
# score = structure["score"]
gtf_strand = "-"
rnafold_strand = -1
biotype = "misc_RNA"
# Flag to track if a match is found
match_found = False
# Check each pattern and update biotype if a match is found
for pattern, mapped_biotype in biotype_type_mapping.items():
if re.search(pattern, str(rfam_type)):
biotype = mapped_biotype
match_found = True
break # Break out of the loop once a match is found
# If no match is found, check matches in biotype_name_mapping
if not match_found:
# Check each pattern and update biotype if a match is found
for pattern, mapped_biotype in biotype_name_mapping.items():
if re.search(pattern, domain):
biotype = mapped_biotype
match_found = True
break # Break out of the loop once a match is found
rna_seq = get_sequence(
region_name,
int(start),
int(end),
int(rnafold_strand),
genome_file,
rfam_dir,
)
valid_structure = check_rnafold_structure(rna_seq, rfam_dir, rnafold_bin)
if not valid_structure:
continue
transcript_string = (
region_name
+ "\tRfam\ttranscript\t"
+ str(start)
+ "\t"
+ str(end)
+ "\t.\t"
+ gtf_strand
+ "\t.\t"
+ 'gene_id "'
+ str(gene_counter)
+ '"; transcript_id "'
+ str(gene_counter)
+ '"; biotype "'
+ biotype # pylint: disable=undefined-loop-variable
+ '";\n'
)
exon_string = (
region_name
+ "\tRfam\texon\t"
+ str(start)
+ "\t"
+ str(end)
+ "\t.\t"
+ gtf_strand
+ "\t.\t"
+ 'gene_id "'
+ str(gene_counter)
+ '"; transcript_id "'
+ str(gene_counter)
+ '"; exon_number "1"; biotype "'
+ biotype # pylint: disable=undefined-loop-variable
+ '";\n'
)
rfam_gtf_out.write(transcript_string)
rfam_gtf_out.write(exon_string)
gene_counter += 1
def check_rnafold_structure(seq: str, rfam_dir: Path, rnafold_bin: Path = Path("RNAfold")) -> Union[float, None]:
"""RNAfold reads RNA sequences, calculates their minimum free energy (mfe)
structure and prints the mfe structure in bracket notation and its free energy.
Args:
seq : sequence
rfam_dir : cmsearch working directory
rnafold_bin : Software path. Defaults to Path("RNAfold").
Returns:
float: mfe structure
"""
# Note there's some extra code in the RNAfold Perl module
# for encoding the structure into an attrib
# Could consider implementing this when running
# for loading into an Ensembl db
free_energy_score = None
try:
with tempfile.NamedTemporaryFile(mode="w+t", delete=False, dir=rfam_dir) as rna_temp_in:
rna_temp_in.writelines(">seq1\n" + seq + "\n")
rna_in_file_path = rna_temp_in.name
rnafold_cmd = [str(rnafold_bin), "--infile", str(rna_in_file_path)]
rnafold_output = subprocess.check_output(
rnafold_cmd,
stderr=subprocess.STDOUT,
text=True,
)
for line in rnafold_output.splitlines():
match = re.search(r"([().]+)\s\(\s*(-*\d+.\d+)\)", line)
if match:
structure = match.group(1) # pylint: disable=unused-variable
free_energy_score = float(match.group(2)) # TOADD DAF
break
except (subprocess.CalledProcessError, OSError) as e:
logging.error("Error while running RNAfold: %s", e)
finally:
#rna_in_file_path.unlink()
logger.info("FREE ENERGY %s", free_energy_score)
return free_energy_score
# this function is useful for the DnaAlignFeature and it should help when we load into the ensembl db
# NOTE some of the code above and the code commented out here is to do with creating
# a DAF. As we don't have a python concept of this I'm leaving it out for the moment
# but the code below is a reference
# my $daf = Bio::EnsEMBL::DnaDnaAlignFeature->new(
# -slice => $self->queries,
# -start => $strand == 1 ? $start : $end,
# -end => $strand == 1 ? $end : $start,
# -strand => $strand,
# -hstart => $hstart,
# -hend => $hend,
# -hstrand => $strand,
# -score => $score,
# -hseqname => length($target_name) > 39 ?
# substr($target_name, 0, 39) : $target_name,,
# -p_value => $evalue,
# -align_type => 'ensembl',
# -cigar_string => abs($hend - $hstart) . "M",
# -hcoverage => $RNAfold->score,
# );
# this function is useful to upload the RNAfold structure in the transcript attribute table
# should be moved into load db module
def encode_str(input_string: str) -> List:
"""Encode the number of repeated sequences reporting the sequence of characters and the number
of occurences; when the encoding sequence reach 200 it is splitted reporting the start and the
stop position in the transcript's structure
Args:
input_string : input sequence
Returns:
List: List of encoded sequences
"""
codes = []
start = 1
count = 0
code = ""
last_chrom = ""
array = []
for chrom in input_string:
count += 1
if chrom == last_chrom:
array.append(chrom)
else:
if code and len(code) > 200 and len(array) == 1:
codes.append(f"{start}:{count}\t{code}")
code = ""
start = count + 1
if len(array) > 1:
code += str(len(array))
array = []
code += chrom
last_chrom = chrom
if len(array) > 1:
code += str(len(array))
codes.append(f"{start}:{count}\t{code}")
return codes
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="RepeatMasker's arguments")
parser.add_argument("--genome_file", required=True, help="Genome file path")
parser.add_argument("--output_dir", required=True, help="Output directory path")
parser.add_argument("--rfam_accessions", required=True, help="List of Rfam accessions.")
parser.add_argument(
"--rfam_cm_db",
default="/hps/nobackup/flicek/ensembl/genebuild/blastdb/ncrna/Rfam_14.0/Rfam.cm",
help="Rfam database with cm models.",
)
parser.add_argument(
"--rfam_seeds_file",
default="/hps/nobackup/flicek/ensembl/genebuild/blastdb/ncrna/Rfam_14.0/Rfam.seed",
help="CRfam seeds file.",
)
parser.add_argument(
"--cmsearch_bin",
default="cmsearch",
help="cmsearch software path.",
)
parser.add_argument(
"--rnafold_bin",
default="RNAfold",
help="RNAfold software path.",
)
parser.add_argument(
"--num_threads",
type=int,
default=1,
help="Number of threads",
)
return parser.parse_args()
def main():
"""cmsearch's entry-point."""
args = parse_args()
log_file_path = create_dir(args.output_dir, "log") / "cmsearch.log"
loginipath = Path(__file__).parents[6] / "conf" / "logging.conf"
logging.config.fileConfig(
loginipath,
defaults={"logfilename": str(log_file_path)},
disable_existing_loggers=False,
)
run_cmsearch(
args.genome_file,
args.output_dir,
args.rfam_accessions,
args.rfam_cm_db,
args.rfam_seeds_file,
args.cmsearch_bin,
args.rnafold_bin,
args.num_threads,
)
if __name__ == "__main__":
main()