Coverage for src/ensembl/utils/archive.py: 100%
31 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-06 14:10 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-06 14:10 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Utils for common IO operations over archive files, e.g. tar or gzip."""
17from __future__ import annotations
19__all__ = [
20 "SUPPORTED_ARCHIVE_FORMATS",
21 "open_gz_file",
22 "extract_file",
23]
25from contextlib import contextmanager
26import gzip
27from pathlib import Path
28import shutil
29from typing import Generator, TextIO
31from ensembl.utils import StrPath
32from ensembl.utils.argparse import ArgumentParser
def _unpack_gz_files(src_file: StrPath, dst_dir: StrPath) -> None:
    """Unpacks the gzip-compressed `src_file` into `dst_dir`.

    The decompressed file is named after `src_file` without its last extension, e.g. "data.txt.gz"
    is written to "<dst_dir>/data.txt". `dst_dir` (and any missing parent folder) is created if it
    does not exist.

    Args:
        src_file: File path to unpack (with ".gz" extension).
        dst_dir: Directory path to unpack the file into.

    """
    dst_path = Path(dst_dir)
    # Remove '.gz' extension to create the destination file name
    dst_file = dst_path / Path(src_file).stem
    # Open the source first so an unreadable/missing `src_file` does not leave an empty folder behind
    with gzip.open(src_file, "rb") as f_in:
        # Opening the output file fails with FileNotFoundError if its parent folder is missing, so
        # make sure the destination folder exists before writing
        dst_path.mkdir(parents=True, exist_ok=True)
        with dst_file.open("wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
# Teach shutil.unpack_archive() how to handle plain gzip-compressed files
shutil.register_unpack_format(
    name="gzip",
    extensions=[".gz"],
    function=_unpack_gz_files,
    description="GZIP file",
)

# shutil reports each registered format as a (name, extensions, description) tuple: flatten the
# extensions of all of them into a single list
SUPPORTED_ARCHIVE_FORMATS = [
    extension
    for _name, format_extensions, _description in shutil.get_unpack_formats()
    for extension in format_extensions
]
@contextmanager
def open_gz_file(file_path: StrPath) -> Generator[TextIO, None, None]:
    """Yields a text-mode file object for `file_path`, transparently handling gzip compression.

    A file whose name ends in ".gz" is opened through `gzip`; any other file is opened as is. The
    file is closed when the surrounding "with" block exits.

    Args:
        file_path: A (single) file path to open.

    """
    path = Path(file_path)
    # Only the last suffix decides whether the file is treated as gzip-compressed
    is_gzipped = path.suffix == ".gz"
    handle = gzip.open(path, "rt") if is_gzipped else path.open("rt")
    with handle as stream:
        yield stream
def extract_file(src_file: StrPath, dst_dir: StrPath) -> None:
    """Extracts the `src_file` into `dst_dir`.

    If the file is not an archive, it will be copied to `dst_dir`. `dst_dir` will be created if it
    does not exist.

    A file is considered an archive when any trailing combination of its suffixes (e.g. ".tar.gz"
    or ".gz" for "data.tar.gz") is listed in `SUPPORTED_ARCHIVE_FORMATS`.

    Args:
        src_file: Path to the file to unpack.
        dst_dir: Path to the folder where to extract the file.

    """
    src_path = Path(src_file)
    dst_path = Path(dst_dir)
    # Every trailing run of suffixes is a candidate extension: "a.tar.gz" -> {".tar.gz", ".gz"}
    suffixes = src_path.suffixes
    extensions = {"".join(suffixes[i:]) for i in range(len(suffixes))}
    # Create `dst_dir` up front for both branches: whether shutil.unpack_archive() creates the
    # folder depends on the handler registered for the format, so do not rely on it
    dst_path.mkdir(parents=True, exist_ok=True)
    if extensions.intersection(SUPPORTED_ARCHIVE_FORMATS):
        shutil.unpack_archive(src_path, dst_path)
    else:
        shutil.copy(src_path, dst_path)
def extract_file_cli() -> None:
    """Entry-point for the `extract_file` method.

    Parses `--src_file` (required) and `--dst_dir` (defaults to the current working directory) from
    the command line and forwards them to `extract_file()`.

    """
    cli = ArgumentParser(description="Extracts file to the given location.")
    cli.add_argument_src_path("--src_file", required=True, help="Path to the file to unpack")
    cli.add_argument_dst_path(
        "--dst_dir",
        default=Path.cwd(),
        help="Path to the folder where to extract the file",
    )
    options = cli.parse_args()
    extract_file(src_file=options.src_file, dst_dir=options.dst_dir)