# -*- coding: utf-8 -*-
# Copyright 2024, SERTIT-ICube - France, https://sertit.unistra.fr/
# This file is part of eoreader project
# https://github.com/sertit/eoreader
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
RapidEye products.
See
`Product specifications <https://assets.planet.com/docs/Planet_Combined_Imagery_Product_Specs_letter_screen.pdf>`_
and `Planet documentation <https://developers.planet.com/docs/data/rapideye/>`_
for more information.
"""
import logging
from datetime import datetime
from enum import unique
from typing import Union
import numpy as np
import xarray as xr
from sertit import files, path
from sertit.misc import ListEnum
from sertit.types import AnyPathType
from eoreader import DATETIME_FMT, EOREADER_NAME
from eoreader.bands import (
BLUE,
GREEN,
NARROW_NIR,
NIR,
RED,
VRE_1,
BandNames,
SpectralBand,
)
from eoreader.exceptions import InvalidProductError
from eoreader.products.optical.planet_product import PlanetProduct
from eoreader.stac import GSD, ID, NAME, WV_MAX, WV_MIN
LOGGER = logging.getLogger(EOREADER_NAME)
_RE_EAI = {
BLUE: 1997.8,
GREEN: 1863.5,
RED: 1560.4,
VRE_1: 1395.0,
NIR: 1124.4,
NARROW_NIR: 1124.4,
}
"""
From RE `Product specifications <https://assets.planet.com/docs/Planet_Combined_Imagery_Product_Specs_letter_screen.pdf>`_
"""
[docs]
@unique
class ReProductType(ListEnum):
"""
RapidEye product types (processing levels)
See `Product specs <https://assets.planet.com/docs/Planet_Combined_Imagery_Product_Specs_letter_screen.pdf>`_
for more information.
"""
L1B = "RapidEye Basic Scene Product"
"""
Radiometric and sensor corrections applied to the data.
On-board spacecraft attitude and ephemeris applied to the data.
"""
L3A = "RapidEye Ortho Tile Product"
"""
Radiometric and sensor corrections applied to the data.
Imagery is orthorectified using the RPCs and an elevation model.
"""
[docs]
class ReProduct(PlanetProduct):
"""
Class of PlanetScope products.
See `Product specs <https://assets.planet.com/docs/Planet_Combined_Imagery_Product_Specs_letter_screen.pdf>`_
for more information.
The scaling factor to retrieve the calibrated radiance is 0.01.
"""
def _pre_init(self, **kwargs) -> None:
"""
Function used to pre_init the products
(setting needs_extraction and so on)
"""
self.constellation = self._get_constellation()
super()._pre_init(**kwargs)
def _post_init(self, **kwargs) -> None:
"""
Function used to post_init the products
(setting sensor type, band names and so on)
"""
try:
if self.is_archived:
path.get_archived_path(self.path, r".*udm\.tif")
else:
next(self.path.glob("**/*udm.tif"))
self._has_udm = True
except (FileNotFoundError, StopIteration):
# Some RE products don't have udm files
LOGGER.warning(
"UDM mask not found. This product won't be cleaned and won't have any cloud band."
)
pass
self._has_cloud_cover = True
# Post init done by the super class
super()._post_init(**kwargs)
def _set_pixel_size(self) -> None:
"""
Set product default pixel size (in meters)
"""
self.pixel_size = 5.0
def _set_instrument(self) -> None:
"""
Set instrument
See: https://space.oscar.wmo.int/instruments/view/reis
"""
self.instrument = "REIS"
def _map_bands(self):
"""
Map bands
See <Product specs `https://assets.planet.com/docs/Planet_Combined_Imagery_Product_Specs_letter_screen.pdf`>_ for more information.
"""
gsd = 6.5
blue = SpectralBand(
eoreader_name=BLUE,
**{NAME: "Blue", ID: 1, GSD: gsd, WV_MIN: 440, WV_MAX: 510},
)
green = SpectralBand(
eoreader_name=GREEN,
**{NAME: "Green", ID: 2, GSD: gsd, WV_MIN: 520, WV_MAX: 590},
)
red = SpectralBand(
eoreader_name=RED,
**{NAME: "Red", ID: 3, GSD: gsd, WV_MIN: 630, WV_MAX: 685},
)
vre = SpectralBand(
eoreader_name=VRE_1,
**{NAME: "Red Edge", ID: 4, GSD: gsd, WV_MIN: 690, WV_MAX: 730},
)
nir = SpectralBand(
eoreader_name=NIR,
**{NAME: "NIR", ID: 5, GSD: gsd, WV_MIN: 760, WV_MAX: 850},
)
# Set the band map
self.bands.map_bands(
{
BLUE: blue,
GREEN: green,
RED: red,
VRE_1: vre,
NIR: nir,
NARROW_NIR: nir,
}
)
def _set_product_type(self) -> None:
"""Set products type"""
# Get MTD XML file
root, nsmap = self.read_mtd()
# Manage product type
prod_type = root.findtext(f".//{nsmap['eop']}productType")
if not prod_type:
raise InvalidProductError(
"Cannot find the product type in the metadata file"
)
# Set correct product type
self.product_type = getattr(ReProductType, prod_type)
if self.product_type == ReProductType.L1B:
# TODO: implement orthorectification for Planet products
raise NotImplementedError(
f"Basic Scene Product are not managed for {self.constellation.value} products:\n{self.path}"
)
[docs]
def get_datetime(self, as_datetime: bool = False) -> Union[str, datetime]:
"""
Get the product's acquisition datetime, with format :code:`YYYYMMDDTHHMMSS` <-> :code:`%Y%m%dT%H%M%S`
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = r"SENTINEL2A_20190625-105728-756_L2A_T31UEQ_C_V2-2"
>>> prod = Reader().open(path)
>>> prod.get_datetime(as_datetime=True)
datetime.datetime(2019, 6, 25, 10, 57, 28, 756000), fetched from metadata, so we have the ms
>>> prod.get_datetime(as_datetime=False)
'20190625T105728'
Args:
as_datetime (bool): Return the date as a datetime.datetime. If false, returns a string.
Returns:
Union[str, datetime.datetime]: Its acquisition datetime
"""
if self.datetime is None:
# Get MTD XML file
root, nsmap = self.read_mtd()
datetime_str = root.findtext(f".//{nsmap['eop']}acquisitionDate")
if not datetime_str:
raise InvalidProductError(
"Cannot find acquisitionDate in the metadata file."
)
# Convert to datetime
datetime_str = datetime_str.split(".")[
0
] # Too many microseconds, remove them
datetime_str = datetime.strptime(datetime_str, "%Y-%m-%dT%H:%M:%S")
else:
datetime_str = self.datetime
if not as_datetime:
datetime_str = datetime_str.strftime(DATETIME_FMT)
return datetime_str
def _get_stack_path(self, as_list: bool = False) -> Union[str, list]:
"""
Get Planet stack path(s)
Args:
as_list (bool): Get stack path as a list (useful if several subdatasets are present)
Returns:
Union[str, list]: Stack path(s)
"""
if self._merged:
stack_path, _ = self._get_out_path(f"{self.condensed_name}_analytic.vrt")
if as_list:
stack_path = [stack_path]
else:
stack_path = self._get_path(
self.name, "tif", invalid_lookahead=["_udm", "_browse"], as_list=as_list
)
if not stack_path:
stack_path = self._get_path(self.name, "TIF", as_list=as_list)
return stack_path
def _dn_to_toa_rad(self, dn_arr: xr.DataArray, band: BandNames) -> xr.DataArray:
"""
Compute DN to TOA radiance
See
`Product specs <https://assets.planet.com/docs/Planet_Combined_Imagery_Product_Specs_letter_screen.pdf>`_
for more information.
Args:
dn_arr (xr.DataArray): DN array
band (BandNames): Band
Returns:
xr.DataArray: TOA Radiance array
"""
# Get MTD XML file
root, nsmap = self.read_mtd()
# Open identifier
rad_coef = None
for band_mtd in root.iterfind(
f".//{nsmap[self._nsmap_key]}bandSpecificMetadata"
):
if (
int(band_mtd.findtext(f".//{nsmap[self._nsmap_key]}bandNumber"))
== self.bands[band].id
):
rad_coef = float(
band_mtd.findtext(
f".//{nsmap[self._nsmap_key]}radiometricScaleFactor"
)
)
break
if rad_coef is None:
raise InvalidProductError(
"Couldn't find any radiometricScaleFactor in the product metadata!"
)
# To reflectance
return dn_arr * rad_coef
def _toa_rad_to_toa_refl(
self, rad_arr: xr.DataArray, band: BandNames
) -> xr.DataArray:
"""
Compute TOA reflectance from TOA radiance
See
`Product specs <https://assets.planet.com/docs/Planet_Combined_Imagery_Product_Specs_letter_screen.pdf>`_
for more information.
WARNING: in this formula, d**2 = 1 / sqrt(dt) !
Args:
rad_arr (xr.DataArray): TOA Radiance array
band (BandNames): Band
Returns:
xr.DataArray: TOA Reflectance array
"""
# Compute the coefficient converting TOA radiance in TOA reflectance
dt = self._sun_earth_distance_variation() ** 2
_, sun_zen = self.get_mean_sun_angles()
rad_sun_zen = np.deg2rad(sun_zen)
eai = _RE_EAI[band]
toa_refl_coeff = np.pi / (eai * dt * np.cos(rad_sun_zen))
return rad_arr.copy(data=toa_refl_coeff * rad_arr)
def _to_reflectance(
self,
band_arr: xr.DataArray,
band_path: AnyPathType,
band: BandNames,
**kwargs,
) -> xr.DataArray:
"""
Converts band to reflectance
Args:
band_arr (xr.DataArray): Band array to convert
band_path (AnyPathType): Band path
band (BandNames): Band to read
**kwargs: Other keywords
Returns:
xr.DataArray: Band in reflectance
"""
# Convert DN into radiance
band_arr = self._dn_to_toa_rad(band_arr, band)
# Convert radiance into reflectance
band_arr = self._toa_rad_to_toa_refl(band_arr, band)
# To float32
if band_arr.dtype != np.float32:
band_arr = band_arr.astype(np.float32)
return band_arr
[docs]
def get_quicklook_path(self) -> str:
"""
Get quicklook path if existing
Returns:
str: Quicklook path
"""
quicklook_path = None
try:
if self.is_archived:
quicklook_path = path.get_archived_rio_path(
self.path, file_regex=r".*_browse\.tif"
)
else:
quicklook_path = str(next(self.path.glob("**/*_browse.tif")))
except (StopIteration, FileNotFoundError):
pass
return quicklook_path
def _merge_subdatasets_mtd(self):
"""
Merge subdataset, when several Planet products avec been ordered together
Will create a reflectance (if possible) VRT, a UDM/UDM2 VRT and a merged metadata XML file.
"""
LOGGER.warning(
"_merge_subdatasets_mtd is not yet implemented (because of lack of tiled RapideEye product), only copying the first metadata file!"
)
mtd_file, mtd_exists = self._get_out_path(
f"{self.condensed_name}_metadata.json"
)
if not mtd_exists:
files.copy(self._get_path("metadata", "xml"), mtd_file)