# Multipart form library # Created 08 Nov, 2020 # v0.0.1 import os import io import sys import socket, ssl import urllib.parse import mimetypes import enum class FormContentType(enum.IntEnum): Void = 0 Text = 1 File = 2 class FormContent: def __init__(self, name, type=FormContentType.Void, data=None): self.name = name self.type = type self.size = 0 self.headers = [ 'Content-Disposition: form-data; name=\"%s\"' %(name) ] if type == FormContentType.Text: self.data = data self.size = len(data) elif type == FormContentType.File: basename = os.path.basename(data) self.data = data self.size = os.path.getsize(data) self.headers = [ 'Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"' %(name, basename) ] self.headers.append('Content-Type: %s' %(mimetypes.guess_type(data)[0]) or "application/octet-stream") class Form: def __init__(self): self.boundary = "666-devils-666" self.headers = [] self.contents = [] # array of FormContent self.calculateSize() def calculateSize(self): size = 0 boundaryLength = len(self.boundary) for content in self.contents: # add together the header info, boundaries and line endings lengths size += boundaryLength + 4 for header in content.headers: size += len(header) + 2 # terminating header chars size += 2 size += content.size size += 2 size += boundaryLength + 4 self.headers = [ "Content-Type: multipart/form-data; boundary=" + self.boundary, "Content-Length: " + str(size) ] def addText(self, name, text): self.contents.append(FormContent(name, FormContentType.Text, text)) self.calculateSize() def addFile(self, name, file): self.contents.append(FormContent(name, FormContentType.File, file)) self.calculateSize() class SocketFile(io.FileIO): def send(self, data): self.write(data) def recv(self, i): return "Nothing returned".encode() class BasicRequest: def post(self, url, form, service=None): if service == None: url = urllib.parse.urlparse(url) ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) service = ssl_context.wrap_socket(sock, server_hostname=url.netloc) service.connect((url.netloc, 443)) service.send(("POST %s HTTP/1.0\r\n" %(url.path)).encode()) service.send(("Host: %s\r\n" %(url.netloc)).encode()) print("Writing http request headers") self.writeHeaders(form.headers, service) service.send("\r\n".encode()) print("Writing body") self.writeContent(form, service) print("Waiting for response") result = self.parseResponse(service) service.close() return result def writeHeaders(self, headers, service): for header in headers: service.send(header.encode()) service.send("\r\n".encode()) def writeContent(self, form, service): for content in form.contents: service.send("--".encode()) service.send(form.boundary.encode()) service.send("\r\n".encode()) self.writeHeaders(content.headers, service) service.send("\r\n".encode()) if content.type == FormContentType.Text: service.send(content.data.encode()) elif content.type == FormContentType.File: print("writing file") f = open(content.data, "rb") while data := f.read(4096): service.send(data) f.close() service.send("\r\n".encode()) service.send("--".encode()) service.send(form.boundary.encode()) service.send("--".encode()) def parseResponse(self, service): status, headers, content = 0, {}, "" byteBlock = b"" byteBuffer = b"" while byteBuffer := service.recv(512): byteBlock += byteBuffer headerBlock, contentBlock = byteBlock.split(b"\r\n\r\n") status, *headerLines = headerBlock.split(b"\r\n") status = status.split(b" ")[1].decode() for line in headerLines: key, content = line.split(b": ") headers[key.decode()] = content.decode() content = contentBlock.decode() return { "status": status, "headers": headers, "content": content }