Coverage for src/ensembl/utils/archive.py: 100%

31 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-06 14:10 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Utils for common IO operations over archive files, e.g. tar or gzip.""" 

16 

17from __future__ import annotations 

18 

19__all__ = [ 

20 "SUPPORTED_ARCHIVE_FORMATS", 

21 "open_gz_file", 

22 "extract_file", 

23] 

24 

25from contextlib import contextmanager 

26import gzip 

27from pathlib import Path 

28import shutil 

29from typing import Generator, TextIO 

30 

31from ensembl.utils import StrPath 

32from ensembl.utils.argparse import ArgumentParser 

33 

34 

def _unpack_gz_files(src_file: StrPath, dst_dir: StrPath) -> None:
    """Unpacks `src_file` to `dst_dir`.

    The unpacked file is named after `src_file` without its last extension. `dst_dir` (and any
    missing parent folder) is created if it does not exist.

    Args:
        src_file: File path to unpack (with ".gz" extension).
        dst_dir: Directory path to unpack the file into.

    """
    dst_path = Path(dst_dir)
    # shutil.unpack_archive() passes the extraction folder as-is to registered unpackers, so it
    # has to be created here before the destination file can be opened
    dst_path.mkdir(parents=True, exist_ok=True)
    # Remove '.gz' extension to create the destination file name
    dst_file = dst_path / Path(src_file).stem
    with gzip.open(src_file, "rb") as f_in, dst_file.open("wb") as f_out:
        shutil.copyfileobj(f_in, f_out)

48 

49 

# Teach shutil.unpack_archive() how to handle plain gzip-compressed files
shutil.register_unpack_format(
    name="gzip", extensions=[".gz"], function=_unpack_gz_files, description="GZIP file"
)

# Each registered format is a tuple (name, extensions, description): flatten all the extensions
SUPPORTED_ARCHIVE_FORMATS = [
    extension for _, extensions, _ in shutil.get_unpack_formats() for extension in extensions
]

54 

55 

@contextmanager
def open_gz_file(file_path: StrPath) -> Generator[TextIO, None, None]:
    """Yields a file object opened in text mode, transparently handling gzip compression.

    Meant to be used in a "with" statement; the file is closed when the block is exited.

    Args:
        file_path: A (single) file path to open. It is read through gzip only if its last
            extension is ".gz".

    """
    path = Path(file_path)
    # Both openers return a context manager, so pick one and share a single "with" block
    handle_manager = gzip.open(path, "rt") if path.suffix == ".gz" else path.open("rt")
    with handle_manager as handle:
        yield handle

73 

74 

def extract_file(src_file: StrPath, dst_dir: StrPath) -> None:
    """Extracts the `src_file` into `dst_dir`.

    If the file is not an archive, it will be copied to `dst_dir`. `dst_dir` will be created if it
    does not exist.

    Args:
        src_file: Path to the file to unpack.
        dst_dir: Path to the folder where to extract the file.

    """
    src_file = Path(src_file)
    # Create `dst_dir` before branching: shutil.unpack_archive() passes the folder as-is to the
    # unpacker registered for the format, which may expect it to exist already
    Path(dst_dir).mkdir(parents=True, exist_ok=True)
    # Collect every trailing combination of suffixes, e.g. "a.tar.gz" -> {".tar.gz", ".gz"}
    suffixes = src_file.suffixes
    extensions = {"".join(suffixes[i:]) for i in range(len(suffixes))}

    if extensions.intersection(SUPPORTED_ARCHIVE_FORMATS):
        shutil.unpack_archive(src_file, dst_dir)
    else:
        shutil.copy(src_file, dst_dir)

95 

96 

def extract_file_cli() -> None:
    """Entry-point for the `extract_file` method.

    Parses `--src_file` (required) and `--dst_dir` (defaults to the current working directory)
    from the command line and forwards them to `extract_file()`.

    """
    parser = ArgumentParser(description="Extracts file to the given location.")
    parser.add_argument_src_path("--src_file", required=True, help="Path to the file to unpack")
    parser.add_argument_dst_path(
        "--dst_dir", default=Path.cwd(), help="Path to the folder where to extract the file"
    )
    cli_args = parser.parse_args()
    extract_file(src_file=cli_args.src_file, dst_dir=cli_args.dst_dir)