Coverage for src / ensembl / utils / archive.py: 100%

31 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 15:03 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Utils for common IO operations over archive files, e.g. tar or gzip.""" 

16 

17from __future__ import annotations 

18 

19__all__ = [ 

20 "SUPPORTED_ARCHIVE_FORMATS", 

21 "open_gz_file", 

22 "extract_file", 

23] 

24 

25from contextlib import contextmanager 

26import gzip 

27from pathlib import Path 

28import shutil 

29from typing import Generator, IO 

30 

31from ensembl.utils import StrPath 

32from ensembl.utils.argparse import ArgumentParser 

33 

34 

35def _unpack_gz_files(src_file: StrPath, dst_dir: StrPath) -> None: 

36 """Unpacks `src_file` to `dst_dir`. 

37 

38 Args: 

39 src_file: File path to unpack (with ".gz" extension). 

40 dst_dir: Directory path to unpack the file into. 

41 

42 """ 

43 # Remove '.gz' extension to create the destination file name 

44 dst_file = Path(dst_dir) / Path(src_file).stem 

45 with gzip.open(src_file, "rb") as f_in: 

46 with dst_file.open("wb") as f_out: 

47 shutil.copyfileobj(f_in, f_out) 

48 

49 

# Teach shutil.unpack_archive() how to handle plain gzip-compressed files
shutil.register_unpack_format(
    name="gzip",
    extensions=[".gz"],
    function=_unpack_gz_files,
    description="GZIP file",
)

# shutil reports each registered format as a tuple (name, extensions, description); flatten all
# the extensions into a single list
SUPPORTED_ARCHIVE_FORMATS = [
    extension for _, extensions, _ in shutil.get_unpack_formats() for extension in extensions
]

54 

55 

@contextmanager
def open_gz_file(
    file_path: StrPath, mode: str = "rt", encoding: str = "utf-8"
) -> Generator[gzip.GzipFile | IO, None, None]:
    """Yields an open file object, even if the file is compressed with gzip.

    The file is expected to contain a text, and this can be used with the usual "with". A given
    `mode` is interpreted the same way for plain and gzip-compressed files: a mode without "b" is
    opened as text, and `encoding` is only applied in text mode.

    Args:
        file_path: A (single) file path to open.
        mode: The mode in which the file is opened.
        encoding: The name of the encoding used to decode or encode the file (ignored in binary
            mode).

    """
    src_file = Path(file_path)
    # Binary streams reject an `encoding` argument, so only forward it in text mode
    open_kwargs = {} if "b" in mode else {"encoding": encoding}
    if src_file.suffix == ".gz":
        # gzip.open() treats a mode without "t" (e.g. "r") as binary, unlike the built-in open()
        gz_mode = mode if ("b" in mode or "t" in mode) else f"{mode}t"
        with gzip.open(src_file, gz_mode, **open_kwargs) as fh:
            yield fh
    else:
        with src_file.open(mode, **open_kwargs) as fh:
            yield fh

77 

78 

def extract_file(src_file: StrPath, dst_dir: StrPath) -> None:
    """Unpacks `src_file` into `dst_dir`, or copies it there when it is not an archive.

    A file counts as an archive when any trailing combination of its suffixes (e.g. ".tar.gz" or
    ".gz" for "data.tar.gz") is one of `SUPPORTED_ARCHIVE_FORMATS`. When the file is copied,
    `dst_dir` is created first if it does not exist.

    Args:
        src_file: Path to the file to unpack.
        dst_dir: Path to the folder where to extract the file.

    """
    source_path = Path(src_file)
    suffixes = source_path.suffixes
    # Collect every compound extension, from the longest one down to the last suffix alone
    candidates = set()
    for start in range(len(suffixes)):
        candidates.add("".join(suffixes[start:]))

    if candidates.isdisjoint(SUPPORTED_ARCHIVE_FORMATS):
        # Not an archive: replicate shutil.unpack_archive() by creating `dst_dir` before copying
        Path(dst_dir).mkdir(parents=True, exist_ok=True)
        shutil.copy(source_path, dst_dir)
        return
    shutil.unpack_archive(source_path, dst_dir)

99 

100 

def extract_file_cli() -> None:
    """Command-line entry-point that parses the arguments and calls `extract_file`.

    Takes a required `--src_file` and an optional `--dst_dir`, which defaults to the current
    working directory.

    """
    cli_parser = ArgumentParser(description="Extracts file to the given location.")
    cli_parser.add_argument_src_path(
        "--src_file",
        required=True,
        help="Path to the file to unpack",
    )
    cli_parser.add_argument_dst_path(
        "--dst_dir",
        default=Path.cwd(),
        help="Path to the folder where to extract the file",
    )
    options = cli_parser.parse_args()
    extract_file(options.src_file, options.dst_dir)