import os
import re
from copy import deepcopy
from lxml import etree
from datetime import datetime, timezone
from spatialist import Raster
from spatialist.ancillary import finder
from statistics import mean
from cesard.metadata.mapping import ASSET_MAP, NS_MAP
from cesard.metadata.extract import get_header_size, evaluate_types
from typing import Any
import logging
log = logging.getLogger('cesard')
[docs]
def parse(
meta: dict[str, dict[str, Any]],
target: str,
assets: list[str],
exist_ok: bool = False
) -> None:
"""
Wrapper for :func:`~cesard.metadata.xml.source_xml` and :func:`~cesard.metadata.xml.product_xml`.
Parameters
----------
meta:
Metadata dictionary generated by the satellite-specific ARD packages,
e.g. :func:`s1ard.metadata.extract.meta_dict`.
target:
A path pointing to the root directory of a product scene.
assets:
List of paths to all GeoTIFF and VRT assets of the currently processed ARD product.
exist_ok:
Do not create files if they already exist?
"""
evaluate_types(meta)
platform = meta['common']['platformShortName']
pid = {'Sentinel-1': 's1', 'ERS': 'ers', 'ENVISAT': 'env'}[platform]
key = f"{pid}-{meta['prod']['productName-short'].lower()}"
nsmap = deepcopy(NS_MAP)
nsmap[key] = nsmap.pop('placeholder').format(meta['common']['constellation'])
src_url = nsmap[key].replace('spec', key.split('-')[1]).replace('role', 'source')
prod_url = nsmap[key].replace('spec', key.split('-')[1]).replace('role', 'product')
nsmap.update({key: src_url})
source_xml(meta=meta, target=target, nsmap=nsmap, ard_ns=key, exist_ok=exist_ok)
nsmap.update({key: prod_url})
product_xml(meta=meta, target=target, assets=assets, nsmap=nsmap, ard_ns=key,
exist_ok=exist_ok)
[docs]
def source_xml(
meta: dict[str, dict[str, Any]],
target: str,
nsmap: dict[str, str],
ard_ns: str,
exist_ok: bool = False
) -> None:
"""
Function to generate source-level metadata for an ARD product in `OGC 10-157r4` compliant XML format.
Parameters
----------
meta:
Metadata dictionary generated by the satellite-specific ARD packages,
e.g. :func:`s1ard.metadata.extract.meta_dict`.
target:
A path pointing to the root directory of a product scene.
nsmap:
Dictionary listing abbreviation (key) and URI (value) of all necessary XML namespaces.
ard_ns:
Abbreviation of the ARD namespace. E.g., `s1-nrb` for the NRB ARD product.
exist_ok:
Do not create files if they already exist?
"""
metadir = os.path.join(target, 'source')
for uid in list(meta['source'].keys()):
scene = os.path.basename(meta['source'][uid]['filename']).split('.')[0]
outname = os.path.join(metadir, '{}.xml'.format(scene))
if os.path.isfile(outname) and exist_ok:
continue
log.info(f'creating {os.path.relpath(outname, target)}')
timeStart = meta['source'][uid]['timeStart'].isoformat()
timeStop = meta['source'][uid]['timeStop'].isoformat()
root = etree.Element(_nsc('_:EarthObservation', nsmap, ard_ns=ard_ns), nsmap=nsmap,
attrib={_nsc('gml:id', nsmap): scene + '_1'})
_om_time(root=root, nsmap=nsmap, scene_id=scene, time_start=timeStart, time_stop=timeStop)
_om_procedure(root=root, nsmap=nsmap, ard_ns=ard_ns, scene_id=scene, meta=meta, uid=uid, prod=False)
observedProperty = etree.SubElement(root, _nsc('om:observedProperty', nsmap),
attrib={'nilReason': 'inapplicable'})
_om_feature_of_interest(root=root, nsmap=nsmap, scene_id=scene,
extent=meta['source'][uid]['geom_xml_envelope'],
center=meta['source'][uid]['geom_xml_center'])
################################################################################################################
result = etree.SubElement(root, _nsc('om:result', nsmap))
earthObservationResult = etree.SubElement(result, _nsc('eop:EarthObservationResult', nsmap),
attrib={_nsc('gml:id', nsmap): scene + '_9'})
product = etree.SubElement(earthObservationResult, _nsc('eop:product', nsmap))
productInformation = etree.SubElement(product, _nsc('_:ProductInformation', nsmap, ard_ns=ard_ns))
fileName = etree.SubElement(productInformation, _nsc('eop:fileName', nsmap))
serviceReference = etree.SubElement(fileName, _nsc('ows:ServiceReference', nsmap),
attrib={_nsc('xlink:href', nsmap): scene})
requestMessage = etree.SubElement(serviceReference, _nsc('ows:RequestMessage', nsmap))
org_src_files_dir = os.path.join(metadir, uid)
if os.path.isdir(org_src_files_dir):
org_src_files = finder(target=org_src_files_dir, matchlist=['*.safe', '*.xml'], foldermode=0)
if len(org_src_files) > 0:
for file in org_src_files:
href = './' + os.path.relpath(file, metadir).replace('\\', '/')
product = etree.SubElement(earthObservationResult, _nsc('eop:product', nsmap))
productInformation = etree.SubElement(product, _nsc('_:ProductInformation', nsmap, ard_ns=ard_ns))
fileName = etree.SubElement(productInformation, _nsc('eop:fileName', nsmap))
serviceReference = etree.SubElement(fileName, _nsc('ows:ServiceReference', nsmap),
attrib={_nsc('xlink:href', nsmap): href})
requestMessage = etree.SubElement(serviceReference, _nsc('ows:RequestMessage', nsmap))
dataFormat = etree.SubElement(productInformation, _nsc('_:dataFormat', nsmap, ard_ns=ard_ns))
dataFormat.text = 'XML'
################################################################################################################
metaDataProperty = etree.SubElement(root, _nsc('eop:metaDataProperty', nsmap))
earthObservationMetaData = etree.SubElement(metaDataProperty, _nsc('_:EarthObservationMetaData', nsmap,
ard_ns=ard_ns))
identifier = etree.SubElement(earthObservationMetaData, _nsc('eop:identifier', nsmap))
identifier.text = scene
doi = etree.SubElement(earthObservationMetaData, _nsc('eop:doi', nsmap))
doi.text = meta['source'][uid]['doi']
acquisitionType = etree.SubElement(earthObservationMetaData, _nsc('eop:acquisitionType', nsmap))
acquisitionType.text = meta['source'][uid]['acquisitionType']
status = etree.SubElement(earthObservationMetaData, _nsc('eop:status', nsmap))
status.text = meta['source'][uid]['status']
processing = etree.SubElement(earthObservationMetaData, _nsc('eop:processing', nsmap))
processingInformation = etree.SubElement(processing, _nsc('_:ProcessingInformation', nsmap, ard_ns=ard_ns))
fields = [(processingInformation, 'eop:processingCenter', {'codeSpace': 'urn:esa:eop:Sentinel1:facility'},
'processingCenter'),
(processingInformation, 'eop:processingDate', None, 'processingDate'),
(processingInformation, 'eop:processorName', None, 'processorName'),
(processingInformation, 'eop:processorVersion', None, 'processorVersion'),
(processingInformation, 'eop:processingMode', None, 'processingMode'),
(processingInformation, '_:orbitDataSource', None, 'orbitDataSource'),
(processingInformation, '_:orbitStateVector', {'access': meta['source'][uid]['orbitDataAccess']},
'orbitStateVector'),
(processingInformation, '_:lutApplied', None, 'lutApplied'),
(earthObservationMetaData, '_:productType', {'codeSpace': 'urn:esa:eop:Sentinel1:class'},
'productType'),
(earthObservationMetaData, '_:dataGeometry', None, 'dataGeometry'),
(earthObservationMetaData, '_:azimuthPixelSpacing', {'uom': 'm'}, 'azimuthPixelSpacing'),
(earthObservationMetaData, '_:rangePixelSpacing', {'uom': 'm'}, 'rangePixelSpacing'),
(earthObservationMetaData, '_:meanFaradayRotationAngle', {'uom': 'deg'}, 'faradayMeanRotationAngle'),
(earthObservationMetaData, '_:referenceFaradayRotation',
{_nsc('xlink:href', nsmap): str(meta['source'][uid]['faradayRotationReference'])}, None),
(earthObservationMetaData, '_:ionosphereIndicator', None, 'ionosphereIndicator')]
for parent, field_dst, attrib, field_src in fields:
element = etree.SubElement(_parent=parent,
_tag=_nsc(text=field_dst, nsmap=nsmap, ard_ns=ard_ns),
attrib=attrib)
if field_src is not None:
value = meta['source'][uid][field_src]
if field_src == 'processingDate':
value = value.isoformat()
elif field_src == 'orbitDataSource':
value = value.upper()
elif field_src in ['azimuthPixelSpacing', 'rangePixelSpacing']:
value = str(mean(value.values()))
elif field_src == 'processorVersion':
value = value[meta['source'][uid]['processorName']]
element.text = value
processingLevel = etree.SubElement(processingInformation, _nsc('_:processingLevel', nsmap, ard_ns=ard_ns))
processingLevel.text = meta['common']['processingLevel']
for swath in meta['source'][uid]['swaths']:
fields = [
(processingInformation, 'azimuthLookBandwidth', {'uom': 'Hz', 'beam': swath}),
(processingInformation, 'rangeLookBandwidth', {'uom': 'Hz', 'beam': swath}),
(earthObservationMetaData, 'azimuthNumberOfLooks', {'beam': swath}),
(earthObservationMetaData, 'rangeNumberOfLooks', {'beam': swath}),
(earthObservationMetaData, 'azimuthResolution', {'uom': 'm', 'beam': swath}),
(earthObservationMetaData, 'rangeResolution', {'uom': 'm', 'beam': swath})
]
for parent, key, attrib in fields:
if key in meta['source'][uid].keys():
element = etree.SubElement(_parent=parent,
_tag=_nsc(f'_:{key}', nsmap, ard_ns=ard_ns),
attrib=attrib)
element.text = str(meta['source'][uid][key][swath])
performance = etree.SubElement(earthObservationMetaData, _nsc('_:performance', nsmap, ard_ns=ard_ns))
performanceIndicators = etree.SubElement(performance, _nsc('_:PerformanceIndicators', nsmap, ard_ns=ard_ns))
noiseEquivalentIntensityType = etree.SubElement(performanceIndicators,
_nsc('_:noiseEquivalentIntensityType', nsmap, ard_ns=ard_ns),
attrib={'uom': 'dB'})
noiseEquivalentIntensityType.text = str(meta['source'][uid]['perfNoiseEquivalentIntensityType'])
for pol in meta['common']['polarisationChannels']:
for type in ['minimum', 'mean', 'maximum']:
estimate = etree.SubElement(_parent=performanceIndicators,
_tag=_nsc('_:estimates', nsmap, ard_ns=ard_ns),
attrib={'pol': pol, 'type': type})
estimate.text = str(meta['source'][uid]['perfEstimates'][pol][type])
fields = [(performanceIndicators, '_:equivalentNumberOfLooks', 'perfEquivalentNumberOfLooks'),
(performanceIndicators, '_:peakSideLobeRatio', 'perfPeakSideLobeRatio'),
(performanceIndicators, '_:integratedSideLobeRatio', 'perfIntegratedSideLobeRatio')]
for parent, field_dst, field_src in fields:
element = etree.SubElement(parent,
_nsc(field_dst, nsmap, ard_ns=ard_ns))
element.text = str(meta['source'][uid][field_src])
polCalMatrices = etree.SubElement(earthObservationMetaData, _nsc('_:polCalMatrices', nsmap, ard_ns=ard_ns),
attrib={
_nsc('xlink:href', nsmap): str(meta['source'][uid]['polCalMatrices'])})
################################################################################################################
etree.indent(root)
tree = etree.ElementTree(root)
tree.write(outname, pretty_print=True, xml_declaration=True, encoding='utf-8')
[docs]
def product_xml(
meta: dict[str, dict[str, Any]],
target: str,
assets: list[str],
nsmap: dict[str, str],
ard_ns: str,
exist_ok: bool = False
) -> None:
"""
Function to generate product-level metadata for an ARD product in `OGC 10-157r4` compliant XML format.
Parameters
----------
meta:
Metadata dictionary generated by the satellite-specific ARD packages,
e.g. :func:`s1ard.metadata.extract.meta_dict`.
target:
A path pointing to the root directory of a product scene.
assets:
List of paths to all GeoTIFF and VRT assets of the currently processed ARD product.
nsmap:
Dictionary listing abbreviation (key) and URI (value) of all necessary XML namespaces.
ard_ns:
Abbreviation of the ARD namespace. E.g., `s1-nrb` for the NRB ARD product.
exist_ok:
Do not create files if they already exist?
"""
scene_id = os.path.basename(target)
outname = os.path.join(target, '{}.xml'.format(scene_id))
if os.path.isfile(outname) and exist_ok:
return
log.info(f'creating {os.path.relpath(outname, target)}')
timeCreated = meta['prod']['timeCreated'].isoformat()
timeStart = meta['prod']['timeStart'].isoformat()
timeStop = meta['prod']['timeStop'].isoformat()
root = etree.Element(_nsc('_:EarthObservation', nsmap, ard_ns=ard_ns), nsmap=nsmap,
attrib={_nsc('gml:id', nsmap): scene_id + '_1'})
_om_time(root=root, nsmap=nsmap, scene_id=scene_id, time_start=timeStart, time_stop=timeStop)
_om_procedure(root=root, nsmap=nsmap, ard_ns=ard_ns, scene_id=scene_id, meta=meta, prod=True)
observedProperty = etree.SubElement(root, _nsc('om:observedProperty', nsmap),
attrib={'nilReason': 'inapplicable'})
_om_feature_of_interest(root=root, nsmap=nsmap, scene_id=scene_id,
extent=meta['prod']['geom_xml_envelope'],
center=meta['prod']['geom_xml_center'])
####################################################################################################################
result = etree.SubElement(root, _nsc('om:result', nsmap))
earthObservationResult = etree.SubElement(result, _nsc('eop:EarthObservationResult', nsmap),
attrib={_nsc('gml:id', nsmap): scene_id + '_9'})
product = etree.SubElement(earthObservationResult, _nsc('eop:product', nsmap))
productInformation = etree.SubElement(product, _nsc('_:ProductInformation', nsmap, ard_ns=ard_ns))
fileName = etree.SubElement(productInformation, _nsc('eop:fileName', nsmap))
serviceReference = etree.SubElement(fileName, _nsc('ows:ServiceReference', nsmap),
attrib={_nsc('xlink:href', nsmap): scene_id})
requestMessage = etree.SubElement(serviceReference, _nsc('ows:RequestMessage', nsmap))
for asset in assets:
relpath = './' + os.path.relpath(asset, target).replace('\\', '/')
no_data = None
header_size = None
data_format = 'VRT'
byte_order = None
data_type = None
z_error = None
if asset.endswith('.tif'):
with Raster(asset) as ras:
no_data = str(ras.nodata)
header_size = str(get_header_size(tif=asset))
data_format = 'COG'
byte_order = 'little-endian'
data_type = 'FLOAT 32'
prefix = '[0-9a-z]{5}-'
match = re.search(prefix + f"({'|'.join(meta['prod']['compression_zerrors'].keys())})",
os.path.basename(asset))
if match is not None:
k = match.group()
k = k.removeprefix(re.search(prefix, k).group())
z_error = str(meta['prod']['compression_zerrors'][k])
product = etree.SubElement(earthObservationResult, _nsc('eop:product', nsmap))
productInformation = etree.SubElement(product, _nsc('_:ProductInformation', nsmap, ard_ns=ard_ns))
fileName = etree.SubElement(productInformation, _nsc('eop:fileName', nsmap))
serviceReference = etree.SubElement(fileName, _nsc('ows:ServiceReference', nsmap),
attrib={_nsc('xlink:href', nsmap): relpath})
requestMessage = etree.SubElement(serviceReference, _nsc('ows:RequestMessage', nsmap))
size = etree.SubElement(productInformation, _nsc('eop:size', nsmap), attrib={'uom': 'bytes'})
size.text = str(os.path.getsize(asset))
lookup = [
('headerSize', header_size, {'uom': 'bytes'}),
('byteOrder', byte_order, None),
('dataFormat', data_format, None),
('noDataValue', no_data, None)
]
for key, value, attrib in lookup:
if value is not None:
element = etree.SubElement(_parent=productInformation,
_tag=_nsc(text=f'_:{key}', nsmap=nsmap, ard_ns=ard_ns),
attrib=attrib)
element.text = value
if data_type is not None:
dataType = etree.SubElement(productInformation, _nsc('_:dataType', nsmap, ard_ns=ard_ns))
dataType.text = data_type.split()[0]
bitsPerSample = etree.SubElement(productInformation, _nsc('_:bitsPerSample', nsmap, ard_ns=ard_ns))
bitsPerSample.text = data_type.split()[1]
if z_error is not None:
compressionType = etree.SubElement(productInformation, _nsc('_:compressionType', nsmap, ard_ns=ard_ns))
compressionType.text = meta['prod']['compression_type']
compressionzError = etree.SubElement(productInformation, _nsc('_:compressionZError', nsmap, ard_ns=ard_ns))
compressionzError.text = z_error
if 'annotation' in asset:
pattern = '(dm|ei|em|lc|ld|li|gs|id|np-[vh]{2}|sg|wm).tif'
key = re.search(pattern, asset).groups()[0][:2]
sampleType = etree.SubElement(productInformation, _nsc('_:sampleType', nsmap, ard_ns=ard_ns),
attrib={'uom': 'unitless' if ASSET_MAP[key]['unit'] is None else
ASSET_MAP[key]['unit']})
sampleType.text = ASSET_MAP[key]['type']
if key in ['dm', 'id']:
dataType.text = 'UINT'
bitsPerSample.text = '8'
if key == 'dm':
with Raster(asset) as dm_ras:
band_descr = [dm_ras.raster.GetRasterBand(band).GetDescription() for band in
range(1, dm_ras.bands + 1)]
samples = [x for x in band_descr if x in ASSET_MAP[key]['allowed']]
for i, sample in enumerate(samples):
bitValue = etree.SubElement(productInformation, _nsc('_:bitValue', nsmap, ard_ns=ard_ns),
attrib={'band': str(i + 1),
'name': sample})
bitValue.text = '1'
else: # key == 'id'
src_list = list(meta['source'].keys())
src_target = [os.path.basename(meta['source'][src]['filename']).replace('.SAFE',
'').replace('.zip', '')
for src in src_list]
for i, s in enumerate(src_target):
bitValue = etree.SubElement(productInformation, _nsc('_:bitValue', nsmap, ard_ns=ard_ns),
attrib={'band': '1', 'name': s})
bitValue.text = str(i + 1)
if key == 'ei':
ellipsoidalHeight = etree.SubElement(productInformation, _nsc('_:ellipsoidalHeight', nsmap,
ard_ns=ard_ns),
attrib={'uom': 'm'})
ellipsoidalHeight.text = meta['prod']['ellipsoidalHeight']
if 'measurement' in asset and not asset.endswith('.vrt'):
creationTime = etree.SubElement(productInformation, _nsc('_:creationTime', nsmap, ard_ns=ard_ns))
creationTime.text = datetime.fromtimestamp(os.path.getctime(asset), tz=timezone.utc).isoformat()
polarisation = etree.SubElement(productInformation, _nsc('_:polarisation', nsmap, ard_ns=ard_ns))
polarisation.text = re.search('-[vh]{2}', relpath).group().removeprefix('-').upper()
numBorderPixels = etree.SubElement(productInformation, _nsc('_:numBorderPixels', nsmap, ard_ns=ard_ns))
numBorderPixels.text = str(meta['prod']['numBorderPixels'])
####################################################################################################################
metaDataProperty = etree.SubElement(root, _nsc('eop:metaDataProperty', nsmap))
earthObservationMetaData = etree.SubElement(metaDataProperty, _nsc('_:EarthObservationMetaData', nsmap,
ard_ns=ard_ns))
identifier = etree.SubElement(earthObservationMetaData, _nsc('eop:identifier', nsmap))
identifier.text = scene_id
if meta['prod']['doi'] is not None:
doi = etree.SubElement(earthObservationMetaData, _nsc('eop:doi', nsmap))
doi.text = meta['prod']['doi']
acquisitionType = etree.SubElement(earthObservationMetaData, _nsc('eop:acquisitionType', nsmap))
acquisitionType.text = meta['prod']['acquisitionType']
status = etree.SubElement(earthObservationMetaData, _nsc('eop:status', nsmap))
status.text = meta['prod']['status']
processing = etree.SubElement(earthObservationMetaData, _nsc('eop:processing', nsmap))
processingInformation = etree.SubElement(processing, _nsc('_:ProcessingInformation', nsmap, ard_ns=ard_ns))
if meta['prod']['processingCenter'] is not None:
processingCenter = etree.SubElement(processingInformation, _nsc('eop:processingCenter', nsmap),
attrib={'codeSpace': 'urn:esa:eop:Sentinel1:facility'})
processingCenter.text = meta['prod']['processingCenter']
processingDate = etree.SubElement(processingInformation, _nsc('eop:processingDate', nsmap))
processingDate.text = timeCreated
processorName = etree.SubElement(processingInformation, _nsc('eop:processorName', nsmap))
processorName.text = meta['prod']['processorName']
processorVersion = etree.SubElement(processingInformation, _nsc('eop:processorVersion', nsmap))
processorVersion.text = meta['prod']['processorVersion'][meta['prod']['processorName']]
processingMode = etree.SubElement(processingInformation, _nsc('eop:processingMode', nsmap),
attrib={'codeSpace': 'urn:esa:eop:Sentinel1:class'})
processingMode.text = meta['prod']['processingMode']
processingLevel = etree.SubElement(processingInformation, _nsc('_:processingLevel', nsmap, ard_ns=ard_ns))
processingLevel.text = meta['common']['processingLevel']
for src in list(meta['source'].keys()):
src_path = '{}.xml'.format(os.path.basename(meta['source'][src]['filename']).split('.')[0])
src_target = os.path.join('./source', src_path).replace('\\', '/')
sourceProduct = etree.SubElement(processingInformation, _nsc('_:sourceProduct', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): src_target})
auxData1 = etree.SubElement(processingInformation, _nsc('_:auxiliaryDataSetFileName', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): meta['prod']['ancillaryData_KML']})
speckleFilterApplied = etree.SubElement(processingInformation, _nsc('_:speckleFilterApplied', nsmap,
ard_ns=ard_ns))
speckleFilterApplied.text = str(meta['prod']['speckleFilterApplied']).lower()
noiseRemovalApplied = etree.SubElement(processingInformation, _nsc('_:noiseRemovalApplied', nsmap, ard_ns=ard_ns))
noiseRemovalApplied.text = str(meta['prod']['noiseRemovalApplied']).lower()
value = meta['prod']['noiseRemovalAlgorithm']
if meta['prod']['noiseRemovalApplied']:
noiseRemovalAlgorithm = etree.SubElement(processingInformation,
_nsc('_:noiseRemovalAlgorithm', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): value})
if meta['prod']['RTCAlgorithm'] is not None:
rtcAlgorithm = etree.SubElement(processingInformation, _nsc('_:RTCAlgorithm', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): meta['prod']['RTCAlgorithm']})
key = 'windNormBackscatterMeasurement'
if hasattr(meta['prod'], key) and meta['prod'][key] is not None:
windNormBackscatterMeasurement = etree.SubElement(processingInformation,
_nsc('_:windNormBackscatterMeasurement',
nsmap, ard_ns=ard_ns))
windNormBackscatterMeasurement.text = meta['prod']['windNormBackscatterMeasurement']
windNormBackscatterConvention = etree.SubElement(processingInformation,
_nsc('_:windNormBackscatterConvention', nsmap, ard_ns=ard_ns))
windNormBackscatterConvention.text = meta['prod']['windNormBackscatterConvention']
windNormReferenceDirection = etree.SubElement(processingInformation,
_nsc('_:windNormReferenceDirection', nsmap, ard_ns=ard_ns),
attrib={'uom': 'deg'})
windNormReferenceDirection.text = str(meta['prod']['windNormReferenceDirection'])
windNormReferenceModel = etree.SubElement(processingInformation, _nsc('_:windNormReferenceModel', nsmap,
ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap):
meta['prod']['windNormReferenceModel']})
windNormReferenceSpeed = etree.SubElement(processingInformation,
_nsc('_:windNormReferenceSpeed', nsmap, ard_ns=ard_ns),
attrib={'uom': 'm_s'})
windNormReferenceSpeed.text = str(meta['prod']['windNormReferenceSpeed'])
windNormReferenceType = etree.SubElement(processingInformation,
_nsc('_:windNormReferenceType', nsmap, ard_ns=ard_ns))
windNormReferenceType.text = meta['prod']['windNormReferenceType']
geoCorrAlgorithm = etree.SubElement(processingInformation, _nsc('_:geoCorrAlgorithm', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): str(meta['prod']['geoCorrAlgorithm'])})
geoCorrResamplingMethod = etree.SubElement(processingInformation, _nsc('_:geoCorrResamplingAlgorithm', nsmap,
ard_ns=ard_ns))
geoCorrResamplingMethod.text = meta['prod']['geoCorrResamplingMethod'].upper()
demReference = etree.SubElement(processingInformation, _nsc('_:DEMReference', nsmap, ard_ns=ard_ns),
attrib={'name': meta['prod']['demName'],
'dem': meta['prod']['demType'],
_nsc('xlink:href', nsmap): meta['prod']['demReference']})
demResamplingMethod = etree.SubElement(processingInformation, _nsc('_:DEMResamplingMethod', nsmap, ard_ns=ard_ns))
demResamplingMethod.text = meta['prod']['demResamplingMethod'].upper()
demAccess = etree.SubElement(processingInformation, _nsc('_:DEMAccess', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): meta['prod']['demAccess']})
demGSD = etree.SubElement(processingInformation, _nsc('_:DEMGroundSamplingDistance', nsmap, ard_ns=ard_ns),
attrib={'uom': meta['prod']['demGSD'].split()[1]})
demGSD.text = meta['prod']['demGSD'].split()[0]
egmReference = etree.SubElement(processingInformation, _nsc('_:EGMReference', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): meta['prod']['demEGMReference']})
egmResamplingMethod = etree.SubElement(processingInformation, _nsc('_:EGMResamplingMethod', nsmap,
ard_ns=ard_ns))
egmResamplingMethod.text = meta['prod']['demEGMResamplingMethod'].upper()
productType = etree.SubElement(earthObservationMetaData, _nsc('_:productType', nsmap, ard_ns=ard_ns),
attrib={'codeSpace': 'urn:esa:eop:Sentinel1:class'})
productType.text = meta['prod']['productName-short']
refDoc = etree.SubElement(earthObservationMetaData, _nsc('_:refDoc', nsmap, ard_ns=ard_ns),
attrib={'name': meta['prod']['productName'],
'version': meta['prod']['card4l-version'],
_nsc('xlink:href', nsmap): meta['prod']['card4l-link']})
lookup = [
('azimuthNumberOfLooks', None),
('rangeNumberOfLooks', None),
('equivalentNumberOfLooks', None),
('radiometricAccuracyRelative', {'uom': 'dB'}),
('radiometricAccuracyAbsolute', {'uom': 'dB'})
]
for key, attrib in lookup:
element = etree.SubElement(_parent=earthObservationMetaData,
_tag=_nsc(text=f'_:{key}', nsmap=nsmap, ard_ns=ard_ns),
attrib=attrib)
element.text = str(meta['prod'][key])
radacc_ref = str(meta['prod']['radiometricAccuracyReference'])
radiometricAccuracyReference = etree.SubElement(earthObservationMetaData,
_nsc('_:radiometricAccuracyReference', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): radacc_ref})
lookup = [('geoCorrAccuracyType', None)]
lookup += [(f'geoCorrAccuracy{direction}{measure}', {'uom': 'm'})
for direction in ['Northern', 'Eastern']
for measure in ['STDev', 'Bias']]
lookup += [('geoCorrAccuracy_rRMSE', {'uom': 'm'})]
for key, attrib in lookup:
element = etree.SubElement(_parent=earthObservationMetaData,
_tag=_nsc(f'_:{key}', nsmap, ard_ns=ard_ns),
attrib=attrib)
element.text = str(meta['prod'][key])
geoacc_ref = meta['prod']['geoCorrAccuracyReference']
if geoacc_ref is not None:
geoCorrAccuracyReference = etree.SubElement(earthObservationMetaData,
_tag=_nsc('_:geoCorrAccuracyReference', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): geoacc_ref})
lookup = [
('numLines', None, None),
('numPixelsPerLine', None, None),
('pxSpacingColumn', 'columnSpacing', {'uom': 'm'}),
('pxSpacingRow', 'rowSpacing', {'uom': 'm'}),
('pixelCoordinateConvention', None, None),
('backscatterMeasurement', None, None),
('backscatterConvention', None, None),
('backscatterConversionEq', None, {'uom': 'dB'})
]
for key_src, key_dst, attrib in lookup:
if key_dst is None:
key_dst = key_src
element = etree.SubElement(_parent=earthObservationMetaData,
_tag=_nsc(text=f'_:{key_dst}', nsmap=nsmap, ard_ns=ard_ns),
attrib=attrib)
element.text = str(meta['prod'][key_src])
griddingConvention = etree.SubElement(_parent=earthObservationMetaData,
_tag=_nsc('_:griddingConvention', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap):
meta['prod']['griddingConventionURL']})
lookup = [
('mgrsID', None),
('crsEPSG', {'codeSpace': 'urn:esa:eop:crs'}),
('crsWKT', None)
]
for key, attrib in lookup:
element = etree.SubElement(_parent=earthObservationMetaData,
_tag=_nsc(text=f'_:{key}', nsmap=nsmap, ard_ns=ard_ns),
attrib=attrib)
element.text = str(meta['prod'][key])
####################################################################################################################
etree.indent(root)
tree = etree.ElementTree(root)
tree.write(outname, pretty_print=True, xml_declaration=True, encoding='utf-8')
def _nsc(
text: str,
nsmap: dict[str, str],
ard_ns: str | None = None
) -> str:
ns, key = text.split(':')
if ard_ns is not None and ns == '_':
ns = ard_ns
return '{{{0}}}{1}'.format(nsmap[ns], key)
def _om_time(
root: etree.Element,
nsmap: dict[str, str],
scene_id: str,
time_start: str,
time_stop: str
) -> None:
"""
Creates the `om:phenomenonTime` and `om:resultTime` XML elements.
Parameters
----------
root:
Root XML element.
nsmap:
Dictionary listing abbreviation (key) and URI (value) of all necessary XML namespaces.
scene_id:
Scene basename.
time_start:
Start time of the scene acquisition.
time_stop:
Stop time of the acquisition.
"""
phenomenonTime = etree.SubElement(root, _nsc('om:phenomenonTime', nsmap))
timePeriod = etree.SubElement(phenomenonTime, _nsc('gml:TimePeriod', nsmap),
attrib={_nsc('gml:id', nsmap): scene_id + '_2'})
beginPosition = etree.SubElement(timePeriod, _nsc('gml:beginPosition', nsmap))
beginPosition.text = time_start
endPosition = etree.SubElement(timePeriod, _nsc('gml:endPosition', nsmap))
endPosition.text = time_stop
resultTime = etree.SubElement(root, _nsc('om:resultTime', nsmap))
timeInstant = etree.SubElement(resultTime, _nsc('gml:TimeInstant', nsmap),
attrib={_nsc('gml:id', nsmap): scene_id + '_3'})
timePosition = etree.SubElement(timeInstant, _nsc('gml:timePosition', nsmap))
timePosition.text = time_stop
def _om_procedure(
root: etree.Element,
nsmap: dict[str, str],
ard_ns: str,
scene_id: str,
meta: dict[str, dict[str, Any]],
uid: str | None = None,
prod: bool = True
) -> None:
"""
Creates the `om:procedure/eop:EarthObservationEquipment` XML elements and all relevant subelements for source and
product metadata. Differences between source and product are controlled using the `prod=[True|False]` switch.
Parameters
----------
root:
Root XML element.
nsmap:
Dictionary listing abbreviation (key) and URI (value) of all necessary XML namespaces.
ard_ns:
Abbreviation of the ARD namespace. E.g., `s1-nrb` for the NRB ARD product.
scene_id: str
Scene basename.
meta:
Metadata dictionary generated by the satellite-specific ARD packages,
e.g. :func:`s1ard.metadata.extract.meta_dict`.
uid:
Unique identifier of a source SLC scene.
prod:
Return XML subelements for further usage in :func:`~cesard.metadata.xml.product_xml` parsing function?
Default is True. If False, the XML subelements for further usage in the :func:`~cesard.metadata.xml.source_xml`
parsing function will be returned.
"""
procedure = etree.SubElement(root, _nsc('om:procedure', nsmap))
earthObservationEquipment = etree.SubElement(procedure, _nsc('eop:EarthObservationEquipment', nsmap),
attrib={_nsc('gml:id', nsmap): scene_id + '_4'})
# eop:platform
platform0 = etree.SubElement(earthObservationEquipment, _nsc('eop:platform', nsmap))
if prod:
platform1 = etree.SubElement(platform0, _nsc('eop:Platform', nsmap))
else:
platform1 = etree.SubElement(platform0, _nsc('_:Platform', nsmap, ard_ns=ard_ns))
shortName = etree.SubElement(platform1, _nsc('eop:shortName', nsmap))
shortName.text = meta['common']['platformShortName'].upper()
serialIdentifier = etree.SubElement(platform1, _nsc('eop:serialIdentifier', nsmap))
serialIdentifier.text = meta['common']['platformIdentifier']
if not prod:
satReference = etree.SubElement(platform1, _nsc('_:satelliteReference', nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): meta['common']['platformReference']})
# eop:instrument
instrument0 = etree.SubElement(earthObservationEquipment, _nsc('eop:instrument', nsmap))
instrument1 = etree.SubElement(instrument0, _nsc('eop:Instrument', nsmap))
shortName = etree.SubElement(instrument1, _nsc('eop:shortName', nsmap))
shortName.text = meta['common']['instrumentShortName']
# eop:sensor
sensor0 = etree.SubElement(earthObservationEquipment, _nsc('eop:sensor', nsmap))
sensor1 = etree.SubElement(sensor0, _nsc('_:Sensor', nsmap, ard_ns=ard_ns))
sensorType = etree.SubElement(sensor1, _nsc('eop:sensorType', nsmap))
sensorType.text = meta['common']['sensorType']
operationalMode = etree.SubElement(sensor1, _nsc('eop:operationalMode', nsmap),
attrib={'codeSpace': 'urn:esa:eop:C-SAR:operationalMode'})
operationalMode.text = meta['common']['operationalMode']
swathIdentifier = etree.SubElement(sensor1, _nsc('eop:swathIdentifier', nsmap),
attrib={'codeSpace': 'urn:esa:eop:C-SAR:swathIdentifier'})
swathIdentifier.text = meta['common']['swathIdentifier']
radarBand = etree.SubElement(sensor1, _nsc('_:radarBand', nsmap, ard_ns=ard_ns))
radarBand.text = meta['common']['radarBand']
if not prod:
radarCenterFreq = etree.SubElement(sensor1, _nsc('_:radarCenterFrequency', nsmap, ard_ns=ard_ns),
attrib={'uom': 'Hz'})
radarCenterFreq.text = '{:.3e}'.format(meta['common']['radarCenterFreq'])
value = meta['source'][uid]['sensorCalibration']
if value is not None:
sensorCalibration = etree.SubElement(sensor1, _nsc('_:sensorCalibration',
nsmap, ard_ns=ard_ns),
attrib={_nsc('xlink:href', nsmap): value})
# eop:acquisitionParameters
acquisitionParameters = etree.SubElement(earthObservationEquipment, _nsc('eop:acquisitionParameters', nsmap))
acquisition = etree.SubElement(acquisitionParameters, _nsc('_:Acquisition', nsmap, ard_ns=ard_ns))
orbitNumber = etree.SubElement(acquisition, _nsc('eop:orbitNumber', nsmap))
orbitNumber.text = str(meta['common']['orbitNumber_abs'])
orbitDirection = etree.SubElement(acquisition, _nsc('eop:orbitDirection', nsmap))
orbitDirection.text = meta['common']['orbitDirection'].upper()
wrsLongitudeGrid = etree.SubElement(acquisition, _nsc('eop:wrsLongitudeGrid', nsmap),
attrib={'codeSpace': 'urn:esa:eop:Sentinel1:relativeOrbits'})
wrsLongitudeGrid.text = meta['common']['wrsLongitudeGrid']
if not prod:
value = meta['source'][uid]['ascendingNodeDate']
ascendingNodeDate = etree.SubElement(acquisition, _nsc('eop:ascendingNodeDate', nsmap))
if value is not None:
ascendingNodeDate.text = value.isoformat()
startTimeFromAscendingNode = etree.SubElement(acquisition, _nsc('eop:startTimeFromAscendingNode', nsmap),
attrib={'uom': 'ms'})
startTimeFromAscendingNode.text = meta['source'][uid]['timeStartFromAscendingNode']
completionTimeFromAscendingNode = etree.SubElement(acquisition,
_nsc('eop:completionTimeFromAscendingNode', nsmap),
attrib={'uom': 'ms'})
completionTimeFromAscendingNode.text = meta['source'][uid]['timeCompletionFromAscendingNode']
instrumentAzimuthAngle = etree.SubElement(acquisition, _nsc('eop:instrumentAzimuthAngle', nsmap),
attrib={'uom': 'deg'})
instrumentAzimuthAngle.text = str(meta['source'][uid]['instrumentAzimuthAngle'])
polarisationMode = etree.SubElement(acquisition, _nsc('sar:polarisationMode', nsmap))
polarisationMode.text = meta['common']['polarisationMode']
polarisationChannels = etree.SubElement(acquisition, _nsc('sar:polarisationChannels', nsmap))
polarisationChannels.text = ', '.join(meta['common']['polarisationChannels'])
if prod:
numberOfAcquisitions = etree.SubElement(acquisition, _nsc('_:numberOfAcquisitions', nsmap, ard_ns=ard_ns))
numberOfAcquisitions.text = meta['prod']['numberOfAcquisitions']
else:
antennaLookDirection = etree.SubElement(acquisition, _nsc('sar:antennaLookDirection', nsmap))
antennaLookDirection.text = meta['common']['antennaLookDirection']
minimumIncidenceAngle = etree.SubElement(acquisition, _nsc('sar:minimumIncidenceAngle', nsmap),
attrib={'uom': 'deg'})
minimumIncidenceAngle.text = str(meta['source'][uid]['incidenceAngleMin'])
maximumIncidenceAngle = etree.SubElement(acquisition, _nsc('sar:maximumIncidenceAngle', nsmap),
attrib={'uom': 'deg'})
maximumIncidenceAngle.text = str(meta['source'][uid]['incidenceAngleMax'])
orbitMeanAltitude = etree.SubElement(acquisition, _nsc('_:orbitMeanAltitude', nsmap, ard_ns=ard_ns),
attrib={'uom': 'm'})
orbitMeanAltitude.text = meta['common']['orbitMeanAltitude']
if meta['common']['platformShortName'] == 'Sentinel':
dataTakeID = etree.SubElement(acquisition, _nsc('_:dataTakeID', nsmap, ard_ns=ard_ns))
dataTakeID.text = meta['source'][uid]['datatakeID']
majorCycleID = etree.SubElement(acquisition, _nsc('_:majorCycleID', nsmap, ard_ns=ard_ns))
majorCycleID.text = meta['source'][uid]['majorCycleID']
def _om_feature_of_interest(
root: etree.Element,
nsmap: dict[str, str],
scene_id: str,
extent: str,
center: str
):
"""
Creates the `om:featureOfInterest` XML elements.
Parameters
----------
root:
Root XML element.
nsmap:
Dictionary listing abbreviation (key) and URI (value) of all necessary XML namespaces.
scene_id:
Scene basename.
extent:
Footprint coordinates of the scene.
center:
Center coordinates of the footprint.
"""
featureOfInterest = etree.SubElement(root, _nsc('om:featureOfInterest', nsmap))
footprint = etree.SubElement(featureOfInterest, _nsc('eop:Footprint', nsmap),
attrib={_nsc('gml:id', nsmap): scene_id + '_5'})
multiExtentOf = etree.SubElement(footprint, _nsc('eop:multiExtentOf', nsmap))
multiSurface = etree.SubElement(multiExtentOf, _nsc('gml:MultiSurface', nsmap),
attrib={_nsc('gml:id', nsmap): scene_id + '_6'})
surfaceMember = etree.SubElement(multiSurface, _nsc('gml:surfaceMember', nsmap))
polygon = etree.SubElement(surfaceMember, _nsc('gml:Polygon', nsmap),
attrib={_nsc('gml:id', nsmap): scene_id + '_7'})
exterior = etree.SubElement(polygon, _nsc('gml:exterior', nsmap))
linearRing = etree.SubElement(exterior, _nsc('gml:LinearRing', nsmap))
posList = etree.SubElement(linearRing, _nsc('gml:posList', nsmap))
posList.text = extent
centerOf = etree.SubElement(footprint, _nsc('eop:centerOf', nsmap))
point = etree.SubElement(centerOf, _nsc('gml:Point', nsmap), attrib={_nsc('gml:id', nsmap): scene_id + '_8'})
pos = etree.SubElement(point, _nsc('gml:pos', nsmap))
pos.text = center