Skip to content

manifest

ensembl.io.genomio.manifest

Manifest files handling module.

StatsLengths = dict[str, int] module-attribute

ArgumentParser

Bases: ArgumentParser

Extends argparse.ArgumentParser with additional methods and functionality.

The default behaviour of the help text will be to display the default values on every non-required argument, i.e. optional arguments with required=False.

Source code in ensembl/utils/argparse.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
class ArgumentParser(argparse.ArgumentParser):
    """Extends `argparse.ArgumentParser` with additional methods and functionality.

    The default behaviour of the help text will be to display the default values on every non-required
    argument, i.e. optional arguments with `required=False`.

    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Extends the base class to include the information about default argument values by default."""
        super().__init__(*args, **kwargs)
        # ArgumentDefaultsHelpFormatter appends "(default: ...)" to each optional argument's help text
        self.formatter_class = argparse.ArgumentDefaultsHelpFormatter

    def _validate_src_path(self, src_path: StrPath) -> Path:
        """Returns the path if exists and it is readable, raises an error through the parser otherwise.

        Args:
            src_path: File or directory path to check.

        """
        src_path = Path(src_path)
        # self.error() raises SystemExit, so reaching the return implies an existing, readable path
        if not src_path.exists():
            self.error(f"'{src_path}' not found")
        elif not os.access(src_path, os.R_OK):
            self.error(f"'{src_path}' not readable")
        return src_path

    def _validate_dst_path(self, dst_path: StrPath, exists_ok: bool = False) -> Path:
        """Returns the path if it is writable, raises an error through the parser otherwise.

        Args:
            dst_path: File or directory path to check.
            exists_ok: Do not raise an error during parsing if the destination path already exists.

        """
        dst_path = Path(dst_path)
        if dst_path.exists():
            if os.access(dst_path, os.W_OK):
                if exists_ok:
                    return dst_path
                self.error(f"'{dst_path}' already exists")
            else:
                self.error(f"'{dst_path}' is not writable")
        # Check if the first parent directory that exists is writable
        for parent_path in dst_path.parents:
            if parent_path.exists():
                if not os.access(parent_path, os.W_OK):
                    self.error(f"'{dst_path}' is not writable")
                break
        return dst_path

    def _validate_number(
        self,
        value: str,
        value_type: Callable[[str], int | float],
        min_value: int | float | None,
        max_value: int | float | None,
    ) -> int | float:
        """Returns the numeric value if it is of the expected type and it is within the specified range.

        Args:
            value: String representation of numeric value to check.
            value_type: Expected type of the numeric value.
            min_value: Minimum value constraint. If `None`, no minimum value constraint.
            max_value: Maximum value constraint. If `None`, no maximum value constraint.

        """
        # Check if the string representation can be converted to the expected type
        try:
            result = value_type(value)
        except (TypeError, ValueError):
            self.error(f"invalid {value_type.__name__} value: {value}")
        # NOTE: self.error() exits the parser, so "result" is always bound past this point
        # Check if numeric value is within range
        if (min_value is not None) and (result < min_value):
            self.error(f"{value} is lower than minimum value ({min_value})")
        if (max_value is not None) and (result > max_value):
            self.error(f"{value} is greater than maximum value ({max_value})")
        return result

    def add_argument(self, *args: Any, **kwargs: Any) -> None:  # type: ignore[override]
        """Extends the parent function by excluding the default value in the help text when not provided.

        Only applied to required arguments without a default value, i.e. positional arguments or optional
        arguments with `required=True`.

        """
        if kwargs.get("required", False):
            # SUPPRESS hides the meaningless "(default: None)" suffix for required arguments
            kwargs.setdefault("default", argparse.SUPPRESS)
        super().add_argument(*args, **kwargs)

    def add_argument_src_path(self, *args: Any, **kwargs: Any) -> None:
        """Adds `pathlib.Path` argument, checking if it exists and it is readable at parsing time.

        If "metavar" is not defined, it is added with "PATH" as value to improve help text readability.

        """
        kwargs.setdefault("metavar", "PATH")
        kwargs["type"] = self._validate_src_path
        self.add_argument(*args, **kwargs)

    def add_argument_dst_path(self, *args: Any, exists_ok: bool = True, **kwargs: Any) -> None:
        """Adds `pathlib.Path` argument, checking if it is writable at parsing time.

        If "metavar" is not defined it is added with "PATH" as value to improve help text readability.

        Args:
            exists_ok: Do not raise an error if the destination path already exists.

        """
        kwargs.setdefault("metavar", "PATH")
        # Lambda captures exists_ok so the validator runs with this flag at parsing time
        kwargs["type"] = lambda x: self._validate_dst_path(x, exists_ok)
        self.add_argument(*args, **kwargs)

    def add_argument_url(self, *args: Any, **kwargs: Any) -> None:
        """Adds `sqlalchemy.engine.URL` argument.

        If "metavar" is not defined it is added with "URI" as value to improve help text readability.

        """
        kwargs.setdefault("metavar", "URI")
        kwargs["type"] = make_url
        self.add_argument(*args, **kwargs)

    # pylint: disable=redefined-builtin
    def add_numeric_argument(
        self,
        *args: Any,
        type: Callable[[str], int | float] = float,
        min_value: int | float | None = None,
        max_value: int | float | None = None,
        **kwargs: Any,
    ) -> None:
        """Adds a numeric argument with constraints on its type and its minimum or maximum value.

        Note that the default value (if defined) is not checked unless the argument is an optional argument
        and no value is provided in the command line.

        Args:
            type: Type to convert the argument value to when parsing.
            min_value: Minimum value constraint. If `None`, no minimum value constraint.
            max_value: Maximum value constraint. If `None`, no maximum value constraint.

        """
        # If both minimum and maximum values are defined, ensure min_value <= max_value
        if (min_value is not None) and (max_value is not None) and (min_value > max_value):
            # NOTE(review): assumes ArgumentError accepts a single message argument — confirm this is
            # the module's own exception, not argparse.ArgumentError (which requires an argument object)
            raise ArgumentError("minimum value is greater than maximum value")
        # Add lambda function to check numeric constraints when parsing argument
        kwargs["type"] = lambda x: self._validate_number(x, type, min_value, max_value)
        self.add_argument(*args, **kwargs)

    # pylint: disable=redefined-builtin
    def add_server_arguments(
        self, prefix: str = "", include_database: bool = False, help: str | None = None
    ) -> None:
        """Adds the usual set of arguments needed to connect to a server, i.e. `--host`, `--port`, `--user`
        and `--password` (optional).

        Note that the parser will assume this is a MySQL server.

        Args:
            prefix: Prefix to add to each argument, e.g. if prefix is `src_`, the arguments will be
                `--src_host`, etc.
            include_database: Include `--database` argument.
            help: Description message to include for this set of arguments.

        """
        group = self.add_argument_group(f"{prefix}server connection arguments", description=help)
        group.add_argument(
            f"--{prefix}host", required=True, metavar="HOST", default=argparse.SUPPRESS, help="host name"
        )
        group.add_argument(
            f"--{prefix}port",
            required=True,
            type=int,
            metavar="PORT",
            default=argparse.SUPPRESS,
            help="port number",
        )
        group.add_argument(
            f"--{prefix}user", required=True, metavar="USER", default=argparse.SUPPRESS, help="user name"
        )
        group.add_argument(f"--{prefix}password", metavar="PWD", help="host password")
        if include_database:
            group.add_argument(
                f"--{prefix}database",
                required=True,
                metavar="NAME",
                default=argparse.SUPPRESS,
                help="database name",
            )

    def add_log_arguments(self, add_log_file: bool = False) -> None:
        """Adds the usual set of arguments required to set and initialise a logging system.

        The current set includes a mutually exclusive group for the default logging level: `--verbose`,
        `--debug` or `--log LEVEL`.

        Args:
            add_log_file: Add arguments to allow storing messages into a file, i.e. `--log_file` and
                `--log_file_level`.

        """
        # Define the list of log levels available
        log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        # NOTE: from 3.11 this list can be changed to: logging.getLevelNamesMapping().keys()
        # Create logging arguments group
        group = self.add_argument_group("logging arguments")
        # Add 3 mutually exclusive options to set the logging level
        subgroup = group.add_mutually_exclusive_group()
        subgroup.add_argument(
            "-v",
            "--verbose",
            action="store_const",
            const="INFO",
            dest="log_level",
            help="verbose mode, i.e. 'INFO' log level",
        )
        subgroup.add_argument(
            "--debug",
            action="store_const",
            const="DEBUG",
            dest="log_level",
            help="debugging mode, i.e. 'DEBUG' log level",
        )
        subgroup.add_argument(
            "--log",
            choices=log_levels,
            type=str.upper,
            default="WARNING",
            metavar="LEVEL",
            dest="log_level",
            help="level of the events to track: %(choices)s",
        )
        subgroup.set_defaults(log_level="WARNING")
        if add_log_file:
            # Add log file-related arguments
            group.add_argument(
                "--log_file",
                type=lambda x: self._validate_dst_path(x, exists_ok=True),
                metavar="PATH",
                default=None,
                help="log file path",
            )
            group.add_argument(
                "--log_file_level",
                choices=log_levels,
                type=str.upper,
                default="DEBUG",
                metavar="LEVEL",
                help="level of the events to track in the log file: %(choices)s",
            )

    def parse_args(self, *args: Any, **kwargs: Any) -> argparse.Namespace:  # type: ignore[override]
        """Extends the parent function by adding a new URL argument for every server group added.

        The type of this new argument will be `sqlalchemy.engine.URL`. It also logs all the parsed
        arguments for debugging purposes when logging arguments have been added.

        """
        arguments = super().parse_args(*args, **kwargs)
        # Build and add an sqlalchemy.engine.URL object for every server group added
        pattern = re.compile(r"([\w-]*)host$")
        server_prefixes = [x.group(1) for x in map(pattern.match, vars(arguments)) if x]
        for prefix in server_prefixes:
            # Raise an error rather than overwriting when the URL argument is already present
            if f"{prefix}url" in arguments:
                self.error(f"argument '{prefix}url' is already present")
            try:
                server_url = URL.create(
                    "mysql",
                    getattr(arguments, f"{prefix}user"),
                    getattr(arguments, f"{prefix}password"),
                    getattr(arguments, f"{prefix}host"),
                    getattr(arguments, f"{prefix}port"),
                    getattr(arguments, f"{prefix}database", None),
                )
            except AttributeError:
                # Not a database server host argument
                continue
            setattr(arguments, f"{prefix}url", server_url)
        return arguments

formatter_class = argparse.ArgumentDefaultsHelpFormatter instance-attribute

add_argument(*args, **kwargs)

Extends the parent function by excluding the default value in the help text when not provided.

Only applied to required arguments without a default value, i.e. positional arguments or optional arguments with required=True.

Source code in ensembl/utils/argparse.py
132
133
134
135
136
137
138
139
140
141
def add_argument(self, *args: Any, **kwargs: Any) -> None:  # type: ignore[override]
    """Extends the parent function by excluding the default value in the help text when not provided.

    Only applied to required arguments without a default value, i.e. positional arguments or optional
    arguments with `required=True`.

    """
    # A required argument has no meaningful default, so hide the "(default: ...)" help suffix
    if kwargs.get("required", False) and ("default" not in kwargs):
        kwargs["default"] = argparse.SUPPRESS
    super().add_argument(*args, **kwargs)

add_argument_dst_path(*args, exists_ok=True, **kwargs)

Adds pathlib.Path argument, checking if it is writable at parsing time.

If "metavar" is not defined it is added with "PATH" as value to improve help text readability.

Parameters:

Name Type Description Default
exists_ok bool

Do not raise an error if the destination path already exists.

True
Source code in ensembl/utils/argparse.py
153
154
155
156
157
158
159
160
161
162
163
164
def add_argument_dst_path(self, *args: Any, exists_ok: bool = True, **kwargs: Any) -> None:
    """Adds `pathlib.Path` argument, checking if it is writable at parsing time.

    If "metavar" is not defined it is added with "PATH" as value to improve help text readability.

    Args:
        exists_ok: Do not raise an error if the destination path already exists.

    """
    if "metavar" not in kwargs:
        kwargs["metavar"] = "PATH"

    def _check_writable(raw: str) -> Path:
        # Closure captures exists_ok so the validation runs with this flag at parsing time
        return self._validate_dst_path(raw, exists_ok)

    kwargs["type"] = _check_writable
    self.add_argument(*args, **kwargs)

add_argument_src_path(*args, **kwargs)

Adds pathlib.Path argument, checking if it exists and it is readable at parsing time.

If "metavar" is not defined, it is added with "PATH" as value to improve help text readability.

Source code in ensembl/utils/argparse.py
143
144
145
146
147
148
149
150
151
def add_argument_src_path(self, *args: Any, **kwargs: Any) -> None:
    """Adds `pathlib.Path` argument, checking if it exists and it is readable at parsing time.

    If "metavar" is not defined, it is added with "PATH" as value to improve help text readability.

    """
    if "metavar" not in kwargs:
        kwargs["metavar"] = "PATH"
    # The bound validator converts the raw string and rejects missing/unreadable paths
    kwargs["type"] = self._validate_src_path
    self.add_argument(*args, **kwargs)

add_argument_url(*args, **kwargs)

Adds sqlalchemy.engine.URL argument.

If "metavar" is not defined it is added with "URI" as value to improve help text readability.

Source code in ensembl/utils/argparse.py
166
167
168
169
170
171
172
173
174
def add_argument_url(self, *args: Any, **kwargs: Any) -> None:
    """Adds `sqlalchemy.engine.URL` argument.

    If "metavar" is not defined it is added with "URI" as value to improve help text readability.

    """
    if "metavar" not in kwargs:
        kwargs["metavar"] = "URI"
    # make_url parses the raw string into a sqlalchemy.engine.URL at parsing time
    kwargs["type"] = make_url
    self.add_argument(*args, **kwargs)

add_log_arguments(add_log_file=False)

Adds the usual set of arguments required to set and initialise a logging system.

The current set includes a mutually exclusive group for the default logging level: --verbose, --debug or --log LEVEL.

Parameters:

Name Type Description Default
add_log_file bool

Add arguments to allow storing messages into a file, i.e. --log_file and --log_file_level.

False
Source code in ensembl/utils/argparse.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def add_log_arguments(self, add_log_file: bool = False) -> None:
    """Adds the usual set of arguments required to set and initialise a logging system.

    The current set includes a mutually exclusive group for the default logging level: `--verbose`,
    `--debug` or `--log LEVEL`.

    Args:
        add_log_file: Add arguments to allow storing messages into a file, i.e. `--log_file` and
            `--log_file_level`.

    """
    # Available log level names
    # NOTE: from 3.11 this list can be changed to: logging.getLevelNamesMapping().keys()
    level_names = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    log_group = self.add_argument_group("logging arguments")
    # The three log level selectors below are mutually exclusive
    exclusive = log_group.add_mutually_exclusive_group()
    exclusive.add_argument(
        "-v",
        "--verbose",
        dest="log_level",
        action="store_const",
        const="INFO",
        help="verbose mode, i.e. 'INFO' log level",
    )
    exclusive.add_argument(
        "--debug",
        dest="log_level",
        action="store_const",
        const="DEBUG",
        help="debugging mode, i.e. 'DEBUG' log level",
    )
    exclusive.add_argument(
        "--log",
        dest="log_level",
        metavar="LEVEL",
        choices=level_names,
        type=str.upper,
        default="WARNING",
        help="level of the events to track: %(choices)s",
    )
    exclusive.set_defaults(log_level="WARNING")
    if add_log_file:
        # Optional file logging arguments
        log_group.add_argument(
            "--log_file",
            metavar="PATH",
            default=None,
            type=lambda x: self._validate_dst_path(x, exists_ok=True),
            help="log file path",
        )
        log_group.add_argument(
            "--log_file_level",
            metavar="LEVEL",
            choices=level_names,
            type=str.upper,
            default="DEBUG",
            help="level of the events to track in the log file: %(choices)s",
        )

add_numeric_argument(*args, type=float, min_value=None, max_value=None, **kwargs)

Adds a numeric argument with constrains on its type and its minimum or maximum value.

Note that the default value (if defined) is not checked unless the argument is an optional argument and no value is provided in the command line.

Parameters:

Name Type Description Default
type Callable[[str], int | float]

Type to convert the argument value to when parsing.

float
min_value int | float | None

Minimum value constraint. If None, no minimum value constraint.

None
max_value int | float | None

Maximum value constraint. If None, no maximum value constraint.

None
Source code in ensembl/utils/argparse.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def add_numeric_argument(
    self,
    *args: Any,
    type: Callable[[str], int | float] = float,
    min_value: int | float | None = None,
    max_value: int | float | None = None,
    **kwargs: Any,
) -> None:
    """Adds a numeric argument with constrains on its type and its minimum or maximum value.

    Note that the default value (if defined) is not checked unless the argument is an optional argument
    and no value is provided in the command line.

    Args:
        type: Type to convert the argument value to when parsing.
        min_value: Minimum value constrain. If `None`, no minimum value constrain.
        max_value: Maximum value constrain. If `None`, no maximum value constrain.

    """
    # If both minimum and maximum values are defined, ensure min_value <= max_value
    if (min_value is not None) and (max_value is not None) and (min_value > max_value):
        raise ArgumentError("minimum value is greater than maximum value")
    # Add lambda function to check numeric constrains when parsing argument
    kwargs["type"] = lambda x: self._validate_number(x, type, min_value, max_value)
    self.add_argument(*args, **kwargs)

add_server_arguments(prefix='', include_database=False, help=None)

Adds the usual set of arguments needed to connect to a server, i.e. --host, --port, --user and --password (optional).

Note that the parser will assume this is a MySQL server.

Parameters:

Name Type Description Default
prefix str

Prefix to add to each argument, e.g. if prefix is src_, the arguments will be --src_host, etc.

''
include_database bool

Include --database argument.

False
help str | None

Description message to include for this set of arguments.

None
Source code in ensembl/utils/argparse.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def add_server_arguments(
    self, prefix: str = "", include_database: bool = False, help: str | None = None
) -> None:
    """Adds the usual set of arguments needed to connect to a server, i.e. `--host`, `--port`, `--user`
    and `--password` (optional).

    Note that the parser will assume this is a MySQL server.

    Args:
        prefix: Prefix to add the each argument, e.g. if prefix is `src_`, the arguments will be
            `--src_host`, etc.
        include_database: Include `--database` argument.
        help: Description message to include for this set of arguments.

    """
    group = self.add_argument_group(f"{prefix}server connection arguments", description=help)
    group.add_argument(
        f"--{prefix}host", required=True, metavar="HOST", default=argparse.SUPPRESS, help="host name"
    )
    group.add_argument(
        f"--{prefix}port",
        required=True,
        type=int,
        metavar="PORT",
        default=argparse.SUPPRESS,
        help="port number",
    )
    group.add_argument(
        f"--{prefix}user", required=True, metavar="USER", default=argparse.SUPPRESS, help="user name"
    )
    group.add_argument(f"--{prefix}password", metavar="PWD", help="host password")
    if include_database:
        group.add_argument(
            f"--{prefix}database",
            required=True,
            metavar="NAME",
            default=argparse.SUPPRESS,
            help="database name",
        )

parse_args(*args, **kwargs)

Extends the parent function by adding a new URL argument for every server group added.

The type of this new argument will be sqlalchemy.engine.URL. It also logs all the parsed arguments for debugging purposes when logging arguments have been added.

Source code in ensembl/utils/argparse.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def parse_args(self, *args: Any, **kwargs: Any) -> argparse.Namespace:  # type: ignore[override]
    """Extends the parent function by adding a new URL argument for every server group added.

    The type of this new argument will be `sqlalchemy.engine.URL`. It also logs all the parsed
    arguments for debugging purposes when logging arguments have been added.

    """
    namespace = super().parse_args(*args, **kwargs)
    # Every "<prefix>host" attribute marks a server group for which a URL should be built
    host_pattern = re.compile(r"([\w-]*)host$")
    prefixes = [match.group(1) for match in map(host_pattern.match, vars(namespace)) if match]
    for prefix in prefixes:
        url_attr = f"{prefix}url"
        # Never silently overwrite an argument that is already present
        if url_attr in namespace:
            self.error(f"argument '{prefix}url' is already present")
        try:
            server_url = URL.create(
                "mysql",
                getattr(namespace, f"{prefix}user"),
                getattr(namespace, f"{prefix}password"),
                getattr(namespace, f"{prefix}host"),
                getattr(namespace, f"{prefix}port"),
                getattr(namespace, f"{prefix}database", None),
            )
        except AttributeError:
            # Not a database server host argument
            continue
        setattr(namespace, url_attr, server_url)
    return namespace

BiotypeCounter

A counter for a given biotype, given a list of features.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class BiotypeCounter:
    """A counter for a given biotype, given a list of features."""

    def __init__(self, count: int = 0, ids: Optional[Set[str]] = None, example: Optional[str] = None) -> None:
        # Raw tally of added features (may exceed the number of distinct ids)
        self.count: int = count
        # Distinct feature ids seen so far
        self.ids: Set[str] = set() if ids is None else ids
        # Representative feature id for this biotype ("" when none given)
        self.example: str = "" if example is None else example

    def add_id(self, feature_id: str) -> None:
        """Add a feature to the counter.

        Args:
            feature_id (str): Feature id to add.
        """
        self.ids.add(feature_id)
        self.count += 1

    def unique_count(self) -> int:
        """Total number of feature ids added to the counter so far.

        Returns:
            int: number of features in the counter.
        """
        return len(self.ids)

count = count instance-attribute

example = example instance-attribute

ids = ids instance-attribute

add_id(feature_id)

Add a feature to the counter.

Parameters:

Name Type Description Default
feature_id str

Feature id to add.

required
Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
51
52
53
54
55
56
57
58
def add_id(self, feature_id: str) -> None:
    """Add a feature to the counter.

    Args:
        feature_id (str): Feature id to add.
    """
    # Track both the distinct id and the raw tally
    self.ids.add(feature_id)
    self.count += 1

unique_count()

Total number of feature ids added to the counter so far.

Returns:

Name Type Description
int int

number of features in the counter.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
60
61
62
63
64
65
66
def unique_count(self) -> int:
    """Total number of feature ids added to the counter so far.

    Returns:
        int: number of features in the counter.
    """
    return len(self.ids)

IntegrityTool

Check the integrity of sequence and annotation files in the genome

Source code in src/python/ensembl/io/genomio/manifest/check_integrity.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
class IntegrityTool:
    """Check the integrity of sequence and annotation files in the genome"""

    def __init__(
        self,
        manifest_file: Path,
        ignore_final_stops: bool = False,
        no_fail: bool = False,
    ) -> None:
        self.manifest = ManifestStats(manifest_file)
        self.ignore_final_stops = False
        self.set_ignore_final_stops(ignore_final_stops)
        self.errors: list[str] = []
        self.no_fail = no_fail

    def add_errors(self, errors: list[str] | str) -> None:
        """Store the given errors (list or single string) in the list of all errors."""
        if isinstance(errors, str):
            self.errors.append(errors)
        else:
            self.errors += errors

    def check_integrity(self) -> None:
        """Load files listed in the manifest.json and check the integrity.
        Check if the files are correct by verifying the MD5 hash.
        Check if translation, functional annotation and sequence region ids
        and lengths are consistent with the information in gff.
        Compare sequence length from fasta_dna file to seq_region.json metadata.
        """

        # Load the manifest integrity counts
        manifest = self.manifest
        manifest.prepare_integrity_data()

        genome = manifest.genome

        dna = manifest.get_lengths("dna_sequences")
        pep = manifest.get_lengths("peptide_sequences")
        seq_lengths = manifest.get_lengths("seq_regions")
        seq_circular = manifest.get_circular("seq_regions")

        agp_seqr = manifest.get_lengths("agp")

        # Then, run the checks
        self._check_genome(genome)

        if self.manifest.errors:
            errors_str = "\n".join(self.manifest.errors)
            raise InvalidIntegrityError(f"Manifest files parsing failed:\n{errors_str}")

        # Check gff3
        if manifest.has_lengths("gff3_genes"):
            gff_genes = manifest.get_lengths("gff3_genes")
            gff_seq_regions = manifest.get_lengths("gff3_seq_regions")
            gff_translations = manifest.get_lengths("gff3_translations")
            gff_all_translations = manifest.get_lengths("gff3_all_translations")
            gff_transposable_elements = manifest.get_lengths("gff3_transposable_elements")

            ann_genes = manifest.get_lengths("ann_genes")
            ann_translations = manifest.get_lengths("ann_translations")
            ann_transposable_elements = manifest.get_lengths("ann_transposable_elements")

            # Check fasta_pep.fa integrity
            # The sequence length and id retrieved from the fasta_pep file
            # and compared to the translated CDS id and length in the gff
            # We do not compare the peptide lengths because of sequence edits
            if pep:
                tr_errors = self.check_lengths(
                    pep, gff_translations, "Fasta translations vs gff", special_diff=True
                )
                if len(tr_errors) > 0:
                    # The pseudo CDSs are included in this check
                    # Pseudo CDSs are not translated, if the pseudo translation ids are not ignored
                    # in the gff it will give an error
                    tr_errors_all = self.check_lengths(
                        pep,
                        gff_all_translations,
                        "Fasta translations vs gff (include pseudo CDS)",
                        special_diff=True,
                    )
                    if tr_errors_all:
                        self.add_errors(tr_errors)
                        self.add_errors(tr_errors_all)

            # Check functional_annotation.json integrity
            # Gene ids, translated CDS ids and translated CDSs
            # including pseudogenes are compared to the gff
            if ann_genes:
                self.add_errors(self.check_ids(ann_genes, gff_genes, "Gene ids metadata vs gff"))
                tr_id_errors = self.check_ids(
                    ann_translations, gff_translations, "Translation ids metadata vs gff"
                )
                if tr_id_errors:
                    tr_id_errors_all = self.check_ids(
                        ann_translations,
                        gff_all_translations,
                        "Translation ids metadata vs gff (include pseudo CDS)",
                    )
                    if tr_id_errors_all:
                        self.add_errors(tr_id_errors)
                        self.add_errors(tr_id_errors_all)
                self.add_errors(
                    self.check_ids(
                        ann_transposable_elements,
                        gff_transposable_elements,
                        "TE ids metadata vs gff",
                    )
                )

            self.check_seq_region_lengths(
                seq_lengths, gff_seq_regions, "seq_regions JSON vs GFF3 lengths", seq_circular
            )

        self.check_seq_region_lengths(seq_lengths, dna, "seq_regions JSON vs DNA lengths")
        self.check_seq_region_lengths(seq_lengths, agp_seqr, "seq_regions JSON vs AGPs lengths")

        if self.errors:
            errors_str = "\n".join(self.errors)
            message = f"Integrity test failed:\n{errors_str}"
            if self.no_fail:
                print(message)
            else:
                raise InvalidIntegrityError(message)

    def set_ignore_final_stops(self, ignore_final_stops: bool) -> None:
        """Set ignore_final_stops (when calculating peptide length) for this tool and the manifest."""
        self.ignore_final_stops = ignore_final_stops
        self.manifest.ignore_final_stops = ignore_final_stops

    def _check_genome(self, genome: dict[str, Any]) -> None:
        """Check if the accession is correct in genome.json."""
        genome_accession = genome.get("assembly", {}).get("accession", "")
        if not genome_accession:
            return
        if not re.match(r"GC[AF]_\d{9}(\.\d+)?", genome_accession):
            self.add_errors(f"Genome assembly accession is wrong: '{genome_accession}'")

    def check_ids(self, list1: dict[str, Any], list2: dict[str, Any], name: str) -> list[str]:
        """Compare the ids in list1 and list2.

        Args:
            list1: Sequence IDs retrieved from `functional.json`.
            list2: Sequence IDs retrieved from the GFF3 file.
            name: Source name.

        Return:
            List of message errors of sequence IDs found only in one of the lists provided.
        """

        only1 = []
        only2 = []
        common = []

        for item_id in list1:
            if item_id in list2:
                common.append(item_id)
            else:
                only1.append(item_id)
        for item_id in list2:
            if item_id not in common:
                only2.append(item_id)

        errors = []
        if common:
            logging.info(f"{len(common)} common elements in {name}")
        if only1:
            errors.append(f"{len(only1)} only in first list in {name} (first: {only1[0]})")
            logging.debug(f"{len(only1)} only in first list in {name}")
        if only2:
            errors.append(f"{len(only2)} only in second list in {name} (first: {only2[0]})")
            logging.debug(f"{len(only1)} only in second list in {name}")

        return errors

    def check_lengths(
        self,
        list1: dict[str, int],
        list2: dict[str, int],
        name: str,
        *,
        allowed_len_diff: int | None = None,
        special_diff: bool = False,
    ) -> list[str]:
        """Check the difference in ids and length between list1 and list2.
            There are a few special cases here where we allow a certain asymmetry
            by changing the values of the arguments.

        Args:
            list1: dict containing length and id of the sequence from fasta files.
            list2: dict containing length and id in the retrieved from the gff.
            name:  string

        allowed_len_diff : None to to not accept differences in length between list1 and list2.
            The value can be changed based on how much difference in sequence length we are wanting to accept.

        special_diff: set as False when no special length difference is expected between the lists.
                    This can be changed if we want to report common sequences with 1 BP difference.

        Returns:
            Error if there is a difference in length or ids between the lists.
        """

        # check list differences, checks if abs(values diff) < allowed_len_diff

        set1 = frozenset(list1)
        set2 = frozenset(list2)
        list1_2 = list(set1 - set2)
        list2_1 = list(set2 - set1)

        errors = []
        if len(list1_2) > 0:
            errors.append(f"{name}: {len(list1_2)} from the first list only (i.e. {list1_2[0]})")
        if len(list2_1) > 0:
            errors.append(f"{name}: {len(list2_1)} from the second list only (i.e. {list2_1[0]})")

        common_len = 0
        if allowed_len_diff is None:
            common_len = len(set1 & set2)
        else:
            # check for the sequence length difference
            diff_len_list: list[str] = []
            diff_len_special_list: list[str] = []
            for e in set1 & set2:
                dl12 = list1[e] - list2[e]
                if abs(dl12) <= allowed_len_diff:
                    common_len += 1
                else:
                    _dlist = diff_len_list
                    # Special case: 1 AA /BP shorter,
                    #   so assuming the stop codon is not included in the CDS (when it should be)
                    if dl12 == 1 and special_diff:
                        _dlist = diff_len_special_list
                    _dlist.append(f"{e}: {list1[e]}, {list2[e]}")
            if diff_len_special_list:
                errors.append(
                    (
                        f"{len(diff_len_special_list)} common elements with one BP/AA length diff for {name}"
                        f"(e.g. {diff_len_special_list[0]})"
                    )
                )
            if diff_len_list:
                errors.append(
                    (
                        f"{len(diff_len_list)} common elements with length diff for {name}"
                        f"(e.g. {diff_len_list[0]})"
                    )
                )
        if common_len > 0:
            logging.warning(f"{common_len} common elements between lists for {name}")

        return errors

    def check_seq_region_lengths(
        self,
        seqrs: dict[str, Any] | None,
        feats: dict[str, Any] | None,
        name: str,
        circular: dict[str, Any] | None = None,
    ) -> None:
        """Check the integrity of seq_region.json file by comparing the length of the sequence
            to fasta files and the gff.

            Seq_region file is in json format containing the metadata of the sequence.
            It contains sequence id, length, location and the synonyms for the sequence name
            from different sources.

        Args:
            seqs: Sequence name and length retrieved from seq_region.json file.
            feats: Sequence name and length retrieved from the fasta and gff file.
            name: Name of the check to show in the logs.
            circular: Whether any sequence is circular.

        Returns:
            Error if there are common sequences with difference in ids
            and if the sequences are not consistent in the files.
        """
        if not seqrs or not feats:
            return
        comp = self._compare_seqs(seqrs, feats, circular)

        common = comp["common"]
        diff = comp["diff"]
        diff_circular = comp["diff_circular"]
        only_seqr = comp["only_seqr"]
        only_feat = comp["only_feat"]

        if common:
            logging.info(f"{len(common)} common elements in {name}")
        if diff_circular:
            example = diff_circular[0]
            logging.info(f"{len(diff_circular)} differences for circular elements in {name} (e.g. {example})")
        if diff:
            self.add_errors(f"{len(diff)} common elements with higher length in {name} (e.g. {diff[0]})")
        if only_seqr:
            # Not an error!
            logging.info(f"{len(only_seqr)} only in seq_region list in {name} (first: {only_seqr[0]})")
        if only_feat:
            self.add_errors(f"{len(only_feat)} only in second list in {name} (first: {only_feat[0]})")

    def _compare_seqs(
        self, seqrs: dict[str, Any], feats: dict[str, Any], circular: dict[str, Any] | None = None
    ) -> dict[str, list[str]]:
        """Give the intersection and other comparison between two groups of sequences.

        Args:
            seqs: Sequence name and length retrieved from seq_region.json file.
            feats: Sequence name and length retrieved from the fasta and gff file.
            circular: Whether any sequence is circular.

        Returns: Dict with 5 stats:
            common: Common elements.
            only_seqr: Elements only in the first one.
            only_feat: Elements only in the second one.
            diff: Elements that differ.
            diff_circular: Elements that differ in a circular sequence.

        """
        comp: dict[str, list[str]] = {
            "common": [],
            "only_seqr": [],
            "only_feat": [],
            "diff": [],
            "diff_circular": [],
        }

        for seq_id in seqrs:
            if seq_id in feats:
                # Check that feature is within the seq_region length
                if feats[seq_id] > seqrs[seq_id]:
                    diff_str = f"{seq_id}: {seqrs[seq_id]} vs {feats[seq_id]}"
                    if circular and circular.get(seq_id, False):
                        comp["diff_circular"].append(diff_str)
                    else:
                        comp["diff"].append(diff_str)
                else:
                    comp["common"].append(seq_id)
            else:
                comp["only_seqr"].append(seq_id)

        for seq_id in feats:
            if (
                seq_id not in comp["common"]
                and seq_id not in comp["diff"]
                and seq_id not in comp["diff_circular"]
                and seq_id not in seqrs
            ):
                comp["only_feat"].append(seq_id)

        return comp

errors = [] instance-attribute

ignore_final_stops = False instance-attribute

manifest = ManifestStats(manifest_file) instance-attribute

no_fail = no_fail instance-attribute

add_errors(errors)

Store the given errors (list or single string) in the list of all errors.

Source code in src/python/ensembl/io/genomio/manifest/check_integrity.py
47
48
49
50
51
52
def add_errors(self, errors: list[str] | str) -> None:
    """Store the given errors (list or single string) in the list of all errors."""
    if isinstance(errors, str):
        self.errors.append(errors)
    else:
        self.errors += errors

check_ids(list1, list2, name)

Compare the ids in list1 and list2.

Parameters:

Name Type Description Default
list1 dict[str, Any]

Sequence IDs retrieved from functional.json.

required
list2 dict[str, Any]

Sequence IDs retrieved from the GFF3 file.

required
name str

Source name.

required
Return

List of message errors of sequence IDs found only in one of the lists provided.

Source code in src/python/ensembl/io/genomio/manifest/check_integrity.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def check_ids(self, list1: dict[str, Any], list2: dict[str, Any], name: str) -> list[str]:
    """Compare the ids in list1 and list2.

    Args:
        list1: Sequence IDs retrieved from `functional.json`.
        list2: Sequence IDs retrieved from the GFF3 file.
        name: Source name.

    Return:
        List of message errors of sequence IDs found only in one of the lists provided.
    """

    only1 = []
    only2 = []
    common = []

    for item_id in list1:
        if item_id in list2:
            common.append(item_id)
        else:
            only1.append(item_id)
    for item_id in list2:
        if item_id not in common:
            only2.append(item_id)

    errors = []
    if common:
        logging.info(f"{len(common)} common elements in {name}")
    if only1:
        errors.append(f"{len(only1)} only in first list in {name} (first: {only1[0]})")
        logging.debug(f"{len(only1)} only in first list in {name}")
    if only2:
        errors.append(f"{len(only2)} only in second list in {name} (first: {only2[0]})")
        logging.debug(f"{len(only1)} only in second list in {name}")

    return errors

check_integrity()

Load files listed in the manifest.json and check the integrity. Check if the files are correct by verifying the MD5 hash. Check if translation, functional annotation and sequence region ids and lengths are consistent with the information in gff. Compare sequence length from fasta_dna file to seq_region.json metadata.

Source code in src/python/ensembl/io/genomio/manifest/check_integrity.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def check_integrity(self) -> None:
    """Load files listed in the manifest.json and check the integrity.

    Check if the files are correct by verifying the MD5 hash.
    Check if translation, functional annotation and sequence region ids
    and lengths are consistent with the information in gff.
    Compare sequence length from fasta_dna file to seq_region.json metadata.

    Raises:
        InvalidIntegrityError: If the manifest files could not be parsed, or if any
            integrity check failed and `no_fail` is not set.
    """

    # Load the manifest integrity counts
    manifest = self.manifest
    manifest.prepare_integrity_data()

    genome = manifest.genome

    dna = manifest.get_lengths("dna_sequences")
    pep = manifest.get_lengths("peptide_sequences")
    seq_lengths = manifest.get_lengths("seq_regions")
    seq_circular = manifest.get_circular("seq_regions")

    agp_seqr = manifest.get_lengths("agp")

    # Then, run the checks
    self._check_genome(genome)

    # Parsing errors are fatal even with no_fail set: later checks would be meaningless.
    if self.manifest.errors:
        errors_str = "\n".join(self.manifest.errors)
        raise InvalidIntegrityError(f"Manifest files parsing failed:\n{errors_str}")

    # Check gff3
    if manifest.has_lengths("gff3_genes"):
        gff_genes = manifest.get_lengths("gff3_genes")
        gff_seq_regions = manifest.get_lengths("gff3_seq_regions")
        gff_translations = manifest.get_lengths("gff3_translations")
        gff_all_translations = manifest.get_lengths("gff3_all_translations")
        gff_transposable_elements = manifest.get_lengths("gff3_transposable_elements")

        ann_genes = manifest.get_lengths("ann_genes")
        ann_translations = manifest.get_lengths("ann_translations")
        ann_transposable_elements = manifest.get_lengths("ann_transposable_elements")

        # Check fasta_pep.fa integrity
        # The sequence length and id retrieved from the fasta_pep file
        # and compared to the translated CDS id and length in the gff
        # We do not compare the peptide lengths because of sequence edits
        if pep:
            tr_errors = self.check_lengths(
                pep, gff_translations, "Fasta translations vs gff", special_diff=True
            )
            if len(tr_errors) > 0:
                # The pseudo CDSs are included in this check
                # Pseudo CDSs are not translated, if the pseudo translation ids are not ignored
                # in the gff it will give an error
                tr_errors_all = self.check_lengths(
                    pep,
                    gff_all_translations,
                    "Fasta translations vs gff (include pseudo CDS)",
                    special_diff=True,
                )
                # Only recorded if the check against ALL translations also fails.
                if tr_errors_all:
                    self.add_errors(tr_errors)
                    self.add_errors(tr_errors_all)

        # Check functional_annotation.json integrity
        # Gene ids, translated CDS ids and translated CDSs
        # including pseudogenes are compared to the gff
        if ann_genes:
            self.add_errors(self.check_ids(ann_genes, gff_genes, "Gene ids metadata vs gff"))
            tr_id_errors = self.check_ids(
                ann_translations, gff_translations, "Translation ids metadata vs gff"
            )
            if tr_id_errors:
                tr_id_errors_all = self.check_ids(
                    ann_translations,
                    gff_all_translations,
                    "Translation ids metadata vs gff (include pseudo CDS)",
                )
                if tr_id_errors_all:
                    self.add_errors(tr_id_errors)
                    self.add_errors(tr_id_errors_all)
            self.add_errors(
                self.check_ids(
                    ann_transposable_elements,
                    gff_transposable_elements,
                    "TE ids metadata vs gff",
                )
            )

        self.check_seq_region_lengths(
            seq_lengths, gff_seq_regions, "seq_regions JSON vs GFF3 lengths", seq_circular
        )

    self.check_seq_region_lengths(seq_lengths, dna, "seq_regions JSON vs DNA lengths")
    self.check_seq_region_lengths(seq_lengths, agp_seqr, "seq_regions JSON vs AGPs lengths")

    # Errors accumulated by the checks above are reported in one batch at the end.
    if self.errors:
        errors_str = "\n".join(self.errors)
        message = f"Integrity test failed:\n{errors_str}"
        if self.no_fail:
            print(message)
        else:
            raise InvalidIntegrityError(message)

check_lengths(list1, list2, name, *, allowed_len_diff=None, special_diff=False)

Check the difference in ids and length between list1 and list2. There are a few special cases here where we allow a certain asymmetry by changing the values of the arguments.

Parameters:

Name Type Description Default
list1 dict[str, int]

dict containing length and id of the sequence from fasta files.

required
list2 dict[str, int]

dict containing length and id in the retrieved from the gff.

required
name str

string

required
None to not accept differences in length between list1 and list2.

The value can be changed based on how much difference in sequence length we are wanting to accept.

set as False when no special length difference is expected between the lists.

This can be changed if we want to report common sequences with 1 BP difference.

Returns:

Type Description
list[str]

Error if there is a difference in length or ids between the lists.

Source code in src/python/ensembl/io/genomio/manifest/check_integrity.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def check_lengths(
    self,
    list1: dict[str, int],
    list2: dict[str, int],
    name: str,
    *,
    allowed_len_diff: int | None = None,
    special_diff: bool = False,
) -> list[str]:
    """Check the difference in ids and length between list1 and list2.
        There are a few special cases here where we allow a certain asymmetry
        by changing the values of the arguments.

    Args:
        list1: dict containing length and id of the sequence from fasta files.
        list2: dict containing length and id in the retrieved from the gff.
        name:  string

    allowed_len_diff : None to to not accept differences in length between list1 and list2.
        The value can be changed based on how much difference in sequence length we are wanting to accept.

    special_diff: set as False when no special length difference is expected between the lists.
                This can be changed if we want to report common sequences with 1 BP difference.

    Returns:
        Error if there is a difference in length or ids between the lists.
    """

    # check list differences, checks if abs(values diff) < allowed_len_diff

    set1 = frozenset(list1)
    set2 = frozenset(list2)
    list1_2 = list(set1 - set2)
    list2_1 = list(set2 - set1)

    errors = []
    if len(list1_2) > 0:
        errors.append(f"{name}: {len(list1_2)} from the first list only (i.e. {list1_2[0]})")
    if len(list2_1) > 0:
        errors.append(f"{name}: {len(list2_1)} from the second list only (i.e. {list2_1[0]})")

    common_len = 0
    if allowed_len_diff is None:
        common_len = len(set1 & set2)
    else:
        # check for the sequence length difference
        diff_len_list: list[str] = []
        diff_len_special_list: list[str] = []
        for e in set1 & set2:
            dl12 = list1[e] - list2[e]
            if abs(dl12) <= allowed_len_diff:
                common_len += 1
            else:
                _dlist = diff_len_list
                # Special case: 1 AA /BP shorter,
                #   so assuming the stop codon is not included in the CDS (when it should be)
                if dl12 == 1 and special_diff:
                    _dlist = diff_len_special_list
                _dlist.append(f"{e}: {list1[e]}, {list2[e]}")
        if diff_len_special_list:
            errors.append(
                (
                    f"{len(diff_len_special_list)} common elements with one BP/AA length diff for {name}"
                    f"(e.g. {diff_len_special_list[0]})"
                )
            )
        if diff_len_list:
            errors.append(
                (
                    f"{len(diff_len_list)} common elements with length diff for {name}"
                    f"(e.g. {diff_len_list[0]})"
                )
            )
    if common_len > 0:
        logging.warning(f"{common_len} common elements between lists for {name}")

    return errors

check_seq_region_lengths(seqrs, feats, name, circular=None)

Check the integrity of seq_region.json file by comparing the length of the sequence to fasta files and the gff.

Seq_region file is in json format containing the metadata of the sequence.
It contains sequence id, length, location and the synonyms for the sequence name
from different sources.

Parameters:

Name Type Description Default
seqrs dict[str, Any] | None

Sequence name and length retrieved from seq_region.json file.

required
feats dict[str, Any] | None

Sequence name and length retrieved from the fasta and gff file.

required
name str

Name of the check to show in the logs.

required
circular dict[str, Any] | None

Whether any sequence is circular.

None

Returns:

Type Description
None

Error if there are common sequences with difference in ids

None

and if the sequences are not consistent in the files.

Source code in src/python/ensembl/io/genomio/manifest/check_integrity.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def check_seq_region_lengths(
    self,
    seqrs: dict[str, Any] | None,
    feats: dict[str, Any] | None,
    name: str,
    circular: dict[str, Any] | None = None,
) -> None:
    """Check the integrity of seq_region.json file by comparing the length of the sequence
        to fasta files and the gff.

        Seq_region file is in json format containing the metadata of the sequence.
        It contains sequence id, length, location and the synonyms for the sequence name
        from different sources.

    Args:
        seqrs: Sequence name and length retrieved from seq_region.json file.
        feats: Sequence name and length retrieved from the fasta and gff file.
        name: Name of the check to show in the logs.
        circular: Whether any sequence is circular.

    Any inconsistency between the two sets of sequences is recorded via `add_errors`
    rather than returned. Does nothing if either input is empty or None.
    """
    if not seqrs or not feats:
        return
    comp = self._compare_seqs(seqrs, feats, circular)

    common = comp["common"]
    diff = comp["diff"]
    diff_circular = comp["diff_circular"]
    only_seqr = comp["only_seqr"]
    only_feat = comp["only_feat"]

    if common:
        logging.info(f"{len(common)} common elements in {name}")
    if diff_circular:
        # Circular length differences are only logged, not treated as errors.
        example = diff_circular[0]
        logging.info(f"{len(diff_circular)} differences for circular elements in {name} (e.g. {example})")
    if diff:
        self.add_errors(f"{len(diff)} common elements with higher length in {name} (e.g. {diff[0]})")
    if only_seqr:
        # Not an error!
        logging.info(f"{len(only_seqr)} only in seq_region list in {name} (first: {only_seqr[0]})")
    if only_feat:
        self.add_errors(f"{len(only_feat)} only in second list in {name} (first: {only_feat[0]})")

set_ignore_final_stops(ignore_final_stops)

Set ignore_final_stops (when calculating peptide length) for this tool and the manifest.

Source code in src/python/ensembl/io/genomio/manifest/check_integrity.py
156
157
158
159
def set_ignore_final_stops(self, ignore_final_stops: bool) -> None:
    """Propagate the ignore_final_stops flag (used when calculating peptide lengths)
    to both this tool and its manifest."""
    self.manifest.ignore_final_stops = ignore_final_stops
    self.ignore_final_stops = ignore_final_stops

InvalidIntegrityError

Bases: Exception

When a file integrity check fails

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
40
41
class InvalidIntegrityError(Exception):
    """Raised when a file integrity check fails."""

Manifest

Records of a manifest file and its files and md5 checksums.

Source code in src/python/ensembl/io/genomio/manifest/manifest.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class Manifest:
    """Records of a manifest file and its files and md5 checksums.

    Maps canonical file names (e.g. "gff3", "fasta_dna") to the actual files found in a
    directory along with their md5 checksums, stored as `manifest.json` in that directory.
    """

    # Name parts/suffixes that are already canonical manifest keys.
    _same_names = {
        "gff3",
        "fasta_dna",
        "fasta_pep",
        "functional_annotation",
        "genome",
        "seq_attrib",
        "seq_region",
        "agp",
        "events",
    }
    # Alternative name parts mapped to their canonical manifest key.
    _alias_names = {
        "gene_models": "gff3",
        "dna": "fasta_dna",
        "pep": "fasta_pep",
    }
    _same_names_dict = {name: name for name in _same_names}
    # Every recognized name part/suffix -> canonical manifest key.
    names = {**_same_names_dict, **_alias_names}
    # Manifest keys that may group several files, each stored under its own object name.
    multi_files = {"agp"}

    def __init__(self, manifest_dir: Path) -> None:
        """Initializes a manifest with the directory containing the files (and a manifest if it exists).

        Args:
            manifest_dir: directory where the files are contained.
        """
        self.root_dir = manifest_dir
        self.file_path = manifest_dir / "manifest.json"
        self.files: dict = {}

    def create(self) -> None:
        """Creates a manifest file from the files in a directory."""
        self.get_files_checksums()
        with self.file_path.open("w") as json_out:
            # json.dump streams straight to the handle instead of building the string first
            json.dump(self.files, json_out, sort_keys=True, indent=4)

    def get_files_checksums(self) -> ManifestDict:
        """Records all the files in the directory with their checksum.

        Subdirectories are skipped and empty files are deleted. A file matching several
        known names is recorded under each matching key (the name loop does not break).

        Returns:
            Mapping of canonical manifest keys to {"file", "md5sum"} records.
        """
        manifest_files: ManifestDict = {}
        for subfile in self.root_dir.iterdir():
            logging.debug(f"Check file {subfile} ({subfile.stem}, {subfile.suffix})")
            used_file = False
            if subfile.is_dir():
                logging.warning("Can't create manifest for subdirectory")
                continue

            # Delete and skip empty files
            if subfile.stat().st_size == 0:
                logging.warning(f"Skip and delete empty file: {subfile}")
                subfile.unlink()
                continue

            for name, standard_name in self.names.items():
                # Either the last element of the stem or the suffix is a known name
                if subfile.stem.endswith(name) or subfile.suffix == f".{name}":
                    logging.debug(f"Matched to {name} ({standard_name}) = {subfile}")
                    used_file = True
                    md5 = self._get_md5sum(subfile)
                    file_obj = {"file": subfile.name, "md5sum": md5}

                    # Multiple files stored, each with a name
                    if standard_name in self.multi_files:
                        manifest_files.setdefault(standard_name, {})
                        obj_name = self._prepare_object_name(subfile, name, manifest_files[standard_name])
                        manifest_files[standard_name][obj_name] = file_obj

                    # Single file/init
                    else:
                        manifest_files[standard_name] = file_obj

            if not used_file:
                logging.warning(f"File {subfile} was not included in the manifest")

        self.files = manifest_files
        return self.files

    def _prepare_object_name(
        self, subfile: Path, name: str, manifest_file_dict: dict[str, dict[str, str]]
    ) -> str:
        """Returns a unique object name under which to store a multi-file manifest entry.

        Args:
            subfile: File to derive the object name from.
            name: Recognized name part/suffix that matched the file.
            manifest_file_dict: Already-stored entries for this manifest key (used to deduplicate).

        Raises:
            ValueError: If too many files resolve to the same base name.
        """
        # Prepare object name
        try:
            # If we recognize the suffix, then the name is the part after the last "_"
            if subfile.suffix == f".{name}":
                obj_name = subfile.stem.split(sep="_")[-1]
            # If we recognize the end of the name, then the name is the part before the last "_"
            else:
                obj_name = subfile.stem.split(sep="_")[-2]
        except IndexError:
            obj_name = "file"

        # Add number if duplicate name
        obj_name_base = obj_name
        count = 1
        # Membership test directly on the dict: no need for .keys()
        while obj_name in manifest_file_dict:
            obj_name = f"{obj_name_base}.{count}"
            count += 1
            if count >= 10:
                raise ValueError(f"Too many files with same name {obj_name_base}")
        return obj_name

    def load(self) -> ManifestDict:
        """Load the content of an existing manifest file.

        Every file listed in the manifest has its md5 checksum verified.

        Raises:
            ManifestError: If the manifest file does not exist or a checksum is invalid.
        """
        if not self.file_path.exists():
            raise ManifestError(f"Cannot load non-existing manifest file: {self.file_path}")

        with self.file_path.open("r") as manifest_fh:
            manifest = json.load(manifest_fh)

            # Use dir name from the manifest
            for name in manifest:
                if "file" in manifest[name]:
                    file_path = self.root_dir / manifest[name]["file"]
                    # check if the md5sum is correct
                    md5sum = manifest[name]["md5sum"]
                    self._check_md5sum(file_path, md5sum)
                else:
                    for f in manifest[name]:
                        file_path = self.root_dir / manifest[name][f]["file"]
                        # check if the md5sum is correct
                        md5sum = manifest[name][f]["md5sum"]
                        self._check_md5sum(file_path, md5sum)

            self.files = manifest
        return self.files

    @staticmethod
    def _get_md5sum(file_path: Path) -> str:
        """Returns the md5 checksum for a given file."""
        # Feed the hash in chunks instead of reading the whole (potentially large) file in memory.
        md5 = hashlib.md5()
        with file_path.open("rb") as file_handle:
            for chunk in iter(lambda: file_handle.read(2**16), b""):
                md5.update(chunk)
        return md5.hexdigest()

    def _check_md5sum(self, file_path: Path, md5sum: str) -> None:
        """Checks a file against an md5 checksum, raises a ManifestError if the checksum fails.

        Args:
            file_path: Path to a genome file.
            md5sum: MD5 hash for the files.
        """
        file_md5sum = self._get_md5sum(file_path)
        if file_md5sum != md5sum:
            raise ManifestError(f"Invalid md5 checksum for {file_path}: got {file_md5sum}, expected {md5sum}")

file_path = manifest_dir / 'manifest.json' instance-attribute

files = {} instance-attribute

multi_files = {'agp'} class-attribute instance-attribute

names = {**_same_names_dict, **_alias_names} class-attribute instance-attribute

root_dir = manifest_dir instance-attribute

create()

Creates a manifest file from the files in a directory.

Source code in src/python/ensembl/io/genomio/manifest/manifest.py
66
67
68
69
70
def create(self) -> None:
    """Creates a manifest file from the files in a directory."""
    # Refresh the file records before writing them out.
    self.get_files_checksums()
    with self.file_path.open("w") as json_out:
        payload = json.dumps(self.files, sort_keys=True, indent=4)
        json_out.write(payload)

get_files_checksums()

Records all the files in the directory with their checksum.

Source code in src/python/ensembl/io/genomio/manifest/manifest.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def get_files_checksums(self) -> ManifestDict:
    """Records all the files in the directory with their checksum.

    Subdirectories are skipped and empty files are deleted. A file whose name matches
    several known names is recorded under each matching key (the name loop does not break).

    Returns:
        Mapping of canonical manifest keys to {"file", "md5sum"} records.
    """
    manifest_files: ManifestDict = {}
    for subfile in self.root_dir.iterdir():
        logging.debug(f"Check file {subfile} ({subfile.stem}, {subfile.suffix})")
        used_file = False
        if subfile.is_dir():
            logging.warning("Can't create manifest for subdirectory")
            continue

        # Delete and skip empty files
        if subfile.stat().st_size == 0:
            logging.warning(f"Skip and delete empty file: {subfile}")
            subfile.unlink()
            continue

        for name, standard_name in self.names.items():
            # Either the last element of the stem or the suffix is a known name
            if subfile.stem.endswith(name) or subfile.suffix == f".{name}":
                logging.debug(f"Matched to {name} ({standard_name}) = {subfile}")
                used_file = True
                md5 = self._get_md5sum(subfile)
                file_obj = {"file": subfile.name, "md5sum": md5}

                # Multiple files stored, each with a name
                if standard_name in self.multi_files:
                    manifest_files.setdefault(standard_name, {})
                    obj_name = self._prepare_object_name(subfile, name, manifest_files[standard_name])
                    manifest_files[standard_name][obj_name] = file_obj

                # Single file/init
                else:
                    manifest_files[standard_name] = file_obj

        if not used_file:
            logging.warning(f"File {subfile} was not included in the manifest")

    self.files = manifest_files
    return self.files

load()

Load the content of an existing manifest file.

Source code in src/python/ensembl/io/genomio/manifest/manifest.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def load(self) -> ManifestDict:
    """Load the content of an existing manifest file.

    Every file listed in the manifest has its md5 checksum verified.

    Raises:
        ManifestError: If the manifest file does not exist or a checksum check fails.
    """
    if not self.file_path.exists():
        raise ManifestError(f"Cannot load non-existing manifest file: {self.file_path}")

    with self.file_path.open("r") as manifest_fh:
        manifest = json.load(manifest_fh)

        # Verify the checksum of every file referenced by the manifest
        for name in manifest:
            entry = manifest[name]
            if "file" in entry:
                # Single-file entry: {"file": ..., "md5sum": ...}
                self._check_md5sum(self.root_dir / entry["file"], entry["md5sum"])
            else:
                # Multi-file entry: nested dict of {"file": ..., "md5sum": ...} records
                for sub_name in entry:
                    sub_entry = entry[sub_name]
                    self._check_md5sum(self.root_dir / sub_entry["file"], sub_entry["md5sum"])

        self.files = manifest
    return self.files

ManifestStats

Representation of the main stats of the files in a manifest for comparison.

The stats in question are: lengths of sequences (DNA, genes and peptides); sequence and feature IDs; sequence circularity.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
class ManifestStats:
    """Representation of the main stats of the files in a manifest for comparison.

    The stats in question are:
    - lengths of sequences (DNA, genes and peptides)
    - sequences and features IDs
    - sequences circularity
    """

    def __init__(self, manifest_path: StrPath, ignore_final_stops: bool = False) -> None:
        """Initializes the stats from a manifest file.

        Args:
            manifest_path: Path to the manifest file; the data files live in its parent directory.
            ignore_final_stops: Do not count a final stop codon when computing peptide lengths.
        """
        # Canonical manifest key -> file path (multi-file entries keep a nested dict of paths)
        self.manifest_files = self._get_manifest(manifest_path)
        # Content of the genome JSON file, populated by load_genome()
        self.genome: dict[str, Any] = {}

        # ID -> length records for every supported data source, populated by the load_* methods
        self.lengths: dict[str, StatsLengths] = {
            "dna_sequences": {},
            "peptide_sequences": {},
            "seq_region_levels": {},
            "annotations": {},
            "agp": {},
            "gff3_seq_regions": {},
            "gff3_genes": {},
            "gff3_translations": {},
            "gff3_all_translations": {},
            "gff3_transposable_elements": {},
            "ann_genes": {},
            "ann_translations": {},
            "ann_transposable_elements": {},
            "seq_regions": {},
        }

        # ID -> circularity flag, populated by load_seq_regions()
        self.circular: dict[str, StatsLengths] = {
            "seq_regions": {},
        }

        # Accumulated error messages, see add_error()
        self.errors: list[str] = []

        self.ignore_final_stops = ignore_final_stops

    def _get_manifest(self, manifest_path: PathLike) -> dict[str, Any]:
        """Load the content of a manifest file.

        Returns:
            Content of the manifest file, with each {"file", "md5sum"} record replaced
            by the file's full path.
        """
        manifest = Manifest(Path(manifest_path).parent)
        manifest_files = manifest.load()

        # Replace the {file, md5} dict with the file path
        for name in manifest_files:
            if "file" in manifest_files[name]:
                manifest_files[name] = Path(manifest_path).parent / manifest_files[name]["file"]
            else:
                # Multi-file entry: replace each nested record with its path
                for f in manifest_files[name]:
                    manifest_files[name][f] = Path(manifest_path).parent / manifest_files[name][f]["file"]
        return manifest_files

    def add_error(self, error: str) -> None:
        """Record an error."""
        self.errors.append(error)

    def load_seq_regions(self) -> None:
        """Retrieve seq_regions lengths and circular information from the seq_region JSON file."""

        if "seq_region" not in self.manifest_files:
            return
        logging.info("Manifest contains seq_region JSON")
        seq_regions = get_json(Path(self.manifest_files["seq_region"]))
        seqr_seqlevel = {}
        seq_lengths = {}
        seq_circular = {}
        # Store the length as int
        for seq in seq_regions:
            # NOTE(review): assumes every entry has "name", "length" and "coord_system_level" keys
            seq_lengths[seq["name"]] = int(seq["length"])
            seq_circular[seq["name"]] = seq.get("circular", False)
            if seq["coord_system_level"] == "contig":
                seqr_seqlevel[seq["name"]] = int(seq["length"])
            # Also record synonyms (in case GFF file uses synonyms)
            if "synonyms" in seq:
                for synonym in seq["synonyms"]:
                    seq_lengths[synonym["name"]] = int(seq["length"])
        self.lengths["seq_regions"] = seq_lengths
        self.circular["seq_regions"] = seq_circular

    def load_peptides_fasta_lengths(self) -> None:
        """Retrieve peptides sequences lengths from their FASTA file."""
        if "fasta_pep" not in self.manifest_files:
            return
        self.lengths["peptide_sequences"] = self._get_fasta_lengths(
            self.manifest_files["fasta_pep"], ignore_final_stops=self.ignore_final_stops
        )

    def load_dna_fasta_lengths(self) -> None:
        """Retrieve DNA sequences lengths from their FASTA file."""
        if "fasta_dna" not in self.manifest_files:
            return
        self.lengths["dna_sequences"] = self._get_fasta_lengths(self.manifest_files["fasta_dna"])

    def _get_fasta_lengths(self, fasta_path: StrPath, ignore_final_stops: bool = False) -> dict[str, int]:
        """Returns every sequence ID and its length from a FASTA file (DNA or peptide).

        An error will be added for every empty id, non-unique id or stop codon found in the FASTA file.

        Args:
            fasta_path: Path to FASTA file.
            ignore_final_stops: Do not include final stop in the total length.

        """

        data = {}
        non_unique = {}
        non_unique_count = 0
        empty_id_count = 0
        contains_stop_codon = 0
        rec_count = 0
        for rec in SeqIO.parse(fasta_path, "fasta"):
            rec_count += 1

            # Flag empty ids
            if rec.id == "":
                empty_id_count += 1
                continue
            # Flag redundant ids
            if rec.id in data:
                non_unique[rec.id] = 1
                non_unique_count += 1
            # Store sequence id and length
            data[rec.id] = len(rec.seq)
            stops = rec.seq.count("*")
            # NOTE(review): a sequence with internal stops that also ends in "*" falls in the
            # first branch only, so it is not counted when it ends with "*" — confirm intended
            if stops >= 1 and not rec.seq.endswith("*"):
                contains_stop_codon += 1
            elif rec.seq.endswith("*") and not ignore_final_stops:
                contains_stop_codon += 1

        if empty_id_count > 0:
            self.add_error(f"{empty_id_count} sequences with empty ids in {fasta_path}")
        if non_unique_count > 0:
            self.add_error(f"{non_unique_count} non unique sequence ids in {fasta_path}")
        if contains_stop_codon > 0:
            self.add_error(f"{contains_stop_codon} sequences with stop codons in {fasta_path}")
        if rec_count == 0:
            self.add_error(f"No sequences found in {fasta_path}")
        return data

    def load_functional_annotation(self) -> None:
        """Load the functional annotation file to retrieve the gene_id and translation id.

        The functional annotation file is stored in a JSON format containing the description, id
        and object type, eg: "gene", "transcript", "translation".

        """
        if "functional_annotation" not in self.manifest_files:
            return
        logging.info("Manifest contains functional annotation(s)")

        # Load the json file
        with open(self.manifest_files["functional_annotation"]) as json_file:
            data = json.load(json_file)

        # Get gene ids and translation ids
        genes = {}
        translations = {}
        transposons = {}

        # The dicts are used as ID sets: the value 1 is a placeholder
        for item in data:
            if item["object_type"] == "gene":
                genes[item["id"]] = 1
            elif item["object_type"] == "translation":
                translations[item["id"]] = 1
            if item["object_type"] == "transposable_element":
                transposons[item["id"]] = 1

        stats = {
            "ann_genes": genes,
            "ann_translations": translations,
            "ann_transposable_elements": transposons,
        }
        # Rebuild self.lengths with the annotation-derived records merged in
        self.lengths = {**self.lengths, **stats}

    def load_gff3(self) -> None:
        """A GFF3 parser is used to retrieve information in the GFF3 file such as
        gene and CDS ids and their corresponding lengths.
        """
        if "gff3" not in self.manifest_files:
            return
        logging.info("Manifest contains GFF3 gene annotations")
        gff3_path = self.manifest_files["gff3"]

        # ID -> length records, one dict per feature category
        seqs: StatsLengths = {}
        genes: StatsLengths = {}
        peps: StatsLengths = {}
        all_peps: StatsLengths = {}
        tes: StatsLengths = {}

        with open_gz_file(gff3_path) as gff3_handle:
            gff = GFF.parse(gff3_handle)
            for seq in gff:
                seqs[seq.id] = len(seq.seq)

                for feat in seq.features:
                    feat_length = abs(feat.location.end - feat.location.start)
                    # Store gene id and length
                    if feat.type in ["gene", "ncRNA_gene", "pseudogene"]:
                        self._retrieve_gff_gene_lengths(feat, genes, peps, all_peps)
                    if feat.type == "transposable_element":
                        tes[feat.id] = feat_length

        stats: dict[str, StatsLengths] = {
            "gff3_seq_regions": seqs,
            "gff3_genes": genes,
            "gff3_translations": peps,
            "gff3_all_translations": all_peps,
            "gff3_transposable_elements": tes,
        }
        # Rebuild self.lengths with the GFF3-derived records merged in
        self.lengths = {**self.lengths, **stats}

    def _retrieve_gff_gene_lengths(
        self, feat: GFFSeqFeature, genes: StatsLengths, peps: StatsLengths, all_peps: StatsLengths
    ) -> None:
        """Record genes and peptides lengths from a feature.

        Args:
            feat : Gene feature to check.
            genes: Record of genes lengths to update.
            peps: Record of peptides lengths to update.
            all_peps: Record of all peptides lengths to update (include pseudogenes).

        """
        gene_id = feat.id
        gene_id = gene_id.replace("gene:", "")
        genes[gene_id] = abs(feat.location.end - feat.location.start)
        # Get CDS id and length
        protein_transcripts = {
            "mRNA",
            "pseudogenic_transcript",
        }
        ig_transcripts = {
            "IG_V_gene",
            "IG_C_gene",
            "TR_C_gene",
            "TR_V_gene",
        }
        cds_transcripts = protein_transcripts.union(ig_transcripts)
        for feat2 in feat.sub_features:
            if feat2.type not in cds_transcripts:
                continue
            # Sum all CDS segment lengths per peptide id for this transcript
            length: dict[str, int] = {}
            for feat3 in feat2.sub_features:
                if feat3.type != "CDS":
                    continue
                pep_id = feat3.id
                pep_id = pep_id.replace("CDS:", "")
                length.setdefault(pep_id, 0)
                length[pep_id] += abs(feat3.location.end - feat3.location.start)
            for pep_id, pep_length in length.items():
                # Store length for translations, add pseudo translations separately
                # Peptide length = CDS length in codons minus 1 (presumably the stop codon)
                pep_length = floor(pep_length / 3) - 1
                if feat.type != "pseudogene" and feat2.type in protein_transcripts:
                    peps[pep_id] = pep_length
                all_peps[pep_id] = pep_length

    def load_agp_seq_regions(self, agp_dict: dict | None) -> None:
        """AGP files describe the assembly of larger sequence objects using smaller objects.

        E.g. describes the assembly of scaffolds from contigs.

        Args:
            agp_dict: Dict containing the information about the sequence.

        Note:
            AGP file is only used in the older builds, not used for current processing.
        """
        if not agp_dict:
            return
        logging.info("Manifest contains AGP files")

        seqr: StatsLengths = {}
        for agp_path in agp_dict.values():
            with open(agp_path, "r") as agph:
                for line in agph:
                    # 9-column AGP format
                    (
                        asm_id,
                        _,  # asm_start
                        asm_end,
                        _,  # asm_part
                        typ,
                        cmp_id,
                        _,  # cmp_start
                        cmp_end,
                        _,  # cmp_strand
                    ) = line.split("\t")
                    # Ignore WGS contig
                    if typ != "W":
                        continue

                    # Assembled seq length
                    if asm_id not in seqr or seqr[asm_id] < int(asm_end):
                        seqr[asm_id] = int(asm_end)

                    # Composite seq length
                    if cmp_id not in seqr or seqr[cmp_id] < int(cmp_end):
                        seqr[cmp_id] = int(cmp_end)

        self.lengths["agp"] = seqr

    def load_genome(self) -> None:
        """Load the genome data."""
        if "genome" not in self.manifest_files:
            return
        logging.info("Manifest contains genome JSON")
        self.genome = get_json(Path(self.manifest_files["genome"]))

    def prepare_integrity_data(self) -> None:  # pylint: disable=too-many-branches
        """Read all the files and keep a record (IDs and their lengths) for each case to be compared later."""
        self.load_gff3()
        self.load_dna_fasta_lengths()
        self.load_peptides_fasta_lengths()
        self.load_seq_regions()
        self.load_functional_annotation()
        self.load_agp_seq_regions(self.manifest_files.get("agp"))
        self.load_genome()

    def has_lengths(self, name: str) -> bool:
        """Check if a given name has lengths records.

        Args:
            name: Name for the lengths to check.

        Raises:
            KeyError: If the name is not supported.
        """
        try:
            return bool(self.lengths[name])
        except KeyError as err:
            raise KeyError(f"There is no length record for {name}") from err

    def get_lengths(self, name: str) -> dict[str, Any]:
        """Returns a dict associating IDs with their length from a given file name.

        Args:
            name: Name for the lengths to get.

        Raises:
            KeyError: If the name is not supported.
        """
        try:
            return self.lengths[name]
        except KeyError as err:
            raise KeyError(f"There is no length record for {name}") from err

    def get_circular(self, name: str) -> dict[str, Any]:
        """Returns a dict associating IDs with their is_circular flag from a given file name.

        Args:
            name: Name for the circular data to get.

        Raises:
            KeyError: If the name is not supported.
        """
        try:
            return self.circular[name]
        except KeyError as err:
            raise KeyError(f"No length available for key {name}") from err

circular = {'seq_regions': {}} instance-attribute

errors = [] instance-attribute

genome = {} instance-attribute

ignore_final_stops = ignore_final_stops instance-attribute

lengths = {'dna_sequences': {}, 'peptide_sequences': {}, 'seq_region_levels': {}, 'annotations': {}, 'agp': {}, 'gff3_seq_regions': {}, 'gff3_genes': {}, 'gff3_translations': {}, 'gff3_all_translations': {}, 'gff3_transposable_elements': {}, 'ann_genes': {}, 'ann_translations': {}, 'ann_transposable_elements': {}, 'seq_regions': {}} instance-attribute

manifest_files = self._get_manifest(manifest_path) instance-attribute

add_error(error)

Record an error.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
100
101
102
def add_error(self, error: str) -> None:
    """Append one error message to the running list of errors."""
    self.errors += [error]

get_circular(name)

Returns a dict associating IDs with their is_circular flag from a given file name.

Parameters:

Name Type Description Default
name str

Name for the circular data to get.

required

Raises:

Type Description
KeyError

If the name is not supported.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
393
394
395
396
397
398
399
400
401
402
403
404
405
def get_circular(self, name: str) -> dict[str, Any]:
    """Returns a dict associating IDs with their is_circular flag from a given file name.

    Args:
        name: Name for the circular data to get.

    Raises:
        KeyError: If the name is not supported.
    """
    try:
        flags = self.circular[name]
    except KeyError as err:
        # Re-raise with a clearer message, keeping the original as the cause
        raise KeyError(f"No length available for key {name}") from err
    return flags

get_lengths(name)

Returns a dict associating IDs with their length from a given file name.

Parameters:

Name Type Description Default
name str

Name for the lengths to get.

required

Raises:

Type Description
KeyError

If the name is not supported.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
379
380
381
382
383
384
385
386
387
388
389
390
391
def get_lengths(self, name: str) -> dict[str, Any]:
    """Returns a dict associating IDs with their length from a given file name.

    Args:
        name: Name for the lengths to get.

    Raises:
        KeyError: If the name is not supported.
    """
    try:
        records = self.lengths[name]
    except KeyError as err:
        # Re-raise with a clearer message, keeping the original as the cause
        raise KeyError(f"There is no length record for {name}") from err
    return records

has_lengths(name)

Check if a given name has lengths records.

Parameters:

Name Type Description Default
name str

Name for the lengths to check.

required

Raises:

Type Description
KeyError

If the name is not supported.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
365
366
367
368
369
370
371
372
373
374
375
376
377
def has_lengths(self, name: str) -> bool:
    """Check if a given name has lengths records.

    Args:
        name: Name for the lengths to check.

    Raises:
        KeyError: If the name is not supported.
    """
    try:
        records = self.lengths[name]
    except KeyError as err:
        # Re-raise with a clearer message, keeping the original as the cause
        raise KeyError(f"There is no length record for {name}") from err
    return bool(records)

load_agp_seq_regions(agp_dict)

AGP files describe the assembly of larger sequence objects using smaller objects.

E.g. describes the assembly of scaffolds from contigs.

Parameters:

Name Type Description Default
agp_dict dict | None

Dict containing the information about the sequence.

required
Note

AGP file is only used in the older builds, not used for current processing.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def load_agp_seq_regions(self, agp_dict: dict | None) -> None:
    """AGP files describe the assembly of larger sequence objects using smaller objects.

    E.g. describes the assembly of scaffolds from contigs.

    Args:
        agp_dict: Dict containing the information about the sequence.

    Note:
        AGP file is only used in the older builds, not used for current processing.
    """
    if not agp_dict:
        return
    logging.info("Manifest contains AGP files")

    seqr: StatsLengths = {}
    for agp_path in agp_dict.values():
        with open(agp_path, "r") as agph:
            for line in agph:
                (
                    asm_id,
                    _,  # asm_start
                    asm_end,
                    _,  # asm_part
                    typ,
                    cmp_id,
                    _,  # cmp_start
                    cmp_end,
                    _,  # cmp_strand
                ) = line.split("\t")
                # Ignore WGS contig
                if typ != "W":
                    continue

                # Assembled seq length
                if asm_id not in seqr or seqr[asm_id] < int(asm_end):
                    seqr[asm_id] = int(asm_end)

                # Composite seq length
                if cmp_id not in seqr or seqr[cmp_id] < int(cmp_end):
                    seqr[cmp_id] = int(cmp_end)

    self.lengths["agp"] = seqr

load_dna_fasta_lengths()

Retrieve DNA sequences lengths from their FASTA file.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
135
136
137
138
139
def load_dna_fasta_lengths(self) -> None:
    """Retrieve DNA sequences lengths from their FASTA file."""
    # Nothing to do if the manifest has no DNA FASTA file.
    if "fasta_dna" in self.manifest_files:
        self.lengths["dna_sequences"] = self._get_fasta_lengths(self.manifest_files["fasta_dna"])

load_functional_annotation()

Load the functional annotation file to retrieve the gene_id and translation id.

The functional annotation file is stored in a JSON format containing the description, id and object type, eg: "gene", "transcript", "translation".

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def load_functional_annotation(self) -> None:
    """Load the functional annotation file to retrieve the gene_id and translation id.

    The functional annotation file is stored in a JSON format containing the description, id
    and object type, eg: "gene", "transcript", "translation".

    """
    if "functional_annotation" not in self.manifest_files:
        return
    logging.info("Manifest contains functional annotation(s)")

    # Read the whole annotation JSON file
    with open(self.manifest_files["functional_annotation"]) as json_file:
        annotations = json.load(json_file)

    # Collect the IDs per object type (the dicts are used as sets, 1 is a placeholder)
    gene_ids = {}
    translation_ids = {}
    transposon_ids = {}
    for entry in annotations:
        object_type = entry["object_type"]
        if object_type == "gene":
            gene_ids[entry["id"]] = 1
        elif object_type == "translation":
            translation_ids[entry["id"]] = 1
        if object_type == "transposable_element":
            transposon_ids[entry["id"]] = 1

    # Rebuild self.lengths with the annotation-derived records merged in
    self.lengths = {
        **self.lengths,
        "ann_genes": gene_ids,
        "ann_translations": translation_ids,
        "ann_transposable_elements": transposon_ids,
    }

load_genome()

Load the genome data.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
348
349
350
351
352
353
def load_genome(self) -> None:
    """Populate `self.genome` from the manifest's genome JSON file, when present."""
    genome_file = self.manifest_files.get("genome")
    if genome_file is None:
        return
    logging.info("Manifest contains genome JSON")
    self.genome = get_json(Path(genome_file))

load_gff3()

A GFF3 parser is used to retrieve information in the GFF3 file such as gene and CDS ids and their corresponding lengths.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def load_gff3(self) -> None:
    """Collect sequence, gene, translation and transposable-element identifiers
    (with their lengths) from the manifest's GFF3 file.
    """
    if "gff3" not in self.manifest_files:
        return
    logging.info("Manifest contains GFF3 gene annotations")
    gff3_path = self.manifest_files["gff3"]

    seqs: StatsLengths = {}
    genes: StatsLengths = {}
    peps: StatsLengths = {}
    all_peps: StatsLengths = {}
    tes: StatsLengths = {}

    gene_types = ("gene", "ncRNA_gene", "pseudogene")
    with open_gz_file(gff3_path) as gff3_handle:
        for record in GFF.parse(gff3_handle):
            seqs[record.id] = len(record.seq)
            for feature in record.features:
                # Gene-like features: delegate length bookkeeping to the helper
                if feature.type in gene_types:
                    self._retrieve_gff_gene_lengths(feature, genes, peps, all_peps)
                # Transposable elements: record their span directly
                if feature.type == "transposable_element":
                    tes[feature.id] = abs(feature.location.end - feature.location.start)

    self.lengths = {
        **self.lengths,
        "gff3_seq_regions": seqs,
        "gff3_genes": genes,
        "gff3_translations": peps,
        "gff3_all_translations": all_peps,
        "gff3_transposable_elements": tes,
    }

load_peptides_fasta_lengths()

Retrieve peptides sequences lengths from their FASTA file.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
127
128
129
130
131
132
133
def load_peptides_fasta_lengths(self) -> None:
    """Record the length of every peptide listed in the manifest's peptide FASTA file."""
    fasta_pep = self.manifest_files.get("fasta_pep")
    if fasta_pep is None:
        # No peptide FASTA in the manifest: nothing to record
        return
    self.lengths["peptide_sequences"] = self._get_fasta_lengths(
        fasta_pep, ignore_final_stops=self.ignore_final_stops
    )

load_seq_regions()

Retrieve seq_regions lengths and circular information from the seq_region JSON file.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def load_seq_regions(self) -> None:
    """Retrieve seq_regions lengths and circular information from the seq_region JSON file."""
    if "seq_region" not in self.manifest_files:
        return
    logging.info("Manifest contains seq_region JSON")
    seq_regions = get_json(Path(self.manifest_files["seq_region"]))

    seqr_seqlevel = {}
    seq_lengths = {}
    seq_circular = {}
    for seq_region in seq_regions:
        name = seq_region["name"]
        # Lengths are stored as int
        length = int(seq_region["length"])
        seq_lengths[name] = length
        seq_circular[name] = seq_region.get("circular", False)
        if seq_region["coord_system_level"] == "contig":
            seqr_seqlevel[name] = length
        # Index by synonym as well, in case the GFF3 file uses synonyms
        for synonym in seq_region.get("synonyms", []):
            seq_lengths[synonym["name"]] = length
    self.lengths["seq_regions"] = seq_lengths
    self.circular["seq_regions"] = seq_circular

prepare_integrity_data()

Read all the files and keep a record (IDs and their lengths) for each case to be compared later.

Source code in src/python/ensembl/io/genomio/manifest/manifest_stats.py
355
356
357
358
359
360
361
362
363
def prepare_integrity_data(self) -> None:
    """Read all the files and keep a record (IDs and their lengths) for each case to be compared later."""
    # Each loader is a no-op when its file is absent from the manifest
    self.load_gff3()
    self.load_dna_fasta_lengths()
    self.load_peptides_fasta_lengths()
    self.load_seq_regions()
    self.load_functional_annotation()
    agp_path = self.manifest_files.get("agp")
    self.load_agp_seq_regions(agp_path)
    self.load_genome()

StatsError

Bases: Exception

Raised when stats could not be computed.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
69
70
class StatsError(Exception):
    """Error raised when the statistics could not be computed."""

manifest_stats

Representation of the statistics of the set of files listed in the manifest file provided.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
class manifest_stats:
    """Representation of the statistics of the set of files listed in the manifest file provided."""

    def __init__(self, manifest_dir: str, accession: Optional[str], datasets_bin: Optional[str]):
        """Initialize paths and settings for the stats computation.

        Args:
            manifest_dir: Directory containing the `manifest.json` file.
            accession: Assembly accession, used when comparing against NCBI stats.
            datasets_bin: Path to the NCBI `datasets` executable; defaults to "datasets".
        """
        self.manifest = f"{manifest_dir}/manifest.json"
        self.accession: Optional[str] = accession
        self.errors: List[str] = []
        # Discrepancies with NCBI counts are logged to this file instead of raised
        self.errors_file = Path(manifest_dir) / "stats_diff.log"
        if datasets_bin is None:
            datasets_bin = "datasets"
        self.datasets_bin = datasets_bin
        self.manifest_parent = manifest_dir
        # NCBI comparison is off by default
        self.check_ncbi = False

    def run(self, stats_path: StrPath) -> None:
        """Compute stats in the files and output a stats.txt file in the same folder.

        Discrepancies found during the NCBI stats comparison are written to
        `self.errors_file`, one per line.

        Args:
            stats_path: Path of the output stats file.
        """
        manifest = self.get_manifest()

        stats = []
        if self.accession is not None:
            stats.append(self.accession)

        # Compute the stats from the GFF3 file
        if "gff3" in manifest:
            stats += self.get_gff3_stats(Path(manifest["gff3"]))

        # Compute the stats from the seq_region file
        if "seq_region" in manifest:
            stats += self.get_seq_region_stats(Path(manifest["seq_region"]))

        # Print out the stats in a separate file
        with Path(stats_path).open("w") as stats_out:
            stats_out.write("\n".join(stats))

        # Log the errors from the stats comparison, if any
        if self.errors:
            with self.errors_file.open("w") as errors_fh:
                for error_line in self.errors:
                    # Bug fix: append a newline so each error lands on its own
                    # line (they were previously concatenated into one line)
                    errors_fh.write(f"{error_line}\n")

    def get_manifest(self) -> Dict:
        """Get the files metadata from the manifest json file.

        Each "file" entry (at the top level or nested one level down) is replaced
        by its full path under the manifest directory.

        Returns:
            Dict: A representation of the manifest json data.
        """
        with open(self.manifest) as f_json:
            manifest = json.load(f_json)
        manifest_root = self.manifest_parent

        # Use dir name from the manifest
        for name in manifest:
            if "file" in manifest[name]:
                file_name = manifest[name]["file"]
                manifest[name] = f"{manifest_root}/{file_name}"
            else:
                for f in manifest[name]:
                    if "file" in manifest[name][f]:
                        file_name = manifest[name][f]["file"]
                        # Bug fix: the original assigned a tuple
                        # `(manifest_root, file_name)` instead of the joined path
                        manifest[name][f] = f"{manifest_root}/{file_name}"

        return manifest

    def get_seq_region_stats(self, seq_region_path: Path) -> List[str]:
        """Compute stats from the seq_region json file.

        Args:
            seq_region_path (Path): the seq_region json file.

        Returns:
            List[str]: Stats from the seq_regions.
        """
        with seq_region_path.open("r") as json_file:
            seq_regions = json.load(json_file)

        # Get basic data
        coord_systems: Dict[str, List[int]] = {}
        circular = 0
        locations = []
        codon_tables = []
        for seqr in seq_regions:
            # Get a readable seq_region name: prefer the GenBank synonym, fall
            # back to the provided seq_region name.
            # (Fix: replaced the `and ... or ...` idiom, which silently skipped
            # a GenBank synonym whose name was an empty string.)
            seqr_name = seqr["name"]
            genbank_synonyms = [x for x in seqr.get("synonyms", []) if x["source"] == "GenBank"]
            if genbank_synonyms:
                seqr_name = genbank_synonyms[0]["name"]

            # Record the lengths of the elements of each coord_system
            coord_level = seqr["coord_system_level"]
            if coord_level not in coord_systems:
                coord_systems[coord_level] = []
            coord_systems[coord_level].append(seqr["length"])

            # Additional metadata records to count
            if "circular" in seqr:
                circular += 1
            if "codon_table" in seqr:
                codon_tables.append(f"{seqr_name} = {seqr['codon_table']}")
            if "location" in seqr:
                locations.append(f"{seqr_name} = {seqr['location']}")

        # Stats
        stats: List[str] = []
        stats.append(seq_region_path.name)
        stats += self.coord_systems_stats(coord_systems)
        stats += self.seq_region_special_stats(circular, locations, codon_tables)
        stats.append("\n")
        return stats

    def coord_systems_stats(self, coord_systems: Dict[str, List[int]]) -> List[str]:
        """For each coord_system compute various stats:
            - number of sequences
            - sequence length sum, minimum, maximum, mean

        Args:
            coord_systems: Coordinate system dictionary of lengths.

        Returns:
            A list with the computed statistics in a printable format.
        """
        stats: List[str] = []
        stats.append(f"Total coord_systems {len(coord_systems)}")
        for coord_name, lengths in coord_systems.items():
            stats.append(f"\nCoord_system: {coord_name}")

            stat_counts: Dict[str, Union[int, float]] = {
                "Number of sequences": len(lengths),
                "Sequence length sum": sum(lengths),
                "Sequence length minimum": min(lengths),
                "Sequence length maximum": max(lengths),
                "Sequence length mean": mean(lengths),
            }

            # `mean` may return an int (exact division) or a float, hence two formats
            for name, count in stat_counts.items():
                if isinstance(count, int):
                    stats.append(f"{count: 9d}\t{name}")
                else:
                    stats.append(f"{count: 9f}\t{name}")
        return stats

    def seq_region_special_stats(
        self,
        circular: int = 0,
        locations: Optional[List[str]] = None,
        codon_tables: Optional[List[str]] = None,
    ) -> List[str]:
        """Prepare stats in case there are circular regions, specific locations and codon_tables.

        (Fix: removed a stray code line that had been pasted into this docstring.)

        Args:
            circular: Number of circular regions. Defaults to 0.
            locations: The regions and their location. Defaults to None.
            codon_tables: The regions and their codon_table. Defaults to None.

        Returns:
            A list with the computed statistics in a printable format.
        """
        stats: List[str] = []
        if circular or locations or codon_tables:
            stats.append("\nSpecial")
            if circular:
                stats.append(f"{circular: 9d}\tcircular sequences")
            if locations is not None:
                stats.append(f"{len(locations): 9d} sequences with location")
                for loc in locations:
                    stats.append(f"\t\t\t{loc}")
            if codon_tables:
                stats.append(f"{len(codon_tables): 9d} sequences with codon_table")
                for table in codon_tables:
                    stats.append(f"\t\t\t{table}")
        return stats

    def get_gff3_stats(self, gff3_path: Path) -> List[str]:
        """Extract the gene models from the GFF3 file and compute stats.

        Args:
            gff3_path (Path): the GFF3 file.

        Returns:
            List: Stats from the gene model.
        """

        biotypes = self.count_biotypes(gff3_path)
        # Compile final stats
        stats = self.biotypes_stats(biotypes)
        stats += self.check_ncbi_stats(biotypes)
        return stats

    def count_biotypes(self, gff3_path: Path) -> Dict[str, BiotypeCounter]:
        """Count the biotypes in a GFF3 file.

        Args:
            gff3_path: Path to the GFF3 file.

        Returns:
            Dictionary of biotype counters.
        """

        biotypes: Dict[str, BiotypeCounter] = {}

        with open_gz_file(gff3_path) as gff3_handle:
            for rec in GFF.parse(gff3_handle):
                for feat1 in rec.features:
                    # Check if the gene contains proteins (CDSs),
                    # and keep a count of all hierarchies (e.g. gene-mRNA-CDS)
                    is_protein = False
                    for feat2 in feat1.sub_features:
                        if feat2.type == "mRNA":
                            types2 = {f.type for f in feat2.sub_features}
                            if "CDS" in types2:
                                is_protein = True
                        manifest_stats.increment_biotype(biotypes, feat2.id, f"{feat1.type}-{feat2.type}")
                        for feat3 in feat2.sub_features:
                            if feat3.type == "exon":
                                continue
                            manifest_stats.increment_biotype(
                                biotypes, feat3.id, f"{feat1.type}-{feat2.type}-{feat3.type}"
                            )

                    # Main categories counts
                    if feat1.type == "pseudogene":
                        manifest_stats.increment_biotype(biotypes, feat1.id, "pseudogene")
                    elif is_protein:
                        manifest_stats.increment_biotype(biotypes, feat1.id, f"PROT_{feat1.type}")
                    else:
                        # Special case, undefined gene-transcript
                        if (
                            feat1.type == "gene"
                            and feat1.sub_features
                            and feat1.sub_features[0].type == "transcript"
                        ):
                            manifest_stats.increment_biotype(biotypes, feat1.id, "OTHER")
                        else:
                            manifest_stats.increment_biotype(biotypes, feat1.id, f"NONPROT_{feat1.type}")

                    # Total
                    if feat1.type in ("gene", "pseudogene"):
                        manifest_stats.increment_biotype(biotypes, feat1.id, "ALL_GENES")
        return biotypes

    def biotypes_stats(self, biotypes: Dict[str, BiotypeCounter]) -> List[str]:
        """Prepare biotype stats in order of their name.

        Args:
            biotypes: Biotypes counters.

        Returns:
            A list with the computed statistics in a printable format.
        """
        sorted_biotypes = {}
        for name in sorted(biotypes.keys()):
            data: BiotypeCounter = biotypes[name]
            sorted_biotypes[name] = data

        stats = [
            f"{data.unique_count():>9}\t{biotype:<20}\tID = {data.example}"
            for (biotype, data) in sorted_biotypes.items()
        ]
        return stats

    def check_ncbi_stats(self, biotypes: Dict[str, BiotypeCounter]) -> List[str]:
        """Use the dataset tool from NCBI to get stats and compare with what we have"""
        stats: List[str] = []
        # Comparison is opt-in and requires an accession and the `datasets` binary
        if not self.check_ncbi:
            return stats

        if self.accession is None:
            return stats

        accession: str = self.accession

        datasets_bin = self.datasets_bin
        if not which(datasets_bin):
            return stats

        # Get the dataset summary from NCBI
        command = [datasets_bin, "summary", "genome", "accession", accession]
        result_out = subprocess.run(command, stdout=subprocess.PIPE, check=True)
        result = json.loads(result_out.stdout)

        # Get stats
        if "reports" in result:
            genome = result["reports"][0]
            if "annotation_info" in genome and "stats" in genome["annotation_info"]:
                ncbi_stats = genome["annotation_info"]["stats"]

                if "gene_counts" in ncbi_stats:
                    counts = ncbi_stats["gene_counts"]
                    stats = self.compare_ncbi_counts(biotypes, counts)
        return stats

    def compare_ncbi_counts(self, biotypes: Dict[str, BiotypeCounter], ncbi: Dict) -> List[str]:
        """Compare specific gene stats from NCBI"""
        stats: List[str] = []

        # NCBI count name -> our biotype counter name
        maps = [
            ["total", "ALL_GENES"],
            ["protein_coding", "PROT_gene"],
            ["pseudogene", "pseudogene"],
            ["non_coding", "NONPROT_gene"],
            ["other", "OTHER"],
        ]

        for count_map in maps:
            ncbi_name, prep_name = count_map
            ncbi_count = ncbi.get(ncbi_name, 0)
            prepped: Optional[BiotypeCounter] = biotypes.get(prep_name)
            prep_count = 0
            if prepped is not None:
                prep_count = prepped.count

            if prep_count != ncbi_count:
                diff = prep_count - ncbi_count
                self.errors.append(f"DIFF gene count for {count_map}: {prep_count} - {ncbi_count} = {diff}")
            else:
                stats.append(f"Same count for {count_map}: {prep_count}")

        return stats

    @staticmethod
    def increment_biotype(biotypes: Dict[str, BiotypeCounter], feature_id: str, feature_biotype: str) -> None:
        """Add the feature to their respective biotype counter.

        Args:
            biotypes (Dict[str, BiotypeCounter]): All current biotypes, with their counter.
            feature_id (str): Feature id to be counted.
            feature_biotype (str): The biotype of the feature.
        """
        if feature_biotype not in biotypes:
            biotypes[feature_biotype] = BiotypeCounter(example=feature_id)
        biotypes[feature_biotype].add_id(feature_id)

accession = accession instance-attribute

check_ncbi = False instance-attribute

datasets_bin = datasets_bin instance-attribute

errors = [] instance-attribute

errors_file = Path(manifest_dir) / 'stats_diff.log' instance-attribute

manifest = f'{manifest_dir}/manifest.json' instance-attribute

manifest_parent = manifest_dir instance-attribute

biotypes_stats(biotypes)

Prepare biotype stats in order of their name.

Parameters:

Name Type Description Default
biotypes Dict[str, BiotypeCounter]

Biotypes counters.

required

Returns:

Type Description
List[str]

A list with the computed statistics in a printable format.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def biotypes_stats(self, biotypes: Dict[str, BiotypeCounter]) -> List[str]:
    """Format one printable line per biotype, ordered alphabetically by biotype name.

    Args:
        biotypes: Biotypes counters.

    Returns:
        A list with the computed statistics in a printable format.
    """
    return [
        f"{biotypes[name].unique_count():>9}\t{name:<20}\tID = {biotypes[name].example}"
        for name in sorted(biotypes)
    ]

check_ncbi_stats(biotypes)

Use the dataset tool from NCBI to get stats and compare with what we have

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def check_ncbi_stats(self, biotypes: Dict[str, BiotypeCounter]) -> List[str]:
    """Use the dataset tool from NCBI to get stats and compare with what we have"""
    stats: List[str] = []
    # Comparison is opt-in and needs an accession plus the `datasets` binary
    if not self.check_ncbi or self.accession is None:
        return stats

    accession: str = self.accession
    datasets_bin = self.datasets_bin
    if not which(datasets_bin):
        return stats

    # Fetch the genome summary report from NCBI
    summary_cmd = [datasets_bin, "summary", "genome", "accession", accession]
    summary_run = subprocess.run(summary_cmd, stdout=subprocess.PIPE, check=True)
    summary = json.loads(summary_run.stdout)

    # Drill down to the gene counts, if present
    if "reports" in summary:
        genome_report = summary["reports"][0]
        if "annotation_info" in genome_report and "stats" in genome_report["annotation_info"]:
            ncbi_genome_stats = genome_report["annotation_info"]["stats"]
            if "gene_counts" in ncbi_genome_stats:
                stats = self.compare_ncbi_counts(biotypes, ncbi_genome_stats["gene_counts"])
    return stats

compare_ncbi_counts(biotypes, ncbi)

Compare specific gene stats from NCBI

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def compare_ncbi_counts(self, biotypes: Dict[str, BiotypeCounter], ncbi: Dict) -> List[str]:
    """Compare specific gene stats from NCBI"""
    stats: List[str] = []

    # Pairs of (NCBI count name, our biotype counter name)
    maps = [
        ["total", "ALL_GENES"],
        ["protein_coding", "PROT_gene"],
        ["pseudogene", "pseudogene"],
        ["non_coding", "NONPROT_gene"],
        ["other", "OTHER"],
    ]

    for count_map in maps:
        ncbi_name, prep_name = count_map
        ncbi_count = ncbi.get(ncbi_name, 0)
        counter = biotypes.get(prep_name)
        prep_count = counter.count if counter is not None else 0

        if prep_count == ncbi_count:
            stats.append(f"Same count for {count_map}: {prep_count}")
        else:
            diff = prep_count - ncbi_count
            self.errors.append(f"DIFF gene count for {count_map}: {prep_count} - {ncbi_count} = {diff}")

    return stats

coord_systems_stats(coord_systems)

For each coord_system compute various stats
  • number of sequences
  • sequence length sum, minimum, maximum, mean

Parameters:

Name Type Description Default
coord_systems Dict[str, List[int]]

Coordinate system dictionary of lengths.

required

Returns:

Type Description
List[str]

A list with the computed statistics in a printable format.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def coord_systems_stats(self, coord_systems: Dict[str, List[int]]) -> List[str]:
    """For each coord_system compute various stats:
        - number of sequences
        - sequence length sum, minimum, maximum, mean

    Args:
        coord_systems: Coordinate system dictionary of lengths.

    Returns:
        A list with the computed statistics in a printable format.
    """
    lines: List[str] = [f"Total coord_systems {len(coord_systems)}"]
    for coord_name, lengths in coord_systems.items():
        lines.append(f"\nCoord_system: {coord_name}")

        summary = [
            ("Number of sequences", len(lengths)),
            ("Sequence length sum", sum(lengths)),
            ("Sequence length minimum", min(lengths)),
            ("Sequence length maximum", max(lengths)),
            ("Sequence length mean", mean(lengths)),
        ]

        # `mean` may yield an int (exact division) or a float, hence two formats
        for label, value in summary:
            spec = " 9d" if isinstance(value, int) else " 9f"
            lines.append(f"{value:{spec}}\t{label}")
    return lines

count_biotypes(gff3_path)

Count the biotypes in a GFF3 file.

Parameters:

Name Type Description Default
gff3_path Path

Path to the GFF3 file.

required

Returns:

Type Description
Dict[str, BiotypeCounter]

Dictionary of biotype counters.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def count_biotypes(self, gff3_path: Path) -> Dict[str, BiotypeCounter]:
    """Count the biotypes in a GFF3 file.

    Args:
        gff3_path: Path to the GFF3 file.

    Returns:
        Dictionary of biotype counters.
    """
    biotypes: Dict[str, BiotypeCounter] = {}
    # Shorthand for the class-level counter helper
    count = manifest_stats.increment_biotype

    with open_gz_file(gff3_path) as gff3_handle:
        for record in GFF.parse(gff3_handle):
            for gene in record.features:
                # Walk the gene's children to spot CDSs and record every
                # hierarchy seen (e.g. gene-mRNA-CDS)
                has_cds = False
                for transcript in gene.sub_features:
                    if transcript.type == "mRNA" and any(
                        child.type == "CDS" for child in transcript.sub_features
                    ):
                        has_cds = True
                    count(biotypes, transcript.id, f"{gene.type}-{transcript.type}")
                    for child in transcript.sub_features:
                        if child.type == "exon":
                            continue
                        count(biotypes, child.id, f"{gene.type}-{transcript.type}-{child.type}")

                # Main categories counts
                if gene.type == "pseudogene":
                    count(biotypes, gene.id, "pseudogene")
                elif has_cds:
                    count(biotypes, gene.id, f"PROT_{gene.type}")
                elif gene.type == "gene" and gene.sub_features and gene.sub_features[0].type == "transcript":
                    # Special case, undefined gene-transcript
                    count(biotypes, gene.id, "OTHER")
                else:
                    count(biotypes, gene.id, f"NONPROT_{gene.type}")

                # Total
                if gene.type in ("gene", "pseudogene"):
                    count(biotypes, gene.id, "ALL_GENES")
    return biotypes

get_gff3_stats(gff3_path)

Extract the gene models from the GFF3 file and compute stats.

Parameters:

Name Type Description Default
gff3_path Path

the GFF3 file.

required

Returns:

Name Type Description
List List[str]

Stats from the gene model.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def get_gff3_stats(self, gff3_path: Path) -> List[str]:
    """Extract the gene models from the GFF3 file and compute stats.

    Args:
        gff3_path: The GFF3 file.

    Returns:
        Stats from the gene model.
    """
    biotype_counters = self.count_biotypes(gff3_path)
    # Printable biotype lines first, then the optional NCBI comparison
    return self.biotypes_stats(biotype_counters) + self.check_ncbi_stats(biotype_counters)

get_manifest()

Get the files metadata from the manifest json file.

Returns:

Name Type Description
Dict Dict

A representation of the manifest json data.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def get_manifest(self) -> Dict:
    """Get the files metadata from the manifest json file.

    Each "file" entry (at the top level or nested one level down) is replaced
    by its full path under the manifest directory.

    Returns:
        Dict: A representation of the manifest json data.
    """
    with open(self.manifest) as f_json:
        manifest = json.load(f_json)
    manifest_root = self.manifest_parent

    # Prepend the manifest directory to every file name
    for name in manifest:
        if "file" in manifest[name]:
            file_name = manifest[name]["file"]
            manifest[name] = f"{manifest_root}/{file_name}"
        else:
            for f in manifest[name]:
                if "file" in manifest[name][f]:
                    file_name = manifest[name][f]["file"]
                    # Bug fix: the original assigned a tuple
                    # `(manifest_root, file_name)` instead of the joined path
                    manifest[name][f] = f"{manifest_root}/{file_name}"

    return manifest

get_seq_region_stats(seq_region_path)

Compute stats from the seq_region json file.

Parameters:

Name Type Description Default
seq_region_path Path

the seq_region json file.

required

Returns:

Type Description
List[str]

List[str]: Stats from the seq_regions.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def get_seq_region_stats(self, seq_region_path: Path) -> List[str]:
    """Compute stats from the seq_region json file.

    Args:
        seq_region_path: The seq_region json file.

    Returns:
        Stats from the seq_regions.
    """
    with seq_region_path.open("r") as json_file:
        seq_regions = json.load(json_file)

    # Get basic data
    coord_systems: Dict[str, List[int]] = {}
    circular = 0
    locations = []
    codon_tables = []
    for seqr in seq_regions:
        # Get a readable seq_region name: prefer the GenBank synonym, fall back
        # to the provided seq_region name.
        # (Fix: replaced the `and ... or ...` idiom, which silently skipped a
        # GenBank synonym whose name was an empty string.)
        seqr_name = seqr["name"]
        genbank_synonyms = [x for x in seqr.get("synonyms", []) if x["source"] == "GenBank"]
        if genbank_synonyms:
            seqr_name = genbank_synonyms[0]["name"]

        # Record the lengths of the elements of each coord_system
        coord_systems.setdefault(seqr["coord_system_level"], []).append(seqr["length"])

        # Additional metadata records to count
        if "circular" in seqr:
            circular += 1
        if "codon_table" in seqr:
            codon_tables.append(f"{seqr_name} = {seqr['codon_table']}")
        if "location" in seqr:
            locations.append(f"{seqr_name} = {seqr['location']}")

    # Assemble the printable stats
    stats: List[str] = [seq_region_path.name]
    stats += self.coord_systems_stats(coord_systems)
    stats += self.seq_region_special_stats(circular, locations, codon_tables)
    stats.append("\n")
    return stats

increment_biotype(biotypes, feature_id, feature_biotype) staticmethod

Add the feature to their respective biotype counter.

Parameters:

Name Type Description Default
biotypes Dict[str, BiotypeCounter]

All current biotypes, with their counter.

required
feature_id str

Feature id to be counted.

required
feature_biotype str

The biotype of the feature.

required
Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
397
398
399
400
401
402
403
404
405
406
407
408
@staticmethod
def increment_biotype(biotypes: Dict[str, BiotypeCounter], feature_id: str, feature_biotype: str) -> None:
    """Register a feature under its biotype counter, creating the counter on first use.

    Args:
        biotypes (Dict[str, BiotypeCounter]): All current biotypes, with their counter.
        feature_id (str): Feature id to be counted.
        feature_biotype (str): The biotype of the feature.
    """
    counter = biotypes.get(feature_biotype)
    if counter is None:
        # First feature seen for this biotype: keep it as the counter's example.
        counter = BiotypeCounter(example=feature_id)
        biotypes[feature_biotype] = counter
    counter.add_id(feature_id)

run(stats_path)

Compute stats in the files and output a stats.txt file in the same folder.

Raises:

Type Description
StatsError

Could not compute some stats.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def run(self, stats_path: StrPath) -> None:
    """Compute stats in the files and output a stats.txt file in the same folder.

    Raises:
        StatsError: Could not compute some stats.
    """
    manifest = self.get_manifest()

    stats_lines = []
    if self.accession is not None:
        stats_lines.append(self.accession)

    # Each manifest entry has a dedicated stats computation.
    for file_key, compute in (
        ("gff3", self.get_gff3_stats),
        ("seq_region", self.get_seq_region_stats),
    ):
        if file_key in manifest:
            stats_lines += compute(Path(manifest[file_key]))

    # Print out the stats in a separate file
    Path(stats_path).write_text("\n".join(stats_lines))

    # Die if there were errors in stats comparison
    if self.errors:
        with self.errors_file.open("w") as errors_fh:
            errors_fh.writelines(self.errors)

seq_region_special_stats(circular=0, locations=None, codon_tables=None)

Prepare stats in case there are circular regions, specific locations and codon_tables.

Parameters:

Name Type Description Default
circular int

Number of circular regions. Defaults to 0.

0
locations Optional[List[str]]

The regions and their location. Defaults to None.

None
codon_tables Optional[List[str]]

The regions and their codon_table. Defaults to None.

None

Returns:

Type Description
List[str]

A list with the computed statistics in a printable format.

Source code in src/python/ensembl/io/genomio/manifest/compute_stats.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def seq_region_special_stats(
    self,
    circular: int = 0,
    locations: Optional[List[str]] = None,
    codon_tables: Optional[List[str]] = None,
) -> List[str]:
    """Prepare stats in case there are circular regions, specific locations and codon_tables.

    Args:
        circular: Number of circular regions. Defaults to 0.
        locations: The regions and their location. Defaults to None.
        codon_tables: The regions and their codon_table. Defaults to None.

    Returns:
        A list with the computed statistics in a printable format.
    """
    stats: List[str] = []
    # Only emit the "Special" section when there is something to report.
    if circular or locations or codon_tables:
        stats.append("\nSpecial")
        if circular:
            stats.append(f"{circular: 9d}\tcircular sequences")
        # Truthiness check (not `is not None`) so an empty list does not
        # print a "0 sequences with location" header; matches codon_tables.
        if locations:
            stats.append(f"{len(locations): 9d} sequences with location")
            for loc in locations:
                stats.append(f"\t\t\t{loc}")
        if codon_tables:
            stats.append(f"{len(codon_tables): 9d} sequences with codon_table")
            for table in codon_tables:
                stats.append(f"\t\t\t{table}")
    return stats

init_logging_with_args(args)

Processes the Namespace object provided to call init_logging() with the correct arguments.

Parameters:

Name Type Description Default
args Namespace

Namespace populated by an argument parser.

required
Source code in ensembl/utils/logging.py
 98
 99
100
101
102
103
104
105
106
107
def init_logging_with_args(args: argparse.Namespace) -> None:
    """Processes the Namespace object provided to call `init_logging()` with the correct arguments.

    Args:
        args: Namespace populated by an argument parser.

    """
    # Forward only the logging-related options that the parser actually defined.
    known_keys = ("log_level", "log_file", "log_file_level")
    available = vars(args)
    init_logging(**{key: available[key] for key in known_keys if key in available})

main()

Main entrypoint.

Source code in src/python/ensembl/io/genomio/manifest/generate.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def main() -> None:
    """Main entrypoint."""
    # NOTE: the previous description ("Compare the genomic data...") was copied
    # from the integrity-check tool; this command creates a manifest file.
    parser = ArgumentParser(
        description="Create a manifest file for the genomic data files in a folder."
    )
    parser.add_argument_dst_path(
        "--manifest_dir", required=True, help="Folder where to create a manifest file"
    )
    parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    parser.add_log_arguments()
    args = parser.parse_args()
    init_logging_with_args(args)

    manifest = Manifest(args.manifest_dir)
    manifest.create()