1
0
mirror of https://github.com/azlux/botamusique synced 2024-11-23 13:56:17 +00:00
botamusique/mumbleBot.py
2021-05-26 11:29:22 +08:00

907 lines
36 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import threading
import time
import sys
import math
import signal
import configparser
import audioop
import subprocess as sp
import argparse
import os
import os.path
import pymumble_py3 as pymumble
import pymumble_py3.constants
import variables as var
import logging
import logging.handlers
import traceback
import struct
from packaging import version
import util
import command
import constants
import media.playlist
from constants import tr_cli as tr
from database import SettingsDatabase, MusicDatabase, DatabaseMigration
from media.item import ValidationFailedError, PreparationFailedError
from media.cache import MusicCache
class MumbleBot:
version = 'git'
def __init__(self, args):
self.log = logging.getLogger("bot")
self.log.info(f"bot: botamusique version {self.get_version()}, starting...")
signal.signal(signal.SIGINT, self.ctrl_caught)
self.cmd_handle = {}
self.stereo = var.config.getboolean('bot', 'stereo', fallback=True)
if args.channel:
self.channel = args.channel
else:
self.channel = var.config.get("server", "channel", fallback=None)
var.user = args.user
var.is_proxified = var.config.getboolean(
"webinterface", "is_web_proxified")
# Flags to indicate the bot is exiting (Ctrl-C, or !kill)
self.exit = False
self.nb_exit = 0
# Related to ffmpeg thread
self.thread = None
self.thread_stderr = None
self.read_pcm_size = 0
self.pcm_buffer_size = 0
self.last_ffmpeg_err = ""
# Play/pause status
self.is_pause = False
self.pause_at_id = ""
self.playhead = -1 # current position in a song.
self.song_start_at = -1
self.wait_for_ready = False # flag for the loop are waiting for download to complete in the other thread
#
self.on_interrupting = False
if args.host:
host = args.host
else:
host = var.config.get("server", "host")
if args.port:
port = args.port
else:
port = var.config.getint("server", "port")
if args.password:
password = args.password
else:
password = var.config.get("server", "password")
if args.channel:
self.channel = args.channel
else:
self.channel = var.config.get("server", "channel")
if args.certificate:
certificate = args.certificate
else:
certificate = util.solve_filepath(var.config.get("server", "certificate"))
if args.tokens:
tokens = args.tokens
else:
tokens = var.config.get("server", "tokens")
tokens = tokens.split(',')
if args.user:
self.username = args.user
else:
self.username = var.config.get("bot", "username")
if args.bandwidth:
self.bandwidth = args.bandwidth
else:
self.bandwidth = var.config.getint("bot", "bandwidth")
self.mumble = pymumble.Mumble(host, user=self.username, port=port, password=password, tokens=tokens,
stereo=self.stereo,
debug=var.config.getboolean('debug', 'mumbleConnection'),
certfile=certificate)
self.mumble.callbacks.set_callback(pymumble.constants.PYMUMBLE_CLBK_TEXTMESSAGERECEIVED, self.message_received)
self.mumble.set_codec_profile("audio")
self.mumble.start() # start the mumble thread
self.mumble.is_ready() # wait for the connection
if self.mumble.connected >= pymumble.constants.PYMUMBLE_CONN_STATE_FAILED:
exit()
self.set_comment()
self.mumble.users.myself.unmute() # by sure the user is not muted
self.join_channel()
self.mumble.set_bandwidth(self.bandwidth)
# ====== Volume ======
self.volume_helper = util.VolumeHelper()
_volume = var.config.getfloat('bot', 'volume', fallback=0.8)
if var.db.has_option('bot', 'volume'):
_volume = var.db.getfloat('bot', 'volume')
self.volume_helper.set_volume(_volume)
self.is_ducking = False
self.on_ducking = False
self.ducking_release = time.time()
self.last_volume_cycle_time = time.time()
self._ducking_volume = 0
_ducking_volume = var.config.getfloat("bot", "ducking_volume", fallback=0.50)
_ducking_volume = var.db.getfloat("bot", "ducking_volume", fallback=_ducking_volume)
self.volume_helper.set_ducking_volume(_ducking_volume)
self.ducking_threshold = var.config.getfloat("bot", "ducking_threshold", fallback=5000)
self.ducking_threshold = var.db.getfloat("bot", "ducking_threshold", fallback=self.ducking_threshold)
if not var.db.has_option("bot", "ducking") and var.config.getboolean("bot", "ducking", fallback=False) \
or var.config.getboolean("bot", "ducking"):
self.is_ducking = True
self.mumble.callbacks.set_callback(pymumble.constants.PYMUMBLE_CLBK_SOUNDRECEIVED,
self.ducking_sound_received)
self.mumble.set_receive_sound(True)
assert var.config.get("bot", "when_nobody_in_channel") in ['pause', 'pause_resume', 'stop', 'nothing', ''], \
"Unknown action for when_nobody_in_channel"
if var.config.get("bot", "when_nobody_in_channel", fallback='') in ['pause', 'pause_resume', 'stop']:
user_change_callback = \
lambda user, action: threading.Thread(target=self.users_changed,
args=(user, action), daemon=True).start()
self.mumble.callbacks.set_callback(pymumble.constants.PYMUMBLE_CLBK_USERREMOVED, user_change_callback)
self.mumble.callbacks.set_callback(pymumble.constants.PYMUMBLE_CLBK_USERUPDATED, user_change_callback)
# Debug use
self._loop_status = 'Idle'
self._display_rms = False
self._max_rms = 0
self.redirect_ffmpeg_log = var.config.getboolean('debug', 'redirect_ffmpeg_log', fallback=True)
if var.config.getboolean("bot", "auto_check_update"):
def check_update():
nonlocal self
new_version, changelog = util.check_update(self.get_version())
if new_version:
self.send_channel_msg(tr('new_version_found', new_version=new_version, changelog=changelog))
th = threading.Thread(target=check_update, name="UpdateThread")
th.daemon = True
th.start()
last_startup_version = var.db.get("bot", "version", fallback=None)
if not last_startup_version or version.parse(last_startup_version) < version.parse(self.version):
var.db.set("bot", "version", self.version)
if var.config.getboolean("bot", "auto_check_update"):
changelog = util.fetch_changelog()
self.send_channel_msg(tr("update_successful", version=self.version, changelog=changelog))
# Set the CTRL+C shortcut
def ctrl_caught(self, signal, frame):
self.log.info(
"\nSIGINT caught, quitting, {} more to kill".format(2 - self.nb_exit))
if var.config.getboolean('bot', 'save_playlist', fallback=True) \
and var.config.get("bot", "save_music_library", fallback=True):
self.log.info("bot: save playlist into database")
var.playlist.save()
if self.nb_exit > 1:
self.log.info("Forced Quit")
sys.exit(0)
self.nb_exit += 1
self.exit = True
def get_version(self):
if self.version != "git":
return self.version
else:
return util.get_snapshot_version()
def register_command(self, cmd, handle, no_partial_match=False, access_outside_channel=False, admin=False):
cmds = cmd.split(",")
for command in cmds:
command = command.strip()
if command:
self.cmd_handle[command] = {'handle': handle,
'partial_match': not no_partial_match,
'access_outside_channel': access_outside_channel,
'admin': admin}
self.log.debug("bot: command added: " + command)
def set_comment(self):
self.mumble.users.myself.comment(var.config.get('bot', 'comment'))
def join_channel(self):
if self.channel:
if '/' in self.channel:
self.mumble.channels.find_by_tree(self.channel.split('/')).move_in()
else:
self.mumble.channels.find_by_name(self.channel).move_in()
# =======================
# Message
# =======================
# All text send to the chat is analysed by this function
def message_received(self, text):
raw_message = text.message.strip()
message = re.sub(r'<.*?>', '', raw_message)
if text.actor == 0:
# Some server will send a welcome message to the bot once connected.
# It doesn't have a valid "actor". Simply ignore it here.
return
user = self.mumble.users[text.actor]['name']
if var.config.getboolean('commands', 'split_username_at_space'):
# in can you use https://github.com/Natenom/mumblemoderator-module-collection/tree/master/os-suffixes ,
# you want to split the username
user = user.split()[0]
command_symbols = var.config.get('commands', 'command_symbol')
match = re.match(fr'^[{re.escape(command_symbols)}](?P<command>\S+)(?:\s(?P<argument>.*))?', message)
if match:
command = match.group("command").lower()
argument = match.group("argument") or ""
if not command:
return
self.log.info(f'bot: received command "{command}" with arguments "{argument}" from {user}')
# Anti stupid guy function
if not self.is_admin(user) and not var.config.getboolean('bot', 'allow_private_message') and text.session:
self.mumble.users[text.actor].send_text_message(
tr('pm_not_allowed'))
return
for i in var.db.items("user_ban"):
if user.lower() == i[0]:
self.mumble.users[text.actor].send_text_message(
tr('user_ban'))
return
if not self.is_admin(user) and argument:
input_url = util.get_url_from_input(argument)
if input_url and var.db.has_option('url_ban', input_url):
self.mumble.users[text.actor].send_text_message(
tr('url_ban'))
return
command_exc = ""
try:
if command in self.cmd_handle:
command_exc = command
else:
# try partial match
cmds = self.cmd_handle.keys()
matches = []
for cmd in cmds:
if cmd.startswith(command) and self.cmd_handle[cmd]['partial_match']:
matches.append(cmd)
if len(matches) == 1:
self.log.info("bot: {:s} matches {:s}".format(command, matches[0]))
command_exc = matches[0]
elif len(matches) > 1:
self.mumble.users[text.actor].send_text_message(
tr('which_command', commands="<br>".join(matches)))
return
else:
self.mumble.users[text.actor].send_text_message(
tr('bad_command', command=command))
return
if self.cmd_handle[command_exc]['admin'] and not self.is_admin(user):
self.mumble.users[text.actor].send_text_message(tr('not_admin'))
return
if not self.cmd_handle[command_exc]['access_outside_channel'] \
and not self.is_admin(user) \
and not var.config.getboolean('bot', 'allow_other_channel_message') \
and self.mumble.users[text.actor]['channel_id'] != self.mumble.users.myself['channel_id']:
self.mumble.users[text.actor].send_text_message(
tr('not_in_my_channel'))
return
self.cmd_handle[command_exc]['handle'](self, user, text, command_exc, argument)
except:
error_traceback = traceback.format_exc()
error = error_traceback.rstrip().split("\n")[-1]
self.log.error(f"bot: command {command_exc} failed with error: {error_traceback}\n")
self.send_msg(tr('error_executing_command', command=command_exc, error=error), text)
def send_msg(self, msg, text):
msg = msg.encode('utf-8', 'ignore').decode('utf-8')
# text if the object message, contain information if direct message or channel message
self.mumble.users[text.actor].send_text_message(msg)
def send_channel_msg(self, msg):
msg = msg.encode('utf-8', 'ignore').decode('utf-8')
own_channel = self.mumble.channels[self.mumble.users.myself['channel_id']]
own_channel.send_text_message(msg)
@staticmethod
def is_admin(user):
list_admin = var.config.get('bot', 'admin').rstrip().split(';')
if user in list_admin:
return True
else:
return False
# =======================
# Users changed
# =======================
def users_changed(self, user, message):
own_channel = self.mumble.channels[self.mumble.users.myself['channel_id']]
# only check if there is one more user currently in the channel
# else when the music is paused and somebody joins, music would start playing again
if len(own_channel.get_users()) == 2:
if var.config.get("bot", "when_nobody_in_channel") == "pause_resume":
self.resume()
elif var.config.get("bot", "when_nobody_in_channel") == "pause" and self.is_pause:
self.send_channel_msg(tr("auto_paused"))
elif len(own_channel.get_users()) == 1 and len(var.playlist) != 0:
# if the bot is the only user left in the channel and the playlist isn't empty
self.log.info('bot: Other users in the channel left. Stopping music now.')
if var.config.get("bot", "when_nobody_in_channel") == "stop":
self.clear()
else:
self.pause()
# =======================
# Launch and Download
# =======================
def launch_music(self, music_wrapper, start_from=0):
assert music_wrapper.is_ready()
uri = music_wrapper.uri()
self.log.info("bot: play music " + music_wrapper.format_debug_string())
if var.config.getboolean('bot', 'announce_current_music'):
self.send_channel_msg(music_wrapper.format_current_playing())
if var.config.getboolean('debug', 'ffmpeg'):
ffmpeg_debug = "debug"
else:
ffmpeg_debug = "warning"
channels = 2 if self.stereo else 1
self.pcm_buffer_size = 960 * channels
command = ("ffmpeg", '-v', ffmpeg_debug, '-nostdin', '-i',
uri, '-ss', f"{start_from:f}", '-ac', str(channels), '-f', 's16le', '-ar', '48000', '-')
self.log.debug("bot: execute ffmpeg command: " + " ".join(command))
# The ffmpeg process is a thread
# prepare pipe for catching stderr of ffmpeg
if self.redirect_ffmpeg_log:
pipe_rd, pipe_wd = util.pipe_no_wait() # Let the pipe work in non-blocking mode
self.thread_stderr = os.fdopen(pipe_rd)
else:
pipe_rd, pipe_wd = None, None
self.thread = sp.Popen(command, stdout=sp.PIPE, stderr=pipe_wd, bufsize=self.pcm_buffer_size)
def async_download_next(self):
# Function start if the next music isn't ready
# Do nothing in case the next music is already downloaded
self.log.debug("bot: Async download next asked ")
while var.playlist.next_item():
# usually, all validation will be done when adding to the list.
# however, for performance consideration, youtube playlist won't be validate when added.
# the validation has to be done here.
next = var.playlist.next_item()
try:
if not next.is_ready():
self.async_download(next)
break
except ValidationFailedError as e:
self.send_channel_msg(e.msg)
var.playlist.remove_by_id(next.id)
var.cache.free_and_delete(next.id)
def async_download(self, item):
th = threading.Thread(
target=self._download, name="Prepare-" + item.id[:7], args=(item,))
self.log.info(f"bot: start preparing item in thread: {item.format_debug_string()}")
th.daemon = True
th.start()
return th
def start_download(self, item):
if not item.is_ready():
self.log.info("bot: current music isn't ready, start downloading.")
self.async_download(item)
self.send_channel_msg(
tr('download_in_progress', item=item.format_title()))
def _download(self, item):
ver = item.version
try:
item.validate()
if item.is_ready():
return True
except ValidationFailedError as e:
self.send_channel_msg(e.msg)
var.playlist.remove_by_id(item.id)
var.cache.free_and_delete(item.id)
return False
try:
item.prepare()
if item.version > ver:
var.playlist.version += 1
return True
except PreparationFailedError as e:
self.send_channel_msg(e.msg)
return False
# =======================
# Loop
# =======================
# Main loop of the Bot
def loop(self):
while not self.exit and self.mumble.is_alive():
while self.thread and self.mumble.sound_output.get_buffer_size() > 0.5 and not self.exit:
# If the buffer isn't empty, I cannot send new music part, so I wait
self._loop_status = f'Wait for buffer {self.mumble.sound_output.get_buffer_size():.3f}'
time.sleep(0.01)
raw_music = None
if self.thread:
# I get raw from ffmpeg thread
# move playhead forward
self._loop_status = 'Reading raw'
if self.song_start_at == -1:
self.song_start_at = time.time() - self.playhead
self.playhead = time.time() - self.song_start_at
raw_music = self.thread.stdout.read(self.pcm_buffer_size)
self.read_pcm_size += len(raw_music)
if self.redirect_ffmpeg_log:
try:
self.last_ffmpeg_err = self.thread_stderr.readline()
if self.last_ffmpeg_err:
self.log.debug("ffmpeg: " + self.last_ffmpeg_err.strip("\n"))
except:
pass
if raw_music:
# Adjust the volume and send it to mumble
self.volume_cycle()
if not self.on_interrupting and len(raw_music) == self.pcm_buffer_size:
self.mumble.sound_output.add_sound(
audioop.mul(raw_music, 2, self.volume_helper.real_volume))
elif self.read_pcm_size == 0:
self.mumble.sound_output.add_sound(
audioop.mul(self._fadeout(raw_music, self.stereo, fadein=True), 2, self.volume_helper.real_volume))
elif self.on_interrupting or len(raw_music) < self.pcm_buffer_size:
self.mumble.sound_output.add_sound(
audioop.mul(self._fadeout(raw_music, self.stereo, fadein=False), 2, self.volume_helper.real_volume))
self.thread.kill()
self.thread = None
time.sleep(0.1)
self.on_interrupting = False
else:
time.sleep(0.1)
else:
time.sleep(0.1)
if not self.is_pause and not raw_music:
self.thread = None
# bot is not paused, but ffmpeg thread has gone.
# indicate that last song has finished, or the bot just resumed from pause, or something is wrong.
if self.read_pcm_size < self.pcm_buffer_size \
and var.playlist.current_index != -1 \
and self.last_ffmpeg_err:
current = var.playlist.current_item()
self.log.error("bot: cannot play music %s", current.format_debug_string())
self.log.error("bot: with ffmpeg error: %s", self.last_ffmpeg_err)
self.last_ffmpeg_err = ""
self.send_channel_msg(tr('unable_play', item=current.format_title()))
var.playlist.remove_by_id(current.id)
var.cache.free_and_delete(current.id)
# move to the next song.
if not self.wait_for_ready: # if wait_for_ready flag is not true, move to the next song.
if var.playlist.next():
current = var.playlist.current_item()
self.log.debug(f"bot: next into the song: {current.format_debug_string()}")
try:
self.start_download(current)
self.wait_for_ready = True
self.song_start_at = -1
self.playhead = 0
except ValidationFailedError as e:
self.send_channel_msg(e.msg)
var.playlist.remove_by_id(current.id)
var.cache.free_and_delete(current.id)
else:
self._loop_status = 'Empty queue'
else:
# if wait_for_ready flag is true, means the pointer is already
# pointing to target song. start playing
current = var.playlist.current_item()
if current:
if current.is_ready():
self.wait_for_ready = False
self.read_pcm_size = 0
self.launch_music(current, self.playhead)
self.last_volume_cycle_time = time.time()
self.async_download_next()
elif current.is_failed():
var.playlist.remove_by_id(current.id)
self.wait_for_ready = False
else:
self._loop_status = 'Wait for the next item to be ready'
else:
self.wait_for_ready = False
while self.mumble.sound_output.get_buffer_size() > 0 and self.mumble.is_alive():
# Empty the buffer before exit
time.sleep(0.01)
time.sleep(0.5)
if self.exit:
self._loop_status = "exited"
if var.config.getboolean('bot', 'save_playlist', fallback=True) \
and var.config.get("bot", "save_music_library", fallback=True):
self.log.info("bot: save playlist into database")
var.playlist.save()
def volume_cycle(self):
delta = time.time() - self.last_volume_cycle_time
if self.on_ducking and self.ducking_release < time.time():
self.on_ducking = False
self._max_rms = 0
if delta > 0.001:
if self.is_ducking and self.on_ducking:
self.volume_helper.real_volume = \
(self.volume_helper.real_volume - self.volume_helper.ducking_volume_set) * math.exp(- delta / 0.2) \
+ self.volume_helper.ducking_volume_set
else:
self.volume_helper.real_volume = self.volume_helper.volume_set - \
(self.volume_helper.volume_set - self.volume_helper.real_volume) * math.exp(- delta / 0.5)
self.last_volume_cycle_time = time.time()
def ducking_sound_received(self, user, sound):
rms = audioop.rms(sound.pcm, 2)
self._max_rms = max(rms, self._max_rms)
if self._display_rms:
if rms < self.ducking_threshold:
print('%6d/%6d ' % (rms, self._max_rms) + '-' * int(rms / 200), end='\r')
else:
print('%6d/%6d ' % (rms, self._max_rms) + '-' * int(self.ducking_threshold / 200)
+ '+' * int((rms - self.ducking_threshold) / 200), end='\r')
if rms > self.ducking_threshold:
if self.on_ducking is False:
self.log.debug("bot: ducking triggered")
self.on_ducking = True
self.ducking_release = time.time() + 1 # ducking release after 1s
def _fadeout(self, _pcm_data, stereo=False, fadein=False):
pcm_data = bytearray(_pcm_data)
if stereo:
if not fadein:
mask = [math.exp(-x / 60) for x in range(0, int(len(pcm_data) / 4))]
else:
mask = [math.exp(-x / 60) for x in reversed(range(0, int(len(pcm_data) / 4)))]
for i in range(int(len(pcm_data) / 4)):
pcm_data[4 * i:4 * i + 2] = struct.pack("<h",
round(struct.unpack("<h", pcm_data[4 * i:4 * i + 2])[0] * mask[i]))
pcm_data[4 * i + 2:4 * i + 4] = struct.pack("<h", round(
struct.unpack("<h", pcm_data[4 * i + 2:4 * i + 4])[0] * mask[i]))
else:
mask = [math.exp(-x / 60) for x in range(0, int(len(pcm_data) / 2))]
for i in range(int(len(pcm_data) / 2)):
pcm_data[2 * i:2 * i + 2] = struct.pack("<h",
round(struct.unpack("<h", pcm_data[2 * i:2 * i + 2])[0] * mask[i]))
return bytes(pcm_data) + bytes(len(pcm_data))
# =======================
# Play Control
# =======================
def play(self, index=-1, start_at=0):
if not self.is_pause:
self.interrupt()
if index != -1:
var.playlist.point_to(index)
current = var.playlist.current_item()
self.start_download(current)
self.is_pause = False
self.wait_for_ready = True
self.song_start_at = -1
self.playhead = start_at
def clear(self):
# Kill the ffmpeg thread and empty the playlist
self.interrupt()
var.playlist.clear()
self.wait_for_ready = False
self.log.info("bot: music stopped. playlist trashed.")
def stop(self):
self.interrupt()
self.is_pause = True
if len(var.playlist) > 0:
self.wait_for_ready = True
else:
self.wait_for_ready = False
self.log.info("bot: music stopped.")
def interrupt(self):
# Kill the ffmpeg thread
if self.thread:
self.on_interrupting = True
time.sleep(0.1)
self.song_start_at = -1
self.read_pcm_size = 0
def pause(self):
# Kill the ffmpeg thread
self.interrupt()
self.is_pause = True
self.song_start_at = -1
if len(var.playlist) > 0:
self.pause_at_id = var.playlist.current_item().id
self.log.info(f"bot: music paused at {self.playhead:.2f} seconds.")
def resume(self):
self.is_pause = False
if var.playlist.current_index == -1:
var.playlist.next()
self.playhead = 0
return
music_wrapper = var.playlist.current_item()
if not music_wrapper or not music_wrapper.id == self.pause_at_id or not music_wrapper.is_ready():
self.playhead = 0
return
self.wait_for_ready = True
self.pause_at_id = ""
def start_web_interface(addr, port):
global formatter
import interface
# setup logger
werkzeug_logger = logging.getLogger('werkzeug')
logfile = util.solve_filepath(var.config.get('webinterface', 'web_logfile'))
if logfile:
handler = logging.handlers.RotatingFileHandler(logfile, mode='a', maxBytes=10240) # Rotate after 10KB
else:
handler = logging.StreamHandler()
werkzeug_logger.addHandler(handler)
interface.init_proxy()
interface.web.env = 'development'
interface.web.secret_key = var.config.get('webinterface', 'flask_secret')
interface.web.run(port=port, host=addr)
if __name__ == '__main__':
supported_languages = util.get_supported_language()
parser = argparse.ArgumentParser(
description='Bot for playing music on Mumble')
# General arguments
parser.add_argument("--config", dest='config', type=str, default='configuration.ini',
help='Load configuration from this file. Default: configuration.ini')
parser.add_argument("--db", dest='db', type=str,
default=None, help='Settings database file')
parser.add_argument("--music-db", dest='music_db', type=str,
default=None, help='Music library database file')
parser.add_argument("--lang", dest='lang', type=str, default=None,
help='Preferred language. Support ' + ", ".join(supported_languages))
parser.add_argument("-q", "--quiet", dest="quiet",
action="store_true", help="Only Error logs")
parser.add_argument("-v", "--verbose", dest="verbose",
action="store_true", help="Show debug log")
# Mumble arguments
parser.add_argument("-s", "--server", dest="host",
type=str, help="Hostname of the Mumble server")
parser.add_argument("-u", "--user", dest="user",
type=str, help="Username for the bot")
parser.add_argument("-P", "--password", dest="password",
type=str, help="Server password, if required")
parser.add_argument("-T", "--tokens", dest="tokens",
type=str, help="Server tokens to enter a channel, if required (multiple entries separated with comma ','")
parser.add_argument("-p", "--port", dest="port",
type=int, help="Port for the Mumble server")
parser.add_argument("-c", "--channel", dest="channel",
type=str, help="Default channel for the bot")
parser.add_argument("-C", "--cert", dest="certificate",
type=str, default=None, help="Certificate file")
parser.add_argument("-b", "--bandwidth", dest="bandwidth",
type=int, help="Bandwidth used by the bot")
args = parser.parse_args()
# ======================
# Load Config
# ======================
config = configparser.ConfigParser(interpolation=None, allow_no_value=True)
var.config = config
parsed_configs = config.read([util.solve_filepath('configuration.default.ini'), util.solve_filepath(args.config)],
encoding='utf-8')
if len(parsed_configs) == 0:
logging.error('Could not read configuration from file \"{}\"'.format(args.config))
sys.exit()
# ======================
# Setup Logger
# ======================
bot_logger = logging.getLogger("bot")
bot_logger.setLevel(logging.INFO)
if args.verbose:
bot_logger.setLevel(logging.DEBUG)
bot_logger.debug("Starting in DEBUG loglevel")
elif args.quiet:
bot_logger.setLevel(logging.ERROR)
bot_logger.error("Starting in ERROR loglevel")
logfile = util.solve_filepath(var.config.get('bot', 'logfile').strip())
handler = None
if logfile:
print(f"Redirecting stdout and stderr to log file: {logfile}")
handler = logging.handlers.RotatingFileHandler(logfile, mode='a', maxBytes=10240) # Rotate after 10KB
if var.config.getboolean("bot", "redirect_stderr", fallback=False):
sys.stderr = util.LoggerIOWrapper(bot_logger, logging.INFO,
fallback_io_buffer=sys.stderr.buffer)
else:
handler = logging.StreamHandler()
util.set_logging_formatter(handler, bot_logger.level)
bot_logger.addHandler(handler)
logging.getLogger("root").addHandler(handler)
var.bot_logger = bot_logger
# ======================
# Load Database
# ======================
if args.user:
username = args.user
else:
username = var.config.get("bot", "username")
sanitized_username = "".join([x if x.isalnum() else "_" for x in username])
var.settings_db_path = args.db if args.db is not None else util.solve_filepath(
config.get("bot", "database_path", fallback=f"settings-{sanitized_username}.db"))
var.music_db_path = args.music_db if args.music_db is not None else util.solve_filepath(
config.get("bot", "music_database_path", fallback="music.db"))
var.db = SettingsDatabase(var.settings_db_path)
if var.config.get("bot", "save_music_library", fallback=True):
var.music_db = MusicDatabase(var.music_db_path)
else:
var.music_db = MusicDatabase(":memory:")
DatabaseMigration(var.db, var.music_db).migrate()
var.music_folder = util.solve_filepath(var.config.get('bot', 'music_folder'))
if not var.music_folder.endswith(os.sep):
# The file searching logic assumes that the music folder ends in a /
var.music_folder = var.music_folder + os.sep
var.tmp_folder = util.solve_filepath(var.config.get('bot', 'tmp_folder'))
# ======================
# Translation
# ======================
lang = ""
if args.lang:
lang = args.lang
else:
lang = var.config.get('bot', 'language', fallback='en_US')
if lang not in supported_languages:
raise KeyError(f"Unsupported language {lang}")
var.language = lang
constants.load_lang(lang)
# ======================
# Prepare Cache
# ======================
var.cache = MusicCache(var.music_db)
if var.config.getboolean("bot", "refresh_cache_on_startup", fallback=True):
var.cache.build_dir_cache()
# ======================
# Load playback mode
# ======================
playback_mode = None
if var.db.has_option("playlist", "playback_mode"):
playback_mode = var.db.get('playlist', 'playback_mode')
else:
playback_mode = var.config.get('bot', 'playback_mode', fallback="one-shot")
if playback_mode in ["one-shot", "repeat", "random", "autoplay"]:
var.playlist = media.playlist.get_playlist(playback_mode)
else:
raise KeyError(f"Unknown playback mode '{playback_mode}'")
# ======================
# Create bot instance
# ======================
var.bot = MumbleBot(args)
command.register_all_commands(var.bot)
# load playlist
if var.config.getboolean('bot', 'save_playlist', fallback=True):
var.bot_logger.info("bot: load playlist from previous session")
var.playlist.load()
# ============================
# Start the web interface
# ============================
if var.config.getboolean("webinterface", "enabled"):
wi_addr = var.config.get("webinterface", "listening_addr")
wi_port = var.config.getint("webinterface", "listening_port")
tt = threading.Thread(
target=start_web_interface, name="WebThread", args=(wi_addr, wi_port))
tt.daemon = True
bot_logger.info('Starting web interface on {}:{}'.format(wi_addr, wi_port))
tt.start()
# Start the main loop.
var.bot.loop()