Skip to content

database

ensembl.utils.database

Database module.

Query = TypeVar('Query', str, sqlalchemy.sql.expression.ClauseElement, sqlalchemy.sql.expression.TextClause) module-attribute

StrURL = TypeVar('StrURL', str, sqlalchemy.engine.URL) module-attribute

DBConnection

Database connection handler, providing also the database's schema and properties.

Parameters:

Name Type Description Default
url StrURL

URL to the database, e.g. mysql://user:passwd@host:port/my_db.

required
reflect bool

Reflect the database schema or not.

True
Source code in src/ensembl/utils/database/dbconnection.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class DBConnection:
    """Database connection handler, providing also the database's schema and properties.

    Args:
        url: URL to the database, e.g. `mysql://user:passwd@host:port/my_db`.
        reflect: Reflect the database schema or not.

    """

    def __init__(self, url: StrURL, reflect: bool = True, **kwargs: Any) -> None:
        self._engine = create_engine(url, future=True, **kwargs)
        self._metadata: MetaData | None = None
        if reflect:
            self.load_metadata()

    def __repr__(self) -> str:
        """Returns a string representation of this object."""
        return f"{self.__class__.__name__}({self.url!r})"

    def load_metadata(self) -> None:
        """Loads the metadata information of the database."""
        # Note: Just reflect() is not enough as it would not delete tables that no longer exist
        self._metadata = sqlalchemy.MetaData()
        self._metadata.reflect(bind=self._engine)

    def create_all_tables(self, metadata: MetaData) -> None:
        """Create the tables from the metadata and set the metadata.

        This assumes the database is empty beforehand. If the tables already exist, they will be ignored.
        If there are other tables, you may need to run `self.load_metadata()` to update the metadata schema.
        """
        self._metadata = metadata
        metadata.create_all(self._engine)

    def create_table(self, table: Table) -> None:
        """Create a table in the database and update the metadata. Do nothing if the table already exists."""
        table.create(self._engine)
        # We need to update the metadata to register the new table
        self.load_metadata()

    @property
    def url(self) -> str:
        """Returns the database URL."""
        return self._engine.url.render_as_string(hide_password=False)

    @property
    def db_name(self) -> Optional[str]:
        """Returns the database name."""
        return self._engine.url.database

    @property
    def host(self) -> Optional[str]:
        """Returns the database host."""
        return self._engine.url.host

    @property
    def port(self) -> Optional[int]:
        """Returns the port of the database host."""
        return self._engine.url.port

    @property
    def dialect(self) -> str:
        """Returns the SQLAlchemy database dialect name of the database host."""
        return self._engine.name

    @property
    def tables(self) -> dict[str, sqlalchemy.schema.Table]:
        """Returns the database tables keyed to their name, or an empty dict if no metadata was loaded."""
        if self._metadata:
            return self._metadata.tables
        return {}

    def get_primary_key_columns(self, table: str) -> list[str]:
        """Returns the primary key column names for the given table.

        Args:
            table: Table name.

        """
        return [col.name for col in self.tables[table].primary_key.columns.values()]

    def get_columns(self, table: str) -> list[str]:
        """Returns the column names for the given table.

        Args:
            table: Table name.

        """
        return [col.name for col in self.tables[table].columns]

    def connect(self) -> sqlalchemy.engine.Connection:
        """Returns a new database connection."""
        return self._engine.connect()

    def begin(self, *args: Any) -> ContextManager[sqlalchemy.engine.Connection]:
        """Returns a context manager delivering a database connection with a transaction established."""
        return self._engine.begin(*args)

    def dispose(self) -> None:
        """Disposes of the connection pool."""
        self._engine.dispose()

    def _enable_sqlite_savepoints(self, engine: sqlalchemy.engine.Engine) -> None:
        """Enables SQLite SAVEPOINTS to allow session rollbacks."""

        @event.listens_for(engine, "connect")
        def do_connect(
            dbapi_connection: Any,  # SQLAlchemy is not clear about the type of this argument
            connection_record: sqlalchemy.pool.ConnectionPoolEntry,  # pylint: disable=unused-argument
        ) -> None:
            """Disables emitting the BEGIN statement entirely, as well as COMMIT before any DDL."""
            dbapi_connection.isolation_level = None

        @event.listens_for(engine, "begin")
        def do_begin(conn: sqlalchemy.engine.Connection) -> None:
            """Emits a custom own BEGIN."""
            conn.exec_driver_sql("BEGIN")

    @contextmanager
    def session_scope(self) -> Generator[sqlalchemy.orm.Session, None, None]:
        """Provides a transactional scope around a series of operations with rollback in case of failure.

        Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all
        the modifications performed to the database will persist.

        """
        # Create a dedicated engine for this session
        engine = create_engine(self._engine.url)
        if self.dialect == "sqlite":
            self._enable_sqlite_savepoints(engine)
        Session = sessionmaker(future=True)
        session = Session(bind=engine, autoflush=False)
        try:
            yield session
            session.commit()
        except:
            # Rollback to ensure no changes are made to the database
            session.rollback()
            raise
        finally:
            # Whatever happens, make sure the session is closed
            session.close()

    @contextmanager
    def test_session_scope(self) -> Generator[sqlalchemy.orm.Session, None, None]:
        """Provides a transactional scope around a series of operations that will be rolled back at the end.

        Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all
        the modifications performed to the database will persist.

        """
        # Create a dedicated engine for this session
        engine = create_engine(self._engine.url)
        if self.dialect == "sqlite":
            self._enable_sqlite_savepoints(engine)
        # Connect to the database
        connection = engine.connect()
        # Begin a non-ORM transaction
        transaction = connection.begin()
        # Bind an individual session to the connection
        Session = sessionmaker(future=True)
        try:
            # Running on SQLAlchemy 2.0+
            session = Session(bind=connection, join_transaction_mode="create_savepoint")
        except TypeError:
            # Running on SQLAlchemy 1.4
            session = Session(bind=connection)
            # If the database supports SAVEPOINT, starting a savepoint will allow to also use rollback
            connection.begin_nested()

            # Define a new transaction event
            @event.listens_for(session, "after_transaction_end")
            def end_savepoint(
                session: sqlalchemy.orm.Session,  # pylint: disable=unused-argument
                transaction: sqlalchemy.orm.SessionTransaction,  # pylint: disable=unused-argument
            ) -> None:
                if not connection.in_nested_transaction():
                    connection.begin_nested()

        try:
            yield session
        finally:
            # Whatever happens, make sure the session and connection are closed, rolling back
            # everything done with the session (including calls to commit())
            session.close()
            transaction.rollback()
            connection.close()

db_name: Optional[str] property

Returns the database name.

dialect: str property

Returns the SQLAlchemy database dialect name of the database host.

host: Optional[str] property

Returns the database host.

port: Optional[int] property

Returns the port of the database host.

tables: dict[str, sqlalchemy.schema.Table] property

Returns the database tables keyed to their name, or an empty dict if no metadata was loaded.

url: str property

Returns the database URL.

begin(*args)

Returns a context manager delivering a database connection with a transaction established.

Source code in src/ensembl/utils/database/dbconnection.py
147
148
149
def begin(self, *args: Any) -> ContextManager[sqlalchemy.engine.Connection]:
    """Returns a context manager delivering a database connection with a transaction established."""
    return self._engine.begin(*args)

connect()

Returns a new database connection.

Source code in src/ensembl/utils/database/dbconnection.py
143
144
145
def connect(self) -> sqlalchemy.engine.Connection:
    """Returns a new database connection."""
    return self._engine.connect()

create_all_tables(metadata)

Create the tables from the metadata and set the metadata.

This assumes the database is empty beforehand. If the tables already exist, they will be ignored. If there are other tables, you may need to run self.load_metadata() to update the metadata schema.

Source code in src/ensembl/utils/database/dbconnection.py
78
79
80
81
82
83
84
85
def create_all_tables(self, metadata: MetaData) -> None:
    """Create the tables from the metadata and set the metadata.

    This assumes the database is empty beforehand. If the tables already exist, they will be ignored.
    If there are other tables, you may need to run `self.load_metadata()` to update the metadata schema.
    """
    self._metadata = metadata
    metadata.create_all(self._engine)

create_table(table)

Create a table in the database and update the metadata. Do nothing if the table already exists.

Source code in src/ensembl/utils/database/dbconnection.py
87
88
89
90
91
def create_table(self, table: Table) -> None:
    """Create a table in the database and update the metadata. Do nothing if the table already exists."""
    table.create(self._engine)
    # We need to update the metadata to register the new table
    self.load_metadata()

dispose()

Disposes of the connection pool.

Source code in src/ensembl/utils/database/dbconnection.py
151
152
153
def dispose(self) -> None:
    """Disposes of the connection pool."""
    self._engine.dispose()

get_columns(table)

Returns the column names for the given table.

Parameters:

Name Type Description Default
table str

Table name.

required
Source code in src/ensembl/utils/database/dbconnection.py
134
135
136
137
138
139
140
141
def get_columns(self, table: str) -> list[str]:
    """Returns the column names for the given table.

    Args:
        table: Table name.

    """
    return [col.name for col in self.tables[table].columns]

get_primary_key_columns(table)

Returns the primary key column names for the given table.

Parameters:

Name Type Description Default
table str

Table name.

required
Source code in src/ensembl/utils/database/dbconnection.py
125
126
127
128
129
130
131
132
def get_primary_key_columns(self, table: str) -> list[str]:
    """Returns the primary key column names for the given table.

    Args:
        table: Table name.

    """
    return [col.name for col in self.tables[table].primary_key.columns.values()]

load_metadata()

Loads the metadata information of the database.

Source code in src/ensembl/utils/database/dbconnection.py
72
73
74
75
76
def load_metadata(self) -> None:
    """Loads the metadata information of the database."""
    # Note: Just reflect() is not enough as it would not delete tables that no longer exist
    self._metadata = sqlalchemy.MetaData()
    self._metadata.reflect(bind=self._engine)

session_scope()

Provides a transactional scope around a series of operations with rollback in case of failure.

Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all the modifications performed to the database will persist.

Source code in src/ensembl/utils/database/dbconnection.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@contextmanager
def session_scope(self) -> Generator[sqlalchemy.orm.Session, None, None]:
    """Provides a transactional scope around a series of operations with rollback in case of failure.

    Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all
    the modifications performed to the database will persist.

    """
    # Create a dedicated engine for this session
    engine = create_engine(self._engine.url)
    if self.dialect == "sqlite":
        self._enable_sqlite_savepoints(engine)
    Session = sessionmaker(future=True)
    session = Session(bind=engine, autoflush=False)
    try:
        yield session
        session.commit()
    except:
        # Rollback to ensure no changes are made to the database
        session.rollback()
        raise
    finally:
        # Whatever happens, make sure the session is closed
        session.close()

test_session_scope()

Provides a transactional scope around a series of operations that will be rolled back at the end.

Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all the modifications performed to the database will persist.

Source code in src/ensembl/utils/database/dbconnection.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
@contextmanager
def test_session_scope(self) -> Generator[sqlalchemy.orm.Session, None, None]:
    """Provides a transactional scope around a series of operations that will be rolled back at the end.

    Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all
    the modifications performed to the database will persist.

    """
    # Create a dedicated engine for this session
    engine = create_engine(self._engine.url)
    if self.dialect == "sqlite":
        self._enable_sqlite_savepoints(engine)
    # Connect to the database
    connection = engine.connect()
    # Begin a non-ORM transaction
    transaction = connection.begin()
    # Bind an individual session to the connection
    Session = sessionmaker(future=True)
    try:
        # Running on SQLAlchemy 2.0+
        session = Session(bind=connection, join_transaction_mode="create_savepoint")
    except TypeError:
        # Running on SQLAlchemy 1.4
        session = Session(bind=connection)
        # If the database supports SAVEPOINT, starting a savepoint will allow to also use rollback
        connection.begin_nested()

        # Define a new transaction event
        @event.listens_for(session, "after_transaction_end")
        def end_savepoint(
            session: sqlalchemy.orm.Session,  # pylint: disable=unused-argument
            transaction: sqlalchemy.orm.SessionTransaction,  # pylint: disable=unused-argument
        ) -> None:
            if not connection.in_nested_transaction():
                connection.begin_nested()

    try:
        yield session
    finally:
        # Whatever happens, make sure the session and connection are closed, rolling back
        # everything done with the session (including calls to commit())
        session.close()
        transaction.rollback()
        connection.close()

UnitTestDB

Creates and connects to a new test database, applying the schema and importing the data.

Parameters:

Name Type Description Default
server_url StrURL

URL of the server hosting the database.

required
metadata MetaData | None

Use this metadata to create the schema instead of using an SQL schema file.

None
dump_dir StrPath | None

Directory path with the database schema in table.sql (mandatory) and one TSV data file (without headers) per table following the convention <table_name>.txt (optional).

None
name str | None

Name to give to the new database. If not provided, the last directory name of dump_dir will be used instead. In either case, the new database name will be prefixed by the username.

None
tmp_path StrPath | None

Temp dir where the test db is created if using SQLite (otherwise use current dir).

None

Attributes:

Name Type Description
dbc

Database connection handler.

Raises:

Type Description
FileNotFoundError

If table.sql is not found.

Source code in src/ensembl/utils/database/unittestdb.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class UnitTestDB:
    """Creates and connects to a new test database, applying the schema and importing the data.

    Args:
        server_url: URL of the server hosting the database.
        metadata: Use this metadata to create the schema instead of using an SQL schema file.
        dump_dir: Directory path with the database schema in `table.sql` (mandatory) and one TSV data
            file (without headers) per table following the convention `<table_name>.txt` (optional).
        name: Name to give to the new database. If not provided, the last directory name of `dump_dir`
            will be used instead. In either case, the new database name will be prefixed by the username.
        tmp_path: Temp dir where the test db is created if using SQLite (otherwise use current dir).

    Attributes:
        dbc: Database connection handler.

    Raises:
        FileNotFoundError: If `table.sql` is not found.

    """

    def __init__(
        self,
        server_url: StrURL,
        dump_dir: StrPath | None = None,
        name: str | None = None,
        metadata: MetaData | None = None,
        tmp_path: StrPath | None = None,
    ) -> None:
        db_url = make_url(server_url)
        if not name:
            name = Path(dump_dir).name if dump_dir else "testdb"
        db_name = f"{TEST_USERNAME}_{name}"

        # Add the database name to the URL
        if db_url.get_dialect().name == "sqlite":
            db_path = Path(tmp_path) / db_name if tmp_path else db_name
            db_url = db_url.set(database=f"{db_path}.db")
        else:
            db_url = db_url.set(database=db_name)
        # Enable "local_infile" variable for MySQL databases to allow importing data from files
        connect_args = {}
        if db_url.get_dialect().name == "mysql":
            connect_args["local_infile"] = 1
        # Create the database, dropping it beforehand if it already exists
        if database_exists(db_url):
            drop_database(db_url)
        create_database(db_url)
        # Establish the connection to the database, load the schema and import the data
        try:
            self.dbc = DBConnection(db_url, connect_args=connect_args, reflect=False)
            self._load_schema_and_data(dump_dir, metadata)
        except:
            # Make sure the database is deleted before raising the exception
            drop_database(db_url)
            raise
        # Update the loaded metadata information of the database
        self.dbc.load_metadata()

    def _load_schema_and_data(
        self, dump_dir: StrPath | None = None, metadata: MetaData | None = None
    ) -> None:
        with self.dbc.begin() as conn:
            # Set InnoDB engine as default and disable foreign key checks for MySQL databases
            if self.dbc.dialect == "mysql":
                conn.execute(text("SET default_storage_engine=InnoDB"))
                conn.execute(text("SET FOREIGN_KEY_CHECKS=0"))

            # Load the schema
            if metadata:
                self.dbc.create_all_tables(metadata)
            elif dump_dir:
                dump_dir_path = Path(dump_dir)
                with open(dump_dir_path / "table.sql", "r") as schema:
                    for query in "".join(schema.readlines()).split(";"):
                        if query.strip():
                            conn.execute(text(query))

            # And import any available data for each table
            if dump_dir:
                for tsv_file in dump_dir_path.glob("*.txt"):
                    table = tsv_file.stem
                    self._load_data(conn, table, tsv_file)

            # Re-enable foreign key checks for MySQL databases
            if self.dbc.dialect == "mysql":
                conn.execute(text("SET FOREIGN_KEY_CHECKS=1"))

    def __repr__(self) -> str:
        """Returns a string representation of this object."""
        return f"{self.__class__.__name__}({self.dbc.url!r})"

    def drop(self) -> None:
        """Drops the database."""
        drop_database(self.dbc.url)
        # Ensure the connection pool is properly closed and disposed
        self.dbc.dispose()

    def _load_data(self, conn: sqlalchemy.engine.Connection, table: str, src: StrPath) -> None:
        """Loads the table data from the given file.

        Args:
            conn: Open connection to the database.
            table: Table name to load the data to.
            src: File path with the data in TSV format (without headers).

        """
        if self.dbc.dialect == "sqlite":
            # SQLite does not have an equivalent to "LOAD DATA": use its ".import" command instead
            subprocess.run(["sqlite3", self.dbc.db_name, ".mode tabs", f".import {src} {table}"], check=True)
        elif self.dbc.dialect == "postgresql":
            conn.execute(text(f"COPY {table} FROM '{src}'"))
        elif self.dbc.dialect == "sqlserver":
            conn.execute(text(f"BULK INSERT {table} FROM '{src}'"))
        else:
            conn.execute(text(f"LOAD DATA LOCAL INFILE '{src}' INTO TABLE {table}"))

    def __enter__(self) -> UnitTestDB:
        return self

    def __exit__(self, *args: Any) -> None:
        self.drop()

dbc = DBConnection(db_url, connect_args=connect_args, reflect=False) instance-attribute

drop()

Drops the database.

Source code in src/ensembl/utils/database/unittestdb.py
151
152
153
154
155
def drop(self) -> None:
    """Drops the database."""
    drop_database(self.dbc.url)
    # Ensure the connection pool is properly closed and disposed
    self.dbc.dispose()