Source code for lucit_ubdcc_dcn.DepthCacheNode
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/lucit-ubdcc-dcn/lucit_ubdcc_dcn/DepthCacheNode.py
#
# Project website: https://www.lucit.tech/unicorn-depthcache-cluster-for-binance.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance
# Documentation: https://unicorn-depthcache-cluster-for-binance.docs.lucit.tech
# PyPI: https://pypi.org/project/lucit-ubdcc-dcn
# LUCIT Online Shop: https://shop.lucit.services/software/unicorn-depthcache-cluster-for-binance
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2024-2024, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.
from .RestEndpoints import RestEndpoints
from lucit_licensing_python.exceptions import NoValidatedLucitLicense
from lucit_ubdcc_shared_modules.ServiceBase import ServiceBase
from unicorn_binance_local_depth_cache import BinanceLocalDepthCacheManager, DepthCacheNotFound
from unicorn_binance_local_depth_cache.manager import __version__ as ubldc_version
[docs]
class DepthCacheNode(ServiceBase):
def __init__(self, cwd=None):
super().__init__(app_name="lucit-ubdcc-dcn", cwd=cwd)
[docs]
async def main(self):
self.app.data['depthcache_instances'] = {}
self.app.data['local_depthcaches'] = []
self.app.data['responsibilities'] = []
await self.start_rest_server(endpoints=RestEndpoints)
self.app.set_status_running()
await self.app.register_or_restart(ubldc_version=ubldc_version)
self.db_init()
while self.app.is_shutdown() is False:
await self.app.sleep()
await self.app.ubdcc_node_sync()
self.app.data['responsibilities'] = self.db.get_dcn_responsibilities()
self.app.stdout_msg(f"Local DepthCaches: {self.app.data['local_depthcaches']}", log="debug", stdout=False)
self.app.stdout_msg(f"Responsibilities: {self.app.data['responsibilities']}", log="debug", stdout=False)
for dc in self.app.data['responsibilities']:
if self.app.is_shutdown() is True:
break
if dc not in self.app.data['local_depthcaches']:
# Create DC
self.app.stdout_msg(f"Adding local DC: {dc}", log="info")
if self.app.data['depthcache_instances'].get(dc['exchange']) is None:
self.app.data['depthcache_instances'][dc['exchange']] = {}
if self.app.data['depthcache_instances'][dc['exchange']].get(dc['update_interval']) is None:
if self.app.data['db'].get_license_status() == "VALID":
if dc['update_interval'] is None:
try:
self.app.data['depthcache_instances'][dc['exchange']][dc['update_interval']] = \
BinanceLocalDepthCacheManager(
exchange=dc['exchange'],
lucit_api_secret=self.db.get_license_api_secret(),
lucit_license_token=self.db.get_license_license_token()
)
except NoValidatedLucitLicense as error_msg:
self.app.data['depthcache_instances'][dc['exchange']][dc['update_interval']] = None
self.app.stdout_msg(error_msg, log="critical")
await self.app.ubdcc_update_depthcache_distribution(exchange=dc['exchange'],
market=dc['market'],
status="stopped")
else:
try:
self.app.data['depthcache_instances'][dc['exchange']][dc['update_interval']] = \
BinanceLocalDepthCacheManager(
exchange=dc['exchange'],
depth_cache_update_interval=dc['update_interval'],
lucit_api_secret=self.db.get_license_api_secret(),
lucit_license_token=self.db.get_license_license_token()
)
except NoValidatedLucitLicense as error_msg:
self.app.data['depthcache_instances'][dc['exchange']][dc['update_interval']] = None
self.app.stdout_msg(error_msg, log="critical")
await self.app.ubdcc_update_depthcache_distribution(exchange=dc['exchange'],
market=dc['market'],
status="stopped")
else:
await self.app.ubdcc_update_depthcache_distribution(exchange=dc['exchange'],
market=dc['market'],
status="stopped")
self.app.stdout_msg(f"UBLDC instance cannot be started because no valid license is "
f"available!", log="critical")
break
else:
self.app.data['depthcache_instances'][dc['exchange']][dc['update_interval']].create_depth_cache(
markets=dc['market'],
refresh_interval=dc['refresh_interval']
)
await self.app.ubdcc_update_depthcache_distribution(exchange=dc['exchange'],
market=dc['market'],
status="running")
self.app.data['local_depthcaches'].append(dc)
await self.app.ubdcc_node_sync()
stop_depthcaches = {}
for dc in self.app.data['local_depthcaches']:
if self.app.is_shutdown() is True:
break
if dc not in self.app.data['responsibilities']:
# Stop DC
self.app.stdout_msg(f"Removing local DC: {dc}", log="info")
if stop_depthcaches.get(dc['exchange']) is None:
stop_depthcaches[dc['exchange']] = {dc['update_interval']: {'markets': [dc['market']]}}
else:
if stop_depthcaches[dc['exchange']].get(dc['update_interval']) is None:
stop_depthcaches[dc['exchange']] = {dc['update_interval']: {'markets': [dc['market']]}}
else:
stop_depthcaches[dc['exchange']][dc['update_interval']]['markets'].append(dc['market'])
self.app.data['local_depthcaches'].remove(dc)
for exchange in stop_depthcaches:
for update_interval in stop_depthcaches[exchange]:
try:
self.app.data['depthcache_instances'][exchange][update_interval].stop_depthcache(
markets=stop_depthcaches[exchange][update_interval]['markets']
)
except DepthCacheNotFound as error_msg:
self.app.stdout_msg(f"DepthCache not found: {error_msg}", log="error")
self.app.stdout_msg(f"Stopping all DepthCache instances ...", log="error")
for dc in self.app.data['local_depthcaches']:
for update_interval in self.app.data['depthcache_instances'][dc['exchange']]:
self.app.data['depthcache_instances'][dc['exchange']][update_interval].stop_manager()