Skip to content

gff3

ensembl.io.genomio.gff3

GFF3 files processing module.

Annotation = Dict[str, Any] module-attribute

AnnotationError

Bases: Exception

If anything wrong happens when recording annotations.

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
51
52
class AnnotationError(Exception):
    """Raised when an error occurs while recording annotations."""

ArgumentParser

Bases: ArgumentParser

Extends argparse.ArgumentParser with additional methods and functionality.

The default behaviour of the help text will be to display the default values on every non-required argument, i.e. optional arguments with required=False.

Source code in ensembl/utils/argparse.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
class ArgumentParser(argparse.ArgumentParser):
    """Extends `argparse.ArgumentParser` with additional methods and functionality.

    The default behaviour of the help text will be to display the default values on every non-required
    argument, i.e. optional arguments with `required=False`.

    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Extends the base class to include the information about default argument values by default."""
        super().__init__(*args, **kwargs)
        # Set after super().__init__ so it overrides any formatter_class passed via kwargs
        self.formatter_class = argparse.ArgumentDefaultsHelpFormatter

    def _validate_src_path(self, src_path: StrPath) -> Path:
        """Returns the path if exists and it is readable, raises an error through the parser otherwise.

        Args:
            src_path: File or directory path to check.

        """
        src_path = Path(src_path)
        # self.error() exits the program, so no explicit return is needed in the error branches
        if not src_path.exists():
            self.error(f"'{src_path}' not found")
        elif not os.access(src_path, os.R_OK):
            self.error(f"'{src_path}' not readable")
        return src_path

    def _validate_dst_path(self, dst_path: StrPath, exists_ok: bool = False) -> Path:
        """Returns the path if it is writable, raises an error through the parser otherwise.

        Args:
            dst_path: File or directory path to check.
            exists_ok: Do not raise an error during parsing if the destination path already exists.

        """
        dst_path = Path(dst_path)
        if dst_path.exists():
            if os.access(dst_path, os.W_OK):
                if exists_ok:
                    return dst_path
                self.error(f"'{dst_path}' already exists")
            else:
                self.error(f"'{dst_path}' is not writable")
        # Check if the first parent directory that exists is writable
        for parent_path in dst_path.parents:
            if parent_path.exists():
                if not os.access(parent_path, os.W_OK):
                    self.error(f"'{dst_path}' is not writable")
                break
        return dst_path

    def _validate_number(
        self,
        value: str,
        value_type: Callable[[str], int | float],
        min_value: int | float | None,
        max_value: int | float | None,
    ) -> int | float:
        """Returns the numeric value if it is of the expected type and it is within the specified range.

        Args:
            value: String representation of numeric value to check.
            value_type: Expected type of the numeric value.
            min_value: Minimum value constrain. If `None`, no minimum value constrain.
            max_value: Maximum value constrain. If `None`, no maximum value constrain.

        """
        # Check if the string representation can be converted to the expected type
        try:
            result = value_type(value)
        except (TypeError, ValueError):
            # self.error() exits the program, so `result` is always bound past this point
            self.error(f"invalid {value_type.__name__} value: {value}")
        # Check if numeric value is within range
        if (min_value is not None) and (result < min_value):
            self.error(f"{value} is lower than minimum value ({min_value})")
        if (max_value is not None) and (result > max_value):
            self.error(f"{value} is greater than maximum value ({max_value})")
        return result

    def add_argument(self, *args: Any, **kwargs: Any) -> None:  # type: ignore[override]
        """Extends the parent function by excluding the default value in the help text when not provided.

        Only applied to required arguments without a default value, i.e. positional arguments or optional
        arguments with `required=True`.

        """
        # argparse.SUPPRESS hides the "(default: ...)" suffix added by ArgumentDefaultsHelpFormatter
        if kwargs.get("required", False):
            kwargs.setdefault("default", argparse.SUPPRESS)
        super().add_argument(*args, **kwargs)

    def add_argument_src_path(self, *args: Any, **kwargs: Any) -> None:
        """Adds `pathlib.Path` argument, checking if it exists and it is readable at parsing time.

        If "metavar" is not defined, it is added with "PATH" as value to improve help text readability.

        """
        kwargs.setdefault("metavar", "PATH")
        kwargs["type"] = self._validate_src_path
        self.add_argument(*args, **kwargs)

    def add_argument_dst_path(self, *args: Any, exists_ok: bool = True, **kwargs: Any) -> None:
        """Adds `pathlib.Path` argument, checking if it is writable at parsing time.

        If "metavar" is not defined it is added with "PATH" as value to improve help text readability.

        Args:
            exists_ok: Do not raise an error if the destination path already exists.

        """
        kwargs.setdefault("metavar", "PATH")
        # Lambda captures exists_ok so the validation runs at parsing time with this setting
        kwargs["type"] = lambda x: self._validate_dst_path(x, exists_ok)
        self.add_argument(*args, **kwargs)

    def add_argument_url(self, *args: Any, **kwargs: Any) -> None:
        """Adds `sqlalchemy.engine.URL` argument.

        If "metavar" is not defined it is added with "URI" as value to improve help text readability.

        """
        kwargs.setdefault("metavar", "URI")
        kwargs["type"] = make_url
        self.add_argument(*args, **kwargs)

    # pylint: disable=redefined-builtin
    def add_numeric_argument(
        self,
        *args: Any,
        type: Callable[[str], int | float] = float,
        min_value: int | float | None = None,
        max_value: int | float | None = None,
        **kwargs: Any,
    ) -> None:
        """Adds a numeric argument with constrains on its type and its minimum or maximum value.

        Note that the default value (if defined) is not checked unless the argument is an optional argument
        and no value is provided in the command line.

        Args:
            type: Type to convert the argument value to when parsing.
            min_value: Minimum value constrain. If `None`, no minimum value constrain.
            max_value: Maximum value constrain. If `None`, no maximum value constrain.

        """
        # If both minimum and maximum values are defined, ensure min_value <= max_value
        if (min_value is not None) and (max_value is not None) and (min_value > max_value):
            raise ArgumentError("minimum value is greater than maximum value")
        # Add lambda function to check numeric constrains when parsing argument
        kwargs["type"] = lambda x: self._validate_number(x, type, min_value, max_value)
        self.add_argument(*args, **kwargs)

    # pylint: disable=redefined-builtin
    def add_server_arguments(
        self, prefix: str = "", include_database: bool = False, help: str | None = None
    ) -> None:
        """Adds the usual set of arguments needed to connect to a server, i.e. `--host`, `--port`, `--user`
        and `--password` (optional).

        Note that the parser will assume this is a MySQL server.

        Args:
            prefix: Prefix to add to each argument, e.g. if prefix is `src_`, the arguments will be
                `--src_host`, etc.
            include_database: Include `--database` argument.
            help: Description message to include for this set of arguments.

        """
        group = self.add_argument_group(f"{prefix}server connection arguments", description=help)
        group.add_argument(
            f"--{prefix}host", required=True, metavar="HOST", default=argparse.SUPPRESS, help="host name"
        )
        group.add_argument(
            f"--{prefix}port",
            required=True,
            type=int,
            metavar="PORT",
            default=argparse.SUPPRESS,
            help="port number",
        )
        group.add_argument(
            f"--{prefix}user", required=True, metavar="USER", default=argparse.SUPPRESS, help="user name"
        )
        group.add_argument(f"--{prefix}password", metavar="PWD", help="host password")
        if include_database:
            group.add_argument(
                f"--{prefix}database",
                required=True,
                metavar="NAME",
                default=argparse.SUPPRESS,
                help="database name",
            )

    def add_log_arguments(self, add_log_file: bool = False) -> None:
        """Adds the usual set of arguments required to set and initialise a logging system.

        The current set includes a mutually exclusive group for the default logging level: `--verbose`,
        `--debug` or `--log LEVEL`.

        Args:
            add_log_file: Add arguments to allow storing messages into a file, i.e. `--log_file` and
                `--log_file_level`.

        """
        # Define the list of log levels available
        log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        # NOTE: from 3.11 this list can be changed to: logging.getLevelNamesMapping().keys()
        # Create logging arguments group
        group = self.add_argument_group("logging arguments")
        # Add 3 mutually exclusive options to set the logging level
        subgroup = group.add_mutually_exclusive_group()
        subgroup.add_argument(
            "-v",
            "--verbose",
            action="store_const",
            const="INFO",
            dest="log_level",
            help="verbose mode, i.e. 'INFO' log level",
        )
        subgroup.add_argument(
            "--debug",
            action="store_const",
            const="DEBUG",
            dest="log_level",
            help="debugging mode, i.e. 'DEBUG' log level",
        )
        subgroup.add_argument(
            "--log",
            choices=log_levels,
            type=str.upper,
            default="WARNING",
            metavar="LEVEL",
            dest="log_level",
            help="level of the events to track: %(choices)s",
        )
        # Guarantee a "WARNING" default for log_level even if --verbose/--debug are not used
        subgroup.set_defaults(log_level="WARNING")
        if add_log_file:
            # Add log file-related arguments
            group.add_argument(
                "--log_file",
                type=lambda x: self._validate_dst_path(x, exists_ok=True),
                metavar="PATH",
                default=None,
                help="log file path",
            )
            group.add_argument(
                "--log_file_level",
                choices=log_levels,
                type=str.upper,
                default="DEBUG",
                metavar="LEVEL",
                help="level of the events to track in the log file: %(choices)s",
            )

    def parse_args(self, *args: Any, **kwargs: Any) -> argparse.Namespace:  # type: ignore[override]
        """Extends the parent function by adding a new URL argument for every server group added.

        The type of this new argument will be `sqlalchemy.engine.URL`. It also logs all the parsed
        arguments for debugging purposes when logging arguments have been added.

        """
        # NOTE(review): the docstring mentions logging the parsed arguments, but no logging is
        # performed in this method body — confirm against the full source.
        arguments = super().parse_args(*args, **kwargs)
        # Build and add an sqlalchemy.engine.URL object for every server group added
        pattern = re.compile(r"([\w-]*)host$")
        server_prefixes = [x.group(1) for x in map(pattern.match, vars(arguments)) if x]
        for prefix in server_prefixes:
            # Raise an error rather than overwriting when the URL argument is already present
            if f"{prefix}url" in arguments:
                self.error(f"argument '{prefix}url' is already present")
            try:
                server_url = URL.create(
                    "mysql",
                    getattr(arguments, f"{prefix}user"),
                    getattr(arguments, f"{prefix}password"),
                    getattr(arguments, f"{prefix}host"),
                    getattr(arguments, f"{prefix}port"),
                    getattr(arguments, f"{prefix}database", None),
                )
            except AttributeError:
                # Not a database server host argument
                continue
            setattr(arguments, f"{prefix}url", server_url)
        return arguments

formatter_class = argparse.ArgumentDefaultsHelpFormatter instance-attribute

add_argument(*args, **kwargs)

Extends the parent function by excluding the default value in the help text when not provided.

Only applied to required arguments without a default value, i.e. positional arguments or optional arguments with required=True.

Source code in ensembl/utils/argparse.py
132
133
134
135
136
137
138
139
140
141
def add_argument(self, *args: Any, **kwargs: Any) -> None:  # type: ignore[override]
    """Registers an argument, hiding the default in the help text when none was provided.

    Only applies to required arguments without a default value, i.e. positional arguments or
    optional arguments with `required=True`.

    """
    # A SUPPRESS default prevents ArgumentDefaultsHelpFormatter from printing "(default: ...)"
    if kwargs.get("required", False) and ("default" not in kwargs):
        kwargs["default"] = argparse.SUPPRESS
    super().add_argument(*args, **kwargs)

add_argument_dst_path(*args, exists_ok=True, **kwargs)

Adds pathlib.Path argument, checking if it is writable at parsing time.

If "metavar" is not defined it is added with "PATH" as value to improve help text readability.

Parameters:

Name Type Description Default
exists_ok bool

Do not raise an error if the destination path already exists.

True
Source code in ensembl/utils/argparse.py
153
154
155
156
157
158
159
160
161
162
163
164
def add_argument_dst_path(self, *args: Any, exists_ok: bool = True, **kwargs: Any) -> None:
    """Registers a `pathlib.Path` argument whose writability is checked at parsing time.

    When no "metavar" is supplied, "PATH" is used to improve help text readability.

    Args:
        exists_ok: Do not raise an error if the destination path already exists.

    """
    if "metavar" not in kwargs:
        kwargs["metavar"] = "PATH"

    # Named closure (instead of a lambda) binding exists_ok for the parse-time check
    def _check_dst(raw_path: StrPath) -> Path:
        return self._validate_dst_path(raw_path, exists_ok)

    kwargs["type"] = _check_dst
    self.add_argument(*args, **kwargs)

add_argument_src_path(*args, **kwargs)

Adds pathlib.Path argument, checking if it exists and it is readable at parsing time.

If "metavar" is not defined, it is added with "PATH" as value to improve help text readability.

Source code in ensembl/utils/argparse.py
143
144
145
146
147
148
149
150
151
def add_argument_src_path(self, *args: Any, **kwargs: Any) -> None:
    """Registers a `pathlib.Path` argument, verified to exist and be readable at parsing time.

    When no "metavar" is supplied, "PATH" is used to improve help text readability.

    """
    if "metavar" not in kwargs:
        kwargs["metavar"] = "PATH"
    # Validation happens during parsing via the argument's type callable
    kwargs["type"] = self._validate_src_path
    self.add_argument(*args, **kwargs)

add_argument_url(*args, **kwargs)

Adds sqlalchemy.engine.URL argument.

If "metavar" is not defined it is added with "URI" as value to improve help text readability.

Source code in ensembl/utils/argparse.py
166
167
168
169
170
171
172
173
174
def add_argument_url(self, *args: Any, **kwargs: Any) -> None:
    """Registers a `sqlalchemy.engine.URL` argument.

    When no "metavar" is supplied, "URI" is used to improve help text readability.

    """
    if "metavar" not in kwargs:
        kwargs["metavar"] = "URI"
    # sqlalchemy's make_url converts the raw string into a URL object at parsing time
    kwargs["type"] = make_url
    self.add_argument(*args, **kwargs)

add_log_arguments(add_log_file=False)

Adds the usual set of arguments required to set and initialise a logging system.

The current set includes a mutually exclusive group for the default logging level: --verbose, --debug or --log LEVEL.

Parameters:

Name Type Description Default
add_log_file bool

Add arguments to allow storing messages into a file, i.e. --log_file and --log_file_level.

False
Source code in ensembl/utils/argparse.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def add_log_arguments(self, add_log_file: bool = False) -> None:
    """Adds the usual set of arguments required to set and initialise a logging system.

    The current set includes a mutually exclusive group for the default logging level: `--verbose`,
    `--debug` or `--log LEVEL`.

    Args:
        add_log_file: Add arguments to allow storing messages into a file, i.e. `--log_file` and
            `--log_file_level`.

    """
    # Define the list of log levels available
    log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    # NOTE: from 3.11 this list can be changed to: logging.getLevelNamesMapping().keys()
    # Create logging arguments group
    group = self.add_argument_group("logging arguments")
    # Add 3 mutually exclusive options to set the logging level
    subgroup = group.add_mutually_exclusive_group()
    subgroup.add_argument(
        "-v",
        "--verbose",
        action="store_const",
        const="INFO",
        dest="log_level",
        help="verbose mode, i.e. 'INFO' log level",
    )
    subgroup.add_argument(
        "--debug",
        action="store_const",
        const="DEBUG",
        dest="log_level",
        help="debugging mode, i.e. 'DEBUG' log level",
    )
    subgroup.add_argument(
        "--log",
        choices=log_levels,
        type=str.upper,
        default="WARNING",
        metavar="LEVEL",
        dest="log_level",
        help="level of the events to track: %(choices)s",
    )
    # Guarantee a "WARNING" default for log_level even if --verbose/--debug are not used
    subgroup.set_defaults(log_level="WARNING")
    if add_log_file:
        # Add log file-related arguments
        group.add_argument(
            "--log_file",
            type=lambda x: self._validate_dst_path(x, exists_ok=True),
            metavar="PATH",
            default=None,
            help="log file path",
        )
        group.add_argument(
            "--log_file_level",
            choices=log_levels,
            type=str.upper,
            default="DEBUG",
            metavar="LEVEL",
            help="level of the events to track in the log file: %(choices)s",
        )

add_numeric_argument(*args, type=float, min_value=None, max_value=None, **kwargs)

Adds a numeric argument with constrains on its type and its minimum or maximum value.

Note that the default value (if defined) is not checked unless the argument is an optional argument and no value is provided in the command line.

Parameters:

Name Type Description Default
type Callable[[str], int | float]

Type to convert the argument value to when parsing.

float
min_value int | float | None

Minimum value constrain. If None, no minimum value constrain.

None
max_value int | float | None

Maximum value constrain. If None, no maximum value constrain.

None
Source code in ensembl/utils/argparse.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def add_numeric_argument(
    self,
    *args: Any,
    type: Callable[[str], int | float] = float,
    min_value: int | float | None = None,
    max_value: int | float | None = None,
    **kwargs: Any,
) -> None:
    """Registers a numeric argument constrained by its type and an optional value range.

    Note that the default value (if defined) is not checked unless the argument is an optional
    argument and no value is provided in the command line.

    Args:
        type: Type to convert the argument value to when parsing.
        min_value: Minimum value constrain. If `None`, no minimum value constrain.
        max_value: Maximum value constrain. If `None`, no maximum value constrain.

    """
    # A range with min > max can never be satisfied, so fail fast at registration time
    both_bounds_set = (min_value is not None) and (max_value is not None)
    if both_bounds_set and (min_value > max_value):
        raise ArgumentError("minimum value is greater than maximum value")

    # Named closure binding the constraints for the parse-time check
    def _check_number(raw_value: str) -> int | float:
        return self._validate_number(raw_value, type, min_value, max_value)

    kwargs["type"] = _check_number
    self.add_argument(*args, **kwargs)

add_server_arguments(prefix='', include_database=False, help=None)

Adds the usual set of arguments needed to connect to a server, i.e. --host, --port, --user and --password (optional).

Note that the parser will assume this is a MySQL server.

Parameters:

Name Type Description Default
prefix str

Prefix to add to each argument, e.g. if prefix is src_, the arguments will be --src_host, etc.

''
include_database bool

Include --database argument.

False
help str | None

Description message to include for this set of arguments.

None
Source code in ensembl/utils/argparse.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def add_server_arguments(
    self, prefix: str = "", include_database: bool = False, help: str | None = None
) -> None:
    """Adds the usual set of arguments needed to connect to a server, i.e. `--host`, `--port`, `--user`
    and `--password` (optional).

    Note that the parser will assume this is a MySQL server.

    Args:
        prefix: Prefix to add to each argument, e.g. if prefix is `src_`, the arguments will be
            `--src_host`, etc.
        include_database: Include `--database` argument.
        help: Description message to include for this set of arguments.

    """
    group = self.add_argument_group(f"{prefix}server connection arguments", description=help)
    # default=argparse.SUPPRESS keeps "(default: ...)" out of the help text for required options
    group.add_argument(
        f"--{prefix}host", required=True, metavar="HOST", default=argparse.SUPPRESS, help="host name"
    )
    group.add_argument(
        f"--{prefix}port",
        required=True,
        type=int,
        metavar="PORT",
        default=argparse.SUPPRESS,
        help="port number",
    )
    group.add_argument(
        f"--{prefix}user", required=True, metavar="USER", default=argparse.SUPPRESS, help="user name"
    )
    group.add_argument(f"--{prefix}password", metavar="PWD", help="host password")
    if include_database:
        group.add_argument(
            f"--{prefix}database",
            required=True,
            metavar="NAME",
            default=argparse.SUPPRESS,
            help="database name",
        )

parse_args(*args, **kwargs)

Extends the parent function by adding a new URL argument for every server group added.

The type of this new argument will be sqlalchemy.engine.URL. It also logs all the parsed arguments for debugging purposes when logging arguments have been added.

Source code in ensembl/utils/argparse.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def parse_args(self, *args: Any, **kwargs: Any) -> argparse.Namespace:  # type: ignore[override]
    """Extends the parent function by adding a new URL argument for every server group added.

    The type of this new argument will be `sqlalchemy.engine.URL`. It also logs all the parsed
    arguments for debugging purposes when logging arguments have been added.

    """
    # NOTE(review): the docstring mentions logging the parsed arguments, but no logging is
    # performed in this method body — confirm against the full source.
    arguments = super().parse_args(*args, **kwargs)
    # Build and add an sqlalchemy.engine.URL object for every server group added
    pattern = re.compile(r"([\w-]*)host$")
    server_prefixes = [x.group(1) for x in map(pattern.match, vars(arguments)) if x]
    for prefix in server_prefixes:
        # Raise an error rather than overwriting when the URL argument is already present
        if f"{prefix}url" in arguments:
            self.error(f"argument '{prefix}url' is already present")
        try:
            server_url = URL.create(
                "mysql",
                getattr(arguments, f"{prefix}user"),
                getattr(arguments, f"{prefix}password"),
                getattr(arguments, f"{prefix}host"),
                getattr(arguments, f"{prefix}port"),
                getattr(arguments, f"{prefix}database", None),
            )
        except AttributeError:
            # Not a database server host argument
            continue
        setattr(arguments, f"{prefix}url", server_url)
    return arguments

DuplicateIdError

Bases: Exception

Trying to add a feature with an ID already in use.

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
43
44
class DuplicateIdError(Exception):
    """Raised when attempting to add a feature whose ID is already in use."""

FunctionalAnnotations

List of annotations extracted from a GFF3 file.

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
class FunctionalAnnotations:
    """List of annotations extracted from a GFF3 file."""

    # Xref databases to skip (loaded through other means)
    ignored_xrefs = {"go", "interpro", "uniprot"}

    def __init__(self, provider_name: str = "") -> None:
        """Create an empty annotation store.

        Args:
            provider_name: Name of the annotation provider (e.g. "RefSeq").
        """
        self.annotations: List[Annotation] = []
        self.provider_name = provider_name
        # Annotated features
        # Under each feature, each dict's key is a feature ID
        self.features: Dict[str, Dict[str, Annotation]] = {
            "gene": {},
            "transcript": {},
            "translation": {},
            "transposable_element": {},
        }
        # Keep parent info: key is the feature ID, value is the parent ID
        self.parents: Dict[str, Dict[str, str]] = {
            "gene": {},
            "transcript": {},
        }

    def get_xrefs(self, feature: GFFSeqFeature) -> List[Dict[str, Any]]:
        """Get the xrefs from the Dbxref field."""
        all_xref: List[Dict[str, str]] = []

        if "Dbxref" in feature.qualifiers:
            for xref in feature.qualifiers["Dbxref"]:
                dbname, name = xref.split(":", maxsplit=1)
                if dbname == "GenBank" and self.provider_name == "RefSeq":
                    dbname = "RefSeq"

                if dbname.lower() in self.ignored_xrefs:
                    continue

                xrefs = {"dbname": dbname, "id": name}
                all_xref.append(xrefs)

        # Add RefSeq ID xref if it looks like one
        if self.provider_name == "RefSeq":
            if feature.type == "gene" and feature.id.startswith("LOC"):
                xref_dbs = {x["dbname"] for x in all_xref}
                if "RefSeq" not in xref_dbs:
                    all_xref.append({"dbname": "RefSeq", "id": feature.id})

        return all_xref

    def get_features(self, feat_type: str) -> Dict[str, Annotation]:
        """Get all feature annotations for the requested type."""
        try:
            return self.features[feat_type]
        except KeyError as err:
            raise KeyError(f"No such feature type {feat_type}") from err

    def add_parent_link(self, parent_type: str, parent_id: str, child_id: str) -> None:
        """Record a parent-child IDs relationship for a given parent biotype."""
        features = self.get_features(parent_type)
        if parent_id not in features:
            raise MissingParentError(f"Parent {parent_type}:{parent_id} not found for {child_id}")
        self.parents[parent_type][child_id] = parent_id

    def get_parent(self, parent_type: str, child_id: str) -> str:
        """Returns the parent ID of a given child for a given parent biotype."""
        try:
            parents = self.parents[parent_type]
        except KeyError as err:
            raise KeyError(f"Unsupported parent type {parent_type}") from err

        parent_id = parents.get(child_id)
        if parent_id is None:
            raise MissingParentError(f"Can't find {parent_type} parent for {child_id}")
        return parent_id

    def add_feature(
        self,
        feature: GFFSeqFeature,
        feat_type: str,
        parent_id: Optional[str] = None,
        all_parent_ids: Optional[List[str]] = None,
    ) -> None:
        """Add annotation for a feature of a given type. If a parent_id is provided, record the relationship.

        Args:
            feature: The feature to create an annotation.
            feat_type: Type of the feature to annotate.
            parent_id: Parent ID of this feature to keep it linked.
            all_parent_ids: All parent IDs to remove from non-informative descriptions.

        Raises:
            AnnotationError: If the feature ID is already recorded, or no parent is possible
                for this feature type.
        """
        if all_parent_ids is None:
            all_parent_ids = []
        features = self.get_features(feat_type)
        if feature.id in features:
            raise AnnotationError(f"Feature {feat_type} ID {feature.id} already added")

        feature_object = self._generic_feature(feature, feat_type, all_parent_ids)
        self.features[feat_type][feature.id] = feature_object

        if parent_id:
            if feat_type in _PARENTS:
                parent_type = _PARENTS[feat_type]
                self.add_parent_link(parent_type, parent_id, feature.id)
            else:
                raise AnnotationError(f"No parent possible for {feat_type} {feature.id}")

    def _generic_feature(
        self, feature: GFFSeqFeature, feat_type: str, parent_ids: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Create a feature object following the specifications.

        Args:
            feature: The GFFSeqFeature to add to the list.
            feat_type: Feature type of the feature to store (e.g. gene, transcript, translation).
            parent_ids: All parent IDs to remove from non-informative descriptions.

        """
        if parent_ids is None:
            parent_ids = []

        feature_object: Annotation = {"object_type": feat_type, "id": feature.id}

        # Description?
        for qname in ("description", "product"):
            if qname in feature.qualifiers:
                description = feature.qualifiers[qname][0]
                if self.product_is_informative(description, feat_ids=parent_ids + [feature.id]):
                    feature_object["description"] = description
                    break
                logging.debug(f"Non informative description for {feature.id}: {description}")

        feature_object["xrefs"] = []
        if "Dbxref" in feature.qualifiers:
            all_xref = self.get_xrefs(feature)
            feature_object["xrefs"] = all_xref

        xref_values = {xref["id"].lower() for xref in feature_object["xrefs"]}

        # Synonyms?
        # We add synonyms to the external_synonym table
        # which is associated with the first xref of that feature type
        if "Name" in feature.qualifiers:
            feat_name = feature.qualifiers["Name"][0]
            if feat_name.lower() != feature.id.lower() and feat_name.lower() not in xref_values:
                feature_object["synonyms"] = [feat_name]

        # is_pseudogene?
        if feature.type.startswith("pseudogen"):
            feature_object["is_pseudogene"] = True

        # Don't keep empty xref
        if not feature_object["xrefs"]:
            del feature_object["xrefs"]
        return feature_object

    def transfer_descriptions(self) -> None:
        """Transfers the feature descriptions in 2 steps:
        - from translations to transcripts (if the transcript description is empty)
        - from transcripts to genes (same case)

        """
        self._transfer_description_up("translation")
        self._transfer_description_up("transcript")

    def _transfer_description_up(self, child_feature: str) -> None:
        """Transfer descriptions from all feature of a given type, up to their parent.

        Args:
            child_feature: Either "translation" (transfer to transcript) or "transcript" (to gene).

        """
        children_features = self.get_features(child_feature)
        parent_type = _PARENTS[child_feature]
        parent_features = self.get_features(parent_type)

        # Transfer description from children to their parent
        for child_id, child in children_features.items():
            child_description = child.get("description")
            if child_description is not None:
                child_description = self._clean_description(child_description)
                # Check parent
                parent_id = self.get_parent(parent_type, child_id)
                parent = parent_features[parent_id]
                parent_description = parent.get("description")
                if parent_description is None:
                    parent["description"] = child_description

    @staticmethod
    def _clean_description(description: str) -> str:
        """Returns the description without "transcript variant" information."""
        variant_re = re.compile(r", transcript variant [A-Z][0-9]+$", re.IGNORECASE)
        description = re.sub(variant_re, "", description)
        return description

    @staticmethod
    def product_is_informative(product: str, feat_ids: Optional[List[str]] = None) -> bool:
        """Returns True if the product name contains informative words, False otherwise.

        It is considered uninformative when the description contains words such as "hypothetical"
        or "putative". If feature IDs are provided, consider it uninformative as well (we do not
        want descriptions to be just the ID).

        Args:
            product: A product name.
            feat_ids: List of feature IDs.

        """
        non_informative_words = [
            "hypothetical",
            "putative",
            "uncharacterized",
            "unspecified",
            "unknown",
            r"(of )?unknown function",
            "conserved",
            "predicted",
            "fragment",
            "product",
            "function",
            "protein",
            "transcript",
            "gene",
            "RNA",
            r"(variant|isoform)( X?\d+)?",
            r"low quality protein",
        ]
        non_informative_re = re.compile(r"|".join(non_informative_words), re.IGNORECASE)

        # Remove all IDs that are in the description
        if feat_ids:
            logging.debug(f"Filter out {feat_ids} from {product}")
            try:
                for feat_id in feat_ids:
                    # Escape the ID: feature IDs may contain regex metacharacters (e.g. "(", "+", "[")
                    feat_id_re = re.compile(re.escape(feat_id), re.IGNORECASE)
                    product = re.sub(feat_id_re, "", product)
            except TypeError as err:
                raise TypeError(f"Failed to search {feat_id_re} in '{product}'") from err

        # Remove punctuations
        punct_re = re.compile(r"[,;: _()-]+")
        product = re.sub(punct_re, " ", product)

        # Then remove non informative words
        product = re.sub(non_informative_re, " ", product)

        # Anything (informative) left?
        empty_re = re.compile(r"^[ ]*$")
        return not bool(empty_re.match(product))

    def _to_list(self) -> list[Annotation]:
        """Returns all stored annotations flattened into a single list."""
        all_list: list[Annotation] = []
        for feat_dict in self.features.values():
            all_list += feat_dict.values()
        return all_list

    def to_json(self, out_path: PathLike) -> None:
        """Print out the current annotation list in a json file.

        Args:
            out_path: JSON file path where to write the data.

        """
        self.transfer_descriptions()
        feats_list = self._to_list()
        print_json(Path(out_path), feats_list)

    def store_gene(self, gene: GFFSeqFeature) -> None:
        """Record the functional_annotations of a gene and its children features."""
        self.add_feature(gene, "gene")

        for transcript in gene.sub_features:
            self.add_feature(transcript, "transcript", gene.id, [gene.id])
            for feat in transcript.sub_features:
                if feat.type == "CDS":
                    self.add_feature(feat, "translation", transcript.id, [gene.id, transcript.id])
                    # Store CDS functional annotation only once
                    break

annotations = [] instance-attribute

features = {'gene': {}, 'transcript': {}, 'translation': {}, 'transposable_element': {}} instance-attribute

ignored_xrefs = {'go', 'interpro', 'uniprot'} class-attribute instance-attribute

parents = {'gene': {}, 'transcript': {}} instance-attribute

provider_name = provider_name instance-attribute

add_feature(feature, feat_type, parent_id=None, all_parent_ids=None)

Add annotation for a feature of a given type. If a parent_id is provided, record the relationship.

Parameters:

Name Type Description Default
feature GFFSeqFeature

The feature to create an annotation.

required
feat_type str

Type of the feature to annotate.

required
parent_id Optional[str]

Parent ID of this feature to keep it linked.

None
all_parent_ids Optional[List[str]]

All parent IDs to remove from non-informative descriptions.

None
Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def add_feature(
    self,
    feature: GFFSeqFeature,
    feat_type: str,
    parent_id: Optional[str] = None,
    all_parent_ids: Optional[List[str]] = None,
) -> None:
    """Add annotation for a feature of a given type. If a parent_id is provided, record the relationship.

    Args:
        feature: The feature to create an annotation.
        feat_type: Type of the feature to annotate.
        parent_id: Parent ID of this feature to keep it linked.
        all_parent_ids: All parent IDs to remove from non-informative descriptions.
    """
    parent_ids = [] if all_parent_ids is None else all_parent_ids
    known = self.get_features(feat_type)
    if feature.id in known:
        raise AnnotationError(f"Feature {feat_type} ID {feature.id} already added")

    self.features[feat_type][feature.id] = self._generic_feature(feature, feat_type, parent_ids)

    # No parent given: nothing left to record
    if not parent_id:
        return
    if feat_type not in _PARENTS:
        raise AnnotationError(f"No parent possible for {feat_type} {feature.id}")
    self.add_parent_link(_PARENTS[feat_type], parent_id, feature.id)

Record a parent-child IDs relationship for a given parent biotype.

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
109
110
111
112
113
114
def add_parent_link(self, parent_type: str, parent_id: str, child_id: str) -> None:
    """Record a parent-child IDs relationship for a given parent biotype."""
    known_parents = self.get_features(parent_type)
    if parent_id in known_parents:
        self.parents[parent_type][child_id] = parent_id
    else:
        raise MissingParentError(f"Parent {parent_type}:{parent_id} not found for {child_id}")

get_features(feat_type)

Get all feature annotations for the requested type.

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
102
103
104
105
106
107
def get_features(self, feat_type: str) -> Dict[str, Annotation]:
    """Get all feature annotations for the requested type."""
    try:
        annotations_by_id = self.features[feat_type]
    except KeyError as err:
        raise KeyError(f"No such feature type {feat_type}") from err
    return annotations_by_id

get_parent(parent_type, child_id)

Returns the parent ID of a given child for a given parent biotype.

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
116
117
118
119
120
121
122
123
124
125
126
def get_parent(self, parent_type: str, child_id: str) -> str:
    """Returns the parent ID of a given child for a given parent biotype."""
    if parent_type not in self.parents:
        raise KeyError(f"Unsupported parent type {parent_type}")

    found = self.parents[parent_type].get(child_id)
    if found is None:
        raise MissingParentError(f"Can't find {parent_type} parent for {child_id}")
    return found

get_xrefs(feature)

Get the xrefs from the Dbxref field.

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_xrefs(self, feature: GFFSeqFeature) -> List[Dict[str, Any]]:
    """Get the xrefs from the Dbxref field."""
    collected: List[Dict[str, str]] = []

    for xref in feature.qualifiers.get("Dbxref", []):
        dbname, name = xref.split(":", maxsplit=1)
        # RefSeq annotations use GenBank as the xref source name
        if dbname == "GenBank" and self.provider_name == "RefSeq":
            dbname = "RefSeq"
        if dbname.lower() not in self.ignored_xrefs:
            collected.append({"dbname": dbname, "id": name})

    # A RefSeq gene with a LOC ID is an xref to itself, unless one was already recorded
    looks_like_refseq_id = (
        self.provider_name == "RefSeq" and feature.type == "gene" and feature.id.startswith("LOC")
    )
    if looks_like_refseq_id and all(xref["dbname"] != "RefSeq" for xref in collected):
        collected.append({"dbname": "RefSeq", "id": feature.id})

    return collected

product_is_informative(product, feat_ids=None) staticmethod

Returns True if the product name contains informative words, False otherwise.

It is considered uninformative when the description contains words such as "hypothetical" or "putative". If feature IDs are provided, consider it uninformative as well (we do not want descriptions to be just the ID).

Parameters:

Name Type Description Default
product str

A product name.

required
feat_ids Optional[List[str]]

List of feature IDs.

None
Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
@staticmethod
def product_is_informative(product: str, feat_ids: Optional[List[str]] = None) -> bool:
    """Returns True if the product name contains informative words, False otherwise.

    It is considered uninformative when the description contains words such as "hypothetical"
    or "putative". If feature IDs are provided, consider it uninformative as well (we do not
    want descriptions to be just the ID).

    Args:
        product: A product name.
        feat_ids: List of feature IDs.

    """
    non_informative_words = [
        "hypothetical",
        "putative",
        "uncharacterized",
        "unspecified",
        "unknown",
        r"(of )?unknown function",
        "conserved",
        "predicted",
        "fragment",
        "product",
        "function",
        "protein",
        "transcript",
        "gene",
        "RNA",
        r"(variant|isoform)( X?\d+)?",
        r"low quality protein",
    ]
    non_informative_re = re.compile(r"|".join(non_informative_words), re.IGNORECASE)

    # Remove all IDs that are in the description
    if feat_ids:
        logging.debug(f"Filter out {feat_ids} from {product}")
        try:
            for feat_id in feat_ids:
                # Escape the ID: feature IDs may contain regex metacharacters (e.g. "(", "+", "[")
                feat_id_re = re.compile(re.escape(feat_id), re.IGNORECASE)
                product = re.sub(feat_id_re, "", product)
        except TypeError as err:
            raise TypeError(f"Failed to search {feat_id_re} in '{product}'") from err

    # Remove punctuations
    punct_re = re.compile(r"[,;: _()-]+")
    product = re.sub(punct_re, " ", product)

    # Then remove non informative words
    product = re.sub(non_informative_re, " ", product)

    # Anything (informative) left?
    empty_re = re.compile(r"^[ ]*$")
    return not bool(empty_re.match(product))

store_gene(gene)

Record the functional_annotations of a gene and its children features.

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
319
320
321
322
323
324
325
326
327
328
329
def store_gene(self, gene: GFFSeqFeature) -> None:
    """Record the functional_annotations of a gene and its children features."""
    self.add_feature(gene, "gene")

    for transcript in gene.sub_features:
        self.add_feature(transcript, "transcript", gene.id, [gene.id])
        # Store CDS functional annotation only once per transcript
        first_cds = next((child for child in transcript.sub_features if child.type == "CDS"), None)
        if first_cds is not None:
            self.add_feature(first_cds, "translation", transcript.id, [gene.id, transcript.id])

to_json(out_path)

Print out the current annotation list in a json file.

Parameters:

Name Type Description Default
out_path PathLike

JSON file path where to write the data.

required
Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
308
309
310
311
312
313
314
315
316
317
def to_json(self, out_path: PathLike) -> None:
    """Print out the current annotation list in a json file.

    Args:
        out_path: JSON file path where to write the data.

    """
    # Complete the descriptions from children features before dumping
    self.transfer_descriptions()
    print_json(Path(out_path), self._to_list())

transfer_descriptions()

Transfers the feature descriptions in 2 steps: - from translations to transcripts (if the transcript description is empty) - from transcripts to genes (same case)

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
208
209
210
211
212
213
214
215
def transfer_descriptions(self) -> None:
    """Transfers the feature descriptions in 2 steps:
    - from translations to transcripts (if the transcript description is empty)
    - from transcripts to genes (same case)

    """
    for child_type in ("translation", "transcript"):
        self._transfer_description_up(child_type)

GFFGeneMerger

Specialized class to merge split genes in a GFF3 file, prior to further parsing.

Source code in src/python/ensembl/io/genomio/gff3/gene_merger.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class GFFGeneMerger:
    """Specialized class to merge split genes in a GFF3 file, prior to further parsing."""

    def __init__(self) -> None:
        """Load the supported biotypes from the packaged biotypes.json file."""
        source = files(ensembl.io.genomio.data.gff3).joinpath("biotypes.json")
        with as_file(source) as biotypes_json:
            self._biotypes = get_json(biotypes_json)

    def merge(self, in_gff_path: PathLike, out_gff_path: PathLike) -> List[str]:
        """
        Merge genes in a gff that are split in multiple lines.

        Args:
            in_gff_path: Input GFF3 that may have split genes.
            out_gff_path: Output GFF3 with those genes merged.

        Returns:
            List of all merged genes, each represented as a string of the GFF3 lines of all their parts.
        """
        to_merge: List[List[str]] = []
        merged: List[str] = []

        with Path(in_gff_path).open("r") as in_gff_fh, Path(out_gff_path).open("w") as out_gff_fh:
            for line in in_gff_fh:
                # Skip comments
                if line.startswith("#"):
                    if line.startswith("##FASTA"):
                        logging.warning("This GFF3 file contains FASTA sequences")
                        break
                    out_gff_fh.write(line)
                else:
                    # Parse one line
                    line = line.rstrip()
                    fields = line.split("\t")
                    attrs = {}
                    for attr in fields[8].split(";"):
                        if not attr:
                            # Tolerate a trailing ";" in the attributes column
                            continue
                        # partition instead of split: attribute values may themselves contain "="
                        key, _, value = attr.partition("=")
                        attrs[key] = value

                    # Check this is a gene to merge; cache it then
                    if fields[2] in self._biotypes["gene"]["supported"] and (
                        "part" in attrs or "is_ordered" in attrs
                    ):
                        to_merge.append(fields)

                    # If not, merge previous gene if needed, and print the line
                    else:
                        if to_merge:
                            self._flush_merged(to_merge, merged, out_gff_fh)
                            to_merge = []
                        out_gff_fh.write(line + "\n")

            # Print last merged gene if there is one
            if to_merge:
                self._flush_merged(to_merge, merged, out_gff_fh)

        logging.debug(f"Merged lines: {len(merged)}")
        return merged

    def _flush_merged(self, to_merge: List[List[str]], merged: List[str], out_gff_fh) -> None:
        """Record the cached gene parts in `merged` and write their single merged line to the output.

        Args:
            to_merge: Cached GFF3 field lists for all parts of one split gene.
            merged: Accumulator of merged genes, each as the string of its original part lines.
            out_gff_fh: Output file handle where the merged gene line is written.
        """
        merged.append("\n".join("\t".join(part) for part in to_merge) + "\n")
        out_gff_fh.write(self._merge_genes(to_merge))

    def _merge_genes(self, to_merge: List) -> str:
        """Returns a single gene gff3 line merged from separate parts.

        Args:
            to_merge: List of gff3 lines with gene parts.

        """
        # The merged gene spans the min start to the max end over all parts
        min_start = -1
        max_end = -1
        for gene in to_merge:
            start = int(gene[3])
            end = int(gene[4])

            if start < min_start or min_start < 0:
                min_start = start
            if end > max_end or max_end < 0:
                max_end = end

        # Take the first line as template and replace things
        new_gene = to_merge[0]
        new_gene[3] = str(min_start)
        new_gene[4] = str(max_end)

        # Drop the part/ordering attributes: the output is a single, whole gene
        attrs = new_gene[8]
        attrs = attrs.replace(";is_ordered=true", "")
        attrs = re.sub(r";part=\d+/\d+", "", attrs)
        new_gene[8] = attrs

        return "\t".join(new_gene) + "\n"

merge(in_gff_path, out_gff_path)

Merge genes in a gff that are split in multiple lines.

Parameters:

Name Type Description Default
in_gff_path PathLike

Input GFF3 that may have split genes.

required
out_gff_path PathLike

Output GFF3 with those genes merged.

required

Returns:

Type Description
List[str]

List of all merged genes, each represented as a string of the GFF3 lines of all their parts.

Source code in src/python/ensembl/io/genomio/gff3/gene_merger.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def merge(self, in_gff_path: PathLike, out_gff_path: PathLike) -> List[str]:
    """
    Merge genes in a gff that are split in multiple lines.

    Args:
        in_gff_path: Input GFF3 that may have split genes.
        out_gff_path: Output GFF3 with those genes merged.

    Returns:
        List of all merged genes, each represented as a string of the GFF3 lines of all their parts.
    """
    gene_parts: List[List[str]] = []
    merged: List[str] = []

    def flush(out_fh) -> None:
        # Record the cached parts, write their single merged line, and reset the cache
        merged.append("\n".join("\t".join(part) for part in gene_parts) + "\n")
        out_fh.write(self._merge_genes(gene_parts))
        gene_parts.clear()

    with Path(in_gff_path).open("r") as in_gff_fh, Path(out_gff_path).open("w") as out_gff_fh:
        for line in in_gff_fh:
            if line.startswith("#"):
                # Copy comments through, except a FASTA section which ends the parsing
                if line.startswith("##FASTA"):
                    logging.warning("This GFF3 file contains FASTA sequences")
                    break
                out_gff_fh.write(line)
                continue

            fields = line.rstrip().split("\t")
            attrs = dict(item.split("=") for item in fields[8].split(";"))

            # A supported gene biotype flagged as partial/ordered is a part to cache
            is_split_gene = fields[2] in self._biotypes["gene"]["supported"] and (
                "part" in attrs or "is_ordered" in attrs
            )
            if is_split_gene:
                gene_parts.append(fields)
            else:
                # Any other line ends a cached gene, if one was being collected
                if gene_parts:
                    flush(out_gff_fh)
                out_gff_fh.write(line.rstrip() + "\n")

        # Flush the last merged gene if there is one
        if gene_parts:
            flush(out_gff_fh)

    logging.debug(f"Merged lines: {len(merged)}")
    return merged

GFFSimplifier

Parse a GFF3 file and output a cleaned up GFF3 + annotation json file.

Raises:

Type Description
GFFParserError

If an error cannot be automatically fixed.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
class GFFSimplifier:
    """Parse a GFF3 file and output a cleaned up GFF3 + annotation json file.

    Raises:
        GFFParserError: If an error cannot be automatically fixed.
    """

    def __init__(
        self,
        genome_path: Optional[PathLike] = None,
        skip_unrecognized: bool = False,
        allow_pseudogene_with_cds: bool = False,
    ):
        """Create an object that simplifies `SeqFeature` objects.

        Args:
            genome_path: Genome metadata file.
            skip_unrecognized: Do not include unknown biotypes instead of raising an exception.
            allow_pseudogene_with_cds: Keep CDSs under pseudogenes that have them. Delete them otherwise.

        Raises:
            GFFParserError: If a biotype is unknown and `skip_unrecognized` is False.
        """
        self.skip_unrecognized = skip_unrecognized
        self.allow_pseudogene_with_cds = allow_pseudogene_with_cds

        # Load biotypes
        source = files(ensembl.io.genomio.data.gff3).joinpath("biotypes.json")
        with as_file(source) as biotypes_json:
            self._biotypes = get_json(biotypes_json)

        # Load genome metadata
        self.genome = {}
        if genome_path:
            with Path(genome_path).open("r") as genome_fh:
                self.genome = json.load(genome_fh)

        # A "GCF" accession prefix marks a RefSeq assembly, which changes how gene IDs are normalized
        self.refseq = False
        if self.genome and self.genome["assembly"]["accession"].startswith("GCF"):
            self.refseq = True

        # Other preparations
        self.stable_ids = StableIDAllocator()
        self.stable_ids.set_prefix(self.genome)
        self.exclude_seq_regions: List[str] = []
        # Unsupported types seen so far, recorded as "gene=<type>" / "transcript=<type>" / "sub_transcript=<type>"
        self.fail_types: Set = set()

        # Init the actual data we will store
        self.records = Records()
        self.annotations = FunctionalAnnotations(self.get_provider_name())

    def get_provider_name(self) -> str:
        """Returns the provider name for this genome.

        If this information is not available, will try to infer it from the assembly accession. Will
        return "GenBank" otherwise.
        """
        provider_name = "GenBank"
        if self.genome:
            try:
                provider_name = self.genome["assembly"]["provider_name"]
            except KeyError:
                # No explicit provider: infer RefSeq from a "GCF" accession prefix
                if self.genome["assembly"]["accession"].startswith("GCF"):
                    provider_name = "RefSeq"
        else:
            logging.warning(f"No genome file, using the default provider_name: {provider_name}")
        return provider_name

    def simpler_gff3(self, in_gff_path: PathLike) -> None:
        """Loads a GFF3 from INSDC and rewrites it in a simpler version, whilst also writing a
        functional annotation file.

        Raises:
            GFFParserError: If unrecognized types were found and `skip_unrecognized` is False.
        """
        self.records.from_gff(in_gff_path, self.exclude_seq_regions)
        for record in self.records:
            cleaned_features = []
            for feature in record.features:
                # miRNA genes may split into several genes; handle them separately
                split_genes = self.normalize_mirna(feature)
                if split_genes:
                    cleaned_features += split_genes
                else:
                    try:
                        clean_feature = self.simpler_gff3_feature(feature)
                        cleaned_features.append(clean_feature)
                    except (UnsupportedFeatureError, IgnoredFeatureError) as err:
                        # Skipped features are only logged; unsupported types are also
                        # accumulated in self.fail_types and reported below
                        logging.debug(err.message)
            record.features = cleaned_features

        if self.fail_types:
            fail_errors = "\n   ".join(list(self.fail_types))
            logging.warning(f"Unrecognized types found:\n   {fail_errors}")
            if not self.skip_unrecognized:
                raise GFFParserError("Unrecognized types found, abort")

    def simpler_gff3_feature(self, gene: GFFSeqFeature) -> GFFSeqFeature:
        """Creates a simpler version of a GFF3 feature.

        Raises:
            IgnoredFeatureError: If the feature type is ignored.
            UnsupportedFeatureError: If the feature type is not supported.
        """
        # Special cases
        non_gene = self.normalize_non_gene(gene)
        if non_gene:
            return non_gene
        if gene.type in self._biotypes["gene"]["ignored"]:
            raise IgnoredFeatureError(f"Ignored type {gene.type} for {gene.id}")

        # Synonym
        if gene.type == "protein_coding_gene":
            gene.type = "gene"

        # Lone sub-gene features, create a gene
        gene = self.create_gene_for_lone_transcript(gene)
        gene = self.create_gene_for_lone_cds(gene)

        # What to do with unsupported gene types
        if gene.type not in self._biotypes["gene"]["supported"]:
            self.fail_types.add(f"gene={gene.type}")
            raise UnsupportedFeatureError(f"Unsupported type {gene.type} for {gene.id}")

        # Normalize and store
        gene = self.normalize_gene(gene)
        self.annotations.store_gene(gene)
        return self.clean_gene(gene)

    def create_gene_for_lone_transcript(self, feat: GFFSeqFeature) -> GFFSeqFeature:
        """Returns a gene for lone transcripts: 'gene' for tRNA/rRNA/mRNA, and 'ncRNA_gene' for all others.

        Args:
            feat: The transcript for which we want to create a gene.
        """
        transcript_types = self._biotypes["transcript"]["supported"]
        if feat.type not in transcript_types:
            return feat

        new_type = "ncRNA_gene"
        if feat.type in ("tRNA", "rRNA", "mRNA"):
            new_type = "gene"
        logging.debug(f"Put the transcript {feat.type} in a {new_type} parent feature")
        new_gene = GFFSeqFeature(feat.location, type=new_type)
        new_gene.qualifiers["source"] = feat.qualifiers["source"]
        new_gene.sub_features = [feat]

        # Use the transcript ID for the gene, and generate a sub ID for the transcript
        new_gene.id = feat.id
        new_gene.qualifiers["ID"] = new_gene.id
        feat.id = self.stable_ids.generate_transcript_id(new_gene.id, 1)
        feat.qualifiers["ID"] = feat.id

        # Remove the exon/CDS parent so it is properly updated
        for subfeat in feat.sub_features:
            del subfeat.qualifiers["Parent"]

        # Check if it's a pseudogene: an mRNA with a CDS flagged pseudo=true
        if feat.type == "mRNA":
            is_pseudo = False
            for subfeat in feat.sub_features:
                pseudo_qual = subfeat.qualifiers.get("pseudo", [""])[0]
                if subfeat.type == "CDS" and pseudo_qual == "true":
                    is_pseudo = True
                    del subfeat.qualifiers["pseudo"]
                    break
            if is_pseudo:
                new_gene.type = "pseudogene"

        return new_gene

    def create_gene_for_lone_cds(self, feat: GFFSeqFeature) -> GFFSeqFeature:
        """Returns a gene created for a lone CDS.

        Args:
            feat: The CDS for which we want to create a gene.
        """
        if feat.type != "CDS":
            return feat

        logging.debug(f"Put the lone CDS in gene-mRNA parent features for {feat.id}")

        # Create a transcript, add the CDS
        transcript = GFFSeqFeature(feat.location, type="mRNA")
        transcript.qualifiers["source"] = feat.qualifiers["source"]
        transcript.sub_features = [feat]

        # Add an exon too, with the same coordinates as the CDS
        exon = GFFSeqFeature(feat.location, type="exon")
        exon.qualifiers["source"] = feat.qualifiers["source"]
        transcript.sub_features.append(exon)

        # Create a gene, add the transcript; a pseudo=true CDS makes it a pseudogene
        gene_type = "gene"
        if ("pseudo" in feat.qualifiers) and (feat.qualifiers["pseudo"][0] == "true"):
            gene_type = "pseudogene"
            del feat.qualifiers["pseudo"]
        new_gene = GFFSeqFeature(feat.location, type=gene_type)
        new_gene.qualifiers["source"] = feat.qualifiers["source"]
        new_gene.sub_features = [transcript]
        new_gene.id = self.stable_ids.generate_gene_id()
        new_gene.qualifiers["ID"] = new_gene.id
        transcript.id = self.stable_ids.generate_transcript_id(new_gene.id, 1)
        transcript.qualifiers["ID"] = transcript.id

        return new_gene

    def normalize_non_gene(self, feat: GFFSeqFeature) -> Optional[GFFSeqFeature]:
        """Returns a normalised "non-gene" or `None` if not applicable.

        Only transposable elements supported at the moment.

        Args:
            feat: Feature to normalise.

        Raises:
            NotImplementedError: If the feature is a not supported non-gene.
        """

        if feat.type not in self._biotypes["non_gene"]["supported"]:
            return None
        if feat.type in ("mobile_genetic_element", "transposable_element"):
            feat.type = "transposable_element"
            feat = self._normalize_mobile_genetic_element(feat)
            # Generate ID if needed
            feat.id = self.stable_ids.normalize_gene_id(feat, self.refseq)
            feat.qualifiers["ID"] = feat.id

            self.annotations.add_feature(feat, "transposable_element")
            return self.clean_gene(feat)
        # This is a failsafe in case you add supported non-genes
        raise NotImplementedError(f"Unsupported non-gene: {feat.type} for {feat.id}")

    def _normalize_mobile_genetic_element(self, feat: GFFSeqFeature) -> GFFSeqFeature:
        """Normalize a mobile element if it has a mobile_element_type field.

        Raises:
            GFFParserError: If the `mobile_element_type` is not a (retro)transposon.
        """
        try:
            mobile_element_type = feat.qualifiers["mobile_element_type"]
        except KeyError:
            logging.warning("No 'mobile_element_type' tag found")
            return feat

        # Get the type (and name) from the attrib, formatted "type" or "type:name"
        element_type, _, element_name = mobile_element_type[0].partition(":")
        description = element_type
        if element_name:
            description += f" ({element_name})"

        # Keep the metadata in the description if the type is known
        if element_type in ("transposon", "retrotransposon"):
            if not feat.qualifiers.get("product"):
                feat.qualifiers["product"] = [description]
            return feat
        raise GFFParserError(f"'mobile_element_type' is not a transposon: {element_type}")

    def clean_gene(self, gene: GFFSeqFeature) -> GFFSeqFeature:
        """Return the same gene without qualifiers unrelated to the gene structure."""

        # Only keep ID/Parent/source (and phase for CDSs) at every level of the hierarchy
        old_gene_qualifiers = gene.qualifiers
        gene.qualifiers = {"ID": gene.id, "source": old_gene_qualifiers["source"]}
        for transcript in gene.sub_features:
            # Replace qualifiers
            old_transcript_qualifiers = transcript.qualifiers
            transcript.qualifiers = {
                "ID": transcript.id,
                "Parent": gene.id,
                "source": old_transcript_qualifiers["source"],
            }

            for feat in transcript.sub_features:
                old_qualifiers = feat.qualifiers
                feat.qualifiers = {
                    "ID": feat.id,
                    "Parent": transcript.id,
                    "source": old_qualifiers["source"],
                }
                if feat.type == "CDS":
                    feat.qualifiers["phase"] = old_qualifiers["phase"]

        return gene

    def normalize_gene(self, gene: GFFSeqFeature) -> GFFSeqFeature:
        """Returns a normalized gene structure, separate from the functional elements.

        Args:
            gene: Gene object to normalize.

        """

        gene.id = self.stable_ids.normalize_gene_id(gene, refseq=self.refseq)
        # Order matters: restructure first, then normalize transcripts, then pseudogene CDSs
        restructure_gene(gene)
        self.normalize_transcripts(gene)
        self.normalize_pseudogene(gene)

        return gene

    def normalize_pseudogene(self, gene: GFFSeqFeature) -> None:
        """Normalize CDSs if allowed, otherwise remove them."""
        if gene.type != "pseudogene":
            return

        if self.allow_pseudogene_with_cds:
            self.stable_ids.normalize_pseudogene_cds_id(gene)
        else:
            remove_cds_from_pseudogene(gene)

    def normalize_transcripts(self, gene: GFFSeqFeature) -> None:
        """Normalizes a transcript."""

        allowed_transcript_types = self._biotypes["transcript"]["supported"]
        ignored_transcript_types = self._biotypes["transcript"]["ignored"]

        transcripts_to_delete = []
        for count, transcript in enumerate(gene.sub_features):
            if (
                transcript.type not in allowed_transcript_types
                and transcript.type not in ignored_transcript_types
            ):
                self.fail_types.add(f"transcript={transcript.type}")
                logging.warning(
                    f"Unrecognized transcript type: {transcript.type}" f" for {transcript.id} ({gene.id})"
                )
                transcripts_to_delete.append(count)
                continue

            # New transcript ID
            transcript_number = count + 1
            transcript.id = self.stable_ids.generate_transcript_id(gene.id, transcript_number)

            transcript = self.format_gene_segments(transcript)

            # EXONS AND CDS
            transcript = self._normalize_transcript_subfeatures(gene, transcript)

        if transcripts_to_delete:
            # Delete from the end so the remaining indexes stay valid
            for elt in sorted(transcripts_to_delete, reverse=True):
                gene.sub_features.pop(elt)

    def format_gene_segments(self, transcript: GFFSeqFeature) -> GFFSeqFeature:
        """Returns the equivalent Ensembl biotype feature for gene segment transcript features.

        Supported features: "C_gene_segment" and "V_gene_segment".

        Args:
            transcript: Gene segment transcript feature.

        Raises:
            GeneSegmentError: Unable to get the segment type information from the feature.
        """
        if transcript.type not in ("C_gene_segment", "V_gene_segment"):
            return transcript

        # Guess the segment type from the transcript attribs
        seg_type = self._get_segment_type(transcript)
        if not seg_type:
            # Get the information from a CDS instead
            sub_feats: List[GFFSeqFeature] = transcript.sub_features
            cdss: List[GFFSeqFeature] = list(filter(lambda x: x.type == "CDS", sub_feats))
            if cdss:
                seg_type = self._get_segment_type(cdss[0])
            if not seg_type:
                raise GeneSegmentError(f"Unable to infer segment from {transcript.id}")

        # Change V/C_gene_segment into its corresponding transcript name, e.g. "IG_V_gene"
        transcript.type = f"{seg_type}_{transcript.type.replace('_segment', '')}"
        return transcript

    def _get_segment_type(self, feature: GFFSeqFeature) -> str:
        """Infer if a segment is "IG" (immunoglobulin) or "TR" (t-cell) from the feature attribs.

        Returns an empty string if no segment type info was found.
        """

        # Prefer "standard_name" over "product" as the source of the description
        product = feature.qualifiers.get("standard_name", [""])[0]
        if not product:
            product = feature.qualifiers.get("product", [""])[0]
        if not product:
            return ""

        if re.search(r"\b(immunoglobulin|ig)\b", product, flags=re.IGNORECASE):
            return "IG"
        if re.search(r"\bt[- _]cell\b", product, flags=re.IGNORECASE):
            return "TR"
        return ""

    def _normalize_transcript_subfeatures(
        self, gene: GFFSeqFeature, transcript: GFFSeqFeature
    ) -> GFFSeqFeature:
        """Returns a transcript with normalized sub-features."""
        exons_to_delete = []
        exon_number = 1
        for tcount, feat in enumerate(transcript.sub_features):
            if feat.type == "exon":
                # New exon ID
                feat.id = f"{transcript.id}-E{exon_number}"
                exon_number += 1
                # Replace qualifiers
                old_exon_qualifiers = feat.qualifiers
                feat.qualifiers = {"Parent": transcript.id}
                feat.qualifiers["source"] = old_exon_qualifiers["source"]
            elif feat.type == "CDS":
                # New CDS ID, unless it clashes with the gene/transcript IDs or is empty
                feat.id = self.stable_ids.normalize_cds_id(feat.id)
                if feat.id in ("", gene.id, transcript.id):
                    feat.id = f"{transcript.id}_cds"
            else:
                if feat.type in self._biotypes["transcript"]["ignored"]:
                    exons_to_delete.append(tcount)
                    continue

                self.fail_types.add(f"sub_transcript={feat.type}")
                logging.warning(
                    f"Unrecognized exon type for {feat.type}: {feat.id}"
                    f" (for transcript {transcript.id} of type {transcript.type})"
                )
                exons_to_delete.append(tcount)
                continue

        if exons_to_delete:
            # Delete from the end so the remaining indexes stay valid
            for elt in sorted(exons_to_delete, reverse=True):
                transcript.sub_features.pop(elt)
        return transcript

    def normalize_mirna(self, gene: GFFSeqFeature) -> List[GFFSeqFeature]:
        """Returns gene representations from a miRNA gene that can be loaded in an Ensembl database.

        Change the representation from the form `gene[ primary_transcript[ exon, miRNA[ exon ] ] ]`
        to `ncRNA_gene[ miRNA_primary_transcript[ exon ] ]` and `gene[ miRNA[ exon ] ]`

        Raises:
            GFFParserError: If gene has more than 1 transcript, the transcript was not formatted
                correctly or there are unknown sub-features.
        """
        base_id = gene.id
        transcripts = gene.sub_features

        # Insert main gene first if needed (a lone primary_transcript gets a new gene parent)
        old_gene = gene
        if gene.type == "primary_transcript":
            primary = old_gene
            gene = GFFSeqFeature(primary.location, type="gene")
            gene.sub_features = [primary]
            gene.qualifiers = primary.qualifiers
            transcripts = gene.sub_features
            gene.id = f"{base_id}_0"
            gene.qualifiers["ID"] = gene.id

        # Not a miRNA gene: signal it with an empty list
        if (len(transcripts) == 0) or (transcripts[0].type != "primary_transcript"):
            return []
        if len(transcripts) > 1:
            raise GFFParserError(f"Gene has too many sub_features for miRNA {gene.id}")

        # Passed the checks
        primary = transcripts[0]
        primary.type = "miRNA_primary_transcript"
        gene.type = "ncRNA_gene"
        logging.debug(f"Formatting miRNA gene {gene.id}")

        # Split each miRNA sub-feature into its own gene; keep the exons on the primary transcript
        new_genes = []
        new_primary_subfeatures = []
        num = 1
        for sub in primary.sub_features:
            if sub.type == "exon":
                new_primary_subfeatures.append(sub)
            elif sub.type == "miRNA":
                new_gene_id = f"{base_id}_{num}"
                num += 1
                new_gene = GFFSeqFeature(sub.location, type="gene", id=new_gene_id)
                new_gene.qualifiers = {"source": sub.qualifiers["source"], "ID": new_gene_id}
                new_gene.sub_features = [sub]
                new_genes.append(new_gene)
            else:
                raise GFFParserError(f"Unknown subtypes for miRNA features: {sub.id}")
        primary.sub_features = new_primary_subfeatures

        if not new_genes:
            logging.debug(f"Primary_transcript without miRNA in {gene.id}")
            all_genes = [gene]
        else:
            all_genes = [gene] + new_genes

        # Normalize like other genes
        all_genes_cleaned = []
        for new_gene in all_genes:
            new_gene = self.normalize_gene(new_gene)
            self.annotations.store_gene(new_gene)
            all_genes_cleaned.append(self.clean_gene(new_gene))
        return all_genes_cleaned

allow_pseudogene_with_cds = allow_pseudogene_with_cds instance-attribute

annotations = FunctionalAnnotations(self.get_provider_name()) instance-attribute

exclude_seq_regions = [] instance-attribute

fail_types = set() instance-attribute

genome = json.load(genome_fh) instance-attribute

records = Records() instance-attribute

refseq = False instance-attribute

skip_unrecognized = skip_unrecognized instance-attribute

stable_ids = StableIDAllocator() instance-attribute

clean_gene(gene)

Return the same gene without qualifiers unrelated to the gene structure.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def clean_gene(self, gene: GFFSeqFeature) -> GFFSeqFeature:
    """Strip the gene hierarchy down to its structural qualifiers.

    Keeps "ID", "Parent" and "source" at every level, plus "phase" for CDS
    features; all other qualifiers are discarded in place.
    """

    kept = gene.qualifiers
    gene.qualifiers = {"ID": gene.id, "source": kept["source"]}

    for tr in gene.sub_features:
        # Rebuild the transcript qualifiers from scratch
        tr_kept = tr.qualifiers
        tr.qualifiers = {
            "ID": tr.id,
            "Parent": gene.id,
            "source": tr_kept["source"],
        }

        for sub in tr.sub_features:
            sub_kept = sub.qualifiers
            new_quals = {
                "ID": sub.id,
                "Parent": tr.id,
                "source": sub_kept["source"],
            }
            # CDSs additionally need their phase to stay valid GFF3
            if sub.type == "CDS":
                new_quals["phase"] = sub_kept["phase"]
            sub.qualifiers = new_quals

    return gene

create_gene_for_lone_cds(feat)

Returns a gene created for a lone CDS.

Parameters:

Name Type Description Default
feat GFFSeqFeature

The CDS for which we want to create a gene.

required
Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def create_gene_for_lone_cds(self, feat: GFFSeqFeature) -> GFFSeqFeature:
    """Returns a gene created for a lone CDS.

    Non-CDS features are returned untouched. A CDS flagged with
    pseudo=true produces a pseudogene parent (and the flag is removed).

    Args:
        feat: The CDS for which we want to create a gene.
    """
    if feat.type != "CDS":
        return feat

    logging.debug(f"Put the lone CDS in gene-mRNA parent features for {feat.id}")

    source = feat.qualifiers["source"]

    # mRNA wrapping the CDS
    transcript = GFFSeqFeature(feat.location, type="mRNA")
    transcript.qualifiers["source"] = source
    transcript.sub_features = [feat]

    # Matching exon with the same coordinates as the CDS
    exon = GFFSeqFeature(feat.location, type="exon")
    exon.qualifiers["source"] = source
    transcript.sub_features.append(exon)

    # Gene on top; a pseudo CDS makes it a pseudogene instead
    gene_type = "gene"
    if ("pseudo" in feat.qualifiers) and (feat.qualifiers["pseudo"][0] == "true"):
        gene_type = "pseudogene"
        del feat.qualifiers["pseudo"]
    new_gene = GFFSeqFeature(feat.location, type=gene_type)
    new_gene.qualifiers["source"] = source
    new_gene.sub_features = [transcript]

    # Allocate fresh stable IDs for the new gene and transcript
    new_gene.id = self.stable_ids.generate_gene_id()
    new_gene.qualifiers["ID"] = new_gene.id
    transcript.id = self.stable_ids.generate_transcript_id(new_gene.id, 1)
    transcript.qualifiers["ID"] = transcript.id

    return new_gene

create_gene_for_lone_transcript(feat)

Returns a gene for lone transcripts: 'gene' for tRNA/rRNA/mRNA, and 'ncRNA_gene' for all others.

Parameters:

Name Type Description Default
feat GFFSeqFeature

The transcript for which we want to create a gene.

required
Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def create_gene_for_lone_transcript(self, feat: GFFSeqFeature) -> GFFSeqFeature:
    """Returns a gene for lone transcripts: 'gene' for tRNA/rRNA/mRNA, and 'ncRNA_gene' for all others.

    Args:
        feat: The transcript for which we want to create a gene.
    """
    if feat.type not in self._biotypes["transcript"]["supported"]:
        return feat

    new_type = "gene" if feat.type in ("tRNA", "rRNA", "mRNA") else "ncRNA_gene"
    logging.debug(f"Put the transcript {feat.type} in a {new_type} parent feature")

    new_gene = GFFSeqFeature(feat.location, type=new_type)
    new_gene.qualifiers["source"] = feat.qualifiers["source"]
    new_gene.sub_features = [feat]

    # The gene takes over the transcript ID; the transcript gets a derived sub ID
    new_gene.id = feat.id
    new_gene.qualifiers["ID"] = new_gene.id
    feat.id = self.stable_ids.generate_transcript_id(new_gene.id, 1)
    feat.qualifiers["ID"] = feat.id

    # Drop the stale Parent links on exons/CDSs so they can be regenerated
    for child in feat.sub_features:
        del child.qualifiers["Parent"]

    # An mRNA whose CDS is flagged pseudo=true turns the new gene into a pseudogene
    if feat.type == "mRNA":
        for child in feat.sub_features:
            if child.type == "CDS" and child.qualifiers.get("pseudo", [""])[0] == "true":
                del child.qualifiers["pseudo"]
                new_gene.type = "pseudogene"
                break

    return new_gene

format_gene_segments(transcript)

Returns the equivalent Ensembl biotype feature for gene segment transcript features.

Supported features: "C_gene_segment" and "V_gene_segment".

Parameters:

Name Type Description Default
transcript GFFSeqFeature

Gene segment transcript feature.

required

Raises:

Type Description
GeneSegmentError

Unable to get the segment type information from the feature.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
def format_gene_segments(self, transcript: GFFSeqFeature) -> GFFSeqFeature:
    """Returns the equivalent Ensembl biotype feature for gene segment transcript features.

    Supported features: "C_gene_segment" and "V_gene_segment".

    Args:
        transcript: Gene segment transcript feature.

    Raises:
        GeneSegmentError: Unable to get the segment type information from the feature.
    """
    if transcript.type not in ("C_gene_segment", "V_gene_segment"):
        return transcript

    # First try the transcript attribs, then fall back to the first CDS
    seg_type = self._get_segment_type(transcript)
    if not seg_type:
        cdss = [sub for sub in transcript.sub_features if sub.type == "CDS"]
        if cdss:
            seg_type = self._get_segment_type(cdss[0])
        if not seg_type:
            raise GeneSegmentError(f"Unable to infer segment from {transcript.id}")

    # Map V/C_gene_segment to its Ensembl transcript name, e.g. "IG_V_gene"
    transcript.type = f"{seg_type}_{transcript.type.replace('_segment', '')}"
    return transcript

get_provider_name()

Returns the provider name for this genome.

If this information is not available, will try to infer it from the assembly accession. Will return "GenBank" otherwise.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def get_provider_name(self) -> str:
    """Returns the provider name for this genome.

    If this information is not available, will try to infer it from the assembly accession. Will
    return "GenBank" otherwise.
    """
    default_name = "GenBank"
    if not self.genome:
        logging.warning(f"No genome file, using the default provider_name: {default_name}")
        return default_name

    assembly = self.genome["assembly"]
    if "provider_name" in assembly:
        return assembly["provider_name"]
    # No explicit provider: a "GCF" accession prefix means RefSeq
    if assembly["accession"].startswith("GCF"):
        return "RefSeq"
    return default_name

normalize_gene(gene)

Returns a normalized gene structure, separate from the functional elements.

Parameters:

Name Type Description Default
gene GFFSeqFeature

Gene object to normalize.

required
functional_annotation

List of feature annotations (appended by this method).

required
Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def normalize_gene(self, gene: GFFSeqFeature) -> GFFSeqFeature:
    """Returns a normalized gene structure, separate from the functional elements.

    Args:
        gene: Gene object to normalize.

    """

    gene.id = self.stable_ids.normalize_gene_id(gene, refseq=self.refseq)
    # Order matters: restructure first, then normalize transcripts, then pseudogene CDSs
    restructure_gene(gene)
    self.normalize_transcripts(gene)
    self.normalize_pseudogene(gene)

    return gene

normalize_mirna(gene)

Returns gene representations from a miRNA gene that can be loaded in an Ensembl database.

Change the representation from the form gene[ primary_transcript[ exon, miRNA[ exon ] ] ] to ncRNA_gene[ miRNA_primary_transcript[ exon ] ] and gene[ miRNA[ exon ] ]

Raises:

Type Description
GFFParserError

If gene has more than 1 transcript, the transcript was not formatted correctly or there are unknown sub-features.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
def normalize_mirna(self, gene: GFFSeqFeature) -> List[GFFSeqFeature]:
    """Returns gene representations from a miRNA gene that can be loaded in an Ensembl database.

    Change the representation from the form `gene[ primary_transcript[ exon, miRNA[ exon ] ] ]`
    to `ncRNA_gene[ miRNA_primary_transcript[ exon ] ]` and `gene[ miRNA[ exon ] ]`

    Returns an empty list when the feature is not a miRNA gene.

    Raises:
        GFFParserError: If gene has more than 1 transcript, the transcript was not formatted
            correctly or there are unknown sub-features.
    """
    base_id = gene.id
    transcripts = gene.sub_features

    # Insert main gene first if needed (a lone primary_transcript gets a new gene parent)
    old_gene = gene
    if gene.type == "primary_transcript":
        primary = old_gene
        gene = GFFSeqFeature(primary.location, type="gene")
        gene.sub_features = [primary]
        gene.qualifiers = primary.qualifiers
        transcripts = gene.sub_features
        gene.id = f"{base_id}_0"
        gene.qualifiers["ID"] = gene.id

    # Not a miRNA gene: signal it with an empty list
    if (len(transcripts) == 0) or (transcripts[0].type != "primary_transcript"):
        return []
    if len(transcripts) > 1:
        raise GFFParserError(f"Gene has too many sub_features for miRNA {gene.id}")

    # Passed the checks
    primary = transcripts[0]
    primary.type = "miRNA_primary_transcript"
    gene.type = "ncRNA_gene"
    logging.debug(f"Formatting miRNA gene {gene.id}")

    # Split each miRNA into its own gene; keep the exons on the primary transcript
    new_genes = []
    new_primary_subfeatures = []
    num = 1
    for sub in primary.sub_features:
        if sub.type == "exon":
            new_primary_subfeatures.append(sub)
        elif sub.type == "miRNA":
            new_gene_id = f"{base_id}_{num}"
            num += 1
            new_gene = GFFSeqFeature(sub.location, type="gene", id=new_gene_id)
            new_gene.qualifiers = {"source": sub.qualifiers["source"], "ID": new_gene_id}
            new_gene.sub_features = [sub]
            new_genes.append(new_gene)
        else:
            raise GFFParserError(f"Unknown subtypes for miRNA features: {sub.id}")
    primary.sub_features = new_primary_subfeatures

    if not new_genes:
        logging.debug(f"Primary_transcript without miRNA in {gene.id}")
        all_genes = [gene]
    else:
        all_genes = [gene] + new_genes

    # Normalize like other genes
    all_genes_cleaned = []
    for new_gene in all_genes:
        new_gene = self.normalize_gene(new_gene)
        self.annotations.store_gene(new_gene)
        all_genes_cleaned.append(self.clean_gene(new_gene))
    return all_genes_cleaned

normalize_non_gene(feat)

Returns a normalised "non-gene" or None if not applicable.

Only transposable elements supported at the moment.

Parameters:

Name Type Description Default
feat GFFSeqFeature

Feature to normalise.

required

Raises:

Type Description
NotImplementedError

If the feature is a not supported non-gene.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def normalize_non_gene(self, feat: GFFSeqFeature) -> Optional[GFFSeqFeature]:
    """Normalises a supported "non-gene" feature, or returns `None` when not applicable.

    Only transposable elements are handled for now.

    Args:
        feat: Feature to normalise.

    Raises:
        NotImplementedError: If the feature is a not supported non-gene.
    """
    if feat.type not in self._biotypes["non_gene"]["supported"]:
        return None
    if feat.type not in ("mobile_genetic_element", "transposable_element"):
        # Failsafe: a type was added to the supported list without handling code
        raise NotImplementedError(f"Unsupported non-gene: {feat.type} for {feat.id}")

    feat.type = "transposable_element"
    feat = self._normalize_mobile_genetic_element(feat)
    # Make sure the feature carries a usable stable ID (generated if needed)
    feat.id = self.stable_ids.normalize_gene_id(feat, self.refseq)
    feat.qualifiers["ID"] = feat.id

    self.annotations.add_feature(feat, "transposable_element")
    return self.clean_gene(feat)

normalize_pseudogene(gene)

Normalize CDSs if allowed, otherwise remove them.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
367
368
369
370
371
372
373
374
375
def normalize_pseudogene(self, gene: GFFSeqFeature) -> None:
    """For a pseudogene, normalise its CDS IDs when CDSs are allowed, otherwise strip the CDSs."""
    if gene.type == "pseudogene":
        if self.allow_pseudogene_with_cds:
            self.stable_ids.normalize_pseudogene_cds_id(gene)
        else:
            remove_cds_from_pseudogene(gene)

normalize_transcripts(gene)

Normalizes a transcript.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
def normalize_transcripts(self, gene: GFFSeqFeature) -> None:
    """Normalizes a transcript.

    Unrecognized transcript types are recorded in `self.fail_types` and those
    transcripts are removed from the gene; each remaining transcript gets a new
    stable ID derived from the gene ID and its 1-based position.
    """

    allowed_transcript_types = self._biotypes["transcript"]["supported"]
    ignored_transcript_types = self._biotypes["transcript"]["ignored"]

    # Collect indexes of unusable transcripts; actual deletion happens after
    # the loop so enumeration is not disturbed
    transcripts_to_delete = []
    for count, transcript in enumerate(gene.sub_features):
        if (
            transcript.type not in allowed_transcript_types
            and transcript.type not in ignored_transcript_types
        ):
            self.fail_types.add(f"transcript={transcript.type}")
            logging.warning(
                f"Unrecognized transcript type: {transcript.type}" f" for {transcript.id} ({gene.id})"
            )
            transcripts_to_delete.append(count)
            continue

        # New transcript ID
        transcript_number = count + 1
        transcript.id = self.stable_ids.generate_transcript_id(gene.id, transcript_number)

        # NOTE(review): the rebound `transcript` is never written back to
        # gene.sub_features, so these helpers are assumed to mutate the
        # feature in place - confirm against their implementations
        transcript = self.format_gene_segments(transcript)

        # EXONS AND CDS
        transcript = self._normalize_transcript_subfeatures(gene, transcript)

    # Pop from the highest index down so earlier indexes remain valid
    if transcripts_to_delete:
        for elt in sorted(transcripts_to_delete, reverse=True):
            gene.sub_features.pop(elt)

simpler_gff3(in_gff_path)

Loads a GFF3 from INSDC and rewrites it in a simpler version, whilst also writing a functional annotation file.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def simpler_gff3(self, in_gff_path: PathLike) -> None:
    """Loads an INSDC GFF3 file, rewrites it in a simplified form, and records the
    functional annotation along the way.
    """
    self.records.from_gff(in_gff_path, self.exclude_seq_regions)
    for record in self.records:
        kept_features = []
        for feature in record.features:
            # miRNA genes may be split into several genes
            mirna_genes = self.normalize_mirna(feature)
            if mirna_genes:
                kept_features.extend(mirna_genes)
                continue
            try:
                kept_features.append(self.simpler_gff3_feature(feature))
            except (UnsupportedFeatureError, IgnoredFeatureError) as err:
                logging.debug(err.message)
        record.features = kept_features

    if self.fail_types:
        fail_errors = "\n   ".join(list(self.fail_types))
        logging.warning(f"Unrecognized types found:\n   {fail_errors}")
        if not self.skip_unrecognized:
            raise GFFParserError("Unrecognized types found, abort")

simpler_gff3_feature(gene)

Creates a simpler version of a GFF3 feature.

Raises:

Type Description
IgnoredFeatureError

If the feature type is ignored.

UnsupportedFeatureError

If the feature type is not supported.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def simpler_gff3_feature(self, gene: GFFSeqFeature) -> GFFSeqFeature:
    """Simplifies one top-level GFF3 feature and returns its cleaned version.

    Raises:
        IgnoredFeatureError: If the feature type is ignored.
        UnsupportedFeatureError: If the feature type is not supported.
    """
    # Non-gene features (e.g. transposable elements) follow their own path
    non_gene = self.normalize_non_gene(gene)
    if non_gene:
        return non_gene

    if gene.type in self._biotypes["gene"]["ignored"]:
        raise IgnoredFeatureError(f"Ignored type {gene.type} for {gene.id}")

    # "protein_coding_gene" is treated as a synonym of "gene"
    if gene.type == "protein_coding_gene":
        gene.type = "gene"

    # Wrap lone sub-gene features (transcript or CDS) into a proper gene
    gene = self.create_gene_for_lone_transcript(gene)
    gene = self.create_gene_for_lone_cds(gene)

    # Anything still not a supported gene type is an error
    if gene.type not in self._biotypes["gene"]["supported"]:
        self.fail_types.add(f"gene={gene.type}")
        raise UnsupportedFeatureError(f"Unsupported type {gene.type} for {gene.id}")

    # Normalize, record the functional annotation, then strip it from the feature
    gene = self.normalize_gene(gene)
    self.annotations.store_gene(gene)
    return self.clean_gene(gene)

InvalidStableID

Bases: ValueError

Raised when there is a problem with a stable ID.

Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
27
28
class InvalidStableID(ValueError):
    """Raised when there is a problem with a stable ID."""

MissingParentError

Bases: Exception

Trying to add a feature without an expected parent.

Source code in src/python/ensembl/io/genomio/gff3/extract_annotation.py
47
48
class MissingParentError(Exception):
    """Raised when trying to add a feature without its expected parent."""

Records

Bases: list

List of GFF3 SeqRecords.

Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class Records(list):
    """A list of GFF3 `SeqRecord` objects, with helpers to read and write GFF3 files."""

    def from_gff(self, in_gff_path: PathLike, excluded: Optional[List[str]] = None) -> None:
        """Populates this list with records parsed from a GFF3 file.

        Args:
            in_gff_path: Input GFF3 file path.
            excluded: Record IDs to not load from the GFF3 file.
        """
        skip_ids = set() if excluded is None else set(excluded)
        with Path(in_gff_path).open("r") as in_gff_fh:
            for record in GFF.parse(in_gff_fh):
                if record.id in skip_ids:
                    logging.debug(f"Skip seq_region {record.id} - in exclusion list")
                    continue
                # Keep only seq, id and features; drop any other annotations
                clean_record = SeqRecord(record.seq, id=record.id)
                clean_record.features = record.features
                self.append(clean_record)

    def to_gff(self, out_gff_path: PathLike) -> None:
        """Dumps every record held in this list to a GFF3 file.

        Args:
            out_gff_path: Path to GFF3 file where to write the records.
        """
        with Path(out_gff_path).open("w") as output_handle:
            GFF.write(self, output_handle)

from_gff(in_gff_path, excluded=None)

Loads records from a GFF3 file.

Parameters:

Name Type Description Default
in_gff_path PathLike

Input GFF3 file path.

required
excluded Optional[List[str]]

Record IDs to not load from the GFF3 file.

None
Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def from_gff(self, in_gff_path: PathLike, excluded: Optional[List[str]] = None) -> None:
    """Loads records from a GFF3 file.

    Args:
        in_gff_path: Input GFF3 file path.
        excluded: Record IDs to not load from the GFF3 file.
    """
    if excluded is None:
        excluded = []
    with Path(in_gff_path).open("r") as in_gff_fh:
        for record in GFF.parse(in_gff_fh):
            if record.id in excluded:
                logging.debug(f"Skip seq_region {record.id} - in exclusion list")
                continue
            # Rebuild a minimal SeqRecord keeping only seq, id and features,
            # dropping any other annotations the parser may have attached
            clean_record = SeqRecord(record.seq, id=record.id)
            clean_record.features = record.features
            self.append(clean_record)

to_gff(out_gff_path)

Writes the current list of records in a GFF3 file.

Parameters:

Name Type Description Default
out_gff_path PathLike

Path to GFF3 file where to write the records.

required
Source code in src/python/ensembl/io/genomio/gff3/simplifier.py
65
66
67
68
69
70
71
72
def to_gff(self, out_gff_path: PathLike) -> None:
    """Dumps every record held in this list to a GFF3 file.

    Args:
        out_gff_path: Path to GFF3 file where to write the records.
    """
    with Path(out_gff_path).open("w") as output_handle:
        GFF.write(self, output_handle)

StableIDAllocator dataclass

Set of tools to check and allocate stable IDs.

Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@dataclass
class StableIDAllocator:
    """Set of tools to check and allocate stable IDs."""

    # Multiple parameters to automate various fixes
    skip_gene_id_validation: bool = False
    min_id_length: int = 7
    current_id_number: int = 0
    make_missing_stable_ids: bool = True
    prefix: str = "TMP_"
    # IDs already handed out, used to deduplicate GeneID-derived stable IDs
    _loaded_ids: Set[str] = field(default_factory=set)

    def set_prefix(self, genome: Dict) -> None:
        """Sets the ID prefix using the organism abbrev if it exists in the genome metadata."""
        try:
            org = genome["BRC4"]["organism_abbrev"]
        except KeyError:
            prefix = "TMP_PREFIX_"
        else:
            prefix = "TMP_" + org + "_"
        self.prefix = prefix

    def generate_gene_id(self) -> str:
        """Returns a new unique gene stable_id with a prefix.

        The ID is made up of a prefix and a number, which is auto incremented.
        """
        self.current_id_number += 1
        return f"{self.prefix}{self.current_id_number}"

    def is_valid(self, stable_id: str) -> bool:
        """Checks that the format of a stable ID is valid.

        Args:
            stable_id: Stable ID to validate.
        """
        if self.skip_gene_id_validation:
            logging.debug(f"Validation deactivated by user: '{stable_id}' not checked")
            return True

        # Trna (from tRNAscan)
        if re.search(r"^Trna", stable_id, re.IGNORECASE):
            logging.debug(f"Stable ID is a tRNA from tRNA-scan: {stable_id}")
            return False

        # Coordinates, e.g. "chr1:100..200" (dots escaped: only a literal ".." counts)
        if re.search(r"^.+:\d+\.\.\d+", stable_id):
            logging.debug(f"Stable id is a coordinate: {stable_id}")
            return False

        # Special characters (space or pipe)
        if re.search(r"[ |]", stable_id):
            logging.debug(f"Stable id contains special characters: {stable_id}")
            return False

        # Min length
        if len(stable_id) < self.min_id_length:
            logging.debug(f"Stable id is too short (<{self.min_id_length}) {stable_id}")
            return False

        return True

    @staticmethod
    def remove_prefix(stable_id: str, prefixes: List[str]) -> str:
        """Returns the stable ID after removing its prefix (if any).

        If more than one prefix may be found, only the first one is removed.

        Args:
            stable_id: Stable ID to process.
            prefixes: List of prefixes to search for.
        """
        for prefix in prefixes:
            if stable_id.startswith(prefix):
                return stable_id[len(prefix) :]
        return stable_id

    @staticmethod
    def generate_transcript_id(gene_id: str, number: int) -> str:
        """Returns a formatted transcript ID generated from a gene ID and number.

        Args:
            gene_id: Gene stable ID.
            number: Positive number.

        Raises:
            ValueError: If the number provided is not greater than zero.
        """
        if number < 1:
            raise ValueError("Number has to be a positive integer.")
        return f"{gene_id}_t{number}"

    def normalize_cds_id(self, cds_id: str) -> str:
        """Returns a normalized version of the provided CDS ID.

        The normalisation implies to remove any unnecessary prefixes around the CDS ID. However, if
        the CDS ID is still not proper, an empty string will be returned.

        Args:
            cds_id: CDS ID to normalize.
        """
        prefixes = ["cds-", "cds:"]
        normalized_cds_id = StableIDAllocator.remove_prefix(cds_id, prefixes)

        # Special case: if the ID doesn't look like one, remove it - it needs to be regenerated
        if not self.is_valid(normalized_cds_id):
            return ""
        return normalized_cds_id

    def normalize_pseudogene_cds_id(self, pseudogene: GFFSeqFeature) -> None:
        """Normalizes every CDS ID of the provided pseudogene.

        Ensure each CDS from a pseudogene has a proper ID:
        - Different from the gene
        - Derived from the gene if it is not proper

        Args:
            pseudogene: Pseudogene feature.
        """
        for transcript in pseudogene.sub_features:
            for feat in transcript.sub_features:
                if feat.type == "CDS":
                    feat.id = self.normalize_cds_id(feat.id)
                    if feat.id in ("", pseudogene.id):
                        feat.id = f"{transcript.id}_cds"
                        feat.qualifiers["ID"] = feat.id

    def normalize_gene_id(self, gene: GFFSeqFeature, refseq: bool = False) -> str:
        """Returns a normalized gene stable ID.

        Removes any unnecessary prefixes, but will generate a new stable ID if the normalized one is
        not recognized as valid.

        Args:
            gene: Gene feature to normalize.
            refseq: If True, only gene IDs starting with "LOC" are considered valid.

        Raises:
            InvalidStableID: If a GeneID-derived ID is duplicated more than 10 times, or if no
                valid ID can be produced and `make_missing_stable_ids` is disabled.
        """
        prefixes = ["gene-", "gene:"]
        new_gene_id = StableIDAllocator.remove_prefix(gene.id, prefixes)

        is_valid = False
        # Special case for RefSeq: only valid Gene IDs are LOC*
        if refseq:
            if new_gene_id.startswith("LOC"):
                is_valid = True
        else:
            is_valid = self.is_valid(new_gene_id)

        if is_valid:
            return new_gene_id

        # In case the normalized gene ID is not valid, use the GeneID
        logging.debug(f"Gene ID is not valid: {new_gene_id}")
        qual = gene.qualifiers
        if "Dbxref" in qual:
            for xref in qual["Dbxref"]:
                # Split on the first ":" only, so values containing colons do not crash
                (db, value) = xref.split(":", 1)
                if db != "GeneID":
                    continue
                new_gene_id_base = f"{db}_{value}"
                new_gene_id = new_gene_id_base
                number = 1
                while new_gene_id in self._loaded_ids:
                    number += 1
                    new_gene_id = f"{new_gene_id_base}_{number}"
                    if number > 10:
                        raise InvalidStableID(f"Duplicate ID {new_gene_id_base} (up to {new_gene_id})")
                self._loaded_ids.add(new_gene_id)
                logging.debug(f"Using GeneID {new_gene_id} for stable_id instead of {gene.id}")
                return new_gene_id

        # Make a new stable_id
        if self.make_missing_stable_ids:
            new_gene_id = self.generate_gene_id()
            # Log the ID being replaced (bug fix: previously logged new_gene_id twice)
            logging.debug(f"New ID: {gene.id} -> {new_gene_id}")
            return new_gene_id
        raise InvalidStableID(f"Can't use invalid gene id for {gene}")

current_id_number = 0 class-attribute instance-attribute

make_missing_stable_ids = True class-attribute instance-attribute

min_id_length = 7 class-attribute instance-attribute

prefix = 'TMP_' class-attribute instance-attribute

skip_gene_id_validation = False class-attribute instance-attribute

generate_gene_id()

Returns a new unique gene stable_id with a prefix.

The ID is made up of a prefix and a number, which is auto incremented.

Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
53
54
55
56
57
58
59
60
61
def generate_gene_id(self) -> str:
    """Creates and returns the next unique gene stable ID.

    The ID combines the configured prefix with an auto-incremented counter.
    """
    self.current_id_number += 1
    return f"{self.prefix}{self.current_id_number}"

generate_transcript_id(gene_id, number) staticmethod

Returns a formatted transcript ID generated from a gene ID and a positive number. Raises ValueError if the number provided is not greater than zero.

Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@staticmethod
def generate_transcript_id(gene_id: str, number: int) -> str:
    """Returns a formatted transcript ID generated from a gene ID and number.
    Args:
        gene_id: Gene stable ID.
        number: Positive number.
    Raises:
        ValueError: If the number provided is not greater than zero.

    """
    if number < 1:
        raise ValueError("Number has to be a positive integer.")

    transcript_id = f"{gene_id}_t{number}"
    return transcript_id

is_valid(stable_id)

Checks that the format of the given stable ID is valid.

Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def is_valid(self, stable_id: str) -> bool:
    """Checks that the format of a stable ID is valid.

    Args:
        stable_id: Stable ID to validate.
    """

    if self.skip_gene_id_validation:
        logging.debug(f"Validation deactivated by user: '{stable_id}' not checked")
        return True

    # Trna (from tRNAscan)
    if re.search(r"^Trna", stable_id, re.IGNORECASE):
        logging.debug(f"Stable ID is a tRNA from tRNA-scan: {stable_id}")
        return False

    # Coordinates
    # NOTE(review): the ".." is not escaped, so this also rejects IDs like
    # "X:12ab34"; the intent was probably a literal r"\.\." - confirm
    if re.search(r"^.+:\d+..\d+", stable_id):
        logging.debug(f"Stable id is a coordinate: {stable_id}")
        return False

    # Special characters (space or pipe)
    if re.search(r"[ |]", stable_id):
        logging.debug(f"Stable id contains special characters: {stable_id}")
        return False

    # Min length
    if len(stable_id) < self.min_id_length:
        logging.debug(f"Stable id is too short (<{self.min_id_length}) {stable_id}")
        return False

    return True

normalize_cds_id(cds_id)

Returns a normalized version of the provided CDS ID.

The normalisation implies to remove any unnecessary prefixes around the CDS ID. However, if the CDS ID is still not proper, an empty string will be returned.

Parameters:

Name Type Description Default
cds_id str

CDS ID to normalize.

required
Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def normalize_cds_id(self, cds_id: str) -> str:
    """Strips known prefixes from a CDS ID and validates the result.

    Returns the cleaned CDS ID, or an empty string when the cleaned ID is still
    not a valid stable ID (so a new one can be generated downstream).

    Args:
        cds_id: CDS ID to normalize.
    """
    normalized = StableIDAllocator.remove_prefix(cds_id, ["cds-", "cds:"])
    return normalized if self.is_valid(normalized) else ""

normalize_gene_id(gene, refseq=False)

Returns a normalized gene stable ID.

Removes any unnecessary prefixes, but will generate a new stable ID if the normalized one is not recognized as valid.

Parameters:

Name Type Description Default
gene GFFSeqFeature

Gene feature to normalize.

required
Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def normalize_gene_id(self, gene: GFFSeqFeature, refseq: Optional[bool] = False) -> str:
    """Returns a normalized gene stable ID.

    Removes any unnecessary prefixes, but will generate a new stable ID if the normalized one is
    not recognized as valid.

    Args:
        gene: Gene feature to normalize.
        refseq: If true, only gene IDs starting with "LOC" are considered valid.

    Raises:
        InvalidStableID: If a GeneID-derived ID is duplicated more than 10 times, or if no
            valid ID can be produced and `make_missing_stable_ids` is disabled.
    """
    prefixes = ["gene-", "gene:"]
    new_gene_id = StableIDAllocator.remove_prefix(gene.id, prefixes)

    is_valid = False
    # Special case for RefSeq: only valid Gene IDs are LOC*
    if refseq:
        if new_gene_id.startswith("LOC"):
            is_valid = True
    else:
        is_valid = self.is_valid(new_gene_id)

    if is_valid:
        return new_gene_id

    # In case the normalized gene ID is not valid, use the GeneID
    logging.debug(f"Gene ID is not valid: {new_gene_id}")
    qual = gene.qualifiers
    if "Dbxref" in qual:
        for xref in qual["Dbxref"]:
            # NOTE(review): assumes Dbxref values contain exactly one ":";
            # a value like "HGNC:HGNC:5" would raise ValueError here - confirm
            (db, value) = xref.split(":")
            if db != "GeneID":
                continue
            # Deduplicate GeneID-based IDs by appending a counter (capped at 10)
            new_gene_id_base = f"{db}_{value}"
            new_gene_id = new_gene_id_base
            number = 1
            while new_gene_id in self._loaded_ids:
                number += 1
                new_gene_id = f"{new_gene_id_base}_{number}"
                if number > 10:
                    raise InvalidStableID(f"Duplicate ID {new_gene_id_base} (up to {new_gene_id})")
            self._loaded_ids.add(new_gene_id)
            logging.debug(f"Using GeneID {new_gene_id} for stable_id instead of {gene.id}")
            return new_gene_id

    # Make a new stable_id
    if self.make_missing_stable_ids:
        new_gene_id = self.generate_gene_id()
        # NOTE(review): both placeholders interpolate new_gene_id; the first was
        # likely meant to be gene.id - confirm before relying on this log line
        logging.debug(f"New ID: {new_gene_id} -> {new_gene_id}")
        return new_gene_id
    raise InvalidStableID(f"Can't use invalid gene id for {gene}")

normalize_pseudogene_cds_id(pseudogene)

Normalizes every CDS ID of the provided pseudogene.

Ensure each CDS from a pseudogene has a proper ID: - Different from the gene - Derived from the gene if it is not proper

Parameters:

Name Type Description Default
pseudogene GFFSeqFeature

Pseudogene feature.

required
Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def normalize_pseudogene_cds_id(self, pseudogene: GFFSeqFeature) -> None:
    """Gives every CDS of a pseudogene a usable ID.

    A CDS ID must differ from the gene ID; invalid or clashing IDs are replaced
    by one derived from the transcript ID.

    Args:
        pseudogene: Pseudogene feature.
    """
    for transcript in pseudogene.sub_features:
        cds_features = (f for f in transcript.sub_features if f.type == "CDS")
        for cds in cds_features:
            cds.id = self.normalize_cds_id(cds.id)
            if cds.id in ("", pseudogene.id):
                cds.id = f"{transcript.id}_cds"
                cds.qualifiers["ID"] = cds.id

remove_prefix(stable_id, prefixes) staticmethod

Returns the stable ID after removing its prefix (if any).

If more than one prefix may be found, only the first one is removed.

Parameters:

Name Type Description Default
stable_id str

Stable ID to process.

required
prefixes List[str]

List of prefixes to search for.

required
Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@staticmethod
def remove_prefix(stable_id: str, prefixes: List[str]) -> str:
    """Returns the stable ID after removing its prefix (if any).

    If more than one prefix may be found, only the first one is removed.

    Args:
        stable_id: Stable ID to process.
        prefixes: List of prefixes to search for.
    """

    for prefix in prefixes:
        if stable_id.startswith(prefix):
            return stable_id[len(prefix) :]
    return stable_id

set_prefix(genome)

Sets the ID prefix using the organism abbrev if it exists in the genome metadata.

Source code in src/python/ensembl/io/genomio/gff3/id_allocator.py
43
44
45
46
47
48
49
50
51
def set_prefix(self, genome: Dict) -> None:
    """Sets the ID prefix using the organism abbrev if it exists in the genome metadata."""
    try:
        new_prefix = "TMP_" + genome["BRC4"]["organism_abbrev"] + "_"
    except KeyError:
        # No organism abbrev available: fall back to a generic prefix
        new_prefix = "TMP_PREFIX_"
    self.prefix = new_prefix

get_intervals(record, genes_dict, seq_dict, seq_name)

Extract start/stop feature coordinates for use in creating intervaltree object.

Parameters:

Name Type Description Default
record SeqRecord

Individual sequence record.

required
genes_dict dict

Genes.

required
seq_dict dict

Sequences.

required
seq_name str

Feature sequence name.

required
Source code in src/python/ensembl/io/genomio/gff3/overlaps.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def get_intervals(record: SeqRecord, genes_dict: dict, seq_dict: dict, seq_name: str) -> None:
    """Collects start/stop coordinates of a record's features for interval-tree building.

    Args:
        record: Individual sequence record.
        genes_dict: Genes.
        seq_dict: Sequences.
        seq_name: Feature sequence name.
    """
    for feature in record.features:
        location = feature.location
        feature_id = str(feature.id)
        # Report coordinates as 1-based inclusive strings
        genes_dict[feature_id] = {
            "sequence": f"{record.id}",
            "start": f"{int(location.start) + 1}",
            "end": f"{int(location.end)}",
            "strand": f"{location.strand}",
            "name": f"{feature_id}",
        }

        interval = (int(location.start), int(location.end), feature_id)
        if location.strand == 1:
            seq_dict[seq_name]["plus"].append(interval)
        elif location.strand == -1:
            seq_dict[seq_name]["minus"].append(interval)
        else:
            logging.critical("Something went wrong with the strand processing!")

identify_feature_overlaps(gff_in, output_file, isolate_feature)

Detect overlapping GFF3 SeqFeature objects and dump to a report.

Parameters:

Name Type Description Default
gff_in Path

User supplied GFF3 input file.

required
output_file Path

Output file to write feature overlaps.

required
isolate_feature str

Sequence feature type to filter by.

required
Source code in src/python/ensembl/io/genomio/gff3/overlaps.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def identify_feature_overlaps(gff_in: Path, output_file: Path, isolate_feature: str) -> None:
    """Detects overlapping GFF3 SeqFeature objects and dumps a report.

    Args:
        gff_in: User supplied GFF3 input file.
        output_file: Output file to write feature overlaps.
        isolate_feature: Sequence feature type to filter by.
    """
    logging.info("Processing sequence feature overlaps!")
    logging.info(f"Output file = {str(output_file)}")
    logging.info(f"Features filtered by type: {isolate_feature}")

    seq_dict: dict = defaultdict(dict)
    genes_dict: dict = {}
    gff_type_filter: dict = {"gff_type": [isolate_feature]}
    with gff_in.open("r", encoding="utf-8") as input_handle:
        for record in GFF.parse(input_handle, limit_info=gff_type_filter):
            seq_name = str(record.id)
            # First time this sequence is seen: initialise its strand buckets
            if seq_name not in seq_dict:
                seq_dict[seq_name]["plus"] = []
                seq_dict[seq_name]["minus"] = []
            get_intervals(record, genes_dict, seq_dict, seq_name)

    overlap_count = _write_report(output_file, seq_dict, genes_dict)

    # Summaries go both to stdout and the log
    for message in (
        f"In total {len(genes_dict)} {isolate_feature} features were scanned.",
        f"In total {overlap_count} overlaps were detected.",
    ):
        print(message)
        logging.info(message)

    logging.info("Finished all processing.")

init_logging_with_args(args)

Processes the Namespace object provided to call init_logging() with the correct arguments.

Parameters:

Name Type Description Default
args Namespace

Namespace populated by an argument parser.

required
Source code in ensembl/utils/logging.py
 98
 99
100
101
102
103
104
105
106
107
def init_logging_with_args(args: argparse.Namespace) -> None:
    """Processes the Namespace object provided to call `init_logging()` with the correct arguments.

    Args:
        args: Namespace populated by an argument parser.
    """
    available = vars(args)
    # Forward only the logging-related options that the parser actually defined
    log_args = {
        key: available[key] for key in ("log_level", "log_file", "log_file_level") if key in available
    }
    init_logging(**log_args)

main()

Main script entry-point.

Source code in src/python/ensembl/io/genomio/gff3/process.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def main() -> None:
    """Main script entry-point."""
    arg_parser = ArgumentParser(
        description=(
            "Standardize the gene model representation of a GFF3 file, and extract the functional "
            "annotation in a separate file."
        )
    )
    arg_parser.add_argument_src_path("--in_gff_path", required=True, help="Input GFF3 file")
    arg_parser.add_argument_src_path("--genome_data", required=True, help="Genome JSON file")
    arg_parser.add_argument(
        "--fail_missing_stable_ids", action="store_true", help="Do not generate IDs when missing/invalid"
    )
    arg_parser.add_argument_dst_path(
        "--out_gff_path", default=Path("gene_models.gff3"), help="Output GFF3 file"
    )
    arg_parser.add_argument_dst_path(
        "--out_func_path",
        default=Path("functional_annotation.json"),
        help="Output functional annotation JSON file",
    )
    arg_parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    arg_parser.add_log_arguments(add_log_file=True)
    args = arg_parser.parse_args()
    init_logging_with_args(args)

    # First pass: merge any gene features split over multiple lines into an interim file
    logging.info("Checking for genes to merge...")
    interim_path = Path(f"{args.in_gff_path}_INTERIM_MERGE")
    merged = GFFGeneMerger().merge(args.in_gff_path, interim_path)
    source_gff = args.in_gff_path
    if merged:
        # Report the merged gene IDs so an unexpected merge can be spotted in the logs
        logging.info(f"{len(merged)} genes merged")
        logging.debug("\n".join(merged))
        # Continue processing from the interim file that contains the merged genes
        source_gff = interim_path

    # Second pass: rewrite the GFF3 to our simplified representation and split the
    # functional annotation out into its own JSON file
    logging.info("Simplify and fix GFF3")
    simplifier = GFFSimplifier(args.genome_data)
    if args.fail_missing_stable_ids:
        simplifier.stable_ids.make_missing_stable_ids = False
    simplifier.simpler_gff3(source_gff)
    simplifier.records.to_gff(args.out_gff_path)
    simplifier.annotations.to_json(args.out_func_path)

scan_tree(feature_intervals)

Construct an interval tree from the supplied genomic intervals, check every element of the tree against the rest, and return any that hit 2 or more intervals (i.e. itself plus at least 1 other).

Parameters:

Name Type Description Default
feature_intervals list

Genome features to examine for coordinate (start/end) overlaps.

required
Returns

Set of intervals identified in the input GFF3 file that overlap with 2 or more intervals.

Source code in src/python/ensembl/io/genomio/gff3/overlaps.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def scan_tree(feature_intervals: list) -> set:
    """Construct an interval tree from the supplied genomic intervals, check every element of the tree
    against the rest, and return any that hit 2 or more intervals (i.e. itself plus at least 1 other).

    Args:
        feature_intervals: Genome features to examine for coordinate (start/end) overlaps.

    Returns:
        Set of intervals identified in the input GFF3 file that overlap with 2 or more intervals.
    """

    interval_sets = set()
    traversed_tree = IntervalTree(Interval(*iv) for iv in feature_intervals)

    for interval in feature_intervals:
        # Query the tree once per interval; the original code ran the same overlap()
        # query twice (once for the length test, once to collect the hits).
        hits = traversed_tree.overlap(interval[0], interval[1])
        # Every interval overlaps itself, so >1 hits means a genuine overlap.
        if len(hits) > 1:
            interval_sets.update(hit.data for hit in hits)

    return interval_sets

summarize_feature_stats(gff_in)

Analyse a GFF3 file and produce a summary of its feature types.

Parameters:

Name Type Description Default
gff_in Path

User supplied GFF3 input file.

required
Source code in src/python/ensembl/io/genomio/gff3/overlaps.py
41
42
43
44
45
46
47
48
49
50
51
52
53
def summarize_feature_stats(gff_in: Path) -> None:
    """Analyse a GFF3 file and produce a summary of its feature types.

    Args:
        gff_in: User supplied GFF3 input file.
    """

    logging.info("Alt processing: Not parsing the GFF3, producing summary feature stats instead!")

    examiner = GFFExaminer()
    # The `with` statement closes the handle on exit, so no explicit close() is needed
    # (the original trailing `input_handle.close()` re-closed an already-closed file).
    with gff_in.open("r", encoding="utf-8") as input_handle:
        pprint(examiner.available_limits(input_handle))