Source code for lucit_ubdcc_dcn.RestEndpoints

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/lucit-ubdcc-dcn/lucit_ubdcc_dcn/RestEndpoints.py
#
# Project website: https://www.lucit.tech/unicorn-depthcache-cluster-for-binance.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance
# Documentation: https://unicorn-depthcache-cluster-for-binance.docs.lucit.tech
# PyPI: https://pypi.org/project/lucit-ubdcc-dcn
# LUCIT Online Shop: https://shop.lucit.services/software/unicorn-depthcache-cluster-for-binance
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2024-2024, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.

from lucit_ubdcc_shared_modules.RestEndpointsBase import RestEndpointsBase, Request
from unicorn_binance_local_depth_cache import DepthCacheOutOfSync


class RestEndpoints(RestEndpointsBase):
    """REST endpoints of ``lucit_ubdcc_dcn`` that expose asks and bids of local DepthCaches.

    On top of whatever ``RestEndpointsBase.register()`` registers, this class
    adds the ``/get_asks`` and ``/get_bids`` GET routes.
    """

    def __init__(self, app=None):
        """Forward ``app`` to ``RestEndpointsBase``.

        :param app: The application object; the handlers read ``app.data`` from it.
        """
        super().__init__(app=app)

    def register(self):
        """Register the base class routes plus ``/get_asks`` and ``/get_bids``."""
        super().register()

        # The route functions are thin closures so the real logic stays in
        # bound methods of this class.
        @self.fastapi.get("/get_asks")
        async def get_asks(request: Request):
            return await self.get_asks(request=request)

        @self.fastapi.get("/get_bids")
        async def get_bids(request: Request):
            return await self.get_bids(request=request)

    async def get_asks(self, request: Request):
        """Handle ``GET /get_asks``.

        Query parameters read: ``exchange``, ``market``, ``limit_count``
        (parsed with ``int``) and ``threshold_volume`` (parsed with ``float``).
        The last two are optional; a missing value or the literal string
        ``"None"`` means "no limit".

        :param request: The incoming request.
        :return: ``get_ok_response(...)`` with ``params={"asks": ...}`` on
                 success, otherwise ``get_error_response(...)`` with error id
                 ``#6000`` (DepthCache out of sync) or ``#7000`` (no matching
                 DepthCache).
        :raises ValueError: If ``limit_count`` or ``threshold_volume`` is not a
                            valid number.
        """
        return await self._get_depth_side(request=request, event="GET_ASKS", side="asks")

    async def get_bids(self, request: Request):
        """Handle ``GET /get_bids``.

        Query parameters read: ``exchange``, ``market``, ``limit_count``
        (parsed with ``int``) and ``threshold_volume`` (parsed with ``float``).
        The last two are optional; a missing value or the literal string
        ``"None"`` means "no limit".

        :param request: The incoming request.
        :return: ``get_ok_response(...)`` with ``params={"bids": ...}`` on
                 success, otherwise ``get_error_response(...)`` with error id
                 ``#6000`` (DepthCache out of sync) or ``#7000`` (no matching
                 DepthCache).
        :raises ValueError: If ``limit_count`` or ``threshold_volume`` is not a
                            valid number.
        """
        return await self._get_depth_side(request=request, event="GET_BIDS", side="bids")

    @staticmethod
    def _parse_optional(raw, cast):
        """Convert an optional query parameter, mapping ``None``/``"None"`` to ``None``.

        :param raw: The raw query-string value or ``None`` if it was not sent.
        :param cast: Callable applied to ``raw`` when a value is present
                     (``int`` or ``float`` here).
        :return: ``None`` or ``cast(raw)``.
        :raises ValueError: Propagated from ``cast`` for malformed input.
        """
        # Clients may send the literal text "None" for an unset value, so it is
        # treated the same as an absent parameter.
        if raw is None or raw == "None":
            return None
        return cast(raw)

    async def _get_depth_side(self, request: Request, *, event, side):
        """Shared implementation of ``get_asks`` and ``get_bids``.

        :param request: The incoming request.
        :param event: Event name passed to the response builders
                      (``"GET_ASKS"`` or ``"GET_BIDS"``).
        :param side: ``"asks"`` or ``"bids"``; selects the ``get_<side>`` method
                     of the DepthCache instance and the key in the response
                     params.
        :return: The object built by ``get_ok_response`` or ``get_error_response``.
        """
        query = request.query_params
        exchange = query.get("exchange")
        market = query.get("market")
        limit_count = self._parse_optional(query.get("limit_count"), int)
        threshold_volume = self._parse_optional(query.get("threshold_volume"), float)

        for dc in self.app.data['local_depthcaches']:
            if dc['exchange'] != exchange or dc['market'] != market:
                continue
            # Instances are looked up by exchange and then by update interval.
            instance = self.app.data['depthcache_instances'][dc['exchange']][dc['update_interval']]
            fetch = getattr(instance, f"get_{side}")
            try:
                entries = fetch(market=dc['market'],
                                limit_count=limit_count,
                                threshold_volume=threshold_volume)
            except DepthCacheOutOfSync:
                return self.get_error_response(event=event, error_id="#6000",
                                               message=f"DepthCache '{market}' for '{exchange}' is out of sync!")
            # Only the first matching DepthCache is served.
            return self.get_ok_response(event=event, params={side: entries})

        return self.get_error_response(event=event, error_id="#7000",
                                       message=f"DepthCache '{market}' for '{exchange}' not found!")