botamusique/database.py

689 lines
23 KiB
Python

import os
import re
import sqlite3
import json
import datetime
import time
import logging
log = logging.getLogger("bot")
class DatabaseError(Exception):
pass
class Condition:
def __init__(self):
self.filler = []
self._sql = ""
self._limit = 0
self._offset = 0
self._order_by = ""
self._desc = ""
self.has_regex = False
pass
def sql(self, conn: sqlite3.Connection = None):
sql = self._sql
if not self._sql:
sql = "1"
if self._order_by:
sql += f" ORDER BY {self._order_by}"
if self._desc:
sql += " DESC"
if self._limit:
sql += f" LIMIT {self._limit}"
if self._offset:
sql += f" OFFSET {self._offset}"
if self.has_regex and conn:
conn.create_function("REGEXP", 2, self._regexp)
return sql
@staticmethod
def _regexp(expr, item):
if not item:
return False
reg = re.compile(expr)
return reg.search(item) is not None
def or_equal(self, column, equals_to, case_sensitive=True):
if not case_sensitive:
column = f"LOWER({column})"
equals_to = equals_to.lower()
if self._sql:
self._sql += f" OR {column}=?"
else:
self._sql += f"{column}=?"
self.filler.append(equals_to)
return self
def and_equal(self, column, equals_to, case_sensitive=True):
if not case_sensitive:
column = f"LOWER({column})"
equals_to = equals_to.lower()
if self._sql:
self._sql += f" AND {column}=?"
else:
self._sql += f"{column}=?"
self.filler.append(equals_to)
return self
def or_like(self, column, equals_to, case_sensitive=True):
if not case_sensitive:
column = f"LOWER({column})"
equals_to = equals_to.lower()
if self._sql:
self._sql += f" OR {column} LIKE ?"
else:
self._sql += f"{column} LIKE ?"
self.filler.append(equals_to)
return self
def and_like(self, column, equals_to, case_sensitive=True):
if not case_sensitive:
column = f"LOWER({column})"
equals_to = equals_to.lower()
if self._sql:
self._sql += f" AND {column} LIKE ?"
else:
self._sql += f"{column} LIKE ?"
self.filler.append(equals_to)
return self
def and_regexp(self, column, regex):
self.has_regex = True
if self._sql:
self._sql += f" AND {column} REGEXP ?"
else:
self._sql += f"{column} REGEXP ?"
self.filler.append(regex)
return self
def or_regexp(self, column, regex):
self.has_regex = True
if self._sql:
self._sql += f" OR {column} REGEXP ?"
else:
self._sql += f"{column} REGEXP ?"
self.filler.append(regex)
return self
def or_sub_condition(self, sub_condition):
if sub_condition.has_regex:
self.has_regex = True
self.filler.extend(sub_condition.filler)
if self._sql:
self._sql += f" OR ({sub_condition.sql(None)})"
else:
self._sql += f"({sub_condition.sql(None)})"
return self
def or_not_sub_condition(self, sub_condition):
if sub_condition.has_regex:
self.has_regex = True
self.filler.extend(sub_condition.filler)
if self._sql:
self._sql += f" OR NOT ({sub_condition.sql(None)})"
else:
self._sql += f"NOT ({sub_condition.sql(None)})"
return self
def and_sub_condition(self, sub_condition):
if sub_condition.has_regex:
self.has_regex = True
self.filler.extend(sub_condition.filler)
if self._sql:
self._sql += f" AND ({sub_condition.sql(None)})"
else:
self._sql += f"({sub_condition.sql(None)})"
return self
def and_not_sub_condition(self, sub_condition):
if sub_condition.has_regex:
self.has_regex = True
self.filler.extend(sub_condition.filler)
if self._sql:
self._sql += f" AND NOT({sub_condition.sql(None)})"
else:
self._sql += f"NOT ({sub_condition.sql(None)})"
return self
def limit(self, limit):
self._limit = limit
return self
def offset(self, offset):
self._offset = offset
return self
def order_by(self, order_by, desc=False):
self._order_by = order_by
self._desc = desc
return self
SETTING_DB_VERSION = 2
MUSIC_DB_VERSION = 4
class SettingsDatabase:
def __init__(self, db_path):
self.db_path = db_path
def get(self, section, option, **kwargs):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
result = cursor.execute("SELECT value FROM botamusique WHERE section=? AND option=?",
(section, option)).fetchall()
conn.close()
if len(result) > 0:
return result[0][0]
else:
if 'fallback' in kwargs:
return kwargs['fallback']
else:
raise DatabaseError("Item not found")
def getboolean(self, section, option, **kwargs):
return bool(int(self.get(section, option, **kwargs)))
def getfloat(self, section, option, **kwargs):
return float(self.get(section, option, **kwargs))
def getint(self, section, option, **kwargs):
return int(self.get(section, option, **kwargs))
def set(self, section, option, value):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("INSERT OR REPLACE INTO botamusique (section, option, value) "
"VALUES (?, ?, ?)", (section, option, value))
conn.commit()
conn.close()
def has_option(self, section, option):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
result = cursor.execute("SELECT value FROM botamusique WHERE section=? AND option=?",
(section, option)).fetchall()
conn.close()
if len(result) > 0:
return True
else:
return False
def remove_option(self, section, option):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM botamusique WHERE section=? AND option=?", (section, option))
conn.commit()
conn.close()
def remove_section(self, section):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM botamusique WHERE section=?", (section,))
conn.commit()
conn.close()
def items(self, section):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
results = cursor.execute("SELECT option, value FROM botamusique WHERE section=?", (section,)).fetchall()
conn.close()
if len(results) > 0:
return list(map(lambda v: (v[0], v[1]), results))
else:
return []
def drop_table(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("DROP TABLE botamusique")
conn.close()
class MusicDatabase:
def __init__(self, db_path):
self.db_path = db_path
def insert_music(self, music_dict, _conn=None):
conn = sqlite3.connect(self.db_path) if _conn is None else _conn
cursor = conn.cursor()
id = music_dict['id']
title = music_dict['title']
type = music_dict['type']
path = music_dict['path'] if 'path' in music_dict else ''
keywords = music_dict['keywords']
tags_list = list(dict.fromkeys(music_dict['tags']))
tags = ''
if tags_list:
tags = ",".join(tags_list) + ","
del music_dict['id']
del music_dict['title']
del music_dict['type']
del music_dict['tags']
if 'path' in music_dict:
del music_dict['path']
del music_dict['keywords']
existed = cursor.execute("SELECT 1 FROM music WHERE id=?", (id,)).fetchall()
if len(existed) == 0:
cursor.execute(
"INSERT INTO music (id, type, title, metadata, tags, path, keywords) VALUES (?, ?, ?, ?, ?, ?, ?)",
(id,
type,
title,
json.dumps(music_dict),
tags,
path,
keywords))
else:
cursor.execute("UPDATE music SET type=:type, title=:title, metadata=:metadata, tags=:tags, "
"path=:path, keywords=:keywords WHERE id=:id",
{'id': id,
'type': type,
'title': title,
'metadata': json.dumps(music_dict),
'tags': tags,
'path': path,
'keywords': keywords})
if not _conn:
conn.commit()
conn.close()
def query_music_ids(self, condition: Condition):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
results = cursor.execute("SELECT id FROM music WHERE id != 'info' AND %s" %
condition.sql(conn), condition.filler).fetchall()
conn.close()
return list(map(lambda i: i[0], results))
def query_all_paths(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
results = cursor.execute("SELECT path FROM music WHERE id != 'info' AND type = 'file'").fetchall()
conn.close()
paths = []
for result in results:
if result and result[0]:
paths.append(result[0])
return paths
def query_all_tags(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
results = cursor.execute("SELECT tags FROM music WHERE id != 'info'").fetchall()
tags = []
for result in results:
for tag in result[0].strip(",").split(","):
if tag and tag not in tags:
tags.append(tag)
conn.close()
return tags
def query_music_count(self, condition: Condition):
filler = condition.filler
conn = sqlite3.connect(self.db_path)
condition_str = condition.sql(conn)
cursor = conn.cursor()
results = cursor.execute("SELECT COUNT(*) FROM music "
"WHERE id != 'info' AND %s" % condition_str, filler).fetchall()
conn.close()
return results[0][0]
def query_music(self, condition: Condition, _conn=None):
filler = condition.filler
conn = sqlite3.connect(self.db_path) if _conn is None else _conn
condition_str = condition.sql(conn)
cursor = conn.cursor()
results = cursor.execute("SELECT id, type, title, metadata, tags, path, keywords FROM music "
"WHERE id != 'info' AND %s" % condition_str, filler).fetchall()
if not _conn:
conn.close()
return self._result_to_dict(results)
def _query_music_by_plain_sql_cond(self, sql_cond, _conn=None):
conn = sqlite3.connect(self.db_path) if _conn is None else _conn
cursor = conn.cursor()
results = cursor.execute("SELECT id, type, title, metadata, tags, path, keywords FROM music "
"WHERE id != 'info' AND %s" % sql_cond).fetchall()
if not _conn:
conn.close()
return self._result_to_dict(results)
def query_music_by_id(self, _id, _conn=None):
results = self.query_music(Condition().and_equal("id", _id), _conn)
if results:
return results[0]
else:
return None
def query_music_by_keywords(self, keywords, _conn=None):
condition = Condition()
for keyword in keywords:
condition.and_like("title", f"%{keyword}%", case_sensitive=False)
return self.query_music(condition, _conn)
def query_music_by_tags(self, tags, _conn=None):
condition = Condition()
for tag in tags:
condition.and_like("tags", f"%{tag},%", case_sensitive=False)
return self.query_music(condition, _conn)
def manage_special_tags(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("UPDATE music SET tags=REPLACE(tags, 'recent added,', '') WHERE tags LIKE '%recent added,%' "
"AND create_at <= DATETIME('now', '-1 day') AND id != 'info'")
cursor.execute("UPDATE music SET tags=tags||'recent added,' WHERE tags NOT LIKE '%recent added,%' "
"AND create_at > DATETIME('now', '-1 day') AND id != 'info'")
conn.commit()
conn.close()
def query_tags(self, condition: Condition):
# TODO: Can we keep a index of tags?
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
results = cursor.execute("SELECT id, tags FROM music "
"WHERE id != 'info' AND %s" % condition.sql(conn), condition.filler).fetchall()
conn.close()
lookup = {}
if len(results) > 0:
for result in results:
id = result[0]
tags = result[1].strip(",").split(",")
lookup[id] = tags if tags[0] else []
return lookup
def query_random_music(self, count, condition: Condition = None):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
results = []
if condition is None:
condition = Condition().and_not_sub_condition(Condition().and_equal('id', 'info'))
results = cursor.execute("SELECT id, type, title, metadata, tags, path, keywords FROM music "
"WHERE id IN (SELECT id FROM music WHERE %s ORDER BY RANDOM() LIMIT ?) "
"ORDER BY RANDOM()"
% condition.sql(conn), condition.filler + [count]).fetchall()
conn.close()
return self._result_to_dict(results)
def _result_to_dict(self, results):
if len(results) > 0:
music_dicts = []
for result in results:
music_dict = json.loads(result[3])
music_dict['type'] = result[1]
music_dict['title'] = result[2]
music_dict['id'] = result[0]
music_dict['tags'] = result[4].strip(",").split(",") if result[4] else []
music_dict['path'] = result[5]
music_dict['keywords'] = result[6]
music_dicts.append(music_dict)
return music_dicts
else:
return []
def delete_music(self, condition: Condition):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM music "
"WHERE %s" % condition.sql(conn), condition.filler)
conn.commit()
conn.close()
def drop_table(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("DROP TABLE music")
conn.close()
class DatabaseMigration:
def __init__(self, settings_db: SettingsDatabase, music_db: MusicDatabase):
self.settings_db = settings_db
self.music_db = music_db
self.settings_table_migrate_func = {0: self.settings_table_migrate_from_0_to_1,
1: self.settings_table_migrate_from_1_to_2}
self.music_table_migrate_func = {0: self.music_table_migrate_from_0_to_1,
1: self.music_table_migrate_from_1_to_2,
2: self.music_table_migrate_from_2_to_4,
3: self.music_table_migrate_from_2_to_4
}
def migrate(self):
self.settings_database_migrate()
self.music_database_migrate()
def settings_database_migrate(self):
conn = sqlite3.connect(self.settings_db.db_path)
cursor = conn.cursor()
if self.has_table('botamusique', conn):
current_version = 0
ver = cursor.execute("SELECT value FROM botamusique WHERE section='bot' "
"AND option='db_version'").fetchone()
if ver:
current_version = int(ver[0])
if current_version == SETTING_DB_VERSION:
conn.close()
return
else:
log.info(
f"database: migrating from settings table version {current_version} to {SETTING_DB_VERSION}...")
while current_version < SETTING_DB_VERSION:
log.debug(f"database: migrate step {current_version}/{SETTING_DB_VERSION - 1}")
current_version = self.settings_table_migrate_func[current_version](conn)
log.info(f"database: migration done.")
cursor.execute("UPDATE botamusique SET value=? "
"WHERE section='bot' AND option='db_version'", (SETTING_DB_VERSION,))
else:
log.info(f"database: no settings table found. Creating settings table version {SETTING_DB_VERSION}.")
self.create_settings_table_version_2(conn)
conn.commit()
conn.close()
def music_database_migrate(self):
conn = sqlite3.connect(self.music_db.db_path)
cursor = conn.cursor()
if self.has_table('music', conn):
current_version = 0
ver = cursor.execute("SELECT title FROM music WHERE id='info'").fetchone()
if ver:
current_version = int(ver[0])
if current_version == MUSIC_DB_VERSION:
conn.close()
return
else:
log.info(f"database: migrating from music table version {current_version} to {MUSIC_DB_VERSION}...")
while current_version < MUSIC_DB_VERSION:
log.debug(f"database: migrate step {current_version}/{MUSIC_DB_VERSION - 1}")
current_version = self.music_table_migrate_func[current_version](conn)
log.info(f"database: migration done.")
cursor.execute("UPDATE music SET title=? "
"WHERE id='info'", (MUSIC_DB_VERSION,))
else:
log.info(f"database: no music table found. Creating music table version {MUSIC_DB_VERSION}.")
self.create_music_table_version_4(conn)
conn.commit()
conn.close()
def has_table(self, table, conn):
cursor = conn.cursor()
tables = cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (table,)).fetchall()
if len(tables) == 0:
return False
return True
def create_settings_table_version_2(self, conn):
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS botamusique ("
"section TEXT, "
"option TEXT, "
"value TEXT, "
"UNIQUE(section, option))")
cursor.execute("INSERT INTO botamusique (section, option, value) "
"VALUES (?, ?, ?)", ("bot", "db_version", 2))
conn.commit()
return 1
def create_music_table_version_1(self, conn):
cursor = conn.cursor()
cursor.execute("CREATE TABLE music ("
"id TEXT PRIMARY KEY, "
"type TEXT, "
"title TEXT, "
"keywords TEXT, "
"metadata TEXT, "
"tags TEXT, "
"path TEXT, "
"create_at DATETIME DEFAULT CURRENT_TIMESTAMP"
")")
cursor.execute("INSERT INTO music (id, title) "
"VALUES ('info', ?)", (MUSIC_DB_VERSION,))
conn.commit()
def create_music_table_version_4(self, conn):
self.create_music_table_version_1(conn)
def settings_table_migrate_from_0_to_1(self, conn):
cursor = conn.cursor()
cursor.execute("DROP TABLE botamusique")
conn.commit()
self.create_settings_table_version_2(conn)
return 2 # return new version number
def settings_table_migrate_from_1_to_2(self, conn):
cursor = conn.cursor()
# move music database into a separated file
if self.has_table('music', conn) and not os.path.exists(self.music_db.db_path):
log.info(f"database: move music db into separated file.")
cursor.execute(f"ATTACH DATABASE '{self.music_db.db_path}' AS music_db")
cursor.execute(f"SELECT sql FROM sqlite_master "
f"WHERE type='table' AND name='music'")
sql_create_table = cursor.fetchone()[0]
sql_create_table = sql_create_table.replace("music", "music_db.music")
cursor.execute(sql_create_table)
cursor.execute("INSERT INTO music_db.music SELECT * FROM music")
conn.commit()
cursor.execute("DETACH DATABASE music_db")
cursor.execute("DROP TABLE music")
cursor.execute("UPDATE botamusique SET value=2 "
"WHERE section='bot' AND option='db_version'")
return 2 # return new version number
def music_table_migrate_from_0_to_1(self, conn):
cursor = conn.cursor()
cursor.execute("ALTER TABLE music RENAME TO music_old")
conn.commit()
self.create_music_table_version_1(conn)
cursor.execute("INSERT INTO music (id, type, title, metadata, tags)"
"SELECT id, type, title, metadata, tags FROM music_old")
cursor.execute("DROP TABLE music_old")
conn.commit()
return 1 # return new version number
def music_table_migrate_from_1_to_2(self, conn):
items_to_update = self.music_db.query_music(Condition(), conn)
for item in items_to_update:
item['keywords'] = item['title']
if 'artist' in item:
item['keywords'] += ' ' + item['artist']
tags = []
for tag in item['tags']:
if tag:
tags.append(tag)
item['tags'] = tags
self.music_db.insert_music(item)
conn.commit()
return 2 # return new version number
def music_table_migrate_from_2_to_4(self, conn):
items_to_update = self.music_db.query_music(Condition(), conn)
for item in items_to_update:
if 'duration' not in item:
item['duration'] = 0
if item['type'] == 'url' or item['type'] == "url_from_playlist":
item['duration'] = item['duration'] * 60
self.music_db.insert_music(item)
conn.commit()
return 4 # return new version number