Source code for lucit_ubdcc_shared_modules.ServiceBase
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/lucit-ubdcc-shared-modules/lucit_ubdcc_shared_modules/ServiceBase.py
#
# Project website: https://www.lucit.tech/unicorn-depthcache-cluster-for-binance.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance
# Documentation: https://unicorn-depthcache-cluster-for-binance.docs.lucit.tech
# PyPI: https://pypi.org/project/lucit-ubdcc-shared-modules
# LUCIT Online Shop: https://shop.lucit.services/software/unicorn-depthcache-cluster-for-binance
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-depthcache-cluster-for-binance/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2024-2024, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.
import asyncio
import socket
from .App import App
from .RestServer import RestServer
from .Database import Database
[docs]
class ServiceBase:
def __init__(self, app_name=None, cwd=None):
self.db: Database | None = None
self.rest_server = None
self.app = App(app_name=app_name,
cwd=cwd,
service=self,
service_call=self.run,
stop_call=self.stop)
self.app.start()
# Never gets executed ;)
[docs]
def db_init(self) -> bool:
self.app.stdout_msg(f"Starting Database ...", log="info")
if self.db is None:
self.db = Database(app=self.app)
return True
return False
[docs]
@staticmethod
def is_port_free(port, host='127.0.0.1'):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind((host, port))
return True
except OSError:
return False
[docs]
async def main(self) -> None:
# Override with specific Service main() function
pass
[docs]
def run(self) -> None:
self.app.stdout_msg(f"Starting the main execution flow ...", log="debug", stdout=False)
asyncio.run(self.main())
[docs]
async def start_rest_server(self, endpoints=None) -> bool:
while self.is_port_free(port=self.app.api_port_rest) is False:
self.app.api_port_rest = self.app.api_port_rest + 1
self.rest_server = RestServer(app=self.app, endpoints=endpoints, port=self.app.api_port_rest)
self.rest_server.start()
await asyncio.sleep(1)
return True
[docs]
def stop(self) -> bool:
try:
if self.rest_server:
self.rest_server.stop()
return True
except AttributeError as error_msg:
self.app.stdout_msg(f"ERROR: {error_msg}", log="info")
return False