#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/lucit-ubdcc-mgmt/lucit_ubdcc_mgmt/Database.py
#
# Project website: https://www.lucit.tech/unicorn-depthcache-cluster-for-binance.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance
# Documentation: https://unicorn-depthcache-cluster-for-binance.docs.lucit.tech
# PyPI: https://pypi.org/project/lucit-ubdcc-shared-modules
# LUCIT Online Shop: https://shop.lucit.services/software/unicorn-depthcache-cluster-for-binance
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2024-2024, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.
import threading
import time
[docs]
class Database:
def __init__(self, app=None):
self.app = app
self.app.data['db'] = self
self.data = {}
self.data_lock = threading.Lock()
self._init()
def _init(self) -> bool:
self.app.stdout_msg(f"Initiating Database ...", log="info")
self.set(key="depthcaches", value={})
self.set(key="license", value={"api_secret": "",
"license_token": "",
"status": "INVALID"})
self.set(key="nodes", value={})
self.set(key="pods", value={})
self.set(key="timestamp", value=float())
if self.app.info['name'] == "lucit-ubdcc-mgmt":
self.update_nodes()
return True
def _set_update_timestamp(self) -> bool:
self.data['timestamp'] = self.app.get_unix_timestamp()
return True
[docs]
def is_empty(self) -> bool:
if len(self.data['pods']) == 0 and \
len(self.data['depthcaches']) == 0 and \
len(self.data['license']['api_secret']) == 0 and \
len(self.data['license']['license_token']) == 0:
return True
return False
[docs]
def add_depthcache(self,
exchange: str = None,
market: str = None,
desired_quantity: int = None,
update_interval: int = None,
refresh_interval: int = None) -> bool:
if exchange is None or market is None:
raise ValueError("Missing mandatory parameter: exchange, market")
if desired_quantity is None or desired_quantity == "None":
desired_quantity = 1
else:
desired_quantity = int(desired_quantity)
if update_interval is None or update_interval == "None":
update_interval = None
else:
update_interval = int(update_interval)
if refresh_interval is None or refresh_interval == "None":
refresh_interval = None
else:
refresh_interval = int(refresh_interval)
depthcache = {"CREATED_TIME": self.app.get_unix_timestamp(),
"DESIRED_QUANTITY": desired_quantity,
"DISTRIBUTION": {},
"EXCHANGE": exchange,
"REFRESH_INTERVAL": refresh_interval,
"MARKET": market,
"UPDATE_INTERVAL": update_interval}
with self.data_lock:
if self.data['depthcaches'].get(exchange) is None:
self.data['depthcaches'][exchange] = {}
self.data['depthcaches'][exchange][market] = depthcache
self._set_update_timestamp()
return True
[docs]
def add_depthcache_distribution(self,
exchange: str = None,
market: str = None,
pod_uid: str = None,
scheduled_start_time: float = None) -> bool:
if exchange is None or market is None or pod_uid is None or scheduled_start_time is None:
raise ValueError("Missing mandatory parameter: exchange, pod_uid, market, scheduled_start_time")
distribution = {"CREATED_TIME": self.app.get_unix_timestamp(),
"LAST_RESTART_TIME": 0,
"POD_UID": pod_uid,
"SCHEDULED_START_TIME": scheduled_start_time,
"STATUS": "starting"}
with self.data_lock:
self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid] = distribution
self._set_update_timestamp()
return True
[docs]
def add_pod(self, name: str = None, uid: str = None, node: str = None, role: str = None, ip: str = None,
api_port_rest: int = None, status: str = None, ubldc_version: str = None, version: str = None) -> bool:
if uid is None:
raise ValueError("Missing mandatory parameter: uid")
pod = {"NAME": name,
"UID": uid,
"NODE": node,
"ROLE": role,
"IP": ip,
"API_PORT_REST": api_port_rest,
"LAST_SEEN": self.app.get_unix_timestamp(),
"STATUS": status,
"UBLDC_VERSION": ubldc_version,
"VERSION": version}
if ubldc_version is None:
del pod['UBLDC_VERSION']
with self.data_lock:
self.data['pods'][uid] = pod
self._set_update_timestamp()
return True
[docs]
def delete(self, key: str = None) -> bool:
with self.data_lock:
if key in self.data:
del self.data[key]
self._set_update_timestamp()
self.app.stdout_msg(f"DB entry deleted: {key}", log="debug", stdout=False)
return True
self.app.stdout_msg(f"DB entry {key} not found.", log="debug", stdout=False)
return False
[docs]
def delete_depthcache(self, exchange: str = None, market: str = None) -> bool:
if exchange is None or market is None:
raise ValueError("Missing mandatory parameter: exchange, market")
with self.data_lock:
try:
del self.data["depthcaches"][exchange][market]
except KeyError:
return True
self._set_update_timestamp()
self.app.stdout_msg(f"DB depthcaches deleted: {exchange}, {market}", log="debug")
return True
[docs]
def delete_depthcache_distribution(self, exchange: str = None, market: str = None, pod_uid: str = None) -> bool:
if exchange is None or market is None or pod_uid is None:
raise ValueError("Missing mandatory parameter: exchange, pod_uid, market")
with self.data_lock:
del self.data["depthcaches"][exchange][market]['DISTRIBUTION'][pod_uid]
self._set_update_timestamp()
self.app.stdout_msg(f"DB depthcache distribution deleted: {exchange}, {market}, {pod_uid}", log="debug")
return True
[docs]
def delete_pod(self, uid: str = None) -> bool:
if uid is None:
raise ValueError("Missing mandatory parameter: uid")
with self.data_lock:
del self.data["pods"][uid]
self._set_update_timestamp()
self.app.stdout_msg(f"DB pod deleted: {uid}", log="debug", stdout=True)
return True
[docs]
def delete_old_pods(self) -> bool:
old_pods = []
max_age = 60
with self.data_lock:
for uid in self.data['pods']:
if (time.time() - max_age) > self.data['pods'][uid]['LAST_SEEN']:
old_pods.append(uid)
for uid in old_pods:
self.delete_pod(uid=uid)
return True
[docs]
def exists_depthcache(self, exchange: str = None, market: str = None) -> bool:
if exchange is None or market is None:
raise ValueError("Missing mandatory parameter: exchange, market")
with self.data_lock:
try:
return market in self.data['depthcaches'][exchange]
except KeyError:
return False
[docs]
def exists_pod(self, uid: str = None) -> bool:
if uid is None:
raise ValueError("Missing mandatory parameter: uid")
return uid in self.data['pods']
[docs]
def get(self, key: str = None):
with self.data_lock:
return self.data.get(key)
[docs]
def get_all(self) -> dict:
with self.data_lock:
return self.data
[docs]
def get_available_dcn_pods(self) -> dict:
available_dcn_pods = {}
for uid in self.data['pods']:
if self.data['pods'][uid]['ROLE'] == "lucit-ubdcc-dcn":
try:
available_dcn_pods[uid] = self.data['nodes'][self.data['pods'][uid]['NODE']]['USAGE_CPU_PERCENT']
except KeyError:
available_dcn_pods[uid] = 0
return available_dcn_pods
[docs]
def get_backup_dict(self) -> dict:
with self.data_lock:
return self.app.sort_dict(input_dict=self.app.data['db'].data)
[docs]
def get_best_dcn(self, available_pods: dict = None, excluded_pods: list = None) -> str | None:
if available_pods is None:
available_pods = self.get_available_dcn_pods()
for uid in excluded_pods:
try:
del available_pods[uid]
except KeyError:
pass
available_pods_uid: list = [uid for uid in available_pods.keys()]
return self.app.get_dcn_uid_unused_longest_time(selection=available_pods_uid)
[docs]
def get_dcn_responsibilities(self) -> list:
with (self.data_lock):
responsibilities = []
for exchange in self.data['depthcaches']:
for market in self.data['depthcaches'][exchange]:
for pod_uid in self.data['depthcaches'][exchange][market]['DISTRIBUTION']:
if pod_uid == self.app.id['uid'] and \
self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid]['SCHEDULED_START_TIME'] < \
self.app.get_unix_timestamp():
responsibilities.append({"exchange": exchange,
"market": market,
"refresh_interval": self.data['depthcaches'][exchange][market]['REFRESH_INTERVAL'],
"update_interval": self.data['depthcaches'][exchange][market]['UPDATE_INTERVAL']})
return responsibilities
[docs]
def get_depthcache_list(self) -> dict:
with self.data_lock:
try:
return self.data['depthcaches']
except KeyError:
return {}
[docs]
def get_depthcache_info(self, exchange: str = None, market: str = None) -> dict:
if exchange is None or market is None:
raise ValueError("Missing mandatory parameter: exchange, market")
with self.data_lock:
try:
return self.data['depthcaches'][exchange][market]
except KeyError:
return {}
[docs]
def get_license_api_secret(self) -> str:
with self.data_lock:
return self.data['license']['api_secret']
[docs]
def get_license_license_token(self) -> str:
with self.data_lock:
return self.data['license']['license_token']
[docs]
def get_license_status(self) -> str:
with self.data_lock:
return self.data['license']['status']
[docs]
def get_pod_by_address(self, address: str = None) -> dict | None:
if address is None:
raise ValueError("Missing mandatory parameter: address")
with self.data_lock:
try:
for uid in self.data['pods']:
if self.data['pods'][uid]['IP'] == address:
return self.data['pods'][uid]
except KeyError:
return None
[docs]
def get_pod_by_uid(self, uid=None) -> dict | None:
if uid is None:
raise ValueError("Missing mandatory parameter: uid")
with self.data_lock:
try:
return self.data['pods'][uid]
except KeyError:
return None
[docs]
def get_responsible_dcn_addresses(self, exchange: str = None, market: str = None) -> list:
with self.data_lock:
responsible_dcn = []
try:
for pod_uid in self.data['depthcaches'][exchange][market]['DISTRIBUTION']:
if self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid]['STATUS'] == "running":
responsible_dcn.append([self.data['pods'][pod_uid]['IP'],
self.data['pods'][pod_uid]['API_PORT_REST'],
pod_uid])
except KeyError:
pass
return responsible_dcn
[docs]
def get_worst_dcn(self, available_pods: dict | None = None, excluded_pods: list = None) -> str | None:
if available_pods is None:
available_pods = self.get_available_dcn_pods()
# Todo: Find worst pod (cpu or subscriptions or ...)
worst_pod = None
for uid in available_pods:
if uid not in excluded_pods:
worst_pod = uid
return worst_pod
[docs]
def replace_data(self, data: dict = None):
with self.data_lock:
self.data = data
return True
[docs]
def remove_orphaned_distribution_entries(self) -> bool:
with self.data_lock:
remove_distributions = []
for exchange in self.data['depthcaches']:
for market in self.data['depthcaches'][exchange]:
for pod_uid in self.data['depthcaches'][exchange][market]['DISTRIBUTION']:
if self.exists_pod(uid=pod_uid) is False:
remove_distributions.append({"exchange": exchange,
"market": market,
"pod_uid": pod_uid})
with self.data_lock:
for item in remove_distributions:
del self.data['depthcaches'][item['exchange']][item['market']]['DISTRIBUTION'][item['pod_uid']]
self._set_update_timestamp()
return True
[docs]
def revise(self) -> bool:
start_time = time.time()
if self.app.data['db'].get_license_status() == "VALID":
self.app.stdout_msg(f"Revise the Database ...", log="info")
self.update_nodes()
self.delete_old_pods()
self.remove_orphaned_distribution_entries()
self.manage_distribution()
run_time = time.time() - start_time
self.app.stdout_msg(f"Database revised in {run_time} seconds!", log="info")
return True
else:
self.app.stdout_msg(f"Please submit a valid license!", log="critical")
return False
[docs]
def manage_distribution(self) -> bool:
add_distributions = []
remove_distributions = []
with self.data_lock:
for exchange in self.data['depthcaches']:
print(f"exchange={exchange}")
for market in self.data['depthcaches'][exchange]:
print(f"market={market}")
existing_distribution = []
for pod_uid in self.data['depthcaches'][exchange][market]['DISTRIBUTION']:
existing_distribution.append(pod_uid)
existing_quantity = len(self.data['depthcaches'][exchange][market]['DISTRIBUTION'])
desired_quantity = self.data['depthcaches'][exchange][market]['DESIRED_QUANTITY']
if existing_quantity < desired_quantity:
add_quantity = desired_quantity - existing_quantity
delayed_start_time: float = 300.0
for i in range(0, add_quantity):
best_dcn = self.get_best_dcn(excluded_pods=existing_distribution)
print(f"best_dcn={best_dcn}")
if best_dcn is not None:
if existing_quantity > 0:
multiplikator = i + existing_quantity
else:
multiplikator = i
scheduled_start_time = self.app.get_unix_timestamp() + float(multiplikator) * delayed_start_time
add_distributions.append({"exchange": exchange,
"market": market,
"pod_uid": best_dcn,
"scheduled_start_time": scheduled_start_time})
existing_distribution.append(best_dcn)
print(f"ADDED: {add_distributions}")
break
elif existing_quantity > desired_quantity:
remove_quantity = existing_quantity - desired_quantity
exclude_dcn = []
available_pods = {}
for uid in existing_distribution:
available_pods[uid] = uid
for _ in range(0, remove_quantity):
worst_dcn = self.get_worst_dcn(available_pods=available_pods,
excluded_pods=exclude_dcn)
if worst_dcn is not None:
exclude_dcn.append(worst_dcn)
remove_distributions.append({"exchange": exchange,
"market": market,
"pod_uid": worst_dcn})
for item in add_distributions:
self.add_depthcache_distribution(exchange=item['exchange'],
market=item['market'],
pod_uid=item['pod_uid'],
scheduled_start_time=item['scheduled_start_time'])
for item in remove_distributions:
self.delete_depthcache_distribution(exchange=item['exchange'],
market=item['market'],
pod_uid=item['pod_uid'])
return True
[docs]
def set(self, key: str = None, value: dict | str | float | list | set | tuple = None) -> bool:
with self.data_lock:
self.data[key] = value
self._set_update_timestamp()
self.app.stdout_msg(f"DB entry added/updated: {key} = {value}", log="debug", stdout=False)
return True
[docs]
def set_license_status(self, status: str = None) -> bool:
if status is None:
raise ValueError("Missing mandatory parameter: status")
with self.data_lock:
self.data['license']['status'] = status
self._set_update_timestamp()
self.app.stdout_msg(f"DB license status change to: {status}", log="debug", stdout=False)
return True
[docs]
def submit_license(self, api_secret: str = None, license_token: str = None) -> bool:
if api_secret is None or license_token is None:
raise ValueError("Missing mandatory parameter: api_secret, license_token")
with self.data_lock:
self.data['license']['api_secret'] = api_secret
self.data['license']['license_token'] = license_token
self._set_update_timestamp()
self.app.stdout_msg(f"DB license submitted: {api_secret}, {license_token}", log="debug", stdout=False)
return True
[docs]
def update_nodes(self) -> bool:
nodes = self.app.get_k8s_nodes()
if nodes:
self.set(key="nodes", value=nodes)
self.app.stdout_msg(f"DB all nodes updated!", log="debug", stdout=False)
return True
else:
self.app.stdout_msg(f"Timed update of the DB key 'nodes': Query of the k8s nodes was empty, no "
f"update is performed!", log="error", stdout=True)
return False
[docs]
def update_depthcache(self,
desired_quantity: int = None,
exchange: str = None,
refresh_interval: int = None,
market: str = None,
update_interval: int = None) -> bool:
if exchange is None or market is None:
raise ValueError("Missing mandatory parameter: exchange, market")
with self.data_lock:
if desired_quantity is not None:
self.data['depthcaches'][exchange][market]['DESIRED_QUANTITY'] = desired_quantity
self._set_update_timestamp()
if update_interval is not None:
self.data['depthcaches'][exchange][market]['UPDATE_INTERVAL'] = update_interval
self._set_update_timestamp()
if refresh_interval is not None:
self.data['depthcaches'][exchange][market]['REFRESH_INTERVAL'] = refresh_interval
self._set_update_timestamp()
self.app.stdout_msg(f"DB depthcaches updated: {exchange}, {market}, {desired_quantity}, {update_interval}",
log="debug")
return True
[docs]
def update_depthcache_distribution(self,
exchange: str = None,
market: str = None,
pod_uid: str = None,
last_restart_time: float = None,
status: str = None) -> bool:
if exchange is None or market is None or pod_uid is None:
raise ValueError("Missing mandatory parameter: exchange, pod_uid, market")
with self.data_lock:
if last_restart_time is not None:
self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid]['LAST_RESTART_TIME'] = \
last_restart_time
self._set_update_timestamp()
if status is not None:
try:
self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid]['STATUS'] = status
except KeyError:
return False
self._set_update_timestamp()
self.app.stdout_msg(f"DB depthcache distribution updated: {exchange}, {market}, {pod_uid}, {last_restart_time},"
f" {status}", log="debug")
return True
[docs]
def update_pod(self, uid: str = None, node: str = None, ip: str = None, api_port_rest: int = None,
status: str = None) -> bool:
if uid is None:
raise ValueError("Missing mandatory parameter: uid")
with self.data_lock:
self.data['pods'][uid]['LAST_SEEN'] = self.app.get_unix_timestamp()
if api_port_rest is not None:
self.data['pods'][uid]['API_PORT_REST'] = api_port_rest
self._set_update_timestamp()
if ip is not None:
self.data['pods'][uid]['IP'] = ip
self._set_update_timestamp()
if node is not None:
self.data['pods'][uid]['NODE'] = node
self._set_update_timestamp()
if status is not None:
self.data['pods'][uid]['STATUS'] = status
self._set_update_timestamp()
self.app.stdout_msg(f"DB pod updated: {uid}", log="debug", stdout=False)
return True