# Server: a UDP echo service that acknowledges every datagram it receives.
import socket

HOST = '127.0.0.1'
PORT = 5000

# The context manager releases the socket on every exit path (for example
# Ctrl+C); a close() call placed after the endless loop could never run.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    s.bind((HOST, PORT))
    print(f'Server start at: {HOST}:{PORT}')
    while True:
        # Wait for one datagram (receive buffer of 1024 bytes) and remember
        # the sender so the acknowledgement can be addressed back to it.
        data, addr = s.recvfrom(1024)
        print(f'Received from {addr}')
        # The payload comes from the network, so invalid UTF-8 is replaced
        # instead of raising and terminating the server loop.
        data = data.decode('utf-8', errors='replace')
        print(data)
        s.sendto(f'Server received {data}'.encode('utf-8'), addr)
# Client: sends each line typed by the user to the UDP server and prints
# the reply.
import socket

HOST = '127.0.0.1'
PORT = 5000

# The context manager guarantees the socket is closed when the loop ends.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    while True:
        try:
            cmd = input('Input your msg:')
        except (EOFError, KeyboardInterrupt):
            # End of input or Ctrl+C: leave the loop so the socket is closed
            # cleanly instead of dying with a traceback.
            break
        s.sendto(cmd.encode('utf-8'), (HOST, PORT))
        # NOTE: no timeout is set on the socket, so this call blocks until a
        # reply datagram arrives.
        data, addr = s.recvfrom(1024)
        print(data.decode('utf-8'))
# Multi-threaded UDP chat program
import socket
import threading
def send_data(udp_socket, dest_addr):
    """Read lines from standard input and send each one to ``dest_addr``.

    Every line is encoded as GBK and passed to ``udp_socket.sendto``. The
    loop ends when standard input is exhausted (``EOFError``); the socket is
    closed before the function returns, whether it ends normally or through
    an exception.

    Args:
        udp_socket: Socket-like object providing ``sendto`` and ``close``.
        dest_addr: Address handed to ``sendto`` for every message.
    """
    try:
        while True:
            try:
                send_msg = input("请输入要发送的数据:")
            except EOFError:
                # No more input. Without this exit the loop never ends and
                # the close() below is unreachable.
                break
            udp_socket.sendto(send_msg.encode('gbk'), dest_addr)
    finally:
        udp_socket.close()
def recv_data(udp_socket, dest_addr):
    """Receive datagrams and print each one together with its real sender.

    Each payload is decoded as GBK (undecodable bytes are replaced) and
    printed with the address returned by ``recvfrom``. The loop ends when
    ``recvfrom`` raises ``OSError``; the socket is closed before the
    function returns.

    Args:
        udp_socket: Socket-like object providing ``recvfrom`` and ``close``.
        dest_addr: Not used for the output any more; kept so existing
            callers that pass it keep working.
    """
    try:
        while True:
            try:
                payload, sender = udp_socket.recvfrom(1024)
            except OSError:
                # The socket failed or was closed; stop receiving.
                break
            # Report the address the datagram actually came from rather than
            # the configured peer, which may not be the sender.
            recv_msg = payload.decode('gbk', errors='replace')
            print("接收到来自% s 的消息:% s" % (sender, recv_msg))
    finally:
        udp_socket.close()
def main(dest_ip='192.168.19.160', dest_port=5000, local_port=5000):
    """Start the UDP chat: one thread receives, another sends.

    A UDP socket is created and bound to ``local_port`` with an empty host
    string, then handed to ``recv_data`` and ``send_data``, each running in
    its own thread. The function returns as soon as both threads are
    started.

    Args:
        dest_ip: IP address of the chat peer.
        dest_port: UDP port of the chat peer.
        local_port: Local UDP port to bind for incoming messages.

    Raises:
        OSError: If the local port cannot be bound; the socket is closed
            before the error propagates.
    """
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    dest_addr = (dest_ip, dest_port)
    try:
        udp_socket.bind(("", local_port))
    except OSError:
        # Do not leak the descriptor when the port is unavailable.
        udp_socket.close()
        raise
    receiver = threading.Thread(target=recv_data, args=(udp_socket, dest_addr))
    sender = threading.Thread(target=send_data, args=(udp_socket, dest_addr))
    receiver.start()
    sender.start()


if __name__ == "__main__":
    main()
from socket import socket, AF_INET, SOCK_DGRAM
def socket_server(address, port):
    """Listen for UDP datagrams on ``(address, port)`` and print each one.

    The function loops until an exception (for example Ctrl+C) interrupts
    it; the socket is released on the way out.

    Args:
        address: Local host/IP to bind to.
        port: Local UDP port to bind to.
    """
    # The context manager closes the socket if bind() fails or the loop is
    # interrupted; otherwise it would never be released.
    with socket(AF_INET, SOCK_DGRAM) as server:
        server.bind((address, port))
        while True:
            # msg is the raw bytes payload and is printed without decoding.
            msg, addr = server.recvfrom(8192)
            print("Get message from %s: %s" % (addr, msg))
if __name__ == "__main__":
    socket_server('', 5000)

# Client:
from socket import socket, AF_INET, SOCK_DGRAM
def socket_client(address, port, send_msg):
    """Send ``send_msg`` as a single UTF-8 encoded UDP datagram.

    Args:
        address: Host name or IP address of the receiver.
        port: UDP port of the receiver.
        send_msg: Text to send.
    """
    msg = send_msg.encode('utf-8')
    # Close the socket once the datagram is handed off so repeated calls do
    # not leave descriptors open.
    with socket(AF_INET, SOCK_DGRAM) as client:
        client.sendto(msg, (address, port))


if __name__ == "__main__":
    socket_client('localhost', 5000, "Hello world!")