Closed KarlisFre closed 11 months ago
There is currently no built-in way to do this currently. However, PhiFlow allows for custom Extrapolation
classes. I'll post a template for your case, soon.
The following class implements an extrapolation that switches between in_ext
and out_ext
depending on whether flow
points outwards or inwards at the boundaries.
from typing import Tuple
from phi.flow import *
from phiml.math import extrapolation, Tensor, shape
from phiml.math.extrapolation import Extrapolation
class InOutConditionalExtrapolation(Extrapolation):
def __init__(self, flow: Tensor, in_ext, out_ext):
in_ext = extrapolation.as_extrapolation(in_ext)
out_ext = extrapolation.as_extrapolation(out_ext)
super().__init__(in_ext.pad_rank)
assert 'vector' in shape(flow)
self.flow = flow
self.in_ext = in_ext
self.out_ext = out_ext
def to_dict(self) -> dict:
raise NotImplementedError
def spatial_gradient(self) -> 'Extrapolation':
return InOutConditionalExtrapolation(self.flow, self.in_ext.spatial_gradient(), self.out_ext.spatial_gradient())
def valid_outer_faces(self, dim) -> Tuple[bool, bool]:
true_lower, true_upper = self.in_ext.valid_outer_faces(dim)
false_lower, false_upper = self.out_ext.valid_outer_faces(dim)
return true_lower or false_lower, true_upper or false_upper
@property
def is_flexible(self) -> bool:
return self.in_ext.is_flexible or self.out_ext.is_flexible
def pad_values(self, value: Tensor, width: int, dim: str, upper_edge: bool, already_padded: dict = None, **kwargs) -> Tensor:
edge_normal = self.flow[{dim: -1 if upper_edge else 0, 'vector': dim}]
if already_padded:
edge_normal = ZERO_GRADIENT.pad(edge_normal, already_padded)
if not upper_edge:
edge_normal *= -1
is_out = edge_normal >= 0
res_in = self.in_ext.pad_values(value, width, dim, upper_edge, **kwargs)
res_out = self.out_ext.pad_values(value, width, dim, upper_edge, **kwargs)
return math.where(is_out, res_out, res_in)
def pad(self, value: Tensor, widths: dict, **kwargs) -> Tensor:
from phiml.math._trace import ShiftLinTracer
if isinstance(value, ShiftLinTracer):
out_mask = self._outflow_mask(widths)
padded_in = self.in_ext.pad(value, widths, **kwargs)
padded_out = self.out_ext.pad(value, widths, **kwargs)
return padded_out * out_mask + padded_in * (1 - out_mask)
return Extrapolation.pad(self, value, widths, **kwargs)
def _outflow_mask(self, widths: dict):
result = math.zeros(shape(self.flow).spatial)
already_padded = {}
for dim, width in widths.items():
values = []
if width[False] > 0:
edge_normal = self.flow[{dim: 0, 'vector': dim}]
edge_normal = ZERO_GRADIENT.pad(edge_normal, already_padded)
values.append(expand(edge_normal < 0, shape(self.flow)[dim].with_size(width[False])))
values.append(result)
if width[True] > 0:
edge_normal = self.flow[{dim: -1, 'vector': dim}]
edge_normal = ZERO_GRADIENT.pad(edge_normal, already_padded)
values.append(expand(edge_normal > 0, shape(self.flow)[dim].with_size(width[True])))
result = concat(values, dim)
already_padded[dim] = width
return result
def __abs__(self):
return InOutConditionalExtrapolation(self.flow, abs(self.in_ext), abs(self.out_ext))
def __neg__(self):
return InOutConditionalExtrapolation(self.flow, -self.in_ext, -self.out_ext)
def __add__(self, other):
return InOutConditionalExtrapolation(self.flow, self.in_ext + other, self.out_ext + other)
def __radd__(self, other):
return InOutConditionalExtrapolation(self.flow, other + self.in_ext, other + self.out_ext)
def __sub__(self, other):
return InOutConditionalExtrapolation(self.flow, self.in_ext - other, self.out_ext - other)
def __rsub__(self, other):
return InOutConditionalExtrapolation(self.flow, other - self.in_ext, other - self.out_ext)
def __mul__(self, other):
return InOutConditionalExtrapolation(self.flow, self.in_ext * other, self.out_ext * other)
def __rmul__(self, other):
return InOutConditionalExtrapolation(self.flow, other * self.in_ext, other * self.out_ext)
def __truediv__(self, other):
return InOutConditionalExtrapolation(self.flow, self.in_ext / other, self.out_ext / other)
def __rtruediv__(self, other):
return InOutConditionalExtrapolation(self.flow, other / self.in_ext, other / self.out_ext)
Here is a simple test script:
v = CenteredGrid(Noise(vector='x,y'), x=32, y=32)
scalar = CenteredGrid(2, InOutConditionalExtrapolation(v.values, 0, 1), x=32, y=32)
padded = field.pad(scalar, 10)
show(v, padded, overlay='args')
Hi, how can I specify or implement a freestream boundary condition? This boundary condition has two different possibilities. When the flow at the boundary is leaving the domain, a zero gradient condition is applied. Otherwise, a fixed velocity is assigned.