From a5e18b38a9e3846993fce806b9970ed9513c72a2 Mon Sep 17 00:00:00 2001
From: Bob
Date: Fri, 20 Oct 2017 17:00:54 +0000
Subject: [PATCH] First pass at actual code.

---
 busybody.py          | 260 +++++++++++++++++++++++++++++++++++++++++++
 flatfile/__init__.py |   1 +
 flatfile/flatfile.py |  95 ++++++++++++++++
 gsuite/__init__.py   |   1 +
 gsuite/gsuite.py     |  72 ++++++++++++
 requirements.txt     |   7 ++
 slack/__init__.py    |   1 +
 slack/slack.py       |  71 ++++++++++++
 8 files changed, 508 insertions(+)
 create mode 100755 busybody.py
 create mode 100644 flatfile/__init__.py
 create mode 100644 flatfile/flatfile.py
 create mode 100644 gsuite/__init__.py
 create mode 100644 gsuite/gsuite.py
 create mode 100644 requirements.txt
 create mode 100644 slack/__init__.py
 create mode 100644 slack/slack.py

diff --git a/busybody.py b/busybody.py
new file mode 100755
index 0000000..c722822
--- /dev/null
+++ b/busybody.py
@@ -0,0 +1,260 @@
+#!/usr/bin/env python
+import argparse
+import importlib
+import logging
+import os
+import re
+import sys
+from datetime import datetime
+from pathlib import Path
+
+import geoip2.database
+import geoip2.errors
+import numpy
+import yaml
+from sklearn.ensemble import IsolationForest
+from sklearn.feature_extraction.text import TfidfVectorizer
+
+program_version = "0.1"
+
+logger = logging.getLogger(__name__)
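+
+
+# Pollers, notifiers, and the persistence backend are all loaded by name at
+# runtime and accessed through a duck-typed interface. A sketch of the
+# contract as this patch uses it:
+#   Poller modules: TIMESTAMP_FIELD, USER_FIELD, IP_FIELD, USER_AGENT_FIELD,
+#   FILTER_FIELD (plus FILTERED_EVENTS when FILTER_FIELD is set), and the
+#   functions poll(config) and notify(config, alerts).
+#   Persistence modules: get_last(config), persist(config, data),
+#   get_historical_data(config), get_last_analyzed(config), and
+#   persist_last_analyzed(config, timestamp).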
+def poll(config):
+    data = {}
+    use_persistence = ("persistence" in config["active_modules"]
+                       and config["active_modules"]["persistence"])
+    if use_persistence:
+        persist_module = getattr(sys.modules[config["active_modules"]["persistence"]],
+                                 config["active_modules"]["persistence"])
+        get_last_func = getattr(persist_module, "get_last")
+        persist_func = getattr(persist_module, "persist")
+        config = get_last_func(config)
+    for poller in config["active_modules"]["pollers"]:
+        poll_mod = getattr(sys.modules[poller], poller)
+        ts_field = poll_mod.TIMESTAMP_FIELD
+        poll_func = getattr(poll_mod, "poll")
+        logger.info("Polling %s for new events...", poller)
+        data[poller] = poll_func(config)
+        data[poller].sort(key=lambda k: k[ts_field])
+    if use_persistence:
+        persist_func(config, data)
+    # The caller expects the polled events back; a bare return here would
+    # silently drop them.
+    return data
+
+
+def load_historical(config):
+    data = {}
+    if "persistence" in config["active_modules"] and config["active_modules"]["persistence"]:
+        persist_module = getattr(sys.modules[config["active_modules"]["persistence"]],
+                                 config["active_modules"]["persistence"])
+        get_last_func = getattr(persist_module, "get_last")
+        config = get_last_func(config)
+        get_historical_func = getattr(persist_module, "get_historical_data")
+        data = get_historical_func(config)
+    return data
+
+
+def preprocess(config, data):
+    processed = []
+    # Strip version-like tokens from User-Agent strings so minor version
+    # bumps don't register as brand-new agents.
+    ua_filter = re.compile(r'[a-zA-Z:._()-]*([0-9]+[a-zA-Z:._()-]*)+')
+    for module in data:
+        poll_mod = getattr(sys.modules[module], module)
+        ts_field = poll_mod.TIMESTAMP_FIELD
+        user_field = poll_mod.USER_FIELD
+        ip_field = poll_mod.IP_FIELD
+        ua_field = poll_mod.USER_AGENT_FIELD
+        filter_field = poll_mod.FILTER_FIELD
+        city_lookup = geoip2.database.Reader(config["analysis"]["geoip"]["city_db"])
+        asn_lookup = geoip2.database.Reader(config["analysis"]["geoip"]["asn_db"])
+        for event in data[module]:
+            if filter_field and event[filter_field] in poll_mod.FILTERED_EVENTS:
+                continue
+            # Skip events missing any of the fields the model needs.
+            if not all(event.get(f) for f in (ts_field, user_field, ip_field, ua_field)):
+                continue
+            if isinstance(event[ts_field], str):
+                ts = datetime.timestamp(datetime.strptime(event[ts_field],
+                                                          '%Y-%m-%dT%H:%M:%S.%fZ'))
+            else:
+                ts = event[ts_field]
+            if "user_map" in config["analysis"][module] and \
+                    event[user_field] in config["analysis"][module]["user_map"]:
+                user = config["analysis"][module]["user_map"][event[user_field]]
+            else:
+                user = event[user_field]
+            if "user_domain" in config["analysis"][module] and '@' not in user:
+                user = "@".join((user, config["analysis"][module]["user_domain"]))
+            user_agent = ua_filter.sub('', event[ua_field])
+            try:
+                city = city_lookup.city(event[ip_field])
+            except geoip2.errors.AddressNotFoundError:
+                # Private or otherwise unmapped addresses have no location.
+                continue
+            readable = []
+            if "en" in city.city.names and city.city.names["en"]:
+                readable.append(city.city.names["en"])
+            if city.subdivisions and city.subdivisions[0].iso_code:
+                readable.append(city.subdivisions[0].iso_code)
+            if city.country and city.country.iso_code:
+                readable.append(city.country.iso_code)
+            if city.continent and city.continent.code:
+                readable.append(city.continent.code)
+            event["ip_location"] = ", ".join(readable)
+            x, y, z = latlon_to_xyz(city.location.latitude, city.location.longitude)
+            try:
+                asn = asn_lookup.asn(event[ip_field]).autonomous_system_organization or ""
+                event["asn"] = asn
+            except geoip2.errors.AddressNotFoundError:
+                asn = ""
+            processed.append([ts, event, user, x, y, z, asn, user_agent])
+    return sorted(processed, key=lambda event: event[0])
+
+
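+# Each preprocessed row has the shape [ts, event, user, x, y, z, asn, ua];
+# analyze() indexes these positions directly (e[2] for the user, columns 3:6
+# for the coordinates), so this layout is load-bearing.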
logger.debug("Unique users: %s" % len(unique_users)) + for user in unique_users: + logger.debug("Analyzing data for user %s." % user) + user_events = numpy.array([e for e in data if e[2] == user]) + if last_analyzed > 0 and user_events[-1][0] < last_analyzed: + logger.debug("Skipping user as they have no non-analyzed events.") + continue + asn_vectorizer = TfidfVectorizer(binary=True) + ua_vectorizer = TfidfVectorizer(binary=True) + times = user_events[:, 0:1] + coords = user_events[:, 3:6] + logger.debug("Transforming ASNs.") + asns = asn_vectorizer.fit_transform(user_events[:, 6]) + logger.debug("Transforming User-Agents.") + uas = ua_vectorizer.fit_transform(user_events[:, 7]) + sparse_mat = numpy.concatenate((times, coords, asns.toarray(), uas.toarray()), axis=1) + logger.debug("Running Isolation Forest.") + detector = IsolationForest(n_jobs=-1, contamination=0) + if last_analyzed == 0 or user_events[0][0] >= last_analyzed: + detector.fit(sparse_mat[:, 1:]) + predictions = detector.predict(sparse_mat[:, 1:]) + else: + for counter, event in enumerate(sparse_mat): + if event[0] < last_analyzed: + cutoff = counter + else: + break + cutoff += 1 + logger.debug("Splitting array of length %s at entry %s" % (len(sparse_mat), cutoff)) + detector.fit(sparse_mat[:cutoff, 1:]) + predictions = detector.predict(sparse_mat[cutoff:, 1:]) + flagged = 0 + for ev_no, prediction in enumerate(predictions): + if prediction == -1: + flagged += 1 + alerts.append(user_events[ev_no][1]) + logger.debug("Processed %s: %s of %s flagged." % (user, flagged, len(user_events))) + for alert in alerts: + logger.info(alert) + persist_analyzed_func(config, data[-1][0]) + + +def latlon_to_xyz(lat, lon): + phi = (90 - lat) * (numpy.pi / 180) + theta = (lon + 180) * (numpy.pi / 180) + + x = 0 - (numpy.sin(phi) * numpy.cos(theta)) + z = (numpy.sin(phi) * numpy.sin(theta)) + y = (numpy.cos(phi)) + + return (x, y, z) + + +def load_config(config_path): + if config_path: + config_file = Path(config_path) + else: + homeconfig = Path.home() / ".config" / "busybody" / "config.yml" + scriptconfig = Path(os.path.realpath(__file__)).parent / "config.yml" + if homeconfig.is_file(): + config_file = homeconfig + elif scriptconfig.is_file(): + config_file = sriptconfig + else: + raise RuntimeError("No configuration file found.") + with config_file.open() as f: + config = yaml.load(f, Loader=yaml.loader.BaseLoader) + return config + + +def load_modules(config): + config["active_modules"] = {} + if not "pollers" in config or not config["pollers"]: + raise RuntimeError("Polllers aren't optional.") + config["active_modules"]["pollers"] = [] + for poller in config["pollers"]: + importlib.import_module(poller, poller) + config["active_modules"]["pollers"].append(poller) + if config["mode"] is None or config["mode"] == "analyze": + if not "notifiers" in config or not config["notifiers"]: + raise RuntimeError("Configured to analyze, but no notifiers in config file.") + config["active_modules"]["notifiers"] = [] + for notifier in config["notifiers"]: + importlib.import_module(notifier, notifier) + config["active_modules"]["notifiers"].append(notifier) + if "persistence" in config and config["persistence"]: + if "module" in config["persistence"] and config["persistence"]["module"]: + importlib.import_module(config["persistence"]["module"], + config["persistence"]["module"]) + config["active_modules"]["persistence"] = config["persistence"]["module"] + else: + raise RuntimeError("Persistence is configured, but no module specified.") + return config + 
+
+
+# Init and control loop.
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(prog="Busybody",
+                                     description="Neighborhood watch for your SaaS apps.")
+    parser.add_argument("-c", "--config", default=None,
+                        help="Non-standard location of a configuration file.")
+    parser.add_argument("-f", "--file", default=None,
+                        help="File to redirect log output into.")
+    parser.add_argument("-m", "--mode", default=None,
+                        help="Select a mode from: poll, analyze. Default is to perform both.")
+    parser.add_argument("-v", "--verbose", action="count", default=0,
+                        help="Increase log verbosity level. (Default" +
+                             " level: WARN, use twice for DEBUG)")
+    parser.add_argument("-V", "--version", action="version",
+                        version="%(prog)s " + program_version,
+                        help="Display version information and exit.")
+    args = parser.parse_args()
+    loglevel = max(10, 30 - (args.verbose * 10))
+    logformat = '%(asctime)s %(levelname)s: %(message)s'
+    if args.file:
+        logging.basicConfig(filename=args.file, level=loglevel, format=logformat)
+    else:
+        logging.basicConfig(level=loglevel, format=logformat)
+
+    logger.info("Starting busybody...")
+    try:
+        config = load_config(args.config)
+        config["mode"] = args.mode
+        config = load_modules(config)
+        logger.info("Modules and config loaded.")
+        data = {}
+        if not args.mode or args.mode == "poll":
+            logger.info("Polling for new events...")
+            data = poll(config)
+        if not args.mode or args.mode == "analyze":
+            if "persistence" in config and config["persistence"]:
+                logger.info("Loading stored data...")
+                data = load_historical(config)
+            logger.info("Preprocessing data...")
+            data = preprocess(config, data)
+            logger.info("Analyzing data...")
+            analyze(config, data)
+    finally:
+        logger.info("Busybody closing.")
+        logging.shutdown()
diff --git a/flatfile/__init__.py b/flatfile/__init__.py
new file mode 100644
index 0000000..abe4115
--- /dev/null
+++ b/flatfile/__init__.py
@@ -0,0 +1 @@
+from . import flatfile
diff --git a/flatfile/flatfile.py b/flatfile/flatfile.py
new file mode 100644
index 0000000..8e410c2
--- /dev/null
+++ b/flatfile/flatfile.py
@@ -0,0 +1,95 @@
+import sys
+import json
+import logging
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+
+def _log_dir(config):
+    # Shared helper: validate the configured log directory and make sure it
+    # exists before any reads or writes.
+    if "log_directory" not in config["persistence"] or not config["persistence"]["log_directory"]:
+        raise RuntimeError("Flat file persistence requested, but no log_directory specified.")
+    log_dir = Path(config["persistence"]["log_directory"])
+    log_dir.mkdir(mode=0o775, parents=True, exist_ok=True)
+    return log_dir
+
+
+def get_last(config):
+    log_dir = _log_dir(config)
+    if "pollers" in config and config["pollers"]:
+        for module in config["active_modules"]["pollers"]:
+            poll_mod = getattr(sys.modules[module], module)
+            ts_field = poll_mod.TIMESTAMP_FIELD
+            log_file = log_dir / (module + ".log")
+            log_file.touch(mode=0o660, exist_ok=True)
+            last = ""
+            with log_file.open('r') as f:
+                for line in f:
+                    if line.strip():
+                        last = line
+            if last:
+                last_event = json.loads(last)
+                config["pollers"][module]["last_polled_time"] = last_event[ts_field]
+                config["pollers"][module]["last_polled_event"] = last_event
+            else:
+                config["pollers"][module]["last_polled_time"] = 0
+                config["pollers"][module]["last_polled_event"] = {}
+    return config
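+
+
+# On disk this module keeps one JSON object per line, one file per poller
+# (e.g. <log_directory>/slack.log), plus last_analyzed.log holding a single
+# JSON timestamp. A hypothetical slack.log line might look like:
+#   {"user_id": "U123ABC", "ip": "198.51.100.7", "date_last": 1508518800, ...}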
+
+
+def persist(config, data):
+    log_dir = _log_dir(config)
+    for module in data:
+        log_file = log_dir / (module + ".log")
+        log_file.touch(mode=0o660, exist_ok=True)
+        with log_file.open('a') as f:
+            for entry in data[module]:
+                f.write('%s\n' % json.dumps(entry))
+
+
+def get_historical_data(config):
+    data = {}
+    log_dir = _log_dir(config)
+    if "pollers" not in config:
+        return data
+    for module in config["active_modules"]["pollers"]:
+        data[module] = []
+        poll_mod = getattr(sys.modules[module], module)
+        ts_field = poll_mod.TIMESTAMP_FIELD
+        if "history_limit" in config:
+            # history_limit arrives as a string from the BaseLoader-parsed
+            # YAML; this arithmetic also assumes a numeric last_polled_time.
+            limit = max(0, config["pollers"][module]["last_polled_time"] - int(config["history_limit"]))
+        else:
+            limit = 0
+        log_file = log_dir / (module + ".log")
+        log_file.touch(mode=0o660, exist_ok=True)
+        with log_file.open('r') as f:
+            for line in f:
+                event = json.loads(line)
+                # Timestamps may be ISO strings (gsuite) or epoch numbers
+                # (slack), so compare both sides as strings.
+                if str(event[ts_field]) >= str(limit):
+                    data[module].append(event)
+    return data
+
+
+def get_last_analyzed(config):
+    log_dir = _log_dir(config)
+    log_file = log_dir / "last_analyzed.log"
+    log_file.touch(mode=0o660, exist_ok=True)
+    with log_file.open('r') as f:
+        try:
+            timestamp = json.load(f)
+        except ValueError:
+            # The file is empty on first run.
+            timestamp = 0
+    return timestamp
+
+
+def persist_last_analyzed(config, timestamp):
+    log_dir = _log_dir(config)
+    log_file = log_dir / "last_analyzed.log"
+    log_file.touch(mode=0o660, exist_ok=True)
+    with log_file.open('w') as f:
+        json.dump(timestamp, f)
diff --git a/gsuite/__init__.py b/gsuite/__init__.py
new file mode 100644
index 0000000..4ba28be
--- /dev/null
+++ b/gsuite/__init__.py
@@ -0,0 +1 @@
+from . import gsuite
diff --git a/gsuite/gsuite.py b/gsuite/gsuite.py
new file mode 100644
index 0000000..c42ff2e
--- /dev/null
+++ b/gsuite/gsuite.py
@@ -0,0 +1,72 @@
+import logging
+from collections.abc import Iterable
+
+from apiclient import discovery
+from oauth2client.service_account import ServiceAccountCredentials
+
+logger = logging.getLogger(__name__)
+
+
+TIMESTAMP_FIELD = "id.time"
+USER_FIELD = "actor.email"
+IP_FIELD = "ipAddress"
+USER_AGENT_FIELD = "events.0.login_type"
+FILTER_FIELD = "events.0.name"
+FILTERED_EVENTS = ["login_failure"]
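+
+# The dotted field names above are not raw API fields: they are the keys
+# produced by flatten() below, which collapses the nested activity records
+# returned by the Reports API into a single-level dict.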
+
+
+def poll(config):
+    data = []
+    caught_up = False
+    scopes = ['https://www.googleapis.com/auth/admin.reports.audit.readonly']
+    credentials = ServiceAccountCredentials.from_json_keyfile_name(
+        config["pollers"]["gsuite"]["credential_file"], scopes=scopes)
+    credentials = credentials.create_delegated(config["pollers"]["gsuite"]["admin_email"])
+    service = discovery.build('admin', 'reports_v1', credentials=credentials)
+    request = service.activities().list(userKey='all', applicationName='login')
+    for i in range(1, 101):
+        logger.info("Polling page %s...", i)
+        results = request.execute()
+        activities = results.get('items', [])
+        for event in activities:
+            flattened = flatten(event)
+            if flattened[TIMESTAMP_FIELD] > str(config["pollers"]["gsuite"]["last_polled_time"]):
+                data.append(flattened)
+            elif flattened[TIMESTAMP_FIELD] == str(config["pollers"]["gsuite"]["last_polled_time"]):
+                if flattened["id.uniqueQualifier"] == config["pollers"]["gsuite"]["last_polled_event"]["id.uniqueQualifier"]:
+                    caught_up = True
+                    break
+                else:
+                    data.append(flattened)
+            else:
+                caught_up = True
+                break
+        request = service.activities().list_next(request, results)
+        if caught_up or request is None:
+            break
+    return data
+
+
+def notify(config, alerts):
+    # Notifier support is not implemented yet.
+    return
+
+
+def flatten(event, prefix=''):
+    flattened = {}
+    for field_no, field in enumerate(event):
+        if isinstance(event, dict):
+            # Special-case "parameters" values: treat each name/value pair
+            # as a dict entry keyed by its name.
+            if field == "parameters":
+                for param in event[field]:
+                    if isinstance(param["value"], Iterable) and not isinstance(param["value"], str):
+                        flattened.update(flatten(param["value"], prefix + param["name"] + "."))
+                    else:
+                        flattened[prefix + param["name"]] = param["value"]
+                continue
+            else:
+                next_level = event[field]
+                curr_entry = prefix + str(field)
+        else:
+            next_level = event[field_no]
+            curr_entry = prefix + str(field_no)
+        if isinstance(next_level, Iterable) and not isinstance(next_level, str):
+            flattened.update(flatten(next_level, curr_entry + "."))
+        else:
+            flattened[curr_entry] = next_level
+    return flattened
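+
+
+# For example, flatten() turns a (hypothetical, trimmed) login activity like
+#   {"id": {"time": "2017-10-20T16:59:59.000Z", "uniqueQualifier": "123"},
+#    "actor": {"email": "bob@example.com"},
+#    "events": [{"name": "login_success", "type": "login"}]}
+# into
+#   {"id.time": "2017-10-20T16:59:59.000Z", "id.uniqueQualifier": "123",
+#    "actor.email": "bob@example.com", "events.0.name": "login_success",
+#    "events.0.type": "login"}
+# which is where field constants like TIMESTAMP_FIELD = "id.time" come from.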
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..2e97e92
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,7 @@
+pyyaml
+numpy
+scikit-learn
+geoip2
+oauth2client
+slackclient
+google-api-python-client
diff --git a/slack/__init__.py b/slack/__init__.py
new file mode 100644
index 0000000..ec1faab
--- /dev/null
+++ b/slack/__init__.py
@@ -0,0 +1 @@
+from . import slack
diff --git a/slack/slack.py b/slack/slack.py
new file mode 100644
index 0000000..a17b288
--- /dev/null
+++ b/slack/slack.py
@@ -0,0 +1,71 @@
+import logging
+
+from slackclient import SlackClient
+
+logger = logging.getLogger(__name__)
+
+TIMESTAMP_FIELD = "date_last"
+USER_FIELD = "email"
+IP_FIELD = "ip"
+USER_AGENT_FIELD = "user_agent"
+FILTER_FIELD = None
+
+
+def poll(config):
+    slack_api = SlackClient(config["pollers"]["slack"]["api_token"])
+    data = []
+    caught_up = False
+    for i in range(1, 101):
+        logger.info("Polling page %s...", i)
+        api_data = slack_api.api_call("team.accessLogs", count=1000, page=i)
+        check_api(api_data)
+        for event in api_data["logins"]:
+            if event[TIMESTAMP_FIELD] > config["pollers"]["slack"]["last_polled_time"]:
+                data.append(event)
+            elif event[TIMESTAMP_FIELD] == config["pollers"]["slack"]["last_polled_time"]:
+                if str(event) == str(config["pollers"]["slack"]["last_polled_event"]):
+                    caught_up = True
+                    break
+                else:
+                    data.append(event)
+            else:
+                caught_up = True
+                break
+        if caught_up:
+            break
+    data = enrich(config, data)
+    return data
+
+
+def notify(config, alerts):
+    # Notifier support is not implemented yet.
+    return
+
+
+def check_api(data):
+    if not data["ok"]:
+        raise RuntimeError("Slack API returned an error: " + str(data))
+
+
+def enrich(config, data):
+    # Slack access logs only carry user IDs; look up each user's email so
+    # events can be correlated with other services, and drop bot/app users.
+    unique_users = list({e["user_id"] for e in data})
+    slack_api = SlackClient(config["pollers"]["slack"]["api_token"])
+    user_map = {}
+
+    for user in unique_users:
+        user_info = slack_api.api_call("users.info", user=user)
+        check_api(user_info)
+        logger.debug(user_info)
+        if "is_bot" in user_info["user"] and user_info["user"]["is_bot"]:
+            continue
+        elif "is_app_user" in user_info["user"] and user_info["user"]["is_app_user"]:
+            continue
+        elif "profile" in user_info["user"] and "email" in user_info["user"]["profile"]:
+            user_map[user] = user_info["user"]["profile"]["email"]
+            logger.debug("Mapping user %s to %s.", user, user_map[user])
+    new_data = []
+    for entry in data:
+        if entry["user_id"] in user_map:
+            entry["email"] = user_map[entry["user_id"]]
+            new_data.append(entry)
+    logger.debug("Returning %s records.", len(new_data))
+    return new_data