server/server/executor.py

94 lines
3.5 KiB
Python

#!/usr/bin/env python3
import os
import re
from unipath import Path
class Executor:
"""This class executes commands recieved by the server"""
def __init__(self, baseDir: str):
self.currentDirectory = ""
self.baseDir = baseDir + os.path.sep
def sanitizeDirectory(self, inDirectory: str) -> str:
return re.sub('[^a-zA-Z0-9]', '', inDirectory)
def sanitizeFile(self, inFile: str) -> str:
return re.sub('[^a-zA-Z0-9].[^a-zA-Z0-9]', '', inFile)
def createDirectory(self, dirName: str) -> str:
dirName = self.sanitizeDirectory(dirName)
actualDirName: str = os.path.join(self.baseDir,self.currentDirectory,dirName)
os.mkdir(actualDirName)
return actualDirName
def removeDirectory(self, dirName: str) -> str:
dirName = self.sanitizeDirectory(dirName)
actualDirName: str = os.path.join(self.baseDir, self.currentDirectory, dirName)
if actualDirName:
os.rmdir(actualDirName)
return actualDirName
def getCurrentDirectory(self) -> str:
return self.currentDirectory
def setCurrentDirectory(self, dirName: str) -> str:
if dirName == "..":
p = Path(os.path.join(self.baseDir, self.currentDirectory))
parentpath = p.parent
if (str(parentpath) + os.path.sep)== self.baseDir:
self.currentDirectory = ""
return self.currentDirectory
else:
if len(str(parentpath).split('/')) < len(self.baseDir.split('/')):
return self.currentDirectory
newpath = str(parentpath).replace(self.baseDir,'')
if os.path.exists(os.path.join(self.baseDir,newpath)):
self.currentDirectory = newpath
return self.currentDirectory
else:
dirName = self.sanitizeDirectory(dirName)
joinedDir = os.path.join(self.currentDirectory, dirName)
if os.path.join(self.baseDir, joinedDir):
self.currentDirectory = joinedDir
return self.currentDirectory
def listCurrentDirectoryContent(self) -> str:
contents = os.listdir(os.path.join(self.baseDir, self.currentDirectory))
strdirectory = ""
for content in contents:
strdirectory += content + ", "
strdirectory = strdirectory[:-1]
return strdirectory
def putFileInCurrentDirectory(self, filename: str, content: bytes) -> str:
filename = self.sanitizeFile(filename)
currenctfile = os.path.join(self.baseDir, self.currentDirectory, filename)
f = open(currenctfile, "wb")
f.write(content)
f.close()
return currenctfile
def getFileInCurrentDirectory(self, file: str) -> bytes:
file = self.sanitizeFile(file)
currentfile = os.path.join(self.baseDir, self.currentDirectory, file)
if os.path.exists(currentfile):
f = open(currentfile, "rb")
content = f.read()
f.close()
return content
else:
raise Exception('File not found')
def removeFileInCurrentDirectory(self, file: str) -> str:
file = self.sanitizeFile(file)
if self.currentDirectory == "":
currentfile = os.path.join(self.baseDir, file)
else:
currentfile = os.path.join(self.baseDir, self.currentDirectory, file)
if os.path.exists(currentfile):
os.remove(currentfile)
return currentfile