173 lines
4.7 KiB
Python
Executable file
173 lines
4.7 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# hikari_uploader
|
|
# Copyright (C) 2024 odrling
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import hashlib
|
|
import os
|
|
from dataclasses import dataclass
|
|
from functools import cache
|
|
from io import BufferedIOBase, BufferedReader
|
|
from pathlib import Path
|
|
from typing import Literal, Optional
|
|
|
|
import httpx
|
|
import pyperclip
|
|
import typer
|
|
from rich.console import Console
|
|
from rich.progress import (
|
|
BarColumn,
|
|
DownloadColumn,
|
|
Progress,
|
|
TextColumn,
|
|
TimeRemainingColumn,
|
|
TransferSpeedColumn,
|
|
)
|
|
|
|
HIKARI_BASE = "https://hikari.butaishoujo.moe"
|
|
HIKARI_URL = f"{HIKARI_BASE}/upload"
|
|
|
|
console = Console()
|
|
|
|
|
|
@cache
|
|
def get_client() -> httpx.Client:
|
|
return httpx.Client()
|
|
|
|
|
|
@dataclass
|
|
class UploadResponse:
|
|
status: Literal["exists"] | Literal["uploaded"]
|
|
url: str
|
|
|
|
|
|
def check_hash(filename: str, file: Path):
|
|
h = hashlib.sha256()
|
|
with file.open('br') as f:
|
|
while buf := f.read(1024**2):
|
|
h.update(buf)
|
|
|
|
h.hexdigest()[:8]
|
|
url = f"{HIKARI_URL}/{h.hexdigest()[:8]}"
|
|
|
|
with file.open('br') as f:
|
|
files = {'file': (filename, f.read(2048))}
|
|
|
|
req = get_client().post(url, files=files)
|
|
if req.status_code == 404:
|
|
return
|
|
|
|
req.raise_for_status()
|
|
return UploadResponse(**req.json())
|
|
|
|
|
|
def output(copy: bool, resp: UploadResponse):
|
|
if copy:
|
|
pyperclip.copy(resp.url)
|
|
action = "Copied"
|
|
elif resp.status == "exists":
|
|
action = "Found"
|
|
elif resp.status == "uploaded":
|
|
action = "Uploaded"
|
|
else:
|
|
action = ""
|
|
|
|
console.print(action, resp.url, style="bold green")
|
|
|
|
|
|
@dataclass
|
|
class ProgressMonitor:
|
|
filename: str
|
|
file: BufferedReader
|
|
|
|
def __post_init__(self) -> None:
|
|
self.size = get_file_size(self.file)
|
|
self._file_read = self.file.read
|
|
self.file.read = self.read # type: ignore
|
|
self.progress = Progress(
|
|
TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
|
|
BarColumn(bar_width=None),
|
|
"[progress.percentage]{task.percentage:>3.1f}%",
|
|
"•",
|
|
DownloadColumn(),
|
|
"•",
|
|
TransferSpeedColumn(),
|
|
"•",
|
|
TimeRemainingColumn(),
|
|
)
|
|
self.task = self.progress.add_task("[green]Uploading",
|
|
filename=self.filename,
|
|
total=self.size)
|
|
self.progress.start()
|
|
|
|
def read(self, size: Optional[int] = None, /) -> bytes:
|
|
data = self._file_read(size)
|
|
self.progress.update(self.task, completed=self.file.tell())
|
|
return data
|
|
|
|
|
|
def get_file_size(fd: BufferedIOBase) -> int:
|
|
fd.seek(0, os.SEEK_END)
|
|
size: int = fd.tell()
|
|
fd.seek(0)
|
|
return size
|
|
|
|
|
|
def upload(file: Path,
|
|
obstruct: bool = typer.Option(
|
|
False, "--obstruct", "-o",
|
|
help="Obstruct filename (by hashing)"
|
|
),
|
|
filename: str = typer.Option(
|
|
None, "--name", "-n",
|
|
help="Rename the uploaded file."
|
|
),
|
|
copy: bool = typer.Option(
|
|
False, "--copy", "-c",
|
|
help="Copy file URL to clipboard."
|
|
),
|
|
hash: bool = typer.Option(
|
|
True, "--hash/--no-hash", "-h/-H",
|
|
help="Check file hash before uploading."
|
|
)):
|
|
|
|
with get_client():
|
|
if copy:
|
|
pyperclip.copy("")
|
|
|
|
if obstruct:
|
|
filename = f"{hashlib.sha256(file.name.encode()).hexdigest()}{file.suffix}"
|
|
elif filename is None:
|
|
filename = file.name
|
|
|
|
if hash and (resp := check_hash(filename, file)):
|
|
return output(copy, resp)
|
|
|
|
with file.open('br') as f:
|
|
monitor = ProgressMonitor(filename, f)
|
|
files = {'file': (filename, f)}
|
|
|
|
timeout = httpx.Timeout(5.0, read=None)
|
|
req = get_client().post(HIKARI_URL, files=files, timeout=timeout)
|
|
monitor.progress.stop()
|
|
|
|
req.raise_for_status()
|
|
data = UploadResponse(**req.json())
|
|
output(copy, data)
|
|
|
|
|
|
def main():
|
|
typer.run(upload)
|