Source code for iris.fileformats.netcdf.loader

# Copyright Iris contributors
#
# This file is part of Iris and is released under the LGPL license.
# See COPYING and COPYING.LESSER in the root of the repository for full
# licensing details.
"""
Module to support the loading of Iris cubes from NetCDF files, also using the CF
conventions for metadata interpretation.

See : `NetCDF User's Guide <https://docs.unidata.ucar.edu/nug/current/>`_
and `netCDF4 python module <https://github.com/Unidata/netcdf4-python>`_.

Also : `CF Conventions <https://cfconventions.org/>`_.

"""
import warnings

import netCDF4
import numpy as np

from iris._lazy_data import as_lazy_data
from iris.aux_factory import (
    AtmosphereSigmaFactory,
    HybridHeightFactory,
    HybridPressureFactory,
    OceanSFactory,
    OceanSg1Factory,
    OceanSg2Factory,
    OceanSigmaFactory,
    OceanSigmaZFactory,
)
import iris.config
import iris.coord_systems
import iris.coords
import iris.exceptions
import iris.fileformats.cf
from iris.fileformats.netcdf.saver import _CF_ATTRS
import iris.io
import iris.util

# Show actions activation statistics.
DEBUG = False

# Get the logger : shared logger for all in 'iris.fileformats.netcdf'.
from . import logger


def _actions_engine():
    # Return an 'actions engine', which provides a pyke-rules-like interface to
    # the core cf translation code.
    # Deferred import to avoid circularity.
    import iris.fileformats._nc_load_rules.engine as nc_actions_engine

    engine = nc_actions_engine.Engine()
    return engine


[docs]class NetCDFDataProxy: """A reference to the data payload of a single NetCDF file variable.""" __slots__ = ("shape", "dtype", "path", "variable_name", "fill_value") def __init__(self, shape, dtype, path, variable_name, fill_value): self.shape = shape self.dtype = dtype self.path = path self.variable_name = variable_name self.fill_value = fill_value @property def ndim(self): return len(self.shape) def __getitem__(self, keys): dataset = netCDF4.Dataset(self.path) try: variable = dataset.variables[self.variable_name] # Get the NetCDF variable data and slice. var = variable[keys] finally: dataset.close() return np.asanyarray(var) def __repr__(self): fmt = ( "<{self.__class__.__name__} shape={self.shape}" " dtype={self.dtype!r} path={self.path!r}" " variable_name={self.variable_name!r}>" ) return fmt.format(self=self) def __getstate__(self): return {attr: getattr(self, attr) for attr in self.__slots__} def __setstate__(self, state): for key, value in state.items(): setattr(self, key, value)
def _assert_case_specific_facts(engine, cf, cf_group): # Initialise a data store for built cube elements. # This is used to patch element attributes *not* setup by the actions # process, after the actions code has run. engine.cube_parts["coordinates"] = [] engine.cube_parts["cell_measures"] = [] engine.cube_parts["ancillary_variables"] = [] # Assert facts for CF coordinates. for cf_name in cf_group.coordinates.keys(): engine.add_case_specific_fact("coordinate", (cf_name,)) # Assert facts for CF auxiliary coordinates. for cf_name in cf_group.auxiliary_coordinates.keys(): engine.add_case_specific_fact("auxiliary_coordinate", (cf_name,)) # Assert facts for CF cell measures. for cf_name in cf_group.cell_measures.keys(): engine.add_case_specific_fact("cell_measure", (cf_name,)) # Assert facts for CF ancillary variables. for cf_name in cf_group.ancillary_variables.keys(): engine.add_case_specific_fact("ancillary_variable", (cf_name,)) # Assert facts for CF grid_mappings. for cf_name in cf_group.grid_mappings.keys(): engine.add_case_specific_fact("grid_mapping", (cf_name,)) # Assert facts for CF labels. for cf_name in cf_group.labels.keys(): engine.add_case_specific_fact("label", (cf_name,)) # Assert facts for CF formula terms associated with the cf_group # of the CF data variable. # Collect varnames of formula-root variables as we go. # NOTE: use dictionary keys as an 'OrderedSet' # - see: https://stackoverflow.com/a/53657523/2615050 # This is to ensure that we can handle the resulting facts in a definite # order, as using a 'set' led to indeterminate results. formula_root = {} for cf_var in cf.cf_group.formula_terms.values(): for cf_root, cf_term in cf_var.cf_terms_by_root.items(): # Only assert this fact if the formula root variable is # defined in the CF group of the CF data variable. if cf_root in cf_group: formula_root[cf_root] = True engine.add_case_specific_fact( "formula_term", (cf_var.cf_name, cf_root, cf_term), ) for cf_root in formula_root.keys(): engine.add_case_specific_fact("formula_root", (cf_root,)) def _actions_activation_stats(engine, cf_name): print("-" * 80) print("CF Data Variable: %r" % cf_name) engine.print_stats() print("Rules Triggered:") for rule in sorted(list(engine.rules_triggered)): print("\t%s" % rule) print("Case Specific Facts:") kb_facts = engine.get_kb() for key in kb_facts.entity_lists.keys(): for arg in kb_facts.entity_lists[key].case_specific_facts: print("\t%s%s" % (key, arg)) def _set_attributes(attributes, key, value): """Set attributes dictionary, converting unicode strings appropriately.""" if isinstance(value, str): try: attributes[str(key)] = str(value) except UnicodeEncodeError: attributes[str(key)] = value else: attributes[str(key)] = value def _add_unused_attributes(iris_object, cf_var): """ Populate the attributes of a cf element with the "unused" attributes from the associated CF-netCDF variable. That is, all those that aren't CF reserved terms. """ def attribute_predicate(item): return item[0] not in _CF_ATTRS tmpvar = filter(attribute_predicate, cf_var.cf_attrs_unused()) for attr_name, attr_value in tmpvar: _set_attributes(iris_object.attributes, attr_name, attr_value) def _get_actual_dtype(cf_var): # Figure out what the eventual data type will be after any scale/offset # transforms. dummy_data = np.zeros(1, dtype=cf_var.dtype) if hasattr(cf_var, "scale_factor"): dummy_data = cf_var.scale_factor * dummy_data if hasattr(cf_var, "add_offset"): dummy_data = cf_var.add_offset + dummy_data return dummy_data.dtype def _get_cf_var_data(cf_var, filename): # Get lazy chunked data out of a cf variable. dtype = _get_actual_dtype(cf_var) # Create cube with deferred data, but no metadata fill_value = getattr( cf_var.cf_data, "_FillValue", netCDF4.default_fillvals[cf_var.dtype.str[1:]], ) proxy = NetCDFDataProxy( cf_var.shape, dtype, filename, cf_var.cf_name, fill_value ) # Get the chunking specified for the variable : this is either a shape, or # maybe the string "contiguous". chunks = cf_var.cf_data.chunking() # In the "contiguous" case, pass chunks=None to 'as_lazy_data'. if chunks == "contiguous": chunks = None return as_lazy_data(proxy, chunks=chunks) class _OrderedAddableList(list): """ A custom container object for actions recording. Used purely in actions debugging, to accumulate a record of which actions were activated. It replaces a set, so as to preserve the ordering of operations, with possible repeats, and it also numbers the entries. The actions routines invoke an 'add' method, so this effectively replaces a set.add with a list.append. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._n_add = 0 def add(self, msg): self._n_add += 1 n_add = self._n_add self.append(f"#{n_add:03d} : {msg}") def _load_cube(engine, cf, cf_var, filename): from iris.cube import Cube """Create the cube associated with the CF-netCDF data variable.""" data = _get_cf_var_data(cf_var, filename) cube = Cube(data) # Reset the actions engine. engine.reset() # Initialise engine rule processing hooks. engine.cf_var = cf_var engine.cube = cube engine.cube_parts = {} engine.requires = {} engine.rules_triggered = _OrderedAddableList() engine.filename = filename # Assert all the case-specific facts. # This extracts 'facts' specific to this data-variable (aka cube), from # the info supplied in the CFGroup object. _assert_case_specific_facts(engine, cf, cf_var.cf_group) # Run the actions engine. # This creates various cube elements and attaches them to the cube. # It also records various other info on the engine, to be processed later. engine.activate() # Having run the rules, now add the "unused" attributes to each cf element. def fix_attributes_all_elements(role_name): elements_and_names = engine.cube_parts.get(role_name, []) for iris_object, cf_var_name in elements_and_names: _add_unused_attributes(iris_object, cf.cf_group[cf_var_name]) # Populate the attributes of all coordinates, cell-measures and ancillary-vars. fix_attributes_all_elements("coordinates") fix_attributes_all_elements("ancillary_variables") fix_attributes_all_elements("cell_measures") # Also populate attributes of the top-level cube itself. _add_unused_attributes(cube, cf_var) # Work out reference names for all the coords. names = { coord.var_name: coord.standard_name or coord.var_name or "unknown" for coord in cube.coords() } # Add all the cube cell methods. cube.cell_methods = [ iris.coords.CellMethod( method=method.method, intervals=method.intervals, comments=method.comments, coords=[ names[coord_name] if coord_name in names else coord_name for coord_name in method.coord_names ], ) for method in cube.cell_methods ] if DEBUG: # Show activation statistics for this data-var (i.e. cube). _actions_activation_stats(engine, cf_var.cf_name) return cube def _load_aux_factory(engine, cube): """ Convert any CF-netCDF dimensionless coordinate to an AuxCoordFactory. """ formula_type = engine.requires.get("formula_type") if formula_type in [ "atmosphere_sigma_coordinate", "atmosphere_hybrid_height_coordinate", "atmosphere_hybrid_sigma_pressure_coordinate", "ocean_sigma_z_coordinate", "ocean_sigma_coordinate", "ocean_s_coordinate", "ocean_s_coordinate_g1", "ocean_s_coordinate_g2", ]: def coord_from_term(term): # Convert term names to coordinates (via netCDF variable names). name = engine.requires["formula_terms"].get(term, None) if name is not None: for coord, cf_var_name in engine.cube_parts["coordinates"]: if cf_var_name == name: return coord warnings.warn( "Unable to find coordinate for variable " "{!r}".format(name) ) if formula_type == "atmosphere_sigma_coordinate": pressure_at_top = coord_from_term("ptop") sigma = coord_from_term("sigma") surface_air_pressure = coord_from_term("ps") factory = AtmosphereSigmaFactory( pressure_at_top, sigma, surface_air_pressure ) elif formula_type == "atmosphere_hybrid_height_coordinate": delta = coord_from_term("a") sigma = coord_from_term("b") orography = coord_from_term("orog") factory = HybridHeightFactory(delta, sigma, orography) elif formula_type == "atmosphere_hybrid_sigma_pressure_coordinate": # Hybrid pressure has two valid versions of its formula terms: # "p0: var1 a: var2 b: var3 ps: var4" or # "ap: var1 b: var2 ps: var3" where "ap = p0 * a" # Attempt to get the "ap" term. delta = coord_from_term("ap") if delta is None: # The "ap" term is unavailable, so try getting terms "p0" # and "a" terms in order to derive an "ap" equivalent term. coord_p0 = coord_from_term("p0") if coord_p0 is not None: if coord_p0.shape != (1,): msg = ( "Expecting {!r} to be a scalar reference " "pressure coordinate, got shape {!r}".format( coord_p0.var_name, coord_p0.shape ) ) raise ValueError(msg) if coord_p0.has_bounds(): msg = ( "Ignoring atmosphere hybrid sigma pressure " "scalar coordinate {!r} bounds.".format( coord_p0.name() ) ) warnings.warn(msg) coord_a = coord_from_term("a") if coord_a is not None: if coord_a.units.is_unknown(): # Be graceful, and promote unknown to dimensionless units. coord_a.units = "1" delta = coord_a * coord_p0.points[0] delta.units = coord_a.units * coord_p0.units delta.rename("vertical pressure") delta.var_name = "ap" cube.add_aux_coord(delta, cube.coord_dims(coord_a)) sigma = coord_from_term("b") surface_air_pressure = coord_from_term("ps") factory = HybridPressureFactory(delta, sigma, surface_air_pressure) elif formula_type == "ocean_sigma_z_coordinate": sigma = coord_from_term("sigma") eta = coord_from_term("eta") depth = coord_from_term("depth") depth_c = coord_from_term("depth_c") nsigma = coord_from_term("nsigma") zlev = coord_from_term("zlev") factory = OceanSigmaZFactory( sigma, eta, depth, depth_c, nsigma, zlev ) elif formula_type == "ocean_sigma_coordinate": sigma = coord_from_term("sigma") eta = coord_from_term("eta") depth = coord_from_term("depth") factory = OceanSigmaFactory(sigma, eta, depth) elif formula_type == "ocean_s_coordinate": s = coord_from_term("s") eta = coord_from_term("eta") depth = coord_from_term("depth") a = coord_from_term("a") depth_c = coord_from_term("depth_c") b = coord_from_term("b") factory = OceanSFactory(s, eta, depth, a, b, depth_c) elif formula_type == "ocean_s_coordinate_g1": s = coord_from_term("s") c = coord_from_term("c") eta = coord_from_term("eta") depth = coord_from_term("depth") depth_c = coord_from_term("depth_c") factory = OceanSg1Factory(s, c, eta, depth, depth_c) elif formula_type == "ocean_s_coordinate_g2": s = coord_from_term("s") c = coord_from_term("c") eta = coord_from_term("eta") depth = coord_from_term("depth") depth_c = coord_from_term("depth_c") factory = OceanSg2Factory(s, c, eta, depth, depth_c) cube.add_aux_factory(factory) def _translate_constraints_to_var_callback(constraints): """ Translate load constraints into a simple data-var filter function, if possible. Returns: * function(cf_var:CFDataVariable): --> bool, or None. For now, ONLY handles a single NameConstraint with no 'STASH' component. """ import iris._constraints constraints = iris._constraints.list_of_constraints(constraints) result = None if len(constraints) == 1: (constraint,) = constraints if ( isinstance(constraint, iris._constraints.NameConstraint) and constraint.STASH == "none" ): # As long as it doesn't use a STASH match, then we can treat it as # a testing against name properties of cf_var. # That's just like testing against name properties of a cube, except that they may not all exist. def inner(cf_datavar): match = True for name in constraint._names: expected = getattr(constraint, name) if name != "STASH" and expected != "none": attr_name = "cf_name" if name == "var_name" else name # Fetch property : N.B. CFVariable caches the property values # The use of a default here is the only difference from the code in NameConstraint. if not hasattr(cf_datavar, attr_name): continue actual = getattr(cf_datavar, attr_name, "") if actual != expected: match = False break return match result = inner return result
[docs]def load_cubes(filenames, callback=None, constraints=None): """ Loads cubes from a list of NetCDF filenames/OPeNDAP URLs. Args: * filenames (string/list): One or more NetCDF filenames/OPeNDAP URLs to load from. Kwargs: * callback (callable function): Function which can be passed on to :func:`iris.io.run_callback`. Returns: Generator of loaded NetCDF :class:`iris.cube.Cube`. """ # TODO: rationalise UGRID/mesh handling once experimental.ugrid is folded # into standard behaviour. # Deferred import to avoid circular imports. from iris.experimental.ugrid.cf import CFUGridReader from iris.experimental.ugrid.load import ( PARSE_UGRID_ON_LOAD, _build_mesh_coords, _meshes_from_cf, ) from iris.io import run_callback # Create a low-level data-var filter from the original load constraints, if they are suitable. var_callback = _translate_constraints_to_var_callback(constraints) # Create an actions engine. engine = _actions_engine() if isinstance(filenames, str): filenames = [filenames] for filename in filenames: # Ingest the netCDF file. meshes = {} if PARSE_UGRID_ON_LOAD: cf = CFUGridReader(filename) meshes = _meshes_from_cf(cf) else: cf = iris.fileformats.cf.CFReader(filename) # Process each CF data variable. data_variables = list(cf.cf_group.data_variables.values()) + list( cf.cf_group.promoted.values() ) for cf_var in data_variables: if var_callback and not var_callback(cf_var): # Deliver only selected results. continue # cf_var-specific mesh handling, if a mesh is present. # Build the mesh_coords *before* loading the cube - avoids # mesh-related attributes being picked up by # _add_unused_attributes(). mesh_name = None mesh = None mesh_coords, mesh_dim = [], None if PARSE_UGRID_ON_LOAD: mesh_name = getattr(cf_var, "mesh", None) if mesh_name is not None: try: mesh = meshes[mesh_name] except KeyError: message = ( f"File does not contain mesh: '{mesh_name}' - " f"referenced by variable: '{cf_var.cf_name}' ." ) logger.debug(message) if mesh is not None: mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var) cube = _load_cube(engine, cf, cf_var, filename) # Attach the mesh (if present) to the cube. for mesh_coord in mesh_coords: cube.add_aux_coord(mesh_coord, mesh_dim) # Process any associated formula terms and attach # the corresponding AuxCoordFactory. try: _load_aux_factory(engine, cube) except ValueError as e: warnings.warn("{}".format(e)) # Perform any user registered callback function. cube = run_callback(callback, cube, cf_var, filename) # Callback mechanism may return None, which must not be yielded if cube is None: continue yield cube