import json
import re
import click
from jsonpath_ng import jsonpath, parse
from rich import box
from rich.console import Console
from rich.markup import escape
from rich.table import Table
from uac_cli.utils.config import get_default_view, get_table_settings, get_view_columns
from uac_cli.utils.defaults import DEFAULTS
_BOX_STYLES = {
"rounded": box.ROUNDED,
"simple": box.SIMPLE,
"minimal": box.MINIMAL,
"heavy": box.HEAVY,
"markdown": box.MARKDOWN,
"ascii": box.ASCII,
"none": None,
}
[docs]def snake_to_camel(snake_case_str):
"""Converts a snake_case string to camelCase.
Args:
snake_case_str: The string in snake_case format.
Returns:
The converted string in camelCase format.
"""
components = snake_case_str.split("_")
return components[0] + "".join(x.title() for x in components[1:])
[docs]def process_output(
output,
select,
response,
text=False,
binary=False,
table=None,
table_fields=None,
command=None,
):
if table is None:
table = get_default_view() == "table" and isinstance(response, list)
if output and binary:
output.write(response)
elif output and not binary and text:
output.write(f"{response.get('response', '')}".replace("\\n", "\n").encode())
elif output and not binary and not text:
output.write(json.dumps(response, indent=4))
if select:
jsonpath_expr = parse(select)
result = [str(match.value) for match in jsonpath_expr.find(response)]
click.echo("\n".join(result))
else:
if output is None and binary:
click.echo(response)
elif output is None and not binary and text:
click.echo(response.get("response", "").replace("\\n", "\n"))
elif output is None and not binary and not text:
if table:
columns_source = "defaults"
if table_fields is not None:
columns_source = "flag"
if isinstance(table_fields, str):
table_fields = table_fields.split(",")
else:
view_columns = get_view_columns(command) if command else None
if view_columns is not None:
columns_source = "config"
table_fields = view_columns
else:
node = DEFAULTS
for key in command or ():
node = node.get(key, {}) if isinstance(node, dict) else {}
table_fields = (
node.get("columns") if isinstance(node, dict) else None
) or ["name", "summary", "status"]
print_json_table(response, table_fields, columns_source=columns_source)
else:
click.echo(json.dumps(response, indent=4))
[docs]def get_nested_value(data, dotted_key):
"""Traverse nested dicts using dot notation (e.g., 'name.label')."""
keys = dotted_key.split(".")
result = data
for k in keys:
if isinstance(result, dict):
# Case-insensitive lookup
matched = False
for dk in result.keys():
if dk.lower() == k.lower():
result = result[dk]
matched = True
break
if not matched:
return None
else:
return None
return result
[docs]def get_keyname(data, key):
# Support dot-notation for nested keys (e.g., "name.label")
top_level_key = key.split(".")[0]
for dk in data.keys():
if top_level_key.lower() == dk.lower():
return {"key": key}
return None
[docs]def print_json_table(data, fieldnames, columns_source="defaults"):
"""
Converts JSON data into a formatted table and prints it using rich.
:param data: List of dictionaries representing the JSON data.
:param fieldnames: List of field name strings to include in the table (in desired order).
:param columns_source: Where fieldnames originated — "flag", "config", or "defaults".
"""
if not data:
click.echo("No results.")
return
fieldnames_ = []
for field in fieldnames:
key = get_keyname(data[0], field)
if key is not None:
fieldnames_.append(key)
elif columns_source == "config":
raise click.ClickException(
f"Column '{field}' does not exist in the response. "
"Please check output_options.table_columns in your config."
)
elif columns_source == "flag":
raise click.ClickException(
f"Column '{field}' does not exist in the response."
)
fieldnames = list(fieldnames_)
headers = [
".".join(s[:1].upper() + s[1:] for s in str(field.get("key", "")).split("."))
for field in fieldnames
]
rows = []
for item in data:
row = []
for field in fieldnames:
field_key = field.get("key", "")
value = get_nested_value(item, field_key)
if value is None:
value = ""
row.append(escape(str(value)))
rows.append(row)
ts = get_table_settings()
table = Table(
show_header=True,
header_style="bold",
box=_BOX_STYLES.get(ts["box"], box.ROUNDED),
show_lines=ts["show_lines"],
expand=ts["expand"],
padding=ts["padding"],
)
for header in headers:
table.add_column(
header, no_wrap=True, max_width=ts["max_column_width"], overflow="ellipsis"
)
for row in rows:
table.add_row(*row)
Console().print(table)
[docs]def add_payload_value(vars_dict, _input, binary):
"""
Modify `vars_dict` with payload data extracted from `_input`.
If `_input` is non-binary, it is read as a dictionary. Values in `vars_dict` (from command line arguments)
are converted and used to overwrite corresponding keys in the payload dictionary.
If `_input` is binary, the raw data is stored directly under the "data" key in `vars_dict`.
Args:
vars_dict (dict): Dictionary of existing key-value pairs.
_input (file-like or None): Optional input stream with data to be added.
binary (bool): Flag indicating if `_input` is binary data.
Note: Command Line keys will only overwrite existing keys in the payload; new keys will NOT be added.
"""
payload = _input.read() if _input else None
if _input and not binary:
payload = json.loads(payload) if not isinstance(payload, dict) else payload
for var in vars_dict:
_var = snake_to_camel(var)
value = input_value_parsing(vars_dict[var])
if var in payload:
payload[var] = value
elif _var in payload:
payload[_var] = value
vars_dict["payload"] = payload
elif _input and binary:
vars_dict["data"] = payload
[docs]def create_payload(args):
vars_dict = dict(var.split("=", 1) for var in args)
payload = {}
for k, v in vars_dict.items():
payload[k] = v
return payload