
@kozikow
Created January 31, 2017 08:17
job_submit.py
import logging
import math
import os
import re

import kubernetes
from kubernetes.client import V1Container
from kubernetes.client import V1EnvVar
from kubernetes.client import V1Job
from kubernetes.client import V1JobSpec
from kubernetes.client import V1ObjectMeta
from kubernetes.client import V1PodSpec
from kubernetes.client import V1PodTemplateSpec
from kubernetes.client import V1ResourceRequirements

_KUBERNETES_NAMESPACE = "echoserver"
_DEADLINE_SECONDS = 60 * 60 * 12  # 12h
_BYTES_PER_TILE = 500000  # 500 KB per tile processed by the worker
_BASE_MEM_BYTES = 3000 * 1024 * 1024  # 3000 MiB baseline for the worker process


def _plan_to_job_name(plan_id):
    """Turn a plan id into a valid Kubernetes object name (lowercase, [-a-z0-9])."""
    clean_plan = plan_id.replace("_", "-").lower()
    clean_plan = re.sub("[^-a-z0-9]+", "-", clean_plan)
    return "worker-" + clean_plan


def _tiles_to_memory_limit(tiles):
    """Estimate the worker's memory limit: a fixed base plus a per-tile cost."""
    bytes_req = _BASE_MEM_BYTES + _BYTES_PER_TILE * len(tiles)
    memory_req = str(int(math.ceil(bytes_req / (1024 * 1024)))) + "Mi"
    logging.info(
        "Memory requirement",
        extra={
            "tiles_len": len(tiles),
            "memory_req_bytes": bytes_req,
            "memory_req": memory_req
        })
    return memory_req


class KubernetesTfApi(object):
    """Submits per-plan worker Jobs to the cluster this process runs in."""

    def __init__(self):
        logging.info("Initializing kubernetes api")
        # Assumes we run inside a pod whose service account may manage Jobs
        # and Pods in the target namespace.
        kubernetes.config.load_incluster_config()
        self._batch_api = kubernetes.client.BatchV1Api()
        self._core_api = kubernetes.client.CoreV1Api()

    def clean_old_job(self, plan_id):
        """Delete any previous Job and Pods labelled with this plan id."""
        label_selector = "plan=={}".format(plan_id)
        self._batch_api.delete_collection_namespaced_job(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector,
        )
        self._core_api.delete_collection_namespaced_pod(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector
        )

    def request_plan_processing(self, plan_id, tiles):
        """Create a single-completion Job that runs the worker for one plan."""
        job_name = _plan_to_job_name(plan_id)
        self.clean_old_job(plan_id)
        object_meta = V1ObjectMeta(name=job_name, labels={"plan": plan_id})

        # Requests and limits are set to the same value, so the memory
        # estimate acts as both a scheduling hint and a hard cap.
        memory_usage = {"memory": _tiles_to_memory_limit(tiles)}
        pod_spec = V1PodSpec(
            containers=[
                V1Container(
                    args=["python2", "/workspace/worker/main.py", plan_id],
                    env=[V1EnvVar(name="SERVER_STAGE",
                                  value=os.environ["SERVER_STAGE"]),
                         V1EnvVar(name="OPENBLAS_NUM_THREADS",
                                  value="2")],
                    image_pull_policy="IfNotPresent",
                    image="gcr.io/tensorflight/worker:27.0.0",
                    name=job_name,
                    resources=V1ResourceRequirements(
                        limits=memory_usage,
                        requests=memory_usage
                    )
                )
            ],
            restart_policy="OnFailure",
            active_deadline_seconds=_DEADLINE_SECONDS
        )
        job_spec = V1JobSpec(
            active_deadline_seconds=_DEADLINE_SECONDS,
            completions=1,
            parallelism=1,
            template=V1PodTemplateSpec(
                spec=pod_spec,
                metadata=object_meta
            ),
        )
        self._batch_api.create_namespaced_job(
            namespace=_KUBERNETES_NAMESPACE,
            body=V1Job(spec=job_spec, metadata=object_meta))
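
A minimal usage sketch, not part of the original gist: the plan id and tile list below are hypothetical placeholders, and the call assumes the process already runs in-cluster with the SERVER_STAGE environment variable set and RBAC permissions to manage Jobs and Pods in the namespace.

# Hypothetical caller (assumed names, not from the gist): submit one plan.
import logging

from job_submit import KubernetesTfApi

logging.basicConfig(level=logging.INFO)

api = KubernetesTfApi()  # requires in-cluster config and suitable RBAC
api.request_plan_processing(
    plan_id="plan_2017_01_31",
    tiles=[(0, 0), (0, 1), (1, 0)],  # only len(tiles) affects the memory limit
)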