You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

463 lines
18 KiB
Python

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Bogdan Cordier <ooctogene@gmail.com>
# and Matteo Cypriani <mcy@lm7.fr>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import json
import logging
import sleekxmpp
import datetime
import locale
import dataset
import configargparse
import getpass
import os
import random
import sqlalchemy
import xdg.BaseDirectory
import pathlib
locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
default_vocabulary = {
'help': ["My vocabulary empty, I can't help you."],
'empty_log': ["No log for you."],
'gossips': ["{nick} is reading the back log."],
'greetings': ["/me is here!"],
'insults': ['If I had vocabulary, I would insult {nick}.'],
'uptime': ["I'm up for {uptime}."],
'welcome': ["{nick}'s last connection: {date}."],
# Responses to direct messages (not on a MUC):
'refusals': ["I don't accept direct messages. Try on a MUC."],
}
class KaaBot(sleekxmpp.ClientXMPP):
def __init__(self, jid, password, database, muc, nick, vocabulary_file,
welcome):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.muc = muc
self.nick = nick
self.online_timestamp = None
database_path = self.find_database(database, muc)
self.db = dataset.connect('sqlite:///{db}'.format(db=database_path),
engine_kwargs={'connect_args': {
'check_same_thread': False}})
self.vocabulary = self.init_vocabulary(vocabulary_file)
self.welcome = welcome
self.users = self.db['user']
# Initialize table with correct type.
self.users.create_column('nick', sqlalchemy.String)
self.users.create_column('offline_timestamp', sqlalchemy.DateTime)
self.users.create_column('online_timestamp', sqlalchemy.DateTime)
self.muc_log = self.db['muc_log']
self.add_event_handler("session_start", self.session_start)
self.add_event_handler("message", self.message)
self.add_event_handler("muc::%s::got_online" % self.muc,
self.muc_online)
self.add_event_handler("muc::%s::got_offline" % self.muc,
self.muc_offline)
@staticmethod
def find_database(database, muc):
"""Returns the path to the database to use for the given MUC.
If `database` is empty, the file name is generated from the MUC's name
`muc` in the first "kaabot" XDG data dir (usually
`$HOME/.local/share/kaabot/`). If the XDG data dir can't be located, the
database is created/open in the work directory.
If it contains a value, it is assumed to be the path to the database. If
the name contains the string "{muc}", it will be substituted with the
MUC's name.
The returned path may or may not exist in the file system.
"""
if database:
return database.format(muc=muc)
data_dir = xdg.BaseDirectory.save_data_path("kaabot")
database = "{muc}.db".format(muc=muc)
return os.path.join(data_dir, database)
@staticmethod
def init_vocabulary(vocabulary_file):
"""Reads the vocabulary from a JSON file.
If vocabulary_file is empty (i.e. the user didn't use the --vocabulary
option), a file named "vocabulary.json" is searched in the first
existing XDG "kaabot" config path, in order of preference (usually
$HOME/.config/kaabot/, then /etc/xdg/kaabot/).
If vocabulary_file contains a value, it is considered to be the path to
a valid vocabulary file.
Error handling:
- If the user-specified file can't be opened, the program will crash.
- Ditto if the XDG-found file exists but can't be opened.
- If the XDG directory cannot be detected, "vocabulary.json" is searched
in the work directory.
- In case of parsing error (the file exists but is invalid JSON),
minimalistic vocabulary is set.
- Ditto if the user didn't use --vocabulary and no vocabulary file is
found in the XDG config path.
"""
if not vocabulary_file:
config_dir = xdg.BaseDirectory.load_first_config("kaabot")
full_path = os.path.join(config_dir, "vocabulary.json")
if os.path.exists(full_path):
vocabulary_file = full_path
if vocabulary_file:
return KaaBot.read_vocabulary_file(vocabulary_file)
else:
return default_vocabulary
@staticmethod
def read_vocabulary_file(vocabulary_file):
"""Actually read and parse the vocabulary file.
"""
try:
fd = open(vocabulary_file, encoding='UTF-8')
except IOError:
logging.error("Can't open vocabulary file {filename}!"
.format(filename=vocabulary_file))
raise
try:
vocabulary = json.load(fd)
fd.close()
except ValueError: # json.JSONDecodeError in Python >= 3.5
logging.warning(("Invalid JSON vocabulary file '{filename}'. "
"Minimal vocabulary will be set.")
.format(filename=vocabulary_file))
vocabulary = default_vocabulary
return vocabulary
def session_start(self, event):
self.send_presence()
self.get_roster()
self.plugin['xep_0045'].joinMUC(self.muc, self.nick, wait=True)
self.plugin['xep_0172'].publish_nick(self.nick)
def message(self, msg):
"""Handles incoming messages.
"""
# Private message
if msg['type'] in ('chat', 'normal'):
# Don't accept private messages unless they are initiated from a MUC
if msg['from'].bare != self.muc:
msg.reply(self.pick_sentence('refusals')).send()
return
# Message's author info
dest = msg['from']
nick = msg['from'].resource
command = msg['body'].strip()
self.parse_command(command, nick, dest, priv=True)
# Public (MUC) message
elif msg['type'] in ('groupchat'):
# Message's author info
dest = msg['from']
nick = msg['mucnick']
# Insert message in database with timestamp
self.muc_log.insert(dict(datetime=datetime.datetime.now(),
msg=msg['body'], user=nick))
# Stop dealing with this message if we sent it
if msg['mucnick'] == self.nick:
return
splitbody = msg['body'].split(sep=self.nick, maxsplit=1)
# The message starts or ends with the bot's nick
if len(splitbody) == 2:
if splitbody[1]:
# Bot's nick is at the beginning
command = splitbody[1]
else:
# Bot's nick is at the end
command = splitbody[0]
command = command.lstrip('\t :, ').rstrip()
self.parse_command(command, nick, dest)
# The bot's nick was used in the middle of a message
elif self.nick in msg['body']:
self.send_insult(nick, dest.bare)
def parse_command(self, command, nick, dest, priv=False):
"""Parses a command sent by dest (nick).
`priv` should be True if the bot was contacted through a private
message, False if analysing a public message. In any case, the bot may
report publicly information about the commands processed.
"""
if not command: # original message was just the bot's name
self.send_help(dest)
elif command in ['log', 'histo']:
self.send_log(nick, dest, echo=priv)
elif command in ['help', 'aide']:
self.send_help(dest)
elif command in ['uptime']:
self.send_uptime(dest, priv)
else:
self.send_insult(nick, dest.bare)
def send_help(self, dest):
"""Sends help messages to 'dest'.
"""
mbody = '\n '.join(self.vocabulary['help'])
self.send_message(mto=dest,
mbody=mbody,
mtype='chat')
def send_log(self, nick, dest, echo=False):
"""Look up backlog for 'nick' and send it to 'dest'.
"""
if echo:
gossip = self.pick_sentence('gossips').format(nick=nick)
self.send_message(mto=dest.bare,
mbody=gossip,
mtype='groupchat')
# Get offline timestamp from database and check if it exists.
offline_timestamp = self.users.find_one(nick=nick)['offline_timestamp']
if not offline_timestamp:
logging.debug(('KaaBot : No offline'
' timestamp for {nick}.').format(nick=nick))
self.send_empty_log(dest)
return
else:
logging.debug(
('KaaBot : {nick} '
'last seen on {date}').format(nick=nick,
date=offline_timestamp))
# Get online timestamp from database.
online_timestamp = self.users.find_one(nick=nick)['online_timestamp']
logging.debug(('KaaBot : {nick} last'
' connection on {date}').format(nick=nick,
date=online_timestamp))
# Since filtered log is a generator we can't know in advance if
# it will be empty. Creating filtered_log_empty allows us to act on
# this event later.
filtered_log_empty = True
filtered_log = (log for log in self.muc_log if
offline_timestamp < log['datetime'] < online_timestamp)
for log in filtered_log:
filtered_log_empty = False
log_message = "[{:%H:%M}] {}: {}".format(log['datetime'],
log['user'],
log['msg'])
self.send_message(mto=dest,
mbody=log_message,
mtype='chat')
# Send message if filtered_log is still empty.
if filtered_log_empty:
logging.debug('KaaBot : Filtered backlog empty.')
self.send_empty_log(dest)
def send_empty_log(self, dest):
"""Send message if backlog empty.
"""
mbody = self.pick_sentence('empty_log')
self.send_message(mto=dest,
mbody=mbody,
mtype='chat')
def send_uptime(self, dest, priv=False):
"""Sends the uptime to `dest`.
If `priv` is true, the message is sent privately, otherwise it's send on
the MUC.
"""
uptime = str(datetime.datetime.now() - self.online_timestamp)
mbody = self.pick_sentence('uptime').format(uptime=uptime)
if priv:
self.send_message(mto=dest,
mbody=mbody,
mtype='chat')
else:
self.send_message(mto=dest.bare,
mbody=mbody,
mtype='groupchat')
def send_insult(self, nick, dest):
"""Sends an insult about `nick` to `dest`.
"""
insult = self.pick_sentence('insults').format(nick=nick)
self.send_message(mto=dest,
mbody=insult,
mtype='groupchat')
def send_welcome(self, nick, dest, date):
msg = self.pick_sentence('welcome').format(nick=nick, date=date)
self.send_message(mto=dest,
mbody=msg,
mtype='groupchat')
def pick_sentence(self, type):
"""Returns a random sentence picked in the loaded vocabulary.
`type` can be any known category of the vocabulary file, e.g. 'insults'.
No substitution is done to the returned string.
"""
voc = self.vocabulary[type]
i = random.randint(0, len(voc) - 1)
return voc[i]
def muc_online(self, presence):
"""Handles MUC online presence.
On bot connection gets called for each
user in the MUC (bot included).
"""
nick = presence['muc']['nick']
if nick != self.nick:
# Check if nick in database.
if self.users.find_one(nick=nick):
# Update nick online timestamp.
self.users.update(dict(nick=nick,
online_timestamp=datetime.datetime.now()),
['nick'])
# Check if bot is connecting for the first time.
if self.online_timestamp:
try:
user = self.users.find_one(nick=nick)
offline_timestamp = user['offline_timestamp']
date = datetime.datetime.strftime(offline_timestamp,
format="%c")
logging.debug('KaaBot : user {} connected, last seen {}'
.format(nick, date))
if self.welcome:
dest = presence['from'].bare
self.send_welcome(nick, dest, date)
except TypeError:
msg = 'KaaBot : No offline timestamp yet for {nick}'
logging.debug(msg.format(nick=nick))
else:
self.users.insert(dict(nick=nick,
online_timestamp=datetime.datetime.now()))
else:
# Set bot online timestamp.
self.online_timestamp = datetime.datetime.now()
self.send_message(mto=presence['from'].bare,
mbody=self.pick_sentence('greetings'),
mtype='groupchat')
def muc_offline(self, presence):
"""Handles MUC offline presence.
"""
nick = presence['muc']['nick']
if nick != self.nick:
self.users.update(dict(nick=nick,
offline_timestamp=datetime.datetime.now()),
['nick'])
def str_to_bool(text):
"""Converts a string to a boolean.
Raises an exception if the string does not describe a boolean value.
"""
text = text.lower()
if text in ["on", "true", "1"]:
return True
elif text in ["off", "false", "0"]:
return False
raise TypeError
if __name__ == '__main__':
config_dir = xdg.BaseDirectory.save_config_path("kaabot")
config_file = os.path.join(config_dir, 'config')
argp = configargparse.ArgParser(default_config_files=[config_file],
description="Super Simple Silly Bot for Jabber.")
argp.add_argument('-d', '--debug', help="set logging to DEBUG",
action='store_const',
dest='debug', const=logging.DEBUG,
default=logging.INFO)
argp.add_argument("-b", "--database", dest="database",
help="path to an alternative database; the '{muc}' string"
" in the name will be substituted with "
"the MUC's name as provided by the --muc option")
argp.add_argument("-j", "--jid", dest="jid", help="JID to use")
argp.add_argument("-p", "--password", dest="password",
help="password to use")
argp.add_argument("-m", "--muc", dest="muc",
help="Multi User Chatroom to join")
argp.add_argument("-n", "--nick", dest="nick", default='KaaBot',
help="nickname to use in the chatroom (default: KaaBot)")
argp.add_argument("-V", "--vocabulary_file", dest="vocabulary_file",
help="path to an alternative vocabulary file")
argp.add_argument("--welcome", dest="welcome", default="on",
type=str_to_bool,
help="welcome users joining the MUC (on/off, default: on)")
args = argp.parse_args()
if args.jid is None:
args.jid = input("Username: ")
if args.password is None:
args.password = getpass.getpass("Password: ")
if args.muc is None:
args.muc = input("MUC: ")
logging.basicConfig(level=args.debug,
format='%(levelname)-8s %(message)s')
try:
pathlib.Path(config_file).touch(mode=0o600, exist_ok=False)
arguments = vars(args)
arguments.pop("debug")
with open(config_file, 'w') as f:
f.writelines('{}= {}\n'.format(k, v) for k, v
in arguments.items() if v)
except FileExistsError:
logging.debug('Config file exists.')
bot = KaaBot(args.jid, args.password, args.database,
args.muc, args.nick, args.vocabulary_file,
args.welcome)
bot.register_plugin('xep_0045')
bot.register_plugin('xep_0071')
bot.register_plugin('xep_0172')
bot.connect()
bot.process(block=True)