Source code for eoreader.products.optical.spot45_product

# -*- coding: utf-8 -*-
# Copyright 2024, SERTIT-ICube - France, https://sertit.unistra.fr/
# This file is part of eoreader project
#     https://github.com/sertit/eoreader
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SPOT-4/5 products.
See `here <http://www.engesat.com.br/wp-content/uploads/S5-ST-73-1-CN_2_9-Spec-Format-Produits-SPOT.pdf>`_
for more information.
"""
import logging
from datetime import timedelta
from enum import unique

import numpy as np
import xarray as xr
from lxml import etree
from rasterio import crs as riocrs
from sertit import path
from sertit.misc import ListEnum
from sertit.types import AnyPathType

from eoreader import DATETIME_FMT, EOREADER_NAME, cache
from eoreader.bands import (
    GREEN,
    NARROW_NIR,
    NIR,
    PAN,
    RED,
    SWIR_1,
    BandNames,
    SpectralBand,
)
from eoreader.exceptions import InvalidProductError
from eoreader.products import DimapV1Product
from eoreader.products.optical.optical_product import RawUnits
from eoreader.reader import Constellation
from eoreader.stac import GSD, ID, NAME, WV_MAX, WV_MIN

LOGGER = logging.getLogger(EOREADER_NAME)

_DIMAP_BAND_MTD = {
    PAN: "PAN",
    GREEN: "XS1",
    RED: "XS2",
    NIR: "XS3",
    NARROW_NIR: "XS3",
    SWIR_1: "XS4",
}

# https://spot.cnes.fr/sites/default/files/migration/smsc/spot/calibration_synthesis_SPOT1245_ed1.pdf (3.3)
# HRVIR 1, 2
_SPOT4_E0 = {
    PAN: [1570.2, 1589],
    GREEN: [1842.9, 1850.9],
    RED: [1570.2, 1589],
    NIR: [1052.1, 1054.8],
    NARROW_NIR: [1052.1, 1054.8],
    SWIR_1: [235.84, 241.93],
}

# HRG 1, 2
_SPOT5_E0 = {
    PAN: [1764.2, 1775],
    GREEN: [1859.8, 1859.8],
    RED: [1575.3, 1577.6],
    NIR: [1043.9, 1048.2],
    NARROW_NIR: [1043.9, 1048.2],
    SWIR_1: [238.87, 237.78],
}
"""
The values of the normalized solar irradiance have been computed using WMO (World Meteorogical Organization) spectral solar irradiance.
"""


[docs]@unique class Spot4BandCombination(ListEnum): """ Band combination for SPOT4 data See `this <https://www.intelligence-airbusds.com/files/pmedia/public/r451_9_resolutionspectralmodes_uk_sept2010.pdf>`_ for more information. """ M = "M" """ "M" for Spot 4 PAN product (10 m) """ X = "X" """ "X" for Spot 4 multispectral product (3 bands, without SWIR) (20 m) """ I = "I" # noqa """ "I" for Spot 4 multispectral product (4 bands, with SWIR) (20 m) """ MX = "M+X" """ "M+X" for Spot 4 merge product (3 bands, without SWIR) (10 m) """ MI = "M+I" """ "M+I" for Spot 4 merge product (4 bands, with SWIR) (10 m) """
[docs]@unique class Spot5BandCombination(ListEnum): """ Band combination for SPOT4/5 data See `this <https://www.intelligence-airbusds.com/files/pmedia/public/r451_9_resolutionspectralmodes_uk_sept2010.pdf>`_ for more information. """ T = "T" """ "T" for supermode Spot data (2.5 m) """ HM = "HM" """ "HM" for Spot-5 PAN data (5 m) """ X = "X" """ "X" for 3 bands extracted from 4 bands (10 m) """ J = "J" """ "J" for Spot 5 multipectral product (4 bands, with SWIR) (10 m) """ HMX = "HM+X" """ "HM+X" for Spot 5 merge product (3 bands, without SWIR) (5 m) """ TX = "T+X" """ "T+X" for Spot 5 supermode and multipectral merge product (3 bands, without SWIR) (2.5 m) """
[docs]@unique class Spot45ProductType(ListEnum): """ Product Type for SPOT4/5 data See `here <http://www.engesat.com.br/wp-content/uploads/S5-ST-73-1-CN_2_9-Spec-Format-Produits-SPOT.pdf>`_ for more information. """ L0 = "0" """ Level-0 """ L1A = "1A" """ Level-1A """ L1B = "1B" """ Level-1B """ L2A = "2A" """ Level-2A """
[docs]class Spot45Product(DimapV1Product): """ Class of SPOT4/5 products. See `here <http://www.engesat.com.br/wp-content/uploads/S5-ST-73-1-CN_2_9-Spec-Format-Produits-SPOT.pdf>`_ for more information. """ def _pre_init(self, **kwargs) -> None: """ Function used to pre_init the products (setting needs_extraction and so on) """ root, _ = self.read_mtd() mission_idx = int(root.findtext(".//MISSION_INDEX")) if mission_idx == 4: self._supermode_res = None self._pan_res = 10.0 self._ms_res = 20.0 self.constellation = Constellation.SPOT4 elif mission_idx == 5: self._supermode_res = 2.5 self._pan_res = 5.0 self._ms_res = 10.0 self.constellation = Constellation.SPOT5 else: raise InvalidProductError("Mission index should be 4 or 5.") self.needs_extraction = False self._use_filename = True self._proj_prod_type = [Spot45ProductType.L0] # Raw units rad_proc = root.findtext(".//RADIOMETRIC_PROCESSING").upper() if rad_proc == "REFLECTANCE": self._raw_units = RawUnits.REFL elif rad_proc in "BASIC": self._raw_units = RawUnits.DN else: self._raw_units = RawUnits.NONE # Pre init done by the super class super()._pre_init(**kwargs) def _set_band_combi(self) -> None: """ Set Band combination """ root, _ = self.read_mtd() band_combi = root.findtext(".//SPECTRAL_PROCESSING") if self.constellation == Constellation.SPOT4: self.band_combi = Spot4BandCombination.from_value(band_combi) else: self.band_combi = Spot5BandCombination.from_value(band_combi) def _post_init(self, **kwargs) -> None: """ Function used to post_init the products (setting sensor type, band names and so on) """ if self.band_combi is None: self._set_band_combi() # Post init done by the super class super()._post_init(**kwargs) def _set_pixel_size(self) -> None: """ Set product default pixel size (in meters) """ # Not Pansharpened images if self.band_combi in [ Spot4BandCombination.X, Spot4BandCombination.I, Spot5BandCombination.X, Spot5BandCombination.J, ]: self.pixel_size = self._ms_res # Pansharpened images elif self.band_combi in [ Spot4BandCombination.M, Spot4BandCombination.MX, Spot4BandCombination.MI, Spot5BandCombination.HM, Spot5BandCombination.HMX, ]: self.pixel_size = self._pan_res # Supermode images else: self.pixel_size = self._supermode_res def _set_instrument(self) -> None: """ Set instrument SPOT-4: https://space.oscar.wmo.int/instruments/view/hrvir SPOT-5: https://space.oscar.wmo.int/instruments/view/hrg """ if self.constellation == Constellation.SPOT4: self.instrument = "HRVIR" else: self.instrument = "HRG" def _set_product_type(self) -> None: """ Set products type See Vision-1_web_201906.pdf for more information. """ if self.product_type is None: # Get MTD XML file root, _ = self.read_mtd() proc_lvl = root.findtext(".//SCENE_PROCESSING_LEVEL") self.product_type = Spot45ProductType.from_value(proc_lvl) # Manage not orthorectified product if self.product_type == Spot45ProductType.L0: self.is_ortho = False def _map_bands(self) -> None: """ Map bands """ if self.constellation == Constellation.SPOT4: # Create spectral bands pan = SpectralBand( eoreader_name=PAN, **{NAME: "PAN", ID: 1, GSD: self._pan_res, WV_MIN: 610, WV_MAX: 680}, ) green = SpectralBand( eoreader_name=GREEN, **{NAME: "GREEN", ID: 3, GSD: self._ms_res, WV_MIN: 500, WV_MAX: 590}, ) red = SpectralBand( eoreader_name=RED, **{NAME: "RED", ID: 2, GSD: self._ms_res, WV_MIN: 610, WV_MAX: 680}, ) nir = SpectralBand( eoreader_name=NIR, **{NAME: "NIR", ID: 1, GSD: self._ms_res, WV_MIN: 790, WV_MAX: 890}, ) swir1 = SpectralBand( eoreader_name=SWIR_1, **{ NAME: "SWIR_1", ID: 4, GSD: self._ms_res, WV_MIN: 1580, WV_MAX: 1750, }, ) else: # Create spectral bands pan = SpectralBand( eoreader_name=PAN, **{NAME: "PAN", ID: 1, GSD: self._pan_res, WV_MIN: 490, WV_MAX: 690}, ) green = SpectralBand( eoreader_name=GREEN, **{NAME: "GREEN", ID: 3, GSD: self._ms_res, WV_MIN: 490, WV_MAX: 610}, ) red = SpectralBand( eoreader_name=RED, **{NAME: "RED", ID: 2, GSD: self._ms_res, WV_MIN: 610, WV_MAX: 680}, ) nir = SpectralBand( eoreader_name=NIR, **{NAME: "NIR", ID: 1, GSD: self._ms_res, WV_MIN: 780, WV_MAX: 890}, ) swir1 = SpectralBand( eoreader_name=SWIR_1, **{NAME: "SWIR_1", ID: 4, GSD: 20.0, WV_MIN: 1580, WV_MAX: 1750}, ) # Manage bands of the product if self.band_combi in [Spot4BandCombination.M, Spot5BandCombination.HM]: self.bands.map_bands({PAN: pan}) elif self.band_combi in [ Spot4BandCombination.X, Spot5BandCombination.X, ]: self.bands.map_bands( { GREEN: green, RED: red, NIR: nir, NARROW_NIR: nir, } ) elif self.band_combi in [ Spot4BandCombination.MX, Spot5BandCombination.HMX, ]: self.bands.map_bands( { GREEN: green.update(gsd=self._pan_res), RED: red.update(gsd=self._pan_res), NIR: nir.update(gsd=self._pan_res), NARROW_NIR: nir.update(gsd=self._pan_res), } ) elif self.band_combi == Spot5BandCombination.T: self.bands.map_bands({PAN: pan.update(gsd=self._supermode_res)}) elif self.band_combi == Spot5BandCombination.TX: self.bands.map_bands( { GREEN: green.update(gsd=self._supermode_res), RED: red.update(gsd=self._supermode_res), NIR: nir.update(gsd=self._supermode_res), NARROW_NIR: nir.update(gsd=self._supermode_res), } ) elif self.band_combi in [ Spot4BandCombination.I, Spot5BandCombination.J, ]: self.bands.map_bands( { GREEN: green, RED: red, NIR: nir, NARROW_NIR: nir, SWIR_1: swir1, } ) elif self.band_combi == Spot4BandCombination.MI: self.bands.map_bands( { GREEN: green.update(gsd=self._pan_res), RED: red.update(gsd=self._pan_res), NIR: nir.update(gsd=self._pan_res), NARROW_NIR: nir.update(gsd=self._pan_res), SWIR_1: swir1.update(gsd=self._pan_res), } ) else: raise InvalidProductError( f"Unusual band combination: {self.band_combi.name}" ) def _get_raw_crs(self) -> riocrs.CRS: """ Get raw CRS of the tile Returns: rasterio.crs.CRS: CRS object """ # Open metadata root, _ = self.read_mtd() # Get CRS crs_name = root.findtext(".//HORIZONTAL_CS_CODE") if not crs_name: raise InvalidProductError( "Cannot find the CRS name (from GEOGRAPHIC_CS_CODE or HORIZONTAL_CS_CODE) type in the metadata file" ) return riocrs.CRS.from_string(crs_name) def _get_constellation(self) -> Constellation: """Getter of the constellation""" if self.split_name[0] == "SP04": const = Constellation.SPOT4 elif self.split_name[0] == "SP05": const = Constellation.SPOT5 else: raise InvalidProductError( f"Invalid name: {self.name}, should start with SP04 or SP05." ) return const def _get_name_constellation_specific(self) -> str: """ Set product real name from metadata Returns: str: True name of the product (from metadata) """ # Get MTD XML file root, _ = self.read_mtd() # Mission index mission_idx = int(root.findtext(".//MISSION_INDEX")) # Instrument instrument = "HIR" if self.constellation == Constellation.SPOT4 else "HRG" # Product type self._set_band_combi() # Not set yet band_combi = f"{self.band_combi.name:_<4}" # Datetimes dt = self.get_datetime(as_datetime=True) start_dt = (dt - timedelta(seconds=4)).strftime(DATETIME_FMT) end_dt = (dt + timedelta(seconds=4)).strftime(DATETIME_FMT) # Unknown data digit = 3 # TODO: what's this ? suffix = "TOU_1234_eord" # Create name name = f"SP0{mission_idx}_{instrument}_{band_combi}_{digit}_{start_dt}_{end_dt}_{suffix}" return name
[docs] @cache def get_mean_viewing_angles(self) -> (float, float, float): """ Get Mean Viewing angles (azimuth, off-nadir and incidence angles) .. code-block:: python >>> from eoreader.reader import Reader >>> path = r"S2A_MSIL1C_20200824T110631_N0209_R137_T30TTK_20200824T150432.SAFE.zip" >>> prod = Reader().open(path) >>> prod.get_mean_viewing_angles() Returns: (float, float, float): Mean azimuth, off-nadir and incidence angles """ # Get MTD XML file root, _ = self.read_mtd() # Open incidence and off-nadir angle az = None try: incidence_angle = abs(float(root.findtext(".//INCIDENCE_ANGLE"))) except TypeError: raise InvalidProductError( "INCIDENCE_ANGLE or VIEWING_ANGLE not found in metadata!" ) if self.constellation == Constellation.SPOT5: try: off_nadir = abs(float(root.findtext(".//VIEWING_ANGLE"))) except TypeError: raise InvalidProductError("VIEWING_ANGLE not found in metadata!") else: # See: https://earth.esa.int/eogateway/missions/spot-4 orbit_height = 832000 earth_radius = 6378137 orbit_coeff = (earth_radius + orbit_height) / earth_radius off_nadir = np.rad2deg( np.arcsin(np.sin(np.deg2rad(90 - incidence_angle)) / orbit_coeff) ) return az, off_nadir, incidence_angle
def _to_reflectance( self, band_arr: xr.DataArray, band_path: AnyPathType, band: BandNames, **kwargs, ) -> xr.DataArray: """ Converts band to reflectance Args: band_arr (xr.DataArray): Band array to convert band_path (AnyPathType): Band path band (BandNames): Band to read **kwargs: Other keywords Returns: xr.DataArray: Band in reflectance """ if self._raw_units == RawUnits.REFL: # Compute the correct radiometry of the band original_dtype = band_arr.encoding["dtype"] if original_dtype == "uint16": band_arr /= 10000.0 elif self._raw_units == RawUnits.DN: # Convert DN into radiance band_arr = self._dn_to_toa_rad(band_arr, band) # Get the solar irradiance value of raw radiometric Band (in watt/m2/micron) root, _ = self.read_mtd() instrument_idx = int(root.findtext(".//INSTRUMENT_INDEX")) - 1 if self.constellation == Constellation.SPOT4: e0 = _SPOT4_E0[band][instrument_idx] else: e0 = _SPOT5_E0[band][instrument_idx] # Convert radiance into reflectance band_arr = self._toa_rad_to_toa_refl(band_arr, band, e0=e0) else: LOGGER.warning( "The spectral properties of a SEAMLESS radiometric processed image " "cannot be retrieved since the initial images have undergone " "several radiometric adjustments for aesthetic rendering." "Returned as is." ) # To float32 if band_arr.dtype != np.float32: band_arr = band_arr.astype(np.float32) return band_arr @cache def _read_mtd(self) -> (etree._Element, dict): """ Read metadata and outputs the metadata XML root and its namespaces as a dict Returns: (etree._Element, dict): Metadata XML root and its namespaces as a dict """ mtd_from_path = "METADATA.DIM" mtd_archived = r"METADATA\.DIM" return self._read_mtd_xml(mtd_from_path, mtd_archived) def _get_tile_path(self) -> AnyPathType: """ Get the DIMAP filepath Returns: AnyPathType: DIMAP filepath """ return self._get_path("IMAGERY", "TIF") def _dn_to_toa_rad(self, dn_arr: xr.DataArray, band: BandNames) -> xr.DataArray: """ Compute DN to TOA radiance Args: dn_arr (xr.DataArray): DN array band (BandNames): Band Returns: xr.DataArray: TOA Radiance array """ band_mtd_str = _DIMAP_BAND_MTD[band] # Get MTD XML file root, _ = self.read_mtd() # Convert DN to TOA radiance # <MEASURE_DESC>Raw radiometric counts (DN) to TOA Radiance (L). Formulae L=DN/GAIN+BIAS</MEASURE_DESC> try: rad_gain = None rad_bias = None for br in root.iterfind(".//Spectral_Band_Info"): if br.findtext("BAND_DESCRIPTION") == band_mtd_str: rad_gain = float(br.findtext("PHYSICAL_GAIN")) rad_bias = float(br.findtext("PHYSICAL_BIAS")) break if rad_gain is None or rad_bias is None: raise TypeError except TypeError: raise InvalidProductError( "PHYSICAL_GAIN and PHYSICAL_BIAS from Spectral_Band_Info not found in metadata!" ) return dn_arr / rad_gain + rad_bias
[docs] def get_quicklook_path(self) -> str: """ Get quicklook path if existing. Returns: str: Quicklook path """ quicklook_path = None try: if self.is_archived: quicklook_path = self.path / path.get_archived_path( self.path, file_regex=r".*PREVIEW\.JPG" ) else: quicklook_path = str(next(self.path.glob("*PREVIEW.JPG"))) except (StopIteration, FileNotFoundError): LOGGER.warning(f"No quicklook found in {self.condensed_name}") return quicklook_path
def _get_job_id(self) -> str: """ Get VHR job ID Returns: str: VHR product ID """ # Get MTD XML file root, _ = self.read_mtd() return root.findtext(".//JOB_ID") def _get_condensed_name(self) -> str: """ Get PlanetScope products condensed name ({date}_PLD_{product_type}_{band_combi}). Returns: str: Condensed name """ return f"{self.get_datetime()}_{self.constellation.name}_{self.product_type.name}_{self.band_combi.name}"