site_liveq/liveq/bms.py

478 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import json
import subprocess
import zlib
from enum import IntEnum
from pathlib import Path
from typing import Any, Dict, Generic, List, Optional, TypeVar, Union
import bms_ragnarock
import osutools
import PIL.Image
from constants import LOCALCACHE, PATH_OSU, PATH_RAGNAROCK, PREVIEW_DURATION
from models import JsonableMixin
# Type variable for the game-specific library object a difficulty wrapper is
# built from (see ``DifficultyWrapperForRhythmGame.from_lib``).
_VT = TypeVar('_VT')
# File name of the audio preview; used inside each beatmapset's cache directory
# and also looked up inside Ragnarock song folders.
PREVIEW_OGG = 'preview.ogg'
# File name of the thumbnail inside each beatmapset's cache directory.
# NOTE(review): 'thumbail' looks like a typo for 'thumbnail'. The value is a
# runtime file name, so renaming it would orphan thumbnails already cached
# under this name.
THUMBNAIL_JPG = 'thumbail.jpg'
# File names of the aggregated beatmap index written at the cache root
# (indented and compact variants).
CACHED_JSON = 'cached.json'
CACHED_MIN_JSON = 'cached_min.json'
def make_audio_preview(audio: Path, preview: Path, start: float, duration: float):
    """Render a preview clip of ``audio`` to ``preview`` using ffmpeg.

    Nothing happens when ``preview`` already exists. Otherwise the clip is
    encoded as Ogg Vorbis into a temporary ``.egg`` sibling of ``preview`` and
    renamed into place only after ffmpeg succeeded.

    Args:
        audio: Source audio file.
        preview: Destination path of the preview clip.
        start: Requested start offset, passed to ffmpeg's ``-ss`` (seconds).
        duration: Requested clip length, passed to ffmpeg's ``-t`` (seconds).

    Raises:
        subprocess.CalledProcessError: If ffprobe or ffmpeg exits with a
            non-zero status.
    """
    if preview.exists():
        return
    audio_duration = probe_audio_duration(audio)
    # Pull the start back so that at least half of the requested window lies
    # inside the track, but never before the beginning of the file: for tracks
    # shorter than ``duration / 2`` the upper bound is negative and must not
    # win over the lower bound of 0.
    start = max(0.0, min(start, audio_duration - duration / 2))
    # Length of audio that is really available from ``start`` onwards; the
    # fade-out is positioned relative to this, not to the requested duration.
    actual_duration = max(0.0, min(duration, audio_duration - start))
    # Very short clips get no fade at all.
    fade_duration = 0.0 if actual_duration <= 2.0 else 0.075
    preview_tmp = preview.with_suffix('.egg')
    subprocess.run(
        ['ffmpeg', '-y',
         '-v', 'quiet',
         '-ss', str(start),
         '-t', str(duration),
         '-i', str(audio),
         '-filter:a', f'afade=t=in:st=0:d={fade_duration},afade=t=out:st={actual_duration-fade_duration}:d={fade_duration}',
         '-q', '5',
         '-map_metadata', '-1',
         '-vn', '-acodec', 'libvorbis',
         '-f', 'ogg',
         str(preview_tmp),
         ],
        check=True,
    )
    preview_tmp.rename(preview)
def probe_audio_duration(audio: Path) -> float:
    """Return the duration of ``audio`` as reported by ffprobe.

    A path that does not exist yields ``0.0`` without invoking ffprobe.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
    """
    if audio.exists():
        command = [
            'ffprobe',
            '-v', 'quiet',
            '-show_format',
            '-print_format', 'json',
            str(audio),
        ]
        completed = subprocess.run(command, check=True, stdout=subprocess.PIPE)
        container_info = json.loads(completed.stdout)
        return float(container_info['format']['duration'])
    return 0.0
def get_osu_sections(osu_text: str) -> Dict[str, List[str]]:
    """Split the text of a ``.osu`` file into its bracketed sections.

    The text is cut at blank lines; every chunk whose first line starts with
    ``[`` becomes one entry mapping the section name (brackets stripped) to
    the remaining lines of that chunk. Chunks that do not start with a
    section header, such as the leading format line, are ignored.

    Args:
        osu_text: Full text of the beatmap file, with any line-ending style.

    Returns:
        Mapping of section name to the list of lines below its header.
    """
    # Normalise CRLF / CR first: with Windows line endings a blank line is
    # '\r\n\r\n', which contains no '\n\n', so no section would be found.
    normalized = osu_text.replace('\r\n', '\n').replace('\r', '\n')
    osu_sections: Dict[str, List[str]] = {}
    for chunk in normalized.split('\n\n'):
        chunk = chunk.strip()
        if not chunk.startswith('['):
            continue
        header, *lines = chunk.splitlines()
        osu_sections[header.strip('[]').strip()] = lines
    return osu_sections
def convert_thumb_osu2ragna(beatmapset_osu: Path, ragna_beatmap_thumb: Path, osu_beatmap_sections: Dict[str, List[str]]) -> bool:
    """Write a square thumbnail of at most 512x512 for a beatmapset.

    Nothing is written when ``ragna_beatmap_thumb`` already exists. Otherwise
    the first ``Events`` line starting with ``0,0,`` names the background
    image (third comma-separated field, quotes stripped) relative to
    ``beatmapset_osu``. When no such line exists or the file is missing, a
    black 512x512 image is used instead.

    Args:
        beatmapset_osu: Folder the background file name is resolved against.
        ragna_beatmap_thumb: Destination path of the thumbnail.
        osu_beatmap_sections: Parsed sections, as returned by
            ``get_osu_sections``.

    Returns:
        Always ``False``.
        NOTE(review): callers in this file ignore the return value.
    """
    if ragna_beatmap_thumb.exists():
        return False
    imageline = next(
        (line for line in osu_beatmap_sections.get('Events', [])
         if line.startswith('0,0,')),
        None)
    # Fallback: plain black square.
    im = PIL.Image.new('RGB', (512, 512), 0)
    if imageline is not None:
        osu_bg = beatmapset_osu.joinpath(
            imageline.split(',')[2].strip('"'))
        # is_file() rather than exists(): an empty file name resolves to the
        # folder itself, which exists but cannot be opened as an image.
        if osu_bg.is_file():
            # convert() loads the pixel data, so the source file can be
            # closed as soon as the copy has been made.
            with PIL.Image.open(str(osu_bg)) as source:
                im = source.convert('RGB')
    im = im_crop_square_center(im)
    im.thumbnail((512, 512))
    im.save(str(ragna_beatmap_thumb))
    return False
def im_crop_square_center(im: PIL.Image.Image) -> PIL.Image.Image:
    """Crop ``im`` to the largest centred square.

    An image that is already square is returned as is; otherwise the longer
    side is trimmed equally at both ends and the cropped image is returned.
    """
    width, height = im.size
    if width == height:
        return im
    side = min(width, height)
    if width == side:
        # Taller than wide: keep the full width, trim top and bottom.
        top = (height - side) // 2
        box = (0, top, side, top + side)
    else:
        # Wider than tall: keep the full height, trim left and right.
        left = (width - side) // 2
        box = (left, 0, left + side, side)
    return im.crop(box)
class RhythmGameMode(IntEnum):
    """Integer identifiers for the game modes a difficulty can target."""
    # osu!mania, one member per key count; the value equals the key count
    # (``DifficultyWrapperForOsu.get_mode`` maps the rounded circle size
    # onto these members).
    osumania1k = 1
    osumania2k = 2
    osumania3k = 3
    osumania4k = 4
    osumania5k = 5
    osumania6k = 6
    osumania7k = 7
    osumania8k = 8
    osumania9k = 9
    osumania10k = 10
    osumania12k = 12
    osumania14k = 14
    osumania16k = 16
    osumania18k = 18
    # Remaining osu! rulesets (standard, taiko, catch).
    osustd = 31
    osutaiko = 41
    osucatch = 51
    # Emitted in addition to osumania4k for every 4-key mania map.
    osuddr4k = 61
    # Other games.
    ragnarock = 71
    beatsaber = 81
    # outfox* members: presumably Project OutFox game types (TODO confirm);
    # none of them is referenced elsewhere in this file.
    outfoxdance = 91
    outfoxpump = 92
    outfoxsmx = 93
    outfoxtechno = 94
    outfoxbemu = 95
    outfoxpomu = 96
    outfoxgddm = 97
    outfoxgdgf = 98
    outfoxgh = 99
    outfoxtaiko = 100
    outfoxbongo = 101
    outfoxpara = 102
    outfoxkbx = 103
    outfoxez2 = 104
    outfoxds3ddx = 105
    outfoxmaniax = 106
    outfoxstepstage = 107
    outfoxkickbox = 108
class DifficultyWrapperForRhythmGame(Generic[_VT], JsonableMixin):
    """Game-independent description of one playable difficulty of a beatmap.

    Subclasses implement ``from_lib`` to build instances from the objects of
    a game-specific library. Three CRC32-based identifiers are derived from
    the metadata unless given explicitly:

    * ``tid`` from artist and title,
    * ``sid`` from mapper, artist and title,
    * ``id`` from ``sid``, difficulty, difficulty name and game mode.
    """

    def __init__(self,
                 title: str,
                 artist: str,
                 mapper: str,
                 difficulty: float,
                 difficulty_name: str,
                 preview: Union[Path, str],
                 thumbnail: Union[Path, str],
                 gamemode: RhythmGameMode,
                 url_reference: Optional[str] = None,
                 tid: Optional[int] = None,
                 sid: Optional[int] = None,
                 id: Optional[int] = None,
                 ) -> None:
        """Store the metadata; ``preview`` and ``thumbnail`` become ``Path``s.

        Identifiers that are ``None`` (or otherwise falsy, e.g. ``0``) are
        generated from the other fields.
        """
        self.title = title
        self.artist = artist
        self.mapper = mapper
        self.difficulty = difficulty
        self.difficulty_name = difficulty_name
        self.preview = Path(preview)
        self.thumbnail = Path(thumbnail)
        self.gamemode = gamemode
        self.url_reference = url_reference
        self.tid = tid or self.gen_tid()
        self.sid = sid or self.gen_sid()
        self.id = id or self.gen_id()

    @classmethod
    def from_lib(cls, o: _VT) -> List['DifficultyWrapperForRhythmGame']:
        """Build wrappers from a library object; implemented by subclasses."""
        raise NotImplementedError

    def gen_tid(self) -> int:
        """Return the CRC32 of ``'{artist}@{title}'``."""
        return zlib.crc32(f'{self.artist}@{self.title}'.encode('utf-8'))

    def gen_sid(self) -> int:
        """Return the CRC32 of ``'{mapper}@{artist}@{title}'``."""
        return zlib.crc32(
            f'{self.mapper}@{self.artist}@{self.title}'.encode('utf-8'))

    def gen_id(self) -> int:
        """Return the CRC32 of the generated sid, difficulty, name and mode."""
        return zlib.crc32(
            f'{self.gen_sid()}@{self.difficulty}@{self.difficulty_name}@{self.gamemode}'.encode('utf-8'))

    def saved_to_cache(self) -> 'DifficultyWrapperForRhythmGame':
        """Write this difficulty to ``info_<id>.json`` and return ``self``.

        The file lives in the cache directory named after the generated sid.
        The directory is created when missing so that the method also works
        for instances that were not produced by ``from_lib``.
        """
        target_dir = LOCALCACHE.joinpath(str(self.gen_sid()))
        target_dir.mkdir(parents=True, exist_ok=True)
        target_dir.joinpath(f'info_{self.gen_id()}.json').write_text(
            json.dumps(self.to_json(), indent=4),
            # Explicit encoding, matching the other cache files of this module.
            encoding='utf-8',
        )
        return self

    def to_json(self) -> dict[str, Any]:
        """Return the fields as a JSON-serialisable dict (paths as strings)."""
        return {
            'title': self.title,
            'artist': self.artist,
            'mapper': self.mapper,
            'difficulty': self.difficulty,
            'difficulty_name': self.difficulty_name,
            'preview': str(self.preview),
            'thumbnail': str(self.thumbnail),
            'gamemode': self.gamemode,
            'url_reference': self.url_reference,
            'tid': self.tid,
            'sid': self.sid,
            'id': self.id,
        }

    @classmethod
    def from_json(cls, o: dict[str, Any]) -> 'DifficultyWrapperForRhythmGame':
        """Rebuild an instance from the dict produced by ``to_json``.

        Raises:
            ValueError: If ``gamemode`` is not a known ``RhythmGameMode`` value.
        """
        fields = dict(o)
        # JSON stores the mode as a bare integer; turn it back into the enum
        # so that a loaded instance has the same attribute types as a fresh one.
        if 'gamemode' in fields:
            fields['gamemode'] = RhythmGameMode(fields['gamemode'])
        return cls(**fields)
class DifficultyWrapperForOsu(DifficultyWrapperForRhythmGame[osutools.map.LocalMap]):
    """Difficulty wrapper built from an osu! ``LocalMap`` database entry."""

    @classmethod
    def get_audio_file(cls, o: osutools.map.LocalMap) -> Path:
        """Return the path of the audio file of ``o`` below ``Songs``.

        When the exact path is not a file, the song folder is searched for an
        entry whose name matches case-insensitively. If nothing matches, or
        the song folder itself is missing, the original (non-existing) path is
        returned so that callers can test it with ``is_file()``.
        """
        audiofp: Path = (o.client.osu_folder / "Songs" /
                         o.folder_name.strip() / o.audio_filename)
        if audiofp.is_file():
            return audiofp
        song_dir = audiofp.parent
        # A missing song folder must not raise from iterdir(): callers treat a
        # non-existing result as "skip this map".
        if not song_dir.is_dir():
            return audiofp
        wanted = audiofp.name.lower()
        for potaf in song_dir.iterdir():
            if potaf.name.lower() == wanted:
                return potaf
        return audiofp

    @classmethod
    def get_mode(cls, o: osutools.map.LocalMap) -> RhythmGameMode:
        """Translate the ruleset (and mania key count) of ``o`` into a mode.

        Raises:
            ValueError: For an unknown ruleset value or a mania key count
                without a matching ``RhythmGameMode`` member.
        """
        ruleset = o.mode.value
        if ruleset == 3:
            # Mania: the enum value of every osumania member is its key count.
            mania_key_counts = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18)
            keys = round(o.circle_size)
            if keys in mania_key_counts:
                return RhythmGameMode(keys)
            raise ValueError(
                f'Unknown mania key count: {o.circle_size}')
        non_mania = {
            0: RhythmGameMode.osustd,
            1: RhythmGameMode.osutaiko,
            2: RhythmGameMode.osucatch,
        }
        mode = non_mania.get(ruleset)
        if mode is None:
            raise ValueError(f'Unknown mode: {o.mode}')
        return mode

    @classmethod
    def from_lib(cls, o: osutools.map.LocalMap) -> List['DifficultyWrapperForRhythmGame[osutools.map.LocalMap]']:
        """Build the wrapper(s) for one osu! difficulty.

        Creates the mapset's cache directory and, when missing, its preview
        clip and thumbnail.

        Returns:
            An empty list when no preview is cached and the audio file cannot
            be found. Otherwise one wrapper for the map's mode, plus a second
            one with mode ``osuddr4k`` for 4-key mania maps.

        Raises:
            ValueError: If the ruleset or mania key count is not supported.
        """
        title = o.song_title or o.title_unicode
        artist = o.artist or o.artist_unicode
        creator = o.creator_name
        # Same formula as gen_sid(), so the directory matches saved_to_cache().
        idtf = zlib.crc32(f'{creator}@{artist}@{title}'.encode('utf-8'))
        lcc = LOCALCACHE/str(idtf)
        lcc.mkdir(parents=True, exist_ok=True)
        ratings = (
            o.standard_sr_ratings,
            o.taiko_sr_ratings,
            o.ctb_sr_ratings,
            o.mania_sr_ratings,
        )[o.mode.value]
        # Prefer the entry keyed 0, else the value of the first entry.
        # NOTE(review): presumably (mods, star rating) pairs with 0 meaning
        # "no mods"; an empty rating list raises IndexError here -- confirm
        # against osutools whether that can happen.
        difficulty = dict(ratings).get(0, ratings[0][-1])
        difficultynm = str(o.difficulty_name)
        prevaud = lcc.joinpath(PREVIEW_OGG)
        thumb = lcc.joinpath(THUMBNAIL_JPG)
        if not prevaud.is_file():
            audiofp: Path = cls.get_audio_file(o)
            if not audiofp.is_file():
                return []
            make_audio_preview(audiofp, prevaud, max(0.0,
                               o.preview_start/1000), PREVIEW_DURATION)
        if not thumb.is_file():
            bmfp: Path = (o.client.osu_folder / "Songs" /
                          o.folder_name.strip() / o.filename)
            osu_sections = get_osu_sections(
                bmfp.read_text(encoding='utf-8', errors='ignore'))
            convert_thumb_osu2ragna(bmfp.parent, thumb, osu_sections)
        mode = cls.get_mode(o)
        modes = [mode]
        if mode == RhythmGameMode.osumania4k:
            # 4-key mania charts are additionally listed as osuddr4k.
            modes.append(RhythmGameMode.osuddr4k)
        url = f'https://osu.ppy.sh/beatmaps/{o.beatmap_id}'
        return [
            cls(
                title,
                artist,
                creator,
                difficulty,
                difficultynm,
                prevaud,
                thumb,
                listed_mode,
                url_reference=url,
            )
            for listed_mode in modes
        ]
class DifficultyWrapperForRagnarock(DifficultyWrapperForRhythmGame[bms_ragnarock.RagnaRockDifficultyV1]):
    """Difficulty wrapper built from a Ragnarock difficulty entry."""

    @classmethod
    def from_lib(cls, o: bms_ragnarock.RagnaRockDifficultyV1) -> List['DifficultyWrapperForRhythmGame[bms_ragnarock.RagnaRockDifficultyV1]']:
        """Build the wrapper for one Ragnarock difficulty.

        Creates the mapset's cache directory and, when missing, its preview
        clip (copied from the song folder's ``preview.ogg`` if present,
        otherwise rendered from the song file) and thumbnail (copied from the
        cover image if present, otherwise generated).

        Returns:
            A single-element list, or an empty list when no preview is cached
            and neither a ready-made preview nor the song file can be found.

        Raises:
            ValueError: If ``o`` has no parent info or the info has no path.
        """
        if o.parent is None:
            raise ValueError(f'Parent is None on {o}')
        p: bms_ragnarock.RagnaRockInfo = o.parent
        if p.path is None:
            raise ValueError(f'Beatmapset path is None on {p}')
        title = p.songName
        artist = p.songAuthorName
        creator = p.levelAuthorName
        # Same formula as gen_sid(), so the directory matches saved_to_cache().
        idtf = zlib.crc32(f'{creator}@{artist}@{title}'.encode('utf-8'))
        lcc = LOCALCACHE/str(idtf)
        lcc.mkdir(parents=True, exist_ok=True)
        difficulty = o.difficulty_rankf
        difficultynm = o.difficulty_label
        prevaud = lcc.joinpath(PREVIEW_OGG)
        thumb = lcc.joinpath(THUMBNAIL_JPG)
        song_dir: Path = p.path.parent
        if not prevaud.is_file():
            ragnaprev = song_dir.joinpath(PREVIEW_OGG)
            if ragnaprev.is_file():
                prevaud.write_bytes(ragnaprev.read_bytes())
            else:
                songfp: Path = song_dir.joinpath(p.songFilename)
                # Skip the map instead of letting ffmpeg fail on a missing
                # input, mirroring how the osu! wrapper handles this case.
                if not songfp.is_file():
                    return []
                make_audio_preview(
                    songfp,
                    prevaud,
                    max(0.0, p.previewStartTime),
                    PREVIEW_DURATION)
        if not thumb.is_file():
            ifn: str = p.coverImageFilename
            ifp: Path = song_dir/ifn
            if ifp.is_file():
                thumb.write_bytes(ifp.read_bytes())
            else:
                # Cover missing: let the thumbnail helper produce its fallback.
                convert_thumb_osu2ragna(ifp.parent, thumb, {
                    'Events': [f'0,0,{ifn}']})
        link: Optional[str] = None
        idf = song_dir.joinpath('.id')
        if idf.is_file():
            idn = idf.read_text(encoding='utf-8', errors='ignore').strip()
            link = f'https://ragnacustoms.com/songs/ddl/{idn}'
        # An explicit 'link2' in the custom data takes precedence over the
        # link derived from the '.id' file. ``or {}`` tolerates absent
        # custom data.
        bml = (p.customData or {}).get('link2', None)
        if bml:
            link = bml
        return [cls(
            title,
            artist,
            creator,
            difficulty,
            difficultynm,
            prevaud,
            thumb,
            RhythmGameMode.ragnarock,
            url_reference=link,
        )]
# Module-wide memo of the loaded difficulties; ``None`` until first requested.
CROSSGAME_BEATMAPS: Optional[list[DifficultyWrapperForRhythmGame]] = None


def get_crossgame_beatmaps() -> list[DifficultyWrapperForRhythmGame]:
    """Return all known difficulties, loading them on the first call only."""
    global CROSSGAME_BEATMAPS
    if CROSSGAME_BEATMAPS is not None:
        return CROSSGAME_BEATMAPS
    CROSSGAME_BEATMAPS = load_beatmaps()
    return CROSSGAME_BEATMAPS
def load_beatmaps() -> list[DifficultyWrapperForRhythmGame]:
    """Load every difficulty, from the cached index when there is one.

    If the index file exists in the cache root it is parsed and returned.
    Otherwise the osu! database and the Ragnarock song folders are scanned,
    each difficulty is converted and written to the cache, and the index is
    written in an indented and a compact variant.
    """
    index_file = LOCALCACHE.joinpath(CACHED_JSON)
    if index_file.is_file():
        stored = json.loads(index_file.read_text(encoding='utf-8'))
        return [DifficultyWrapperForRhythmGame.from_json(entry)
                for entry in stored]

    # osu!: every database entry that has a real audio file name.
    osu = osutools.OsuClientV1(None)
    osu.set_osu_folder(PATH_OSU)
    osu_maps: List[osutools.map.LocalMap] = []
    for osu_map in osu.osu_db.map_list():
        if osu_map.audio_filename == 'virtual' or osu_map.audio_filename is None:
            continue
        osu_maps.append(osu_map)
    del osu

    # Ragnarock: one info.dat (any letter case) per song folder.
    ragna_mapsets = []
    for bms in PATH_RAGNAROCK.iterdir():
        if not bms.is_dir():
            continue
        for info in bms.iterdir():
            if info.is_file() and info.name.lower() == 'info.dat':
                ragna_mapsets.append(
                    bms_ragnarock.RagnaRockInfo.read_from(info))
    ragna_maps = []
    for ragna_mapset in ragna_mapsets:
        ragna_maps.extend(ragna_mapset.difficultyBeatmapSets)

    # Convert everything first (Ragnarock, then osu!), and only afterwards
    # write the per-difficulty cache files.
    converted = [DifficultyWrapperForRagnarock.from_lib(ragna_map)
                 for ragna_map in ragna_maps]
    converted += [DifficultyWrapperForOsu.from_lib(osu_map)
                  for osu_map in osu_maps]
    flattened = []
    for wrappers in converted:
        for wrapper in wrappers:  # type: ignore
            if wrapper is not None:
                flattened.append(wrapper)
    difficulties: list[DifficultyWrapperForRhythmGame] = [
        wrapper.saved_to_cache() for wrapper in flattened]

    payload = [entry.to_json() for entry in difficulties]
    index_file.write_text(
        json.dumps(payload, indent=4),
        encoding='utf-8',
    )
    LOCALCACHE.joinpath(CACHED_MIN_JSON).write_text(
        json.dumps(payload, separators=(',', ':')),
        encoding='utf-8',
    )
    return difficulties
# Populate the module-level beatmap memo as an import-time side effect, so the
# cache index is read (or built, which scans the song folders) on first import.
get_crossgame_beatmaps()