# -*- coding: utf-8 -*-
# Copyright 2023, SERTIT-ICube - France, https://sertit.unistra.fr/
# This file is part of eoreader project
# https://github.com/sertit/eoreader
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Capella products.
Take a look
`here <https://support.capellaspace.com/hc/en-us/categories/360002612692-SAR-Imagery-Products>`_.
"""
import logging
from datetime import datetime
from enum import unique
from pathlib import Path
from typing import Union
import geopandas as gpd
from affine import Affine
from cloudpathlib import CloudPath
from dicttoxml import dicttoxml
from lxml import etree
from rasterio import CRS, transform
from sertit import files, vectors
from sertit.misc import ListEnum
from sertit.vectors import WGS84
from shapely.geometry import Point, box
from eoreader import DATETIME_FMT, EOREADER_NAME, cache
from eoreader.bands import SarBandNames as sab
from eoreader.exceptions import InvalidProductError
from eoreader.products import SarProduct, SarProductType
from eoreader.products.product import OrbitDirection
LOGGER = logging.getLogger(EOREADER_NAME)
[docs]@unique
class CapellaProductType(ListEnum):
"""
Capella products types.
Take a look
`here <https://support.capellaspace.com/hc/en-us/articles/360039702691-SAR-Data-Formats>`_.
"""
SLC = "SLC"
"""
Single Look Complex (SLC)
- Contains both amplitude and phase of the radar signal
- Range-compressed and focused SAR image in slant-range geometry
- Georeferenced using orbit data and Range-Doppler projected
"""
GEC = "GEC"
"""
Geocoded Ellipsoid Corrected (GEC)
- Contains amplitude information only
- Range-compressed, detected, focused and multi-looked SAR image
- Multi-look techniques applied to enhance radiometric resolution
- Resampled and projected onto WGS84 ellipsoid with average scene center height
- Universal Transverse Mercator (UTM) and Universal Polar Stereographic (UPS) projections
"""
GEO = "GEO"
"""
Geocoded Terrain Corrected (GEO)
- Contains amplitude information only
- Range-compressed, detected, focused and multi-looked SAR image
- Multi-look techniques applied to enhance radiometric resolution
- Terrain-height corrected using a high-resolution Digital Elevation Model (DEM)
- Universal Transverse Mercator (UTM) and Universal Polar Stereographic (UPS) projections
"""
SICD = "SICD"
"""
Sensor Independent Complex Data (SICD)
- Contains both amplitude and phase of the radar signal
- Range-compressed and focused SAR image in slant-range geometry
- Sensor independent format
Not used by EOReader.
"""
SIDD = "SIDD"
"""
Sensor Independent Derived Data (SIDD)
- Contains amplitude information only
- Range-compressed, detected, focused and multi-looked SAR image
- Multi-look techniques applied to enhance radiometric resolution
- Planar Gridded Display (PGD) projection
- Sensor independent format
Not used by EOReader.
"""
CPHD = "CPHD"
"""
Compensated Phase History Data (CPHD)
- Contains raw phase history data that is compensated for hardware timing & platform motion
- Sensor independent format
- Only available to United States Government customers
Not used by EOReader.
"""
[docs]@unique
class CapellaSensorMode(ListEnum):
"""
Capella imaging mode.
Take a look
`here <https://support.capellaspace.com/hc/en-us/articles/360059224291-What-SAR-imagery-products-are-available-with-Capella->`_.
"""
SM = "stripmap"
"""Stripmap"""
SP = "spotlight"
"""Spotlight"""
SS = "sliding_spotlight"
"""Sliding Spotlight"""
[docs]class CapellaProduct(SarProduct):
"""
Class for Capella Products
Take a look
`here <https://support.capellaspace.com/hc/en-us/categories/360002612692-SAR-Imagery-Products>`_.
"""
[docs] def __init__(
self,
product_path: Union[str, CloudPath, Path],
archive_path: Union[str, CloudPath, Path] = None,
output_path: Union[str, CloudPath, Path] = None,
remove_tmp: bool = False,
**kwargs,
) -> None:
self._has_stac_mtd = False
# Initialization from the super class
super().__init__(product_path, archive_path, output_path, remove_tmp, **kwargs)
def _set_pixel_size(self) -> None:
"""
Set product default pixel size (in meters)
See here
`here <https://support.capellaspace.com/hc/en-us/articles/360059224291-What-SAR-imagery-products-are-available-with-Capella->`_
"""
# Using az resolution
if self.sensor_mode == CapellaSensorMode.SP:
def_pixel_size = 0.35
def_res = 0.5
elif self.sensor_mode == CapellaSensorMode.SM:
def_pixel_size = 0.6
def_res = 1.0
elif self.sensor_mode == CapellaSensorMode.SS:
def_pixel_size = 0.8
def_res = 1.2
else:
raise InvalidProductError(f"Unknown sensor mode: {self.sensor_mode}")
self.pixel_size = def_pixel_size
self.resolution = def_res
def _set_instrument(self) -> None:
"""
Set instrument
ICEYE: https://earth.esa.int/eogateway/missions/iceye
"""
self.instrument = "SAR X-band"
def _pre_init(self, **kwargs) -> None:
"""
Function used to pre_init the products
(setting needs_extraction and so on)
"""
self._band_folder = self.path
# SNAP cannot process its archive
self.needs_extraction = True
# Its original filename is its name
self._use_filename = True
# Post init done by the super class
super()._pre_init(**kwargs)
def _post_init(self, **kwargs) -> None:
"""
Function used to post_init the products
(setting product-type, band names and so on)
"""
# Private attributes
self.snap_filename = str(next(self.path.glob("*CAPELLA*.json")).name)
self._raw_band_regex = f"{self.name}.tif"
# Post init done by the super class
super()._post_init(**kwargs)
[docs] @cache
def wgs84_extent(self) -> gpd.GeoDataFrame:
"""
Get the WGS84 extent of the file before any reprojection.
This is useful when the SAR pre-process has not been done yet.
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = r"1011117-766193"
>>> prod = Reader().open(path)
>>> prod.wgs84_extent()
geometry
0 POLYGON ((108.09797 15.61011, 108.48224 15.678...
Returns:
gpd.GeoDataFrame: WGS84 extent as a gpd.GeoDataFrame
"""
if self._has_stac_mtd:
mtd_file = next(self.path.glob(f"{self.name}.json"))
extent = vectors.read(mtd_file)
else:
extent = None
root, _ = self.read_mtd()
# Get image size
height = int(root.findtext(".//rows"))
width = int(root.findtext(".//columns"))
# Investigate image geometry
img_geom = root.find(".//image_geometry")
geom_type = img_geom.findtext("type")
# Use given geotransform if existing
if geom_type == "geotransform":
tf = [
float(it.text)
for it in img_geom.find("geotransform").iterfind("item")
]
# TODO: manage not WKT case
crs = img_geom.find("coordinate_system").findtext("wkt")
if crs:
# Convert to rasterio
west, south, east, north = transform.array_bounds(
height, width, transform=Affine.from_gdal(*tf)
)
extent = gpd.GeoDataFrame(
geometry=[box(minx=west, miny=north, maxx=east, maxy=south)],
crs=CRS.from_string(crs),
)
# Use center pixel
if extent is None:
# Get center pixel point in UTM
center_pixel = root.find(".//center_pixel")
target_position = [
float(it.text)
for it in center_pixel.find("target_position").iterfind("item")
]
center_pix = gpd.GeoDataFrame(
geometry=[Point(target_position)],
crs={"proj": "geocent", "ellps": "WGS84", "datum": "WGS84"},
).to_crs(WGS84)
center_pix.to_crs(center_pix.extent_wgs84.estimate_utm_crs())
# Get pixel spacing in meters
pixel_spacing_h = float(root.findtext(".//pixel_spacing_row"))
pixel_spacing_w = float(root.findtext(".//pixel_spacing_column"))
# Compute offset from center of image
offset_h = pixel_spacing_h * height / 2
offset_w = pixel_spacing_w * width / 2
tl_corner = center_pix.translate(xoff=-offset_w, yoff=-offset_h)
br_corner = center_pix.translate(xoff=offset_w, yoff=offset_h)
extent = gpd.GeoDataFrame(
geometry=[
box(
minx=tl_corner.x.iat[0],
miny=tl_corner.y.iat[0],
maxx=br_corner.x.iat[0],
maxy=br_corner.y.iat[0],
)
],
crs=CRS.from_string(self.crs()),
)
return extent.to_crs(WGS84)
def _set_product_type(self) -> None:
"""Set products type"""
# Open identifier
prod_type = self.split_name[3]
self.product_type = getattr(CapellaProductType, prod_type)
if self.product_type in [CapellaProductType.GEO, CapellaProductType.GEC]:
self.sar_prod_type = SarProductType.GDRG
elif self.product_type == CapellaProductType.SLC:
self.sar_prod_type = SarProductType.CPLX
else:
raise NotImplementedError(
f"{self.product_type.value} product type is not handled by EOReader"
)
def _set_sensor_mode(self) -> None:
"""Get sensor mode"""
sensor_mode = self.split_name[2]
self.sensor_mode = getattr(CapellaSensorMode, sensor_mode)
[docs] def get_datetime(self, as_datetime: bool = False) -> Union[str, datetime]:
"""
Get the product's acquisition datetime, with format :code:`YYYYMMDDTHHMMSS` <-> :code:`%Y%m%dT%H%M%S`
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = r"1011117-766193"
>>> prod = Reader().open(path)
>>> prod.get_datetime(as_datetime=True)
datetime.datetime(2020, 10, 28, 22, 46, 25)
>>> prod.get_datetime(as_datetime=False)
'20201028T224625'
Args:
as_datetime (bool): Return the date as a datetime.datetime. If false, returns a string.
Returns:
Union[str, datetime.datetime]: Its acquisition datetime
"""
if self.datetime is None:
# Get MTD XML file
root, _ = self.read_mtd()
# Open identifier
if self._has_stac_mtd:
acq_date = root.findtext(".//datetime")
else:
acq_date = root.findtext(".//start_timestamp")
if not acq_date:
raise InvalidProductError(
"datetime or start_timestamp not found in metadata!"
)
# Convert to datetime (too many microseconds)
date = datetime.strptime(acq_date.split(".")[0], "%Y-%m-%dT%H:%M:%S")
else:
date = self.datetime
if not as_datetime:
date = date.strftime(DATETIME_FMT)
return date
def _get_name_constellation_specific(self) -> str:
"""
Set product real name from metadata
Returns:
str: True name of the product (from metadata)
"""
name = None
for file in self.path.glob("*.tif"):
if "preview" not in file.name:
name = files.get_filename(file)
return name
@cache
def _read_mtd(self) -> (etree._Element, dict):
"""
Read GeoJSON metadata and outputs its as a metadata XML root and its namespaces as an empty dict
Returns:
(etree._Element, dict): Metadata XML root and its namespaces as a dict
"""
# MTD are JSON
try:
try:
mtd_file = next(self.path.glob(f"{self.name}.json"))
self._has_stac_mtd = True
except StopIteration:
try:
LOGGER.warning(
f"Non available STAC metadata for the product {self.name}. Opening Extended Metadata instead."
)
mtd_file = next(self.path.glob(f"{self.name}*.json"))
self._has_stac_mtd = False
except StopIteration as ex:
raise InvalidProductError(
f"Metadata file not found in {self.path}"
) from ex
data = files.read_json(mtd_file, print_file=False)
root = etree.fromstring(dicttoxml(data))
except etree.XMLSyntaxError:
raise InvalidProductError(
f"Cannot convert metadata to XML for {self.path}!"
)
return root, {}
[docs] def get_raw_band_paths(self, **kwargs) -> dict:
"""
Return the existing path of the VV band (as they come with the archived products).
ICEYE product only contains a VV band !
Args:
**kwargs: Additional arguments
Returns:
dict: Dictionary containing the path of every band existing in the raw products
"""
band_paths = {}
try:
pol = sab.from_value(self.split_name[4])
band_paths[pol] = files.get_file_in_dir(
self._band_folder, self._raw_band_regex, exact_name=True, get_list=False
)
except FileNotFoundError:
raise InvalidProductError(
"An ICEYE product should at least contain one band !"
)
return band_paths
[docs] def get_quicklook_path(self) -> str:
"""
Get quicklook path if existing.
Returns:
str: Quicklook path
"""
quicklook_path = None
try:
quicklook_path = str(next(self.path.glob("*.png")))
except StopIteration:
LOGGER.warning(f"No quicklook found in {self.condensed_name}")
return quicklook_path
# @cache
[docs] def get_orbit_direction(self) -> OrbitDirection:
"""
Get cloud cover as given in the metadata
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = r"S2A_MSIL1C_20200824T110631_N0209_R137_T30TTK_20200824T150432.SAFE.zip"
>>> prod = Reader().open(path)
>>> prod.get_orbit_direction().value
"DESCENDING"
Returns:
OrbitDirection: Orbit direction (ASCENDING/DESCENDING)
"""
ob = None
if self._has_stac_mtd:
root, _ = self.read_mtd()
ob = root.findtext(".//key[@name='sat:orbit_state']")
ob = OrbitDirection.from_value(ob.upper())
if ob is None:
ob = super().get_orbit_direction()
return ob