diff --git a/examples/WIP/build_decomposition.py b/examples/WIP/build_decomposition.py new file mode 100644 index 00000000..ea6837c4 --- /dev/null +++ b/examples/WIP/build_decomposition.py @@ -0,0 +1,62 @@ +import torch +import torch.distributed as dist +import torchdd as dd +import lettuce as lt +import numpy as np + +res = 4 +dist.init_process_group(backend="mpi", rank=-1, world_size=-1) +dtype = torch.float64 +domain = dd.BoxDomain( + lower=torch.zeros(3), + upper=2*np.pi*torch.ones(3), + resolution=torch.Size([res, res, res]), + n_ghost=[[0, 1], [0, 0], [0, 0]], + mpi_rank=0, + device="cpu", + dtype=dtype, + endpoint=False) + +lattice_cpu = lt.Lattice(lt.D3Q27, device="cpu", dtype=dtype) +lattice_gpu = lt.Lattice(lt.D3Q27, device="cpu", dtype=dtype) +flow = lt.TaylorGreenVortex3D(lattice=lattice_cpu, + domain=domain, + mach_number=0.1, + reynolds_number=400, + compute_f=True) + +decom = dd.DomainDecomposition(domain=domain, + flow=flow, + dims=[2, 1, 1], + mpi=False) + +# print(domain.grid(with_ghost=True)[0]) +# print(domain.grid(with_ghost=True)[1]) +# print() +# domains, flows = decom.split_domain(split_flow=True) +domains = decom.split_domain() + + +flows_0 = lt.TaylorGreenVortex3D(lattice=lattice_gpu, + domain=domains[0], + mach_number=0.05, + reynolds_number=400, + compute_f=True) + +flows_1 = lt.TaylorGreenVortex3D(lattice=lattice_gpu, + domain=domains[1], + mach_number=0.05, + reynolds_number=400, + compute_f=True) + +print(flows_0.f.shape) +print(flows_1.f.shape) + +ff = torch.cat((flows_0.f[:,:-1,...], flows_1.f[:,:-1,...]), dim=1) +print(ff.shape) + +# collision = lt.BGKCollision(lattice_gpu, tau=flows[0].units.relaxation_parameter_lu) +# streaming = lt.StandardStreaming(lattice_gpu) +# simulation = lt.Simulation(flow=flows[0], lattice=lattice_gpu, collision=collision, streaming=streaming) +# mlups = simulation.step(1000) +# print(f"Finish with {mlups} MLUPS") diff --git a/examples/WIP/build_domain.py b/examples/WIP/build_domain.py new file mode 
100644 index 00000000..912d3890 --- /dev/null +++ b/examples/WIP/build_domain.py @@ -0,0 +1,34 @@ +import torchdd as dd +import torch +import torch.distributed as dist +import lettuce as lt +import numpy as np + +device = "cpu" +dtype = torch.float64 + +lattice = lt.Lattice(lt.D3Q27, device, dtype) +domain = dd.BoxDomain( + lower=torch.zeros(2), + upper=2*torch.ones(2), + resolution=torch.Size([4, 4]), + n_ghost=[[0, 1], [0, 0]], + mpi_rank=0, + device=device, + dtype=torch.float64, + endpoint=False) + +print(domain.grid(with_ghost=True)) +print("domain device:", domain.device) +print("domain coord:", domain.coord) +print("domain dim:", domain.dim) +print("domain lower:", domain.lower) +print("domain upper:", domain.upper) +print("domain resolution:", domain.resolution) +print("domain shape:", domain.shape) +print("domain tensor_shape:", domain.tensor_shape) +print("domain lengths:", domain.lengths) +print("domain cell_lengts:", domain.cell_lengths) +print("domain n_cells:", domain.n_cells) +print("domain n_points:", domain.n_points) +print("domain n_ghosts:", domain.n_ghost) diff --git a/examples/WIP/build_flow.py b/examples/WIP/build_flow.py new file mode 100644 index 00000000..9b5de444 --- /dev/null +++ b/examples/WIP/build_flow.py @@ -0,0 +1,25 @@ +import torch +import torchdd as dd +import lettuce as lt +import numpy as np + +dtype = torch.float64 +lattice_cpu = lt.Lattice(lt.D3Q27, device="cpu", dtype=dtype) + +domain = dd.BoxDomain( + lower=torch.zeros(3), + upper=2 *np.pi* torch.ones(3), + resolution=torch.Size([50, 50, 50]), + endpoint=False, + mpi_rank=0, + device="cpu", + dtype=torch.float64) + +flow = lt.TaylorGreenVortex3D(lattice=lattice_cpu, + domain=domain, + mach_number=0.05, + reynolds_number=400, + compute_f=True) + +print(flow.f.device) +print(flow.f.shape) \ No newline at end of file diff --git a/examples/WIP/mpi_tgv3d.py b/examples/WIP/mpi_tgv3d.py new file mode 100644 index 00000000..c8ba4636 --- /dev/null +++ b/examples/WIP/mpi_tgv3d.py @@ 
-0,0 +1,67 @@ +import torch +import torch.distributed as dist +import torchdd as dd +import lettuce as lt +import numpy as np +import matplotlib.pyplot as plt + +res = 50 +time = 1 #sec +step = None +device = "cpu" +interval = 50 +re = 400 + +dist.init_process_group(backend="mpi", rank=-1, world_size=-1) +dtype = torch.float64 +domain = dd.BoxDomain( + lower=torch.zeros(3), + upper=2 * np.pi * torch.ones(3), + resolution=torch.Size([res]*3), + n_ghost=[[0, 1], [0, 0], [0, 0]], + mpi_rank=0, + device="cpu", + dtype=torch.float64, + endpoint=False) + +lattice_cpu = lt.Lattice(lt.D3Q27, device="cpu", dtype=dtype) +lattice_gpu = lt.Lattice(lt.D3Q27, device=device+":"+str(dist.get_rank()), dtype=dtype) + +flow = lt.TaylorGreenVortex3D(lattice=lattice_gpu, + domain=domain, + mach_number=0.05, + reynolds_number=re, + compute_f=False) + +decom = dd.DomainDecomposition(domain=domain, + flow=flow, + dims=[1,1,1], + mpi=True) +domains = decom.split_domain() + +flows = lt.TaylorGreenVortex3D(lattice=lattice_gpu, + domain=domains[0], + mach_number=0.05, + reynolds_number=re, + compute_f=False) + +flows.units = flow.units + +for i in domains: + print("I'm ",flows.__class__.__name__," on rank:", i.rank," with ",flows.domain,", coordinates:", flows.domain.coord) + +collision = lt.BGKCollision(lattice_gpu, tau=flows.units.relaxation_parameter_lu) +# streaming = lt.StandardStreaming(lattice_gpu) +streaming = dd.MPIStreaming(lattice=lattice_gpu, decom=decom, device=device) +simulation = lt.Simulation(flow=flows, lattice=lattice_gpu, collision=collision, streaming=streaming) + +energy = lt.IncompressibleKineticEnergy(lattice_gpu, flow) +reporter = dd.MPIObservableReporter(energy, decomposition=decom, interval=interval,) +simulation.reporters.append(reporter) + +steps = int(flow.units.convert_time_to_lu(time)) if step is None else step +if dist.get_rank() == 0: + print("steps:",steps) +print(f"Start simulation on rank: {domains[0].rank}") +mlups = simulation.step(steps) +print(f"Finish 
with {mlups} MLUPS") diff --git a/lettuce/boundary.py b/lettuce/boundary.py index 260451a2..83d05c39 100644 --- a/lettuce/boundary.py +++ b/lettuce/boundary.py @@ -15,13 +15,60 @@ """ +import warnings +from typing import Callable, Union import torch import numpy as np -from lettuce import (LettuceException) +from .util import LettuceException, LettuceWarning +from .lattices import Lattice + __all__ = ["BounceBackBoundary", "AntiBounceBackOutlet", "EquilibriumBoundaryPU", "EquilibriumOutletP"] +class Boundary: + """Base class for boundary conditions + + Parameters + ---------- + mask_function : Callable + A function that takes the grid as a sequence of `dim` numpy arrays and returns a boolean mask as a numpy array. + + Examples + -------- + + >>> Boundary(lambda x,y : x>=0.5) + """ + def __init__(self, mask_function: Callable = None): + self.mask_function = mask_function + self._mask = None + + def update_mask(self, lattice, grid): + if self.mask_function is not None: + self._mask = self.mask_function(grid) + + def make_no_stream_mask(self, mask) -> Union[bool, torch.Tensor]: + return False + + def make_no_collision_mask(self, mask): + return False + + @property + def mask(self): + if self._mask is None: + raise LettuceException(f"Call `update_mask` before accessing .mask") + return self._mask + + @mask.setter + def mask(self, mask): + warnings.warn( + "Setting the boundary mask manually is deprecated as it does " + "not support grid refinement and MPI parallelization. " + "Instead, the boundary constructor should receive a mask-generating function. 
" + ) + self._mask = mask + + class BounceBackBoundary: """Fullway Bounce-Back Boundary""" diff --git a/lettuce/flows/flow.py b/lettuce/flows/flow.py new file mode 100644 index 00000000..9440882d --- /dev/null +++ b/lettuce/flows/flow.py @@ -0,0 +1,138 @@ + +from copy import deepcopy +from typing import Sequence +import numpy as np +import torch +from typing import Tuple, Union + + +from ..util import LettuceException +from ..lattices import Lattice +from ..boundary import Boundary + + +class Flow: + """ + + Attributes + ---------- + boundaries : Sequence[Boundary] + compute_f: bool - init f on rank 0 + """ + def __init__( + self, + domain: "Domain", + units: "Units" = None, + compute_f: bool = False): + self.domain = domain + self.grid = self.domain.grid(as_numpy=True) + self.units = units + if self.domain.dim != self.units.lattice.D: + raise ValueError( + f"Dimension error. Domain has {self.domain.dim} dimension(s). " + f"{self.units.lattice.stencil.__name__} has {self.units.lattice.D} dimension(s)" + ) + self._compute_f = compute_f + if compute_f and self.domain.rank==0: + self._f = self.compute_initial_f(lattice=self.units.lattice) + else: + self._f = None + + @property + def compute_f(self): + return self._compute_f + + @property + def f(self): + # TODO: FIX, compute_f = False + if self._f is not None and self.domain.rank == 0: + return self._f + elif self._f is None and self.domain.rank == 0 and self.compute_f is True: + raise Exception(f"f is not initialized on rank {self.domain.rank}. " + f"Attribute 'compute_f' is set as {self.compute_f}. ") + elif self.domain.rank != 0: + return None + + @f.setter + def f(self, new_f): + self._f = new_f + + @property + def boundaries(self) -> Sequence[Boundary]: + return [] + + def compute_initial_f(self, lattice: Lattice) -> torch.Tensor: + grid = self.grid + p, u = self.initial_solution(grid) + if not list(p.shape) == [1] + list(grid[0].shape): + raise LettuceException( + f"Wrong dimension of initial pressure field. 
" + f"Expected {[1] + list(grid[0].shape)}, " + f"but got {list(p.shape)}." + ) + if not list(u.shape) == [lattice.D] + list(grid[0].shape): + raise LettuceException( + "Wrong dimension of initial velocity field." + f"Expected {[lattice.D] + list(grid[0].shape)}, " + f"but got {list(u.shape)}." + ) + u = self.units.lattice.convert_to_tensor(self.units.convert_velocity_to_lu(u)) + rho = self.units.lattice.convert_to_tensor(self.units.convert_pressure_pu_to_density_lu(p)) + return self.units.lattice.equilibrium(rho, self.units.lattice.convert_to_tensor(u)) + + def compute_masks(self, lattice: Lattice) -> Tuple[torch.Tensor, torch.Tensor]: + grid = self.grid + grid_shape = grid[0].shape + f_shape = [lattice.Q, *grid_shape] + no_stream_mask = torch.zeros(f_shape, device=lattice.device, dtype=torch.bool) + no_collision_mask = torch.zeros(grid_shape, device=lattice.device, dtype=torch.bool) + + # Apply boundaries + # boundaries = deepcopy(self.boundaries) # store locally to keep the flow free from the boundary state + for boundary in self.boundaries: + boundary.update_mask(lattice, self.grid) + if hasattr(boundary, "make_no_collision_mask"): + no_collision_mask = no_collision_mask | boundary.make_no_collision_mask(f_shape) + if hasattr(boundary, "make_no_stream_mask"): + no_stream_mask = no_stream_mask | boundary.make_no_stream_mask(f_shape) + + return no_stream_mask, no_collision_mask + + + + +# class DomainDecomposedFlow: +# def __init__(self, flow: Flow, masks: Sequence[npt.NDArray[bool]]): +# grid = flow.grid +# if not all(mask.shape == grid.shape for mask in masks): +# raise ValueError(f"At least one mask shape did not match grid shape ({grid.shape})") +# self.flow = flow +# self.masks = masks +# +# +# +# flow = TaylorGreenVortex3D(...) 
+# +# # manual decomposition of the flow domain into rectangular/hexagonal domains +# mask0 = flow.grid.x < 0.5 +# mask1 = flow.grid.x >= 0.5 +# +# # set up a distributed flow object +# decomposed = DomainDecomposedFlow(flow, masks=(mask0, mask1)) +# +# # send part of the domain to a device +# decomposed.set_device(0, "cuda:0") +# +# # refine the domain if needed (this is important to do here; big flows will not fit on one node) +# decomposed.refine_domain(0, refinement_level=4) +# +# # send part of the domain to a different device +# decomposed.set_device(1, "cuda:1") +# +# # refine this part more coarsely +# decomposed.refine_domain(1, refinement_level=3) +# +# # set up the simulation with the decomposed flow +# simulation = Simulation(decomposed, ...) + + diff --git a/lettuce/flows/taylorgreen.py b/lettuce/flows/taylorgreen.py index 8b1bc58b..216f8c24 100644 --- a/lettuce/flows/taylorgreen.py +++ b/lettuce/flows/taylorgreen.py @@ -3,19 +3,29 @@ """ import numpy as np - from lettuce.unit import UnitConversion +from lettuce.flows.flow import Flow -class TaylorGreenVortex2D: - def __init__(self, resolution, reynolds_number, mach_number, lattice): - self.resolution = resolution +class TaylorGreenVortex2D(Flow): + def __init__(self, + domain, + reynolds_number, + mach_number, + lattice, + compute_f=False): + self.resolution = domain.shape[1] self.units = UnitConversion( - lattice, - reynolds_number=reynolds_number, mach_number=mach_number, - characteristic_length_lu=resolution, characteristic_length_pu=2 * np.pi, + lattice=lattice, + reynolds_number=reynolds_number, + mach_number=mach_number, + characteristic_length_lu=self.resolution, + characteristic_length_pu=2 * np.pi, characteristic_velocity_pu=1 ) + super().__init__(domain=domain, + units=self.units, + compute_f=compute_f) def analytic_solution(self, x, t=0): nu = self.units.viscosity_pu @@ -27,26 +37,24 @@ def analytic_solution(self, x, t=0): def initial_solution(self, x): return self.analytic_solution(x, t=0) 
- @property - def grid(self): - x = np.linspace(0, 2 * np.pi, num=self.resolution, endpoint=False) - y = np.linspace(0, 2 * np.pi, num=self.resolution, endpoint=False) - return np.meshgrid(x, y, indexing='ij') - @property - def boundaries(self): - return [] - - -class TaylorGreenVortex3D: - def __init__(self, resolution, reynolds_number, mach_number, lattice): - self.resolution = resolution +class TaylorGreenVortex3D(Flow): + def __init__(self, + domain, + reynolds_number, + mach_number, + lattice, + compute_f=False): + self.resolution = domain.shape[1] self.units = UnitConversion( lattice, reynolds_number=reynolds_number, mach_number=mach_number, - characteristic_length_lu=resolution / (2 * np.pi), characteristic_length_pu=1, + characteristic_length_lu=self.resolution / (2 * np.pi), characteristic_length_pu=1, characteristic_velocity_pu=1 ) + super().__init__(domain=domain, + units=self.units, + compute_f=compute_f) def initial_solution(self, x): u = np.array([ @@ -57,13 +65,13 @@ def initial_solution(self, x): p = np.array([1 / 16. 
* (np.cos(2 * x[0]) + np.cos(2 * x[1])) * (np.cos(2 * x[2]) + 2)]) return p, u - @property - def grid(self): - x = np.linspace(0, 2 * np.pi, num=self.resolution, endpoint=False) - y = np.linspace(0, 2 * np.pi, num=self.resolution, endpoint=False) - z = np.linspace(0, 2 * np.pi, num=self.resolution, endpoint=False) - return np.meshgrid(x, y, z, indexing='ij') - - @property - def boundaries(self): - return [] + # @property + # def grid(self): + # x = np.linspace(0, 2 * np.pi, num=self.resolution, endpoint=False) + # y = np.linspace(0, 2 * np.pi, num=self.resolution, endpoint=False) + # z = np.linspace(0, 2 * np.pi, num=self.resolution, endpoint=False) + # return np.meshgrid(x, y, z, indexing='ij') + # + # @property + # def boundaries(self): + # return [] diff --git a/lettuce/simulation.py b/lettuce/simulation.py index caf4e8f4..d7074ce6 100644 --- a/lettuce/simulation.py +++ b/lettuce/simulation.py @@ -31,36 +31,15 @@ def __init__(self, flow, lattice, collision, streaming): self.streaming = streaming self.i = 0 - grid = flow.grid - p, u = flow.initial_solution(grid) - assert list(p.shape) == [1] + list(grid[0].shape), \ - LettuceException(f"Wrong dimension of initial pressure field. " - f"Expected {[1] + list(grid[0].shape)}, " - f"but got {list(p.shape)}.") - assert list(u.shape) == [lattice.D] + list(grid[0].shape), \ - LettuceException("Wrong dimension of initial velocity field." 
- f"Expected {[lattice.D] + list(grid[0].shape)}, " - f"but got {list(u.shape)}.") - u = lattice.convert_to_tensor(flow.units.convert_velocity_to_lu(u)) - rho = lattice.convert_to_tensor(flow.units.convert_pressure_pu_to_density_lu(p)) - self.f = lattice.equilibrium(rho, lattice.convert_to_tensor(u)) - self.reporters = [] # Define masks, where the collision or streaming are not applied - x = flow.grid - self.no_collision_mask = lattice.convert_to_tensor(np.zeros_like(x[0], dtype=bool)) - no_stream_mask = lattice.convert_to_tensor(np.zeros(self.f.shape, dtype=bool)) - - # Apply boundaries - self._boundaries = deepcopy(self.flow.boundaries) # store locally to keep the flow free from the boundary state - for boundary in self._boundaries: - if hasattr(boundary, "make_no_collision_mask"): - self.no_collision_mask = self.no_collision_mask | boundary.make_no_collision_mask(self.f.shape) - if hasattr(boundary, "make_no_stream_mask"): - no_stream_mask = no_stream_mask | boundary.make_no_stream_mask(self.f.shape) - if no_stream_mask.any(): - self.streaming.no_stream_mask = no_stream_mask + self.f = (flow.f.to(device=self.lattice.device) + if flow.f is not None + else + flow.compute_initial_f(self.lattice).to(device=self.lattice.device)) + no_stream_mask, self.no_collision_mask = flow.compute_masks(self.lattice) + self.streaming.no_stream_mask = no_stream_mask if no_stream_mask.any() else None def step(self, num_steps): """Take num_steps stream-and-collision steps and return performance in MLUPS.""" @@ -72,8 +51,8 @@ def step(self, num_steps): self.f = self.streaming(self.f) # Perform the collision routine everywhere, expect where the no_collision_mask is true self.f = torch.where(self.no_collision_mask, self.f, self.collision(self.f)) - for boundary in self._boundaries: - self.f = boundary(self.f) + # for boundary in self._boundaries: + # self.f = boundary(self.f) self._report() end = timer() seconds = end - start diff --git a/setup.py b/setup.py index 6321f128..e4b233fe 
100644 --- a/setup.py +++ b/setup.py @@ -47,3 +47,9 @@ cmdclass=versioneer.get_cmdclass(), zip_safe=False, ) + +setup( + name="torchdd", + version="0.0.1", + packages=find_packages('torchdd') +) diff --git a/tests/conftest.py b/tests/conftest.py index 5c8de7a0..83e26363 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -75,3 +75,7 @@ def f_transform(request, f_all_lattices): return f, Transform(lattice) else: pytest.skip("Stencil not supported for this transform.") + +@pytest.fixture() +def ctx(dtype_device): + return {"dtype": dtype_device[0], "device": dtype_device[1]} diff --git a/tests/test_domain.py b/tests/test_domain.py new file mode 100644 index 00000000..f6e6256e --- /dev/null +++ b/tests/test_domain.py @@ -0,0 +1,139 @@ + + +import pytest +import torch +from torchdd import BoxDomain + +@pytest.mark.parametrize("endpoint", [True, False]) +@pytest.mark.parametrize("dim", [1, 2, 3]) +@pytest.mark.parametrize("n_cells", [1, 4]) +def test_grid_cubic(dim, n_cells, endpoint, ctx): + domain = BoxDomain( + lower=torch.zeros(dim, **ctx), + upper=torch.ones(dim, **ctx), + resolution=torch.Size([n_cells] * dim), + endpoint=endpoint, + cubic_cells=True + ) + assert domain.n_cells == n_cells**dim + assert domain.n_points == (n_cells + 1)**dim + grid = domain.grid() + assert len(grid) == dim + for g in grid: + assert g.shape == tuple([n_cells + 1] * dim) + assert g.shape == domain.shape + if endpoint == True: + assert g.max().item() == pytest.approx(1.0) + if endpoint == False: + assert g.max().item() == pytest.approx(n_cells/(n_cells+1)) + assert g.min().item() == pytest.approx(0.0) + + domain.resolution = [2] * dim + + if dim > 1: + with pytest.raises(ValueError, match="Quad domain with domain lengths"): + domain.resolution = [2] * (dim-1) + [3] + + +def test_cubic_fail(ctx): + with pytest.raises(ValueError, match="Quad domain with domain lengths"): + BoxDomain( + lower=torch.zeros(2, **ctx), + upper=torch.ones(2, **ctx), + resolution=torch.Size([2, 3]), + 
cubic_cells=True + ) + + +def test_noncubic(ctx): + domain = BoxDomain( + lower=torch.zeros(2, **ctx), + upper=torch.ones(2, **ctx), + resolution=torch.Size([2, 3]), + cubic_cells=False + ) + assert domain.resolution == (2, 3) + assert domain.shape == (3, 4) + domain.shape = (5, 6) + assert domain.resolution == (4, 5) + + grid = domain.grid(as_numpy=True) + for g in grid: + assert g.shape == domain.shape + + +# def test_contains(ctx): +# domain = BoxDomain( +# lower=torch.zeros(2, **ctx), +# upper=torch.ones(2, **ctx), +# resolution=torch.Size([2, 3]), +# cubic_cells=False +# ) +# assert 0.5 * torch.ones(2, **ctx) in domain +# assert (torch.rand(4, 2, **ctx) in domain) +# x = torch.rand(4, 2, **ctx) +# x[2:, :] += 1.0 +# contains = domain.contains(x) +# assert (contains == torch.Tensor([True, True, False, False]).to(contains)).all() + +@pytest.mark.parametrize("endpoint", [True, False]) +def test_coarsen_refine(endpoint, ctx): + domain = BoxDomain( + lower=torch.zeros(3, **ctx), + upper=torch.ones(3, **ctx), + resolution=torch.Size([2, 2, 2]), + endpoint=endpoint, + cubic_cells=True + ) + domain.refine(1) + assert domain.resolution == (4, 4, 4) + domain.coarsen(2) + assert domain.resolution == (1, 1, 1) + with pytest.raises(ValueError, match="BoxDomain with resolution"): + domain.coarsen(1) + + +def test_ghost(ctx): + domain = BoxDomain( + lower=torch.zeros(3, **ctx), + upper=2 * torch.ones(3, **ctx), + resolution=torch.Size([2, 2, 2]), + cubic_cells=True, + n_ghost=[[1, 1], [0, 0], [0, 0]] + ) + assert (domain.n_ghost == torch.tensor([[1, 1], [0, 0], [0, 0]], dtype=torch.int, device=ctx['device'])).all() + grid = domain.grid(with_ghost=False) + for g in grid: + assert g.shape == domain.shape + + grid_with_ghost = domain.grid(with_ghost=True) + for g in grid_with_ghost: + assert g.shape == domain.tensor_shape + + with pytest.raises(ValueError, match="n_ghost needs to have shape"): + BoxDomain( + lower=torch.zeros(3, **ctx), + upper=2*torch.ones(3, **ctx), + 
resolution=torch.Size([2, 2, 2]), + cubic_cells=True, + n_ghost=[[1,1], [0,0]] + ) + +@pytest.mark.parametrize("endpoint", [True, False]) +def test_split(endpoint, ctx): + domain = BoxDomain( + lower=torch.zeros(3, **ctx), + upper=2*torch.ones(3, **ctx), + resolution=torch.Size([10, 10, 10]), + endpoint=endpoint, + cubic_cells=True + ) + + domains = domain.split(1.0, 1.6) if endpoint is True else domain.split(2*5/11, 2*8/11) + for i in domains: + assert i.resolution[0] in [5,3,2] + + with pytest.raises(ValueError, match="Coordinate not contained in box."): + domain.split(1.25) + + diff --git a/torchdd/__init__.py b/torchdd/__init__.py new file mode 100644 index 00000000..948dc104 --- /dev/null +++ b/torchdd/__init__.py @@ -0,0 +1,5 @@ +from .domain import * +from .decompostion import * +from .ddtensor import * + +#TODO: README Tabelle erstellen mit unterstützten Funktionen für Lettuce, MPI, CUDA Extension \ No newline at end of file diff --git a/torchdd/communication.py b/torchdd/communication.py new file mode 100644 index 00000000..cfb0ae15 --- /dev/null +++ b/torchdd/communication.py @@ -0,0 +1,11 @@ +import torch +import torch.distributed as dist + +#TODO: Concept for communcation + +class DomainDecomposedTensor(): + def __init__(self): + pass + + def communicate(self): + pass \ No newline at end of file diff --git a/torchdd/ddtensor.py b/torchdd/ddtensor.py new file mode 100644 index 00000000..68ede910 --- /dev/null +++ b/torchdd/ddtensor.py @@ -0,0 +1,51 @@ + +__all__ = ["DDTensor"] + +import torch +from typing import Union + + +class DDTensor(torch.TensorType): + def __init__(self, *input): + super().__init__(*input) + self._domains = [] + self._tensors = [] + + @property + def domains(self): + return self._domains + + @property + def tensors(self): + return self._tensors + + @property + def local_domains(self): + pass + + @property + def local_tensors(self): + pass + + @property + def local_shapes(self): + pass + + def __getitem__(self, index: Union[int, 
slice]) -> Union[torch.Tensor, "DomainDecomposedTensor"]: + pass + + # def __getattr__(self, field: str): + # local_tensor = list(self.local_tensors) + # if isinstance(, Callable): + # pass + +# # alias +# DDTensor = DomainDecomposedTensor +# +# class DDTensorMethod: +# def __init__(self, name, sub_tensors): +# pass +# +# class DDShape: +# + diff --git a/torchdd/decompostion.py b/torchdd/decompostion.py new file mode 100644 index 00000000..4c695905 --- /dev/null +++ b/torchdd/decompostion.py @@ -0,0 +1,295 @@ + +__all__ = ["DomainDecomposition", "MPIObservableReporter", "MPIStreaming"] + +from .domain import Domain, BoxDomain +from typing import Sequence, Optional +import os +import sys +import torch.distributed as dist +import torch +import copy +import numpy as np + +# +# TODO: (v0.1.5) MPI Streaming +# TODO: (v0.1.5) Upper class for boundaries +# TODO: (v0.1.5) Reporter for observables +# TODO: (v0.1.5) Visualization - VTK Files + +# for student +# TODO: (v0.2.0) Expand attribute dims for 2nd, 3rd dimension (splitting) +# TODO: (v0.2.0) Create map, coord, topology +# TODO: (v0.2.0) Create Fct. 
for flow-decomposition +# TODO: (v0.2.0) Syntax for Fct names +# TODO: (v0.2.0) Add utilities from mpi4py) + +class DomainDecomposition: + def __init__(self, + domain: "Domain" = None, + flow: "Flow" = None, + dims: Sequence[int] = None, + mpi: bool = False, + src: int = 0,): + self.flow = flow + # self.domain = domain if domain else flow.domain + self.domain = domain + self.domains = [] + self._dims = dims + self.mpi = mpi + self.src = src + self._mpi_rank = domain.rank + if self.mpi: + self._mpi_rank = dist.get_rank() + self._mpi_size = dist.get_world_size() + self._mpi_group = dist.group.WORLD + self.init_mpi_process() + self._dims[0] = self._mpi_size #TODO: only for dim==0 + self._placeholder = [] + else: + self._mpi_rank = domain.rank + + @property + def dims(self): + return self._dims + + @dims.setter + def dims(self, new_dim): + self._dims = new_dim + + @property + def topo(self): + return self._topo + + @topo.setter + def topo(self, new_topo): + self._topo = new_topo + + @property + def mpi_size(self): + if self._mpi_size is None: + raise Exception("MPI distribution is not initialized.") + else: + return self._mpi_size + + @mpi_size.setter + def mpi_size(self, new_size: int): + self._mpi_size = new_size + + @property + def mpi_rank(self) -> int: + if self._mpi_rank is None: + raise Exception("MPI distribution is not initialized.") + else: + return self._mpi_rank + + @mpi_rank.setter + def mpi_rank(self, new_rank: int): + if new_rank >= self.mpi_size: + raise ValueError(f"mpi_rank ({new_rank}) has to be smaller than mpi_size ({self.mpi_size})") + self._mpi_rank = new_rank + + @property + def mpi_group(self): + return self._mpi_group + + @mpi_group.setter + def mpi_group(self, new_group): + self._mpi_group = new_group + + def init_mpi_process(self, + rank=-1, + size=-1, + group=None, + backend="mpi"): + # pass + """ Initialize the distributed environment. 
""" + # os.environ['MASTER_ADDR'] = '127.0.0.1' + # os.environ['MASTER_PORT'] = '29500' + # dist.init_process_group(backend=backend, + # rank=rank, + # world_size=size) + self.mpi_size = dist.get_world_size() + self.mpi_rank = dist.get_rank() + self.mpi_group = dist.group.WORLD if group is None else group + + def split_domain(self, *coordinates, split_flow: bool = False) -> Sequence["BoxDomain"]: + """Split domain + Notes: + - works currently only for dim==0 + """ + if self.dims[0] == 1: + raise ValueError(f"Dims: {self.dims} is to low.") + if self.mpi_rank == 0: + if len(coordinates)==0: + coordinates = ((self.domain.grid()[0][np.linspace(0, self.domain.resolution[0], self.dims[0] + 1, dtype=int), 0])[1:-1] if self.domain.dim==2 else + (self.domain.grid()[0][np.linspace(0, self.domain.resolution[0], self.dims[0] + 1, dtype=int), 0, 0])[1:-1] + ) + domains = self.domain.split(*coordinates) + self._placeholder = [torch.tensor(_.shape,dtype=int) for _ in domains] + for i in range(len(self._placeholder)): + self._placeholder[i][0] -= 1 + self.create_map(domains) + flows = self.split_flow(domains) if split_flow else None + else: + domains = None + flows = None + if self.mpi: + domain = self.send(domains) + if split_flow: + flow = self.send(flows) + self.create_map(domain, mpi=True) + return (domain, flow) if split_flow else domain + else: + return (domains, flows) if split_flow else domains + + def split_flow(self, domains, flow = None) -> Sequence["BoxDomain"]: + # TODO: This might be a memory problem + _flow = self.flow if flow is None else flow + if _flow is None: + raise ValueError("There is no flow to split") + flows = [] + counter = 0 + for domain_id, domain in enumerate(domains): + flow = copy.deepcopy(_flow) + for i in range(self.domain.shape[0] - domain.shape[0] + 1): + fit = torch.all(torch.isclose(self.domain.grid()[0][i:i + domain.shape[0], ...], domain.grid()[0])) + if fit: + counter += 1 + flow.f = _flow.f[:, i:i + domain.shape[0], ...].clone() + 
flow.domain = copy.deepcopy(domain) + flow.grid = self.domain.grid(as_numpy=True)[0][i:i + domain.shape[0], ...] + flows.append(flow) + if domain_id + 1 != counter: + raise ValueError( + f"Flow is not splitted correctly. " + f"Domain is splitted into {len(domains)} domains. " + f"Flow is splitted into {counter} flows." + ) + return flows + + def send(self, objects): + """Sending domains to devices""" + if self.mpi_rank == 0: + print("Sending",objects[0].__class__.__name__,"to devices.") + else: + objects = [None] * self.mpi_size + object = [None] + dist.scatter_object_list(object, objects, src=self.src, group=self.mpi_group) + object[0].rank = self.mpi_rank + return object + + def create_map(self, domains, mpi = False): + """Create a map with coordination""" + #TODO: Naive hard-coded implementation. Need to verify onto which ranks the domains were distributed. + self.topo = torch.zeros(self.dims) + if mpi: + self.topo = torch.arange(torch.prod(torch.tensor(self.dims))).view(self.dims) + self.left_neighbor = torch.roll(self.topo,1,0) + self.right_neighbor = torch.roll(self.topo, -1, 0) + + def get_topo(self): + """Return information on the cartesian topology""" + # TODO: Decom - Implement fct. - get_topo + return self.topo + + def get_neighbors(self): + """Return information on the cartesian topology""" + left_neighbor = torch.roll(self.topo, 1, 0) + right_neighbor = torch.roll(self.topo, -1, 0) + return left_neighbor, right_neighbor + + def get_coords(self, rank: int): + """Translate ranks to logical coordinates""" + #TODO: Decom - Implement fct. 
- get_coords + pass + +class MPIStreaming: + def __init__(self, lattice, decom, device): + self.lattice = lattice + self._no_stream_mask = None + self.decom = decom + self.device = device+":"+str(decom.mpi_rank) + # self.device = "cpu" + + @property + def no_stream_mask(self): + return self._no_stream_mask + + @no_stream_mask.setter + def no_stream_mask(self, mask): + self._no_stream_mask = mask + + def __call__(self, f): + for i in range(1, self.lattice.Q): + if self.no_stream_mask is None: + f[i] = self._stream(f, i) + else: + new_fi = self._stream(f, i) + f[i] = torch.where(self.no_stream_mask[i], f[i], new_fi) + f = self._communication(f).clone() + return f + + def _stream(self, f, i): + return torch.roll(f[i], shifts=tuple(self.lattice.stencil.e[i]), dims=tuple(np.arange(self.lattice.D))) + + def _communication(self, f): + left_neighbor, right_neighbor = self.decom.get_neighbors() + + # print(f"I'm {self.decom.mpi_rank} and my left_neighbor is {left_neighbor[self.decom.mpi_rank]}") + # print(f"I'm {self.decom.mpi_rank} and my right is {right_neighbor[self.decom.mpi_rank]}") + f_to_right = f[self.lattice.e[:, 0] == 1, -1, ...].cpu().clone().detach() + f_from_left = torch.zeros_like(f_to_right).clone().detach() + # f_from_left = torch.zeros_like(left_neighbor[self.decom.mpi_rank]) + to_right = dist.isend(tensor=f_to_right, dst=right_neighbor[self.decom.mpi_rank]) + # to_right = dist.isend(tensor=torch.tensor(self.decom.mpi_rank), dst=right_neighbor[self.decom.mpi_rank]) + from_left = dist.irecv(tensor=f_from_left, src=left_neighbor[self.decom.mpi_rank]) + to_right.wait() + from_left.wait() + f[self.lattice.e[:, 0] == 1, 0, ...] 
= f_from_left.detach().to(device=self.device) + #print(f"Im rank {self.decom.mpi_rank} and my info is: ", f_from_left) + + f_to_left = f[self.lattice.e[:, 0] == -1, 0, ...].cpu().clone().detach() + f_from_right = torch.zeros_like(f_to_left).clone().detach() + to_left = dist.isend(tensor=f_to_left, dst=left_neighbor[self.decom.mpi_rank]) + from_right = dist.irecv(tensor=f_from_right, src=right_neighbor[self.decom.mpi_rank]) + to_left.wait() + from_right.wait() + f[self.lattice.e[:,0] == -1, -1, ...] = f_from_right.detach().to(device=self.device) + + + return f + +class MPIObservableReporter: + def __init__(self, observable, decomposition, interval=1, out=sys.stdout): + self.observable = observable + self.decomposition = decomposition + self.interval = interval + self.out = [] if out is None else out + self._parameter_name = observable.__class__.__name__ + if dist.get_rank() == 0: + print('steps ', 'time ', self._parameter_name) + + def __call__(self, i, t, f): + if i % self.interval == 0: + f_send = f[:, :-1, ...].detach().clone().cpu() + + index = [f.shape[0]] + [-1] * self.decomposition.domain.dim + f_all = [torch.zeros(tuple(_), dtype=f_send.dtype, device=f_send.device)[None, ...].expand(index).clone() for _ in + self.decomposition._placeholder] if self.decomposition.mpi_rank == 0 else None + + torch.distributed.gather_object(obj=f_send, object_gather_list=f_all, dst=0) + if self.decomposition.mpi_rank == 0: + ff = torch.cat(f_all, dim=1).to(device=f.device) + + + observed = self.observable.lattice.convert_to_numpy(self.observable(ff)) + assert len(observed.shape) < 2 + if len(observed.shape) == 0: + observed = [observed.item()] + else: + observed = observed.tolist() + entry = [i, t] + observed + if isinstance(self.out, list): + self.out.append(entry) + else: + print(*entry, file=self.out) \ No newline at end of file diff --git a/torchdd/domain.py b/torchdd/domain.py new file mode 100644 index 00000000..500f6aec --- /dev/null +++ b/torchdd/domain.py @@ -0,0 
__all__ = ["Domain", "BoxDomain"]

import copy
from typing import Sequence, Union
import torch
import torch.distributed as dist
import os
import numpy as np
# from math import prod

# TODO: (v0.1.5) Add Endpoint (TRUE/FALSE)
# TODO: (v0.1.5) Concept for ghostcells
# TODO: (v0.2.0) Concept for periodicity (see. mpi4py intercomm/create_cart)
# TODO: (v0.2.0) Add META Attribute


class Domain:
    """Abstract base class for simulation domains.

    Stores the owning MPI rank and optional metadata; all geometric queries
    (``dim``, ``tensor_shape``, ``grid``, ``contains``) raise
    ``NotImplementedError`` and are supplied by subclasses.
    """

    def __init__(self,
                 mpi_rank: int = 0,
                 meta: "DomainMeta" = None):
        # Optional metadata object; semantics not defined in this file.
        self._meta = meta
        # MPI rank that owns this (sub)domain.
        self._mpi_rank = mpi_rank

    @property
    def mpi_rank(self) -> int:
        return self._mpi_rank

    @mpi_rank.setter
    def mpi_rank(self, new_rank: int):
        # if new_rank >= self.mpi_size:
        #     raise ValueError(f"mpi_rank ({new_rank}) has to be smaller than mpi_size ({self.mpi_size()})")
        self._mpi_rank = new_rank

    @property
    def rank(self) -> int:
        # Alias for mpi_rank.
        return self.mpi_rank

    @rank.setter
    def rank(self, new_rank: int):
        self.mpi_rank = new_rank

    @property
    def meta(self) -> Union["DomainMeta", None]:
        # hasattr guard allows instances deserialized/constructed without
        # the attribute to still answer None.
        if hasattr(self, "_meta"):
            return self._meta
        return None

    @meta.setter
    def meta(self, meta: Union["DomainMeta", None]):
        self._meta = meta

    @property
    def dim(self) -> int:
        raise NotImplementedError()

    @property
    def tensor_shape(self) -> torch.Size:
        raise NotImplementedError()

    @property
    def ghosts(self):
        raise NotImplementedError()

    # NOTE(review): a @property cannot take extra arguments; BoxDomain
    # overrides `grid` as a plain method. This abstract declaration is
    # inconsistent with the subclass — confirm whether the decorator
    # should be dropped here.
    @property
    def grid(self, as_numpy=False, with_ghost=False) -> Union[Sequence[torch.Tensor], Sequence[np.ndarray]]:
        raise NotImplementedError()

    def contains(self, points: torch.Tensor) -> torch.Tensor:
        raise NotImplementedError()


class BoxDomain(Domain):
    """(Hyper-)Rectangular domain of dimension `D`.

    Attributes
    ----------
    lower : torch.Tensor
        lower bound of the box
    upper : torch.Tensor
        upper bound of the box
    resolution : Sequence[int]
        number of cells per axis
    endpoint : bool
        whether the upper bound is included as a grid point (mirrors
        numpy.linspace's ``endpoint``)
    n_ghost : Sequence[Sequence[int]]
        Int tensor of shape `(D, 2)`. Number of ghost nodes below and above domain.
        The first and second column denotes the number of lower und upper ghost cells
        along each axis, respectively.
    """

    def __init__(
            self,
            lower: torch.Tensor,
            upper: torch.Tensor,
            resolution: Sequence[int],
            mpi_rank: int = 0,
            cubic_cells: bool = True,
            n_ghost: Sequence[Sequence[int]] = None,
            device: Union[str, torch.device] = None,
            dtype: torch.dtype = None,
            endpoint: bool = True,
    ):
        super().__init__(mpi_rank=mpi_rank)
        self.endpoint = endpoint
        # Fall back to the device/dtype of `lower` when not given explicitly.
        self.device = device if device is not None else lower.device
        self.dtype = dtype if dtype is not None else lower.dtype
        # When True, the resolution setter rejects non-cubic cell shapes.
        self._cubic_cells = cubic_cells
        self._lower = lower.to(device=device, dtype=dtype)
        self._upper = upper.to(device=device, dtype=dtype)
        self.resolution = resolution  # validated by the property setter
        self.n_ghost = n_ghost        # validated by the property setter
        # Logical cartesian coordinate of this subdomain, one index per axis.
        self._coord = [0] * self.dim

    @property
    def coord(self) -> Sequence[int]:
        return self._coord

    @coord.setter
    def coord(self, new_coord: Sequence[int]):
        self._coord = new_coord

    @property
    def dim(self) -> int:
        # Spatial dimension, inferred from the lower-bound vector.
        return self._lower.shape[0]

    @property
    def lower(self) -> torch.Tensor:
        return self._lower

    @property
    def upper(self) -> torch.Tensor:
        return self._upper

    @property
    def resolution(self) -> torch.Size:
        return torch.Size(self._resolution)

    @resolution.setter
    def resolution(self, new_resolution: Sequence[int]):
        """Set the per-axis cell count, enforcing cubic cells if requested."""
        if self._cubic_cells:
            cell_lengths = torch.tensor([length / res for length, res in zip(self.lengths, new_resolution)])
            if not all(torch.allclose(cell_length, cell_lengths[0]) for cell_length in cell_lengths):
                raise ValueError(
                    f"Quad domain with domain lengths {[x for x in self.lengths]} cannot be "
                    f"represented by {[x for x in new_resolution]} cubic cells. "
                    f"Cell lengths {[x for x in cell_lengths]} would be non-cubic."
                )
        self._resolution = torch.Size(new_resolution)
        # TODO: (v0.2.0) Implement meta data
        # if self._meta is not None:
        #     self._meta.update(self)

    @property
    def shape(self) -> torch.Size:
        # Number of grid points per axis (cells + 1), excluding ghosts.
        return torch.Size([res + 1 for res in self.resolution])

    @property
    def tensor_shape(self) -> torch.Size:
        # Grid points per axis including both ghost layers.
        return torch.Size([length + ghost.sum() for length, ghost in zip(self.shape, self.n_ghost)])

    @shape.setter
    def shape(self, new_shape: Sequence[int]):
        self.resolution = [s - 1 for s in new_shape]

    @property
    def lengths(self) -> torch.Tensor:
        # Physical extent of the box per axis.
        return self.upper - self.lower

    @property
    def cell_lengths(self) -> torch.Tensor:
        # With endpoint=True the box is divided into `resolution` cells; with
        # endpoint=False into resolution+1 — matching the linspace convention
        # used in grid() (step+1 points, endpoint excluded).
        modifier = (torch.tensor(self.resolution, device=self.upper.device) if self.endpoint
                    else torch.tensor(self.resolution, device=self.upper.device) + 1)
        return (self.upper - self.lower) / modifier

    @property
    def n_cells(self) -> int:
        # NOTE(review): returns a 0-d torch tensor, not a Python int as the
        # annotation claims — confirm whether .item() was intended.
        return torch.prod(torch.tensor(self.resolution, device=self.upper.device), dtype=int)

    @property
    def n_points(self) -> int:
        # NOTE(review): same 0-d-tensor-vs-int mismatch as n_cells.
        return torch.prod(torch.tensor(self.shape, device=self.upper.device), dtype=int)

    @property
    def n_ghost(self) -> torch.IntTensor:
        return self._n_ghost.to(self.upper.device)

    @n_ghost.setter
    def n_ghost(self, n_ghost: Union[Sequence[Sequence[int]], None]):
        # None means no ghost cells on any side of any axis.
        num = torch.zeros(self.dim, 2, dtype=torch.int) if n_ghost is None else torch.tensor(n_ghost, dtype=torch.int)
        if num.shape != (self.dim, 2):
            raise ValueError("n_ghost needs to have shape `(dim, 2)`")
        self._n_ghost = num

    def refine(self, refinement_level: int, inplace: bool = True) -> "BoxDomain":
        """Double the resolution `refinement_level` times.

        With ``inplace=False`` a deep copy is modified and returned instead.
        """
        new_resolution = tuple(res * 2**refinement_level for res in self.resolution)
        domain = self if inplace else copy.deepcopy(self)
        domain.resolution = new_resolution
        return domain

    def coarsen(self, coarsen_level: int, inplace: bool = True) -> "BoxDomain":
        """Halve the resolution `coarsen_level` times.

        Raises ValueError if any axis is not divisible by 2**coarsen_level.
        """
        if any(n % 2**coarsen_level != 0 for n in self.resolution):
            raise ValueError(
                f"BoxDomain with resolution {self.resolution} cannot be coarsened by {coarsen_level} levels. "
                f"All dimensions need to be multiples of 2**coarsen_level = {2**coarsen_level}."
            )
        new_resolution = tuple(res // 2**coarsen_level for res in self.resolution)
        domain = self if inplace else copy.deepcopy(self)
        domain.resolution = new_resolution
        return domain

    def grid(self, as_numpy: bool = False, with_ghost: bool = False) -> Union[Sequence[torch.Tensor], Sequence[np.ndarray]]:
        """Return the meshgrid of grid-point coordinates, one tensor per axis.

        With ``with_ghost=True`` the bounds are extended by the ghost layers.
        """
        resolution = (torch.tensor(self.resolution, device=self.device) + self.n_ghost.sum(dim=1) if with_ghost else self.resolution)
        _lower = (self.lower - self.cell_lengths * self.n_ghost[:, 0] if with_ghost else self.lower).to(device='cpu')
        _upper = (self.upper + self.cell_lengths * self.n_ghost[:, 1] if with_ghost else self.upper).to(device='cpu')
        # linspace with step+1 points reproduces the cell_lengths spacing for
        # both endpoint conventions.
        cell_boundaries_1d = (torch.tensor(np.linspace(lower, upper, step + 1, endpoint=self.endpoint), dtype=self.dtype, device=self.device)
                              for lower, upper, step in zip(_lower, _upper, resolution))
        grid = torch.meshgrid(*cell_boundaries_1d)  # , indexing='ij') #TODO: indexing is not compatible with every PyTorch version

        if as_numpy:
            return tuple(g.detach().cpu().numpy() for g in grid)
        else:
            return grid

    def contains(self, points: torch.Tensor) -> torch.Tensor:
        """Boolean mask: True where `points` lie inside the closed box."""
        return ((points >= self.lower) & (points <= self.upper)).all(dim=-1)

    def split(self, *coordinates: float, n_ghost: int = 1, dim: int = 0) -> Sequence["BoxDomain"]:
        """Split the box at the given physical coordinates along axis `dim`.

        Each coordinate must coincide with an existing grid point; otherwise
        a ValueError is raised. Returns a tuple of new BoxDomain subdomains.
        NOTE(review): the `n_ghost` parameter is never used in the body, and
        only `self.n_ghost` enters the bookkeeping — confirm intent.
        """
        grid_points = []
        bounds = [self.lower]
        # Virtual upper bound including the upper ghost layer; without an
        # endpoint the last (virtual) cell is removed again.
        upper_virtual = self.upper.clone() + self.cell_lengths * self.n_ghost[:, 1]
        if self.endpoint is False:
            upper_virtual -= self.cell_lengths
        for coordinate in sorted(coordinates):
            # Convert the physical coordinate to a (rounded) grid index.
            point_fraction = ((coordinate - self.lower[dim]) / self.lengths[dim]).item()
            float_index = self.resolution[dim] * point_fraction
            int_index = round(float_index)
            # Reject coordinates that do not hit a grid point along axis 0.
            if any(torch.isclose(self.grid()[0][((slice(None),) + (0,) * (self.dim - 1))], torch.tensor(coordinate, dtype=bounds[0].dtype))) is not True:
                raise ValueError(
                    f"Cannot split {self} at coordinate {coordinate} along dimension {dim}."
                    f"Coordinate not contained in box."
                )
            bound = self.lower + int_index * self.cell_lengths
            # Store the cell count of this segment (index minus cells already
            # assigned to earlier segments).
            grid_points.append(int_index - sum(grid_points))
            bounds.append(bound)
        # Remaining cells (plus upper ghost cells) go to the last segment.
        grid_points.append(self.resolution[dim] - sum(grid_points) + self.n_ghost[0, 1])
        bounds.append(upper_virtual)
        # bounds[-1] -=
        domains = []
        for i in range(len(bounds) - 1):
            lower_point, upper_point = bounds[i], bounds[i + 1]
            lower = self.lower[:].clone()
            lower[dim] = lower_point[dim]
            upper = upper_virtual.clone()
            upper[dim] = upper_point[dim]
            resolution = torch.tensor(self.resolution, device=self.upper.device).clone()
            resolution[dim] = grid_points[i]
            domain = BoxDomain(
                lower=lower,
                upper=upper,
                resolution=resolution,
                mpi_rank=self.mpi_rank,
                cubic_cells=self._cubic_cells,
            )
            domain.coord[0] = i  # TODO: version 1 (only for first dimension)
            domains.append(domain)
        return tuple(domains)

    def __str__(self):
        # Human-readable summary, e.g. "BoxDomain(bounds=[0.0,1.0]x[0.0,1.0]; res=4x4; ...)".
        bounds_string = [f'[{a:.1f},{b:.1f}]' for a, b in zip(self.lower, self.upper)]
        res_string = 'x'.join(str(r) for r in self.resolution)
        bounds_string = 'x'.join(bounds_string)
        return f"BoxDomain(bounds={bounds_string}; res={res_string}; endpoint={self.endpoint}; on rank {self.rank}; with coord {self.coord})"

# ---------------------------------------------------------------------------
# Patch boundary: the lines below originate from a new file, torchdd/mpi.py
# ---------------------------------------------------------------------------

__all__ = ["mpi_rank", "mpi_size"]


def mpi_size():
    """Return the MPI world size. Not implemented yet (stub)."""
    pass


def mpi_rank():
    """Return the local MPI rank. Not implemented yet (stub)."""
    pass