Last active
June 22, 2022 08:32
-
-
Save evanjuang/6a673c832776b0bebafa5df2dcad6efc to your computer and use it in GitHub Desktop.
python - requests utils
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import io | |
| import tarfile | |
| from pathlib import Path | |
| import magic | |
| import requests | |
def is_file_exist(url: str, timeout: float = 10.0) -> bool:
    """Return True when a HEAD request to *url* answers without an error status.

    Args:
        url: Address to probe.
        timeout: Seconds passed to ``requests.head`` as its ``timeout``
            argument, so an unresponsive server cannot block the caller
            forever.

    Returns:
        True if the request completed and the status passes
        ``raise_for_status`` (``Response.ok``); False if the request failed
        or the server replied with an error status.
    """
    try:
        resp = requests.head(url, timeout=timeout)
    except requests.RequestException:
        # Only transport/URL problems mean "not there"; genuine programming
        # errors (NameError, TypeError, ...) are no longer swallowed.
        return False
    try:
        # Response.ok is True exactly when raise_for_status() would not raise.
        return resp.ok
    finally:
        resp.close()
def is_url(url: str) -> bool:
    """Return True when *url* parses with both a scheme and a network location.

    Args:
        url: Candidate string, e.g. ``"https://host/path"``.

    Returns:
        True if ``urlparse`` yields a non-empty scheme and netloc; False
        otherwise, including when ``urlparse`` rejects the input with
        ``ValueError``.
    """
    # Imported here because the visible top-of-file imports do not provide
    # urlparse, which made the original raise NameError on every call.
    from urllib.parse import urlparse

    try:
        parts = urlparse(url)
    except ValueError:
        return False
    return bool(parts.scheme and parts.netloc)
# MIME type -> compression suffix understood by ``tarfile.open`` modes.
_TAR_COMPRESSION_BY_MIME = {
    'application/x-xz': 'xz',
    'application/gzip': 'gz',
    # Older libmagic releases report gzip data under this legacy name.
    'application/x-gzip': 'gz',
    'application/x-bzip2': 'bz2',
}


def get_tarfile_type(mime: str) -> str:
    """Map a MIME type to the compression suffix used in a tarfile mode.

    Args:
        mime: MIME type string such as ``'application/gzip'``.

    Returns:
        The compression name (``'xz'``, ``'gz'`` or ``'bz2'``) to place
        after ``r|`` or ``r:`` in a ``tarfile.open`` mode string.

    Raises:
        KeyError: If *mime* is not a supported compressed-tar MIME type.
    """
    return _TAR_COMPRESSION_BY_MIME[mime]
def download(src: str, timeout: float = 30.0) -> requests.Response:
    """Start a streamed GET for *src* and return the open response.

    The request is made with ``stream=True``; the caller reads the body
    (``.content`` / ``.iter_content``) and is responsible for closing the
    response.

    Args:
        src: URL to fetch.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``
            argument.

    Returns:
        The ``requests.Response`` whose status passed ``raise_for_status``.

    Raises:
        RuntimeError: If the request fails or the server answers with an
            error status.
    """
    try:
        resp = requests.get(src, stream=True, timeout=timeout)
    except (requests.RequestException, ConnectionError) as err:
        # The original caught the *builtin* ConnectionError, which requests'
        # own ConnectionError does not derive from, so network failures
        # escaped unwrapped. The builtin is still listed for compatibility.
        # Message text is kept verbatim because callers may match on it.
        raise RuntimeError('Failed to downlaod file') from err

    try:
        resp.raise_for_status()
    except requests.HTTPError as err:
        # Release the streamed connection before reporting the failure.
        resp.close()
        raise RuntimeError('Failed to downlaod file') from err
    return resp
def extract(src: str, dest: str) -> None:
    """Download the compressed tar archive at *src* and unpack it into *dest*.

    The compression is detected from the leading bytes of the payload with
    ``magic.from_buffer`` and translated by ``get_tarfile_type``.

    Args:
        src: URL of the archive.
        dest: Directory to extract into; created (with parents) if missing.

    Raises:
        RuntimeError: If the download fails, the detected type is not a
            supported archive type, or the archive cannot be extracted.
    """
    Path(dest).mkdir(parents=True, exist_ok=True)

    resp = download(src)
    try:
        payload = resp.content
    finally:
        resp.close()

    # python-magic recommends sniffing at least the first 2048 bytes.
    mime_type = magic.from_buffer(payload[:2048], mime=True)

    # Keep the lookup in its own try so that only an unknown MIME type is
    # reported as an unsupported archive, not an unrelated KeyError.
    try:
        compression = get_tarfile_type(mime_type)
    except KeyError as err:
        raise RuntimeError('Unsupport tar type') from err

    try:
        with tarfile.open(fileobj=io.BytesIO(payload),
                          mode=f"r|{compression}") as archive:
            if hasattr(tarfile, 'data_filter'):
                # The archive comes from the network: where the interpreter
                # supports extraction filters, refuse members that would
                # escape dest (absolute paths, "..", outside links).
                archive.extractall(path=dest, filter='data')
            else:
                # NOTE(review): no extraction filter on this interpreter;
                # a malicious archive can write outside dest.
                archive.extractall(path=dest)
    except tarfile.TarError as err:
        raise RuntimeError('Failed to extract file') from err
def copy(src: str, dest: str, filename: str = None) -> str:
    """Download *src* into directory *dest* and return the saved file's path.

    Args:
        src: URL of the file to fetch.
        dest: Target directory; created (with parents) if missing.
        filename: Name to save under. When omitted or empty, the last path
            segment of *src* is used, ignoring any query string or fragment.

    Returns:
        The path of the written file, as a string.

    Raises:
        RuntimeError: If the download fails, or if creating the directory
            or writing the file raises ``OSError``.
    """
    try:
        Path(dest).mkdir(parents=True, exist_ok=True)
        resp = download(src)
        try:
            if filename:
                name = filename
            else:
                # Drop "#fragment" and "?query" so a URL such as
                # ".../pkg.tar.gz?token=abc" is saved as "pkg.tar.gz".
                bare_url = src.partition('#')[0].partition('?')[0]
                name = bare_url.rsplit('/', 1)[-1]
            file_path = Path(dest) / name
            with open(file_path, 'wb') as out:
                for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
                    if chunk:
                        out.write(chunk)
        finally:
            # The response is streamed; always hand the connection back.
            resp.close()
        return str(file_path)
    except OSError as err:
        raise RuntimeError('Failed to copy file') from err
def get_json(src: str, timeout: float = 30.0):
    """Fetch *src* and return its body parsed as JSON.

    Args:
        src: URL of the JSON document.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``
            argument.

    Returns:
        The decoded JSON value.

    Raises:
        RuntimeError: ``'Failed to get json file'`` if the request fails or
            returns an error status; ``'Invalid JSON file'`` if the body
            cannot be decoded.
    """
    # Imported here because the visible top-of-file imports do not provide
    # json, which made the original raise NameError.
    import json

    try:
        resp = requests.get(src, timeout=timeout)
        resp.raise_for_status()
    except (requests.RequestException, ConnectionError) as err:
        # requests' ConnectionError is not the builtin one the original
        # caught, so network failures used to escape unwrapped.
        raise RuntimeError('Failed to get json file') from err

    try:
        return json.loads(resp.content)
    except ValueError as err:
        # ValueError covers json.JSONDecodeError and also the
        # UnicodeDecodeError raised for undecodable bytes.
        raise RuntimeError('Invalid JSON file') from err
def get_yaml(src: str, timeout: float = 30.0):
    """Fetch *src* and return its body parsed as YAML via ``yaml.safe_load``.

    Args:
        src: URL of the YAML document.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``
            argument.

    Returns:
        The object produced by ``yaml.safe_load``.

    Raises:
        RuntimeError: ``'Failed to get yaml file'`` if the request fails or
            returns an error status; ``'Invalid YAML file'`` if the body is
            not valid YAML.
    """
    # The block already depends on the yaml module, but the visible
    # top-of-file imports do not provide it; import it where it is used.
    import yaml

    try:
        resp = requests.get(src, timeout=timeout)
        resp.raise_for_status()
    except (requests.RequestException, ConnectionError) as err:
        # requests' ConnectionError is not the builtin one the original
        # caught, so network failures used to escape unwrapped.
        raise RuntimeError('Failed to get yaml file') from err

    try:
        return yaml.safe_load(resp.content)
    except yaml.YAMLError as err:
        raise RuntimeError('Invalid YAML file') from err
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment