|
继续在上一篇帖子的基础上实现完整的串口服务器【米尔瑞萨RZ/G2L开发板-创新应用】基于python的tcp、udp、串口收发使用 - 米尔RZ/G2L开发板 - 米尔科技论坛 - Powered by Discuz! (myir-tech.com)
本次需要实现的功能:
1、实现收到udp广播后,发送串口服务器配置信息;
2、收到tcp客户端发送来的数据后,自动转发到串口;
3、收到串口发送来的数据后,自动转发到tcp客户端;
功能实现:
1、收到UDP信息后,对信息内容进行过滤,如果结果匹配,则直接返回到发送端,代码如下:
- # -*- coding: utf-8 -*-
- import socket # 导入socket模块
- import time # 导入time模块
- import threading
- from config import *
- class udp(threading.Thread):
- def __init__(self, ip: str = '', port: int = 9000):
- threading.Thread.__init__(self)
- self.ip = ip
- self.port = port
- def run(self):
- cfg = config()
- cfg.read()
- # 创建一个套接字socket对象,用于进行通讯
- # socket.AF_INET 指明使用INET地址集,进行网间通讯
- # socket.SOCK_DGRAM 指明使用数据协议,即使用传输层的udp协议
- server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- # 接收来自任何IP的广播帧
- address = (self.ip, self.port)
- server_socket.bind(address) # 为服务器绑定一个固定的地址,ip和端口
- # server_socket.settimeout(10) # 设置一个时间提示,如果10秒钟没接到数据进行提示
- while True:
- # 正常情况下接收数据并且显示,如果10秒钟没有接收数据进行提示(打印 'time out')
- # 当然可以不要这个提示,那样的话把'try:' 以及 'except'后的语句删掉就可以了
- try:
- # now = time.time() # 获取当前时间
- # 接收客户端传来的数据 recvfrom接收客户端的数据,默认是阻塞的,直到有客户端传来数据
- # recvfrom 参数的意义,表示最大能接收多少数据,单位是字节
- # recvfrom返回值说明
- # receive_data表示接受到的传来的数据,是bytes类型
- # client 表示传来数据的客户端的身份信息,客户端的ip和端口,元组
- receive_data, client = server_socket.recvfrom(1024)
- # print(time.strftime('%Y-%m-%d %H:%M:%S',
- # time.localtime(now))) # 以指定格式显示时间
- # print('来自客户端%s,发送的%s\n' % (client, receive_data)) # 打印接收的内容
- recv_str = str(receive_data, encoding="utf-8")
- if recv_str == 'where is myir serial port server?':
- ip_addr = cfg.ipv4
- serial_conf = ''
- for x in range(cfg.serial_count):
- serial_conf += cfg.tcp_ports[x]+','
- serial_conf += cfg.port[x]+','
- serial_conf += cfg.baudrate[x]+','
- serial_conf += cfg.parity[x]+';'
- server_socket.sendto(
- bytes(ip_addr+';'+serial_conf, encoding="utf8"), client)
- except socket.timeout:
- print('time out')
复制代码 收到客户端发送的 where is myir serial port server? 后,发送串口及tcp服务器配置信息。
2、收到tcp客户端发送的信息后,自动转发到串口队列
注意:此处难点,需要将socket设置为非阻塞模式,否则无法转发串口收到的数据
- # -*- coding: utf-8 -*-
- import socket # 导入socket模块
- import threading
- from queue import Queue
- import selectors
- import time
- class tcp(threading.Thread):
- def __init__(self, tcp2sci: Queue, sci2tcp: Queue, ip: str = '', port: int = 9000):
- threading.Thread.__init__(self)
- self.ip = ip
- self.port = port
- self.tcp2sci = tcp2sci
- self.sci2tcp = sci2tcp
- # 创建 Selector 对象
- self.sel = selectors.DefaultSelector()
- self.clientAddr = 0
- self.tcp_server_socket = 0
- def run(self):
- self.tcp_server_socket = socket.socket(
- socket.AF_INET, socket.SOCK_STREAM)
- address = (self.ip, self.port)
- self.tcp_server_socket.bind(address)
- self.tcp_server_socket.listen(128)
- self.tcp_server_socket.setblocking(False)
- # 将服务器套接字注册到 Selector 上
- self.sel.register(self.tcp_server_socket,
- selectors.EVENT_READ, self.accept)
- while True:
- events = self.sel.select(timeout=0.001)
- for key, mask in events:
- callback = key.data
- callback(key.fileobj, mask)
- while not self.sci2tcp.empty():
- item = self.sci2tcp.get()
- if self.clientAddr != 0:
- self.tcp_server_socket.sendto(item, self.clientAddr)
- time.sleep(0.001)
- tcp_server_socket.close()
- # 定义回调函数
- def accept(self, sock, mask):
- self.tcp_server_socket, self.clientAddr = sock.accept()
- self.tcp_server_socket.setblocking(False)
- self.sel.register(self.tcp_server_socket,
- selectors.EVENT_READ, self.read)
- def read(self, sock, mask):
- data = self.tcp_server_socket.recv(1024)
- if data:
- self.tcp2sci.put(data)
- else:
- self.sel.unregister(self.tcp_server_socket)
- self.tcp_server_socket.close()
- self.clientAddr = 0
复制代码 3、收到串口发送来的数据后,转发到tcp客户端
- import serial
- import os
- import socket # 导入socket模块
- class config():
- global cfg
- def __init__(self):
- self.serial_count = 1
- self.path = './configurate/port_config'
- self.device_name = 'serial_port_server'
- self.ipv4 = self._get_ip_address()
- self.udp_port = 9000
- self.tcp_start_port = 5000
- self.tcp_ports = []
- self.port = []
- self.baudrate = []
- self.parity = []
- for x in range(self.serial_count):
- self.tcp_ports.append(x + self.tcp_start_port)
- self.port.append('/dev/ttySC' + str(x))
- self.baudrate.append('115200')
- self.parity.append(serial.PARITY_NONE)
- def save(self) -> int:
- list = []
- for x in range(self.serial_count):
- list.append(str(self.tcp_ports[x]) + '\n')
- list.append(self.port[x]+'\n')
- list.append(self.baudrate[x]+'\n')
- list.append(self.parity[x]+'\n')
- if not os.path.exists('./configurate'):
- os.mkdir('./configurate')
- if os.path.exists(self.path):
- os.remove(self.path)
- with open(self.path, 'w') as file:
- file.writelines(list)
- def read(self) -> int:
- if os.path.exists(self.path):
- with open(self.path, 'r') as file:
- list = file.readlines()
- index = 0
- for x in range(self.serial_count):
- self.tcp_ports[x] = list[index].replace('\n','')
- index += 1
- self.port[x] = list[index].replace('\n','')
- index += 1
- self.baudrate[x] = list[index].replace('\n','')
- index += 1
- self.parity[x] = list[index].replace('\n','')
- index += 1
- return 0
-
- def _get_ip_address(self) -> str:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(("8.8.8.8", 80))
- return str(s.getsockname()[0])
复制代码 4、另外需要读取配置文件
- import serial
- import os
- import socket # 导入socket模块
- class config():
- global cfg
- def __init__(self):
- self.serial_count = 1
- self.path = './configurate/port_config'
- self.device_name = 'serial_port_server'
- self.ipv4 = self._get_ip_address()
- self.udp_port = 9000
- self.tcp_start_port = 5000
- self.tcp_ports = []
- self.port = []
- self.baudrate = []
- self.parity = []
- for x in range(self.serial_count):
- self.tcp_ports.append(x + self.tcp_start_port)
- self.port.append('/dev/ttySC' + str(x))
- self.baudrate.append('115200')
- self.parity.append(serial.PARITY_NONE)
- def save(self) -> int:
- list = []
- for x in range(self.serial_count):
- list.append(str(self.tcp_ports[x]) + '\n')
- list.append(self.port[x]+'\n')
- list.append(self.baudrate[x]+'\n')
- list.append(self.parity[x]+'\n')
- if not os.path.exists('./configurate'):
- os.mkdir('./configurate')
- if os.path.exists(self.path):
- os.remove(self.path)
- with open(self.path, 'w') as file:
- file.writelines(list)
- def read(self) -> int:
- if os.path.exists(self.path):
- with open(self.path, 'r') as file:
- list = file.readlines()
- index = 0
- for x in range(self.serial_count):
- self.tcp_ports[x] = list[index].replace('\n','')
- index += 1
- self.port[x] = list[index].replace('\n','')
- index += 1
- self.baudrate[x] = list[index].replace('\n','')
- index += 1
- self.parity[x] = list[index].replace('\n','')
- index += 1
- return 0
-
- def _get_ip_address(self) -> str:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(("8.8.8.8", 80))
- return str(s.getsockname()[0])
复制代码 5、在main中统一对所有线程进行初始化,并构建队列用于串口有tcp服务器之间传递信息
- # -*- coding: utf-8 -*-
- from udp import *
- from tcp import *
- from sci import *
- from config import *
- from threading import Thread
- from queue import Queue
- cfg = 0
- if __name__ == "__main__":
- cfg = config()
- # cfg.save()
- cfg.read()
- # 创建 Thread 实例
- t1 = udp('', cfg.udp_port)
- t1.start()
- tcp_thread = []
- serial_thread = []
- for x in range(cfg.serial_count):
- tcp2sci = Queue(maxsize=1000)
- sci2tcp = Queue(maxsize=1000)
- tcp_thread.append(
- tcp(tcp2sci, sci2tcp, ip=cfg.ipv4, port=int(cfg.tcp_ports[x])))
- # serial_thread.append(
- # sci(sci2tcp, tcp2sci, port=cfg.port[x], baudrate=cfg.baudrate[x], parity=cfg.parity[x]))
- # 启动线程运行
- for x in range(cfg.serial_count):
- tcp_thread[x].start()
- # serial_thread[x].start()
- # 等待所有线程执行完毕
- t1.join() # join() 等待线程终止,要不然一直挂起
- for x in range(cfg.serial_count):
- tcp_thread[x].join()
- # serial_thread[x].join()
复制代码 演示视频如下
https://www.bilibili.com/video/BV1KN411n7Jx/
|
|