Source code for dfba.library

# Copyright (C) 2018, 2019 Columbia University Irving Medical Center,
#     New York, USA
# Copyright (C) 2019 Novo Nordisk Foundation Center for Biosustainability,
#     Technical University of Denmark

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Functions to add a given `DfbaModel` to the dynamic library."""

import os
from typing import Dict, List, TextIO

import sympy
from cobra import DictList


[docs]def WriteIndent(theFile: TextIO, indentCount: int, text: str) -> None: """Write indent. Parameters ------- theFile : file Open cpp file. indentCount : int Number of tabs to indent. text : string The line of text to write to file. """ while indentCount > 0: text = " " + text indentCount -= 1 theFile.write(text)
[docs]def indices(nrow: int, reactions: DictList, reaction_id: str) -> List[int]: """Write indices. Parameters ------- nrow : int Number of rows in LP problem. reactions : DictList A DictList where the key is the reaction identifier and the value a `cobra.Reaction` object. reaction_id : string Identifier of the reaction to return indices for. Returns ------- indices : list lpindices of forward and reverse reaction. """ if reaction_id not in reactions: raise Exception("Error: {} is not a reaction in the model!".format(reaction_id)) forward_index = 2 * reactions.index(reaction_id) + nrow reverse_index = forward_index + 1 return [forward_index, reverse_index]
[docs]def col_indices(reactions: DictList, reaction_id: str) -> List[int]: """Write column indices. Parameters ------- reactions : DictList A `DictList` object where the key is the reaction identifier and the value a `cobra.Reaction` object. reaction_id : string Identifier of the reaction to return column indices for. Returns ------- indices : list Column indices of forward and reverse reaction. """ if reaction_id not in reactions: raise Exception("Error: {} is not a reaction in the model!".format(reaction_id)) forward_index = 2 * reactions.index(reaction_id) reverse_index = forward_index + 1 return [forward_index, reverse_index]
[docs]def ctrl_parm_expression(ctrl_parms: Dict, change_pnts: List) -> str: """Control paramter expression. Parameters ------- ctrl_parms : dict A dict of control parameters to be included in expression. change_pnts : list Ordered list of change points from control parameters in model. Returns ------- control_expression : string The control parameter expression. """ if bool(ctrl_parms): # if dict of control parameters not empty control_expression = "" ctrl_cp_idxs = {} # loop over control parameters, add their definitions, and track # individual change points for ctrl_parm in ctrl_parms.keys(): control_parameter = ctrl_parms[ctrl_parm] control_expression += ( "double " + str(control_parameter.id) + " = " + str(control_parameter.values[0]) + ";\n\t" ) ctrl_cp_idxs[ctrl_parm] = 0 control_expression += "switch(flag){\n" # loop over change points for i in range(len(change_pnts)): control_expression += "\t\tcase " + str(i + 1) + ":\n" for ctrl_parm in ctrl_parms.keys(): control_parameter = ctrl_parms[ctrl_parm] cp_idx = ctrl_cp_idxs[ctrl_parm] if control_parameter.change_points[cp_idx] == change_pnts[i]: control_expression += ( "\t\t\t" + str(control_parameter.id) + " = " + str(control_parameter.values[cp_idx + 1]) + ";\n" ) if cp_idx < len(control_parameter.change_points) - 1: ctrl_cp_idxs[ctrl_parm] += 1 elif control_parameter.change_points[cp_idx] < change_pnts[i]: control_expression += ( "\t\t\t" + str(control_parameter.id) + " = " + str(control_parameter.values[cp_idx + 1]) + ";\n" ) else: control_expression += ( "\t\t\t" + str(control_parameter.id) + " = " + str(control_parameter.values[cp_idx]) + ";\n" ) control_expression += "\t\t\tbreak;\n" control_expression += "\t}\n" return control_expression else: raise Exception("The supplied dict of control parameters is empty!")
[docs]def create(directory: str) -> None: """Create library. Parameters ------- directory : string Path to temporary directory containing library. """ cppfilePath = os.path.join(directory, "functionlib.cpp") with open(cppfilePath, "w") as cppFile: cppFile.write("#include <stdlib.h> \n#include <cmath> \n#include <vector> \n\n")
[docs]def write_model_to_file( name: str, kinetic_variables: DictList, exchange_fluxes: DictList, nexc: int, nreq: int, exchange_indices: List[int], change_pnts: List, directory: str, ) -> None: """Write `DfbaModel` to file. Parameters ------- name : string str of `DfbaModel` id kinetic_variables : DictList A `DictList` object where the key is the kinetic variable identifier and the value a `KineticValue` object. exchange_fluxes : DictList A `DictList` object where the key is the exchange flux identifier and the value an `ExchangeFLux` object. nexc : int Number of exchange fluxes. nreq : int Number of fluxes required to define ODE right-hand side. exchange_indics : list Indices of exchange fluxes. change_pnts : list Ordered list of change points from control parameters in model. directory: string Path to temporary directory containing library. """ # Collect the kinetic expression kinetics = [] for i in range(nreq // 2): kinetics.append( "double v{} = fluxes[{}] - fluxes[{}]; \n".format(i, 2 * i, 2 * i + 1) ) kinetic_ctrl_parms = {} for kinetic_variable in kinetic_variables: if kinetic_variable.rhs_expression[1] is not None: for k in range(len(kinetic_variable.rhs_expression[1])): ctrl_parm_id = kinetic_variable.rhs_expression[1][k].id if ctrl_parm_id not in kinetic_ctrl_parms.keys(): kinetic_ctrl_parms[ctrl_parm_id] = kinetic_variable.rhs_expression[ 1 ][k] if bool(kinetic_ctrl_parms): control_expression = ctrl_parm_expression(kinetic_ctrl_parms, change_pnts) kinetics.append(control_expression) counter = 0 for kinetic_variable in kinetic_variables: kinetics.append( "rval[" + str(counter) + "] = " + sympy.ccode(kinetic_variable.rhs_expression[0]) + " - ypval[" + str(counter) + "]; \n" ) counter += 1 # Collect exchange flux expressions upper_bounds = [] lower_bounds = [] for exchange_flux in exchange_fluxes: if exchange_flux.upper_bound_expression is not None: condition = exchange_flux.upper_bound_expression[1] bound = None ub_ctrl_parms = {} if exchange_flux.upper_bound_expression[0] is not None: bound = exchange_flux.upper_bound_expression[0] if exchange_flux.upper_bound_expression[2] is not None: for k in range(len(exchange_flux.upper_bound_expression[2])): ctrl_parm_id = exchange_flux.upper_bound_expression[2][k].id if ctrl_parm_id not in ub_ctrl_parms.keys(): ub_ctrl_parms[ ctrl_parm_id ] = exchange_flux.upper_bound_expression[2][k] control_expression = "" if bool(ub_ctrl_parms): control_expression = ctrl_parm_expression(ub_ctrl_parms, change_pnts) if condition is not None: upper_bounds.append( "\t\t\tif(" + sympy.ccode(condition) + "<0.0){\n\t\t\t\treturn 0.0;\n\t\t\t}\n\t\t\telse{\n" + control_expression + "\n" + "\t\t\t\treturn " + sympy.ccode(bound) + ";\n\t\t\t}\n" ) else: upper_bounds.append( "\t\t\t" + control_expression + "\n" + "\t\t\treturn " + str(bound) + ";\n" ) if exchange_flux.lower_bound_expression is not None: condition = exchange_flux.lower_bound_expression[1] bound = None lb_ctrl_parms = {} if exchange_flux.lower_bound_expression[0] is not None: bound = exchange_flux.lower_bound_expression[0] if exchange_flux.lower_bound_expression[2] is not None: for k in range(len(exchange_flux.lower_bound_expression[2])): ctrl_parm_id = exchange_flux.lower_bound_expression[2][k].id if ctrl_parm_id not in lb_ctrl_parms.keys(): lb_ctrl_parms[ ctrl_parm_id ] = exchange_flux.lower_bound_expression[2][k] control_expression = "" if bool(lb_ctrl_parms): control_expression = ctrl_parm_expression(lb_ctrl_parms, change_pnts) if condition is not None: upper_bounds.append( "\t\t\tif(" + sympy.ccode(condition) + "<0.0){\n\t\t\t\treturn 0.0;\n\t\t\t}\n\t\t\telse{\n" + control_expression + "\n" + "\t\t\t\treturn " + sympy.ccode(bound) + ";\n\t\t\t}\n" ) else: upper_bounds.append( "\t\t\t" + control_expression + "\n" + "\t\t\treturn " + sympy.ccode(bound) + ";\n" ) # Write functions to file cppfilePath = os.path.join(directory, "functionlib.cpp") with open(cppfilePath, "a") as cppFile: t = 0 # write the kinetics function WriteIndent( cppFile, t, 'extern "C" void kinetics' + name + "(double tval, double *yval, double *ypval, double *rval, " + "std::vector<double> fluxes, int flag)\n", ) WriteIndent(cppFile, t, "{\n") for i in range(len(kinetics)): WriteIndent(cppFile, t + 1, kinetics[i]) WriteIndent(cppFile, t, "}\n\n") t = 0 # write the upper bounds function WriteIndent( cppFile, t, 'extern "C" double upper_bounds' + name + "(int i, double tval, double *yval, int flag)\n", ) WriteIndent(cppFile, t, "{\n") if len(upper_bounds) > 0: WriteIndent(cppFile, t + 1, "switch(i){\n") for i in range(nexc): WriteIndent( cppFile, t + 2, "case " + str(exchange_indices[i]) + ": {\n", ) WriteIndent(cppFile, t, upper_bounds[i]) WriteIndent(cppFile, t + 3, "break;\n") WriteIndent(cppFile, t + 2, "}\n") WriteIndent(cppFile, t + 2, "default: {\n") WriteIndent(cppFile, t + 3, "exit(-1);\n") WriteIndent(cppFile, t + 2, "}\n") WriteIndent(cppFile, t + 1, "}\n") WriteIndent(cppFile, t + 1, "return 0.0; \n") WriteIndent(cppFile, t, "}\n\n") t = 0 # write the lower bounds function WriteIndent( cppFile, t, 'extern "C" double lower_bounds' + name + "(int i, double tval, double *yval, int flag)\n", ) WriteIndent(cppFile, t, "{\n") if len(lower_bounds) > 0: WriteIndent(cppFile, t + 1, "switch(i){\n") for i in range(nexc): WriteIndent( cppFile, t + 2, "case " + str(exchange_indices[i]) + ": {\n", ) WriteIndent(cppFile, t, lower_bounds[i]) WriteIndent(cppFile, t + 3, "break;\n") WriteIndent(cppFile, t + 2, "}\n") WriteIndent(cppFile, t + 2, "default: {\n") WriteIndent(cppFile, t + 3, "exit(-1);\n") WriteIndent(cppFile, t + 2, "}\n") WriteIndent(cppFile, t + 1, "}\n") WriteIndent(cppFile, t + 1, "return 0.0; \n") WriteIndent(cppFile, t, "}\n\n")