Source code for data_import

# -*- coding: utf-8 -*-

"""

The functions below can be used to import delimited data files into Numpy or
Matlab database format.

"""

import argparse
import copy
import glob
import math
import os
import re
from enum import Enum

import numpy as np
import pkg_resources
# pylint: disable=no-member
import scipy.io


class _Colors:
    """

    A collection of colors that can be used to highlight terminal outputs.

    """

    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


class _TextSnippets(Enum):
    """

    Text snippets to be used when merging delimited files.

    """
    header = "This file was automatically generated using the merge_del\n" \
             "function of the Python tribology package, version {}.\n" \
             "\n" \
             "See here for more information:\n" \
             "https://pypi.org/project/tribology/\n"\
             "\n"\
             "The file contains data from the following source files " \
             "(in  order):\n"

    separator = "\n" \
                "Beginning of file:\n" \
                "{}\n"


def __make_dir(dirpath):
    if not os.path.isdir(dirpath):
        os.makedirs(dirpath)
    return dirpath


def __get_outpath(outdir):
    if outdir:
        outpath = __make_dir(outdir)
    else:
        outpath = os.getcwd()
    return outpath


def __get_outfile(in_file, idx, out_ext):
    fname = ''.join(in_file.split('.')[:-1]).split(os.sep)[-1]
    return '{}-{}.{}'.format(fname, str(idx), out_ext)


def __num_char(char):
    return bool(char.isdigit() or char == '-')


def split_del(file, deli='\t', ext='txt', cmin=3, hspan=1, outdir=None,
              force=False):
    """

    Split a delimited data file into several separate data files, if the
    file contains more than one block of data. Blocks of data are typically
    separated by at least one line of column headers. The first data column
    of each data block has to be numeric.

    This function is meant to be used on data files where different blocks
    of data have different numbers of columns or different column headers.
    After splitting the data file into individual data files, import methods
    like :code:`import_del` can be used on the individual files. If all data
    should be merged into a single database afterwards, the
    :code:`merge_npz` function can be used.

    Parameters
    ----------
    file: str
        Path to the data file.
    deli: str, optional
        Delimiter used to separate data columns in :code:`file`
    ext: str, optional
        File extension of output files. Default is :code:`txt`
    cmin: int, optional
        Minimum number of columns that a line of data needs to have in
        order to be classified as data.
    hspan: int, optional
        Maximum number of non-data lines above each data block that should
        be written to individual data files (usually equal to number of
        lines spanned by the column headers).
    outdir: str, optional
        Path to output directory. Default is current working directory.
    force: bool
        If True, existing output files will be overwritten. Will raise an
        exception if file exists and force is False.

    Returns
    -------
    outfiles: list
        Paths to output files.

    """
    outpath = __get_outpath(outdir)

    outfiles = []
    idx = 0
    f_out = None
    write = False
    to_write = []

    with open(file) as infile:
        for line in infile:

            # if first character of line is not numeric
            if not __num_char(line[0]):
                write = False
                to_write.append(line)
                while len(to_write) > hspan:
                    del to_write[0]
            else:
                # if numeric line has at least 'cmin' columns
                if len(line.split(deli)) >= cmin and not write:
                    write = True
                    idx += 1
                    f_out = os.sep.join(
                        [outpath, __get_outfile(file, idx, ext)])

                    if f_out not in outfiles:
                        outfiles.append(f_out)

                    if os.path.isfile(f_out):
                        if force:
                            os.remove(f_out)
                        else:
                            raise OSError("output file exists. "
                                          "use argument 'force' to "
                                          "overwrite.")

                if write and f_out:
                    with open(f_out, "a") as out:
                        for element in to_write:
                            out.write(element)
                        to_write = []
                        out.write(line)

    return outfiles
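

# A minimal usage sketch (not part of the original module). It assumes a
# tab-delimited file 'mixed_blocks.txt' that contains several blocks of data
# with different column headers; the file name is hypothetical.
def _example_split_del():
    # split the mixed file into one file per data block, overwriting any
    # previous output in the 'split_out' directory
    outfiles = split_del('mixed_blocks.txt', deli='\t', ext='txt', cmin=3,
                         hspan=1, outdir='split_out', force=True)
    # each output file can now be imported on its own
    for block_file in outfiles:
        import_del(block_file, force=True)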


def __verify_merge(in_files, accum):
    """

    Check if all npz files have the same set of keys and contain all keys
    in accum. Throw an exception if not.

    Parameters
    ----------
    in_files: list
        Paths to database files to merge. Files are merged in order.
    accum: list
        Database keys for which values should be accumulated. Values must
        be numeric.

    """
    ref_keys = []
    for idx, file in enumerate(in_files):
        keys = sorted(np.load(file).keys())
        if idx == 0:
            ref_keys = copy.deepcopy(keys)
        if keys != ref_keys:
            raise KeyError('keys in npz databases 0 and {} differ'
                           .format(idx))
        if accum and not all(key in keys for key in accum):
            raise KeyError('key(s) defined in accum not in npz database {}'
                           .format(file))


def merge_npz(in_files, accum=None, safe=True):
    """

    Merge npz databases by concatenating all databases in :code:`in_files`.
    Databases are concatenated in the order given in :code:`in_files`.

    Database keys for which values are to be accumulated can be given as a
    list using the :code:`accum` argument. For example, if all databases
    have the key :code:`time`, then :code:`accum=['time']` will produce a
    continuous time axis, adding the last time value of the first database
    to all time values of the second database (and so on).

    Parameters
    ----------
    in_files: list
        Paths to database files to merge. Files are merged in order.
    accum: list
        Database keys for which values should be accumulated. Values must
        be numeric.
    safe: bool
        If True, checks will be performed to ensure that all databases
        share the exact same set of keys and that all keys in :code:`accum`
        are in all databases. An exception (type KeyError) will be raised
        if not.

    Returns
    -------
    merged: dict
        Merged data.

    """
    if safe:
        __verify_merge(in_files, accum)

    merged = {}
    for file in in_files:
        in_dat = np.load(file)
        for key in in_dat.keys():
            if key in merged:
                if accum and key in accum:
                    merged[key] = np.append(merged[key],
                                            in_dat[key] + merged[key][-1])
                else:
                    merged[key] = np.append(merged[key], in_dat[key])
            else:
                merged[key] = in_dat[key]
    return merged
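

# A minimal usage sketch (not part of the original module). The database
# files 'run-1.npz' and 'run-2.npz' are hypothetical outputs of import_del;
# accumulating over 'time_s' assumes both databases contain that key.
def _example_merge_npz():
    merged = merge_npz(['run-1.npz', 'run-2.npz'], accum=['time_s'])
    # 'time_s' is now a single continuous axis across both runs
    print(merged['time_s'].shape)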


def __get_version(package):
    """

    Get the version of a Python package.

    Parameters
    ----------
    package: str
        The name of the package.

    Returns
    -------
    Version number as string.

    """
    return pkg_resources.get_distribution(package).version


def __long_substr(strings):
    """

    Return the longest common substring of a list of strings. Taken from:
    https://stackoverflow.com/questions/2892931/longest-common-substring-from-more-than-two-strings-python

    Parameters
    ----------
    strings: list
        A list of strings.

    Returns
    -------
    substr: str
        The longest common substring of all list elements. For a list with
        only one element, the list element is returned; for an empty list,
        an empty string is returned.

    """
    substr = ''
    if len(strings) > 1 and len(strings[0]) > 0:
        for i in range(len(strings[0])):
            for j in range(len(strings[0]) - i + 1):
                if j > len(substr) and all(strings[0][i:i + j] in x
                                           for x in strings):
                    substr = strings[0][i:i + j]
    return substr
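

# An illustration of the helper above (not part of the original module):
# the longest substring shared by all list elements is returned. The file
# names are made up for the example.
def _example_long_substr():
    shared = __long_substr(['test-run-01.txt', 'test-run-02.txt'])
    print(shared)  # 'test-run-0'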


def merge_del(in_files, out_file=None):
    """

    Merge several delimited data files into a single file. The merged file
    contains all data from the data files, in the order given in the
    :code:`in_files` argument.

    No checks are performed to ensure that the data files have a
    compatible format, for example the same number of data columns.

    Parameters
    ----------
    in_files: list
        File paths to the files to be merged. Files will be merged in
        order.
    out_file: str, optional
        Path to output file, including file extension. If no path is
        provided, a file name is generated based on the input file names.

    Returns
    -------
    out_file_abs: str
        Absolute path to the merged file.

    """
    if len(in_files) == 0:
        raise ValueError('need at least one file to merge')

    in_files_abs = [os.path.abspath(file) for file in in_files]

    if out_file:
        out_file_abs = os.path.abspath(out_file)
    else:
        out_file = __long_substr(in_files_abs).split('.')[0]
        out_file_abs = out_file + 'xxx-merged.txt'

    max_len_path = max(len(file) for file in in_files_abs)

    with open(out_file_abs, "w") as txt_file:
        # write header
        txt_file.write(str(_TextSnippets.header.value).format(
            __get_version("tribology")))
        for in_file in in_files_abs:
            txt_file.write(in_file + "\n")

        # write files
        for in_file in in_files_abs:
            txt_file.write('\n' + '#' * max_len_path)
            txt_file.write(str(_TextSnippets.separator.value)
                           .format(in_file))
            txt_file.write('#' * max_len_path + '\n')

            with open(in_file) as file:
                for line in file:
                    txt_file.write(line)

    return out_file_abs
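

# A minimal usage sketch (not part of the original module). The two input
# files are hypothetical delimited files with identical column layouts.
def _example_merge_del():
    merged_file = merge_del(['sensor-a.txt', 'sensor-b.txt'],
                            out_file='merged.txt')
    print(merged_file)  # absolute path to 'merged.txt'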


def __print_status(message, status_color=_Colors.ENDC):
    """

    Print a color-coded message to the terminal.

    Parameters
    ----------
    message: str
        The message to print to the terminal.
    status_color:
        The color in which to print the message.

    """
    print(status_color + message + _Colors.ENDC)


def __is_floatable(num):
    """

    Check if 'num' can be converted to float. If yes, return :code:`True`,
    else return :code:`False`.

    """
    try:
        float(num)
        return True
    except ValueError:
        return False


def __to_float(num):
    """

    Try to convert 'num' to float. Return the converted value if the
    conversion is possible, else return 'num' unchanged.

    """
    try:
        return float(num)
    except ValueError:
        return num


def __assemble_data_table(num_data_tables, max_num_data_length):
    """

    Assemble the complete data table from a list of data tables.

    """
    num_data = np.zeros((
        (len(num_data_tables) - 1) * max_num_data_length +
        num_data_tables[-1].shape[0],
        num_data_tables[-1].shape[1]), dtype=object)

    for idx, data_table in enumerate(num_data_tables):
        # do this for all but the last data table
        if idx + 1 < len(num_data_tables):
            num_data[idx * max_num_data_length:
                     (idx + 1) * max_num_data_length, :] = data_table
        # do this for the last data table
        else:
            num_data[idx * max_num_data_length:, :] = data_table
    return num_data


def __write_to_out_dict(num_data, column_headers, pcs=False):
    """

    Extract the data columns from the num_data array and write them to a
    dictionary.

    Parameters
    ----------
    num_data: ndarray
        The data extracted from the delimited file, stored in a single
        table.
    column_headers: ndarray
        The column headers corresponding to the columns in :code:`num_data`
    pcs: bool, optional
        If :code:`True`, the data is treated as the output of a PCS
        Instruments test rig and a known quirk of the PCS data export is
        corrected (see comment below).

    Returns
    -------
    out_dict: dict
        A dictionary containing all data that is to be saved to the output
        database. Keys are based on column headers, values are data columns
        of num_data.

    """
    out_dict = {'column_headers': column_headers}
    for idx, column in enumerate(column_headers):
        # explicitly take care of the fact that PCS forgot a '\t' (tab)
        # character in their data export implementation
        if column == 'image_file_name' and \
                math.isnan(float(num_data[0, idx])) and not \
                column_headers[column_headers.tolist().index(column) - 1] \
                and pcs is True:
            out_dict[column] = num_data[:, idx - 1].astype(object)[:, None]
        # take care of all other columns
        elif column:
            # if empty data columns are not padded with tabs, pad with a
            # nan column that matches the number of data rows
            if idx >= num_data.shape[1]:
                out_dict[column] = \
                    np.zeros((num_data.shape[0], 1)) * float('nan')
            else:
                # if data is of numeric type
                if __is_floatable(num_data[0, idx]):
                    out_dict[column] = \
                        num_data[:, idx].astype(float)[:, None]
                # if data is of other type (string)
                else:
                    out_dict[column] = \
                        num_data[:, idx].astype(object)[:, None]
    return out_dict


def __process_header(heads):
    """

    Process the column headers by removing special characters and
    converting to a Matlab-optimized data type.

    Parameters
    ----------
    heads: list of strings
        The column headers of the delimited file.

    Returns
    -------
    col_heads: ndarray (dtype = object)
        The re-formatted column headers.

    """
    merge = []
    # merge column headers if they span several lines
    for i in range(len(heads[0])):
        merge.extend([' '.join([heads[row][i]
                                for row in range(len(heads))])])

    # replace non-alphanumeric characters and trailing underscores
    col_heads = [re.sub(r"\W+", '_', item.lower()).strip('_')
                 for item in merge]

    # convert data type for easy matlab export
    col_heads = np.asarray(col_heads, dtype='object')
    return col_heads


def __process_data(split_line, num_dat, max_len, num_data_tables):
    """

    Append a data line to the current data table. If the length of the
    current data table exceeds the maximum permitted data table length,
    save the current data table to a list of data tables and initialise a
    new one.

    Parameters
    ----------
    split_line: list
        The data that is to be appended to the table.
    num_dat: ndarray
        The current data table to which the last line of data was appended.
    max_len: positive int
        The maximum length of a data table.
    num_data_tables: list
        The complete list of data tables.

    Returns
    -------
    num_dat: ndarray
        The data table to which the current line of data was appended.

    """
    # if the data table becomes large, make a new data table and add the
    # old table to the table list (for speed)
    if num_dat.shape[0] == max_len:
        num_data_tables.append(num_dat)
        num_dat = np.asarray(
            [__to_float(item.rstrip('\n'))
             for item in split_line]).reshape((1, len(split_line)))
    # else simply append to the data table
    else:
        num_dat = np.append(num_dat, np.asarray(
            [__to_float(item.rstrip('\n')) for item in split_line])
                            .reshape((1, len(split_line))), axis=0)
    return num_dat


def __process_file(in_file, dec_mark, deli, pad=0, colheadlines=1):
    """

    Extract data from a delimited text file and return the assembled data
    table together with the processed column headers.

    Parameters
    ----------
    in_file: str
        The file handle of the delimited file that is to be imported.
    dec_mark: str
        The decimal mark of the data file.
    deli: str
        The delimiter used to separate data columns in the delimited file.
    pad: positive int
        Ignore the first :code:`n` leading columns in the delimited file,
        where :code:`n = pad`. For example, if pad = 8, the first 8 columns
        are ignored.
    colheadlines: int, optional
        The number of lines spanned by the column headers.

    Returns
    -------
    num_dat: ndarray
        The data table extracted from the file.
    col_heads: ndarray
        The processed column headers.

    """
    max_len = 1000
    num_dat = []
    col_heads = []
    num_data_tables = []
    prev_lines = []

    with open(in_file) as dat_file:
        for line in dat_file:
            split_line = line.replace(dec_mark, '.').split(deli)

            if len(split_line) > pad:
                split_line = split_line[pad:]

            # get rid of trailing newline characters
            if split_line[-1] == '\n':
                split_line[-1] = ''

            # check if first character is not (digit or minus symbol
            # (hyphen)) to identify non-data lines. skip non-data lines.
            if not (line[0].isdigit() or line[0] == '-') or \
                    len(split_line) <= 1:
                if split_line != ['']:
                    prev_lines.append(split_line)
                    if len(prev_lines) > colheadlines:
                        del prev_lines[0]
                continue

            # if line contains data, split line into data fields, fill
            # empty fields with 'nan'
            split_line[:] = (item or 'nan' for item in split_line)
            # if this is the first data-containing line...
            if not len(col_heads):
                # get the column headers
                col_heads = __process_header(prev_lines)
                # write the first line to the data table
                num_dat = np.asarray(
                    [__to_float(item.rstrip('\n'))
                     for item in split_line]).reshape((1, len(split_line)))
            else:
                num_dat = __process_data(split_line, num_dat, max_len,
                                         num_data_tables)

    # assemble the complete data table and create output dictionary
    num_data_tables.append(num_dat)
    num_dat = __assemble_data_table(num_data_tables, max_len)

    return num_dat, col_heads


def __get_file_handles(in_dir, ext, recursive=False):
    """

    Get file handles for all delimited files that are to be imported.

    Parameters
    ----------
    in_dir: str
        The directory in which the delimited files are stored.
    ext: str
        The file extension of the delimited files.
    recursive: bool, optional
        If :code:`True`, delimited files are imported for all child
        directories of :code:`directory` (including :code:`directory`).
        If :code:`False`, only files in :code:`directory` are imported.
        Default is :code:`False`.

    Returns
    -------
    in_files: list of strings
        The file handles to all delimited files that are to be imported.

    """
    if not recursive:
        in_files = sorted(glob.glob('{}{}*.{}'.format(in_dir, os.sep,
                                                      ext)))
    else:
        in_files = []
        dir_list = [x[0] + os.sep for x in os.walk(in_dir)]
        for directory in dir_list:
            in_files.extend(sorted(glob.glob('{}*.{}'.format(directory,
                                                             ext))))
    return in_files


def __save_out_file(out_file, out_dict, out_ext):
    """

    Save the imported data to an output database, either in Numpy or
    Matlab format.

    Parameters
    ----------
    out_file: str
        A handle to the output file that was generated during import.
    out_dict: dict
        The output data stored in a dictionary where keys correspond to
        column headers, values correspond to data.
    out_ext: str
        The file extension (format) of the output file. Options are
        :code:`npz` for Numpy format and :code:`mat` for Matlab database
        format.

    Returns
    -------
    out_file: str
        A handle to the output file that was generated after import.

    """
    if out_ext == 'mat':
        out_file = '{}.mat'.format(out_file)
        scipy.io.savemat(out_file, out_dict)
    elif out_ext == 'npz':
        out_file = '{}.npz'.format(out_file)
        np.savez(out_file, **out_dict)
    return out_file


def __get_out_file(in_file, out_dir):
    """

    Get the path of the output file.

    Parameters
    ----------
    in_file: str
        Path to input file.
    out_dir: str
        Path to output directory.

    Returns
    -------
    file_no_ext: str
        The file name without extension.
    out_dir: str
        The path to the output directory.
    out_file: str
        The path of the output file.

    """
    if out_dir == '':
        out_dir = os.path.dirname(in_file)

    file_no_ext = os.path.splitext(in_file)[0].split(os.sep)[-1]
    if out_dir == '':
        out_dir = '.'
    out_file = '/'.join([out_dir, file_no_ext])
    return file_no_ext, out_dir, out_file


def __import_file(in_file, out_file, out_ext, force=False, deli='\t',
                  dec_mark='.', pad=0, colheadlines=1):
    import_status = None
    num_dat = None
    col_heads = None
    out_file_exists = os.path.isfile('{}.{}'.format(out_file, out_ext))

    if (not out_file_exists) or (force is True):
        try:
            num_dat, col_heads = __process_file(in_file, dec_mark, deli,
                                                pad=pad,
                                                colheadlines=colheadlines)
            import_status = True
        except (ValueError, AttributeError):
            import_status = False

    return num_dat, col_heads, import_status
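

# An illustration of the internal import pipeline above (not part of the
# original module): write a small tab-delimited demo file, then run
# __process_file on it. The file name 'tiny.txt' is made up for the example.
def _example_process_file():
    with open('tiny.txt', 'w') as demo:
        demo.write('force_n\tspeed_mm_s\n')
        demo.write('1.0\t100\n')
        demo.write('2.0\t200\n')
    num_dat, col_heads = __process_file('tiny.txt', dec_mark='.', deli='\t')
    print(col_heads)      # ['force_n' 'speed_mm_s']
    print(num_dat.shape)  # (2, 2)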


def import_del(in_file, force=False, deli='\t', dec_mark='.',
               out_ext='npz', out_dir='', pad=0, colheadlines=1):
    """

    Import a delimited data file into Numpy or Matlab database format. The
    file must have at least two data columns that are separated by
    :code:`deli`.

    Parameters
    ----------
    in_file: str
        The file handle of the delimited file that is to be imported.
    force: bool, optional
        If :code:`True`, existing output files will be overwritten during
        import. Default is :code:`False`.
    deli: str, optional
        The delimiter used to separate data columns in the delimited file.
        Default is tab.
    dec_mark: str, optional
        The decimal mark of the data file. Default is dot.
    out_ext: str, optional
        The file extension (format) of the output file. Default is
        :code:`npz` for Numpy database format. Alternative is :code:`mat`
        for Matlab database format.
    out_dir: str, optional
        The absolute or relative path to the output directory. Default is
        the current working directory.
    pad: positive int
        The number of data columns to skip. For :code:`pad = n`, the first
        :code:`n` data columns will not be imported.
    colheadlines: int, optional
        The number of lines spanned by the column headers. If several
        lines are spanned, the lines will be merged to generate the column
        keys in the output dictionary.

    Returns
    -------
    out_file: str
        A handle to the output file that was generated during import.
    import_status: bool or None
        The import status of :code:`in_file`. If :code:`True`, the file
        was successfully imported. If :code:`False`, file import was
        attempted and failed. If :code:`None`, file import was not
        attempted (most likely because an output file with the same name
        already exists).
    out_dict: dict
        The data that was imported from :code:`in_file`.

    """
    _, out_dir, out_file_no_ext = __get_out_file(in_file, out_dir)
    out_dict = None

    num_dat, col_heads, import_status = \
        __import_file(in_file, out_file_no_ext, out_ext, force=force,
                      deli=deli, dec_mark=dec_mark, pad=pad,
                      colheadlines=colheadlines)

    if import_status is True:
        out_dict = __write_to_out_dict(num_dat, col_heads)
        out_file = __save_out_file(out_file_no_ext, out_dict, out_ext)
    else:
        out_file = None

    return out_file, import_status, out_dict
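

# A minimal usage sketch (not part of the original module). 'tiny.txt' is
# the hypothetical demo file from the sketch above; the import produces
# 'tiny.npz' next to it.
def _example_import_del():
    out_file, status, out_dict = import_del('tiny.txt', force=True)
    print(out_file)                    # './tiny.npz'
    print(status)                      # True on success
    print(out_dict['column_headers'])  # keys derived from the header line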


def __gen_acc_time(step_time, steps, outformat='npz'):
    """

    For files produced by PCS Instrument test rigs, generate a continuous
    time axis by combining all step times from all steps.

    """
    # get index of last data point of each step
    current_step_end = np.where(np.subtract(step_time[1:],
                                            step_time[0:-1]) < 0)
    step_end = np.append(current_step_end[0], [step_time.shape[0] - 1])

    # get index of first data point of each step
    step_start = np.append([0], [step_end[0:-1] + 1])

    # add empty steps for mapper steps
    step_start_with_other = []
    step_end_with_other = []
    idx = 0
    for step_type in steps:
        if step_type == 'data':
            step_start_with_other.append(step_start[idx])
            step_end_with_other.append(step_end[idx])
            idx += 1
        elif step_type == 'other':
            if step_start_with_other:
                step_start_with_other.append(step_end_with_other[-1])
                step_end_with_other.append(step_end_with_other[-1])
            else:
                step_start_with_other.append(0)
                step_end_with_other.append(0)

    # loop over steps and create continuous time axis
    time_accumulated_s = copy.copy(step_time)
    offset = 0
    for step in range(1, len(step_end)):
        offset += step_time[step_end[step - 1]]
        time_accumulated_s[step_start[step]:step_end[step] + 1] += offset

    # save data to dictionary (use one-based indices for matlab)
    if outformat == 'mat':
        sub_dict = {'time_accumulated_s': time_accumulated_s,
                    'step_start': [s + 1 for s in step_start_with_other],
                    'step_end': [s + 1 for s in step_end_with_other]}
    else:
        sub_dict = {'time_accumulated_s': time_accumulated_s,
                    'step_start': step_start_with_other,
                    'step_end': step_end_with_other}
    return sub_dict


def __post_process_image_data(out_dict):
    """

    For SLIM Mapper Analysis files produced by PCS Instrument test rigs,
    extract the (x, y) coordinate system, generate an (x, y) grid and map
    the film thickness data to the grid.

    """
    img_dat = {}

    # get (unique) x and y axis values and allocate film thickness matrix
    x_ax = out_dict['x']
    y_ax = out_dict['y']
    x_uniq = np.unique(x_ax)
    y_uniq = np.unique(y_ax)
    x_index = np.zeros(len(x_ax))
    y_index = np.zeros(len(y_ax))
    film = np.zeros((len(x_uniq), len(y_uniq))) * float('nan')

    # get unique rank index for each element in x and y
    for idx, rank_value in enumerate(sorted(x_uniq)):
        x_index[np.where(x_ax == rank_value)[0]] = idx
    for idx, rank_value in enumerate(sorted(y_uniq)):
        y_index[np.where(y_ax == rank_value)[0]] = idx

    # combine x and y indices in a tuple that can be used to index the
    # film array
    arr_idx = (x_index.astype(int), y_index.astype(int))

    # assign all measured film thickness values to film thickness matrix
    film[arr_idx] = out_dict['film'][:, 0]

    # create variables that simplify plotting of film thickness data
    img_dat['film_surf'] = film
    img_dat['x_set'] = np.asarray(list(x_uniq))[:, None]
    img_dat['y_set'] = np.asarray(list(y_uniq))[:, None]
    img_dat['x_grid'], img_dat['y_grid'] = \
        np.meshgrid(img_dat['x_set'], img_dat['y_set'], indexing='ij')

    return img_dat


def __get_pcs_steps(in_file):
    """

    Get a list indicating the type of step for each step in a PCS data
    file.

    Parameters
    ----------
    in_file: str
        Path to PCS file.

    Returns
    -------
    steps: list
        A list of step types. For numeric data steps, the step type is
        'data'; for all other step types, 'other'.

    """
    steps = []
    with open(in_file) as dat_file:
        for line in dat_file:
            if line.startswith('Step ') and ' started at ' in line:
                steps.append('data')
            if line.lower().startswith('step type mapper ') or \
                    line.lower().startswith('step type zero_check ') or \
                    line.lower().startswith('step type film_zero ') or \
                    line.lower().startswith('step type heating '):
                steps[-1] = 'other'
    return steps
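

# An illustration of the accumulated-time helper above (not part of the
# original module): two data steps of three samples each, with the step
# time restarting at zero, are combined into one continuous time axis.
def _example_gen_acc_time():
    step_time = np.asarray([0.0, 1.0, 2.0, 0.0, 1.0, 2.0])
    sub = __gen_acc_time(step_time, ['data', 'data'])
    print(sub['time_accumulated_s'])  # [0. 1. 2. 2. 3. 4.]
    # steps start at indices 0 and 3 and end at indices 2 and 5
    print(sub['step_start'], sub['step_end'])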


def import_pcs(in_file, force=False, out_ext='npz', out_dir=''):
    """

    Import a delimited data file that was produced by an MTM, ETM or EHD2
    test rig manufactured by PCS Instruments. The method calls the
    :code:`import_del` method to perform a basic import of a delimited
    text file, and generates additional output variables that simplify
    data analysis.

    Parameters
    ----------
    in_file: str
        The file handle of the delimited file that is to be imported.
    force: bool, optional
        If :code:`True`, existing output files will be overwritten during
        import. Default is :code:`False`.
    out_ext: str, optional
        The file extension (format) of the output file. Default is
        :code:`npz` for Numpy database format. Alternative is :code:`mat`
        for Matlab database format.
    out_dir: str, optional
        The absolute or relative path to the output directory. Default is
        the current working directory.

    Returns
    -------
    out_file: str
        A handle to the output file that was generated during import.
    import_status: bool or None
        The import status of :code:`in_file`. If :code:`True`, the file
        was successfully imported. If :code:`False`, file import was
        attempted and failed. If :code:`None`, file import was not
        attempted (most likely because an output file with the same name
        already exists).
    out_dict: dict
        The data that was imported from :code:`in_file`.

    """
    _, out_dir, out_file_no_ext = __get_out_file(in_file, out_dir)

    out_dict = None
    out_file = None

    num_dat, col_heads, import_status = \
        __import_file(in_file, out_file_no_ext, out_ext, force=force,
                      deli='\t', dec_mark='.', pad=8)

    steps = __get_pcs_steps(in_file)

    if import_status is True:
        out_dict = __write_to_out_dict(num_dat, col_heads, pcs=True)
        try:
            if 'step_time_s' in out_dict:
                t_dict = __gen_acc_time(out_dict['step_time_s']
                                        .astype(float), steps, out_ext)
                out_dict = {**out_dict, **t_dict}
            out_dict = {**out_dict, **__post_process_image_data(out_dict)}
        except KeyError:
            pass
        except IndexError:
            out_dict = None
            import_status = False

    if import_status:
        out_file = __save_out_file(out_file_no_ext, out_dict, out_ext)

    return out_file, import_status, out_dict
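

# A minimal usage sketch (not part of the original module). 'mtm_test.txt'
# stands in for a raw data file exported from a PCS Instruments rig.
def _example_import_pcs():
    out_file, status, out_dict = import_pcs('mtm_test.txt', force=True)
    if status and out_dict is not None:
        # 'step_start' and 'time_accumulated_s' are only present if the
        # file contains a 'step_time_s' column
        print(out_dict.get('step_start'))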


def __print_import_stats(in_file, status):
    """

    Print the import status to the console.

    Parameters
    ----------
    in_file: str
        The file name of the file for which to print the status.
    status: bool or None
        The import status of :code:`in_file`.

    """
    if status is False:
        out_col = _Colors.FAIL
    elif status is True:
        out_col = _Colors.OKGREEN
    else:
        out_col = _Colors.WARNING

    out_str = '\t'.join([str(status), str(in_file)])
    __print_status(out_str, out_col)


def __parse_args():
    """

    Parse all parser arguments that are provided when the script is
    running in a terminal.

    Returns
    -------
    args: Namespace
        The parsed parser arguments.

    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--force', action="store_true",
                        default=False,
                        help='overwrite existing database files during '
                             'import')
    parser.add_argument('-e', '--extension', action="store", default='txt',
                        help='specify file extension. default is "txt"')
    parser.add_argument('-d', '--delimiter', action="store", default='\t',
                        help='specify column delimiter. default is tab '
                             '(\\t)')
    parser.add_argument('-m', '--mark', action="store", default='.',
                        help='specify decimal mark for numeric data. '
                             'default is dot (.)')
    parser.add_argument('-o', '--outformat', action="store", default='npz',
                        help='specify output database format. default is '
                             '"npz" for numpy database. use "mat" for '
                             'matlab database format.')
    parser.add_argument('-r', '--recursive', action="store_true",
                        default=False,
                        help='recursively walk through all '
                             'sub-directories of current working '
                             'directory')
    parser.add_argument('-p', '--pcs', action="store_true", default=False,
                        help='indicate if files are pcs files.')
    parser.add_argument('-c', '--colheadlines', action="store",
                        default='1',
                        help='number of lines spanned by the column '
                             'headers')
    args = parser.parse_args()
    return args


def import_dir(in_dir, in_ext='txt', recursive=False, force=False,
               deli='\t', dec_mark='.', out_ext='npz', out_dir='',
               print_stat=False, pcs=False, colheadlines=1):
    """

    Import all delimited data files in a directory into Numpy or Matlab
    database format. Optionally, all data files in a directory and all its
    child directories can be imported. The method can be applied to
    regular delimited files as well as files generated by test rigs made
    by PCS Instruments. All files must have at least two data columns that
    are separated by :code:`deli`.

    Parameters
    ----------
    in_dir: str
        Path to directory for which to import all files with extension
        :code:`in_ext`. If :code:`recursive=True`, imports are performed
        for all files with extension :code:`in_ext` in the directory tree
        with parent :code:`in_dir`.
    in_ext: str, optional
        File extension of files to import (without dot). Default is
        :code:`txt`.
    recursive: bool, optional
        If :code:`True`, all files in :code:`in_dir` and all its child
        directories are imported. Default is :code:`False`.
    force: bool, optional
        If :code:`True`, existing output files will be overwritten during
        import. Default is :code:`False`.
    deli: str, optional
        The delimiter used to separate data columns in the delimited file.
        Default is tab.
    dec_mark: str, optional
        The decimal mark of the data file. Default is dot.
    out_ext: str, optional
        The file extension (format) of the output file. Default is
        :code:`npz` for Numpy database format. Alternative is :code:`mat`
        for Matlab database format.
    out_dir: str, optional
        The path to the output directory where output databases are stored
        after import. By default, files are stored in :code:`in_dir` if
        :code:`recursive=False`. If :code:`recursive=True`, files are
        stored in the respective child directories of :code:`in_dir` if
        :code:`out_dir` is not specified.
    print_stat: bool, optional
        If :code:`True`, the current import status is printed to the
        console. Default is :code:`False`.
    pcs: bool, optional
        If :code:`True`, the delimited files are treated like files that
        were generated using an MTM or EHD2 test rig manufactured by PCS
        Instruments.
    colheadlines: int, optional
        The number of lines spanned by the column headers. If several
        lines are spanned, the lines will be merged to generate the column
        keys in the output dictionary.

    Returns
    -------
    in_files: list of strings
        The file handles of all files for which import was attempted.
    out_files: list of strings
        The file handles of all output files that were generated during
        the import process.
    import_status: list of bools
        The import status of each file in :code:`in_files`. If
        :code:`True`, the file was successfully imported. If
        :code:`False`, file import was attempted and failed. If
        :code:`None`, file import was not attempted (most likely because
        an output file with the same name already exists).

    """
    in_files = __get_file_handles(in_dir, in_ext, recursive)
    out_files = []
    import_status = []

    if print_stat:
        print('importing {} files'.format(len(in_files)))
        print('status\tfilename\n'
              '======\t========')

    for in_file in in_files:
        if pcs is False:
            out_file, status, _ = import_del(in_file, force=force,
                                             deli=deli,
                                             dec_mark=dec_mark,
                                             out_ext=out_ext,
                                             out_dir=out_dir,
                                             colheadlines=colheadlines)
        else:
            out_file, status, _ = import_pcs(in_file, force=force,
                                             out_ext=out_ext,
                                             out_dir=out_dir)
        out_files.append(out_file)
        import_status.append(status)

        if print_stat:
            __print_import_stats(in_file, status)

    return in_files, out_files, import_status
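

# A minimal usage sketch (not part of the original module): import every
# 'txt' file below the (hypothetical) directory 'raw_data', printing a
# per-file status line.
def _example_import_dir():
    in_files, out_files, status = import_dir(
        'raw_data', in_ext='txt', recursive=True, force=False,
        out_ext='npz', print_stat=True)
    print(len(in_files), 'files processed')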


if __name__ == "__main__":
    # if the file is executed as a script, import all data files in the
    # current working directory based on the parser arguments provided.
    ARGS = __parse_args()
    import_dir(os.getcwd(), in_ext=ARGS.extension,
               recursive=ARGS.recursive, force=ARGS.force,
               deli=ARGS.delimiter, dec_mark=ARGS.mark,
               out_ext=ARGS.outformat, out_dir=os.getcwd(),
               print_stat=True, pcs=ARGS.pcs,
               colheadlines=int(ARGS.colheadlines))