# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The STAR (Spliced Transcripts Alignment to a Reference) alignment tool is widely used
in genomics research for aligning RNA-seq data to a reference genome.
Dobin A, Davis CA, Schlesinger F, et al. STAR: ultrafast universal RNA-seq aligner.
Bioinformatics. 2013;29(1):15-21. doi:10.1093/bioinformatics/bts635
"""
__all__ = ["run_star"]
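# Example invocation of this module as a script (file and directory names are
# illustrative):
#   python star.py --genome_file genome.fa --output_dir output_dir \
#       --short_read_fastq_dir fastq_dir --trim_fastq --num_threads 8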
import argparse
import logging
import logging.config
import gzip
import math
import multiprocessing
from pathlib import Path
import random
import re
import shutil
import subprocess
from typing import List
from ensembl.tools.anno.utils._utils import (
check_exe,
create_dir,
check_gtf_content,
get_seq_region_length,
)
def run_star(
genome_file: Path,
output_dir: Path,
short_read_fastq_dir: Path,
delete_pre_trim_fastq: bool = False,
trim_fastq: bool = False,
max_reads_per_sample: int = 0,
max_intron_length: int = 100000,
num_threads: int = 1,
star_bin: Path = Path("star"),
samtools_bin: Path = Path("samtools"),
trim_galore_bin: Path = Path("trim_galore"),
) -> None:
"""
    Run STAR alignment on a list of short-read fastq files.
:param genome_file: Genome file path.
:type genome_file: Path
:param output_dir: Working directory path.
:type output_dir: Path
:param short_read_fastq_dir: Short read directory path.
:type short_read_fastq_dir: Path
:param delete_pre_trim_fastq: Delete the original fastq files after trimming. Defaults to False.
:type delete_pre_trim_fastq: boolean, default False
:param trim_fastq: Trim short read files using TrimGalore. Defaults to False.
:type trim_fastq: boolean, default False
:param max_reads_per_sample: Max number of reads per sample. Defaults to 0 (unlimited).
:type max_reads_per_sample: int, default 0
:param max_intron_length: The maximum intron size for alignments. Defaults to 100000.
:type max_intron_length: int, default 100000
:param num_threads: Number of available threads.
:type num_threads: int, default 1
    :param star_bin: STAR software path.
    :type star_bin: Path, default star
    :param samtools_bin: Samtools software path.
    :type samtools_bin: Path, default samtools
    :param trim_galore_bin: Trim Galore software path.
    :type trim_galore_bin: Path, default trim_galore
:return: None
:rtype: None
"""
check_exe(star_bin)
# If trimming has been enabled then switch the path for
# short_read_fastq_dir from the original location to the trimmed fastq dir
if trim_fastq:
run_trimming(output_dir, short_read_fastq_dir, delete_pre_trim_fastq, num_threads, trim_galore_bin)
short_read_fastq_dir = output_dir / "trim_galore_output"
star_dir = create_dir(output_dir, "star_output")
for output_file in [
Path(f"{output_dir}/stringtie_output/annotation.gtf"),
Path(f"{output_dir}/scallop_output/annotation.gtf"),
]:
if output_file.exists():
            transcript_count = check_gtf_content(output_file, "transcript")  # count transcript records in the gtf
if transcript_count > 0:
logging.info("Transcriptomic alignment exists")
return
star_index_file = star_dir / "SAindex"
file_types = ("*.fastq", "*.fq", "*.fastq.gz", "*.fq.gz")
fastq_file_list = [
path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type)
]
if len(fastq_file_list) == 0:
raise IndexError(f"The list of fastq files is empty. Fastq dir:\n{short_read_fastq_dir}")
    # Group the fastq files into paired (or single) sets
    paired_fastq_file_list = _create_paired_paths(fastq_file_list)
    # Subsample in parallel if a read limit has been set
    if max_reads_per_sample:
        subsample_transcriptomic_data(paired_fastq_file_list, num_threads)
        # Use the subsampled file wherever one was produced
        fastq_file_list = [
            Path(f"{path}.sub") if Path(f"{path}.sub").exists() else path
            for pair in paired_fastq_file_list
            for path in pair
        ]
    else:
        fastq_file_list = [path for pair in paired_fastq_file_list for path in pair]
if not star_index_file.exists():
logging.info("Did not find an index file for Star. Will create now")
seq_region_to_length = get_seq_region_length(genome_file, 0)
genome_size = sum(seq_region_to_length.values())
        # STAR's --genomeSAindexNbases parameter must be scaled down for small
        # genomes. The STAR manual recommends min(14, log2(genome_size) / 2 - 1):
        # log2(genome_size) is the number of bits needed to represent the genome
        # length, the cap of 14 keeps memory usage and indexing time reasonable,
        # and too small a seed length can hurt alignment accuracy.
        index_bases = min(14, math.floor((math.log(genome_size, 2) / 2) - 1))
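        # Worked example: a ~3.2 Gb human-sized genome gives
        # log2(3.2e9) / 2 - 1 ~= 14.8, capped to 14; a 10 Mb genome gives
        # log2(1e7) / 2 - 1 ~= 10.6, floored to 10.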
try:
subprocess.run( # pylint:disable=subprocess-run-check
[
str(star_bin),
"--runThreadN",
str(num_threads),
"--runMode",
"genomeGenerate",
"--outFileNamePrefix",
f"{star_dir}/",
"--genomeDir",
str(star_dir),
"--genomeSAindexNbases",
str(index_bases),
"--genomeFastaFiles",
str(genome_file),
]
)
        except Exception as e:  # pylint:disable=broad-exception-caught
            logging.error("An error occurred while creating the STAR index: %s", e)
logging.info("Running Star on the files in the fastq dir")
for fastq_file in fastq_file_list:
star_tmp_dir = star_dir / "tmp"
if star_tmp_dir.exists():
shutil.rmtree(star_tmp_dir)
sam_file = Path(f"{star_dir}/{fastq_file.name}.sam")
junctions_file = Path(f"{star_dir}/{fastq_file.name}.sj.tab")
sam_file_name = sam_file.name
sam_temp_file = Path(f"{star_dir}/{sam_file_name}.tmp")
bam_file = re.sub(".sam", ".bam", sam_file_name)
bam_sort_file = Path(f"{star_dir}/{bam_file}")
log_out_file = Path(f"{star_dir}/{fastq_file.name}.Log.final.out")
if log_out_file.exists() and bam_sort_file.exists() and bam_sort_file.stat().st_size != 0:
            logging.info(
                "Found an existing bam file for the fastq file, "
                "presuming the file has been processed, will skip"
            )
continue
logging.info("Processing %s", fastq_file)
star_command = [
str(star_bin),
"--outFilterIntronMotifs",
"RemoveNoncanonicalUnannotated",
"--outSAMstrandField",
"intronMotif",
"--runThreadN",
str(num_threads),
"--twopassMode",
"Basic",
"--runMode",
"alignReads",
"--genomeDir",
str(star_dir),
"--readFilesIn",
str(fastq_file),
"--outFileNamePrefix",
f"{star_dir}/",
"--outTmpDir",
str(star_tmp_dir),
"--outSAMtype",
"SAM",
"--alignIntronMax",
str(max_intron_length),
]
if fastq_file.suffix.endswith(".gz"):
star_command.append("--readFilesCommand")
star_command.append("gunzip")
star_command.append("-c")
subprocess.run(star_command) # pylint:disable=subprocess-run-check
shutil.move(Path(f"{star_dir}/Aligned.out.sam"), sam_file)
shutil.move(Path(f"{star_dir}/SJ.out.tab"), junctions_file)
logging.info("Converting samfile into sorted bam file. Bam file: %s", bam_file)
subprocess.run( # pylint:disable=subprocess-run-check
[
str(samtools_bin),
"sort",
"-@",
str(num_threads),
"-T",
str(sam_temp_file),
"-o",
str(bam_sort_file),
str(sam_file),
]
)
shutil.move(star_dir / "Log.final.out", log_out_file)
sam_file.unlink()
logging.info("Completed running STAR")
def _create_paired_paths(fastq_file_paths: List[Path]) -> List[List[Path]]:
    """
    Group transcriptomic fastq files into paired sets.
    Args:
        fastq_file_paths (List[Path]): List of transcriptomic file paths.
    Returns:
        List[List[Path]]: One list per paired (or single, unpaired) set of fastq files.
    """
    path_dict: dict = {}
    for fastq_file in fastq_file_paths:
        paired_name = re.search(r"(.+)_\d+\.(fastq|fq)", str(fastq_file))
        if not paired_name:
            logging.warning(
                "Could not find _1 or _2 at the end of the prefix "
                "for file, assuming the file is not paired: %s",
                fastq_file,
            )
            path_dict[fastq_file] = [fastq_file]
            continue
        run_accession = paired_name.group(1)
        if run_accession in path_dict:
            path_dict[run_accession].append(fastq_file)
        else:
            path_dict[run_accession] = [fastq_file]
    paired_fastq_files = list(path_dict.values())
    logging.info(paired_fastq_files)
    return paired_fastq_files
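# Example grouping (file names illustrative): [SRR1_1.fastq.gz, SRR1_2.fastq.gz,
# single.fq] is grouped into [[SRR1_1.fastq.gz, SRR1_2.fastq.gz], [single.fq]].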
# NOTE: for a more advanced and optimised subsampling we could use
# https://github.com/lh3/seqtk
def _subsample_paired_fastq_files(
fastq_files: List[Path],
subsample_read_limit: int = 100000000,
num_threads: int = 2,
compressed: bool = False,
) -> None:
"""
Perform subsampling on two paired FastQ files in parallel using multiple threads.
Args:
fastq_files : Path for paired fastq files.
output_files : Path for the output file.
subsample_read_limit : Subsample size, defaults to 100000000.
num_threads : Number of threads, defaults to 2.
compressed : file compressed, defaults to False.
"""
if len(fastq_files) == 2:
fastq_file_1, fastq_file_2 = fastq_files
output_file_1, output_file_2 = [Path(f"{fastq_file_1}.sub"), Path(f"{fastq_file_2}.sub")]
elif len(fastq_files) == 1:
fastq_file_1 = fastq_files[0]
output_file_1 = Path(f"{fastq_file_1}.sub")
else:
raise FileNotFoundError("No fastq file found")
if fastq_file_1.suffix.endswith(".gz$"):
compressed = True
num_lines = sum(1 for line in gzip.open(fastq_file_1)) # pylint:disable=consider-using-with
else:
num_lines = sum(1 for line in open(fastq_file_1)) # pylint:disable=consider-using-with
range_limit = int(num_lines / 4)
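    # Each fastq record spans four lines (@header, sequence, "+", quality),
    # hence the division by four above to get the read count.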
if range_limit <= subsample_read_limit:
        logging.info(
            "Number of reads (%s) is less than the max allowed read count (%s), "
            "no need to subsample",
            str(range_limit),
            str(subsample_read_limit),
        )
return
    rand_list = random.sample(range(range_limit), subsample_read_limit)
    random_indices = {idx * 4: 1 for idx in rand_list}
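    # For example, if read 5 (0-based) is sampled, the four lines of its record
    # start at line index 20 in the fastq file.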
logging.info("Processing paired files in parallel")
pool = multiprocessing.Pool(int(num_threads)) # pylint:disable=consider-using-with
pool.apply_async(
_subsample_fastq_subset,
args=(
fastq_file_1,
output_file_1,
random_indices,
compressed,
),
)
pool.apply_async(
_subsample_fastq_subset,
args=(
fastq_file_2,
output_file_2,
random_indices,
compressed,
),
)
pool.close()
pool.join()
def _subsample_fastq_subset(
fastq_file: Path, output_file: Path, random_indices: dict, compressed: bool
) -> None:
"""
Selecting specific sets of four lines from an input FastQ file and writing them to an output file.
Args:
fastq_file : Path for the fastq file.
output_file : Path for the output file.
random_indices : set of random indices.
compressed : the files is compressed
"""
line_index = 0
with gzip.open(fastq_file, "rt") if compressed else open(fastq_file) as file_in, open(
output_file, "w+"
) as file_out:
lines = [file_in.readline() for _ in range(4)]
while lines[3]: # This ensures that the loop continues until the end of the input file.
if line_index in random_indices:
file_out.writelines(lines)
line_index += 4
lines = [file_in.readline() for _ in range(4)]
def subsample_transcriptomic_data(fastq_file_list: List[List[Path]], num_threads: int = 2) -> None:
    """
    Subsample paired (or single) fastq files.
    Args:
        fastq_file_list : List of paired (or single) fastq file paths to process.
        num_threads : Number of threads.
    """
    for fastq_files in fastq_file_list:
        if len(fastq_files) == 1:
            fastq_file_1 = fastq_files[0]
            if Path(f"{fastq_file_1}.sub").exists():
                logging.info(
                    "Found an existing .sub file on the fastq path, will use that instead. File: %s.sub",
                    fastq_file_1,
                )
            else:
                _subsample_paired_fastq_files(fastq_files, num_threads=num_threads)
        elif len(fastq_files) == 2:
            fastq_file_1, fastq_file_2 = fastq_files
            if Path(f"{fastq_file_1}.sub").exists() and Path(f"{fastq_file_2}.sub").exists():
                logging.info(
                    "Found existing .sub files on the fastq path for both members of the pair, will use "
                    "those instead of subsampling again. Files: %s.sub, %s.sub",
                    fastq_file_1,
                    fastq_file_2,
                )
            else:
                _subsample_paired_fastq_files(fastq_files, num_threads=num_threads)
def run_trimming(
output_dir: Path,
short_read_fastq_dir: Path,
delete_pre_trim_fastq: bool = False,
num_threads: int = 1,
trim_galore_bin="trim_galore",
) -> None:
"""
Trim list of short read fastq files.
Args:
output_dir : Working directory path.
short_read_fastq_dir : Short read directory path.
        delete_pre_trim_fastq : Delete the original fastq files after trimming. Defaults to False.
num_threads : Number of threads.
trim_galore_bin : Software path.
"""
check_exe(trim_galore_bin)
trim_dir = create_dir(output_dir, "trim_galore_output")
file_types = ("*.fastq", "*.fq", "*.fastq.gz", "*.fq.gz")
fastq_file_list = [
path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type)
]
fastq_file_list = _create_paired_paths(fastq_file_list)
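    # Trim Galore options: --illumina selects the standard Illumina adapter
    # sequence, --quality 20 trims low-quality ends below a Phred score of 20,
    # and --length 50 discards reads shorter than 50 bp after trimming.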
trim_galore_cmd = [
str(trim_galore_bin),
"--illumina",
"--quality",
"20",
"--length",
"50",
"--output_dir",
str(trim_dir),
]
    pool = multiprocessing.Pool(int(num_threads))  # pylint:disable=consider-using-with
    for fastq_paired_files in fastq_file_list:
        pool.apply_async(
            multiprocess_trim_galore,
            args=(
                trim_galore_cmd,
                fastq_paired_files,
            ),
        )
    pool.close()
    pool.join()
trimmed_fastq_list = trim_dir.glob("*.fq.gz")
for trimmed_fastq_path in trimmed_fastq_list:
logging.info("Trimmed file path: %s", str(trimmed_fastq_path))
sub_patterns = re.compile(r"|".join(("_val_1.fq", "_val_2.fq", "_trimmed.fq")))
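        # e.g. "sample_1_val_1.fq.gz" becomes "sample_1.fq.gz"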
updated_file_path_name = sub_patterns.sub(".fq", trimmed_fastq_path.name)
        updated_file_path = trim_dir / updated_file_path_name
        logging.info("Updated file path: %s", str(updated_file_path))
        trimmed_fastq_path.rename(updated_file_path)
    # Optionally delete the original, untrimmed fastq files
    if delete_pre_trim_fastq:
        files_to_delete_list: List[Path] = []
        for file_type in file_types:
            files_to_delete_list.extend(short_read_fastq_dir.glob(file_type))
        for file_to_delete in files_to_delete_list:
            file_to_delete.unlink()
def multiprocess_trim_galore(trim_galore_cmd: List, fastq_paired_files: List[Path]) -> None:
"""
Trim short paired or single short read fastq file.
Args:
trim_galore_cmd : Generic command.
fastq_paired_files : List of single or paired fastq files.
"""
    # Work on a copy so the shared command template is not mutated
    trim_galore_cmd = trim_galore_cmd.copy()
    if len(fastq_paired_files) == 2:
        fastq_file, fastq_file_pair = fastq_paired_files
        trim_galore_cmd.append("--paired")
        trim_galore_cmd.append(str(fastq_file))
        trim_galore_cmd.append(str(fastq_file_pair))
    elif len(fastq_paired_files) == 1:
        trim_galore_cmd.append(str(fastq_paired_files[0]))
    logging.info("Running Trim Galore with the following command: %s", " ".join(trim_galore_cmd))
    subprocess.run(trim_galore_cmd, check=True)
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="STAR's arguments")
parser.add_argument("--genome_file", required=True, help="Genome file path")
parser.add_argument("--output_dir", required=True, help="Output directory path")
parser.add_argument("--short_read_fastq_dir", required=True, help="Short read directory path")
parser.add_argument(
"--delete_pre_trim_fastq",
action="store_true",
default=False,
help="Delete the original fastq files after trimming",
)
parser.add_argument(
"--trim_fastq", action="store_true", default=False, help="Trim the short read files using Trim Galore"
)
parser.add_argument(
"--max_reads_per_sample", type=int, default=0, help="The maximum number of reads to use per sample"
)
parser.add_argument(
"--max_intron_length", type=int, default=100000, help="The maximum intron size for alignments"
)
parser.add_argument("--num_threads", type=int, default=1, help="Number of threads")
parser.add_argument("--star_bin", default="STAR", help="Star software path")
parser.add_argument("--samtools_bin", default="samtools", help="Samtools software path")
parser.add_argument("--trim_galore_bin", default="trim_galore", help="Trim Galore software path")
return parser.parse_args()
def main():
"""STAR's entry-point."""
args = parse_args()
log_file_path = create_dir(args.output_dir, "log") / "star.log"
loginipath = Path(__file__).parents[6] / "conf" / "logging.conf"
logging.config.fileConfig(
loginipath,
defaults={"logfilename": str(log_file_path)},
disable_existing_loggers=False,
)
    run_star(
        Path(args.genome_file),
        Path(args.output_dir),
        Path(args.short_read_fastq_dir),
        args.delete_pre_trim_fastq,
        args.trim_fastq,
        args.max_reads_per_sample,
        args.max_intron_length,
        args.num_threads,
        Path(args.star_bin),
        Path(args.samtools_bin),
        Path(args.trim_galore_bin),
    )
if __name__ == "__main__":
main()