Coverage for src / ensembl / utils / archive.py: 100%
31 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 15:03 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 15:03 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Utils for common IO operations over archive files, e.g. tar or gzip."""
17from __future__ import annotations
19__all__ = [
20 "SUPPORTED_ARCHIVE_FORMATS",
21 "open_gz_file",
22 "extract_file",
23]
25from contextlib import contextmanager
26import gzip
27from pathlib import Path
28import shutil
29from typing import Generator, IO
31from ensembl.utils import StrPath
32from ensembl.utils.argparse import ArgumentParser
35def _unpack_gz_files(src_file: StrPath, dst_dir: StrPath) -> None:
36 """Unpacks `src_file` to `dst_dir`.
38 Args:
39 src_file: File path to unpack (with ".gz" extension).
40 dst_dir: Directory path to unpack the file into.
42 """
43 # Remove '.gz' extension to create the destination file name
44 dst_file = Path(dst_dir) / Path(src_file).stem
45 with gzip.open(src_file, "rb") as f_in:
46 with dst_file.open("wb") as f_out:
47 shutil.copyfileobj(f_in, f_out)
# Teach shutil how to unpack plain gzip-compressed files (".gz") via `_unpack_gz_files()`
shutil.register_unpack_format(
    name="gzip", extensions=[".gz"], function=_unpack_gz_files, description="GZIP file"
)

# Flat list with the extensions of every unpack format known to shutil; each registered format is
# reported as a tuple (name, extensions, description)
SUPPORTED_ARCHIVE_FORMATS = [
    suffix for _name, suffixes, _description in shutil.get_unpack_formats() for suffix in suffixes
]
@contextmanager
def open_gz_file(
    file_path: StrPath, mode: str = "rt", encoding: str = "utf-8"
) -> Generator[gzip.GzipFile | IO, None, None]:
    """Yields an open file object, even if the file is compressed with gzip.

    The file is expected to contain text by default, and this can be used with the usual "with".
    Plain and gzip-compressed files behave the same way: a mode without "b" (e.g. "r" or "w") always
    opens the file in text mode, and `encoding` is ignored when a binary mode is requested.

    Args:
        file_path: A (single) file path to open.
        mode: The mode in which the file is opened.
        encoding: The name of the encoding used to decode or encode the file (text modes only).

    Yields:
        The open file object, which is closed when the "with" block exits.

    """
    src_file = Path(file_path)
    binary = "b" in mode
    # Both gzip.open() and Path.open() reject an encoding argument in binary mode
    text_encoding = None if binary else encoding
    if src_file.suffix == ".gz":
        # gzip.open() treats a bare "r"/"w"/"a"/"x" as binary, unlike the built-in open(), so make
        # the text mode explicit to keep both branches consistent
        gz_mode = mode if binary or "t" in mode else f"{mode}t"
        with gzip.open(src_file, gz_mode, encoding=text_encoding) as fh:
            yield fh
    else:
        with src_file.open(mode, encoding=text_encoding) as fh:
            yield fh
def extract_file(src_file: StrPath, dst_dir: StrPath) -> None:
    """Extracts the `src_file` into `dst_dir`.

    A file whose extension is not among the supported archive formats is simply copied to `dst_dir`,
    creating `dst_dir` first if it does not exist.

    Args:
        src_file: Path to the file to unpack.
        dst_dir: Path to the folder where to extract the file.

    """
    source = Path(src_file)
    suffixes = source.suffixes
    # Every trailing run of suffixes is a candidate extension, e.g. "a.tar.gz" -> {".tar.gz", ".gz"}
    candidates = {"".join(suffixes[start:]) for start, _ in enumerate(suffixes)}

    if not candidates.intersection(SUPPORTED_ARCHIVE_FORMATS):
        # Not an archive: replicate the functionality of shutil.unpack_archive() by creating `dst_dir`
        Path(dst_dir).mkdir(parents=True, exist_ok=True)
        shutil.copy(source, dst_dir)
        return

    shutil.unpack_archive(source, dst_dir)
def extract_file_cli() -> None:
    """Entry-point for the `extract_file` method.

    Reads `--src_file` (required) and `--dst_dir` (defaults to the current working directory) from the
    command line and forwards them to `extract_file()`.

    """
    cli = ArgumentParser(description="Extracts file to the given location.")
    cli.add_argument_src_path("--src_file", required=True, help="Path to the file to unpack")
    cli.add_argument_dst_path(
        "--dst_dir", default=Path.cwd(), help="Path to the folder where to extract the file"
    )
    options = cli.parse_args()
    extract_file(src_file=options.src_file, dst_dir=options.dst_dir)