API Reference
Python utilities by the Professorship of Environmental Sensing and Modeling at the Technical University of Munich.
GitHub Repository https://github.com/tum-esm/utils (opens in a new tab) Documentation: https://tum-esm-utils.netlify.app (opens in a new tab) PyPI: https://pypi.org/project/tum-esm-utils (opens in a new tab)
tum_esm_utils.code
Functions for interacting with GitHub and GitLab.
Implements: request_github_file
, request_gitlab_file
request_github_file
def request_github_file(repository: str,
filepath: str,
access_token: Optional[str] = None,
branch_name: str = "main",
timeout: int = 10) -> str
Sends a request and returns the content of the response, as a string. Raises an HTTPError if the response status code is not 200.
Arguments:
repository
- In the format "owner/repo".filepath
- The path to the file in the repository.access_token
- The GitHub access token. Only required if the repo is private.branch_name
- The branch name.timeout
- The request timeout in seconds.
Returns:
The content of the file as a string.
request_gitlab_file
def request_gitlab_file(repository: str,
filepath: str,
access_token: Optional[str] = None,
branch_name: str = "main",
hostname: str = "gitlab.com",
timeout: int = 10) -> str
Sends a request and returns the content of the response, as a string. Raises an HTTPError if the response status code is not 200.
Arguments:
repository
- In the format "owner/repo".filepath
- The path to the file in the repository.access_token
- The GitLab access token. Only required if the repo is private.branch_name
- The branch name.hostname
- The GitLab hostname.timeout
- The request timeout in seconds.
Returns:
The content of the file as a string.
tum_esm_utils.datastructures
Datastructures not in the standard library.
Implements: RingList
, merge_dicts
RingList
Objects
class RingList()
__init__
def __init__(max_size: int)
Initialize a RingList with a maximum size.
clear
def clear() -> None
Removes all elements from the list.
is_full
def is_full() -> bool
Returns True if the list is full.
append
def append(x: float) -> None
Appends an element to the list.
get
def get() -> list[float]
Returns the list of elements.
sum
def sum() -> float
Returns the max size of the list
set_max_size
def set_max_size(new_max_size: int) -> None
Sets a new max size fo the list.
merge_dicts
def merge_dicts(old_object: Any, new_object: Any) -> Any
For a given dict, update it recursively from a new dict. It will not add any properties and assert that the types remain the same (or null). null->int or int->null is possible but not int->dict or list->int.
example:
merge_dicts(
old_object={"a": 3, "b": {"c": 50, "e": None}},
new_object={"b": {"e": 80}},
) == {"a": 3, "b": {"c": 50, "e": 80}}
tum_esm_utils.decorators
Decorators that can be used wrap functions.
Implements: with_filelock
with_filelock
Objects
class with_filelock()
FileLock = Mark, that a file is being used and other programs should not interfere. A file "*.lock" will be created and the content of this file will make the wrapped function possibly wait until other programs are done using it.
See https://en.wikipedia.org/wiki/Semaphore_(programming) (opens in a new tab).
Credits for the typing of higher level decorators goes to https://github.com/python/mypy/issues/1551#issuecomment-253978622 (opens in a new tab).
__init__
def __init__(lockfile_path: str, timeout: float = -1) -> None
Create a new filelock decorator.
A timeout of -1 means that the code waits forever.
Arguments:
lockfile_path
- The path to the lockfile.timeout
- The time to wait for the lock in seconds.
tum_esm_utils.em27
Functions for interacting with EM27 interferograms.
Implements: detect_corrupt_opus_files
, load_proffast2_result
.
This requires you to install this utils library with the optional polars
dependency:
pip install "tum_esm_utils[polars]"
## `or`
pdm add "tum_esm_utils[polars]"
detect_corrupt_opus_files
def detect_corrupt_opus_files(
ifg_directory: str,
silent: bool = True,
fortran_compiler: Literal["gfortran", "gfortran-9"] = "gfortran",
force_recompile: bool = False) -> dict[str, list[str]]
Returns dict[filename, list[error_messages]] for all corrupt opus files in the given directory.
It will compile the fortran code using a given compiler to perform this task. The fortran code is derived from the preprocess source code of Proffast 2 (https://www.imk-asf.kit.edu/english/3225.php (opens in a new tab)). We use it because the retrieval using Proffast 2 will fail if there are corrupt interferograms in the input.
Arguments:
ifg_directory
- The directory containing the interferograms.silent
- If set to False, print additional information.fortran_compiler
- The fortran compiler to use.force_recompile
- If set to True, the fortran code will be recompiled.
Returns:
A dictionary containing corrupt filenames as keys and a list of error messages as values.
detect_corrupt_ifgs
@deprecated("This will be removed in the next breaking release. Please use " +
"the identical function `detect_corrupt_opus_files` instead.")
def detect_corrupt_ifgs(ifg_directory: str,
silent: bool = True,
fortran_compiler: Literal["gfortran",
"gfortran-9"] = "gfortran",
force_recompile: bool = False) -> dict[str, list[str]]
Returns dict[filename, list[error_messages]] for all corrupt opus files in the given directory.
It will compile the fortran code using a given compiler to perform this task. The fortran code is derived from the preprocess source code of Proffast 2 (https://www.imk-asf.kit.edu/english/3225.php (opens in a new tab)). We use it because the retrieval using Proffast 2 will fail if there are corrupt interferograms in the input.
Arguments:
ifg_directory
- The directory containing the interferograms.silent
- If set to False, print additional information.fortran_compiler
- The fortran compiler to use.force_recompile
- If set to True, the fortran code will be recompiled.
Returns:
A dictionary containing corrupt filenames as keys and a list of error messages as values.
load_proffast2_result
def load_proffast2_result(path: str) -> pl.DataFrame
Loads the output of Proffast 2 into a polars DataFrame.
Arguments:
path
- The path to the Proffast 2 output file.
Returns:
A polars DataFrame containing all columns.
SERIAL_NUMBERS
The serial numbers of the EM27 devices.
COLORS
Colors recommended for plotting the EM27 data.
COLORS_LIGHT
Lighter colors recommended for plotting the EM27 data.
COLORS_DARK
Darker colors recommended for plotting the EM27 data.
PROFFAST_MULTIPLIERS
Multiplication factors for the EM27 data retrieved using Proffast to bring the data in a common unit.
PROFFAST_UNITS
Units for the EM27 data retrieved using Proffast after applying the multiplication factor.
tum_esm_utils.files
File-related utility functions.
Implements: load_file
, dump_file
, load_json_file
,
dump_json_file
, get_parent_dir_path
, get_dir_checksum
,
get_file_checksum
, rel_to_abs_path
, read_last_n_lines
,
expect_file_contents
, render_directory_tree
, list_directory
get_parent_dir_path
def get_parent_dir_path(script_path: str, current_depth: int = 1) -> str
Get the absolute path of a parent directory based on the
current script path. Simply pass the __file__
variable of
the current script to this function. Depth of 1 will return
the direct parent directory of the current script.
get_dir_checksum
def get_dir_checksum(path: str) -> str
Get the checksum of a directory using md5deep.
get_file_checksum
def get_file_checksum(path: str) -> str
Get the checksum of a file using MD5 from haslib
.
Significantly faster than get_dir_checksum
since it does
not spawn a new process.
rel_to_abs_path
def rel_to_abs_path(*path: str) -> str
Convert a path relative to the caller's file to an absolute path.
Inside file /home/somedir/somepath/somefile.py
, calling
rel_to_abs_path("..", "config", "config.json")
will
return /home/somedir/config/config.json
.
Credits to https://stackoverflow.com/a/59004672/8255842 (opens in a new tab)
read_last_n_lines
def read_last_n_lines(file_path: str,
n: int,
ignore_trailing_whitespace: bool = False) -> list[str]
Read the last n
lines of a file.
The function returns less than n
lines if the file has less than n
lines.
The last element in the list is the last line of the file.
This function uses seeking in order not to read the full file. The simple approach of reading the last 10 lines would be:
with open(path, "r") as f:
return f.read().split("\n")[:-10]
However, this would read the full file and if we only need to read 10 lines out of a 2GB file, this would be a big waste of resources.
The ignore_trailing_whitespace
option to crop off trailing whitespace, i.e.
only return the last n
lines that are not empty or only contain whitespace.
expect_file_contents
def expect_file_contents(filepath: str,
required_content_blocks: list[str] = [],
forbidden_content_blocks: list[str] = []) -> None
Assert that the given file contains all of the required content blocks, and/or none of the forbidden content blocks.
Arguments:
filepath
- The path to the file.required_content_blocks
- A list of strings that must be present in the file.forbidden_content_blocks
- A list of strings that must not be present in the file.
render_directory_tree
def render_directory_tree(root: str,
ignore: list[str] = [],
max_depth: Optional[int] = None,
root_alias: Optional[str] = None,
directory_prefix: Optional[str] = "📁 ",
file_prefix: Optional[str] = "📄 ") -> Optional[str]
Render a file tree as a string.
Example:
📁 <config.general.data.results>
├─── 📁 bundle
│ ├─── 📄 __init__.py
│ ├─── 📄 load_results.py
│ └─── 📄 main.py
├─── 📁 profiles
│ ├─── 📄 __init__.py
│ ├─── 📄 cache.py
│ ├─── 📄 download_logic.py
│ ├─── 📄 generate_queries.py
│ ├─── 📄 main.py
│ ├─── 📄 std_site_logic.py
│ └─── 📄 upload_logic.py
├─── 📁 retrieval
│ ├─── 📁 algorithms
...
Arguments:
-
root
- The root directory to render. -
ignore
- A list of patterns to ignore. If the basename of a directory matches any of the patterns, the directory is ignored. -
max_depth
- The maximum depth to render. IfNone
, render the full tree. -
root_alias
- An alias for the root directory. IfNone
, the basename of the root directory is used. In the example above, the root directory is was aliased to<config.general.data.results>
. -
directory_prefix
- The prefix to use for directories. -
file_prefix
- The prefix to use for files. -
Returns
- The directory tree as a string. If the root directory is ignored,None
.
list_directory
def list_directory(path: str,
regex: Optional[str] = None,
ignore: Optional[list[str]] = None,
include_directories: bool = True,
include_files: bool = True,
include_links: bool = True) -> list[str]
List the contents of a directory based on certain criteria. Like os.listdir
with superpowers. You can filter the list by a regex or you can ignore Unix shell
style patterns like *.lock
.
Arguments:
-
path
- The path to the directory. -
regex
- A regex pattern to match the item names against. -
ignore
- A list of patterns to ignore. If the basename of an item matches any of the patterns, the item is ignored. -
include_directories
- Whether to include directories in the output. -
include_files
- Whether to include files in the output. -
include_links
- Whether to include symbolic links in the output. -
Returns
- A list of items in the directory that match the criteria.
tum_esm_utils.mathematics
Mathematical functions.
Implements: distance_between_angles
distance_between_angles
def distance_between_angles(angle_1: float, angle_2: float) -> float
Calculate the directional distance (in degrees) between two angles.
tum_esm_utils.plotting
Better defaults for matplotlib plots and utilities for creating and saving figures.
Implements: apply_better_defaults
, create_figure
, add_subplot
This requires you to install this utils library with the optional plotting
dependencies:
pip install "tum_esm_utils[plotting]"
## `or`
pdm add "tum_esm_utils[plotting]"
apply_better_defaults
def apply_better_defaults(font_family: Optional[str] = "Roboto") -> None
Apply better defaults to matplotlib plots.
Arguments:
font_family
- The font family to use for the plots. If None, the default settings are not changed.
create_figure
@contextlib.contextmanager
def create_figure(path: str,
title: Optional[str] = None,
width: float = 10,
height: float = 10,
suptitle_y: float = 0.97,
padding: float = 2,
dpi: int = 250) -> Generator[plt.Figure, None, None]
Create a figure for plotting.
Usage:
with create_figure("path/to/figure.png", title="Title") as fig:
...
Arguments:
path
- The path to save the figure to.title
- The title of the figure.width
- The width of the figure.height
- The height of the figure.suptitle_y
- The y-coordinate of the figure title.padding
- The padding of the figure.dpi
- The DPI of the figure.
add_subplot
def add_subplot(fig: plt.Figure,
position: tuple[int, int, int],
title: Optional[str] = None,
xlabel: Optional[str] = None,
ylabel: Optional[str] = None,
**kwargs: dict[str, Any]) -> plt.Axes
Add a subplot to a figure.
Arguments:
fig
- The figure to add the subplot to.position
- The position of the subplot. The tuple should contain three integers: the number of rows, the number of columns, and the index of the subplot.title
- The title of the subplot.xlabel
- The x-axis label of the subplot.ylabel
- The y-axis label of the subplot.**kwargs
- Additional keyword arguments for the subplot.
Returns:
An axis object for the new subplot.
Raises:
ValueError
- If the index of the subplot is invalid.
add_colorpatch_legend
def add_colorpatch_legend(fig: plt.Figure,
handles: list[tuple[str, Union[
str,
tuple[float, float, float],
tuple[float, float, float, float],
]]],
ncols: Optional[int] = None,
location: str = "upper left") -> None
Add a color patch legend to a figure.
Arguments:
fig
- The figure to add the legend to.handles
- A list of tuples containing the label and color of each patch (e.g.[("Label 1", "red"), ("Label 2", "blue")]
). You can pass any color that is accepted by matplotlib.ncols
- The number of columns in the legend.location
- The location of the legend.
tum_esm_utils.processes
Functions to start and terminate background processes.
Implements: get_process_pids
, start_background_process
,
terminate_process
get_process_pids
def get_process_pids(script_path: str) -> list[int]
Return a list of PIDs that have the given script as their entrypoint.
Arguments:
script_path
- The absolute path of the python file entrypoint.
start_background_process
def start_background_process(interpreter_path: str,
script_path: str,
waiting_period: float = 0.5) -> int
Start a new background process with nohup with a given python interpreter and script path. The script paths parent directory will be used as the working directory for the process.
Arguments:
-
interpreter_path
- The absolute path of the python interpreter. -
script_path
- The absolute path of the python file entrypoint. -
waiting_period
- The waiting period in seconds after starting the process. -
Returns
- The PID of the started process.
terminate_process
def terminate_process(script_path: str,
termination_timeout: Optional[int] = None) -> list[int]
Terminate all processes that have the given script as their entrypoint. Returns the list of terminated PIDs.
If termination_timeout
is not None, the processes will be
terminated forcefully after the given timeout (in seconds).
Arguments:
script_path
- The absolute path of the python file entrypoint.termination_timeout
- The timeout in seconds after which the processes will be terminated forcefully.
Returns:
The list of terminated PIDs.
tum_esm_utils.shell
Implements custom logging functionality, because the standard logging module is hard to configure for special cases.
Implements: run_shell_command
, CommandLineException
,
get_hostname
, get_commit_sha
, change_file_permissions
CommandLineException
Objects
class CommandLineException(Exception)
Exception raised for errors in the command line.
run_shell_command
def run_shell_command(command: str,
working_directory: Optional[str] = None,
executable: str = "/bin/bash") -> str
runs a shell command and raises a CommandLineException
if the return code is not zero, returns the stdout. Uses
/bin/bash
by default.
Arguments:
command
- The command to run.working_directory
- The working directory for the command.executable
- The shell executable to use.
Returns:
The stdout of the command as a string.
get_hostname
def get_hostname() -> str
returns the hostname of the device, removes network
postfix (somename.local
) if present. Only works reliably,
when the hostname doesn't contain a dot.
get_commit_sha
def get_commit_sha(
variant: Literal["short", "long"] = "short") -> Optional[str]
Get the current commit sha of the repository. Returns
None
if there is not git repository in any parent directory.
Arguments:
variant
- "short" or "long" to specify the length of the sha.
Returns:
The commit sha as a string, or None
if there is no git
repository in the parent directories.
change_file_permissions
def change_file_permissions(file_path: str, permission_string: str) -> None
Change a file's system permissions.
Example permission_strings: --x------
, rwxr-xr-x
, rw-r--r--
.
Arguments:
file_path
- The path to the file.permission_string
- The new permission string.
tum_esm_utils.system
Common system status related functions.
Implements: get_cpu_usage
, get_memory_usage
, get_disk_space
,
get_system_battery
, get_last_boot_time
, get_utc_offset
get_cpu_usage
def get_cpu_usage() -> list[float]
Checks the CPU usage of the system.
Returns:
The CPU usage in percent for each core.
get_memory_usage
def get_memory_usage() -> float
Checks the memory usage of the system.
Returns:
The memory usage in percent.
get_disk_space
def get_disk_space(path: str = "/") -> float
Checks the disk space of a given path.
Arguments:
path
- The path to check the disk space for.
Returns:
The available disk space in percent.
get_system_battery
def get_system_battery() -> Optional[int]
Checks the system battery.
Returns:
The battery state in percent if available, else None.
get_last_boot_time
def get_last_boot_time() -> datetime.datetime
Checks the last boot time of the system.
get_utc_offset
def get_utc_offset() -> float
Returns the UTC offset of the system.
Credits to https://stackoverflow.com/a/35058476/8255842 (opens in a new tab)
x = get_utc_offset()
local time == utc time + x
Returns:
The UTC offset in hours.
tum_esm_utils.text
Functions used for text manipulation/processing.
Implements: get_random_string
, pad_string
, is_date_string
,
is_rfc3339_datetime_string
, insert_replacements
, simplify_string_characters
,
replace_consecutive_characters
, RandomLabelGenerator
get_random_string
def get_random_string(length: int, forbidden: list[str] = []) -> str
Return a random string from lowercase letters.
Arguments:
length
- The length of the random string.forbidden
- A list of strings that should not be generated.
Returns:
A random string.
pad_string
def pad_string(text: str,
min_width: int,
pad_position: Literal["left", "right"] = "left",
fill_char: Literal["0", " ", "-", "_"] = " ") -> str
Pad a string with a fill character to a minimum width.
Arguments:
text
- The text to pad.min_width
- The minimum width of the text.pad_position
- The position of the padding. Either "left" or "right".fill_char
- The character to use for padding.
Returns:
The padded string.
is_date_string
def is_date_string(date_string: str) -> bool
Returns True
if string is in a valid YYYYMMDD
format.
is_rfc3339_datetime_string
def is_rfc3339_datetime_string(rfc3339_datetime_string: str) -> bool
Returns True
if string is in a valid YYYY-MM-DDTHH:mm:ssZ
(RFC3339)
format. Caution: The appendix of +00:00
is required for UTC!
insert_replacements
def insert_replacements(content: str, replacements: dict[str, str]) -> str
For every key in replacements, replaces %key%
in the
content with its value.
simplify_string_characters
def simplify_string_characters(s: str,
additional_replacements: dict[str,
str] = {}) -> str
Simplify a string by replacing special characters with their ASCII counterparts and removing unwanted characters.
For example, simplify_string_characters("Héllo, wörld!")
will return "hello-woerld"
.
Arguments:
-
s
- The string to simplify. -
additional_replacements
- A dictionary of additional replacements to apply.{ "ö": "oe" }
will replaceö
withoe
. -
Returns
- The simplified string.
replace_consecutive_characters
def replace_consecutive_characters(s: str,
characters: list[str] = [" ", "-"]) -> str
Replace consecutiv characters in a string (e.g. "hello---world" -> "hello-world" or "hello world" -> "hello world").
Arguments:
s
- The string to process.characters
- A list of characters to replace duplicates of.
Returns:
The string with duplicate characters replaced.
RandomLabelGenerator
Objects
class RandomLabelGenerator()
A class to generate random labels that follow the Docker style naming of
containers, e.g admiring-archimedes
or happy-tesla
.
Usage with tracking duplicates:
generator = RandomLabelGenerator()
label = generator.generate()
another_label = generator.generate() # Will not be the same as `label`
generator.free(label) # Free the label to be used again
Usage without tracking duplicates:
label = RandomLabelGenerator.generate_fully_random()
Source for the names and adjectives: https://github.com/moby/moby/blob/master/pkg/namesgenerator/names-generator.go (opens in a new tab)
__init__
def __init__(occupied_labels: set[str] | list[str] = set(),
adjectives: set[str] | list[str] = CONTAINER_ADJECTIVES,
names: set[str] | list[str] = CONTAINER_NAMES) -> None
Initialize the label generator.
generate
def generate() -> str
Generate a random label that is not already occupied.
free
def free(label: str) -> None
Free a label to be used again.
generate_fully_random
@staticmethod
def generate_fully_random(
adjectives: set[str] | list[str] = CONTAINER_ADJECTIVES,
names: set[str] | list[str] = CONTAINER_NAMES) -> str
Get a random label without tracking duplicates.
Use an instance of RandomLabelGenerator
if you want to avoid
duplicates by tracking occupied labels.
tum_esm_utils.timing
Functions used for timing or time calculations.
Implements: date_range
, ensure_section_duration
, set_alarm
,
clear_alarm
, wait_for_condition
, ExponentialBackoff
date_range
def date_range(from_date: datetime.date,
to_date: datetime.date) -> list[datetime.date]
Returns a list of dates between from_date and to_date (inclusive).
ensure_section_duration
@contextlib.contextmanager
def ensure_section_duration(duration: float) -> Generator[None, None, None]
Make sure that the duration of the section is at least the given duration.
Usage example - do one measurement every 6 seconds:
with ensure_section_duration(6):
do_measurement()
set_alarm
def set_alarm(timeout: int, label: str) -> None
Set an alarm that will raise a TimeoutError
after
timeout
seconds. The message will be formatted as
{label} took too long (timed out after {timeout} seconds)
.
clear_alarm
def clear_alarm() -> None
Clear the alarm set by set_alarm
.
parse_timezone_string
def parse_timezone_string(timezone_string: str,
dt: Optional[datetime.datetime] = None) -> float
Parse a timezone string and return the offset in hours.
Why does this function exist? The strptime
function cannot parse strings
other than "±HHMM". This function can also parse strings in the format "±H"
("+2", "-3", "+5.5"), and "±HH:MM".
Examples:
parse_timezone_string("GMT") # returns 0
parse_timezone_string("GMT+2") # returns 2
parse_timezone_string("UTC+2.0") # returns 2
parse_timezone_string("UTC-02:00") # returns -2
You are required to pass a datetime object in case the utc offset for the passed timezone is not constant - e.g. for "Europe/Berlin".
wait_for_condition
def wait_for_condition(is_successful: Callable[[], bool],
timeout_message: str,
timeout_seconds: float = 5,
check_interval_seconds: float = 0.25) -> None
Wait for the given condition to be true, or raise a TimeoutError if the condition is not met within the given timeout. The condition is passed as a function that will be called periodically.
Arguments:
is_successful
- A function that returns True if the condition is met.timeout_message
- The message to include in the TimeoutError.timeout_seconds
- The maximum time to wait for the condition to be met.check_interval_seconds
- How long to wait inbetweenis_successful()
calls.
parse_iso_8601_datetime
def parse_iso_8601_datetime(s: str) -> datetime.datetime
Parse a datetime string from various formats and return a datetime object.
ISO 8601 supports time zones as <time>Z
, <time>±hh:mm
, <time>±hhmm
and
<time>±hh
. However, only the second format is supported by datetime.datetime.fromisoformat()
(HH[:MM[:SS[.fff[fff]]]][+HH:MM[:SS[.ffffff]]]
).
This function supports parsing alll ISO 8601 time formats.
datetime_span_intersection
def datetime_span_intersection(
dt_span_1: tuple[datetime.datetime,
datetime.datetime], dt_span_2: tuple[datetime.datetime,
datetime.datetime]
) -> Optional[tuple[datetime.datetime, datetime.datetime]]
Check if two datetime spans overlap.
Arguments:
dt_span_1
- The first datetime span (start, end).dt_span_2
- The second datetime span (start, end).
Returns:
The intersection of the two datetime spans or None if they do not overlap. Returns None if the intersection is a single point.
date_span_intersection
def date_span_intersection(
d_span_1: tuple[datetime.date,
datetime.date], d_span_2: tuple[datetime.date,
datetime.date]
) -> Optional[tuple[datetime.date, datetime.date]]
Check if two date spans overlap. This functions behaves
differently from datetime_span_intersection
in that it
returns a single point as an intersection if the two date
spans overlap at a single date.
Arguments:
d_span_1
- The first date span (start, end).d_span_2
- The second date span (start, end).
Returns:
The intersection of the two date spans or None if they do not overlap.
ExponentialBackoff
Objects
class ExponentialBackoff()
Exponential backoff e.g. when errors occur. First try again in 1 minute, then 4 minutes, then 15 minutes, etc.. Usage:
exponential_backoff = ExponentialBackoff(
log_info=logger.info, buckets= [60, 240, 900, 3600, 14400]
)
while True:
try:
# do something that might fail
exponential_backoff.reset()
except Exception as e:
logger.exception(e)
exponential_backoff.sleep()
__init__
def __init__(log_info: Optional[Callable[[str], None]] = None,
buckets: list[int] = [60, 240, 900, 3600, 14400]) -> None
Create a new exponential backoff object.
Arguments:
log_info
- The function to call when logging information.buckets
- The buckets to use for the exponential backoff.
sleep
def sleep(max_sleep_time: Optional[float] = None) -> float
Wait and increase the wait time to the next bucket.
Arguments:
max_sleep_time
- The maximum time to sleep. If None, no maximum is set.
Returns:
The amount of seconds waited.
reset
def reset() -> None
Reset the waiting period to the first bucket
tum_esm_utils.validators
Implements validator utils for use with pydantic models.
Implements: StrictFilePath
, StrictDirectoryPath
StrictFilePath
Objects
class StrictFilePath(pydantic.RootModel[str])
A pydantic model that validates a file path.
Example usage:
class MyModel(pyndatic.BaseModel):
path: StrictFilePath
m = MyModel(path='/path/to/file') # validates that the file exists
The validation can be ignored by setting the context variable:
m = MyModel.model_validate(
{"path": "somenonexistingpath"},
context={"ignore-path-existence": True},
) # does not raise an error
StrictDirectoryPath
Objects
class StrictDirectoryPath(pydantic.RootModel[str])
A pydantic model that validates a directory path.
Example usage:
class MyModel(pyndatic.BaseModel):
path: StrictDirectoryPath
m = MyModel(path='/path/to/directory') # validates that the directory exists
The validation can be ignored by setting the context variable:
m = MyModel.model_validate(
{"path": "somenonexistingpath"},
context={"ignore-path-existence": True},
) # does not raise an error
Version
Objects
class Version(pydantic.RootModel[str])
A version string in the format of MAJOR.MINOR.PATCH[-(alpha|beta|rc).N]
as_tag
def as_tag() -> str
Return the version string as a tag, i.e. vMAJOR.MINOR.PATCH...
as_identifier
def as_identifier() -> str
Return the version string as a number, i.e. MAJOR.MINOR.PATCH...
StricterBaseModel
Objects
class StricterBaseModel(pydantic.BaseModel)
The same as pydantic.BaseModel, but with stricter rules. It does not allow extra fields and validates assignments after initialization.
StrictIPv4Adress
Objects
class StrictIPv4Adress(pydantic.RootModel[str])
A pydantic model that validates an IPv4 address.
Example usage:
class MyModel(pyndatic.BaseModel):
ip: StrictIPv4Adress
m = MyModel(ip='192.186.2.1')
m = MyModel(ip='192.186.2.1:22')