Coverage for src / ensembl / utils / archive.py: 95%
34 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 10:45 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 10:45 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Utils for common IO operations over archive files, e.g. tar or gzip."""
17from __future__ import annotations
19__all__ = [
20 "SUPPORTED_ARCHIVE_FORMATS",
21 "open_gz_file",
22 "extract_file",
23]
25from contextlib import contextmanager
26import gzip
27from pathlib import Path
28import shutil
29import sys
30from typing import Any, Generator, IO
32from ensembl.utils import StrPath
33from ensembl.utils.argparse import ArgumentParser
def _unpack_gz_files(
    src_file: StrPath, dst_dir: StrPath, **kwargs: Any  # pylint: disable=unused-argument
) -> None:
    """Unpacks the gzip-compressed `src_file` into `dst_dir`.

    The decompressed file is named after `src_file` without its last extension. `dst_dir` is created
    (including any missing parent) if it does not exist.

    Args:
        src_file: File path to unpack (with ".gz" extension).
        dst_dir: Directory path to unpack the file into.
        **kwargs: Ignored. Accepted so that extra options forwarded by `shutil.unpack_archive()`
            (e.g. `filter`) do not break this unpacker.

    """
    dst_path = Path(dst_dir)
    # Remove '.gz' extension to create the destination file name
    dst_file = dst_path / Path(src_file).stem
    with gzip.open(src_file, "rb") as f_in:
        # shutil.unpack_archive() leaves the creation of the destination folder to the unpacker;
        # do it only once the source file has been opened successfully
        dst_path.mkdir(parents=True, exist_ok=True)
        with dst_file.open("wb") as f_out:
            shutil.copyfileobj(f_in, f_out)


shutil.register_unpack_format("gzip", [".gz"], _unpack_gz_files, description="GZIP file")
# shutil.get_unpack_formats() returns one (name, extensions, description) tuple per registered format;
# flatten the extensions of every format into a single list
SUPPORTED_ARCHIVE_FORMATS = [
    extension
    for _name, extensions, _description in shutil.get_unpack_formats()
    for extension in extensions
]
@contextmanager
def open_gz_file(
    file_path: StrPath, mode: str = "rt", encoding: str = "utf-8"
) -> Generator[gzip.GzipFile | IO, None, None]:
    """Yields an open file object, even if the file is compressed with gzip.

    A file whose name ends with ".gz" is opened with `gzip.open()`; any other file is opened as a
    regular file. This can be used with the usual "with". As for the built-in `open()`, a `mode`
    without "b" is handled as a text mode for both kinds of file.

    Args:
        file_path: A (single) file path to open.
        mode: The mode in which the file is opened.
        encoding: The name of the encoding used to decode or encode the file. Ignored if `mode` is
            a binary mode.

    """
    src_file = Path(file_path)
    is_binary = "b" in mode
    # Neither gzip.open() nor Path.open() accept an encoding in binary mode
    text_encoding = None if is_binary else encoding
    if src_file.suffix == ".gz":
        # gzip.open() treats a mode without "t" as binary, unlike open(), so make text mode explicit
        gz_mode = mode if is_binary or "t" in mode else f"{mode}t"
        with gzip.open(src_file, gz_mode, encoding=text_encoding) as fh:
            yield fh
    else:
        with src_file.open(mode, encoding=text_encoding) as fh:
            yield fh
def extract_file(src_file: StrPath, dst_dir: StrPath) -> None:
    """Extracts the `src_file` into `dst_dir`.

    A file whose extension is not among `SUPPORTED_ARCHIVE_FORMATS` is copied to `dst_dir` instead,
    in which case `dst_dir` is created first if it does not exist.

    Args:
        src_file: Path to the file to unpack.
        dst_dir: Path to the folder where to extract the file.

    """
    archive_path = Path(src_file)
    suffixes = archive_path.suffixes
    # Every trailing run of suffixes is a candidate extension, e.g. ".tar.gz" and ".gz" for "a.tar.gz"
    candidates = {"".join(suffixes[start:]) for start in range(len(suffixes))}

    if candidates.isdisjoint(SUPPORTED_ARCHIVE_FORMATS):
        # Not a supported archive: create `dst_dir` ourselves and copy the file as is
        Path(dst_dir).mkdir(parents=True, exist_ok=True)
        shutil.copy(archive_path, dst_dir)
        return

    # The "filter" argument is only passed on Python 3.12 or later
    if sys.version_info >= (3, 12):
        shutil.unpack_archive(archive_path, dst_dir, filter="data")
    else:
        shutil.unpack_archive(archive_path, dst_dir)
def extract_file_cli() -> None:
    """Command-line entry point that calls `extract_file` with the parsed arguments.

    Expects a mandatory `--src_file` argument and an optional `--dst_dir` argument, which defaults to
    the current working directory.

    """
    cli_parser = ArgumentParser(description="Extracts file to the given location.")
    cli_parser.add_argument_src_path("--src_file", required=True, help="Path to the file to unpack")
    cli_parser.add_argument_dst_path(
        "--dst_dir", default=Path.cwd(), help="Path to the folder where to extract the file"
    )
    options = cli_parser.parse_args()
    extract_file(src_file=options.src_file, dst_dir=options.dst_dir)