478 lines
16 KiB
Python
478 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
import json
|
|
import subprocess
|
|
import zlib
|
|
from enum import IntEnum
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Generic, List, Optional, TypeVar, Union
|
|
|
|
import bms_ragnarock
|
|
import osutools
|
|
import PIL.Image
|
|
from constants import LOCALCACHE, PATH_OSU, PATH_RAGNAROCK, PREVIEW_DURATION
|
|
from models import JsonableMixin
|
|
|
|
_VT = TypeVar('_VT')
|
|
|
|
PREVIEW_OGG = 'preview.ogg'
|
|
THUMBNAIL_JPG = 'thumbail.jpg'
|
|
CACHED_JSON = 'cached.json'
|
|
CACHED_MIN_JSON = 'cached_min.json'
|
|
|
|
|
|
def make_audio_preview(audio: Path, preview: Path, start: float, duration: float):
|
|
if not preview.exists():
|
|
audio_duration = probe_audio_duration(audio)
|
|
start = min(max(0.0, start), audio_duration-duration/2)
|
|
fade_duration = 0.075
|
|
actual_duration = max(0.0, min(duration, audio_duration-start))
|
|
if actual_duration <= 2.0:
|
|
fade_duration = 0.0
|
|
preview_tmp = preview.with_suffix('.egg')
|
|
subprocess.run(
|
|
['ffmpeg', '-y',
|
|
'-v', 'quiet',
|
|
'-ss', str(start),
|
|
'-t', str(duration),
|
|
'-i', str(audio),
|
|
'-filter:a', f'afade=t=in:st=0:d={fade_duration},afade=t=out:st={actual_duration-fade_duration}:d={fade_duration}',
|
|
'-q', '5',
|
|
'-map_metadata', '-1',
|
|
'-vn', '-acodec', 'libvorbis',
|
|
'-f', 'ogg',
|
|
str(preview_tmp),
|
|
],
|
|
check=True,
|
|
)
|
|
preview_tmp.rename(preview)
|
|
|
|
|
|
def probe_audio_duration(audio: Path) -> float:
|
|
if not audio.exists():
|
|
return 0.0
|
|
r = subprocess.run(
|
|
['ffprobe',
|
|
'-v', 'quiet',
|
|
'-show_format',
|
|
'-print_format', 'json',
|
|
str(audio),
|
|
],
|
|
check=True,
|
|
stdout=subprocess.PIPE
|
|
)
|
|
return float(json.loads(r.stdout)['format']['duration'])
|
|
|
|
|
|
def get_osu_sections(osu_text: str) -> Dict[str, List[str]]:
|
|
osu_sections: Dict[str, List[str]] = {}
|
|
for section in filter(lambda a: str.startswith(a, '['), filter(len, map(str.strip, osu_text.split('\n\n')))):
|
|
header, *lines = section.splitlines()
|
|
osu_sections[header.strip('[]').strip()] = lines
|
|
return osu_sections
|
|
|
|
|
|
def convert_thumb_osu2ragna(beatmapset_osu: Path, ragna_beatmap_thumb: Path, osu_beatmap_sections: Dict[str, List[str]]) -> bool:
|
|
if not ragna_beatmap_thumb.exists():
|
|
imageline = next(filter(lambda x: x.startswith('0,0,'),
|
|
osu_beatmap_sections.get('Events', [])), None)
|
|
im = PIL.Image.new('RGB', (512, 512), 0)
|
|
if imageline is not None:
|
|
osu_bg = beatmapset_osu.joinpath(
|
|
imageline.split(',')[2].strip('"'))
|
|
if osu_bg.exists():
|
|
im = PIL.Image.open(str(osu_bg)).convert('RGB')
|
|
im = im_crop_square_center(im)
|
|
im.thumbnail((512, 512))
|
|
im.save(str(ragna_beatmap_thumb))
|
|
return False
|
|
|
|
|
|
def im_crop_square_center(im: PIL.Image.Image) -> PIL.Image.Image:
|
|
sx, sy = im.size
|
|
if sx != sy:
|
|
d = min(sx, sy)
|
|
if sx == d:
|
|
# L T R B
|
|
im = im.crop((
|
|
0,
|
|
int(sy/2 - d/2),
|
|
d,
|
|
int(sy/2 + d/2),
|
|
))
|
|
else:
|
|
# L T R B
|
|
im = im.crop((
|
|
int(sx/2 - d/2),
|
|
0,
|
|
int(sx/2 + d/2),
|
|
d,
|
|
))
|
|
return im
|
|
|
|
|
|
class RhythmGameMode(IntEnum):
|
|
osumania1k = 1
|
|
osumania2k = 2
|
|
osumania3k = 3
|
|
osumania4k = 4
|
|
osumania5k = 5
|
|
osumania6k = 6
|
|
osumania7k = 7
|
|
osumania8k = 8
|
|
osumania9k = 9
|
|
osumania10k = 10
|
|
osumania12k = 12
|
|
osumania14k = 14
|
|
osumania16k = 16
|
|
osumania18k = 18
|
|
osustd = 31
|
|
osutaiko = 41
|
|
osucatch = 51
|
|
osuddr4k = 61
|
|
ragnarock = 71
|
|
beatsaber = 81
|
|
outfoxdance = 91
|
|
outfoxpump = 92
|
|
outfoxsmx = 93
|
|
outfoxtechno = 94
|
|
outfoxbemu = 95
|
|
outfoxpomu = 96
|
|
outfoxgddm = 97
|
|
outfoxgdgf = 98
|
|
outfoxgh = 99
|
|
outfoxtaiko = 100
|
|
outfoxbongo = 101
|
|
outfoxpara = 102
|
|
outfoxkbx = 103
|
|
outfoxez2 = 104
|
|
outfoxds3ddx = 105
|
|
outfoxmaniax = 106
|
|
outfoxstepstage = 107
|
|
outfoxkickbox = 108
|
|
|
|
|
|
class DifficultyWrapperForRhythmGame(Generic[_VT], JsonableMixin):
|
|
def __init__(self,
|
|
title: str,
|
|
artist: str,
|
|
mapper: str,
|
|
difficulty: float,
|
|
difficulty_name: str,
|
|
preview: Union[Path, str],
|
|
thumbnail: Union[Path, str],
|
|
gamemode: RhythmGameMode,
|
|
url_reference: Optional[str] = None,
|
|
tid: Optional[int] = None,
|
|
sid: Optional[int] = None,
|
|
id: Optional[int] = None,
|
|
) -> None:
|
|
self.title = title
|
|
self.artist = artist
|
|
self.mapper = mapper
|
|
self.difficulty = difficulty
|
|
self.difficulty_name = difficulty_name
|
|
self.preview = Path(preview)
|
|
self.thumbnail = Path(thumbnail)
|
|
self.gamemode = gamemode
|
|
self.url_reference = url_reference
|
|
self.tid = tid or self.gen_tid()
|
|
self.sid = sid or self.gen_sid()
|
|
self.id = id or self.gen_id()
|
|
|
|
@classmethod
|
|
def from_lib(cls, o: _VT) -> List['DifficultyWrapperForRhythmGame']:
|
|
raise NotImplementedError
|
|
|
|
def gen_tid(self) -> int:
|
|
title = self.title
|
|
artist = self.artist
|
|
idtf = zlib.crc32(f'{artist}@{title}'.encode('utf-8'))
|
|
return idtf
|
|
|
|
def gen_sid(self) -> int:
|
|
title = self.title
|
|
artist = self.artist
|
|
mapper = self.mapper
|
|
idtf = zlib.crc32(f'{mapper}@{artist}@{title}'.encode('utf-8'))
|
|
return idtf
|
|
|
|
def gen_id(self) -> int:
|
|
diff_nu = self.difficulty
|
|
diff_nm = self.difficulty_name
|
|
gamemd = self.gamemode
|
|
df = zlib.crc32(
|
|
f'{self.gen_sid()}@{diff_nu}@{diff_nm}@{gamemd}'.encode('utf-8'))
|
|
return df
|
|
|
|
def saved_to_cache(self) -> 'DifficultyWrapperForRhythmGame':
|
|
LOCALCACHE.joinpath(str(self.gen_sid())).joinpath(f'info_{self.gen_id()}.json').write_text(
|
|
json.dumps(self.to_json(), indent=4),
|
|
)
|
|
return self
|
|
|
|
def to_json(self) -> dict[str, Any]:
|
|
return {
|
|
'title': self.title,
|
|
'artist': self.artist,
|
|
'mapper': self.mapper,
|
|
'difficulty': self.difficulty,
|
|
'difficulty_name': self.difficulty_name,
|
|
'preview': str(self.preview),
|
|
'thumbnail': str(self.thumbnail),
|
|
'gamemode': self.gamemode,
|
|
'url_reference': self.url_reference,
|
|
'tid': self.tid,
|
|
'sid': self.sid,
|
|
'id': self.id,
|
|
}
|
|
|
|
@classmethod
|
|
def from_json(cls, o: dict[str, Any]) -> 'DifficultyWrapperForRhythmGame':
|
|
return cls(**o)
|
|
|
|
|
|
class DifficultyWrapperForOsu(DifficultyWrapperForRhythmGame[osutools.map.LocalMap]):
|
|
@classmethod
|
|
def get_audio_file(cls, o: osutools.map.LocalMap) -> Path:
|
|
audiofp: Path = (o.client.osu_folder / "Songs" /
|
|
o.folder_name.strip() / o.audio_filename)
|
|
if not audiofp.is_file():
|
|
for potaf in audiofp.parent.iterdir():
|
|
if audiofp.name.lower() == potaf.name.lower():
|
|
audiofp = potaf
|
|
break
|
|
return audiofp
|
|
|
|
@classmethod
|
|
def get_mode(cls, o: osutools.map.LocalMap) -> RhythmGameMode:
|
|
mode = RhythmGameMode.osustd
|
|
match o.mode.value:
|
|
case 0:
|
|
mode = RhythmGameMode.osustd
|
|
case 1:
|
|
mode = RhythmGameMode.osutaiko
|
|
case 2:
|
|
mode = RhythmGameMode.osucatch
|
|
case 3:
|
|
match round(o.circle_size):
|
|
case 1:
|
|
mode = RhythmGameMode.osumania1k
|
|
case 2:
|
|
mode = RhythmGameMode.osumania2k
|
|
case 3:
|
|
mode = RhythmGameMode.osumania3k
|
|
case 4:
|
|
mode = RhythmGameMode.osumania4k
|
|
case 5:
|
|
mode = RhythmGameMode.osumania5k
|
|
case 6:
|
|
mode = RhythmGameMode.osumania6k
|
|
case 7:
|
|
mode = RhythmGameMode.osumania7k
|
|
case 8:
|
|
mode = RhythmGameMode.osumania8k
|
|
case 9:
|
|
mode = RhythmGameMode.osumania9k
|
|
case 10:
|
|
mode = RhythmGameMode.osumania10k
|
|
case 12:
|
|
mode = RhythmGameMode.osumania12k
|
|
case 14:
|
|
mode = RhythmGameMode.osumania14k
|
|
case 16:
|
|
mode = RhythmGameMode.osumania16k
|
|
case 18:
|
|
mode = RhythmGameMode.osumania18k
|
|
case _:
|
|
raise ValueError(
|
|
f'Unknown mania key count: {o.circle_size}')
|
|
case _:
|
|
raise ValueError(f'Unknown mode: {o.mode}')
|
|
return mode
|
|
|
|
@classmethod
|
|
def from_lib(cls, o: osutools.map.LocalMap) -> List['DifficultyWrapperForRhythmGame[osutools.map.LocalMap]']:
|
|
title = o.song_title or o.title_unicode
|
|
artist = o.artist or o.artist_unicode
|
|
creator = o.creator_name
|
|
idtf = zlib.crc32(f'{creator}@{artist}@{title}'.encode('utf-8'))
|
|
lcc = LOCALCACHE/str(idtf)
|
|
lcc.mkdir(parents=True, exist_ok=True)
|
|
difficulty_arr = [
|
|
o.standard_sr_ratings,
|
|
o.taiko_sr_ratings,
|
|
o.ctb_sr_ratings,
|
|
o.mania_sr_ratings,
|
|
]
|
|
difficulty = dict(difficulty_arr[o.mode.value]).get(
|
|
0, difficulty_arr[o.mode.value][0][-1])
|
|
difficultynm = str(o.difficulty_name)
|
|
prevaud = lcc.joinpath(PREVIEW_OGG)
|
|
thumb = lcc.joinpath(THUMBNAIL_JPG)
|
|
if not prevaud.is_file():
|
|
audiofp: Path = cls.get_audio_file(o)
|
|
if not audiofp.is_file():
|
|
return []
|
|
make_audio_preview(audiofp, prevaud, max(0.0,
|
|
o.preview_start/1000), PREVIEW_DURATION)
|
|
if not thumb.is_file():
|
|
bmfp: Path = (o.client.osu_folder / "Songs" /
|
|
o.folder_name.strip() / o.filename)
|
|
osu_sections = get_osu_sections(
|
|
bmfp.read_text(encoding='utf-8', errors='ignore'))
|
|
convert_thumb_osu2ragna(bmfp.parent, thumb, osu_sections)
|
|
mode = cls.get_mode(o)
|
|
if mode != RhythmGameMode.osumania4k:
|
|
return [cls(
|
|
title,
|
|
artist,
|
|
creator,
|
|
difficulty,
|
|
difficultynm,
|
|
prevaud,
|
|
thumb,
|
|
mode,
|
|
url_reference=f'https://osu.ppy.sh/beatmaps/{o.beatmap_id}',
|
|
)]
|
|
return [
|
|
cls(
|
|
title,
|
|
artist,
|
|
creator,
|
|
difficulty,
|
|
difficultynm,
|
|
prevaud,
|
|
thumb,
|
|
mode,
|
|
url_reference=f'https://osu.ppy.sh/beatmaps/{o.beatmap_id}',
|
|
),
|
|
cls(
|
|
title,
|
|
artist,
|
|
creator,
|
|
difficulty,
|
|
difficultynm,
|
|
prevaud,
|
|
thumb,
|
|
RhythmGameMode.osuddr4k,
|
|
url_reference=f'https://osu.ppy.sh/beatmaps/{o.beatmap_id}',
|
|
),
|
|
]
|
|
|
|
|
|
class DifficultyWrapperForRagnarock(DifficultyWrapperForRhythmGame[bms_ragnarock.RagnaRockDifficultyV1]):
|
|
@classmethod
|
|
def from_lib(cls, o: bms_ragnarock.RagnaRockDifficultyV1) -> List['DifficultyWrapperForRhythmGame[bms_ragnarock.RagnaRockDifficultyV1]']:
|
|
if o.parent is None:
|
|
raise ValueError(f'Parent is None on {o}')
|
|
p: bms_ragnarock.RagnaRockInfo = o.parent
|
|
if p.path is None:
|
|
raise ValueError(f'Beatmapset path is None on {p}')
|
|
title = p.songName
|
|
artist = p.songAuthorName
|
|
creator = p.levelAuthorName
|
|
idtf = zlib.crc32(f'{creator}@{artist}@{title}'.encode('utf-8'))
|
|
lcc = LOCALCACHE/str(idtf)
|
|
lcc.mkdir(parents=True, exist_ok=True)
|
|
p.difficultyBeatmapSets
|
|
difficulty = o.difficulty_rankf
|
|
difficultynm = o.difficulty_label
|
|
prevaud = lcc.joinpath(PREVIEW_OGG)
|
|
thumb = lcc.joinpath(THUMBNAIL_JPG)
|
|
if not prevaud.is_file():
|
|
if (ragnaprev := p.path.parent.joinpath(PREVIEW_OGG)).is_file():
|
|
prevaud.write_bytes(ragnaprev.read_bytes())
|
|
else:
|
|
songfp: Path = p.path.parent.joinpath(p.songFilename)
|
|
make_audio_preview(
|
|
songfp,
|
|
prevaud,
|
|
max(0.0, p.previewStartTime),
|
|
PREVIEW_DURATION)
|
|
if not thumb.is_file():
|
|
ifn: str = p.coverImageFilename
|
|
ifp: Path = p.path.parent/ifn
|
|
if ifp.is_file():
|
|
thumb.write_bytes(ifp.read_bytes())
|
|
else:
|
|
convert_thumb_osu2ragna(ifp.parent, thumb, {
|
|
'Events': [f'0,0,{ifn}']})
|
|
link: Optional[str] = None
|
|
if (idf := p.path.parent.joinpath('.id')).is_file():
|
|
idn = idf.read_text(encoding='utf-8', errors='ignore').strip()
|
|
link = f'https://ragnacustoms.com/songs/ddl/{idn}'
|
|
if (bml := p.customData.get('link2', None)):
|
|
link = bml
|
|
return [cls(
|
|
title,
|
|
artist,
|
|
creator,
|
|
difficulty,
|
|
difficultynm,
|
|
prevaud,
|
|
thumb,
|
|
RhythmGameMode.ragnarock,
|
|
url_reference=link,
|
|
)]
|
|
|
|
|
|
CROSSGAME_BEATMAPS: Optional[list[DifficultyWrapperForRhythmGame]] = None
|
|
|
|
|
|
def get_crossgame_beatmaps() -> list[DifficultyWrapperForRhythmGame]:
|
|
global CROSSGAME_BEATMAPS
|
|
if CROSSGAME_BEATMAPS is None:
|
|
CROSSGAME_BEATMAPS = load_beatmaps()
|
|
return CROSSGAME_BEATMAPS
|
|
|
|
|
|
def load_beatmaps() -> list[DifficultyWrapperForRhythmGame]:
|
|
if LOCALCACHE.joinpath(CACHED_JSON).is_file():
|
|
return [*map(DifficultyWrapperForRhythmGame.from_json, json.loads(
|
|
LOCALCACHE.joinpath(CACHED_JSON).read_text(encoding='utf-8')
|
|
))]
|
|
osu = osutools.OsuClientV1(None)
|
|
osu.set_osu_folder(PATH_OSU)
|
|
osu_maps: List[osutools.map.LocalMap] = [osu_map
|
|
for osu_map in osu.osu_db.map_list()
|
|
if osu_map.audio_filename != 'virtual' and
|
|
osu_map.audio_filename is not None]
|
|
del osu
|
|
|
|
ragna_mapsets = [bms_ragnarock.RagnaRockInfo.read_from(info)
|
|
for bms in PATH_RAGNAROCK.iterdir()
|
|
if bms.is_dir()
|
|
for info in bms.iterdir()
|
|
if info.is_file() and info.name.lower() == 'info.dat'
|
|
]
|
|
ragna_maps = [ragna_map
|
|
for ragna_mapset in ragna_mapsets
|
|
for ragna_map in ragna_mapset.difficultyBeatmapSets]
|
|
difficulties: list[DifficultyWrapperForRhythmGame] = [
|
|
*map(DifficultyWrapperForRhythmGame.saved_to_cache, [
|
|
x for y in (
|
|
*map(DifficultyWrapperForRagnarock.from_lib, ragna_maps),
|
|
*map(DifficultyWrapperForOsu.from_lib, osu_maps),
|
|
)
|
|
for x in y # type: ignore
|
|
if x is not None])
|
|
]
|
|
LOCALCACHE.joinpath(CACHED_JSON).write_text(
|
|
json.dumps(
|
|
[*map(DifficultyWrapperForRhythmGame.to_json, difficulties)],
|
|
indent=4),
|
|
encoding='utf-8',
|
|
)
|
|
LOCALCACHE.joinpath(CACHED_MIN_JSON).write_text(
|
|
json.dumps(
|
|
[*map(DifficultyWrapperForRhythmGame.to_json, difficulties)],
|
|
separators=(',', ':')),
|
|
encoding='utf-8',
|
|
)
|
|
return difficulties
|
|
|
|
|
|
get_crossgame_beatmaps()
|