# halflife2chaos/twitch-integration/twitch_integration.py

# 350 lines
# 11 KiB
# Python

import asyncio
import threading
from collections import Counter
from typing import Optional
from aiohttp.helpers import sentinel
from twitchAPI.types import ChatEvent
from twitchAPI.chat import Chat, EventData, ChatMessage
# noinspection PyUnresolvedReferences
import obspython as obs
from rcon.source import rcon
from rcon.exceptions import WrongPassword
# --- Settings mirrored from the OBS script properties (see script_update) ---
# Twitch channel whose chat is read for votes.
TARGET_CHANNEL: str = ''
# Name of the OBS text source that displays the vote tally; empty disables the display.
SOURCE_NAME: str = ''
RCON_HOST: str = "127.0.0.1"
# Kept as a string; converted with int() right before each rcon call.
RCON_PORT: str = "27015"
RCON_PASSWORD: str = ""
# --- Voting state shared between the chat callbacks, the poll loop and the OBS timer ---
# Number of the vote currently reported by the game; -1 until the first successful poll.
voteNumber: int = -1
# Maps a chat user's id to the option index they voted for.
# Tally it with Counter(votes.values()).
votes: dict = {}
# Effect names offered in the current vote, as reported by the game.
voteEffects: list = []
# Chat keywords that count as votes, taken from the "vote_keywords" setting.
voteKeywords: list = []
class FakeUser:
    """
    Minimal stand-in for `twitchAPI.object.TwitchUser`.

    Only the `login` attribute is provided. A login of the form
    `justinfanxxxx` (where `xxxx` is any number) lets the client log in
    to chat anonymously.
    """

    login = "justinfan1337"
class FakeTwitch:
    """
    Stand-in for `twitchAPI.Twitch`, which `twitchAPI.chat.Chat` relies on heavily.

    This integration never sends messages, so it connects to chat anonymously.
    That spares the user from providing a token and removes all token handling
    code. The class itself (not an instance) is handed to `Chat`; the methods
    are declared static so they behave the same whether they are looked up on
    the class or on an instance.
    """

    session_timeout = sentinel

    @staticmethod
    async def get_refreshed_user_auth_token():
        """Return a placeholder token; any value is accepted for anonymous chat."""
        # When connecting anonymously to chat you can send any token.
        return "kappa"

    @staticmethod
    def has_required_auth(*args, **kwargs):
        """Report that every requested auth requirement is satisfied."""
        return True

    @staticmethod
    async def get_users():
        """Endlessly yield `FakeUser` as the "current user"."""
        while True:
            yield FakeUser
# Handler for the READY chat event, which fires once the bot has started.
async def on_ready(ready_event: EventData):
    """Announce readiness and join TARGET_CHANNEL when one is configured."""
    suffix = ", joining channels" if TARGET_CHANNEL else ""
    print('Bot is ready for work' + suffix)
    if not TARGET_CHANNEL:
        return
    await ready_event.chat.join_room(TARGET_CHANNEL)
async def on_message(msg: ChatMessage):
    """Record a vote when a chat message exactly matches a vote keyword.

    Matching is case-insensitive. Keyword positions 0-3 are only accepted
    while the vote number is odd; positions 4 and up are only accepted while
    it is even, and are shifted down by 4 before being stored. A later vote
    from the same user replaces the earlier one.
    """
    global votes
    print(f'in {msg.room.name}, {msg.user.name} said: {msg.text}')
    wanted = msg.text.upper()
    try:
        position = [kw.upper() for kw in voteKeywords].index(wanted)
    except ValueError:
        # Not a vote keyword at all.
        return
    # Check the range of the index because we don't want a 2 to count
    # when the range shown in OBS is 4-7.
    if voteNumber % 2 == 0:
        if position >= 4:
            votes[msg.user.id] = position - 4
    elif position <= 3:
        votes[msg.user.id] = position
async def update_game_votes():
    """Send the current per-option vote counts to the game over RCON.

    One count is sent for every entry in voteEffects, in order; options
    nobody voted for are sent as "0".
    """
    global voteNumber, voteEffects, votes
    tally = Counter(votes.values())
    # Counter returns 0 for options that received no votes.
    counts_as_text = [str(tally[option]) for option in range(len(voteEffects))]
    await rcon(
        'chaos_vote_internal_set', str(voteNumber), *counts_as_text,
        host=RCON_HOST, port=int(RCON_PORT), passwd=RCON_PASSWORD
    )
async def poll_game():
    """Ask the game for the current vote and refresh the local vote state.

    The reply is expected to look like "<vote number>;<effect>;<effect>...".
    When the vote number differs from the stored one, the effects list is
    replaced and all collected votes are discarded.

    Returns:
        False when the reply is empty (the game is not ready yet), True otherwise.

    Raises:
        ValueError: if the first field of the reply is not an integer.
    """
    global voteNumber, voteEffects, votes
    reply = await rcon(
        'chaos_vote_internal_poll',
        host=RCON_HOST, port=int(RCON_PORT), passwd=RCON_PASSWORD
    )
    # Keep only what precedes the first 'rcon from "' marker
    # (presumably a console log line echoed after the payload - TODO confirm).
    payload, _, _ = reply.partition("rcon from \"")
    payload = payload.strip()
    if not payload:
        return False  # the game is not quite ready yet.
    fields = payload.split(";")
    reported_vote = int(fields[0])
    if reported_vote != voteNumber:
        voteNumber = reported_vote
        voteEffects = fields[1:]
        votes = {}
    return True
async def game_loop():
    """Poll the game once a second and push vote counts while it is ready.

    Connection failures are reported once and then retried silently until a
    poll succeeds again. After a rejected password, polling pauses until
    RCON_PASSWORD changes. Because the "rejected" marker starts out empty,
    polling is also skipped while RCON_PASSWORD is empty. Any other exception
    is logged and ends the loop.
    """
    failure_reported = False
    rejected_password = ""
    while True:
        await asyncio.sleep(1)
        if rejected_password == RCON_PASSWORD:
            continue  # wait for the user to change password
        rejected_password = ""
        try:
            game_ready = await poll_game()
            if game_ready:
                await update_game_votes()
            if failure_reported:
                print("poll resumed as normal.")
                failure_reported = False
        except ConnectionError as err:
            if not failure_reported:
                print("poll failed", err)
            failure_reported = True
        except WrongPassword:
            print("rcon wrong password")
            rejected_password = RCON_PASSWORD
        except Exception as err:
            # i broke something. log and bail
            print("poll unexpected exception:", err)
            return
# Active chat connection; None while disconnected (see try_starting_chat / shutdown_chat).
chat: Optional[Chat] = None
# Task running game_loop(); created in startup() and cancelled in shutdown().
poll_task: Optional[asyncio.Task] = None
# Task running channel_loop(); created on demand by channel_update().
channel_task: Optional[asyncio.Task] = None
# Channel names handed from channel_update() to channel_loop().
# NOTE(review): this queue is created at import time, outside the worker
# thread's event loop - confirm that is safe on the Python version OBS embeds.
channel_queue = asyncio.Queue()
async def try_starting_chat():
    """Create the anonymous chat client, hook up its event handlers and start it."""
    global chat
    chat = await Chat(FakeTwitch)
    handlers = (
        (ChatEvent.READY, on_ready),
        (ChatEvent.MESSAGE, on_message),
    )
    for event, handler in handlers:
        chat.register_event(event, handler)
    # NOTE: the chat library creates its own thread and event loop,
    # so be careful about what its callbacks touch.
    chat.start()
    print("connection stuff done")
async def shutdown_chat():
    """Stop the chat client, if there is one, and forget it."""
    global chat
    if not chat:
        return
    chat.stop()
    chat = None
async def startup():
    """Start the game polling task and then the chat connection."""
    global poll_task
    # We run in our own thread, so give OBS a moment to set our properties first.
    # TODO: change this to a future or some other kind of awaitable?
    await asyncio.sleep(1)
    poll_task = asyncio.get_event_loop().create_task(game_loop())
    await try_starting_chat()
    print("startup done")
async def shutdown():
    """Cancel the background tasks, wait for them to finish, then stop chat.

    Cancelling a task only requests cancellation; the task has to run once
    more to actually unwind. The caller stops the event loop right after this
    coroutine returns, so the tasks are awaited here to make sure they are
    really finished instead of being left pending on a stopped loop.
    """
    global chat, poll_task, channel_task
    running = [
        task for task in (poll_task, channel_task)
        if task is not None and not task.done()
    ]
    for task in running:
        task.cancel()
    if running:
        # return_exceptions=True collects the CancelledError of each task
        # instead of re-raising it here.
        await asyncio.gather(*running, return_exceptions=True)
    await shutdown_chat()
    print("shutdown done")
def set_text(source: str, text: str):
    """Set the "text" setting of the OBS source named *source*.

    Does nothing when no source with that name is found.
    """
    target = obs.obs_get_source_by_name(source)
    if not target:
        return
    new_settings = obs.obs_data_create()
    obs.obs_data_set_string(new_settings, "text", text)
    obs.obs_source_update(target, new_settings)
    # Release both handles obtained above.
    obs.obs_data_release(new_settings)
    obs.obs_source_release(target)
def update_source():
    """Render the current vote tally into the configured OBS text source.

    The output is a "Vote #N" header followed by one
    "<keyword> <effect>: <count>" line per effect. On even vote numbers the
    keywords are taken starting at position 4, so consecutive votes use
    different keyword sets. When no keyword exists for a line, '?' is shown
    instead. On odd vote numbers at most four effects are listed.

    Does nothing when SOURCE_NAME is empty.
    """
    if SOURCE_NAME == "":
        return
    vote_counts = Counter(votes.values())
    even_round = voteNumber % 2 == 0
    offset = 4 if even_round else 0
    lines = [f"Vote #{voteNumber}"]
    for i, effect in enumerate(voteEffects):
        # Bounds-check the index that is actually used (i + offset); checking
        # only i raised IndexError on even rounds with fewer than 8 keywords.
        keyword_index = i + offset
        keyword = voteKeywords[keyword_index] if keyword_index < len(voteKeywords) else '?'
        # Counter returns 0 for options nobody voted for.
        lines.append(f"{keyword} {effect}: {vote_counts[i]}")
        if i == 3 and not even_round:
            break
    set_text(SOURCE_NAME, "\n".join(lines))
def channel_update(channel: str):
    """Request a switch of the chat connection to *channel*.

    Called from the OBS thread whenever the settings change. The request is
    queued for channel_loop(), which waits for the input to settle before it
    actually joins. Nothing happens when chat is not connected, the worker
    loop is not running, *channel* is empty, or chat is already in that room.

    asyncio tasks and queues are not thread-safe, so the task creation and the
    queue write are handed to the worker loop with call_soon_threadsafe rather
    than being done directly on the calling thread.
    """
    loop = _LOOP
    if not chat or loop is None:
        return
    if not channel or chat.is_in_room(channel):
        return

    def enqueue():
        # Runs on the worker loop's own thread.
        global channel_task
        if not channel_task or channel_task.done():
            channel_task = loop.create_task(channel_loop())
        channel_queue.put_nowait(channel)

    loop.call_soon_threadsafe(enqueue)
async def channel_loop():
    """Join the most recently requested channel once requests stop arriving.

    Keeps reading channel names from channel_queue until none arrives for two
    seconds, so only the last requested name is acted on. It then leaves the
    first room currently in chat's room cache (if any) and joins the new one.
    """
    channel = ""
    try:
        while True:
            channel = await asyncio.wait_for(channel_queue.get(), 2)
    except asyncio.TimeoutError:
        # asyncio.TimeoutError only became an alias of the builtin
        # TimeoutError in Python 3.11; catching the asyncio name works on
        # every version.
        pass
    # The chat connection may have been torn down while we were waiting.
    active_chat = chat
    if not active_chat or not channel or active_chat.is_in_room(channel):
        return
    if active_chat.room_cache:
        prev_channel = tuple(active_chat.room_cache.keys())[0]
        await active_chat.leave_room(prev_channel)
        print(f"Left from {prev_channel} channel.")
    # A falsy result from join_room is treated as success.
    if not await active_chat.join_room(channel):
        print(f"Joined {channel} channel.")
    else:
        print(f"Can't join {channel} channel.")
# Event loop created in script_load() and run by the worker thread;
# that thread resets it to None after closing the loop.
_LOOP: Optional[asyncio.AbstractEventLoop] = None
# Daemon thread running _LOOP; started in script_load(), joined in script_unload().
_THREAD: Optional[threading.Thread] = None
def script_properties():
    """Build the properties shown for this script in OBS.

    The properties are: target channel, text source (pre-filled with the
    existing text sources), RCON host, RCON password, the editable vote
    keyword list, and a button that reconnects to Twitch.
    """
    props = obs.obs_properties_create()
    obs.obs_properties_add_text(props, "target_channel", "Target channel", obs.OBS_TEXT_DEFAULT)

    source_list = obs.obs_properties_add_list(
        props, "source", "Text Source",
        obs.OBS_COMBO_TYPE_EDITABLE, obs.OBS_COMBO_FORMAT_STRING,
    )
    text_source_ids = ("text_gdiplus", "text_ft2_source")
    sources = obs.obs_enum_sources()
    if sources:
        for candidate in sources:
            if obs.obs_source_get_unversioned_id(candidate) in text_source_ids:
                name = obs.obs_source_get_name(candidate)
                obs.obs_property_list_add_string(source_list, name, name)
        obs.source_list_release(sources)

    obs.obs_properties_add_text(props, "rcon_host", "RCON host", obs.OBS_TEXT_DEFAULT)
    obs.obs_properties_add_text(props, "rcon_password", "RCON password", obs.OBS_TEXT_PASSWORD)
    obs.obs_properties_add_editable_list(
        props, "vote_keywords", "Vote keywords",
        obs.OBS_EDITABLE_LIST_TYPE_STRINGS, None, None,
    )

    async def reconnect():
        # Drop the current chat connection and open a fresh one.
        print("reconnecting to twitch")
        await shutdown_chat()
        await try_starting_chat()

    def call_reconnect(props, p):
        # Button callback: hand the reconnect over to the worker loop.
        _LOOP.call_soon_threadsafe(lambda l: asyncio.ensure_future(reconnect()), _LOOP)

    obs.obs_properties_add_button(props, "reconnect_button", "Reconnect to twitch", call_reconnect)
    return props
def script_defaults(settings):
    """Register default values for the script settings.

    Defaults are: channel "acuifex", RCON host "127.0.0.1:27015", and the
    vote keywords "1" through "8".
    """
    obs.obs_data_set_default_string(settings, "target_channel", "acuifex")
    obs.obs_data_set_default_string(settings, "rcon_host", "127.0.0.1:27015")
    keyword_array = obs.obs_data_array_create()
    for number in range(1, 9):
        entry = obs.obs_data_create()
        obs.obs_data_set_string(entry, "value", str(number))
        obs.obs_data_array_push_back(keyword_array, entry)
        # NOTE(review): releasing `entry` here was left disabled - reason not visible here.
        # obs.obs_data_release(entry)
    obs.obs_data_set_default_array(settings, "vote_keywords", keyword_array)
    # NOTE(review): releasing the array was left disabled as well.
    # obs.obs_data_array_release(keyword_array)
def script_update(settings):
    """Copy the OBS script settings into the module-level configuration.

    The "rcon_host" setting is expected as "host:port". A missing, non-numeric
    or out-of-range port falls back to 27015 and an empty host falls back to
    127.0.0.1, so a typo in the settings neither raises here nor kills the
    polling loop later when the port is converted with int().
    """
    global voteKeywords, TARGET_CHANNEL, SOURCE_NAME, RCON_HOST, RCON_PORT, RCON_PASSWORD
    TARGET_CHANNEL = obs.obs_data_get_string(settings, "target_channel")
    channel_update(TARGET_CHANNEL)
    SOURCE_NAME = obs.obs_data_get_string(settings, "source")

    raw_host = obs.obs_data_get_string(settings, "rcon_host").strip()
    # partition never raises, unlike unpacking split(":", 1) without a colon.
    host, _, port = raw_host.partition(":")
    port = port.strip()
    try:
        port_ok = 0 < int(port) < 65536
    except ValueError:
        port_ok = False
    if not port_ok:
        print(f"invalid or missing RCON port in {raw_host!r}, using 27015")
        port = "27015"
    RCON_HOST = host.strip() or "127.0.0.1"
    RCON_PORT = port  # stays a string; converted with int() at the call sites
    RCON_PASSWORD = obs.obs_data_get_string(settings, "rcon_password")

    # Build the list locally and publish it in one assignment, so the chat
    # callback (which runs on another thread) never sees a half-filled list.
    keywords = []
    obs_votes_keywords = obs.obs_data_get_array(settings, "vote_keywords")
    for i in range(obs.obs_data_array_count(obs_votes_keywords)):
        item = obs.obs_data_array_item(obs_votes_keywords, i)
        keywords.append(obs.obs_data_get_string(item, "value"))
        obs.obs_data_release(item)
    obs.obs_data_array_release(obs_votes_keywords)
    voteKeywords = keywords
    print("data updated")
# https://gist.github.com/serializingme/5c1a6fd6c7ea58af77c7b80579737c5a
def script_load(settings):
    """Register the display timer and start the asyncio worker thread.

    The worker thread owns a fresh event loop and runs startup() on it. OBS
    functions are only called from OBS's own thread, through a timer that
    calls update_source every 1000 ms.
    """
    global _LOOP, _THREAD
    # let's be nice, and only call the obs's methods from its own thread.
    # TODO: call this every frame because why not?
    obs.timer_add(update_source, 1000)
    _LOOP = asyncio.new_event_loop()

    def run_event_loop():
        # Thread body: run the loop until it is stopped, then dispose of it.
        global _LOOP
        asyncio.set_event_loop(_LOOP)
        _LOOP.create_task(startup())
        _LOOP.run_forever()
        _LOOP.close()
        _LOOP = None

    _THREAD = threading.Thread(target=run_event_loop, daemon=True)
    _THREAD.start()
def script_unload():
    """Stop the display timer, shut the worker loop down and join its thread.

    The join is bounded to 5 seconds so that a worker which fails to exit
    cannot block the OBS main thread indefinitely; the worker is a daemon
    thread, so leaving it behind is safe.
    """
    global _LOOP, _THREAD
    obs.timer_remove(update_source)
    # Work on a local reference: the worker thread sets _LOOP to None on exit.
    loop = _LOOP
    if loop is not None:
        async def destroy():
            await shutdown()
            loop.stop()

        # let the destructor run
        loop.call_soon_threadsafe(lambda: asyncio.ensure_future(destroy(), loop=loop))
    if _THREAD is not None:
        # Wait for 5 seconds; if it doesn't exit just move on, so the
        # OBS main thread is not blocked.
        _THREAD.join(timeout=5)
        if _THREAD.is_alive():
            print("async thread did not exit within 5 seconds, moving on")
        _THREAD = None
def script_description():
    """Return the description text for this script."""
    description_lines = (
        "Twitch chat voting plugin for HL2Chaos mod",
        "Made by acuifex, modified by holy-jesus",
        "Released under AGPLv3 license",
    )
    return "\n".join(description_lines)