import gitlab import os import subprocess from gitlab.v4.objects.jobs import ProjectJob from gitlab.v4.objects.pipelines import ProjectPipeline def pprint_job(job: ProjectJob): return f"ProjectJob" def pprint_pipeline(p: ProjectPipeline): return f"ProjectPipeline" def download_artifact_locally_with_python(gl: gitlab.Gitlab): '''When running locally, use developer's with GitLab Artifacts API to download last successful master deployment file. ''' # get last successful master branch Build Job Definition job build_job: ProjectJob = None print(f"searching pipelines for successful master job {os.environ['BUILD_JOB_NAME']}...") project = gl.projects.get(os.environ['CI_PROJECT_ID'], lazy=False) for p in project.pipelines.list(): if p.attributes.get('ref', None) == 'master': for j in p.jobs.list(): if j.attributes.get('name', None) == os.environ['BUILD_JOB_NAME']: if j.attributes.get('status', None) == 'success': build_job = project.jobs.get(j.id, lazy=False) print(f'found {pprint_job(build_job)} from {pprint_pipeline(p)}') break if build_job is not None: break # write previous job definition artifact_path = os.environ['FILEPATH_FINAL_JOB_DEFINITION'] print(f'downloading artifact {artifact_path}...') try: contents = build_job.artifact(artifact_path) if build_job else b'{}' with open(os.environ['FILEPATH_PREVIOUS_JOB_DEFINITION'], mode='wb') as f: f.write(contents) print(os.environ['FILEPATH_PREVIOUS_JOB_DEFINITION']) except gitlab.exceptions.GitlabGetError: print(f'ERROR: could not find artifact {artifact_path} from {pprint_job(build_job)}') artifacts = build_job.attributes.get('attributes', None) print(f'available artifacts: {artifacts}') if __name__ == '__main__': token: str = None gl: gitlab.Gitlab = None try: gl = gitlab.Gitlab(url='https://gitlab.com', private_token=os.environ['GITLAB_API_TOKEN']) except KeyError: print('no GITLAB_API_TOKEN found (running on GitLab.com?)') print('exiting to run with curl using CI_JOB_TOKEN...') else: download_artifact_locally_with_python(gl)