# -*- coding: utf-8 -*-
# Copyright 2022, SERTIT-ICube - France, https:#sertit.unistra.fr/
# This file is part of eoreader project
# https:#github.com/sertit/eoreader
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Sentinel-3 OLCI products
.. WARNING:
Not georeferenced NetCDF files are badly opened by GDAL and therefore by rasterio !
-> use xr.open_dataset that manages that correctly
"""
import logging
from pathlib import Path
from typing import Union
import numpy as np
import rasterio
import xarray as xr
from cloudpathlib import CloudPath
from rasterio.enums import Resampling
from sertit import rasters, rasters_rio
from sertit.rasters import MAX_CORES, XDS_TYPE
from sertit.vectors import WGS84
from eoreader import cache, utils
from eoreader.bands import BandNames
from eoreader.bands import OpticalBandNames as obn
from eoreader.exceptions import InvalidProductError, InvalidTypeError
from eoreader.products import S3DataType, S3Instrument, S3Product, S3ProductType
from eoreader.reader import Platform
from eoreader.utils import EOREADER_NAME
LOGGER = logging.getLogger(EOREADER_NAME)
# FROM SNAP:
# https://github.com/senbox-org/s3tbx/blob/197c9a471002eb2ec1fbd54e9a31bfc963446645/s3tbx-rad2refl/src/main/java/org/esa/s3tbx/processor/rad2refl/Rad2ReflConstants.java#L97
# Not used for now
OLCI_SOLAR_FLUXES_DEFAULT = {
obn.Oa01: 1714.9084,
obn.Oa02: 1872.3961,
obn.CA: 1926.6102,
obn.BLUE: 1930.2483,
obn.Oa05: 1804.2762,
obn.GREEN: 1651.5836,
obn.YELLOW: 1531.4067,
obn.RED: 1475.615,
obn.Oa09: 1408.9949,
obn.Oa10: 1265.5425,
obn.VRE_1: 1255.4227,
obn.VRE_2: 1178.0286,
obn.Oa13: 955.07043,
obn.Oa14: 914.18945,
obn.Oa15: 882.8275,
obn.VRE_3: 882.8275,
obn.NIR: 882.8275,
obn.NARROW_NIR: 882.8275,
obn.Oa18: 882.8275,
obn.Oa19: 882.8275,
obn.WV: 882.8275,
obn.Oa21: 882.8275,
}
[docs]class S3OlciProduct(S3Product):
"""
Class of Sentinel-3 OLCI Products
"""
[docs] def __init__(
self,
product_path: Union[str, CloudPath, Path],
archive_path: Union[str, CloudPath, Path] = None,
output_path: Union[str, CloudPath, Path] = None,
remove_tmp: bool = False,
) -> None:
"""
https://sentinels.copernicus.eu/web/sentinel/technical-guides/sentinel-3-slstr/level-1/observation-mode-desc
Note that the name of each netCDF file provides information about its content.
"""
super().__init__(
product_path, archive_path, output_path, remove_tmp
) # Order is important here
self._gcps = []
def _get_preprocessed_band_path(
self,
band: Union[obn, str],
resolution: Union[float, tuple, list] = None,
writable=True,
) -> Union[CloudPath, Path]:
"""
Create the pre-processed band path
Args:
band (band: Union[obn, str]): Wanted band (quality flags accepted)
resolution (Union[float, tuple, list]): Resolution of the wanted UTM band
writable (bool): Do we need to write the pre-processed band ?
Returns:
Union[CloudPath, Path]: Pre-processed band path
"""
res_str = self._resolution_to_str(resolution)
band_str = band.name if isinstance(band, obn) else band
return self._get_band_folder(writable=writable).joinpath(
f"{self.condensed_name}_{band_str}_{res_str}.tif"
)
def _set_preprocess_members(self):
""" Set pre-process members """
# Radiance bands
self._radiance_file = "{band}_radiance.nc"
self._radiance_subds = "{band}_radiance"
# Geocoding
self._geo_file = "geo_coordinates.nc"
self._lat_nc_name = "latitude"
self._lon_nc_name = "longitude"
self._alt_nc_name = "altitude"
# Tie geocoding
self._tie_geo_file = "tie_geo_coordinates.nc"
self._tie_lat_nc_name = "latitude"
self._tie_lon_nc_name = "longitude"
# Mean Sun angles
self._geom_file = "tie_geometries.nc"
self._saa_name = "SAA"
self._sza_name = "SZA"
# Rad 2 Refl
self._misc_file = "instrument_data.nc"
self._solar_flux_name = "solar_flux"
self._det_index = "detector_index"
def _pre_init(self) -> None:
"""
Function used to pre_init the products
(setting needs_extraction and so on)
"""
self.needs_extraction = False
# Post init done by the super class
super()._pre_init()
def _get_platform(self) -> Platform:
""" Getter of the platform """
if "OL" in self.name:
# Instrument
self._instrument = S3Instrument.OLCI
sat_id = self._instrument.value
else:
raise InvalidProductError(
f"Only OLCI and SLSTR are valid Sentinel-3 instruments : {self.name}"
)
return getattr(Platform, sat_id)
def _set_resolution(self) -> float:
"""
Set product default resolution (in meters)
"""
return 300.0
def _set_product_type(self) -> None:
"""Set products type"""
# Product type
if self.name[7] != "1":
raise InvalidTypeError("Only L1 products are used for Sentinel-3 data.")
self.product_type = S3ProductType.OLCI_EFR
self._data_type = S3DataType.EFR
# Bands
self.band_names.map_bands(
{
obn.Oa01: "Oa01",
obn.Oa02: "Oa02",
obn.CA: "Oa03",
obn.BLUE: "Oa04",
obn.Oa05: "Oa05",
obn.GREEN: "Oa06",
obn.YELLOW: "Oa07",
obn.RED: "Oa08",
obn.Oa09: "Oa09",
obn.Oa10: "Oa10",
obn.VRE_1: "Oa11",
obn.VRE_2: "Oa12",
obn.Oa13: "Oa13",
obn.Oa14: "Oa14",
obn.Oa15: "Oa15",
obn.VRE_3: "Oa16",
obn.NIR: "Oa17",
obn.NARROW_NIR: "Oa17",
obn.Oa18: "Oa18",
obn.Oa19: "Oa19",
obn.WV: "Oa20",
obn.Oa21: "Oa21",
}
)
def _create_gcps(self) -> None:
"""
Create the GCPs sequence
"""
# Compute only ig needed
if not self._gcps:
# Open lon/lat/alt files to populate the GCPs
lat = self._read_nc(self._geo_file, self._lat_nc_name)
lon = self._read_nc(self._geo_file, self._lon_nc_name)
alt = self._read_nc(self._geo_file, self._alt_nc_name)
# Create GCPs
self._gcps = utils.create_gcps(lon, lat, alt)
def _preprocess(
self,
band: Union[obn, str],
resolution: float = None,
to_reflectance: bool = True,
subdataset: str = None,
**kwargs,
) -> Union[CloudPath, Path]:
"""
Pre-process S3 OLCI bands:
- Convert radiance to reflectance
- Geocode
Args:
band (Union[obn, str]): Band to preprocess (quality flags or others are accepted)
resolution (float): Resolution
to_reflectance (bool): Convert band to reflectance
subdataset (str): Subdataset
kwargs: Other arguments used to load bands
Returns:
dict: Dictionary containing {band: path}
"""
band_str = band if isinstance(band, str) else band.name
path = self._get_preprocessed_band_path(band, resolution=resolution)
if not path.is_file():
path = self._get_preprocessed_band_path(
band, resolution=resolution, writable=True
)
# Get band regex
if isinstance(band, obn):
band_name = self.band_names[band]
if not subdataset:
subdataset = self._replace(self._radiance_subds, band=band_name)
filename = self._replace(self._radiance_file, band=band_name)
else:
filename = band
# Get raw band
band_arr = self._read_nc(
filename, subdataset, dtype=kwargs.get("dtype", np.float32)
)
# Convert radiance to reflectances if needed
# Convert first pixel by pixel before reprojection !
if to_reflectance:
LOGGER.debug(f"Converting {band_str} to reflectance")
band_arr = self._rad_2_refl(band_arr, band)
# Debug
# utils.write(
# band_arr,
# self._get_band_folder(writable=True).joinpath(
# f"{self.condensed_name}_{band.name}_rad2refl.tif"
# ),
# )
# Geocode
LOGGER.debug(f"Geocoding {band_str}")
pp_arr = self._geocode(band_arr, resolution=resolution)
# Write on disk
utils.write(pp_arr, path)
return path
def _geocode(
self, band_arr: xr.DataArray, resolution: float = None
) -> xr.DataArray:
"""
Geocode Sentinel-3 bands
Args:
band_arr (xr.DataArray): Band array
resolution (float): Resolution
Returns:
xr.DataArray: Geocoded DataArray
"""
# Create GCPs if not existing
self._create_gcps()
# Assign a projection
band_arr.rio.write_crs(WGS84, inplace=True)
return band_arr.rio.reproject(
dst_crs=self.crs,
resolution=resolution,
gcps=self._gcps,
nodata=self._mask_nodata if band_arr.dtype == np.uint8 else self.nodata,
num_threads=MAX_CORES,
**{"SRC_METHOD": "GCP_TPS"},
)
def _rad_2_refl(self, band_arr: xr.DataArray, band: obn = None) -> xr.DataArray:
"""
Convert radiance to reflectance
Args:
band_arr (xr.DataArray): Band array
band (obn): Optical Band (for SLSTR only)
Returns:
dict: Dictionary containing {band: path}
"""
rad_2_refl_path = self._get_band_folder() / f"rad_2_refl_{band.name}.npy"
if not rad_2_refl_path.is_file():
rad_2_refl_path = (
self._get_band_folder(writable=True) / f"rad_2_refl_{band.name}.npy"
)
# Open SZA array (resampled to band_arr size)
sza_path = self._get_band_folder() / "sza.tif"
if not sza_path.is_file():
sza_path = self._get_band_folder(writable=True) / "sza.tif"
sza_nc = self._read_nc(self._geom_file, self._sza_name)
utils.write(sza_nc, sza_path)
with rasterio.open(sza_path) as ds_sza:
# Values can be easily interpolated at pixels from Tie Points by linear interpolation using the
# image column coordinate.
sza, _ = rasters_rio.read(
ds_sza,
size=(band_arr.rio.width, band_arr.rio.height),
resampling=Resampling.bilinear,
masked=False,
)
sza_rad = sza.astype(np.float32) * np.pi / 180.0
# Open solar flux (resampled to band_arr size)
e0 = self._compute_e0(band)
# Compute rad_2_refl coeff
rad_2_refl_coeff = (np.pi / e0 / np.cos(sza_rad)).astype(np.float32)
# Write on disk
np.save(rad_2_refl_path, rad_2_refl_coeff)
else:
# Open rad_2_refl_coeff (resampled to band_arr size)
rad_2_refl_coeff = np.load(rad_2_refl_path)
return band_arr * rad_2_refl_coeff
def _compute_e0(self, band: obn = None) -> np.ndarray:
"""
Compute the solar spectral flux in mW / (m^2 * sr * nm)
The Level 1 product provides measurements of top of atmosphere (ToA) radiances (mW/m2/sr/nm). These
values can be converted to normalised reflectance for better comparison or merging of data with different
sun angles as follows:
reflectance = π* (ToA radiance / solar irradiance / cos(solar zenith angle))
where the solar irradiance at ToA is given in the ‘solar_flux’ dataset (instrument_data.nc file) for each
detector and each channel, and the solar zenith angle is given at Tie Points in the ‘SZA’ dataset
(tie_geometry.nc file). The appropriate instrument detector is given at each pixel by the ‘detector_index’
dataset (instrument_data.nc file).
In https://sentinel.esa.int/documents/247904/4598069/Sentinel-3-OLCI-Land-Handbook.pdf/455f8c88-520f-da18-d744-f5cda41d2d91
Args:
band (obn): Optical Band (for SLSTR only)
Returns:
np.ndarray: Solar Flux
"""
# Do not convert to int here as we want to keep the nans
det_idx = self._read_nc(self._misc_file, self._det_index).data
e0_det = self._read_nc(self._misc_file, self._solar_flux_name).data
# Get band slice and open corresponding e0 for the detectors
band_slice = int(self.band_names[band][-2:]) - 1
e0_det = np.squeeze(e0_det[0, band_slice, :])
# Create e0
e0 = det_idx
not_nan_idx = ~np.isnan(det_idx)
e0[not_nan_idx] = e0_det[det_idx[not_nan_idx].astype(int)]
return e0
def _manage_invalid_pixels(
self, band_arr: XDS_TYPE, band: obn, **kwargs
) -> XDS_TYPE:
"""
Manage invalid pixels (Nodata, saturated, defective...) for OLCI data.
See there:
https:sentinel.esa.int/documents/247904/1872756/Sentinel-3-OLCI-Product-Data-Format-Specification-OLCI-Level-1
QUALITY FLAGS (From end to start of the 32 bits):
| Bit | Flag |
|----|----------------------|
| 0 | saturated21 |
| 1 | saturated20 |
| 2 | saturated19 |
| 3 | saturated18 |
| 4 | saturated17 |
| 5 | saturated16 |
| 6 | saturated15 |
| 7 | saturated14 |
| 8 | saturated13 |
| 9 | saturated12 |
| 10 | saturated11 |
| 11 | saturated10 |
| 12 | saturated09 |
| 13 | saturated08 |
| 14 | saturated07 |
| 15 | saturated06 |
| 16 | saturated05 |
| 17 | saturated04 |
| 18 | saturated03 |
| 19 | saturated02 |
| 20 | saturated01 |
| 21 | dubious |
| 22 | sun-glint_risk |
| 23 | duplicated |
| 24 | cosmetic |
| 25 | invalid |
| 26 | straylight_risk |
| 27 | bright |
| 28 | tidal_region |
| 29 | fresh_inland_water |
| 30 | coastline |
| 31 | land |
Args:
band_arr (XDS_TYPE): Band array
band (obn): Band name as an OpticalBandNames
kwargs: Other arguments used to load bands
Returns:
XDS_TYPE: Cleaned band array
"""
# Bit ids
band_bit_id = {
obn.Oa01: 20, # Band 1
obn.Oa02: 19, # Band 2
obn.CA: 18, # Band 3
obn.BLUE: 17, # Band 4
obn.Oa05: 16, # Band 5
obn.GREEN: 15, # Band 6
obn.YELLOW: 14, # Band 7
obn.RED: 13, # Band 8
obn.Oa09: 12, # Band 9
obn.Oa10: 11, # Band 10
obn.VRE_1: 10, # Band 11
obn.VRE_2: 9, # Band 12
obn.Oa13: 8, # Band 13
obn.Oa14: 7, # Band 14
obn.Oa15: 6, # Band 15
obn.VRE_3: 5, # Band 16
obn.NIR: 4, # Band 17
obn.NARROW_NIR: 4, # Band 17
obn.Oa18: 3, # Band 18
obn.Oa19: 2, # Band 19
obn.WV: 1, # Band 20
obn.Oa21: 0, # Band 21
}
invalid_id = 25
sat_band_id = band_bit_id[band]
# Open quality flags
# NOT OPTIMIZED, MAYBE CHECK INVALID PIXELS ON NOT GEOCODED DATA
qual_regex = "qualityFlags"
subds = "quality_flags"
qual_flags_path = self._preprocess(
qual_regex,
subdataset=subds,
resolution=band_arr.rio.resolution(),
to_reflectance=False,
)
# Open flag file
qual_arr, _ = rasters_rio.read(
qual_flags_path,
size=(band_arr.rio.width, band_arr.rio.height),
resampling=Resampling.nearest, # Nearest to keep the flags
masked=False,
)
invalid, sat = rasters.read_bit_array(
qual_arr.astype(np.uint32), [invalid_id, sat_band_id]
)
# Get nodata mask
no_data = np.where(np.isnan(band_arr.data), self._mask_true, self._mask_false)
# Combine masks
mask = no_data | invalid | sat
return self._set_nodata_mask(band_arr, mask)
def _has_cloud_band(self, band: BandNames) -> bool:
"""
Does this products has the specified cloud band ?
-> OLCI does not provide any cloud mask
"""
return False
def _open_clouds(
self,
bands: list,
resolution: float = None,
size: Union[list, tuple] = None,
**kwargs,
) -> dict:
"""
Does nothing for OLCI data
Args:
bands (list): List of the wanted bands
resolution (int): Band resolution in meters
size (Union[tuple, list]): Size of the array (width, height). Not used if resolution is provided.
kwargs: Additional arguments
Returns:
dict: Dictionary {band_name, band_xarray}
"""
if bands:
LOGGER.warning("Sentinel-3 OLCI L1B does not provide any cloud file")
return {}
[docs] @cache
def get_mean_sun_angles(self) -> (float, float):
"""
Get Mean Sun angles (Azimuth and Zenith angles)
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = "S3B_SL_1_RBT____20191115T233722_20191115T234022_20191117T031722_0179_032_144_3420_LN2_O_NT_003.SEN3"
>>> prod = Reader().open(path)
>>> prod.get_mean_sun_angles()
(78.55043955912154, 31.172127033319388)
Returns:
(float, float): Mean Azimuth and Zenith angle
"""
# Open sun azimuth and zenith files
sun_az = self._read_nc(self._geom_file, self._saa_name)
sun_ze = self._read_nc(self._geom_file, self._sza_name)
return sun_az.mean().data % 360, sun_ze.mean().data