# Server: UDP echo service that acknowledges every datagram it receives.
import socket

HOST = '127.0.0.1'
PORT = 5000

# The context manager closes the socket however the loop exits (Ctrl-C or an
# error); a close() placed after the infinite loop would never be reached.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    s.bind((HOST, PORT))
    print(f'Server start at: {HOST}:{PORT}')
    while True:
        # Reads at most 1024 bytes of one datagram plus the sender's address.
        data, addr = s.recvfrom(1024)
        print(f'Received from {addr}')
        # Replace undecodable bytes instead of raising, so a single malformed
        # datagram cannot take the whole server down.
        data = data.decode('utf-8', errors='replace')
        print(data)
        # Acknowledge to the address the datagram came from.
        s.sendto(bytes(f'Server received {data}', encoding='utf-8'), addr)
# Client: send each line typed by the user to the UDP server and print the reply.
import socket

HOST = '127.0.0.1'
PORT = 5000

# The context manager guarantees the socket is closed when the loop is left
# through an exception (Ctrl-C, EOF on stdin, socket error).
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    while True:
        cmd = input('Input your msg:')
        s.sendto(cmd.encode('utf-8'), (HOST, PORT))
        # Waits for the next incoming datagram (at most 1024 bytes are read);
        # the sender address is received but not used.
        data, addr = s.recvfrom(1024)
        print(data.decode('utf-8'))
# Multithreaded UDP chat program
import socket
import threading


def send_data(udp_socket, dest_addr):
    """Read lines from stdin forever and send each one to ``dest_addr``.

    Args:
        udp_socket: UDP socket used for sending; it is owned (and closed) by
            the caller, not by this function.
        dest_addr: ``(host, port)`` tuple every message is sent to.
    """
    while True:
        send_msg = input("请输入要发送的数据:")
        udp_socket.sendto(send_msg.encode('gbk'), dest_addr)


def recv_data(udp_socket, dest_addr):
    """Receive datagrams forever and print each one with its real sender.

    Args:
        udp_socket: bound UDP socket to read from; it is owned (and closed)
            by the caller, not by this function.
        dest_addr: unused; kept so existing callers that pass it keep working.
    """
    while True:
        # Report the address recvfrom() returned rather than the configured
        # peer, so the printed sender is where the datagram actually came from.
        payload, sender = udp_socket.recvfrom(1024)
        # Replace undecodable bytes instead of raising, so a datagram that is
        # not valid GBK cannot kill the receiver thread.
        recv_msg = payload.decode('gbk', errors='replace')
        print("接收到来自% s 的消息:% s" % (sender, recv_msg))


def main():
    """Bind a UDP socket on port 5000 and run the receiver and sender threads."""
    dest_addr = ('192.168.19.160', 5000)
    # main() is the single owner of the socket shared by both threads; the
    # context manager closes it once both threads have finished or main() is
    # interrupted.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
        udp_socket.bind(("", 5000))
        t1 = threading.Thread(target=recv_data, args=(udp_socket, dest_addr))
        t2 = threading.Thread(target=send_data, args=(udp_socket, dest_addr))
        t1.start()
        t2.start()
        # Keep the socket open for as long as the worker threads are running.
        t1.join()
        t2.join()


if __name__ == "__main__":
    main()
from socket import socket, AF_INET, SOCK_DGRAM


def socket_server(address, port):
    """Listen for UDP datagrams on ``(address, port)`` and print each one.

    Runs until interrupted. Each datagram (up to 8192 bytes are read) is
    printed as raw bytes together with the sender's address.

    Args:
        address: local address to bind to; ``''`` binds all interfaces.
        port: local UDP port to bind to.
    """
    # The context manager closes the socket when the loop is left through an
    # exception (for example Ctrl-C), which the original never did.
    with socket(AF_INET, SOCK_DGRAM) as server:
        server.bind((address, port))
        while True:
            msg, addr = server.recvfrom(8192)
            print("Get message from %s: %s" % (addr, msg))


if __name__ == "__main__":
    socket_server('', 5000)

# Client:
from socket import socket, AF_INET, SOCK_DGRAM


def socket_client(address, port, send_msg):
    """Send ``send_msg`` as a single UTF-8 encoded UDP datagram.

    Args:
        address: destination host name or IP address.
        port: destination UDP port.
        send_msg: text to send.
    """
    msg = send_msg.encode('utf-8')
    # Close the socket as soon as the datagram has been handed to the OS so
    # repeated calls do not leak file descriptors.
    with socket(AF_INET, SOCK_DGRAM) as client:
        client.sendto(msg, (address, port))


if __name__ == "__main__":
    socket_client('localhost', 5000, "Hello world!")