Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove asyncio loop; add support for non-EPSG-4326 co-ordinate refere… #220

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependency-injector = "==4.41.0"
numba = "==0.56.4"
pint = "*"
shapely = "*"
pyproj = "*"

[dev-packages]
isort = "*"
Expand All @@ -29,6 +30,7 @@ sphinx-toolbox = "*"
sphinx-design = "*"
nbsphinx = "*"
pydata-sphinx-theme = "*"
ipykernel = "*"

[requires]
python_version = "3.8"
1,970 changes: 939 additions & 1,031 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ install_requires =
numpy==1.22.0
pillow==9.4.0
pydantic==2.4.2
pyproj==3.5.0
python-dotenv==0.19.2
requests==2.27.1
scipy==1.7.3
Expand Down
9 changes: 3 additions & 6 deletions src/physrisk/data/pregenerated_hazard_model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import concurrent.futures
from collections import defaultdict
from typing import Dict, List, Mapping, MutableMapping, Optional, cast
Expand Down Expand Up @@ -50,13 +49,11 @@ def get_hazard_events( # noqa: C901
# e.g. asyncio.get_event_loop().run_in_executor
# 2) use thread pool
# for now we do 2; 1 might be preferred if the number of threads needed to download
# data in parallel becomes large (probably not given use of async in Zarr).
# data in parallel becomes large (probably not, given use of async in Zarr).

# result = asyncio.gather(self._get_hazard_events(requests))
# return result
return asyncio.run(self._get_hazard_events(requests))
return self._get_hazard_events(requests)

async def _get_hazard_events( # noqa: C901
def _get_hazard_events( # noqa: C901
self, requests: List[HazardDataRequest]
) -> Mapping[HazardDataRequest, HazardDataResponse]:
batches = defaultdict(list)
Expand Down
15 changes: 12 additions & 3 deletions src/physrisk/data/zarr_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import s3fs
import zarr
from affine import Affine
from pyproj import Transformer
from shapely import MultiPoint, Point, affinity


Expand Down Expand Up @@ -95,10 +96,13 @@ def get_curves(self, set_id, longitudes, latitudes, interpolation="floor"):
# OSC-specific attributes contain transform and return periods
t = z.attrs["transform_mat3x3"] # type: ignore
transform = Affine(t[0], t[1], t[2], t[3], t[4], t[5])
crs = z.attrs.get("crs", "epsg:4326")

# in the case of acute risks, index_values will contain the return periods
index_values = self.get_index_values(z)
image_coords = self._get_coordinates(longitudes, latitudes, transform, pixel_is_area=interpolation != "floor")
image_coords = self._get_coordinates(
longitudes, latitudes, crs, transform, pixel_is_area=interpolation != "floor"
)

if interpolation == "floor":
image_coords = np.floor(image_coords).astype(int)
Expand Down Expand Up @@ -334,8 +338,13 @@ def _linear_interp_frac_coordinates(z, image_coords, return_periods, interpolati
raise ValueError("interpolation must have value 'linear', 'max' or 'min")

@staticmethod
def _get_coordinates(longitudes, latitudes, transform: Affine, pixel_is_area: bool):
coords = np.vstack((longitudes, latitudes, np.ones(len(longitudes)))) # type: ignore
def _get_coordinates(longitudes, latitudes, crs: str, transform: Affine, pixel_is_area: bool):
if crs.lower() != "epsg:4236":
transproj = Transformer.from_crs("epsg:4326", crs, always_xy=True)
x, y = transproj.transform(longitudes, latitudes)
else:
x, y = longitudes, latitudes
coords = np.vstack((x, y, np.ones(len(longitudes)))) # type: ignore
inv_trans = ~transform
mat = np.array(inv_trans).reshape(3, 3)
frac_image_coords = mat @ coords
Expand Down
25 changes: 17 additions & 8 deletions src/physrisk/kernel/impact.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from physrisk.kernel.vulnerability_model import DataRequester, VulnerabilityModelAcuteBase, VulnerabilityModelBase
from physrisk.utils.helpers import get_iterable

logger = logging.getLogger(__name__)


class ImpactKey(NamedTuple):
asset: Asset
Expand Down Expand Up @@ -67,14 +69,21 @@ def calculate_impacts( # noqa: C901
assert isinstance(model, VulnerabilityModelBase)
results[ImpactKey(asset=asset, hazard_type=model.hazard_type)] = AssetImpactResult(EmptyImpactDistrib())
continue
if isinstance(model, VulnerabilityModelAcuteBase):
impact, vul, event = model.get_impact_details(asset, hazard_data)
results[ImpactKey(asset=asset, hazard_type=model.hazard_type)] = AssetImpactResult(
impact, vulnerability=vul, event=event, hazard_data=hazard_data
)
elif isinstance(model, VulnerabilityModelBase):
impact = model.get_impact(asset, hazard_data)
results[ImpactKey(asset, model.hazard_type)] = AssetImpactResult(impact, hazard_data=hazard_data)
try:
if isinstance(model, VulnerabilityModelAcuteBase):
impact, vul, event = model.get_impact_details(asset, hazard_data)
results[ImpactKey(asset=asset, hazard_type=model.hazard_type)] = AssetImpactResult(
impact, vulnerability=vul, event=event, hazard_data=hazard_data
)
elif isinstance(model, VulnerabilityModelBase):
impact = model.get_impact(asset, hazard_data)
results[ImpactKey(asset=asset, hazard_type=model.hazard_type)] = AssetImpactResult(
impact, hazard_data=hazard_data
)
except Exception as e:
logger.exception(e)
assert isinstance(model, VulnerabilityModelBase)
results[ImpactKey(asset=asset, hazard_type=model.hazard_type)] = AssetImpactResult(EmptyImpactDistrib())
return results


Expand Down
16 changes: 14 additions & 2 deletions src/test/data/hazard_model_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import zarr
import zarr.storage
from affine import Affine
from pyproj import Transformer

from physrisk.hazard_models.core_hazards import cmip6_scenario_to_rcp

Expand Down Expand Up @@ -71,14 +72,25 @@ def _add_curves(
)
z.attrs["transform_mat3x3"] = trans
z.attrs["index_values"] = return_periods
z.attrs["crs"] = crs

if crs.lower() != "epsg:4326":
transproj = Transformer.from_crs(
"epsg:4326",
crs,
always_xy=True,
)
x, y = transproj.transform(longitudes, latitudes)
else:
x, y = longitudes, latitudes

transform = Affine(trans[0], trans[1], trans[2], trans[3], trans[4], trans[5])
coords = np.vstack((longitudes, latitudes, np.ones(len(longitudes))))
coords = np.vstack((x, y, np.ones(len(x))))
inv_trans = ~transform
mat = np.array(inv_trans).reshape(3, 3)
frac_image_coords = mat @ coords
image_coords = np.floor(frac_image_coords).astype(int)
for j in range(len(longitudes)):
for j in range(len(x)):
z[:, image_coords[1, j], image_coords[0, j]] = intensities

def _crs_shape_transform(self, width: int, height: int, return_periods: Union[List[float], npt.NDArray] = [0.0]):
Expand Down
47 changes: 31 additions & 16 deletions src/test/data/test_events_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# from pathlib import PurePosixPath
from test.base_test import TestWithCredentials
from test.data.hazard_model_store import mock_hazard_model_store_inundation
from test.data.hazard_model_store import ZarrStoreMocker, mock_hazard_model_store_inundation

# import fsspec.implementations.local as local # type: ignore
import numpy as np
Expand All @@ -16,7 +16,10 @@
from physrisk.api.v1.hazard_data import HazardAvailabilityRequest, HazardResource, Scenario
from physrisk.data.inventory import EmbeddedInventory, Inventory
from physrisk.data.inventory_reader import InventoryReader
from physrisk.data.pregenerated_hazard_model import ZarrHazardModel
from physrisk.data.zarr_reader import ZarrReader
from physrisk.kernel.hazard_model import HazardDataRequest
from physrisk.kernel.hazards import RiverineInundation
from physrisk.requests import _get_hazard_data_availability


Expand All @@ -25,22 +28,10 @@ class TestEventRetrieval(TestWithCredentials):
def test_inventory_change(self):
# check validation passes calling in service-like way
embedded = EmbeddedInventory()

resources1 = embedded.to_resources()
# fs = local.LocalFileSystem()
# base_path = PurePosixPath(__file__).parents[2].joinpath("physrisk", "data", "static")
# reader = InventoryReader(fs=fs, base_path=str(base_path))
# resources2 = inventory.expand(reader.read("hazard"))

# inventory1 = Inventory([r for r in resources1 if "chronic_heat" not in r.path]).json_ordered()
# inventory2 = Inventory([r for r in resources2 if ("jupiter" not in r.group_id and \
# "tas" not in r.id and "chronic_heat" not in r.path)]).json_ordered()
inventory2 = Inventory(resources1).json_ordered()
# with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "inventory1.json"), 'w') as f:
# f.write(inventory1)

with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "inventory2.json"), "w") as f:
f.write(inventory2)
inventory = Inventory(resources1).json_ordered()
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "inventory.json"), "w") as f:
f.write(inventory)

def test_hazard_data_availability_summary(self):
# check validation passes calling in service-like way
Expand Down Expand Up @@ -209,3 +200,27 @@ def test_zarr_geomax(self):

curves_max_candidate, _ = zarr_reader.get_max_curves(set_id, shapes, interpolation="linear")
numpy.testing.assert_allclose(curves_max_candidate, curves_max_expected / 4, rtol=1e-6)

def test_reproject(self):
"""Test adding data in a non-ESPG-4326 coordinate reference system. Check that attribute
end in the correct convertion."""
mocker = ZarrStoreMocker()
lons = [1.1, -0.31]
lats = [47.0, 52.0]
mocker._add_curves(
"test",
lons,
lats,
"epsg:3035",
[3, 39420, 38371],
[100.0, 0.0, 2648100.0, 0.0, -100.0, 5404500],
[10.0, 100.0, 1000.0],
[1.0, 2.0, 3.0],
)

source_paths = {RiverineInundation: lambda indicator_id, scenario, year, hint: "test"}
hazard_model = ZarrHazardModel(source_paths=source_paths, store=mocker.store)
response = hazard_model.get_hazard_events(
[HazardDataRequest(RiverineInundation, lons[0], lats[0], indicator_id="", scenario="", year=2050)]
)
numpy.testing.assert_equal(next(iter(response.values())).intensities, [1.0, 2.0, 3.0])
Loading