Coverage for src/ensembl/utils/archive.py: 95%

34 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 10:45 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Utils for common IO operations over archive files, e.g. tar or gzip.""" 

16 

17from __future__ import annotations 

18 

# Names exported by a star-import of this module
__all__ = ["SUPPORTED_ARCHIVE_FORMATS", "open_gz_file", "extract_file"]

24 

25from contextlib import contextmanager 

26import gzip 

27from pathlib import Path 

28import shutil 

29import sys 

30from typing import Any, Generator, IO 

31 

32from ensembl.utils import StrPath 

33from ensembl.utils.argparse import ArgumentParser 

34 

35 

def _unpack_gz_files(
    src_file: StrPath, dst_dir: StrPath, **kwargs: Any  # pylint: disable=unused-argument
) -> None:
    """Unpacks the gzip-compressed `src_file` into `dst_dir`.

    The decompressed file is named after `src_file` without its last extension, e.g.
    "data.txt.gz" is written to "data.txt" inside `dst_dir`. `dst_dir` (and any missing parent)
    is created if it does not exist.

    Args:
        src_file: File path to unpack (with ".gz" extension).
        dst_dir: Directory path to unpack the file into.
        **kwargs: Ignored. Accepted so that extra options forwarded by
            `shutil.unpack_archive()` (e.g. `filter`) do not break the call.

    """
    dst_path = Path(dst_dir)
    # shutil.unpack_archive() does not create the extraction directory before calling a
    # registered unpacker, so it has to be done here
    dst_path.mkdir(parents=True, exist_ok=True)
    # Remove '.gz' extension to create the destination file name
    dst_file = dst_path / Path(src_file).stem
    with gzip.open(src_file, "rb") as f_in, dst_file.open("wb") as f_out:
        shutil.copyfileobj(f_in, f_out)

51 

52 

# Teach shutil.unpack_archive() how to handle plain ".gz" files via `_unpack_gz_files()`
shutil.register_unpack_format(
    name="gzip",
    extensions=[".gz"],
    function=_unpack_gz_files,
    description="GZIP file",
)

54 

# shutil.get_unpack_formats() returns one (name, extensions, description) tuple per registered
# format: flatten the extensions of all of them into a single list
SUPPORTED_ARCHIVE_FORMATS = [
    extension for _, extensions, _ in shutil.get_unpack_formats() for extension in extensions
]

57 

58 

@contextmanager
def open_gz_file(
    file_path: StrPath, mode: str = "rt", encoding: str = "utf-8"
) -> Generator[gzip.GzipFile | IO, None, None]:
    """Yields an open file object, even if the file is compressed with gzip.

    The file is expected to contain text by default, and this can be used with the usual "with".
    A mode without an explicit "t" or "b" (e.g. "r") is opened as text for both plain and gzip
    files, as the built-in `open()` does.

    Args:
        file_path: A (single) file path to open. It is read as gzip if its extension is ".gz".
        mode: The mode in which the file is opened.
        encoding: The name of the encoding used to decode or encode the file. It is not used
            when `mode` is a binary mode.

    """
    src_file = Path(file_path)
    is_binary = "b" in mode
    # Binary streams reject an `encoding` argument, so only forward it in text mode
    stream_encoding = None if is_binary else encoding
    if src_file.suffix == ".gz":
        # gzip.open() treats a mode without "t" as binary, unlike open(): make text explicit
        gz_mode = mode if is_binary or "t" in mode else f"{mode}t"
        with gzip.open(src_file, gz_mode, encoding=stream_encoding) as fh:
            yield fh
    else:
        with src_file.open(mode, encoding=stream_encoding) as fh:
            yield fh

80 

81 

def extract_file(src_file: StrPath, dst_dir: StrPath) -> None:
    """Unpacks `src_file` into `dst_dir`, or copies it there if it is not an archive.

    The file is handled as an archive when any trailing combination of its suffixes (e.g.
    ".tar.gz" or ".gz" for "data.tar.gz") is listed in `SUPPORTED_ARCHIVE_FORMATS`, in which case
    extraction is delegated to `shutil.unpack_archive()`. Otherwise the file is copied, and
    `dst_dir` is created first if it does not exist.

    Args:
        src_file: Path to the file to unpack.
        dst_dir: Path to the folder where to extract the file.

    """
    src_path = Path(src_file)
    suffixes = src_path.suffixes
    # "a.tar.gz" -> {".tar.gz", ".gz"}
    trailing_exts = {"".join(suffixes[start:]) for start in range(len(suffixes))}

    if trailing_exts.isdisjoint(SUPPORTED_ARCHIVE_FORMATS):
        # Replicate the functionality of shutil.unpack_archive() by creating `dst_dir`
        Path(dst_dir).mkdir(parents=True, exist_ok=True)
        shutil.copy(src_path, dst_dir)
        return

    # The `filter` argument is only passed on Python 3.12 and later
    if sys.version_info >= (3, 12):
        shutil.unpack_archive(src_path, dst_dir, filter="data")
    else:
        shutil.unpack_archive(src_path, dst_dir)

105 

106 

def extract_file_cli() -> None:
    """Command-line entry point that parses the arguments and calls `extract_file()`.

    `--src_file` is required; `--dst_dir` defaults to the current working directory.

    """
    cli_parser = ArgumentParser(description="Extracts file to the given location.")
    cli_parser.add_argument_src_path("--src_file", required=True, help="Path to the file to unpack")
    cli_parser.add_argument_dst_path(
        "--dst_dir", default=Path.cwd(), help="Path to the folder where to extract the file"
    )
    cli_args = cli_parser.parse_args()
    extract_file(src_file=cli_args.src_file, dst_dir=cli_args.dst_dir)