Advanced topics¶
You can extend Ocellaris in more ways than just adding user code for customizing the existing fluid solvers. You can for instance add brand new solvers while still taking advantage of the Ocellaris framework for input and output handling, post-processing and more.
Contents
Writing a custom solver¶
The architecture of Ocellaris is quite customizable, so if you want to add a custom equation solver, multiphase model, convection scheme, slope limiter or more it is possible to do so without touching the Ocellaris code, but by crafting a special input file.
A minimal custom solver must implement the following:
import dolfin
from ocellaris.solvers import Solver, register_solver
@register_solver('MyCustomSolver')
class ACustomSolverClass(Solver):
description = "This is my custom solver!"
@classmethod
def create_function_spaces(cls, simulation):
mesh = simulation.data['mesh']
simulation.data['Vmyfunc'] = dolfin.FunctionSpace(mesh, family, degree)
def __init__(self, simulation):
self.simulation = simulation
def run(self):
sim = self.simulation
sim.hooks.simulation_started()
dt = 0.1
t = 0
for i in range(1, 10):
t += dt
sim.hooks.new_timestep(timestep_number=i, t=t, dt=dt)
# .... do something ...
sim.hooks.end_timestep()
sim.hooks.simulation_ended(success=True)
Ocellaris will first run create_function_spaces
and then a bit later the
__init__
method will run. A full example is given next, a custom solver
for solving a Poisson equation for an unknown function, phi, using the
discontinuous Galerkin symmetric interior penalty method. This file can be
found in custom_solver.py
under the demos
folder in the Ocellaris
source code.
# Copyright (C) 2017-2019 Tormod Landet
# SPDX-License-Identifier: Apache-2.0
import dolfin
from dolfin import dot, grad, dx, dS, jump, avg
from ocellaris.solvers import Solver, register_solver
from ocellaris.solver_parts import define_penalty
from ocellaris.utils import linear_solver_from_input
@register_solver('PoissonDG')
class PoissonDGSolver(Solver):
description = "Poisson equation solver using DG elements"
@classmethod
def create_function_spaces(cls, simulation):
family = simulation.input.get_value('solver/function_space', 'DG', 'string')
degree = simulation.input.get_value('solver/polynomial_degree', 1, 'int')
mesh = simulation.data['mesh']
simulation.data['Vphi'] = dolfin.FunctionSpace(mesh, family, degree)
def __init__(self, simulation):
"""
A discontinuous Galerkin Poisson solver for use in
the Ocellaris solution framework. Solves -∇⋅∇φ = f
by use of the Symmetric Interior Penalty method
"""
self.simulation = simulation
self.setup_scalar_equation()
def setup_scalar_equation(self):
sim = self.simulation
V = sim.data['Vphi']
mesh = V.mesh()
P = V.ufl_element().degree()
# Source term
source_cpp = sim.input.get_value('solver/source', '0', 'string')
f = dolfin.Expression(source_cpp, degree=P)
# Create the solution function
sim.data['phi'] = dolfin.Function(V)
# DG elliptic penalty
penalty = define_penalty(mesh, P, k_min=1.0, k_max=1.0)
penalty_dS = dolfin.Constant(penalty)
penalty_ds = dolfin.Constant(penalty * 2)
yh = dolfin.Constant(1 / (penalty * 2))
# Define weak form
u, v = dolfin.TrialFunction(V), dolfin.TestFunction(V)
a = dot(grad(u), grad(v)) * dx
L = f * v * dx
# Symmetric Interior Penalty method for -∇⋅∇φ
n = dolfin.FacetNormal(mesh)
a -= dot(n('+'), avg(grad(u))) * jump(v) * dS
a -= dot(n('+'), avg(grad(v))) * jump(u) * dS
# Symmetric Interior Penalty coercivity term
a += penalty_dS * jump(u) * jump(v) * dS
# Dirichlet boundary conditions
# Nitsche's (1971) method, see e.g. Epshteyn and Rivière (2007)
dirichlet_bcs = sim.data['dirichlet_bcs'].get('phi', [])
for dbc in dirichlet_bcs:
bcval, dds = dbc.func(), dbc.ds()
# SIPG for -∇⋅∇φ
a -= dot(n, grad(u)) * v * dds
a -= dot(n, grad(v)) * u * dds
L -= dot(n, grad(v)) * bcval * dds
# Weak Dirichlet
a += penalty_ds * u * v * dds
L += penalty_ds * bcval * v * dds
# Neumann boundary conditions
neumann_bcs = sim.data['neumann_bcs'].get('phi', [])
for nbc in neumann_bcs:
L += nbc.func() * v * nbc.ds()
# Robin boundary conditions
# See Juntunen and Stenberg (2009)
# n⋅∇φ = (φ0 - φ)/b + g
robin_bcs = sim.data['robin_bcs'].get('phi', [])
for rbc in robin_bcs:
b, rds = rbc.blend(), rbc.ds()
dval, nval = rbc.dfunc(), rbc.nfunc()
# From IBP of the main equation
a -= dot(n, grad(u)) * v * rds
# Test functions for the Robin BC
z1 = 1 / (b + yh) * v
z2 = -yh / (b + yh) * dot(n, grad(v))
# Robin BC added twice with different test functions
for z in [z1, z2]:
a += b * dot(n, grad(u)) * z * rds
a += u * z * rds
L += dval * z * rds
L += b * nval * z * rds
# Does the system have a null-space?
self.has_null_space = len(dirichlet_bcs) + len(robin_bcs) == 0
self.form_lhs = a
self.form_rhs = L
def run(self):
sim = self.simulation
sim.hooks.simulation_started()
sim.hooks.new_timestep(timestep_number=1, t=1.0, dt=1.0)
# Assemble system
A = dolfin.assemble(self.form_lhs)
b = dolfin.assemble(self.form_rhs)
# Create Krylov solver
solver = linear_solver_from_input(sim, 'solver/phi', 'cg')
solver.set_operator(A)
# The function where the solution is stored
phi = self.simulation.data['phi']
# Remove null space if present (pure Neumann BCs)
if self.has_null_space:
null_vec = dolfin.Vector(phi.vector())
phi.function_space().dofmap().set(null_vec, 1.0)
null_vec *= 1.0 / null_vec.norm("l2")
null_space = dolfin.VectorSpaceBasis([null_vec])
dolfin.as_backend_type(A).set_nullspace(null_space)
null_space.orthogonalize(b)
# Solve the linear system using the default solver (direct LU solver)
solver.solve(phi.vector(), b)
sim.hooks.end_timestep()
sim.hooks.simulation_ended(success=True)
The input file that goes along with this solver makes sure that the solver
Python file is loaded and the sets up the boundary conditions for the
phi
field.
ocellaris:
type: input
version: 1.0
user_code:
modules:
- custom_solver
mesh:
type: Rectangle
Nx: 10
Ny: 10
solver:
type: PoissonDG
phi:
solver: cg
preconditioner: hypre_amg
parameters:
relative_tolerance: 1.0e-10
absolute_tolerance: 1.0e-15
boundary_conditions:
- name: all walls
selector: code
inside_code: on_boundary
phi:
type: ConstantValue
value: 1.0
output:
prefix: output/custom_solver
dolfin_log_level: warning
ocellaris_log_level: info
log_enabled: yes
solution_properties: off
xdmf_write_interval: 0
Writing a custom boundary condition¶
Here the procedure to create a custom Dirichlet boundary condition is sketched. Creating custom Neuman or Robin BCs (Boundary Conditions) follows roughly the same setup, look at the implementation of those (or FreeSlip etc) in the source code for pointers to what steps must be taken.
The Ocellaris input file must load the custom BC Python file to activate it, and then the input file can reference the custom BC type later on:
ocellaris:
type: input
version: 1.0
user_code:
python_path:
- /optional.path.to.my.modules/
modules:
- path.to.custom_bc_module
# ...
boundary_conditions:
- name: all walls
selector: code
inside_code: on_boundary
u0:
type: MyCustomBC
The custom_bc_module.py
file defines MyCustomBC
. The code can be
something like this (untested, but the overall design should work):
import dolfin
from ocellaris.solver_parts.boundary_conditions.dirichlet import (
register_boundary_condition,
BoundaryConditionCreator,
OcellarisDirichletBC)
@register_boundary_condition('MyCustomBC')
class CustomDirichletBoundary(BoundaryConditionCreator):
description = 'A custom Dirichlet boundary condition'
def __init__(self, simulation, var_name, inp_dict, subdomains, subdomain_id):
"""
Dirichlet boundary condition that does X
"""
self.simulation = simulation
# This specific BC only works for the velocity component "u0"
# (this is not very realistic, but it simplifies this example)
assert var_name == 'u0'
V = simulation.data['Vu']
# Define a function that holds the BC value
bc_val = dolfin.Function(V)
bc = OcellarisDirichletBC(simulation, V, bc_val, subdomains, subdomain_id)
bcs = simulation.data['dirichlet_bcs']
bcs.setdefault(var_name, []).append(bc)
self.simulation.log.info(' Custom BC for %s' % var_name)
# Update this BC before each time step
self.bc_val_func = bc_val
simulation.hooks.add_pre_timestep_hook(self.update, 'CustomBC update')
def update(self, timestep_number, t, dt):
"""
This code runs before the Navier-Stoke solver each time step and
can change the BC value that is used in the assembly of the system
matrices. It must modify the function bc_val that was sent to the
OcellarisDirichletBC class above.
"""
some_code_to_update_this_bc(self.bc_val_func)
Writing a custom known field¶
Known fields are Python classes that provide a get_function
method. The
following example shows building and enabling a very simple field:
ocellaris:
type: input
version: 1.0
user_code:
python_path:
- /optional.path.to.my.modules/
modules:
- path.to.custom_field
fields:
- name: myfield
type: MyCustomField
myval: 42.0
# ...
# input can now refer to the 'myfield/gamma' function
The custom_field.py
file defines MyCustomField
which defines a
gamma
function. The code can be something like this:
import dolfin
from ocellaris.solver_parts.fields import register_known_field, KnownField
@register_known_field('MyCustomField')
class SimpleCustomField(KnownField):
description = 'Simple custom field'
def __init__(self, simulation, field_inp):
"""
A scalar field
"""
self.simulation = simulation
self.func = None
self.value = field_inp.get_value('myval', required_type='float')
def get_variable(self, name):
if name != 'gamma':
ocellaris_error(
'Custom field does not define %r' % name,
'This custom field defines only gamma',
)
if self.func is None:
V = self.simulation.data['Vu']
self.func = dolfin.Function(V)
arr = self.func.vector().get_local()
arr[:] = self.value
self.func.vector().set_local(arr)
self.func.vector().apply('insert')
return self.func
A custom wave field¶
A custom wave field can be made in the same way as the simple field above, but
some benefits can be had from using the BaseWaveField
base class which
handles partial filling of cells in VOF simulations among other things.
import dolfin
from ocellaris.utils import ocellaris_error
from ocellaris.solver_parts.fields import register_known_field
from ocellaris.solver_parts.fields.base_wave_field import BaseWaveField
@register_known_field('MyCustomWaves')
class CustomWaveField(BaseWaveField):
description = 'Custom wave field'
def __init__(self, simulation, field_inp):
"""
A custom wave field
"""
super().__init__(simulation, field_inp)
simulation.log.info('Creating a custom wave field %r' % self.name)
# Read input
# Optional: wave_height, h, and still_water_pos
# (can be provided if needed, for example, to adjust z-coordinate)
wave_height = field_inp.get_value('wave_height', required_type='float')
h = field_inp.get_value('depth', required_type='float')
still_water_pos = field_inp.get_value('still_water_position', required_type='float')
# self.stationary boolean must be provided to indicate if c++ code has to be updated every time step.
# See BaseWaveField for details.
self.stationary = True
# Project the colour function to DG0 (set degree to -1 to prevent this)
# The special projection in BaseWaveField handles partial filling of cells
# by use of quadrature elements in the C++ expression and a DG0 projection
self.colour_projection_degree = 6
self.colour_projection_form = None
# Define the C++ code (YOU MUST CHANGE THESE TO SOME VALID C++ EXPRESSIONS)
# Use x[0] for the horizontal coordinate and x[2] for the vertical coordinate,
# conversion to 2D is done below if the simulation is 2D
cpp_e = 'C++ code for the wave elevation;'
cpp_u = 'C++ code for the particle velocity in the horizontal direction;'
cpp_w = 'C++ code for the particle velocity in the vertical direction;'
# Store the C++ code
self._cpp['elevation'] = cpp_e
self._cpp['uhoriz'] = cpp_u
self._cpp['uvert'] = cpp_w
self._cpp['c'] = 'x[2] <= (%s) ? 1.0 : 0.0' % cpp_e
# Adjust the z-coordinate such that the bottom is at z=0
# (make changes if your code assumes that the free surface is at z=0)
zdiff = still_water_pos - self.h
for k, v in list(self._cpp.items()):
self._cpp[k] = v.replace('x[2]', '(x[2] - %r)' % zdiff)
# Adjust the C++ code z-coordinate if the simulation is 2D (then z -> y)
if self.simulation.ndim == 2:
for k, v in list(self._cpp.items()):
self._cpp[k] = v.replace('x[2]', 'x[1]')
Writing a custom multiphase model¶
Connecting a custom multi phase model to Ocellaris is done in the same way as connecting the main equation solver described above. How to register the model is shown in the following snippet
from ocellaris.solver_parts.multiphase import MultiPhaseModel, \
register_multi_phase_model
class MyCustomMultiPhaseModel(MultiPhaseModel):
description = 'A fantastic multiphase model!'
# See the SinglePhase model for an example of the required
# API interface needed to work inside Ocellaris
Writing other custom parts¶
Many more items are pluggable, some examples are
Convection schemes (used mostly in the VOF multi phase solver)
Known fields (for initial and boundary conditions, forcing zones and more)
Slope limiters (scalar fields)
Velocity slope limiters (solenoidal vector fields)
You will have to look in the source code for the required API to implement, but the method for connecting the custom code to Ocellaris is the same as what is described above for the main equation solver. The registering functions are:
from ocellaris.solver_parts.convection import register_convection_scheme
from ocellaris.solver_parts.fields import register_known_field
from ocellaris.solver_parts.slope_limiter import register_slope_limiter
from ocellaris.solver_parts.slope_limiter_velocity import register_velocity_slope_limiter
If you want to create a custom forcing zone or something else that is not currently pluggable, then please create a feature request on the issue tracker or submit a pull request with code following the same general framework as the pluggable pieces above.