Created
January 31, 2017 08:17
-
-
Save kozikow/86ee26998c5de1cf3780b6bc0cd35ec0 to your computer and use it in GitHub Desktop.
Revisions
-
kozikow created this gist
Jan 31, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,97 @@ import os import re import kubernetes import logging import math from kubernetes.client import V1Container from kubernetes.client import V1EnvVar from kubernetes.client import V1Job from kubernetes.client import V1JobSpec from kubernetes.client import V1ObjectMeta from kubernetes.client import V1PodSpec from kubernetes.client import V1PodTemplateSpec from kubernetes.client import V1ResourceRequirements _KUBERNETES_NAMESPACE = "echoserver" _DEADLINE_SECONDS = 60 * 60 * 12 # 12h _BYTES_PER_TILE = 500000 # 500 KB _BASE_MEM_BYTES = 3000 * 1024 * 1024 # 3000 MiB def _plan_to_job_name(plan_id): clean_plan = plan_id.replace("_", "-").lower() clean_plan = re.sub("[^-a-z0-9]+", "-", clean_plan) return "worker-" + clean_plan def _tiles_to_memory_limit(tiles): bytes_req = _BASE_MEM_BYTES + _BYTES_PER_TILE * len(tiles) memory_req = str(int(math.ceil(bytes_req / (1024 * 1024)))) + "Mi" logging.info( "Memory requirement", extra={ "tiles_len": len(tiles), "memory_req_bytes": bytes_req, "memory_req": memory_req }) return memory_req class KubernetesTfApi(object): def __init__(self): logging.info("Initializing kubernetes api") kubernetes.config.load_incluster_config() self._batch_api = kubernetes.client.apis.batch_v1_api.BatchV1Api() self._core_api = kubernetes.client.CoreV1Api() def clean_old_job(self, plan_id): label_selector = "plan=={}".format(plan_id) self._batch_api.delete_collection_namespaced_job( namespace=_KUBERNETES_NAMESPACE, label_selector=label_selector, ) self._core_api.delete_collection_namespaced_pod( namespace=_KUBERNETES_NAMESPACE, label_selector=label_selector ) def request_plan_processing(self, plan_id, tiles): job_name = _plan_to_job_name(plan_id) self.clean_old_job(plan_id) object_meta = V1ObjectMeta(name=job_name, labels={"plan": plan_id}) memory_usage = {"memory": _tiles_to_memory_limit(tiles)} pod_spec = V1PodSpec( containers=[ V1Container( args=["python2", "/workspace/worker/main.py", plan_id], env=[V1EnvVar(name="SERVER_STAGE", value=os.environ["SERVER_STAGE"]), V1EnvVar(name="OPENBLAS_NUM_THREADS", value="2")], image_pull_policy="IfNotPresent", image="gcr.io/tensorflight/worker:27.0.0", name=job_name, resources=V1ResourceRequirements( limits=memory_usage, requests=memory_usage ) ) ], restart_policy="OnFailure", active_deadline_seconds=_DEADLINE_SECONDS ) job_spec = V1JobSpec( active_deadline_seconds=_DEADLINE_SECONDS, completions=1, parallelism=1, template=V1PodTemplateSpec( spec=pod_spec, metadata=object_meta ), ) self._batch_api.create_namespaced_job(namespace=_KUBERNETES_NAMESPACE, body=V1Job(spec=job_spec, metadata=object_meta))