#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/lucit-ubdcc-shared-modules/lucit_ubdcc_shared_modules/App.py
#
# Project website: https://www.lucit.tech/unicorn-depthcache-cluster-for-binance.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance
# Documentation: https://unicorn-depthcache-cluster-for-binance.docs.lucit.tech
# PyPI: https://pypi.org/project/lucit-ubdcc-shared-modules
# LUCIT Online Shop: https://shop.lucit.services/software/unicorn-depthcache-cluster-for-binance
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2024-2024, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.
import asyncio
import json
import logging
import os
import random
import signal as sys_signal
import socket
import string
import sys
import time
import traceback
from urllib.parse import urlencode

import aiohttp
import cython
import kubernetes
from fastapi import FastAPI

from .LicensingManager import LucitLicensingManager, NoValidatedLucitLicense
# Release version of the shared modules, reported in node registrations and app info.
VERSION: str = "0.1.4"

# Seconds to wait before retrying when the mgmt service reports it is not ready yet;
# also the default duration of App.sleep().
MGMT_IS_READY_TIME: int = 10

# Service port used to reach lucit-ubdcc-mgmt inside the Kubernetes cluster.
K8S_SERVICE_PORT_MGMT: int = 4280

# REST server port used by every service in production mode.
REST_SERVER_PORT: int = 8080

# Per-service REST server ports used in dev mode.
REST_SERVER_PORT_DEV_MGMT: int = 42080
REST_SERVER_PORT_DEV_RESTAPI: int = 42081
REST_SERVER_PORT_DEV_DCN: int = 42082
class App:
    """
    Shared application runtime of the UBDCC services (lucit-ubdcc-mgmt, lucit-ubdcc-dcn, lucit-ubdcc-restapi).

    Takes care of logging, termination signals, REST server port selection, license handling and the HTTP
    communication with the lucit-ubdcc-mgmt service. The actual service logic is injected via `service_call`
    (run by `start()`) and `stop_call` (run on shutdown).
    """

    def __init__(self, app_name=None, cwd=None, logger=None, service=None, service_call=None, stop_call=None):
        self.app_name = app_name
        self.app_version = VERSION
        # Port of the own REST server, chosen by set_api_rest_port().
        self.api_port_rest: int = 0
        self.cwd = cwd
        self.ubdcc_mgmt_url = None
        self.dev_mode = False
        self.fastapi = None
        # Filled by start(): name, version, author, build_type.
        self.info: dict = {}
        self.k8s_client = None
        self.k8s_metrics_client = None
        self.logger = logger
        self.pod_info = None
        self.node_info = None
        self.mgmt_is_ready_time = MGMT_IS_READY_TIME
        self.k8s_service_port_mgmt = K8S_SERVICE_PORT_MGMT
        self.rest_server_port = REST_SERVER_PORT
        self.rest_server_port_dev_dcn = REST_SERVER_PORT_DEV_DCN
        self.rest_server_port_dev_mgmt = REST_SERVER_PORT_DEV_MGMT
        self.rest_server_port_dev_restapi = REST_SERVER_PORT_DEV_RESTAPI
        self.service = service
        self.service_call = service_call
        # Set to True by sigterm_handler() and shutdown(); polled through is_shutdown().
        self.sigterm = False
        self.stop_call = stop_call
        self.status = "starting"
        self.ubdcc_mgmt_backup: dict | None = None
        self.data: dict = {}
        # Identity of this pod; the code reads the keys 'name', 'uid', 'node' and 'namespace'.
        self.id: dict = {}
        # DCN pod uid -> unix timestamp of its last selection by get_dcn_uid_unused_longest_time().
        self.dcn_usage: dict = {}
        self.llm: LucitLicensingManager | None = None

    def deactivate_license(self, close_api_session: bool = True) -> bool | None:
        """
        Mark the license as INVALID in the database and optionally close the licensing session.

        :param close_api_session: If True, also close the LucitLicensingManager session when one was started.
        :return: False if no database is stored in `self.data`, otherwise True.
        """
        try:
            self.data['db'].set_license_status(status="INVALID")
        except KeyError:
            return False
        # The licensing manager only exists after start_licensing_manager() ran, but shutdown() calls this
        # method unconditionally - guard against `None`.
        if close_api_session is True and self.llm is not None:
            self.llm.close()
        return True

    @staticmethod
    def generate_string(length):
        """
        Return a random string of ASCII letters and digits.

        Uses the `random` module, so the result is not cryptographically secure.

        :param length: Number of characters.
        """
        letters = string.ascii_letters + string.digits
        return ''.join(random.choice(letters) for _ in range(length))

    async def get_backup_from_node(self, host, port) -> dict | None:
        """
        Download the mgmt backup from another node.

        :return: The content of the 'db' field of the response, or None if the response has no such field
                 (this includes the error dicts returned by `request()`).
        """
        response = await self.request(f"http://{host}:{port}/ubdcc_mgmt_backup", method="get")
        try:
            return response['db']
        except (KeyError, TypeError):
            return None

    async def get_backup_timestamp_from_node(self, host, port) -> float | None:
        """
        Ask another node for the timestamp of its mgmt backup.

        :return: The timestamp as float, or None if the node has no (valid) backup timestamp or the request failed.
        """
        response = await self.request(f"http://{host}:{port}/ubdcc_mgmt_backup?get_backup_timestamp",
                                      method="get")
        try:
            # The 'db' field is expected to be a JSON encoded string.
            payload = json.loads(response['db'])
        except (KeyError, TypeError, ValueError):
            return None
        if not isinstance(payload, dict):
            return None
        timestamp = payload.get('timestamp')
        if not timestamp:
            return None
        try:
            return float(timestamp)
        except (TypeError, ValueError):
            return None

    def get_dcn_uid_unused_longest_time(self, selection: list = None) -> str | None:
        """
        Pick the DCN pod that was selected least recently and mark it as used now.

        Pods never selected before count as unused since timestamp 0.0. Ties are resolved in the
        insertion order of `self.dcn_usage`.

        :param selection: Candidate DCN pod uids. If None, all currently available DCN pods are candidates.
        :return: The chosen uid, or None if no candidate is known.
        """
        available_dcn: dict = self.data['db'].get_available_dcn_pods()
        for uid in available_dcn:
            self.dcn_usage.setdefault(uid, 0.0)
        candidates = set(available_dcn) if selection is None else set(selection)
        known_candidates = [uid for uid in self.dcn_usage if uid in candidates]
        if not known_candidates:
            # Nothing to choose from - do not record a usage timestamp for `None`.
            return None
        # min() returns the first minimal element, matching "earliest inserted wins" on ties.
        chosen_uid = min(known_candidates, key=self.dcn_usage.__getitem__)
        self.dcn_usage[chosen_uid] = time.time()
        return chosen_uid

    def get_fastapi_instance(self) -> FastAPI:
        """
        Return the FastAPI instance and create it on the first call.

        In dev mode the interactive API docs stay enabled; in production mode `docs_url` and `redoc_url`
        are disabled.
        """
        if self.fastapi:
            return self.fastapi
        if self.dev_mode:
            # DEV MODE!!!
            self.stdout_msg("Starting REST Server (DEV MODE) ...", log="info")
            self.fastapi = FastAPI()
        else:
            # PRODUCTIVE MODE!!!
            self.stdout_msg("Starting REST Server ...", log="info")
            self.fastapi = FastAPI(docs_url=None, redoc_url=None)
        return self.fastapi

    @staticmethod
    def _k8s_cpu_to_milli(quantity: str) -> float:
        """Convert a Kubernetes CPU quantity (e.g. "250m", "123456n", "4", "0.5") to milli-cores."""
        if quantity.endswith('n'):
            return float(quantity[:-1]) / 1_000_000
        if quantity.endswith('u'):
            return float(quantity[:-1]) / 1_000
        if quantity.endswith('m'):
            return float(quantity[:-1])
        # No suffix means whole cores.
        return float(quantity) * 1000

    @staticmethod
    def _k8s_memory_to_bytes(quantity: str) -> float:
        """Convert a Kubernetes memory quantity (e.g. "16384Ki", "2Gi", "1G", "1048576") to bytes."""
        # Two-letter binary suffixes are checked before the one-letter decimal ones.
        suffixes = (("Ki", 1024), ("Mi", 1024 ** 2), ("Gi", 1024 ** 3),
                    ("Ti", 1024 ** 4), ("Pi", 1024 ** 5), ("Ei", 1024 ** 6),
                    ("k", 1000), ("M", 1000 ** 2), ("G", 1000 ** 3),
                    ("T", 1000 ** 4), ("P", 1000 ** 5), ("E", 1000 ** 6),
                    ("m", 0.001))
        for suffix, factor in suffixes:
            if quantity.endswith(suffix):
                return float(quantity[:-len(suffix)]) * factor
        return float(quantity)

    def get_k8s_nodes(self) -> dict:
        """
        Collect the CPU and memory usage of all Kubernetes nodes.

        :return: `{node_uid: {"NAME", "UID", "USAGE_CPU_PERCENT", "USAGE_MEMORY_PERCENT"}}` with the percentages
                 as strings with two decimals. `{}` if no K8s client is set or the metrics query of any node fails.
        :raises RuntimeError: If the instance is not running.
        """
        if self.status != "running":
            raise RuntimeError("Instance is not running!")
        if self.k8s_client is None:
            return {}
        result_nodes = {}
        for node in self.k8s_client.list_node().items:
            node_name = node.metadata.name
            node_uid = node.metadata.uid
            try:
                metrics = self.k8s_metrics_client.get_cluster_custom_object(
                    group="metrics.k8s.io", version="v1beta1", plural="nodes", name=node_name
                )
            except kubernetes.client.exceptions.ApiException as error_msg:
                self.stdout_msg(f"Error when querying the K8s nodes: {error_msg}", log="error")
                return {}
            cpu_usage_milli = self._k8s_cpu_to_milli(metrics['usage']['cpu'])
            cpu_capacity_milli = self._k8s_cpu_to_milli(node.status.capacity['cpu'])
            memory_usage_bytes = self._k8s_memory_to_bytes(metrics['usage']['memory'])
            memory_capacity_bytes = self._k8s_memory_to_bytes(node.status.capacity['memory'])
            # Guard against a zero capacity to avoid ZeroDivisionError.
            cpu_percentage = (cpu_usage_milli / cpu_capacity_milli) * 100 if cpu_capacity_milli else 0.0
            memory_percentage = ((memory_usage_bytes / memory_capacity_bytes) * 100
                                 if memory_capacity_bytes else 0.0)
            result_nodes[node_uid] = {
                "NAME": node_name,
                "UID": node_uid,
                "USAGE_CPU_PERCENT": f"{cpu_percentage:.2f}",
                "USAGE_MEMORY_PERCENT": f"{memory_percentage:.2f}"
            }
        return result_nodes

    def get_backup_timestamp(self) -> float | None:
        """Return the timestamp of the locally stored mgmt backup, or None if no backup is stored."""
        if self.ubdcc_mgmt_backup is None:
            return None
        return float(self.ubdcc_mgmt_backup['timestamp'])

    def get_cluster_mgmt_address(self):
        """
        Return the base URL of the lucit-ubdcc-mgmt service.

        Dev mode uses localhost with the dev mgmt port; otherwise the cluster-internal service DNS name in the
        namespace `self.id['namespace']` is used.
        """
        if self.dev_mode:
            # DEV MODE!!!
            return f"http://localhost:{self.rest_server_port_dev_mgmt}"
        # PRODUCTIVE MODE!!!
        return f"http://lucit-ubdcc-mgmt.{self.id['namespace']}.svc.cluster.local:{self.k8s_service_port_mgmt}"

    def _build_mgmt_url(self, endpoint: str, query: dict) -> str:
        """Build a mgmt service URL for `endpoint`; `query` values are URL-encoded (None becomes "None")."""
        return f"{self.get_cluster_mgmt_address()}{endpoint}?{urlencode(query)}"

    @staticmethod
    def get_unix_timestamp():
        """Return the current unix timestamp as float (`time.time()`)."""
        return time.time()

    @staticmethod
    def get_version() -> str:
        """Return the module version string."""
        return VERSION

    @staticmethod
    def is_compiled() -> bool:
        """Return True if this module runs as Cython-compiled code."""
        return cython.compiled

    def is_shutdown(self) -> bool:
        """Return True once a shutdown was requested (signal or `shutdown()`)."""
        return self.sigterm

    async def register_or_restart(self, ubldc_version: str = None):
        """Register this node at the mgmt service and request a shutdown if the registration fails."""
        if await self.ubdcc_node_registration(ubldc_version=ubldc_version) is False:
            self.shutdown(message="Node registration failed!")

    def register_graceful_shutdown(self) -> None:
        """Route SIGINT and SIGTERM to `sigterm_handler()`."""
        sys_signal.signal(sys_signal.SIGINT, self.sigterm_handler)
        sys_signal.signal(sys_signal.SIGTERM, self.sigterm_handler)

    @staticmethod
    async def request(url, method, params=None, headers=None, timeout=10) -> dict:
        """
        Send an HTTP GET or POST request and return the decoded JSON response.

        Transport, timeout, cancellation and JSON decoding errors are not raised; they are returned as
        `{"error": "..."}`.

        :param url: Target URL.
        :param method: "get" (`params` are sent as query parameters) or "post" (`params` is sent as JSON body).
        :param params: Query parameters (GET) or JSON payload (POST).
        :param headers: Optional extra HTTP headers.
        :param timeout: Total request timeout in seconds.
        :return: The decoded JSON response or an error dict.
        :raises ValueError: If `method` is not "get" or "post".
        """
        if method not in ("get", "post"):
            raise ValueError("Allowed 'method' values: get, post")
        client_timeout = aiohttp.ClientTimeout(total=timeout)
        try:
            async with aiohttp.ClientSession() as session:
                if method == "get":
                    async with session.get(url, params=params, headers=headers,
                                           timeout=client_timeout) as response:
                        response.raise_for_status()
                        return await response.json()
                # Keep caller supplied headers, but the body is always JSON.
                post_headers = {**(headers or {}), "Content-Type": "application/json"}
                async with session.post(url, json=params, headers=post_headers,
                                        timeout=client_timeout) as response:
                    response.raise_for_status()
                    return await response.json()
        except asyncio.CancelledError as error_msg:
            # NOTE(review): swallowing CancelledError prevents the calling task from being cancelled;
            # kept because callers rely on always receiving a dict.
            print(f"An error occurred: asyncio.CancelledError - {url} - {error_msg}")
            return {"error": f"asyncio.CancelledError - {url} - {str(error_msg)}"}
        except asyncio.TimeoutError:
            print(f"An error occurred: asyncio.TimeoutError - {url}")
            return {"error": f"asyncio.TimeoutError - {url}"}
        except aiohttp.ClientError as error_msg:
            print(f"An error occurred: aiohttp.ClientError- {url} - {error_msg}")
            return {"error": f"aiohttp.ClientError - {url} - {str(error_msg)}"}
        except json.JSONDecodeError as error_msg:
            print(f"An error occurred: json.JSONDecodeError - {url} - {error_msg}")
            return {"error": f"json.JSONDecodeError - {url} - {str(error_msg)}"}

    async def send_backup_to_node(self, host, port) -> dict:
        """POST the backup dict of the local database to another node and return its response."""
        return await self.request(f"http://{host}:{port}/ubdcc_mgmt_backup", method="post",
                                  params=self.data['db'].get_backup_dict())

    def set_api_rest_port(self):
        """
        Choose the port of the own REST server.

        Production mode uses `self.rest_server_port`; dev mode uses a separate port per service.

        :raises ValueError: In dev mode, if `self.info['name']` is not a known service name.
        """
        if self.dev_mode:
            # DEV MODE!!!
            if self.info['name'] == "lucit-ubdcc-dcn":
                self.api_port_rest = self.rest_server_port_dev_dcn
            elif self.info['name'] == "lucit-ubdcc-mgmt":
                self.api_port_rest = self.rest_server_port_dev_mgmt
            elif self.info['name'] == "lucit-ubdcc-restapi":
                self.api_port_rest = self.rest_server_port_dev_restapi
            else:
                raise ValueError(f"Not able to choose the right rest server port for app '{self.info['name']}'")
        else:
            # PRODUCTIVE MODE!!!
            self.api_port_rest = self.rest_server_port

    def set_status_running(self) -> bool:
        """Set the status to "running" and return True."""
        self.status = "running"
        return True

    def shutdown(self, message=None) -> None:
        """Request a shutdown: set the shutdown flag, deactivate the license and log `message`."""
        self.sigterm = True
        self.deactivate_license()
        self.stdout_msg(f"Shutdown is performed: {message}", log="warn")

    def sigterm_handler(self, signal, frame) -> None:
        """Signal handler (SIGINT/SIGTERM): set the shutdown flag and log it."""
        self.sigterm = True
        self.stdout_msg(f"Processing SIGTERM - signal: {signal} - frame: {frame}", log="debug", stdout=False)
        self.stdout_msg("Received SIGTERM, performing graceful shutdown ...", log="warn")

    async def sleep(self, seconds: int = MGMT_IS_READY_TIME) -> bool:
        """
        Sleep for `seconds` in slices of at most 3 seconds and return early once a shutdown was requested.

        :return: Always True.
        """
        internal_sleep_time = 3
        time_limit = self.get_unix_timestamp() + seconds
        while self.is_shutdown() is False:
            remaining = time_limit - self.get_unix_timestamp()
            if remaining <= 0:
                break
            # Sleep the final partial slice too, so the full duration is honoured.
            await asyncio.sleep(min(internal_sleep_time, remaining))
        return True

    @staticmethod
    def sort_dict(input_dict: dict, reverse: bool = False) -> dict:
        """Return a new dict with the items of `input_dict` ordered by key."""
        sorted_items = sorted(input_dict.items(), key=lambda item: item[0], reverse=reverse)
        return dict(sorted_items)

    def start(self) -> None:
        """
        Bootstrap the app, run `service_call()` and shut down afterwards.

        The startup steps are:
        - set the working directory and logging,
        - register the signal handlers and read the runtime information,
        - select the REST port,
        - block in `service_call()`.

        Afterwards the node leaves the cluster: mgmt closes its licensing session, other services cancel
        their registration. Then `stop_call()` runs. The process always ends via `sys.exit()`: exit code 1
        if `service_call()` raised an exception, otherwise 0.
        """
        # Working Directory
        if self.cwd:
            os.chdir(self.cwd)
        # Logging
        if self.logger is None:
            self.logger = logging.getLogger("unicorn_binance_depthcache_cluster")
            logging.basicConfig(level=logging.DEBUG,
                                filename=f"{socket.gethostname()}.log",
                                format="{asctime} [{levelname:8}] {process} {thread} {module}: {message}",
                                style="{")
        # App Info
        self.info = {'name': self.app_name,
                     'version': self.get_version(),
                     'author': "LUCIT Systems and Development",
                     'build_type': "compiled" if self.is_compiled() else "source"}
        info = (f"Starting {self.info['name']}_{self.info['version']}_{self.info['build_type']} by "
                f"{self.info['author']} ...")
        self.stdout_msg(info, log="info")
        # Catch Termination Signals
        self.register_graceful_shutdown()
        # Runtime Information
        self.get_k8s_runtime_information()
        # Define and set the rest server port
        self.set_api_rest_port()
        # Running the core app
        exception_shutdown_error: Exception | None = None
        self.set_status_running()
        try:
            self.service_call()
        except KeyboardInterrupt:
            self.stdout_msg("Keyboard interrupt was caught!", log="warn")
        except Exception as error_msg:
            exception_shutdown_error = error_msg
            print("Exception occurred:")
            traceback.print_exc()
        finally:
            # Leaving the cluster is best effort: a failure here must not prevent stop_call() and sys.exit().
            try:
                if self.info['name'] == "lucit-ubdcc-mgmt":
                    if self.llm is not None:
                        self.llm.close()
                else:
                    asyncio.run(self.ubdcc_node_cancellation())
            except Exception as cleanup_error:
                self.stdout_msg(f"Error while leaving the cluster: {cleanup_error}", log="error")
            if exception_shutdown_error is not None:
                self.stdout_msg(f"ERROR: {exception_shutdown_error}", log="critical")
                self.stop_call()
                self.stdout_msg("The system was gracefully shut down after a critical error was encountered.",
                                log="info")
                sys.exit(1)
            else:
                # Shutdown
                self.stop_call()
                self.stdout_msg("Gracefully shutdown finished! Thank you and good bye ...", log="info")
                sys.exit(0)

    def start_licensing_manager(self) -> bool:
        """
        Start the LucitLicensingManager with the credentials stored in the database.

        :return: True (and license status VALID in the database) if the license was validated, otherwise False.
        """
        try:
            self.llm = LucitLicensingManager(api_secret=self.data['db'].get_license_api_secret(),
                                             license_token=self.data['db'].get_license_license_token(),
                                             parent_shutdown_function=self.deactivate_license,
                                             program_used=self.app_name,
                                             needed_license_type="UNICORN-BINANCE-SUITE",
                                             start=True)
        except NoValidatedLucitLicense as error_msg:
            self.stdout_msg(error_msg, log="critical")
            return False
        if self.llm.get_license_exception() is None:
            self.data['db'].set_license_status(status="VALID")
            return True
        return False

    def stdout_msg(self, msg=None, log=None, stdout=True) -> bool:
        """
        Log `msg` and/or print it to stdout.

        :param msg: Message to emit; None emits nothing.
        :param log: Log level: "debug", "info", "warn", "error" or "critical"; None skips logging.
        :param stdout: If True, also print the message (flushed).
        :return: True if the message was emitted, False if there was nothing to do or `log` is unknown.
        """
        if msg is None:
            return False
        if log is None and stdout is False:
            return False
        if log is not None:
            # Fall back to the app logger when called before start() set self.logger.
            logger = (self.logger if self.logger is not None
                      else logging.getLogger("unicorn_binance_depthcache_cluster"))
            log_functions = {"debug": logger.debug,
                             "info": logger.info,
                             "warn": logger.warning,  # Logger.warn() is a deprecated alias
                             "error": logger.error,
                             "critical": logger.critical}
            log_function = log_functions.get(log)
            if log_function is None:
                return False
            log_function(msg)
        if stdout is True:
            print(msg, flush=True)
        return True

    async def ubdcc_get_responsible_dcn_addresses(self,
                                                  exchange: str = None,
                                                  market: str = None):
        """
        Ask the mgmt service which DCN pods are responsible for `market` on `exchange`.

        :return: The mgmt response dict, or None on error.
        """
        self.stdout_msg(f"Get responsible DCN addresses for {market} on {exchange} ...", log="info")
        url = self._build_mgmt_url("/ubdcc_get_responsible_dcn_addresses",
                                   {"exchange": exchange, "market": market})
        result = await self.request(url=url, method="get")
        if result.get('error_id') is None and result.get('error') is None:
            self.stdout_msg(f"Successfully caught responsible DCN addresses for {market} on {exchange}!",
                            log="info")
            return result
        self.stdout_msg(f"Can not catch responsible DCN addresses: {result}", log="error")
        return None

    async def ubdcc_node_cancellation(self) -> bool:
        """
        Cancel the registration of this node at the mgmt service.

        :return: True if the node is no longer registered afterwards, False on error.
        """
        self.stdout_msg("Cancel node registration ...", log="info")
        url = self._build_mgmt_url("/ubdcc_node_cancellation", {"uid": self.id['uid']})
        result = await self.request(url=url, method="get")
        if result.get('error_id') is None and result.get('error') is None:
            self.stdout_msg("Node cancellation successful!", log="info")
            return True
        elif result.get('error') is not None:
            self.stdout_msg(f"Error during node cancellation: {result.get('error')}", log="warn")
            return False
        elif result.get('error_id') == "#1005":
            self.stdout_msg(f"The node is no longer recognized by {url}.", log="warn")
            # Nothing is left to cancel for a node the mgmt service does not know, so report success
            # instead of re-registering the node while it is leaving the cluster.
            return True
        else:
            self.stdout_msg(f"Error during node cancellation: {result.get('error_id')} - {result.get('message')}",
                            log="error")
            return False

    async def ubdcc_node_registration(self, ubldc_version: str = None, retries: int = 30) -> bool:
        """
        Register this node at the mgmt service, retrying up to `retries` times.

        Waits `mgmt_is_ready_time` + 3 seconds after a "not ready" answer (#1014), otherwise 3 seconds
        between attempts. Stops early once a shutdown was requested.

        :return: True if the node is registered (or was already known, #1003), otherwise False.
        """
        self.stdout_msg("Starting node registration ...", log="info")
        query = {"name": self.id['name'],
                 "uid": self.id['uid'],
                 "node": self.id['node'],
                 "role": self.info['name'],
                 "api_port_rest": self.api_port_rest,
                 "status": self.status,
                 "version": self.get_version()}
        if ubldc_version is not None:
            query['ubldc_version'] = ubldc_version
        # Stays empty if no attempt is made (shutdown already requested or retries == 0).
        result: dict = {}
        for _ in range(retries):
            if self.is_shutdown() is not False:
                break
            url = self._build_mgmt_url("/ubdcc_node_registration", query)
            result = await self.request(url=url, method="get")
            if result.get('error_id') is None and result.get('error') is None:
                self.stdout_msg("Node registration succeeded!", log="info")
                return True
            elif result.get('error_id') == "#1003":
                self.stdout_msg(f"The node is already recognized by {url}.", log="warn")
                return True
            elif result.get('error_id') == "#1014":
                self.stdout_msg("Mgmt Service is not ready yet! Waiting a few seconds till retry!", log="warn")
                await asyncio.sleep(self.mgmt_is_ready_time)
            await asyncio.sleep(3)
        self.stdout_msg(f"Error during node registration: {result.get('error_id')} - {result.get('error')}",
                        log="error")
        return False

    async def ubdcc_node_sync(self) -> bool:
        """
        Send the current state of this node to the mgmt service.

        Re-registers the node if the mgmt service no longer knows it (#1001).

        :return: True on success (or successful re-registration), otherwise False.
        """
        self.stdout_msg("Starting node sync ...", log="info")
        backup_timestamp = ""
        url = self._build_mgmt_url("/ubdcc_node_sync",
                                   {"uid": self.id['uid'],
                                    "node": self.id['node'],
                                    "api_port_rest": self.api_port_rest,
                                    "status": self.status,
                                    "backup_timestamp": backup_timestamp})
        result = await self.request(url=url, method="get")
        if result.get('error_id') is None and result.get('error') is None:
            self.stdout_msg("Node sync succeeded!", log="info")
            return True
        elif result.get('error') is not None:
            self.stdout_msg(f"Error during node sync: {result.get('error')}", log="warn")
            return False
        elif result.get('error_id') == "#1001":
            self.stdout_msg(f"The node is no longer recognized by {url}.", log="warn")
            return await self.ubdcc_node_registration()
        else:
            self.stdout_msg(f"Error during node sync: {result.get('error_id')} - {result.get('message')}",
                            log="error")
            return False

    async def ubdcc_update_depthcache_distribution(self,
                                                   exchange: str = None,
                                                   market: str = None,
                                                   last_restart_time: int = None,
                                                   status: str = None) -> bool:
        """
        Report the DepthCache state of this pod for `market` on `exchange` to the mgmt service.

        Retries every 3 seconds until it succeeds, the mgmt service answers with error #1023, or a shutdown
        is requested.

        :return: True on success, otherwise False.
        """
        self.stdout_msg("Updating depthcache distribution ...", log="info")
        query = {"exchange": exchange,
                 "market": market,
                 "pod_uid": self.id['uid'],
                 "last_restart_time": last_restart_time,
                 "status": status}
        while self.is_shutdown() is False:
            url = self._build_mgmt_url("/ubdcc_update_depthcache_distribution", query)
            result = await self.request(url=url, method="get")
            if result.get('error_id') is None and result.get('error') is None:
                self.stdout_msg("DepthCache distribution update succeeded!", log="info")
                return True
            elif result.get('error_id') == "#1023":
                self.stdout_msg(f"Error during DepthCache distribution update: {result}",
                                log="error")
                return False
            else:
                self.stdout_msg(f"Error during DepthCache distribution update: {result}",
                                log="error")
                await asyncio.sleep(3)
        return False

    def verify_license(self) -> bool:
        """
        Verify the license credentials stored in the database without starting a licensing session.

        :return: True if the license status is "VALID", otherwise False.
        """
        llm = LucitLicensingManager(api_secret=self.data['db'].get_license_api_secret(),
                                    license_token=self.data['db'].get_license_license_token(),
                                    program_used=self.app_name,
                                    needed_license_type="UNICORN-BINANCE-SUITE",
                                    start=False)
        result = llm.verify(api_secret=self.data['db'].get_license_api_secret(),
                            license_token=self.data['db'].get_license_license_token())
        try:
            return result.get('license').get('status') == "VALID"
        except AttributeError as error_msg:
            self.stdout_msg(f"Invalid license! - AttributeError: {error_msg}", log="error")
            return False