微宝网 微宝网
首页
  • 专题

    • ZYNQ
    • 树莓派
    • 信号处理
    • 硬件
    • 大杂烩
  • 笔记

    • 《Git》
    • 《Qt》
    • 《Python》
转载
产品
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

bitQ

饮马江湖,仗剑走天涯
首页
  • 专题

    • ZYNQ
    • 树莓派
    • 信号处理
    • 硬件
    • 大杂烩
  • 笔记

    • 《Git》
    • 《Qt》
    • 《Python》
转载
产品
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Qml内嵌网页
  • Qwebsocket收发测试
  • Qt
bitQ
2023-12-06

Qwebsocket收发测试

# Qwebsocket收发测试

​ 书接前文,这里写个Qwebsocket收发测试的程序,直接用python的websocket的库也可以,但websocket是个新东西,python3.6.8对这个支持并不是太好(python3.9往后的会好些)。

​ 开发环境:

​ python3.6.8

​ PyQt5==5.15.4

client:

iimport sys
from PyQt5.QtCore import QUrl, QTimer
from PyQt5.QtWidgets import QApplication
from PyQt5.QtWebSockets import QWebSocket

class WebSocketClient(QWebSocket):
    def __init__(self, url):
        super().__init__()
        self.connected.connect(self.on_connected)
        self.disconnected.connect(self.close)
        self.error.connect(self.on_error)
        self.textMessageReceived.connect(self.on_message_received)
        self.open(QUrl(url))
        self.timer = QTimer()
        self.timer.timeout.connect(self.send_msg)
        self.timer.start(1000)  # 每隔1秒触发一次定时器

    def on_connected(self):
        print("Connected to WebSocket server")

    def on_error(self, error_code):
        print(f"WebSocket error: {error_code}")

    def on_message_received(self, message):
        print(f"Received message from server: {message}")

    def send_msg(self):
        if self.isValid():
            self.sendTextMessage("Hello, server!")
        else:
            print("send fail!")

if __name__ == "__main__":
    app = QApplication(sys.argv)
    client = WebSocketClient("ws://127.0.0.1:8000")
    sys.exit(app.exec_())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

server:

from PyQt5.QtCore import QCoreApplication
from PyQt5.QtWebSockets import QWebSocketServer, QWebSocket
from PyQt5.QtNetwork import QHostAddress

class WebSocketServer(QWebSocketServer):
    def __init__(self, address, port):
        super().__init__("WebSocket Server", QWebSocketServer.NonSecureMode)
        self.listen(QHostAddress(address), port)
        self.newConnection.connect(self.on_new_connection)

    def on_new_connection(self):
        self.client_socket = self.nextPendingConnection()
        self.client_socket.textMessageReceived.connect(self.on_message_received)
        # client_socket.disconnected.connect(self.on_client_disconnected)

    def on_message_received(self, message):
        client_socket = self.sender()
        print(f"Received message from client: {message}")
        # 在这里可以对接收到的消息进行处理
        client_socket.sendTextMessage("Echo: " + message)

    def on_client_disconnected(self):
        client_socket = self.sender()
        print("Client disconnected")

if __name__ == "__main__":
    app = QCoreApplication([])
    server = WebSocketServer("127.0.0.1", 8000)
    print(f"WebSocket server listening on {server.serverAddress().toString()}:{server.serverPort()}")
    app.exec_()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#Qwebsocket#web
上次更新: 2023/12/06, 13:43:10
Qml内嵌网页

← Qml内嵌网页

最近更新
01
vivado工程异常
02-20
02
制作微信卡片
02-14
03
vivado工程Git版本控制
12-16
更多文章>
Theme by Vdoing | Copyright © 2023-2025 微宝 | MIT License
鲁ICP备14006596号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式