219 lines
6.4 KiB
Python
219 lines
6.4 KiB
Python
# SPDX-License-Identifier: BSD-3-Clause
|
|
# Copyright (c) 2021, dsc@xmr.pm
|
|
|
|
import logging
|
|
import json
|
|
import os
|
|
import re
|
|
from typing import Optional, List
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
|
|
|
|
import mutagen
|
|
import aiofiles
|
|
|
|
from peewee import SqliteDatabase, SQL
|
|
import peewee as pw
|
|
|
|
from ircradio.youtube import YouTube
|
|
import settings
|
|
|
|
db = SqliteDatabase(f"{settings.cwd}/data/db.sqlite3")
|
|
|
|
|
|
class Ban(pw.Model):
|
|
id = pw.AutoField()
|
|
utube_id_or_nick = pw.CharField(index=True)
|
|
|
|
class Meta:
|
|
database = db
|
|
|
|
@dataclass
|
|
class SongMeta:
|
|
title: Optional[str] = None
|
|
description: Optional[str] = None
|
|
artist: Optional[str] = None
|
|
album: Optional[str] = None
|
|
album_cover: Optional[str] = None # path to image
|
|
bitrate: Optional[int] = None
|
|
length: Optional[float] = None
|
|
sample_rate: Optional[int] = None
|
|
channels: Optional[int] = None
|
|
mime: Optional[str] = None
|
|
|
|
|
|
class Song(pw.Model):
|
|
id = pw.AutoField()
|
|
date_added = pw.DateTimeField(default=datetime.now)
|
|
|
|
title = pw.CharField(index=True)
|
|
utube_id = pw.CharField(index=True, unique=True)
|
|
added_by = pw.CharField(index=True, constraints=[SQL('COLLATE NOCASE')]) # ILIKE index
|
|
duration = pw.IntegerField()
|
|
karma = pw.IntegerField(default=5, index=True)
|
|
banned = pw.BooleanField(default=False)
|
|
|
|
meta: SongMeta = None # directly from file (exif) or metadata json
|
|
path: Optional[str] = None
|
|
remaining: int = None # liquidsoap playing status in seconds
|
|
|
|
@staticmethod
|
|
def delete_song(utube_id: str) -> bool:
|
|
from ircradio.factory import app
|
|
try:
|
|
fn = f"{settings.dir_music}/{utube_id}.ogg"
|
|
Song.delete().where(Song.utube_id == utube_id).execute()
|
|
os.remove(fn)
|
|
except Exception as ex:
|
|
app.logger.error(f"{ex}")
|
|
return False
|
|
|
|
@staticmethod
|
|
def search(needle: str, min_chars=3) -> List['Song']:
|
|
needle = needle.replace("%", "")
|
|
if len(needle) < min_chars:
|
|
raise Exception("Search too short. Wow. More typing plz. Much effort.")
|
|
|
|
if YouTube.is_valid_uid(needle):
|
|
try:
|
|
song = Song.select().filter(Song.utube_id == needle).get()
|
|
return [song]
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
q = Song.select().filter(Song.title ** f"%{needle}%")
|
|
return [s for s in q]
|
|
except:
|
|
pass
|
|
|
|
return []
|
|
|
|
@staticmethod
|
|
def by_uid(uid: str) -> Optional['Song']:
|
|
try:
|
|
return Song.select().filter(Song.utube_id == uid).get()
|
|
except:
|
|
pass
|
|
|
|
@staticmethod
|
|
def from_filepath(filepath: str) -> Optional['Song']:
|
|
fn = os.path.basename(filepath)
|
|
name, ext = fn.split(".", 1)
|
|
if not YouTube.is_valid_uid(name):
|
|
raise Exception("invalid youtube id")
|
|
try:
|
|
return Song.select().filter(utube_id=name).get()
|
|
except:
|
|
return Song.auto_create_from_filepath(filepath)
|
|
|
|
@staticmethod
|
|
def auto_create_from_filepath(filepath: str) -> Optional['Song']:
|
|
from ircradio.factory import app
|
|
fn = os.path.basename(filepath)
|
|
uid, ext = fn.split(".", 1)
|
|
if not YouTube.is_valid_uid(uid):
|
|
raise Exception("invalid youtube id")
|
|
|
|
metadata = YouTube.metadata_from_filepath(filepath)
|
|
if not metadata:
|
|
return
|
|
|
|
app.logger.info(f"auto-creating for {fn}")
|
|
|
|
try:
|
|
song = Song.create(
|
|
duration=metadata['duration'],
|
|
title=metadata['name'],
|
|
added_by='radio',
|
|
karma=5,
|
|
utube_id=uid)
|
|
return song
|
|
except Exception as ex:
|
|
app.logger.error(f"{ex}")
|
|
pass
|
|
|
|
|
|
async def scan(self, path: str = None):
|
|
# update with metadata, etc.
|
|
if path is None:
|
|
path = self.filepath
|
|
|
|
if not os.path.exists(path):
|
|
raise Exception(f"filepath {path} does not exist")
|
|
basename = os.path.splitext(os.path.basename(path))[0]
|
|
|
|
self.meta = SongMeta()
|
|
self.path = path
|
|
|
|
# EXIF direct
|
|
from ircradio.utils import mutagen_file
|
|
_m = await mutagen_file(path)
|
|
|
|
if _m:
|
|
if _m.info:
|
|
self.meta.channels = _m.info.channels
|
|
self.meta.length = int(_m.info.length)
|
|
if hasattr(_m.info, 'bitrate'):
|
|
self.meta.bitrate = _m.info.bitrate
|
|
if hasattr(_m.info, 'sample_rate'):
|
|
self.meta.sample_rate = _m.info.sample_rate
|
|
if _m.tags:
|
|
if hasattr(_m.tags, 'as_dict'):
|
|
_tags = _m.tags.as_dict()
|
|
else:
|
|
try:
|
|
_tags = {k: v for k, v in _m.tags.items()}
|
|
except:
|
|
_tags = {}
|
|
|
|
for k, v in _tags.items():
|
|
if isinstance(v, list):
|
|
v = v[0]
|
|
elif isinstance(v, (str, int, float)):
|
|
pass
|
|
else:
|
|
continue
|
|
|
|
if k in ["title", "description", "language", "date", "purl", "artist"]:
|
|
if hasattr(self.meta, k):
|
|
setattr(self.meta, k, v)
|
|
|
|
# yt-dlp metadata json file
|
|
fn_utube_meta = os.path.join(settings.dir_meta, f"{basename}.info.json")
|
|
utube_meta = {}
|
|
if os.path.exists(fn_utube_meta):
|
|
async with aiofiles.open(fn_utube_meta, mode="r") as f:
|
|
try:
|
|
utube_meta = json.loads(await f.read())
|
|
except Exception as ex:
|
|
logging.error(f"could not parse {fn_utube_meta}, {ex}")
|
|
|
|
if utube_meta:
|
|
# utube_meta file does not have anything we care about
|
|
pass
|
|
|
|
if not self.title and not self.meta.title:
|
|
# just adopt filename
|
|
self.title = os.path.basename(self.path)
|
|
|
|
return self
|
|
|
|
|
|
@property
|
|
def filepath(self):
|
|
"""Absolute"""
|
|
return os.path.join(settings.dir_music, f"{self.utube_id}.ogg")
|
|
|
|
@property
|
|
def filepath_noext(self):
|
|
"""Absolute filepath without extension ... maybe"""
|
|
try:
|
|
return os.path.splitext(self.filepath)[0]
|
|
except:
|
|
return self.filepath
|
|
|
|
class Meta:
|
|
database = db
|