rough automation implementation

This commit is contained in:
Lincoln-LM 2022-01-13 19:38:56 -07:00
parent 11433a711f
commit f8ad0d210f
5 changed files with 365 additions and 11 deletions

68
automation_test.py Normal file
View File

@ -0,0 +1,68 @@
import asyncio
from joycontrol.protocol import controller_protocol_factory
from joycontrol.server import create_hid_server
from joycontrol.controller import Controller
from joycontrol.controller_state import ControllerState, button_push, button_press, button_release
async def main():
# switch's btaddr
switch_bt_addr = "DC:68:EB:AF:50:BC"
# the type of controller to create
controller = Controller.PRO_CONTROLLER # or JOYCON_L or JOYCON_R
# a callback to create the corresponding protocol once a connection is established
factory = controller_protocol_factory(controller)
# start the emulated controller
transport, protocol = await create_hid_server(factory, reconnect_bt_addr = switch_bt_addr)
# get a reference to the state beeing emulated.
controller_state = protocol.get_controller_state()
# wait for input to be accepted
await controller_state.connect()
# press button
await asyncio.sleep(0.3)
# await button_push(controller_state,"home")
# await asyncio.sleep(0.1)
# await button_push(controller_state,"home")
# await asyncio.sleep(3)
print("home")
await button_push(controller_state,"home")
await button_push(controller_state,"home")
await button_push(controller_state,"home")
await asyncio.sleep(2.5)
await button_push(controller_state,"b")
await asyncio.sleep(1.5)
await button_push(controller_state,"x")
await asyncio.sleep(1)
for _ in range(10):
await button_push(controller_state,"a")
print("A")
await asyncio.sleep(1.5)
await button_push(controller_state,"b")
print("B")
await asyncio.sleep(1)
await asyncio.sleep(0.7)
await button_push(controller_state,"b")
await asyncio.sleep(1.7)
print("p")
await button_push(controller_state,"plus")
await button_push(controller_state,"plus")
await asyncio.sleep(0.7)
print("r")
await button_push(controller_state,"right")
await button_push(controller_state,"right")
await asyncio.sleep(1.7)
print("a")
await button_push(controller_state,"a")
await button_push(controller_state,"a")
await asyncio.sleep(1.7)
print("home")
await button_push(controller_state,"home")
await button_push(controller_state,"home")
await button_push(controller_state,"home")
await asyncio.sleep(2.5)
# await asyncio.sleep(0.5)
# await button_push(controller_state,"a")
input()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -3,14 +3,14 @@
"WindowPrefix": "", "WindowPrefix": "",
"image": "./images/phone_turnback/eye.png", "image": "./images/phone_turnback/eye.png",
"view": [ "view": [
286, 298,
231, 270,
37, 50,
36 50
], ],
"thresh": 0.9, "thresh": 0.8,
"white_delay": 2.0, "white_delay": 3.0,
"advance_delay": 2, "advance_delay": 3,
"advance_delay_2": 0, "advance_delay_2": 0,
"npc": 0, "npc": 0,
"pokemon_npc": 1, "pokemon_npc": 1,
@ -21,5 +21,5 @@
0, 0,
0 0
], ],
"camera": 3 "camera": 0
} }

Binary file not shown.

Before

Width:  |  Height:  |  Size: 896 B

After

Width:  |  Height:  |  Size: 870 B

View File

@ -13,6 +13,12 @@ from os import listdir
from os.path import isfile, join from os.path import isfile, join
from PIL import Image, ImageTk from PIL import Image, ImageTk
import asyncio
from joycontrol.protocol import controller_protocol_factory
from joycontrol.server import create_hid_server
from joycontrol.controller import Controller
from joycontrol.controller_state import ControllerState, button_push, button_press, button_release
os.chdir(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) os.chdir(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
import rngtool import rngtool
@ -26,6 +32,7 @@ class Application(tk.Frame):
self.previewing = False self.previewing = False
self.monitoring = False self.monitoring = False
self.reidentifying = False self.reidentifying = False
self.target = 25396
self.tidsiding = False self.tidsiding = False
self.timelining = False self.timelining = False
self.config_json = {} self.config_json = {}
@ -46,6 +53,8 @@ class Application(tk.Frame):
} }
self.pack() self.pack()
self.create_widgets() self.create_widgets()
self.loop = asyncio.new_event_loop()
self.loop.run_until_complete(self.connect())
signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGINT, self.signal_handler)
def update_configs(self,event=None): def update_configs(self,event=None):
@ -111,6 +120,9 @@ class Application(tk.Frame):
self.tidsid_button = ttk.Button(self, text="TID/SID", command=self.tidsid) self.tidsid_button = ttk.Button(self, text="TID/SID", command=self.tidsid)
self.tidsid_button.grid(column=5,row=5) self.tidsid_button.grid(column=5,row=5)
self.bot_button = ttk.Button(self, text="Bot", command=self.bot)
self.bot_button.grid(column=5,row=6)
x = y = w = h = 0 x = y = w = h = 0
th = 0.9 th = 0.9
@ -180,6 +192,9 @@ class Application(tk.Frame):
self.advances_increase_button = ttk.Button(self, text="Advance", command=self.increase_advances) self.advances_increase_button = ttk.Button(self, text="Advance", command=self.increase_advances)
self.advances_increase_button.grid(column=1,row=13) self.advances_increase_button.grid(column=1,row=13)
self.target_inp = tk.Spinbox(self, from_ = 0, to = 999999)
self.target_inp.grid(column=1,row=14)
self.pos_x.delete(0, tk.END) self.pos_x.delete(0, tk.END)
self.pos_x.insert(0, x) self.pos_x.insert(0, x)
self.pos_y.delete(0, tk.END) self.pos_y.delete(0, tk.END)
@ -209,6 +224,219 @@ class Application(tk.Frame):
self.after_task() self.after_task()
async def connect(self):
# switch's btaddr
self.switch_bt_addr = "DC:68:EB:AF:50:BC"
# the type of controller to create
self.controller = Controller.PRO_CONTROLLER # or JOYCON_L or JOYCON_R
# a callback to create the corresponding protocol once a connection is established
self.factory = controller_protocol_factory(self.controller)
# start the emulated controller
self.transport, self.protocol = await create_hid_server(self.factory, reconnect_bt_addr = self.switch_bt_addr)
# get a reference to the state beeing emulated.
self.controller_state = self.protocol.get_controller_state()
# wait for input to be accepted
print("Waiting for connection")
await self.controller_state.connect()
print("Connected")
return
def bot(self):
self.botting_thread=threading.Thread(target=self.bot_work)
self.botting_thread.daemon = True
self.botting_thread.start()
def quick_eye_check(self):
if self.config_json["MonitorWindow"]:
from windowcapture import WindowCapture
video = WindowCapture(self.config_json["WindowPrefix"],self.config_json["crop"])
else:
if sys.platform.startswith('linux'): # all Linux
backend = cv2.CAP_V4L
elif sys.platform.startswith('win'): # MS Windows
backend = cv2.CAP_DSHOW
elif sys.platform.startswith('darwin'): # macOS
backend = cv2.CAP_ANY
else:
backend = cv2.CAP_ANY # auto-detect via OpenCV
video = cv2.VideoCapture(self.config_json["camera"],backend)
video.set(cv2.CAP_PROP_FRAME_WIDTH,1920)
video.set(cv2.CAP_PROP_FRAME_HEIGHT,1080)
video.set(cv2.CAP_PROP_BUFFERSIZE,1)
print(f"camera {self.config_json['camera']}")
eye = self.player_eye
roi_x, roi_y, roi_w, roi_h = self.config_json["view"]
_, frame = video.read()
if frame is not None:
if not self.config_json["MonitorWindow"]:
size = frame.shape[::-1]
_, fw, fh = size
if fw >= 1920:
frame = cv2.resize(frame,(960,round(fh/fw*960)))
roi = cv2.cvtColor(frame[roi_y:roi_y+roi_h,roi_x:roi_x+roi_w],cv2.COLOR_RGB2GRAY)
res = cv2.matchTemplate(roi,eye,cv2.TM_CCOEFF_NORMED)
_, match, _, _ = cv2.minMaxLoc(res)
return match
def bot_work(self):
async def cmd():
await self.zoom_work(unzoom=True)
print("???")
time.sleep(1)
print("home")
await button_push(self.controller_state,"home")
await button_push(self.controller_state,"home")
print("home done")
time.sleep(1.5)
print("x")
await button_push(self.controller_state,"x")
await button_push(self.controller_state,"x")
time.sleep(1.5)
print("a")
await button_push(self.controller_state,"a")
await button_push(self.controller_state,"a")
time.sleep(2.5)
print("a")
await button_push(self.controller_state,"a")
await button_push(self.controller_state,"a")
time.sleep(1.5)
print("a")
await button_push(self.controller_state,"a")
await button_push(self.controller_state,"a")
time.sleep(30)
print("a")
await button_push(self.controller_state,"a")
await button_push(self.controller_state,"a")
time.sleep(4)
print("a")
await button_push(self.controller_state,"a")
await button_push(self.controller_state,"a")
time.sleep(12)
print("p")
await button_push(self.controller_state,"plus")
await button_push(self.controller_state,"plus")
time.sleep(0.7)
print("r")
await button_push(self.controller_state,"right")
await button_push(self.controller_state,"right")
time.sleep(1.7)
print("a")
await button_push(self.controller_state,"a")
await button_push(self.controller_state,"a")
time.sleep(1.7)
await self.zoom_work()
time.sleep(2.5)
self.s0_1_2_3.delete(1.0,tk.END)
self.s0_1_2_3.insert(1.0,"0")
last = self.s0_1_2_3.get(1.0,tk.END)
curr = last
self.monitor_blinks()
seeds_found = True
while curr == last:
curr = self.s0_1_2_3.get(1.0,tk.END)
if "ERROR" in curr:
print("FAILED TO FIND SEEDS")
seeds_found = False
break
time.sleep(0.5)
if seeds_found:
last = self.s0_1_2_3.get(1.0,tk.END)
print("Seeds found")
print(last)
rng = Xorshift(*[int(x,16) for x in self.s0_1_2_3.get(1.0,tk.END).split("\n")[:4]])
# last = None
found = False
rng.advance(260+84)
for i in range(100000):
go = Xorshift(*rng.getState())
go.next()
sidtid = rng.range(-0x80000000,0x7FFFFFFF) & 0xFFFFFFFF
pid = rng.range(-0x80000000,0x7FFFFFFF) & 0xFFFFFFFF
shiny = ((pid & 0xFFFF) ^ (sidtid & 0xFFFF) ^ (pid >> 16) ^ (sidtid >> 16)) < 16
# ivs = [-1,-1,-1,-1,-1,-1]
# for _ in range(3):
# index = (rng.range(-0x80000000,0x7FFFFFFF) & 0xFFFFFFFF) % 6
# while ivs[index] != -1:
# index = (rng.range(-0x80000000,0x7FFFFFFF) & 0xFFFFFFFF) % 6
# ivs[index] = 31
# for index in range(6):
# if ivs[index] == -1:
# ivs[index] = (rng.range(-0x80000000,0x7FFFFFFF) & 0xFFFFFFFF) % 32
# ability = (rng.range(-0x80000000,0x7FFFFFFF) & 0xFFFFFFFF) & 1
# nature = (rng.range(-0x80000000,0x7FFFFFFF) & 0xFFFFFFFF) % 25
# if shiny and ivs[0] >= 30 and ivs[3] >= 30 and ivs[5] >= 30:
if shiny:
print(i+260)
self.target_inp.delete(0, tk.END)
self.target_inp.insert(0,i+260)
found = True
break
# rng.next()
# if i == 0:
# last = rng.range(-0x80000000,0x7FFFFFFF) & 0xFFFFFFFF
# mock = rng.range(-0x80000000,0x7FFFFFFF) & 0xFFFFFFFF
# if (((mock & 0xFFFF) ^ (last & 0xFFFF) ^ (mock >> 16) ^ (last >> 16)) < 16):
# print(hex(last),hex(mock))
# print(i+260,"shiny")
# self.target_inp.delete(0, tk.END)
# self.target_inp.insert(0,i+260)
# found = True
# break
# last = mock
self.tracking = False
self.monitoring = False
if not found:
await cmd()
else:
if i > 165*2:
await self.zoom_work(unzoom=True)
await asyncio.sleep(2.5)
print("exit text")
await button_push(self.controller_state,"b")
await asyncio.sleep(1.5)
print("open menu")
await button_push(self.controller_state,"x")
await asyncio.sleep(1)
print(f"skip {int(i//165)} times")
for _ in range(int(i//165)):
await button_push(self.controller_state,"a")
print("A")
await asyncio.sleep(1.5)
await button_push(self.controller_state,"b")
print("B")
await asyncio.sleep(1)
await asyncio.sleep(0.7)
print("exit menu")
await button_push(self.controller_state,"b")
await asyncio.sleep(1.7)
print("open text")
print("p")
await button_push(self.controller_state,"plus")
await button_push(self.controller_state,"plus")
await asyncio.sleep(0.7)
print("r")
await button_push(self.controller_state,"right")
await button_push(self.controller_state,"right")
await asyncio.sleep(1.7)
print("a")
await button_push(self.controller_state,"a")
await button_push(self.controller_state,"a")
await asyncio.sleep(1.7)
print("home")
await self.zoom_work()
await asyncio.sleep(2.5)
print("hopefully going for target")
self.reidentify()
else:
await cmd()
self.loop.run_until_complete(cmd())
def increase_advances(self): def increase_advances(self):
plus = int(self.advances_increase.get()) plus = int(self.advances_increase.get())
self.rng.advance(plus) self.rng.advance(plus)
@ -313,7 +541,12 @@ class Application(tk.Frame):
def monitoring_work(self): def monitoring_work(self):
self.tracking = False self.tracking = False
blinks, intervals, offset_time = rngtool.tracking_blink(self.player_eye, *self.config_json["view"], MonitorWindow=self.config_json["MonitorWindow"], WindowPrefix=self.config_json["WindowPrefix"], crop=self.config_json["crop"], camera=self.config_json["camera"], tk_window=self, th=self.config_json["thresh"]) blinks, intervals, offset_time = rngtool.tracking_blink(self.player_eye, *self.config_json["view"], MonitorWindow=self.config_json["MonitorWindow"], WindowPrefix=self.config_json["WindowPrefix"], crop=self.config_json["crop"], camera=self.config_json["camera"], tk_window=self, th=self.config_json["thresh"])
self.rng = rngtool.recov(blinks, intervals) try:
self.rng = rngtool.recov(blinks, intervals)
except AssertionError as e:
self.s0_1_2_3.delete(1.0, tk.END)
self.s0_1_2_3.insert(1.0, "ERROR")
raise e
self.monitor_blink_button['text'] = "Monitor Blinks" self.monitor_blink_button['text'] = "Monitor Blinks"
self.monitoring = False self.monitoring = False
@ -445,6 +678,32 @@ class Application(tk.Frame):
next_time = waituntil - time.perf_counter() or 0 next_time = waituntil - time.perf_counter() or 0
time.sleep(next_time) time.sleep(next_time)
async def zoom_work(self,unzoom=False):
match = 1 if unzoom else 0
while (not (match < self.config_json["thresh"])) if unzoom else ((match < self.config_json["thresh"])):
print("home")
await button_push(self.controller_state,"home")
await asyncio.sleep(0.1)
await button_push(self.controller_state,"home")
await asyncio.sleep(1.5)
print('checking eye')
match = self.quick_eye_check()
print(match,1)
time.sleep(1.5)
def turnback_work(self):
async def cmd():
await asyncio.sleep(0.5)
await asyncio.sleep(0.5)
print("PRESS A")
await button_push(self.controller_state,"a")
await asyncio.sleep(0.5)
print("PRESS UP")
await button_push(self.controller_state,"up", sec=4.5)
await asyncio.sleep(0.5)
print("PRESS A")
await button_push(self.controller_state,"a")
self.loop.run_until_complete(cmd())
def reidentifying_work(self): def reidentifying_work(self):
self.tracking = False self.tracking = False
@ -467,9 +726,24 @@ class Application(tk.Frame):
self.s01_23.insert(1.0,s01+"\n"+s23) self.s01_23.insert(1.0,s01+"\n"+s23)
print([hex(x) for x in state]) print([hex(x) for x in state])
# async def cmd():
# await asyncio.sleep(0.5)
# await asyncio.sleep(0.5)
# print("PRESS A")
# await button_push(self.controller_state,"a")
# await asyncio.sleep(0.5)
# print("PRESS UP")
# await button_push(self.controller_state,"up", sec=4.5)
# await asyncio.sleep(0.5)
# print("PRESS A")
# await button_push(self.controller_state,"a")
# self.loop.run_until_complete(cmd())
observed_blinks, _, offset_time = rngtool.tracking_blink(self.player_eye, *self.config_json["view"], MonitorWindow=self.config_json["MonitorWindow"], WindowPrefix=self.config_json["WindowPrefix"], crop=self.config_json["crop"], camera=self.config_json["camera"], tk_window=self, th=self.config_json["thresh"], size=20) observed_blinks, _, offset_time = rngtool.tracking_blink(self.player_eye, *self.config_json["view"], MonitorWindow=self.config_json["MonitorWindow"], WindowPrefix=self.config_json["WindowPrefix"], crop=self.config_json["crop"], camera=self.config_json["camera"], tk_window=self, th=self.config_json["thresh"], size=20)
self.rng, adv = rngtool.reidentifyByBlinks(Xorshift(*state), observed_blinks, return_advance=True, npc=self.config_json["npc"]) self.rng, adv = rngtool.reidentifyByBlinks(Xorshift(*state), observed_blinks, return_advance=True, npc=self.config_json["npc"])
if self.target != -1:
print("WHAT")
self.loop.run_until_complete(self.zoom_work(unzoom=True))
self.reidentify_button['text'] = "Reidentify" self.reidentify_button['text'] = "Reidentify"
self.reidentifying = False self.reidentifying = False
@ -483,6 +757,8 @@ class Application(tk.Frame):
self.advances = adv+diff*(self.config_json["npc"]+1) self.advances = adv+diff*(self.config_json["npc"]+1)
self.tracking = True self.tracking = True
self.count_down = None self.count_down = None
self.target = int(self.target_inp.get())
print(self.target)
while self.tracking: while self.tracking:
if self.count_down is None: if self.count_down is None:
if self.timelining: if self.timelining:
@ -497,11 +773,18 @@ class Application(tk.Frame):
r = self.rng.getNextRandSequence(self.config_json["npc"]+1)[-1] r = self.rng.getNextRandSequence(self.config_json["npc"]+1)[-1]
waituntil += 1.018 waituntil += 1.018
print(f"advances:{self.advances}, blinks:{hex(r&0xF)}") print(f"advances:{self.advances}, blinks:{hex(r&0xF)}")
if self.advances == self.target-21:
self.timeline()
next_time = waituntil - time.perf_counter() or 0 next_time = waituntil - time.perf_counter() or 0
time.sleep(next_time) time.sleep(next_time)
if self.timelining: if self.timelining:
if self.target != -1:
movement_thread=threading.Thread(target=self.turnback_work)
movement_thread.daemon = True
movement_thread.start()
self.rng.next() self.rng.next()
# white screen # white screen
time.sleep(self.config_json["white_delay"]) time.sleep(self.config_json["white_delay"])
@ -519,6 +802,9 @@ class Application(tk.Frame):
self.count_down = 10 self.count_down = 10
while queue and self.tracking: while queue and self.tracking:
self.advances += 1 self.advances += 1
if self.advances == self.target:
self.loop.run_until_complete(button_push(self.controller_state,"a"))
print("hit gira?")
w, q = heapq.heappop(queue) w, q = heapq.heappop(queue)
next_time = w - time.perf_counter() or 0 next_time = w - time.perf_counter() or 0
if next_time>0: if next_time>0:

View File

@ -52,7 +52,7 @@ class Xorshift(object):
Returns: Returns:
[int]: random integer value [int]: random integer value
""" """
return self.next() % (ma-mi) + min return self.next() % (ma-mi) + mi
def float(self)->float: def float(self)->float:
"""generate random value in [0,1] """generate random value in [0,1]