# -*- coding: utf-8 -*-
# Copyright 2024, SERTIT-ICube - France, https://sertit.unistra.fr/
# This file is part of eoreader project
# https://github.com/sertit/eoreader
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Sentinel-3 SLSTR products
.. WARNING:
Not georeferenced NetCDF files are badly opened by GDAL and therefore by rasterio !
-> use xr.open_dataset that manages that correctly
"""
import logging
from collections import namedtuple
from enum import unique
from functools import reduce
from typing import Union
import geopandas as gpd
import numpy as np
import xarray as xr
from rasterio import features
from rasterio.enums import Resampling
from sertit import files, path, rasters, types
from sertit.misc import ListEnum
from sertit.types import AnyPathStrType, AnyPathType
from eoreader import EOREADER_NAME, cache, utils
from eoreader.bands import (
ALL_CLOUDS,
CIRRUS,
CLOUDS,
F1,
F2,
GREEN,
NARROW_NIR,
NIR,
RAW_CLOUDS,
RED,
S7,
SWIR_1,
SWIR_2,
SWIR_CIRRUS,
TIR_1,
TIR_2,
BandNames,
SpectralBand,
to_str,
)
from eoreader.exceptions import InvalidTypeError
from eoreader.keywords import (
CLEAN_OPTICAL,
SLSTR_RAD_ADJUST,
SLSTR_STRIPE,
SLSTR_VIEW,
TO_REFLECTANCE,
)
from eoreader.products import S3DataType, S3Product, S3ProductType
from eoreader.products.optical.optical_product import DEF_CLEAN_METHOD, CleanMethod
from eoreader.stac import ASSET_ROLE, BT, CENTER_WV, DESCRIPTION, FWHM, GSD, ID, NAME
LOGGER = logging.getLogger(EOREADER_NAME)
# FROM SNAP (only for radiance bands, not for brightness temperatures)
# https://github.com/senbox-org/s3tbx/blob/197c9a471002eb2ec1fbd54e9a31bfc963446645/s3tbx-rad2refl/src/main/java/org/esa/s3tbx/processor/rad2refl/Rad2ReflConstants.java#L141
# Not used for now
SLSTR_SOLAR_FLUXES_DEFAULT = {
GREEN: 1837.39,
RED: 1525.94,
NIR: 956.17,
NARROW_NIR: 956.17,
SWIR_CIRRUS: 365.90,
SWIR_1: 248.33,
SWIR_2: 78.33,
}
# Link band names to their stripe
SLSTR_A_BANDS = ["S1", "S2", "S3"]
SLSTR_ABC_BANDS = ["S4", "S5", "S6"]
SLSTR_F_BANDS = ["F1"]
SLSTR_I_BANDS = ["S7", "S8", "S9", "F1", "F2"]
# Link band names to their physical quantity (radiance vs brightness temperature)
SLSTR_RAD_BANDS = SLSTR_A_BANDS + SLSTR_ABC_BANDS
SLSTR_BT_BANDS = SLSTR_I_BANDS
# Radiance adjustment
# Nadir and Oblique
FIELDS = [f"{rad}_n" for rad in SLSTR_A_BANDS + SLSTR_ABC_BANDS] + [
f"{rad}_o" for rad in SLSTR_A_BANDS + SLSTR_ABC_BANDS
]
SlstrRadAdjustTuple = namedtuple(
"SlstrRadAdjustTuple", FIELDS, defaults=(1.0,) * len(FIELDS)
)
[docs]
@unique
class SlstrView(ListEnum):
"""
Sentinel-3 SLSTR views: nadir view (n) and oblique view (o)
Used in the context:
- "an" and "ao" refer to the 500 m grid, stripe A, respectively for nadir view (n) and oblique view (o)
- "bn" and "bo" refer to the 500 m grid, stripe B
- "cn" and "co" refer to the 500 m grid, stripe C
- "in" and "io" refer to the 1 km grid
- "fn" and "fo" refer to the F1 channel 1 km grid
"""
NADIR = "n"
"""Nadir view (n)"""
OBLIQUE = "o"
"""Oblique view (o)"""
[docs]
@unique
class SlstrStripe(ListEnum):
"""
Sentinel-3 SLSTR stripes for 500m data: A and B
Used in the context:
- "an" and "ao" refer to the 500 m grid, stripe A, respectively for nadir view (n) and oblique view (o)
- "bn" and "bo" refer to the 500 m grid, stripe B
- "cn" and "co" refer to the 500 m grid, TDI
- "in" and "io" refer to the 1 km grid
- "fn" and "fo" refer to the F1 channel 1 km grid
"""
A = "a"
"""Stripe A (a)"""
B = "b"
"""Stripe B (b)"""
TDI = "c"
"""TDI (c)"""
I = "i" # noqa
"""Not really a stripe, but refers to the 1 km grid"""
F = "f"
"""Not really a stripe, but refers to the F1 channel 1 km grid"""
[docs]
class SlstrRadAdjust(ListEnum):
"""
SLSTR Radiance Adjustment dictionaries.
Sentinel-3 SLSTR radiometry is not nominal, therefore a first-order radiometric correction is provided.
"""
SNAP = SlstrRadAdjustTuple(
# Nadir
S5_n=1.12,
S6_n=1.13,
# Oblique
S5_o=1.15,
S6_o=1.14,
)
"""
SNAP Radiometric adjustment used in S3MPC Adjustment (optional in SNAP). Coefficients can be seen
[here](https://github.com/senbox-org/s3tbx/blob/b10514e399f7a8a436002d2bacdb0c62be72f8f8/s3tbx-sentinel3-reader/src/main/java/org/esa/s3tbx/dataio/s3/slstr/SlstrLevel1ProductFactory.java#L72-L75)
"""
S3_PN_SLSTR_L1_06 = SlstrRadAdjustTuple(
# Nadir
S5_n=1.12,
S6_n=1.15,
# Oblique
S5_o=1.20,
S6_o=1.26,
)
"""
Coefficients given in the
[Sentinel-3 Product Notice 06](https://www-cdn.eumetsat.int/files/2020-04/pdf_s3a_pn_slstr_l1_06.pdf),
edited the 07/11/2018 and reviewed the 19/11/2018
"""
S3_PN_SLSTR_L1_07 = S3_PN_SLSTR_L1_06
"""
Coefficients given in the
[Sentinel-3 Product Notice 07](https://www-cdn.eumetsat.int/files/2020-06/pdf_s3a_pn_slstr_l1_07_1.1.pdf),
edited the 15/01/2020 and reviewed the 09/06/2020, same as the Product Notice 06.
"""
S3_PN_SLSTR_L1_08 = SlstrRadAdjustTuple(
# Nadir
S1_n=0.97,
S2_n=0.98,
S3_n=0.98,
S5_n=1.11,
S6_n=1.13,
# Oblique
S1_o=0.94,
S2_o=0.95,
S3_o=0.95,
S5_o=1.04,
S6_o=1.07,
)
"""
Coefficients given in the
[Sentinel-3 Product Notice 08](https://www-cdn.eumetsat.int/files/2021-05/S3.PN-SLSTR-L1.08%20-%20i1r0%20-%20SLSTR%20L1%20PB%202.75-A%20and%201.53-B.pdf),
edited the 18/05/2021.
The default one.
"""
NONE = SlstrRadAdjustTuple()
"""
Coefficients set to one.
"""
[docs]
class S3SlstrProduct(S3Product):
"""
Class of Sentinel-3 SLSTR Products
"""
[docs]
def __init__(
self,
product_path: AnyPathStrType,
archive_path: AnyPathStrType = None,
output_path: AnyPathStrType = None,
remove_tmp: bool = False,
**kwargs,
) -> None:
"""
https://sentinels.copernicus.eu/web/sentinel/technical-guides/sentinel-3-slstr/level-1/observation-mode-desc
Note that the name of each netCDF file provides information about its content.
The suffix of each filename is associated with the selected grid:
- "an" and "ao" refer to the 500 m grid, stripe A, respectively for nadir view (n) and oblique view (o)
- "bn" and "bo" refer to the 500 m grid, stripe B
- "in" and "io" refer to the 1 km grid
- "fn" and "fo" refer to the F1 channel 1 km grid
- "tx/n/o" refer to the tie-point grid for agnostic/nadir and oblique view
"""
self._flags_file = None
self._cloud_name = None
self._exception_name = None
# Default stripe (A) and view (NADIR)
self._stripe = SlstrStripe.A
self._view = SlstrView.NADIR
self._rad_adjust = SlstrRadAdjust.S3_PN_SLSTR_L1_08
# Brightness temperature
self._bt_file = "{band}_BT_{suffix}.nc"
self._bt_subds = "{band}_BT_{suffix}"
super().__init__(
product_path, archive_path, output_path, remove_tmp, **kwargs
) # Order is important here, gcps NEED to be after this
self._F1_is_f = True
try:
self._get_raw_band_path(F1)
except (FileNotFoundError, StopIteration):
self._F1_is_f = False
def _get_preprocessed_band_path(
self,
filename: str,
suffix: str,
pixel_size: Union[float, tuple, list] = None,
writable: bool = True,
) -> AnyPathType:
"""
Create the pre-processed band path
Args:
filename (str): Filename
pixel_size (Union[float, tuple, list]): Pixel size of the wanted UTM band
writable (bool): Do we need to write the pre-processed band ?
Returns:
AnyPathType: Pre-processed band path
"""
res_str = self._pixel_size_to_str(pixel_size)
if filename.endswith(suffix):
pp_name = f"{self.condensed_name}_{filename}_{res_str}.tif"
else:
pp_name = f"{self.condensed_name}_{filename}_{suffix}_{res_str}.tif"
return self._get_band_folder(writable=writable).joinpath(pp_name)
def _set_preprocess_members(self):
"""Set pre-process members"""
# Radiance bands
self._radiance_file = "{band}_radiance_{suffix}.nc"
self._radiance_subds = "{band}_radiance_{suffix}"
# Geocoding
self._geo_file = "geodetic_{suffix}.nc"
self._lat_nc_name = "latitude_{suffix}"
self._lon_nc_name = "longitude_{suffix}"
self._alt_nc_name = "elevation_{suffix}"
# Tie geocoding
self._tie_geo_file = "geodetic_tx.nc"
self._tie_lat_nc_name = "latitude_tx"
self._tie_lon_nc_name = "longitude_tx"
# Mean Sun angles
self._geom_file = "geometry_t{view}.nc"
self._saa_name = "solar_azimuth_t{view}"
self._sza_name = "solar_zenith_t{view}"
# Rad 2 Refl
self._misc_file = "{band}_quality_{suffix}.nc"
self._solar_flux_name = "{band}_solar_irradiance_{suffix}"
# Clouds
self._flags_file = "flags_{suffix}.nc"
self._cloud_name = "cloud_{suffix}"
# Other
self._exception_name = "{band}_exception_{suffix}"
def _set_pixel_size(self) -> None:
"""
Set product default pixel size (in meters)
"""
self.pixel_size = 500.0
def _set_product_type(self) -> None:
"""Set products type"""
# Product type
if self.name[7] != "1":
raise InvalidTypeError("Only L1 products are used for Sentinel-3 data.")
self.product_type = S3ProductType.SLSTR_RBT
self._data_type = S3DataType.RBT
def _map_bands(self) -> None:
"""
Map bands
"""
# Bands
bt_res = 1000.0
slstr_bands = {
GREEN: SpectralBand(
eoreader_name=GREEN,
**{
NAME: SLSTR_A_BANDS[0],
ID: SLSTR_A_BANDS[0],
GSD: self.pixel_size,
CENTER_WV: 554.27,
FWHM: 19.26,
DESCRIPTION: "Cloud screening, vegetation monitoring, aerosol",
},
),
RED: SpectralBand(
eoreader_name=RED,
**{
NAME: SLSTR_A_BANDS[1],
ID: SLSTR_A_BANDS[1],
GSD: self.pixel_size,
CENTER_WV: 659.47,
FWHM: 19.25,
DESCRIPTION: "NDVI, vegetation monitoring, aerosol",
},
),
NIR: SpectralBand(
eoreader_name=NIR,
**{
NAME: SLSTR_A_BANDS[2],
ID: SLSTR_A_BANDS[2],
GSD: self.pixel_size,
CENTER_WV: 868.00,
FWHM: 20.60,
DESCRIPTION: "NDVI, cloud flagging, pixel co-registration",
},
),
NARROW_NIR: SpectralBand(
eoreader_name=NARROW_NIR,
**{
NAME: SLSTR_A_BANDS[2],
ID: SLSTR_A_BANDS[2],
GSD: self.pixel_size,
CENTER_WV: 868.00,
FWHM: 20.60,
DESCRIPTION: "NDVI, cloud flagging, pixel co-registration",
},
),
SWIR_CIRRUS: SpectralBand(
eoreader_name=SWIR_CIRRUS,
**{
NAME: SLSTR_ABC_BANDS[0],
ID: SLSTR_ABC_BANDS[0],
GSD: self.pixel_size,
CENTER_WV: 1374.80,
FWHM: 20.80,
DESCRIPTION: "Cirrus detection over land",
},
),
SWIR_1: SpectralBand(
eoreader_name=SWIR_1,
**{
NAME: SLSTR_ABC_BANDS[1],
ID: SLSTR_ABC_BANDS[1],
GSD: self.pixel_size,
CENTER_WV: 1613.40,
FWHM: 60.68,
DESCRIPTION: "Cloud clearing, ice, snow, vegetation monitoring",
},
),
SWIR_2: SpectralBand(
eoreader_name=SWIR_2,
**{
NAME: SLSTR_ABC_BANDS[2],
ID: SLSTR_ABC_BANDS[2],
GSD: bt_res,
CENTER_WV: 2255.70,
FWHM: 50.15,
DESCRIPTION: "Vegetation state and cloud clearing",
},
),
S7: SpectralBand(
eoreader_name=S7,
**{
NAME: SLSTR_I_BANDS[0],
ID: SLSTR_I_BANDS[0],
GSD: bt_res,
CENTER_WV: 3742.00,
FWHM: 398.00,
DESCRIPTION: "SST, LST, Active fire, brightness temperature, 1km",
ASSET_ROLE: BT,
},
),
TIR_1: SpectralBand(
eoreader_name=TIR_1,
**{
NAME: SLSTR_I_BANDS[1],
ID: SLSTR_I_BANDS[1],
GSD: bt_res,
CENTER_WV: 10854.00,
FWHM: 776.00,
DESCRIPTION: "SST, LST, Active fire, brightness temperature, 1km",
ASSET_ROLE: BT,
},
),
TIR_2: SpectralBand(
eoreader_name=TIR_2,
**{
NAME: SLSTR_I_BANDS[2],
ID: SLSTR_I_BANDS[2],
GSD: bt_res,
CENTER_WV: 12022.50,
FWHM: 905.00,
DESCRIPTION: "SST, LST, brightness temperature, 1km",
ASSET_ROLE: BT,
},
),
F1: SpectralBand(
eoreader_name=F1,
**{
NAME: SLSTR_I_BANDS[3],
ID: SLSTR_I_BANDS[3],
GSD: bt_res,
CENTER_WV: 3742.00,
FWHM: 398.00,
DESCRIPTION: "Active fire, brightness temperature, 1km",
ASSET_ROLE: BT,
},
),
F2: SpectralBand(
eoreader_name=F2,
**{
NAME: SLSTR_I_BANDS[4],
ID: SLSTR_I_BANDS[4],
GSD: bt_res,
CENTER_WV: 10854.00,
FWHM: 776.00,
DESCRIPTION: "Active fire, brightness temperature, 1km",
ASSET_ROLE: BT,
},
),
}
# Bands
self.bands.map_bands(slstr_bands)
[docs]
def get_raw_band_paths(self, **kwargs) -> dict:
"""
Return the raw band paths.
Args:
kwargs: Additional arguments
Returns:
dict: Dictionary containing the path of each queried band
"""
raw_band_paths = {}
for band in self.get_existing_bands():
raw_band_paths[band] = self._get_raw_band_path(band, **kwargs)
return raw_band_paths
def _get_raw_band_path(self, band: BandNames, **kwargs) -> AnyPathType:
"""
Return the raw band path.
Args:
band (BandNames): Wanted band
kwargs: Additional arguments
Returns:
AnyPathType: Raw path of queried band
"""
band_id = self.bands[band].id
# Get this band's suffix
suffix = kwargs.get("suffix", self._get_suffix(band, **kwargs))
# Get band filename and subdataset
if band_id in SLSTR_RAD_BANDS:
filename = self._replace(self._radiance_file, band=band, suffix=suffix)
elif band_id in SLSTR_BT_BANDS:
filename = self._replace(self._bt_file, band=band, suffix=suffix)
else:
filename = band
if self.is_archived:
raw_path = path.get_archived_path(self.path, f".*{filename}*")
else:
try:
raw_path = next(self.path.glob(f"*{filename}*"))
except StopIteration:
raise FileNotFoundError(f"Non existing file {filename} in {self.path}")
return raw_path
def _preprocess(
self,
band: Union[BandNames, str],
pixel_size: float = None,
to_reflectance: bool = True,
subdataset: str = None,
**kwargs,
) -> AnyPathType:
"""
Pre-process S3 SLSTR bands:
- Geocode
- Adjust radiance
- Convert radiance to reflectance
Args:
band (Union[BandNames, str]): Band to preprocess (quality flags or others are accepted)
pixel_size (float): Pixl size
to_reflectance (bool): Convert band to reflectance
subdataset (str): Subdataset
kwargs: Other arguments used to load bands
Returns:
dict: Dictionary containing {band: path}
"""
if isinstance(band, BandNames):
band_id = self.bands[band].id
band_str = band.name
else:
band_id = band
band_str = band
# Get this band's suffix
suffix = kwargs.get("suffix", self._get_suffix(band, **kwargs))
# Get band filename and subdataset
pp_name = subdataset if subdataset else band_str
if band_id in SLSTR_RAD_BANDS:
if not subdataset:
subdataset = self._replace(
self._radiance_subds, band=band, suffix=suffix
)
filename = self._replace(self._radiance_file, band=band, suffix=suffix)
elif band_id in SLSTR_BT_BANDS:
if not subdataset:
subdataset = self._replace(self._bt_subds, band=band, suffix=suffix)
filename = self._replace(self._bt_file, band=band, suffix=suffix)
else:
filename = band
# Get the pre-processed path
pp_path = self._get_preprocessed_band_path(
pp_name, suffix=suffix, pixel_size=pixel_size, writable=False
)
if not pp_path.is_file():
pp_path = self._get_preprocessed_band_path(
pp_name, suffix=suffix, pixel_size=pixel_size, writable=True
)
# Get raw band
band_arr = self._read_nc(
filename, subdataset, dtype=kwargs.get("dtype", np.float32)
)
# Radiance preprocess (BT bands are given in BT !)
if not kwargs.get("flags", False) and band_id in SLSTR_RAD_BANDS:
# Adjust radiance if needed
# Get the user's radiance adjustment if existing
rad_adjust = kwargs.get(SLSTR_RAD_ADJUST, self._rad_adjust)
try:
# Try to convert the rad_adjust to the correct enum
rad_adjust = SlstrRadAdjust.from_value(rad_adjust)
except ValueError:
# Allow the user to pass a custom SlstrRadAdjustTuple
assert isinstance(rad_adjust, SlstrRadAdjustTuple)
band_arr = self._radiance_adjustment(
band_arr, band, view=suffix[-1], rad_adjust=rad_adjust
)
# Convert radiance to reflectances if needed
# Convert first pixel by pixel before reprojection !
if to_reflectance:
LOGGER.debug(f"Converting {band_str} to reflectance")
band_arr = self._rad_2_refl(band_arr, band, suffix)
# Debug
# utils.write(
# band_arr,
# self._get_band_folder(writable=True).joinpath(
# f"{self.condensed_name}_{band.name}_rad2refl.tif"
# ),
# )
# Geocode
LOGGER.debug(f"Geocoding {pp_name}")
kwargs.pop("suffix", None)
pp_arr = self._geocode(
band_arr, pixel_size=pixel_size, suffix=suffix, **kwargs
)
# Write on disk
utils.write(pp_arr, pp_path)
return pp_path
def _get_suffix(self, band: Union[str, BandNames] = None, **kwargs) -> str:
"""
Get the suffix according to the (given) stripe and view.
- "an" and "ao" refer to the 500 m grid, stripe A, respectively for nadir view (n) and oblique view (o)
- "bn" and "bo" refer to the 500 m grid, stripe B
- "cn" and "co" refer to the 500 m grid, stripe C
- "in" and "io" refer to the 1 km grid
- "fn" and "fo" refer to the F1 channel 1 km grid
Args:
band (Union[BandNames, str]): Band from which to get the stripe
kwargs: Other arguments
Returns:
str: Suffix (an, bn, cn, in, fn, ao, bo, co, io, fo)
"""
# Get the view
view = SlstrView.from_value(kwargs.get(SLSTR_VIEW, self._view))
if band is not None:
# Get the stripe
if isinstance(band, BandNames):
band = self.bands[band].id
if band == F1.value:
if self._F1_is_f:
stripe = SlstrStripe.F
else:
stripe = SlstrStripe.I
else:
if band in SLSTR_I_BANDS:
stripe = SlstrStripe.I
elif band in SLSTR_ABC_BANDS:
stripe = SlstrStripe.from_value(
kwargs.get(SLSTR_STRIPE, SlstrStripe.A)
)
else:
stripe = SlstrStripe.A
else:
stripe = SlstrStripe.from_value(kwargs.get(SLSTR_STRIPE, SlstrStripe.A))
# Return the prefix
return f"{stripe.value}{view.value}"
def _tie_to_img(self, tie_arr: np.ndarray, suffix: str) -> np.ndarray:
"""
Convert an image sampled on the tie point grid (tx) to the wanted gris, given by the suffix
Args:
tie_arr (xr.Dataset): Image sampled on the tie point grid (tx)
suffix: Suffix of the new grid
Returns:
np.ndarray: Array resampled to the wanted grid as a numpy array
"""
# Load tie point grid
tie_cart_file = "cartesian_tx.nc"
tx_nc_name = "x_tx"
ty_nc_name = "y_tx"
# WARNING: RectBivariateSpline must have increasing values
tx = np.squeeze(self._read_nc(tie_cart_file, tx_nc_name).data)[0, ::-1]
ty = np.squeeze(self._read_nc(tie_cart_file, ty_nc_name).data)[:, 0]
# Load fill image grid (cartesian)
geo_file = f"cartesian_{suffix}.nc"
fx_nc_name = f"x_{suffix}"
fy_nc_name = f"y_{suffix}"
fx = np.squeeze(self._read_nc(geo_file, fx_nc_name))
fy = np.squeeze(self._read_nc(geo_file, fy_nc_name))
# Interpolate via Spline (as extrapolation is possible and the grid is very sparse along the rows)
# Import scipy here (long import)
from scipy.interpolate import RectBivariateSpline
# WARNING 2: Rasterio reads like [count, y, x] !
no_nan_arr = np.nan_to_num(np.squeeze(tie_arr).data[:, ::-1])
spline_interp = RectBivariateSpline(ty, tx, no_nan_arr)
# Interpolate and set nodata back
img_arr = spline_interp.ev(fy, fx)
img_arr[img_arr == 0] = np.nan
return img_arr
# def _bt_2_rad(self, band_arr: xr.DataArray, band: BandNames = None) -> xr.DataArray:
# """
# Convert brightness temperature to radiance
#
# The Level-1 brightness temperature measurements provided for the thermal channels (S7-S9, F1 and F2)
# can be converted to radiance by integrating the Planck function at the BT of interest multiplied over the
# spectral response of each band. The spectral response functions for SLSTR-A and SLSTR-B are available on
# the ESA Sentinel Online website (see Section 8.2.10)
# https://sentinel.esa.int/web/sentinel/technical-guides/sentinel-3-slstr/instrument/measured-spectral-response-function-data
#
# In https://sentinel.esa.int/documents/247904/4598085/Sentinel-3-SLSTR-Land-Handbook.pdf/bee342eb-40d4-9b31-babb-8bea2748264a
#
# Args:
# band_arr (xr.DataArray): Band array
# band (BandNames): Spectral Band
#
# Returns:
# dict: Dictionary containing {band: path}
# """
#
# return band_arr
def _rad_2_refl(
self, band_arr: xr.DataArray, band: BandNames, suffix: str
) -> xr.DataArray:
"""
Convert radiance to reflectance
The visible and SWIR channels (S1-S6) provide measurements of top of atmosphere (ToA) radiances
(mW/m2/sr/nm). These values can be converted to normalised reflectance for better comparison or
merging of data with different sun angles as follows:
reflectance = π* (ToA radiance / solar irradiance / COS(solar zenith angle))
where the solar irradiance at ToA is given in the ‘quality’ dataset for the channel,
and the solar zenith angle is given in the ‘geometry’ dataset.
The solar irradiance contained in the quality dataset is derived from the solar spectrum
of Thuillier et al. (2003) integrated over the measured SLSTR spectral responses
and corrected for the earth-to-sun distance at the time of the measurement.
In https://sentinel.esa.int/documents/247904/4598085/Sentinel-3-SLSTR-Land-Handbook.pdf/bee342eb-40d4-9b31-babb-8bea2748264a
Args:
band_arr (xr.DataArray): Band array
band (BandNames): Spectral Band
suffix (str): Band suffix
Returns:
dict: Dictionary containing {band: path}
"""
rad_2_refl_path, rad_2_refl_exists = self._get_out_path(
f"rad_2_refl_{band.name}_{suffix}.npy"
)
if not rad_2_refl_exists:
# Open SZA array (resampled to band_arr size)
sza = self._compute_sza_img_grid(suffix)
# Open solar flux (resampled to band_arr size)
e0 = self._compute_e0(band, suffix)
# Compute rad_2_refl coeff
rad_2_refl_coeff = (np.pi / e0 / np.cos(sza)).astype(np.float32)
# Write on disk
np.save(str(rad_2_refl_path), rad_2_refl_coeff)
else:
# Open rad_2_refl_coeff (resampled to band_arr size)
rad_2_refl_coeff = utils.load_np(rad_2_refl_path, self._tmp_output)
return band_arr * rad_2_refl_coeff
def _radiance_adjustment(
self,
band_arr: xr.DataArray,
band: Union[str, BandNames],
view: str,
rad_adjust: SlstrRadAdjust = SlstrRadAdjust.S3_PN_SLSTR_L1_08,
) -> xr.DataArray:
"""
Applying the radiance adjustment as recommended in the product notice:
S3.PN-SLSTR-L1.08 (https://www-cdn.eumetsat.int/files/2021-05/S3.PN-SLSTR-L1.08%20-%20i1r0%20-%20SLSTR%20L1%20PB%202.75-A%20and%201.53-B.pdf):
SLSTR-A/B: All solar channels (S1-S6) have been undergoing a vicarious calibration assessment to
quantify their radiometric calibration adjustment. Recent analysis of vicarious calibration results
over desert sites performed by RAL, CNES, Rayference and University of Arizona have determined
new and consistent radiometric deviations wrt. common reference sensors (MERIS, MODIS)
[S3MPC.RAL.TN.010]. Consequently, these have been used to provide a first-order radiometric
corrections which are provided in the below tables with more detail at the following link
[S3MPC.RAL.TN.020]. Current radiances in the L1B product remain uncorrected of these
radiometric calibration adjustments. Hence, these multiplicative coefficients are strongly
recommended to be used by all users.
Nadir view
S1 S2 S3 S5 S6
Correction 0.97 0.98 0.98 1.11 1.13
Uncertainty 0.03 0.02 0.02 0.02 0.02
Oblique view
S1 S2 S3 S5 S6
Correction 0.94 0.95 0.95 1.04 1.07
Uncertainty 0.05 0.03 0.03 0.03 0.05
Args:
band_arr (xr.DataArray): Band array
band (Union[str, BandNames]): Optical Band
view (str): View (n or o for Nadir and Oblique)
rad_adjust (SlstrRadAdjust): Radiance Adjustment
Returns:
xr.DataArray: Adjusted band array
"""
try:
band_id = self.bands[band].id
if band_id in SLSTR_RAD_BANDS:
# Allow the tuple and the enum
if isinstance(rad_adjust, SlstrRadAdjust):
rad_adjust_tuple = rad_adjust.value
else:
rad_adjust_tuple = rad_adjust
# Get the band coefficient and multiply the band
rad_coeff = getattr(rad_adjust_tuple, f"{band_id}_{view}")
band_arr *= rad_coeff
except KeyError:
# Not a band (ie Quality Flags) or Brightness temperature: no adjust needed
pass
return band_arr
def _compute_sza_img_grid(self, suffix) -> np.ndarray:
"""
Compute Sun Zenith Angle (in radian) resampled to the image grid (from the tie point grid)
Args:
suffix (str): Suffix
Returns:
np.ndarray: Resampled Sun Zenith Angle as a numpy array
"""
sza_img_path, sza_exists = self._get_out_path(f"sza_{suffix}.npy")
if not sza_exists:
geom_file = self._replace(self._geom_file, view=suffix[-1])
sza_name = self._replace(self._sza_name, view=suffix[-1])
sza = self._read_nc(geom_file, sza_name)
sza_rad = sza * np.pi / 180.0
# From tie grid to image grid
sza_img = self._tie_to_img(sza_rad, suffix)
# Write on disk
np.save(str(sza_img_path), sza_img)
else:
# Open sza_img (resampled to band_arr size)
sza_img = utils.load_np(sza_img_path, self._tmp_output)
return sza_img
def _compute_e0(self, band: BandNames, suffix: str) -> np.ndarray:
"""
Compute the solar spectral flux in mW / (m^2 * sr * nm)
Args:
band (BandNames): Optical Band
suffix (str): Suffix
Returns:
np.ndarray: Solar Flux
"""
misc = self._replace(self._misc_file, band=band, suffix=suffix)
solar_flux_name = self._replace(self._solar_flux_name, band=band, suffix=suffix)
e0 = self._read_nc(misc, solar_flux_name).data
e0 = np.nanmean(e0)
if np.isnan(e0):
e0 = SLSTR_SOLAR_FLUXES_DEFAULT[band]
return e0
def _manage_invalid_pixels(
self, band_arr: xr.DataArray, band: BandNames, **kwargs
) -> xr.DataArray:
"""
Manage invalid pixels (Nodata, saturated, defective...)
ISP_absent pixel_absent not_decompressed no_signal saturation invalid_radiance no_parameters unfilled_pixel
Args:
band_arr (xr.DataArray): Band array
band (BandNames): Band name as a SpectralBandNames
kwargs: Other arguments used to load bands
Returns:
xr.DataArray: Cleaned band array
"""
# Open quality flags
# NOT OPTIMIZED, MAYBE CHECK INVALID PIXELS ON NOT GEOCODED DATA
suffix = self._get_suffix(band, **kwargs)
qual_flags_path = self._preprocess(
band,
suffix=suffix,
subdataset=self._replace(self._exception_name, band=band, suffix=suffix),
pixel_size=band_arr.rio.resolution(),
to_reflectance=False,
flags=True,
dtype=np.uint8,
)
# Open flag file
qual_arr = utils.read(
qual_flags_path,
size=(band_arr.rio.width, band_arr.rio.height),
resampling=Resampling.nearest, # Nearest to keep the flags
masked=False,
as_type=np.uint8,
**kwargs,
)
# Set no data for everything that caused an exception (3 and more)
exception = np.where(qual_arr >= 3, self._mask_true, self._mask_false)
# Get nodata mask
no_data = np.where(np.isnan(band_arr.data), self._mask_true, self._mask_false)
# Combine masks
mask = no_data | exception
# DO not set 0 to epsilons as they are a part of the
return self._set_nodata_mask(band_arr, mask)
def _has_cloud_band(self, band: BandNames) -> bool:
"""
Does this product has the specified cloud band ?
-> SLSTR does
"""
if band in [
RAW_CLOUDS,
ALL_CLOUDS,
CLOUDS,
CIRRUS,
]:
has_band = True
else:
has_band = False
return has_band
def _open_clouds(
self,
bands: list,
pixel_size: float = None,
size: Union[list, tuple] = None,
**kwargs,
) -> dict:
"""
Load cloud files as xarrays.
Read S3 SLSTR clouds from the flags file:cloud netcdf file.
https://sentinels.copernicus.eu/web/sentinel/technical-guides/sentinel-3-slstr/level-1/cloud-identification
bit_id flag_masks (ushort) flag_meanings
=== === ===
0 1US visible
1 2US 1.37_threshold
2 4US 1.6_small_histogram
3 8US 1.6_large_histogram
4 16US 2.25_small_histogram
5 32US 2.25_large_histogram
6 64US 11_spatial_coherence
7 128US gross_cloud
8 256US thin_cirrus
9 512US medium_high
10 1024US fog_low_stratus
11 2048US 11_12_view_difference
12 4096US 3.7_11_view_difference
13 8192US thermal_histogram
14 16384US spare
15 32768US spare
Args:
bands (list): List of the wanted bands
pixel_size (int): Band pixel size in meters
size (Union[tuple, list]): Size of the array (width, height). Not used if pixel_size is provided.
kwargs: Additional arguments
Returns:
dict: Dictionary {band_name, band_xarray}
"""
band_dict = {}
if bands:
all_ids = list(np.arange(0, 14))
cir_id = 8
cloud_ids = [cid for cid in all_ids if cid != cir_id]
# Open path
suffix = self._get_suffix(**kwargs)
flags_file = self._replace(self._flags_file, suffix=suffix)
cloud_name = self._replace(self._cloud_name, suffix=suffix)
cloud_path = self._preprocess(
flags_file,
suffix=suffix,
subdataset=cloud_name,
pixel_size=pixel_size,
to_reflectance=False,
)
# Open cloud file
clouds_array = utils.read(
cloud_path,
pixel_size=pixel_size,
size=size,
resampling=Resampling.nearest,
masked=False,
as_type=np.uint16,
**kwargs,
)
# Get nodata mask
nodata = np.where(np.isnan(clouds_array), 1, 0)
for band in bands:
if band == ALL_CLOUDS:
cloud = self._create_mask(clouds_array, all_ids, nodata)
elif band == CLOUDS:
cloud = self._create_mask(clouds_array, cloud_ids, nodata)
elif band == CIRRUS:
cloud = self._create_mask(clouds_array, cir_id, nodata)
elif band == RAW_CLOUDS:
cloud = clouds_array
else:
raise InvalidTypeError(
f"Non existing cloud band for Sentinel-3 SLSTR: {band}"
)
# Rename
band_name = to_str(band)[0]
# Multi bands -> do not change long name
if band != RAW_CLOUDS:
cloud.attrs["long_name"] = band_name
band_dict[band] = cloud.rename(band_name).astype(np.float32)
return band_dict
def _create_mask(
self,
bit_array: xr.DataArray,
bit_ids: Union[int, list],
nodata: np.ndarray,
) -> xr.DataArray:
"""
Create a mask masked array (uint8) from a bit array, bit IDs and a nodata mask.
Args:
bit_array (xr.DataArray): Conditional array
bit_ids (Union[int, list]): Bit IDs
nodata (np.ndarray): Nodata mask
Returns:
xr.DataArray: Mask masked array
"""
bit_ids = types.make_iterable(bit_ids)
conds = rasters.read_bit_array(bit_array, bit_ids)
cond = reduce(lambda x, y: x | y, conds) # Use every condition (bitwise or)
cond_arr = np.where(cond, self._mask_true, self._mask_false).astype(np.uint8)
cond_arr = np.squeeze(cond_arr)
try:
cond_arr = features.sieve(cond_arr, size=10, connectivity=4)
except TypeError:
# Manage dask arrays that fails with rasterio sieve
cond_arr = features.sieve(cond_arr.compute(), size=10, connectivity=4)
cond_arr = np.expand_dims(cond_arr, axis=0)
return super()._create_mask(bit_array, cond_arr, nodata)
[docs]
@cache
def get_mean_sun_angles(self, view: SlstrView = SlstrView.NADIR) -> (float, float):
"""
Get Mean Sun angles (Azimuth and Zenith angles)
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = "S3B_SL_1_RBT____20191115T233722_20191115T234022_20191117T031722_0179_032_144_3420_LN2_O_NT_003.SEN3"
>>> prod = Reader().open(path)
>>> prod.get_mean_sun_angles()
(78.55043955912154, 31.172127033319388)
Args:
view (SlstrView): SLSTR View (Nadir or Oblique)
Returns:
(float, float): Mean Azimuth and Zenith angle
"""
geom_file = self._replace(self._geom_file, view=view.value)
saa_name = self._replace(self._saa_name, view=view.value)
sza_name = self._replace(self._sza_name, view=view.value)
# Open sun azimuth and zenith files
sun_az = self._read_nc(geom_file, saa_name)
sun_ze = self._read_nc(geom_file, sza_name)
return float(sun_az.mean().data) % 360, float(sun_ze.mean().data)
def _get_clean_band_path(
self,
band: BandNames,
pixel_size: float = None,
writable: bool = False,
**kwargs,
) -> AnyPathType:
"""
Get clean band path.
The clean band is the opened band where invalid pixels have been managed.
Args:
band (BandNames): Wanted band
pixel_size (float): Band pixel size in meters
kwargs: Additional arguments
Returns:
AnyPathType: Clean band path
"""
cleaning_method = CleanMethod.from_value(
kwargs.get(CLEAN_OPTICAL, DEF_CLEAN_METHOD)
)
suffix = self._get_suffix(band, **kwargs)
res_str = self._pixel_size_to_str(pixel_size)
# Window name
window = kwargs.get("window")
win_suffix = ""
if window is not None:
if path.is_path(window):
win_suffix = f"{path.get_filename(window)}"
elif isinstance(window, gpd.GeoDataFrame):
win_suffix = f"{window.attrs.get('name')}"
if not win_suffix:
win_suffix = f"win{files.hash_file_content(str(window))}"
win_suffix += "_"
# Radiometric processing
rad_proc = "" if kwargs.get(TO_REFLECTANCE, True) else "_as_is"
return self._get_band_folder(writable).joinpath(
f"{self.condensed_name}_{band.name}_{suffix}_{res_str.replace('.', '-')}_{win_suffix}{cleaning_method.value}{rad_proc}.tif",
)