# Copyright 2026, SERTIT-ICube - France, https://sertit.unistra.fr/
# This file is part of eoreader project
# https://github.com/sertit/eoreader
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Aleph-1 (Satellogic) products https://developers.satellogic.com/imagery-products/introduction.html
"""
import logging
from datetime import datetime
from enum import unique
import geopandas as gpd
import numpy as np
import xarray as xr
from lxml import etree
from rasterio.enums import Resampling
from sertit import files, path
from sertit.misc import ListEnum
from sertit.types import AnyPathType
from eoreader import DATETIME_FMT, EOREADER_NAME, cache, utils
from eoreader.bands import (
ALL_CLOUDS,
BLUE,
CIRRUS,
CLOUDS,
GREEN,
NARROW_NIR,
NIR,
RAW_CLOUDS,
RED,
SHADOWS,
BandNames,
SpectralBand,
to_str,
)
from eoreader.bands.band_names import Aleph1MaskBandNames
from eoreader.exceptions import InvalidProductError
from eoreader.products import VhrProduct
from eoreader.products.optical.optical_product import RawUnits
from eoreader.stac import GSD, ID, NAME, WV_MAX, WV_MIN
from eoreader.utils import qck_wrapper, simplify
LOGGER = logging.getLogger(EOREADER_NAME)
[docs]
@unique
class Aleph1ProductType(ListEnum):
"""Aleph-1 (Satellogic) product types (processing levels)"""
L0 = "Raw"
"""
The L0 Product is the rawest product from Satellogic’s products family.
It is composed by the raw frames coming directly from the sensor of the payloads without any modification or processing, along with the necessary metadata.
The frames are simple packaged in a standardized format to create the L0 product. The units of each raster are in DNs directly from the sensor.
See: https://developers.satellogic.com/imagery-products/l0.html
**Not handled by EOReader**
"""
L1A = "Raw corrected"
"""
The L1A Product is a raw, but corrected product from Satellogic’s products family.
It is composed by the raw frames that have been radiometrically corrected along with the necessary metadata.
The algorithms applied to the L0 data to convert it into L1A are:
- Artifacts removals
- PSF Deconvolution
- Radiometric correction to Top of the atosmphere units
- HDR (combining 2 L0 frames with different exposures, only on Mark V)
- Stray light correction
- NO-data and cloud mask calculation
See: https://developers.satellogic.com/imagery-products/l1a.html
**Not handled by EOReader**
"""
L1B = "L1 Basic"
"""
The L1 Basic (aka L1B and QuickView) product is a 4-band (RGB and Near infrared) product characterised by its low processing time and that can be generated from Mark IV and Mark V satellites raw imagery.
It is geolocated and projected to the ground but not orthorectified and it is presented at native resolution.
This implies that different captures may have different pixel sizes depending on the altitude and the off-nadir angle of the satellite at capture time.
This imagery product is currently used in Rapid Response products, as well as the base collection to browse the archive.
See: https://developers.satellogic.com/imagery-products/l1basic.html
✅ **Handled by EOReader**
"""
L1C = "Ortho ready"
"""
The Ortho Ready (L1C) product is a 4-band imagery dataset designed for users who wish to perform their own orthorectification using a Digital Elevation Model (DEM) of their choice.
The 16-bit, 4-band rasters (visual and near-infrared) are provided with Rational Polynomial Coefficients (RPCs).
By combining the imagery, the RPCs, and a DEM, users can generate their own geometrically corrected, orthorectified images.
See: https://developers.satellogic.com/imagery-products/ortho_ready.html
**Not handled by EOReader**
"""
L1D = "Ortho"
"""
The L1D and L1D_SR imagery products are 4-band (RGB and Near infrared) product designed for accuracy and best of class image quality.
It is delivered to customers after going through radiometric and geometric correction process.
See: https://developers.satellogic.com/imagery-products/ortho.html
✅ **Handled by EOReader**
"""
L1D_SR = "Ortho Super Resolution"
"""
The L1D and L1D_SR imagery products are 4-band (RGB and Near infrared) product designed for accuracy and best of class image quality.
It is delivered to customers after going through radiometric and geometric correction process.
See: https://developers.satellogic.com/imagery-products/ortho.html
✅ **Handled by EOReader**
"""
[docs]
@unique
class Aleph1Instrument(ListEnum):
"""Aleph-1 (Satellogic) instrument
See `Satellogic documentation <https://developers.satellogic.com/imagery-products/msi_payload_specifications.html>`_ for more information.
"""
MARKIV = "MarkIV"
"""
Mark IV satellite generation
"""
MARKV = "MarkV"
"""
Mark V satellite generation
"""
UNKOWN = "Unknown"
"""
Missing instrument in mtd (older versions)
"""
[docs]
class Aleph1Product(VhrProduct):
"""
Class for Aleph-1 (Satellogic) products
"""
def _pre_init(self, **kwargs) -> None:
"""
Function used to pre_init the products
(setting needs_extraction and so on)
"""
self._raw_units = RawUnits.REFL
self._has_cloud_cover = True
self._use_filename = True
self.needs_extraction = False
# Pre init done by the super class
super()._pre_init(**kwargs)
def _set_pixel_size(self) -> None:
"""
Set product default pixel size (in meters)
"""
# Get MTD XML file
root, _ = self.read_mtd()
# Open identifier
gsd = root.findtext(".//gsd")
if not gsd:
raise InvalidProductError("'gsd' not found in metadata!")
# GSD varies according to the product type
self.pixel_size = np.round(float(gsd), 2)
# Images are captured with approximately 1m native resolution at nadir for all spectral bands
# https://developers.satellogic.com/imagery-products/msi_payload_specifications.html#spatial-resolution
self.resolution = 1.0
[docs]
@cache
def extent(self) -> gpd.GeoDataFrame:
"""
Get UTM extent of the tile, managing the case with not orthorectified bands.
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = "S3B_SL_1_RBT____20191115T233722_20191115T234022_20191117T031722_0179_032_144_3420_LN2_O_NT_003.SEN3"
>>> prod = Reader().open(path)
>>> prod.utm_extent()
geometry
0 POLYGON ((1488846.028 6121896.451, 1488846.028...
Returns:
gpd.GeoDataFrame: Extent in UTM
"""
extent = self.footprint().copy()
extent.geometry = extent.envelope
return extent
def _set_product_type(self) -> None:
"""
Set product type.
"""
# Processing level
prod_type = "L1D_SR" if "L1D_SR" in self.name else self.split_name[3]
try:
self.product_type = getattr(Aleph1ProductType, prod_type)
except AttributeError as exc:
raise InvalidProductError(
f"Product type '{prod_type}' not handled by EOReader."
) from exc
if self.product_type in [
Aleph1ProductType.L0,
Aleph1ProductType.L1A,
Aleph1ProductType.L1C,
]:
raise NotImplementedError(
f"Satellogic {prod_type} products are not handled by EOReader yet."
)
def _set_instrument(self) -> None:
"""
Set instrument
"""
# Get MTD XML file
root, _ = self.read_mtd()
# Open identifier
instrument = root.findtext(".//satellite_generation")
if not instrument:
version = root.findtext(".//product_version")
LOGGER.warning(
f"'satellite_generation' not found in metadata (maybe old product version: {version}). 'Unknown' instrument set by default."
)
instrument = "Unknown"
self.instrument = Aleph1Instrument.from_value(instrument)
def _map_bands(self) -> None:
"""
Map bands
"""
blue = SpectralBand(
eoreader_name=BLUE,
**{NAME: "BLUE", ID: 1, GSD: self.pixel_size, WV_MIN: 450, WV_MAX: 510},
)
green = SpectralBand(
eoreader_name=GREEN,
**{NAME: "GREEN", ID: 2, GSD: self.pixel_size, WV_MIN: 510, WV_MAX: 580},
)
red = SpectralBand(
eoreader_name=RED,
**{NAME: "RED", ID: 3, GSD: self.pixel_size, WV_MIN: 590, WV_MAX: 690},
)
nir = SpectralBand(
eoreader_name=NIR,
**{NAME: "NIR", ID: 4, GSD: self.pixel_size, WV_MIN: 750, WV_MAX: 900},
)
self.bands.map_bands(
{
BLUE: blue.update(id=3),
GREEN: green.update(id=2),
RED: red.update(id=1),
NARROW_NIR: nir.update(id=4),
NIR: nir.update(id=4),
}
)
[docs]
def get_datetime(self, as_datetime: bool = False) -> str | datetime:
"""
Get the product's acquisition datetime, with format :code:`YYYYMMDDTHHMMSS` <-> :code:`%Y%m%dT%H%M%S`
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = r"LC08_L1GT_023030_20200518_20200527_01_T2"
>>> prod = Reader().open(path)
>>> prod.get_datetime(as_datetime=True)
datetime.datetime(2020, 5, 18, 16, 34, 7)
>>> prod.get_datetime(as_datetime=False)
'20200518T163407'
Args:
as_datetime (bool): Return the date as a datetime.datetime. If false, returns a string.
Returns:
str | dt.datetime: Its acquisition datetime
"""
if self.datetime is None:
# Get MTD XML file
root, _ = self.read_mtd()
# Open datetime
acq_date = root.findtext(".//datetime")
if not acq_date:
raise InvalidProductError("'datetime' not found in metadata!")
# Convert to datetime
date = datetime.strptime(acq_date, "%Y-%m-%dT%H:%M:%S.%fZ")
else:
date = self.datetime
if not as_datetime:
date = date.strftime(DATETIME_FMT)
return date
def _get_name_constellation_specific(self) -> str:
"""
Set product real name from metadata
Returns:
str: True name of the product (from metadata)
"""
# Get MTD XML file
root, _ = self.read_mtd()
# Open identifier
return root.findtext(".//id")
[docs]
@cache
def get_mean_sun_angles(self) -> (float, float):
"""
Get Mean Sun angles (Azimuth and Zenith angles)
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = r"IMG_PHR1A_PMS_001"
>>> prod = Reader().open(path)
>>> prod.get_mean_sun_angles()
(45.6624568841367, 30.219881316357643)
Returns:
(float, float): Mean Azimuth and Zenith angle
"""
# Get MTD XML file
root, _ = self.read_mtd()
# Open zenith and azimuth angle
zenith_angle = 90 - float(root.findtext(".//sun_elevation"))
azimuth_angle = float(root.findtext(".//sun_azimuth"))
return azimuth_angle, zenith_angle
[docs]
@cache
def get_mean_viewing_angles(self) -> (float, float, float):
"""
Get Mean Viewing angles (azimuth, off-nadir and incidence angles)
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = r"S2A_MSIL1C_20200824T110631_N0209_R137_T30TTK_20200824T150432.SAFE.zip"
>>> prod = Reader().open(path)
>>> prod.get_mean_viewing_angles()
Returns:
(float, float, float): Mean azimuth, off-nadir and incidence angles
"""
# Get MTD XML file
root, _ = self.read_mtd()
# Open zenith and azimuth angle
try:
az = float(root.findtext(".//azimuth"))
off_nadir = float(root.findtext(".//off_nadir"))
incidence_angle = float(root.findtext(".//incidence_angle"))
except TypeError as exc:
raise InvalidProductError(
"azimuth, off_nadir or incidence_angle not found in metadata!"
) from exc
return az, off_nadir, incidence_angle
[docs]
@cache
def get_cloud_cover(self) -> float:
"""
Get cloud cover as given in the metadata
.. code-block:: python
>>> from eoreader.reader import Reader
>>> path = r"S2A_MSIL1C_20200824T110631_N0209_R137_T30TTK_20200824T150432.SAFE.zip"
>>> prod = Reader().open(path)
>>> prod.get_cloud_cover()
55.5
Returns:
float: Cloud cover as given in the metadata
"""
# Get MTD XML file
root, _ = self.read_mtd()
# Get the cloud cover
try:
try:
cc = float(root.findtext(".//cloud_cover"))
except TypeError as exc:
raise InvalidProductError(
"'cloud_cover' not found in metadata!"
) from exc
except (InvalidProductError, TypeError) as ex:
LOGGER.warning(ex)
cc = 0
return cc
def _to_reflectance(
self,
band_arr: xr.DataArray,
band_path: AnyPathType,
band: BandNames,
**kwargs,
) -> xr.DataArray:
"""
Converts band to reflectance
Args:
band_arr (xr.DataArray): Band array to convert
band_path (AnyPathType): Band path
band (BandNames): Band to read
**kwargs: Other keywords
Returns:
xr.DataArray: Band in reflectance
"""
# Delivered in uint16
# Compute the correct radiometry of the band for raw band
if path.get_filename(band_path).endswith("TOA"):
band_arr *= 0.0001
# To float32
if band_arr.dtype != np.float32:
band_arr = band_arr.astype(np.float32)
return band_arr
@cache
def _read_stac_mtd(self) -> gpd.GeoDataFrame:
"""Read STAC metadata (also geojson footprint of the image)"""
return self._read_vector("*_stac.geojson")
@cache
def _read_mtd_dict(self) -> dict:
"""Return metadata as a dict"""
glob_str = "*_stac.geojson"
if self.is_archived:
import json
regex = utils.convert_glob_to_regex(glob_str)
mtd = json.loads(self._read_archived_file(regex))
else:
try:
mtd = files.read_json(next(self.path.glob(glob_str)), print_file=False)
except StopIteration as exc:
raise FileNotFoundError(
f"Non existing file '{glob_str}' in {self.path}"
) from exc
return mtd
@cache
def _read_mtd(self) -> (etree._Element, dict):
"""
Read Satellogic metadata.
Returns:
(etree._Element, dict): Metadata XML root and its namespaces
"""
# MTD are JSON
try:
from dict2xml import dict2xml
mtd = self._read_mtd_dict()
# Sanitize STAC mtd (remove STAC prefixes with ':' that break XML keys)
def __sanitize_recursive(d):
for key in d.copy():
k = key.split(":")[-1]
d[k] = d.pop(key)
if isinstance(d[k], dict):
__sanitize_recursive(d[k])
mtd.pop("assets", None)
mtd.pop("links", None)
mtd.pop("stac_extensions", None)
mtd.pop("stac_version", None)
mtd.pop("type", None)
__sanitize_recursive(mtd)
root = etree.fromstring(dict2xml(mtd, wrap="all"))
except etree.XMLSyntaxError as exc:
raise InvalidProductError(
f"Cannot convert metadata to XML for {self.path}!"
) from exc
return root, {}
def _has_mask(self, mask: BandNames) -> bool:
"""
Can the specified mask be loaded from this product?
.. code-block:: python
>>> from eoreader.reader import Reader
>>> from eoreader.bands import *
>>> path = r"S2A_MSIL1C_20200824T110631_N0209_R137_T30TTK_20200824T150432.SAFE.zip"
>>> prod = Reader().open(path)
>>> prod.has_index(DETFOO)
True
Args:
mask (BandNames): Mask
Returns:
bool: True if the specified mask is provided by the current product
"""
return mask in [Aleph1MaskBandNames.CLOUD]
def _open_masks(
self,
bands: list,
pixel_size: float = None,
size: list | tuple = None,
**kwargs,
) -> dict:
"""
Open a list of mask files as xarrays.
Args:
bands (list): List of the wanted bands
pixel_size (int): Band pixel size in meters
size (tuple | list): Size of the array (width, height). Not used if pixel_size is provided.
kwargs: Additional arguments
Returns:
dict: Dictionary {band_name, band_xarray}
"""
band_dict = {}
for band in bands:
band_name = band.name
mask_path = self._get_path(band_name, "vrt")
band_arr = self._read_band(
mask_path,
band=band,
pixel_size=pixel_size,
size=size,
resampling=Resampling.nearest,
**kwargs,
)
band_arr.attrs["long_name"] = band_name
band_dict[band] = band_arr.rename(band_name)
return band_dict
def _has_cloud_band(self, band: BandNames) -> bool:
"""
Does this product has the specified cloud band?
"""
return band in list(self._get_clouds_thresh().keys()) + [RAW_CLOUDS]
@cache
def _get_clouds_thresh(self) -> dict:
"""Return cloud threshold according to the product version"""
latest_thresh = {ALL_CLOUDS: [2, 3], CIRRUS: 2, CLOUDS: 3}
# Get MTD XML file
root, _ = self.read_mtd()
# Open zenith and azimuth angle
version = root.findtext(".//product_version")
if version is None:
LOGGER.warning(
"'product_version' not found in metadata! Using latest cloud thresholds."
)
cloud_thresh = latest_thresh
else:
LOGGER.debug(f"Product version: {version}")
from packaging.version import Version
version = Version(version)
if self.product_type == Aleph1ProductType.L1B:
# https://developers.satellogic.com/imagery-products/l1basic.html#changelog
# [v1.9.0] 2025-11-17:
# Changed cloud mask codification, from [0: no_data, 1: valid_data, 128: shadow_data, 255: cloud] to [0: no_data, 1: clear, 2: haze, 3: cloud].
cloud_thresh = (
latest_thresh
if version >= Version("1.9.0")
else {SHADOWS: 128, CLOUDS: 255, ALL_CLOUDS: 255}
)
elif self.product_type == Aleph1ProductType.L1C:
# https://developers.satellogic.com/imagery-products/ortho_ready.html#changelog
# [v2.2.0] 17/11/2025:
# Changed cloud mask codification, from [0: no_data, 1: valid_data, 255: cloud] to [0: no_data, 1: clear, 2: haze, 3: cloud].
cloud_thresh = (
latest_thresh
if version >= Version("2.2.0")
else {CLOUDS: 255, ALL_CLOUDS: 255}
)
elif self.product_type in [Aleph1ProductType.L1D, Aleph1ProductType.L1D_SR]:
# https://developers.satellogic.com/imagery-products/ortho.html#changelog-l1d
# [v2.0.0] 17/11/2025:
# Changed cloud mask codification, from [0: no_data, 1: valid_data, 128: cloud] to [0: no_data, 1: clear, 2: haze, 3: cloud].
cloud_thresh = (
latest_thresh
if version >= Version("2.0.0")
else {CLOUDS: 128, ALL_CLOUDS: 128}
)
else:
raise NotImplementedError
return cloud_thresh
def _open_clouds(
self,
bands: list,
pixel_size: float = None,
size: list | tuple = None,
**kwargs,
) -> dict:
"""
Load cloud files as xarrays.
Args:
bands (list): List of the wanted bands
pixel_size (int): Band pixel size in meters
size (tuple | list): Size of the array (width, height). Not used if pixel_size is provided.
kwargs: Additional arguments
Returns:
dict: Dictionary {band_name, band_xarray}
"""
band_dict = {}
if bands:
cld_arr = self._open_masks(
[Aleph1MaskBandNames.CLOUD], pixel_size=pixel_size, size=size
)[Aleph1MaskBandNames.CLOUD]
cloud_thresh = self._get_clouds_thresh()
nodata = np.isnan(cld_arr.data)
for band in bands:
if band == RAW_CLOUDS:
cloud = cld_arr
else:
cloud_thresh_band = cloud_thresh[band]
if isinstance(cloud_thresh_band, list):
cond = np.isin(cld_arr, cloud_thresh_band)
else:
cond = cld_arr == cloud_thresh_band
cloud = self._create_mask(cld_arr, cond, nodata)
# Rename
band_name = to_str(band)[0]
cloud.attrs["long_name"] = band_name
band_dict[band] = cloud.rename(band_name).astype(np.float32)
return band_dict
def _get_tile_path(self, **kwargs) -> AnyPathType:
"""
Get the VHR tile path
Returns:
AnyPathType: VHR filepath
"""
return self._get_path("TOA", "vrt")
[docs]
@qck_wrapper
def get_quicklook_path(self) -> str:
"""
Get quicklook path if existing.
Returns:
str: Quicklook path
"""
return self._glob("*preview.png")
def _get_job_id(self) -> str:
"""
Get VHR job ID
Returns:
str: VHR product ID
"""
return self.split_name[-1]
def _get_condensed_name(self) -> str:
"""
Get VHR products condensed name ({date}_{constellation}_{product_type}_{job_id}).
Returns:
str: Condensed name
"""
return f"{self.get_datetime()}_{self.constellation.name}_{self.product_type.name}_{self._job_id}"