diff --git a/README.md b/README.md index 45454e6..7ee2b98 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,98 @@ # belay -A simple python utility for checking up on your Slack organization. \ No newline at end of file +A simple python utility for checking up on your Slack organization. + +## Setup + +While belay can utilize legacy tokens with the `MAX` permission scope, stored in plaintext, and with everything installed system-wide, this goes against modern security and system administration practices. Instead of doing that, below we will go into depth to describe how to set up `belay` on a standard unix-like system. + +### Python Environment + +Depending on your choice of system, please use the package manager of your choice to install `python-2.7`, `pip`, and `virtualenv`. Now, selecting a directory that can host executable files, create a virtualenv for belay to run from + +> $ virtualenv belayenv + +Once that is complete, move into the new `belayenv` directory and activate your new virtual environment: + +> $ . ./bin/activate + +Now that we're in the environment, please download `belay` from your choice of source. For example: + +> $(belayenv) git clone git@github.com:bobthesecurityguy/belay.git + +Now move into that new directory and install `belay`'s (relatively short) list of requirements: + +> $(belayenv) pip install -r requirements.txt + +On certain systems, installing these dependencies from `pip` may fail. In that case, check your package manager for pre-built packages under that name and then re-run the above command until it succeeds. + +### API Tokens + +To create the required API tokens, direct your browser to the [Slack API Apps list](https://api.slack.com/apps) and (once signed-in to the appropriate team) click on the "Create New App" button. This should present you with a menu that allows you to set a name for the app (we recommend "Belay") and select the team to enable it on. + +Once you enter that information, you will be taken to the detailed settings for your new app. Feel free to set the "Display Information" in a way that makes sense to you and ignore the "App Credentials" presented. Move down to "Bot Users", enable a bot user with the name of your choice, and then move back up to "OAuth & Permissions". + +In the OAuth section select the following permissions. In an effort to make you a bit more comfortable with granting these, each will be listed with a full description of what we use it for below: + +> admin - Used to access integration logs. +> bot - Used to act as the bot user. +> chat:write:bot - Used to send messages as our bot. +> files:write:user - Used to upload messages as files if too large. +> users:read - Used to access the user list. + +Once you have granted those permissions, install the app and take note of the OAuth tokens that have been generated for you. + +### Config File + +The `belay` config file is a YAML config file that allows you to configure most settings within the script. Some settings are available at the command line (mostly runtime options like verbosity and log output file) and some can be pulled in from the environment (the OAuth tokens). (Note that securely getting those tokens into the environment without placing them on disk is outside the scope of this document. For simplicity, this description will assume that they are in the config file.) + +The config file can be placed anywhere, but if a location is not given at runtime, the script will default to looking in `~/.config/belay/config.yml` or `./config.yml` in that order for the config. If the OAuth tokens are in the environment, it is possible to run without a config file, but most uses will want to set one up. + +The format of the config file is standard YAML formatting. If only one team is to be audited, the top-level "teams" dictionary may be omitted as may the outer wrapper dictionary for the team in question. + +Within a team dictionary, the following keys are valid and may have the following values: + +> api\_token - A string token +> bot\_token - A string token +> skip\_integrations - A boolean that skips the audit of active integrations +> integration\_whitelist - A dictionary (key=integration\_id) of lists of issues to ignore +> integration\_issue\_whitelist - A list of issues to ignore +> user\_whitelist - A dictionary (key=id) of lists of issues to ignore +> user\_issue\_whitelist - A list of issues to ignore +> output\_channel - The Slack channel name in which to output our results + +Possible values for the issues with integrations are: + +> legacy +> admin +> chat:write:user +> channels:history +> files:read +> files:write:user +> groups:history +> im:history +> mpim:history +> pins:read +> search:read + +Possible values for the issues with users are: + +> 2fa +> sms + +An example of how one might construct an actual working configuration from this is included in `example.yml`. + +## Output + +It is recommended to run Belay with the verbose flag and redirecting output to a file until you are certain that the configuration is correct. This is usually sufficient to identify issues with your configuration. Debug mode (`-vv`) logs an excessive amount of information and may log sensitve information and should be used with care. + +Please note that, when interpreting output from this program, some integrations really do need the permissions that we flag as potentially dangerous. Even `belay` needs some of the permissions on the list. The output of this program is not intended as a checklist of integrations for removal. Rather, it is intended to bring to light applications requesting sensitive permissions. If these permissions do not make sense for that application, it may be worth further investigation. + +Similarly, the use of SMS-based 2FA in many organizations is not a sufficient risk to justify concerted eradication efforts. Please use reasonable judgment when interpreting the output of `belay`. + +## Changelog + +* 1.0 - Initial public release + * Checks for integrations with sensitive permissions + * Checks for users with no 2FA or with SMS-based 2FA + * Fully documented diff --git a/belay.py b/belay.py index 5122666..cf1c0cc 100755 --- a/belay.py +++ b/belay.py @@ -3,41 +3,307 @@ import sys import os import yaml +import logging +import argparse from slackclient import SlackClient -import pprint +program_version="1.0" -# TODO: Logging Config, prints are just hack-y +logger = logging.getLogger(__name__) +def load_config(conf_path=None, team=None): + # Try the usual places to store an API token + # First token wins + api_token = None + bot_token = None + config_data = {} + homedir = os.path.expanduser("~") + scriptdir = os.path.dirname(os.path.realpath(__file__)) -# Try the usual places to store an API token -# First token wins -api_token = None -homedir = os.path.expanduser("~") -scriptdir = os.path.dirname(os.path.realpath(__file__)) + # 1. Specified config file + if conf_path: + logger.debug("Skipping standard config locations because config path passed in on the command line.") + # 2. ~/.config/slack/config.yml + elif os.path.isfile(os.path.join(homedir,".config","belay","config.yml")): + conf_path = os.path.join(homedir,".config","belay","config.yml") + logger.info("Found config file in Home Directory: %s", conf_path) + # 3. ./config.yml + elif os.path.isfile(os.path.join(scriptdir,"config.yml")): + conf_path = os.path.join(scriptdir,"config.yml") + logger.info("Found config file in current script directory: %s", conf_path) + # OK, now actually pull values from the configs + if conf_path: + logger.info("Attempting to load config from file.") + with open(conf_path, "r") as config: + config_data = yaml.load(config, Loader=yaml.loader.BaseLoader) + logger.debug("Loaded config: %s", config_data) + if "teams" in config_data: + logger.debug("This is a multi-team config file. Script is looking for team: %s", team) + if not team: + raise ValueError("No team specified with multi-team config. Please indicate which team you'd like to use.") + elif team not in config_data["teams"]: + raise ValueError("Specified team '"+team+"' not present in config file. Valid choices are: "+", ".join(config_data["teams"].keys())) + else: + logger.debug("Team '%s' found. Selecting only data for that team.", team) + config_data = config_data["teams"][team] + # Allow people to set keys in the env if they don't want them in files + # Clear bot if we find API, to try to avoid mixing teams + if "SLACK_API_TOKEN" in os.environ: + config_data.pop("bot_token", 0) + config_data["api_token"] = os.environ["SLACK_API_TOKEN"] + logger.info("Found API token in environment, adding to config from file.") + logger.debug("API Token: %s", api_token) + logger.info("Checking environment for SLACK_BOT_TOKEN variable...") + if "SLACK_BOT_TOKEN" in os.environ: + config_data["bot_token"] = os.environ["SLACK_BOT_TOKEN"] + logger.info("Found bot token in environment, adding to config from file.") + logger.debug("Bot Token: %s", bot_token) + + if "api_token" not in config_data: + raise RuntimeError("No API token not found.") + if "bot_token" not in config_data: + logger.warn("No bot token provided. Assuming API token is legacy token. Use of legacy tokens is a potential security issue and we strongly recommend switching to the new token format.") + return config_data -# 1. Environment variables -if "SLACK_API_TOKEN" in os.environ: - api_token = os.environ["SLACK_API_TOKEN"] -# 2. ~/.config/slack/config.yml -elif os.path.isfile(os.path.join(homedir,".config","slack","config.yml")): - with open(os.path.join(homedir,".config","slack","config.yml"), "r") as config: - config_data = yaml.load(config) - api_token = config_data["api_token"] -# 3. ./config.yml -elif os.path.isfile(os.path.join(scriptdir,"config.yml")): - with open(os.path.join(scriptdir,"config.yml"), "r") as config: - config_data = yaml.load(config) - api_token = config_data["api_token"] +def belay(config): + slack_api = SlackClient(config["api_token"]) + api_test = slack_api.api_call("auth.test") + logger.debug("API Token auth.test results: %s", api_test) + if not api_test["ok"]: + raise ValueError("API Token is invalid.") + api_user = slack_api.api_call("users.info", user=api_test["user_id"]) + logger.debug("User info for API token user: %s", api_user) + if api_user["user"]["is_bot"]: + raise ValueError("API Token is a bot token. This will not work.") + if "bot_token" in config: + slack_bot = SlackClient(config["bot_token"]) + bot_test = slack_bot.api_call("auth.test") + logger.debug("Bot Token auth.test results: %s", bot_test) + if not bot_test["ok"]: + raise ValueError("Bot Token is invalid.") + bot_user = slack_bot.api_call("users.info", user=bot_test["user_id"]) + logger.debug("User info for Bot token user: %s", bot_user) + if not bot_user["user"]["is_bot"]: + raise ValueError("Bot Token does not correspond to a bot user.") + else: + slack_bot = slack_api + integration_issues = check_integrations(slack_api, config) + if integration_issues: + logger.info("Found the following integration issues: %s", integration_issues) + notify_problems(integration_issues, config, slack_bot, "Problem Integrations:", "integration_name", "date") + else: + logger.info("No integration issues found.") + user_issues = check_users(slack_api, config) + if user_issues: + logger.info("Found the following user issues: %s", user_issues) + notify_problems(user_issues, config, slack_bot, "Problem Users:", "name", "name") + else: + logger.info("No user issues found.") +def check_integrations(api_client, config): + if "skip_integrations" in config and config["skip_integrations"]: + return {} + result = api_client.api_call("team.integrationLogs") + logger.debug("Integration log results: %s", result) + if not result["ok"]: + raise RuntimeError("API Call encountered an error while getting initial integrations: "+unicode(result)) + iLogs = result["logs"] + if result["paging"]["pages"] > 1: + logger.info("Multiple pages of integration logs exist. Pulling remaining %s pages...", result["paging"]["pages"] - 1) + while result["paging"]["page"] < result["paging"]["pages"]: + nextPage = result["paging"]["page"] + 1 + logger.info("Pulling page %s.", nextPage) + result = api_client.api_call("team.integrationLogs", page=nextPage) + logger.debug("Page %s: %s", nextPage, result) + if not result["ok"]: + raise RuntimeError("API Call encountered an error while getting additional integrations: "+unicode(result)) + iLogs.extend(result["logs"]) + integrations = {} + if "integration_whitelist" in config: + integration_whitelist = config["integration_whitelist"] + else: + integration_whitelist = {} + for log in iLogs: + if log["change_type"] in ["added", "enabled", "updated", "expanded", "reissued"]: + status = "active" + elif log["change_type"] == "removed": + status = "removed" + elif log["change_type"] == "disabled": + status = "disabled" + else: + status = "unknown" + logger.warn("Unknown change type (%s) in integration log: %s", log["change_type"], log) + if "scope" in log and log["scope"]: + scopes = log["scope"].split(",") + else: + scopes = [] + if "app_id" in log: + int_id = log["app_id"] + int_name = log["app_type"] + elif "service_id" in log: + int_id = log["service_id"] + int_name = log["service_type"] + elif log["user_id"] == 0 and log["change_type"] == "removed": + # No idea what these are, but they don't have any useful fields, so skip them. + continue + else: + logger.warn("Unknown integration type: %s", log) + continue + if int_id not in integrations: + integrations[int_id] = {"integration_name": int_name, "app_id": int_id, "status": status, "scopes": scopes, "user_id": log["user_id"], "user_name": log["user_name"], "date": log["date"]} + if "reason" in log: + integrations[int_id]["reason"] = log["reason"] + if "channel" in log: + integrations[int_id]["channel"] = log["channel"] + problem_integrations = [] + if "integration_issue_whitelist" in config: + global_whitelist = config["integration_issue_whitelist"] + else: + global_whitelist = [] + for int_id in integrations: + integration = integrations[int_id] + if integration["status"] == "removed": + continue + if int_id in integration_whitelist: + issue_whitelist = global_whitelist.extend(integration_whitelist[int_id]) + else: + issue_whitelist = global_whitelist + problems = [] + logger.debug("Checking for issues with integration: %s", integration) + if "MAX" in integration["scopes"] and "legacy" not in issue_whitelist: + problems.append("Legacy integration with full access to act as the user") + if "admin" in integration["scopes"] and "admin" not in issue_whitelist: + problems.append("Admin permission") + if "chat:write:user" in integration["scopes"] and "chat:write:user" not in issue_whitelist: + problems.append("Can chat as user") + if "channels:history" in integration["scopes"] and "channels:history" not in issue_whitelist: + problems.append("Can access channel history for public channels") + if "files:read" in integration["scopes"] and "files:read" not in issue_whitelist: + problems.append("Can read uploaded files") + if "files:write:user" in integration["scopes"] and "files:write:user" not in issue_whitelist: + problems.append("Can modify/delete existing files") + if "groups:history" in integration["scopes"] and "groups:history" not in issue_whitelist: + problems.append("Can access channel history for private channels") + if "im:history" in integration["scopes"] and "im:history" not in issue_whitelist: + problems.append("Can access channel history for private IMs") + if "mpim:history" in integration["scopes"] and "mpim:history" not in issue_whitelist: + problems.append("Can access channel history for multi-party IMs") + if "pins:read" in integration["scopes"] and "pins:read" not in issue_whitelist: + problems.append("Can access channel pinned messages/files") + if "search:read" in integration["scopes"] and "search:read" not in issue_whitelist: + problems.append("Can search team files and messages") + if problems: + integration["problems"] = problems + problem_integrations.append(integration) + return problem_integrations -if not api_token: - print "ERROR: API token not found. Exiting." - sys.exit(-1) +def check_users(api_client, config): + result = api_client.api_call("users.list", presence=False, limit=100) + logger.debug("User list results: %s", result) + if not result["ok"]: + raise RuntimeError("API Call encountered an error while getting initial user list: "+unicode(result)) + users = result["members"] + while "next_cursor" in result["response_metadata"] and result["response_metadata"]["next_cursor"]: + logger.info("Further pages of users exist. Pulling next page...") + result = api_client.api_call("users.list", presence=False, limit=100, cursor=result["response_metadata"]["next_cursor"]) + logger.debug("User list results: %s", result) + if not result["ok"]: + raise RuntimeError("API Call encountered an error while getting additional user list: "+unicode(result)) + users.extend(result["members"]) + problem_users = [] + retained_keys = ["real_name", "id", "team_id", "name", "problems", "has_2fa", "two_factor_type", "updated", "is_owner", "is_admin"] + if "user_whitelist" in config: + user_whitelist = config["user_whitelist"] + else: + user_whitelist = {} + if "user_issue_whitelist" in config: + global_whitelist = config["user_issue_whitelist"] + else: + global_whitelist = [] + for user in users: + logger.debug("Checking for issues with user: %s", user) + if user["id"] == "USLACKBOT": + # Special case Slackbot, since it lacks the fields of other users/bots + continue + if user["deleted"] or user["is_bot"]: + continue + if user["id"] in user_whitelist: + issue_whitelist = global_whitelist.extend(user_whitelist[user["id"]]) + else: + issue_whitelist = global_whitelist + if not user["has_2fa"] and "2fa" not in issue_whitelist: + user["problems"] = ["User does not have 2FA enabled"] + if user["has_2fa"] and user["two_factor_type"] == "sms" and "sms" not in issue_whitelist: + user["problems"] = ["User is using less-secure SMS-based 2FA"] + if "problems" in user: + problem_user = {k:v for k,v in user.iteritems() if k in retained_keys} + problem_users.append(problem_user) + return problem_users -# Now to use that token on the API -slack = SlackClient(api_token) -result = slack.api_call("team.integrationLogs") -pp = pprint.PrettyPrinter(indent=4) -pp.pprint(result) + +def notify_problems(problems, config, slack_bot, heading="Problems:", item_name="name", sort_field=None): + indent = "\t" + if "output_channel" in config: + indent = "\t\t" + if sort_field: + problems = sorted(problems, key=lambda k: k[sort_field]) + prob_strings = [] + for problem in problems: + fmt_problem = problem[item_name]+": "+", ".join(problem["problems"]) + for item in sorted(problem.items()): + if not hasattr(item[1], "strip") and (hasattr(item[1], "__getitem__") or hasattr(item[1], "__iter__")): + val = ", ".join(item[1]) + else: + val = unicode(item[1]) + fmt_problem= fmt_problem+"\n"+indent+item[0]+": "+val + prob_strings.append(fmt_problem) + formatted_problems = "\n\n".join(prob_strings) + if "output_channel" in config: + if len(formatted_problems) < 3000: + attachment = [{ + "fallback": heading+"\n"+formatted_problems, + "title": heading, + "text": formatted_problems, + "color": "#ffe600" + }] + result = slack_bot.api_call("chat.postMessage", channel=config["output_channel"], attachments=attachment, as_user=True) + else: + # For bigger files we use a snippet. This has a 1MB limit. If you have more problems... well add that to the list. + result = slack_bot.api_call("files.upload", content=formatted_problems, title=heading, filetype="text") + logger.info("Message too large. Uploaded as file: %s", result) + if not result["ok"]: + raise RuntimeError("API Call encountered an error while uploading file: "+unicode(result)) + result = slack_bot.api_call("chat.postMessage", channel=config["output_channel"], text="*"+heading+"*\n\nToo many issues to post directly.\nSee <"+result["file"]["url_private"]+"|the uploaded file> for more information.", unfurl_links=True, as_user=True) + logger.info("Message posted. Result: %s", result) + if not result["ok"]: + raise RuntimeError("API Call encountered an error while posting message: "+unicode(result)) + else: + print heading + print "" + print formatted_problems + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(prog="Belay", description="Check the security of your Slack.") + parser.add_argument("-c", "--config", default=None, help="Non-standard location of a configuration file.") + parser.add_argument("-f", "--file", default=None, help="File to redirect log output into.") + parser.add_argument("-t", "--team", default=None, help="Team to check for multi-team configs.") + parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase log verbosity level. (Default level: WARN, use twice for DEBUG)") + parser.add_argument("-V", "--version", action="version", version="%(prog)s "+program_version, help="Display version information and exit.") + args = parser.parse_args() + loglevel = max(10, 30 - (args.verbose * 10)) + logformat = '%(asctime)s %(levelname)s: %(message)s' + if args.file: + logging.basicConfig(filename=args.file, level=loglevel, format=logformat) + else: + logging.basicConfig(level=loglevel, format=logformat) + + logger.info("Belaying...") +# try: + config = load_config(args.config, args.team) + belay(config) +# except Exception as e: +# sys.exit(e) +# finally: + logging.shutdown() diff --git a/example.yml b/example.yml new file mode 100644 index 0000000..df09810 --- /dev/null +++ b/example.yml @@ -0,0 +1,12 @@ +teams: + Team1: + api_token: "xoxp-12345-124-123455" + bot_token: "xoxb-98765-999-999999" + skip_integrations: False + integration_whitelist: + A2: + - "admin" + - "chat:write:user" + user_issue_whitelist: + - "sms" + output_channel: "@bob"