import os
import re
from statistics import mean
from datetime import datetime, timezone
import pystac
from pystac.extensions.sar import SarExtension, FrequencyBand, Polarization, ObservationDirection
from pystac.extensions.sat import SatExtension, OrbitState
from pystac.extensions.projection import ProjectionExtension
from pystac.extensions.view import ViewExtension
from pystac.extensions.mgrs import MgrsExtension
from pystac.extensions.file import FileExtension, ByteOrder
from pystac.extensions.raster import RasterExtension, RasterBand, DataType
from pystac.extensions.classification import ClassificationExtension, Classification
from spatialist import Raster
from spatialist.ancillary import finder
from cesard.metadata.mapping import ASSET_MAP
from cesard.metadata.extract import get_header_size, evaluate_types
from cesard.ancillary import compute_hash
from typing import Any
import logging
log = logging.getLogger('cesard')
[docs]
def parse(
        meta: dict[str, dict[str, Any]],
        target: str,
        assets: list[str],
        exist_ok: bool = False
) -> None:
    """
    Write both levels of STAC JSON metadata for one ARD product.

    `meta` is first passed to :func:`~cesard.metadata.extract.evaluate_types`; afterwards
    :func:`~cesard.metadata.stac.source_json` and :func:`~cesard.metadata.stac.product_json`
    are called, in that order, with the same `meta`, `target` and `exist_ok`.

    Parameters
    ----------
    meta:
        Metadata dictionary generated by the satellite-specific ARD packages,
        e.g. :func:`s1ard.metadata.extract.meta_dict`.
    target:
        A path pointing to the root directory of a product scene.
    assets:
        List of paths to all GeoTIFF and VRT assets of the currently processed ARD product.
        Only forwarded to :func:`~cesard.metadata.stac.product_json`.
    exist_ok:
        Do not create files if they already exist?
    """
    evaluate_types(meta)
    # arguments shared by both writers
    shared = {'meta': meta, 'target': target, 'exist_ok': exist_ok}
    source_json(**shared)
    product_json(assets=assets, **shared)
[docs]
def source_json(
        meta: dict[str, dict[str, Any]],
        target: str,
        exist_ok: bool = False
) -> None:
    """
    Generate source-level metadata for an ARD product in STAC compliant JSON format.

    For every entry of ``meta['source']`` one STAC item is built and saved as
    ``<target>/source/<scene>.json``, where ``<scene>`` is the base name of the source
    scene's ``filename`` without its extension.

    Parameters
    ----------
    meta:
        Metadata dictionary generated by the satellite-specific ARD packages,
        e.g. :func:`s1ard.metadata.extract.meta_dict`.
    target:
        A path pointing to the root directory of a product scene.
    exist_ok:
        Do not create files if they already exist?
    """
    metadir = os.path.join(target, 'source')
    for uid in list(meta['source'].keys()):
        src = meta['source'][uid]
        scene = os.path.splitext(os.path.basename(src['filename']))[0]
        outname = os.path.join(metadir, '{}.json'.format(scene))
        if os.path.isfile(outname) and exist_ok:
            continue
        log.info(f'creating {os.path.relpath(outname, target)}')
        common = meta['common']
        prod = meta['prod']
        start = src['timeStart']
        stop = src['timeStop']
        # the nominal item datetime is the centre of the acquisition interval
        date = start + (stop - start) / 2
        
        #################################################################################
        # Initialise STAC item
        item = pystac.Item(id=scene,
                           geometry=src['geom_stac_geometry_4326'],
                           bbox=src['geom_stac_bbox_4326'],
                           datetime=date,
                           properties={})
        
        #################################################################################
        # Add common metadata
        item.common_metadata.start_datetime = start
        item.common_metadata.end_datetime = stop
        item.common_metadata.created = src['processingDate']
        item.common_metadata.instruments = [common['instrumentShortName'].lower()]
        item.common_metadata.constellation = common['constellation']
        item.common_metadata.platform = common['platformFullname']
        
        #################################################################################
        # Add properties
        # prepare values: dictionaries are reduced to the mean of their values
        values = {}
        for key in ['rangeResolution', 'rangePixelSpacing', 'rangeNumberOfLooks',
                    'azimuthResolution', 'azimuthPixelSpacing', 'azimuthNumberOfLooks']:
            value = src[key]
            values[key] = mean(value.values()) if isinstance(value, dict) else value
        # the look bandwidths are optional; `src` is a dictionary, so presence has to be
        # tested via key membership (an attribute lookup would never succeed)
        for key in ['rangeLookBandwidth', 'azimuthLookBandwidth']:
            if key not in src or src[key] is None:
                continue
            value = src[key]
            if isinstance(value, dict):
                value = {k: v / 1e9 for k, v in value.items()}  # GHz
            elif isinstance(value, (float, int)):
                value = value / 1e9  # GHz
            values[key] = value
        
        ####################################
        # sat extension properties
        sat_ext = SatExtension.ext(item, add_if_missing=True)
        sat_ext.apply(orbit_state=OrbitState[common['orbitDirection'].upper()],
                      relative_orbit=common['orbitNumber_rel'],
                      absolute_orbit=common['orbitNumber_abs'],
                      anx_datetime=src['ascendingNodeDate'])
        
        ####################################
        # sar extension properties
        sar_ext = SarExtension.ext(item, add_if_missing=True)
        sar_ext.apply(instrument_mode=common['operationalMode'],
                      frequency_band=FrequencyBand[common['radarBand'].upper()],
                      polarizations=[Polarization[pol] for pol in common['polarisationChannels']],
                      product_type=src['productType'],
                      center_frequency=float(common['radarCenterFreq'] / 1e9),
                      resolution_range=values['rangeResolution'],
                      resolution_azimuth=values['azimuthResolution'],
                      pixel_spacing_range=values['rangePixelSpacing'],
                      pixel_spacing_azimuth=values['azimuthPixelSpacing'],
                      looks_range=values['rangeNumberOfLooks'],
                      looks_azimuth=values['azimuthNumberOfLooks'],
                      looks_equivalent_number=src['perfEquivalentNumberOfLooks'],
                      observation_direction=ObservationDirection[common['antennaLookDirection']])
        
        ####################################
        # view extension properties
        view_ext = ViewExtension.ext(item, add_if_missing=True)
        view_ext.apply(incidence_angle=src['incidenceAngleMidSwath'],
                       azimuth=src['instrumentAzimuthAngle'])
        
        ####################################
        # processing extension properties
        item.stac_extensions.append('https://stac-extensions.github.io/processing/v1.1.0/schema.json')
        item.properties['processing:facility'] = src['processingCenter']
        item.properties['processing:software'] = src['processorVersion']
        item.properties['processing:level'] = common['processingLevel']
        
        ####################################
        # card4l extension properties
        item.stac_extensions.append('https://stac-extensions.github.io/card4l/v0.1.0/sar/source.json')
        item.properties['card4l:specification'] = prod['productName-short']
        item.properties['card4l:specification_version'] = prod['card4l-version']
        item.properties['card4l:beam_id'] = common['swathIdentifier']
        item.properties['card4l:orbit_mean_altitude'] = float(common['orbitMeanAltitude'])
        proc_param = {'lut_applied': src['lutApplied']}
        pairs = [('range_look_bandwidth', 'rangeLookBandwidth'),
                 ('azimuth_look_bandwidth', 'azimuthLookBandwidth')]
        for key_dst, key_src in pairs:
            # only bandwidths that were found above are written
            if key_src in values:
                proc_param[key_dst] = values[key_src]
        item.properties['card4l:source_processing_parameters'] = proc_param
        keys = [
            ('card4l:incidence_angle_far_range', 'incidenceAngleMax'),
            ('card4l:incidence_angle_near_range', 'incidenceAngleMin'),
            ('card4l:integrated_sidelobe_ratio', 'perfIntegratedSideLobeRatio'),
            ('card4l:ionosphere_indicator', 'ionosphereIndicator'),
            ('card4l:mean_faraday_rotation_angle', 'faradayMeanRotationAngle'),
            ('card4l:noise_equivalent_intensity', 'perfEstimates'),
            ('card4l:noise_equivalent_intensity_type', 'perfNoiseEquivalentIntensityType'),
            ('card4l:orbit_data_source', 'orbitDataSource'),
            ('card4l:peak_sidelobe_ratio', 'perfPeakSideLobeRatio'),
            ('card4l:resolution_range', 'rangeResolution'),
            ('card4l:resolution_azimuth', 'azimuthResolution'),
            ('card4l:source_geometry', 'dataGeometry')
        ]
        for key_dst, key_src in keys:
            val = src[key_src]
            if val is not None:
                item.properties[key_dst] = val
        
        #################################################################################
        # Add links
        card4l_title = (f'CARD4L Product Family Specification: '
                        f'{prod["productName"]} (v{prod["card4l-version"]})')
        links = [
            {'rel': 'card4l-document',
             'target': prod['card4l-link'].replace('.pdf', '.docx'),
             'media_type': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
             'title': card4l_title},
            {'rel': 'card4l-document',
             'target': prod['card4l-link'],
             'media_type': 'application/pdf',
             'title': card4l_title},
            {'rel': 'about',
             'target': src['doi'],
             'media_type': None,
             'title': 'Product definition reference.'},
            {'rel': 'access',
             'target': src['access'],
             'media_type': None,
             'title': 'Product data access.'},
            {'rel': 'satellite',
             'target': common['platformReference'],
             'media_type': None,
             'title': 'CEOS Missions, Instruments and Measurements Database record'},
            {'rel': 'state-vectors',
             'target': src['orbitStateVector'],
             'media_type': None,
             'title': 'Orbit data file containing state vectors.'},
            {'rel': 'sensor-calibration',
             'target': src['sensorCalibration'],
             'media_type': None,
             'title': 'Reference describing sensor calibration parameters.'},
            {'rel': 'pol-cal-matrices',
             'target': src['polCalMatrices'],
             'media_type': None,
             'title': 'Reference to the complex-valued polarimetric distortion matrices.'},
            {'rel': 'referenced-faraday-rotation',
             'target': src['faradayRotationReference'],
             'media_type': None,
             'title': 'Reference describing the method used to derive the estimate '
                      'for the mean Faraday rotation angle.'}]
        for link in links:
            # links without a target are omitted
            if link['target'] is not None:
                item.add_link(link=pystac.Link(**link))
        
        #################################################################################
        # Add assets
        # only the file extension is swapped; directory names are left untouched
        xml_name = os.path.splitext(outname)[0] + '.xml'
        relpath = os.path.relpath(xml_name, metadir)
        xml_relpath = './' + relpath.replace('\\', '/')
        item.add_asset(key='card4l',
                       asset=pystac.Asset(href=xml_relpath,
                                          title='Metadata in XML format.',
                                          media_type=pystac.MediaType.XML,
                                          roles=['metadata', 'card4l']))
        _asset_add_orig_src(metadir=metadir, uid=uid, item=item)
        
        #################################################################################
        # (validate) and save the result
        # item.validate()  # for later
        item.save_object(dest_href=outname)
def _asset_add_orig_src(
        metadir: str,
        uid: str,
        item: pystac.Item
) -> None:
    """
    Register the original source metadata files of a scene as assets of a STAC item.

    The files are searched in ``<metadir>/<uid>`` using the patterns ``*.safe`` and ``*.xml``.
    Nothing happens if that directory does not exist.

    Parameters
    ----------
    metadir:
        Source directory of the current ARD product.
    uid:
        Unique identifier of a source scene.
    item:
        The pystac.Item to add the assets to.

    Raises
    ------
    RuntimeError
        if a file other than `manifest.safe` does not follow the expected naming scheme.
    """
    root_dir = os.path.join(metadir, uid)
    if not os.path.isdir(root_dir):
        return
    
    # everything before the two time stamps serves as asset key
    name_regex = r'^(.+?)-\d{8}t\d{6}-\d{8}t\d{6}-\w+-\w+-\d{3}\.xml$'
    titles_by_prefix = {'calibration': 'Calibration metadata',
                        'noise': 'Estimated thermal noise look-up tables',
                        'rfi': 'Radio Frequency Interference metadata'}
    fallback_title = 'Measurement metadata'
    
    for path in finder(target=root_dir, matchlist=['*.safe', '*.xml'], foldermode=0):
        fname = os.path.basename(path)
        href = './' + os.path.relpath(path, metadir).replace('\\', '/')
        if fname == 'manifest.safe':
            asset_key = 'manifest'
            asset_title = 'Mandatory product metadata'
        else:
            matched = re.match(name_regex, fname)
            if matched is None:
                raise RuntimeError(
                    'Unexpected file in original source metadata directory: ' + os.path.join(root_dir, path))
            asset_key = matched.group(1)
            parts = asset_key.split('-')
            label = titles_by_prefix.get(parts[0], fallback_title)
            # keys carrying one of the known prefixes have their fields shifted by one position
            first = 1 if label == fallback_title else 2
            asset_title = label + ' ({},{})'.format(parts[first].upper(), parts[first + 2].upper())
        item.add_asset(key=asset_key,
                       asset=pystac.Asset(href=href,
                                          title=asset_title,
                                          media_type=pystac.MediaType.XML,
                                          roles=['metadata']))
[docs]
def product_json(
        meta: dict[str, dict[str, Any]],
        target: str,
        assets: list[str],
        exist_ok: bool = False
) -> None:
    """
    Generate product-level metadata for an ARD product in STAC compliant JSON format.

    The STAC item is validated and saved as ``<target>/<scene_id>.json``, where ``<scene_id>``
    is the base name of `target`. If an XML file of the same name exists next to it, it is
    registered as additional metadata asset.

    Parameters
    ----------
    meta:
        Metadata dictionary generated by the satellite-specific ARD packages,
        e.g. :func:`s1ard.metadata.extract.meta_dict`.
    target:
        A path pointing to the root directory of a product scene.
    assets:
        List of paths to all GeoTIFF and VRT assets of the currently processed ARD product.
        The list is not modified.
    exist_ok:
        Do not create files if they already exist?
    """
    scene_id = os.path.basename(target)
    outname = os.path.join(target, '{}.json'.format(scene_id))
    if os.path.isfile(outname) and exist_ok:
        return
    log.info(f'creating {os.path.relpath(outname, target)}')
    prod = meta['prod']
    common = meta['common']
    start = prod['timeStart']
    stop = prod['timeStop']
    # the nominal item datetime is the centre of the acquisition interval
    date = start + (stop - start) / 2
    mgrs = prod['mgrsID']
    
    # Initialise STAC item
    item = pystac.Item(id=scene_id,
                       geometry=prod['geom_stac_geometry_4326'],
                       bbox=prod['geom_stac_bbox_4326'],
                       datetime=date,
                       properties={})
    
    # Add common metadata
    item.common_metadata.license = prod['licence']
    item.common_metadata.start_datetime = start
    item.common_metadata.end_datetime = stop
    item.common_metadata.created = prod['timeCreated']
    item.common_metadata.instruments = [common['instrumentShortName'].lower()]
    item.common_metadata.constellation = common['constellation']
    item.common_metadata.platform = common['platformFullname']
    item.common_metadata.gsd = float(prod['pxSpacingColumn'])
    
    # Initialise STAC extensions for properties
    sar_ext = SarExtension.ext(item, add_if_missing=True)
    sat_ext = SatExtension.ext(item, add_if_missing=True)
    proj_ext = ProjectionExtension.ext(item, add_if_missing=True)
    mgrs_ext = MgrsExtension.ext(item, add_if_missing=True)
    item.stac_extensions.append('https://stac-extensions.github.io/processing/v1.1.0/schema.json')
    item.stac_extensions.append('https://stac-extensions.github.io/card4l/v0.1.0/sar/product.json')
    
    #################################################################################
    # Add properties
    sat_ext.apply(orbit_state=OrbitState[common['orbitDirection'].upper()],
                  relative_orbit=common['orbitNumber_rel'],
                  absolute_orbit=common['orbitNumber_abs'])
    sar_ext.apply(instrument_mode=common['operationalMode'],
                  frequency_band=FrequencyBand[common['radarBand'].upper()],
                  polarizations=[Polarization[pol] for pol in common['polarisationChannels']],
                  product_type=prod['productName-short'],
                  looks_range=prod['rangeNumberOfLooks'],
                  looks_azimuth=prod['azimuthNumberOfLooks'],
                  looks_equivalent_number=prod['equivalentNumberOfLooks'])
    proj_ext.apply(epsg=int(prod['crsEPSG']),
                   wkt2=prod['crsWKT'],
                   bbox=prod['geom_stac_bbox_native'],
                   shape=[int(prod['numPixelsPerLine']), int(prod['numLines'])],
                   transform=prod['transform'])
    # the MGRS ID is split into UTM zone (2 characters), latitude band (1) and grid square (rest)
    mgrs_ext.apply(latitude_band=mgrs[2:3],
                   grid_square=mgrs[3:],
                   utm_zone=int(mgrs[:2]))
    if prod['processingCenter'] is not None:
        item.properties['processing:facility'] = prod['processingCenter']
    item.properties['processing:software'] = prod['processorVersion']
    item.properties['processing:level'] = common['processingLevel']
    item.properties['card4l:specification'] = prod['productName-short']
    item.properties['card4l:specification_version'] = prod['card4l-version']
    item.properties['card4l:beam_id'] = common['swathIdentifier']
    item.properties['card4l:measurement_type'] = prod['backscatterMeasurement']
    item.properties['card4l:measurement_convention'] = prod['backscatterConvention']
    item.properties['card4l:pixel_coordinate_convention'] = prod['pixelCoordinateConvention']
    item.properties['card4l:speckle_filtering'] = prod['speckleFilterApplied']
    item.properties['card4l:noise_removal_applied'] = prod['noiseRemovalApplied']
    item.properties['card4l:conversion_eq'] = prod['backscatterConversionEq']
    item.properties['card4l:relative_radiometric_accuracy'] = float(prod['radiometricAccuracyRelative'])
    item.properties['card4l:absolute_radiometric_accuracy'] = float(prod['radiometricAccuracyAbsolute'])
    item.properties['card4l:resampling_method'] = prod['geoCorrResamplingMethod']
    item.properties['card4l:dem_resampling_method'] = prod['demResamplingMethod']
    item.properties['card4l:egm_resampling_method'] = prod['demEGMResamplingMethod']
    item.properties['card4l:gridding_convention'] = 'Sentinel-2 MGRS'
    item.properties['card4l:geometric_accuracy_type'] = prod['geoCorrAccuracyType']
    for direction in ['Northern', 'Eastern']:
        stddev = float(prod['geoCorrAccuracy{}STDev'.format(direction)])
        bias = float(prod['geoCorrAccuracy{}Bias'.format(direction)])
        item.properties['card4l:{}_geometric_accuracy'.format(direction.lower())] = {'bias': bias,
                                                                                     'stddev': stddev}
    item.properties['card4l:geometric_accuracy_radial_rmse'] = float(prod['geoCorrAccuracy_rRMSE'])
    
    #################################################################################
    # Add links
    card4l_title = (f'CARD4L Product Family Specification: '
                    f'{prod["productName"]} (v{prod["card4l-version"]})')
    dem_title = (f"Digital Elevation Model used as auxiliary data during processing: "
                 f"{prod['demName']}")
    links = [
        {'rel': 'card4l-document',
         'target': prod['card4l-link'].replace('.pdf', '.docx'),
         'media_type': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
         'title': card4l_title},
        {'rel': 'card4l-document',
         'target': prod['card4l-link'],
         'media_type': 'application/pdf',
         'title': card4l_title},
        {"rel": "about",
         "target": prod["doi"],
         "title": "Product definition reference.",
         "media_type": None},
        {"rel": "access",
         "target": prod["access"],
         "title": "Product data access",
         "media_type": None},
        {"rel": "related",
         "target": prod["ancillaryData_KML"],
         "title": "Sentinel-2 Military Grid Reference System (MGRS) tiling "
                  "grid file used as auxiliary data during processing",
         "media_type": None},
        {"rel": "noise-removal",
         "target": prod["noiseRemovalAlgorithm"],
         "title": "Reference to the noise removal algorithm details",
         "media_type": None},
        {"rel": "radiometric-terrain-correction",
         "target": prod["RTCAlgorithm"],
         "title": "Reference to the Radiometric Terrain Correction algorithm details",
         "media_type": None},
        {"rel": "radiometric-accuracy",
         "target": prod["radiometricAccuracyReference"],
         "title": "Reference describing the radiometric uncertainty of the product",
         "media_type": None},
        {"rel": "geometric-correction",
         "target": prod["geoCorrAlgorithm"],
         "title": "Reference to the Geometric Correction algorithm details",
         "media_type": None},
        # only one type should be necessary, which is available via meta['prod']['demType']
        # However, the validation fails, because the schema expects both
        {"rel": "elevation-model",
         "target": prod["demReference"],
         "title": dem_title,
         "media_type": None},
        {"rel": "surface-model",
         "target": prod["demReference"],
         "title": dem_title,
         "media_type": None},
        {"rel": "earth-gravitational-model",
         "target": prod["demEGMReference"],
         "title": "Reference to the Earth Gravitational Model (EGM) used for geometric correction",
         "media_type": None},
        {"rel": "geometric-accuracy",
         "target": prod["geoCorrAccuracyReference"],
         "title": "Reference documenting the estimate of absolute localization error",
         "media_type": None},
        {"rel": "gridding-convention",
         "target": prod["griddingConventionURL"],
         "title": "Reference describing the gridding convention used",
         "media_type": None}
    ]
    if prod['productName-short'] == 'ORB':
        links.append(
            {"rel": "wind-norm-reference",
             "target": prod["windNormReferenceModel"],
             "title": "Reference to the model used to create the wind normalisation layer",
             "media_type": None}
        )
    for src in list(meta['source'].keys()):
        # same naming rule as used for the source-level JSON files (extension stripped)
        src_name = os.path.splitext(os.path.basename(meta['source'][src]['filename']))[0]
        src_target = os.path.join('./source', '{}.json'.format(src_name)).replace('\\', '/')
        links.append(
            {'rel': 'derived_from',
             'target': src_target,
             'media_type': 'application/json',
             'title': 'Source metadata formatted in STAC compliant JSON format'})
    for link in links:
        # links without a target are omitted
        if link['target'] is not None:
            item.add_link(link=pystac.Link(**link))
    
    #################################################################################
    # Add assets
    assets = assets.copy()  # do not modify the caller's list
    # only the file extension is swapped; directory names are left untouched
    xml = os.path.splitext(outname)[0] + '.xml'
    if os.path.isfile(xml):
        assets.append(xml)
    assets_dict = {'measurement': {},
                   'annotation': {},
                   'metadata': {}}
    for asset in assets:
        relpath = './' + os.path.relpath(asset, target).replace('\\', '/')
        size = os.path.getsize(asset)
        checksum = compute_hash(asset)
        is_tif = asset.endswith('.tif')
        # reset per asset so that no value of a previous iteration is reused
        nodata = None
        created = None
        header_size = None
        media_type = pystac.MediaType.COG  # COG, VRT
        byte_order = None
        if is_tif:
            with Raster(asset) as ras:
                nodata = ras.nodata
            created = datetime.fromtimestamp(os.path.getctime(asset), tz=timezone.utc).isoformat()
            header_size = get_header_size(tif=asset)
            byte_order = ByteOrder.LITTLE_ENDIAN
        
        if 'measurement' in asset:
            key, title = _asset_get_key_title(meta=meta, asset=asset)
            stac_asset = pystac.Asset(href=relpath,
                                      title=title,
                                      media_type=media_type,
                                      roles=['backscatter', 'data'],
                                      extra_fields=None)
            if is_tif:
                stac_asset.extra_fields = {'created': created,
                                           'card4l:border_pixels': prod['numBorderPixels']}
                _asset_handle_raster_ext(stac_asset=stac_asset, nodata=nodata)
            file_ext = FileExtension.ext(stac_asset)
            file_ext.apply(byte_order=byte_order, size=size,
                           header_size=header_size, checksum=checksum)
            assets_dict['measurement'][key] = stac_asset
        elif 'annotation' in asset:
            key, title = _asset_get_key_title(meta=meta, asset=asset)
            if key == 'np':
                # one noise power layer exists per polarisation
                pol = re.search('-[vh]{2}', relpath).group().removeprefix('-')
                asset_key = 'noise-power-{}'.format(pol)
                title = "{} {}".format(title, pol.upper())
            else:
                asset_key = ASSET_MAP[key]['role']
            # NOTE: this updates the shared ASSET_MAP; _asset_handle_raster_ext reads the unit from it
            if ASSET_MAP[key]['unit'] is None:
                ASSET_MAP[key]['unit'] = 'unitless'
            stac_asset = pystac.Asset(href=relpath,
                                      title=title,
                                      media_type=media_type,
                                      roles=[ASSET_MAP[key]['role'], 'metadata'],
                                      extra_fields=None)
            # annotation keys are two-character identifiers as returned by _asset_get_key_title
            if key == 'ei':
                stac_asset.extra_fields = {'card4l:ellipsoidal_height': prod['ellipsoidalHeight']}
            file_ext = FileExtension.ext(stac_asset)
            file_ext.apply(byte_order=byte_order, size=size,
                           header_size=header_size, checksum=checksum)
            _asset_handle_raster_ext(stac_asset=stac_asset, nodata=nodata, key=key, meta=meta, asset=asset)
            assets_dict['annotation'][asset_key] = stac_asset
        else:
            stac_asset = pystac.Asset(href=relpath,
                                      title='Metadata in XML format.',
                                      media_type=pystac.MediaType.XML,
                                      roles=['metadata', 'card4l'])
            file_ext = FileExtension.ext(stac_asset)
            file_ext.apply(byte_order=byte_order, size=size,
                           header_size=header_size, checksum=checksum)
            assets_dict['metadata']['card4l'] = stac_asset
    
    # assets are added category-wise and sorted by key within each category
    for category in ['measurement', 'annotation', 'metadata']:
        for key in sorted(assets_dict[category].keys()):
            item.add_asset(key=key, asset=assets_dict[category][key])
    if any(x in item.get_assets().keys() for x in ['acquisition-id', 'data-mask']):
        ClassificationExtension.add_to(item)
    FileExtension.add_to(item)
    RasterExtension.add_to(item)
    
    # downgrade some extensions as required by the card4l extension
    pinned = {
        "https://stac-extensions.github.io/file/":
            "https://stac-extensions.github.io/file/v2.0.0/schema.json",
        "https://stac-extensions.github.io/projection/":
            "https://stac-extensions.github.io/projection/v1.0.0/schema.json"
    }
    item.stac_extensions = [
        next((uri for prefix, uri in pinned.items() if ext.startswith(prefix)), ext)
        for ext in list(item.stac_extensions)
    ]
    item.validate()
    item.save_object(dest_href=outname)
def _asset_get_key_title(
meta: dict[str, dict[str, Any]],
asset: str
) -> tuple[str, str]:
"""
Helper function to handle creation of identifying key and title of a given asset.
Parameters
----------
meta:
Metadata dictionary generated by the satellite-specific ARD packages,
e.g. :func:`s1ard.metadata.extract.meta_dict`.
asset:
Path to a GeoTIFF or VRT asset.
Returns
-------
The key identifying the asset and the generated title of the asset.
"""
key = None
title = None
if 'measurement' in asset:
title_dict = {'g': 'gamma nought', 's': 'sigma nought',
'lin': 'linear', 'log': 'logarithmic'}
pattern = ('(?P<key>(?P<pol>[vhc]{2})-'
'(?P<nought>[gs])-'
'(?P<scaling>lin|log)'
'(?P<windnorm>-wn|))')
info = re.search(pattern, asset).groupdict()
key = info['key']
if re.search('cc-[gs]-lin', key):
pols = meta['common']['polarisationChannels']
co = pols.pop(0) if pols[0][0] == pols[0][1] else pols.pop(1)
cross = pols[0]
title = 'RGB color composite (' \
'{co}-{nought}-lin, ' \
'{cross}-{nought}-lin, ' \
'{co}-{nought}-lin/{cross}-{nought}-lin)'
title = title.format(co=co.lower(),
cross=cross.lower(),
nought=info['nought'])
else:
if info['windnorm'] == '-wn':
skeleton = '{pol} {nought} {subtype} wind normalisation ratio, {scale} scaling'
else:
skeleton = '{pol} {nought} {subtype} backscatter, {scale} scaling'
subtype = 'RTC'
title = skeleton.format(pol=info['pol'].upper(),
nought=title_dict[info['nought']],
subtype=subtype,
scale=title_dict[info['scaling']])
elif 'annotation' in asset:
pattern = '(dm|ei|em|lc|ld|li|gs|id|np-[vh]{2}|sg|wm).tif'
key = re.search(pattern, asset).groups()[0][:2]
title = ASSET_MAP[key]['title']
return key, title
def _asset_handle_raster_ext(
        stac_asset: pystac.Asset,
        nodata: float,
        key: str | None = None,
        meta: dict[str, dict[str, Any]] | None = None,
        asset: str | None = None
) -> None:
    """
    Describe the raster band(s) of a pystac.Asset via the STAC RasterExtension.

    The asset category is derived from `stac_asset.href`: an href containing 'measurement'
    gets a single FLOAT32 band with unit 'natural', an href containing 'annotation' is
    described depending on `key`. Any other href leaves the band list untouched.

    Parameters
    ----------
    stac_asset:
        The pystac.Asset to set the RasterExtension for.
    nodata:
        The asset's nodata value.
    key:
        Key identifying the asset. Only necessary for annotation assets.
    meta: dict
        Metadata dictionary generated by the satellite-specific ARD packages,
        e.g. :func:`s1ard.metadata.extract.meta_dict`.
        Only necessary for annotation assets.
    asset:
        Path to a GeoTIFF or VRT asset. Only necessary for annotation assets.

    Raises
    ------
    ValueError
        if an annotation asset is passed without `key`, `meta` and `asset`.
    """
    raster_ext = RasterExtension.ext(stac_asset)
    href = stac_asset.href
    
    if 'measurement' in href:
        backscatter_band = RasterBand.create(nodata=nodata,
                                             data_type=DataType.FLOAT32,
                                             unit='natural')
        raster_ext.bands = [backscatter_band]
        return
    
    if 'annotation' not in href:
        return
    
    if any(arg is None for arg in (key, meta, asset)):
        raise ValueError(f'`key`, `meta` and `asset` parameters need to be defined to handle RasterExtension '
                         f'for {stac_asset.href}')
    
    if key == 'id':
        # one class per source scene, numbered from 1 in the order of meta['source']
        scene_names = []
        for src in list(meta['source'].keys()):
            name = os.path.basename(meta['source'][src]['filename'])
            scene_names.append(name.replace('.SAFE', '').replace('.zip', ''))
        id_band = RasterBand.create(nodata=nodata,
                                    data_type=DataType.UINT8,
                                    unit=ASSET_MAP[key]['unit'])
        id_classes = ClassificationExtension.ext(id_band)
        id_classes.classes = [Classification.create(value=number, description=name)
                              for number, name in enumerate(scene_names, start=1)]
        raster_ext.bands = [id_band]
    elif key == 'dm':
        # the band descriptions are read from the file; only those listed as allowed are described
        with Raster(asset) as dm_ras:
            descriptions = [dm_ras.raster.GetRasterBand(index).GetDescription()
                            for index in range(1, dm_ras.bands + 1)]
        mask_bands = []
        for description in descriptions:
            if description not in ASSET_MAP[key]['allowed']:
                continue
            mask_band = RasterBand.create(nodata=nodata,
                                          data_type=DataType.UINT8,
                                          unit=ASSET_MAP[key]['unit'])
            mask_classes = ClassificationExtension.ext(mask_band, add_if_missing=True)
            mask_classes.classes = [Classification.create(value=1, description=description)]
            mask_bands.append(mask_band)
        raster_ext.bands = mask_bands
    else:
        generic_band = RasterBand.create(nodata=nodata,
                                         data_type=DataType.FLOAT32,
                                         unit=ASSET_MAP[key]['unit'])
        raster_ext.bands = [generic_band]
        if key == 'em':
            # the leading token of meta['prod']['demGSD'] is used as spatial resolution
            raster_ext.bands[0].spatial_resolution = int(meta['prod']['demGSD'].split()[0])