Coverage for src / ensembl / utils / database / dbconnection.py: 90%

98 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 10:45 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

"""Database connection handler.

This module provides the main class to connect to and access databases. It will be an ORM-less
connection, that is, the data can only be accessed via SQL queries (see example below).

Examples:

    >>> from sqlalchemy import text
    >>> from ensembl.utils.database import DBConnection
    >>> dbc = DBConnection("mysql://ensro@mysql-server:4242/mydb")
    >>> # You can access the database data via SQL queries on a connection, for instance:
    >>> with dbc.connect() as conn:
    ...     results = conn.execute(text("SELECT * FROM my_table;"))
    >>> # Or via a connection in a transaction manner:
    >>> with dbc.begin() as conn:
    ...     results = conn.execute(text("SELECT * FROM my_table;"))

"""

31 

32from __future__ import annotations 

33 

# Public names exported by this module
__all__ = [
    "Query",
    "StrURL",
    "DBConnection",
]

39 

40from contextlib import contextmanager 

41from typing import Any, ContextManager, Generator, Optional, TypeVar 

42 

43import sqlalchemy 

44from sqlalchemy import create_engine, event 

45from sqlalchemy.orm import sessionmaker 

46from sqlalchemy.schema import MetaData, Table 

47 

# SQL query type: a plain string or a SQLAlchemy clause/text construct
Query = TypeVar("Query", str, sqlalchemy.sql.expression.ClauseElement, sqlalchemy.sql.expression.TextClause)
# Database URL type: a plain string or a SQLAlchemy URL object
StrURL = TypeVar("StrURL", str, sqlalchemy.engine.URL)

50 

51 

class DBConnection:
    """Database connection handler, providing also the database's schema and properties.

    Args:
        url: URL to the database, e.g. `mysql://user:passwd@host:port/my_db`.
        reflect: Reflect the database schema or not.
        **kwargs: Additional keyword arguments passed on to `sqlalchemy.create_engine()`.

    """

    def __init__(self, url: StrURL, reflect: bool = True, **kwargs: Any) -> None:
        self._engine = create_engine(url, future=True, **kwargs)
        # Stays None until the schema is reflected or set via `create_all_tables()`
        self._metadata: MetaData | None = None
        if reflect:
            self.load_metadata()

    def __repr__(self) -> str:
        """Returns a string representation of this object.

        Note that it embeds `self.url`, which includes the password (if any) in clear text.

        """
        return f"{self.__class__.__name__}({self.url!r})"

    def load_metadata(self) -> None:
        """Loads the metadata information of the database."""
        # Note: Just reflect() is not enough as it would not delete tables that no longer exist
        self._metadata = sqlalchemy.MetaData()
        self._metadata.reflect(bind=self._engine)

    def create_all_tables(self, metadata: MetaData) -> None:
        """Create the tables from the metadata and set the metadata.

        This assumes the database is empty beforehand. If the tables already exist, they will be ignored.
        If there are other tables, you may need to run `self.load_metadata()` to update the metadata schema.

        Args:
            metadata: Metadata describing the tables to create; it replaces the current metadata.

        """
        self._metadata = metadata
        metadata.create_all(self._engine)

    def create_table(self, table: Table) -> None:
        """Create a table in the database and update the metadata. Do nothing if the table already exists.

        Args:
            table: Table to create.

        """
        # `checkfirst=True` skips the CREATE statement when the table is already present, as documented
        table.create(self._engine, checkfirst=True)
        # We need to update the metadata to register the new table
        self.load_metadata()

    @property
    def url(self) -> str:
        """Returns the database URL, including the password (if any) in clear text."""
        return self._engine.url.render_as_string(hide_password=False)

    @property
    def db_name(self) -> Optional[str]:
        """Returns the database name."""
        return self._engine.url.database

    @property
    def host(self) -> Optional[str]:
        """Returns the database host."""
        return self._engine.url.host

    @property
    def port(self) -> Optional[int]:
        """Returns the port of the database host."""
        return self._engine.url.port

    @property
    def dialect(self) -> str:
        """Returns the SQLAlchemy database dialect name of the database host."""
        return self._engine.name

    @property
    def tables(self) -> dict[str, sqlalchemy.schema.Table]:
        """Returns the database tables keyed to their name, or an empty dict if no metadata was loaded."""
        if self._metadata is not None:
            return self._metadata.tables
        return {}

    def get_primary_key_columns(self, table: str) -> list[str]:
        """Returns the primary key column names for the given table.

        Args:
            table: Table name.

        Raises:
            KeyError: If `table` is not among the loaded tables.

        """
        return [col.name for col in self.tables[table].primary_key.columns.values()]

    def get_columns(self, table: str) -> list[str]:
        """Returns the column names for the given table.

        Args:
            table: Table name.

        Raises:
            KeyError: If `table` is not among the loaded tables.

        """
        return [col.name for col in self.tables[table].columns]

    def connect(self) -> sqlalchemy.engine.Connection:
        """Returns a new database connection."""
        return self._engine.connect()

    def begin(self, *args: Any) -> ContextManager[sqlalchemy.engine.Connection]:
        """Returns a context manager delivering a database connection with a transaction established."""
        return self._engine.begin(*args)

    def dispose(self) -> None:
        """Disposes of the connection pool."""
        self._engine.dispose()

    def _enable_sqlite_savepoints(self, engine: sqlalchemy.engine.Engine) -> None:
        """Enables SQLite SAVEPOINTS to allow session rollbacks."""

        @event.listens_for(engine, "connect")
        def do_connect(
            dbapi_connection: Any,  # SQLAlchemy is not clear about the type of this argument
            connection_record: sqlalchemy.pool.ConnectionPoolEntry,  # pylint: disable=unused-argument
        ) -> None:
            """Disables emitting the BEGIN statement entirely, as well as COMMIT before any DDL."""
            dbapi_connection.isolation_level = None

        @event.listens_for(engine, "begin")
        def do_begin(conn: sqlalchemy.engine.Connection) -> None:
            """Emits a custom own BEGIN."""
            conn.exec_driver_sql("BEGIN")

    @contextmanager
    def _dedicated_engine(self) -> Generator[sqlalchemy.engine.Engine, None, None]:
        """Yields a new engine for this database's URL, disposing of its connection pool on exit."""
        engine = create_engine(self._engine.url)
        try:
            if self.dialect == "sqlite":
                self._enable_sqlite_savepoints(engine)
            yield engine
        finally:
            # Each call creates its own pool: release its connections instead of leaking them
            engine.dispose()

    @contextmanager
    def session_scope(self) -> Generator[sqlalchemy.orm.Session, None, None]:
        """Provides a transactional scope around a series of operations with rollback in case of failure.

        The session is committed if the managed block ends normally, rolled back (and the exception
        re-raised) otherwise, and closed in both cases.

        Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all
        the modifications performed to the database will persist.

        """
        # Use a dedicated engine for this session
        with self._dedicated_engine() as engine:
            session_factory = sessionmaker(future=True)
            session = session_factory(bind=engine, autoflush=False)
            try:
                yield session
                session.commit()
            except BaseException:
                # Rollback to ensure no changes are made to the database
                session.rollback()
                raise
            finally:
                # Whatever happens, make sure the session is closed
                session.close()

    @contextmanager
    def test_session_scope(self) -> Generator[sqlalchemy.orm.Session, None, None]:
        """Provides a transactional scope around a series of operations that will be rolled back at the end.

        Bear in mind MySQL's storage engine MyISAM does not support rollback transactions, so all
        the modifications performed to the database will persist.

        """
        # Use a dedicated engine and connection for this session; both are released on exit
        with self._dedicated_engine() as engine, engine.connect() as connection:
            # Begin a non-ORM transaction
            transaction = connection.begin()
            try:
                # Bind an individual session to the connection
                session_factory = sessionmaker(future=True)
                try:
                    # Running on SQLAlchemy 2.0+
                    session = session_factory(bind=connection, join_transaction_mode="create_savepoint")
                except TypeError:
                    # Running on SQLAlchemy 1.4
                    session = session_factory(bind=connection)
                    # If the database supports SAVEPOINT, starting a savepoint will allow to also use rollback
                    connection.begin_nested()

                    # Define a new transaction event
                    @event.listens_for(session, "after_transaction_end")
                    def end_savepoint(
                        session: sqlalchemy.orm.Session,  # pylint: disable=unused-argument
                        transaction: sqlalchemy.orm.SessionTransaction,  # pylint: disable=unused-argument
                    ) -> None:
                        # Reopen a savepoint whenever the previous one has ended
                        if not connection.in_nested_transaction():
                            connection.begin_nested()

                try:
                    yield session
                finally:
                    # Whatever happens, make sure the session is closed
                    session.close()
            finally:
                # Roll back everything done with the session (including calls to commit())
                transaction.rollback()

237 transaction.rollback() 

238 connection.close()