Coverage for src/python/ensembl/ncbi_taxonomy/api/utils.py: 97%

87 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-17 13:09 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Taxonomy API utils. 

15 

16This module provides a set of taxonomy utilities through the Taxonomy API main class, 

17whose methods operate on a database session compatible with the Taxonomy ORM. 

18 

19Typical usage example:: 

20 

21 from ensembl.database import DBConnection 

22 from ensembl.ncbi_taxonomy.api.utils import Taxonomy 

23 dbc = DBConnection('mysql://user@mysql-host:port/dbname') 

24 with dbc.session_scope() as session: 

25 # Get the last common ancestor of dog and mouse 

26 dog_node = Taxonomy.fetch_taxon_by_species_name(session, 'canis_lupus_familiaris') 

27 mouse_node = Taxonomy.fetch_taxon_by_species_name(session, 'mus_musculus') 

28 common_anc = Taxonomy.last_common_ancestor(session, dog_node.taxon_id, mouse_node.taxon_id) 

29 

30""" 

31 

32__all__ = ["Taxonomy"] 

33 

34from typing import Tuple 

35 

36from sqlalchemy import and_ 

37from sqlalchemy.orm import as_declarative, Session, aliased 

38from sqlalchemy.orm.exc import NoResultFound 

39 

40from ensembl.ncbi_taxonomy.models import NCBITaxaNode, NCBITaxonomy 

41 

42 

@as_declarative()
class Taxonomy:
    """Collection of taxonomy queries over the NCBITaxonomy ORM models.

    Every method is a classmethod that receives the database session to query
    as its first argument, so no instance state is involved.
    """

    @classmethod
    def fetch_node_by_id(cls, session: Session, taxon_id: int) -> NCBITaxonomy:
        """Returns the first taxonomy record found for ``taxon_id``

        Args:
            session: Database session used to run the query
            taxon_id: Unique taxonomy identifier in database

        Raises:
            sqlalchemy.orm.exc.NoResultFound: if ``taxon_id`` does not exist
        """
        node = session.query(NCBITaxonomy).filter(NCBITaxonomy.taxon_id == taxon_id).first()
        if node is None:
            raise NoResultFound()
        return node

    @classmethod
    def fetch_taxon_by_species_name(cls, session: Session, name: str) -> NCBITaxonomy:
        """Returns first taxonomy object whose scientific name matches ``name``

        Underscores in ``name`` are replaced by spaces before matching, so
        ``"mus_musculus"`` is looked up as ``"mus musculus"``.

        Args:
            session: Database session used to run the query
            name: Scientific ncbi_taxa_name.name in database

        Raises:
            sqlalchemy.orm.exc.NoResultFound: if no scientific name matches ``name``
        """
        taxon = (
            session.query(NCBITaxonomy)
            .filter(NCBITaxonomy.name == name.replace("_", " "))
            .filter(NCBITaxonomy.name_class == "scientific name")
            .first()
        )
        if taxon is None:
            raise NoResultFound()
        return taxon

    @classmethod
    def parent(cls, session: Session, taxon_id: int) -> NCBITaxonomy:
        """Returns taxonomy node object for parent node

        Args:
            session: Database session used to run the query
            taxon_id: Unique taxonomy identifier in database

        Raises:
            sqlalchemy.orm.exc.NoResultFound: if ``taxon_id`` does not exist
                or no parent record with a scientific name is found for it
        """
        ParentTaxonomy = aliased(NCBITaxonomy, name="parent_ncbi_taxonomy")
        row = (
            session.query(NCBITaxonomy, ParentTaxonomy)
            .outerjoin(ParentTaxonomy, NCBITaxonomy.parent_id == ParentTaxonomy.taxon_id)
            .filter(NCBITaxonomy.taxon_id == taxon_id)
            # Filtering on a parent column discards rows where the join found no parent
            .filter(ParentTaxonomy.name_class == "scientific name")
            .first()
        )
        # Explicit check instead of relying on ``None[1]`` raising TypeError
        if row is None:
            raise NoResultFound()
        return row[1]

    @classmethod
    def children(cls, session: Session, taxon_id: int) -> Tuple[dict, ...]:
        """Returns the direct children of ``taxon_id``

        Each child is returned as the ``__dict__`` of its mapped object rather
        than as the mapped object itself.

        Args:
            session: Database session used to run the query
            taxon_id: Unique taxonomy identifier in database

        Raises:
            sqlalchemy.orm.exc.NoResultFound: if ``taxon_id`` does not exist
                or has no children
        """
        nodes = (
            session.query(NCBITaxonomy)
            .filter(NCBITaxonomy.parent_id == taxon_id)
            .filter(NCBITaxonomy.name_class == "scientific name")
            .all()
        )
        if not nodes:
            raise NoResultFound()
        return tuple(node.__dict__ for node in nodes)

    @classmethod
    def is_root(cls, session: Session, taxon_id: int) -> bool:
        """Returns True if ``taxon_id`` is a root and False if not

        A node is considered a root when its ``root_id`` equals its own
        ``taxon_id``.

        Args:
            session: Database session used to run the query
            taxon_id: Unique taxonomy identifier in database
        """
        try:
            session.query(NCBITaxaNode).filter(
                NCBITaxaNode.root_id == taxon_id, NCBITaxaNode.taxon_id == taxon_id
            ).one()
        except NoResultFound:
            return False
        return True

    @classmethod
    def num_descendants(cls, session: Session, taxon_id: int) -> int:
        """Returns number of descendants from ``taxon_id``

        The count is derived from the node's ``left_index`` and ``right_index``
        as ``(right_index - left_index - 1) // 2``.

        Args:
            session: Database session used to run the query
            taxon_id: Unique taxonomy identifier in database

        Raises:
            sqlalchemy.orm.exc.NoResultFound: if ``taxon_id`` does not exist
        """
        # A single query both validates existence (``one()`` raises when no row
        # matches) and fetches the two indexes needed for the computation.
        left_index, right_index = (
            session.query(NCBITaxaNode.left_index, NCBITaxaNode.right_index)
            .filter(NCBITaxaNode.taxon_id == taxon_id)
            .one()
        )
        # Floor division keeps the result an ``int``, as the signature declares
        return (right_index - left_index - 1) // 2

    @classmethod
    def is_leaf(cls, session: Session, taxon_id: int) -> bool:
        """Returns True if ``taxon_id`` is a leaf and False if not

        Args:
            session: Database session used to run the query
            taxon_id: Unique taxonomy identifier in database

        Raises:
            sqlalchemy.orm.exc.NoResultFound: if ``taxon_id`` does not exist
        """
        return cls.num_descendants(session, taxon_id) == 0

    @classmethod
    def fetch_ancestors(cls, session: Session, taxon_id: int) -> Tuple:
        """Returns a tuple with the ancestors of ``taxon_id``

        Each ancestor is returned as the ``__dict__`` of its mapped node object,
        and the tuple is sorted by ascending ``taxon_id`` (not by tree depth).

        Args:
            session: Database session used to run the query
            taxon_id: Unique taxonomy identifier in database

        Raises:
            sqlalchemy.orm.exc.NoResultFound: if ``taxon_id`` does not exist
                or has no ancestors
        """
        ParentTaxaNode = aliased(NCBITaxaNode)
        rows = (
            session.query(ParentTaxaNode, NCBITaxaNode)
            .outerjoin(
                NCBITaxaNode,
                and_(
                    # An ancestor's [left_index, right_index] range contains the node's left_index
                    NCBITaxaNode.left_index.between(ParentTaxaNode.left_index, ParentTaxaNode.right_index),
                    # ... and the node itself is not one of its own ancestors
                    ParentTaxaNode.taxon_id != NCBITaxaNode.taxon_id,
                ),
            )
            .filter(NCBITaxaNode.taxon_id == taxon_id)
            .all()
        )
        if not rows:
            raise NoResultFound()
        ancestors = [row[0].__dict__ for row in rows]
        ancestors.sort(key=lambda ancestor: ancestor["taxon_id"])
        return tuple(ancestors)

    @classmethod
    def all_common_ancestors(cls, session: Session, taxon_id_1: int, taxon_id_2: int) -> tuple:
        """Returns a tuple of common ancestor node objects shared between taxa

        The result is ordered by decreasing number of descendants, with ties
        broken by ascending ``taxon_id``.

        Args:
            session: Database session used to run the queries
            taxon_id_1: Unique taxonomy identifier in database
            taxon_id_2: Unique taxonomy identifier in database

        Raises:
            sqlalchemy.orm.exc.NoResultFound: if ``taxon_id_1`` or
                ``taxon_id_2`` do not exist or have no common ancestors
        """
        # fetch_ancestors() raises NoResultFound itself when a taxon is unknown
        # or has no ancestors, so no extra None check is needed here.
        ancestor_ids_1 = {ancestor["taxon_id"] for ancestor in cls.fetch_ancestors(session, taxon_id_1)}
        ancestor_ids_2 = {ancestor["taxon_id"] for ancestor in cls.fetch_ancestors(session, taxon_id_2)}
        common_ids = ancestor_ids_1 & ancestor_ids_2
        if not common_ids:
            # Honour the documented contract instead of returning an empty tuple
            raise NoResultFound()
        ordered_ids = sorted(
            common_ids,
            key=lambda common_id: (-cls.num_descendants(session, common_id), common_id),
        )
        return tuple(cls.fetch_node_by_id(session, common_id) for common_id in ordered_ids)

    @classmethod
    def last_common_ancestor(cls, session: Session, taxon_id_1: int, taxon_id_2: int) -> NCBITaxonomy:
        """Returns the first node of ``all_common_ancestors`` for the given taxa

        Args:
            session: Database session used to run the queries
            taxon_id_1: Unique taxonomy identifier in database
            taxon_id_2: Unique taxonomy identifier in database

        Raises:
            sqlalchemy.orm.exc.NoResultFound: if ``taxon_id_1`` or
                ``taxon_id_2`` do not exist or have no common ancestors
        """
        common_ancestors = cls.all_common_ancestors(session, taxon_id_1, taxon_id_2)
        # NOTE(review): all_common_ancestors() sorts by decreasing number of
        # descendants, so index 0 is the common ancestor with the MOST
        # descendants. TODO confirm this is the intended "most recent" ancestor.
        return common_ancestors[0]