Skip to content

Commit

Permalink
fix(content): path traversal
Browse files Browse the repository at this point in the history
  • Loading branch information
esoadamo committed Nov 18, 2024
1 parent a971c2b commit 5ccf799
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 41 deletions.
122 changes: 88 additions & 34 deletions endpoint/content.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,58 @@
import os
import magic
import multipart
from typing import Optional
from pathlib import Path

import falcon
from falcon.request import datetime

import model
import util
from db import session
from util.logger import audit_log
from util.task import time_published


class Content(object):

# Smaze adresarovou strukturu rekurzivne od nejvic zanoreneho
# dokud jsou adresare prazdne.
def _delete_tree(self, path):
if os.listdir(path) != []:
if os.listdir(path):
return
try:
os.rmdir(path)
self._delete_tree(os.path.dirname(path))
except:
except Exception:
return

# GET na content vraci
# a) soubor, pokud je v \path cesta k souboru,
# b) obsah adresare, pokud je v \path cesta k adresari.
def on_get(self, req, resp):
if req.get_param('path'):
shortPath = req.get_param('path').replace('..', '')
short_path = req.get_param('path').replace('..', '')
else:
shortPath = "."
filePath = 'data/content/' + shortPath
short_path = "."
file_path = str(self.__parse_short_path(req, resp, req.context['user']))
if not file_path:
return

if os.path.isdir(filePath):
if os.path.isdir(file_path):
req.context['result'] = {
'content': util.content.dir_to_json(shortPath)
'content': util.content.dir_to_json(short_path)
}
return

if not os.path.isfile(filePath):
if not os.path.isfile(file_path):
req.context['result'] = {
'content': util.content.empty_content(shortPath)
'content': util.content.empty_content(short_path)
}
return

resp.content_type = magic.Magic(mime=True).from_file(filePath)
resp.set_stream(open(filePath, 'rb'), os.path.getsize(filePath))
resp.content_type = magic.Magic(mime=True).from_file(file_path)
resp.set_stream(open(file_path, 'rb'), os.path.getsize(file_path))

def on_post(self, req, resp):
user = req.context['user']
Expand All @@ -58,11 +68,9 @@ def on_post(self, req, resp):
resp.status = falcon.HTTP_400
return

if req.get_param('path'):
shortPath = req.get_param('path').replace('..', '')
else:
shortPath = "."
dirPath = 'data/content/' + shortPath
dir_path = str(self.__parse_short_path(req, resp, user))
if dir_path is None:
return

if not req.content_length:
resp.status = falcon.HTTP_411
Expand All @@ -82,15 +90,15 @@ def on_post(self, req, resp):
"No boundary for multipart/form-data.")

try:
if not os.path.isdir(dirPath):
os.makedirs(dirPath)
if not os.path.isdir(dir_path):
os.makedirs(dir_path)

for part in multipart.MultipartParser(
req.stream, boundary, req.content_length,
2**30, 2**20, 2**18, 2**16, 'utf-8'):
path = '%s/%s' % (dirPath, part.filename)
path = '%s/%s' % (dir_path, part.filename)
part.save_as(path)
except:
except Exception:
resp.status = falcon.HTTP_500
raise

Expand All @@ -104,32 +112,65 @@ def on_delete(self, req, resp):
resp.status = falcon.HTTP_400
return

if req.get_param('path'):
shortPath = req.get_param('path').replace('..', '')
else:
shortPath = "."
filePath = 'data/content/' + shortPath
file_path = self.__parse_short_path(req, resp, user)
if not file_path:
return

if not os.path.isfile(filePath):
if not os.path.isfile(file_path):
resp.status = falcon.HTTP_404
return

try:
os.remove(filePath)
self._delete_tree(os.path.dirname(filePath))
except:
os.remove(file_path)
self._delete_tree(os.path.dirname(file_path))
except Exception:
resp.status = falcon.HTTP_500
raise

resp.status = falcon.HTTP_200
req.context['result'] = {}

@staticmethod
def __parse_short_path(req, resp, user) -> Optional[Path]:
path_base = (Path('data') / 'content').absolute()

if req.get_param('path'):
short_path = req.get_param('path')
else:
short_path = "."

file_path = (path_base / Path(short_path)).absolute()
if not file_path.is_relative_to(path_base):
audit_log(
scope="HACK",
user_id=user.id if user.is_logged_in() else None,
message=f"Attempt to access content outside box",
message_meta={
'path': short_path
}
)
resp.status = falcon.HTTP_404
return None
return file_path


class TaskContent(object):

def on_get(self, req, resp, id, view):
user = req.context['user']

# TODO: Enable after frontend is sending auth token in headers
# if time_published(id) > datetime.now() and not user.is_org() and not user.is_tester():
# req.context['result'] = {
# 'errors': [{
# 'status': '403',
# 'title': 'Forbidden',
# 'detail': 'Obsah úlohy ještě nebyl zveřejněn.'
# }]
# }
# resp.status = falcon.HTTP_403
# return

if (view != 'icon' and not view.startswith('reseni')
and not view.startswith('zadani')):
resp.status = falcon.HTTP_400
Expand All @@ -140,12 +181,25 @@ def on_get(self, req, resp, id, view):
resp.status = falcon.HTTP_400
return

filePath = 'data/task-content/' + id + '/' + view + '/' + \
path_param.replace('..', '')
base_path = (Path('data') / 'task-content' / Path(str(id)).name / Path(view).name).absolute()
file_path = (base_path / path_param).absolute()
if not file_path.is_relative_to(base_path):
resp.status = falcon.HTTP_404
audit_log(
scope="HACK",
user_id=user.id if user.is_logged_in() else None,
message=f"Attempt to access task content outside box",
message_meta={
'id': id,
'view': view,
'path': path_param
}
)
return

if not os.path.isfile(filePath):
if not os.path.isfile(file_path):
resp.status = falcon.HTTP_404
return

resp.content_type = magic.Magic(mime=True).from_file(filePath)
resp.set_stream(open(filePath, 'rb'), os.path.getsize(filePath))
resp.content_type = magic.Magic(mime=True).from_file(file_path)
resp.set_stream(open(file_path, 'rb'), os.path.getsize(file_path))
27 changes: 21 additions & 6 deletions util/content.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import os
from typing import List, TypedDict
from typing import List, TypedDict, Union
from pathlib import Path

from util.logger import audit_log


class Content(TypedDict):
Expand All @@ -12,16 +15,28 @@ def empty_content(path: str) -> Content:
return {'id': path, 'files': [], 'dirs': []}


def dir_to_json(path: str) -> Content:
path_full = os.path.join('data', 'content', path)
def dir_to_json(path: Union[str, Path]) -> Content:
path_base = Path('data', 'content').absolute()
path_full = (path_base / path).absolute() if isinstance(path, str) or not path.is_absolute() else path

if not path_full.is_relative_to(path_base):
audit_log(
scope="HACK",
user_id=None,
message=f"Attempt to access content outside box using dir_to_json",
message_meta={
'path': path
}
)
return empty_content(path)

if os.path.isdir(path_full):
return {
'id': path,
'id': str(path.relative_to(path_base)),
'files': [f for f in os.listdir(path_full)
if os.path.isfile(path_full+'/'+f)],
if (path_full / f).is_file()],
'dirs': [f for f in os.listdir(path_full)
if os.path.isdir(path_full+'/'+f)]
if (path_full / f).is_dir()]
}
else:
return empty_content(path)
2 changes: 1 addition & 1 deletion util/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def get_log() -> Logger:
return logging.getLogger('gunicorn.error')


def audit_log(scope: str, user_id: int, message: str, message_meta: Optional[dict] = None, year_id: Optional[int] = None) -> None:
def audit_log(scope: str, user_id: Optional[int], message: str, message_meta: Optional[dict] = None, year_id: Optional[int] = None) -> None:
log_db = AuditLog(
scope=scope,
user_id=user_id,
Expand Down

0 comments on commit 5ccf799

Please sign in to comment.