Source code for eoreader.products.optical.pla_product

# -*- coding: utf-8 -*-
# Copyright 2022, SERTIT-ICube - France, https://sertit.unistra.fr/
# This file is part of eoreader project
#     https://github.com/sertit/eoreader
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
PlanetScope products.
See `here <https://earth.esa.int/eogateway/documents/20142/37627/Planet-combined-imagery-product-specs-2020.pdf>`_
and `here <https://developers.planet.com/docs/data/planetscope/>`_
for more information.
"""
import logging
from datetime import datetime
from enum import unique
from pathlib import Path
from typing import Union

import geopandas as gpd
import numpy as np
import xarray
from cloudpathlib import CloudPath
from lxml import etree
from rasterio.enums import Resampling
from sertit import files, rasters
from sertit.misc import ListEnum
from sertit.rasters import XDS_TYPE

from eoreader import cache, cached_property, utils
from eoreader.bands import ALL_CLOUDS, CIRRUS, CLOUDS, RAW_CLOUDS, SHADOWS, BandNames
from eoreader.bands import OpticalBandNames as obn
from eoreader.bands import to_str
from eoreader.exceptions import InvalidProductError, InvalidTypeError
from eoreader.products import OpticalProduct
from eoreader.utils import DATETIME_FMT, EOREADER_NAME

LOGGER = logging.getLogger(EOREADER_NAME)


[docs]@unique class PlaInstrument(ListEnum): """PlanetScope instrument See `here <https://developers.planet.com/docs/apis/data/sensors/>`__ for more information. """ PS2 = "Dove Classic (PS2)" """ Dove Classic (PS2) Instrument: Four-band frame Image with a split-frame VIS+NIR filter """ PS2_SD = "Dove-R (PS2.SD)" """ Dove-R (PS2.SD) Instrument: Four-band frame imager with butcher-block filter providing blue, green, red,and NIR stripes """ PSB_SD = "SuperDove (PSB.SD)" """ SuperDove (PSB.SD) Instrument: Eight-band frame imager with butcher-block filter providing: - coastal blue, - blue, - green I, - green II, - yellow, - red, - red-edge, - NIR stripes """
[docs]@unique class PlaProductType(ListEnum): """PlanetScope product types (processing levels)""" L1B = "Basic Scene Product" """ **PlanetScope Basic Scene Product (Level 1B)** Scaled Top of Atmosphere Radiance(at sensor) and sensor corrected product. This product has scene based framing and is not projected to a cartographic projection. Radiometric and sensor corrections are applied to the data. """ L3B = "Ortho Scene Product" """ **PlanetScope Ortho Scene Product (Level 3B)** Orthorectified, scaled Top of Atmosphere Radiance (at sensor) or Surface Reflectance image product suitable for analytic and visual applications. This product has scene based framing and projected to a cartographic projection. **PSScene3Band** PlanetScope 3-band multispectral basic and orthorectified scenes. This data set includes imagery from PlanetScope-0 and PlanetScope-1 sensors as well as full-frame and split-frame PlanetScope-2 sensors. Newer PSScene3Band items have a corresponding PSScene4Band item. Resampled to 3.0m. **PSScene4Band** PlanetScope 4-band multispectral basic and orthorectified scenes. This data set includes imagery from all PlanetScope sensors. All PSScene4Band items have a corresponding PSScene3Band item. Resampled to 3.0m. """ """ **PSScene (Not found anywhere else)** PlanetScope 8-band multispectral basic and orthorectified scenes. This data set includes imagery from all PlanetScope sensors. Naming: <acq date>_<acq time>_<acq time seconds ms>_<satellite_id>_<productLevel>_<bandProduct>.<ext> Asset Types: ortho_analytic_4b Radiometrically-calibrated analytic image stored as 16-bit scaled radiance. ortho_analytic_8b Radiometrically-calibrated analytic image stored as 16-bit scaled radiance. ortho_analytic_8b_sr PlanetScope atmospherically corrected surface reflectance product. ortho_analytic_8b_xml Radiometrically-calibrated analytic image metadata. ortho_analytic_4b_sr PlanetScope atmospherically corrected surface reflectance product. ortho_analytic_4b_xml Radiometrically-calibrated analytic image metadata. basic_analytic_4b Unorthorectified radiometrically-calibrated analytic image stored as 16-bit scaled radiance. basic_analytic_8b Unorthorectified radiometrically-calibrated analytic image stored as 16-bit scaled radiance. basic_analytic_8b_xml Unorthorectified radiometrically-calibrated analytic image metadata basic_analytic_4b_rpc RPC for unorthorectified analytic image stored as 12-bit digital numbers. basic_analytic_4b_xml Unorthorectified radiometrically-calibrated analytic image metadata. basic_udm2 Unorthorectified usable data mask (Cloud 2.0) Read more about this new asset here. ortho_udm2 Usable data mask (Cloud 2.0) ortho_visual Visual image with color-correction """ L3A = "Ortho Tile Product" """ **PlanetScope Ortho Tile Product (Level 3A)** Radiometric and sensor corrections applied to the data. Imagery is orthorectified and projected to a UTM projection. **PSOrthoTile** PlanetScope Ortho Tiles as 25 km x 25 km UTM tiles. This data set includes imagery from all PlanetScope sensors. Resampled to 3.125m. Naming: <strip_id>_<tile_id>_<acquisition date>_<satellite_id>_<bandProduct>.<extension> Product band order: - Band 1 = Blue - Band 2 = Green - Band 3 = Red - Band 4 = Near-infrared (analytic products only) Analytic 5B Product band order: - Band 1 = Blue - Band 2 = Green - Band 3 = Red - Band 4 = Red-Edge - Band 5 = Near-infrared """
[docs]class PlaProduct(OpticalProduct): """ Class of PlanetScope products. See `here <https://earth.esa.int/eogateway/documents/20142/37627/Planet-combined-imagery-product-specs-2020.pdf>`__ for more information. The scaling factor to retrieve the calibrated radiance is 0.01. """ def _pre_init(self) -> None: """ Function used to pre_init the products (setting needs_extraction and so on) """ self.needs_extraction = False # Post init done by the super class super()._pre_init() def _post_init(self) -> None: """ Function used to post_init the products (setting sensor type, band names and so on) """ # Ortho Tiles if self.product_type == PlaProductType.L3A: self.tile_name = self.split_name[1] # Post init done by the super class super()._post_init() def _set_resolution(self) -> float: """ Set product default resolution (in meters) """ # Ortho Tiles if self.product_type == PlaProductType.L3A: return 3.125 # Ortho Scene else: return 3.0 def _set_product_type(self) -> None: """Set products type""" # Get MTD XML file root, nsmap = self.read_mtd() # Manage product type prod_type = root.findtext(f".//{nsmap['eop']}productType") if not prod_type: raise InvalidProductError( "Cannot find the product type in the metadata file" ) # Set correct product type self.product_type = getattr(PlaProductType, prod_type) if self.product_type == PlaProductType.L1B: raise NotImplementedError( f"Basic Scene Product are not managed for Planet products {self.path}" ) elif self.product_type == PlaProductType.L3A: LOGGER.warning( f"Ortho Tile Product are not well tested for Planet products {self.path}." f"Use it at your own risk !" ) # Manage platform instr_node = root.find(f".//{nsmap['eop']}Instrument") instrument = instr_node.findtext(f"{nsmap['eop']}shortName") if not instrument: raise InvalidProductError("Cannot find the platform in the metadata file") # Set correct platform self.instrument = getattr(PlaInstrument, instrument.replace(".", "_")) # Manage bands of the product nof_bands = len( [band for band in root.iterfind(f".//{nsmap['ps']}bandSpecificMetadata")] ) if nof_bands == 3: self.band_names.map_bands({obn.BLUE: 1, obn.GREEN: 2, obn.RED: 3}) elif nof_bands == 4: self.band_names.map_bands( {obn.BLUE: 1, obn.GREEN: 2, obn.RED: 3, obn.NIR: 4, obn.NARROW_NIR: 4} ) elif nof_bands == 5: self.band_names.map_bands( { obn.BLUE: 1, obn.GREEN: 2, obn.RED: 3, obn.VRE_1: 4, obn.NIR: 5, obn.NARROW_NIR: 5, } ) elif nof_bands == 8: raise NotImplementedError( f"8 Band Scenes are not yet implemented in EOReader: {self.path}" ) else: raise InvalidProductError( f"Unusual number of bands ({nof_bands}) for {self.path}. " f"Please check the validity of your product" ) @cached_property def footprint(self) -> gpd.GeoDataFrame: """ Get real footprint of the products (without nodata, in french == emprise utile) .. code-block:: python >>> from eoreader.reader import Reader >>> path = r"LC08_L1GT_023030_20200518_20200527_01_T2" >>> prod = Reader().open(path) >>> prod.footprint index geometry 0 0 POLYGON ((366165.000 4899735.000, 366165.000 4... Overload of the generic function because landsat nodata seems to be different in QA than in regular bands. Indeed, nodata pixels vary according to the band sensor footprint, whereas QA nodata is where at least one band has nodata. We chose to keep QA nodata values for the footprint in order to show where all bands are valid. **TL;DR: We use the QA nodata value to determine the product's footprint**. Returns: gpd.GeoDataFrame: Footprint as a GeoDataFrame """ nodata = self._load_nodata() # Vectorize the nodata band (rasters_rio is faster) footprint = rasters.vectorize( nodata, values=1, keep_values=False, dissolve=True ).convex_hull return gpd.GeoDataFrame(geometry=footprint.geometry, crs=footprint.crs)
[docs] def get_datetime(self, as_datetime: bool = False) -> Union[str, datetime]: """ Get the product's acquisition datetime, with format :code:`YYYYMMDDTHHMMSS` <-> :code:`%Y%m%dT%H%M%S` .. code-block:: python >>> from eoreader.reader import Reader >>> path = r"SENTINEL2A_20190625-105728-756_L2A_T31UEQ_C_V2-2" >>> prod = Reader().open(path) >>> prod.get_datetime(as_datetime=True) datetime.datetime(2019, 6, 25, 10, 57, 28, 756000), fetched from metadata, so we have the ms >>> prod.get_datetime(as_datetime=False) '20190625T105728' Args: as_datetime (bool): Return the date as a datetime.datetime. If false, returns a string. Returns: Union[str, datetime.datetime]: Its acquisition datetime """ if self.datetime is None: # Get MTD XML file root, nsmap = self.read_mtd() datetime_str = root.findtext(f".//{nsmap['eop']}acquisitionDate") if not datetime_str: raise InvalidProductError( "Cannot find EARLIESTACQTIME in the metadata file." ) # Convert to datetime datetime_str = datetime.strptime( datetime_str.split("+")[0], "%Y-%m-%dT%H:%M:%S" ) if not as_datetime: datetime_str = datetime_str.strftime(DATETIME_FMT) else: datetime_str = self.datetime if not as_datetime: datetime_str = datetime_str.strftime(DATETIME_FMT) return datetime_str
def _get_name(self) -> str: """ Set product real name from metadata Returns: str: True name of the product (from metadata) """ # Get MTD XML file root, nsmap = self.read_mtd() # Open identifier name = root.findtext(f".//{nsmap['eop']}identifier") if not name: raise InvalidProductError( f"{nsmap['eop']}identifier not found in metadata!" ) return name
[docs] def get_band_paths( self, band_list: list, resolution: float = None, **kwargs ) -> dict: """ Return the paths of required bands. .. code-block:: python >>> from eoreader.reader import Reader >>> from eoreader.bands import * >>> path = r"SENTINEL2A_20190625-105728-756_L2A_T31UEQ_C_V2-2" >>> prod = Reader().open(path) >>> prod.get_band_paths([GREEN, RED]) { <OpticalBandNames.GREEN: 'GREEN'>: 'SENTINEL2A_20190625-105728-756_L2A_T31UEQ_C_V2-2/SENTINEL2A_20190625-105728-756_L2A_T31UEQ_C_V2-2_FRE_B3.tif', <OpticalBandNames.RED: 'RED'>: 'SENTINEL2A_20190625-105728-756_L2A_T31UEQ_C_V2-2/SENTINEL2A_20190625-105728-756_L2A_T31UEQ_C_V2-2_FRE_B4.tif' } Args: band_list (list): List of the wanted bands resolution (float): Band resolution kwargs: Other arguments used to load bands Returns: dict: Dictionary containing the path of each queried band """ band_paths = {} for band in band_list: band_paths[band] = self._get_path( "AnalyticMS", "tif", invalid_lookahead="_DN_" ) return band_paths
def _read_band( self, path: Union[CloudPath, Path], band: BandNames = None, resolution: Union[tuple, list, float] = None, size: Union[list, tuple] = None, **kwargs, ) -> XDS_TYPE: """ Read band from disk. .. WARNING:: Invalid pixels are not managed here Args: path (Union[CloudPath, Path]): Band path band (BandNames): Band to read resolution (Union[tuple, list, float]): Resolution of the wanted band, in dataset resolution unit (X, Y) size (Union[tuple, list]): Size of the array (width, height). Not used if resolution is provided. kwargs: Other arguments used to load bands Returns: XDS_TYPE: Band xarray """ # Read band band_xda = utils.read( path, resolution=resolution, size=size, resampling=Resampling.bilinear, indexes=[self.band_names[band]], **kwargs, ) # Compute the correct radiometry of the band original_dtype = band_xda.encoding.get("dtype", band_xda.dtype) if original_dtype == "uint16": band_xda /= 10000.0 # Convert type if needed if band_xda.dtype != np.float32: band_xda = band_xda.astype(np.float32) return band_xda def _manage_invalid_pixels( self, band_arr: XDS_TYPE, band: obn, **kwargs ) -> XDS_TYPE: """ Manage invalid pixels (Nodata, saturated, defective...) See `here <https://earth.esa.int/eogateway/documents/20142/37627/Planet-combined-imagery-product-specs-2020.pdf>`_ (unusable data mask) for more information. Args: band_arr (XDS_TYPE): Band array band (obn): Band name as an OpticalBandNames kwargs: Other arguments used to load bands Returns: XDS_TYPE: Cleaned band array """ # Nodata no_data_mask = self._load_nodata( size=(band_arr.rio.width, band_arr.rio.height) ).values # Dubious pixels mapping dubious_bands = { key: val + 1 for key, val in self.band_names.items() if val is not None } udm = self.open_mask("UNUSABLE", size=(band_arr.rio.width, band_arr.rio.height)) # Workaround: # FutureWarning: The :code:`numpy.expand_dims` function is not implemented by Dask array. # You may want to use the da.map_blocks function or something similar to silence this warning. # Your code may stop working in a future release. dubious_mask = rasters.read_bit_array(udm.values, dubious_bands[band]) # Combine masks mask = no_data_mask | dubious_mask # -- Merge masks return self._set_nodata_mask(band_arr, mask) def _manage_nodata(self, band_arr: XDS_TYPE, band: obn, **kwargs) -> XDS_TYPE: """ Manage only nodata pixels Args: band_arr (XDS_TYPE): Band array band (obn): Band name as an OpticalBandNames kwargs: Other arguments used to load bands Returns: XDS_TYPE: Cleaned band array """ # Nodata no_data_mask = self._load_nodata( size=(band_arr.rio.width, band_arr.rio.height) ).values # -- Merge masks return self._set_nodata_mask(band_arr, no_data_mask) def _load_bands( self, bands: list, resolution: float = None, size: Union[list, tuple] = None, **kwargs, ) -> dict: """ Load bands as numpy arrays with the same resolution (and same metadata). Args: bands list: List of the wanted bands resolution (float): Band resolution in meters size (Union[tuple, list]): Size of the array (width, height). Not used if resolution is provided. kwargs: Other arguments used to load bands Returns: dict: Dictionary {band_name, band_xarray} """ # Return empty if no band are specified if not bands: return {} # Get band paths band_paths = self.get_band_paths(bands, **kwargs) # Open bands and get array (resampled if needed) band_arrays = self._open_bands( band_paths, resolution=resolution, size=size, **kwargs ) return band_arrays def _get_condensed_name(self) -> str: """ Get PlanetScope products condensed name ({date}_PLA_{product_type}). Returns: str: Condensed name """ return f"{self.get_datetime()}_{self.platform.name}_{self.product_type.name}"
[docs] @cache def get_mean_sun_angles(self) -> (float, float): """ Get Mean Sun angles (Azimuth and Zenith angles) .. code-block:: python >>> from eoreader.reader import Reader >>> path = r"SENTINEL2A_20190625-105728-756_L2A_T31UEQ_C_V2-2" >>> prod = Reader().open(path) >>> prod.get_mean_sun_angles() (154.554755774838, 27.5941391571236) Returns: (float, float): Mean Azimuth and Zenith angle """ # Get MTD XML file root, nsmap = self.read_mtd() # Open zenith and azimuth angle try: elev_angle = float( root.findtext(f".//{nsmap['opt']}illuminationElevationAngle") ) azimuth_angle = float( root.findtext(f".//{nsmap['opt']}illuminationAzimuthAngle") ) except TypeError: raise InvalidProductError("Azimuth or Zenith angles not found in metadata!") # From elevation to zenith zenith_angle = 90.0 - elev_angle return azimuth_angle, zenith_angle
@cache def _read_mtd(self) -> (etree._Element, dict): """ Read metadata and outputs the metadata XML root and its namespaces as a dict .. code-block:: python >>> from eoreader.reader import Reader >>> path = r"20210406_015904_37_2407.zip" >>> prod = Reader().open(path) >>> prod.read_mtd() (<Element {http://schemas.planet.com/ps/v1/planet_product_metadata_geocorrected_level} EarthObservation at 0x1a2621f03c8>, { 'opt': '{http://earth.esa.int/opt}', 'gml': '{http://www.opengis.net/gml}', 'eop': '{http://earth.esa.int/eop}', 'ps': '{http://schemas.planet.com/ps/v1/planet_product_metadata_geocorrected_level}' }) Returns: (etree._Element, dict): Metadata XML root and its namespaces as a dict """ mtd_from_path = "metadata*.xml" mtd_archived = r"metadata.*\.xml" return self._read_mtd_xml(mtd_from_path, mtd_archived) def _has_cloud_band(self, band: BandNames) -> bool: """ Does this products has the specified cloud band ? """ # NOTE: CIRRUS == HEAVY HAZE # FROM DOCUMENTATION: https://developers.planet.com/docs/data/udm-2/ # Percent of heavy haze values in dataset. # Heavy haze values represent scene content areas (non-blackfilled) that contain thin low altitude clouds, # higher altitude cirrus clouds, soot and dust which allow fair recognition of land cover features, # but not having reliable interpretation of the radiometry or surface reflectance. return True def _open_clouds( self, bands: list, resolution: float = None, size: Union[list, tuple] = None, **kwargs, ) -> dict: """ Load cloud files as xarrays. CIRRUS is HEAVY_HAZE Args: bands (list): List of the wanted bands resolution (int): Band resolution in meters size (Union[tuple, list]): Size of the array (width, height). Not used if resolution is provided. kwargs: Additional arguments Returns: dict: Dictionary {band_name, band_xarray} """ band_dict = {} # Load default xarray as a template def_xarr = self._read_band( self.get_default_band_path(), band=self.get_default_band(), resolution=resolution, size=size, ) # Load nodata nodata = self._load_nodata(resolution, size).data if bands: for band in bands: if band == ALL_CLOUDS: cloud = self._create_mask( def_xarr.rename(ALL_CLOUDS.name), ( self.open_mask("CLOUD", resolution, size).data & self.open_mask("SHADOW", resolution, size).data & self.open_mask("HEAVY_HAZE", resolution, size).data ), nodata, ) elif band == SHADOWS: cloud = self._create_mask( def_xarr.rename(SHADOWS.name), self.open_mask("SHADOW", resolution, size).data, nodata, ) elif band == CLOUDS: cloud = self._create_mask( def_xarr.rename(CLOUDS.name), self.open_mask("CLOUD", resolution, size).data, nodata, ) elif band == CIRRUS: cloud = self._create_mask( def_xarr.rename(CIRRUS.name), self.open_mask("HEAVY_HAZE", resolution, size).data, nodata, ) elif band == RAW_CLOUDS: cloud = utils.read(self._get_path("udm2", "tif"), resolution, size) else: raise InvalidTypeError( f"Non existing cloud band for Planet: {band}" ) # Rename band_name = to_str(band)[0] # Multi bands -> do not change long name if band != RAW_CLOUDS: cloud.attrs["long_name"] = band_name band_dict[band] = cloud.rename(band_name).astype(np.float32) return band_dict
[docs] def open_mask( self, mask_id: str, resolution: float = None, size: Union[list, tuple] = None, ) -> Union[xarray.DataArray, None]: """ Open a Planet UDM2 (Usable Data Mask) mask, band by band, as a xarray. Returns None if the mask is not available. Do not open cloud mask with this function. Use :code:`load` instead. See `here <https://developers.planet.com/docs/data/udm-2/>`_ for more information. Accepted mask IDs: - :code:`CLEAR`: Band 1 Clear map [0, 1] 0: not clear, 1: clear - :code:`SNOW`: Band 2 Snow map [0, 1] 0: no snow or ice, 1: snow or ice - :code:`SHADOW`: Band 3 Shadow map [0, 1] 0: no shadow, 1: shadow - :code:`LIGHT_HAZE`: Band 4 Light haze map [0, 1] 0: no light haze, 1: light haze - :code:`HEAVY_HAZE`: Band 5 Heavy haze map [0, 1] 0: no heavy haze, 1: heavy haze - :code:`CLOUD`: Band 6 Cloud map [0, 1] 0: no cloud, 1: cloud - :code:`CONFIDENCE`: Band 7 Confidence map [0-100] %age value: per-pixel algorithmic confidence in classif - :code:`UNUSABLE`: Band 8 Unusable pixels -- Equivalent to the UDM asset .. code-block:: python >>> from eoreader.bands import * >>> from eoreader.reader import Reader >>> path = r"SENTINEL2B_20190401-105726-885_L2A_T31UEQ_D_V2-0.zip" >>> prod = Reader().open(path) >>> prod.open_mask("EDG", GREEN) array([[[0, ..., 0]]], dtype=uint8) Args: mask_id: Mask ID resolution (float): Band resolution in meters size (Union[tuple, list]): Size of the array (width, height). Not used if resolution is provided. Returns: Union[xarray.DataArray, None]: Mask array """ band_mapping = { "CLEAR": 1, "SNOW": 2, "SHADOW": 3, "LIGHT_HAZE": 4, "HEAVY_HAZE": 5, "CLOUD": 6, "CONFIDENCE": 7, "UNUSABLE": 8, } assert mask_id in band_mapping mask_path = self._get_path("udm2", "tif") # Open mask band mask = utils.read( mask_path, resolution=resolution, size=size, resampling=Resampling.nearest, # Nearest to keep the flags masked=False, indexes=[band_mapping[mask_id]], ) return mask.astype(np.uint8)
def _load_nodata( self, resolution: float = None, size: Union[list, tuple] = None, ) -> Union[xarray.DataArray, None]: """ Load nodata (unimaged pixels) as a numpy array. See `here <https://earth.esa.int/eogateway/documents/20142/37627/Planet-combined-imagery-product-specs-2020.pdf>`_ (unusable data mask) for more information. Args: resolution (float): Band resolution in meters size (Union[tuple, list]): Size of the array (width, height). Not used if resolution is provided. Returns: Union[xarray.DataArray, None]: Nodata array """ udm = self.open_mask("UNUSABLE", resolution, size) nodata = udm.copy(data=rasters.read_bit_array(udm.compute(), 0)) return nodata.rename("NODATA") def _get_path(self, filename: str, extension: str, invalid_lookahead=None) -> str: """ Get either the archived path of the normal path of an asset Args: filename (str): Filename with wildcards extension (str): Extension Returns: str: Path """ path = "" try: if self.is_archived: if invalid_lookahead: regex = rf".*{filename}(?!{invalid_lookahead})\w*[_]*\.{extension}" else: regex = rf".*{filename}\w*[_]*\.{extension}" path = files.get_archived_rio_path(self.path, regex) else: paths = list(self.path.glob(f"**/*{filename}*.{extension}")) if invalid_lookahead: paths = [ path for path in paths if invalid_lookahead not in str(path) ] path = paths[0] except (FileNotFoundError, IndexError): LOGGER.warning( f"No file corresponding to *{filename}*.{extension} found in {self.path}" ) return path