
Data Formats

In-memory Data

Basic Concepts

The main format we use within plottr is the DataDict. While most of the actual numeric data will typically live in numpy arrays (or lists, or similar), these do not easily capture arbitrary metadata and relationships between arrays. Say, for example, we have some data z that depends on two other variables, x and y. This information has to be stored somewhere, and numpy does not readily offer a solution here. There are various extensions, for example xarray or the MetaArray class. Those, however, typically have a grid format in mind, which we do not want to impose. Instead, we use a wrapper around the python dictionary that contains all the meta information required to infer the relevant relationships, and that uses numpy arrays internally to store the numeric data. Additionally, we can store any other arbitrary meta data.

A DataDict container (a dataset) can contain multiple data fields (or variables) that have values and can contain their own meta information. Importantly, we distinguish between independent fields (the axes) and dependent fields (the data).

Despite the naming, axes is not meant to imply that the data have to have a certain shape (but the degree to which this is true depends on the class used). A list of classes for different shapes of data can be found below.

The basic structure of data conceptually looks like this (we inherit from dict):

{
    'data_1' : {
        'axes' : ['ax1', 'ax2'],
        'unit' : 'some unit',
        'values' : [ ... ],
        '__meta__' : 'This is very important data',
        ...
    },
    'ax1' : {
        'axes' : [],
        'unit' : 'some other unit',
        'values' : [ ... ],
        ...,
    },
    'ax2' : {
        'axes' : [],
        'unit' : 'a third unit',
        'values' : [ ... ],
        ...,
    },
    '__globalmeta__' : 'some information about this data set',
    '__moremeta__' : 1234,
    ...
}

In this case we have one dependent variable, data_1, that depends on two axes, ax1 and ax2. This concept is restricted only in the following way:

  • A dependent can depend on any number of independents.
  • An independent cannot depend on other fields itself.
  • Any field that does not depend on another is treated as an axis.

Note that meta information is contained in entries whose keys start and end with double underscores. Both the DataDict itself, as well as each field can contain meta information.

In the most basic implementation, the only restriction on the data values is that they need to be contained in a sequence (typically as list, or numpy array), and that the length of all values in the data set (the number of records) must be equal. Note that this does not preclude nested sequences!
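To make these rules concrete, here is a minimal sketch that uses a plain dictionary instead of the DataDict class. The helper names (is_meta, split_fields, record_counts) are hypothetical and only illustrate the conventions described above; they are not part of the package.

```python
# Plain-dict sketch of the DataDict layout; all helper names are hypothetical.
dataset = {
    'data_1': {'axes': ['ax1', 'ax2'], 'unit': 'V', 'values': [0.1, 0.2, 0.3]},
    'ax1': {'axes': [], 'unit': 's', 'values': [0, 1, 2]},
    # Nested sequences are allowed: each record of ax2 is itself a list.
    'ax2': {'axes': [], 'unit': 'Hz', 'values': [[10, 20], [10, 20], [10, 20]]},
    '__globalmeta__': 'some information about this data set',
}


def is_meta(key):
    """Meta entries have keys that start and end with double underscores."""
    return len(key) > 4 and key.startswith('__') and key.endswith('__')


def split_fields(ds):
    """Return (axes, dependents): fields with an empty / non-empty 'axes' list."""
    fields = {k: v for k, v in ds.items() if not is_meta(k)}
    axes = [k for k, v in fields.items() if not v.get('axes')]
    dependents = [k for k, v in fields.items() if v.get('axes')]
    return axes, dependents


def record_counts(ds):
    """Number of records per field: the length of the outermost sequence."""
    return {k: len(v['values']) for k, v in ds.items() if not is_meta(k)}


axes, dependents = split_fields(dataset)
counts = record_counts(dataset)
print(axes, dependents)
print(counts)
```

All three fields report the same number of records even though ax2 holds nested values, which is exactly the condition the basic implementation requires.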

Relevant Data Classes

DataDictBase: The main base class. Only checks for correct dependencies. Any requirements on data structure are left to the inheriting classes. The class contains methods for easy access to data and metadata.

DataDict: The only requirement for valid data is that the number of records is the same for all data fields. Contains some tools for expansion of data.

MeshgridDataDict: For data that lives on a grid (not necessarily regular).

Datadict

Note

Because DataDicts are python dictionaries, we highly recommend becoming familiar with them before using DataDicts.

Basic Use

We can start by creating an empty DataDict like any other python object:

>>> data_dict = DataDict()
>>> data_dict
{}

We can create the structure of the data_dict by creating dictionary items and populating them like a normal python dictionary:

>>> data_dict['x'] = dict(unit='m')
>>> data_dict
{'x': {'unit': 'm'}}

We can also start by creating a DataDict that has the structure of the data we are going to record:

>>> data_dict = DataDict(x=dict(unit='m'), y = dict(unit='m'), z = dict(axes=['x', 'y']))
>>> data_dict
{'x': {'unit': 'm'}, 'y': {'unit': 'm'}, 'z': {'axes': ['x', 'y']}}

The DataDict that we just created contains no data yet, only the structure and relationships of the data fields. We have also specified the unit of x and y, and which variables are independent (x and y), which from now on we will call axes, and which are dependent (z), which we will call dependents.

Structure

From the basic and empty DataDict we can already start to inspect its structure. To see the entire structure of a DataDict we can use the structure() method:

>>> data_dict = DataDict(x=dict(unit='m'), y = dict(unit='m'), z = dict(axes=['x', 'y']))
>>> data_dict.structure()
{'x': {'unit': 'm', 'axes': [], 'label': ''},
 'y': {'unit': 'm', 'axes': [], 'label': ''},
 'z': {'axes': ['x', 'y'], 'unit': '', 'label': ''}}

We can check for specific things inside the DataDict. We can look at the axes:

>>> data_dict.axes()
['x', 'y']

We can look at all the dependents:

>>> data_dict.dependents()
['z']

We can also see the shape of a DataDict by using the shapes() method:

>>> data_dict.shapes()
{'x': (0,), 'y': (0,), 'z': (0,)}

Populating the DataDict

One of the only "restrictions" that DataDict implements is that every data field must have the same number of records (items). Restrictions is in quotes because nothing stops you from giving different data fields different numbers of records; doing so only makes the DataDict invalid. We will explore what this means later.

There are two ways of safely populating a DataDict: adding data to it, or appending one DataDict to another.

Note

You can always manually update the values of any data field like any other item of a python dictionary. However, populating the DataDict this way can result in an invalid DataDict if you are not careful. Both population methods presented below contain checks to make sure that the new data being added will not create an invalid DataDict.

We can add data to an existing DataDict with the add_data() method:

>>> data_dict = DataDict(x=dict(unit='m'), y = dict(unit='m'), z = dict(axes=['x', 'y']))
>>> data_dict.add_data(x=[0,1,2], y=[0,1,2], z=[0,1,4])
>>> data_dict
{'x': {'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2])},
 'y': {'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2])},
 'z': {'axes': ['x', 'y'],  'unit': '',  'label': '',  'values': array([0, 1, 4])}}

We now have a populated DataDict. It is important to notice that this method will also add any of the special keys that a data field is missing (values, axes, unit, and label). Populating the DataDict with this method will also ensure that every item has the same number of records and the correct shape, either by adding nan to the other data fields or by nesting the data arrays so that the outermost dimension of every data field has the same number of records.

We can see this in action if we add a single record to one data field but not the rest:

>>> data_dict.add_data(x=[9])
>>> data_dict
{'x': {'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2, 9])},
 'y': {'unit': 'm', 'axes': [], 'label': '', 'values': array([ 0.,  1.,  2., nan])},
 'z': {'axes': ['x', 'y'], 'unit': '', 'label': '', 'values': array([ 0.,  1.,  4., nan])}}

As we can see, both y and z have an extra nan record in them. We can observe the change of dimension if we do not add the same number of records to all data fields:

>>> data_dict = DataDict(x=dict(unit='m'), y = dict(unit='m'), z = dict(axes=['x', 'y']))
>>> data_dict.add_data(x=[0,1,2], y=[0,1,2],z=[0])
>>> data_dict
{'x': {'unit': 'm', 'axes': [], 'label': '', 'values': array([[0, 1, 2]])},
 'y': {'unit': 'm', 'axes': [], 'label': '', 'values': array([[0, 1, 2]])},
 'z': {'axes': ['x', 'y'], 'unit': '', 'label': '', 'values': array([0])}}
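The nan padding shown earlier can be pictured with plain lists. This is a simplified sketch that assumes every supplied field carries the same number of new records; pad_missing_sketch is a hypothetical helper and not how add_data() is implemented.

```python
import math


def pad_missing_sketch(existing, new):
    """Append `new` records to `existing`; fields absent from `new` get nan."""
    # Assumption: all supplied fields have the same number of new records.
    n_new = max(len(values) for values in new.values())
    out = {}
    for name, values in existing.items():
        added = list(new.get(name, [math.nan] * n_new))
        out[name] = list(values) + added
    return out


existing = {'x': [0, 1, 2], 'y': [0, 1, 2], 'z': [0, 1, 4]}
padded = pad_missing_sketch(existing, {'x': [9]})
print(padded)
```

In the numpy-backed output above, the padded fields show up as floating point arrays because nan is a floating point value.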

If we want to expand our DataDict by appending another one, we need to make sure that both of our DataDicts have the same inner structure. We can check that by utilizing the static method same_structure():

>>> data_dict_1 = DataDict(x=dict(unit='m'), y=dict(unit='m'), z=dict(axes=['x','y']))
>>> data_dict_2 = DataDict(x=dict(unit='m'), y=dict(unit='m'), z=dict(axes=['x','y']))
>>> data_dict_1.add_data(x=[0,1,2], y=[0,1,2], z=[0,1,4])
>>> data_dict_2.add_data(x=[3,4], y=[3,4], z=[9,16])
>>> DataDict.same_structure(data_dict_1, data_dict_2)
True

Note

Make sure that both DataDicts have exactly the same structure. This means that every item of every data field (unit, axes, and label) must be identical between the two; only values may differ. Any slight difference will make the check fail due to conflicting structures.

The append() method performs this check before appending the two DataDicts, and only appends them if the check returns True; otherwise it raises a ValueError. Once we know that the structure is the same we can append them:

>>> data_dict_1.append(data_dict_2)
>>> data_dict_1
{'x': {'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2, 3, 4])},
 'y': {'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2, 3, 4])},
 'z': {'axes': ['x', 'y'], 'unit': '', 'label': '', 'values': array([ 0,  1,  4,  9, 16])}}
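The check-then-append pattern can be sketched with plain dictionaries and lists. The functions structure_sketch and append_sketch are hypothetical stand-ins written for illustration; they mirror the behaviour described above but are not the package's code.

```python
def structure_sketch(ds):
    """Everything except 'values' for each data field."""
    return {name: {k: v for k, v in field.items() if k != 'values'}
            for name, field in ds.items()}


def append_sketch(first, second):
    """Append the values of `second` to `first` in place, if structures match."""
    if structure_sketch(first) != structure_sketch(second):
        raise ValueError('Incompatible data structures.')
    for name, field in second.items():
        first[name]['values'] = list(first[name]['values']) + list(field['values'])


d1 = {'x': {'unit': 'm', 'axes': [], 'values': [0, 1, 2]},
      'z': {'unit': '', 'axes': ['x'], 'values': [0, 1, 4]}}
d2 = {'x': {'unit': 'm', 'axes': [], 'values': [3, 4]},
      'z': {'unit': '', 'axes': ['x'], 'values': [9, 16]}}
append_sketch(d1, d2)

# A different unit on 'x' counts as a different structure and is rejected.
d3 = {'x': {'unit': 'cm', 'axes': [], 'values': [5]},
      'z': {'unit': '', 'axes': ['x'], 'values': [25]}}
try:
    append_sketch(d1, d3)
    rejected = False
except ValueError:
    rejected = True
```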

Meta Data

One of the advantages DataDicts have over regular python dictionaries is their ability to contain meta data. Meta data can be added to the entire DataDict or to individual data fields. Any object inside a DataDict whose key starts and ends with two underscores is considered meta data.

We can simply add meta data manually by adding an item with the proper notation:

>>> data_dict['__metadata__'] = 'important meta data'

Or we can use the add_meta() method:

>>> data_dict.add_meta('sample_temperature', '10mK')
>>> data_dict
{'x': {'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2])},
 'y': {'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2])},
 'z': {'axes': ['x', 'y'], 'unit': '', 'label': '', 'values': array([0, 1, 4])},
 '__metadata__': 'important meta data',
 '__sample_temperature__': '10mK'}
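The naming convention behind meta data keys is simple enough to sketch in a few lines. The function names below are hypothetical and chosen for illustration only; the sketch assumes nothing beyond the double-underscore rule stated above.

```python
def is_meta_key_sketch(key):
    """True if the key starts and ends with two underscores."""
    return len(key) > 4 and key.startswith('__') and key.endswith('__')


def meta_name_to_key_sketch(name):
    """Wrap a plain name in double underscores."""
    return '__' + name + '__'


def meta_key_to_name_sketch(key):
    """Strip the double underscores from a meta data key."""
    if not is_meta_key_sketch(key):
        raise ValueError(f'{key!r} is not a meta data key')
    return key[2:-2]


plain = {'x': {'unit': 'm'}}
plain[meta_name_to_key_sketch('sample_temperature')] = '10mK'
meta_names = [meta_key_to_name_sketch(k) for k in plain if is_meta_key_sketch(k)]
print(meta_names)
```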

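We can also add meta data to a specific data field by passing its name as the last argument:

>>> data_dict.add_meta('extra_metadata', 'important meta data', 'x')

We can check whether a meta field exists with the has_meta() method: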

>>> data_dict.has_meta('sample_temperature')
True

We can retrieve the meta data with the meta_val() method:

>>> data_dict.meta_val('sample_temperature')
'10mK'

We can also ask for a meta value from a specific data field by passing the data field as the second argument:

>>> data_dict.meta_val('extra_metadata','x')
'important meta data'

We can delete a specific meta field by using the delete_meta() method:

>>> data_dict.delete_meta('metadata')
>>> data_dict.has_meta('metadata')
False

This also works for meta data in data fields by passing the data field as the last argument:

>>> data_dict.delete_meta('extra_metadata', 'x')
>>> data_dict['x']
{'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2])}

We can delete all the meta data present in the DataDict with the clear_meta() method:

>>> data_dict.add_meta('metadata', 'important meta data')
>>> data_dict.add_meta('extra_metadata', 'important meta data', 'x')
>>> data_dict.clear_meta()
>>> data_dict
{'x': {'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2])},
 'y': {'unit': 'm', 'axes': [], 'label': '', 'values': array([0, 1, 2])},
 'z': {'axes': ['x', 'y'], 'unit': '', 'label': '', 'values': array([0, 1, 4])}}

Note

There are 3 helper functions in the datadict module that help convert from meta data name to key. These are:

Meshgrid DataDict

A dataset where the axes form a grid on which the dependent values reside.

This is a more special case than DataDict, but a very common scenario. To support flexible grids, this class requires that all axes specify values for each datapoint, rather than a single row/column/dimension.

For example, if we want to specify a 3-dimensional grid with axes x, y, z, the values of x, y, z all need to be 3-dimensional arrays; the same goes for all dependents that live on that grid. Then, say, x[i,j,k] is the x-coordinate of point i,j,k of the grid.

This implies that a MeshgridDataDict can only have a single shape, i.e., all data values share the exact same nesting structure.

For grids where the axes do not depend on each other, the correct values for the axes can be obtained from np.meshgrid (hence the name of the class).

Example: a simple uniform 3x2 grid might look like this; x and y are the coordinates of the grid, and z is a function of the two:

    x = [[0, 0],
         [1, 1],
         [2, 2]]

    y = [[0, 1],
         [0, 1],
         [0, 1]]

    z = x * y =
        [[0, 0],
         [0, 1],
         [0, 2]]
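The grid above can be reproduced with nested lists, as in the sketch below. With numpy, np.meshgrid(np.arange(3), np.arange(2), indexing='ij') yields the same x and y arrays; pure Python is used here only to keep the example free of dependencies.

```python
nx, ny = 3, 2

# x[i][j] is the x-coordinate of grid point (i, j); it varies along the first index.
x = [[i for _ in range(ny)] for i in range(nx)]
# y[i][j] is the y-coordinate of grid point (i, j); it varies along the second index.
y = [[j for j in range(ny)] for _ in range(nx)]
# The dependent lives on the same grid and therefore has the same shape.
z = [[x[i][j] * y[i][j] for j in range(ny)] for i in range(nx)]

print(x)
print(y)
print(z)
```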

Note

Internally we will typically assume that the nested axes are ordered from slow to fast, i.e., dimension 1 is the outermost axis, and dimension N of an N-dimensional array is the innermost (i.e., the fastest changing one). This guarantees, for example, that the default implementation of np.reshape has the expected outcome. For some reason, the specified axes may not be in that order (e.g., we might have z with axes = ['x', 'y'], but x is the fast axis in the data). In such a case, the guideline is that at creation of the meshgrid, the data should be transposed such that it conforms to the order given in the axes = [...] specification of the data. The function datadict_to_meshgrid() provides options for that.
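As a small illustration of the ordering guideline, the sketch below takes data that was recorded with x as the fast axis and transposes it so that it conforms to axes = ['x', 'y']. It uses nested lists and is only meant to show the idea, not the options offered by the package.

```python
# Recorded with x as the fast (inner) axis: z_recorded[j][i] = x_i * y_j,
# i.e. shape (ny, nx) = (2, 3).
z_recorded = [[0, 0, 0],
              [0, 1, 2]]

# Transpose so the first index runs over x (slow) and the second over y (fast).
z_conforming = [list(column) for column in zip(*z_recorded)]

# Flattening row by row now walks y fastest, matching axes = ['x', 'y'].
flat = [value for row in z_conforming for value in row]

print(z_conforming)
print(flat)
```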

This implementation of DataDictBase consists only of three extra methods:

So the only way of populating it is by manually modifying the values object of each data field since the tools for populating the DataDict are specific to the DataDict implementation.

DataDict Storage

The datadict_storage.py module offers tools to help with saving DataDicts to disk by storing them in DDH5 files (HDF5 files that contain DataDicts).

Description of the HDF5 Storage Format

We use a simple mapping from DataDict to the HDF5 file. Within the file, a single DataDict is stored in a (top-level) group of the file. The data fields are datasets within that group.

Global meta data of the DataDict are attributes of the group; field meta data are attributes of the dataset (including the unit and axes values). The meta data keys are given exactly as in the DataDict, i.e., they include the double underscore prefix and suffix.
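The mapping can be pictured without touching an actual file. The sketch below sorts the entries of a DataDict-like plain dictionary into the places where they would end up; hdf5_layout_sketch and the returned layout dictionary are hypothetical constructs for illustration and do not write HDF5.

```python
def is_meta_key(key):
    """Meta data keys start and end with double underscores."""
    return len(key) > 4 and key.startswith('__') and key.endswith('__')


def hdf5_layout_sketch(ds, groupname='data'):
    """Describe where each piece of a DataDict-like dict would be stored."""
    layout = {'group': groupname, 'group_attrs': {}, 'datasets': {}}
    for key, item in ds.items():
        if is_meta_key(key):
            # Global meta data becomes an attribute of the group.
            layout['group_attrs'][key] = item
        else:
            # Values become the dataset; everything else becomes its attributes.
            attrs = {k: v for k, v in item.items() if k != 'values'}
            layout['datasets'][key] = {'data': item['values'], 'attrs': attrs}
    return layout


example = {
    'x': {'values': [0, 1, 2], 'axes': [], 'unit': 'cm'},
    'y': {'values': [3, 4, 5], 'axes': ['x'], 'unit': '', '__note__': 'field meta'},
    '__sample__': 'global meta',
}
layout = hdf5_layout_sketch(example)
print(layout)
```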

For more specific information on how HDF5 works please read the following documentation

Working With DDH5 Files

When we are working with data, the first thing we usually want to do is to save it to disk. We can directly save an existing DataDict to disk by calling the function datadict_to_hdf5().

>>> data_dict = DataDict(x=dict(values=np.array([0,1,2]), axes=[], __unit__='cm'), y=dict(values=np.array([3,4,5]), axes=['x']))
>>> data_dict
{'x': {'values': array([0, 1, 2]), 'axes': [], '__unit__': 'cm'},
 'y': {'values': array([3, 4, 5]), 'axes': ['x']}}
>>> datadict_to_hdf5(data_dict, 'folder\data.ddh5')

datadict_to_hdf5() will save data_dict in a file named 'data.ddh5' in whatever directory is passed to it, creating new folders if they don't already exist. The file will contain all of the data fields as well as all the metadata, with some more metadata generated to specify when the DataDict was created.

Note

Meta data is only written during initial writing of the dataset. If we're appending to existing datasets, we're not setting meta data anymore.

Warning

For this method to work properly, the objects saved in the values key of a data field must be numpy arrays, or numpy array-like.

Data saved on disk is useless, however, if we do not have a way of accessing it. To do this we use datadict_from_hdf5():

>>> loaded_data_dict = datadict_from_hdf5('folder\data.ddh5')
>>> loaded_data_dict
{'__creation_time_sec__': 1651159636.0,
 '__creation_time_str__': '2022-04-28 10:27:16',
 'x': {'values': array([0, 1, 2]),
  'axes': [],
  '__shape__': (3,),
  '__creation_time_sec__': 1651159636.0,
  '__creation_time_str__': '2022-04-28 10:27:16',
  '__unit__': 'cm',
  'unit': '',
  'label': ''},
 'y': {'values': array([3, 4, 5]),
  'axes': ['x'],
  '__shape__': (3,),
  '__creation_time_sec__': 1651159636.0,
  '__creation_time_str__': '2022-04-28 10:27:16',
  'unit': '',
  'label': ''}}

We can see that the DataDict is the same one we saved earlier with the added metadata that indicates the time it was created.

By default both datadict_to_hdf5() and datadict_from_hdf5() save and load the datadict in the 'data' group of the DDH5. Both of these can be changed by passing another group to the argument 'groupname'. We can see this if we manually create a second group and save a new DataDict there:

>>> data_dict2 = DataDict(a=dict(values=np.array([0,1,2]), axes=[], __unit__='cm'), b=dict(values=np.array([3,4,5]), axes=['a']))
>>> with h5py.File('folder\data.ddh5', 'a') as file:
...     file.create_group('other_data')
>>> datadict_to_hdf5(data_dict2, 'folder\data.ddh5', groupname='other_data')

If we then load the DDH5 file like before we only see the first DataDict:

>>> loaded_data_dict = datadict_from_hdf5('folder\data.ddh5', 'data')
>>> loaded_data_dict
{'__creation_time_sec__': 1651159636.0,
 '__creation_time_str__': '2022-04-28 10:27:16',
 'x': {'values': array([0, 1, 2]),
  'axes': [],
  '__shape__': (3,),
  '__creation_time_sec__': 1651159636.0,
  '__creation_time_str__': '2022-04-28 10:27:16',
  '__unit__': 'cm',
  'unit': '',
  'label': ''},
 'y': {'values': array([3, 4, 5]),
  'axes': ['x'],
  '__shape__': (3,),
  '__creation_time_sec__': 1651159636.0,
  '__creation_time_str__': '2022-04-28 10:27:16',
  'unit': '',
  'label': ''}}

To see the other DataDict we can specify the group in the argument 'groupname':

>>> loaded_data_dict = datadict_from_hdf5('folder\data.ddh5', 'other_data')
>>> loaded_data_dict
{'a': {'values': array([0, 1, 2]),
  'axes': [],
  '__shape__': (3,),
  '__creation_time_sec__': 1651159636.0,
  '__creation_time_str__': '2022-04-28 10:27:16',
  '__unit__': 'cm',
  'unit': '',
  'label': ''},
 'b': {'values': array([3, 4, 5]),
  'axes': ['a'],
  '__shape__': (3,),
  '__creation_time_sec__': 1651159636.0,
  '__creation_time_str__': '2022-04-28 10:27:16',
  'unit': '',
  'label': ''}}

We can also use all_datadicts_from_hdf5() to get a dictionary with all DataDicts in every group inside:

>>> all_datadicts = all_datadicts_from_hdf5('folder\data.ddh5')
>>> all_datadicts
{'data': {'__creation_time_sec__': 1651159636.0,
  '__creation_time_str__': '2022-04-28 10:27:16',
  'x': {'values': array([0, 1, 2]),
   'axes': [],
   '__shape__': (3,),
   '__creation_time_sec__': 1651159636.0,
   '__creation_time_str__': '2022-04-28 10:27:16',
   '__unit__': 'cm',
   'unit': '',
   'label': ''},
  'y': {'values': array([3, 4, 5]),
   'axes': ['x'],
   '__shape__': (3,),
   '__creation_time_sec__': 1651159636.0,
   '__creation_time_str__': '2022-04-28 10:27:16',
   'unit': '',
   'label': ''}},
 'other_data': {'a': {'values': array([0, 1, 2]),
   'axes': [],
   '__shape__': (3,),
   '__creation_time_sec__': 1651159636.0,
   '__creation_time_str__': '2022-04-28 10:27:16',
   '__unit__': 'cm',
   'unit': '',
   'label': ''},
  'b': {'values': array([3, 4, 5]),
   'axes': ['a'],
   '__shape__': (3,),
   '__creation_time_sec__': 1651159636.0,
   '__creation_time_str__': '2022-04-28 10:27:16',
   'unit': '',
   'label': ''}}}

DDH5 Writer

Most of the time we want to save data to disk as soon as it is generated by an experiment (or iteration), instead of waiting to have a complete DataDict. To do this, datadict_storage also offers a context manager with which we can safely save our incoming data.

To use it we first need to create an empty DataDict that contains the structure of what the data is going to look like:

>>> data_dict = DataDict(
...     x = dict(unit='x_unit'),
...     y = dict(unit='y_unit', axes=['x']))

With our DataDict created, we can start the DDH5Writer context manager and add data with the writer's add_data() method:

>>> with DDH5Writer(datadict=data_dict, basedir='./data/', name='Test') as writer:
...     for x in range(10):
...         writer.add_data(x=x, y=x**2)
Data location:  data\2022-04-27\2022-04-27T145308_a986867c-Test\data.ddh5

The writer created the folder 'data' (because it did not exist before) and, inside it, a new folder for the current day. Inside the day folder it created another folder for the DataDict that we saved, following the naming structure YYYY-mm-ddTHHMMSS_<ID>-<name>/<filename>.ddh5, where name is the name parameter passed to the writer. When we run the writer again with new data, it will create another folder with the same naming structure inside the current date folder. This way each new DataDict is saved under the date it was generated, with a time stamp in the name of the folder containing it.
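The folder layout can be sketched with the standard library. The function data_file_path_sketch is hypothetical, and generating the ID from a uuid prefix is an assumption made for illustration; the pattern itself follows the 'Data location' output shown above.

```python
import uuid
from datetime import datetime
from pathlib import PurePosixPath


def data_file_path_sketch(basedir, name, now=None, run_id=None, filename='data'):
    """Build <basedir>/<day>/<timestamp>_<id>-<name>/<filename>.ddh5."""
    now = now or datetime.now()
    # Assumption: a short random hex string serves as the unique ID.
    run_id = run_id or uuid.uuid4().hex[:8]
    day = now.strftime('%Y-%m-%d')
    stamp = now.strftime('%Y-%m-%dT%H%M%S')
    folder = f'{stamp}_{run_id}-{name}'
    return PurePosixPath(basedir) / day / folder / f'{filename}.ddh5'


path = data_file_path_sketch(
    'data', 'Test',
    now=datetime(2022, 4, 27, 14, 53, 8),
    run_id='a986867c',
)
print(path)
```

Because the day folder and the time stamp are derived from the same moment, two runs on the same day land in the same date folder but in separate subfolders.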

Change File Extension and Time Format

Finally, datadict_storage contains 2 module variables, 'DATAFILEXT' and 'TIMESTRFORMAT'.

'DATAFILEXT' is 'ddh5' by default, and it specifies the file extension used by all of the module's saving functions. Change this variable if you want your HDF5 files to have a different extension by default, instead of passing it every time.

'TIMESTRFORMAT' specifies how the time is formatted in the new metadata created when saving a DataDict. The default is "%Y-%m-%d %H:%M:%S", and it follows the structure of strftime.
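As a quick check of what the default format produces, the sketch below applies it with the standard datetime module. The variable TIMESTRFORMAT here is a local copy of the default string quoted above, not the module variable itself.

```python
from datetime import datetime

# Local copy of the default format string quoted above.
TIMESTRFORMAT = "%Y-%m-%d %H:%M:%S"

created = datetime(2022, 4, 28, 10, 27, 16)
time_str = created.strftime(TIMESTRFORMAT)
print(time_str)

# The same format string parses the text back into a datetime.
roundtrip = datetime.strptime(time_str, TIMESTRFORMAT)
```

The resulting string has the same shape as the '__creation_time_str__' values in the loaded DataDicts shown earlier.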

Reference

Datadict

datadict.py :

Data classes we use throughout the plottr package, and tools to work on them.

DataDict

Bases: DataDictBase

The most basic implementation of the DataDict class.

It only enforces that the number of records per data field must be equal for all fields. This refers to the most outer dimension in case of nested arrays.

The class further implements simple appending of datadicts through the DataDict.append method, as well as allowing addition of DataDict instances.

Source code in labcore/data/datadict.py
class DataDict(DataDictBase):
    """
    The most basic implementation of the DataDict class.

    It only enforces that the number of `records` per data field must be
    equal for all fields. This refers to the most outer dimension in case
    of nested arrays.

    The class further implements simple appending of datadicts through the
    ``DataDict.append`` method, as well as allowing addition of DataDict
    instances.
    """

    def __add__(self, newdata: 'DataDict') -> 'DataDict':
        """
        Adding two datadicts by appending each data array.

        Requires that the datadicts have the same structure.
        Retains the meta information of the first array.

        :param newdata: DataDict to be added.
        :returns: combined DataDict.
        :raises: ``ValueError`` if the structures are incompatible.
        """

        # FIXME: remove shape
        s = misc.unwrap_optional(self.structure(add_shape=False))
        if DataDictBase.same_structure(self, newdata):
            for k, v in self.data_items():
                val0 = self[k]['values']
                val1 = newdata[k]['values']
                s[k]['values'] = np.append(
                    self[k]['values'],
                    newdata[k]['values'],
                    axis=0
                )
            return s
        else:
            raise ValueError('Incompatible data structures.')

    def append(self, newdata: "DataDict") -> None:
        """
        Append a datadict to this one by appending data values.

        :param newdata: DataDict to append.
        :raises: ``ValueError``, if the structures are incompatible.
        """
        if not DataDictBase.same_structure(self, newdata):
            raise ValueError('Incompatible data structures.')

        newvals = {}
        for k, v in newdata.data_items():
            if isinstance(self[k]['values'], list) and isinstance(
                    v['values'], list):
                newvals[k] = self[k]['values'] + v['values']
            else:
                newvals[k] = np.append(
                    self[k]['values'],
                    v['values'],
                    axis=0
                )

        # only actually
        for k, v in newvals.items():
            self[k]['values'] = v

    def add_data(self, **kw: Any) -> None:
        # TODO: fill non-given data with nan or none
        """
        Add data to all values. new data must be valid in itself.

        This method is useful to easily add data without needing to specify
        meta data or dependencies, etc.

        :param kw: one array per data field (none can be omitted).
        """
        dd = misc.unwrap_optional(self.structure(same_type=True))
        for name, _ in dd.data_items():
            if name not in kw:
                kw[name] = None

        records = self.to_records(**kw)
        for name, datavals in records.items():
            dd[name]['values'] = datavals

        if dd.validate():
            nrecords = self.nrecords()
            if nrecords is not None and nrecords > 0:
                self.append(dd)
            else:
                for key, val in dd.data_items():
                    self[key]['values'] = val['values']
            self.validate()

    # shape information and expansion

    def nrecords(self) -> Optional[int]:
        """
        Gets the number of records in the dataset.

        :return: The number of records in the dataset.
        """
        self.validate()
        for _, v in self.data_items():
            return len(v['values'])
        return None

    def _inner_shapes(self) -> Dict[str, Tuple[int, ...]]:
        shapes = self.shapes()
        return {k: v[1:] for k, v in shapes.items()}

    def is_expanded(self) -> bool:
        """
        Determine if the DataDict is expanded.

        :return: ``True`` if expanded. ``False`` if not.
        """
        ishp = self._inner_shapes()
        if set(ishp.values()) == {tuple()}:
            return True
        else:
            return False

    def is_expandable(self) -> bool:
        """
        Determine if the DataDict can be expanded.

        Expansion flattens all nested data values to a 1D array. For doing so,
        we require that all data fields that have nested/inner dimensions (i.e,
        inside the `records` level) shape the inner shape.
        In other words, all data fields must be of shape (N,) or (N, (shape)),
        where shape is common to all that have a shape not equal to (N,).

        :return: ``True`` if expandable. ``False`` otherwise.
        """
        shp = self._inner_shapes()
        if len(set(shp.values())) == 1:
            return True
        elif len(set(shp.values())) == 2 and tuple() in set(shp.values()):
            return True
        else:
            return False

    def expand(self) -> 'DataDict':
        """
        Expand nested values in the data fields.

        Flattens all value arrays. If nested dimensions
        are present, all data with non-nested dims will be repeated
        accordingly -- each record is repeated to match the size of
        the nested dims.

        :return: The flattened dataset.
        :raises: ``ValueError`` if data is not expandable.
        """
        self.validate()
        if not self.is_expandable():
            raise ValueError('Data cannot be expanded.')
        struct = misc.unwrap_optional(self.structure(add_shape=False))
        ret = DataDict(**struct)

        if self.is_expanded():
            return self

        ishp = self._inner_shapes()
        size = max([int(np.prod(s)) for s in ishp.values()])

        for k, v in self.data_items():
            reps = size // np.prod(ishp[k])
            if reps > 1:
                ret[k]['values'] = \
                    self[k]['values'].repeat(reps, axis=0).reshape(-1)
            else:
                ret[k]['values'] = self[k]['values'].reshape(-1)

        return ret

    # validation and sanitizing

    def validate(self) -> bool:
        """
        Check dataset validity.

        Beyond the checks performed in the base class ``DataDictBase``,
        check whether the number of records is the same for all data fields.

        :return: ``True`` if valid.
        :raises: ``ValueError`` if invalid.
        """
        if super().validate():
            nvals = None
            nvalsrc = None
            msg = '\n'

            for n, v in self.data_items():
                if type(v['values']) not in [np.ndarray,
                                             np.ma.core.MaskedArray]:
                    self[n]['values'] = np.array(v['values'])

                if nvals is None:
                    nvals = len(v['values'])
                    nvalsrc = n
                else:
                    if len(v['values']) != nvals:
                        msg += " * '{}' has length {}, but have found {} in " \
                               "'{}'\n".format(
                            n, len(v['values']), nvals, nvalsrc)

            if msg != '\n':
                raise ValueError(msg)

        return True

    def sanitize(self) -> "DataDict":
        """
        Clean-up.

        Beyond the tasks of the base class ``DataDictBase``:
            * remove invalid entries as far as reasonable.

        :return: sanitized DataDict.
        """
        ret = super().sanitize()
        return ret.remove_invalid_entries()

    def remove_invalid_entries(self) -> 'DataDict':
        """
        Remove all rows that are ``None`` or ``np.nan`` in *all* dependents.

        :return: The cleaned DataDict.
        """
        ishp = self._inner_shapes()
        idxs = []

        # collect rows that are completely invalid
        for d in self.dependents():

            #  need to discriminate whether there are nested dims or not
            if len(ishp[d]) == 0:
                rows = self.data_vals(d)
            else:
                datavals = self.data_vals(d)
                rows = datavals.reshape(-1, int(np.prod(ishp[d])))

            _idxs: np.ndarray = np.array([])

            # get indices of all rows that are fully None
            if len(ishp[d]) == 0:
                _newidxs = np.atleast_1d(np.asarray(rows is None)).nonzero()[0]
            else:
                _newidxs = np.atleast_1d(np.asarray(np.all(rows is None, axis=-1))).nonzero()[0]
            _idxs = np.append(_idxs, _newidxs)

            # get indices for all rows that are fully NaN. works only
            # for some dtypes, so except TypeErrors.
            try:
                if len(ishp[d]) == 0:
                    _newidxs = np.where(np.isnan(rows))[0]
                else:
                    _newidxs = np.where(np.all(np.isnan(rows), axis=-1))[0]
                _idxs = np.append(_idxs, _newidxs)
            except TypeError:
                pass

            idxs.append(_idxs)

        if len(idxs) > 0:
            remove_idxs = reduce(np.intersect1d,
                                 tuple(np.array(idxs).astype(int)))
            for k, v in self.data_items():
                v['values'] = np.delete(v['values'], remove_idxs, axis=0)

        return self
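As a rough illustration of what expansion does to the values, the sketch below flattens nested records and repeats un-nested records to match. It works on plain lists, assumes non-empty fields with at most one level of nesting and a common inner length, and expand_sketch is a hypothetical helper rather than the method above.

```python
def expand_sketch(fields):
    """Flatten nested records; repeat un-nested records to match their size."""
    nested = {name: isinstance(values[0], list) for name, values in fields.items()}
    # Size of the inner dimension shared by all nested fields (1 if none nested).
    size = max((len(values[0]) for name, values in fields.items() if nested[name]),
               default=1)
    out = {}
    for name, values in fields.items():
        if nested[name]:
            out[name] = [value for row in values for value in row]
        else:
            out[name] = [value for value in values for _ in range(size)]
    return out


fields = {'x': [0, 1], 'y': [[1, 2, 3], [4, 5, 6]]}
expanded = expand_sketch(fields)
print(expanded)
```

Each record of x is repeated three times so that it lines up with the three inner values of the corresponding y record.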

__add__(newdata)

Adding two datadicts by appending each data array.

Requires that the datadicts have the same structure. Retains the meta information of the first array.

Parameters:

newdata (DataDict, required): DataDict to be added.

Returns:

DataDict: combined DataDict.

Source code in labcore/data/datadict.py
def __add__(self, newdata: 'DataDict') -> 'DataDict':
    """
    Adding two datadicts by appending each data array.

    Requires that the datadicts have the same structure.
    Retains the meta information of the first array.

    :param newdata: DataDict to be added.
    :returns: combined DataDict.
    :raises: ``ValueError`` if the structures are incompatible.
    """

    # FIXME: remove shape
    s = misc.unwrap_optional(self.structure(add_shape=False))
    if DataDictBase.same_structure(self, newdata):
        for k, v in self.data_items():
            val0 = self[k]['values']
            val1 = newdata[k]['values']
            s[k]['values'] = np.append(val0, val1, axis=0)
        return s
    else:
        raise ValueError('Incompatible data structures.')
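
The add-by-appending behaviour described above can be sketched with plain dictionaries and lists instead of DataDict and numpy arrays. This is only an illustration under stated assumptions: the helper names structure_of and add_datasets are hypothetical and not part of labcore, and the structure check is reduced to comparing field names and axes.

```python
from typing import Any, Dict, List

Dataset = Dict[str, Dict[str, Any]]


def structure_of(ds: Dataset) -> Dict[str, List[str]]:
    """Hypothetical helper: map each field name to its list of axes."""
    return {name: list(field.get('axes', [])) for name, field in ds.items()}


def add_datasets(first: Dataset, second: Dataset) -> Dataset:
    """Return a new dataset: values of `first` followed by values of `second`."""
    if structure_of(first) != structure_of(second):
        raise ValueError('Incompatible data structures.')
    combined: Dataset = {}
    for name, field in first.items():
        new_field = dict(field)  # keeps the other entries of the first dataset
        new_field['values'] = list(field['values']) + list(second[name]['values'])
        combined[name] = new_field
    return combined


a = {'x': {'axes': [], 'values': [0, 1]},
     'z': {'axes': ['x'], 'values': [10, 11]}}
b = {'x': {'axes': [], 'values': [2]},
     'z': {'axes': ['x'], 'values': [12]}}
total = add_datasets(a, b)
```

In this sketch neither input is modified; the combined values live in a new dictionary, which mirrors the fact that __add__ returns a new object rather than changing self.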

add_data(**kw)

Add data to all values. New data must be valid in itself.

This method is useful to easily add data without needing to specify meta data or dependencies, etc.

Parameters:

Name  Type  Description                                      Default
kw    Any   one array per data field (none can be omitted).  {}

Source code in labcore/data/datadict.py
def add_data(self, **kw: Any) -> None:
    # TODO: fill non-given data with nan or none
    """
    Add data to all values. New data must be valid in itself.

    This method is useful to easily add data without needing to specify
    meta data or dependencies, etc.

    :param kw: one array per data field (none can be omitted).
    """
    dd = misc.unwrap_optional(self.structure(same_type=True))
    for name, _ in dd.data_items():
        if name not in kw:
            kw[name] = None

    records = self.to_records(**kw)
    for name, datavals in records.items():
        dd[name]['values'] = datavals

    if dd.validate():
        nrecords = self.nrecords()
        if nrecords is not None and nrecords > 0:
            self.append(dd)
        else:
            for key, val in dd.data_items():
                self[key]['values'] = val['values']
        self.validate()
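
In the listing above, fields missing from kw are set to None before to_records is called, and the to_records docstring states that None becomes numpy.array([numpy.nan]). The padding step can be sketched in pure Python as follows. The function complete_record is a hypothetical stand-in, it uses lists and math.nan rather than numpy arrays, and it does not reproduce the reshaping that to_records performs.

```python
import math
from typing import Any, Dict, Iterable, List


def complete_record(field_names: Iterable[str], **kw: Any) -> Dict[str, List[Any]]:
    """Hypothetical helper: build one record, padding omitted fields with NaN."""
    record: Dict[str, List[Any]] = {}
    for name in field_names:
        value = kw.get(name)  # None if the field was not supplied
        if value is None:
            record[name] = [math.nan]
        elif isinstance(value, (list, tuple)):
            record[name] = list(value)
        else:
            record[name] = [value]  # wrap scalars so every field is a sequence
    return record


rec = complete_record(['x', 'y', 'z'], x=1.0, z=[5.0])
```

Here the field y was not given, so its single entry is NaN, while x and z each contribute one value to the record.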

append(newdata)

Append a datadict to this one by appending data values.

Parameters:

Name     Type      Description          Default
newdata  DataDict  DataDict to append.  required

Source code in labcore/data/datadict.py
def append(self, newdata: "DataDict") -> None:
    """
    Append a datadict to this one by appending data values.

    :param newdata: DataDict to append.
    :raises: ``ValueError``, if the structures are incompatible.
    """
    if not DataDictBase.same_structure(self, newdata):
        raise ValueError('Incompatible data structures.')

    newvals = {}
    for k, v in newdata.data_items():
        if isinstance(self[k]['values'], list) and isinstance(
                v['values'], list):
            newvals[k] = self[k]['values'] + v['values']
        else:
            newvals[k] = np.append(
                self[k]['values'],
                v['values'],
                axis=0
            )

    # only actually assign once all fields have been combined successfully
    for k, v in newvals.items():
        self[k]['values'] = v
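
The listing above first collects the combined values in newvals and assigns them to self afterwards. A minimal sketch of that two-phase pattern, using plain dictionaries and lists, is shown below. The name append_in_place is hypothetical, and the comparison of field names is only a crude stand-in for DataDictBase.same_structure.

```python
from typing import Any, Dict, List

Dataset = Dict[str, Dict[str, Any]]


def append_in_place(target: Dataset, new: Dataset) -> None:
    """Append the values of `new` to `target`, committing only at the end."""
    if set(target) != set(new):
        raise ValueError('Incompatible data structures.')

    # Phase 1: build all combined value lists without touching `target`.
    staged: Dict[str, List[Any]] = {}
    for name, field in new.items():
        staged[name] = list(target[name]['values']) + list(field['values'])

    # Phase 2: commit the staged values.
    for name, values in staged.items():
        target[name]['values'] = values


data = {'x': {'values': [0, 1]}, 'y': {'values': [5, 6]}}
append_in_place(data, {'x': {'values': [2]}, 'y': {'values': [7]}})
```

One plausible reason for staging is that an error while combining any field leaves the target untouched, so the record counts of the fields cannot drift apart.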

expand()

Expand nested values in the data fields.

Flattens all value arrays. If nested dimensions are present, all data with non-nested dims will be repeated accordingly -- each record is repeated to match the size of the nested dims.

Returns:

Type      Description
DataDict  The flattened dataset.

Source code in labcore/data/datadict.py
def expand(self) -> 'DataDict':
    """
    Expand nested values in the data fields.

    Flattens all value arrays. If nested dimensions
    are present, all data with non-nested dims will be repeated
    accordingly -- each record is repeated to match the size of
    the nested dims.

    :return: The flattened dataset.
    :raises: ``ValueError`` if data is not expandable.
    """
    self.validate()
    if not self.is_expandable():
        raise ValueError('Data cannot be expanded.')
    struct = misc.unwrap_optional(self.structure(add_shape=False))
    ret = DataDict(**struct)

    if self.is_expanded():
        return self

    ishp = self._inner_shapes()
    size = max([int(np.prod(s)) for s in ishp.values()])

    for k, v in self.data_items():
        reps = size // np.prod(ishp[k])
        if reps > 1:
            ret[k]['values'] = \
                self[k]['values'].repeat(reps, axis=0).reshape(-1)
        else:
            ret[k]['values'] = self[k]['values'].reshape(-1)

    return ret
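
To make the repetition rule concrete, here is a small pure-Python sketch of the expansion. It assumes at most one nested level and non-empty fields, and the function expand_records is hypothetical; the actual method works on numpy arrays with repeat and reshape.

```python
from typing import Any, Dict, List


def expand_records(fields: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
    """Flatten nested records and repeat non-nested values to match."""
    # Size of the nested dimension (1 if no field is nested).
    inner = max(len(v[0]) if isinstance(v[0], list) else 1
                for v in fields.values())
    out: Dict[str, List[Any]] = {}
    for name, values in fields.items():
        if isinstance(values[0], list):
            # Nested field: concatenate the inner lists record by record.
            out[name] = [item for record in values for item in record]
        else:
            # Record-level field: repeat each value once per nested entry.
            out[name] = [value for value in values for _ in range(inner)]
    return out


nested = {
    'x': [1, 2],                        # shape (2,)
    'z': [[10, 11, 12], [20, 21, 22]],  # shape (2, 3)
}
flat = expand_records(nested)
```

After expansion both fields have six entries, and each x value stays aligned with the z values of the record it came from.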

is_expandable()

Determine if the DataDict can be expanded.

Expansion flattens all nested data values to a 1D array. To do so, we require that all data fields that have nested/inner dimensions (i.e., inside the records level) share the same inner shape. In other words, all data fields must be of shape (N,) or (N, (shape)), where shape is common to all that have a shape not equal to (N,).

Returns:

Type  Description
bool  True if expandable. False otherwise.

Source code in labcore/data/datadict.py
def is_expandable(self) -> bool:
    """
    Determine if the DataDict can be expanded.

    Expansion flattens all nested data values to a 1D array. For doing so,
    we require that all data fields that have nested/inner dimensions (i.e.,
    inside the `records` level) share the same inner shape.
    In other words, all data fields must be of shape (N,) or (N, (shape)),
    where shape is common to all that have a shape not equal to (N,).

    :return: ``True`` if expandable. ``False`` otherwise.
    """
    shp = self._inner_shapes()
    if len(set(shp.values())) == 1:
        return True
    elif len(set(shp.values())) == 2 and tuple() in set(shp.values()):
        return True
    else:
        return False
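
The shape rule can be checked on bare shape tuples. The sketch below assumes that the inner shape of a field is its full shape without the leading records dimension; the private method _inner_shapes is not shown on this page, so that reading is an assumption, and the names inner_shapes and can_expand are hypothetical.

```python
from typing import Dict, Tuple

Shape = Tuple[int, ...]


def inner_shapes(shapes: Dict[str, Shape]) -> Dict[str, Shape]:
    """Assumed behaviour: drop the leading records dimension of each shape."""
    return {name: shape[1:] for name, shape in shapes.items()}


def can_expand(shapes: Dict[str, Shape]) -> bool:
    """True if all fields share one inner shape, optionally mixed with ()."""
    distinct = set(inner_shapes(shapes).values())
    if len(distinct) == 1:
        return True
    return len(distinct) == 2 and () in distinct


ok_flat = can_expand({'x': (10,), 'z': (10,)})
ok_mixed = can_expand({'x': (10,), 'z': (10, 3, 2)})
not_ok = can_expand({'y': (10, 4), 'z': (10, 3, 2)})
```

The last case fails because two different non-empty inner shapes, (4,) and (3, 2), are present.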

is_expanded()

Determine if the DataDict is expanded.

Returns:

Type  Description
bool  True if expanded. False if not.

Source code in labcore/data/datadict.py
def is_expanded(self) -> bool:
    """
    Determine if the DataDict is expanded.

    :return: ``True`` if expanded. ``False`` if not.
    """
    ishp = self._inner_shapes()
    if set(ishp.values()) == {tuple()}:
        return True
    else:
        return False

nrecords()

Gets the number of records in the dataset.

Returns:

Type           Description
Optional[int]  The number of records in the dataset.

Source code in labcore/data/datadict.py
def nrecords(self) -> Optional[int]:
    """
    Gets the number of records in the dataset.

    :return: The number of records in the dataset.
    """
    self.validate()
    for _, v in self.data_items():
        return len(v['values'])
    return None

remove_invalid_entries()

Remove all rows that are None or np.nan in all dependents.

Returns:

Type      Description
DataDict  The cleaned DataDict.

Source code in labcore/data/datadict.py
def remove_invalid_entries(self) -> 'DataDict':
    """
    Remove all rows that are ``None`` or ``np.nan`` in *all* dependents.

    :return: The cleaned DataDict.
    """
    ishp = self._inner_shapes()
    idxs = []

    # collect rows that are completely invalid
    for d in self.dependents():

        #  need to discriminate whether there are nested dims or not
        if len(ishp[d]) == 0:
            rows = self.data_vals(d)
        else:
            datavals = self.data_vals(d)
            rows = datavals.reshape(-1, int(np.prod(ishp[d])))

        _idxs: np.ndarray = np.array([])

        # get indices of all rows that are fully None
        if len(ishp[d]) == 0:
            _newidxs = np.atleast_1d(np.asarray(rows is None)).nonzero()[0]
        else:
            _newidxs = np.atleast_1d(np.asarray(np.all(rows is None, axis=-1))).nonzero()[0]
        _idxs = np.append(_idxs, _newidxs)

        # get indices for all rows that are fully NaN. works only
        # for some dtypes, so except TypeErrors.
        try:
            if len(ishp[d]) == 0:
                _newidxs = np.where(np.isnan(rows))[0]
            else:
                _newidxs = np.where(np.all(np.isnan(rows), axis=-1))[0]
            _idxs = np.append(_idxs, _newidxs)
        except TypeError:
            pass

        idxs.append(_idxs)

    if len(idxs) > 0:
        remove_idxs = reduce(np.intersect1d,
                             tuple(np.array(idxs).astype(int)))
        for k, v in self.data_items():
            v['values'] = np.delete(v['values'], remove_idxs, axis=0)

    return self
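
The key point of this method is that a row is dropped only if it is invalid in every dependent, which corresponds to intersecting the per-dependent index sets. The following pure-Python sketch illustrates that idea on flat lists. The helpers is_invalid and rows_to_remove are hypothetical and do not handle nested dimensions.

```python
import math
from functools import reduce
from typing import Any, Dict, List, Set


def is_invalid(value: Any) -> bool:
    """True for None and NaN; everything else counts as valid."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def rows_to_remove(dependents: Dict[str, List[Any]]) -> List[int]:
    """Indices of rows that are invalid in every dependent."""
    per_field: List[Set[int]] = [
        {i for i, v in enumerate(values) if is_invalid(v)}
        for values in dependents.values()
    ]
    if not per_field:
        return []
    return sorted(reduce(set.intersection, per_field))


deps = {
    'a': [1.0, math.nan, None, 4.0],
    'b': [math.nan, math.nan, None, 8.0],
}
drop = rows_to_remove(deps)
cleaned = {name: [v for i, v in enumerate(vals) if i not in drop]
           for name, vals in deps.items()}
```

Row 0 is kept even though b is NaN there, because a holds a valid value in that row.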

sanitize()

Clean-up.

Beyond the tasks of the base class DataDictBase, this removes invalid entries as far as reasonable.

Returns:

Type      Description
DataDict  sanitized DataDict.

Source code in labcore/data/datadict.py
def sanitize(self) -> "DataDict":
    """
    Clean-up.

    Beyond the tasks of the base class ``DataDictBase``:
        * remove invalid entries as far as reasonable.

    :return: sanitized DataDict.
    """
    ret = super().sanitize()
    return ret.remove_invalid_entries()

validate()

Check dataset validity.

Beyond the checks performed in the base class DataDictBase, check whether the number of records is the same for all data fields.

Returns:

Type  Description
bool  True if valid.

Source code in labcore/data/datadict.py
def validate(self) -> bool:
    """
    Check dataset validity.

    Beyond the checks performed in the base class ``DataDictBase``,
    check whether the number of records is the same for all data fields.

    :return: ``True`` if valid.
    :raises: ``ValueError`` if invalid.
    """
    if super().validate():
        nvals = None
        nvalsrc = None
        msg = '\n'

        for n, v in self.data_items():
            if type(v['values']) not in [np.ndarray,
                                         np.ma.core.MaskedArray]:
                self[n]['values'] = np.array(v['values'])

            if nvals is None:
                nvals = len(v['values'])
                nvalsrc = n
            else:
                if len(v['values']) != nvals:
                    msg += " * '{}' has length {}, but have found {} in " \
                           "'{}'\n".format(
                        n, len(v['values']), nvals, nvalsrc)

        if msg != '\n':
            raise ValueError(msg)

    return True
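
The record-count check can be sketched independently of DataDict. The function check_record_counts below is a hypothetical, pure-Python illustration that collects one message per mismatching field and raises a single ValueError, similar in spirit to the listing above.

```python
from typing import Dict, List, Optional, Sequence


def check_record_counts(fields: Dict[str, Sequence]) -> bool:
    """Raise ValueError naming every field whose length differs from the first."""
    expected: Optional[int] = None
    source: Optional[str] = None
    problems: List[str] = []

    for name, values in fields.items():
        if expected is None:
            # The first field sets the reference number of records.
            expected, source = len(values), name
        elif len(values) != expected:
            problems.append(
                f" * '{name}' has length {len(values)}, "
                f"but have found {expected} in '{source}'"
            )

    if problems:
        raise ValueError('\n' + '\n'.join(problems))
    return True


valid = check_record_counts({'x': [0, 1, 2], 'z': [5, 6, 7]})

try:
    check_record_counts({'x': [0, 1, 2], 'z': [5, 6]})
    error_message = ''
except ValueError as err:
    error_message = str(err)
```

Collecting all mismatches before raising means a single error reports every offending field, not just the first one found.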

DataDictBase

Bases: dict

Simple data storage class that is based on a regular dictionary.

This base class does not make assumptions about the structure of the values. This is implemented in inheriting classes.
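
The separation of data fields from meta entries, and of dependents from axes, can be sketched on a plain dictionary. The code below is an illustration only: it assumes that meta keys follow the double-underscore pattern __name__, and the functions shown are simplified stand-ins, not the labcore implementations.

```python
from typing import Any, Dict, List


def is_meta_key(key: str) -> bool:
    """Assumed convention: meta keys look like ``__name__``."""
    return len(key) > 4 and key.startswith('__') and key.endswith('__')


def data_fields(ds: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """All entries that are data fields, i.e. not meta entries."""
    return {k: v for k, v in ds.items() if not is_meta_key(k)}


def dependents(ds: Dict[str, Any]) -> List[str]:
    """Fields that list at least one axis."""
    return [name for name, field in data_fields(ds).items() if field.get('axes')]


def axes(ds: Dict[str, Any]) -> List[str]:
    """Fields that are used as an axis and have no axes of their own."""
    found: List[str] = []
    for field in data_fields(ds).values():
        for ax in field.get('axes', []):
            if ax not in found and not ds[ax].get('axes'):
                found.append(ax)
    return found


dataset = {
    'data_1': {'axes': ['ax1', 'ax2'], 'unit': 'V', 'values': [1, 2]},
    'ax1': {'axes': [], 'values': [0, 1]},
    'ax2': {'axes': [], 'values': [5, 6]},
    '__info__': 'some information about this data set',
}
dep_names = dependents(dataset)
axis_names = axes(dataset)
```

With this dataset, data_1 is the only dependent, ax1 and ax2 are the axes, and the __info__ entry is skipped because it is treated as global meta data.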

Source code in labcore/data/datadict.py
class DataDictBase(dict):
    """
    Simple data storage class that is based on a regular dictionary.

    This base class does not make assumptions about the structure of the
    values. This is implemented in inheriting classes.
    """

    def __init__(self, **kw: Any):
        super().__init__(self, **kw)
        self.d_ = DataDictBase._DataAccess(self) 

    def __eq__(self, other: object) -> bool:
        """Check for content equality of two datadicts."""
        if not isinstance(other, DataDictBase):
            return False
        else:
            return datasets_are_equal(self, other)

    def __repr__(self) -> str:
        ret = ""
        for i, dn in enumerate(self.dependents()):
            if i > 0:
                ret += "\n"
            ret += f"{self.label(dn)}: {self[dn]['values'].shape}"
            for ax in self.axes(dn):
                ret += f"\n  \u2319 {self.label(ax)}: {self[ax]['values'].shape}"
        return ret

    # Assignment and retrieval of data and meta data

    @staticmethod
    def _is_meta_key(key: str) -> bool:
        return is_meta_key(key)

    @staticmethod
    def _meta_key_to_name(key: str) -> str:
        return meta_key_to_name(key)

    @staticmethod
    def _meta_name_to_key(name: str) -> str:
        return meta_name_to_key(name)

    @staticmethod
    def to_records(**data: Any) -> Dict[str, np.ndarray]:
        """Convert data to records that can be added to the ``DataDict``.
        All data is converted to np.array, and reshaped such that the first dimension of all resulting
        arrays has the same length (chosen to be the smallest possible number
        that does not alter any shapes beyond adding a length-1 dimension as
        first dimension, if necessary).

        If a data field is given as ``None``, it will be converted to ``numpy.array([numpy.nan])``.

        :param data: keyword arguments for each data field followed by data.
        :returns: Dictionary with properly shaped data.
        """
        records: Dict[str, np.ndarray] = {}

        seqtypes = (np.ndarray, tuple, list)
        nantypes = (type(None), )

        for k, v in data.items():
            if isinstance(v, seqtypes):
                records[k] = np.array(v)
            elif isinstance(v, nantypes):
                records[k] = np.array([np.nan])
            else:
                records[k] = np.array([v])

        possible_nrecords = {}
        for k, v in records.items():
            possible_nrecords[k] = [1, v.shape[0]]

        commons = []
        for k, v in possible_nrecords.items():
            for n in v:
                if n in commons:
                    continue
                is_common = True
                for kk, vv in possible_nrecords.items():
                    if n not in vv:
                        is_common = False
                if is_common:
                    commons.append(n)

        nrecs = max(commons)

        for k, v in records.items():
            shp = v.shape
            if nrecs == 1 and shp[0] > 1:
                newshp = tuple([1] + list(shp))
                records[k] = v.reshape(newshp)
        return records

    def data_items(self) -> Iterator[Tuple[str, Dict[str, Any]]]:
        """
        Generator for data field items.

        Like dict.items(), but ignores meta data.

        :return: Generator yielding first the key of the data field and second its value.
        """
        for k, v in self.items():
            if not self._is_meta_key(k):
                yield k, v

    def meta_items(self, data: Union[str, None] = None,
                   clean_keys: bool = True) -> Iterator[Tuple[str, Dict[str, Any]]]:
        """
        Generator for meta items.

        Like dict.items(), but yields `only` meta entries.
        The keys returned do not contain the underscores used internally.

        :param data: If ``None`` iterate over global meta data.
                     If it's the name of a data field, iterate over the meta
                     information of that field.
        :param clean_keys: If `True`, remove the underscore pre/suffix.
        :return: Generator yielding first the name of the meta entry and second its value.

        """
        if data is None:
            for k, v in self.items():
                if self._is_meta_key(k):
                    if clean_keys:
                        n = self._meta_key_to_name(k)
                    else:
                        n = k
                    yield n, v

        else:
            for k, v in self[data].items():
                if self._is_meta_key(k):
                    if clean_keys:
                        n = self._meta_key_to_name(k)
                    else:
                        n = k
                    yield n, v

    def data_vals(self, key: str) -> np.ndarray:
        """
        Return the data values of field ``key``.

        Equivalent to ``DataDict[key]['values']``.

        :param key: Name of the data field.
        :return: Values of the data field.
        """
        if self._is_meta_key(key):
            raise ValueError(f"{key} is a meta key.")
        return self[key].get('values', np.array([]))

    def has_meta(self, key: str) -> bool:
        """Check whether meta field exists in the dataset.

        :return: ``True`` if it exists, ``False`` if it doesn't.
        """
        k = self._meta_name_to_key(key)
        if k in self:
            return True
        else:
            for key, field_dict in self.data_items():
                if k in field_dict:
                    return True
            return False

    def meta_val(self, key: str, data: Union[str, None] = None) -> Any:
        """
        Return the value of meta field ``key`` (given without underscore).

        :param key: Name of the meta field.
        :param data: ``None`` for global meta; name of data field for data meta.
        :return: The value of the meta information.
        """
        k = self._meta_name_to_key(key)
        if data is None:
            return self[k]
        else:
            return self[data][k]

    def add_meta(self, key: str, value: Any, data: Union[str, None] = None) -> None:
        """
        Add meta info to the dataset.

        If the key already exists, meta info will be overwritten.

        :param key: Name of the meta field (without underscores).
        :param value: Value of the meta information.
        :param data: If ``None``, meta will be global; otherwise assigned to
                     data field ``data``.

        """
        key = self._meta_name_to_key(key)
        if data is None:
            self[key] = value
        else:
            self[data][key] = value

    set_meta = add_meta

    def delete_meta(self, key: str, data: Union[str, None] = None) -> None:
        """
        Deletes specific meta data.

        :param key: Name of the meta field to remove.
        :param data: If ``None``, this affects global meta; otherwise remove
                     from data field ``data``.

        """
        key = self._meta_name_to_key(key)
        if data is None:
            del self[key]
        else:
            del self[data][key]

    def clear_meta(self, data: Union[str, None] = None) -> None:
        """
        Deletes all meta data.

        :param data: If not ``None``, delete all meta only from specified data field ``data``.
                     Else, deletes all top-level meta, as well as meta for all data fields.

        """
        if data is None:
            meta_list = [k for k, _ in self.meta_items()]
            for m in meta_list:
                self.delete_meta(m)

            for d, _ in self.data_items():
                data_meta_list = [k for k, _ in self.meta_items(d)]
                for m in data_meta_list:
                    self.delete_meta(m, d)

        else:
            data_meta_list = [m for m, _ in self.meta_items(data)]
            for m in data_meta_list:
                self.delete_meta(m, data)

    def extract(self: T, data: List[str], include_meta: bool = True,
                copy: bool = True, sanitize: bool = True) -> T:
        """
        Extract data from a dataset.

        Return a new datadict with all fields specified in ``data`` included.
        Will also take any axes fields along that have not been explicitly
        specified. Will return empty if ``data`` consists of only axes fields.

        :param data: Data field or list of data fields to be extracted.
        :param include_meta: If ``True``, include the global meta data.
                             data meta will always be included.
        :param copy: If ``True``, data fields will be `deep copies <https://docs.python.org/3/library/copy.html>`__
                     of the original.
        :param sanitize: If ``True``, will run DataDictBase.sanitize before
                         returning.
        :return: New DataDictBase containing only requested fields.
        """
        if isinstance(data, str):
            data = [data]
        else:
            data = data.copy()

        # include all the axes used by the data.
        for d in data:
            for a in self.axes(d):
                if a not in data:
                    data.append(a)

        ret = self.__class__()
        for d in data:
            if copy:
                ret[d] = cp.deepcopy(self[d])
            else:
                ret[d] = self[d]

        if include_meta:
            for k, v in self.meta_items():
                if copy:
                    ret.add_meta(k, cp.deepcopy(v))
                else:
                    ret.add_meta(k, v)

        if sanitize:
            ret = ret.sanitize()

        ret.validate()
        return ret

    # info about structure

    @staticmethod
    def same_structure(*data: T,
                       check_shape: bool = False) -> bool:
        """
        Check if all supplied DataDicts share the same data structure
        (i.e., dependents and axes).

        Ignores meta data and values. Checks also for matching shapes if
        `check_shape` is `True`.

        :param data: The data sets to compare.
        :param check_shape: Whether to include shape check in the comparison.
        :return: ``True`` if the structure matches for all, else ``False``.
        """
        if len(data) < 2:
            return True

        def empty_structure(d: T) -> T:
            s = misc.unwrap_optional(d.structure(include_meta=False, add_shape=check_shape))
            for k, v in s.data_items():
                if 'values' in v:
                    del s[k]['values']
            return s

        s0 = empty_structure(data[0])
        for d in data[1:]:
            if d is None:
                return False
            if s0 != empty_structure(d):
                return False

        return True

    def structure(self: T, add_shape: bool = False,
                  include_meta: bool = True,
                  same_type: bool = False,
                  remove_data: Optional[List[str]] = None) -> Optional[T]:
        """
        Get the structure of the DataDict.

        Return the datadict without values (``values`` is emptied in each data field).

        :param add_shape: Deprecated -- ignored.
        :param include_meta: If `True`, include the meta information in
                             the returned dict.
        :param same_type: If `True`, return type will be the one of the
                          object this is called on. Else, DataDictBase.
        :param remove_data: any data fields listed will be removed from
                            the result, also when listed in any axes.

        :return: The DataDict containing the structure only. The exact type
                     is the same as the type of ``self``.

        """
        if add_shape:
            warnings.warn("'add_shape' is deprecated and will be ignored",
                          DeprecationWarning)
        add_shape = False

        if remove_data is None:
            remove_data = []

        if self.validate():
            s = self.__class__()
            for n, v in self.data_items():
                if n not in remove_data:
                    v2 = v.copy()
                    v2['values'] = []
                    s[n] = cp.deepcopy(v2)
                    if 'axes' in s[n]:
                        for r in remove_data:
                            if r in s[n]['axes']:
                                i = s[n]['axes'].index(r)
                                s[n]['axes'].pop(i)

            if include_meta:
                for n, v in self.meta_items():
                    s.add_meta(n, v)
            else:
                s.clear_meta()

            if same_type:
                s = self.__class__(**s)

            return s
        return None


    def nbytes(self, name: Optional[str]=None) -> Optional[int]:
        """Get the size of data.

        :param name: Name of the data field. if none, return size of 
            entire datadict.
        :return: size in bytes.
        """
        if self.validate():
            if name is None:
                return sum([v['values'].size * v['values'].itemsize 
                            for _, v in self.data_items()])
            else:
                return self.data_vals(name).size * self.data_vals(name).itemsize

        return None


    def label(self, name: str) -> Optional[str]:
        """
        Get the label for a data field. If no label is present returns the
        name of the data field as the label. If a unit is present, it will
        be appended at the end in brackets: "label (unit)".

        :param name: Name of the data field.
        :return: Labelled name.
        """
        if self.validate():
            if name not in self:
                raise ValueError("No field '{}' present.".format(name))

            if self[name]['label'] != '':
                n = self[name]['label']
            else:
                n = name

            if self[name]['unit'] != '':
                n += ' ({})'.format(self[name]['unit'])

            return n
        return None

    def axes_are_compatible(self) -> bool:
        """
        Check if all dependent data fields have the same axes.

        This includes axes order.

        :return: ``True`` or ``False``.
        """
        axes = []
        for i, d in enumerate(self.dependents()):
            if i == 0:
                axes = self.axes(d)
            else:
                if self.axes(d) != axes:
                    return False
        return True

    def axes(self, data: Union[Sequence[str], str, None] = None) -> List[str]:
        """
        Return a list of axes.

        :param data: if ``None``, return all axes present in the dataset,
                     otherwise only the axes of the dependent ``data``.
        :return: The list of axes.
        """
        lst = []
        if data is None:
            for k, v in self.data_items():
                if 'axes' in v:
                    for n in v['axes']:
                        if n not in lst and self[n].get('axes', []) == []:
                            lst.append(n)
        else:
            if isinstance(data, str):
                dataseq: Sequence[str] = (data,)
            else:
                dataseq = data
            for n in dataseq:
                if 'axes' not in self[n]:
                    continue
                for m in self[n]['axes']:
                    if m not in lst and self[m].get('axes', []) == []:
                        lst.append(m)

        return lst

    def dependents(self) -> List[str]:
        """
        Get all dependents in the dataset.

        :return: A list of the names of dependents.
        """
        ret = []
        for n, v in self.data_items():
            if len(v.get('axes', [])) != 0:
                ret.append(n)
        return ret

    def shapes(self) -> Dict[str, Tuple[int, ...]]:
        """
        Get the shapes of all data fields.

        :return: A dictionary of the form ``{key : shape}``, where shape is the
                 np.shape-tuple of the data with name ``key``.

        """
        shapes = {}
        for k, v in self.data_items():
            shapes[k] = np.array(self.data_vals(k)).shape

        return shapes

    # validation and sanitizing

    def validate(self) -> bool:
        """
        Check the validity of the dataset.

        Checks performed:
            * All axes specified with dependents must exist as data fields.

        Other tasks performed:
            * ``unit`` keys are created if omitted.
            * ``label`` keys are created if omitted.
            * ``shape`` meta information is updated with the correct values
              (only if present already).

        :return: ``True`` if valid.
        :raises: ``ValueError`` if invalid.
        """
        self._update_data_access()

        msg = '\n'
        for n, v in self.data_items():

            if 'axes' in v:
                for na in v['axes']:
                    if na not in self:
                        msg += " * '{}' has axis '{}', but no field " \
                               "with name '{}' registered.\n".format(
                            n, na, na)
                    elif na not in self.axes():
                        msg += " * '{}' has axis '{}', but no independent " \
                               "with name '{}' registered.\n".format(
                            n, na, na)
            else:
                v['axes'] = []

            if 'unit' not in v:
                v['unit'] = ''

            if 'label' not in v:
                v['label'] = ''

            vals = v.get('values', [])
            if type(vals) not in [np.ndarray, np.ma.core.MaskedArray]:
                vals = np.array(vals)
            v['values'] = vals

        if msg != '\n':
            raise ValueError(msg)

        return True

    def remove_unused_axes(self: T) -> T:
        """
        Removes axes not associated with dependents.

        :return: Cleaned dataset.
        """
        dependents = self.dependents()
        unused = []

        for n, v in self.data_items():
            used = False
            if n not in dependents:
                for m in dependents:
                    if n in self[m]['axes']:
                        used = True
            else:
                used = True
            if not used:
                unused.append(n)

        for u in unused:
            del self[u]

        return self

    def sanitize(self: T) -> T:
        """
        Clean-up tasks:
            * Removes unused axes.

        :return: Sanitized dataset.
        """
        return self.remove_unused_axes()

    # axes order tools

    def reorder_axes_indices(self, name: str,
                             **pos: int) -> Tuple[Tuple[int, ...], List[str]]:
        """
        Get the indices that can reorder axes in a given way.

        :param name: Name of the data field of which we want to reorder axes.
        :param pos: New axes position in the form ``axis_name = new_position``.
                    Non-specified axes positions are adjusted automatically.
        :return: The tuple of new indices, and the list of axes names in the
                 new order.

        """
        axlist = self.axes(name)
        order = misc.reorder_indices_from_new_positions(axlist, **pos)
        return order, [axlist[i] for i in order]

    def reorder_axes(self: T, data_names: Union[str, Sequence[str], None] = None,
                     **pos: int) -> T:
        """
        Reorder data axes.

        :param data_names: Data name(s) for which to reorder the axes.
                           If None, apply to all dependents.
        :param pos: New axes position in the form ``axis_name = new_position``.
                    Non-specified axes positions are adjusted automatically.

        :return: Dataset with re-ordered axes (not a copy)
        """
        if data_names is None:
            data_names = self.dependents()
        if isinstance(data_names, str):
            data_names = [data_names]

        for n in data_names:
            neworder, newaxes = self.reorder_axes_indices(n, **pos)
            self[n]['axes'] = newaxes

        self.validate()
        return self

    def copy(self: T) -> T:
        """
        Make a copy of the dataset.

        :return: A copy of the dataset.
        """
        logger.debug(f'copying a dataset with size {self.nbytes()}')
        ret = self.structure()
        assert ret is not None

        for k, v in self.data_items():
            ret[k]['values'] = self.data_vals(k).copy()
        return ret

    def astype(self: T, dtype: np.dtype) -> T:
        """
        Convert all data values to given dtype.

        :param dtype: np dtype.
        :return: Dataset, with values as given type (not a copy)
        """
        for k, v in self.data_items():
            vals = v['values']
            if type(v['values']) not in [np.ndarray, np.ma.core.MaskedArray]:
                vals = np.array(v['values'])
            self[k]['values'] = vals.astype(dtype)

        return self

    def mask_invalid(self: T) -> T:
        """
        Mask all invalid data in all values.
        :return: The dataset itself (not a copy), with invalid entries (nan/None) masked.
        """
        for d, _ in self.data_items():
            arr = self.data_vals(d)
            vals = np.ma.masked_where(num.is_invalid(arr), arr, copy=True)
            try:
                vals.fill_value = np.nan
            except TypeError:
                vals.fill_value = -9999
            self[d]['values'] = vals

        return self

    class _DataAccess:
        def __init__(self, parent: "DataDictBase") -> None:
            self._parent = parent

        def __getattribute__(self, __name: str) -> Any:
            parent = super(DataDictBase._DataAccess, self).__getattribute__('_parent')

            if __name in [k for k, _ in parent.data_items()]:
                return parent.data_vals(__name)
            else:
                return super(DataDictBase._DataAccess, self).__getattribute__(__name)

        def __setattr__(self, __name: str, __value: Any) -> None:
            # this check: make sure that we can set the parent correctly in the
            # constructor.
            if hasattr(self, '_parent'):
                if __name in [k for k, _ in self._parent.data_items()]:
                    self._parent[__name]['values'] = __value

                # still allow setting random things, essentially.
                else:
                    super(DataDictBase._DataAccess, self).__setattr__(__name, __value)
            else:
                super(DataDictBase._DataAccess, self).__setattr__(__name, __value)

    def _update_data_access(self) -> None:
        for d, i in self.data_items():
            self.d_.__dict__[d] = None

__eq__(other)

Check for content equality of two datadicts.

Source code in labcore/data/datadict.py
def __eq__(self, other: object) -> bool:
    """Check for content equality of two datadicts."""
    if not isinstance(other, DataDictBase):
        return False
    else:
        return datasets_are_equal(self, other)

add_meta(key, value, data=None)

Add meta info to the dataset.

If the key already exists, meta info will be overwritten.

Parameters:

Name Type Description Default
key str

Name of the meta field (without underscores).

required
value Any

Value of the meta information.

required
data Union[str, None]

If None, meta will be global; otherwise assigned to data field data.

None
Source code in labcore/data/datadict.py
def add_meta(self, key: str, value: Any, data: Union[str, None] = None) -> None:
    """
    Add meta info to the dataset.

    If the key already exists, meta info will be overwritten.

    :param key: Name of the meta field (without underscores).
    :param value: Value of the meta information.
    :param data: If ``None``, meta will be global; otherwise assigned to
                 data field ``data``.

    """
    key = self._meta_name_to_key(key)
    if data is None:
        self[key] = value
    else:
        self[data][key] = value
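
The meta-key handling can be illustrated with a small standalone sketch on plain dictionaries. It assumes that meta keys are stored wrapped in double underscores (as in the `__meta__` example in the overview); `meta_name_to_key` and `is_meta_key` below are hypothetical stand-ins for the private helpers used in the listing, not the library's own implementation.

```python
from typing import Any, Dict, Optional


def meta_name_to_key(name: str) -> str:
    """Hypothetical helper: turn a plain meta name into its stored key."""
    return f"__{name}__"


def is_meta_key(key: str) -> bool:
    """Hypothetical helper: recognise keys wrapped in double underscores."""
    return len(key) > 4 and key.startswith("__") and key.endswith("__")


def add_meta(dataset: Dict[str, Any], key: str, value: Any,
             data: Optional[str] = None) -> None:
    """Store meta globally, or on one data field if ``data`` is given."""
    stored_key = meta_name_to_key(key)
    if data is None:
        dataset[stored_key] = value
    else:
        dataset[data][stored_key] = value


dataset = {
    "x": {"axes": [], "values": [0, 1]},
    "z": {"axes": ["x"], "values": [10, 20]},
}
add_meta(dataset, "comment", "test run")          # global meta
add_meta(dataset, "calibrated", True, data="z")   # meta attached to field 'z'

# Only top-level meta keys show up here; field-level meta lives inside 'z'.
global_meta_keys = [k for k in dataset if is_meta_key(k)]
print(global_meta_keys)
```

Calling `add_meta` again with the same key simply overwrites the stored value, which matches the behaviour described above.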

astype(dtype)

Convert all data values to given dtype.

Parameters:

Name Type Description Default
dtype dtype

np dtype.

required

Returns:

Type Description
T

Dataset, with values as given type (not a copy)

Source code in labcore/data/datadict.py
def astype(self: T, dtype: np.dtype) -> T:
    """
    Convert all data values to given dtype.

    :param dtype: np dtype.
    :return: Dataset, with values as given type (not a copy)
    """
    for k, v in self.data_items():
        vals = v['values']
        if type(v['values']) not in [np.ndarray, np.ma.core.MaskedArray]:
            vals = np.array(v['values'])
        self[k]['values'] = vals.astype(dtype)

    return self

axes(data=None)

Return a list of axes.

Parameters:

Name Type Description Default
data Union[Sequence[str], str, None]

if None, return all axes present in the dataset, otherwise only the axes of the dependent data.

None

Returns:

Type Description
List[str]

The list of axes.

Source code in labcore/data/datadict.py
def axes(self, data: Union[Sequence[str], str, None] = None) -> List[str]:
    """
    Return a list of axes.

    :param data: if ``None``, return all axes present in the dataset,
                 otherwise only the axes of the dependent ``data``.
    :return: The list of axes.
    """
    lst = []
    if data is None:
        for k, v in self.data_items():
            if 'axes' in v:
                for n in v['axes']:
                    if n not in lst and self[n].get('axes', []) == []:
                        lst.append(n)
    else:
        if isinstance(data, str):
            dataseq: Sequence[str] = (data,)
        else:
            dataseq = data
        for n in dataseq:
            if 'axes' not in self[n]:
                continue
            for m in self[n]['axes']:
                if m not in lst and self[m].get('axes', []) == []:
                    lst.append(m)

    return lst
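
As a rough illustration of the lookup above, the following standalone sketch collects axes from a plain dictionary with the same layout. It does not use the library; the function name and the rule that meta keys start with a double underscore are assumptions made for the example.

```python
from typing import Any, Dict, List, Sequence, Union


def axes(dataset: Dict[str, Any],
         data: Union[Sequence[str], str, None] = None) -> List[str]:
    """Collect independent fields, in order of first appearance."""
    # Assumption: everything that is not a '__meta__' key is a data field.
    fields = {k: v for k, v in dataset.items() if not k.startswith("__")}
    if data is None:
        names = list(fields)
    elif isinstance(data, str):
        names = [data]
    else:
        names = list(data)

    found: List[str] = []
    for name in names:
        for ax in fields[name].get("axes", []):
            # Only fields without axes of their own count as axes.
            if ax not in found and fields[ax].get("axes", []) == []:
                found.append(ax)
    return found


dataset = {
    "z": {"axes": ["x", "y"]},
    "w": {"axes": ["y"]},
    "x": {"axes": []},
    "y": {"axes": []},
    "__info__": "some meta",
}
all_axes = axes(dataset)
w_axes = axes(dataset, "w")
wz_axes = axes(dataset, ["w", "z"])
```

Note that the result order follows the order in which fields are visited, so `axes(dataset, ["w", "z"])` lists `y` before `x`.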

axes_are_compatible()

Check if all dependent data fields have the same axes.

This includes axes order.

Returns:

Type Description
bool

True or False.

Source code in labcore/data/datadict.py
def axes_are_compatible(self) -> bool:
    """
    Check if all dependent data fields have the same axes.

    This includes axes order.

    :return: ``True`` or ``False``.
    """
    axes = []
    for i, d in enumerate(self.dependents()):
        if i == 0:
            axes = self.axes(d)
        else:
            if self.axes(d) != axes:
                return False
    return True

clear_meta(data=None)

Deletes all meta data.

Parameters:

Name Type Description Default
data Union[str, None]

If not None, delete all meta only from specified data field data. Else, deletes all top-level meta, as well as meta for all data fields.

None
Source code in labcore/data/datadict.py
def clear_meta(self, data: Union[str, None] = None) -> None:
    """
    Deletes all meta data.

    :param data: If not ``None``, delete all meta only from specified data field ``data``.
                 Else, deletes all top-level meta, as well as meta for all data fields.

    """
    if data is None:
        meta_list = [k for k, _ in self.meta_items()]
        for m in meta_list:
            self.delete_meta(m)

        for d, _ in self.data_items():
            data_meta_list = [k for k, _ in self.meta_items(d)]
            for m in data_meta_list:
                self.delete_meta(m, d)

    else:
        data_meta_list = [m for m, _ in self.meta_items(data)]
        for m in data_meta_list:
            self.delete_meta(m, data)

copy()

Make a copy of the dataset.

Returns:

Type Description
T

A copy of the dataset.

Source code in labcore/data/datadict.py
def copy(self: T) -> T:
    """
    Make a copy of the dataset.

    :return: A copy of the dataset.
    """
    logger.debug(f'copying a dataset with size {self.nbytes()}')
    ret = self.structure()
    assert ret is not None

    for k, v in self.data_items():
        ret[k]['values'] = self.data_vals(k).copy()
    return ret

data_items()

Generator for data field items.

Like dict.items(), but ignores meta data.

Returns:

Type Description
Iterator[Tuple[str, Dict[str, Any]]]

Generator yielding first the key of the data field and second its value.

Source code in labcore/data/datadict.py
def data_items(self) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """
    Generator for data field items.

    Like dict.items(), but ignores meta data.

    :return: Generator yielding first the key of the data field and second its value.
    """
    for k, v in self.items():
        if not self._is_meta_key(k):
            yield k, v

data_vals(key)

Return the data values of field key.

Equivalent to DataDict['key']['values'], except that an empty array is returned if the field has no values.

Parameters:

Name Type Description Default
key str

Name of the data field.

required

Returns:

Type Description
ndarray

Values of the data field.

Source code in labcore/data/datadict.py
def data_vals(self, key: str) -> np.ndarray:
    """
    Return the data values of field ``key``.

    Equivalent to ``DataDict['key']['values']``, except that an empty array
    is returned if the field has no values.

    :param key: Name of the data field.
    :return: Values of the data field.
    """
    if self._is_meta_key(key):
        raise ValueError(f"{key} is a meta key.")
    return self[key].get('values', np.array([]))

delete_meta(key, data=None)

Deletes specific meta data.

Parameters:

Name Type Description Default
key str

Name of the meta field to remove.

required
data Union[str, None]

If None, this affects global meta; otherwise remove from data field data.

None
Source code in labcore/data/datadict.py
def delete_meta(self, key: str, data: Union[str, None] = None) -> None:
    """
    Deletes specific meta data.

    :param key: Name of the meta field to remove.
    :param data: If ``None``, this affects global meta; otherwise remove
                 from data field ``data``.

    """
    key = self._meta_name_to_key(key)
    if data is None:
        del self[key]
    else:
        del self[data][key]

dependents()

Get all dependents in the dataset.

Returns:

Type Description
List[str]

A list of the names of dependents.

Source code in labcore/data/datadict.py
def dependents(self) -> List[str]:
    """
    Get all dependents in the dataset.

    :return: A list of the names of dependents.
    """
    ret = []
    for n, v in self.data_items():
        if len(v.get('axes', [])) != 0:
            ret.append(n)
    return ret
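
The distinction between dependents and axes comes down to whether a field lists any axes. A minimal sketch on a plain dictionary, assuming (for illustration only) that meta keys start with a double underscore:

```python
from typing import Any, Dict, List


def dependents(dataset: Dict[str, Any]) -> List[str]:
    """Names of all data fields that declare at least one axis."""
    return [
        name
        for name, field in dataset.items()
        # Skip meta entries first; their values need not be dictionaries.
        if not name.startswith("__") and len(field.get("axes", [])) != 0
    ]


dataset = {
    "x": {"axes": []},                 # independent: empty axes list
    "y": {},                           # independent: no 'axes' key at all
    "z": {"axes": ["x", "y"]},         # dependent
    "__comment__": "meta is ignored",
}
dependent_names = dependents(dataset)
```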

extract(data, include_meta=True, copy=True, sanitize=True)

Extract data from a dataset.

Return a new datadict with all fields specified in data included. Will also take any axes fields along that have not been explicitly specified. Will return empty if data consists of only axes fields.

Parameters:

Name Type Description Default
data List[str]

Data field or list of data fields to be extracted.

required
include_meta bool

If True, include the global meta data. data meta will always be included.

True
copy bool

If True, data fields will be deep copies (https://docs.python.org/3/library/copy.html) of the original.

True
sanitize bool

If True, will run DataDictBase.sanitize before returning.

True

Returns:

Type Description
T

New DataDictBase containing only requested fields.

Source code in labcore/data/datadict.py
def extract(self: T, data: List[str], include_meta: bool = True,
            copy: bool = True, sanitize: bool = True) -> T:
    """
    Extract data from a dataset.

    Return a new datadict with all fields specified in ``data`` included.
    Will also take any axes fields along that have not been explicitly
    specified. Will return empty if ``data`` consists of only axes fields.

    :param data: Data field or list of data fields to be extracted.
    :param include_meta: If ``True``, include the global meta data.
                         data meta will always be included.
    :param copy: If ``True``, data fields will be `deep copies <https://docs.python.org/3/library/copy.html>`__
                 of the original.
    :param sanitize: If ``True``, will run DataDictBase.sanitize before
                     returning.
    :return: New DataDictBase containing only requested fields.
    """
    if isinstance(data, str):
        data = [data]
    else:
        data = data.copy()

    # include all the axes used by the data.
    for d in data:
        for a in self.axes(d):
            if a not in data:
                data.append(a)

    ret = self.__class__()
    for d in data:
        if copy:
            ret[d] = cp.deepcopy(self[d])
        else:
            ret[d] = self[d]

    if include_meta:
        for k, v in self.meta_items():
            if copy:
                ret.add_meta(k, cp.deepcopy(v))
            else:
                ret.add_meta(k, v)

    if sanitize:
        ret = ret.sanitize()

    ret.validate()
    return ret
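
The extraction steps (collect the requested fields, pull in their axes, copy, then drop unused axes) can be sketched on plain dictionaries as follows. This is a simplified illustration under the assumption that meta keys start with a double underscore; it is not the library implementation and skips validation.

```python
import copy as cp
from typing import Any, Dict, List, Union


def extract(dataset: Dict[str, Any], data: Union[str, List[str]],
            include_meta: bool = True, copy: bool = True,
            sanitize: bool = True) -> Dict[str, Any]:
    names = [data] if isinstance(data, str) else list(data)

    # Pull in every axis used by the requested fields.
    for name in list(names):
        for ax in dataset[name].get("axes", []):
            if ax not in names:
                names.append(ax)

    pick = cp.deepcopy if copy else (lambda obj: obj)
    result = {name: pick(dataset[name]) for name in names}

    if sanitize:
        # Keep dependents, and axes that some dependent refers to.
        used = set()
        for field in result.values():
            used.update(field.get("axes", []))
        result = {n: f for n, f in result.items()
                  if f.get("axes") or n in used}

    if include_meta:
        for key, value in dataset.items():
            if key.startswith("__"):
                result[key] = pick(value)
    return result


source = {
    "z": {"axes": ["x", "y"], "values": [[1, 2], [3, 4]]},
    "w": {"axes": ["x"], "values": [5, 6]},
    "x": {"axes": [], "values": [0, 1]},
    "y": {"axes": [], "values": [0, 1]},
    "__run__": 7,
}
subset = extract(source, "w")
only_axes = extract(source, ["x"], include_meta=False)
```

Requesting only axes fields yields an empty result here because the sanitize step removes axes that no dependent refers to.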

has_meta(key)

Check whether meta field exists in the dataset.

Returns:

Type Description
bool

True if it exists, False if it doesn't.

Source code in labcore/data/datadict.py
def has_meta(self, key: str) -> bool:
    """Check whether meta field exists in the dataset.

    :return: ``True`` if it exists, ``False`` if it doesn't.
    """
    k = self._meta_name_to_key(key)
    if k in self:
        return True
    else:
        for key, field_dict in self.data_items():
            if k in field_dict:
                return True
        return False

label(name)

Get the label for a data field. If no label is present returns the name of the data field as the label. If a unit is present, it will be appended at the end in brackets: "label (unit)".

Parameters:

Name Type Description Default
name str

Name of the data field.

required

Returns:

Type Description
Optional[str]

Labelled name.

Source code in labcore/data/datadict.py
def label(self, name: str) -> Optional[str]:
    """
    Get the label for a data field. If no label is present returns the
    name of the data field as the label. If a unit is present, it will
    be appended at the end in brackets: "label (unit)".

    :param name: Name of the data field.
    :return: Labelled name.
    """
    if self.validate():
        if name not in self:
            raise ValueError("No field '{}' present.".format(name))

        if self[name]['label'] != '':
            n = self[name]['label']
        else:
            n = name

        if self[name]['unit'] != '':
            n += ' ({})'.format(self[name]['unit'])

        return n
    return None
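
The labelling rule is simple enough to show in isolation. The helper below is a standalone sketch that takes the field name and its dictionary directly; the signature is chosen for the example and differs from the method above.

```python
from typing import Any, Dict


def make_label(name: str, field: Dict[str, Any]) -> str:
    """Use the field's label if set, else its name; append the unit if set."""
    text = field.get("label", "") or name
    unit = field.get("unit", "")
    if unit != "":
        text += f" ({unit})"
    return text


with_label = make_label("voltage", {"label": "Gate voltage", "unit": "V"})
name_only = make_label("voltage", {"label": "", "unit": "V"})
bare = make_label("counts", {})
```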

mask_invalid()

Mask all invalid data in all values.

Returns:

Type Description
T

The dataset itself (not a copy), with invalid entries (nan/None) masked.

Source code in labcore/data/datadict.py
def mask_invalid(self: T) -> T:
    """
    Mask all invalid data in all values.
    :return: The dataset itself (not a copy), with invalid entries (nan/None) masked.
    """
    for d, _ in self.data_items():
        arr = self.data_vals(d)
        vals = np.ma.masked_where(num.is_invalid(arr), arr, copy=True)
        try:
            vals.fill_value = np.nan
        except TypeError:
            vals.fill_value = -9999
        self[d]['values'] = vals

    return self

meta_items(data=None, clean_keys=True)

Generator for meta items.

Like dict.items(), but yields only meta entries. The keys returned do not contain the underscores used internally.

Parameters:

Name Type Description Default
data Union[str, None]

If None iterate over global meta data. If it's the name of a data field, iterate over the meta information of that field.

None
clean_keys bool

If True, remove the underscore pre/suffix.

True

Returns:

Type Description
Iterator[Tuple[str, Dict[str, Any]]]

Generator yielding first the key of the meta field and second its value.

Source code in labcore/data/datadict.py
def meta_items(self, data: Union[str, None] = None,
               clean_keys: bool = True) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """
    Generator for meta items.

    Like dict.items(), but yields `only` meta entries.
    The keys returned do not contain the underscores used internally.

    :param data: If ``None`` iterate over global meta data.
                 If it's the name of a data field, iterate over the meta
                 information of that field.
    :param clean_keys: If `True`, remove the underscore pre/suffix.
    :return: Generator yielding first the key of the meta field and second its value.

    """
    if data is None:
        for k, v in self.items():
            if self._is_meta_key(k):
                if clean_keys:
                    n = self._meta_key_to_name(k)
                else:
                    n = k
                yield n, v

    else:
        for k, v in self[data].items():
            if self._is_meta_key(k):
                if clean_keys:
                    n = self._meta_key_to_name(k)
                else:
                    n = k
                yield n, v

meta_val(key, data=None)

Return the value of meta field key (given without underscore).

Parameters:

Name Type Description Default
key str

Name of the meta field.

required
data Union[str, None]

None for global meta; name of data field for data meta.

None

Returns:

Type Description
Any

The value of the meta information.

Source code in labcore/data/datadict.py
def meta_val(self, key: str, data: Union[str, None] = None) -> Any:
    """
    Return the value of meta field ``key`` (given without underscore).

    :param key: Name of the meta field.
    :param data: ``None`` for global meta; name of data field for data meta.
    :return: The value of the meta information.
    """
    k = self._meta_name_to_key(key)
    if data is None:
        return self[k]
    else:
        return self[data][k]

nbytes(name=None)

Get the size of data.

Parameters:

Name Type Description Default
name Optional[str]

Name of the data field. If None, return the size of the entire datadict.

None

Returns:

Type Description
Optional[int]

Size in bytes.

Source code in labcore/data/datadict.py
def nbytes(self, name: Optional[str]=None) -> Optional[int]:
    """Get the size of data.

    :param name: Name of the data field. If ``None``, return the size of
        the entire datadict.
    :return: Size in bytes.
    """
    if self.validate():
        if name is None:
            return sum([v['values'].size * v['values'].itemsize 
                        for _, v in self.data_items()])
        else:
            return self.data_vals(name).size * self.data_vals(name).itemsize

    return None

remove_unused_axes()

Removes axes not associated with dependents.

Returns:

Type Description
T

Cleaned dataset.

Source code in labcore/data/datadict.py
def remove_unused_axes(self: T) -> T:
    """
    Removes axes not associated with dependents.

    :return: Cleaned dataset.
    """
    dependents = self.dependents()
    unused = []

    for n, v in self.data_items():
        used = False
        if n not in dependents:
            for m in dependents:
                if n in self[m]['axes']:
                    used = True
        else:
            used = True
        if not used:
            unused.append(n)

    for u in unused:
        del self[u]

    return self

reorder_axes(data_names=None, **pos)

Reorder data axes.

Parameters:

Name Type Description Default
data_names Union[str, Sequence[str], None]

Data name(s) for which to reorder the axes. If None, apply to all dependents.

None
pos int

New axes position in the form axis_name = new_position. Non-specified axes positions are adjusted automatically.

{}

Returns:

Type Description
T

Dataset with re-ordered axes (not a copy)

Source code in labcore/data/datadict.py
def reorder_axes(self: T, data_names: Union[str, Sequence[str], None] = None,
                 **pos: int) -> T:
    """
    Reorder data axes.

    :param data_names: Data name(s) for which to reorder the axes.
                       If None, apply to all dependents.
    :param pos: New axes position in the form ``axis_name = new_position``.
                Non-specified axes positions are adjusted automatically.

    :return: Dataset with re-ordered axes (not a copy)
    """
    if data_names is None:
        data_names = self.dependents()
    if isinstance(data_names, str):
        data_names = [data_names]

    for n in data_names:
        neworder, newaxes = self.reorder_axes_indices(n, **pos)
        self[n]['axes'] = newaxes

    self.validate()
    return self

reorder_axes_indices(name, **pos)

Get the indices that can reorder axes in a given way.

Parameters:

Name Type Description Default
name str

Name of the data field of which we want to reorder axes.

required
pos int

New axes position in the form axis_name = new_position. Non-specified axes positions are adjusted automatically.

{}

Returns:

Type Description
Tuple[Tuple[int, ...], List[str]]

The tuple of new indices, and the list of axes names in the new order.

Source code in labcore/data/datadict.py
def reorder_axes_indices(self, name: str,
                         **pos: int) -> Tuple[Tuple[int, ...], List[str]]:
    """
    Get the indices that can reorder axes in a given way.

    :param name: Name of the data field of which we want to reorder axes.
    :param pos: New axes position in the form ``axis_name = new_position``.
                Non-specified axes positions are adjusted automatically.
    :return: The tuple of new indices, and the list of axes names in the
             new order.

    """
    axlist = self.axes(name)
    order = misc.reorder_indices_from_new_positions(axlist, **pos)
    return order, [axlist[i] for i in order]
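
The helper `misc.reorder_indices_from_new_positions` is not shown on this page. The sketch below is a hypothetical stand-in written to match the description above (named axes go to their requested positions, the remaining axes fill the free slots in their original order); the actual helper may differ, for instance in how it treats conflicting or out-of-range positions.

```python
from typing import List, Optional, Tuple


def reorder_indices_from_new_positions(names: List[str],
                                       **pos: int) -> Tuple[int, ...]:
    """Hypothetical stand-in: indices that move named items to new positions."""
    slots: List[Optional[int]] = [None] * len(names)
    for name, new_position in pos.items():
        slots[new_position] = names.index(name)

    # Items without a requested position keep their relative order.
    remaining = iter(i for i, name in enumerate(names) if name not in pos)
    return tuple(next(remaining) if s is None else s for s in slots)


axlist = ["x", "y", "z"]
order = reorder_indices_from_new_positions(axlist, z=0)
new_axes = [axlist[i] for i in order]

order_2 = reorder_indices_from_new_positions(axlist, y=2)
new_axes_2 = [axlist[i] for i in order_2]
```

The returned index tuple is in a form that could also be passed to an array transpose, which is presumably why the method returns it alongside the reordered names.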

same_structure(*data, check_shape=False) staticmethod

Check if all supplied DataDicts share the same data structure (i.e., dependents and axes).

Ignores meta data and values. Checks also for matching shapes if check_shape is True.

Parameters:

Name Type Description Default
data T

The data sets to compare.

()
check_shape bool

Whether to include shape check in the comparison.

False

Returns:

Type Description
bool

True if the structure matches for all, else False.

Source code in labcore/data/datadict.py
@staticmethod
def same_structure(*data: T,
                   check_shape: bool = False) -> bool:
    """
    Check if all supplied DataDicts share the same data structure
    (i.e., dependents and axes).

    Ignores meta data and values. Checks also for matching shapes if
    `check_shape` is `True`.

    :param data: The data sets to compare.
    :param check_shape: Whether to include shape check in the comparison.
    :return: ``True`` if the structure matches for all, else ``False``.
    """
    if len(data) < 2:
        return True

    def empty_structure(d: T) -> T:
        s = misc.unwrap_optional(d.structure(include_meta=False, add_shape=check_shape))
        for k, v in s.data_items():
            if 'values' in v:
                del s[k]['values']
        return s

    s0 = empty_structure(data[0])
    for d in data[1:]:
        if d is None:
            return False
        if s0 != empty_structure(d):
            return False

    return True

sanitize()

Clean-up tasks:

* Removes unused axes.

Returns:

Type Description
T

Sanitized dataset.

Source code in labcore/data/datadict.py
def sanitize(self: T) -> T:
    """
    Clean-up tasks:
        * Removes unused axes.

    :return: Sanitized dataset.
    """
    return self.remove_unused_axes()

shapes()

Get the shapes of all data fields.

Returns:

Type Description
Dict[str, Tuple[int, ...]]

A dictionary of the form {key : shape}, where shape is the np.shape-tuple of the data with name key.

Source code in labcore/data/datadict.py
def shapes(self) -> Dict[str, Tuple[int, ...]]:
    """
    Get the shapes of all data fields.

    :return: A dictionary of the form ``{key : shape}``, where shape is the
             np.shape-tuple of the data with name ``key``.

    """
    shapes = {}
    for k, v in self.data_items():
        shapes[k] = np.array(self.data_vals(k)).shape

    return shapes

structure(add_shape=False, include_meta=True, same_type=False, remove_data=None)

Get the structure of the DataDict.

Return the datadict without values (the values of each field are replaced by an empty list).

Parameters:

Name Type Description Default
add_shape bool

Deprecated -- ignored.

False
include_meta bool

If True, include the meta information in the returned dict.

True
same_type bool

If True, return type will be the one of the object this is called on. Else, DataDictBase.

False
remove_data Optional[List[str]]

any data fields listed will be removed from the result, also when listed in any axes.

None

Returns:

Type Description
Optional[T]

The DataDict containing the structure only. The exact type is the same as the type of self.

Source code in labcore/data/datadict.py
def structure(self: T, add_shape: bool = False,
              include_meta: bool = True,
              same_type: bool = False,
              remove_data: Optional[List[str]] = None) -> Optional[T]:
    """
    Get the structure of the DataDict.

    Return the datadict without values (the ``values`` of each field are
    replaced by an empty list).

    :param add_shape: Deprecated -- ignored.
    :param include_meta: If `True`, include the meta information in
                         the returned dict.
    :param same_type: If `True`, return type will be the one of the
                      object this is called on. Else, DataDictBase.
    :param remove_data: any data fields listed will be removed from
                        the result, also when listed in any axes.

    :return: The DataDict containing the structure only. The exact type
                 is the same as the type of ``self``.

    """
    if add_shape:
        warnings.warn("'add_shape' is deprecated and will be ignored",
                      DeprecationWarning)
    add_shape = False

    if remove_data is None:
        remove_data = []

    if self.validate():
        s = self.__class__()
        for n, v in self.data_items():
            if n not in remove_data:
                v2 = v.copy()
                v2['values'] = []
                s[n] = cp.deepcopy(v2)
                if 'axes' in s[n]:
                    for r in remove_data:
                        if r in s[n]['axes']:
                            i = s[n]['axes'].index(r)
                            s[n]['axes'].pop(i)

        if include_meta:
            for n, v in self.meta_items():
                s.add_meta(n, v)
        else:
            s.clear_meta()

        if same_type:
            s = self.__class__(**s)

        return s
    return None

to_records(**data) staticmethod

Convert data to records that can be added to the DataDict. All data is converted to np.array, and reshaped such that the first dimension of all resulting arrays has the same length (chosen to be the smallest possible number that does not alter any shapes beyond adding a length-1 dimension as first dimension, if necessary).

If a data field is given as None, it will be converted to numpy.array([numpy.nan]).

Parameters:

Name Type Description Default
data Any

keyword arguments for each data field followed by data.

{}

Returns:

Type Description
Dict[str, ndarray]

Dictionary with properly shaped data.

Source code in labcore/data/datadict.py
@staticmethod
def to_records(**data: Any) -> Dict[str, np.ndarray]:
    """Convert data to records that can be added to the ``DataDict``.
    All data is converted to np.array, and reshaped such that the first dimension of all resulting
    arrays has the same length (chosen to be the smallest possible number
    that does not alter any shapes beyond adding a length-1 dimension as
    first dimension, if necessary).

    If a data field is given as ``None``, it will be converted to ``numpy.array([numpy.nan])``.

    :param data: keyword arguments for each data field followed by data.
    :returns: Dictionary with properly shaped data.
    """
    records: Dict[str, np.ndarray] = {}

    seqtypes = (np.ndarray, tuple, list)
    nantypes = (type(None), )

    for k, v in data.items():
        if isinstance(v, seqtypes):
            records[k] = np.array(v)
        elif isinstance(v, nantypes):
            records[k] = np.array([np.nan])
        else:
            records[k] = np.array([v])

    possible_nrecords = {}
    for k, v in records.items():
        possible_nrecords[k] = [1, v.shape[0]]

    commons = []
    for k, v in possible_nrecords.items():
        for n in v:
            if n in commons:
                continue
            is_common = True
            for kk, vv in possible_nrecords.items():
                if n not in vv:
                    is_common = False
            if is_common:
                commons.append(n)

    nrecs = max(commons)

    for k, v in records.items():
        shp = v.shape
        if nrecs == 1 and shp[0] > 1:
            newshp = tuple([1] + list(shp))
            records[k] = v.reshape(newshp)
    return records
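
How the number of records is chosen is easier to follow when looking only at array shapes. The following sketch mirrors the logic of the listing above on shape tuples, without numpy; the function names are made up for the illustration.

```python
from typing import Dict, Tuple

Shape = Tuple[int, ...]


def common_record_count(shapes: Dict[str, Shape]) -> int:
    """Largest first-dimension length that every field can be read as."""
    # Each field can be seen as 1 record, or as shape[0] records.
    candidates = [{1, shape[0]} for shape in shapes.values()]
    return max(set.intersection(*candidates))


def record_shapes(shapes: Dict[str, Shape]) -> Dict[str, Shape]:
    """Shapes after adding a leading length-1 dimension where needed."""
    nrecs = common_record_count(shapes)
    return {
        name: (1,) + tuple(shape) if nrecs == 1 and shape[0] > 1 else tuple(shape)
        for name, shape in shapes.items()
    }


# A scalar (stored with shape (1,)) next to a length-3 array: one record.
one_record = record_shapes({"x": (1,), "y": (3,)})

# All fields agree on 3 entries along the first dimension: three records.
three_records = record_shapes({"x": (3,), "y": (3, 2)})

# Lengths 2 and 3 cannot be matched, so both become a single record.
mismatched = record_shapes({"x": (2,), "y": (3,)})
```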

validate()

Check the validity of the dataset.

Checks performed:

* All axes specified with dependents must exist as data fields.

Other tasks performed:

* unit keys are created if omitted.
* label keys are created if omitted.
* shape meta information is updated with the correct values (only if present already).

Returns:

Type Description
bool

True if valid; a ValueError is raised if the dataset is invalid.

Source code in labcore/data/datadict.py
def validate(self) -> bool:
    """
    Check the validity of the dataset.

    Checks performed:
        * All axes specified with dependents must exist as data fields.

    Other tasks performed:
        * ``unit`` keys are created if omitted.
        * ``label`` keys are created if omitted.
        * ``shape`` meta information is updated with the correct values
          (only if present already).

    :return: ``True`` if valid.
    :raises: ``ValueError`` if invalid.
    """
    self._update_data_access()

    msg = '\n'
    for n, v in self.data_items():

        if 'axes' in v:
            for na in v['axes']:
                if na not in self:
                    msg += " * '{}' has axis '{}', but no field " \
                           "with name '{}' registered.\n".format(
                        n, na, na)
                elif na not in self.axes():
                    msg += " * '{}' has axis '{}', but no independent " \
                           "with name '{}' registered.\n".format(
                        n, na, na)
        else:
            v['axes'] = []

        if 'unit' not in v:
            v['unit'] = ''

        if 'label' not in v:
            v['label'] = ''

        vals = v.get('values', [])
        if type(vals) not in [np.ndarray, np.ma.core.MaskedArray]:
            vals = np.array(vals)
        v['values'] = vals

    if msg != '\n':
        raise ValueError(msg)

    return True
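
The checks and defaults applied during validation can be sketched on a plain dictionary as below. This is a simplified illustration, not the library code: it assumes meta keys start with a double underscore and leaves out the conversion of values to numpy arrays.

```python
from typing import Any, Dict


def validate(dataset: Dict[str, Any]) -> bool:
    """Fill in defaults and raise ValueError if an axis reference is broken."""
    fields = {k: v for k, v in dataset.items() if not k.startswith("__")}
    independents = [k for k, v in fields.items() if v.get("axes", []) == []]

    problems = []
    for name, field in fields.items():
        for ax in field.setdefault("axes", []):
            if ax not in fields:
                problems.append(f"'{name}' has axis '{ax}', "
                                f"but no field with that name exists.")
            elif ax not in independents:
                problems.append(f"'{name}' has axis '{ax}', "
                                f"but '{ax}' is not an independent.")
        # Defaults created when omitted.
        field.setdefault("unit", "")
        field.setdefault("label", "")

    if problems:
        raise ValueError("\n" + "\n".join(f" * {p}" for p in problems))
    return True


valid = {"z": {"axes": ["x"], "values": [1, 2]}, "x": {"values": [0, 1]}}
is_valid = validate(valid)

try:
    validate({"z": {"axes": ["x"]}})  # 'x' is never defined
    error_message = ""
except ValueError as err:
    error_message = str(err)
```

As in the listing above, an invalid dataset never produces a `False` return value; the problems are collected and reported through a single `ValueError`.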

MeshgridDataDict

Bases: DataDictBase

Implementation of DataDictBase meant to be used when the axes form a grid on which the dependent values reside.

It enforces that all dependents have the same axes and that all shapes are identical.

Source code in labcore/data/datadict.py
class MeshgridDataDict(DataDictBase):
    """
    Implementation of DataDictBase meant to be used when the axes form
    a grid on which the dependent values reside.

    It enforces that all dependents have the same axes and that all shapes are identical.
    """

    def shape(self) -> Union[None, Tuple[int, ...]]:
        """
        Return the shape of the meshgrid.

        :returns: The shape as tuple. ``None`` if no data in the set.
        """
        for d, _ in self.data_items():
            return np.array(self.data_vals(d)).shape
        return None

    def validate(self) -> bool:
        """
        Validation of the dataset.

        Performs the following checks:
        * All dependents must have the same axes.
        * All shapes need to be identical.

        :return: ``True`` if valid.
        :raises: ``ValueError`` if invalid.
        """
        if not super().validate():
            return False

        msg = '\n'

        axes = None
        axessrc = ''
        for d in self.dependents():
            if axes is None:
                axes = self.axes(d)
                # remember which dependent the reference axes came from,
                # so the error message below can name it.
                axessrc = d
            else:
                if axes != self.axes(d):
                    msg += " * All dependents must have the same axes, but "
                    msg += f"{d} has {self.axes(d)} and {axessrc} has {axes}\n"

        shp = None
        shpsrc = ''

        data_items = dict(self.data_items())

        for n, v in data_items.items():
            if type(v['values']) not in [np.ndarray, np.ma.core.MaskedArray]:
                self[n]['values'] = np.array(v['values'])

            if shp is None:
                shp = v['values'].shape
                shpsrc = n
            else:
                if v['values'].shape != shp:
                    msg += f" * shapes need to match, but '{n}' has"
                    msg += f" {v['values'].shape}, "
                    msg += f"and '{shpsrc}' has {shp}.\n"

            if msg != '\n':
                raise ValueError(msg)

            if 'axes' in v:
                for axis_num, na in enumerate(v['axes']):
                    # check that the data of the axes matches its use
                    # if data present
                    axis_data = data_items[na]['values']

                    # for the data to be a valid meshgrid, we need to have an increase/decrease along each
                    # axis that contains data.
                    if axis_data.size > 0:
                        # if axis length is 1, then we cannot infer anything about grids yet

                        try:
                            if axis_data.shape[axis_num] > 1:
                                steps = np.unique(np.sign(np.diff(axis_data, axis=axis_num)))

                                # for incomplete data, there may be nan steps -- we need to remove those,
                                # doesn't mean anything is wrong.
                                steps = steps[~np.isnan(steps)]

                                if 0 in steps:
                                    msg += (f"Malformed data: {na} is expected to be {axis_num}th "
                                            "axis but has no variation along that axis.\n")
                                if steps.size > 1:
                                    msg += (f"Malformed data: axis {na} is not monotonous.\n")

                        # can happen if we have bad shapes. but that should already have been caught.
                        except IndexError:
                            pass

            if '__shape__' in v:
                v['__shape__'] = shp

            if msg != '\n':
                raise ValueError(msg)

        return True

    def reorder_axes(self, data_names: Union[str, Sequence[str], None] = None,
                     **pos: int) -> 'MeshgridDataDict':
        """
        Reorder the axes for all data.

        This includes transposing the data, since we're on a grid.

        :param data_names: Which dependents to include. if None are given,
                           all dependents are included.
        :param pos: New axes position in the form ``axis_name = new_position``.
                    non-specified axes positions are adjusted automatically.

        :return: Dataset with re-ordered axes.
        """
        if data_names is None:
            data_names = self.dependents()
        if isinstance(data_names, str):
            data_names = [data_names]

        transposed = []
        orders = {}
        orig_axes = {}
        for n in data_names:
            orders[n] = self.reorder_axes_indices(n, **pos)
            orig_axes[n] = self.axes(n).copy()

        for n in data_names:
            neworder, newaxes = orders[n]
            self[n]['axes'] = newaxes
            self[n]['values'] = self[n]['values'].transpose(neworder)
            for ax in orig_axes[n]:
                if ax not in transposed:
                    self[ax]['values'] = self[ax]['values'].transpose(neworder)
                    transposed.append(ax)

        self.validate()
        return self

    def mean(self, axis: str) -> 'MeshgridDataDict':
        """Take the mean over the given axis.

        :param axis: which axis to take the average over.
        :return: data, averaged over ``axis``.
        """
        return _mesh_mean(self, axis)

    def slice(self, **kwargs: Dict[str, Union[slice, int]]) -> 'MeshgridDataDict':
        """Return a N-d slice of the data.

        :param kwargs: slicing information in the format ``axis: spec``, where
            ``spec`` can be a ``slice`` object, or an integer (usual slicing 
            notation).
        :return: sliced data (as a copy)
        """
        return _mesh_slice(self, **kwargs)

    def squeeze(self) -> None:
        """Remove size-1 dimensions."""
        raise NotImplementedError

mean(axis)

Take the mean over the given axis.

Parameters:

Name Type Description Default
axis str

which axis to take the average over.

required

Returns:

Type Description
MeshgridDataDict

data, averaged over axis.

Source code in labcore/data/datadict.py
def mean(self, axis: str) -> 'MeshgridDataDict':
    """Take the mean over the given axis.

    :param axis: which axis to take the average over.
    :return: data, averaged over ``axis``.
    """
    return _mesh_mean(self, axis)

reorder_axes(data_names=None, **pos)

Reorder the axes for all data.

This includes transposing the data, since we're on a grid.

Parameters:

Name Type Description Default
data_names Union[str, Sequence[str], None]

Which dependents to include. if None are given, all dependents are included.

None
pos int

New axes position in the form axis_name = new_position. non-specified axes positions are adjusted automatically.

{}

Returns:

Type Description
MeshgridDataDict

Dataset with re-ordered axes.

Source code in labcore/data/datadict.py
def reorder_axes(self, data_names: Union[str, Sequence[str], None] = None,
                 **pos: int) -> 'MeshgridDataDict':
    """
    Reorder the axes for all data.

    This includes transposing the data, since we're on a grid.

    :param data_names: Which dependents to include. if None are given,
                       all dependents are included.
    :param pos: New axes position in the form ``axis_name = new_position``.
                non-specified axes positions are adjusted automatically.

    :return: Dataset with re-ordered axes.
    """
    if data_names is None:
        data_names = self.dependents()
    if isinstance(data_names, str):
        data_names = [data_names]

    transposed = []
    orders = {}
    orig_axes = {}
    for n in data_names:
        orders[n] = self.reorder_axes_indices(n, **pos)
        orig_axes[n] = self.axes(n).copy()

    for n in data_names:
        neworder, newaxes = orders[n]
        self[n]['axes'] = newaxes
        self[n]['values'] = self[n]['values'].transpose(neworder)
        for ax in orig_axes[n]:
            if ax not in transposed:
                self[ax]['values'] = self[ax]['values'].transpose(neworder)
                transposed.append(ax)

    self.validate()
    return self
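
On a grid, reordering axes means applying the same permutation to the dependent data and to every axis array. The following is a minimal sketch using plain numpy arrays rather than a `MeshgridDataDict`; the way the permutation is computed here is an assumption for illustration and may differ from what `reorder_axes_indices` does internally.

```python
import numpy as np

# A 3x2 grid: 'x' varies along dimension 0, 'y' along dimension 1.
axes = ['x', 'y']
x, y = np.meshgrid([0, 1, 2], [10, 20], indexing='ij')
z = x + y

# Desired new order (hypothetical): 'y' first, then 'x'.
new_axes = ['y', 'x']
order = tuple(axes.index(a) for a in new_axes)  # permutation of dimensions

# The dependent and all axis arrays are transposed with the same permutation.
z_t = z.transpose(order)
x_t = x.transpose(order)
y_t = y.transpose(order)

print(z.shape, '->', z_t.shape)
```

After the transpose, `y_t` varies along dimension 0 and `x_t` along dimension 1, so the axis arrays stay consistent with the new `axes` list.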

shape()

Return the shape of the meshgrid.

Returns:

Type Description
Union[None, Tuple[int, ...]]

The shape as tuple. None if no data in the set.

Source code in labcore/data/datadict.py
def shape(self) -> Union[None, Tuple[int, ...]]:
    """
    Return the shape of the meshgrid.

    :returns: The shape as tuple. ``None`` if no data in the set.
    """
    for d, _ in self.data_items():
        return np.array(self.data_vals(d)).shape
    return None

slice(**kwargs)

Return a N-d slice of the data.

Parameters:

Name Type Description Default
kwargs Dict[str, Union[slice, int]]

slicing information in the format axis: spec, where spec can be a slice object, or an integer (usual slicing notation).

{}

Returns:

Type Description
MeshgridDataDict

sliced data (as a copy)

Source code in labcore/data/datadict.py
def slice(self, **kwargs: Dict[str, Union[slice, int]]) -> 'MeshgridDataDict':
    """Return a N-d slice of the data.

    :param kwargs: slicing information in the format ``axis: spec``, where
        ``spec`` can be a ``slice`` object, or an integer (usual slicing 
        notation).
    :return: sliced data (as a copy)
    """
    return _mesh_slice(self, **kwargs)

squeeze()

Remove size-1 dimensions.

Source code in labcore/data/datadict.py
def squeeze(self) -> None:
    """Remove size-1 dimensions."""
    raise NotImplementedError

validate()

Validation of the dataset.

Performs the following checks:

  • All dependents must have the same axes.
  • All shapes need to be identical.

Returns:

Type Description
bool

True if valid.

Source code in labcore/data/datadict.py
def validate(self) -> bool:
    """
    Validation of the dataset.

    Performs the following checks:
    * All dependents must have the same axes.
    * All shapes need to be identical.

    :return: ``True`` if valid.
    :raises: ``ValueError`` if invalid.
    """
    if not super().validate():
        return False

    msg = '\n'

    axes = None
    axessrc = ''
    for d in self.dependents():
        if axes is None:
            axes = self.axes(d)
            # remember which dependent the reference axes came from,
            # so the error message below can name it.
            axessrc = d
        else:
            if axes != self.axes(d):
                msg += " * All dependents must have the same axes, but "
                msg += f"{d} has {self.axes(d)} and {axessrc} has {axes}\n"

    shp = None
    shpsrc = ''

    data_items = dict(self.data_items())

    for n, v in data_items.items():
        if type(v['values']) not in [np.ndarray, np.ma.core.MaskedArray]:
            self[n]['values'] = np.array(v['values'])

        if shp is None:
            shp = v['values'].shape
            shpsrc = n
        else:
            if v['values'].shape != shp:
                msg += f" * shapes need to match, but '{n}' has"
                msg += f" {v['values'].shape}, "
                msg += f"and '{shpsrc}' has {shp}.\n"

        if msg != '\n':
            raise ValueError(msg)

        if 'axes' in v:
            for axis_num, na in enumerate(v['axes']):
                # check that the data of the axes matches its use
                # if data present
                axis_data = data_items[na]['values']

                # for the data to be a valid meshgrid, we need to have an increase/decrease along each
                # axis that contains data.
                if axis_data.size > 0:
                    # if axis length is 1, then we cannot infer anything about grids yet

                    try:
                        if axis_data.shape[axis_num] > 1:
                            steps = np.unique(np.sign(np.diff(axis_data, axis=axis_num)))

                            # for incomplete data, there may be nan steps -- we need to remove those,
                            # doesn't mean anything is wrong.
                            steps = steps[~np.isnan(steps)]

                            if 0 in steps:
                                msg += (f"Malformed data: {na} is expected to be {axis_num}th "
                                        "axis but has no variation along that axis.\n")
                            if steps.size > 1:
                                msg += (f"Malformed data: axis {na} is not monotonous.\n")

                    # can happen if we have bad shapes. but that should already have been caught.
                    except IndexError:
                        pass

        if '__shape__' in v:
            v['__shape__'] = shp

        if msg != '\n':
            raise ValueError(msg)

    return True
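
The grid check in `validate` relies on the signs of the steps of each axis array along its own dimension. The following is a minimal sketch of that idea on plain numpy arrays; the helper name `axis_step_signs` is hypothetical, and only the `np.diff` / `np.sign` / `np.unique` combination is taken from the listing above.

```python
import numpy as np


def axis_step_signs(axis_data: np.ndarray, axis_num: int) -> np.ndarray:
    """Unique signs of the steps of ``axis_data`` along ``axis_num``, ignoring NaN."""
    steps = np.unique(np.sign(np.diff(axis_data, axis=axis_num)))
    return steps[~np.isnan(steps)]


# A 3x2 grid: x varies along dimension 0, y along dimension 1.
x, y = np.meshgrid([0.0, 1.0, 2.0], [10.0, 20.0], indexing='ij')

# Along its own dimension, a valid axis yields exactly one non-zero sign.
print(axis_step_signs(x, 0))

# Along the other dimension x does not vary, so the only sign is zero.
print(axis_step_signs(x, 1))

# An axis that goes up and then down yields more than one sign.
zigzag = np.array([[0.0, 0.0], [2.0, 2.0], [1.0, 1.0]])
print(axis_step_signs(zigzag, 0))
```

A zero among the signs corresponds to the "no variation along that axis" message, and more than one sign corresponds to the message about the axis not being monotonic.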

combine_datadicts(*dicts)

Try to make one datadict out of multiple.

Basic rules:

  • We try to maintain the input type.
  • Return type is 'downgraded' to DataDictBase if the contents are not compatible (i.e., different numbers of records in the inputs).

Returns:

Type Description
Union[DataDictBase, DataDict]

Combined data.

Source code in labcore/data/datadict.py
def combine_datadicts(*dicts: DataDict) -> Union[DataDictBase, DataDict]:
    """
    Try to make one datadict out of multiple.

    Basic rules:

    - We try to maintain the input type.
    - Return type is 'downgraded' to DataDictBase if the contents are not
      compatible (i.e., different numbers of records in the inputs).

    :returns: Combined data.
    """

    # TODO: deal correctly with MeshGridData when combined with other types
    # TODO: should we strictly copy all values?
    # TODO: we should try to consolidate axes as much as possible. Currently
    #   axes in the return can be separated even if they match (caused
    #   by earlier mismatches)

    ret = None
    rettype = None

    for d in dicts:
        if ret is None:
            ret = d.copy()
            rettype = type(d)

        else:

            # if we don't have a well defined number of records anymore,
            # need to revert the type to DataDictBase
            if hasattr(d, 'nrecords') and hasattr(ret, 'nrecords'):
                if d.nrecords() != ret.nrecords():
                    rettype = DataDictBase
            else:
                rettype = DataDictBase
            ret = rettype(**ret)

            # First, parse the axes in the to-be-added ddict.
            # if dimensions with same names are present already in the current
            # return ddict and are not compatible with what's to be added,
            # rename the incoming dimension.
            ax_map = {}
            for d_ax in d.axes():
                if d_ax in ret.axes():
                    if num.arrays_equal(d.data_vals(d_ax), ret.data_vals(d_ax)):
                        ax_map[d_ax] = d_ax
                    else:
                        newax = _find_replacement_name(ret, d_ax)
                        ax_map[d_ax] = newax
                        ret[newax] = d[d_ax]
                elif d_ax in ret.dependents():
                    newax = _find_replacement_name(ret, d_ax)
                    ax_map[d_ax] = newax
                    ret[newax] = d[d_ax]
                else:
                    ax_map[d_ax] = d_ax
                    ret[d_ax] = d[d_ax]

            for d_dep in d.dependents():
                if d_dep in ret:
                    newdep = _find_replacement_name(ret, d_dep)
                else:
                    newdep = d_dep

                dep_axes = [ax_map[ax] for ax in d[d_dep]['axes']]
                ret[newdep] = d[d_dep]
                ret[newdep]['axes'] = dep_axes

    if ret is None:
        ret = DataDict()
    else:
        ret.validate()

    return ret

datadict_to_meshgrid(data, target_shape=None, inner_axis_order=None, use_existing_shape=False, copy=True)

Try to make a meshgrid from a dataset.

Parameters:

Name Type Description Default
data DataDict

Input DataDict.

required
target_shape Union[Tuple[int, ...], None]

Target shape. If None we use guess_shape_from_datadict to infer.

None
inner_axis_order Union[None, Sequence[str]]

If axes of the datadict are not specified in the 'C' order (1st the slowest, last the fastest axis) then the 'true' inner order can be specified as a list of axes names, which has to match the specified axes in all but order. The data is then transposed to conform to the specified order. Note: if this is given, then target_shape needs to be given in the order of this inner_axis_order. The output data will keep the axis ordering specified in the axes property.

None
use_existing_shape bool

if True, simply use the shape that the data already has. For numpy-array data, this might already be present. If False, flatten and reshape.

False
copy bool

if True, then we make a copy of the data arrays. if False, data array is modified in-place.

True

Returns:

Type Description
MeshgridDataDict

The generated MeshgridDataDict.

Source code in labcore/data/datadict.py
def datadict_to_meshgrid(data: DataDict,
                         target_shape: Union[Tuple[int, ...], None] = None,
                         inner_axis_order: Union[None, Sequence[str]] = None,
                         use_existing_shape: bool = False,
                         copy: bool = True) \
        -> MeshgridDataDict:
    """
    Try to make a meshgrid from a dataset.

    :param data: Input DataDict.
    :param target_shape: Target shape. If ``None`` we use
        ``guess_shape_from_datadict`` to infer.
    :param inner_axis_order: If axes of the datadict are not specified in the
        'C' order (1st the slowest, last the fastest axis) then the
        'true' inner order can be specified as a list of axes names, which has
        to match the specified axes in all but order. The data is then
        transposed to conform to the specified order.

        .. note::
            If this is given, then ``target_shape`` needs to be given in
            the order of this inner_axis_order. The output data will keep the
            axis ordering specified in the `axes` property.

    :param use_existing_shape: if ``True``, simply use the shape that the data
        already has. For numpy-array data, this might already be present.
        If ``False``, flatten and reshape.
    :param copy: if ``True``, then we make a copy of the data arrays.
        if ``False``, data array is modified in-place.

    :raises: GriddingError (subclass of ValueError) if the data cannot be gridded.
    :returns: The generated ``MeshgridDataDict``.
    """

    # if the data is empty, return empty MeshgridData
    if len([k for k, _ in data.data_items()]) == 0:
        return MeshgridDataDict()

    if not data.axes_are_compatible():
        raise GriddingError('Non-compatible axes, cannot grid that.')

    if not use_existing_shape and data.is_expandable():
        data = data.expand()
    elif use_existing_shape:
        target_shape = list(data.shapes().values())[0]

    # guess what the shape likely is.
    if target_shape is None:
        shp_specs = guess_shape_from_datadict(data)
        shps = set(order_shape[1] if order_shape is not None
                   else None for order_shape in shp_specs.values())
        if len(shps) > 1:
            raise GriddingError('Cannot determine unique shape for all data.')
        ret = list(shp_specs.values())[0]
        if ret is None:
            raise GriddingError('Shape could not be inferred.')
        # the guess-function returns both axis order as well as shape.
        inner_axis_order, target_shape = ret

    # construct new data
    newdata = MeshgridDataDict(**misc.unwrap_optional(data.structure(add_shape=False)))
    axlist = data.axes(data.dependents()[0])

    for k, v in data.data_items():
        vals = num.array1d_to_meshgrid(v['values'], target_shape, copy=copy)

        # if an inner axis order is given, we transpose to transform from that
        # to the specified order.
        if inner_axis_order is not None:
            transpose_idxs = misc.reorder_indices(
                inner_axis_order, axlist)
            vals = vals.transpose(transpose_idxs)

        newdata[k]['values'] = vals

    newdata = newdata.sanitize()
    newdata.validate()
    return newdata
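
The core of the conversion is reshaping flat, record-style arrays into a grid of a known shape. The following is a minimal sketch with plain numpy, assuming complete data in 'C' order; `np.reshape` stands in for the library's `array1d_to_meshgrid`, whose exact behavior (for example with incomplete sweeps) is not shown here.

```python
import numpy as np

# Flat records of a 2D sweep: x is the slow (outer) axis, y the fast (inner) axis.
x_flat = np.array([0, 0, 0, 1, 1, 1])
y_flat = np.array([10, 20, 30, 10, 20, 30])
z_flat = x_flat * 100 + y_flat

# (number of x points, number of y points), slowest axis first.
target_shape = (2, 3)

x_grid = x_flat.reshape(target_shape)
y_grid = y_flat.reshape(target_shape)
z_grid = z_flat.reshape(target_shape)

print(z_grid)

# Flattening with reshape(-1) recovers the original records.
z_back = z_grid.reshape(-1)
```

All fields, axes included, are reshaped to the same target shape, which is what the meshgrid validation expects. The final `reshape(-1)` is the same operation used for the opposite conversion back to flat records.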

datasets_are_equal(a, b, ignore_meta=False)

Check whether two datasets are equal.

Compares type, structure, and content of all fields.

Parameters:

Name Type Description Default
a DataDictBase

First dataset.

required
b DataDictBase

Second dataset.

required
ignore_meta bool

If True, do not verify if metadata matches.

False

Returns:

Type Description
bool

True or False.

Source code in labcore/data/datadict.py
def datasets_are_equal(a: DataDictBase, b: DataDictBase,
                       ignore_meta: bool = False) -> bool:
    """Check whether two datasets are equal.

    Compares type, structure, and content of all fields.

    :param a: First dataset.
    :param b: Second dataset.
    :param ignore_meta: If ``True``, do not verify if metadata matches.
    :returns: ``True`` or ``False``.
    """

    if not type(a) == type(b):
        return False

    if not a.same_structure(a, b):
        return False

    if not ignore_meta:
        # are all meta data of a also in b, and are they the same value?
        for k, v in a.meta_items():
            if k not in [kk for kk, vv in b.meta_items()]:
                return False
            elif b.meta_val(k) != v:
                return False

        # are all meta data of b also in a?
        for k, v in b.meta_items():
            if k not in [kk for kk, vv in a.meta_items()]:
                return False

    # check all data fields in a
    for dn, dv in a.data_items():

        # are all fields also present in b?
        if dn not in [dnn for dnn, dvv in b.data_items()]:
            return False

        # check if data is equal
        if not num.arrays_equal(
                np.array(a.data_vals(dn)),
                np.array(b.data_vals(dn)),
        ):
            return False

        if not ignore_meta:
            # check meta data
            for k, v in a.meta_items(dn):
                if k not in [kk for kk, vv in b.meta_items(dn)]:
                    return False
                elif v != b.meta_val(k, dn):
                    return False

    # only thing left to check is whether there are items in b but not a
    for dn, dv in b.data_items():
        if dn not in [dnn for dnn, dvv in a.data_items()]:
            return False

        if not ignore_meta:
            for k, v in b.meta_items(dn):
                if k not in [kk for kk, vv in a.meta_items(dn)]:
                    return False

    return True

datastructure_from_string(description)

Construct a DataDict from a string description.

Examples:

  • "data[mV](x, y)" results in a datadict with one dependent data with unit mV and two independents, x and y, that do not have units.
  • "data_1[mV](x, y); data_2[mA](x); x[mV]; y[nT]" results in two dependents, one of them depending on x and y, the other only on x. Note that x and y have units. We can (but do not have to) omit them when specifying the dependencies.
  • "data_1[mV](x[mV], y[nT]); data_2[mA](x[mV])". Same result as the previous example.

Rules: We recognize descriptions of the form field1[unit1](ax1, ax2, ...); field2[unit2](...); ....

  • Field names (like field1 and field2 above) have to start with a letter, and may contain word characters.
  • Field descriptors consist of the name, optional unit (presence signified by square brackets), and optional dependencies (presence signified by round brackets).
  • Dependencies (axes) are implicitly recognized as fields (and thus have the same naming restrictions as field names).
  • Axes are separated by commas.
  • Axes may have a unit when specified as dependency, but besides the name, square brackets, and commas no other characters are recognized within the round brackets that specify the dependency.
  • In addition to being specified as dependency for a field, axes may also be specified as an additional field without dependency, for instance to specify the unit (may simplify the string). For example, z1(x, y); z2(x, y); x[V]; y[V].
  • Units may only consist of word characters.
  • Use of unexpected characters will result in ignoring the part that contains the symbol.
  • The regular expression used to find field descriptors is: ((?<=\A)|(?<=\;))[a-zA-Z]+\w*(\[\w*\])?(\(([a-zA-Z]+\w*(\[\w*\])?\,?)*\))?

Source code in labcore/data/datadict.py
def datastructure_from_string(description: str) -> DataDict:
    r"""Construct a DataDict from a string description.

    Examples:
        * ``"data[mV](x, y)"`` results in a datadict with one dependent ``data`` with unit ``mV`` and
          two independents, ``x`` and ``y``, that do not have units.

        * ``"data_1[mV](x, y); data_2[mA](x); x[mV]; y[nT]"`` results in two dependents,
          one of them depending on ``x`` and ``y``, the other only on ``x``.
          Note that ``x`` and ``y`` have units. We can (but do not have to) omit them when specifying
          the dependencies.

        * ``"data_1[mV](x[mV], y[nT]); data_2[mA](x[mV])"``. Same result as the previous example.

    Rules:
        We recognize descriptions of the form ``field1[unit1](ax1, ax2, ...); field2[unit2](...); ...``.

        * Field names (like ``field1`` and ``field2`` above) have to start with a letter, and may contain
          word characters.
        * Field descriptors consist of the name, optional unit (presence signified by square brackets),
          and optional dependencies (presence signified by round brackets).
        * Dependencies (axes) are implicitly recognized as fields (and thus have the same naming restrictions as field
          names).
        * Axes are separated by commas.
        * Axes may have a unit when specified as dependency, but besides the name, square brackets, and commas no other
          characters are recognized within the round brackets that specify the dependency.
        * In addition to being specified as dependency for a field,
          axes may be specified also as additional field without dependency,
          for instance to specify the unit (may simplify the string). For example,
          ``z1(x, y); z2(x, y); x[V]; y[V]``.
        * Units may only consist of word characters.
        * Use of unexpected characters will result in ignoring the part that contains the symbol.
        * The regular expression used to find field descriptors is:
          ``((?<=\A)|(?<=\;))[a-zA-Z]+\w*(\[\w*\])?(\(([a-zA-Z]+\w*(\[\w*\])?\,?)*\))?``
    """

    description = description.replace(" ", "")

    data_name_pattern = r"[a-zA-Z]+\w*(\[\w*\])?"
    pattern = r"((?<=\A)|(?<=\;))" + data_name_pattern + r"(\((" + data_name_pattern + r"\,?)*\))?"
    r = re.compile(pattern)

    data_fields = []
    while (r.search(description)):
        match = r.search(description)
        if match is None: break
        data_fields.append(description[slice(*match.span())])
        description = description[match.span()[1]:]

    dd: Dict[str, Any] = dict()

    def analyze_field(df: str) -> Tuple[str, Optional[str], Optional[List[str]]]:
        has_unit = True if '[' in df and ']' in df else False
        has_dependencies = True if '(' in df and ')' in df else False

        name: str = ""
        unit: Optional[str] = None
        axes: Optional[List[str]] = None

        if has_unit:
            name = df.split('[')[0]
            unit = df.split('[')[1].split(']')[0]
            if has_dependencies:
                axes = df.split('(')[1].split(')')[0].split(',')
        elif has_dependencies:
            name = df.split('(')[0]
            axes = df.split('(')[1].split(')')[0].split(',')
        else:
            name = df

        if axes is not None and len(axes) == 0:
            axes = None
        return name, unit, axes

    for df in data_fields:
        name, unit, axes = analyze_field(df)

        # double specifying is only allowed for independents.
        # if an independent is specified multiple times, units must not collide
        # (but units do not have to be specified more than once)
        if name in dd:
            if 'axes' in dd[name] or axes is not None:
                raise ValueError(f'{name} is specified more than once.')
            if 'unit' in dd[name] and unit is not None and dd[name]['unit'] != unit:
                raise ValueError(f'conflicting units for {name}')

        dd[name] = dict()
        if unit is not None:
            dd[name]['unit'] = unit

        if axes is not None:
            for ax in axes:
                ax_name, ax_unit, ax_axes = analyze_field(ax)

                # we do not allow nested dependencies.
                if ax_axes is not None:
                    raise ValueError(f'{ax_name} is independent, may not have dependencies')

                # we can add fields implicitly from dependencies.
                # independents may be given both implicitly and explicitly, but only
                # when units don't collide.
                if ax_name not in dd:
                    dd[ax_name] = dict()
                    if ax_unit is not None:
                        dd[ax_name]['unit'] = ax_unit
                else:
                    if 'unit' in dd[ax_name] and ax_unit is not None and dd[ax_name]['unit'] != ax_unit:
                        raise ValueError(f'conflicting units for {ax_name}')

                if 'axes' not in dd[name]:
                    dd[name]['axes'] = []
                dd[name]['axes'].append(ax_name)

    return DataDict(**dd)
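
The first stage of the parser, splitting a description into field descriptors, can be tried out with the standard library alone. The following is a minimal sketch that reuses the regular expression quoted in the rules; the function name `split_fields` is hypothetical, and the sketch stops before units and axes are extracted.

```python
import re

# Same building blocks as in the listing above.
name = r"[a-zA-Z]+\w*(\[\w*\])?"
pattern = re.compile(
    r"((?<=\A)|(?<=\;))" + name + r"(\((" + name + r"\,?)*\))?")


def split_fields(description: str) -> list:
    """Split a description string into its individual field descriptors."""
    description = description.replace(" ", "")
    fields = []
    match = pattern.search(description)
    while match is not None:
        fields.append(match.group(0))
        # Continue searching in the remainder of the string.
        description = description[match.end():]
        match = pattern.search(description)
    return fields


print(split_fields("data_1[mV](x, y); data_2[mA](x); x[mV]; y[nT]"))
# → ['data_1[mV](x,y)', 'data_2[mA](x)', 'x[mV]', 'y[nT]']
```

Each descriptor is then split into name, unit, and axes by looking for square and round brackets, as `analyze_field` does in the listing above.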

dd2df(dd)

Make a pandas DataFrame from a datadict. Uses MultiIndex, and assumes that all data fields are compatible.

Parameters:

Name Type Description Default
dd DataDict

Source data.

required

Returns:

Type Description
DataFrame

pandas DataFrame.

Source code in labcore/data/datadict.py
def dd2df(dd: DataDict):
    """Make a pandas DataFrame from a datadict.
    Uses MultiIndex, and assumes that all data fields are compatible.

    Parameters
    ----------
    dd : DataDict
        source data

    Returns
    -------
    DataFrame
        pandas DataFrame
    """
    dd_flat = dd.expand()
    idx = pd.MultiIndex.from_arrays(
        [dd_flat[a]['values'] for a in dd_flat.axes()],
        names = dd_flat.axes(),
    )
    vals = {d: dd_flat[d]['values'] for d in dd_flat.dependents()}
    return pd.DataFrame(data=vals, index=idx)

dd2xr(dd)

Makes an xarray Dataset from a MeshgridDataDict.

TODO: currently only supports 'regular' grids, i.e., all axes are independent of each other, and can be represented by 1d arrays. For each axis, the first slice is used as coordinate values.

Parameters:

Name Type Description Default
dd MeshgridDataDict

Input data.

required

Returns:

Type Description
xr.Dataset

xarray Dataset.

Source code in labcore/data/datadict.py
def dd2xr(dd: MeshgridDataDict) -> xr.Dataset:
    """Makes an xarray Dataset from a MeshgridDataDict.

    TODO: currently only supports 'regular' grids, i.e., all axes
        are independent of each other, and can be represented by 1d arrays.
        For each axis, the first slice is used as coordinate values.

    Parameters
    ----------
    dd : MeshgridDataDict
        input data

    Returns
    -------
    xr.Dataset
        xarray Dataset
    """
    axes = dd.axes()
    coords = {}
    for i, a in enumerate(axes):
        slices = [0] * len(axes)
        slices[i] = slice(None)
        coords[a] = dd[a]['values'][tuple(slices)]

    xds = xr.Dataset(
        {d: (axes, dd[d]['values']) for d in dd.dependents()},
        coords=coords,
    )

    for d in xds.data_vars:
        xds[d].attrs['units'] = dd[d]['unit']
    for d in xds.dims:
        xds[d].attrs['units'] = dd[d]['unit']

    return xds
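
The "first slice" rule for the coordinates can be shown without xarray. The following is a minimal sketch on plain numpy arrays, assuming a regular grid; the dictionary `grids` is a hypothetical stand-in for the axis values stored in the dataset.

```python
import numpy as np

axes = ['x', 'y']
x, y = np.meshgrid([0, 1, 2], [10, 20], indexing='ij')
grids = {'x': x, 'y': y}  # stand-in for the axis arrays of a meshgrid dataset

coords = {}
for i, name in enumerate(axes):
    index = [0] * len(axes)   # fix every other dimension at its first element
    index[i] = slice(None)    # keep the full range along this axis' own dimension
    coords[name] = grids[name][tuple(index)]

print(coords)
```

Each full-size axis array collapses to a 1d coordinate vector. This only represents the grid faithfully when the axes are independent of each other, which is the limitation stated in the TODO above.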

guess_shape_from_datadict(data)

Try to guess the shape of the datadict dependents from the axes values.

Parameters:

Name Type Description Default
data DataDict

Dataset to examine.

required

Returns:

Type Description
Dict[str, Union[None, Tuple[List[str], Tuple[int, ...]]]]

A dictionary with the dependents as keys, and inferred shapes as values. Value is None, if the shape could not be inferred.

Source code in labcore/data/datadict.py
def guess_shape_from_datadict(data: DataDict) -> \
        Dict[str, Union[None, Tuple[List[str], Tuple[int, ...]]]]:
    """
    Try to guess the shape of the datadict dependents from the axes values.

    :param data: Dataset to examine.
    :return: A dictionary with the dependents as keys, and inferred shapes as
             values. Value is ``None``, if the shape could not be inferred.
    """

    shapes = {}
    for d in data.dependents():
        axnames = data.axes(d)
        axes: Dict[str, np.ndarray] = {}
        for a in axnames:
            axdata = data.data_vals(a)
            axes[a] = axdata
        shapes[d] = num.guess_grid_from_sweep_direction(**axes)

    return shapes

is_meta_key(key)

Checks if key is meta information.

Parameters:

Name Type Description Default
key str

The key we are checking.

required

Returns:

Type Description
bool

True if it is, False if it isn't.

Source code in labcore/data/datadict.py
def is_meta_key(key: str) -> bool:
    """Checks if ``key`` is meta information.

    :param key: The ``key`` we are checking.
    :return: ``True`` if it is, ``False`` if it isn't.
    """
    if key[:2] == '__' and key[-2:] == '__':
        return True
    else:
        return False

meshgrid_to_datadict(data)

Make a DataDict from a MeshgridDataDict by reshaping the data.

Parameters:

Name Type Description Default
data MeshgridDataDict

Input MeshgridDataDict.

required

Returns:

Type Description
DataDict

Flattened DataDict.

Source code in labcore/data/datadict.py
def meshgrid_to_datadict(data: MeshgridDataDict) -> DataDict:
    """
    Make a DataDict from a MeshgridDataDict by reshaping the data.

    :param data: Input ``MeshgridDataDict``.
    :return: Flattened ``DataDict``.
    """
    newdata = DataDict(**misc.unwrap_optional(data.structure(add_shape=False)))
    for k, v in data.data_items():
        val = v['values'].copy().reshape(-1)
        newdata[k]['values'] = val

    newdata = newdata.sanitize()
    newdata.validate()
    return newdata
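The reshaping step can be pictured without numpy. The sketch below flattens each field of a small 2 x 3 grid in row-major order, which is what `reshape(-1)` does for a C-ordered array; the `flatten` helper and the `mesh` dictionary are illustrative assumptions, not package code.

```python
from itertools import chain


def flatten(grid):
    """Flatten a 2D nested list in row-major order, like reshape(-1)."""
    return list(chain.from_iterable(grid))


mesh = {
    "x": [[0, 0, 0], [1, 1, 1]],
    "y": [[10, 20, 30], [10, 20, 30]],
    "z": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]],
}

# Every field becomes a 1D list with one entry per grid point.
records = {name: flatten(values) for name, values in mesh.items()}
print(records)
```

After flattening, entry `i` of every field belongs to the same record, so the grid shape is no longer encoded in the data itself.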

meta_key_to_name(key)

Converts a meta data key to just the name. E.g: for key: "__meta__" returns "meta"

Parameters:

Name Type Description Default
key str

The key that is being converted

required

Returns:

Type Description
str

The name of the key.

Source code in labcore/data/datadict.py
def meta_key_to_name(key: str) -> str:
    """
    Converts a meta data key to just the name.
    E.g: for ``key``: "__meta__" returns "meta"

    :param key: The key that is being converted
    :return: The name of the key.
    :raises: ``ValueError`` if the ``key`` is not a meta key.


    """

    if is_meta_key(key):
        return key[2:-2]
    else:
        raise ValueError(f'{key} is not a meta key.')

meta_name_to_key(name)

Converts name into a meta data key. E.g: "meta" gets converted to "__meta__"

Parameters:

Name Type Description Default
name str

The name that is being converted.

required

Returns:

Type Description
str

The meta data key based on name.

Source code in labcore/data/datadict.py
def meta_name_to_key(name: str) -> str:
    """
    Converts ``name`` into a meta data key. E.g: "meta" gets converted to "__meta__"

    :param name: The name that is being converted.
    :return: The meta data key based on ``name``.
    """
    return '__' + name + '__'
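The three meta-key helpers are inverse to each other for well-formed keys. The snippet below uses standalone copies of the functions listed above, so it runs without the package installed.

```python
def is_meta_key(key: str) -> bool:
    # A meta key starts and ends with a double underscore.
    return key[:2] == "__" and key[-2:] == "__"


def meta_key_to_name(key: str) -> str:
    if is_meta_key(key):
        return key[2:-2]
    raise ValueError(f"{key} is not a meta key.")


def meta_name_to_key(name: str) -> str:
    return "__" + name + "__"


key = meta_name_to_key("meta")   # name -> key
name = meta_key_to_name(key)     # key -> name
print(key, name, is_meta_key("unit"))
```

Passing a regular field name such as `"unit"` to `meta_key_to_name` raises `ValueError`.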

Datadict Storage

plottr.data.datadict_storage

Provides file-storage tools for the DataDict class.

Note: Any function in this module that interacts with a ddh5 file will create a lock file while it is using the file. The lock file has the following format: ~<file_name>.lock. The file lock will get deleted even if the program crashes. If the process is suddenly stopped, however, we cannot guarantee that the file lock will be deleted.

AppendMode

Bases: Enum

How/Whether to append data to existing data.

Source code in labcore/data/datadict_storage.py
class AppendMode(Enum):
    """How/Whether to append data to existing data."""

    #: Data that is additional compared to already existing data is appended.
    new = 0
    #: All data is appended to existing data.
    all = 1
    #: Data is overwritten.
    none = 2
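One way to read the three modes is shown below on plain Python lists. The `apply_append` helper is hypothetical, and it assumes that "additional" data means the rows beyond the number of rows already stored; the actual storage functions may decide this differently.

```python
from enum import Enum


class AppendMode(Enum):
    new = 0
    all = 1
    none = 2


def apply_append(existing, incoming, mode):
    """Hypothetical helper: combine row lists according to the append mode."""
    if mode is AppendMode.new:
        # Only rows beyond what is already stored are added (assumption).
        return existing + incoming[len(existing):]
    if mode is AppendMode.all:
        # Everything is appended, even rows that are already present.
        return existing + incoming
    # AppendMode.none: existing data is overwritten.
    return list(incoming)


stored = [1, 2, 3]
incoming = [1, 2, 3, 4, 5]
for mode in AppendMode:
    print(mode.name, apply_append(stored, incoming, mode))
```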

DDH5Writer

Bases: object

Context manager for writing data to DDH5. Based on typical needs in taking data in an experimental physics lab.

Creates lock file when writing data.

Can be used in safe_write_mode to make sure the experiment and data will be saved even if the ddh5 is being used by other programs. In this mode, the data is individually saved in files in a .tmp folder. When the experiment is finished, the data is unified and saved in the original file. If the data is correctly reconstructed, the .tmp folder is deleted. If not you can use the function unify_safe_write_data to reconstruct the data.

Parameters:

Name Type Description Default
basedir Union[str, Path]

The root directory in which data is stored. create_file_structure is creating the structure inside this root and determines the file name of the data. The default structure implemented here is <root>/YYYY-MM-DD/YYYY-mm-dd_THHMMSS_<ID>-<name>/<filename>.ddh5, where <ID> is a short identifier string and <name> is the value of parameter name. To change this, re-implement data_folder and/or create_file_structure.

'.'
datadict DataDict

Initial data object. Must contain at least the structure of the data to be able to use add_data to add data.

required
groupname str

Name of the top-level group in the file container. An existing group of that name will be deleted.

'data'
name Optional[str]

Name of this dataset. Used in path/file creation and added as meta data.

None
filename str

Filename to use. Defaults to 'data.ddh5'.

'data'
file_timeout Optional[float]

How long the function will wait for the ddh5 file to unlock. If None, uses the default value from the FileOpener class.

None
safe_write_mode Optional[bool]

If True, will save the data in the safe writing mode. Defaults to False.

False
Source code in labcore/data/datadict_storage.py
class DDH5Writer(object):
    """Context manager for writing data to DDH5.
    Based on typical needs in taking data in an experimental physics lab.

    Creates lock file when writing data.

    Can be used in safe_write_mode to make sure the experiment and data will be saved even if the ddh5 is being used by
    other programs. In this mode, the data is individually saved in files in a .tmp folder. When the experiment is
    finished, the data is unified and saved in the original file.
    If the data is correctly reconstructed, the .tmp folder is deleted. If not you can use the function unify_safe_write_data
    to reconstruct the data.


    :param basedir: The root directory in which data is stored.
        :meth:`.create_file_structure` is creating the structure inside this root and
        determines the file name of the data. The default structure implemented here is
        ``<root>/YYYY-MM-DD/YYYY-mm-dd_THHMMSS_<ID>-<name>/<filename>.ddh5``,
        where <ID> is a short identifier string and <name> is the value of parameter `name`.
        To change this, re-implement :meth:`.data_folder` and/or
        :meth:`.create_file_structure`.
    :param datadict: Initial data object. Must contain at least the structure of the
        data to be able to use :meth:`add_data` to add data.
    :param groupname: Name of the top-level group in the file container. An existing
        group of that name will be deleted.
    :param name: Name of this dataset. Used in path/file creation and added as meta data.
    :param filename: Filename to use. Defaults to 'data.ddh5'.
    :param file_timeout: How long the function will wait for the ddh5 file to unlock. If none uses the default
        value from the :class:`FileOpener`.
    :param safe_write_mode: If True, will save the data in the safe writing mode. Defaults to False.
    """

    # TODO: need an operation mode for not keeping data in memory.
    # TODO: a mode for working with pre-allocated data

    # Sets how many files before the writer creates a new folder in its safe writing mode
    n_files_per_dir = 1000

    # Controls how often the writer reconstructs the data in its safe writing mode.
    # It will reconstruct the data every `n_files_per_reconstruction` files or every `n_seconds_per_reconstruction`
    # seconds, whichever comes first.
    n_files_per_reconstruction = 1000
    n_seconds_per_reconstruction = 10

    def __init__(
        self,
        datadict: DataDict,
        basedir: Union[str, Path] = ".",
        groupname: str = "data",
        name: Optional[str] = None,
        filename: str = "data",
        filepath: Optional[Union[str, Path]] = None,
        file_timeout: Optional[float] = None,
        safe_write_mode: Optional[bool] = False,
    ):
        """Constructor for :class:`.DDH5Writer`"""

        self.basedir = Path(basedir)
        self.datadict = datadict

        if name is None:
            name = ""
        self.name = name

        self.groupname = groupname
        self.filename = Path(filename)

        self.filepath: Optional[Path] = None
        if filepath is not None:
            self.filepath = Path(filepath)

        self.datadict.add_meta("dataset.name", name)
        self.file_timeout = file_timeout
        self.uuid = uuid.uuid1()

        self.safe_write_mode = safe_write_mode
        # Stores how many individual data files have been written for safe_write_mode
        self.n_files = 0
        self.last_update_n_files = 0
        self.last_reconstruction_time = time.time()

    def __enter__(self) -> "DDH5Writer":
        if self.filepath is None:
            self.filepath = _data_file_path(self.data_file_path(), True)
        logger.info(f"Data location: {self.filepath}")

        nrecords: Optional[int] = self.datadict.nrecords()
        if nrecords is not None and nrecords > 0:
            datadict_to_hdf5(
                self.datadict,
                str(self.filepath),
                groupname=self.groupname,
                append_mode=AppendMode.none,
                file_timeout=self.file_timeout,
            )
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ) -> None:
        assert self.filepath is not None

        if self.safe_write_mode:
            try:
                logger.debug("Starting reconstruction of data")
                dd = reconstruct_safe_write_data(self.filepath, file_timeout=self.file_timeout)

                # Makes sure the reconstructed data matches the one in the .tmp folder
                assert datasets_are_equal(dd, self.datadict, ignore_meta=True)

                datadict_to_hdf5(dd, self.filepath, groupname=self.groupname, file_timeout=self.file_timeout, append_mode=AppendMode.none)
                shutil.rmtree(self.filepath.parent / ".tmp")

            except Exception as e:
                logger.error(f"Error while unifying data. Data should be located in the .tmp directory: {e}")
                self.add_tag("__not_reconstructed__")
                raise e

        with FileOpener(self.filepath, "a", timeout=self.file_timeout) as f:
            add_cur_time_attr(f.require_group(self.groupname), name="close")
        if exc_type is None:
            # exiting because the measurement is complete
            self.add_tag("__complete__")
        else:
            # exiting because of an exception
            self.add_tag("__interrupted__")

    def data_folder(self) -> Path:
        """Return the folder, relative to the data root path, in which data will
        be saved.

        Default format:
        ``<basedir>/YYYY-MM-DD/YYYY-mm-ddTHHMMSS_<ID>-<name>``.
        In this implementation we use the first 8 characters of a UUID as ID.

        :returns: The folder path.
        """
        ID = str(self.uuid).split("-")[0]
        parent = f"{datetime.datetime.now().replace(microsecond=0).isoformat().replace(':', '')}_{ID}"
        if self.name:
            parent += f"-{self.name}"
        path = Path(time.strftime("%Y-%m-%d"), parent)
        return path

    def data_file_path(self) -> Path:
        """Determine the filepath of the data file.

        :returns: The filepath of the data file.
        """
        data_folder_path = Path(self.basedir, self.data_folder())
        appendix = ""
        idx = 2
        while data_folder_path.exists():
            appendix = f"-{idx}"
            data_folder_path = Path(self.basedir, str(self.data_folder()) + appendix)
            idx += 1

        return Path(data_folder_path, self.filename)

    def _generate_next_safe_write_path(self):
        """
        Generates the next path for the data to be saved in the safe writing mode. Should not be used for other things.
        """

        now = datetime.datetime.now()

        # Creates tmp folder
        tmp_folder = self.filepath.parent / ".tmp"
        tmp_folder.mkdir(exist_ok=True)

        # Creates today folder
        today_folder = tmp_folder / now.strftime("%Y-%m-%d")
        today_folder.mkdir(exist_ok=True)

        # Creates hour folder
        hour_folder = today_folder / now.strftime("%H")
        hour_folder.mkdir(exist_ok=True)

        # Creates minute folder
        minute_folder = hour_folder / now.strftime("%M")
        minute_folder.mkdir(exist_ok=True)

        n_secs = 0
        second_folder = minute_folder / (now.strftime("%S") + f"_#{str(n_secs)}")
        if second_folder.exists():
            n_data_files = len(list(second_folder.iterdir())) + 1
            if n_data_files >= self.n_files_per_dir:
                keep_searching = True
                while keep_searching:
                    n_secs += 1
                    second_folder = minute_folder / (now.strftime("%S") + f"_#{str(n_secs)}")
                    if not second_folder.exists():
                        keep_searching = False
                        second_folder.mkdir()
                    else:
                        n_data_files = len(list(second_folder.iterdir())) + 1
                        if n_data_files < self.n_files_per_dir:
                            keep_searching = False

        # Creates the filename that follows the structure: yyyy-mm-dd-HH_MM_SS_<n_secs>_#<total_number_of_files>.ddh5
        filename = now.strftime("%Y-%m-%d-%H_%M_%S") + f"_{n_secs}_#{self.n_files}.ddh5"
        self.n_files += 1

        return second_folder/filename

    def add_data(self, **kwargs: Any) -> None:
        """Add data to the file (and the internal `DataDict`).

        Requires one keyword argument per data field in the `DataDict`, with
        the key being the name, and value the data to add. It is required that
        all added data has the same number of 'rows', i.e., the most outer dimension
        has to match for data to be inserted faithfully.
        If some data is scalar and others are not, then the data should be reshaped
        to (1, ) for the scalar data, and (1, ...) for the others; in other words,
        an outer dimension with length 1 is added for all.
        """
        self.datadict.add_data(**kwargs)

        if self.safe_write_mode:
            clean_dd_copy = self.datadict.structure()
            clean_dd_copy.add_data(**kwargs)
            filepath = self._generate_next_safe_write_path()

            datadict_to_hdf5(
                clean_dd_copy,
                filepath,
                groupname=self.groupname,
                append_mode=AppendMode.new,
                file_timeout=self.file_timeout,
            )

            delta_t = time.time() - self.last_reconstruction_time

            # Reconstructs the data every n_files_per_reconstruction files or every n_seconds_per_reconstruction seconds
            if (self.n_files - self.last_update_n_files >= self.n_files_per_reconstruction or
                    delta_t > self.n_seconds_per_reconstruction):
                try:
                    dd = reconstruct_safe_write_data(self.filepath, unification_from_scratch=False,
                                                     file_timeout=self.file_timeout)
                    datadict_to_hdf5(dd, self.filepath, groupname=self.groupname, file_timeout=self.file_timeout, append_mode=AppendMode.none)
                except RuntimeError as e:
                    logger.warning(f"Error while unifying data: {e} \nData is still getting saved in .tmp directory.")

                with FileOpener(self.filepath, "a", timeout=self.file_timeout) as f:
                    add_cur_time_attr(f, name="last_change")
                    add_cur_time_attr(f[self.groupname], name="last_change")

                # Even if I fail at reconstruction, I want to wait the same amount as if it was successful to try again.
                self.last_reconstruction_time = time.time()
                self.last_update_n_files = self.n_files

        else:
            nrecords = self.datadict.nrecords()
            if nrecords is not None and nrecords > 0:
                datadict_to_hdf5(
                    self.datadict,
                    str(self.filepath),
                    groupname=self.groupname,
                    file_timeout=self.file_timeout,
                )

                assert self.filepath is not None
                with FileOpener(self.filepath, "a", timeout=self.file_timeout) as f:
                    add_cur_time_attr(f, name="last_change")
                    add_cur_time_attr(f[self.groupname], name="last_change")

    # convenience methods for saving things in the same directory as the ddh5 file

    def add_tag(self, tags: Union[str, Collection[str]]) -> None:
        assert self.filepath is not None
        if isinstance(tags, str):
            tags = [tags]
        for tag in tags:
            open(self.filepath.parent / f"{tag}.tag", "x").close()

    def backup_file(self, paths: Union[str, Collection[str]]) -> None:
        assert self.filepath is not None
        if isinstance(paths, str):
            paths = [paths]
        for path in paths:
            shutil.copy(path, self.filepath.parent)

    def save_text(self, name: str, text: str) -> None:
        assert self.filepath is not None
        with open(self.filepath.parent / name, "x") as f:
            f.write(text)

    def save_dict(self, name: str, d: dict) -> None:
        assert self.filepath is not None
        with open(self.filepath.parent / name, "x") as f:
            json.dump(d, f, indent=4, ensure_ascii=False, cls=NumpyEncoder)

__init__(datadict, basedir='.', groupname='data', name=None, filename='data', filepath=None, file_timeout=None, safe_write_mode=False)

Constructor for DDH5Writer.

Source code in labcore/data/datadict_storage.py
def __init__(
    self,
    datadict: DataDict,
    basedir: Union[str, Path] = ".",
    groupname: str = "data",
    name: Optional[str] = None,
    filename: str = "data",
    filepath: Optional[Union[str, Path]] = None,
    file_timeout: Optional[float] = None,
    safe_write_mode: Optional[bool] = False,
):
    """Constructor for :class:`.DDH5Writer`"""

    self.basedir = Path(basedir)
    self.datadict = datadict

    if name is None:
        name = ""
    self.name = name

    self.groupname = groupname
    self.filename = Path(filename)

    self.filepath: Optional[Path] = None
    if filepath is not None:
        self.filepath = Path(filepath)

    self.datadict.add_meta("dataset.name", name)
    self.file_timeout = file_timeout
    self.uuid = uuid.uuid1()

    self.safe_write_mode = safe_write_mode
    # Stores how many individual data files have been written for safe_write_mode
    self.n_files = 0
    self.last_update_n_files = 0
    self.last_reconstruction_time = time.time()

add_data(**kwargs)

Add data to the file (and the internal DataDict).

Requires one keyword argument per data field in the DataDict, with the key being the name, and value the data to add. It is required that all added data has the same number of 'rows', i.e., the most outer dimension has to match for data to be inserted faithfully. If some data is scalar and others are not, then the data should be reshaped to (1, ) for the scalar data, and (1, ...) for the others; in other words, an outer dimension with length 1 is added for all.
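The shape rule can be checked on nested lists. In the sketch below a scalar setpoint and a three-point trace form a single record, so both get an outer dimension of length 1; `shape_of` is a hypothetical helper, and the field names `x` and `trace` are made up for the example.

```python
def shape_of(value):
    """Shape of a rectangular nested list; () for a scalar."""
    shape = []
    while isinstance(value, (list, tuple)):
        shape.append(len(value))
        if not value:
            break
        value = value[0]
    return tuple(shape)


# One record: a scalar setpoint and a 3-point trace measured at it.
x = [1.5]                  # scalar wrapped to shape (1,)
trace = [[0.1, 0.2, 0.3]]  # trace wrapped to shape (1, 3)

record = {"x": x, "trace": trace}
# The outermost dimension (number of rows) must agree for all fields.
rows = {name: shape_of(value)[0] for name, value in record.items()}
print(rows)
```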

Source code in labcore/data/datadict_storage.py
def add_data(self, **kwargs: Any) -> None:
    """Add data to the file (and the internal `DataDict`).

    Requires one keyword argument per data field in the `DataDict`, with
    the key being the name, and value the data to add. It is required that
    all added data has the same number of 'rows', i.e., the most outer dimension
    has to match for data to be inserted faithfully.
    If some data is scalar and others are not, then the data should be reshaped
    to (1, ) for the scalar data, and (1, ...) for the others; in other words,
    an outer dimension with length 1 is added for all.
    """
    self.datadict.add_data(**kwargs)

    if self.safe_write_mode:
        clean_dd_copy = self.datadict.structure()
        clean_dd_copy.add_data(**kwargs)
        filepath = self._generate_next_safe_write_path()

        datadict_to_hdf5(
            clean_dd_copy,
            filepath,
            groupname=self.groupname,
            append_mode=AppendMode.new,
            file_timeout=self.file_timeout,
        )

        delta_t = time.time() - self.last_reconstruction_time

        # Reconstructs the data every n_files_per_reconstruction files or every n_seconds_per_reconstruction seconds
        if (self.n_files - self.last_update_n_files >= self.n_files_per_reconstruction or
                delta_t > self.n_seconds_per_reconstruction):
            try:
                dd = reconstruct_safe_write_data(self.filepath, unification_from_scratch=False,
                                                 file_timeout=self.file_timeout)
                datadict_to_hdf5(dd, self.filepath, groupname=self.groupname, file_timeout=self.file_timeout, append_mode=AppendMode.none)
            except RuntimeError as e:
                logger.warning(f"Error while unifying data: {e} \nData is still getting saved in .tmp directory.")

            with FileOpener(self.filepath, "a", timeout=self.file_timeout) as f:
                add_cur_time_attr(f, name="last_change")
                add_cur_time_attr(f[self.groupname], name="last_change")

            # Even if I fail at reconstruction, I want to wait the same amount as if it was successful to try again.
            self.last_reconstruction_time = time.time()
            self.last_update_n_files = self.n_files

    else:
        nrecords = self.datadict.nrecords()
        if nrecords is not None and nrecords > 0:
            datadict_to_hdf5(
                self.datadict,
                str(self.filepath),
                groupname=self.groupname,
                file_timeout=self.file_timeout,
            )

            assert self.filepath is not None
            with FileOpener(self.filepath, "a", timeout=self.file_timeout) as f:
                add_cur_time_attr(f, name="last_change")
                add_cur_time_attr(f[self.groupname], name="last_change")
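The condition that triggers a reconstruction in safe write mode can be isolated into a small predicate. This is a sketch that mirrors the `if` statement in `add_data`, with the class attributes passed as keyword arguments; `should_reconstruct` is a hypothetical name.

```python
def should_reconstruct(n_files, last_update_n_files, seconds_since_last,
                       n_files_per_reconstruction=1000,
                       n_seconds_per_reconstruction=10):
    """True when enough files or enough time have accumulated."""
    enough_files = n_files - last_update_n_files >= n_files_per_reconstruction
    enough_time = seconds_since_last > n_seconds_per_reconstruction
    return enough_files or enough_time


print(should_reconstruct(1000, 0, 0.5))   # file count reached
print(should_reconstruct(10, 0, 0.5))     # neither limit reached
print(should_reconstruct(10, 0, 10.5))    # time limit exceeded
```

Note that the file-count test uses `>=` while the time test uses a strict `>`, so exactly 10 seconds does not yet trigger a reconstruction with the default values.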

data_file_path()

Determine the filepath of the data file.

Returns:

Type Description
Path

The filepath of the data file.

Source code in labcore/data/datadict_storage.py
def data_file_path(self) -> Path:
    """Determine the filepath of the data file.

    :returns: The filepath of the data file.
    """
    data_folder_path = Path(self.basedir, self.data_folder())
    appendix = ""
    idx = 2
    while data_folder_path.exists():
        appendix = f"-{idx}"
        data_folder_path = Path(self.basedir, str(self.data_folder()) + appendix)
        idx += 1

    return Path(data_folder_path, self.filename)
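The loop above avoids overwriting an existing folder by appending `-2`, `-3`, and so on. A minimal sketch of that search, using a temporary directory and a fixed folder name (the real method recomputes the name from `data_folder()` on each pass); `unique_folder` is a hypothetical helper.

```python
import tempfile
from pathlib import Path


def unique_folder(basedir: Path, folder: str) -> Path:
    """Hypothetical helper mirroring the suffix search in data_file_path."""
    candidate = basedir / folder
    idx = 2
    while candidate.exists():
        candidate = basedir / f"{folder}-{idx}"
        idx += 1
    return candidate


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    folder = "2024-05-17T140309_abcd1234-test"
    first = unique_folder(base, folder)
    first.mkdir()
    second = unique_folder(base, folder)
    second.mkdir()
    third = unique_folder(base, folder)
    names = [first.name, second.name, third.name]

print(names)
```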

data_folder()

Return the folder, relative to the data root path, in which data will be saved.

Default format: <basedir>/YYYY-MM-DD/YYYY-mm-ddTHHMMSS_<ID>-<name>. In this implementation we use the first 8 characters of a UUID as ID.

Returns:

Type Description
Path

The folder path.

Source code in labcore/data/datadict_storage.py
def data_folder(self) -> Path:
    """Return the folder, relative to the data root path, in which data will
    be saved.

    Default format:
    ``<basedir>/YYYY-MM-DD/YYYY-mm-ddTHHMMSS_<ID>-<name>``.
    In this implementation we use the first 8 characters of a UUID as ID.

    :returns: The folder path.
    """
    ID = str(self.uuid).split("-")[0]
    parent = f"{datetime.datetime.now().replace(microsecond=0).isoformat().replace(':', '')}_{ID}"
    if self.name:
        parent += f"-{self.name}"
    path = Path(time.strftime("%Y-%m-%d"), parent)
    return path
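To see what the default folder name looks like, the sketch below repeats the string formatting with a fixed timestamp and UUID. `folder_name` is a hypothetical free function; unlike the method, it takes the time as an argument and uses it for both the date folder and the timestamp.

```python
import datetime
import uuid
from pathlib import Path


def folder_name(now: datetime.datetime, run_id: uuid.UUID, name: str = "") -> Path:
    short_id = str(run_id).split("-")[0]  # first 8 hex characters of the UUID
    stamp = now.replace(microsecond=0).isoformat().replace(":", "")
    parent = f"{stamp}_{short_id}"
    if name:
        parent += f"-{name}"
    return Path(now.strftime("%Y-%m-%d"), parent)


now = datetime.datetime(2024, 5, 17, 14, 3, 9)
run_id = uuid.UUID("12345678-1234-1234-1234-123456789abc")
folder = folder_name(now, run_id, "rabi")
print(folder.as_posix())
```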

FileOpener

Context manager for opening files. It creates its own file lock to indicate to other programs that the file is being used. The lock file follows the following structure: "~<file_name>.lock".

Parameters:

Name Type Description Default
path Union[Path, str]

The file path.

required
mode str

The opening file mode. Only the following modes are supported: 'r', 'w', 'w-', 'a'. Defaults to 'r'.

'r'
timeout Optional[float]

Time, in seconds, the context manager waits for the file to unlock. If None, the default timeout of 300 seconds is used.

None
test_delay float

Length of time in between checks. I.e. how long the FileOpener waits to see if a file got unlocked again

0.1
Source code in labcore/data/datadict_storage.py
class FileOpener:
    """
    Context manager for opening files, creates its own file lock to indicate other programs that the file is being
    used. The lock file follows the following structure: "~<file_name>.lock".

    :param path: The file path.
    :param mode: The opening file mode. Only the following modes are supported: 'r', 'w', 'w-', 'a'. Defaults to 'r'.
    :param timeout: Time, in seconds, the context manager waits for the file to unlock. If None, the default timeout of 300 seconds is used.
    :param test_delay: Length of time in between checks. I.e. how long the FileOpener waits to see if a file got
        unlocked again
    """

    def __init__(
        self,
        path: Union[Path, str],
        mode: str = "r",
        timeout: Optional[float] = None,
        test_delay: float = 0.1,
    ):
        self.path = Path(path)
        self.lock_path = self.path.parent.joinpath("~" + str(self.path.stem) + ".lock")
        if mode not in ["r", "w", "w-", "a"]:
            raise ValueError("Only 'r', 'w', 'w-', 'a' modes are supported.")
        self.mode = mode
        self.default_timeout = 300.0
        if timeout is None:
            self.timeout = self.default_timeout
        else:
            self.timeout = timeout
        self.test_delay = test_delay

        self.file: Optional[h5py.File] = None

    def __enter__(self) -> h5py.File:
        self.file = self.open_when_unlocked()
        return self.file

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ) -> None:
        try:
            assert self.file is not None
            self.file.close()
        finally:
            if self.lock_path.is_file():
                self.lock_path.unlink()

    def open_when_unlocked(self) -> h5py.File:
        t0 = time.time()
        while True:
            if not self.lock_path.is_file():
                try:
                    self.lock_path.touch(exist_ok=False)
                # This happens if some other process beat this one and created the file beforehand
                except FileExistsError:
                    continue

                while True:
                    try:
                        f = h5py.File(str(self.path), self.mode)
                        return f
                    except (OSError, PermissionError, RuntimeError):
                        pass
                    time.sleep(
                        self.test_delay
                    )  # don't overwhelm the FS by very fast repeated calls.
                    if time.time() - t0 > self.timeout:
                        raise RuntimeError("Waiting for file unlock timeout")

            time.sleep(
                self.test_delay
            )  # don't overwhelm the FS by very fast repeated calls.
            if time.time() - t0 > self.timeout:
                raise RuntimeError("Lock file remained for longer than timeout time")
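The lock-file pattern can be tried without h5py. The sketch below is a simplified stand-in that opens plain text files: it creates `~<stem>.lock` atomically, waits while another holder has it, and removes it on exit. The class name `LockedFile` and the shorter timeout are assumptions for the example; it also omits the retry loop around opening the file itself.

```python
import tempfile
import time
from pathlib import Path


class LockedFile:
    """Hypothetical stand-in for FileOpener that opens plain text files."""

    def __init__(self, path, mode="r", timeout=5.0, test_delay=0.1):
        self.path = Path(path)
        self.lock_path = self.path.parent / ("~" + self.path.stem + ".lock")
        self.mode = mode
        self.timeout = timeout
        self.test_delay = test_delay
        self.file = None

    def __enter__(self):
        t0 = time.time()
        while True:
            try:
                # Atomic create-if-absent: fails if someone else holds the lock.
                self.lock_path.touch(exist_ok=False)
                break
            except FileExistsError:
                if time.time() - t0 > self.timeout:
                    raise RuntimeError("Lock file remained for longer than timeout time")
                time.sleep(self.test_delay)  # do not hammer the file system
        try:
            self.file = open(self.path, self.mode)
        except BaseException:
            self.lock_path.unlink()  # do not leave a stale lock behind
            raise
        return self.file

    def __exit__(self, exc_type, exc_value, exc_traceback):
        try:
            self.file.close()
        finally:
            if self.lock_path.is_file():
                self.lock_path.unlink()


with tempfile.TemporaryDirectory() as tmp:
    opener = LockedFile(Path(tmp) / "data.txt", "w")
    with opener as f:
        f.write("hello")
        lock_seen = opener.lock_path.is_file()   # lock exists while open
        lock_name = opener.lock_path.name
    lock_left = opener.lock_path.is_file()       # lock removed on exit

print(lock_name, lock_seen, lock_left)
```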

add_cur_time_attr(h5obj, name='creation', prefix='__', suffix='__')

Add current time information to the given HDF5 object, following the format of: <prefix><name>_time_sec<suffix>.

Parameters:

Name Type Description Default
h5obj Any

The HDF5 object.

required
name str

The name of the attribute.

'creation'
prefix str

Prefix of the attribute.

'__'
suffix str

Suffix of the attribute.

'__'
Source code in labcore/data/datadict_storage.py
def add_cur_time_attr(
    h5obj: Any, name: str = "creation", prefix: str = "__", suffix: str = "__"
) -> None:
    """Add current time information to the given HDF5 object, following the format of:
    ``<prefix><name>_time_sec<suffix>``.

    :param h5obj: The HDF5 object.
    :param name: The name of the attribute.
    :param prefix: Prefix of the attribute.
    :param suffix: Suffix of the attribute.
    """

    t = time.localtime()
    tsec = time.mktime(t)
    tstr = time.strftime(TIMESTRFORMAT, t)

    set_attr(h5obj, prefix + name + "_time_sec" + suffix, tsec)
    set_attr(h5obj, prefix + name + "_time_str" + suffix, tstr)
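The attribute names that result from this function can be checked with a plain dictionary in place of the HDF5 object. In this sketch the value of `TIMESTRFORMAT` is an assumption (the real constant is defined elsewhere in the module), and `add_cur_time_attr_to_dict` is a hypothetical variant that writes to a dict instead of calling `set_attr`.

```python
import time

TIMESTRFORMAT = "%Y-%m-%d %H:%M:%S"  # assumption: the real constant is not shown here


def add_cur_time_attr_to_dict(attrs, name="creation", prefix="__", suffix="__"):
    """Store the current time as seconds and as a string under two keys."""
    t = time.localtime()
    attrs[prefix + name + "_time_sec" + suffix] = time.mktime(t)
    attrs[prefix + name + "_time_str" + suffix] = time.strftime(TIMESTRFORMAT, t)


attrs = {}
add_cur_time_attr_to_dict(attrs, name="last_change")
print(sorted(attrs))
```

With the default prefix and suffix both keys are meta keys in the sense of `is_meta_key`, so they are picked up as meta data when the file is loaded.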

all_datadicts_from_hdf5(path, file_timeout=None, **kwargs)

Loads all the DataDicts contained in a single HDF5 file. Returns a dictionary with the group names as keys and the DataDicts as the values.

Parameters:

Name Type Description Default
path Union[str, Path]

The path of the HDF5 file.

required
file_timeout Optional[float]

How long the function will wait for the ddh5 file to unlock. If None, uses the default value from the FileOpener class.

None

Returns:

Type Description
Dict[str, Any]

Dictionary with group names as key, and the DataDicts inside them as values.

Source code in labcore/data/datadict_storage.py
def all_datadicts_from_hdf5(
    path: Union[str, Path], file_timeout: Optional[float] = None, **kwargs: Any
) -> Dict[str, Any]:
    """
    Loads all the DataDicts contained on a single HDF5 file. Returns a dictionary with the group names as keys and
    the DataDicts as the values of that key.

    :param path: The path of the HDF5 file.
    :param file_timeout: How long the function will wait for the ddh5 file to unlock. If none uses the default
        value from the :class:`FileOpener`.
    :return: Dictionary with group names as key, and the DataDicts inside them as values.
    """
    filepath = _data_file_path(path)
    if not os.path.exists(filepath):
        raise ValueError("Specified file does not exist.")

    ret = {}
    with FileOpener(filepath, "r", file_timeout) as f:
        keys = [k for k in f.keys()]
    for k in keys:
        ret[k] = datadict_from_hdf5(
            path=path, groupname=k, file_timeout=file_timeout, **kwargs
        )
    return ret

datadict_from_hdf5(path, groupname='data', startidx=None, stopidx=None, structure_only=False, ignore_unequal_lengths=True, file_timeout=None)

Load a DataDict from file.

Parameters:

Name Type Description Default
path Union[str, Path]

Full filepath without the file extension.

required
groupname str

Name of hdf5 group.

'data'
startidx Union[int, None]

Start row.

None
stopidx Union[int, None]

End row + 1.

None
structure_only bool

If True, don't load the data values.

False
ignore_unequal_lengths bool

If True, don't fail when the rows have unequal length; will return the longest consistent DataDict possible.

True
file_timeout Optional[float]

How long the function will wait for the ddh5 file to unlock. If None, uses the default value from the FileOpener class.

None

Returns:

Type Description
DataDict

Validated DataDict.
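How startidx, stopidx and ignore_unequal_lengths interact can be summarized in a small function. This sketch follows the row-range logic of the source for this function, operating on a list of per-field lengths; `resolve_row_range` is a hypothetical name.

```python
def resolve_row_range(lens, startidx=None, stopidx=None,
                      ignore_unequal_lengths=True):
    """Return the (start, stop) row range that is valid for all fields."""
    if startidx is None:
        startidx = 0
    if len(set(lens)) > 1 and not ignore_unequal_lengths:
        raise RuntimeError("Unequal lengths in the datasets.")
    # Never read past the shortest field.
    limit = min(lens)
    if stopidx is None or stopidx > limit:
        stopidx = limit
    return startidx, stopidx


print(resolve_row_range([10, 10, 8]))            # clipped to the shortest field
print(resolve_row_range([10, 10], 2, 5))         # explicit range kept
print(resolve_row_range([10, 10], None, 50))     # stop clipped to available rows
```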

Source code in labcore/data/datadict_storage.py
def datadict_from_hdf5(
    path: Union[str, Path],
    groupname: str = "data",
    startidx: Union[int, None] = None,
    stopidx: Union[int, None] = None,
    structure_only: bool = False,
    ignore_unequal_lengths: bool = True,
    file_timeout: Optional[float] = None,
) -> DataDict:
    """Load a DataDict from file.

    :param path: Full filepath without the file extension.
    :param groupname: Name of hdf5 group.
    :param startidx: Start row.
    :param stopidx: End row + 1.
    :param structure_only: If `True`, don't load the data values.
    :param ignore_unequal_lengths: If `True`, don't fail when the rows have
        unequal length; will return the longest consistent DataDict possible.
    :param file_timeout: How long the function will wait for the ddh5 file to unlock. If None, uses the default
        value from the :class:`FileOpener`.
    :return: Validated DataDict.
    """
    filepath = _data_file_path(path)
    if not filepath.exists():
        raise ValueError(f"Specified file '{filepath}' does not exist.")

    if startidx is None:
        startidx = 0

    res = {}
    with FileOpener(filepath, "r", file_timeout) as f:
        if groupname not in f:
            raise ValueError("Group does not exist.")

        grp = f[groupname]
        keys = list(grp.keys())
        # read the number of rows from the dataset shape, without loading the data
        lens = [grp[k].shape[0] for k in keys]

        if len(set(lens)) > 1:
            if not ignore_unequal_lengths:
                raise RuntimeError("Unequal lengths in the datasets.")

            if stopidx is None or stopidx > min(lens):
                stopidx = min(lens)
        else:
            if stopidx is None or stopidx > lens[0]:
                stopidx = lens[0]

        for attr in grp.attrs:
            if is_meta_key(attr):
                res[attr] = deh5ify(grp.attrs[attr])

        for k in keys:
            ds = grp[k]
            entry: Dict[str, Union[Collection[Any], np.ndarray]] = dict(
                values=np.array([]),
            )

            if "axes" in ds.attrs:
                entry["axes"] = deh5ify(ds.attrs["axes"]).tolist()
            else:
                entry["axes"] = []

            if "unit" in ds.attrs:
                entry["unit"] = deh5ify(ds.attrs["unit"])

            if not structure_only:
                entry["values"] = ds[startidx:stopidx]

            # the dataset shape is available without reading the values
            entry["__shape__"] = ds.shape

            # and now the meta data
            for attr in ds.attrs:
                if is_meta_key(attr):
                    entry[attr] = deh5ify(ds.attrs[attr])

            res[k] = entry

    dd = DataDict(**res)
    dd.validate()
    return dd
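
The way the stop index is clamped when datasets differ in length can be sketched in isolation. The helper name `resolve_stopidx` below is hypothetical and not part of labcore; it only mirrors the branch logic of the listing above on a plain list of lengths.

```python
from typing import List, Optional


def resolve_stopidx(
    lens: List[int],
    stopidx: Optional[int] = None,
    ignore_unequal_lengths: bool = True,
) -> int:
    """Hypothetical helper: pick the stop row the way the loader above does."""
    if len(set(lens)) > 1 and not ignore_unequal_lengths:
        raise RuntimeError("Unequal lengths in the datasets.")
    # With equal lengths, min(lens) is simply that common length.
    shortest = min(lens)
    if stopidx is None or stopidx > shortest:
        return shortest
    return stopidx


print(resolve_stopidx([5, 3, 4]))          # clamped to the shortest dataset
print(resolve_stopidx([5, 5], stopidx=2))  # an explicit, smaller stop is kept
```

Clamping to the shortest dataset is what makes the result "the longest consistent DataDict possible": every field is cut to a row count that all fields can provide.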

datadict_to_hdf5(datadict, path, groupname='data', append_mode=AppendMode.new, file_timeout=None)

Write a DataDict to DDH5

Note: Meta data is only written during initial writing of the dataset. If we're appending to existing datasets, we're not setting meta data anymore.

Parameters:

Name Type Description Default
datadict DataDict

Datadict to write to disk.

required
path Union[str, Path]

Path of the file (extension may be omitted).

required
groupname str

Name of the top level group to store the data in.

'data'
append_mode AppendMode

  • AppendMode.none : Delete and re-create group.
  • AppendMode.new : Append rows in the datadict that exceed the number of existing rows in the dataset already stored. Note: we're not checking for content, only length!
  • AppendMode.all : Append all data in datadict to file data sets.

new
file_timeout Optional[float]

How long the function will wait for the ddh5 file to unlock. Only relevant if you are writing to a file that already exists and some other program is trying to read it at the same time. If None, uses the default value from FileOpener.

None
Source code in labcore/data/datadict_storage.py
def datadict_to_hdf5(
    datadict: DataDict,
    path: Union[str, Path],
    groupname: str = "data",
    append_mode: AppendMode = AppendMode.new,
    file_timeout: Optional[float] = None,
) -> None:
    """Write a DataDict to DDH5

    Note: Meta data is only written during initial writing of the dataset.
    If we're appending to existing datasets, we're not setting meta
    data anymore.

    :param datadict: Datadict to write to disk.
    :param path: Path of the file (extension may be omitted).
    :param groupname: Name of the top level group to store the data in.
    :param append_mode:
        - `AppendMode.none` : Delete and re-create group.
        - `AppendMode.new` : Append rows in the datadict that exceed
          the number of existing rows in the dataset already stored.
          Note: we're not checking for content, only length!

        - `AppendMode.all` : Append all data in datadict to file data sets.
    :param file_timeout: How long the function will wait for the ddh5 file to unlock. Only relevant if you are
        writing to a file that already exists and some other program is trying to read it at the same time.
        If None, uses the default value from the :class:`FileOpener`.

    """
    filepath = _data_file_path(path, True)
    if not filepath.exists():
        append_mode = AppendMode.none

    with FileOpener(filepath, "a", file_timeout) as f:
        if append_mode is AppendMode.none:
            init_file(f, groupname)
        assert groupname in f
        grp = f[groupname]

        # add top-level meta data.
        for k, v in datadict.meta_items(clean_keys=False):
            set_attr(grp, k, v)

        for k, v in datadict.data_items():
            data = v["values"]
            shp = data.shape
            nrows = shp[0]

            # create new dataset, add axes and unit metadata
            if k not in grp:
                maxshp = tuple([None] + list(shp[1:]))
                ds = grp.create_dataset(k, maxshape=maxshp, data=data)

                # add meta data
                add_cur_time_attr(ds)

                if v.get("axes", []):
                    set_attr(ds, "axes", v["axes"])
                if v.get("unit", "") != "":
                    set_attr(ds, "unit", v["unit"])

                for kk, vv in datadict.meta_items(k, clean_keys=False):
                    set_attr(ds, kk, vv)
                ds.flush()

            # if the dataset already exists, append data according to
            # chosen append mode.
            else:
                ds = grp[k]
                dslen = ds.shape[0]

                if append_mode == AppendMode.new:
                    newshp = tuple([nrows] + list(shp[1:]))
                    ds.resize(newshp)
                    ds[dslen:] = data[dslen:]
                elif append_mode == AppendMode.all:
                    newshp = tuple([dslen + nrows] + list(shp[1:]))
                    ds.resize(newshp)
                    ds[dslen:] = data[:]
                ds.flush()
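
The difference between the two append branches is easiest to see on in-memory arrays. The following is a minimal sketch, not labcore code: `append_rows` is a hypothetical helper, the mode is passed as a plain string instead of an `AppendMode` member, and it assumes the incoming data has at least as many rows as the stored data.

```python
import numpy as np


def append_rows(stored: np.ndarray, data: np.ndarray, mode: str) -> np.ndarray:
    """Hypothetical helper: emulate the 'new' and 'all' branches on arrays."""
    dslen = stored.shape[0]
    if mode == "new":
        # Only rows beyond the stored length are written; content is not compared.
        return np.concatenate([stored, data[dslen:]])
    if mode == "all":
        # Every incoming row is appended after the stored rows.
        return np.concatenate([stored, data])
    raise ValueError(f"unknown mode: {mode}")


stored = np.array([0, 1, 2])
data = np.array([9, 9, 2, 3, 4])  # first rows differ from what is stored

print(append_rows(stored, data, "new"))
print(append_rows(stored, data, "all"))
```

In the `"new"` case the differing leading rows of `data` are silently ignored, which is the "only length" caveat from the parameter description.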

deh5ify(obj)

Convert slightly mangled types back to more handy ones.

Parameters:

Name Type Description Default
obj Any

Input object.

required

Returns:

Type Description
Any

Object

Source code in labcore/data/datadict_storage.py
def deh5ify(obj: Any) -> Any:
    """Convert slightly mangled types back to more handy ones.

    :param obj: Input object.
    :return: Object
    """
    if type(obj) == bytes:
        return obj.decode()

    if type(obj) == np.ndarray and obj.dtype.kind == "S":
        return np.char.decode(obj)

    return obj

h5ify(obj)

Convert an object into something that we can assign to an HDF5 attribute.

Performs the following conversions:

  • list/array of strings -> numpy chararray of unicode type

Parameters:

Name Type Description Default
obj Any

Input object.

required

Returns:

Type Description
Any

Object, converted if necessary.

Source code in labcore/data/datadict_storage.py
def h5ify(obj: Any) -> Any:
    """
    Convert an object into something that we can assign to an HDF5 attribute.

    Performs the following conversions:
    - list/array of strings -> numpy chararray of unicode type

    :param obj: Input object.
    :return: Object, converted if necessary.
    """
    if isinstance(obj, list):
        # lists that are not purely strings are converted to arrays;
        # pure string lists are passed on unchanged.
        if not all(isinstance(elt, str) for elt in obj):
            obj = np.array(obj)

    if type(obj) == np.ndarray and obj.dtype.kind == "U":
        return np.char.encode(obj, encoding="utf8")

    return obj
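
As an illustration of the two conversions, the sketch below round-trips a unicode array through the same NumPy calls used in `h5ify` and `deh5ify`. It calls `np.char.encode` and `np.char.decode` directly rather than importing labcore, and the axis names are made up for the example.

```python
import numpy as np

# A unicode array, as h5ify would see it for e.g. the 'axes' attribute.
axes = np.array(["ax1", "ax2"])

# h5ify direction: unicode (kind "U") -> bytes (kind "S").
encoded = np.char.encode(axes, encoding="utf8")

# deh5ify direction: bytes (kind "S") -> unicode (kind "U").
decoded = np.char.decode(encoded)

print(axes.dtype.kind, encoded.dtype.kind, decoded.dtype.kind)
print(decoded.tolist())
```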

load_as_xr(folder, fn='data.ddh5', fields=None)

Load ddh5 data as xarray (only for gridable data).

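Parameters:

Name Type Description Default
folder Path

Data folder.

required
fn str

Filename.

'data.ddh5'
fields Optional[List[str]]

If given, only these fields are extracted before the conversion.

None

Returns:

Type Description
xr.Dataset

The gridded data, with complex dependents split into real and imaginary parts.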

Source code in labcore/data/datadict_storage.py
def load_as_xr(
    folder: Path, fn="data.ddh5", fields: Optional[List[str]] = None
) -> xr.Dataset:
    """Load ddh5 data as xarray (only for gridable data).

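    Parameters
    ----------
    folder :
        data folder
    fn : str, optional
        filename, by default 'data.ddh5'
    fields : list of str, optional
        if given, only these fields are extracted, by default None

    Returns
    -------
    xr.Dataset
        gridded data with complex dependents split into real and imaginary parts.
    """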
    fn = folder / fn
    dd = datadict_from_hdf5(fn)
    if fields is not None:
        dd = dd.extract(fields)
    xrdata = split_complex(dd2xr(datadict_to_meshgrid(dd)))
    xrdata.attrs["raw_data_folder"] = str(folder.resolve())
    xrdata.attrs["raw_data_fn"] = str(fn)
    return xrdata

reconstruct_safe_write_data(path, unification_from_scratch=True, file_timeout=None)

Creates a new DataDict from the data saved in the .tmp folder. This is used when the data is saved in the safe writing mode. The data is saved in individual files in the .tmp folder. This function reconstructs the data from these files and returns a DataDict with the data.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the data file. The .tmp folder is expected next to it, in the same parent folder.

required
unification_from_scratch bool

If True, will reconstruct the data from scratch. If False, will try to load the data from the last reconstructed file.

True
file_timeout Optional[float]

How long the function will wait for the ddh5 file to unlock. If None, uses the default value.

None
Source code in labcore/data/datadict_storage.py
def reconstruct_safe_write_data(path: Union[str, Path],
                          unification_from_scratch: bool = True,
                          file_timeout: Optional[float] = None) -> DataDictBase:
    """
    Creates a new DataDict from the data saved in the .tmp folder. This is used when the data is saved in the safe
    writing mode. The data is saved in individual files in the .tmp folder. This function reconstructs the data from
    these files and returns a DataDict with the data.

    :param path: Path to the data file. The .tmp folder is expected next to it, in the same parent folder.
    :param unification_from_scratch: If True, will reconstruct the data from scratch. If False, will try to load the
        data from the last reconstructed file.
    :param file_timeout: How long the function will wait for the ddh5 file to unlock. If None, uses the default value.
    """

    path = Path(path)

    tmp_path = path.parent / ".tmp"

    # FIXME: This should probably raise a warning more than a crash, but will leave it as crash for now
    if not tmp_path.exists():
        raise ValueError("Specified folder does not exist.")

    files = []
    for dirpath, dirnames, filenames in os.walk(str(tmp_path)):
        files.extend([(Path(dirpath)/file) for file in filenames if file.endswith(".ddh5")])

    files = sorted(files, key=lambda x: int(x.stem.split("#")[-1]))

    # Checks if data is already there.
    # If there is, loads it from the latest loaded file not to have to load unnecessary data
    if path.exists() and not unification_from_scratch:
        dd = datadict_from_hdf5(path, file_timeout=file_timeout)
        if not dd.has_meta("last_reconstructed_file"):
            raise ValueError("The file does not have the meta data 'last_reconstructed_file', "
                             "could not know where to reconstruct from.")
        last_reconstructed_file = Path(dd.meta_val("last_reconstructed_file"))
        if not last_reconstructed_file.exists() or last_reconstructed_file not in files:
            raise ValueError("When reconstructing the data, could not find the last reconstructed file. "
                             "This indicates that something wrong happened in the tmp folder.")
        starting_index = files.index(last_reconstructed_file) + 1
    else:
        first = files.pop(0)
        dd = datadict_from_hdf5(first, file_timeout=file_timeout)
        starting_index = 0

    for file in files[starting_index:]:
        d = datadict_from_hdf5(file, file_timeout=file_timeout)
        # Create a dictionary with just the keys and values to add to the original one.
        dd.add_data(**{x[0]: d.data_vals(x[0]) for x in d.data_items()})

    # Add shape to axes
    for name, datavals in dd.data_items():
        datavals["__shape__"] = tuple(np.array(datavals["values"][:]).shape,)

    # Catches the edge case where there is a single file in the .tmp folder.
    # This will not happen other than the first time, so it is ok to have that first variable there.
    if len(files) > 0:
        dd.add_meta("last_reconstructed_file", str(files[-1]))
    else:
        dd.add_meta("last_reconstructed_file", str(first))

    return dd
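
The sort key used for the temporary files matters because a plain lexicographic sort would misorder the chunks. A small sketch, assuming file names that end in `#<number>` before the extension; the concrete names here are invented for illustration:

```python
from pathlib import Path

# Hypothetical chunk files; only the trailing "#<number>" is relied upon.
files = [
    Path(".tmp/chunk#10.ddh5"),
    Path(".tmp/chunk#2.ddh5"),
    Path(".tmp/chunk#1.ddh5"),
]

# Same key as in the listing above: the integer after the last "#".
ordered = sorted(files, key=lambda x: int(x.stem.split("#")[-1]))
lexicographic = sorted(files)

print([f.name for f in ordered])
print([f.name for f in lexicographic])
```

With the numeric key, chunk 10 comes after chunk 2, so the data is concatenated in the order it was written.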

set_attr(h5obj, name, val)

Set attribute name of object h5obj to val

Use h5ify to convert the object, then try to set the attribute to the returned value. If that does not succeed due to an HDF5 typing restriction, set the attribute to the string representation of the value.

Source code in labcore/data/datadict_storage.py
def set_attr(h5obj: Any, name: str, val: Any) -> None:
    """Set attribute `name` of object `h5obj` to `val`

    Use :func:`h5ify` to convert the object, then try to set the attribute
    to the returned value. If that does not succeed due to a HDF5 typing
    restriction, set the attribute to the string representation of the value.
    """
    try:
        h5obj.attrs[name] = h5ify(val)
    except TypeError:
        newval = str(val)
        h5obj.attrs[name] = h5ify(newval)
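
The fallback to a string representation can be sketched without an HDF5 file. Everything below is a stand-in: `StrictAttrs` is a hypothetical attribute store that rejects dicts and sets with a `TypeError`, and `set_attr_sketch` skips the `h5ify` step to keep the example self-contained.

```python
from typing import Any


class StrictAttrs(dict):
    """Hypothetical stand-in for an attribute store that rejects some types."""

    def __setitem__(self, key: str, value: Any) -> None:
        if isinstance(value, (dict, set)):
            raise TypeError(f"no native equivalent for {type(value).__name__}")
        super().__setitem__(key, value)


def set_attr_sketch(attrs: StrictAttrs, name: str, val: Any) -> None:
    """Try the value as is; on a TypeError, store its string representation."""
    try:
        attrs[name] = val
    except TypeError:
        attrs[name] = str(val)


attrs = StrictAttrs()
set_attr_sketch(attrs, "n_points", 1234)         # stored unchanged
set_attr_sketch(attrs, "settings", {"gain": 2})  # stored as a string
print(attrs)
```

The price of this fallback is that such values come back as strings when the file is read again.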

timestamp_from_path(p)

Return a datetime timestamp from a standard-formatted path. Assumes that the path stem has a timestamp that begins in ISO-like format YYYY-mm-ddTHHMMSS.

Source code in labcore/data/datadict_storage.py
def timestamp_from_path(p: Path) -> datetime.datetime:
    """Return a `datetime` timestamp from a standard-formatted path.
    Assumes that the path stem has a timestamp that begins in ISO-like format
    ``YYYY-mm-ddTHHMMSS``.
    """
    timestring = str(p.stem)[:13] + ":" + str(p.stem)[13:15] + ":" + str(p.stem)[15:17]
    return datetime.datetime.fromisoformat(timestring)
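
A short usage sketch of the slicing above: the function body is restated so the example runs on its own, and the folder name is a made-up example of the `YYYY-mm-ddTHHMMSS` prefix.

```python
import datetime
from pathlib import Path


def timestamp_from_path(p: Path) -> datetime.datetime:
    """Restated from the listing above so the example is self-contained."""
    stem = str(p.stem)
    # Insert the colons that ISO format expects: HHMMSS -> HH:MM:SS.
    timestring = stem[:13] + ":" + stem[13:15] + ":" + stem[15:17]
    return datetime.datetime.fromisoformat(timestring)


# Hypothetical folder name; anything after the timestamp is ignored.
folder = Path("2024-01-15T123456_my_measurement")
ts = timestamp_from_path(folder)
print(ts.isoformat())
```

Note that the timestamp is read from the stem of the path that is passed in, so the path has to point at the timestamped folder or file itself, not at a file inside it.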

Extra Tools

Data = Union[xr.Dataset, pd.DataFrame] module-attribute

Type alias for valid data. Can be either a pandas DataFrame or an xarray Dataset.

split_complex(data)

Split complex dependents into real and imaginary parts.

TODO: should update units as well

Parameters:

Name Type Description Default
data Data

Input data.

required

Returns:

Type Description
Data

Data with complex dependents split into real and imaginary parts.

Raises:

Type Description
NotImplementedError

If data is not a pandas DataFrame or an xarray Dataset.

Source code in labcore/data/tools.py
def split_complex(data: Data) -> Data:
    """Split complex dependents into real and imaginary parts.

    TODO: should update units as well

    Parameters
    ----------
    data
        input data.

    Returns
    -------
    data with complex dependents split into real and imaginary parts.

    Raises
    ------
    NotImplementedError
        if data is not a pandas DataFrame or an xarray Dataset.
    """
    indep, dep = data_dims(data)

    if not isinstance(data, pd.DataFrame) and not isinstance(data, xr.Dataset):
        raise NotImplementedError

    dropped = []
    for d in dep:
        if np.iscomplexobj(data[d]):
            data[f"{d}_Re"] = np.real(data[d])
            data[f"{d}_Im"] = np.imag(data[d])
            if isinstance(data, xr.Dataset):
                data[f"{d}_Re"].attrs = data[d].attrs
                data[f"{d}_Im"].attrs = data[d].attrs
            dropped.append(d)
    if isinstance(data, pd.DataFrame):
        return data.drop(columns=dropped)
    else:
        return data.drop_vars(dropped)
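
The renaming scheme can be illustrated on a plain dictionary of NumPy arrays. This is only a sketch of the idea: `split_complex_fields` is a hypothetical helper, and it works on a dict rather than on a pandas DataFrame or xarray Dataset as the real function does.

```python
from typing import Dict

import numpy as np


def split_complex_fields(fields: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Hypothetical helper: replace each complex field by '<name>_Re' and '<name>_Im'."""
    out: Dict[str, np.ndarray] = {}
    for name, values in fields.items():
        if np.iscomplexobj(values):
            out[f"{name}_Re"] = np.real(values)
            out[f"{name}_Im"] = np.imag(values)
        else:
            # Real-valued fields are kept as they are.
            out[name] = values
    return out


fields = {
    "x": np.array([0.0, 1.0]),
    "signal": np.array([1 + 2j, 3 - 4j]),
}
result = split_complex_fields(fields)
print(sorted(result))
```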