"""Paginated fetching of log events from the Datadog Logs Search v2 API."""

import time

import requests


class DatadogLogIterator:
    """Iterator yielding batches of log events from Datadog's log search API.

    Each call to ``__next__`` performs one POST to the search endpoint and
    returns that page's batch of events (a list of dicts). Iteration stops
    when Datadog returns an empty page or stops providing a pagination cursor.
    """

    _MAX_LIMIT = 1_000          # Datadog's per-page maximum
    _FETCH_LIMIT = _MAX_LIMIT   # page size requested on every call
    _SEARCH_URL = "https://api.datadoghq.com/api/v2/logs/events/search"

    def __init__(self, query, start, end, api_key, app_key):
        """
        Args:
            query: Datadog log search query string.
            start: datetime marking the start of the search window
                (serialized with ``isoformat()``).
            end: datetime marking the end of the search window.
            api_key: Datadog API key (sent as the DD-API-KEY header).
            app_key: Datadog application key (DD-APPLICATION-KEY header).
        """
        self._cursor = None      # pagination cursor from the previous page
        self._exhausted = False  # set once the final page has been returned
        self._query = query
        self._api_key = api_key
        self._app_key = app_key
        self._start_timestamp = start.isoformat()
        self._end_timestamp = end.isoformat()

    def __iter__(self):
        return self

    def __next__(self):
        """Fetch and return the next batch of log events.

        Returns:
            A non-empty list of log event dicts for the current page.

        Raises:
            StopIteration: when all pages have been consumed.
            RuntimeError: when the Datadog API responds with a non-200 status.
        """
        if self._exhausted:
            raise StopIteration

        headers = {
            "Content-Type": "application/json",
            "DD-API-KEY": self._api_key,
            "DD-APPLICATION-KEY": self._app_key,
        }
        body = {
            "filter": {
                "query": self._query,
                "from": self._start_timestamp,
                "to": self._end_timestamp,
            },
            "page": {"limit": self._FETCH_LIMIT},
        }
        if self._cursor:
            body["page"]["cursor"] = self._cursor

        response = requests.post(self._SEARCH_URL, headers=headers, json=body)
        # BUG FIX: the original returned None here; a for-loop would receive
        # None as a yielded item and the caller would crash with a TypeError.
        # Raise so API failures are explicit.
        if response.status_code != 200:
            raise RuntimeError(f"ERROR: Datadog API returned {response.status_code}")

        result = response.json()

        event_batch = result["data"]
        if not event_batch:
            raise StopIteration

        # BUG FIX: the original raised StopIteration as soon as the "after"
        # cursor was absent, silently dropping the final (non-empty) page.
        # A missing cursor means this is the last page: return it now and
        # stop on the following call via the _exhausted flag.
        try:
            self._cursor = result["meta"]["page"]["after"]
        except KeyError:
            self._exhausted = True

        return event_batch


class DatadogLogFetcher:
    """Accumulates log events from Datadog across paginated requests."""

    _SLEEP_SEC = 3  # pause between pages to stay under Datadog rate limits

    def __init__(self, api_key, app_key):
        """
        Args:
            api_key: Datadog API key.
            app_key: Datadog application key.
        """
        self._api_key = api_key
        self._app_key = app_key

    def fetch(self, query, start, end, limit):
        """Fetch log events matching *query* within [*start*, *end*].

        Args:
            query: Datadog log search query string.
            start: datetime marking the start of the search window.
            end: datetime marking the end of the search window.
            limit: stop fetching once at least this many events have been
                collected. NOTE: the final batch may overshoot, so the
                returned list can contain more than *limit* events (matches
                the original behavior).

        Returns:
            A list of log event dicts.
        """
        log_iterator = DatadogLogIterator(
            query, start, end, self._api_key, self._app_key
        )
        all_events = []
        for batch in log_iterator:
            all_events += batch
            print(f"Fetched a batch of {len(batch)} log events; {len(all_events):>7} total fetched so far...")
            if len(all_events) >= limit:
                break
            # BUG FIX: the original slept *before* the limit check, adding a
            # pointless delay after the final batch was already collected.
            time.sleep(self._SLEEP_SEC)
        return all_events