Examples for PhreeqPy

Simple example

This is an example for the use PhreeqPy for one-dimensional advection. It is the example 11 from the PHREEQC users manual.

"""Advection with DLL or COM server.

Using MODFIY we update the concentration on every
time step. We shift by one cell per step.
"""

from __future__ import print_function

import sys

# Simple Python 3 compatibility adjustment.
if sys.version_info[0] == 2:
    range = xrange

import os
import timeit

MODE = 'dll'  # 'dll' or 'com'

if MODE == 'com':
    import phreeqpy.iphreeqc.phreeqc_com as phreeqc_mod
elif MODE == 'dll':
    import phreeqpy.iphreeqc.phreeqc_dll as phreeqc_mod
else:
    raise Exception('Mode "%s" is not defined use "com" or "dll".' % MODE)


def make_initial_conditions():
    """
    Specify initial conditions data blocks.

    Uniform initial conditions are assumed.
    """
    initial_conditions = """
    TITLE Example 11.--Transport and ion exchange.
    SOLUTION 0  CaCl2
        units            mmol/kgw
        temp             25.0
        pH               7.0     charge
        pe               12.5    O2(g)   -0.68
        Ca               0.6
        Cl               1.2
    SOLUTION 1  Initial solution for column
        units            mmol/kgw
        temp             25.0
        pH               7.0     charge
        pe               12.5    O2(g)   -0.68
        Na               1.0
        K                0.2
        N(5)             1.2
        END
    EXCHANGE 1
        equilibrate 1
        X                0.0011
    END
        """
    return initial_conditions


def make_selected_output(components):
    """
    Build SELECTED_OUTPUT data block
    """
    headings = "-headings    cb    H    O    "
    for i in range(len(components)):
        headings += components[i] + "\t"
    selected_output = """
    SELECTED_OUTPUT
        -reset false
    USER_PUNCH
    """
    selected_output += headings + "\n"
    #
    # charge balance, H, and O
    #
    code = '10 w = TOT("water")\n'
    code += '20 PUNCH CHARGE_BALANCE, TOTMOLE("H"), TOTMOLE("O")\n'
    #
    # All other elements
    #
    lino = 30
    for component in components:
        code += '%d PUNCH w*TOT(\"%s\")\n' % (lino, component)
        lino += 10
    selected_output += code
    return selected_output


def initialize(cells, first=False):
    """
    Initialize IPhreeqc module
    """
    phreeqc = phreeqc_mod.IPhreeqc()
    phreeqc.load_database(r"phreeqc.dat")
    initial_conditions = make_initial_conditions()
    phreeqc.run_string(initial_conditions)
    components = phreeqc.get_component_list()
    selected_output = make_selected_output(components)
    phreeqc.run_string(selected_output)
    phc_string = "RUN_CELLS; -cells 0-1\n"
    phreeqc.run_string(phc_string)
    conc = get_selected_output(phreeqc)
    inflow = {}
    initial = {}
    for name in conc:
        if first:
            inflow[name] = conc[name][0]
        else:
            inflow[name] = conc[name][1]
        initial[name] = conc[name][1]
    task = initial_conditions + "\n"
    task += "COPY solution 1 %d-%d\n" % (cells[0], cells[1])
    task += "COPY exchange 1 %d-%d\n" % (cells[0], cells[1])
    task += "END\n"
    task += "RUN_CELLS; -cells %d-%d\n" % (cells[0], cells[1])
    task += selected_output
    phreeqc.run_string(task)
    conc = get_selected_output(phreeqc)
    for name in conc:
        value = [initial[name]] * len(conc[name])
        conc[name] = value
    return phreeqc, inflow, conc


def advect_step(phreeqc, inflow, conc, cells):
    """Advect by shifting concentrations from previous time step.
    """
    all_names = conc.keys()
    names = [name for name in all_names if name not in ('cb', 'H', 'O')]
    for name in conc:
        # shift one cell
        conc[name][1:] = conc[name][:-1]
        conc[name][0] = inflow[name]
    modify = []
    for index, cell in enumerate(range(cells[0], cells[1] + 1)):
        modify.append("SOLUTION_MODIFY %d" % cell)
        modify.append("\t-cb      %e" % conc['cb'][index])
        modify.append("\t-total_h %f" % conc['H'][index])
        modify.append("\t-total_o %f" % conc['O'][index])
        modify.append("\t-totals")
        for name in names:
            modify.append("\t\t%s\t%f" % (name, conc[name][index]))
    modify.append("RUN_CELLS; -cells %d-%d\n" % (cells[0], cells[1]))
    cmd = '\n'.join(modify)
    phreeqc.run_string(cmd)
    conc = get_selected_output(phreeqc)
    return conc


def get_selected_output(phreeqc):
    """Return calculation result as dict.

    Header entries are the keys and the columns
    are the values as lists of numbers.
    """
    output = phreeqc.get_selected_output_array()
    header = output[0]
    conc = {}
    for head in header:
        conc[head] = []
    for row in output[1:]:
        for col, head in enumerate(header):
            conc[head].append(row[col])
    return conc


def run(ncells, shifts, specie_names):
    """Do one run in one process.
    """
    cells = (1, ncells)
    phreeqc, inflow, conc = initialize(cells, first=True)
    outflow = {}
    for name in specie_names:
        outflow[name] = []
    for _counter in range(shifts):
        # advect
        conc = advect_step(phreeqc, inflow, conc, cells)
        for name in specie_names:
            outflow[name].append(conc[name][-1])
    return outflow


def write_outflow(file_name, outflow):
    """Write the outflow values to a file.
    """
    fobj = open(file_name, 'w')
    header = outflow.keys()
    for head in header:
        fobj.write('%20s' % head)
    fobj.write('\n')
    for lineno in range(len(outflow[head])):
        for head in header:
            fobj.write('%20.17f' % outflow[head][lineno])
        fobj.write('\n')


def main(ncells, shifts):
    """Run different versions with and without multiprocessing
    """

    def measure_time(func, *args, **kwargs):
        """Convinience function to measure run times.
        """
        start = timeit.default_timer()
        result = func(*args, **kwargs)
        return result, timeit.default_timer() - start

    print('Dimensions')
    print('==========')
    print('number of cells:   ', ncells)
    print('number of shifts   ', shifts)
    specie_names = ('Ca', 'Cl', 'K', 'N', 'Na')
    outflow, run_time = measure_time(run, ncells, shifts, specie_names)
    if not os.path.exists('data'):
        os.mkdir('data')
    write_outflow('data/out.txt', outflow)
    print('run time:', run_time)
    print("Finished simulation\n")

if __name__ == '__main__':
    main(ncells=40, shifts=120)

Parallel Example

This is an example for the use PhreeqPy for one-dimensional advection running in parallel, using multiprocessing.

# -*- coding: utf-8 -*-

"""A coupled advection-reaction model using PhreeqPy.

The sketch below shows the setup. The coupled model contains a advection and a
reaction model. The reaction model can have one or more Phreeqc calculators.
The calculators can work in parallel using the module `multiprocessing`.

+--------------------------------------------------------------------------+
|                            CoupledModel                                  |
|  +--------------------------------------------------------------------+  |
|  |                         AdvectionModel                             |  |
|  |                                                                    |  |
|  +--------------------------------------------------------------------+  |
|  |                         ReactionModel                              |  |
|  |  +--------------------+--------------------+--------------------+  |  |
|  |  | PhreeqcCalculator1 | PhreeqcCalculator2 | PhreeqcCalculator3 |  |  |
+--------------------------------------------------------------------------+

Author: Mike Müller, mmueller@hydrocomputing.com
"""


import multiprocessing
import os
import timeit

import matplotlib.pyplot as plt

MODE = 'dll'  # 'dll' or 'com'

if MODE == 'com':
    import phreeqpy.iphreeqc.phreeqc_com as phreeqc_mod
elif MODE == 'dll':
    import phreeqpy.iphreeqc.phreeqc_dll as phreeqc_mod
else:
    raise Exception('Mode "%s" is not defined use "com" or "dll".' % MODE)


class CoupledModel(object):
    """This is a coupled advection model.

    Since it is just a simple example, we use a 1D model.
    The same approach can be applied to a 2d or 3D model
    as long as the advection model supports it.
    The PHREEQC part is the same.
    Furthermore, instead of a simple advection model,
    we can have a more sophisticated transport model.
    """

    def __init__(self, ncells, nshifts, initial_conditions, processes):
        self.nshifts = nshifts
        self.reaction_model = ReactionModel(ncells, initial_conditions,
                                            processes)
        self.reaction_model.make_initial_state()
        init_conc = dict([(name, [value] * ncells) for name, value in
                          self.reaction_model.init_conc.items()])
        self.advection_model = AdvectionModel(init_conc,
                                              self.reaction_model.inflow_conc)
        self.component_names = self.reaction_model.component_names
        self.results = {}
        for name in self.component_names:
            self.results[name] = []

    def run(self):
        """Go over all time steps (shifts).
        """
        for _ in range(self.nshifts):
            self.advection_model.advect()
            self.advection_model.save_results(self.results)
            self.reaction_model.modify(self.advection_model.conc)
            self.advection_model.update_conc(self.reaction_model.conc)
        self.reaction_model.finish()


class AdvectionModel(object):
    """Very simple 1D advection model.

    This model can be replaced by a more sophisticated transport
    model with two or three dimensions.
    This could be an external code such as MT3D or anything
    else that is moving concentrations through the subsurface.
    """

    def __init__(self, init_conc, inflow_conc):
        """Set the initial and inflow concentrations.

        Both concentrations are dictionaries with the specie names as keys.
        Values are 1D arrays for `init_conc` and scalars for `inflow_conc`.
        """
        self.conc = init_conc
        self.inflow_conc = inflow_conc
        self.outflow = {}

    def update_conc(self, new_conc):
        """Update the concentrations after the reactions step.

        This is very simple but could be more involved
        if the transport model is more complex.
        """
        self.conc = new_conc

    def advect(self):
        """Shift one cell.
        """
        for name in self.conc:
            self.outflow[name] = self.conc[name][-1]
            self.conc[name][1:] = self.conc[name][:-1]
            self.conc[name][0] = self.inflow_conc[name]

    def save_results(self, results):
        """Save the calculation results.

        Typically, we would write our results into a file.
        For simplicity we just add the current outflow that
        we stored in `self.outflow` and add it to `results`,
        which is a dictionary with all specie names as keys
        and lists as values.
        """
        for name in self.conc:
            results[name].append(self.outflow[name])


class ReactionModel(object):
    """Calculate reactions using PHREEQC as computational engine.

    We have no direct contact with IPhreeqc here.
    We make one or more instances of `PhreeqcCalculator`
    that are actually using IPhreeqc.
    We can use more than one processor with `multiprocessing`.
    """

    def __init__(self, ncells, initial_conditions, processes):
        if processes > ncells:
            raise ValueError('Number of processes needs to be less or equal '
                             'than number of cells. %d processes %d cells.'
                             % (processes, ncells))
        if processes < 1:
            raise ValueError('Need at least one process got %d' % processes)
        self.parallel = False
        if processes > 1:
            self.parallel = True
        self.ncells = ncells
        self.initial_conditions = initial_conditions
        self.processes = processes
        self.inflow_conc = {}
        self.init_conc = {}
        self.conc = {}
        self.component_names = []
        self.calculators = []
        self.cell_ranges = []
        self._init_calculators()
        self.make_initial_state()

    def _init_calculators(self):
        """If we are going parallel we need several calculators.
        """
        if self.parallel:
            # Domain decomposition.
            slave_ncells, reminder = divmod(self.ncells, self.processes)
            root_ncells = slave_ncells + reminder
            current_cell = root_ncells
            root_calculator = PhreeqcCalculator(root_ncells,
                                                self.initial_conditions)
            self.calculators = [root_calculator]
            self.cell_ranges = [(0, root_ncells)]
            for _ in range(self.processes - 1):
                self.calculators.append(PhreeqcCalculatorProxy(slave_ncells,
                                                    self.initial_conditions))
                self.cell_ranges.append((current_cell,
                                         current_cell + slave_ncells))
                current_cell += slave_ncells
            assert current_cell == self.ncells
            self.calculators.reverse()
            self.cell_ranges.reverse()
        else:
            root_calculator = PhreeqcCalculator(self.ncells,
                                                self.initial_conditions)
            # Just one calculator and the entire range but still use a list
            # to provide the same interface as the parallel case.
            self.calculators = [root_calculator]
            self.cell_ranges = [(0, self.ncells)]

    def make_initial_state(self):
        """Get the initial values from the calculator(s).
        """
        self.inflow_conc = self.calculators[0].inflow_conc
        self.init_conc = self.calculators[0].init_conc
        self.component_names = self.calculators[0].component_names
        if self.parallel:
            # Make sure all calculators are initialized the same.
            for calculator in self.calculators[1:]:
                assert self.inflow_conc == calculator.inflow_conc
                assert self.init_conc == calculator.init_conc
                assert self.component_names == calculator.component_names

    def modify(self, new_conc):
        """Pass new conc after advection to the calculator.
        """
        self.conc = {}
        for name in self.component_names:
            self.conc[name] = []
        for cell_range, calculator in zip(self.cell_ranges, self.calculators):
            current_conc = dict([(name, value[cell_range[0]:cell_range[1]]) for
                                  name, value in new_conc.items()])
            calculator.modify(current_conc)
        for calculator in self.calculators:
            conc = calculator.get_modified()
            for name in self.component_names:
                self.conc[name].extend(conc[name])

    def finish(self):
        """This is necessary for multiprocessing.

        Multiprocessing uses external processes. These need to be
        explicitly closed to avoid hanging of the program at
        the end.
        """
        for calculator in self.calculators:
            calculator.finish()


class PhreeqcCalculator(object):
    """All PHREEQC calculations happen here.

    This is the only place where we interact wit IPhreeqc.
    Each instance of this class might run in a different
    process using `multiprocessing`.
    """

    def __init__(self, ncells, initial_conditions):
        """
        ncells - number of cells
        initial_conditions - string containing PHREEQC input for
                             solution and exchange, see example below
        """
        self.ncells = ncells
        self.initial_conditions = initial_conditions
        self.inflow_conc = {}
        self.init_conc = {}
        self.conc = {}
        self.phreeqc = phreeqc_mod.IPhreeqc()
        self.phreeqc.load_database(r"phreeqc.dat")
        self.components = []
        self.component_names = []
        self._make_initial_state()

    def _make_initial_state(self):
        """Copy solution to all cells and calculate initial conditions.
        """
        self.phreeqc.run_string(self.initial_conditions)
        self.components = self.phreeqc.get_component_list()
        start = 1
        end = self.ncells
        code = ''
        code += "COPY solution 1 %d-%d\n" % (start, end)
        code += "COPY exchange 1 %d-%d\n" % (start, end)
        code += "END\n"
        code += "RUN_CELLS; -cells %d-%d\n" % (start, end)
        code += self.make_selected_output(self.components)
        self.phreeqc.run_string(code)
        self.conc = self.get_selected_output()
        all_names = self.conc.keys()
        self.component_names = [name for name in all_names if name not in
                                ('cb', 'H', 'O')]
        code = ''
        code += self.make_selected_output(self.components)
        code += "RUN_CELLS; -cells 0-1\n"
        self.phreeqc.run_string(code)
        start_conc = self.get_selected_output()
        for name in self.component_names:
            self.inflow_conc[name] = start_conc[name][0]
            self.init_conc[name] = start_conc[name][1]

    def modify(self, new_conc):
        """Set new concentration after advection and re-calculate.
        """
        conc = self.conc
        end = self.ncells + 1
        conc.update(new_conc)
        modify = []
        for index, cell in enumerate(range(1, end)):
            modify.append("SOLUTION_MODIFY %d" % cell)
            modify.append("\t-cb      %e" % conc['cb'][index])
            modify.append("\t-total_h %s" % conc['H'][index])
            modify.append("\t-total_o %s" % conc['O'][index])
            modify.append("\t-totals")
            for name in self.component_names:
                modify.append("\t\t%s\t%s" % (name, conc[name][index]))
        modify.append("RUN_CELLS; -cells %d-%d\n" % (1, self.ncells))
        code = '\n'.join(modify)
        self.phreeqc.run_string(code)
        self.conc = self.get_selected_output()

    def get_modified(self):
        """Return calculated conc.
        """
        return self.conc

    @ staticmethod # this is just a function but belongs here
    def make_selected_output(components):
        """
        Build SELECTED_OUTPUT data block.
        """
        headings = "-headings    cb    H    O    "
        headings += '\t'.join(components)
        selected_output = """
        SELECTED_OUTPUT
            -reset false
        USER_PUNCH
        """
        selected_output += headings + "\n"
        # charge balance, H, and O
        code = '10 w = TOT("water")\n'
        code += '20 PUNCH CHARGE_BALANCE, TOTMOLE("H"), TOTMOLE("O")\n'
        # All other elements
        lino = 30
        for component in components:
            code += '%d PUNCH w*TOT(\"%s\")\n' % (lino, component)
            lino += 10
        selected_output += code
        return selected_output

    def get_selected_output(self):
        """Return calculation result as dict.

        Header entries are the keys and the columns
        are the values as lists of numbers.
        """
        output = self.phreeqc.get_selected_output_array()
        header = output[0]
        conc = {}
        for head in header:
            conc[head] = []
        for row in output[1:]:
            for col, head in enumerate(header):
                conc[head].append(row[col])
        return conc

    def finish(self):
        """Placeholder to give same interface as the multiprocessing version.
        """
        pass


class PhreeqcCalculatorProxy(object):
    """Proxy that communicates with other processes.

    We uses this proxy for parallel computations.
    All code that is specific for parallel computing is located
    in here.
    """

    def __init__(self, ncells, initial_conditions):
        """Go parallel.
        """
        self.in_queue = multiprocessing.JoinableQueue()
        self.out_queue = multiprocessing.JoinableQueue()
        self.process = multiprocessing.Process(
            target=process_worker,
            args=(ncells, initial_conditions, self.in_queue, self.out_queue))
        self.process.start()
        (self.inflow_conc,
         self.init_conc,
         self.component_names) = self.out_queue.get()

    def modify(self, new_conc):
        """Run PHREEQC in another process.
        """
        self.in_queue.put(new_conc)

    def get_modified(self):
        """Return calculated conc.
        """
        return self.out_queue.get()

    def finish(self):
        """Terminate the process.
        """
        self.in_queue.put(None)
        self.process.join()


def process_worker(ncells, initial_conditions, in_queue, out_queue):
    """This runs in another process.
    """
    print('Started process with ID', os.getpid())
    calculator = PhreeqcCalculator(ncells, initial_conditions)
    out_queue.put((calculator.inflow_conc, calculator.init_conc,
                  calculator.component_names))
    while True:
        new_conc = in_queue.get()
        # None is the sentinel. We are done
        if new_conc is None:
            break
        calculator.modify(new_conc)
        out_queue.put(calculator.conc)


def plot(ncells, outflow, specie_names):
    """Plot the results.
    """
    colors = {'Ca': 'r', 'Cl': 'b', 'K': 'g', 'N': 'y', 'Na': 'm'}
    x = [i / float(ncells) for i in
         range(1, len(outflow[specie_names[0]]) + 1)]
    args = []
    for name in specie_names:
        args.extend([x, outflow[name], colors[name]])
    plt.plot(*args)
    plt.legend(specie_names, loc=(0.8, 0.5))
    plt.ylabel('MILLIMOLES PER KILOGRAM WATER')
    plt.xlabel('PORE VOLUME')
    plt.savefig("ex11.png")
    plt.show()


def measure_time(func, *args, **kwargs):
    """Convenience function to measure run times.
    """
    start = timeit.default_timer()
    result = func(*args, **kwargs)
    return result, timeit.default_timer() - start


def main(ncells, nshifts, processes=2, show_plot=False):
    """
    Specify initial conditions data blocks.

    Uniform initial conditions are assumed.
    """
    initial_conditions = """
    TITLE Example 11.--Transport and ion exchange.
    SOLUTION 0  CaCl2
        units            mmol/kgw
        temp             25.0
        pH               7.0     charge
        pe               12.5    O2(g)   -0.68
        Ca               0.6
        Cl               1.2
    SOLUTION 1  Initial solution for column
        units            mmol/kgw
        temp             25.0
        pH               7.0     charge
        pe               12.5    O2(g)   -0.68
        Na               1.0
        K                0.2
        N(5)             1.2
        END
    EXCHANGE 1
        equilibrate 1
        X                0.0011
    END
        """

    def run():
        """Do the work.
        """
        model = CoupledModel(ncells, nshifts, initial_conditions,
                             processes)
        model.run()
        return model, model.results

    (model, outflow), run_time = measure_time(run)
    print('Statistics')
    print('==========')
    print('number of cells:    ', ncells)
    print('number of shifts:   ', nshifts)
    print('number of processes:', processes)
    print('run_time:           ', run_time)
    print()
    if show_plot:
        plot(ncells, outflow, model.component_names)


def benchmark(ncells=400, nshifts=1200, process_range=range(1,9)):
    """Benchmark for diffrent numbers of processes"""
    for processes in process_range:
        main(ncells=ncells, nshifts=nshifts, processes=processes)


def plot_result(ncells=400, nshifts=1200, processes=4):
    """Also show the plot"""
    main(ncells=ncells, nshifts=nshifts, processes=processes, show_plot=True)


if __name__ == '__main__':
    benchmark()
    plot_result()