Browse Source

Updated for initial release. Added docs, integration checks, and user checks.

master
Bob 8 years ago
parent
commit
8a0499a5ac
  1. 97
      README.md
  2. 322
      belay.py
  3. 12
      example.yml

97
README.md

@ -1,3 +1,98 @@
# belay
A simple python utility for checking up on your Slack organization.
A simple python utility for checking up on your Slack organization.
## Setup
While belay can utilize legacy tokens with the `MAX` permission scope, stored in plaintext, and with everything installed system-wide, this goes against modern security and system administration practices. Instead of doing that, below we will go into depth to describe how to set up `belay` on a standard unix-like system.
### Python Environment
Depending on your choice of system, please use the package manager of your choice to install `python-2.7`, `pip`, and `virtualenv`. Now, selecting a directory that can host executable files, create a virtualenv for belay to run from
> $ virtualenv belayenv
Once that is complete, move into the new `belayenv` directory and activate your new virtual environment:
> $ . ./bin/activate
Now that we're in the environment, please download `belay` from your choice of source. For example:
> $(belayenv) git clone git@github.com:bobthesecurityguy/belay.git
Now move into that new directory and install `belay`'s (relatively short) list of requirements:
> $(belayenv) pip install -r requirements.txt
On certain systems, installing these dependencies from `pip` may fail. In that case, check your package manager for pre-built packages under that name and then re-run the above command until it succeeds.
### API Tokens
To create the required API tokens, direct your browser to the [Slack API Apps list](https://api.slack.com/apps) and (once signed-in to the appropriate team) click on the "Create New App" button. This should present you with a menu that allows you to set a name for the app (we recommend "Belay") and select the team to enable it on.
Once you enter that information, you will be taken to the detailed settings for your new app. Feel free to set the "Display Information" in a way that makes sense to you and ignore the "App Credentials" presented. Move down to "Bot Users", enable a bot user with the name of your choice, and then move back up to "OAuth & Permissions".
In the OAuth section select the following permissions. In an effort to make you a bit more comfortable with granting these, each will be listed with a full description of what we use it for below:
> admin - Used to access integration logs.
> bot - Used to act as the bot user.
> chat:write:bot - Used to send messages as our bot.
> files:write:user - Used to upload messages as files if too large.
> users:read - Used to access the user list.
Once you have granted those permissions, install the app and take note of the OAuth tokens that have been generated for you.
### Config File
The `belay` config file is a YAML config file that allows you to configure most settings within the script. Some settings are available at the command line (mostly runtime options like verbosity and log output file) and some can be pulled in from the environment (the OAuth tokens). (Note that securely getting those tokens into the environment without placing them on disk is outside the scope of this document. For simplicity, this description will assume that they are in the config file.)
The config file can be placed anywhere, but if a location is not given at runtime, the script will default to looking in `~/.config/belay/config.yml` or `./config.yml` in that order for the config. If the OAuth tokens are in the environment, it is possible to run without a config file, but most uses will want to set one up.
The format of the config file is standard YAML formatting. If only one team is to be audited, the top-level "teams" dictionary may be omitted as may the outer wrapper dictionary for the team in question.
Within a team dictionary, the following keys are valid and may have the following values:
> api\_token - A string token
> bot\_token - A string token
> skip\_integrations - A boolean that skips the audit of active integrations
> integration\_whitelist - A dictionary (key=integration\_id) of lists of issues to ignore
> integration\_issue\_whitelist - A list of issues to ignore
> user\_whitelist - A dictionary (key=id) of lists of issues to ignore
> user\_issue\_whitelist - A list of issues to ignore
> output\_channel - The Slack channel name in which to output our results
Possible values for the issues with integrations are:
> legacy
> admin
> chat:write:user
> channels:history
> files:read
> files:write:user
> groups:history
> im:history
> mpim:history
> pins:read
> search:read
Possible values for the issues with users are:
> 2fa
> sms
An example of how one might construct an actual working configuration from this is included in `example.yml`.
## Output
It is recommended to run Belay with the verbose flag and redirecting output to a file until you are certain that the configuration is correct. This is usually sufficient to identify issues with your configuration. Debug mode (`-vv`) logs an excessive amount of information and may log sensitve information and should be used with care.
Please note that, when interpreting output from this program, some integrations really do need the permissions that we flag as potentially dangerous. Even `belay` needs some of the permissions on the list. The output of this program is not intended as a checklist of integrations for removal. Rather, it is intended to bring to light applications requesting sensitive permissions. If these permissions do not make sense for that application, it may be worth further investigation.
Similarly, the use of SMS-based 2FA in many organizations is not a sufficient risk to justify concerted eradication efforts. Please use reasonable judgment when interpreting the output of `belay`.
## Changelog
* 1.0 - Initial public release
* Checks for integrations with sensitive permissions
* Checks for users with no 2FA or with SMS-based 2FA
* Fully documented

322
belay.py

@ -3,41 +3,307 @@
import sys
import os
import yaml
import logging
import argparse
from slackclient import SlackClient
import pprint
program_version="1.0"
# TODO: Logging Config, prints are just hack-y
logger = logging.getLogger(__name__)
def load_config(conf_path=None, team=None):
# Try the usual places to store an API token
# First token wins
api_token = None
bot_token = None
config_data = {}
homedir = os.path.expanduser("~")
scriptdir = os.path.dirname(os.path.realpath(__file__))
# Try the usual places to store an API token
# First token wins
api_token = None
homedir = os.path.expanduser("~")
scriptdir = os.path.dirname(os.path.realpath(__file__))
# 1. Specified config file
if conf_path:
logger.debug("Skipping standard config locations because config path passed in on the command line.")
# 2. ~/.config/slack/config.yml
elif os.path.isfile(os.path.join(homedir,".config","belay","config.yml")):
conf_path = os.path.join(homedir,".config","belay","config.yml")
logger.info("Found config file in Home Directory: %s", conf_path)
# 3. ./config.yml
elif os.path.isfile(os.path.join(scriptdir,"config.yml")):
conf_path = os.path.join(scriptdir,"config.yml")
logger.info("Found config file in current script directory: %s", conf_path)
# OK, now actually pull values from the configs
if conf_path:
logger.info("Attempting to load config from file.")
with open(conf_path, "r") as config:
config_data = yaml.load(config, Loader=yaml.loader.BaseLoader)
logger.debug("Loaded config: %s", config_data)
if "teams" in config_data:
logger.debug("This is a multi-team config file. Script is looking for team: %s", team)
if not team:
raise ValueError("No team specified with multi-team config. Please indicate which team you'd like to use.")
elif team not in config_data["teams"]:
raise ValueError("Specified team '"+team+"' not present in config file. Valid choices are: "+", ".join(config_data["teams"].keys()))
else:
logger.debug("Team '%s' found. Selecting only data for that team.", team)
config_data = config_data["teams"][team]
# Allow people to set keys in the env if they don't want them in files
# Clear bot if we find API, to try to avoid mixing teams
if "SLACK_API_TOKEN" in os.environ:
config_data.pop("bot_token", 0)
config_data["api_token"] = os.environ["SLACK_API_TOKEN"]
logger.info("Found API token in environment, adding to config from file.")
logger.debug("API Token: %s", api_token)
logger.info("Checking environment for SLACK_BOT_TOKEN variable...")
if "SLACK_BOT_TOKEN" in os.environ:
config_data["bot_token"] = os.environ["SLACK_BOT_TOKEN"]
logger.info("Found bot token in environment, adding to config from file.")
logger.debug("Bot Token: %s", bot_token)
if "api_token" not in config_data:
raise RuntimeError("No API token not found.")
if "bot_token" not in config_data:
logger.warn("No bot token provided. Assuming API token is legacy token. Use of legacy tokens is a potential security issue and we strongly recommend switching to the new token format.")
return config_data
# 1. Environment variables
if "SLACK_API_TOKEN" in os.environ:
api_token = os.environ["SLACK_API_TOKEN"]
# 2. ~/.config/slack/config.yml
elif os.path.isfile(os.path.join(homedir,".config","slack","config.yml")):
with open(os.path.join(homedir,".config","slack","config.yml"), "r") as config:
config_data = yaml.load(config)
api_token = config_data["api_token"]
# 3. ./config.yml
elif os.path.isfile(os.path.join(scriptdir,"config.yml")):
with open(os.path.join(scriptdir,"config.yml"), "r") as config:
config_data = yaml.load(config)
api_token = config_data["api_token"]
def belay(config):
slack_api = SlackClient(config["api_token"])
api_test = slack_api.api_call("auth.test")
logger.debug("API Token auth.test results: %s", api_test)
if not api_test["ok"]:
raise ValueError("API Token is invalid.")
api_user = slack_api.api_call("users.info", user=api_test["user_id"])
logger.debug("User info for API token user: %s", api_user)
if api_user["user"]["is_bot"]:
raise ValueError("API Token is a bot token. This will not work.")
if "bot_token" in config:
slack_bot = SlackClient(config["bot_token"])
bot_test = slack_bot.api_call("auth.test")
logger.debug("Bot Token auth.test results: %s", bot_test)
if not bot_test["ok"]:
raise ValueError("Bot Token is invalid.")
bot_user = slack_bot.api_call("users.info", user=bot_test["user_id"])
logger.debug("User info for Bot token user: %s", bot_user)
if not bot_user["user"]["is_bot"]:
raise ValueError("Bot Token does not correspond to a bot user.")
else:
slack_bot = slack_api
integration_issues = check_integrations(slack_api, config)
if integration_issues:
logger.info("Found the following integration issues: %s", integration_issues)
notify_problems(integration_issues, config, slack_bot, "Problem Integrations:", "integration_name", "date")
else:
logger.info("No integration issues found.")
user_issues = check_users(slack_api, config)
if user_issues:
logger.info("Found the following user issues: %s", user_issues)
notify_problems(user_issues, config, slack_bot, "Problem Users:", "name", "name")
else:
logger.info("No user issues found.")
def check_integrations(api_client, config):
if "skip_integrations" in config and config["skip_integrations"]:
return {}
result = api_client.api_call("team.integrationLogs")
logger.debug("Integration log results: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while getting initial integrations: "+unicode(result))
iLogs = result["logs"]
if result["paging"]["pages"] > 1:
logger.info("Multiple pages of integration logs exist. Pulling remaining %s pages...", result["paging"]["pages"] - 1)
while result["paging"]["page"] < result["paging"]["pages"]:
nextPage = result["paging"]["page"] + 1
logger.info("Pulling page %s.", nextPage)
result = api_client.api_call("team.integrationLogs", page=nextPage)
logger.debug("Page %s: %s", nextPage, result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while getting additional integrations: "+unicode(result))
iLogs.extend(result["logs"])
integrations = {}
if "integration_whitelist" in config:
integration_whitelist = config["integration_whitelist"]
else:
integration_whitelist = {}
for log in iLogs:
if log["change_type"] in ["added", "enabled", "updated", "expanded", "reissued"]:
status = "active"
elif log["change_type"] == "removed":
status = "removed"
elif log["change_type"] == "disabled":
status = "disabled"
else:
status = "unknown"
logger.warn("Unknown change type (%s) in integration log: %s", log["change_type"], log)
if "scope" in log and log["scope"]:
scopes = log["scope"].split(",")
else:
scopes = []
if "app_id" in log:
int_id = log["app_id"]
int_name = log["app_type"]
elif "service_id" in log:
int_id = log["service_id"]
int_name = log["service_type"]
elif log["user_id"] == 0 and log["change_type"] == "removed":
# No idea what these are, but they don't have any useful fields, so skip them.
continue
else:
logger.warn("Unknown integration type: %s", log)
continue
if int_id not in integrations:
integrations[int_id] = {"integration_name": int_name, "app_id": int_id, "status": status, "scopes": scopes, "user_id": log["user_id"], "user_name": log["user_name"], "date": log["date"]}
if "reason" in log:
integrations[int_id]["reason"] = log["reason"]
if "channel" in log:
integrations[int_id]["channel"] = log["channel"]
problem_integrations = []
if "integration_issue_whitelist" in config:
global_whitelist = config["integration_issue_whitelist"]
else:
global_whitelist = []
for int_id in integrations:
integration = integrations[int_id]
if integration["status"] == "removed":
continue
if int_id in integration_whitelist:
issue_whitelist = global_whitelist.extend(integration_whitelist[int_id])
else:
issue_whitelist = global_whitelist
problems = []
logger.debug("Checking for issues with integration: %s", integration)
if "MAX" in integration["scopes"] and "legacy" not in issue_whitelist:
problems.append("Legacy integration with full access to act as the user")
if "admin" in integration["scopes"] and "admin" not in issue_whitelist:
problems.append("Admin permission")
if "chat:write:user" in integration["scopes"] and "chat:write:user" not in issue_whitelist:
problems.append("Can chat as user")
if "channels:history" in integration["scopes"] and "channels:history" not in issue_whitelist:
problems.append("Can access channel history for public channels")
if "files:read" in integration["scopes"] and "files:read" not in issue_whitelist:
problems.append("Can read uploaded files")
if "files:write:user" in integration["scopes"] and "files:write:user" not in issue_whitelist:
problems.append("Can modify/delete existing files")
if "groups:history" in integration["scopes"] and "groups:history" not in issue_whitelist:
problems.append("Can access channel history for private channels")
if "im:history" in integration["scopes"] and "im:history" not in issue_whitelist:
problems.append("Can access channel history for private IMs")
if "mpim:history" in integration["scopes"] and "mpim:history" not in issue_whitelist:
problems.append("Can access channel history for multi-party IMs")
if "pins:read" in integration["scopes"] and "pins:read" not in issue_whitelist:
problems.append("Can access channel pinned messages/files")
if "search:read" in integration["scopes"] and "search:read" not in issue_whitelist:
problems.append("Can search team files and messages")
if problems:
integration["problems"] = problems
problem_integrations.append(integration)
return problem_integrations
if not api_token:
print "ERROR: API token not found. Exiting."
sys.exit(-1)
def check_users(api_client, config):
result = api_client.api_call("users.list", presence=False, limit=100)
logger.debug("User list results: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while getting initial user list: "+unicode(result))
users = result["members"]
while "next_cursor" in result["response_metadata"] and result["response_metadata"]["next_cursor"]:
logger.info("Further pages of users exist. Pulling next page...")
result = api_client.api_call("users.list", presence=False, limit=100, cursor=result["response_metadata"]["next_cursor"])
logger.debug("User list results: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while getting additional user list: "+unicode(result))
users.extend(result["members"])
problem_users = []
retained_keys = ["real_name", "id", "team_id", "name", "problems", "has_2fa", "two_factor_type", "updated", "is_owner", "is_admin"]
if "user_whitelist" in config:
user_whitelist = config["user_whitelist"]
else:
user_whitelist = {}
if "user_issue_whitelist" in config:
global_whitelist = config["user_issue_whitelist"]
else:
global_whitelist = []
for user in users:
logger.debug("Checking for issues with user: %s", user)
if user["id"] == "USLACKBOT":
# Special case Slackbot, since it lacks the fields of other users/bots
continue
if user["deleted"] or user["is_bot"]:
continue
if user["id"] in user_whitelist:
issue_whitelist = global_whitelist.extend(user_whitelist[user["id"]])
else:
issue_whitelist = global_whitelist
if not user["has_2fa"] and "2fa" not in issue_whitelist:
user["problems"] = ["User does not have 2FA enabled"]
if user["has_2fa"] and user["two_factor_type"] == "sms" and "sms" not in issue_whitelist:
user["problems"] = ["User is using less-secure SMS-based 2FA"]
if "problems" in user:
problem_user = {k:v for k,v in user.iteritems() if k in retained_keys}
problem_users.append(problem_user)
return problem_users
# Now to use that token on the API
slack = SlackClient(api_token)
result = slack.api_call("team.integrationLogs")
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(result)
def notify_problems(problems, config, slack_bot, heading="Problems:", item_name="name", sort_field=None):
indent = "\t"
if "output_channel" in config:
indent = "\t\t"
if sort_field:
problems = sorted(problems, key=lambda k: k[sort_field])
prob_strings = []
for problem in problems:
fmt_problem = problem[item_name]+": "+", ".join(problem["problems"])
for item in sorted(problem.items()):
if not hasattr(item[1], "strip") and (hasattr(item[1], "__getitem__") or hasattr(item[1], "__iter__")):
val = ", ".join(item[1])
else:
val = unicode(item[1])
fmt_problem= fmt_problem+"\n"+indent+item[0]+": "+val
prob_strings.append(fmt_problem)
formatted_problems = "\n\n".join(prob_strings)
if "output_channel" in config:
if len(formatted_problems) < 3000:
attachment = [{
"fallback": heading+"\n"+formatted_problems,
"title": heading,
"text": formatted_problems,
"color": "#ffe600"
}]
result = slack_bot.api_call("chat.postMessage", channel=config["output_channel"], attachments=attachment, as_user=True)
else:
# For bigger files we use a snippet. This has a 1MB limit. If you have more problems... well add that to the list.
result = slack_bot.api_call("files.upload", content=formatted_problems, title=heading, filetype="text")
logger.info("Message too large. Uploaded as file: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while uploading file: "+unicode(result))
result = slack_bot.api_call("chat.postMessage", channel=config["output_channel"], text="*"+heading+"*\n\nToo many issues to post directly.\nSee <"+result["file"]["url_private"]+"|the uploaded file> for more information.", unfurl_links=True, as_user=True)
logger.info("Message posted. Result: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while posting message: "+unicode(result))
else:
print heading
print ""
print formatted_problems
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog="Belay", description="Check the security of your Slack.")
parser.add_argument("-c", "--config", default=None, help="Non-standard location of a configuration file.")
parser.add_argument("-f", "--file", default=None, help="File to redirect log output into.")
parser.add_argument("-t", "--team", default=None, help="Team to check for multi-team configs.")
parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase log verbosity level. (Default level: WARN, use twice for DEBUG)")
parser.add_argument("-V", "--version", action="version", version="%(prog)s "+program_version, help="Display version information and exit.")
args = parser.parse_args()
loglevel = max(10, 30 - (args.verbose * 10))
logformat = '%(asctime)s %(levelname)s: %(message)s'
if args.file:
logging.basicConfig(filename=args.file, level=loglevel, format=logformat)
else:
logging.basicConfig(level=loglevel, format=logformat)
logger.info("Belaying...")
# try:
config = load_config(args.config, args.team)
belay(config)
# except Exception as e:
# sys.exit(e)
# finally:
logging.shutdown()

12
example.yml

@ -0,0 +1,12 @@
teams:
Team1:
api_token: "xoxp-12345-124-123455"
bot_token: "xoxb-98765-999-999999"
skip_integrations: False
integration_whitelist:
A2:
- "admin"
- "chat:write:user"
user_issue_whitelist:
- "sms"
output_channel: "@bob"
Loading…
Cancel
Save