# Examples for PhreeqPy¶

## Simple example¶

This is an example for the use PhreeqPy for one-dimensional advection. It is the example 11 from the PHREEQC users manual.

```"""Advection with DLL or COM server.

Using MODFIY we update the concentration on every
time step. We shift by one cell per step.
"""

from __future__ import print_function

import sys

# Simple Python 3 compatibility adjustment.
if sys.version_info == 2:
range = xrange

import os
import timeit

MODE = 'dll'  # 'dll' or 'com'

if MODE == 'com':
import phreeqpy.iphreeqc.phreeqc_com as phreeqc_mod
elif MODE == 'dll':
import phreeqpy.iphreeqc.phreeqc_dll as phreeqc_mod
else:
raise Exception('Mode "%s" is not defined use "com" or "dll".' % MODE)

def make_initial_conditions():
"""
Specify initial conditions data blocks.

Uniform initial conditions are assumed.
"""
initial_conditions = """
TITLE Example 11.--Transport and ion exchange.
SOLUTION 0  CaCl2
units            mmol/kgw
temp             25.0
pH               7.0     charge
pe               12.5    O2(g)   -0.68
Ca               0.6
Cl               1.2
SOLUTION 1  Initial solution for column
units            mmol/kgw
temp             25.0
pH               7.0     charge
pe               12.5    O2(g)   -0.68
Na               1.0
K                0.2
N(5)             1.2
END
EXCHANGE 1
equilibrate 1
X                0.0011
END
"""
return initial_conditions

def make_selected_output(components):
"""
Build SELECTED_OUTPUT data block
"""
for i in range(len(components)):
selected_output = """
SELECTED_OUTPUT
-reset false
USER_PUNCH
"""
#
# charge balance, H, and O
#
code = '10 w = TOT("water")\n'
code += '20 PUNCH CHARGE_BALANCE, TOTMOLE("H"), TOTMOLE("O")\n'
#
# All other elements
#
lino = 30
for component in components:
code += '%d PUNCH w*TOT(\"%s\")\n' % (lino, component)
lino += 10
selected_output += code
return selected_output

def initialize(cells, first=False):
"""
Initialize IPhreeqc module
"""
phreeqc = phreeqc_mod.IPhreeqc()
initial_conditions = make_initial_conditions()
phreeqc.run_string(initial_conditions)
components = phreeqc.get_component_list()
selected_output = make_selected_output(components)
phreeqc.run_string(selected_output)
phc_string = "RUN_CELLS; -cells 0-1\n"
phreeqc.run_string(phc_string)
conc = get_selected_output(phreeqc)
inflow = {}
initial = {}
for name in conc:
if first:
inflow[name] = conc[name]
else:
inflow[name] = conc[name]
initial[name] = conc[name]
task += "COPY solution 1 %d-%d\n" % (cells, cells)
task += "COPY exchange 1 %d-%d\n" % (cells, cells)
task += "RUN_CELLS; -cells %d-%d\n" % (cells, cells)
conc = get_selected_output(phreeqc)
for name in conc:
value = [initial[name]] * len(conc[name])
conc[name] = value
return phreeqc, inflow, conc

"""Advect by shifting concentrations from previous time step.
"""
all_names = conc.keys()
names = [name for name in all_names if name not in ('cb', 'H', 'O')]
for name in conc:
# shift one cell
conc[name][1:] = conc[name][:-1]
conc[name] = inflow[name]
modify = []
for index, cell in enumerate(range(cells, cells + 1)):
modify.append("SOLUTION_MODIFY %d" % cell)
modify.append("\t-cb      %e" % conc['cb'][index])
modify.append("\t-total_h %f" % conc['H'][index])
modify.append("\t-total_o %f" % conc['O'][index])
modify.append("\t-totals")
for name in names:
modify.append("\t\t%s\t%f" % (name, conc[name][index]))
modify.append("RUN_CELLS; -cells %d-%d\n" % (cells, cells))
cmd = '\n'.join(modify)
phreeqc.run_string(cmd)
conc = get_selected_output(phreeqc)
return conc

def get_selected_output(phreeqc):
"""Return calculation result as dict.

Header entries are the keys and the columns
are the values as lists of numbers.
"""
output = phreeqc.get_selected_output_array()
conc = {}
for row in output[1:]:
return conc

def run(ncells, shifts, specie_names):
"""Do one run in one process.
"""
cells = (1, ncells)
phreeqc, inflow, conc = initialize(cells, first=True)
outflow = {}
for name in specie_names:
outflow[name] = []
for _counter in range(shifts):
conc = advect_step(phreeqc, inflow, conc, cells)
for name in specie_names:
outflow[name].append(conc[name][-1])
return outflow

def write_outflow(file_name, outflow):
"""Write the outflow values to a file.
"""
fobj = open(file_name, 'w')
fobj.write('\n')
fobj.write('\n')

def main(ncells, shifts):
"""Run different versions with and without multiprocessing
"""

def measure_time(func, *args, **kwargs):
"""Convinience function to measure run times.
"""
start = timeit.default_timer()
result = func(*args, **kwargs)
return result, timeit.default_timer() - start

print('Dimensions')
print('==========')
print('number of cells:   ', ncells)
print('number of shifts   ', shifts)
specie_names = ('Ca', 'Cl', 'K', 'N', 'Na')
outflow, run_time = measure_time(run, ncells, shifts, specie_names)
if not os.path.exists('data'):
os.mkdir('data')
write_outflow('data/out.txt', outflow)
print('run time:', run_time)
print("Finished simulation\n")

if __name__ == '__main__':
main(ncells=40, shifts=120)
```

## Parallel Example¶

This is an example for the use PhreeqPy for one-dimensional advection running in parallel, using `multiprocessing`.

```# -*- coding: utf-8 -*-

"""A coupled advection-reaction model using PhreeqPy.

The sketch below shows the setup. The coupled model contains a advection and a
reaction model. The reaction model can have one or more Phreeqc calculators.
The calculators can work in parallel using the module `multiprocessing`.

+--------------------------------------------------------------------------+
|                            CoupledModel                                  |
|  +--------------------------------------------------------------------+  |
|  |                                                                    |  |
|  +--------------------------------------------------------------------+  |
|  |                         ReactionModel                              |  |
|  |  +--------------------+--------------------+--------------------+  |  |
|  |  | PhreeqcCalculator1 | PhreeqcCalculator2 | PhreeqcCalculator3 |  |  |
+--------------------------------------------------------------------------+

Author: Mike Müller, mmueller@hydrocomputing.com
"""

import multiprocessing
import os
import timeit

import matplotlib.pyplot as plt

MODE = 'dll'  # 'dll' or 'com'

if MODE == 'com':
import phreeqpy.iphreeqc.phreeqc_com as phreeqc_mod
elif MODE == 'dll':
import phreeqpy.iphreeqc.phreeqc_dll as phreeqc_mod
else:
raise Exception('Mode "%s" is not defined use "com" or "dll".' % MODE)

class CoupledModel(object):
"""This is a coupled advection model.

Since it is just a simple example, we use a 1D model.
The same approach can be applied to a 2d or 3D model
as long as the advection model supports it.
The PHREEQC part is the same.
we can have a more sophisticated transport model.
"""

def __init__(self, ncells, nshifts, initial_conditions, processes):
self.nshifts = nshifts
self.reaction_model = ReactionModel(ncells, initial_conditions,
processes)
self.reaction_model.make_initial_state()
init_conc = dict([(name, [value] * ncells) for name, value in
self.reaction_model.init_conc.items()])
self.reaction_model.inflow_conc)
self.component_names = self.reaction_model.component_names
self.results = {}
for name in self.component_names:
self.results[name] = []

def run(self):
"""Go over all time steps (shifts).
"""
for _ in range(self.nshifts):
self.reaction_model.finish()

This model can be replaced by a more sophisticated transport
model with two or three dimensions.
This could be an external code such as MT3D or anything
else that is moving concentrations through the subsurface.
"""

def __init__(self, init_conc, inflow_conc):
"""Set the initial and inflow concentrations.

Both concentrations are dictionaries with the specie names as keys.
Values are 1D arrays for `init_conc` and scalars for `inflow_conc`.
"""
self.conc = init_conc
self.inflow_conc = inflow_conc
self.outflow = {}

def update_conc(self, new_conc):
"""Update the concentrations after the reactions step.

This is very simple but could be more involved
if the transport model is more complex.
"""
self.conc = new_conc

"""Shift one cell.
"""
for name in self.conc:
self.outflow[name] = self.conc[name][-1]
self.conc[name][1:] = self.conc[name][:-1]
self.conc[name] = self.inflow_conc[name]

def save_results(self, results):
"""Save the calculation results.

Typically, we would write our results into a file.
For simplicity we just add the current outflow that
we stored in `self.outflow` and add it to `results`,
which is a dictionary with all specie names as keys
and lists as values.
"""
for name in self.conc:
results[name].append(self.outflow[name])

class ReactionModel(object):
"""Calculate reactions using PHREEQC as computational engine.

We have no direct contact with IPhreeqc here.
We make one or more instances of `PhreeqcCalculator`
that are actually using IPhreeqc.
We can use more than one processor with `multiprocessing`.
"""

def __init__(self, ncells, initial_conditions, processes):
if processes > ncells:
raise ValueError('Number of processes needs to be less or equal '
'than number of cells. %d processes %d cells.'
% (processes, ncells))
if processes < 1:
raise ValueError('Need at least one process got %d' % processes)
self.parallel = False
if processes > 1:
self.parallel = True
self.ncells = ncells
self.initial_conditions = initial_conditions
self.processes = processes
self.inflow_conc = {}
self.init_conc = {}
self.conc = {}
self.component_names = []
self.calculators = []
self.cell_ranges = []
self._init_calculators()
self.make_initial_state()

def _init_calculators(self):
"""If we are going parallel we need several calculators.
"""
if self.parallel:
# Domain decomposition.
slave_ncells, reminder = divmod(self.ncells, self.processes)
root_ncells = slave_ncells + reminder
current_cell = root_ncells
root_calculator = PhreeqcCalculator(root_ncells,
self.initial_conditions)
self.calculators = [root_calculator]
self.cell_ranges = [(0, root_ncells)]
for _ in range(self.processes - 1):
self.calculators.append(PhreeqcCalculatorProxy(slave_ncells,
self.initial_conditions))
self.cell_ranges.append((current_cell,
current_cell + slave_ncells))
current_cell += slave_ncells
assert current_cell == self.ncells
self.calculators.reverse()
self.cell_ranges.reverse()
else:
root_calculator = PhreeqcCalculator(self.ncells,
self.initial_conditions)
# Just one calculator and the entire range but still use a list
# to provide the same interface as the parallel case.
self.calculators = [root_calculator]
self.cell_ranges = [(0, self.ncells)]

def make_initial_state(self):
"""Get the initial values from the calculator(s).
"""
self.inflow_conc = self.calculators.inflow_conc
self.init_conc = self.calculators.init_conc
self.component_names = self.calculators.component_names
if self.parallel:
# Make sure all calculators are initialized the same.
for calculator in self.calculators[1:]:
assert self.inflow_conc == calculator.inflow_conc
assert self.init_conc == calculator.init_conc
assert self.component_names == calculator.component_names

def modify(self, new_conc):
"""Pass new conc after advection to the calculator.
"""
self.conc = {}
for name in self.component_names:
self.conc[name] = []
for cell_range, calculator in zip(self.cell_ranges, self.calculators):
current_conc = dict([(name, value[cell_range:cell_range]) for
name, value in new_conc.items()])
calculator.modify(current_conc)
for calculator in self.calculators:
conc = calculator.get_modified()
for name in self.component_names:
self.conc[name].extend(conc[name])

def finish(self):
"""This is necessary for multiprocessing.

Multiprocessing uses external processes. These need to be
explicitly closed to avoid hanging of the program at
the end.
"""
for calculator in self.calculators:
calculator.finish()

class PhreeqcCalculator(object):
"""All PHREEQC calculations happen here.

This is the only place where we interact wit IPhreeqc.
Each instance of this class might run in a different
process using `multiprocessing`.
"""

def __init__(self, ncells, initial_conditions):
"""
ncells - number of cells
initial_conditions - string containing PHREEQC input for
solution and exchange, see example below
"""
self.ncells = ncells
self.initial_conditions = initial_conditions
self.inflow_conc = {}
self.init_conc = {}
self.conc = {}
self.phreeqc = phreeqc_mod.IPhreeqc()
self.components = []
self.component_names = []
self._make_initial_state()

def _make_initial_state(self):
"""Copy solution to all cells and calculate initial conditions.
"""
self.phreeqc.run_string(self.initial_conditions)
self.components = self.phreeqc.get_component_list()
start = 1
end = self.ncells
code = ''
code += "COPY solution 1 %d-%d\n" % (start, end)
code += "COPY exchange 1 %d-%d\n" % (start, end)
code += "END\n"
code += "RUN_CELLS; -cells %d-%d\n" % (start, end)
code += self.make_selected_output(self.components)
self.phreeqc.run_string(code)
self.conc = self.get_selected_output()
all_names = self.conc.keys()
self.component_names = [name for name in all_names if name not in
('cb', 'H', 'O')]
code = ''
code += self.make_selected_output(self.components)
code += "RUN_CELLS; -cells 0-1\n"
self.phreeqc.run_string(code)
start_conc = self.get_selected_output()
for name in self.component_names:
self.inflow_conc[name] = start_conc[name]
self.init_conc[name] = start_conc[name]

def modify(self, new_conc):
"""Set new concentration after advection and re-calculate.
"""
conc = self.conc
end = self.ncells + 1
conc.update(new_conc)
modify = []
for index, cell in enumerate(range(1, end)):
modify.append("SOLUTION_MODIFY %d" % cell)
modify.append("\t-cb      %e" % conc['cb'][index])
modify.append("\t-total_h %s" % conc['H'][index])
modify.append("\t-total_o %s" % conc['O'][index])
modify.append("\t-totals")
for name in self.component_names:
modify.append("\t\t%s\t%s" % (name, conc[name][index]))
modify.append("RUN_CELLS; -cells %d-%d\n" % (1, self.ncells))
code = '\n'.join(modify)
self.phreeqc.run_string(code)
self.conc = self.get_selected_output()

def get_modified(self):
"""Return calculated conc.
"""
return self.conc

@ staticmethod # this is just a function but belongs here
def make_selected_output(components):
"""
Build SELECTED_OUTPUT data block.
"""
selected_output = """
SELECTED_OUTPUT
-reset false
USER_PUNCH
"""
# charge balance, H, and O
code = '10 w = TOT("water")\n'
code += '20 PUNCH CHARGE_BALANCE, TOTMOLE("H"), TOTMOLE("O")\n'
# All other elements
lino = 30
for component in components:
code += '%d PUNCH w*TOT(\"%s\")\n' % (lino, component)
lino += 10
selected_output += code
return selected_output

def get_selected_output(self):
"""Return calculation result as dict.

Header entries are the keys and the columns
are the values as lists of numbers.
"""
output = self.phreeqc.get_selected_output_array()
conc = {}
for row in output[1:]:
return conc

def finish(self):
"""Placeholder to give same interface as the multiprocessing version.
"""
pass

class PhreeqcCalculatorProxy(object):
"""Proxy that communicates with other processes.

We uses this proxy for parallel computations.
All code that is specific for parallel computing is located
in here.
"""

def __init__(self, ncells, initial_conditions):
"""Go parallel.
"""
self.in_queue = multiprocessing.JoinableQueue()
self.out_queue = multiprocessing.JoinableQueue()
self.process = multiprocessing.Process(
target=process_worker,
args=(ncells, initial_conditions, self.in_queue, self.out_queue))
self.process.start()
(self.inflow_conc,
self.init_conc,
self.component_names) = self.out_queue.get()

def modify(self, new_conc):
"""Run PHREEQC in another process.
"""
self.in_queue.put(new_conc)

def get_modified(self):
"""Return calculated conc.
"""
return self.out_queue.get()

def finish(self):
"""Terminate the process.
"""
self.in_queue.put(None)
self.process.join()

def process_worker(ncells, initial_conditions, in_queue, out_queue):
"""This runs in another process.
"""
print('Started process with ID', os.getpid())
calculator = PhreeqcCalculator(ncells, initial_conditions)
out_queue.put((calculator.inflow_conc, calculator.init_conc,
calculator.component_names))
while True:
new_conc = in_queue.get()
# None is the sentinel. We are done
if new_conc is None:
break
calculator.modify(new_conc)
out_queue.put(calculator.conc)

def plot(ncells, outflow, specie_names):
"""Plot the results.
"""
colors = {'Ca': 'r', 'Cl': 'b', 'K': 'g', 'N': 'y', 'Na': 'm'}
x = [i / float(ncells) for i in
range(1, len(outflow[specie_names]) + 1)]
args = []
for name in specie_names:
args.extend([x, outflow[name], colors[name]])
plt.plot(*args)
plt.legend(specie_names, loc=(0.8, 0.5))
plt.ylabel('MILLIMOLES PER KILOGRAM WATER')
plt.xlabel('PORE VOLUME')
plt.savefig("ex11.png")
plt.show()

def measure_time(func, *args, **kwargs):
"""Convenience function to measure run times.
"""
start = timeit.default_timer()
result = func(*args, **kwargs)
return result, timeit.default_timer() - start

def main(ncells, nshifts, processes=2, show_plot=False):
"""
Specify initial conditions data blocks.

Uniform initial conditions are assumed.
"""
initial_conditions = """
TITLE Example 11.--Transport and ion exchange.
SOLUTION 0  CaCl2
units            mmol/kgw
temp             25.0
pH               7.0     charge
pe               12.5    O2(g)   -0.68
Ca               0.6
Cl               1.2
SOLUTION 1  Initial solution for column
units            mmol/kgw
temp             25.0
pH               7.0     charge
pe               12.5    O2(g)   -0.68
Na               1.0
K                0.2
N(5)             1.2
END
EXCHANGE 1
equilibrate 1
X                0.0011
END
"""

def run():
"""Do the work.
"""
model = CoupledModel(ncells, nshifts, initial_conditions,
processes)
model.run()
return model, model.results

(model, outflow), run_time = measure_time(run)
print('Statistics')
print('==========')
print('number of cells:    ', ncells)
print('number of shifts:   ', nshifts)
print('number of processes:', processes)
print('run_time:           ', run_time)
print()
if show_plot:
plot(ncells, outflow, model.component_names)

def benchmark(ncells=400, nshifts=1200, process_range=range(1,9)):
"""Benchmark for diffrent numbers of processes"""
for processes in process_range:
main(ncells=ncells, nshifts=nshifts, processes=processes)

def plot_result(ncells=400, nshifts=1200, processes=4):
"""Also show the plot"""
main(ncells=ncells, nshifts=nshifts, processes=processes, show_plot=True)

if __name__ == '__main__':
benchmark()
plot_result()
```