# Copyright (C) 2018, 2019 Columbia University Irving Medical Center,
# New York, USA
# Copyright (C) 2019 Novo Nordisk Foundation Center for Biosustainability,
# Technical University of Denmark
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Definition of `DfbaModel` class."""
import logging
import tempfile
from typing import Dict, List, Optional, Tuple
import pandas as pd
from cobra import DictList, Model, Object, Reaction
from cobra.util import solver
from optlang import symbolics
from . import jit, library
from .control import ControlParameter
from .dfba_utils import * # noqa: F403
from .exchange import ExchangeFlux
from .types import Expression, Problem
[docs]logger = logging.getLogger(__name__)
# Lines refering to the imported functions from .dbfa utils must be commented
# in-line to ignore F405 since flake8 can't understand this import
# (C++ level related)
[docs]class DfbaModel(Object):
"""Class representation for a dynamic FBA model.
Parameters
----------
cobra_object : cobra.Model
Existing `cobra.Model` object representing FBA model.
Attributes
----------
cobra_model : cobra.Model
Existing `cobra.Model` object containing FBA model
(reactions, metabolites, objective).
reactions : DictList
A `DictList` object where the key is the reaction identifier
and the value a `cobra.Reaction` object in cobra_model attribute.
objectives : list
A list containing identifiers of reactions to be used as objectives
in lexicographic optimization (currently not supported)
directions : list
A list containing directions (max or min) of each objective in
lexicographic optimization (currently not supported)
kinetic_variables : DictList
A `DictList` object where the key is the kinetic variable identifier
and the value a `KineticVariable` object.
exchange_fluxes : DictList
A `DictList` object where the key is the reaction identifier and the
value an `ExchangeFlux` object.
user_data : dfba_utils.UserData
A read only attribute containing user data of the model
to be passed to algorithm prior to simulation.
solver_data : dfba_utils.SolverData
An attribute containing data for the solver to be used for
simulation of the model.
"""
def __init__(self, cobra_object: Model) -> None:
"""Init method."""
if isinstance(cobra_object, Model):
self._id = id(self)
self._cobra_model = cobra_object
self._reactions = cobra_object.reactions
self._objectives = []
self._directions = []
self._kinetic_variables = DictList()
self._exchange_fluxes = DictList()
self._user_data = UserData() # noqa: F405
self._solver_data = SolverData() # noqa: F405
else:
raise Exception(
"Error: must use instance of class cobra.Model " "to init DfbaModel!"
)
[docs] def add_objectives(self, objectives: List, directions: List) -> None:
"""Add objectives.
Parameters
-------
objectives : list
The list of reaction indetifiers to be added to the model as
objectives for lexicographic optimization.
directions : list
The list of directions (max or min) of each objective to be
added to the model for lexicographic optimization.
"""
# TODO: currently not supported. Should it raise NotImplementedError?
if type(objectives) is not list:
objectives = [objectives]
if type(directions) is not list:
directions = [directions]
if len(objectives) != len(directions):
raise Exception(
"Error: objectives list must be same length as directions list!"
)
self._objectives.extend(objectives)
self._directions.extend(directions)
[docs] def add_kinetic_variables(self, kinetic_variable_list: List) -> None:
"""Add kinetic variables.
Parameters
-------
kinetic_variable_list : list
The list of indetifiers of kinetic variables to be added to the
model.
"""
if type(kinetic_variable_list) is not list:
kinetic_variable_list = [kinetic_variable_list]
def existing_filter(kinetic_variable):
if kinetic_variable.id in self.kinetic_variables:
logger.warning(
f"Ignoring kinetic variable {kinetic_variable.id} since it"
f"already exists in the model."
)
return False
return True
pruned = DictList(filter(existing_filter, kinetic_variable_list))
self._kinetic_variables += pruned
DictList.sort(self._kinetic_variables)
counter = 0
for kinetic_variable in self._kinetic_variables:
symbolics.Symbol.__init__(kinetic_variable, "yval[" + str(counter) + "]")
counter += 1
[docs] def add_exchange_fluxes(self, exchange_flux_list: List) -> None:
"""Add exchange fluxes.
Parameters
-------
exchange_flux_list : list
list of identifiers of exchange fluxes to be added to the model.
"""
if type(exchange_flux_list) is not list:
exchange_flux_list = [exchange_flux_list]
def existing_filter(exchange_flux: ExchangeFlux) -> bool:
if exchange_flux.id not in self.reactions:
logger.warning(
f"Ignoring exchange flux {exchange_flux.id} since it does"
f"not correspond to a reaction in the model."
)
return False
if exchange_flux.id in self.exchange_fluxes:
logger.warning(
f"Ignoring exchange flux {exchange_flux.id} since it"
f"already exists in the model."
)
return False
return True
pruned = DictList(filter(existing_filter, exchange_flux_list))
self._exchange_fluxes += pruned
DictList.sort(self._exchange_fluxes)
counter = 0
for exchange_flux in self._exchange_fluxes:
symbolics.Symbol.__init__(exchange_flux, "v" + str(counter))
counter += 1
[docs] def add_initial_conditions(self, initial_conditions: Dict) -> None:
"""Add initial conditions.
Parameters
-------
initial_conditions : dict
A dict where the key is the kinetic variable identifier and
the value an initial condition.
"""
if type(initial_conditions) is not dict:
raise Exception(
"Error: initial conditions should be dict of format"
"{kinetic_variable.id: initial condition}!"
)
for key in initial_conditions.keys():
if key in self.kinetic_variables:
self.kinetic_variables.get_by_id(
key
).initial_condition = initial_conditions[key]
else:
logger.warning(
f"Ignoring initial condition for {key} since it does not"
f"correspond to a kinetic variable in the model."
)
[docs] def add_rhs_expression(
self,
kinetic_variable_id: str,
expression: Expression,
control_parameters: Optional[List[ControlParameter]] = None,
) -> None:
"""Add rhs expression.
Parameters
-------
kinetic_variable_id : string
Identifier of the kinetic variable to be supplied with
rhs expression for calculating its derivative wrt time.
expression : optlang.symbolics expression
The symbolic expression for calculating derivative of kinetic
variable wrt time.
control_parameters : list
A list of `ControlParameter` objects (if any) appearing in the
supplied symbolic expression.
"""
if control_parameters is not None:
if type(control_parameters) is not list:
control_parameters = [control_parameters]
if kinetic_variable_id not in self.kinetic_variables:
raise Exception(
"Error: {} is not a kinetic variable in model!".format(
kinetic_variable_id
)
)
self.kinetic_variables.get_by_id(kinetic_variable_id).rhs_expression = [
expression,
control_parameters,
]
[docs] def add_exchange_flux_lb(
self,
exchange_flux_id: str,
expression: Expression,
condition: Optional[Expression] = None,
control_parameters: Optional[List[ControlParameter]] = None,
) -> None:
"""Add exchange flux lower bound.
Parameters
-------
exchange_flux_id : string
Indetifier of the exchange flux to be supplied with expression
for calculating its lower bound.
expression : optlang.symbolics expression
The symbolic expression for calculating lower bound of exchange
flux. Convention is that lower bounds of exchange fluxes come
with negative sign and therefore expression should be
non-negative,representing the magnitude of this lower bound.
condition : optlang.symbolics expression
The symbolic expression for non-negative condition on metabolite
concentrations required for correct evaluation of lower bound
expression. Numerical approximation can generate unphysical,
negative concetration values.
control_parameters : list
A list of `ControlParameter` objects (if any) appearing in the
supplied symbolic expression.
"""
if control_parameters is not None:
if type(control_parameters) is not list:
control_parameters = [control_parameters]
if exchange_flux_id not in self.exchange_fluxes:
raise Exception(
"Error: reaction {} is not an exchange flux in model!".format(
exchange_flux_id
)
)
self.exchange_fluxes.get_by_id(exchange_flux_id).lower_bound_expression = [
expression,
condition,
control_parameters,
]
[docs] def add_exchange_flux_ub(
self,
exchange_flux_id: str,
expression: Expression,
condition: Optional[Expression] = None,
control_parameters: Optional[List[ControlParameter]] = None,
) -> None:
"""Add exchange flux upper bound.
Parameters
-------
exchange_flux_id : string
Indetifier of the exchange flux to be supplied with expression
for calculating its upper bound.
expression : optlang.symbolics expression
The symbolic expression for calculating upper bound of exchange
flux. Convention is that upper bounds of exchange fluxes come
with positive sign and therefore expression should be
non-negative, representing the magnitude of this upper bound.
condition : optlang.symbolics expression
The symbolic expression for non-negative condition on metabolite
concentrations required for correct evaluation of upper bound
expression. Numerical approximation can generate unphysical,
negative concetration values.
control_parameters : list
A list of `ControlParameter` objects (if any) appearing in the
supplied symbolic expression.
"""
if control_parameters is not None:
if type(control_parameters) is not list:
control_parameters = [control_parameters]
if exchange_flux_id not in self.exchange_fluxes:
raise Exception(
"Error: reaction {} is not an exchange flux in model!".format(
exchange_flux_id
)
)
self.exchange_fluxes.get_by_id(exchange_flux_id).upper_bound_expression = [
expression,
condition,
control_parameters,
]
[docs] def simulate(
self,
tstart: float,
tstop: float,
tout: float,
output_fluxes: Optional[List[str]] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Simulate model.
Parameters
-------
tstart : float
Initial time point to be used in simulation of the model.
tstop : float
Final time point to be used in simulation of the model.
tout : float
Output frequency to be used in simulation of the model.
output_fluxes : list
Optional list of reaction ids whose fluxes are to be printed to
results along with kinetic variables.
Returns
-------
tuple of 2 pd.Dataframe's
1. time, concentrations (in `self.kinetic_variables`)
2. time, flux trajectories (in )
"""
print_fluxes = []
if output_fluxes:
for rxn in output_fluxes:
if not self.reactions.__contains__(rxn):
logger.warning(
f"Ignoring reaction {rxn} since it does not correspond"
f"to a reaction in the model."
)
else:
print_fluxes.extend([self.reactions.get_by_id(rxn)])
else:
output_fluxes = []
with tempfile.TemporaryDirectory() as directory:
logger.debug(f"The created temporary directory is {directory}")
library.create(directory)
self.add_to_library(tstart, tstop, tout, print_fluxes, directory)
functionlib = jit.compile(directory)
logger.debug(f"Function library {functionlib} compiled successfully...")
results = simulate_dfba_model(self, functionlib) # noqa: F405
# Post-processing of results
kv_ids = [kv.id for kv in self.kinetic_variables]
header = ["time"] + kv_ids + output_fluxes
df_results = pd.DataFrame(results, columns=header)
return (
df_results[["time"] + kv_ids],
df_results[["time"] + output_fluxes],
)
[docs] def lp_problem(self) -> Problem:
"""LP problem.
Returns
-------
lp_problem : Swig Object of type `glp_prob *`
SWIGLPK object representing FBA model as pointer to GLPK problem
"""
self.cobra_model.solver.update()
return self.cobra_model.solver.problem
# TODO:
# def lp_config(self):
"""
LP problem configuration
Returns
-------
lp_problem : proxy of Swig Object of type `glp_smcp *`
SWIGLPK object representing configuration of GLPK problem.
"""
# self.cobra_model.solver.update()
# return self.cobra_model.solver.configuration._smcp
@property
[docs] def id(self) -> str:
"""."""
return getattr(self, "_id", None)
@property
[docs] def cobra_model(self) -> Model:
"""."""
return self._cobra_model
@property
[docs] def reactions(self) -> DictList:
"""."""
return self._reactions
@property
[docs] def objectives(self) -> List:
"""."""
return self._objectives
@property
[docs] def directions(self) -> List:
"""."""
return self._directions
@property
[docs] def kinetic_variables(self) -> DictList:
"""."""
return self._kinetic_variables
@property
[docs] def exchange_fluxes(self) -> DictList:
"""."""
return self._exchange_fluxes
@property
[docs] def user_data(self) -> UserData: # noqa: F405
"""."""
return self._user_data
@property
[docs] def solver_data(self) -> SolverData: # noqa: F405
"""."""
return self._solver_data
[docs] def add_to_library(
self,
tstart: float,
tstop: float,
tout: float,
print_fluxes: List[Reaction],
directory: str,
) -> None:
"""Add model to library.
Parameters
------- tstart : float
Initial time point to be used in simulation of the model.
tstop : float
Final time point to be used in simulation of the model.
tout : float
Length of time interval for output.
print_fluxes : list
List of reactions whose fluxes are to be printed to results
along with kinetic variables.
directory: string
Path to temporary directory containing library.
"""
for objective in self.objectives[:-1]:
logger.debug(f"Adding objective to user data for model {self.id}.")
rxn = self.reactions.index(objective)
expression = (
self.cobra_model.variables[2 * rxn]
- self.cobra_model.variables[2 * rxn + 1]
)
constraint = self.cobra_model.problem.Constraint(
expression, name=objective, ub=None, lb=None
)
solver.add_cons_vars_to_problem(self.cobra_model, constraint, sloppy=True)
for objective in self.objectives:
self.user_data.add_objective(
library.col_indices(self.reactions, objective), [1, -1]
)
self.user_data.set_directions(self.directions)
self.cobra_model.solver.update()
nkin = len(self.kinetic_variables)
initial_conditions = [tstart]
change_pnts = []
control_parameter = None
for kinetic_variable in self.kinetic_variables:
initial_conditions.extend([kinetic_variable.initial_condition])
if kinetic_variable.rhs_expression[1] is not None:
for k in range(len(kinetic_variable.rhs_expression[1])):
control_parameter = kinetic_variable.rhs_expression[1][k]
change_pnts.extend(control_parameter.change_points)
nrow = len(self.cobra_model.constraints)
required_indices = []
for exchange_flux in self.exchange_fluxes:
required_indices.extend(
library.indices(nrow, self.reactions, exchange_flux.id)
)
nreq = len(required_indices)
exchange_indices = []
for exchange_flux in self.exchange_fluxes:
lb = exchange_flux.lower_bound_expression
ub = exchange_flux.upper_bound_expression
if ub is not None:
exchange_indices.append(
library.indices(nrow, self.reactions, exchange_flux.id)[0]
)
if ub[2] is not None:
for k in range(len(ub[2])):
control_parameter = ub[2][k]
change_pnts.extend(control_parameter.change_points)
if lb is not None:
exchange_indices.append(
library.indices(nrow, self.reactions, exchange_flux.id)[1]
)
if lb[2] is not None:
for k in range(len(lb[2])):
control_parameter = lb[2][k]
change_pnts.extend(control_parameter.change_points)
nexc = len(exchange_indices)
change_pnts = list(set(change_pnts))
change_pnts = sorted(change_pnts)
current_indices = []
for rxn in print_fluxes:
current_indices.extend(library.indices(nrow, self.reactions, rxn.id))
logger.debug("Adding model {self.id} to library.")
library.write_model_to_file(
str(self.id),
self.kinetic_variables,
self.exchange_fluxes,
nexc,
nreq,
exchange_indices,
change_pnts,
directory,
)
logger.debug("Setting user data for model {self.id}.")
self.user_data.set_name(str(self.id))
self.user_data.set_kinetic_dimensions(nkin, nexc, nreq)
self.user_data.set_output_times(tstop, tout)
self.user_data.set_initial_conditions(initial_conditions)
self.user_data.set_exchange_indices(exchange_indices)
self.user_data.set_required_indices(required_indices)
self.user_data.set_current_indices(current_indices)
self.user_data.set_change_points(
change_pnts if not all(el is None for el in change_pnts) else []
)