# Source code for cesard.metadata.stac

import os
import re
from statistics import mean
from datetime import datetime, timezone
import pystac
from pystac.extensions.sar import SarExtension, FrequencyBand, Polarization, ObservationDirection
from pystac.extensions.sat import SatExtension, OrbitState
from pystac.extensions.projection import ProjectionExtension
from pystac.extensions.view import ViewExtension
from pystac.extensions.mgrs import MgrsExtension
from pystac.extensions.file import FileExtension, ByteOrder
from pystac.extensions.raster import RasterExtension, RasterBand, DataType
from pystac.extensions.classification import ClassificationExtension, Classification
from spatialist import Raster
from spatialist.ancillary import finder
from cesard.metadata.mapping import ASSET_MAP
from cesard.metadata.extract import get_header_size, evaluate_types
from cesard.ancillary import compute_hash
from typing import Any
import logging

log = logging.getLogger('cesard')


def parse(
        meta: dict[str, dict[str, Any]],
        target: str,
        assets: list[str],
        exist_ok: bool = False
) -> None:
    """
    Write both STAC metadata levels of an ARD product in one go.

    `meta` is first passed to :func:`~cesard.metadata.extract.evaluate_types`,
    then :func:`~cesard.metadata.stac.source_json` and
    :func:`~cesard.metadata.stac.product_json` are called in that order.

    Parameters
    ----------
    meta:
        Metadata dictionary generated by the satellite-specific ARD packages,
        e.g. :func:`s1ard.metadata.extract.meta_dict`.
    target:
        A path pointing to the root directory of a product scene.
    assets:
        List of paths to all GeoTIFF and VRT assets of the currently
        processed ARD product.
    exist_ok:
        Do not create files if they already exist?
    """
    evaluate_types(meta)
    # arguments shared by both writers
    shared = {'meta': meta, 'target': target, 'exist_ok': exist_ok}
    source_json(**shared)
    product_json(assets=assets, **shared)
def source_json(
        meta: dict[str, dict[str, Any]],
        target: str,
        exist_ok: bool = False
) -> None:
    """
    Generate source-level metadata for an ARD product in STAC compliant JSON format.

    One file ``<target>/source/<scene>.json`` is written per entry of
    ``meta['source']``, where ``<scene>`` is the entry's ``filename`` without
    directory and extension.

    Parameters
    ----------
    meta:
        Metadata dictionary generated by the satellite-specific ARD packages,
        e.g. :func:`s1ard.metadata.extract.meta_dict`.
    target:
        A path pointing to the root directory of a product scene.
    exist_ok:
        Do not create files if they already exist?
    """
    metadir = os.path.join(target, 'source')
    for uid in list(meta['source'].keys()):
        src = meta['source'][uid]
        scene = os.path.splitext(os.path.basename(src['filename']))[0]
        outname = os.path.join(metadir, '{}.json'.format(scene))
        if os.path.isfile(outname) and exist_ok:
            continue
        log.info(f'creating {os.path.relpath(outname, target)}')
        start = src['timeStart']
        stop = src['timeStop']
        # the item's nominal datetime is the centre of the acquisition
        date = start + (stop - start) / 2
        
        #################################################################################
        # Initialise STAC item
        item = pystac.Item(id=scene,
                           geometry=src['geom_stac_geometry_4326'],
                           bbox=src['geom_stac_bbox_4326'],
                           datetime=date,
                           properties={})
        
        #################################################################################
        # Add common metadata
        item.common_metadata.start_datetime = start
        item.common_metadata.end_datetime = stop
        item.common_metadata.created = src['processingDate']
        item.common_metadata.instruments = [meta['common']['instrumentShortName'].lower()]
        item.common_metadata.constellation = meta['common']['constellation']
        item.common_metadata.platform = meta['common']['platformFullname']
        
        #################################################################################
        # Add properties
        
        # prepare values; dictionaries are reduced to the mean of their values
        values = {}
        for key in ['rangeResolution', 'rangePixelSpacing', 'rangeNumberOfLooks',
                    'azimuthResolution', 'azimuthPixelSpacing', 'azimuthNumberOfLooks']:
            value = src[key]
            values[key] = mean(value.values()) if isinstance(value, dict) else value
        # The look bandwidths are optional. `src` is a dictionary, so key
        # membership (not `hasattr`, which inspects attributes and is always
        # False here) decides whether they are available.
        for key in ['rangeLookBandwidth', 'azimuthLookBandwidth']:
            if key in src:
                value = src[key]
                if isinstance(value, dict):
                    value = {k: v / 1e9 for k, v in value.items()}  # GHz
                if isinstance(value, (float, int)):
                    value = value / 1e9  # GHz
                values[key] = value
        
        ####################################
        # sat extension properties
        sat_ext = SatExtension.ext(item, add_if_missing=True)
        sat_ext.apply(orbit_state=OrbitState[meta['common']['orbitDirection'].upper()],
                      relative_orbit=meta['common']['orbitNumber_rel'],
                      absolute_orbit=meta['common']['orbitNumber_abs'],
                      anx_datetime=src['ascendingNodeDate'])
        
        ####################################
        # sar extension properties
        sar_ext = SarExtension.ext(item, add_if_missing=True)
        sar_ext.apply(instrument_mode=meta['common']['operationalMode'],
                      frequency_band=FrequencyBand[meta['common']['radarBand'].upper()],
                      polarizations=[Polarization[pol] for pol in meta['common']['polarisationChannels']],
                      product_type=src['productType'],
                      center_frequency=float(meta['common']['radarCenterFreq'] / 1e9),
                      resolution_range=values['rangeResolution'],
                      resolution_azimuth=values['azimuthResolution'],
                      pixel_spacing_range=values['rangePixelSpacing'],
                      pixel_spacing_azimuth=values['azimuthPixelSpacing'],
                      looks_range=values['rangeNumberOfLooks'],
                      looks_azimuth=values['azimuthNumberOfLooks'],
                      looks_equivalent_number=src['perfEquivalentNumberOfLooks'],
                      observation_direction=ObservationDirection[meta['common']['antennaLookDirection']])
        
        ####################################
        # view extension properties
        view_ext = ViewExtension.ext(item, add_if_missing=True)
        view_ext.apply(incidence_angle=src['incidenceAngleMidSwath'],
                       azimuth=src['instrumentAzimuthAngle'])
        
        ####################################
        # processing extension properties
        item.stac_extensions.append('https://stac-extensions.github.io/processing/v1.1.0/schema.json')
        item.properties['processing:facility'] = src['processingCenter']
        item.properties['processing:software'] = src['processorVersion']
        item.properties['processing:level'] = meta['common']['processingLevel']
        
        ####################################
        # card4l extension properties
        item.stac_extensions.append('https://stac-extensions.github.io/card4l/v0.1.0/sar/source.json')
        item.properties['card4l:specification'] = meta['prod']['productName-short']
        item.properties['card4l:specification_version'] = meta['prod']['card4l-version']
        item.properties['card4l:beam_id'] = meta['common']['swathIdentifier']
        item.properties['card4l:orbit_mean_altitude'] = float(meta['common']['orbitMeanAltitude'])
        
        proc_param = {'lut_applied': src['lutApplied']}
        pairs = [('range_look_bandwidth', 'rangeLookBandwidth'),
                 ('azimuth_look_bandwidth', 'azimuthLookBandwidth')]
        for key_dst, key_src in pairs:
            # only report bandwidths that were found above and carry a value
            if values.get(key_src) is not None:
                proc_param[key_dst] = values[key_src]
        item.properties['card4l:source_processing_parameters'] = proc_param
        
        # optional properties; entries whose value is None are left out
        keys = [
            ('card4l:incidence_angle_far_range', 'incidenceAngleMax'),
            ('card4l:incidence_angle_near_range', 'incidenceAngleMin'),
            ('card4l:integrated_sidelobe_ratio', 'perfIntegratedSideLobeRatio'),
            ('card4l:ionosphere_indicator', 'ionosphereIndicator'),
            ('card4l:mean_faraday_rotation_angle', 'faradayMeanRotationAngle'),
            ('card4l:noise_equivalent_intensity', 'perfEstimates'),
            ('card4l:noise_equivalent_intensity_type', 'perfNoiseEquivalentIntensityType'),
            ('card4l:orbit_data_source', 'orbitDataSource'),
            ('card4l:peak_sidelobe_ratio', 'perfPeakSideLobeRatio'),
            ('card4l:resolution_range', 'rangeResolution'),
            ('card4l:resolution_azimuth', 'azimuthResolution'),
            ('card4l:source_geometry', 'dataGeometry')
        ]
        for key_dst, key_src in keys:
            val = src[key_src]
            if val is not None:
                item.properties[key_dst] = val
        
        #################################################################################
        # Add links
        links = [
            {'rel': 'card4l-document',
             'target': meta['prod']['card4l-link'].replace('.pdf', '.docx'),
             'media_type': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
             'title': f'CARD4L Product Family Specification: '
                      f'{meta["prod"]["productName"]} (v{meta["prod"]["card4l-version"]})'},
            {'rel': 'card4l-document',
             'target': meta['prod']['card4l-link'],
             'media_type': 'application/pdf',
             'title': f'CARD4L Product Family Specification: '
                      f'{meta["prod"]["productName"]} (v{meta["prod"]["card4l-version"]})'},
            {'rel': 'about',
             'target': src['doi'],
             'media_type': None,
             'title': 'Product definition reference.'},
            {'rel': 'access',
             'target': src['access'],
             'media_type': None,
             'title': 'Product data access.'},
            {'rel': 'satellite',
             'target': meta['common']['platformReference'],
             'media_type': None,
             'title': 'CEOS Missions, Instruments and Measurements Database record'},
            {'rel': 'state-vectors',
             'target': src['orbitStateVector'],
             'media_type': None,
             'title': 'Orbit data file containing state vectors.'},
            {'rel': 'sensor-calibration',
             'target': src['sensorCalibration'],
             'media_type': None,
             'title': 'Reference describing sensor calibration parameters.'},
            {'rel': 'pol-cal-matrices',
             'target': src['polCalMatrices'],
             'media_type': None,
             'title': 'Reference to the complex-valued polarimetric distortion matrices.'},
            {'rel': 'referenced-faraday-rotation',
             'target': src['faradayRotationReference'],
             'media_type': None,
             'title': 'Reference describing the method used to derive the estimate '
                      'for the mean Faraday rotation angle.'}]
        # links without a target are omitted
        for link in links:
            if link['target'] is not None:
                item.add_link(link=pystac.Link(**link))
        
        #################################################################################
        # Add assets
        # Swap only the file extension; a plain str.replace would also rewrite
        # any '.json' occurring elsewhere in the path.
        xml_name = os.path.splitext(outname)[0] + '.xml'
        relpath = os.path.relpath(xml_name, metadir)
        xml_relpath = './' + relpath.replace('\\', '/')
        item.add_asset(key='card4l',
                       asset=pystac.Asset(href=xml_relpath,
                                          title='Metadata in XML format.',
                                          media_type=pystac.MediaType.XML,
                                          roles=['metadata', 'card4l']))
        _asset_add_orig_src(metadir=metadir, uid=uid, item=item)
        
        #################################################################################
        # Save the result. NOTE: validation of source items is intentionally
        # not performed here yet (`item.validate()` is reserved for later).
        item.save_object(dest_href=outname)
def _asset_add_orig_src(
        metadir: str,
        uid: str,
        item: pystac.Item
) -> None:
    """
    Register the original source metadata files of one scene as STAC assets.

    The directory ``<metadir>/<uid>`` is searched for ``*.safe`` and ``*.xml``
    files. Nothing happens if that directory does not exist. Every file found
    is added to `item` with media type XML and the role ``metadata``.

    Parameters
    ----------
    metadir:
        Source directory of the current ARD product.
    uid:
        Unique identifier of a source scene.
    item:
        The pystac.Item to add the assets to.

    Raises
    ------
    RuntimeError
        If a file other than ``manifest.safe`` does not match the expected
        annotation file name pattern.
    """
    root_dir = os.path.join(metadir, uid)
    if not os.path.isdir(root_dir):
        return
    
    # Everything in front of the start time stamp becomes the asset key.
    name_pattern = r'^(.+?)-\d{8}t\d{6}-\d{8}t\d{6}-\w+-\w+-\d{3}\.xml$'
    # Titles of files whose name starts with one of these prefixes.
    prefixed_titles = {'calibration': 'Calibration metadata',
                       'noise': 'Estimated thermal noise look-up tables',
                       'rfi': 'Radio Frequency Interference metadata'}
    
    for path in finder(target=root_dir, matchlist=['*.safe', '*.xml'], foldermode=0):
        fname = os.path.basename(path)
        href = './' + os.path.relpath(path, metadir).replace('\\', '/')
        if fname == 'manifest.safe':
            key, title = 'manifest', 'Mandatory product metadata'
        else:
            matched = re.match(name_pattern, fname)
            if matched is None:
                raise RuntimeError(
                    'Unexpected file in original source metadata directory: '
                    + os.path.join(root_dir, path))
            key = matched.group(1)
            parts = key.split('-')
            if parts[0] in prefixed_titles:
                # the prefix shifts the two name parts used in the title by one
                title = prefixed_titles[parts[0]] + ' ({},{})'.format(parts[2].upper(),
                                                                     parts[4].upper())
            else:
                title = 'Measurement metadata' + ' ({},{})'.format(parts[1].upper(),
                                                                   parts[3].upper())
        xml_asset = pystac.Asset(href=href,
                                 title=title,
                                 media_type=pystac.MediaType.XML,
                                 roles=['metadata'])
        item.add_asset(key=key, asset=xml_asset)
def product_json(
        meta: dict[str, dict[str, Any]],
        target: str,
        assets: list[str],
        exist_ok: bool = False
) -> None:
    """
    Generate product-level metadata for an ARD product in STAC compliant JSON format.

    The item is written to ``<target>/<basename of target>.json`` after
    validation. Assets are grouped by whether their path contains
    ``measurement`` or ``annotation``; anything else is treated as the
    product's XML metadata file.

    Parameters
    ----------
    meta:
        Metadata dictionary generated by the satellite-specific ARD packages,
        e.g. :func:`s1ard.metadata.extract.meta_dict`.
    target:
        A path pointing to the root directory of a product scene.
    assets:
        List of paths to all GeoTIFF and VRT assets of the currently
        processed ARD product. The list itself is not modified.
    exist_ok:
        Do not create files if they already exist?
    """
    scene_id = os.path.basename(target)
    outname = os.path.join(target, '{}.json'.format(scene_id))
    if os.path.isfile(outname) and exist_ok:
        return
    log.info(f'creating {os.path.relpath(outname, target)}')
    start = meta['prod']['timeStart']
    stop = meta['prod']['timeStop']
    # the item's nominal datetime is the centre of the acquisition
    date = start + (stop - start) / 2
    mgrs = meta['prod']['mgrsID']
    
    # Initialise STAC item
    item = pystac.Item(id=scene_id,
                       geometry=meta['prod']['geom_stac_geometry_4326'],
                       bbox=meta['prod']['geom_stac_bbox_4326'],
                       datetime=date,
                       properties={})
    
    # Add common metadata
    item.common_metadata.license = meta['prod']['licence']
    item.common_metadata.start_datetime = start
    item.common_metadata.end_datetime = stop
    item.common_metadata.created = meta['prod']['timeCreated']
    item.common_metadata.instruments = [meta['common']['instrumentShortName'].lower()]
    item.common_metadata.constellation = meta['common']['constellation']
    item.common_metadata.platform = meta['common']['platformFullname']
    item.common_metadata.gsd = float(meta['prod']['pxSpacingColumn'])
    
    # Initialise STAC extensions for properties
    sar_ext = SarExtension.ext(item, add_if_missing=True)
    sat_ext = SatExtension.ext(item, add_if_missing=True)
    proj_ext = ProjectionExtension.ext(item, add_if_missing=True)
    mgrs_ext = MgrsExtension.ext(item, add_if_missing=True)
    item.stac_extensions.append('https://stac-extensions.github.io/processing/v1.1.0/schema.json')
    item.stac_extensions.append('https://stac-extensions.github.io/card4l/v0.1.0/sar/product.json')
    
    #################################################################################
    # Add properties
    sat_ext.apply(orbit_state=OrbitState[meta['common']['orbitDirection'].upper()],
                  relative_orbit=meta['common']['orbitNumber_rel'],
                  absolute_orbit=meta['common']['orbitNumber_abs'])
    sar_ext.apply(instrument_mode=meta['common']['operationalMode'],
                  frequency_band=FrequencyBand[meta['common']['radarBand'].upper()],
                  polarizations=[Polarization[pol] for pol in meta['common']['polarisationChannels']],
                  product_type=meta['prod']['productName-short'],
                  looks_range=meta['prod']['rangeNumberOfLooks'],
                  looks_azimuth=meta['prod']['azimuthNumberOfLooks'],
                  looks_equivalent_number=meta['prod']['equivalentNumberOfLooks'])
    proj_ext.apply(epsg=int(meta['prod']['crsEPSG']),
                   wkt2=meta['prod']['crsWKT'],
                   bbox=meta['prod']['geom_stac_bbox_native'],
                   shape=[int(meta['prod']['numPixelsPerLine']), int(meta['prod']['numLines'])],
                   transform=meta['prod']['transform'])
    # the MGRS ID is split as <2-digit UTM zone><latitude band><grid square>
    mgrs_ext.apply(latitude_band=mgrs[2:3],
                   grid_square=mgrs[3:],
                   utm_zone=int(mgrs[:2]))
    
    if meta['prod']['processingCenter'] is not None:
        item.properties['processing:facility'] = meta['prod']['processingCenter']
    item.properties['processing:software'] = meta['prod']['processorVersion']
    item.properties['processing:level'] = meta['common']['processingLevel']
    
    item.properties['card4l:specification'] = meta['prod']['productName-short']
    item.properties['card4l:specification_version'] = meta['prod']['card4l-version']
    item.properties['card4l:beam_id'] = meta['common']['swathIdentifier']
    item.properties['card4l:measurement_type'] = meta['prod']['backscatterMeasurement']
    item.properties['card4l:measurement_convention'] = meta['prod']['backscatterConvention']
    item.properties['card4l:pixel_coordinate_convention'] = meta['prod']['pixelCoordinateConvention']
    item.properties['card4l:speckle_filtering'] = meta['prod']['speckleFilterApplied']
    item.properties['card4l:noise_removal_applied'] = meta['prod']['noiseRemovalApplied']
    item.properties['card4l:conversion_eq'] = meta['prod']['backscatterConversionEq']
    item.properties['card4l:relative_radiometric_accuracy'] = float(meta['prod']['radiometricAccuracyRelative'])
    item.properties['card4l:absolute_radiometric_accuracy'] = float(meta['prod']['radiometricAccuracyAbsolute'])
    item.properties['card4l:resampling_method'] = meta['prod']['geoCorrResamplingMethod']
    item.properties['card4l:dem_resampling_method'] = meta['prod']['demResamplingMethod']
    item.properties['card4l:egm_resampling_method'] = meta['prod']['demEGMResamplingMethod']
    item.properties['card4l:gridding_convention'] = 'Sentinel-2 MGRS'
    item.properties['card4l:geometric_accuracy_type'] = meta['prod']['geoCorrAccuracyType']
    for x in ['Northern', 'Eastern']:
        key = ['geoCorrAccuracy{}{}'.format(x, y) for y in ['STDev', 'Bias']]
        stddev = float(meta['prod'][key[0]])
        bias = float(meta['prod'][key[1]])
        item.properties['card4l:{}_geometric_accuracy'.format(x.lower())] = {'bias': bias, 'stddev': stddev}
    item.properties['card4l:geometric_accuracy_radial_rmse'] = float(meta['prod']['geoCorrAccuracy_rRMSE'])
    
    #################################################################################
    # Add links
    links = [
        {'rel': 'card4l-document',
         'target': meta['prod']['card4l-link'].replace('.pdf', '.docx'),
         'media_type': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
         'title': f'CARD4L Product Family Specification: '
                  f'{meta["prod"]["productName"]} (v{meta["prod"]["card4l-version"]})'},
        {'rel': 'card4l-document',
         'target': meta['prod']['card4l-link'],
         'media_type': 'application/pdf',
         'title': f'CARD4L Product Family Specification: '
                  f'{meta["prod"]["productName"]} (v{meta["prod"]["card4l-version"]})'},
        {"rel": "about",
         "target": meta["prod"]["doi"],
         "title": "Product definition reference.",
         "media_type": None},
        {"rel": "access",
         "target": meta["prod"]["access"],
         "title": "Product data access",
         "media_type": None},
        {"rel": "related",
         "target": meta["prod"]["ancillaryData_KML"],
         "title": "Sentinel-2 Military Grid Reference System (MGRS) tiling "
                  "grid file used as auxiliary data during processing",
         "media_type": None},
        {"rel": "noise-removal",
         "target": meta["prod"]["noiseRemovalAlgorithm"],
         "title": "Reference to the noise removal algorithm details",
         "media_type": None},
        {"rel": "radiometric-terrain-correction",
         "target": meta["prod"]["RTCAlgorithm"],
         "title": "Reference to the Radiometric Terrain Correction algorithm details",
         "media_type": None},
        {"rel": "radiometric-accuracy",
         "target": meta["prod"]["radiometricAccuracyReference"],
         "title": "Reference describing the radiometric uncertainty of the product",
         "media_type": None},
        {"rel": "geometric-correction",
         "target": meta["prod"]["geoCorrAlgorithm"],
         "title": "Reference to the Geometric Correction algorithm details",
         "media_type": None},
        # only one type should be necessary, which is available via meta['prod']['demType']
        # However, the validation fails, because the schema expects both
        {"rel": "elevation-model",
         "target": meta["prod"]["demReference"],
         "title": f"Digital Elevation Model used as auxiliary data during processing: "
                  f"{meta['prod']['demName']}",
         "media_type": None},
        {"rel": "surface-model",
         "target": meta["prod"]["demReference"],
         "title": f"Digital Elevation Model used as auxiliary data during processing: "
                  f"{meta['prod']['demName']}",
         "media_type": None},
        {"rel": "earth-gravitational-model",
         "target": meta["prod"]["demEGMReference"],
         "title": "Reference to the Earth Gravitational Model (EGM) used for geometric correction",
         "media_type": None},
        {"rel": "geometric-accuracy",
         "target": meta["prod"]["geoCorrAccuracyReference"],
         "title": "Reference documenting the estimate of absolute localization error",
         "media_type": None},
        {"rel": "gridding-convention",
         "target": meta["prod"]["griddingConventionURL"],
         "title": "Reference describing the gridding convention used",
         "media_type": None}
    ]
    if meta['prod']['productName-short'] == 'ORB':
        links.append(
            {"rel": "wind-norm-reference",
             "target": meta["prod"]["windNormReferenceModel"],
             "title": "Reference to the model used to create the wind normalisation layer",
             "media_type": None}
        )
    for src in list(meta['source'].keys()):
        # Derive the scene name exactly like `source_json` does when naming
        # its output files so that the link points to an existing file.
        x = os.path.splitext(os.path.basename(meta['source'][src]['filename']))[0]
        src_target = os.path.join('./source', '{}.json'.format(x)).replace('\\', '/')
        links.append(
            {'rel': 'derived_from',
             'target': src_target,
             'media_type': 'application/json',
             'title': 'Source metadata formatted in STAC compliant JSON format'})
    # links without a target are omitted
    for link in links:
        if link['target'] is not None:
            item.add_link(link=pystac.Link(**link))
    
    #################################################################################
    # Add assets
    assets = assets.copy()  # do not modify the caller's list
    # Swap only the file extension; a plain str.replace would also rewrite
    # any '.json' occurring elsewhere in the path.
    xml = os.path.splitext(outname)[0] + '.xml'
    if os.path.isfile(xml):
        assets.append(xml)
    assets_dict = {'measurement': {}, 'annotation': {}, 'metadata': {}}
    for asset in assets:
        relpath = './' + os.path.relpath(asset, target).replace('\\', '/')
        size = os.path.getsize(asset)
        checksum = compute_hash(asset)
        created = None
        header_size = None
        media_type = pystac.MediaType.COG  # COG, VRT
        byte_order = None
        # Reset per asset so that a non-GeoTIFF asset never inherits the
        # nodata value of a previously processed GeoTIFF.
        nodata = None
        if asset.endswith('.tif'):
            with Raster(asset) as ras:
                nodata = ras.nodata
            created = datetime.fromtimestamp(os.path.getctime(asset), tz=timezone.utc).isoformat()
            header_size = get_header_size(tif=asset)
            byte_order = ByteOrder.LITTLE_ENDIAN
        
        if 'measurement' in asset:
            key, title = _asset_get_key_title(meta=meta, asset=asset)
            stac_asset = pystac.Asset(href=relpath,
                                      title=title,
                                      media_type=media_type,
                                      roles=['backscatter', 'data'],
                                      extra_fields=None)
            if asset.endswith('.tif'):
                stac_asset.extra_fields = {'created': created,
                                           'card4l:border_pixels': meta['prod']['numBorderPixels']}
                _asset_handle_raster_ext(stac_asset=stac_asset, nodata=nodata)
            file_ext = FileExtension.ext(stac_asset)
            file_ext.apply(byte_order=byte_order, size=size, header_size=header_size, checksum=checksum)
            assets_dict['measurement'][key] = stac_asset
        elif 'annotation' in asset:
            key, title = _asset_get_key_title(meta=meta, asset=asset)
            if key == 'np':
                # one noise power layer per polarisation
                pol = re.search('-[vh]{2}', relpath).group().removeprefix('-')
                asset_key = 'noise-power-{}'.format(pol)
                title = "{} {}".format(title, pol.upper())
            else:
                asset_key = ASSET_MAP[key]['role']
            
            if ASSET_MAP[key]['unit'] is None:
                ASSET_MAP[key]['unit'] = 'unitless'
            stac_asset = pystac.Asset(href=relpath,
                                      title=title,
                                      media_type=media_type,
                                      roles=[ASSET_MAP[key]['role'], 'metadata'],
                                      extra_fields=None)
            # `key` is the two-letter asset code returned by
            # `_asset_get_key_title`, hence the comparison with 'ei'.
            if key == 'ei':
                ellipsoidal_height = meta['prod'].get('ellipsoidalHeight')
                if ellipsoidal_height is not None:
                    stac_asset.extra_fields = {'card4l:ellipsoidal_height': ellipsoidal_height}
            file_ext = FileExtension.ext(stac_asset)
            file_ext.apply(byte_order=byte_order, size=size, header_size=header_size, checksum=checksum)
            _asset_handle_raster_ext(stac_asset=stac_asset, nodata=nodata,
                                     key=key, meta=meta, asset=asset)
            assets_dict['annotation'][asset_key] = stac_asset
        else:
            stac_asset = pystac.Asset(href=relpath,
                                      title='Metadata in XML format.',
                                      media_type=pystac.MediaType.XML,
                                      roles=['metadata', 'card4l'])
            file_ext = FileExtension.ext(stac_asset)
            file_ext.apply(byte_order=byte_order, size=size, header_size=header_size, checksum=checksum)
            assets_dict['metadata']['card4l'] = stac_asset
    
    # add the assets category by category, alphabetically sorted within each
    for category in ['measurement', 'annotation', 'metadata']:
        for key in sorted(assets_dict[category].keys()):
            item.add_asset(key=key, asset=assets_dict[category][key])
    
    if any(x in item.get_assets().keys() for x in ['acquisition-id', 'data-mask']):
        ClassificationExtension.add_to(item)
    FileExtension.add_to(item)
    RasterExtension.add_to(item)
    
    # downgrade some extensions as required by the card4l extension
    exts = list(item.stac_extensions)
    item.stac_extensions = [
        "https://stac-extensions.github.io/file/v2.0.0/schema.json"
        if e.startswith("https://stac-extensions.github.io/file/")
        else "https://stac-extensions.github.io/projection/v1.0.0/schema.json"
        if e.startswith("https://stac-extensions.github.io/projection/")
        else e
        for e in exts
    ]
    item.validate()
    item.save_object(dest_href=outname)
def _asset_get_key_title( meta: dict[str, dict[str, Any]], asset: str ) -> tuple[str, str]: """ Helper function to handle creation of identifying key and title of a given asset. Parameters ---------- meta: Metadata dictionary generated by the satellite-specific ARD packages, e.g. :func:`s1ard.metadata.extract.meta_dict`. asset: Path to a GeoTIFF or VRT asset. Returns ------- The key identifying the asset and the generated title of the asset. """ key = None title = None if 'measurement' in asset: title_dict = {'g': 'gamma nought', 's': 'sigma nought', 'lin': 'linear', 'log': 'logarithmic'} pattern = ('(?P<key>(?P<pol>[vhc]{2})-' '(?P<nought>[gs])-' '(?P<scaling>lin|log)' '(?P<windnorm>-wn|))') info = re.search(pattern, asset).groupdict() key = info['key'] if re.search('cc-[gs]-lin', key): pols = meta['common']['polarisationChannels'] co = pols.pop(0) if pols[0][0] == pols[0][1] else pols.pop(1) cross = pols[0] title = 'RGB color composite (' \ '{co}-{nought}-lin, ' \ '{cross}-{nought}-lin, ' \ '{co}-{nought}-lin/{cross}-{nought}-lin)' title = title.format(co=co.lower(), cross=cross.lower(), nought=info['nought']) else: if info['windnorm'] == '-wn': skeleton = '{pol} {nought} {subtype} wind normalisation ratio, {scale} scaling' else: skeleton = '{pol} {nought} {subtype} backscatter, {scale} scaling' subtype = 'RTC' title = skeleton.format(pol=info['pol'].upper(), nought=title_dict[info['nought']], subtype=subtype, scale=title_dict[info['scaling']]) elif 'annotation' in asset: pattern = '(dm|ei|em|lc|ld|li|gs|id|np-[vh]{2}|sg|wm).tif' key = re.search(pattern, asset).groups()[0][:2] title = ASSET_MAP[key]['title'] return key, title def _asset_handle_raster_ext( stac_asset: pystac.Asset, nodata: float, key: str | None = None, meta: dict[str, dict[str, Any]] | None = None, asset: str | None = None ) -> None: """ Helper function to handle the STAC RasterExtension for a given pystac.Asset Parameters ---------- stac_asset: The pystac.Asset to set the RasterExtension for. 
nodata: The asset's nodata value. key: Key identifying the asset. Only necessary for annotation assets. meta: dict Metadata dictionary generated by the satellite-specific ARD packages, e.g. :func:`s1ard.metadata.extract.meta_dict`. Only necessary for annotation assets. asset: Path to a GeoTIFF or VRT asset. Only necessary for annotation assets. """ raster_ext = RasterExtension.ext(stac_asset) if 'measurement' in stac_asset.href: raster_ext.bands = [RasterBand.create(nodata=nodata, data_type=DataType.FLOAT32, unit='natural')] elif 'annotation' in stac_asset.href: if key is None or meta is None or asset is None: raise ValueError(f'`key`, `meta` and `asset` parameters need to be defined to handle RasterExtension ' f'for {stac_asset.href}') if key == 'id': src_list = [ os.path.basename(meta['source'][src]['filename']).replace('.SAFE', '').replace('.zip', '') for src in list(meta['source'].keys())] band = RasterBand.create(nodata=nodata, data_type=DataType.UINT8, unit=ASSET_MAP[key]['unit']) class_ext = ClassificationExtension.ext(band) class_ext.classes = [Classification.create(value=i + 1, description=j) for i, j in enumerate(src_list)] raster_ext.bands = [band] elif key == 'dm': with Raster(asset) as dm_ras: band_descr = [dm_ras.raster.GetRasterBand(band).GetDescription() for band in range(1, dm_ras.bands + 1)] samples = [x for x in band_descr if x in ASSET_MAP[key]['allowed']] bands = [] for sample in samples: band = RasterBand.create(nodata=nodata, data_type=DataType.UINT8, unit=ASSET_MAP[key]['unit']) class_ext = ClassificationExtension.ext(band, add_if_missing=True) class_ext.classes = [Classification.create(value=1, description=sample)] bands.append(band) raster_ext.bands = bands else: raster_ext.bands = [RasterBand.create(nodata=nodata, data_type=DataType.FLOAT32, unit=ASSET_MAP[key]['unit'])] if key == 'em': raster_ext.bands[0].spatial_resolution = int(meta['prod']['demGSD'].split()[0])