Source code for uac_cli.utils.process

import json
import re

import click
from jsonpath_ng import jsonpath, parse
from rich import box
from rich.console import Console
from rich.markup import escape
from rich.table import Table

from uac_cli.utils.config import get_default_view, get_table_settings, get_view_columns
from uac_cli.utils.defaults import DEFAULTS

# Lookup from a box-style name to the corresponding ``rich.box`` constant.
# The "none" entry maps to ``None`` rather than to a box object.
# NOTE(review): the consumer is not visible in this part of the file —
# presumably the table renderer passes the value as a Table's ``box``; confirm.
_BOX_STYLES = {
    "rounded": box.ROUNDED,
    "simple": box.SIMPLE,
    "minimal": box.MINIMAL,
    "heavy": box.HEAVY,
    "markdown": box.MARKDOWN,
    "ascii": box.ASCII,
    "none": None,
}


def snake_to_camel(snake_case_str):
    """Converts a snake_case string to camelCase.

    The first underscore-separated component is kept verbatim. Every later
    component gets its first character upper-cased and the remainder
    lower-cased, and the components are concatenated without separators.

    Args:
        snake_case_str: The string in snake_case format.

    Returns:
        The converted string in camelCase format.
    """
    # str.split always yields at least one element, so `head` is always bound.
    head, *tail = snake_case_str.split("_")
    # capitalize() rather than title(): title() restarts capitalisation after
    # every non-letter, which would turn "md5sum" into "Md5Sum".
    return head + "".join(part.capitalize() for part in tail)
def process_output(
    output,
    select,
    response,
    text=False,
    binary=False,
    table=None,
    table_fields=None,
    command=None,
):
    """Write a response to a file and/or echo it to the terminal.

    Args:
        output: Optional writable file-like object. When truthy, the response
            is written to it: as-is for binary, the "response" entry (encoded
            to bytes) for text, otherwise as indented JSON.
        select: Optional JSONPath expression. When truthy, the matching values
            are echoed one per line and no other terminal output is produced.
        response: The data to emit.
        text: Treat ``response`` as a mapping whose "response" entry holds
            text with literal ``\\n`` sequences to expand.
        binary: Treat ``response`` as raw data.
        table: ``True``/``False`` to force or suppress table rendering. When
            ``None`` a table is used only if the configured default view is
            "table" and ``response`` is a list.
        table_fields: Columns for the table, as a list or a comma-separated
            string. When ``None`` the columns come from the view config for
            ``command`` or, failing that, from ``DEFAULTS``.
        command: Sequence of keys identifying the command; used to look up
            configured and default columns.
    """
    if table is None:
        table = get_default_view() == "table" and isinstance(response, list)

    # File output (only when an output object was supplied).
    if output:
        if binary:
            output.write(response)
        elif text:
            output.write(f"{response.get('response', '')}".replace("\\n", "\n").encode())
        else:
            output.write(json.dumps(response, indent=4))

    # A selector replaces every other form of terminal output.
    if select:
        found = parse(select).find(response)
        click.echo("\n".join([str(hit.value) for hit in found]))
        return

    # The terminal is only used when no output object was given at all.
    if output is not None:
        return

    if binary:
        click.echo(response)
        return

    if text:
        click.echo(response.get("response", "").replace("\\n", "\n"))
        return

    if not table:
        click.echo(json.dumps(response, indent=4))
        return

    # Column resolution order: explicit flag, then view config, then defaults.
    if table_fields is not None:
        columns_source = "flag"
        if isinstance(table_fields, str):
            table_fields = table_fields.split(",")
    else:
        configured = get_view_columns(command) if command else None
        if configured is not None:
            columns_source = "config"
            table_fields = configured
        else:
            columns_source = "defaults"
            # Walk DEFAULTS along the command path; any non-dict hop resets
            # the cursor to an empty dict so the fallback columns apply.
            cursor = DEFAULTS
            for part in command or ():
                if isinstance(cursor, dict):
                    cursor = cursor.get(part, {})
                else:
                    cursor = {}
            default_columns = cursor.get("columns") if isinstance(cursor, dict) else None
            table_fields = default_columns or ["name", "summary", "status"]

    print_json_table(response, table_fields, columns_source=columns_source)
def get_nested_value(data, dotted_key):
    """Look up a value in nested dicts using a dot-separated path.

    Each path segment is matched against the keys of the current dict
    case-insensitively; the first matching key (in dict order) wins.

    Args:
        data: The outermost dict to search.
        dotted_key: Path such as ``"name.label"``.

    Returns:
        The value found at the end of the path, or ``None`` if a segment has
        no matching key or an intermediate value is not a dict.
    """
    current = data
    for segment in dotted_key.split("."):
        if not isinstance(current, dict):
            return None
        wanted = segment.lower()
        for candidate in current.keys():
            if candidate.lower() == wanted:
                current = current[candidate]
                break
        else:
            # No key matched this segment.
            return None
    return current
def get_keyname(data, key):
    """Check whether the top-level part of ``key`` exists in ``data``.

    Only the portion of ``key`` before the first dot is compared, and the
    comparison against the keys of ``data`` is case-insensitive.

    Args:
        data: Dict whose keys are inspected.
        key: Key name, optionally in dot notation (e.g. ``"name.label"``).

    Returns:
        ``{"key": key}`` (with the key exactly as given) when a match exists,
        otherwise ``None``.
    """
    wanted = key.split(".")[0].lower()
    if any(name.lower() == wanted for name in data.keys()):
        return {"key": key}
    return None
def add_payload_value(vars_dict, _input, binary):
    """Modify `vars_dict` with payload data extracted from `_input`.

    If `_input` is non-binary, its content is parsed as JSON. When the parsed
    payload is a JSON object (dict), values in `vars_dict` (from command line
    arguments) are converted with `input_value_parsing` and used to overwrite
    the corresponding keys in the payload; a key is matched either verbatim or
    in its camelCase form. The payload is then stored under the "payload" key
    of `vars_dict`.

    A payload that is valid JSON but not an object (for example a list or
    ``null``) has no keys to overwrite and is stored unchanged.

    If `_input` is binary, the raw data is stored directly under the "data"
    key in `vars_dict`.

    Args:
        vars_dict (dict): Dictionary of existing key-value pairs; mutated in
            place.
        _input (file-like or None): Optional input stream with data to be
            added. Nothing happens when it is falsy.
        binary (bool): Flag indicating if `_input` is binary data.

    Raises:
        json.JSONDecodeError: If non-binary input is not valid JSON.

    Note:
        Command Line keys will only overwrite existing keys in the payload;
        new keys will NOT be added.
    """
    if not _input:
        return

    payload = _input.read()

    if binary:
        vars_dict["data"] = payload
        return

    if not isinstance(payload, dict):
        payload = json.loads(payload)

    # Only a JSON object has keys to override. Guarding here keeps list,
    # string and null payloads from hitting item assignment / membership
    # tests that would raise TypeError.
    if isinstance(payload, dict):
        for var, raw_value in vars_dict.items():
            camel_var = snake_to_camel(var)
            value = input_value_parsing(raw_value)
            if var in payload:
                payload[var] = value
            elif camel_var in payload:
                payload[camel_var] = value

    vars_dict["payload"] = payload
def process_input(args, _input=None, binary=False):
    """Build the argument dictionary for a command.

    Converts a list of "key=value" strings into a dictionary. If the root
    click context has an ``argument_file`` parameter, additional "key=value"
    lines are read from it; they only add keys that are not already present,
    and lines without "=" are ignored.

    If `_input` is provided and neither "retain_sys_ids" nor "retainSysIds"
    is present, "retain_sys_ids" = "true" is added. Finally
    `add_payload_value` is called to merge the input stream into the result.

    Args:
        args (list of str): Command line arguments as "key=value" strings.
        _input (optional): Input stream supplying JSON or binary payload.
        binary (bool): Whether `_input` is binary data.

    Returns:
        dict: Dictionary of all parsed and merged key-value pairs.

    Raises:
        ValueError: If an argument is not in "key=value" format.
        RuntimeError: If any other error occurs while splitting an argument.
    """
    vars_dict = {}

    for var in args:
        try:
            name, val = var.split("=", 1)
        except ValueError as ve:
            msg = f"Error while processing input: Argument '{var}' is not in 'key=value' format."
            click.secho(msg, fg="red")
            raise ValueError(msg) from ve
        except Exception as e:
            msg = f"Unexpected error occurred while processing input arguments: {e}"
            click.secho(msg, fg="red")
            raise RuntimeError(msg) from e
        else:
            vars_dict[name] = val

    # The argument file is registered on the root command, not on the
    # sub-command currently executing.
    root_params = click.get_current_context().find_root().params
    argument_file = root_params.get("argument_file")
    if argument_file:
        for line in argument_file.read().split("\n"):
            if "=" not in line:
                continue
            file_key, file_value = line.split("=", 1)
            # Command line values take precedence over the argument file.
            vars_dict.setdefault(file_key, file_value)

    retain_given = "retain_sys_ids" in vars_dict or "retainSysIds" in vars_dict
    if _input is not None and not retain_given:
        vars_dict["retain_sys_ids"] = "true"

    add_payload_value(vars_dict, _input, binary)
    return vars_dict
def input_value_parsing(value):
    """Convert a colon-delimited marker string into the Python value it names.

    Recognised markers:

    * ``:none:``, ``:false:``, ``:true:`` (case-insensitive) become ``None``,
      ``False`` and ``True``.
    * ``:[]:`` and ``:{}:`` become a new empty list and a new empty dict.
    * ``:<digits>:`` (e.g. ``:42:``) becomes the corresponding ``int``.

    Args:
        value: The raw value. Non-string values are returned unchanged.

    Returns:
        The converted value, or ``value`` itself when it is not a marker.
    """
    if not isinstance(value, str):
        return value

    lowered = value.lower()
    if lowered == ":none:":
        return None
    if lowered == ":false:":
        return False
    if lowered == ":true:":
        return True
    if value == ":[]:":
        return []
    if value == ":{}:":
        return {}
    # fullmatch, not match with "$": "$" also matches just before a trailing
    # newline, which let ":12:\n" through and then made int() raise.
    if re.fullmatch(r":\d+:", value):
        return int(value[1:-1])
    return value
def create_payload(args):
    """Turn "key=value" strings into a payload dictionary.

    Each argument is split on the first "=" only, so values may themselves
    contain "=". Later duplicates of a key overwrite earlier ones.

    Args:
        args: Iterable of "key=value" strings.

    Returns:
        dict: Mapping of each key to its (string) value.

    Raises:
        ValueError: If an argument contains no "=".
    """
    return dict(pair.split("=", 1) for pair in args)