Coverage for src / ensembl / utils / database / dbconnection.py: 90%
98 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 10:45 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 10:45 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Database connection handler.
17This module provides the main class to connect to and access databases. It will be an ORM-less
18connection, that is, the data can only be accessed via SQL queries (see example below).
20Examples:
22 >>> from ensembl.utils.database import DBConnection
23 >>> dbc = DBConnection("mysql://ensro@mysql-server:4242/mydb")
24 >>> # You can access the database data via sql queries, for instance:
25 >>> results = dbc.execute("SELECT * FROM my_table;")
26 >>> # Or via a connection in a transaction manner:
27 >>> with dbc.begin() as conn:
28 >>> results = conn.execute("SELECT * FROM my_table;")
30"""
32from __future__ import annotations
34__all__ = [
35 "Query",
36 "StrURL",
37 "DBConnection",
38]
40from contextlib import contextmanager
41from typing import Any, ContextManager, Generator, Optional, TypeVar
43import sqlalchemy
44from sqlalchemy import create_engine, event
45from sqlalchemy.orm import sessionmaker
46from sqlalchemy.schema import MetaData, Table
48Query = TypeVar("Query", str, sqlalchemy.sql.expression.ClauseElement, sqlalchemy.sql.expression.TextClause)
49StrURL = TypeVar("StrURL", str, sqlalchemy.engine.URL)
52class DBConnection:
53 """Database connection handler, providing also the database's schema and properties.
55 Args:
56 url: URL to the database, e.g. `mysql://user:passwd@host:port/my_db`.
57 reflect: Reflect the database schema or not.
59 """
61 def __init__(self, url: StrURL, reflect: bool = True, **kwargs: Any) -> None:
62 self._engine = create_engine(url, future=True, **kwargs)
63 self._metadata: MetaData | None = None
64 if reflect:
65 self.load_metadata()
67 def __repr__(self) -> str:
68 """Returns a string representation of this object."""
69 return f"{self.__class__.__name__}({self.url!r})"
71 def load_metadata(self) -> None:
72 """Loads the metadata information of the database."""
73 # Note: Just reflect() is not enough as it would not delete tables that no longer exist
74 self._metadata = sqlalchemy.MetaData()
75 self._metadata.reflect(bind=self._engine)
77 def create_all_tables(self, metadata: MetaData) -> None:
78 """Create the tables from the metadata and set the metadata.
80 This assumes the database is empty beforehand. If the tables already exist, they will be ignored.
81 If there are other tables, you may need to run `self.load_metadata()` to update the metadata schema.
82 """
83 self._metadata = metadata
84 metadata.create_all(self._engine)
86 def create_table(self, table: Table) -> None:
87 """Create a table in the database and update the metadata. Do nothing if the table already exists."""
88 table.create(self._engine)
89 # We need to update the metadata to register the new table
90 self.load_metadata()
92 @property
93 def url(self) -> str:
94 """Returns the database URL."""
95 return self._engine.url.render_as_string(hide_password=False)
97 @property
98 def db_name(self) -> Optional[str]:
99 """Returns the database name."""
100 return self._engine.url.database
102 @property
103 def host(self) -> Optional[str]:
104 """Returns the database host."""
105 return self._engine.url.host
107 @property
108 def port(self) -> Optional[int]:
109 """Returns the port of the database host."""
110 return self._engine.url.port
112 @property
113 def dialect(self) -> str:
114 """Returns the SQLAlchemy database dialect name of the database host."""
115 return self._engine.name
117 @property
118 def tables(self) -> dict[str, sqlalchemy.schema.Table]:
119 """Returns the database tables keyed to their name, or an empty dict if no metadata was loaded."""
120 if self._metadata:
121 return self._metadata.tables
122 return {}
124 def get_primary_key_columns(self, table: str) -> list[str]:
125 """Returns the primary key column names for the given table.
127 Args:
128 table: Table name.
130 """
131 return [col.name for col in self.tables[table].primary_key.columns.values()]
133 def get_columns(self, table: str) -> list[str]:
134 """Returns the column names for the given table.
136 Args:
137 table: Table name.
139 """
140 return [col.name for col in self.tables[table].columns]
142 def connect(self) -> sqlalchemy.engine.Connection:
143 """Returns a new database connection."""
144 return self._engine.connect()
146 def begin(self, *args: Any) -> ContextManager[sqlalchemy.engine.Connection]:
147 """Returns a context manager delivering a database connection with a transaction established."""
148 return self._engine.begin(*args)
150 def dispose(self) -> None:
151 """Disposes of the connection pool."""
152 self._engine.dispose()
154 def _enable_sqlite_savepoints(self, engine: sqlalchemy.engine.Engine) -> None:
155 """Enables SQLite SAVEPOINTS to allow session rollbacks."""
157 @event.listens_for(engine, "connect")
158 def do_connect(
159 dbapi_connection: Any, # SQLAlchemy is not clear about the type of this argument
160 connection_record: sqlalchemy.pool.ConnectionPoolEntry, # pylint: disable=unused-argument
161 ) -> None:
162 """Disables emitting the BEGIN statement entirely, as well as COMMIT before any DDL."""
163 dbapi_connection.isolation_level = None
165 @event.listens_for(engine, "begin")
166 def do_begin(conn: sqlalchemy.engine.Connection) -> None:
167 """Emits a custom own BEGIN."""
168 conn.exec_driver_sql("BEGIN")
170 @contextmanager
171 def session_scope(self) -> Generator[sqlalchemy.orm.Session, None, None]:
172 """Provides a transactional scope around a series of operations with rollback in case of failure.
174 Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all
175 the modifications performed to the database will persist.
177 """
178 # Create a dedicated engine for this session
179 engine = create_engine(self._engine.url)
180 if self.dialect == "sqlite": 180 ↛ 182line 180 didn't jump to line 182 because the condition on line 180 was always true
181 self._enable_sqlite_savepoints(engine)
182 Session = sessionmaker(future=True)
183 session = Session(bind=engine, autoflush=False)
184 try:
185 yield session
186 session.commit()
187 except:
188 # Rollback to ensure no changes are made to the database
189 session.rollback()
190 raise
191 finally:
192 # Whatever happens, make sure the session is closed
193 session.close()
195 @contextmanager
196 def test_session_scope(self) -> Generator[sqlalchemy.orm.Session, None, None]:
197 """Provides a transactional scope around a series of operations that will be rolled back at the end.
199 Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all
200 the modifications performed to the database will persist.
202 """
203 # Create a dedicated engine for this session
204 engine = create_engine(self._engine.url)
205 if self.dialect == "sqlite": 205 ↛ 208line 205 didn't jump to line 208 because the condition on line 205 was always true
206 self._enable_sqlite_savepoints(engine)
207 # Connect to the database
208 connection = engine.connect()
209 # Begin a non-ORM transaction
210 transaction = connection.begin()
211 # Bind an individual session to the connection
212 Session = sessionmaker(future=True)
213 try:
214 # Running on SQLAlchemy 2.0+
215 session = Session(bind=connection, join_transaction_mode="create_savepoint")
216 except TypeError:
217 # Running on SQLAlchemy 1.4
218 session = Session(bind=connection)
219 # If the database supports SAVEPOINT, starting a savepoint will allow to also use rollback
220 connection.begin_nested()
222 # Define a new transaction event
223 @event.listens_for(session, "after_transaction_end")
224 def end_savepoint(
225 session: sqlalchemy.orm.Session, # pylint: disable=unused-argument
226 transaction: sqlalchemy.orm.SessionTransaction, # pylint: disable=unused-argument
227 ) -> None:
228 if not connection.in_nested_transaction():
229 connection.begin_nested()
231 try:
232 yield session
233 finally:
234 # Whatever happens, make sure the session and connection are closed, rolling back
235 # everything done with the session (including calls to commit())
236 session.close()
237 transaction.rollback()
238 connection.close()