#!/usr/bin/python3
# coding=utf-8
import hashlib
import html
import magic
import os
import io
import sys
import variables as var
import zipfile
import re
import subprocess as sp
import logging
import youtube_dl
from importlib import reload
from sys import platform
import traceback
import requests
from packaging import version
log = logging.getLogger("bot")
# Resolve a path: absolute paths and existing relative paths are returned as-is,
# anything else is interpreted relative to the bot's installation directory.
def solve_filepath(path):
if not path:
return ''
if path[0] == '/':
return path
elif os.path.exists(path):
return path
else:
mydir = os.path.dirname(os.path.realpath(__file__))
return mydir + '/' + path
# Walk the given root and return a sorted list of readable audio/video files,
# as paths relative to that root, skipping the configured ignored_folders and ignored_files.
def get_recursive_file_list_sorted(path):
filelist = []
for root, dirs, files in os.walk(path, topdown=True, onerror=None, followlinks=True):
relroot = root.replace(path, '', 1)
if relroot != '' and relroot in var.config.get('bot', 'ignored_folders'):
continue
for file in files:
if file in var.config.get('bot', 'ignored_files'):
continue
fullpath = os.path.join(path, relroot, file)
if not os.access(fullpath, os.R_OK):
continue
try:
mime = magic.from_file(fullpath, mime=True)
if 'audio' in mime or 'audio' in magic.from_file(fullpath).lower() or 'video' in mime:
filelist.append(os.path.join(relroot, file))
            except Exception:
                # file could not be identified (unreadable, broken, etc.): skip it
                pass
filelist.sort()
return filelist
# - zips the given files
# - returns the path of the created zip file, located in the application's tmp folder
#   (according to configuration)
# - the filename has the form prefix_hash.zip
# - the prefix can be controlled by the caller
# - the hash is a SHA-1 of the string representation of the list of files being zipped
def zipdir(files, zipname_prefix=None):
zipname = var.tmp_folder
if zipname_prefix and '../' not in zipname_prefix:
zipname += zipname_prefix.strip().replace('/', '_') + '_'
_hash = hashlib.sha1(str(files).encode()).hexdigest()
zipname += _hash + '.zip'
if os.path.exists(zipname):
return zipname
zipf = zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED)
for file_to_add in files:
if not os.access(file_to_add, os.R_OK):
continue
if file_to_add in var.config.get('bot', 'ignored_files'):
continue
add_file_as = os.path.basename(file_to_add)
zipf.write(file_to_add, add_file_as)
zipf.close()
return zipname
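
# Illustrative sketch of how zipdir() is meant to be called; the file paths and the
# prefix below are made-up examples, not values used elsewhere in the bot.
def _zipdir_example():
    files = ["/music/album/track01.mp3", "/music/album/track02.mp3"]  # hypothetical files
    archive = zipdir(files, zipname_prefix="album")
    # archive looks like "<tmp_folder>album_<sha1-of-file-list>.zip"; calling zipdir()
    # again with the same file list returns the existing archive without re-zipping.
    return archive
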
def get_user_ban():
res = "List of ban hash"
for i in var.db.items("user_ban"):
res += "<br/>" + i[0]
return res
def new_release_version(target):
if target == "testing":
r = requests.get("https://packages.azlux.fr/botamusique/testing-version")
else:
r = requests.get("https://packages.azlux.fr/botamusique/version")
v = r.text
return v.rstrip()
def fetch_changelog():
r = requests.get("https://packages.azlux.fr/botamusique/changelog")
c = r.text
return c
def check_update(current_version):
global log
log.debug("update: checking for updates...")
new_version = new_release_version(var.config.get('bot', 'target_version'))
if version.parse(new_version) > version.parse(current_version):
changelog = fetch_changelog()
log.info(f"update: new version {new_version} found, current installed version {current_version}.")
log.info(f"update: changelog: {changelog}")
changelog = changelog.replace("\n", "<br>")
return new_version, changelog
else:
log.debug("update: no new version found.")
return None, None
def update(current_version):
global log
target = var.config.get('bot', 'target_version')
new_version = new_release_version(target)
msg = ""
if target == "git":
msg = "git install, I do nothing"
elif (target == "stable" and version.parse(new_version) > version.parse(current_version)) or \
(target == "testing" and version.parse(new_version) != version.parse(current_version)):
log.info('update: new version, start updating...')
tp = sp.check_output(['/usr/bin/env', 'bash', 'update.sh', target]).decode()
log.debug(tp)
        log.info('update: updating pip library dependencies')
sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', '-r', 'requirements.txt']).decode()
msg = "New version installed, please restart the bot."
    log.info('update: updating youtube-dl via pip3')
tp = sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', 'youtube-dl']).decode()
if "Requirement already up-to-date" in tp:
msg += "Youtube-dl is up-to-date"
else:
msg += "Update done: " + tp.split('Successfully installed')[1]
reload(youtube_dl)
msg += "<br/> Youtube-dl reloaded"
return msg
def pipe_no_wait():
""" Generate a non-block pipe used to fetch the STDERR of ffmpeg.
"""
if platform == "linux" or platform == "linux2" or platform == "darwin" or platform.startswith("openbsd") or platform.startswith("freebsd"):
import fcntl
import os
pipe_rd = 0
pipe_wd = 0
if hasattr(os, "pipe2"):
pipe_rd, pipe_wd = os.pipe2(os.O_NONBLOCK)
else:
pipe_rd, pipe_wd = os.pipe()
try:
fl = fcntl.fcntl(pipe_rd, fcntl.F_GETFL)
fcntl.fcntl(pipe_rd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
except:
print(sys.exc_info()[1])
return None, None
return pipe_rd, pipe_wd
elif platform == "win32":
# https://stackoverflow.com/questions/34504970/non-blocking-read-on-os-pipe-on-windows
import msvcrt
import os
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
pipe_rd, pipe_wd = os.pipe()
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
ERROR_NO_DATA = 232
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipe_rd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return None, None
return pipe_rd, pipe_wd
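
# Illustrative sketch of how the non-blocking pipe is typically consumed: the write end is
# handed to ffmpeg as its stderr, and the read end is polled without blocking the caller.
# The ffmpeg command line below is a made-up placeholder.
def _pipe_no_wait_example():
    pipe_rd, pipe_wd = pipe_no_wait()
    proc = sp.Popen(["ffmpeg", "-i", "input.mp3", "-f", "s16le", "-"],  # hypothetical command
                    stdout=sp.PIPE, stderr=pipe_wd)
    try:
        stderr_chunk = os.read(pipe_rd, 4096)  # returns immediately instead of blocking
    except OSError:
        stderr_chunk = b""  # nothing written yet: EAGAIN on POSIX, ERROR_NO_DATA on Windows
    return proc, stderr_chunk
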
class Dir(object):
def __init__(self, path):
self.name = os.path.basename(path.strip('/'))
self.fullpath = path
self.subdirs = {}
self.files = []
def add_file(self, file):
if file.startswith(self.name + '/'):
file = file.replace(self.name + '/', '', 1)
if '/' in file:
# This file is in a subdir
subdir = file.split('/')[0]
if subdir in self.subdirs:
self.subdirs[subdir].add_file(file)
else:
self.subdirs[subdir] = Dir(os.path.join(self.fullpath, subdir))
self.subdirs[subdir].add_file(file)
else:
self.files.append(file)
return True
def get_subdirs(self, path=None):
subdirs = []
if path and path != '' and path != './':
subdir = path.split('/')[0]
if subdir in self.subdirs:
searchpath = '/'.join(path.split('/')[1::])
subdirs = self.subdirs[subdir].get_subdirs(searchpath)
subdirs = list(map(lambda subsubdir: os.path.join(subdir, subsubdir), subdirs))
else:
subdirs = self.subdirs
return subdirs
def get_subdirs_recursively(self, path=None):
subdirs = []
if path and path != '' and path != './':
subdir = path.split('/')[0]
if subdir in self.subdirs:
searchpath = '/'.join(path.split('/')[1::])
subdirs = self.subdirs[subdir].get_subdirs_recursively(searchpath)
else:
subdirs = list(self.subdirs.keys())
for key, val in self.subdirs.items():
subdirs.extend(map(lambda subdir: key + '/' + subdir, val.get_subdirs_recursively()))
subdirs.sort()
return subdirs
def get_files(self, path=None):
files = []
if path and path != '' and path != './':
subdir = path.split('/')[0]
if subdir in self.subdirs:
searchpath = '/'.join(path.split('/')[1::])
files = self.subdirs[subdir].get_files(searchpath)
else:
files = self.files
return files
def get_files_recursively(self, path=None):
files = []
if path and path != '' and path != './':
subdir = path.split('/')[0]
if subdir in self.subdirs:
searchpath = '/'.join(path.split('/')[1::])
files = self.subdirs[subdir].get_files_recursively(searchpath)
else:
            files = list(self.files)  # copy, so the extend below does not mutate self.files
for key, val in self.subdirs.items():
files.extend(map(lambda file: key + '/' + file, val.get_files_recursively()))
return files
    def render_text(self, indent=0):
        print('{}{}/'.format(' ' * (indent * 4), self.name))
        for key, val in self.subdirs.items():
            val.render_text(indent + 1)
        for file in self.files:
            print('{}{}'.format(' ' * (indent + 1) * 4, file))
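
# Illustrative only: how a Dir tree is typically populated from the flat, relative paths
# returned by get_recursive_file_list_sorted(). The paths below are made-up examples.
def _dir_example():
    root = Dir("music/")
    root.add_file("single.mp3")
    root.add_file("album/track01.mp3")  # implicitly creates the "album" subdir
    assert "album" in root.get_subdirs()
    assert root.get_files() == ["single.mp3"]
    assert root.get_files_recursively() == ["single.mp3", "album/track01.mp3"]
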
# Parse the html from the message to get the URL
def get_url_from_input(string):
string = string.strip()
if not (string.startswith("http") or string.startswith("HTTP")):
res = re.search('href="(.+?)"', string, flags=re.IGNORECASE)
if res:
string = res.group(1)
else:
return ""
match = re.search("(http|https)://(\S*)?/(\S*)", string, flags=re.IGNORECASE)
if match:
url = match[1].lower() + "://" + match[2].lower() + "/" + match[3]
# https://github.com/mumble-voip/mumble/issues/4999
return html.unescape(url)
else:
return ""
def youtube_search(query):
global log
import json
try:
cookie = json.loads(var.config.get('bot', 'youtube_query_cookie', fallback='{}'))
r = requests.get("https://www.youtube.com/results", cookies=cookie,
params={'search_query': query}, timeout=5)
result_json_match = re.findall(r">var ytInitialData = (.*?);</script>", r.text)
if not len(result_json_match):
log.error("util: can not interpret youtube search web page")
return False
result_big_json = json.loads(result_json_match[0])
results = []
try:
for item in result_big_json['contents']['twoColumnSearchResultsRenderer']\
['primaryContents']['sectionListRenderer']['contents'][0]\
['itemSectionRenderer']['contents']:
if 'videoRenderer' not in item:
continue
video_info = item['videoRenderer']
title = video_info['title']['runs'][0]['text']
video_id = video_info['videoId']
uploader = video_info['ownerText']['runs'][0]['text']
results.append([video_id, title, uploader])
except (json.JSONDecodeError, KeyError):
log.error("util: can not interpret youtube search web page")
return False
return results
except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout):
error_traceback = traceback.format_exc().split("During")[0]
log.error("util: youtube query failed with error:\n %s" % error_traceback)
return False
def get_media_duration(path):
command = ("ffprobe", "-v", "quiet", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", path)
process = sp.Popen(command, stdout=sp.PIPE, stderr=sp.PIPE)
stdout, stderr = process.communicate()
try:
if not stderr:
return float(stdout)
else:
return 0
except ValueError:
return 0
def parse_time(human):
match = re.search("(?:(\d\d):)?(?:(\d\d):)?(\d+(?:\.\d*)?)", human, flags=re.IGNORECASE)
if match:
if match[1] is None and match[2] is None:
return float(match[3])
elif match[2] is None:
return float(match[3]) + 60 * int(match[1])
else:
return float(match[3]) + 60 * int(match[2]) + 3600 * int(match[1])
else:
raise ValueError("Invalid time string given.")
def format_time(seconds):
hours = seconds // 3600
seconds = seconds % 3600
minutes = seconds // 60
seconds = seconds % 60
return f"{hours:d}:{minutes:02d}:{seconds:02d}"
def parse_file_size(human):
units = {"B": 1, "KB": 1024, "MB": 1024 * 1024, "GB": 1024 * 1024 * 1024, "TB": 1024 * 1024 * 1024 * 1024,
"K": 1024, "M": 1024 * 1024, "G": 1024 * 1024 * 1024, "T": 1024 * 1024 * 1024 * 1024}
    match = re.search(r"(\d+(?:\.\d*)?)\s*([A-Za-z]+)", human, flags=re.IGNORECASE)
if match:
num = float(match[1])
unit = match[2].upper()
if unit in units:
return int(num * units[unit])
raise ValueError("Invalid file size given.")
def get_salted_password_hash(password):
salt = os.urandom(10)
hashed = hashlib.pbkdf2_hmac('sha1', password.encode("utf-8"), salt, 100000)
return hashed.hex(), salt.hex()
def verify_password(password, salted_hash, salt):
hashed = hashlib.pbkdf2_hmac('sha1', password.encode("utf-8"), bytearray.fromhex(salt), 100000)
if hashed.hex() == salted_hash:
return True
return False
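
# Illustrative round trip through the two helpers above: the hash and salt are stored as
# hex strings and checked later with verify_password(). The password is a made-up example.
def _password_hash_example():
    hashed, salt = get_salted_password_hash("correct horse battery staple")
    assert verify_password("correct horse battery staple", hashed, salt)
    assert not verify_password("wrong password", hashed, salt)
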
def get_supported_language():
root_dir = os.path.dirname(__file__)
lang_files = os.listdir(os.path.join(root_dir, 'lang'))
lang_list = []
for lang_file in lang_files:
match = re.search("([a-z]{2}_[A-Z]{2})\.json", lang_file)
if match:
lang_list.append(match[1])
return lang_list
def set_logging_formatter(handler: logging.Handler, logging_level):
if logging_level == logging.DEBUG:
formatter = logging.Formatter(
"[%(asctime)s] > [%(threadName)s] > "
"[%(filename)s:%(lineno)d] %(message)s"
)
else:
formatter = logging.Formatter(
'[%(asctime)s %(levelname)s] %(message)s', "%b %d %H:%M:%S")
handler.setFormatter(formatter)
def get_snapshot_version():
import subprocess
wd = os.getcwd()
root_dir = os.path.dirname(__file__)
os.chdir(root_dir)
ver = "unknown"
if os.path.exists(os.path.join(root_dir, ".git")):
try:
ret = subprocess.check_output(["git", "describe", "--tags"]).strip()
ver = ret.decode("utf-8")
except FileNotFoundError:
try:
with open(os.path.join(root_dir, ".git/refs/heads/master")) as f:
ver = "g" + f.read()[:7]
except FileNotFoundError:
pass
os.chdir(wd)
return ver
class LoggerIOWrapper(io.TextIOWrapper):
def __init__(self, logger: logging.Logger, logging_level, fallback_io_buffer):
super().__init__(fallback_io_buffer, write_through=True)
self.logger = logger
self.logging_level = logging_level
def write(self, text):
if isinstance(text, bytes):
msg = text.decode('utf-8').rstrip()
self.logger.log(self.logging_level, msg)
super().write(msg + "\n")
else:
self.logger.log(self.logging_level, text.rstrip())
super().write(text + "\n")
class VolumeHelper:
def __init__(self, plain_volume=0, ducking_plain_volume=0):
self.plain_volume_set = 0
self.plain_ducking_volume_set = 0
self.volume_set = 0
self.ducking_volume_set = 0
self.real_volume = 0
self.set_volume(plain_volume)
self.set_ducking_volume(ducking_plain_volume)
def set_volume(self, plain_volume):
self.volume_set = self._convert_volume(plain_volume)
self.plain_volume_set = plain_volume
def set_ducking_volume(self, plain_volume):
self.ducking_volume_set = self._convert_volume(plain_volume)
self.plain_ducking_volume_set = plain_volume
def _convert_volume(self, volume):
if volume == 0:
return 0
        # map an input in [0, 1] onto a gain of -35 to +5 dB
        dB = -35 + volume * 40
        # stretch the curve so that the output is exactly 0 when the input corresponds to -35 dB
return (10 ** (dB / 20) - 10 ** (-35 / 20)) / (1 - 10 ** (-35 / 20))
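
# Illustrative only: a worked example of the mapping implemented by _convert_volume().
# A plain volume of 0.875 maps to -35 + 0.875 * 40 = 0 dB, and the stretching step
# (10 ** 0 - 10 ** (-35 / 20)) / (1 - 10 ** (-35 / 20)) then yields exactly 1.0 (unity gain),
# while a plain volume of 0 is forced to 0 (silence).
def _volume_helper_example():
    helper = VolumeHelper(plain_volume=0.875, ducking_plain_volume=0.0)
    assert abs(helper.volume_set - 1.0) < 1e-9  # 0 dB -> unity gain
    assert helper.ducking_volume_set == 0       # silence stays silence
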
def get_size_folder(path):
global log
folder_size = 0
for (path, dirs, files) in os.walk(path):
for file in files:
filename = os.path.join(path, file)
try:
folder_size += os.path.getsize(filename)
except (FileNotFoundError, OSError):
continue
return int(folder_size / (1024 * 1024))
def clear_tmp_folder(path, size):
global log
if size == -1:
return
elif size == 0:
for (path, dirs, files) in os.walk(path):
for file in files:
filename = os.path.join(path, file)
try:
os.remove(filename)
except (FileNotFoundError, OSError):
continue
else:
if get_size_folder(path=path) > size:
            all_files = []
for (path, dirs, files) in os.walk(path):
all_files = [os.path.join(path, file) for file in files]
all_files.sort(key=lambda x: os.path.getmtime(x))
size_tp = 0
for idx, file in enumerate(all_files):
size_tp += os.path.getsize(file)
if int(size_tp / (1024 * 1024)) > size:
log.info("Cleaning tmp folder")
to_remove = all_files[:idx]
                    log.debug("tmp folder files to remove: %s" % to_remove)
for f in to_remove:
log.debug("Removing " + f)
try:
                            os.remove(f)  # f is already a full path (joined above)
except (FileNotFoundError, OSError):
continue
return