-
-
Save andr6/18f4bcfa7224b0a0107ca2fe550e093c to your computer and use it in GitHub Desktop.
Open source Threat intel check via C1fApp API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Check IP addresses read from stdin against the C1fApp threat feed.

usage:
    cat something_withIPs | python checkthreatfeed.py

www.c1fapp.com
'''
import ipaddress
import json
import re
import sys

import requests
# Endpoint that check_observable() POSTs its queries to.
URL = "https://www.c1fapp.com/cifapp/api/"
# Extra HTTP headers sent with every API request.
HEADERS = {'cache-control': "no-cache"}
# Value sent as the "key" field of each request. "GET_A_KEY" looks like a
# placeholder -- presumably it must be replaced with a real C1fApp API key.
KEY = "GET_A_KEY"
def check_observable(request):
    """Query the C1fApp API for one observable and summarise the answer.

    Args:
        request: The observable to look up; it is sent verbatim as the
            ``request`` field of the JSON payload.

    Returns:
        A dict with the keys ``query``, ``status``, ``assessment`` and
        ``description``.  ``query`` always echoes *request*.  When the
        request fails, the server answers with a non-200 status, or the
        result list is empty, ``status`` stays the string ``"0"`` and the
        two text fields stay empty.  On a hit ``status`` is the integer
        ``1`` and the text fields hold the first record's values.
    """
    c1_response = {
        "query": request,
        "status": "0",
        "assessment": "",
        "description": "",
    }
    payload = {
        "key": KEY,
        "format": "json",
        "backend": "es",
        "request": request,
    }

    try:
        # Without a timeout a stalled server would block the script forever.
        response = requests.request("POST", URL, data=json.dumps(payload),
                                    headers=HEADERS, timeout=30)
    except requests.exceptions.RequestException as err:
        # Treat transport failures like a bad HTTP status: report and move on.
        print("Error: Request failed (%s)" % err)
        return c1_response

    if response.status_code != 200:
        print("Error: Invalid response")
        return c1_response

    results = json.loads(response.text)
    if results:
        # NOTE(review): "status" starts as the string "0" but becomes the
        # integer 1 here; kept as-is because consumers may rely on it.
        c1_response['status'] = 1
        assessments = [str(res['assessment'][0]) for res in results]
        descriptions = [str(res['description'][0]) for res in results]
        # Report the first record's values.  Picking element 0 of a set, as
        # before, returned an arbitrary entry that could differ between runs.
        c1_response['assessment'] = assessments[0]
        c1_response['description'] = descriptions[0]
    return c1_response
def read_in():
    """Collect every IPv4-looking token found on standard input.

    Returns:
        A list of dotted-quad strings in the order they appear in the
        input.  Duplicates are kept; no validation beyond the regex is done.
    """
    # Four dot-separated octets, each in 0-255, delimited by word boundaries.
    # Compiled once so the per-line loop only performs the search.
    ipv4_pattern = re.compile(
        r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.)'
        r'{3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b')
    observables = []
    for line in sys.stdin:
        observables.extend(ipv4_pattern.findall(line))
    return observables
def main():
    """Look up every distinct, non-private IPv4 address found on stdin.

    Reads candidates with ``read_in``, then prints the dict returned by
    ``check_observable`` for each address.  Private addresses and
    candidates that ``ipaddress`` rejects are skipped.
    """
    # ``unicode`` on Python 2, ``str`` on Python 3.  The original code
    # converted with ``unicode(...)``, which does not exist on Python 3.
    text_type = type(u"")

    # sorted() makes the order of lookups (and output) reproducible.
    for ip in sorted(set(read_in())):
        try:
            ipv4 = ipaddress.IPv4Address(text_type(ip))
        except ipaddress.AddressValueError:
            # The extraction regex admits forms (e.g. octets with leading
            # zeros) that ipaddress may refuse; one bad token must not
            # abort the whole run.
            continue
        if ipv4.is_private:
            continue
        print(check_observable(str(ipv4)))
# Run the check only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment