Source code for lucit_ubdcc_restapi.RestEndpoints

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/lucit-ubdcc-restapi/lucit_ubdcc_restapi/RestEndpoints.py
#
# Project website: https://www.lucit.tech/unicorn-depthcache-cluster-for-binance.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance
# Documentation: https://unicorn-depthcache-cluster-for-binance.docs.lucit.tech
# PyPI: https://pypi.org/project/lucit-ubdcc-restapi
# LUCIT Online Shop: https://shop.lucit.services/software/unicorn-depthcache-cluster-for-binance
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2024-2024, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.

from lucit_ubdcc_shared_modules.RestEndpointsBase import RestEndpointsBase, Request
import time


[docs] class RestEndpoints(RestEndpointsBase): def __init__(self, app=None): super().__init__(app=app)
[docs] def register(self): super().register() @self.fastapi.get("/create_depthcache") async def create_depthcache(request: Request): return await self.create_depthcache(request=request) @self.fastapi.get("/create_depthcaches") async def create_depthcaches(request: Request): return await self.create_depthcaches(request=request) @self.fastapi.get("/get_asks") async def get_asks(request: Request): return await self.get_asks(request=request) @self.fastapi.get("/get_bids") async def get_bids(request: Request): return await self.get_bids(request=request) @self.fastapi.get("/get_cluster_info") async def get_cluster_info(request: Request): return await self.get_cluster_info(request=request) @self.fastapi.get("/get_depthcache_list") async def get_depthcache_list(request: Request): return await self.get_depthcache_list(request=request) @self.fastapi.get("/get_depthcache_info") async def get_depthcache_info(request: Request): return await self.get_depthcache_info(request=request) @self.fastapi.get("/stop_depthcache") async def stop_depthcache(request: Request): return await self.stop_depthcache(request=request) @self.fastapi.get("/submit_license") async def submit_license(request: Request): return await self.submit_license(request=request)
async def _get_depthcache_data(self, request: Request, event=None, endpoint=None): process_start_time: float | None = time.time() if str(request.query_params.get("debug")).lower() == "true" \ else None exchange = request.query_params.get("exchange") market = request.query_params.get("market") request_url = str(request.url) responsible_dcn = await self.app.ubdcc_get_responsible_dcn_addresses(exchange=exchange, market=market) limit_count = request.query_params.get("limit_count") threshold_volume = request.query_params.get("threshold_volume") used_pods: list = [[self.app.id['name'], self.app.id['uid']]] if responsible_dcn is None: if not exchange or not market: return self.get_error_response(event=event, error_id="#1025", process_start_time=process_start_time, message="Missing required parameter: exchange, market", url=request_url, used_pods=used_pods) addresses = self.app.data['db'].get_responsible_dcn_addresses(exchange=exchange, market=market) else: addresses = responsible_dcn['addresses'] if len(addresses) == 0: if self.app.data['db'].exists_depthcache(exchange=exchange, market=market): return self.get_error_response(event=event, error_id="#4000", process_start_time=process_start_time, message=f"No DCN found for '{market}' on '{exchange}'!", url=request_url, used_pods=used_pods) else: return self.get_error_response(event=event, error_id="#7000", process_start_time=process_start_time, message=f"DepthCache '{market}' for '{exchange}' not found!", url=request_url, used_pods=used_pods) query = (f"?exchange={exchange}&" f"market={market}&" f"limit_count={limit_count}&" f"threshold_volume={threshold_volume}") result_errors = [] first_choice_dcn = self.app.get_dcn_uid_unused_longest_time(selection=[address[2] for address in addresses]) for i, address in enumerate(addresses): if address[2] == first_choice_dcn: addresses.insert(0, addresses.pop(i)) for address, port, uid in addresses: self.app.stdout_msg(f"Connecting http://{address}:{port}/{endpoint}{query} ...") url = f"http://{address}:{port}" + endpoint + query result = await self.app.request(url=url, method="get") if result.get('error') is None and result.get('error_id') is None: if str(request.query_params.get("debug")).lower() == "true": pod = self.app.data['db'].get_pod_by_address(address=address) used_pods.append([pod.get('NAME'), pod.get('UID')]) result['debug'] = self.create_debug_response(process_start_time=process_start_time, url=request_url, used_pods=used_pods) return result result_errors.append([address, port, str(result)]) self.app.stdout_msg(f"No DCN has responded to the request: {result_errors}") return self.get_error_response(event=event, error_id="#5000", message=f"No DCN has responded to the request!", params={"requests": result_errors}, process_start_time=process_start_time, url=request_url, used_pods=used_pods)
[docs] async def create_depthcache(self, request: Request): process_start_time: float | None = time.time() if str(request.query_params.get("debug")).lower() == "true" else None event = "CREATE_DEPTHCACHE" endpoint = "/create_depthcache" request_url = str(request.url) used_pods: list = [[self.app.id['name'], self.app.id['uid']]] host = self.app.get_cluster_mgmt_address() exchange = request.query_params.get("exchange") market = request.query_params.get("market") desired_quantity = request.query_params.get("desired_quantity") update_interval = request.query_params.get("update_interval") refresh_interval = request.query_params.get("refresh_interval") query = (f"?exchange={exchange}&" f"market={market}&" f"update_interval={update_interval}&" f"refresh_interval={refresh_interval}&" f"desired_quantity={desired_quantity}") url = host + endpoint + query result = await self.app.request(url=url, method="get") if result.get('error') is not None and result.get('error_id') is not None: return self.get_error_response(event=event, error_id="#9000", message=f"Mgmt service not available!", params={"error": str(result)}, process_start_time=process_start_time, url=request_url, used_pods=used_pods) elif result.get('error_id') is not None: return self.get_error_response(event=event, error_id=result.get('error_id'), message=result.get('message'), process_start_time=process_start_time, url=request_url, used_pods=used_pods) else: if str(request.query_params.get("debug")).lower() == "true": result['debug'] = self.create_debug_response(process_start_time=process_start_time, url=request_url, used_pods=used_pods) return result
[docs] async def create_depthcaches(self, request: Request): process_start_time: float | None = time.time() if str(request.query_params.get("debug")).lower() == "true" else None event = "CREATE_DEPTHCACHES" endpoint = "/create_depthcaches" request_url = str(request.url) used_pods: list = [[self.app.id['name'], self.app.id['uid']]] host = self.app.get_cluster_mgmt_address() exchange = request.query_params.get("exchange") markets = request.query_params.get("markets") desired_quantity = request.query_params.get("desired_quantity") update_interval = request.query_params.get("update_interval") refresh_interval = request.query_params.get("refresh_interval") query = (f"?exchange={exchange}&" f"markets={markets}&" f"update_interval={update_interval}&" f"refresh_interval={refresh_interval}&" f"desired_quantity={desired_quantity}") url = host + endpoint + query result = await self.app.request(url=url, method="get") if result.get('error') is not None and result.get('error_id') is not None: return self.get_error_response(event=event, error_id="#9000", message=f"Mgmt service not available!", params={"error": str(result)}, process_start_time=process_start_time, url=request_url, used_pods=used_pods) elif result.get('error_id') is not None: return self.get_error_response(event=event, error_id=result.get('error_id'), message=result.get('message'), url=request_url, process_start_time=process_start_time, used_pods=used_pods) else: if str(request.query_params.get("debug")).lower() == "true": result['debug'] = self.create_debug_response(process_start_time=process_start_time, url=request_url, used_pods=used_pods) return result
[docs] async def get_asks(self, request: Request): event = "GET_ASKS" endpoint = "/get_asks" return await self._get_depthcache_data(request=request, event=event, endpoint=endpoint)
[docs] async def get_bids(self, request: Request): event = "GET_BIDS" endpoint = "/get_bids" return await self._get_depthcache_data(request=request, event=event, endpoint=endpoint)
[docs] async def get_cluster_info(self, request: Request): process_start_time: float | None = time.time() if str(request.query_params.get("debug")).lower() == "true" else None event = "GET_CLUSTER_INFO" endpoint = "/get_cluster_info" request_url = str(request.url) used_pods: list = [[self.app.id['name'], self.app.id['uid']]] host = self.app.get_cluster_mgmt_address() url = host + endpoint result = await self.app.request(url=url, method="get") if result.get('error') is None and result.get('error_id') is None: if str(request.query_params.get("debug")).lower() == "true": result['debug'] = self.create_debug_response(process_start_time=process_start_time, url=request_url, used_pods=used_pods) return result elif result.get('error_id') is not None: return self.get_error_response(event=event, error_id=result.get('error_id'), message=result.get('message'), process_start_time=process_start_time, url=request_url, used_pods=used_pods) else: response = self.create_cluster_info_response() response['error'] = str(result) if self.app.data.get('db') is None: return self.get_error_response(event=event, error_id="#9000", message=f"Mgmt service not available!", params=response, process_start_time=process_start_time, url=request_url, used_pods=used_pods) else: return self.get_error_response(event=event, error_id="#8000", message=f"Mgmt service not available! This is cached data from pod " f"'{self.app.id['uid']}'!", params=response, process_start_time=process_start_time, url=request_url, used_pods=used_pods)
[docs] async def get_depthcache_list(self, request: Request): process_start_time: float | None = time.time() if str(request.query_params.get("debug")).lower() == "true" else None event = "GET_DEPTHCACHE_LIST" endpoint = "/get_depthcache_list" request_url = str(request.url) host = self.app.get_cluster_mgmt_address() used_pods: list = [[self.app.id['name'], self.app.id['uid']]] url = host + endpoint result = await self.app.request(url=url, method="get") if result.get('error') is None and result.get('error_id') is None: if str(request.query_params.get("debug")).lower() == "true": result['debug'] = self.create_debug_response(process_start_time=process_start_time, url=request_url, used_pods=used_pods) return result elif result.get('error_id') is not None: return self.get_error_response(event=event, error_id=result.get('error_id'), message=result.get('message'), process_start_time=process_start_time, url=request_url, used_pods=used_pods) else: response = self.create_depthcache_list_response() response['error'] = str(result) if self.app.data.get('db') is None: return self.get_error_response(event=event, error_id="#9000", message=f"Mgmt service not available!", params=response, process_start_time=process_start_time, url=request_url, used_pods=used_pods) else: return self.get_error_response(event=event, error_id="#8000", message=f"Mgmt service not available! This is cached data from pod " f"'{self.app.id['uid']}'!", params=response, process_start_time=process_start_time, url=request_url, used_pods=used_pods)
[docs] async def get_depthcache_info(self, request: Request): process_start_time: float | None = time.time() if str(request.query_params.get("debug")).lower() == "true" else None event = "GET_DEPTHCACHE_INFO" endpoint = "/get_depthcache_info" request_url = str(request.url) used_pods: list = [[self.app.id['name'], self.app.id['uid']]] host = self.app.get_cluster_mgmt_address() exchange = request.query_params.get("exchange") market = request.query_params.get("market") query = (f"?exchange={exchange}&" f"market={market}") url = host + endpoint + query result = await self.app.request(url=url, method="get") if result.get('error') is None and result.get('error_id') is None: if str(request.query_params.get("debug")).lower() == "true": result['debug'] = self.create_debug_response(process_start_time=process_start_time, url=request_url, used_pods=used_pods) return result elif result.get('error_id') is not None: return self.get_error_response(event=event, error_id=result.get('error_id'), message=result.get('message'), process_start_time=process_start_time, url=request_url, used_pods=used_pods) else: response = self.create_depthcache_info_response(exchange=exchange, market=market) response['error'] = str(result) if self.app.data.get('db') is None: return self.get_error_response(event=event, error_id="#9000", message=f"Mgmt service not available!", params=response, process_start_time=process_start_time, url=request_url, used_pods=used_pods) else: return self.get_error_response(event=event, error_id="#8000", message=f"Mgmt service not available! This is cached data from pod " f"'{self.app.id['uid']}'!", params=response, process_start_time=process_start_time, url=request_url, used_pods=used_pods)
[docs] async def stop_depthcache(self, request: Request): process_start_time: float | None = time.time() if str(request.query_params.get("debug")).lower() == "true" else None event = "STOP_DEPTHCACHE" endpoint = "/stop_depthcache" request_url = str(request.url) host = self.app.get_cluster_mgmt_address() used_pods: list = [[self.app.id['name'], self.app.id['uid']]] exchange = request.query_params.get("exchange") market = request.query_params.get("market") query = (f"?exchange={exchange}&" f"market={market}") url = host + endpoint + query result = await self.app.request(url=url, method="get") if result.get('error') is None and result.get('error_id') is None: if str(request.query_params.get("debug")).lower() == "true": result['debug'] = self.create_debug_response(process_start_time=process_start_time, url=request_url, used_pods=used_pods) return result elif result.get('error_id') is not None: return self.get_error_response(event=event, error_id=result.get('error_id'), message=result.get('message'), process_start_time=process_start_time, url=request_url, used_pods=used_pods) else: return self.get_error_response(event=event, error_id="#9000", message=f"Mgmt service not available!", params={"error": str(result)}, process_start_time=process_start_time, url=request_url, used_pods=used_pods)
[docs] async def submit_license(self, request: Request): process_start_time: float | None = time.time() if str(request.query_params.get("debug")).lower() == "true" else None event = "SUBMIT_LICENSE" request_url = str(request.url) used_pods: list = [[self.app.id['name'], self.app.id['uid']]] api_secret = request.query_params.get("api_secret") license_token = request.query_params.get("license_token") endpoint = "/submit_license" host = self.app.get_cluster_mgmt_address() query = (f"?api_secret={api_secret}&" f"license_token={license_token}") url = host + endpoint + query result = await self.app.request(url=url, method="get") if result.get('error') is None: if str(request.query_params.get("debug")).lower() == "true": result['debug'] = self.create_debug_response(process_start_time=process_start_time, url=request_url, used_pods=used_pods) return result else: return self.get_error_response(event=event, error_id="#9000", message=f"Mgmt service not available!", params={"error": str(result)}, process_start_time=process_start_time, url=request_url, used_pods=used_pods)