Browse Source

Did some linting.

master
Bob 8 years ago
parent
commit
0e96d429aa
  1. 4
      .flake8
  2. 2
      README.md
  3. 104
      belay.py

4
.flake8

@ -0,0 +1,4 @@
[flake8]
exclude = .git
# 79 character limit is impractical with a 4 space indent...
ignore = E501

2
README.md

@ -1,6 +1,6 @@
# belay
A simple python utility for checking up on your Slack organization.
A simple python utility for checking up on your Slack organization. It's sort of like AWS Config Audit, but for a Slack team.
## Setup

104
belay.py

@ -1,7 +1,7 @@
#!/usr/bin/env python
import sys
import os
import sys
import yaml
import logging
import argparse
@ -11,6 +11,7 @@ program_version="1.0"
logger = logging.getLogger(__name__)
def load_config(conf_path=None, team=None):
# Try the usual places to store an API token
# First token wins
@ -22,7 +23,7 @@ def load_config(conf_path=None, team=None):
# 1. Specified config file
if conf_path:
logger.debug("Skipping standard config locations because config path passed in on the command line.")
logger.debug("Skipping standard locations because config path passed on the command line.")
# 2. ~/.config/slack/config.yml
elif os.path.isfile(os.path.join(homedir, ".config", "belay", "config.yml")):
conf_path = os.path.join(homedir, ".config", "belay", "config.yml")
@ -40,9 +41,12 @@ def load_config(conf_path=None, team=None):
if "teams" in config_data:
logger.debug("This is a multi-team config file. Script is looking for team: %s", team)
if not team:
raise ValueError("No team specified with multi-team config. Please indicate which team you'd like to use.")
raise ValueError("No team specified with multi-team config. " +
"Please indicate which team you'd like to use.")
elif team not in config_data["teams"]:
raise ValueError("Specified team '"+team+"' not present in config file. Valid choices are: "+", ".join(config_data["teams"].keys()))
raise ValueError("Specified team '" + team +
"' not present in config file. Valid choices are: " +
", ".join(config_data["teams"].keys()))
else:
logger.debug("Team '%s' found. Selecting only data for that team.", team)
config_data = config_data["teams"][team]
@ -62,9 +66,12 @@ def load_config(conf_path=None, team=None):
if "api_token" not in config_data:
raise RuntimeError("No API token not found.")
if "bot_token" not in config_data:
logger.warn("No bot token provided. Assuming API token is legacy token. Use of legacy tokens is a potential security issue and we strongly recommend switching to the new token format.")
logger.warn("No bot token provided. Assuming API token is legacy token. " +
"Use of legacy tokens is a potential security issue and we strongly " +
"recommend switching to the new token format.")
return config_data
def belay(config):
slack_api = SlackClient(config["api_token"])
api_test = slack_api.api_call("auth.test")
@ -90,7 +97,8 @@ def belay(config):
integration_issues = check_integrations(slack_api, config)
if integration_issues:
logger.info("Found the following integration issues: %s", integration_issues)
notify_problems(integration_issues, config, slack_bot, "Problem Integrations:", "integration_name", "date")
notify_problems(integration_issues, config, slack_bot,
"Problem Integrations:", "integration_name", "date")
else:
logger.info("No integration issues found.")
user_issues = check_users(slack_api, config)
@ -100,23 +108,27 @@ def belay(config):
else:
logger.info("No user issues found.")
def check_integrations(api_client, config):
if "skip_integrations" in config and config["skip_integrations"]:
return {}
result = api_client.api_call("team.integrationLogs")
logger.debug("Integration log results: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while getting initial integrations: "+unicode(result))
raise RuntimeError("API Call encountered an error while getting initial integrations: " +
unicode(result))
iLogs = result["logs"]
if result["paging"]["pages"] > 1:
logger.info("Multiple pages of integration logs exist. Pulling remaining %s pages...", result["paging"]["pages"] - 1)
logger.info("Multiple pages of integration logs exist. Pulling remaining %s pages...",
result["paging"]["pages"] - 1)
while result["paging"]["page"] < result["paging"]["pages"]:
nextPage = result["paging"]["page"] + 1
logger.info("Pulling page %s.", nextPage)
result = api_client.api_call("team.integrationLogs", page=nextPage)
logger.debug("Page %s: %s", nextPage, result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while getting additional integrations: "+unicode(result))
raise RuntimeError("API Call encountered an error while getting more integrations: " +
unicode(result))
iLogs.extend(result["logs"])
integrations = {}
if "integration_whitelist" in config:
@ -150,7 +162,9 @@ def check_integrations(api_client, config):
logger.warn("Unknown integration type: %s", log)
continue
if int_id not in integrations:
integrations[int_id] = {"integration_name": int_name, "app_id": int_id, "status": status, "scopes": scopes, "user_id": log["user_id"], "user_name": log["user_name"], "date": log["date"]}
integrations[int_id] = {"integration_name": int_name, "app_id": int_id,
"status": status, "scopes": scopes, "user_id": log["user_id"],
"user_name": log["user_name"], "date": log["date"]}
if "reason" in log:
integrations[int_id]["reason"] = log["reason"]
if "channel" in log:
@ -197,21 +211,26 @@ def check_integrations(api_client, config):
problem_integrations.append(integration)
return problem_integrations
def check_users(api_client, config):
result = api_client.api_call("users.list", presence=False, limit=100)
logger.debug("User list results: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while getting initial user list: "+unicode(result))
raise RuntimeError("API Call encountered an error while getting initial user list: " +
unicode(result))
users = result["members"]
while "next_cursor" in result["response_metadata"] and result["response_metadata"]["next_cursor"]:
logger.info("Further pages of users exist. Pulling next page...")
result = api_client.api_call("users.list", presence=False, limit=100, cursor=result["response_metadata"]["next_cursor"])
result = api_client.api_call("users.list", presence=False, limit=100,
cursor=result["response_metadata"]["next_cursor"])
logger.debug("User list results: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while getting additional user list: "+unicode(result))
raise RuntimeError("API Call encountered an error while getting additional users: " +
unicode(result))
users.extend(result["members"])
problem_users = []
retained_keys = ["real_name", "id", "team_id", "name", "problems", "has_2fa", "two_factor_type", "updated", "is_owner", "is_admin"]
retained_keys = ["real_name", "id", "team_id", "name", "problems",
"has_2fa", "two_factor_type", "updated", "is_owner", "is_admin"]
if "user_whitelist" in config:
user_whitelist = config["user_whitelist"]
else:
@ -241,8 +260,8 @@ def check_users(api_client, config):
return problem_users
def notify_problems(problems, config, slack_bot, heading="Problems:", item_name="name", sort_field=None):
def notify_problems(problems, config, slack_bot, heading="Problems:",
item_name="name", sort_field=None):
indent = "\t"
if "output_channel" in config:
indent = "\t\t"
@ -252,7 +271,8 @@ def notify_problems(problems, config, slack_bot, heading="Problems:", item_name=
for problem in problems:
fmt_problem = problem[item_name] + ": " + ", ".join(problem["problems"])
for item in sorted(problem.items()):
if not hasattr(item[1], "strip") and (hasattr(item[1], "__getitem__") or hasattr(item[1], "__iter__")):
if not hasattr(item[1], "strip") and \
(hasattr(item[1], "__getitem__") or hasattr(item[1], "__iter__")):
val = ", ".join(item[1])
else:
val = unicode(item[1])
@ -267,17 +287,27 @@ def notify_problems(problems, config, slack_bot, heading="Problems:", item_name=
"text": formatted_problems,
"color": "#ffe600"
}]
result = slack_bot.api_call("chat.postMessage", channel=config["output_channel"], attachments=attachment, as_user=True)
result = slack_bot.api_call("chat.postMessage",
channel=config["output_channel"],
attachments=attachment, as_user=True)
else:
# For bigger files we use a snippet. This has a 1MB limit. If you have more problems... well add that to the list.
result = slack_bot.api_call("files.upload", content=formatted_problems, title=heading, filetype="text")
# For bigger files we use a snippet. This has a 1MB limit.
# If you have more problems... well add this to the list.
result = slack_bot.api_call("files.upload",
content=formatted_problems,
title=heading, filetype="text")
logger.info("Message too large. Uploaded as file: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while uploading file: "+unicode(result))
result = slack_bot.api_call("chat.postMessage", channel=config["output_channel"], text="*"+heading+"*\n\nToo many issues to post directly.\nSee <"+result["file"]["url_private"]+"|the uploaded file> for more information.", unfurl_links=True, as_user=True)
raise RuntimeError("API Call encountered an error while uploading file: " +
unicode(result))
msg = "*" + heading + "*\n\nToo many " + "issues to post directly.\nSee <" + \
result["file"]["url_private"] + "|the uploaded file> for details."
result = slack_bot.api_call("chat.postMessage", channel=config["output_channel"],
text=msg, unfurl_links=True, as_user=True)
logger.info("Message posted. Result: %s", result)
if not result["ok"]:
raise RuntimeError("API Call encountered an error while posting message: "+unicode(result))
raise RuntimeError("API Call encountered an error while posting message: " +
unicode(result))
else:
print heading
print ""
@ -285,12 +315,20 @@ def notify_problems(problems, config, slack_bot, heading="Problems:", item_name=
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog="Belay", description="Check the security of your Slack.")
parser.add_argument("-c", "--config", default=None, help="Non-standard location of a configuration file.")
parser.add_argument("-f", "--file", default=None, help="File to redirect log output into.")
parser.add_argument("-t", "--team", default=None, help="Team to check for multi-team configs.")
parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase log verbosity level. (Default level: WARN, use twice for DEBUG)")
parser.add_argument("-V", "--version", action="version", version="%(prog)s "+program_version, help="Display version information and exit.")
parser = argparse.ArgumentParser(prog="Belay",
description="Secure your Slack.")
parser.add_argument("-c", "--config", default=None,
help="Non-standard location of a configuration file.")
parser.add_argument("-f", "--file", default=None,
help="File to redirect log output into.")
parser.add_argument("-t", "--team", default=None,
help="Team to check for multi-team configs.")
parser.add_argument("-v", "--verbose", action="count", default=0,
help="Increase log verbosity level. (Default" +
" level: WARN, use twice for DEBUG)")
parser.add_argument("-V", "--version", action="version",
version="%(prog)s " + program_version,
help="Display version information and exit.")
args = parser.parse_args()
loglevel = max(10, 30 - (args.verbose * 10))
logformat = '%(asctime)s %(levelname)s: %(message)s'
@ -300,10 +338,10 @@ if __name__ == '__main__':
logging.basicConfig(level=loglevel, format=logformat)
logger.info("Belaying...")
# try:
try:
config = load_config(args.config, args.team)
belay(config)
# except Exception as e:
# sys.exit(e)
# finally:
except Exception as e:
sys.exit(e)
finally:
logging.shutdown()

Loading…
Cancel
Save