#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/lucit-ubdcc-mgmt/lucit_ubdcc_mgmt/RestEndpoints.py
#
# Project website: https://www.lucit.tech/unicorn-depthcache-cluster-for-binance.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance
# Documentation: https://unicorn-depthcache-cluster-for-binance.docs.lucit.tech
# PyPI: https://pypi.org/project/lucit-ubdcc-mgmt
# LUCIT Online Shop: https://shop.lucit.services/software/unicorn-depthcache-cluster-for-binance
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2024-2024, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.
import base64
import json
from lucit_ubdcc_shared_modules.Database import Database
from lucit_ubdcc_shared_modules.RestEndpointsBase import RestEndpointsBase, Request
[docs]
class RestEndpoints(RestEndpointsBase):
def __init__(self, app=None):
super().__init__(app=app)
self.db: Database = self.app.data['db']
[docs]
def register(self):
super().register()
@self.fastapi.get("/create_depthcache")
async def create_depthcache(request: Request):
return await self.create_depthcache(request=request)
@self.fastapi.get("/create_depthcaches")
async def create_depthcaches(request: Request):
return await self.create_depthcaches(request=request)
@self.fastapi.get("/get_cluster_info")
async def get_cluster_info(request: Request):
return await self.get_cluster_info(request=request)
@self.fastapi.get("/get_depthcache_list")
async def get_depthcache_list(request: Request):
return await self.get_depthcache_list(request=request)
@self.fastapi.get("/get_depthcache_info")
async def get_depthcache_info(request: Request):
return await self.get_depthcache_info(request=request)
@self.fastapi.get("/stop_depthcache")
async def stop_depthcache(request: Request):
return await self.stop_depthcache(request=request)
@self.fastapi.get("/submit_license")
async def submit_license(request: Request):
return await self.submit_license(request=request)
@self.fastapi.get("/ubdcc_get_responsible_dcn_addresses")
async def ubdcc_get_responsible_dcn_addresses(request: Request):
return await self.ubdcc_get_responsible_dcn_addresses(request=request)
@self.fastapi.get("/ubdcc_node_cancellation")
async def ubdcc_node_cancellation(request: Request):
return await self.ubdcc_node_cancellation(request=request)
@self.fastapi.get("/ubdcc_node_registration")
async def ubdcc_node_registration(request: Request):
return await self.ubdcc_node_registration(request=request)
@self.fastapi.get("/ubdcc_node_sync")
async def ubdcc_node_sync(request: Request):
return await self.ubdcc_node_sync(request=request)
@self.fastapi.get("/ubdcc_update_depthcache_distribution")
async def ubdcc_update_depthcache_distribution(request: Request):
return await self.ubdcc_update_depthcache_distribution(request=request)
[docs]
async def create_depthcache(self, request: Request):
event = "CREATE_DEPTHCACHE"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
exchange = request.query_params.get("exchange", None)
market = request.query_params.get("market", None)
desired_quantity = request.query_params.get("desired_quantity", None)
update_interval = request.query_params.get("update_interval", None)
refresh_interval = request.query_params.get("refresh_interval", None)
if exchange == "None":
exchange = None
if market == "None":
market = None
if desired_quantity is None or desired_quantity == "None":
desired_quantity = 1
else:
desired_quantity = int(desired_quantity)
if update_interval is None or update_interval == "None":
update_interval = None
else:
update_interval = int(update_interval)
if refresh_interval is None or refresh_interval == "None":
refresh_interval = None
else:
refresh_interval = int(refresh_interval)
if exchange is None or market is None:
return self.get_error_response(event=event, error_id="#1016",
message="Missing required parameter: exchange, market")
if self.db.exists_depthcache(exchange=exchange, market=market):
return self.get_error_response(event=event, error_id="#1024",
message=f"DepthCache '{market}' for '{exchange}' already exists!")
try:
result = self.db.add_depthcache(exchange=exchange, market=market, update_interval=update_interval,
refresh_interval=refresh_interval, desired_quantity=desired_quantity)
except ValueError as error_msg:
return self.get_error_response(event=event, error_id="#1017", message=str(error_msg))
if result is True:
# Todo:
# Add Option to run:
# self.db.manage_distribution()
return self.get_ok_response(event=event)
else:
return self.get_error_response(event=event, error_id="#1018", message="An unknown error has occurred!")
[docs]
async def create_depthcaches(self, request: Request):
event = "CREATE_DEPTHCACHES"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
exchange = request.query_params.get("exchange", None)
markets = request.query_params.get("markets", None)
desired_quantity = request.query_params.get("desired_quantity", None)
update_interval = request.query_params.get("update_interval", None)
refresh_interval = request.query_params.get("refresh_interval", None)
if exchange == "None":
exchange = None
if markets == "None":
markets = None
if desired_quantity is None or desired_quantity == "None":
desired_quantity = 1
else:
desired_quantity = int(desired_quantity)
if update_interval is None or update_interval == "None":
update_interval = None
else:
update_interval = int(update_interval)
if refresh_interval is None or refresh_interval == "None":
refresh_interval = None
else:
refresh_interval = int(refresh_interval)
if exchange is None or markets is None:
return self.get_error_response(event=event, error_id="#1016",
message="Missing required parameter: exchange, markets")
markets = json.loads(base64.b64decode(markets).decode('utf-8'))
for market in markets:
if self.db.exists_depthcache(exchange=exchange, market=market) is False:
try:
result = self.db.add_depthcache(exchange=exchange,
market=market,
update_interval=update_interval,
refresh_interval=refresh_interval,
desired_quantity=desired_quantity)
except ValueError as error_msg:
self.app.stdout(f"ERROR: {error_msg}", log="error")
continue
if result is True:
pass
# Todo:
# Add Option to run:
# self.db.manage_distribution()
else:
return self.get_error_response(event=event, error_id="#1018",
message="An unknown error has occurred!")
return self.get_ok_response(event=event)
[docs]
async def get_cluster_info(self, request: Request):
event = "GET_CLUSTER_INFO"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
response = self.create_cluster_info_response()
return self.get_ok_response(event=event, params=response)
[docs]
async def get_depthcache_list(self, request: Request):
event = "GET_DEPTHCACHE_LIST"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
response = self.create_depthcache_list_response()
return self.get_ok_response(event=event, params=response)
[docs]
async def get_depthcache_info(self, request: Request):
event = "GET_DEPTHCACHE_INFO"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
exchange = request.query_params.get("exchange", None)
market = request.query_params.get("market", None)
if market == "None":
exchange = None
if market == "None":
exchange = None
if exchange is None or market is None:
return self.get_error_response(event=event, error_id="#1006",
message="Missing required parameter: exchange, market")
response = self.create_depthcache_info_response(exchange=exchange, market=market)
if not response['depthcache_info']:
return self.get_error_response(event=event, error_id="#7000", message=f"DepthCache '{market}' for "
f"'{exchange}' not found!")
return self.get_ok_response(event=event, params=response)
[docs]
async def stop_depthcache(self, request: Request):
event = "STOP_DEPTHCACHE"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
exchange = request.query_params.get("exchange", None)
market = request.query_params.get("market", None)
if market == "None":
exchange = None
if market == "None":
exchange = None
if exchange is None or market is None:
return self.get_error_response(event=event, error_id="#1019",
message="Missing required parameter: exchange, market")
if not self.db.exists_depthcache(exchange=exchange, market=market):
return self.get_error_response(event=event, error_id="#7000", message=f"DepthCache '{market}' for "
f"'{exchange}' not found!")
try:
result = self.db.delete_depthcache(exchange=exchange, market=market)
except ValueError as error_msg:
return self.get_error_response(event=event, error_id="#1020", message=str(error_msg))
if result is True:
return self.get_ok_response(event=event)
else:
return self.get_error_response(event=event, error_id="#1021", message="An unknown error has occurred!")
[docs]
async def submit_license(self, request: Request):
event = "SUBMIT_LICENSE"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
api_secret = request.query_params.get("api_secret", None)
license_token = request.query_params.get("license_token", None)
if api_secret == "None":
api_secret = None
if license_token == "None":
license_token = None
if api_secret is None or license_token is None:
return self.get_error_response(event=event, error_id="#1007",
message="Missing required parameter: api_secret, license_token")
self.db.submit_license(api_secret=api_secret, license_token=license_token)
if self.db.get_license_status() == "VALID":
self.app.llm.close()
if self.app.start_licensing_manager():
return self.get_ok_response(event=event,
params={"message": "The license has been successfully validated and UBDCC "
"is now ready for operation! Have fun! ;)"})
return self.get_error_response(event=event, error_id="#1011", message="The license is invalid!")
[docs]
async def ubdcc_get_responsible_dcn_addresses(self, request: Request):
event = "UBDCC_GET_RESPONSIBLE_DCN_ADDRESSES"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
exchange = request.query_params.get("exchange", None)
market = request.query_params.get("market", None)
if market == "None":
exchange = None
if market == "None":
exchange = None
if exchange is None or market is None:
return self.get_error_response(event=event, error_id="#1012",
message="Missing required parameter: exchange, market")
result = self.db.get_responsible_dcn_addresses(exchange=exchange, market=market)
if result is True:
return self.get_ok_response(event=event, params={"addresses": result})
else:
return self.get_error_response(event=event, error_id="#1013",
message=f"No addresses of responsible DCN for '{market}' from '{exchange}' "
f"found!")
[docs]
async def ubdcc_node_cancellation(self, request: Request):
event = "UBDCC_NODE_CANCELLATION"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
uid = request.query_params.get("uid")
if uid == "None":
uid = None
if uid is None:
return self.get_error_response(event=event, error_id="#1004", message="Missing required parameter: uid")
if not self.db.exists_pod(uid=uid):
return self.get_error_response(event=event, error_id="#1005",
message=f"A pod with the uid '{uid}' does not exist!")
result = self.db.delete_pod(uid=uid)
if result is True:
return self.get_ok_response(event=event)
else:
return self.get_error_response(event=event, error_id="#1008", message="An unknown error has occurred!")
[docs]
async def ubdcc_node_registration(self, request: Request):
event = "UBDCC_NODE_REGISTRATION"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
name = request.query_params.get("name", None)
uid = request.query_params.get("uid", None)
node = request.query_params.get("node", None)
role = request.query_params.get("role", None)
api_port_rest = request.query_params.get("api_port_rest", None)
status = request.query_params.get("status", None)
ubldc_version = request.query_params.get("ubldc_version", None)
version = request.query_params.get("version", None)
if name == "None":
name = None
if uid == "None":
uid = None
if node == "None":
node = None
if role == "None":
role = None
if api_port_rest == "None":
api_port_rest = None
if status == "None":
status = None
if ubldc_version == "None":
ubldc_version = None
if version == "None":
version = None
if name is None or uid is None or node is None or role is None or api_port_rest is None or status is None:
return self.get_error_response(event=event, error_id="#1002",
message="Missing required parameter: name, uid, node, role, api_port_rest, "
"status")
if self.db.exists_pod(uid=uid):
return self.get_error_response(event=event, error_id="#1003",
message=f"A pod with the uid '{uid}' already exists!")
result = self.db.add_pod(name=name,
uid=uid,
node=node,
role=role,
ip=request.client.host,
api_port_rest=int(api_port_rest),
ubldc_version=ubldc_version,
status=status,
version=version)
if result is True:
await self.app.send_backup_to_node(host=request.client.host, port=api_port_rest)
return self.get_ok_response(event=event)
else:
return self.get_error_response(event=event, error_id="#1009", message="An unknown error has occurred!")
[docs]
async def ubdcc_node_sync(self, request: Request):
event = "UBDCC_NODE_SYNC"
uid = request.query_params.get("uid", None)
node = request.query_params.get("node", None)
api_port_rest = request.query_params.get("api_port_rest", None)
status = request.query_params.get("status", None)
if uid == "None":
uid = None
if node == "None":
node = None
if api_port_rest == "None":
api_port_rest = None
if status == "None":
status = None
if uid is None or api_port_rest is None:
return self.get_error_response(event=event, error_id="#1000",
message="Missing required parameter: uid, api_port_rest")
if not self.db.exists_pod(uid=uid) and self.db.is_empty() is True:
backup = await self.app.get_backup_from_node(host=request.client.host, port=api_port_rest)
if backup is not None:
source_ip = request.client.host
source_port = api_port_rest
source_uid = uid
timestamp_limit = float(backup['timestamp'])
pods = []
for pod in backup['pods']:
pods.append(backup['pods'][pod]['UID'])
timestamp = await self.app.get_backup_timestamp_from_node(host=backup['pods'][pod]['IP'],
port=backup['pods'][pod]['API_PORT_REST'])
if timestamp is not None:
if timestamp_limit < timestamp:
source_ip = pod['IP']
source_port = pod['API_PORT_REST']
source_uid = pod['UID']
timestamp_limit = timestamp
self.app.stdout_msg(f"Found pods: {pods}", log="info")
if source_uid != uid:
backup = await self.app.get_backup_from_node(host=source_ip, port=source_port)
if backup is not None:
self.db.replace_data(data=backup)
if self.db.get_license_status() == "VALID":
if self.app.start_licensing_manager() is False:
self.db.set_license_status(status="INVALID")
self.app.data['is_ready'] = True
self.app.stdout_msg(f"Loaded database from pod '{source_uid}'!", log="info")
if not self.db.exists_pod(uid=uid):
return self.get_error_response(event=event, error_id="#1001",
message=f"Registration for pod '{uid}' not found!")
result = self.db.update_pod(uid=uid,
node=node,
ip=request.client.host,
status=status)
pod = self.db.get_pod_by_uid(uid=uid)
if result is True:
await self.app.send_backup_to_node(host=request.client.host, port=pod['API_PORT_REST'])
return self.get_ok_response(event=event)
else:
return self.get_error_response(event=event, error_id="#1010",
message="An unknown error has occurred!")
[docs]
async def ubdcc_update_depthcache_distribution(self, request: Request):
event = "UBDCC_UPDATE_DEPTHCACHE_DISTRIBUTION"
ready_check = self.throw_error_if_mgmt_not_ready(request=request, event=event)
if ready_check is not None:
return ready_check
exchange = request.query_params.get("exchange", None)
market = request.query_params.get("market", None)
pod_uid = request.query_params.get("pod_uid", None)
last_restart_time = request.query_params.get("last_restart_time", None)
status = request.query_params.get("status", None)
if exchange == "None":
exchange = None
if market == "None":
market = None
if pod_uid == "None":
pod_uid = None
if last_restart_time == "None":
last_restart_time = None
if status == "None":
status = None
if exchange is None or market is None or pod_uid is None:
return self.get_error_response(event=event, error_id="#1015",
message="Missing required parameter: exchange, market, pod_uid")
if last_restart_time is None and status is None:
return self.get_error_response(event=event, error_id="#1022",
message="Nothing to update! Missing parameter: last_restart_time, status")
result = self.db.update_depthcache_distribution(exchange=exchange, market=market,
pod_uid=pod_uid, status=status)
if result is True:
return self.get_ok_response(event=event)
else:
return self.get_error_response(event=event, error_id="#1023", message=f"DepthCache '{exchange} {market} "
f"{pod_uid}' not found!")