# Copyright Iris contributors
#
# This file is part of Iris and is released under the LGPL license.
# See COPYING and COPYING.LESSER in the root of the repository for full
# licensing details.
"""
Module to support the loading of a NetCDF file into an Iris cube.
See also: `netCDF4 python <https://github.com/Unidata/netcdf4-python>`_
Also refer to document 'NetCDF Climate and Forecast (CF) Metadata Conventions'.
"""
import collections
import collections.abc
from itertools import repeat, zip_longest
import os
import os.path
import re
import string
from typing import List
import warnings
import cf_units
import dask.array as da
import netCDF4
import numpy as np
import numpy.ma as ma
from iris._lazy_data import _co_realise_lazy_arrays, as_lazy_data, is_lazy_data
from iris.aux_factory import (
AtmosphereSigmaFactory,
HybridHeightFactory,
HybridPressureFactory,
OceanSFactory,
OceanSg1Factory,
OceanSg2Factory,
OceanSigmaFactory,
OceanSigmaZFactory,
)
import iris.config
import iris.coord_systems
import iris.coords
from iris.coords import AncillaryVariable, AuxCoord, CellMeasure, DimCoord
import iris.exceptions
import iris.fileformats.cf
import iris.io
import iris.util
# Show actions activation statistics.
DEBUG = False
# Configure the logger.
logger = iris.config.get_logger(__name__)
# Standard CML spatio-temporal axis names.
SPATIO_TEMPORAL_AXES = ["t", "z", "y", "x"]
# Pass through CF attributes:
# - comment
# - Conventions
# - flag_masks
# - flag_meanings
# - flag_values
# - history
# - institution
# - reference
# - source
# - title
# - positive
#
_CF_ATTRS = [
"add_offset",
"ancillary_variables",
"axis",
"bounds",
"calendar",
"cell_measures",
"cell_methods",
"climatology",
"compress",
"coordinates",
"_FillValue",
"formula_terms",
"grid_mapping",
"leap_month",
"leap_year",
"long_name",
"missing_value",
"month_lengths",
"scale_factor",
"standard_error_multiplier",
"standard_name",
"units",
]
# CF attributes that should not be global.
_CF_DATA_ATTRS = [
"flag_masks",
"flag_meanings",
"flag_values",
"instance_dimension",
"missing_value",
"sample_dimension",
"standard_error_multiplier",
]
# CF attributes that should only be global.
_CF_GLOBAL_ATTRS = ["conventions", "featureType", "history", "title"]
# UKMO specific attributes that should not be global.
_UKMO_DATA_ATTRS = ["STASH", "um_stash_source", "ukmo__process_flags"]
CF_CONVENTIONS_VERSION = "CF-1.7"
_FactoryDefn = collections.namedtuple(
"_FactoryDefn", ("primary", "std_name", "formula_terms_format")
)
_FACTORY_DEFNS = {
AtmosphereSigmaFactory: _FactoryDefn(
primary="sigma",
std_name="atmosphere_sigma_coordinate",
formula_terms_format="ptop: {pressure_at_top} sigma: {sigma} "
"ps: {surface_air_pressure}",
),
HybridHeightFactory: _FactoryDefn(
primary="delta",
std_name="atmosphere_hybrid_height_coordinate",
formula_terms_format="a: {delta} b: {sigma} orog: {orography}",
),
HybridPressureFactory: _FactoryDefn(
primary="delta",
std_name="atmosphere_hybrid_sigma_pressure_coordinate",
formula_terms_format="ap: {delta} b: {sigma} "
"ps: {surface_air_pressure}",
),
OceanSigmaZFactory: _FactoryDefn(
primary="zlev",
std_name="ocean_sigma_z_coordinate",
formula_terms_format="sigma: {sigma} eta: {eta} depth: {depth} "
"depth_c: {depth_c} nsigma: {nsigma} zlev: {zlev}",
),
OceanSigmaFactory: _FactoryDefn(
primary="sigma",
std_name="ocean_sigma_coordinate",
formula_terms_format="sigma: {sigma} eta: {eta} depth: {depth}",
),
OceanSFactory: _FactoryDefn(
primary="s",
std_name="ocean_s_coordinate",
formula_terms_format="s: {s} eta: {eta} depth: {depth} a: {a} b: {b} "
"depth_c: {depth_c}",
),
OceanSg1Factory: _FactoryDefn(
primary="s",
std_name="ocean_s_coordinate_g1",
formula_terms_format="s: {s} c: {c} eta: {eta} depth: {depth} "
"depth_c: {depth_c}",
),
OceanSg2Factory: _FactoryDefn(
primary="s",
std_name="ocean_s_coordinate_g2",
formula_terms_format="s: {s} c: {c} eta: {eta} depth: {depth} "
"depth_c: {depth_c}",
),
}
# Cell methods.
_CM_KNOWN_METHODS = [
"point",
"sum",
"mean",
"maximum",
"minimum",
"mid_range",
"standard_deviation",
"variance",
"mode",
"median",
]
_CM_COMMENT = "comment"
_CM_EXTRA = "extra"
_CM_INTERVAL = "interval"
_CM_METHOD = "method"
_CM_NAME = "name"
_CM_PARSE_NAME = re.compile(r"([\w_]+\s*?:\s+)+")
_CM_PARSE = re.compile(
r"""
(?P<name>([\w_]+\s*?:\s+)+)
(?P<method>[\w_\s]+(?![\w_]*\s*?:))\s*
(?:
\(\s*
(?P<extra>.+)
\)\s*
)?
""",
re.VERBOSE,
)
[docs]class UnknownCellMethodWarning(Warning):
pass
def _split_cell_methods(nc_cell_methods: str) -> List[re.Match]:
"""
Split a CF cell_methods attribute string into a list of zero or more cell
methods, each of which is then parsed with a regex to return a list of match
objects.
Args:
* nc_cell_methods: The value of the cell methods attribute to be split.
Returns:
* nc_cell_methods_matches: A list of the re.Match objects associated with
each parsed cell method
Splitting is done based on words followed by colons outside of any brackets.
Validation of anything other than being laid out in the expected format is
left to the calling function.
"""
# Find name candidates
name_start_inds = []
for m in _CM_PARSE_NAME.finditer(nc_cell_methods):
name_start_inds.append(m.start())
# Remove those that fall inside brackets
bracket_depth = 0
for ind, cha in enumerate(nc_cell_methods):
if cha == "(":
bracket_depth += 1
elif cha == ")":
bracket_depth -= 1
if bracket_depth < 0:
msg = (
"Cell methods may be incorrectly parsed due to mismatched "
"brackets"
)
warnings.warn(msg, UserWarning, stacklevel=2)
if bracket_depth > 0 and ind in name_start_inds:
name_start_inds.remove(ind)
# List tuples of indices of starts and ends of the cell methods in the string
method_indices = []
for ii in range(len(name_start_inds) - 1):
method_indices.append((name_start_inds[ii], name_start_inds[ii + 1]))
method_indices.append((name_start_inds[-1], len(nc_cell_methods)))
# Index the string and match against each substring
nc_cell_methods_matches = []
for start_ind, end_ind in method_indices:
nc_cell_method_str = nc_cell_methods[start_ind:end_ind]
nc_cell_method_match = _CM_PARSE.match(nc_cell_method_str.strip())
if not nc_cell_method_match:
msg = (
f"Failed to fully parse cell method string: {nc_cell_methods}"
)
warnings.warn(msg, UserWarning, stacklevel=2)
continue
nc_cell_methods_matches.append(nc_cell_method_match)
return nc_cell_methods_matches
[docs]def parse_cell_methods(nc_cell_methods):
"""
Parse a CF cell_methods attribute string into a tuple of zero or
more CellMethod instances.
Args:
* nc_cell_methods (str):
The value of the cell methods attribute to be parsed.
Returns:
* cell_methods
An iterable of :class:`iris.coords.CellMethod`.
Multiple coordinates, intervals and comments are supported.
If a method has a non-standard name a warning will be issued, but the
results are not affected.
"""
cell_methods = []
if nc_cell_methods is not None:
for m in _split_cell_methods(nc_cell_methods):
d = m.groupdict()
method = d[_CM_METHOD]
method = method.strip()
# Check validity of method, allowing for multi-part methods
# e.g. mean over years.
method_words = method.split()
if method_words[0].lower() not in _CM_KNOWN_METHODS:
msg = "NetCDF variable contains unknown cell method {!r}"
warnings.warn(
msg.format("{}".format(method_words[0])),
UnknownCellMethodWarning,
)
d[_CM_METHOD] = method
name = d[_CM_NAME]
name = name.replace(" ", "")
name = name.rstrip(":")
d[_CM_NAME] = tuple([n for n in name.split(":")])
interval = []
comment = []
if d[_CM_EXTRA] is not None:
#
# tokenise the key words and field colon marker
#
d[_CM_EXTRA] = d[_CM_EXTRA].replace(
"comment:", "<<comment>><<:>>"
)
d[_CM_EXTRA] = d[_CM_EXTRA].replace(
"interval:", "<<interval>><<:>>"
)
d[_CM_EXTRA] = d[_CM_EXTRA].split("<<:>>")
if len(d[_CM_EXTRA]) == 1:
comment.extend(d[_CM_EXTRA])
else:
next_field_type = comment
for field in d[_CM_EXTRA]:
field_type = next_field_type
index = field.rfind("<<interval>>")
if index == 0:
next_field_type = interval
continue
elif index > 0:
next_field_type = interval
else:
index = field.rfind("<<comment>>")
if index == 0:
next_field_type = comment
continue
elif index > 0:
next_field_type = comment
if index != -1:
field = field[:index]
field_type.append(field.strip())
#
# cater for a shared interval over multiple axes
#
if len(interval):
if len(d[_CM_NAME]) != len(interval) and len(interval) == 1:
interval = interval * len(d[_CM_NAME])
#
# cater for a shared comment over multiple axes
#
if len(comment):
if len(d[_CM_NAME]) != len(comment) and len(comment) == 1:
comment = comment * len(d[_CM_NAME])
d[_CM_INTERVAL] = tuple(interval)
d[_CM_COMMENT] = tuple(comment)
cell_method = iris.coords.CellMethod(
d[_CM_METHOD],
coords=d[_CM_NAME],
intervals=d[_CM_INTERVAL],
comments=d[_CM_COMMENT],
)
cell_methods.append(cell_method)
return tuple(cell_methods)
[docs]class CFNameCoordMap:
"""Provide a simple CF name to CF coordinate mapping."""
_Map = collections.namedtuple("_Map", ["name", "coord"])
def __init__(self):
self._map = []
[docs] def append(self, name, coord):
"""
Append the given name and coordinate pair to the mapping.
Args:
* name:
CF name of the associated coordinate.
* coord:
The coordinate of the associated CF name.
Returns:
None.
"""
self._map.append(CFNameCoordMap._Map(name, coord))
@property
def names(self):
"""Return all the CF names."""
return [pair.name for pair in self._map]
@property
def coords(self):
"""Return all the coordinates."""
return [pair.coord for pair in self._map]
[docs] def name(self, coord):
"""
Return the CF name, given a coordinate, or None if not recognised.
Args:
* coord:
The coordinate of the associated CF name.
Returns:
Coordinate or None.
"""
result = None
for pair in self._map:
if coord == pair.coord:
result = pair.name
break
return result
[docs] def coord(self, name):
"""
Return the coordinate, given a CF name, or None if not recognised.
Args:
* name:
CF name of the associated coordinate, or None if not recognised.
Returns:
CF name or None.
"""
result = None
for pair in self._map:
if name == pair.name:
result = pair.coord
break
return result
def _actions_engine():
# Return an 'actions engine', which provides a pyke-rules-like interface to
# the core cf translation code.
# Deferred import to avoid circularity.
import iris.fileformats._nc_load_rules.engine as nc_actions_engine
engine = nc_actions_engine.Engine()
return engine
[docs]class NetCDFDataProxy:
"""A reference to the data payload of a single NetCDF file variable."""
__slots__ = ("shape", "dtype", "path", "variable_name", "fill_value")
def __init__(self, shape, dtype, path, variable_name, fill_value):
self.shape = shape
self.dtype = dtype
self.path = path
self.variable_name = variable_name
self.fill_value = fill_value
@property
def ndim(self):
return len(self.shape)
def __getitem__(self, keys):
dataset = netCDF4.Dataset(self.path)
try:
variable = dataset.variables[self.variable_name]
# Get the NetCDF variable data and slice.
var = variable[keys]
finally:
dataset.close()
return np.asanyarray(var)
def __repr__(self):
fmt = (
"<{self.__class__.__name__} shape={self.shape}"
" dtype={self.dtype!r} path={self.path!r}"
" variable_name={self.variable_name!r}>"
)
return fmt.format(self=self)
def __getstate__(self):
return {attr: getattr(self, attr) for attr in self.__slots__}
def __setstate__(self, state):
for key, value in state.items():
setattr(self, key, value)
def _assert_case_specific_facts(engine, cf, cf_group):
# Initialise a data store for built cube elements.
# This is used to patch element attributes *not* setup by the actions
# process, after the actions code has run.
engine.cube_parts["coordinates"] = []
engine.cube_parts["cell_measures"] = []
engine.cube_parts["ancillary_variables"] = []
# Assert facts for CF coordinates.
for cf_name in cf_group.coordinates.keys():
engine.add_case_specific_fact("coordinate", (cf_name,))
# Assert facts for CF auxiliary coordinates.
for cf_name in cf_group.auxiliary_coordinates.keys():
engine.add_case_specific_fact("auxiliary_coordinate", (cf_name,))
# Assert facts for CF cell measures.
for cf_name in cf_group.cell_measures.keys():
engine.add_case_specific_fact("cell_measure", (cf_name,))
# Assert facts for CF ancillary variables.
for cf_name in cf_group.ancillary_variables.keys():
engine.add_case_specific_fact("ancillary_variable", (cf_name,))
# Assert facts for CF grid_mappings.
for cf_name in cf_group.grid_mappings.keys():
engine.add_case_specific_fact("grid_mapping", (cf_name,))
# Assert facts for CF labels.
for cf_name in cf_group.labels.keys():
engine.add_case_specific_fact("label", (cf_name,))
# Assert facts for CF formula terms associated with the cf_group
# of the CF data variable.
# Collect varnames of formula-root variables as we go.
# NOTE: use dictionary keys as an 'OrderedSet'
# - see: https://stackoverflow.com/a/53657523/2615050
# This is to ensure that we can handle the resulting facts in a definite
# order, as using a 'set' led to indeterminate results.
formula_root = {}
for cf_var in cf.cf_group.formula_terms.values():
for cf_root, cf_term in cf_var.cf_terms_by_root.items():
# Only assert this fact if the formula root variable is
# defined in the CF group of the CF data variable.
if cf_root in cf_group:
formula_root[cf_root] = True
engine.add_case_specific_fact(
"formula_term",
(cf_var.cf_name, cf_root, cf_term),
)
for cf_root in formula_root.keys():
engine.add_case_specific_fact("formula_root", (cf_root,))
def _actions_activation_stats(engine, cf_name):
print("-" * 80)
print("CF Data Variable: %r" % cf_name)
engine.print_stats()
print("Rules Triggered:")
for rule in sorted(list(engine.rules_triggered)):
print("\t%s" % rule)
print("Case Specific Facts:")
kb_facts = engine.get_kb()
for key in kb_facts.entity_lists.keys():
for arg in kb_facts.entity_lists[key].case_specific_facts:
print("\t%s%s" % (key, arg))
def _set_attributes(attributes, key, value):
"""Set attributes dictionary, converting unicode strings appropriately."""
if isinstance(value, str):
try:
attributes[str(key)] = str(value)
except UnicodeEncodeError:
attributes[str(key)] = value
else:
attributes[str(key)] = value
def _add_unused_attributes(iris_object, cf_var):
"""
Populate the attributes of a cf element with the "unused" attributes
from the associated CF-netCDF variable. That is, all those that aren't CF
reserved terms.
"""
def attribute_predicate(item):
return item[0] not in _CF_ATTRS
tmpvar = filter(attribute_predicate, cf_var.cf_attrs_unused())
for attr_name, attr_value in tmpvar:
_set_attributes(iris_object.attributes, attr_name, attr_value)
def _get_actual_dtype(cf_var):
# Figure out what the eventual data type will be after any scale/offset
# transforms.
dummy_data = np.zeros(1, dtype=cf_var.dtype)
if hasattr(cf_var, "scale_factor"):
dummy_data = cf_var.scale_factor * dummy_data
if hasattr(cf_var, "add_offset"):
dummy_data = cf_var.add_offset + dummy_data
return dummy_data.dtype
def _get_cf_var_data(cf_var, filename):
# Get lazy chunked data out of a cf variable.
dtype = _get_actual_dtype(cf_var)
# Create cube with deferred data, but no metadata
fill_value = getattr(
cf_var.cf_data,
"_FillValue",
netCDF4.default_fillvals[cf_var.dtype.str[1:]],
)
proxy = NetCDFDataProxy(
cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
)
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
chunks = cf_var.cf_data.chunking()
# In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
if chunks == "contiguous":
chunks = None
return as_lazy_data(proxy, chunks=chunks)
class _OrderedAddableList(list):
"""
A custom container object for actions recording.
Used purely in actions debugging, to accumulate a record of which actions
were activated.
It replaces a set, so as to preserve the ordering of operations, with
possible repeats, and it also numbers the entries.
The actions routines invoke an 'add' method, so this effectively replaces
a set.add with a list.append.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._n_add = 0
def add(self, msg):
self._n_add += 1
n_add = self._n_add
self.append(f"#{n_add:03d} : {msg}")
def _load_cube(engine, cf, cf_var, filename):
from iris.cube import Cube
"""Create the cube associated with the CF-netCDF data variable."""
data = _get_cf_var_data(cf_var, filename)
cube = Cube(data)
# Reset the actions engine.
engine.reset()
# Initialise engine rule processing hooks.
engine.cf_var = cf_var
engine.cube = cube
engine.cube_parts = {}
engine.requires = {}
engine.rules_triggered = _OrderedAddableList()
engine.filename = filename
# Assert all the case-specific facts.
# This extracts 'facts' specific to this data-variable (aka cube), from
# the info supplied in the CFGroup object.
_assert_case_specific_facts(engine, cf, cf_var.cf_group)
# Run the actions engine.
# This creates various cube elements and attaches them to the cube.
# It also records various other info on the engine, to be processed later.
engine.activate()
# Having run the rules, now add the "unused" attributes to each cf element.
def fix_attributes_all_elements(role_name):
elements_and_names = engine.cube_parts.get(role_name, [])
for iris_object, cf_var_name in elements_and_names:
_add_unused_attributes(iris_object, cf.cf_group[cf_var_name])
# Populate the attributes of all coordinates, cell-measures and ancillary-vars.
fix_attributes_all_elements("coordinates")
fix_attributes_all_elements("ancillary_variables")
fix_attributes_all_elements("cell_measures")
# Also populate attributes of the top-level cube itself.
_add_unused_attributes(cube, cf_var)
# Work out reference names for all the coords.
names = {
coord.var_name: coord.standard_name or coord.var_name or "unknown"
for coord in cube.coords()
}
# Add all the cube cell methods.
cube.cell_methods = [
iris.coords.CellMethod(
method=method.method,
intervals=method.intervals,
comments=method.comments,
coords=[
names[coord_name] if coord_name in names else coord_name
for coord_name in method.coord_names
],
)
for method in cube.cell_methods
]
if DEBUG:
# Show activation statistics for this data-var (i.e. cube).
_actions_activation_stats(engine, cf_var.cf_name)
return cube
def _load_aux_factory(engine, cube):
"""
Convert any CF-netCDF dimensionless coordinate to an AuxCoordFactory.
"""
formula_type = engine.requires.get("formula_type")
if formula_type in [
"atmosphere_sigma_coordinate",
"atmosphere_hybrid_height_coordinate",
"atmosphere_hybrid_sigma_pressure_coordinate",
"ocean_sigma_z_coordinate",
"ocean_sigma_coordinate",
"ocean_s_coordinate",
"ocean_s_coordinate_g1",
"ocean_s_coordinate_g2",
]:
def coord_from_term(term):
# Convert term names to coordinates (via netCDF variable names).
name = engine.requires["formula_terms"].get(term, None)
if name is not None:
for coord, cf_var_name in engine.cube_parts["coordinates"]:
if cf_var_name == name:
return coord
warnings.warn(
"Unable to find coordinate for variable "
"{!r}".format(name)
)
if formula_type == "atmosphere_sigma_coordinate":
pressure_at_top = coord_from_term("ptop")
sigma = coord_from_term("sigma")
surface_air_pressure = coord_from_term("ps")
factory = AtmosphereSigmaFactory(
pressure_at_top, sigma, surface_air_pressure
)
elif formula_type == "atmosphere_hybrid_height_coordinate":
delta = coord_from_term("a")
sigma = coord_from_term("b")
orography = coord_from_term("orog")
factory = HybridHeightFactory(delta, sigma, orography)
elif formula_type == "atmosphere_hybrid_sigma_pressure_coordinate":
# Hybrid pressure has two valid versions of its formula terms:
# "p0: var1 a: var2 b: var3 ps: var4" or
# "ap: var1 b: var2 ps: var3" where "ap = p0 * a"
# Attempt to get the "ap" term.
delta = coord_from_term("ap")
if delta is None:
# The "ap" term is unavailable, so try getting terms "p0"
# and "a" terms in order to derive an "ap" equivalent term.
coord_p0 = coord_from_term("p0")
if coord_p0 is not None:
if coord_p0.shape != (1,):
msg = (
"Expecting {!r} to be a scalar reference "
"pressure coordinate, got shape {!r}".format(
coord_p0.var_name, coord_p0.shape
)
)
raise ValueError(msg)
if coord_p0.has_bounds():
msg = (
"Ignoring atmosphere hybrid sigma pressure "
"scalar coordinate {!r} bounds.".format(
coord_p0.name()
)
)
warnings.warn(msg)
coord_a = coord_from_term("a")
if coord_a is not None:
if coord_a.units.is_unknown():
# Be graceful, and promote unknown to dimensionless units.
coord_a.units = "1"
delta = coord_a * coord_p0.points[0]
delta.units = coord_a.units * coord_p0.units
delta.rename("vertical pressure")
delta.var_name = "ap"
cube.add_aux_coord(delta, cube.coord_dims(coord_a))
sigma = coord_from_term("b")
surface_air_pressure = coord_from_term("ps")
factory = HybridPressureFactory(delta, sigma, surface_air_pressure)
elif formula_type == "ocean_sigma_z_coordinate":
sigma = coord_from_term("sigma")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
depth_c = coord_from_term("depth_c")
nsigma = coord_from_term("nsigma")
zlev = coord_from_term("zlev")
factory = OceanSigmaZFactory(
sigma, eta, depth, depth_c, nsigma, zlev
)
elif formula_type == "ocean_sigma_coordinate":
sigma = coord_from_term("sigma")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
factory = OceanSigmaFactory(sigma, eta, depth)
elif formula_type == "ocean_s_coordinate":
s = coord_from_term("s")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
a = coord_from_term("a")
depth_c = coord_from_term("depth_c")
b = coord_from_term("b")
factory = OceanSFactory(s, eta, depth, a, b, depth_c)
elif formula_type == "ocean_s_coordinate_g1":
s = coord_from_term("s")
c = coord_from_term("c")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
depth_c = coord_from_term("depth_c")
factory = OceanSg1Factory(s, c, eta, depth, depth_c)
elif formula_type == "ocean_s_coordinate_g2":
s = coord_from_term("s")
c = coord_from_term("c")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
depth_c = coord_from_term("depth_c")
factory = OceanSg2Factory(s, c, eta, depth, depth_c)
cube.add_aux_factory(factory)
def _translate_constraints_to_var_callback(constraints):
"""
Translate load constraints into a simple data-var filter function, if possible.
Returns:
* function(cf_var:CFDataVariable): --> bool,
or None.
For now, ONLY handles a single NameConstraint with no 'STASH' component.
"""
import iris._constraints
constraints = iris._constraints.list_of_constraints(constraints)
result = None
if len(constraints) == 1:
(constraint,) = constraints
if (
isinstance(constraint, iris._constraints.NameConstraint)
and constraint.STASH == "none"
):
# As long as it doesn't use a STASH match, then we can treat it as
# a testing against name properties of cf_var.
# That's just like testing against name properties of a cube, except that they may not all exist.
def inner(cf_datavar):
match = True
for name in constraint._names:
expected = getattr(constraint, name)
if name != "STASH" and expected != "none":
attr_name = "cf_name" if name == "var_name" else name
# Fetch property : N.B. CFVariable caches the property values
# The use of a default here is the only difference from the code in NameConstraint.
if not hasattr(cf_datavar, attr_name):
continue
actual = getattr(cf_datavar, attr_name, "")
if actual != expected:
match = False
break
return match
result = inner
return result
[docs]def load_cubes(filenames, callback=None, constraints=None):
"""
Loads cubes from a list of NetCDF filenames/OPeNDAP URLs.
Args:
* filenames (string/list):
One or more NetCDF filenames/OPeNDAP URLs to load from.
Kwargs:
* callback (callable function):
Function which can be passed on to :func:`iris.io.run_callback`.
Returns:
Generator of loaded NetCDF :class:`iris.cube.Cube`.
"""
# TODO: rationalise UGRID/mesh handling once experimental.ugrid is folded
# into standard behaviour.
# Deferred import to avoid circular imports.
from iris.experimental.ugrid.cf import CFUGridReader
from iris.experimental.ugrid.load import (
PARSE_UGRID_ON_LOAD,
_build_mesh_coords,
_meshes_from_cf,
)
from iris.io import run_callback
# Create a low-level data-var filter from the original load constraints, if they are suitable.
var_callback = _translate_constraints_to_var_callback(constraints)
# Create an actions engine.
engine = _actions_engine()
if isinstance(filenames, str):
filenames = [filenames]
for filename in filenames:
# Ingest the netCDF file.
meshes = {}
if PARSE_UGRID_ON_LOAD:
cf = CFUGridReader(filename)
meshes = _meshes_from_cf(cf)
else:
cf = iris.fileformats.cf.CFReader(filename)
# Process each CF data variable.
data_variables = list(cf.cf_group.data_variables.values()) + list(
cf.cf_group.promoted.values()
)
for cf_var in data_variables:
if var_callback and not var_callback(cf_var):
# Deliver only selected results.
continue
# cf_var-specific mesh handling, if a mesh is present.
# Build the mesh_coords *before* loading the cube - avoids
# mesh-related attributes being picked up by
# _add_unused_attributes().
mesh_name = None
mesh = None
mesh_coords, mesh_dim = [], None
if PARSE_UGRID_ON_LOAD:
mesh_name = getattr(cf_var, "mesh", None)
if mesh_name is not None:
try:
mesh = meshes[mesh_name]
except KeyError:
message = (
f"File does not contain mesh: '{mesh_name}' - "
f"referenced by variable: '{cf_var.cf_name}' ."
)
logger.debug(message)
if mesh is not None:
mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)
cube = _load_cube(engine, cf, cf_var, filename)
# Attach the mesh (if present) to the cube.
for mesh_coord in mesh_coords:
cube.add_aux_coord(mesh_coord, mesh_dim)
# Process any associated formula terms and attach
# the corresponding AuxCoordFactory.
try:
_load_aux_factory(engine, cube)
except ValueError as e:
warnings.warn("{}".format(e))
# Perform any user registered callback function.
cube = run_callback(callback, cube, cf_var, filename)
# Callback mechanism may return None, which must not be yielded
if cube is None:
continue
yield cube
def _bytes_if_ascii(string):
"""
Convert the given string to a byte string (str in py2k, bytes in py3k)
if the given string can be encoded to ascii, else maintain the type
of the inputted string.
Note: passing objects without an `encode` method (such as None) will
be returned by the function unchanged.
"""
if isinstance(string, str):
try:
return string.encode(encoding="ascii")
except (AttributeError, UnicodeEncodeError):
pass
return string
def _setncattr(variable, name, attribute):
"""
Put the given attribute on the given netCDF4 Data type, casting
attributes as we go to bytes rather than unicode.
"""
attribute = _bytes_if_ascii(attribute)
return variable.setncattr(name, attribute)
class _FillValueMaskCheckAndStoreTarget:
"""
To be used with da.store. Remembers whether any element was equal to a
given value and whether it was masked, before passing the chunk to the
given target.
"""
def __init__(self, target, fill_value=None):
self.target = target
self.fill_value = fill_value
self.contains_value = False
self.is_masked = False
def __setitem__(self, keys, arr):
if self.fill_value is not None:
self.contains_value = self.contains_value or self.fill_value in arr
self.is_masked = self.is_masked or ma.is_masked(arr)
self.target[keys] = arr
# NOTE : this matches :class:`iris.experimental.ugrid.mesh.Mesh.ELEMENTS`,
# but in the preferred order for coord/connectivity variables in the file.
MESH_ELEMENTS = ("node", "edge", "face")
[docs]class Saver:
"""A manager for saving netcdf files."""
def __init__(self, filename, netcdf_format):
"""
A manager for saving netcdf files.
Args:
* filename (string):
Name of the netCDF file to save the cube.
* netcdf_format (string):
Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC',
'NETCDF3_CLASSIC' or 'NETCDF3_64BIT'. Default is 'NETCDF4' format.
Returns:
None.
For example::
# Initialise Manager for saving
with Saver(filename, netcdf_format) as sman:
# Iterate through the cubelist.
for cube in cubes:
sman.write(cube)
"""
if netcdf_format not in [
"NETCDF4",
"NETCDF4_CLASSIC",
"NETCDF3_CLASSIC",
"NETCDF3_64BIT",
]:
raise ValueError(
"Unknown netCDF file format, got %r" % netcdf_format
)
# All persistent variables
#: CF name mapping with iris coordinates
self._name_coord_map = CFNameCoordMap()
#: Map of dimensions to characteristic coordinates with which they are identified
self._dim_names_and_coords = CFNameCoordMap()
#: List of grid mappings added to the file
self._coord_systems = []
#: A dictionary, listing dimension names and corresponding length
self._existing_dim = {}
#: A map from meshes to their actual file dimensions (names).
# NB: might not match those of the mesh, if they were 'incremented'.
self._mesh_dims = {}
#: A dictionary, mapping formula terms to owner cf variable name
self._formula_terms_cache = {}
#: NetCDF dataset
try:
self._dataset = netCDF4.Dataset(
filename, mode="w", format=netcdf_format
)
except RuntimeError:
dir_name = os.path.dirname(filename)
if not os.path.isdir(dir_name):
msg = "No such file or directory: {}".format(dir_name)
raise IOError(msg)
if not os.access(dir_name, os.R_OK | os.W_OK):
msg = "Permission denied: {}".format(filename)
raise IOError(msg)
else:
raise
def __enter__(self):
return self
[docs] def __exit__(self, type, value, traceback):
"""Flush any buffered data to the CF-netCDF file before closing."""
self._dataset.sync()
self._dataset.close()
[docs] def write(
self,
cube,
local_keys=None,
unlimited_dimensions=None,
zlib=False,
complevel=4,
shuffle=True,
fletcher32=False,
contiguous=False,
chunksizes=None,
endian="native",
least_significant_digit=None,
packing=None,
fill_value=None,
):
"""
Wrapper for saving cubes to a NetCDF file.
Args:
* cube (:class:`iris.cube.Cube`):
A :class:`iris.cube.Cube` to be saved to a netCDF file.
Kwargs:
* local_keys (iterable of strings):
An interable of cube attribute keys. Any cube attributes with
matching keys will become attributes on the data variable rather
than global attributes.
* unlimited_dimensions (iterable of strings and/or
:class:`iris.coords.Coord` objects):
List of coordinate names (or coordinate objects)
corresponding to coordinate dimensions of `cube` to save with the
NetCDF dimension variable length 'UNLIMITED'. By default, no
unlimited dimensions are saved. Only the 'NETCDF4' format
supports multiple 'UNLIMITED' dimensions.
* zlib (bool):
If `True`, the data will be compressed in the netCDF file using
gzip compression (default `False`).
* complevel (int):
An integer between 1 and 9 describing the level of compression
desired (default 4). Ignored if `zlib=False`.
* shuffle (bool):
If `True`, the HDF5 shuffle filter will be applied before
compressing the data (default `True`). This significantly improves
compression. Ignored if `zlib=False`.
* fletcher32 (bool):
If `True`, the Fletcher32 HDF5 checksum algorithm is activated to
detect errors. Default `False`.
* contiguous (bool):
If `True`, the variable data is stored contiguously on disk.
Default `False`. Setting to `True` for a variable with an unlimited
dimension will trigger an error.
* chunksizes (tuple of int):
Used to manually specify the HDF5 chunksizes for each dimension of
the variable. A detailed discussion of HDF chunking and I/O
performance is available here:
https://www.unidata.ucar.edu/software/netcdf/documentation/NUG/netcdf_perf_chunking.html.
Basically, you want the chunk size for each dimension to match
as closely as possible the size of the data block that users will
read from the file. `chunksizes` cannot be set if `contiguous=True`.
* endian (string):
Used to control whether the data is stored in little or big endian
format on disk. Possible values are 'little', 'big' or 'native'
(default). The library will automatically handle endian conversions
when the data is read, but if the data is always going to be read
on a computer with the opposite format as the one used to create
the file, there may be some performance advantage to be gained by
setting the endian-ness.
* least_significant_digit (int):
If `least_significant_digit` is specified, variable data will be
truncated (quantized). In conjunction with `zlib=True` this
produces 'lossy', but significantly more efficient compression. For
example, if `least_significant_digit=1`, data will be quantized
using `numpy.around(scale*data)/scale`, where `scale = 2**bits`,
and `bits` is determined so that a precision of 0.1 is retained (in
this case `bits=4`). From
http://www.esrl.noaa.gov/psd/data/gridded/conventions/cdc_netcdf_standard.shtml:
"least_significant_digit -- power of ten of the smallest decimal
place in unpacked data that is a reliable value". Default is
`None`, or no quantization, or 'lossless' compression.
* packing (type or string or dict or list): A numpy integer datatype
(signed or unsigned) or a string that describes a numpy integer
dtype(i.e. 'i2', 'short', 'u4') or a dict of packing parameters as
described below. This provides support for netCDF data packing as
described in
https://www.unidata.ucar.edu/software/netcdf/documentation/NUG/best_practices.html#bp_Packed-Data-Values
If this argument is a type (or type string), appropriate values of
scale_factor and add_offset will be automatically calculated based
on `cube.data` and possible masking. For more control, pass a dict
with one or more of the following keys: `dtype` (required),
`scale_factor` and `add_offset`. Note that automatic calculation of
packing parameters will trigger loading of lazy data; set them
manually using a dict to avoid this. The default is `None`, in
which case the datatype is determined from the cube and no packing
will occur.
* fill_value:
The value to use for the `_FillValue` attribute on the netCDF
variable. If `packing` is specified the value of `fill_value`
should be in the domain of the packed data.
Returns:
None.
.. note::
The `zlib`, `complevel`, `shuffle`, `fletcher32`, `contiguous`,
`chunksizes` and `endian` keywords are silently ignored for netCDF
3 files that do not use HDF5.
"""
if unlimited_dimensions is None:
unlimited_dimensions = []
cf_profile_available = iris.site_configuration.get(
"cf_profile"
) not in [None, False]
if cf_profile_available:
# Perform a CF profile of the cube. This may result in an exception
# being raised if mandatory requirements are not satisfied.
profile = iris.site_configuration["cf_profile"](cube)
# Ensure that attributes are CF compliant and if possible to make them
# compliant.
self.check_attribute_compliance(cube, cube.dtype)
for coord in cube.coords():
self.check_attribute_compliance(coord, coord.dtype)
# Get suitable dimension names.
mesh_dimensions, cube_dimensions = self._get_dim_names(cube)
# Create all the CF-netCDF data dimensions.
# Put mesh dims first, then non-mesh dims in cube-occurring order.
nonmesh_dimensions = [
dim for dim in cube_dimensions if dim not in mesh_dimensions
]
all_dimensions = mesh_dimensions + nonmesh_dimensions
self._create_cf_dimensions(cube, all_dimensions, unlimited_dimensions)
# Create the mesh components, if there is a mesh.
# We do this before creating the data-var, so that mesh vars precede
# data-vars in the file.
cf_mesh_name = self._add_mesh(cube)
# Create the associated cube CF-netCDF data variable.
cf_var_cube = self._create_cf_data_variable(
cube,
cube_dimensions,
local_keys,
zlib=zlib,
complevel=complevel,
shuffle=shuffle,
fletcher32=fletcher32,
contiguous=contiguous,
chunksizes=chunksizes,
endian=endian,
least_significant_digit=least_significant_digit,
packing=packing,
fill_value=fill_value,
)
# Associate any mesh with the data-variable.
# N.B. _add_mesh cannot do this, as we want to put mesh variables
# before data-variables in the file.
if cf_mesh_name is not None:
_setncattr(cf_var_cube, "mesh", cf_mesh_name)
_setncattr(cf_var_cube, "location", cube.location)
# Add coordinate variables.
self._add_dim_coords(cube, cube_dimensions)
# Add the auxiliary coordinate variables and associate the data
# variable to them
self._add_aux_coords(cube, cf_var_cube, cube_dimensions)
# Add the cell_measures variables and associate the data
# variable to them
self._add_cell_measures(cube, cf_var_cube, cube_dimensions)
# Add the ancillary_variables variables and associate the data variable
# to them
self._add_ancillary_variables(cube, cf_var_cube, cube_dimensions)
# Add the formula terms to the appropriate cf variables for each
# aux factory in the cube.
self._add_aux_factories(cube, cf_var_cube, cube_dimensions)
# Add data variable-only attribute names to local_keys.
if local_keys is None:
local_keys = set()
else:
local_keys = set(local_keys)
local_keys.update(_CF_DATA_ATTRS, _UKMO_DATA_ATTRS)
# Add global attributes taking into account local_keys.
global_attributes = {
k: v
for k, v in cube.attributes.items()
if (k not in local_keys and k.lower() != "conventions")
}
self.update_global_attributes(global_attributes)
if cf_profile_available:
cf_patch = iris.site_configuration.get("cf_patch")
if cf_patch is not None:
# Perform a CF patch of the dataset.
cf_patch(profile, self._dataset, cf_var_cube)
else:
msg = "cf_profile is available but no {} defined.".format(
"cf_patch"
)
warnings.warn(msg)
[docs] @staticmethod
def check_attribute_compliance(container, data_dtype):
def _coerce_value(val_attr, val_attr_value, data_dtype):
val_attr_tmp = np.array(val_attr_value, dtype=data_dtype)
if (val_attr_tmp != val_attr_value).any():
msg = '"{}" is not of a suitable value ({})'
raise ValueError(msg.format(val_attr, val_attr_value))
return val_attr_tmp
# Ensure that conflicting attributes are not provided.
if (
container.attributes.get("valid_min") is not None
or container.attributes.get("valid_max") is not None
) and container.attributes.get("valid_range") is not None:
msg = (
'Both "valid_range" and "valid_min" or "valid_max" '
"attributes present."
)
raise ValueError(msg)
# Ensure correct datatype
for val_attr in ["valid_range", "valid_min", "valid_max"]:
val_attr_value = container.attributes.get(val_attr)
if val_attr_value is not None:
val_attr_value = np.asarray(val_attr_value)
if data_dtype.itemsize == 1:
# Allow signed integral type
if val_attr_value.dtype.kind == "i":
continue
new_val = _coerce_value(val_attr, val_attr_value, data_dtype)
container.attributes[val_attr] = new_val
[docs] def update_global_attributes(self, attributes=None, **kwargs):
"""
Update the CF global attributes based on the provided
iterable/dictionary and/or keyword arguments.
Args:
* attributes (dict or iterable of key, value pairs):
CF global attributes to be updated.
"""
if attributes is not None:
# Handle sequence e.g. [('fruit', 'apple'), ...].
if not hasattr(attributes, "keys"):
attributes = dict(attributes)
for attr_name in sorted(attributes):
_setncattr(self._dataset, attr_name, attributes[attr_name])
for attr_name in sorted(kwargs):
_setncattr(self._dataset, attr_name, kwargs[attr_name])
def _create_cf_dimensions(
self, cube, dimension_names, unlimited_dimensions=None
):
"""
Create the CF-netCDF data dimensions.
Args:
* cube (:class:`iris.cube.Cube`):
A :class:`iris.cube.Cube` in which to lookup coordinates.
Kwargs:
* unlimited_dimensions (iterable of strings and/or
:class:`iris.coords.Coord` objects):
List of coordinates to make unlimited (None by default).
Returns:
None.
"""
unlimited_dim_names = []
if unlimited_dimensions is not None:
for coord in unlimited_dimensions:
try:
coord = cube.coord(name_or_coord=coord, dim_coords=True)
except iris.exceptions.CoordinateNotFoundError:
# coordinate isn't used for this cube, but it might be
# used for a different one
pass
else:
dim_name = self._get_coord_variable_name(cube, coord)
unlimited_dim_names.append(dim_name)
for dim_name in dimension_names:
# NOTE: these dim-names have been chosen by _get_dim_names, and
# were already checked+fixed to avoid any name collisions.
if dim_name not in self._dataset.dimensions:
if dim_name in unlimited_dim_names:
size = None
else:
size = self._existing_dim[dim_name]
self._dataset.createDimension(dim_name, size)
def _add_mesh(self, cube_or_mesh):
"""
Add the cube's mesh, and all related variables to the dataset.
Includes all the mesh-element coordinate and connectivity variables.
..note::
Here, we do *not* add the relevant referencing attributes to the
data-variable, because we want to create the data-variable later.
Args:
* cube_or_mesh (:class:`iris.cube.Cube`
or :class:`iris.experimental.ugrid.Mesh`):
The Cube or Mesh being saved to the netCDF file.
Returns:
* cf_mesh_name (string or None):
The name of the mesh variable created, or None if the cube does not
have a mesh.
"""
cf_mesh_name = None
# Do cube- or -mesh-based save
from iris.cube import Cube
if isinstance(cube_or_mesh, Cube):
cube = cube_or_mesh
mesh = cube.mesh
else:
cube = None # The underlying routines must support this !
mesh = cube_or_mesh
if mesh:
cf_mesh_name = self._name_coord_map.name(mesh)
if cf_mesh_name is None:
# Not already present : create it
cf_mesh_name = self._create_mesh(mesh)
self._name_coord_map.append(cf_mesh_name, mesh)
cf_mesh_var = self._dataset.variables[cf_mesh_name]
# Get the mesh-element dim names.
mesh_dims = self._mesh_dims[mesh]
# Add all the element coordinate variables.
for location in MESH_ELEMENTS:
coords_meshobj_attr = f"{location}_coords"
coords_file_attr = f"{location}_coordinates"
mesh_coords = getattr(mesh, coords_meshobj_attr, None)
if mesh_coords:
coord_names = []
for coord in mesh_coords:
if coord is None:
continue # an awkward thing that mesh.coords does
coord_name = self._create_generic_cf_array_var(
cube_or_mesh,
[],
coord,
element_dims=(mesh_dims[location],),
)
coord_names.append(coord_name)
# Record the coordinates (if any) on the mesh variable.
if coord_names:
coord_names = " ".join(coord_names)
_setncattr(
cf_mesh_var, coords_file_attr, coord_names
)
# Add all the connectivity variables.
# pre-fetch the set + ignore "None"s, which are empty slots.
conns = [
conn
for conn in mesh.all_connectivities
if conn is not None
]
for conn in conns:
# Get the connectivity role, = "{loc1}_{loc2}_connectivity".
cf_conn_attr_name = conn.cf_role
loc_from, loc_to, _ = cf_conn_attr_name.split("_")
# Construct a trailing dimension name.
last_dim = f"{cf_mesh_name}_{loc_from}_N_{loc_to}s"
# Create if it does not already exist.
if last_dim not in self._dataset.dimensions:
while last_dim in self._dataset.variables:
# Also avoid collision with variable names.
# See '_get_dim_names' for reason.
last_dim = self._increment_name(last_dim)
length = conn.shape[1 - conn.location_axis]
self._dataset.createDimension(last_dim, length)
# Create variable.
# NOTE: for connectivities *with missing points*, this will use a
# fixed standard fill-value of -1. In that case, we create the
# variable with a '_FillValue' property, which can only be done
# when it is first created.
loc_dim_name = mesh_dims[loc_from]
conn_dims = (loc_dim_name, last_dim)
if conn.location_axis == 1:
# Has the 'other' dimension order, =reversed
conn_dims = conn_dims[::-1]
if iris.util.is_masked(conn.core_indices()):
# Flexible mesh.
fill_value = -1
else:
fill_value = None
cf_conn_name = self._create_generic_cf_array_var(
cube_or_mesh,
[],
conn,
element_dims=conn_dims,
fill_value=fill_value,
)
# Add essential attributes to the Connectivity variable.
cf_conn_var = self._dataset.variables[cf_conn_name]
_setncattr(cf_conn_var, "cf_role", cf_conn_attr_name)
_setncattr(cf_conn_var, "start_index", conn.start_index)
# Record the connectivity on the parent mesh var.
_setncattr(cf_mesh_var, cf_conn_attr_name, cf_conn_name)
# If the connectivity had the 'alternate' dimension order, add the
# relevant dimension property
if conn.location_axis == 1:
loc_dim_attr = f"{loc_from}_dimension"
# Should only get here once.
assert loc_dim_attr not in cf_mesh_var.ncattrs()
_setncattr(cf_mesh_var, loc_dim_attr, loc_dim_name)
return cf_mesh_name
def _add_inner_related_vars(
self, cube, cf_var_cube, dimension_names, coordlike_elements
):
"""
Create a set of variables for aux-coords, ancillaries or cell-measures,
and attach them to the parent data variable.
"""
if coordlike_elements:
# Choose the approriate parent attribute
elem_type = type(coordlike_elements[0])
if elem_type in (AuxCoord, DimCoord):
role_attribute_name = "coordinates"
elif elem_type == AncillaryVariable:
role_attribute_name = "ancillary_variables"
else:
# We *only* handle aux-coords, cell-measures and ancillaries
assert elem_type == CellMeasure
role_attribute_name = "cell_measures"
# Add CF-netCDF variables for the given cube components.
element_names = []
for element in sorted(
coordlike_elements, key=lambda element: element.name()
):
# Re-use, or create, the associated CF-netCDF variable.
cf_name = self._name_coord_map.name(element)
if cf_name is None:
# Not already present : create it
cf_name = self._create_generic_cf_array_var(
cube, dimension_names, element
)
self._name_coord_map.append(cf_name, element)
if role_attribute_name == "cell_measures":
# In the case of cell-measures, the attribute entries are not just
# a var_name, but each have the form "<measure>: <varname>".
cf_name = "{}: {}".format(element.measure, cf_name)
element_names.append(cf_name)
# Add CF-netCDF references to the primary data variable.
if element_names:
variable_names = " ".join(sorted(element_names))
_setncattr(cf_var_cube, role_attribute_name, variable_names)
def _add_aux_coords(self, cube, cf_var_cube, dimension_names):
"""
Add aux. coordinate to the dataset and associate with the data variable
Args:
* cube (:class:`iris.cube.Cube`):
A :class:`iris.cube.Cube` to be saved to a netCDF file.
* cf_var_cube (:class:`netcdf.netcdf_variable`):
cf variable cube representation.
* dimension_names (list):
Names associated with the dimensions of the cube.
"""
# Exclude any mesh coords, which are bundled in with the aux-coords.
aux_coords_no_mesh = [
coord for coord in cube.aux_coords if not hasattr(coord, "mesh")
]
return self._add_inner_related_vars(
cube,
cf_var_cube,
dimension_names,
aux_coords_no_mesh,
)
def _add_cell_measures(self, cube, cf_var_cube, dimension_names):
"""
Add cell measures to the dataset and associate with the data variable
Args:
* cube (:class:`iris.cube.Cube`):
A :class:`iris.cube.Cube` to be saved to a netCDF file.
* cf_var_cube (:class:`netcdf.netcdf_variable`):
cf variable cube representation.
* dimension_names (list):
Names associated with the dimensions of the cube.
"""
return self._add_inner_related_vars(
cube,
cf_var_cube,
dimension_names,
cube.cell_measures(),
)
def _add_ancillary_variables(self, cube, cf_var_cube, dimension_names):
"""
Add ancillary variables measures to the dataset and associate with the
data variable
Args:
* cube (:class:`iris.cube.Cube`):
A :class:`iris.cube.Cube` to be saved to a netCDF file.
* cf_var_cube (:class:`netcdf.netcdf_variable`):
cf variable cube representation.
* dimension_names (list):
Names associated with the dimensions of the cube.
"""
return self._add_inner_related_vars(
cube,
cf_var_cube,
dimension_names,
cube.ancillary_variables(),
)
def _add_dim_coords(self, cube, dimension_names):
"""
Add coordinate variables to NetCDF dataset.
Args:
* cube (:class:`iris.cube.Cube`):
A :class:`iris.cube.Cube` to be saved to a netCDF file.
* dimension_names (list):
Names associated with the dimensions of the cube.
"""
# Ensure we create the netCDF coordinate variables first.
for coord in cube.dim_coords:
# Create the associated coordinate CF-netCDF variable.
if coord not in self._name_coord_map.coords:
# Not already present : create it
cf_name = self._create_generic_cf_array_var(
cube, dimension_names, coord
)
self._name_coord_map.append(cf_name, coord)
def _add_aux_factories(self, cube, cf_var_cube, dimension_names):
"""
Modifies the variables of the NetCDF dataset to represent
the presence of dimensionless vertical coordinates based on
the aux factories of the cube (if any).
Args:
* cube (:class:`iris.cube.Cube`):
A :class:`iris.cube.Cube` to be saved to a netCDF file.
* cf_var_cube (:class:`netcdf.netcdf_variable`)
CF variable cube representation.
* dimension_names (list):
Names associated with the dimensions of the cube.
"""
primaries = []
for factory in cube.aux_factories:
factory_defn = _FACTORY_DEFNS.get(type(factory), None)
if factory_defn is None:
msg = (
"Unable to determine formula terms "
"for AuxFactory: {!r}".format(factory)
)
warnings.warn(msg)
else:
# Override `standard_name`, `long_name`, and `axis` of the
# primary coord that signals the presense of a dimensionless
# vertical coord, then set the `formula_terms` attribute.
primary_coord = factory.dependencies[factory_defn.primary]
if primary_coord in primaries:
msg = (
"Cube {!r} has multiple aux factories that share "
"a common primary coordinate {!r}. Unable to save "
"to netCDF as having multiple formula terms on a "
"single coordinate is not supported."
)
raise ValueError(msg.format(cube, primary_coord.name()))
primaries.append(primary_coord)
cf_name = self._name_coord_map.name(primary_coord)
cf_var = self._dataset.variables[cf_name]
names = {
key: self._name_coord_map.name(coord)
for key, coord in factory.dependencies.items()
}
formula_terms = factory_defn.formula_terms_format.format(
**names
)
std_name = factory_defn.std_name
if hasattr(cf_var, "formula_terms"):
if (
cf_var.formula_terms != formula_terms
or cf_var.standard_name != std_name
):
# TODO: We need to resolve this corner-case where
# the dimensionless vertical coordinate containing
# the formula_terms is a dimension coordinate of
# the associated cube and a new alternatively named
# dimensionless vertical coordinate is required
# with new formula_terms and a renamed dimension.
if cf_name in dimension_names:
msg = (
"Unable to create dimensonless vertical "
"coordinate."
)
raise ValueError(msg)
key = (cf_name, std_name, formula_terms)
name = self._formula_terms_cache.get(key)
if name is None:
# Create a new variable
name = self._create_generic_cf_array_var(
cube, dimension_names, primary_coord
)
cf_var = self._dataset.variables[name]
_setncattr(cf_var, "standard_name", std_name)
_setncattr(cf_var, "axis", "Z")
# Update the formula terms.
ft = formula_terms.split()
ft = [name if t == cf_name else t for t in ft]
_setncattr(cf_var, "formula_terms", " ".join(ft))
# Update the cache.
self._formula_terms_cache[key] = name
# Update the associated cube variable.
coords = cf_var_cube.coordinates.split()
coords = [name if c == cf_name else c for c in coords]
_setncattr(
cf_var_cube, "coordinates", " ".join(coords)
)
else:
_setncattr(cf_var, "standard_name", std_name)
_setncattr(cf_var, "axis", "Z")
_setncattr(cf_var, "formula_terms", formula_terms)
def _get_dim_names(self, cube_or_mesh):
"""
Determine suitable CF-netCDF data dimension names.
Args:
* cube_or_mesh (:class:`iris.cube.Cube`
or :class:`iris.experimental.ugrid.Mesh`):
The Cube or Mesh being saved to the netCDF file.
Returns:
mesh_dimensions, cube_dimensions
* mesh_dimensions (list of string):
A list of the mesh dimensions of the attached mesh, if any.
* cube_dimensions (list of string):
A lists of dimension names for each dimension of the cube
..note::
The returned lists are in the preferred file creation order.
One of the mesh dimensions will typically also appear in the cube
dimensions.
"""
def record_dimension(names_list, dim_name, length, matching_coords=[]):
"""
Record a file dimension, its length and associated "coordinates"
(which may in fact also be connectivities).
If the dimension has been seen already, check that it's length
matches the earlier finding.
"""
if dim_name not in self._existing_dim:
self._existing_dim[dim_name] = length
else:
# Make sure we never re-write one, though it's really not
# clear how/if this could ever actually happen.
existing_length = self._existing_dim[dim_name]
if length != existing_length:
msg = (
"Netcdf saving error : existing dimension "
f'"{dim_name}" has length {existing_length}, '
f"but occurrence in cube {cube} has length {length}"
)
raise ValueError(msg)
# Record given "coords" (sort-of, maybe connectivities) to be
# identified with this dimension: add to the already-seen list.
existing_coords = self._dim_names_and_coords.coords
for coord in matching_coords:
if coord not in existing_coords:
self._dim_names_and_coords.append(dim_name, coord)
# Add the latest name to the list passed in.
names_list.append(dim_name)
# Choose cube or mesh saving.
from iris.cube import Cube
if isinstance(cube_or_mesh, Cube):
# there is a mesh, only if the cube has one
cube = cube_or_mesh
mesh = cube.mesh
else:
# there is no cube, only a mesh
cube = None
mesh = cube_or_mesh
# Get info on mesh, first.
mesh_dimensions = []
if mesh is None:
cube_mesh_dim = None
else:
# Identify all the mesh dimensions.
mesh_location_dimnames = {}
# NOTE: one of these will be a cube dimension, but that one does not
# get any special handling. We *do* want to list/create them in a
# definite order (node,edge,face), and before non-mesh dimensions.
for location in MESH_ELEMENTS:
# Find if this location exists in the mesh, and a characteristic
# coordinate to identify it with.
# To use only _required_ UGRID components, we use a location
# coord for nodes, but a connectivity for faces/edges
if location == "node":
# For nodes, identify the dim with a coordinate variable.
# Selecting the X-axis one for definiteness.
dim_coords = mesh.coords(include_nodes=True, axis="x")
else:
# For face/edge, use the relevant "optionally required"
# connectivity variable.
cf_role = f"{location}_node_connectivity"
dim_coords = mesh.connectivities(cf_role=cf_role)
if len(dim_coords) > 0:
# As the mesh contains this location, we want to include this
# dim in our returned mesh dims.
# We should have 1 identifying variable (of either type).
assert len(dim_coords) == 1
dim_element = dim_coords[0]
dim_name = self._dim_names_and_coords.name(dim_element)
if dim_name is not None:
# For mesh-identifying coords, we require the *same*
# coord, not an identical one (i.e. "is" not "==")
stored_coord = self._dim_names_and_coords.coord(
dim_name
)
if dim_element is not stored_coord:
# This is *not* a proper match after all.
dim_name = None
if dim_name is None:
# No existing dim matches this, so assign a new name
if location == "node":
# always 1-d
(dim_length,) = dim_element.shape
else:
# extract source dim, respecting dim-ordering
dim_length = dim_element.shape[
dim_element.location_axis
]
# Name it for the relevant mesh dimension
location_dim_attr = f"{location}_dimension"
dim_name = getattr(mesh, location_dim_attr)
# NOTE: This cannot currently be empty, as a Mesh
# "invents" dimension names which were not given.
assert dim_name is not None
# Ensure it is a valid variable name.
dim_name = self.cf_valid_var_name(dim_name)
# Disambiguate if it has the same name as an existing
# dimension.
# NOTE: *OR* if it matches the name of an existing file
# variable. Because there is a bug ...
# See https://github.com/Unidata/netcdf-c/issues/1772
# N.B. the workarounds here *ONLY* function because the
# caller (write) will not create any more variables
# in between choosing dim names (here), and creating
# the new dims (via '_create_cf_dimensions').
while (
dim_name in self._existing_dim
or dim_name in self._dataset.variables
):
dim_name = self._increment_name(dim_name)
# Record the new dimension.
record_dimension(
mesh_dimensions, dim_name, dim_length, dim_coords
)
# Store the mesh dims indexed by location
mesh_location_dimnames[location] = dim_name
if cube is None:
cube_mesh_dim = None
else:
# Finally, identify the cube dimension which maps to the mesh: this
# is used below to recognise the mesh among the cube dimensions.
any_mesh_coord = cube.coords(mesh_coords=True)[0]
(cube_mesh_dim,) = any_mesh_coord.cube_dims(cube)
# Record actual file dimension names for each mesh saved.
self._mesh_dims[mesh] = mesh_location_dimnames
# Get the cube dimensions, in order.
cube_dimensions = []
if cube is not None:
for dim in range(cube.ndim):
if dim == cube_mesh_dim:
# Handle a mesh dimension: we already named this.
dim_coords = []
dim_name = self._mesh_dims[mesh][cube.location]
else:
# Get a name from the dim-coord (if any).
dim_coords = cube.coords(dimensions=dim, dim_coords=True)
if dim_coords:
# Derive a dim name from a coord.
coord = dim_coords[0] # always have at least one
# Add only dimensions that have not already been added.
dim_name = self._dim_names_and_coords.name(coord)
if dim_name is None:
# Not already present : create a unique dimension name
# from the coord.
dim_name = self._get_coord_variable_name(
cube, coord
)
# Disambiguate if it has the same name as an
# existing dimension.
# OR if it matches an existing file variable name.
# NOTE: check against variable names is needed
# because of a netcdf bug ... see note in the
# mesh dimensions block above.
while (
dim_name in self._existing_dim
or dim_name in self._dataset.variables
):
dim_name = self._increment_name(dim_name)
else:
# No CF-netCDF coordinates describe this data dimension.
# Make up a new, distinct dimension name
dim_name = f"dim{dim}"
# Increment name if conflicted with one already existing
# (or planned)
# NOTE: check against variable names is needed because
# of a netcdf bug ... see note in the mesh dimensions
# block above.
while (
dim_name in self._existing_dim
and (
self._existing_dim[dim_name] != cube.shape[dim]
)
) or dim_name in self._dataset.variables:
dim_name = self._increment_name(dim_name)
# Record the dimension.
record_dimension(
cube_dimensions, dim_name, cube.shape[dim], dim_coords
)
return mesh_dimensions, cube_dimensions
[docs] @staticmethod
def cf_valid_var_name(var_name):
"""
Return a valid CF var_name given a potentially invalid name.
Args:
* var_name (str):
The var_name to normalise
Returns:
A var_name suitable for passing through for variable creation.
"""
# Replace invalid charaters with an underscore ("_").
var_name = re.sub(r"[^a-zA-Z0-9]", "_", var_name)
# Ensure the variable name starts with a letter.
if re.match(r"^[^a-zA-Z]", var_name):
var_name = "var_{}".format(var_name)
return var_name
@staticmethod
def _cf_coord_standardised_units(coord):
"""
Determine a suitable units from a given coordinate.
Args:
* coord (:class:`iris.coords.Coord`):
A coordinate of a cube.
Returns:
The (standard_name, long_name, unit) of the given
:class:`iris.coords.Coord` instance.
"""
units = str(coord.units)
# Set the 'units' of 'latitude' and 'longitude' coordinates specified
# in 'degrees' to 'degrees_north' and 'degrees_east' respectively,
# as defined in the CF conventions for netCDF files: sections 4.1 and
# 4.2.
if (
isinstance(coord.coord_system, iris.coord_systems.GeogCS)
or coord.coord_system is None
) and coord.units == "degrees":
if coord.standard_name == "latitude":
units = "degrees_north"
elif coord.standard_name == "longitude":
units = "degrees_east"
return units
def _ensure_valid_dtype(self, values, src_name, src_object):
# NetCDF3 and NetCDF4 classic do not support int64 or unsigned ints,
# so we check if we can store them as int32 instead.
if (
np.issubdtype(values.dtype, np.int64)
or np.issubdtype(values.dtype, np.unsignedinteger)
) and self._dataset.file_format in (
"NETCDF3_CLASSIC",
"NETCDF3_64BIT",
"NETCDF4_CLASSIC",
):
val_min, val_max = (values.min(), values.max())
if is_lazy_data(values):
val_min, val_max = _co_realise_lazy_arrays([val_min, val_max])
# Cast to an integer type supported by netCDF3.
can_cast = all(
[np.can_cast(m, np.int32) for m in (val_min, val_max)]
)
if not can_cast:
msg = (
"The data type of {} {!r} is not supported by {} and"
" its values cannot be safely cast to a supported"
" integer type."
)
msg = msg.format(
src_name, src_object, self._dataset.file_format
)
raise ValueError(msg)
values = values.astype(np.int32)
return values
def _create_cf_bounds(self, coord, cf_var, cf_name):
"""
Create the associated CF-netCDF bounds variable.
Args:
* coord (:class:`iris.coords.Coord`):
A coordinate of a cube.
* cf_var:
CF-netCDF variable
* cf_name (string):
name of the CF-NetCDF variable.
Returns:
None
"""
if hasattr(coord, "has_bounds") and coord.has_bounds():
# Get the values in a form which is valid for the file format.
bounds = self._ensure_valid_dtype(
coord.core_bounds(), "the bounds of coordinate", coord
)
n_bounds = bounds.shape[-1]
if n_bounds == 2:
bounds_dimension_name = "bnds"
else:
bounds_dimension_name = "bnds_%s" % n_bounds
if coord.climatological:
property_name = "climatology"
varname_extra = "climatology"
else:
property_name = "bounds"
varname_extra = "bnds"
if bounds_dimension_name not in self._dataset.dimensions:
# Create the bounds dimension with the appropriate extent.
while bounds_dimension_name in self._dataset.variables:
# Also avoid collision with variable names.
# See '_get_dim_names' for reason.
bounds_dimension_name = self._increment_name(
bounds_dimension_name
)
self._dataset.createDimension(bounds_dimension_name, n_bounds)
boundsvar_name = "{}_{}".format(cf_name, varname_extra)
_setncattr(cf_var, property_name, boundsvar_name)
cf_var_bounds = self._dataset.createVariable(
boundsvar_name,
bounds.dtype.newbyteorder("="),
cf_var.dimensions + (bounds_dimension_name,),
)
self._lazy_stream_data(
data=bounds,
fill_value=None,
fill_warn=True,
cf_var=cf_var_bounds,
)
def _get_cube_variable_name(self, cube):
"""
Returns a CF-netCDF variable name for the given cube.
Args:
* cube (class:`iris.cube.Cube`):
An instance of a cube for which a CF-netCDF variable
name is required.
Returns:
A CF-netCDF variable name as a string.
"""
if cube.var_name is not None:
cf_name = cube.var_name
else:
# Convert to lower case and replace whitespace by underscores.
cf_name = "_".join(cube.name().lower().split())
cf_name = self.cf_valid_var_name(cf_name)
return cf_name
def _get_coord_variable_name(self, cube_or_mesh, coord):
"""
Returns a CF-netCDF variable name for a given coordinate-like element.
Args:
* cube_or_mesh (:class:`iris.cube.Cube`
or :class:`iris.experimental.ugrid.Mesh`):
The Cube or Mesh being saved to the netCDF file.
* coord (:class:`iris.coords._DimensionalMetadata`):
An instance of a coordinate (or similar), for which a CF-netCDF
variable name is required.
Returns:
A CF-netCDF variable name as a string.
"""
# Support cube or mesh save.
from iris.cube import Cube
if isinstance(cube_or_mesh, Cube):
cube = cube_or_mesh
mesh = cube.mesh
else:
cube = None
mesh = cube_or_mesh
if coord.var_name is not None:
cf_name = coord.var_name
else:
name = coord.standard_name or coord.long_name
if not name or set(name).intersection(string.whitespace):
# We need to invent a name, based on its associated dimensions.
if cube is not None and cube.coords(coord):
# It is a regular cube coordinate.
# Auto-generate a name based on the dims.
name = ""
for dim in cube.coord_dims(coord):
name += f"dim{dim}"
# Handle scalar coordinate (dims == ()).
if not name:
name = "unknown_scalar"
else:
# Not a cube coord, so must be a connectivity or
# element-coordinate of the mesh.
# Name it for it's first dim, i.e. mesh-dim of its location.
from iris.experimental.ugrid.mesh import Connectivity
# At present, a location-coord cannot be nameless, as the
# Mesh code relies on guess_coord_axis.
assert isinstance(coord, Connectivity)
location = coord.cf_role.split("_")[0]
location_dim_attr = f"{location}_dimension"
name = getattr(mesh, location_dim_attr)
# Convert to lower case and replace whitespace by underscores.
cf_name = "_".join(name.lower().split())
cf_name = self.cf_valid_var_name(cf_name)
return cf_name
def _get_mesh_variable_name(self, mesh):
"""
Returns a CF-netCDF variable name for the mesh.
Args:
* mesh (:class:`iris.experimental.ugrid.mesh.Mesh`):
An instance of a Mesh for which a CF-netCDF variable name is
required.
Returns:
A CF-netCDF variable name as a string.
"""
cf_name = mesh.var_name or mesh.long_name
# Prefer a var-name, but accept a long_name as an alternative.
# N.B. we believe it can't (shouldn't) have a standard name.
if not cf_name:
# Auto-generate a name based on mesh properties.
cf_name = f"Mesh_{mesh.topology_dimension}d"
# Ensure valid form for var-name.
cf_name = self.cf_valid_var_name(cf_name)
return cf_name
def _create_mesh(self, mesh):
"""
Create a mesh variable in the netCDF dataset.
Args:
* mesh (:class:`iris.experimental.ugrid.mesh.Mesh`):
The Mesh to be saved to CF-netCDF file.
Returns:
The string name of the associated CF-netCDF variable saved.
"""
# First choose a var-name for the mesh variable itself.
cf_mesh_name = self._get_mesh_variable_name(mesh)
# Disambiguate any possible clashes.
while cf_mesh_name in self._dataset.variables:
cf_mesh_name = self._increment_name(cf_mesh_name)
# Create the main variable
cf_mesh_var = self._dataset.createVariable(
cf_mesh_name,
np.dtype(np.int32),
[],
)
# Add the basic essential attributes
_setncattr(cf_mesh_var, "cf_role", "mesh_topology")
_setncattr(
cf_mesh_var,
"topology_dimension",
np.int32(mesh.topology_dimension),
)
# Add the usual names + units attributes
self._set_cf_var_attributes(cf_mesh_var, mesh)
return cf_mesh_name
def _set_cf_var_attributes(self, cf_var, element):
# Deal with CF-netCDF units, and add the name+units properties.
if isinstance(element, iris.coords.Coord):
# Fix "degree" units if needed.
units_str = self._cf_coord_standardised_units(element)
else:
units_str = str(element.units)
if cf_units.as_unit(units_str).is_udunits():
_setncattr(cf_var, "units", units_str)
standard_name = element.standard_name
if standard_name is not None:
_setncattr(cf_var, "standard_name", standard_name)
long_name = element.long_name
if long_name is not None:
_setncattr(cf_var, "long_name", long_name)
# Add the CF-netCDF calendar attribute.
if element.units.calendar:
_setncattr(cf_var, "calendar", str(element.units.calendar))
# Add any other custom coordinate attributes.
for name in sorted(element.attributes):
value = element.attributes[name]
if name == "STASH":
# Adopting provisional Metadata Conventions for representing MO
# Scientific Data encoded in NetCDF Format.
name = "um_stash_source"
value = str(value)
# Don't clobber existing attributes.
if not hasattr(cf_var, name):
_setncattr(cf_var, name, value)
def _create_generic_cf_array_var(
self,
cube_or_mesh,
cube_dim_names,
element,
element_dims=None,
fill_value=None,
):
"""
Create the associated CF-netCDF variable in the netCDF dataset for the
given dimensional_metadata.
..note::
If the metadata element is a coord, it may also contain bounds.
In which case, an additional var is created and linked to it.
Args:
* cube_or_mesh (:class:`iris.cube.Cube`
or :class:`iris.experimental.ugrid.Mesh`):
The Cube or Mesh being saved to the netCDF file.
* cube_dim_names (list of string):
The name of each dimension of the cube.
* element:
An Iris :class:`iris.coords._DimensionalMetadata`, belonging to the
cube. Provides data, units and standard/long/var names.
Not used if 'element_dims' is not None.
* element_dims (list of string, or None):
If set, contains the variable dimension (names),
otherwise these are taken from `element.cube_dims[cube]`.
For Mesh components (element coordinates and connectivities), this
*must* be passed in, as "element.cube_dims" does not function.
* fill_value (number or None):
If set, create the variable with this fill-value, and fill any
masked data points with this value.
If not set, standard netcdf4-python behaviour : the variable has no
'_FillValue' property, and uses the "standard" fill-value for its
type.
Returns:
var_name (string):
The name of the CF-netCDF variable created.
"""
# Support cube or mesh save.
from iris.cube import Cube
if isinstance(cube_or_mesh, Cube):
cube = cube_or_mesh
else:
cube = None
# Work out the var-name to use.
# N.B. the only part of this routine that may use a mesh _or_ a cube.
cf_name = self._get_coord_variable_name(cube_or_mesh, element)
while cf_name in self._dataset.variables:
cf_name = self._increment_name(cf_name)
if element_dims is None:
# Get the list of file-dimensions (names), to create the variable.
element_dims = [
cube_dim_names[dim] for dim in element.cube_dims(cube)
] # NB using 'cube_dims' as this works for any type of element
# Get the data values, in a way which works for any element type, as
# all are subclasses of _DimensionalMetadata.
# (e.g. =points if a coord, =data if an ancillary, etc)
data = element._core_values()
if np.issubdtype(data.dtype, np.str_):
# Deal with string-type variables.
# Typically CF label variables, but also possibly ancil-vars ?
string_dimension_depth = data.dtype.itemsize
if data.dtype.kind == "U":
string_dimension_depth //= 4
string_dimension_name = "string%d" % string_dimension_depth
# Determine whether to create the string length dimension.
if string_dimension_name not in self._dataset.dimensions:
while string_dimension_name in self._dataset.variables:
# Also avoid collision with variable names.
# See '_get_dim_names' for reason.
string_dimension_name = self._increment_name(
string_dimension_name
)
self._dataset.createDimension(
string_dimension_name, string_dimension_depth
)
# Add the string length dimension to the variable dimensions.
element_dims.append(string_dimension_name)
# Create the label coordinate variable.
cf_var = self._dataset.createVariable(cf_name, "|S1", element_dims)
# Convert data from an array of strings into a character array
# with an extra string-length dimension.
if len(element_dims) == 1:
data_first = data[0]
if is_lazy_data(data_first):
data_first = data_first.compute()
data = list("%- *s" % (string_dimension_depth, data_first))
else:
orig_shape = data.shape
new_shape = orig_shape + (string_dimension_depth,)
new_data = np.zeros(new_shape, cf_var.dtype)
for index in np.ndindex(orig_shape):
index_slice = tuple(list(index) + [slice(None, None)])
new_data[index_slice] = list(
"%- *s" % (string_dimension_depth, data[index])
)
data = new_data
else:
# A normal (numeric) variable.
# ensure a valid datatype for the file format.
element_type = type(element).__name__
data = self._ensure_valid_dtype(data, element_type, element)
# Check if this is a dim-coord.
is_dimcoord = cube is not None and element in cube.dim_coords
if isinstance(element, iris.coords.CellMeasure):
# Disallow saving of *masked* cell measures.
# NOTE: currently, this is the only functional difference in
# variable creation between an ancillary and a cell measure.
if iris.util.is_masked(data):
# We can't save masked points properly, as we don't maintain
# a fill_value. (Load will not record one, either).
msg = "Cell measures with missing data are not supported."
raise ValueError(msg)
if is_dimcoord:
# By definition of a CF-netCDF coordinate variable this
# coordinate must be 1-D and the name of the CF-netCDF variable
# must be the same as its dimension name.
cf_name = element_dims[0]
# Create the CF-netCDF variable.
cf_var = self._dataset.createVariable(
cf_name,
data.dtype.newbyteorder("="),
element_dims,
fill_value=fill_value,
)
# Add the axis attribute for spatio-temporal CF-netCDF coordinates.
if is_dimcoord:
axis = iris.util.guess_coord_axis(element)
if axis is not None and axis.lower() in SPATIO_TEMPORAL_AXES:
_setncattr(cf_var, "axis", axis.upper())
# Create the associated CF-netCDF bounds variable, if any.
self._create_cf_bounds(element, cf_var, cf_name)
# Add the data to the CF-netCDF variable.
self._lazy_stream_data(
data=data, fill_value=fill_value, fill_warn=True, cf_var=cf_var
)
# Add names + units
self._set_cf_var_attributes(cf_var, element)
return cf_name
def _create_cf_cell_methods(self, cube, dimension_names):
"""
Create CF-netCDF string representation of a cube cell methods.
Args:
* cube (:class:`iris.cube.Cube`) or cubelist
(:class:`iris.cube.CubeList`):
A :class:`iris.cube.Cube`, :class:`iris.cube.CubeList` or list of
cubes to be saved to a netCDF file.
* dimension_names (list):
Names associated with the dimensions of the cube.
Returns:
CF-netCDF string representation of a cube cell methods.
"""
cell_methods = []
# Identify the collection of coordinates that represent CF-netCDF
# coordinate variables.
cf_coordinates = cube.dim_coords
for cm in cube.cell_methods:
names = ""
for name in cm.coord_names:
coord = cube.coords(name)
if coord:
coord = coord[0]
if coord in cf_coordinates:
name = dimension_names[cube.coord_dims(coord)[0]]
names += "%s: " % name
interval = " ".join(
["interval: %s" % interval for interval in cm.intervals or []]
)
comment = " ".join(
["comment: %s" % comment for comment in cm.comments or []]
)
extra = " ".join([interval, comment]).strip()
if extra:
extra = " (%s)" % extra
cell_methods.append(names + cm.method + extra)
return " ".join(cell_methods)
def _create_cf_grid_mapping(self, cube, cf_var_cube):
"""
Create CF-netCDF grid mapping variable and associated CF-netCDF
data variable grid mapping attribute.
Args:
* cube (:class:`iris.cube.Cube`) or cubelist
(:class:`iris.cube.CubeList`):
A :class:`iris.cube.Cube`, :class:`iris.cube.CubeList` or list of
cubes to be saved to a netCDF file.
* cf_var_cube (:class:`netcdf.netcdf_variable`):
cf variable cube representation.
Returns:
None
"""
cs = cube.coord_system("CoordSystem")
if cs is not None:
# Grid var not yet created?
if cs not in self._coord_systems:
while cs.grid_mapping_name in self._dataset.variables:
aname = self._increment_name(cs.grid_mapping_name)
cs.grid_mapping_name = aname
cf_var_grid = self._dataset.createVariable(
cs.grid_mapping_name, np.int32
)
_setncattr(
cf_var_grid, "grid_mapping_name", cs.grid_mapping_name
)
def add_ellipsoid(ellipsoid):
cf_var_grid.longitude_of_prime_meridian = (
ellipsoid.longitude_of_prime_meridian
)
semi_major = ellipsoid.semi_major_axis
semi_minor = ellipsoid.semi_minor_axis
if semi_minor == semi_major:
cf_var_grid.earth_radius = semi_major
else:
cf_var_grid.semi_major_axis = semi_major
cf_var_grid.semi_minor_axis = semi_minor
if ellipsoid.datum is not None:
cf_var_grid.horizontal_datum_name = ellipsoid.datum
# latlon
if isinstance(cs, iris.coord_systems.GeogCS):
add_ellipsoid(cs)
# rotated latlon
elif isinstance(cs, iris.coord_systems.RotatedGeogCS):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.grid_north_pole_latitude = (
cs.grid_north_pole_latitude
)
cf_var_grid.grid_north_pole_longitude = (
cs.grid_north_pole_longitude
)
cf_var_grid.north_pole_grid_longitude = (
cs.north_pole_grid_longitude
)
# tmerc
elif isinstance(cs, iris.coord_systems.TransverseMercator):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.longitude_of_central_meridian = (
cs.longitude_of_central_meridian
)
cf_var_grid.latitude_of_projection_origin = (
cs.latitude_of_projection_origin
)
cf_var_grid.false_easting = cs.false_easting
cf_var_grid.false_northing = cs.false_northing
cf_var_grid.scale_factor_at_central_meridian = (
cs.scale_factor_at_central_meridian
)
# merc
elif isinstance(cs, iris.coord_systems.Mercator):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.longitude_of_projection_origin = (
cs.longitude_of_projection_origin
)
cf_var_grid.false_easting = cs.false_easting
cf_var_grid.false_northing = cs.false_northing
# Only one of these should be set
if cs.standard_parallel is not None:
cf_var_grid.standard_parallel = cs.standard_parallel
elif cs.scale_factor_at_projection_origin is not None:
cf_var_grid.scale_factor_at_projection_origin = (
cs.scale_factor_at_projection_origin
)
# lcc
elif isinstance(cs, iris.coord_systems.LambertConformal):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.standard_parallel = cs.secant_latitudes
cf_var_grid.latitude_of_projection_origin = cs.central_lat
cf_var_grid.longitude_of_central_meridian = cs.central_lon
cf_var_grid.false_easting = cs.false_easting
cf_var_grid.false_northing = cs.false_northing
# polar stereo (have to do this before Stereographic because it subclasses it)
elif isinstance(cs, iris.coord_systems.PolarStereographic):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.latitude_of_projection_origin = cs.central_lat
cf_var_grid.straight_vertical_longitude_from_pole = (
cs.central_lon
)
cf_var_grid.false_easting = cs.false_easting
cf_var_grid.false_northing = cs.false_northing
# Only one of these should be set
if cs.true_scale_lat is not None:
cf_var_grid.true_scale_lat = cs.true_scale_lat
elif cs.scale_factor_at_projection_origin is not None:
cf_var_grid.scale_factor_at_projection_origin = (
cs.scale_factor_at_projection_origin
)
else:
cf_var_grid.scale_factor_at_projection_origin = 1.0
# stereo
elif isinstance(cs, iris.coord_systems.Stereographic):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.longitude_of_projection_origin = cs.central_lon
cf_var_grid.latitude_of_projection_origin = cs.central_lat
cf_var_grid.false_easting = cs.false_easting
cf_var_grid.false_northing = cs.false_northing
# Only one of these should be set
if cs.true_scale_lat is not None:
msg = (
"It is not valid CF to save a true_scale_lat for "
"a Stereographic grid mapping."
)
raise ValueError(msg)
elif cs.scale_factor_at_projection_origin is not None:
cf_var_grid.scale_factor_at_projection_origin = (
cs.scale_factor_at_projection_origin
)
else:
cf_var_grid.scale_factor_at_projection_origin = 1.0
# osgb (a specific tmerc)
elif isinstance(cs, iris.coord_systems.OSGB):
warnings.warn("OSGB coordinate system not yet handled")
# lambert azimuthal equal area
elif isinstance(
cs, iris.coord_systems.LambertAzimuthalEqualArea
):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.longitude_of_projection_origin = (
cs.longitude_of_projection_origin
)
cf_var_grid.latitude_of_projection_origin = (
cs.latitude_of_projection_origin
)
cf_var_grid.false_easting = cs.false_easting
cf_var_grid.false_northing = cs.false_northing
# albers conical equal area
elif isinstance(cs, iris.coord_systems.AlbersEqualArea):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.longitude_of_central_meridian = (
cs.longitude_of_central_meridian
)
cf_var_grid.latitude_of_projection_origin = (
cs.latitude_of_projection_origin
)
cf_var_grid.false_easting = cs.false_easting
cf_var_grid.false_northing = cs.false_northing
cf_var_grid.standard_parallel = cs.standard_parallels
# vertical perspective
elif isinstance(cs, iris.coord_systems.VerticalPerspective):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.longitude_of_projection_origin = (
cs.longitude_of_projection_origin
)
cf_var_grid.latitude_of_projection_origin = (
cs.latitude_of_projection_origin
)
cf_var_grid.false_easting = cs.false_easting
cf_var_grid.false_northing = cs.false_northing
cf_var_grid.perspective_point_height = (
cs.perspective_point_height
)
# geostationary
elif isinstance(cs, iris.coord_systems.Geostationary):
if cs.ellipsoid:
add_ellipsoid(cs.ellipsoid)
cf_var_grid.longitude_of_projection_origin = (
cs.longitude_of_projection_origin
)
cf_var_grid.latitude_of_projection_origin = (
cs.latitude_of_projection_origin
)
cf_var_grid.false_easting = cs.false_easting
cf_var_grid.false_northing = cs.false_northing
cf_var_grid.perspective_point_height = (
cs.perspective_point_height
)
cf_var_grid.sweep_angle_axis = cs.sweep_angle_axis
# other
else:
warnings.warn(
"Unable to represent the horizontal "
"coordinate system. The coordinate system "
"type %r is not yet implemented." % type(cs)
)
self._coord_systems.append(cs)
# Refer to grid var
_setncattr(cf_var_cube, "grid_mapping", cs.grid_mapping_name)
def _create_cf_data_variable(
self,
cube,
dimension_names,
local_keys=None,
packing=None,
fill_value=None,
**kwargs,
):
"""
Create CF-netCDF data variable for the cube and any associated grid
mapping.
Args:
* cube (:class:`iris.cube.Cube`):
The associated cube being saved to CF-netCDF file.
* dimension_names (list):
String names for each dimension of the cube.
Kwargs:
* local_keys (iterable of strings):
* see :func:`iris.fileformats.netcdf.Saver.write`
* packing (type or string or dict or list):
* see :func:`iris.fileformats.netcdf.Saver.write`
* fill_value:
* see :func:`iris.fileformats.netcdf.Saver.write`
All other keywords are passed through to the dataset's `createVariable`
method.
Returns:
The newly created CF-netCDF data variable.
"""
# Get the values in a form which is valid for the file format.
data = self._ensure_valid_dtype(cube.core_data(), "cube", cube)
if packing:
if isinstance(packing, dict):
if "dtype" not in packing:
msg = "The dtype attribute is required for packing."
raise ValueError(msg)
dtype = np.dtype(packing["dtype"])
scale_factor = packing.get("scale_factor", None)
add_offset = packing.get("add_offset", None)
valid_keys = {"dtype", "scale_factor", "add_offset"}
invalid_keys = set(packing.keys()) - valid_keys
if invalid_keys:
msg = (
"Invalid packing key(s) found: '{}'. The valid "
"keys are '{}'.".format(
"', '".join(invalid_keys), "', '".join(valid_keys)
)
)
raise ValueError(msg)
else:
# We compute the scale_factor and add_offset based on the
# min/max of the data.
masked = iris.util.is_masked(data)
dtype = np.dtype(packing)
cmin, cmax = (data.min(), data.max())
if is_lazy_data(data):
cmin, cmax = _co_realise_lazy_arrays([cmin, cmax])
n = dtype.itemsize * 8
if masked:
scale_factor = (cmax - cmin) / (2**n - 2)
else:
scale_factor = (cmax - cmin) / (2**n - 1)
if dtype.kind == "u":
add_offset = cmin
elif dtype.kind == "i":
if masked:
add_offset = (cmax + cmin) / 2
else:
add_offset = cmin + 2 ** (n - 1) * scale_factor
else:
dtype = data.dtype.newbyteorder("=")
def set_packing_ncattrs(cfvar):
"""Set netCDF packing attributes."""
if packing:
if scale_factor:
_setncattr(cfvar, "scale_factor", scale_factor)
if add_offset:
_setncattr(cfvar, "add_offset", add_offset)
cf_name = self._get_cube_variable_name(cube)
while cf_name in self._dataset.variables:
cf_name = self._increment_name(cf_name)
# Create the cube CF-netCDF data variable with data payload.
cf_var = self._dataset.createVariable(
cf_name, dtype, dimension_names, fill_value=fill_value, **kwargs
)
set_packing_ncattrs(cf_var)
self._lazy_stream_data(
data=data,
fill_value=fill_value,
fill_warn=(not packing),
cf_var=cf_var,
)
if cube.standard_name:
_setncattr(cf_var, "standard_name", cube.standard_name)
if cube.long_name:
_setncattr(cf_var, "long_name", cube.long_name)
if cube.units.is_udunits():
_setncattr(cf_var, "units", str(cube.units))
# Add the CF-netCDF calendar attribute.
if cube.units.calendar:
_setncattr(cf_var, "calendar", cube.units.calendar)
# Add data variable-only attribute names to local_keys.
if local_keys is None:
local_keys = set()
else:
local_keys = set(local_keys)
local_keys.update(_CF_DATA_ATTRS, _UKMO_DATA_ATTRS)
# Add any cube attributes whose keys are in local_keys as
# CF-netCDF data variable attributes.
attr_names = set(cube.attributes).intersection(local_keys)
for attr_name in sorted(attr_names):
# Do not output 'conventions' attribute.
if attr_name.lower() == "conventions":
continue
value = cube.attributes[attr_name]
if attr_name == "STASH":
# Adopting provisional Metadata Conventions for representing MO
# Scientific Data encoded in NetCDF Format.
attr_name = "um_stash_source"
value = str(value)
if attr_name == "ukmo__process_flags":
value = " ".join([x.replace(" ", "_") for x in value])
if attr_name in _CF_GLOBAL_ATTRS:
msg = (
"{attr_name!r} is being added as CF data variable "
"attribute, but {attr_name!r} should only be a CF "
"global attribute.".format(attr_name=attr_name)
)
warnings.warn(msg)
_setncattr(cf_var, attr_name, value)
# Create the CF-netCDF data variable cell method attribute.
cell_methods = self._create_cf_cell_methods(cube, dimension_names)
if cell_methods:
_setncattr(cf_var, "cell_methods", cell_methods)
# Create the CF-netCDF grid mapping.
self._create_cf_grid_mapping(cube, cf_var)
return cf_var
def _increment_name(self, varname):
"""
Increment string name or begin increment.
Avoidance of conflicts between variable names, where the name is
incremented to distinguish it from others.
Args:
* varname (string):
Variable name to increment.
Returns:
Incremented varname.
"""
num = 0
try:
name, endnum = varname.rsplit("_", 1)
if endnum.isdigit():
num = int(endnum) + 1
varname = name
except ValueError:
pass
return "{}_{}".format(varname, num)
@staticmethod
def _lazy_stream_data(data, fill_value, fill_warn, cf_var):
if hasattr(data, "shape") and data.shape == (1,) + cf_var.shape:
# (Don't do this check for string data).
# Reduce dimensionality where the data array has an extra dimension
# versus the cf_var - to avoid a broadcasting ambiguity.
# Happens when bounds data is for a scalar point - array is 2D but
# contains just 1 row, so the cf_var is 1D.
data = data.squeeze(axis=0)
if is_lazy_data(data):
def store(data, cf_var, fill_value):
# Store lazy data and check whether it is masked and contains
# the fill value
target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
da.store([data], [target])
return target.is_masked, target.contains_value
else:
def store(data, cf_var, fill_value):
cf_var[:] = data
is_masked = np.ma.is_masked(data)
contains_value = fill_value is not None and fill_value in data
return is_masked, contains_value
dtype = cf_var.dtype
# fill_warn allows us to skip warning if packing attributes have been
# specified. It would require much more complex operations to work out
# what the values and fill_value _would_ be in such a case.
if fill_warn:
if fill_value is not None:
fill_value_to_check = fill_value
else:
fill_value_to_check = netCDF4.default_fillvals[dtype.str[1:]]
else:
fill_value_to_check = None
# Store the data and check if it is masked and contains the fill value.
is_masked, contains_fill_value = store(
data, cf_var, fill_value_to_check
)
if dtype.itemsize == 1 and fill_value is None:
if is_masked:
msg = (
"CF var '{}' contains byte data with masked points, but "
"no fill_value keyword was given. As saved, these "
"points will read back as valid values. To save as "
"masked byte data, `_FillValue` needs to be explicitly "
"set. For Cube data this can be done via the 'fill_value' "
"keyword during saving, otherwise use ncedit/equivalent."
)
warnings.warn(msg.format(cf_var.name))
elif contains_fill_value:
msg = (
"CF var '{}' contains unmasked data points equal to the "
"fill-value, {}. As saved, these points will read back "
"as missing data. To save these as normal values, "
"`_FillValue` needs to be set to not equal any valid data "
"points. For Cube data this can be done via the 'fill_value' "
"keyword during saving, otherwise use ncedit/equivalent."
)
warnings.warn(msg.format(cf_var.name, fill_value))
[docs]def save(
cube,
filename,
netcdf_format="NETCDF4",
local_keys=None,
unlimited_dimensions=None,
zlib=False,
complevel=4,
shuffle=True,
fletcher32=False,
contiguous=False,
chunksizes=None,
endian="native",
least_significant_digit=None,
packing=None,
fill_value=None,
):
"""
Save cube(s) to a netCDF file, given the cube and the filename.
* Iris will write CF 1.7 compliant NetCDF files.
* The attributes dictionaries on each cube in the saved cube list
will be compared and common attributes saved as NetCDF global
attributes where appropriate.
* Keyword arguments specifying how to save the data are applied
to each cube. To use different settings for different cubes, use
the NetCDF Context manager (:class:`~Saver`) directly.
* The save process will stream the data payload to the file using dask,
enabling large data payloads to be saved and maintaining the 'lazy'
status of the cube's data payload, unless the netcdf_format is explicitly
specified to be 'NETCDF3' or 'NETCDF3_CLASSIC'.
Args:
* cube (:class:`iris.cube.Cube` or :class:`iris.cube.CubeList`):
A :class:`iris.cube.Cube`, :class:`iris.cube.CubeList` or other
iterable of cubes to be saved to a netCDF file.
* filename (string):
Name of the netCDF file to save the cube(s).
Kwargs:
* netcdf_format (string):
Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC',
'NETCDF3_CLASSIC' or 'NETCDF3_64BIT'. Default is 'NETCDF4' format.
* local_keys (iterable of strings):
An interable of cube attribute keys. Any cube attributes with
matching keys will become attributes on the data variable rather
than global attributes.
* unlimited_dimensions (iterable of strings and/or
:class:`iris.coords.Coord` objects):
List of coordinate names (or coordinate objects) corresponding
to coordinate dimensions of `cube` to save with the NetCDF dimension
variable length 'UNLIMITED'. By default, no unlimited dimensions are
saved. Only the 'NETCDF4' format supports multiple 'UNLIMITED'
dimensions.
* zlib (bool):
If `True`, the data will be compressed in the netCDF file using gzip
compression (default `False`).
* complevel (int):
An integer between 1 and 9 describing the level of compression desired
(default 4). Ignored if `zlib=False`.
* shuffle (bool):
If `True`, the HDF5 shuffle filter will be applied before compressing
the data (default `True`). This significantly improves compression.
Ignored if `zlib=False`.
* fletcher32 (bool):
If `True`, the Fletcher32 HDF5 checksum algorithm is activated to
detect errors. Default `False`.
* contiguous (bool):
If `True`, the variable data is stored contiguously on disk. Default
`False`. Setting to `True` for a variable with an unlimited dimension
will trigger an error.
* chunksizes (tuple of int):
Used to manually specify the HDF5 chunksizes for each dimension of the
variable. A detailed discussion of HDF chunking and I/O performance is
available here: https://www.unidata.ucar.edu/software/netcdf/documentation/NUG/netcdf_perf_chunking.html.
Basically, you want the chunk size for each dimension to match as
closely as possible the size of the data block that users will read
from the file. `chunksizes` cannot be set if `contiguous=True`.
* endian (string):
Used to control whether the data is stored in little or big endian
format on disk. Possible values are 'little', 'big' or 'native'
(default). The library will automatically handle endian conversions
when the data is read, but if the data is always going to be read on a
computer with the opposite format as the one used to create the file,
there may be some performance advantage to be gained by setting the
endian-ness.
* least_significant_digit (int):
If `least_significant_digit` is specified, variable data will be
truncated (quantized). In conjunction with `zlib=True` this produces
'lossy', but significantly more efficient compression. For example, if
`least_significant_digit=1`, data will be quantized using
`numpy.around(scale*data)/scale`, where `scale = 2**bits`, and `bits`
is determined so that a precision of 0.1 is retained (in this case
`bits=4`). From
http://www.esrl.noaa.gov/psd/data/gridded/conventions/cdc_netcdf_standard.shtml:
"least_significant_digit -- power of ten of the smallest decimal place
in unpacked data that is a reliable value". Default is `None`, or no
quantization, or 'lossless' compression.
* packing (type or string or dict or list): A numpy integer datatype
(signed or unsigned) or a string that describes a numpy integer dtype
(i.e. 'i2', 'short', 'u4') or a dict of packing parameters as described
below or an iterable of such types, strings, or dicts.
This provides support for netCDF data packing as described in
https://www.unidata.ucar.edu/software/netcdf/documentation/NUG/best_practices.html#bp_Packed-Data-Values
If this argument is a type (or type string), appropriate values of
scale_factor and add_offset will be automatically calculated based
on `cube.data` and possible masking. For more control, pass a dict with
one or more of the following keys: `dtype` (required), `scale_factor`
and `add_offset`. Note that automatic calculation of packing parameters
will trigger loading of lazy data; set them manually using a dict to
avoid this. The default is `None`, in which case the datatype is
determined from the cube and no packing will occur. If this argument is
a list it must have the same number of elements as `cube` if `cube` is
a `:class:`iris.cube.CubeList`, or one element, and each element of
this argument will be applied to each cube separately.
* fill_value (numeric or list):
The value to use for the `_FillValue` attribute on the netCDF variable.
If `packing` is specified the value of `fill_value` should be in the
domain of the packed data. If this argument is a list it must have the
same number of elements as `cube` if `cube` is a
`:class:`iris.cube.CubeList`, or a single element, and each element of
this argument will be applied to each cube separately.
Returns:
None.
.. note::
The `zlib`, `complevel`, `shuffle`, `fletcher32`, `contiguous`,
`chunksizes` and `endian` keywords are silently ignored for netCDF 3
files that do not use HDF5.
.. seealso::
NetCDF Context manager (:class:`~Saver`).
"""
from iris.cube import Cube, CubeList
if unlimited_dimensions is None:
unlimited_dimensions = []
if isinstance(cube, Cube):
cubes = CubeList()
cubes.append(cube)
else:
cubes = cube
if local_keys is None:
local_keys = set()
else:
local_keys = set(local_keys)
# Determine the attribute keys that are common across all cubes and
# thereby extend the collection of local_keys for attributes
# that should be attributes on data variables.
attributes = cubes[0].attributes
common_keys = set(attributes)
for cube in cubes[1:]:
keys = set(cube.attributes)
local_keys.update(keys.symmetric_difference(common_keys))
common_keys.intersection_update(keys)
different_value_keys = []
for key in common_keys:
if np.any(attributes[key] != cube.attributes[key]):
different_value_keys.append(key)
common_keys.difference_update(different_value_keys)
local_keys.update(different_value_keys)
def is_valid_packspec(p):
"""Only checks that the datatype is valid."""
if isinstance(p, dict):
if "dtype" in p:
return is_valid_packspec(p["dtype"])
else:
msg = "The argument to packing must contain the key 'dtype'."
raise ValueError(msg)
elif isinstance(p, str) or isinstance(p, type) or isinstance(p, str):
pdtype = np.dtype(p) # Does nothing if it's already a numpy dtype
if pdtype.kind != "i" and pdtype.kind != "u":
msg = "The packing datatype must be a numpy integer type."
raise ValueError(msg)
return True
elif p is None:
return True
else:
return False
if is_valid_packspec(packing):
packspecs = repeat(packing)
else:
# Assume iterable, make sure packing is the same length as cubes.
for cube, packspec in zip_longest(cubes, packing, fillvalue=-1):
if cube == -1 or packspec == -1:
msg = (
"If packing is a list, it must have the "
"same number of elements as the argument to"
"cube."
)
raise ValueError(msg)
if not is_valid_packspec(packspec):
msg = "Invalid packing argument: {}.".format(packspec)
raise ValueError(msg)
packspecs = packing
# Make fill-value(s) into an iterable over cubes.
if isinstance(fill_value, str):
# Strings are awkward -- handle separately.
fill_values = repeat(fill_value)
else:
try:
fill_values = tuple(fill_value)
except TypeError:
fill_values = repeat(fill_value)
else:
if len(fill_values) != len(cubes):
msg = (
"If fill_value is a list, it must have the "
"same number of elements as the cube argument."
)
raise ValueError(msg)
# Initialise Manager for saving
with Saver(filename, netcdf_format) as sman:
# Iterate through the cubelist.
for cube, packspec, fill_value in zip(cubes, packspecs, fill_values):
sman.write(
cube,
local_keys,
unlimited_dimensions,
zlib,
complevel,
shuffle,
fletcher32,
contiguous,
chunksizes,
endian,
least_significant_digit,
packing=packspec,
fill_value=fill_value,
)
if iris.config.netcdf.conventions_override:
# Set to the default if custom conventions are not available.
conventions = cube.attributes.get(
"Conventions", CF_CONVENTIONS_VERSION
)
else:
conventions = CF_CONVENTIONS_VERSION
# Perform a CF patch of the conventions attribute.
cf_profile_available = iris.site_configuration.get(
"cf_profile"
) not in [None, False]
if cf_profile_available:
conventions_patch = iris.site_configuration.get(
"cf_patch_conventions"
)
if conventions_patch is not None:
conventions = conventions_patch(conventions)
else:
msg = "cf_profile is available but no {} defined.".format(
"cf_patch_conventions"
)
warnings.warn(msg)
# Add conventions attribute.
sman.update_global_attributes(Conventions=conventions)