Source code for lucit_ubdcc_shared_modules.RestEndpointsBase

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/lucit-ubdcc-shared-modules/lucit_ubdcc_shared_modules/RestEndpointsBase.py
#
# Project website: https://www.lucit.tech/unicorn-depthcache-cluster-for-binance.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance
# Documentation: https://unicorn-depthcache-cluster-for-binance.docs.lucit.tech
# PyPI: https://pypi.org/project/lucit-ubdcc-shared-modules
# LUCIT Online Shop: https://shop.lucit.services/software/unicorn-depthcache-cluster-for-binance
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2024-2024, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.

import json
import time
from fastapi import Request
from fastapi.responses import JSONResponse


[docs] class RestEndpointsBase: def __init__(self, app=None): self.app = app self.fastapi = app.get_fastapi_instance()
[docs] def create_cluster_info_response(self) -> dict: if self.app.data.get('db') is None: response = {} else: response = {"db": {"depthcaches": self.app.data['db'].get('depthcaches'), "license": self.app.data['db'].get('license'), "nodes": self.app.data['db'].get('nodes'), "pods": self.app.data['db'].get('pods'), "timestamp": self.app.data['db'].get('timestamp')}, "version": None} if self.app.info['name'] == "lucit-ubdcc-mgmt": response['version'] = self.app.get_version() return response
[docs] @staticmethod def create_debug_response(process_start_time: float = None, url: str = None, used_pods: list = None) -> dict: return {"cluster_execution_time": time.time() - process_start_time, "request_time": 0, "transmission_time": 0, "url": url, "used_pods": used_pods}
[docs] def create_depthcache_list_response(self) -> dict: if self.app.data.get('db') is None: response = {} else: response = {"depthcache_list": self.app.data['db'].get_depthcache_list()} return response
[docs] def create_depthcache_info_response(self, exchange: str = None, market: str = None) -> dict: if self.app.data.get('db') is None: response = {} else: response = {"depthcache_info": self.app.data['db'].get_depthcache_info(exchange=exchange, market=market)} return response
[docs] def get_fastapi_instance(self): return self.fastapi
[docs] def get_error_response(self, event: str = None, error_id: str = None, message: str = None, params: dict = None, process_start_time: float = None, url: str = None, used_pods: list = None): response = {"event": event, "message": message, "result": "ERROR"} if error_id is not None: response['error_id'] = error_id if params: response.update(params) if process_start_time is not None: response['debug'] = self.create_debug_response(process_start_time=process_start_time, url=url, used_pods=used_pods) response_sorted = self.app.sort_dict(input_dict=response) return JSONResponse(status_code=200, content=response_sorted)
[docs] def get_ok_response(self, event: str = None, params: dict = None, process_start_time: float = None, url: str = None, used_pods: list = None): response = {"event": event, "result": "OK"} if params: response.update(params) if process_start_time is not None: response['debug'] = self.create_debug_response(process_start_time=process_start_time, url=url, used_pods=used_pods) response_sorted = self.app.sort_dict(input_dict=response) return JSONResponse(status_code=200, content=response_sorted)
[docs] def is_ready(self): try: if self.app.data['is_ready'] is True: return True else: if (self.app.data['start_timestamp'] + self.app.mgmt_is_ready_time) < self.app.get_unix_timestamp(): self.app.data['is_ready'] = True return True else: return False except KeyError: self.app.data['is_ready'] = False self.app.data['start_timestamp'] = self.app.get_unix_timestamp() return False
[docs] def register(self): self.app.stdout_msg(f"Registering REST endpoints ...", log="info") @self.fastapi.get("/test") async def test(request: Request): return await self.test(request=request) @self.fastapi.get("/ubdcc_mgmt_backup") @self.fastapi.post("/ubdcc_mgmt_backup") async def ubdcc_mgmt_backup(request: Request): return await self.ubdcc_mgmt_backup(request=request)
[docs] async def test(self, request: Request): event = "TEST" response = {"message": f"Hello World!", "headers": f"{request.headers}", "app": self.app.info, "ubdcc_mgmt_backup": self.app.ubdcc_mgmt_backup} if self.app.pod_info is not None: pod = {"name": self.app.pod_info.metadata.name, "uid": self.app.pod_info.metadata.uid, "namespace": self.app.pod_info.metadata.namespace, "labels": self.app.pod_info.metadata.labels, "node": self.app.pod_info.spec.node_name} response['pod'] = pod return self.get_ok_response(event=event, params=response)
[docs] def throw_error_if_mgmt_not_ready(self, request: Request, event: str = None): if self.is_ready() is False: self.app.stdout_msg(f"Mgmt Service is not ready yet! Telling '{request.query_params.get('uid')}' to come " f"back later!", log="warn") return self.get_error_response(event=event, error_id="#1014", message=f"Mgmt Service is not ready yet! Please come back in a few seconds!") else: return None
[docs] async def ubdcc_mgmt_backup(self, request: Request): event = "UBDCC_MGMT_BACKUP" request_body = await request.body() if not request_body.decode('utf-8').strip('"'): # Get request: # provide timestamp of the stored backup backup_timestamp = request.query_params.get("get_backup_timestamp") if backup_timestamp is not None: return self.get_ok_response(event=event, params={"timestamp": self.app.get_backup_timestamp()}) # provide the backup data for restore self.app.stdout_msg(f"Provided backup database!", log="info") return self.get_ok_response(event=event, params={"db": self.app.ubdcc_mgmt_backup}) else: # Post request: save the backup data if self.app.info['name'] != "lucit-ubdcc-mgmt": self.app.data['db-cache'] = json.loads(request_body) try: self.app.data['db'].replace_data(data=self.app.data['db-cache']) except KeyError as error_msg: self.app.stdout_msg(f"Database not available: {error_msg}", log="debug", stdout=False) self.app.ubdcc_mgmt_backup = json.loads(request_body.decode('utf-8')) return self.get_ok_response(event=event, params={"message": "The backup has been saved!"})