def make_date_file_sensor_for_asset(j: JobDefinition, resource_defs_for_ssh): @sensor( job=j, name=j.name + "_sensor", default_status=DefaultSensorStatus.RUNNING, ) def date_file_sensor(context): with build_resources(resource_defs_for_ssh) as resources: ssh = resources.ssh sftp = ssh.open_sftp() last_processed_date = context.cursor if last_processed_date is None: next_date = pd.to_datetime(START_DATE).strftime(DATE_FORMAT) next_date_formatted = pd.to_datetime(START_DATE).strftime( DATE_FORMAT_PARTITION ) else: next_date = ( datetime.strptime(last_processed_date, DATE_FORMAT) + timedelta(days=7) ).strftime(DATE_FORMAT) next_date_formatted = pd.to_datetime(next_date).strftime( DATE_FORMAT_PARTITION ) path = f"upload/myfile{next_date_formatted}.zip" if sftp_exists(sftp, path): context.update_cursor(next_date) close(sftp, ssh) return j.run_request_for_partition(next_date, run_key=path) else: close(sftp, ssh) return SkipReason(f"Did not find file {path}") return date_file_sensor