Source code for labcore.ddh5

"""plottr.data.datadict_storage

Provides file-storage tools for the DataDict class.

Description of the HDF5 storage format
======================================

We use a simple mapping from DataDict to the HDF5 file. Within the file,
a single DataDict is stored in a (top-level) group of the file.
The data fields are datasets within that group.

Global meta data of the DataDict are attributes of the group; field meta data
are attributes of the dataset (incl., the `unit` and `axes` values). The meta
data keys are given exactly like in the DataDict, i.e., incl the double
underscore pre- and suffix.
"""
import os
import time
from enum import Enum
from typing import Any, Union, Optional, Dict, Type, Collection, List
from types import TracebackType
from pathlib import Path
import json
import pickle
import shutil
import glob

import numpy as np
import h5py

from plottr.data.datadict import DataDict, is_meta_key
from plottr.data.datadict_storage import DDH5Writer

from .measurement.sweep import Sweep

__author__ = 'Wolfgang Pfaff'
__license__ = 'MIT'

TIMESTRFORMAT = "%Y-%m-%dT%H%M%S"


def _create_datadict_structure(sweep: Sweep) -> DataDict:
    """
    Returns a structured DataDict from the DataSpecs of a Sweep.

    :param sweep: Sweep object from which the DataDict is created.
    """

    data_specs = sweep.get_data_specs()
    data_dict = DataDict()
    for spec in data_specs:

        depends_on = spec.depends_on
        unit = spec.unit
        name = spec.name

        # Checks which fields have information and which ones are None.
        if depends_on is None:
            if unit is None:
                data_dict[name] = dict()
            else:
                data_dict[name] = dict(unit=unit)
        else:
            if unit is None:
                data_dict[name] = dict(axes=depends_on)
            else:
                data_dict[name] = dict(axes=depends_on, unit=unit)

    data_dict.validate()

    return data_dict


def _check_none(line: Dict, all: bool = True) -> bool:
    """
    Checks if the values in a Dict are all None.
    :returns: True if all values are None, False otherwise.
    """
    if all:
        for k, v in line.items():
            if v is None:
                return True
        return False

    if len(set(line.values())) == 1:
        for k, v in line.items():
            if v is None:
                return True
    return False


def _save_dictionary(dict: Dict, filepath: str) -> None:
    with open(filepath, 'w') as f:
        json.dump(dict, f, indent=2, sort_keys=True, cls=NumpyEncoder)


def _pickle_and_save(obj, filepath: str) -> None:
    try:
        with open(filepath, 'wb') as f:
            pickle.dump(obj, f)
    except TypeError as pickle_error:
        print(f'Object could not be pickled: {pickle_error.args}')


[docs]class NumpyEncoder(json.JSONEncoder):
[docs] def default(self, obj): if isinstance(obj, np.ndarray): return obj.tolist() return json.JSONEncoder.default(self, obj)
[docs]def run_and_save_sweep(sweep: Sweep, data_dir: str, name: str, ignore_all_None_results: bool = True, save_action_kwargs: bool = False, add_timestamps = False, archive_files: List[str]=None, **extra_saving_items) -> None: """ Iterates through a sweep, saving the data coming through it into a file called <name> at <data_dir> directory. :param sweep: Sweep object to iterate through. :param data_dir: Directory of file location. :param name: Name of the file. :param ignore_all_None_results: if ``True``, don't save any records that contain a ``None``. if ``False``, only do not save records that are all-``None``. :param save_action_kwargs: If ``True``, the action_kwargs of the sweep will be saved as a json file named after the first key of the kwargs dctionary followed by '_action_kwargs' in the same directory as the data. :param archive_files: List of files to copy into a folder called 'archived_files' in the same directory that the data is saved. It should be a list of paths (str), regular expressions are supported. If a folder is passed, it will copy the entire folder and all of its subdirectories and files into the archived_files folder. If one of the arguments could not be found, a message will be printed and the measurement will be performed without the file being archived. An exception is raised if the type is invalid. e.g. archive_files=['*.txt', 'calibration_files', '../test_file.py']. '*.txt' will copy every txt file located in the working directory. 'calibration_files' will copy the entire folder called calibration_files from the working directory into the archived_files folder. '../test_file.py' will copy the script test_file.py from one directory above the working directory. :param extra_saving_items: Kwargs for extra objects that should be saved. If the kwarg is a dictionary, the function will try and save it as a JSON file. If the dictionary contains objects that are not JSON serializable it will be pickled. Any other kind of object will be pickled too. The files will have their keys as names. :raises TypeError: A Typerror is raised if the object passed for archive_files is not correct """ data_dict = _create_datadict_structure(sweep) # Creates a file even when it fails. with DDH5Writer(data_dict, data_dir, name=name) as writer: # Saving meta-data dir: Path = writer.filepath.parent if add_timestamps: t = time.localtime() time_stamp = time.strftime(TIMESTRFORMAT, t) + '_' for key, val in extra_saving_items.items(): if callable(val): value = val() else: value = val if add_timestamps: pickle_path_file = os.path.join(dir, time_stamp + key + '.pickle') json_path_file = os.path.join(dir, time_stamp + key + '.json') else: pickle_path_file = os.path.join(dir, key + '.pickle') json_path_file = os.path.join(dir, key + '.json') if isinstance(value, dict): try: _save_dictionary(value, json_path_file) except TypeError as error: # Delete the file created by _save_dictionary. This file does not contain the complete dictionary. if os.path.isfile(json_path_file): os.remove(json_path_file) print(f'{key} has not been able to save to json: {error.args}.' f' The item will be pickled instead.') _pickle_and_save(value, pickle_path_file) else: _pickle_and_save(value, pickle_path_file) # Save the kwargs if save_action_kwargs: if add_timestamps: json_path_file = os.path.join(dir, time_stamp + 'sweep_action_kwargs.json') else: json_path_file = os.path.join(dir, 'sweep_action_kwargs.json') _save_dictionary(sweep.action_kwargs, json_path_file) # Save archive_files if archive_files != None: archive_files_dir = os.path.join(dir, 'archive_files') os.mkdir(archive_files_dir) if not isinstance(archive_files, list) and not isinstance(archive_files, tuple): if isinstance(archive_files, str): archive_files = [archive_files] else: raise TypeError(f'{type(archive_files)} is not a list.') for path in archive_files: if os.path.isdir(path): folder_name = os.path.basename(path) if folder_name == '': folder_name = os.path.basename(os.path.dirname(path)) shutil.copytree(path, os.path.join(archive_files_dir, folder_name), dirs_exist_ok=True) elif os.path.isfile(path): shutil.copy(path, archive_files_dir) else: matches = glob.glob(path, recursive=True) if len(matches) == 0: print(f'{path} could not be found. Measurement will continue without archiving {path}') for file in matches: shutil.copy(file, archive_files_dir) # Save data. for line in sweep: if not _check_none(line, all=ignore_all_None_results): writer.add_data(**line) print('The measurement has finished successfully and all of the data has been saved.')