The idea is to build a MosaicMethod class with 3 required method:
feed: add tile data and mask
is_done: check if we are done processing
get_data: return tile and mask data
class FirstMethod():
def __init__(self, tile=None):
self.tile = tile
self.exit_when_filled = True
def get_data(self):
"""Return tile data and mask."""
if self.tile is not None:
return self.tile.data, ~self.tile.mask[0] * 255
else:
return None, None
def is_done(self):
"""Check if we need more data."""
if self.exit_when_filled and not numpy.ma.is_masked(self.tile):
return True
def feed(self, tile):
"""Add data to tile."""
if self.tile is None:
self.tile = tile
pidex = self.tile.mask & ~tile.mask
mask = numpy.where(pidex, tile.mask, self.tile.mask)
self.tile = numpy.ma.where(pidex, tile, self.tile)
self.tile.mask = mask
Implementation:
algo = FirstMethod()
_tiler = partial(tiler, tile_x=tile_x, tile_y=tile_y, tile_z=tile_z, **kwargs)
max_threads = int(os.environ.get("MAX_THREADS", multiprocessing.cpu_count() * 5))
for chunks in _chunks(assets, max_threads):
with futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
future_tasks = [executor.submit(_tiler, asset) for asset in chunks]
for t, m in _filter_futures(future_tasks):
t = numpy.ma.array(t)
t.mask = m == 0
algo.feed(t)
if algo.is_done():
return algo.get_data()
This is how it could look like.
The idea is to build a
MosaicMethod
class with 3 required method:tile
andmask
dataImplementation: