pyqt5 UI冻结ADB&冷冻时也使用scrcpy

发布于 2025-02-13 07:33:30 字数 7025 浏览 0 评论 0原文

我正在尝试使用PYQT5,ADB,SCRCPY和PYTHON创建一个程序。当我单击Connect(启动SCRCPY服务器并镜像屏幕上的Android设备)时,我的UI冻结,直到关闭Scrcpy Session/Server。我将分享用于制作此工作和外观的.py文件代码。

from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from PyQt5 import uic
import os
import sys
from ADBee import *
from twt_start import *

devs = devices()
log_window = []
# GET LIST OF CONNECTED SERIAL NUMBERS
os.chdir('resources/ui/')
class TB_Main_Window(QMainWindow):
    
    def __init__(self):
        super(TB_Main_Window, self).__init__()
        uic.loadUi('main.ui', self)
        self.find_button.clicked.connect(lambda: self.find_devices(devs))
        self.connect_button.clicked.connect(self.connect_to_device)
        self.disconnect_button.clicked.connect(self.disconnect_from_device)
        
        self.connect_button.setEnabled(False)
        self.disconnect_button.setEnabled(False)
        #make = self.device_make.placeholderText.connect(self.get_selected_items)
        
        
        self.show()

 

    def find_devices(self, devs):
        
        count = 0
        try: 
               
            if len(devs) == 0:
                print(" --- No Devices Found --- \n")   
            elif len(devs) > 0:
                for d in devs:
                    self.device_listbox.addItem(devs[count])
                    count += 1
            self.connect_button.setEnabled(True)                                           
        except:
            print("\nCould Not Find Devices\n")


    def get_selected_items(self):
        serial =  self.device_listbox.currentText()   
        print(serial)
        return serial
        
    # CONNECT TO SELECTED DEVICE (SERIAL)
    def connect_to_device(self):
        _serial = self.device_listbox.currentText()
        self.find_button.setEnabled(False)
        self.connect_button.setEnabled(False)
        num_devices = self.device_listbox.count()
        

        if num_devices == 1:
            try:
                twt()          
            except:
                print("\nCould Not Connect To Device\n")
        if num_devices > 1:
            try:
                self.find_button.setEnabled(False)
                self.connect_button.setEnabled(False)
                twt_ws(serial=_serial)
            except:
                print(f'Failed to connect to:{_serial}')
        elif num_devices == 0:
            print(f'\nNo Devices Found\n')
        
    def disconnect_from_device(self):
        self.device_listbox.setEnabled(True)
        self.find_button.setEnabled(True)
        self.connect_button.setEnabled(False)
        try:
            kill_server()
            print(f"Device Disconnect Successfully")
        except:
            print(f"Can't Disconnect From Device")

app = QApplication([])
window = TB_Main_Window()
window.show()
sys.exit(app.exec_())

########################################## ########################################

from subprocess import Popen as send
import subprocess
import os



def twt(window_title='Twitedb3rn', width='480', height='900'):
    
    try:
        orientation()
        console_send = send(
            f"scrcpy --always-on-top --window-title={window_title} --window-width={width} --window-height={height} ",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True)
        _twt, errors = console_send.communicate()
        print(f'\n\tTWT Started\n\n!---START---!\n\n{_twt}\n!---END---!')
        console_send.wait()
    except:
        print(f'\nScrcpy Failed {errors}\n\n')
            

def twt_ws(window_title='Twitedb3rn', width='480', height='900', serial='99031FFBA0083T'):
        
        try:
            orientation_ws(serial)
            new_directoy = os.chdir('resources/scrcpy')
            console_send = send(
                f'scrcpy --always-on-top --window-title={window_title} --window-width={width} --window-height={height} -s{serial}',
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True)
            _twt_ws, errors = console_send.communicate()
            console_send.wait()
            print(f'\n\tTWT Started\n\n!---START---!\n\n{_twt_ws}\n!---END---!')
        except:
            print(f'\nTWT Failed\n\n')
            print(errors)


#adb shell dumpsys window | grep 'mLandscapeRotation'
def orientation_ws(serial):
    try:
        console_send = send(
            f"adb -s {serial} shell content insert --uri content://settings/system --bind name:s:accelerometer_rotation --bind value:i:0",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True)
        _orientation, errors = console_send.communicate()
        console_send.wait()
        print(f'\nScreen Rotation Disabled')
        try:
            home = send(f'adb -s {serial} shell input keyevent KEYCODE_HOME')
            home.communicate()
            home.wait()
            console_send = send(
            f"adb -s {serial} shell content insert --uri content://settings/system --bind name:s:user_rotation --bind value:i:1",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True)
            _orientation, errors = console_send.communicate()
            console_send.wait()
            print(f'\nScreen Set Landscape')
        except:
            print(f'\nScreen Landscape Failed')
            print(errors)
    except:
        print(f'\nScreen Rotation Not Disabled')
        print(errors)
    return errors

def orientation():
    try:
        console_send = send(
            f"adb shell content insert --uri content://settings/system --bind name:s:accelerometer_rotation --bind value:i:0",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True)
        _orientation, errors = console_send.communicate()
        console_send.wait()
        print(f'\nScreen Rotation Disabled')
        try:
            home = send(f'adb shell input keyevent KEYCODE_HOME')
            home.communicate()
            home.wait()
            console_send = send(
            f"adb shell content insert --uri content://settings/system --bind name:s:user_rotation --bind value:i:0",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True)
            _orientation, errors = console_send.communicate()
            console_send.wait()
            print(f'\nScreen Set Porttrait')
        except:
            print(f'\nScreen Portrait Failed')
            print(errors)
    except:
        print(f'\nScreen Portrait Not Disabled')
        print(errors)
    return errors

​href =“ https://i.sstatic.net/hb8ib.jpg” rel =“ nofollow noreferrer”>

I am trying to create a program using PyQt5, ADB, scrcpy, and python. When I click on connect, which starts a scrcpy server and mirrors the android device on the screen, my UI freezes until I close my scrcpy session/server. I will share both .py file codes that are used to make this work and look.

from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from PyQt5 import uic
import os
import sys
from ADBee import *
from twt_start import *

devs = devices()
log_window = []
# GET LIST OF CONNECTED SERIAL NUMBERS
os.chdir('resources/ui/')
class TB_Main_Window(QMainWindow):
    
    def __init__(self):
        super(TB_Main_Window, self).__init__()
        uic.loadUi('main.ui', self)
        self.find_button.clicked.connect(lambda: self.find_devices(devs))
        self.connect_button.clicked.connect(self.connect_to_device)
        self.disconnect_button.clicked.connect(self.disconnect_from_device)
        
        self.connect_button.setEnabled(False)
        self.disconnect_button.setEnabled(False)
        #make = self.device_make.placeholderText.connect(self.get_selected_items)
        
        
        self.show()

 

    def find_devices(self, devs):
        
        count = 0
        try: 
               
            if len(devs) == 0:
                print(" --- No Devices Found --- \n")   
            elif len(devs) > 0:
                for d in devs:
                    self.device_listbox.addItem(devs[count])
                    count += 1
            self.connect_button.setEnabled(True)                                           
        except:
            print("\nCould Not Find Devices\n")


    def get_selected_items(self):
        serial =  self.device_listbox.currentText()   
        print(serial)
        return serial
        
    # CONNECT TO SELECTED DEVICE (SERIAL)
    def connect_to_device(self):
        _serial = self.device_listbox.currentText()
        self.find_button.setEnabled(False)
        self.connect_button.setEnabled(False)
        num_devices = self.device_listbox.count()
        

        if num_devices == 1:
            try:
                twt()          
            except:
                print("\nCould Not Connect To Device\n")
        if num_devices > 1:
            try:
                self.find_button.setEnabled(False)
                self.connect_button.setEnabled(False)
                twt_ws(serial=_serial)
            except:
                print(f'Failed to connect to:{_serial}')
        elif num_devices == 0:
            print(f'\nNo Devices Found\n')
        
    def disconnect_from_device(self):
        self.device_listbox.setEnabled(True)
        self.find_button.setEnabled(True)
        self.connect_button.setEnabled(False)
        try:
            kill_server()
            print(f"Device Disconnect Successfully")
        except:
            print(f"Can't Disconnect From Device")

app = QApplication([])
window = TB_Main_Window()
window.show()
sys.exit(app.exec_())

############################################################################################

from subprocess import Popen as send
import subprocess
import os



def twt(window_title='Twitedb3rn', width='480', height='900'):
    
    try:
        orientation()
        console_send = send(
            f"scrcpy --always-on-top --window-title={window_title} --window-width={width} --window-height={height} ",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True)
        _twt, errors = console_send.communicate()
        print(f'\n\tTWT Started\n\n!---START---!\n\n{_twt}\n!---END---!')
        console_send.wait()
    except:
        print(f'\nScrcpy Failed {errors}\n\n')
            

def twt_ws(window_title='Twitedb3rn', width='480', height='900', serial='99031FFBA0083T'):
        
        try:
            orientation_ws(serial)
            new_directoy = os.chdir('resources/scrcpy')
            console_send = send(
                f'scrcpy --always-on-top --window-title={window_title} --window-width={width} --window-height={height} -s{serial}',
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True)
            _twt_ws, errors = console_send.communicate()
            console_send.wait()
            print(f'\n\tTWT Started\n\n!---START---!\n\n{_twt_ws}\n!---END---!')
        except:
            print(f'\nTWT Failed\n\n')
            print(errors)


#adb shell dumpsys window | grep 'mLandscapeRotation'
def orientation_ws(serial):
    try:
        console_send = send(
            f"adb -s {serial} shell content insert --uri content://settings/system --bind name:s:accelerometer_rotation --bind value:i:0",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True)
        _orientation, errors = console_send.communicate()
        console_send.wait()
        print(f'\nScreen Rotation Disabled')
        try:
            home = send(f'adb -s {serial} shell input keyevent KEYCODE_HOME')
            home.communicate()
            home.wait()
            console_send = send(
            f"adb -s {serial} shell content insert --uri content://settings/system --bind name:s:user_rotation --bind value:i:1",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True)
            _orientation, errors = console_send.communicate()
            console_send.wait()
            print(f'\nScreen Set Landscape')
        except:
            print(f'\nScreen Landscape Failed')
            print(errors)
    except:
        print(f'\nScreen Rotation Not Disabled')
        print(errors)
    return errors

def orientation():
    try:
        console_send = send(
            f"adb shell content insert --uri content://settings/system --bind name:s:accelerometer_rotation --bind value:i:0",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True)
        _orientation, errors = console_send.communicate()
        console_send.wait()
        print(f'\nScreen Rotation Disabled')
        try:
            home = send(f'adb shell input keyevent KEYCODE_HOME')
            home.communicate()
            home.wait()
            console_send = send(
            f"adb shell content insert --uri content://settings/system --bind name:s:user_rotation --bind value:i:0",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True)
            _orientation, errors = console_send.communicate()
            console_send.wait()
            print(f'\nScreen Set Porttrait')
        except:
            print(f'\nScreen Portrait Failed')
            print(errors)
    except:
        print(f'\nScreen Portrait Not Disabled')
        print(errors)
    return errors

enter image description here

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

深海里的那抹蓝 2025-02-20 07:33:31

总而言之,您的ADB通信代码看起来像以下内容一样更好地配制。

请注意,我已经省略了所有尝试/异常;最好处理异常和错误“更高”,而不仅仅是打印出错误,然后让程序继续进行,就好像没有发生任何不好的事情一样。

import subprocess


def set_adb_value(name, value):
    # TODO: this is not safe against shell injection vulnerabilities if `name` and `value`
    #       are user-controlled.
    subprocess.check_call(
        f"adb shell content insert --uri content://settings/system "
        f"--bind name:s:{name} "
        f"--bind value:{value}"
    )


def set_orientation(landscape=False):
    # Disable screen rotation
    set_adb_value("accelerometer_rotation", "i:0")
    # Press home key
    subprocess.check_call("adb shell input keyevent KEYCODE_HOME")
    # Set rotation
    set_adb_value("user_rotation", ("i:1" if landscape else "i:0"))


def start_scrcpy(window_title="Twitedb3rn", width="480", height="900", landscape=False):
    set_orientation(landscape=landscape)
    # TODO: this is not safe against shell injection vulnerabilities.
    return subprocess.Popen(
        f"scrcpy --always-on-top "
        f"--window-title={window_title} "
        f"--window-width={width} "
        f"--window-height={height}",
    )

All in all, it looks like your adb communication code is better formulated as something like the below.

Note I've elided all try/excepts; it's better to handle exceptions and errors "higher up" rather than just print out an error and let the program continue as if nothing bad had happened.

import subprocess


def set_adb_value(name, value):
    # TODO: this is not safe against shell injection vulnerabilities if `name` and `value`
    #       are user-controlled.
    subprocess.check_call(
        f"adb shell content insert --uri content://settings/system "
        f"--bind name:s:{name} "
        f"--bind value:{value}"
    )


def set_orientation(landscape=False):
    # Disable screen rotation
    set_adb_value("accelerometer_rotation", "i:0")
    # Press home key
    subprocess.check_call("adb shell input keyevent KEYCODE_HOME")
    # Set rotation
    set_adb_value("user_rotation", ("i:1" if landscape else "i:0"))


def start_scrcpy(window_title="Twitedb3rn", width="480", height="900", landscape=False):
    set_orientation(landscape=landscape)
    # TODO: this is not safe against shell injection vulnerabilities.
    return subprocess.Popen(
        f"scrcpy --always-on-top "
        f"--window-title={window_title} "
        f"--window-width={width} "
        f"--window-height={height}",
    )
半山落雨半山空 2025-02-20 07:33:31

不,您将需要先了解您的子处理方法,然后使用通信等。
这些popen(或send)现在使用),可以通过subprocess.check_call()来更好地服务。

这是答案。

我删除了:

_twt, errors = console_send.communicate()
console_send.wait()

还有:

_twt_ws, errors = console_send.communicate()
console_send.wait()

No, you will need to understand what you're doing with your subprocesses first, and then use communicate etc. accordingly.
Many of those Popen (or send) uses you have right now would be better served by subprocess.check_call(), by the way.

This was the answer.

I removed:

_twt, errors = console_send.communicate()
console_send.wait()

and also:

_twt_ws, errors = console_send.communicate()
console_send.wait()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文