import csv
import functools
import hashlib
import logging
import os
import re
import requests
from datetime import datetime, timezone
import fsspec
import numpy
import pandas
import pygeoprocessing
import yaml
from osgeo import gdal
from osgeo import osr
from pathlib import Path
from pydantic import ValidationError
import tarfile
from . import models
from .config import Config
# chardet logs far too much at DEBUG, so cap its logger at INFO.
logging.getLogger('chardet').setLevel(logging.INFO)

# Numeric GDAL version, e.g. (3, 9, 1), used for feature checks below.
# Only the leading dotted-number prefix of ``gdal.__version__`` is parsed so
# that a suffixed version string (e.g. "3.10.0dev-<hash>", as development
# builds may report) does not make ``int()`` raise at import time.
GDAL_VERSION = tuple(
    int(part) for part in
    re.match(r'\d+(?:\.\d+)*', gdal.__version__).group().split('.'))

LOGGER = logging.getLogger('geometamaker')

# Passed as ``extra`` on some log records. Presumably lets a CLI log handler
# filter out messages that only make sense for Python API users -- verify
# against the CLI module.
_NOT_FOR_CLI = 'not_for_cli'
_LOG_EXTRA_NOT_FOR_CLI = {
    _NOT_FOR_CLI: True
}

# URI schemes we support. A subset of fsspec.available_protocols()
PROTOCOLS = [
    'file',
    'http',
    'https',
]

# Format used for ``last_modified`` timestamps.
DT_FMT = '%Y-%m-%d %H:%M:%S %Z'

# File extensions used by ``detect_file_type`` to classify archives and
# tables before asking GDAL.
ARCHIVE_EXTENSIONS = ['.zip', '.tar', '.tgz', '.tar.gz']
TABLE_EXTENSIONS = ['.csv', '.tsv']

# Options passed to ``pandas.read_csv`` when reading tables.
READ_CSV_KWARGS = {
    'index_col': False,
    'sep': None,  # let the python engine guess the separator
    'engine': 'python'
}
def _gdal_progress_callback(complete, message, data):
    """Log progress of a GDAL operation at multiples of 5 percent.

    Args:
        complete (float): amount of work completed; multiplied by 100 to
            obtain a percentage.
        message (str): text included at the start of the log message.
        data: unused; accepted so the signature can be used as a GDAL
            progress callback.

    Returns:
        None
    """
    percentage = complete * 100
    # Logical ``and`` rather than bitwise ``&``: both operands are plain
    # booleans, and ``and`` short-circuits.
    # NOTE: only exact multiples of 5 are logged, so a value that is off by
    # floating-point error (e.g. 55.00000000000001) is skipped.
    if percentage > 0 and percentage % 5 == 0:
        LOGGER.info(f'{message} {percentage}%')
# TODO: In the future we can remove these exception managers in favor of the
# builtin gdal.ExceptionMgr. It was released in 3.7.0 and debugged in 3.9.1.
# https://github.com/OSGeo/gdal/blob/v3.9.3/NEWS.md#gdalogr-391-release-notes
class _OSGEOUseExceptions:
    """Temporarily enable GDAL and OSR exceptions.

    On entry the current ``UseExceptions`` state of both modules is
    recorded and exceptions are switched on. On exit exceptions are
    switched back off for each module that had them off beforehand.
    """

    def __enter__(self):
        # Record both states before changing either one.
        self.currentGDALUseExceptions = gdal.GetUseExceptions()
        self.currentOSRUseExceptions = osr.GetUseExceptions()
        gdal.UseExceptions()
        osr.UseExceptions()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The error-handlers are in a stack, so they must be unwound from
        # the top down: OSR (enabled last) first, then GDAL.
        previous_states = (
            (osr, self.currentOSRUseExceptions),
            (gdal, self.currentGDALUseExceptions),
        )
        for module, was_enabled in previous_states:
            if was_enabled == 0:
                module.DontUseExceptions()
def _osgeo_use_exceptions(func):
    """Decorate ``func`` so it runs with GDAL/OSR exceptions enabled.

    The previous exception state is restored when ``func`` returns or
    raises.

    Args:
        func (callable): function to call with GDAL/OSR exceptions enabled

    Returns:
        Wrapper function that calls ``func`` with GDAL/OSR exceptions enabled

    """
    @functools.wraps(func)
    def _with_exceptions_enabled(*args, **kwargs):
        with _OSGEOUseExceptions():
            result = func(*args, **kwargs)
        return result

    return _with_exceptions_enabled
def _vsi_path(filepath, scheme):
    """Return ``filepath`` as a GDAL virtual file system path.

    Args:
        filepath (str): path to a file to be opened by GDAL
        scheme (str): the protocol prefix of the filepath

    Returns:
        str: ``filepath`` prefixed with ``/vsicurl/`` when ``scheme`` starts
        with ``http``; otherwise ``filepath`` unchanged.

    """
    if not scheme.startswith('http'):
        return filepath
    return f'/vsicurl/{filepath}'
def _wkt_to_epsg_units_string(wkt_string):
    """Derive an ``AUTHORITY:CODE`` string and unit name from a WKT string.

    Args:
        wkt_string (str): coordinate reference system as well-known text

    Returns:
        tuple (str, str): the CRS identifier and the value of the ``UNIT``
        attribute. Either element is ``'unknown'`` if a ``RuntimeError``
        was raised before it could be determined.

    """
    crs_string, units_string = 'unknown', 'unknown'
    try:
        srs = osr.SpatialReference(wkt_string)
        srs.AutoIdentifyEPSG()
        authority_name = srs.GetAttrValue('AUTHORITY', 0)
        authority_code = srs.GetAttrValue('AUTHORITY', 1)
        crs_string = f"{authority_name}:{authority_code}"
        units_string = srs.GetAttrValue('UNIT', 0)
    except RuntimeError:
        LOGGER.warning(
            f'{wkt_string} cannot be interpreted as a coordinate reference system')
    return crs_string, units_string
def _epsg_to_wkt_units_string(epsg_code):
    """Derive a WKT string and unit name from an EPSG code.

    Args:
        epsg_code (int): EPSG code of a coordinate reference system

    Returns:
        tuple (str, str): the CRS exported as WKT and the value of its
        ``UNIT`` attribute. Either element is ``'unknown'`` if a
        ``RuntimeError`` was raised before it could be determined.

    """
    wkt_string, units_string = 'unknown', 'unknown'
    try:
        srs = osr.SpatialReference()
        srs.ImportFromEPSG(epsg_code)
        wkt_string = srs.ExportToWkt()
        units_string = srs.GetAttrValue('UNIT', 0)
    except RuntimeError:
        LOGGER.warning(
            f'EPSG: {epsg_code} cannot be interpreted as a coordinate reference system')
    return wkt_string, units_string
def _list_files_with_depth(directory, depth, exclude_regex=None,
                           exclude_hidden=True):
    """List the contents of ``directory`` down to ``depth`` levels.

    Every entry found by a recursive glob is considered, so subdirectories
    themselves appear in the result alongside files.

    Args:
        directory (string): path to a directory
        depth (int): maximum number of subdirectory levels to traverse when
            walking through a directory. A value of 1 limits the walk to
            files in the top-level ``directory`` only. A value of 2 allows
            descending into immediate subdirectories, etc.
        exclude_regex (str, optional): a regular expression to pattern-match
            any files for which you do not want to create metadata.
        exclude_hidden (bool, default True): whether to ignore entries that
            have any path component starting with ``.``

    Returns:
        sorted list of paths relative to ``directory``

    """
    root = Path(directory).resolve()

    def _is_wanted(relative):
        # Depth is the number of path components below ``root``.
        parts = relative.parts
        if len(parts) > depth:
            return False
        if exclude_hidden and any(part.startswith('.') for part in parts):
            return False
        return True

    relative_paths = (entry.relative_to(root) for entry in root.rglob("*"))
    kept = [str(relative) for relative in relative_paths
            if _is_wanted(relative)]

    # The regex is applied to the relative path string, not just the name.
    if exclude_regex is not None:
        kept = [name for name in kept if not re.search(exclude_regex, name)]

    kept.sort()
    return kept
def _get_collection_size_time_uid(directory):
    """Summarize a directory by total size, newest modification time and uid.

    Args:
        directory (str): path to the directory to walk

    Returns:
        tuple (int, str, str): total size in bytes of all files under
        ``directory``, the most recent file modification time formatted in
        UTC, and a ``sizetimestamp:`` uid that is the SHA-256 of those two
        values followed by ``directory``.

    """
    file_stats = [
        os.stat(os.path.join(dirpath, filename))
        for dirpath, _, filenames in os.walk(directory)
        for filename in filenames]

    total_bytes = sum(entry.st_size for entry in file_stats)
    # Seeded with 0 so that a directory without files maps to the epoch.
    latest_mtime = max([0] + [entry.st_mtime for entry in file_stats])

    last_modified_str = datetime.fromtimestamp(
        latest_mtime, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')

    digest = hashlib.sha256(
        f'{total_bytes}{last_modified_str}{directory}'.encode('utf-8')
    ).hexdigest()
    return total_bytes, last_modified_str, f'sizetimestamp:{digest}'
[docs]
def detect_file_type(filepath, scheme):
    """Detect the type of resource contained in the file.

    Archives and tables are recognized by file extension. Anything else is
    handed to ``pygeoprocessing.get_gis_type``.

    Args:
        filepath (str): path to a file
        scheme (str): the protocol prefix of the filepath

    Returns:
        str: one of ``'archive'``, ``'table'``, ``'vector'`` or ``'raster'``

    Raises:
        ValueError on unsupported file formats, including files that
        contain both raster and vector data.

    """
    extension = os.path.splitext(filepath)[1].lower()
    # ``splitext`` only yields the final suffix ('.gz' for 'x.tar.gz'), so
    # compound extensions such as '.tar.gz' must be matched against the end
    # of the whole path or they would never be recognized.
    lowered_path = filepath.lower()
    has_compound_archive_ext = any(
        ext.count('.') > 1 and lowered_path.endswith(ext)
        for ext in ARCHIVE_EXTENSIONS)
    if extension in ARCHIVE_EXTENSIONS or has_compound_archive_ext:
        return 'archive'

    # GDAL considers CSV a vector, so check for tables first.
    if extension in TABLE_EXTENSIONS:
        return 'table'

    # TODO: guard against classifying netCDF, HDF5, etc as GDAL rasters.
    # We'll likely want a different data model for multi-dimensional arrays.
    try:
        gis_type = pygeoprocessing.get_gis_type(_vsi_path(filepath, scheme))
    except ValueError as error:
        raise ValueError(
            f'{filepath} does not appear to be a supported file format.'
            f' Supported formats are {ARCHIVE_EXTENSIONS}, {TABLE_EXTENSIONS}'
            f' or any format supported by GDAL.') from error

    if gis_type == pygeoprocessing.VECTOR_TYPE:
        return 'vector'
    if gis_type == pygeoprocessing.RASTER_TYPE:
        return 'raster'
    raise ValueError(
        f'{filepath} contains both raster and vector data. '
        'Such files are not supported by GeoMetaMaker. '
        'If you wish to see support for these files, please '
        'submit a feature request and share your dataset: '
        'https://github.com/natcap/geometamaker/issues ')
[docs]
def describe_file(source_dataset_path, scheme):
    """Describe basic properties of a file.

    Args:
        source_dataset_path (str): path to a file.
        scheme (str): the protocol prefix of the filepath

    Returns:
        dict with keys:

        * ``'path'``: ``source_dataset_path`` as given
        * ``'format'``: lowercased final file extension without the dot
        * ``'bytes'`` (int): file size
        * ``'last_modified'`` (str): modification time formatted with
          ``DT_FMT``, in UTC
        * ``'uid'``: ``sizetimestamp:`` followed by the SHA-256 of the
          bytes, last_modified and path values

    Raises:
        KeyError if an HTTP response lacks a ``Content-Length`` or
        ``Last-Modified`` header.

    """
    description = {
        'path': source_dataset_path,
        'format': os.path.splitext(source_dataset_path)[1].lower().lstrip('.')
    }

    # If we want to support more file protocols in the future, it may
    # make sense to use fsspec to access file info in a protocol-agnostic way.
    # But not all protocols are equally supported yet.
    # https://github.com/fsspec/filesystem_spec/issues/526
    if scheme.startswith('http'):
        headers = requests.head(source_dataset_path).headers
        # Header values are strings; convert so ``bytes`` has the same type
        # as in the local-file branch.
        description['bytes'] = int(headers['Content-Length'])
        # ``strptime`` returns a naive datetime even when ``%Z`` matches, so
        # ``%Z`` in DT_FMT would render as an empty string. HTTP dates are
        # expressed in GMT, so attach UTC explicitly.
        last_modified = datetime.strptime(
            headers['Last-Modified'], '%a, %d %b %Y %H:%M:%S %Z'
        ).replace(tzinfo=timezone.utc)
        description['last_modified'] = last_modified.strftime(DT_FMT)
    else:
        info = os.stat(source_dataset_path)
        description['bytes'] = info.st_size
        description['last_modified'] = datetime.fromtimestamp(
            info.st_mtime, tz=timezone.utc).strftime(DT_FMT)

    # The three values are concatenated with no separator. Adjacent literals
    # are used instead of a backslash continuation inside the string, because
    # a continuation makes any leading whitespace on the following source
    # line part of the hashed text.
    uid_source = (
        f'{description["bytes"]}'
        f'{description["last_modified"]}'
        f'{description["path"]}')
    hash_func = hashlib.new('sha256')
    hash_func.update(uid_source.encode('utf-8'))
    description['uid'] = f'sizetimestamp:{hash_func.hexdigest()}'
    return description
[docs]
def describe_archive(source_dataset_path, scheme, **kwargs):
    """Describe file properties of an archive file.

    The result of ``describe_file`` is extended with a ``'sources'`` list
    of the files inside the archive and, for zip and gzip-compressed
    archives, a ``'compression'`` value.

    Args:
        source_dataset_path (str): path to a file.
        scheme (str): the protocol prefix of the filepath
        kwargs (dict): additional options when describing a dataset.
            Not used by this function.

    Returns:
        dict

    Raises:
        ValueError if the file's format is not zip, tar, tgz or gz.

    """
    def _tar_members(path):
        """Names of the regular files in a .tar, .tgz, or .tar.gz archive."""
        with fsspec.open(path, 'rb') as stream:
            with tarfile.open(fileobj=stream, mode='r:*') as archive:
                return [member.name for member in archive.getmembers()
                        if member.isfile()]

    def _zip_members(path):
        """Paths of the files in a zip archive."""
        zip_fs = fsspec.get_filesystem_class('zip')(path)
        return [
            os.path.join(parent, name)
            for parent, _, names in zip_fs.walk(zip_fs.root_marker)
            for name in names]

    description = describe_file(source_dataset_path, scheme)
    archive_format = description['format']

    if archive_format == 'zip':
        description['compression'] = 'zip'
        members = _zip_members(source_dataset_path)
    elif archive_format in ('tgz', 'tar', 'gz'):
        members = _tar_members(source_dataset_path)
        # A plain .tar is uncompressed, so it gets no 'compression' entry.
        if archive_format != 'tar':
            description['compression'] = 'gz'
    else:
        raise ValueError(f'Unsupported archive format: {source_dataset_path}')

    description['sources'] = members
    return description
[docs]
def describe_vector(source_dataset_path, scheme, **kwargs):
    """Describe properties of a GDAL vector file.

    Only the layer returned by ``GetLayer()`` is described.

    Args:
        source_dataset_path (str): path to a GDAL vector.
        scheme (str): the protocol prefix of the filepath
        kwargs (dict): additional options when describing a dataset.
            Not used by this function.

    Returns:
        dict: the result of ``describe_file`` plus ``'data_model'``,
        ``'spatial'`` and ``'sources'`` entries.

    """
    # describe_file needs the plain path or URL; GDAL needs the virtual
    # file system form, built by the same helper detect_file_type uses.
    description = describe_file(source_dataset_path, scheme)
    source_dataset_path = _vsi_path(source_dataset_path, scheme)

    vector = gdal.OpenEx(source_dataset_path, gdal.OF_VECTOR)
    layer = vector.GetLayer()
    fields = [
        models.FieldSchema(name=fld.name, type=fld.GetTypeName())
        for fld in layer.schema]
    layer_schema = models.LayerSchema(
        name=layer.GetName(),
        n_features=layer.GetFeatureCount(),
        table=models.TableSchema(fields=fields),
        gdal_metadata=layer.GetMetadata())
    description['data_model'] = models.VectorSchema(
        layers=[layer_schema],
        gdal_metadata=vector.GetMetadata())
    # Drop the GDAL references before the file is read again below.
    vector = layer = None

    info = pygeoprocessing.get_vector_info(source_dataset_path)
    bbox = models.BoundingBox(*info['bounding_box'])
    epsg_string, units_string = _wkt_to_epsg_units_string(
        info['projection_wkt'])
    description['spatial'] = models.SpatialSchema(
        bounding_box=bbox,
        crs=epsg_string,
        crs_units=units_string)
    description['sources'] = info['file_list']
    return description
[docs]
def describe_raster(source_dataset_path, scheme, **kwargs):
    """Describe properties of a GDAL raster file.

    Args:
        source_dataset_path (str): path to a GDAL raster.
        scheme (str): the protocol prefix of the filepath
        kwargs (dict): additional options when describing a dataset:

            * ``'compute_stats'`` (bool): whether to compute statistics
              for each band in the raster. Default is False.

    Returns:
        dict: the result of ``describe_file`` plus ``'data_model'``,
        ``'spatial'`` and ``'sources'`` entries.

    """
    compute_stats = kwargs.get('compute_stats', False)
    # describe_file needs the plain path or URL; GDAL needs the virtual
    # file system form, built by the same helper detect_file_type uses.
    description = describe_file(source_dataset_path, scheme)
    source_dataset_path = _vsi_path(source_dataset_path, scheme)

    info = pygeoprocessing.get_raster_info(source_dataset_path)
    raster = gdal.OpenEx(source_dataset_path)
    raster_gdal_metadata = raster.GetMetadata()

    bands = []
    for i in range(info['n_bands']):
        b = i + 1  # GDAL band indices are 1-based
        band = raster.GetRasterBand(b)

        rat = None
        gdal_rat = band.GetDefaultRAT()
        if gdal_rat:
            rat = models.RasterAttributeTable.from_gdal(gdal_rat)
        elif GDAL_VERSION < (3, 11, 0):
            # GetDefaultRAT did not support DBF prior to 3.11.0
            dbf_rat = f'{source_dataset_path}.vat.dbf'
            if dbf_rat in info['file_list']:
                rat = models.RasterAttributeTable.from_gdal_dbf(dbf_rat)

        band_gdal_metadata = band.GetMetadata()
        if compute_stats:
            try:
                if 'STATISTICS_VALID_PERCENT' not in band_gdal_metadata:
                    # Sometimes some stats exist, but not all. If this one
                    # doesn't, it's important enough that we want to force
                    # computation.
                    _ = band.ComputeStatistics(
                        0, callback=_gdal_progress_callback)
                else:
                    # 0=do not approximate stats, 1=calculate if they don't
                    # exist. If exact stats exist they will be retrieved
                    # without computing them, otherwise, this forces
                    # computation.
                    # https://github.com/OSGeo/gdal/blob/master/gcore/gdalrasterband.cpp
                    _ = band.GetStatistics(0, 1)
                # Re-read the metadata so it includes the statistics.
                band_gdal_metadata = band.GetMetadata()
            except RuntimeError as e:
                LOGGER.warning(
                    f'Could not compute statistics for band {b} of '
                    f'{source_dataset_path}: {e}')

        bands.append(models.BandSchema(
            index=b,
            gdal_type=gdal.GetDataTypeName(info['datatype']),
            numpy_type=numpy.dtype(info['numpy_type']).name,
            nodata=info['nodata'][i],
            gdal_metadata=band_gdal_metadata,
            raster_attribute_table=rat))
        band = None
    raster = None

    description['data_model'] = models.RasterSchema(
        bands=bands,
        pixel_size=info['pixel_size'],
        raster_size={'width': info['raster_size'][0],
                     'height': info['raster_size'][1]},
        gdal_metadata=raster_gdal_metadata)
    # Some values of raster info are numpy types, which the
    # yaml dumper doesn't know how to represent.
    bbox = models.BoundingBox(*[float(x) for x in info['bounding_box']])
    epsg_string, units_string = _wkt_to_epsg_units_string(
        info['projection_wkt'])
    description['spatial'] = models.SpatialSchema(
        bounding_box=bbox,
        crs=epsg_string,
        crs_units=units_string)
    description['sources'] = info['file_list']
    return description
[docs]
def describe_table(source_dataset_path, scheme, **kwargs):
    """Describe properties of a tabular dataset.

    Column types are inferred from the first 100 rows of the table.

    Args:
        source_dataset_path (str): path to a file representing a table.
        scheme (str): the protocol prefix of the filepath
        kwargs (dict): additional options when describing a dataset.
            Not used by this function.

    Returns:
        dict: the result of ``describe_file`` plus a ``'data_model'`` entry.

    Raises:
        ValueError if the file cannot be read as a table.

    """
    description = describe_file(source_dataset_path, scheme)

    try:
        # A sample of rows is enough to make a good inference on each
        # column's datatype.
        sample = pandas.read_csv(
            source_dataset_path, nrows=100, **READ_CSV_KWARGS)
    except (UnicodeDecodeError, pandas.errors.ParserError, csv.Error) as error:
        raise ValueError(
            f'{source_dataset_path} cannot be read as a table: {error}')

    description['data_model'] = models.TableSchema(fields=[
        models.FieldSchema(
            name=column,
            type=pandas.api.types.infer_dtype(sample[column]))
        for column in sample.columns])
    return description
[docs]
def describe_collection(directory, depth=numpy.iinfo(numpy.int16).max,
                        exclude_regex=None, exclude_hidden=True,
                        describe_files=False, backup=True,
                        target_filename=None, **kwargs):
    """Create a single metadata document to describe a collection of files.

    Describe all the files within a directory as members of a "collection".
    The resulting metadata resource should include a list of all the files
    included in the collection along with a description and metadata filepath
    (or placeholder). Optionally create individual metadata files for each
    supported file in a directory.

    Args:
        directory (str): path to collection
        depth (int, optional): maximum number of subdirectory levels to
            traverse when walking through ``directory`` to find files included
            in the collection. A value of 1 limits the walk to files in the
            top-level ``directory`` only. A value of 2 allows descending into
            immediate subdirectories, etc. All files in all subdirectories in
            the collection will be included by default.
        exclude_regex (str, optional): a regular expression to pattern-match
            any files you do not want included in the output metadata yml.
        exclude_hidden (bool, default True): whether to exclude hidden files
            (files that start with ".").
        describe_files (bool, default False): whether to ``describe`` all
            files, i.e., create individual metadata files for each supported
            resource in the collection.
        backup (bool): whether to write a backup of a pre-existing metadata
            file before overwriting it in cases where that file is not a valid
            geometamaker document.
        target_filename (str, optional): name of the collection's metadata
            file inside ``directory``. Defaults to
            ``<directory name>-metadata.yml``.
        kwargs (dict): optional keyword arguments accepted by ``describe``.

    Returns:
        Collection metadata

    """
    directory = str(Path(directory).resolve())

    file_list = _list_files_with_depth(directory, depth, exclude_regex,
                                       exclude_hidden)

    items = []
    collection_crs_set = set()
    item_spatial_list = []
    # These extensions almost always represent sidecar files that should
    # not be described in isolation. Typically, these are components of a
    # shapefile, but '.dbf' can also represent a raster attribute table.
    # Theoretically a DBF can also be a standalone table, but that
    # is not currently supported by this function.
    skip_extensions = [
        '.shx', '.sbn', '.sbx', '.prj', '.dbf', '.cpg', '.qix', '.xml', '.tfw',
        '.qlr', '.lyr', '.qpj', '.yml']

    for rel_filepath in file_list:
        abs_filepath = os.path.join(directory, rel_filepath)
        _, extension = os.path.splitext(abs_filepath)
        if extension.lower() in skip_extensions:
            continue
        try:
            item_resource = describe(abs_filepath, **kwargs)
            if item_resource.spatial is not None:
                collection_crs_set.add(item_resource.spatial.crs)
                item_spatial_list.append(item_resource.spatial)
        except ValueError:
            # if file type isn't supported by geometamaker, e.g. pdf
            # or if trying to describe a dir
            item_resource = None

        if describe_files and item_resource:
            item_resource.write(backup=backup)

        if os.path.exists(f'{abs_filepath}.yml'):
            metadata_yml = f'{rel_filepath}.yml'
        else:
            metadata_yml = ''

        collection_item = models.CollectionItemSchema(
            path=rel_filepath,
            description=item_resource.description if item_resource else '',
            metadata=metadata_yml
        )
        items.append(collection_item)

    total_bytes, last_modified, uid = _get_collection_size_time_uid(directory)

    # ``spatial`` is only ever assigned a finished collection extent. Loop
    # variables below use a different name so that a failure part-way
    # through leaves it as None.
    spatial = None
    if len(collection_crs_set) == 1:
        collection_bbox = pygeoprocessing.merge_bounding_box_list(
            [list(item_spatial.bounding_box)
             for item_spatial in item_spatial_list],
            'union')
        spatial = models.SpatialSchema(
            bounding_box=models.BoundingBox(*collection_bbox),
            crs=item_spatial_list[0].crs,
            crs_units=item_spatial_list[0].crs_units)
    elif len(collection_crs_set) > 1:
        # Items use different CRSs: express the union of their extents in
        # EPSG:4326.
        wgs84_bbox_list = []
        target_projection_wkt, target_crs_units = _epsg_to_wkt_units_string(
            4326)
        try:
            for item_spatial in item_spatial_list:
                # Each item's own units are irrelevant here; the collection
                # extent is reported in the target CRS's units.
                base_projection_wkt, _ = _epsg_to_wkt_units_string(
                    int(item_spatial.crs.split(':')[1]))
                bbox = pygeoprocessing.transform_bounding_box(
                    bounding_box=list(item_spatial.bounding_box),
                    base_projection_wkt=base_projection_wkt,
                    target_projection_wkt=target_projection_wkt)
                wgs84_bbox_list.append(bbox)
            collection_bbox = pygeoprocessing.merge_bounding_box_list(
                wgs84_bbox_list, 'union')
            spatial = models.SpatialSchema(
                bounding_box=models.BoundingBox(*collection_bbox),
                crs='EPSG:4326',
                crs_units=target_crs_units)
        except (ValueError, RuntimeError, IndexError) as error:
            # transform_bounding_box can raise a ValueError; a crs string
            # without an 'AUTHORITY:CODE' form (e.g. 'unknown') raises
            # IndexError or ValueError when its code is parsed.
            LOGGER.error(error)
            LOGGER.warning(
                f'Cannot define spatial attribute for Collection {directory}')

    resource = models.CollectionResource(
        path=directory,
        type='collection',
        format='directory',
        scheme=fsspec.utils.get_protocol(directory),
        bytes=total_bytes,
        last_modified=last_modified,
        items=items,
        uid=uid,
        spatial=spatial
    )

    # Check if there is existing metadata for the collection
    if not target_filename:
        target_filename = f'{os.path.basename(directory)}-metadata.yml'
    metadata_path = os.path.join(directory, target_filename)
    try:
        existing_metadata = models.CollectionResource.load(metadata_path)

        # Copy any existing item descriptions from existing yml to new metadata
        # Note that descriptions in individual resources' ymls will take
        # priority over item descriptions from preexisting collection metadata
        existing_descriptions = {}
        for existing_item in existing_metadata.items:
            # setdefault keeps the first entry if a path is listed twice
            existing_descriptions.setdefault(
                existing_item.path, existing_item.description)
        for item in resource.items:
            # Existing metadata's item desc will overwrite new metadata item
            # desc if new item desc is ''
            if item.description == '' and item.path in existing_descriptions:
                item.description = existing_descriptions[item.path]

        # Replace fields in existing yml if new metadata has existing value
        resource = existing_metadata.replace(resource)

    except (ValueError, ValidationError) as error:
        LOGGER.warning(error)
        LOGGER.warning(
            f'Ignoring an existing YAML document: {metadata_path} because it'
            f' is invalid or incompatible.')
        LOGGER.warning(
            'A subsequent call to `.write()` will replace this file, but it'
            f' will be backed up to {metadata_path}.bak.\n'
            f'Use `.write(backup=False)` to skip the backup.\n',
            extra=_LOG_EXTRA_NOT_FOR_CLI)
        resource._would_overwrite = True

    except FileNotFoundError:
        # No pre-existing collection metadata; nothing to merge.
        pass

    # Add profile metadata
    config = Config()
    resource = resource.replace(config.profile)
    resource.metadata_path = metadata_path

    return resource
# Maps each resource type returned by ``detect_file_type`` to the function
# that builds the description dict for that type.
DESCRIBE_FUNCS = {
    'archive': describe_archive,
    'table': describe_table,
    'vector': describe_vector,
    'raster': describe_raster
}
# Maps each resource type to the ``models`` class used to instantiate, load
# and validate its metadata. Includes 'collection', which has no entry in
# DESCRIBE_FUNCS.
RESOURCE_MODELS = {
    'archive': models.ArchiveResource,
    'table': models.TableResource,
    'vector': models.VectorResource,
    'raster': models.RasterResource,
    'collection': models.CollectionResource
}
[docs]
@_osgeo_use_exceptions
def describe(source_dataset_path, compute_stats=False):
    """Create a metadata resource instance with properties of the dataset.

    Properties of the dataset are used to populate as many metadata
    properties as possible. Default/placeholder
    values are used for properties that require user input.

    If ``<source_dataset_path>.yml`` already exists and is a valid document
    for this resource type, its values are merged with the newly derived
    ones.

    Args:
        source_dataset_path (string): path or URL to dataset to which the
            metadata applies
        compute_stats (bool): whether to compute statistics
            for each band in a raster.

    Returns:
        geometamaker.models.Resource: a metadata object

    Raises:
        ValueError if the file type or protocol of the dataset is not
        supported, or if the path is a directory.
        FileNotFoundError if the path does not exist.

    """
    metadata_path = f'{source_dataset_path}.yml'

    if os.path.isdir(source_dataset_path):
        raise ValueError(
            f"Cannot `describe` {source_dataset_path} as it is a directory, "
            "not a dataset. \nIf you are trying to create metadata for the "
            "files within a directory and/or the directory itself, please use "
            "`geometamaker.describe_collection` instead.")

    # Despite naming, this does not open a file that must be closed
    of = fsspec.open(source_dataset_path)
    if not of.fs.exists(source_dataset_path):
        raise FileNotFoundError(f'{source_dataset_path} does not exist')

    protocol = fsspec.utils.get_protocol(source_dataset_path)
    if protocol not in PROTOCOLS:
        raise ValueError(
            f'Cannot describe {source_dataset_path}. {protocol} '
            f'is not one of the suppored file protocols: {PROTOCOLS}')
    resource_type = detect_file_type(source_dataset_path, protocol)
    description = DESCRIBE_FUNCS[resource_type](
        source_dataset_path, protocol, compute_stats=compute_stats)
    description['type'] = resource_type
    resource = RESOURCE_MODELS[resource_type](**description)

    # Load existing metadata file
    try:
        # For the data model, use heuristic to decide if the new resource
        # should inherit values from the existing resource.
        # After that, take all non-empty values from the new resource
        # and update the existing resource.
        existing_resource = RESOURCE_MODELS[resource_type].load(metadata_path)
        if resource_type == 'raster':
            for band in resource.data_model.bands:
                try:
                    eband = existing_resource.get_band_description(band.index)
                except IndexError:
                    continue
                # Only carry over user-entered text when the band still has
                # the same types and nodata value.
                if (band.numpy_type, band.gdal_type, band.nodata) == (
                        eband.numpy_type, eband.gdal_type, eband.nodata):
                    resource.set_band_description(
                        band.index,
                        title=eband.title,
                        description=eband.description,
                        units=eband.units)
        if resource_type in ('vector', 'table'):
            for field in resource._get_fields():
                try:
                    efield = existing_resource.get_field_description(field.name)
                except KeyError:
                    continue
                # Only carry over user-entered text when the field still has
                # the same type.
                if field.type == efield.type:
                    resource.set_field_description(
                        field.name,
                        title=efield.title,
                        description=efield.description,
                        units=efield.units)
        resource = existing_resource.replace(resource)

    except (ValueError, ValidationError) as error:
        LOGGER.warning(error)
        LOGGER.warning(
            f'Ignoring an existing YAML document: {metadata_path} because it'
            f' is invalid or incompatible.')
        # The segment naming the backup file must be an f-string, otherwise
        # the literal text "{metadata_path}" is logged.
        LOGGER.warning(
            'A subsequent call to `.write()` will replace this file, but it'
            f' will be backed up to {metadata_path}.bak.\n'
            'Use `.write(backup=False)` to skip the backup.\n',
            extra=_LOG_EXTRA_NOT_FOR_CLI)
        resource._would_overwrite = True

    except FileNotFoundError:
        # Common path: metadata file does not already exist
        pass

    config = Config()
    resource = resource.replace(config.profile)
    return resource
[docs]
def validate(filepath):
    """Validate a YAML metadata document.

    Validation includes type-checking of property values and
    checking for the presence of required properties.

    Args:
        filepath (string): path to a YAML file

    Returns:
        pydantic.ValidationError if the document fails validation,
        otherwise None

    Raises:
        ValueError if the YAML document is not a geometamaker metadata doc.

    """
    with fsspec.open(filepath, 'r') as file:
        yaml_string = file.read()
    yaml_dict = yaml.safe_load(yaml_string)

    # A geometamaker document is a mapping that carries a version key.
    # Empty documents and YAML scalars or lists are rejected here rather
    # than failing with a TypeError on the lookups below.
    if not isinstance(yaml_dict, dict) or (
            'metadata_version' not in yaml_dict
            and 'geometamaker_version' not in yaml_dict):
        message = (f'{filepath} exists but is not compatible with '
                   f'geometamaker.')
        raise ValueError(message)

    # A missing or unrecognized 'type' also means this is not a document
    # we can validate; report it as the documented ValueError instead of
    # letting a KeyError escape.
    resource_type = yaml_dict.get('type')
    try:
        resource_model = RESOURCE_MODELS[resource_type]
    except (KeyError, TypeError):
        raise ValueError(
            f'{filepath} exists but is not compatible with geometamaker: '
            f'unrecognized resource type {resource_type!r}.') from None

    try:
        resource_model(**yaml_dict)
    except ValidationError as error:
        return error
    return None
[docs]
def validate_dir(directory, depth=numpy.iinfo(numpy.int16).max):
    """Validate all compatible yml documents in the directory.

    Args:
        directory (string): path to a directory
        depth (int): maximum number of subdirectory levels to
            traverse when walking through ``directory``.

    Returns:
        tuple (list, list): a list of the filepaths that were validated and
        an equal-length list of the validation messages. A message is an
        empty string when the document is valid.

    """
    yaml_files = [
        name for name in _list_files_with_depth(directory, depth)
        if name.endswith('.yml')]

    messages = []
    for name in yaml_files:
        try:
            # ``validate`` returns a ValidationError, or nothing when valid.
            outcome = validate(os.path.join(directory, name))
            message = outcome if outcome else ''
        except ValueError:
            message = 'does not appear to be a geometamaker document'
        except yaml.YAMLError as exc:
            LOGGER.debug(exc)
            message = 'is not a readable yaml document'
        messages.append(message)

    return (yaml_files, messages)