Source code for nireports.assembler.misc

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2023 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
#     https://www.nipreps.org/community/licensing/
#
# STATEMENT OF CHANGES: This file was ported carrying over full git history from niworkflows,
# another NiPreps project licensed under the Apache-2.0 terms, and has been changed since.
"""Miscellaneous utilities."""

from collections import defaultdict
from pathlib import Path

from bids.utils import listify
from nipype.utils.filemanip import loadcrash


[docs] def read_crashfile(path, root=None, root_replace="<workdir>"): """ Prepare crashfiles for rendering into a report. Parameters ---------- path : :obj:`str` or :obj:`~pathlib.Path` The path where the crash-file is located. root : :obj:`str` or :obj:`~pathlib.Path` The root folder. If provided, the path will be replaced with ``root_replace``. root_replace : :obj:`str` A replacement for the absolute root path. Examples -------- .. testsetup:: >>> new_path = Path(__file__).resolve().parent >>> test_data_path = new_path / 'data' / 'tests' >>> info = read_crashfile(test_data_path / 'crashfile.txt') >>> info['node'] # doctest: +ELLIPSIS '...func_preproc_task_machinegame_run_02_wf.carpetplot_wf.conf_plot' >>> info['traceback'] # doctest: +ELLIPSIS '...ValueError: zero-size array to reduction operation minimum which has no identity' >>> info['file'] '...nireports/assembler/data/tests/crashfile.txt' >>> read_crashfile( ... test_data_path / 'crashfile.txt', ... root=test_data_path, ... root_replace="<outdir>", ... )['file'] '<outdir>/crashfile.txt' """ errordata = _read_pkl(path) if str(path).endswith(".pklz") else _read_txt(path) if root: errordata["file"] = f"{root_replace}/{Path(errordata['file']).relative_to(root)}" return errordata
def _read_pkl(path): crash_data = loadcrash(path) data = {"file": path, "traceback": "".join(crash_data["traceback"])} if "node" in crash_data: data["node"] = crash_data["node"] if data["node"].base_dir: data["node_dir"] = data["node"].output_dir() else: data["node_dir"] = "Node crashed before execution" data["inputs"] = sorted(data["node"].inputs.trait_get().items()) return data def _read_txt(path): """ Read a text crashfile. Examples -------- >>> new_path = Path(__file__).resolve().parent >>> test_data_path = new_path / 'data' / 'tests' >>> info = _read_txt(test_data_path / 'crashfile.txt') >>> info['node'] # doctest: +ELLIPSIS '...func_preproc_task_machinegame_run_02_wf.carpetplot_wf.conf_plot' >>> info['traceback'] # doctest: +ELLIPSIS '...ValueError: zero-size array to reduction operation minimum which has no identity' """ lines = Path(path).read_text().splitlines() data = {"file": str(path)} traceback_start = 0 if lines[0].startswith("Node"): data["node"] = lines[0].split(": ", 1)[1].strip() data["node_dir"] = lines[1].split(": ", 1)[1].strip() inputs = [] cur_key = "" cur_val = "" for i, line in enumerate(lines[5:]): if not line.strip(): continue if line[0].isspace(): cur_val += line continue if cur_val: inputs.append((cur_key, cur_val.strip())) if line.startswith("Traceback ("): traceback_start = i + 5 break cur_key, cur_val = tuple(line.split(" = ", 1)) data["inputs"] = sorted(inputs) else: data["node_dir"] = "Node crashed before execution" data["traceback"] = "\n".join(lines[traceback_start:]).strip() return data
[docs] def dict2html(indict, table_id): """Convert a dictionary into an HTML table.""" rows = sorted(unfold_columns(indict)) if not rows: return None width = max([len(row) for row in rows]) result_str = '<table id="%s" class="table table-sm table-striped">\n' % table_id td = "<td{1}>{0}</td>".format for row in rows: result_str += "<tr>" ncols = len(row) for i, col in enumerate(row): colspan = 0 colstring = "" if (width - ncols) > 0 and i == ncols - 2: colspan = (width - ncols) + 1 colstring = " colspan=%d" % colspan result_str += td(col, colstring) result_str += "</tr>\n" result_str += "</table>\n" return result_str
[docs] def unfold_columns(indict, prefix=None, delimiter="_"): """ Convert an input dict with flattened keys to an list of rows expanding columns. Parameters ---------- indict : :obj:`dict` Input dictionary to be expanded as a list of lists. prefix : :obj:`str` or :obj:`list` A string or list of strings to expand columns on the left (that is, all *rows* will be added these prefixes to the left side). delimiter : :obj:`str` The delimiter string. Examples -------- >>> unfold_columns({ ... "key1": "val1", ... "nested_key1": "nested value", ... "nested_key2": "another value", ... }) [['key1', 'val1'], ['nested', 'key1', 'nested value'], ['nested', 'key2', 'another value']] If nested keys do not share prefixes, they should not be unfolded. >>> unfold_columns({ ... "key1": "val1", ... "key1_split": "nonnested value", ... "key2_singleton": "another value", ... }) [['key1', 'val1'], ['key1_split', 'nonnested value'], ['key2_singleton', 'another value']] Nested/non-nested keys can be combined (see the behavior for key1): >>> unfold_columns({ ... "key1": "val1", ... "key1_split1": "nested value", ... "key1_split2": "nested value", ... "key2_singleton": "another value", ... }) [['key1', 'val1'], ['key1', 'split1', 'nested value'], ['key1', 'split2', 'nested value'], ['key2_singleton', 'another value']] >>> unfold_columns({ ... "key1": "val1", ... "nested_key1": "nested value", ... "nested_key2": "another value", ... }, prefix="prefix") [['prefix', 'key1', 'val1'], ['prefix', 'nested', 'key1', 'nested value'], ['prefix', 'nested', 'key2', 'another value']] >>> unfold_columns({ ... "key1": "val1", ... "nested_key1": "nested value", ... "nested_key2": "another value", ... }, prefix=["name", "lastname"]) [['name', 'lastname', 'key1', 'val1'], ['name', 'lastname', 'nested', 'key1', 'nested value'], ['name', 'lastname', 'nested', 'key2', 'another value']] >>> unfold_columns({ ... "key1": "val1", ... "nested_key1_sub1": "val2", ... "nested_key1_sub2": "val3", ... "nested_key2": "another value", ... }) [['key1', 'val1'], ['nested', 'key2', 'another value'], ['nested', 'key1', 'sub1', 'val2'], ['nested', 'key1', 'sub2', 'val3']] """ prefix = listify(prefix) if prefix is not None else [] keys = sorted(set(indict.keys())) data = [] subdict = defaultdict(dict, {}) for key in keys: col = key.split(delimiter, 1) if len(col) == 1: data.append(prefix + [col[0], indict[col[0]]]) else: subdict[col[0]][col[1]] = indict[key] if subdict: for skey in sorted(subdict.keys()): sskeys = list(subdict[skey].keys()) # If there is only one subkey, merge back if len(sskeys) == 1: value = subdict[skey][sskeys[0]] newkey = delimiter.join([skey] + sskeys) data.append(prefix + [newkey, value]) else: data += unfold_columns(subdict[skey], prefix=prefix + [skey]) return data