You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

462 lines
18 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Bogdan Cordier <ooctogene@gmail.com>
# and Matteo Cypriani <mcy@lm7.fr>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import json
import logging
import sleekxmpp
import datetime
import locale
import dataset
import configargparse
import getpass
import os
import random
import sqlalchemy
import xdg.BaseDirectory
import pathlib
locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
default_vocabulary = {
'help': ["My vocabulary empty, I can't help you."],
'empty_log': ["No log for you."],
'gossips': ["{nick} is reading the back log."],
'greetings': ["/me is here!"],
'insults': ['If I had vocabulary, I would insult {nick}.'],
'uptime': ["I'm up for {uptime}."],
'welcome': ["{nick}'s last connection: {date}."],
# Responses to direct messages (not on a MUC):
'refusals': ["I don't accept direct messages. Try on a MUC."],
}
class KaaBot(sleekxmpp.ClientXMPP):
def __init__(self, jid, password, database, muc, nick, vocabulary_file,
welcome):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.muc = muc
self.nick = nick
self.online_timestamp = None
database_path = self.find_database(database, muc)
self.db = dataset.connect('sqlite:///{db}'.format(db=database_path),
engine_kwargs={'connect_args': {
'check_same_thread': False}})
self.vocabulary = self.init_vocabulary(vocabulary_file)
self.welcome = welcome
self.users = self.db['user']
# Initialize table with correct type.
self.users.create_column('nick', sqlalchemy.String)
self.users.create_column('offline_timestamp', sqlalchemy.DateTime)
self.users.create_column('online_timestamp', sqlalchemy.DateTime)
self.muc_log = self.db['muc_log']
self.add_event_handler("session_start", self.session_start)
self.add_event_handler("message", self.message)
self.add_event_handler("muc::%s::got_online" % self.muc,
self.muc_online)
self.add_event_handler("muc::%s::got_offline" % self.muc,
self.muc_offline)
@staticmethod
def find_database(database, muc):
"""Returns the path to the database to use for the given MUC.
If `database` is empty, the file name is generated from the MUC's name
`muc` in the first "kaabot" XDG data dir (usually
`$HOME/.local/share/kaabot/`). If the XDG data dir can't be located, the
database is created/open in the work directory.
If it contains a value, it is assumed to be the path to the database. If
the name contains the string "{muc}", it will be substituted with the
MUC's name.
The returned path may or may not exist in the file system.
"""
if database:
return database.format(muc=muc)
data_dir = xdg.BaseDirectory.save_data_path("kaabot")
database = "{muc}.db".format(muc=muc)
return os.path.join(data_dir, database)
@staticmethod
def init_vocabulary(vocabulary_file):
"""Reads the vocabulary from a JSON file.
If vocabulary_file is empty (i.e. the user didn't use the --vocabulary
option), a file named "vocabulary.json" is searched in the first
existing XDG "kaabot" config path, in order of preference (usually
$HOME/.config/kaabot/, then /etc/xdg/kaabot/).
If vocabulary_file contains a value, it is considered to be the path to
a valid vocabulary file.
Error handling:
- If the user-specified file can't be opened, the program will crash.
- Ditto if the XDG-found file exists but can't be opened.
- If the XDG directory cannot be detected, "vocabulary.json" is searched
in the work directory.
- In case of parsing error (the file exists but is invalid JSON),
minimalistic vocabulary is set.
- Ditto if the user didn't use --vocabulary and no vocabulary file is
found in the XDG config path.
"""
if not vocabulary_file:
config_dir = xdg.BaseDirectory.load_first_config("kaabot")
full_path = os.path.join(config_dir, "vocabulary.json")
if os.path.exists(full_path):
vocabulary_file = full_path
if vocabulary_file:
return KaaBot.read_vocabulary_file(vocabulary_file)
else:
return default_vocabulary
@staticmethod
def read_vocabulary_file(vocabulary_file):
"""Actually read and parse the vocabulary file.
"""
try:
fd = open(vocabulary_file, encoding='UTF-8')
except IOError:
logging.error("Can't open vocabulary file {filename}!"
.format(filename=vocabulary_file))
raise
try:
vocabulary = json.load(fd)
fd.close()
except ValueError: # json.JSONDecodeError in Python >= 3.5
logging.warning(("Invalid JSON vocabulary file '{filename}'. "
"Minimal vocabulary will be set.")
.format(filename=vocabulary_file))
vocabulary = default_vocabulary
return vocabulary
def session_start(self, event):
self.send_presence()
self.get_roster()
self.plugin['xep_0045'].joinMUC(self.muc, self.nick, wait=True)
self.plugin['xep_0172'].publish_nick(self.nick)
def message(self, msg):
"""Handles incoming messages.
"""
# Private message
if msg['type'] in ('chat', 'normal'):
# Don't accept private messages unless they are initiated from a MUC
if msg['from'].bare != self.muc:
msg.reply(self.pick_sentence('refusals')).send()
return
# Message's author info
dest = msg['from']
nick = msg['from'].resource
command = msg['body'].strip()
self.parse_command(command, nick, dest, priv=True)
# Public (MUC) message
elif msg['type'] in ('groupchat'):
# Message's author info
dest = msg['from']
nick = msg['mucnick']
# Insert message in database with timestamp
self.muc_log.insert(dict(datetime=datetime.datetime.now(),
msg=msg['body'], user=nick))
# Stop dealing with this message if we sent it
if msg['mucnick'] == self.nick:
return
splitbody = msg['body'].split(sep=self.nick, maxsplit=1)
# The message starts or ends with the bot's nick
if len(splitbody) == 2:
if splitbody[1]:
# Bot's nick is at the beginning
command = splitbody[1]
else:
# Bot's nick is at the end
command = splitbody[0]
command = command.lstrip('\t :, ').rstrip()
self.parse_command(command, nick, dest)
# The bot's nick was used in the middle of a message
elif self.nick in msg['body']:
self.send_insult(nick, dest.bare)
def parse_command(self, command, nick, dest, priv=False):
"""Parses a command sent by dest (nick).
`priv` should be True if the bot was contacted through a private
message, False if analysing a public message. In any case, the bot may
report publicly information about the commands processed.
"""
if not command: # original message was just the bot's name
self.send_help(dest)
elif command in ['log', 'histo']:
self.send_log(nick, dest, echo=priv)
elif command in ['help', 'aide']:
self.send_help(dest)
elif command in ['uptime']:
self.send_uptime(dest, priv)
else:
self.send_insult(nick, dest.bare)
def send_help(self, dest):
"""Sends help messages to 'dest'.
"""
mbody = '\n '.join(self.vocabulary['help'])
self.send_message(mto=dest,
mbody=mbody,
mtype='chat')
def send_log(self, nick, dest, echo=False):
"""Look up backlog for 'nick' and send it to 'dest'.
"""
if echo:
gossip = self.pick_sentence('gossips').format(nick=nick)
self.send_message(mto=dest.bare,
mbody=gossip,
mtype='groupchat')
# Get offline timestamp from database and check if it exists.
offline_timestamp = self.users.find_one(nick=nick)['offline_timestamp']
if not offline_timestamp:
logging.debug(('KaaBot : No offline'
' timestamp for {nick}.').format(nick=nick))
self.send_empty_log(dest)
return
else:
logging.debug(
('KaaBot : {nick} '
'last seen on {date}').format(nick=nick,
date=offline_timestamp))
# Get online timestamp from database.
online_timestamp = self.users.find_one(nick=nick)['online_timestamp']
logging.debug(('KaaBot : {nick} last'
' connection on {date}').format(nick=nick,
date=online_timestamp))
# Since filtered log is a generator we can't know in advance if
# it will be empty. Creating filtered_log_empty allows us to act on
# this event later.
filtered_log_empty = True
filtered_log = (log for log in self.muc_log if
offline_timestamp < log['datetime'] < online_timestamp)
for log in filtered_log:
filtered_log_empty = False
log_message = "[{:%H:%M}] {}: {}".format(log['datetime'],
log['user'],
log['msg'])
self.send_message(mto=dest,
mbody=log_message,
mtype='chat')
# Send message if filtered_log is still empty.
if filtered_log_empty:
logging.debug('KaaBot : Filtered backlog empty.')
self.send_empty_log(dest)
def send_empty_log(self, dest):
"""Send message if backlog empty.
"""
mbody = self.pick_sentence('empty_log')
self.send_message(mto=dest,
mbody=mbody,
mtype='chat')
def send_uptime(self, dest, priv=False):
"""Sends the uptime to `dest`.
If `priv` is true, the message is sent privately, otherwise it's send on
the MUC.
"""
uptime = str(datetime.datetime.now() - self.online_timestamp)
mbody = self.pick_sentence('uptime').format(uptime=uptime)
if priv:
self.send_message(mto=dest,
mbody=mbody,
mtype='chat')
else:
self.send_message(mto=dest.bare,
mbody=mbody,
mtype='groupchat')
def send_insult(self, nick, dest):
"""Sends an insult about `nick` to `dest`.
"""
insult = self.pick_sentence('insults').format(nick=nick)
self.send_message(mto=dest,
mbody=insult,
mtype='groupchat')
def send_welcome(self, nick, dest, date):
msg = self.pick_sentence('welcome').format(nick=nick, date=date)
self.send_message(mto=dest,
mbody=msg,
mtype='groupchat')
def pick_sentence(self, type):
"""Returns a random sentence picked in the loaded vocabulary.
`type` can be any known category of the vocabulary file, e.g. 'insults'.
No substitution is done to the returned string.
"""
voc = self.vocabulary[type]
i = random.randint(0, len(voc) - 1)
return voc[i]
def muc_online(self, presence):
"""Handles MUC online presence.
On bot connection gets called for each
user in the MUC (bot included).
"""
nick = presence['muc']['nick']
if nick != self.nick:
# Check if nick in database.
if self.users.find_one(nick=nick):
# Update nick online timestamp.
self.users.update(dict(nick=nick,
online_timestamp=datetime.datetime.now()),
['nick'])
# Check if bot is connecting for the first time.
if self.online_timestamp:
try:
user = self.users.find_one(nick=nick)
offline_timestamp = user['offline_timestamp']
date = datetime.datetime.strftime(offline_timestamp,
format="%c")
logging.debug('KaaBot : user {} connected, last seen {}'
.format(nick, date))
if self.welcome:
dest = presence['from'].bare
self.send_welcome(nick, dest, date)
except TypeError:
msg = 'KaaBot : No offline timestamp yet for {nick}'
logging.debug(msg.format(nick=nick))
else:
self.users.insert(dict(nick=nick,
online_timestamp=datetime.datetime.now()))
else:
# Set bot online timestamp.
self.online_timestamp = datetime.datetime.now()
self.send_message(mto=presence['from'].bare,
mbody=self.pick_sentence('greetings'),
mtype='groupchat')
def muc_offline(self, presence):
"""Handles MUC offline presence.
"""
nick = presence['muc']['nick']
if nick != self.nick:
self.users.update(dict(nick=nick,
offline_timestamp=datetime.datetime.now()),
['nick'])
def str_to_bool(text):
"""Converts a string to a boolean.
Raises an exception if the string does not describe a boolean value.
"""
text = text.lower()
if text in ["on", "true", "1"]:
return True
elif text in ["off", "false", "0"]:
return False
raise TypeError
if __name__ == '__main__':
config_dir = xdg.BaseDirectory.save_config_path("kaabot")
config_file = os.path.join(config_dir, 'config')
argp = configargparse.ArgParser(default_config_files=[config_file],
description="Super Simple Silly Bot for Jabber.")
argp.add_argument('-d', '--debug', help="set logging to DEBUG",
action='store_const',
dest='debug', const=logging.DEBUG,
default=logging.INFO)
argp.add_argument("-b", "--database", dest="database",
help="path to an alternative database; the '{muc}' string"
" in the name will be substituted with "
"the MUC's name as provided by the --muc option")
argp.add_argument("-j", "--jid", dest="jid", help="JID to use")
argp.add_argument("-p", "--password", dest="password",
help="password to use")
argp.add_argument("-m", "--muc", dest="muc",
help="Multi User Chatroom to join")
argp.add_argument("-n", "--nick", dest="nick", default='KaaBot',
help="nickname to use in the chatroom (default: KaaBot)")
argp.add_argument("-V", "--vocabulary_file", dest="vocabulary_file",
help="path to an alternative vocabulary file")
argp.add_argument("--welcome", dest="welcome", default="on",
type=str_to_bool,
help="welcome users joining the MUC (on/off, default: on)")
args = argp.parse_args()
if args.jid is None:
args.jid = input("Username: ")
if args.password is None:
args.password = getpass.getpass("Password: ")
if args.muc is None:
args.muc = input("MUC: ")
logging.basicConfig(level=args.debug,
format='%(levelname)-8s %(message)s')
try:
pathlib.Path(config_file).touch(mode=0o600, exist_ok=False)
arguments = vars(args)
arguments.pop("debug")
with open(config_file, 'w') as f:
f.writelines('{}= {}\n'.format(k, v) for k, v
in arguments.items() if v)
except FileExistsError:
logging.debug('Config file exists.')
bot = KaaBot(args.jid, args.password, args.database,
args.muc, args.nick, args.vocabulary_file,
args.welcome)
bot.register_plugin('xep_0045')
bot.register_plugin('xep_0071')
bot.register_plugin('xep_0172')
bot.connect()
bot.process(block=True)