#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Bogdan Cordier <ooctogene@gmail.com>
#                and Matteo Cypriani <mcy@lm7.fr>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
|
|
|
import json
|
|
|
import logging
|
|
|
import sleekxmpp
|
|
|
import datetime
|
|
|
import locale
|
|
|
import dataset
|
|
|
import configargparse
|
|
|
import getpass
|
|
|
import os
|
|
|
import random
|
|
|
import sqlalchemy
|
|
|
import xdg.BaseDirectory
|
|
|
import pathlib
|
|
|
|
|
|
# Dates shown to users are rendered with the locale-dependent "%c" format, so
# the French locale is preferred.  It is not installed everywhere, though, and
# an unknown locale makes setlocale() raise locale.Error at import time.
try:
    locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
except locale.Error:
    # Fall back to the locale configured in the environment rather than
    # refusing to start.  Nothing is logged here on purpose: logging is only
    # configured later, by the script entry point.
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        # Keep the interpreter's current locale.
        pass
|
|
|
|
|
|
# Built-in vocabulary, used when no vocabulary file is available or when the
# file is not valid JSON.  Every category maps to a list of candidate
# sentences; the "{...}" placeholders are filled in with str.format() by the
# code that sends the sentence.
default_vocabulary = dict(
    help=["My vocabulary empty, I can't help you."],
    empty_log=["No log for you."],
    gossips=["{nick} is reading the back log."],
    greetings=["/me is here!"],
    insults=["If I had vocabulary, I would insult {nick}."],
    uptime=["I'm up for {uptime}."],
    welcome=["{nick}'s last connection: {date}."],
    # Responses to direct messages (not on a MUC):
    refusals=["I don't accept direct messages. Try on a MUC."],
)
|
|
|
|
|
|
|
|
|
class KaaBot(sleekxmpp.ClientXMPP):
    """XMPP bot that joins one MUC, logs it and answers a few commands.

    The bot records every groupchat message of the MUC in a SQLite database,
    remembers when each participant was last seen online/offline, and reacts
    to the commands "log"/"histo", "help"/"aide" and "uptime" (anything else
    earns the sender an insult).  Sentences are picked from a vocabulary
    loaded from a JSON file, or from `default_vocabulary`.
    """

    def __init__(self, jid, password, database, muc, nick, vocabulary_file,
                 welcome):
        """Set up the database, the vocabulary and the XMPP event handlers.

        `jid` and `password` are handed to sleekxmpp.ClientXMPP.
        `database` is the database path given by the user (may be empty, see
        find_database()).
        `muc` is the room to join and `nick` the nickname to use in it.
        `vocabulary_file` is the vocabulary path given by the user (may be
        empty, see init_vocabulary()).
        `welcome` tells whether returning users are greeted publicly.
        """
        sleekxmpp.ClientXMPP.__init__(self, jid, password)

        self.muc = muc
        self.nick = nick
        # Set when the bot's own presence is seen in the MUC (muc_online()).
        self.online_timestamp = None
        self.welcome = welcome

        database_path = self.find_database(database, muc)
        # Event handlers may run in a different thread than the one that
        # opened the connection, hence check_same_thread=False.
        # NOTE(review): assumption about sleekxmpp's threading - confirm.
        self.db = dataset.connect(
            'sqlite:///{db}'.format(db=database_path),
            engine_kwargs={'connect_args': {'check_same_thread': False}})

        self.vocabulary = self.init_vocabulary(vocabulary_file)

        self.users = self.db['user']
        # Initialize table with correct type.
        self.users.create_column('nick', sqlalchemy.String)
        self.users.create_column('offline_timestamp', sqlalchemy.DateTime)
        self.users.create_column('online_timestamp', sqlalchemy.DateTime)

        self.muc_log = self.db['muc_log']

        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.add_event_handler("muc::%s::got_online" % self.muc,
                               self.muc_online)
        self.add_event_handler("muc::%s::got_offline" % self.muc,
                               self.muc_offline)

    @staticmethod
    def find_database(database, muc):
        """Returns the path to the database to use for the given MUC.

        If `database` is empty, the file is named after the MUC (`muc`) and
        placed in the "kaabot" XDG data dir (usually
        `$HOME/.local/share/kaabot/`), as returned by
        xdg.BaseDirectory.save_data_path().

        If it contains a value, it is assumed to be the path to the database.
        If the name contains the string "{muc}", it will be substituted with
        the MUC's name.

        The returned path may or may not exist in the file system.
        """
        if database:
            return database.format(muc=muc)

        data_dir = xdg.BaseDirectory.save_data_path("kaabot")
        return os.path.join(data_dir, "{muc}.db".format(muc=muc))

    @staticmethod
    def init_vocabulary(vocabulary_file):
        """Reads the vocabulary from a JSON file.

        If vocabulary_file is empty (i.e. the user didn't use the
        --vocabulary option), a file named "vocabulary.json" is searched in
        the first existing XDG "kaabot" config path, in order of preference
        (usually $HOME/.config/kaabot/, then /etc/xdg/kaabot/).

        If vocabulary_file contains a value, it is considered to be the path
        to a valid vocabulary file.

        Error handling:
        - If the user-specified file can't be opened, the program will crash.
        - Ditto if the XDG-found file exists but can't be opened.
        - If no XDG "kaabot" config directory exists, "vocabulary.json" is
          searched in the work directory.
        - In case of parsing error (the file exists but is invalid JSON),
          minimalistic vocabulary is set.
        - Ditto if the user didn't use --vocabulary and no vocabulary file is
          found.
        """
        if not vocabulary_file:
            # load_first_config() returns None when there is no "kaabot"
            # config directory; an empty directory name makes the lookup
            # relative to the work directory instead of crashing in join().
            config_dir = xdg.BaseDirectory.load_first_config("kaabot") or ""
            full_path = os.path.join(config_dir, "vocabulary.json")
            if os.path.exists(full_path):
                vocabulary_file = full_path

        if vocabulary_file:
            return KaaBot.read_vocabulary_file(vocabulary_file)
        return default_vocabulary

    @staticmethod
    def read_vocabulary_file(vocabulary_file):
        """Actually read and parse the vocabulary file.

        Returns the parsed vocabulary, or `default_vocabulary` if the file is
        not valid JSON or does not hold a JSON object.  Re-raises the IOError
        if the file cannot be opened.
        """
        try:
            fd = open(vocabulary_file, encoding='UTF-8')
        except IOError:
            logging.error("Can't open vocabulary file {filename}!"
                          .format(filename=vocabulary_file))
            raise

        # The `with` block closes the file even when parsing fails.
        with fd:
            try:
                vocabulary = json.load(fd)
            except ValueError:  # json.JSONDecodeError in Python >= 3.5
                logging.warning(("Invalid JSON vocabulary file '{filename}'. "
                                 "Minimal vocabulary will be set.")
                                .format(filename=vocabulary_file))
                return default_vocabulary

        if not isinstance(vocabulary, dict):
            # Valid JSON, but not a mapping of categories to sentences.
            logging.warning(("Vocabulary file '{filename}' does not contain "
                             "a JSON object. Minimal vocabulary will be set.")
                            .format(filename=vocabulary_file))
            return default_vocabulary

        return vocabulary

    def session_start(self, event):
        """Announces presence, fetches the roster and joins the MUC."""
        self.send_presence()
        self.get_roster()
        self.plugin['xep_0045'].joinMUC(self.muc, self.nick, wait=True)
        self.plugin['xep_0172'].publish_nick(self.nick)

    def message(self, msg):
        """Handles incoming messages.

        Private messages are only accepted when they come through the MUC;
        their whole body is the command.  Groupchat messages are logged, then
        treated as a command when they start or end with the bot's nick.
        """
        # Private message
        if msg['type'] in ('chat', 'normal'):
            # Don't accept private messages unless they are initiated from a
            # MUC
            if msg['from'].bare != self.muc:
                msg.reply(self.pick_sentence('refusals')).send()
                return

            # Message's author info
            dest = msg['from']
            nick = msg['from'].resource

            command = msg['body'].strip()
            self.parse_command(command, nick, dest, priv=True)

        # Public (MUC) message
        elif msg['type'] == 'groupchat':
            # Message's author info
            dest = msg['from']
            nick = msg['mucnick']
            body = msg['body']

            # Insert message in database with timestamp
            self.muc_log.insert(dict(datetime=datetime.datetime.now(),
                                     msg=body, user=nick))

            # Stop dealing with this message if we sent it
            if nick == self.nick:
                return

            if body.startswith(self.nick):
                # Bot's nick is at the beginning
                command = body[len(self.nick):]
            elif body.endswith(self.nick):
                # Bot's nick is at the end
                command = body[:-len(self.nick)]
            elif self.nick in body:
                # The bot's nick was used in the middle of a message
                self.send_insult(nick, dest.bare)
                return
            else:
                # The message is not about the bot.
                return

            # Drop the separators that usually follow the nick ("nick: cmd").
            command = command.lstrip('\t :, ').rstrip()
            self.parse_command(command, nick, dest)

    def parse_command(self, command, nick, dest, priv=False):
        """Parses a command sent by dest (nick).

        `priv` should be True if the bot was contacted through a private
        message, False if analysing a public message. In any case, the bot
        may report publicly information about the commands processed.
        """
        if not command:  # original message was just the bot's name
            self.send_help(dest)
        elif command in ('log', 'histo'):
            self.send_log(nick, dest, echo=priv)
        elif command in ('help', 'aide'):
            self.send_help(dest)
        elif command == 'uptime':
            self.send_uptime(dest, priv)
        else:
            self.send_insult(nick, dest.bare)

    def _sentences(self, category):
        """Returns the sentences of `category`.

        Falls back to the built-in vocabulary when the loaded one has no
        (or an empty) entry for `category`.
        """
        return self.vocabulary.get(category) or default_vocabulary[category]

    def send_help(self, dest):
        """Sends help messages to 'dest'.
        """
        mbody = '\n  '.join(self._sentences('help'))
        self.send_message(mto=dest, mbody=mbody, mtype='chat')

    def send_log(self, nick, dest, echo=False):
        """Look up backlog for 'nick' and send it to 'dest'.

        The backlog is made of the messages logged between the last time
        `nick` went offline and the last time `nick` came online.  If `echo`
        is true, the MUC is told publicly that `nick` asked for the log.
        """
        if echo:
            gossip = self.pick_sentence('gossips').format(nick=nick)
            self.send_message(mto=dest.bare,
                              mbody=gossip,
                              mtype='groupchat')

        # find_one() returns None for a nick that is not in the database.
        user = self.users.find_one(nick=nick)
        offline_timestamp = user['offline_timestamp'] if user else None
        if not offline_timestamp:
            logging.debug('KaaBot : No offline timestamp for {nick}.'
                          .format(nick=nick))
            self.send_empty_log(dest)
            return
        logging.debug('KaaBot : {nick} last seen on {date}'
                      .format(nick=nick, date=offline_timestamp))

        online_timestamp = user['online_timestamp']
        logging.debug('KaaBot : {nick} last connection on {date}'
                      .format(nick=nick, date=online_timestamp))
        if not online_timestamp:
            # Without an upper bound there is no window to look into.
            self.send_empty_log(dest)
            return

        backlog_sent = False
        for entry in self.muc_log:
            if not offline_timestamp < entry['datetime'] < online_timestamp:
                continue
            backlog_sent = True
            log_message = "[{:%H:%M}] {}: {}".format(entry['datetime'],
                                                     entry['user'],
                                                     entry['msg'])
            self.send_message(mto=dest,
                              mbody=log_message,
                              mtype='chat')

        if not backlog_sent:
            logging.debug('KaaBot : Filtered backlog empty.')
            self.send_empty_log(dest)

    def send_empty_log(self, dest):
        """Send message if backlog empty.
        """
        self.send_message(mto=dest,
                          mbody=self.pick_sentence('empty_log'),
                          mtype='chat')

    def send_uptime(self, dest, priv=False):
        """Sends the uptime to `dest`.

        If `priv` is true, the message is sent privately, otherwise it's sent
        on the MUC.
        """
        uptime = str(datetime.datetime.now() - self.online_timestamp)
        mbody = self.pick_sentence('uptime').format(uptime=uptime)
        if priv:
            self.send_message(mto=dest, mbody=mbody, mtype='chat')
        else:
            self.send_message(mto=dest.bare, mbody=mbody, mtype='groupchat')

    def send_insult(self, nick, dest):
        """Sends an insult about `nick` to `dest`.
        """
        insult = self.pick_sentence('insults').format(nick=nick)
        self.send_message(mto=dest, mbody=insult, mtype='groupchat')

    def send_welcome(self, nick, dest, date):
        """Tells `dest` (publicly) when `nick` was last seen, i.e. `date`.
        """
        msg = self.pick_sentence('welcome').format(nick=nick, date=date)
        self.send_message(mto=dest, mbody=msg, mtype='groupchat')

    def pick_sentence(self, type):
        """Returns a random sentence picked in the loaded vocabulary.

        `type` can be any known category of the vocabulary file, e.g.
        'insults'.  If the loaded vocabulary lacks that category (or it is
        empty), the built-in one is used; a category unknown to both raises
        KeyError.

        No substitution is done to the returned string.
        """
        return random.choice(self._sentences(type))

    def muc_online(self, presence):
        """Handles MUC online presence.

        On bot connection gets called for each user in the MUC (bot
        included).
        """
        nick = presence['muc']['nick']
        now = datetime.datetime.now()

        if nick == self.nick:
            # Set bot online timestamp.
            self.online_timestamp = now
            self.send_message(mto=presence['from'].bare,
                              mbody=self.pick_sentence('greetings'),
                              mtype='groupchat')
            return

        user = self.users.find_one(nick=nick)
        if not user:
            # First time this nick is seen.
            self.users.insert(dict(nick=nick, online_timestamp=now))
            return

        # Update nick online timestamp.
        self.users.update(dict(nick=nick, online_timestamp=now), ['nick'])

        # While the bot itself is still joining, it receives the presence of
        # everybody already in the room: nobody is welcomed at that point.
        if not self.online_timestamp:
            return

        offline_timestamp = user['offline_timestamp']
        if offline_timestamp is None:
            msg = 'KaaBot : No offline timestamp yet for {nick}'
            logging.debug(msg.format(nick=nick))
            return

        date = offline_timestamp.strftime("%c")
        logging.debug('KaaBot : user {} connected, last seen {}'
                      .format(nick, date))
        if self.welcome:
            self.send_welcome(nick, presence['from'].bare, date)

    def muc_offline(self, presence):
        """Handles MUC offline presence.
        """
        nick = presence['muc']['nick']
        if nick != self.nick:
            self.users.update(dict(nick=nick,
                                   offline_timestamp=datetime.datetime.now()),
                              ['nick'])
|
|
|
|
|
|
|
|
|
def str_to_bool(text):
    """Converts a string to a boolean.

    "on", "true" and "1" mean True; "off", "false" and "0" mean False.  The
    comparison ignores case and surrounding whitespace.  A value that already
    is a boolean is returned unchanged.

    Raises TypeError if `text` does not describe a boolean value.
    """
    if isinstance(text, bool):
        return text
    if not isinstance(text, str):
        raise TypeError("expected a string describing a boolean, got {!r}"
                        .format(text))

    normalized = text.strip().lower()
    if normalized in ("on", "true", "1"):
        return True
    if normalized in ("off", "false", "0"):
        return False
    raise TypeError("not a boolean value: {!r} "
                    "(expected on/off, true/false or 1/0)".format(text))
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':

    # Settings are read from the command line and from the "config" file of
    # the "kaabot" XDG config directory (created here if needed).
    config_dir = xdg.BaseDirectory.save_config_path("kaabot")
    config_file = os.path.join(config_dir, 'config')

    argp = configargparse.ArgParser(
        default_config_files=[config_file],
        description="Super Simple Silly Bot for Jabber.")
    argp.add_argument('-d', '--debug', help="set logging to DEBUG",
                      action='store_const',
                      dest='debug', const=logging.DEBUG,
                      default=logging.INFO)
    argp.add_argument("-b", "--database", dest="database",
                      help="path to an alternative database; the '{muc}' string"
                           " in the name will be substituted with "
                           "the MUC's name as provided by the --muc option")
    argp.add_argument("-j", "--jid", dest="jid", help="JID to use")
    argp.add_argument("-p", "--password", dest="password",
                      help="password to use")
    argp.add_argument("-m", "--muc", dest="muc",
                      help="Multi User Chatroom to join")
    argp.add_argument("-n", "--nick", dest="nick", default='KaaBot',
                      help="nickname to use in the chatroom (default: KaaBot)")
    argp.add_argument("-V", "--vocabulary_file", dest="vocabulary_file",
                      help="path to an alternative vocabulary file")
    argp.add_argument("--welcome", dest="welcome", default="on",
                      type=str_to_bool,
                      help="welcome users joining the MUC (on/off, default: on)")

    args = argp.parse_args()

    # Ask interactively for the mandatory settings that were not provided.
    if args.jid is None:
        args.jid = input("Username: ")
    if args.password is None:
        args.password = getpass.getpass("Password: ")
    if args.muc is None:
        args.muc = input("MUC: ")

    logging.basicConfig(level=args.debug,
                        format='%(levelname)-8s %(message)s')

    # On first run, save the settings so that later runs need no argument.
    # The file holds the password in clear text, hence the 0600 mode.
    try:
        pathlib.Path(config_file).touch(mode=0o600, exist_ok=False)
    except FileExistsError:
        logging.debug('Config file exists.')
    else:
        # Work on a copy: vars() returns the namespace's own dictionary.
        arguments = dict(vars(args))
        arguments.pop("debug")
        with open(config_file, 'w') as f:
            # Only unset (None) or empty settings are skipped; a plain
            # truthiness test would also drop `welcome` when it is False and
            # silently re-enable it on the next run.
            f.writelines('{}= {}\n'.format(k, v)
                         for k, v in arguments.items()
                         if v is not None and v != '')

    bot = KaaBot(args.jid, args.password, args.database,
                 args.muc, args.nick, args.vocabulary_file,
                 args.welcome)
    bot.register_plugin('xep_0045')
    bot.register_plugin('xep_0071')
    bot.register_plugin('xep_0172')
    bot.connect()
    bot.process(block=True)
|