Use Sanic

This commit is contained in:
2019-03-08 18:35:21 +03:00
parent 6cf94b0efe
commit 9cc13191d4
16 changed files with 340 additions and 4590 deletions

View File

@@ -1,117 +1,57 @@
from bottle import route, request, run, redirect, template, default_app, abort
import os
from datetime import datetime
from mimetypes import guess_type
from os import environ
base_hosts = [
'drop.bakatrouble.pw',
'drop.bakatrouble.me',
'127.0.0.1.xip.io:8080',
]
from sanic import Sanic
from sanic.request import Request
from sanic.response import text, html, file_stream, redirect
from sanic.exceptions import abort
from utils import get_j2env, get_sort_icon, get_sort_link, resolve_path, list_dir
def format_size(size):
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB']:
if size < 2048:
return f'{size} {unit}'
else:
size //= 1024
return f'{size} PiB'
DEBUG = environ.get('ENV', '').upper() != 'PRODUCTION'
app = Sanic()
app.static('/~static/', '~static/', use_content_range=True, stream_large_files=True)
j2env = get_j2env(DEBUG)
def format_date(ts):
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
@app.route('/')
@app.route('/<path:path>')
async def index(request: Request, path=''):
domain = request.host
query = f'?{request.query_string}' if request.query_string else ''
def get_sort_icon(test, sort):
if sort == test:
return '<i class="sort descending icon"></i>'
elif sort == f'-{test}':
return '<i class="sort ascending icon"></i>'
else:
return ''
def get_sort_link(current, sort, hidden):
return f'?sort={"-" if current == sort else ""}{sort}{"&hidden" if hidden else ""}'
def get_file_icon(name):
mime = guess_type(name)[0]
if mime:
t = mime.split('/')[0]
if t in ['text', 'image', 'audio', 'video']:
return t
return ''
class ListEntry:
def __init__(self, dir, name):
path = os.path.join(dir, name)
self.name = name
self.isdir = os.path.isdir(path)
self.size = os.path.getsize(path)
self.created = os.path.getctime(path)
def resolve_path(domain, path):
if domain in base_hosts:
return os.path.join(os.path.dirname(__file__), 'files', *path.split('/'))
else:
for host in base_hosts:
if host in domain:
return os.path.join(os.path.dirname(__file__), 'subdomain_files',
domain[:domain.index(host)-1], *path.split('/'))
raise ValueError
def list_dir(dir, sort=None, hidden=False):
lst = [ListEntry(dir, name) for name in os.listdir(dir) if hidden or not name.startswith('.')]
lst = sorted(lst, key={
'name': lambda item: (not item.isdir, item.name.lower()),
'-name': lambda item: (item.isdir, item.name.lower()),
'size': lambda item: (not item.isdir, item.size),
'-size': lambda item: (item.isdir, item.size),
'created': lambda item: (not item.isdir, item.created),
'-created': lambda item: (item.isdir, item.created)
}[sort], reverse=sort.startswith('-'))
return lst
@route('/')
@route('/<path:path>')
def index(path=''):
domain = request.urlparts.netloc
query = request.urlparts.query
try:
resolved_path = resolve_path(domain, path)
resolved_path, resolved_query = resolve_path(domain, path)
except ValueError:
return 'GTFO'
if os.path.isdir(resolved_path):
return text('GTFO', 400)
if resolved_path.is_dir():
if path and path[-1] != '/':
return redirect(f'/{path}/')
hidden = 'hidden' in request.GET
sort = request.GET.get('sort', 'name')
hidden = request.args.get('hidden') is not None
sort = request.args.get('sort', 'name')
if sort not in ['name', '-name', 'size', '-size', 'created', '-created']:
sort = 'name'
return template('filelist',
lst=list_dir(resolved_path, sort, hidden),
format_size=format_size,
format_date=format_date,
get_sort_icon=get_sort_icon,
get_sort_link=get_sort_link,
sort=sort,
hidden=hidden,
path=path,
query=query,
guess_type=guess_type,
get_file_icon=get_file_icon)
else:
return abort(404)
return html(j2env.get_template('filelist.tpl').render(
lst=list_dir(resolved_path, sort, hidden, root=not resolved_query),
get_sort_icon=get_sort_icon,
get_sort_link=get_sort_link,
sort=sort,
hidden=hidden,
path=resolved_query,
query=query,
))
elif resolved_path.is_file():
return await file_stream(resolved_path)
abort(404, 'Path was not found')
application = default_app()
if __name__ == '__main__':
run(host='localhost', port=8080, debug=True, reloader=True)
if DEBUG:
app.run(host='localhost', port=8080, debug=True, auto_reload=True)
else:
app.run(sock='/tmp/drop.sock', workers=2)