python怎么建立socket服务端

先给出一个tcp和udp通过socket协议实现的聊天室的例子 python聊天室(python2.7版本): 都是分别运行server.py和client.py,就可以进行通讯了。 TCP版本: socket-tcp-server.py(服务端): #-*- encoding:utf-8 -*-#socket.getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0)#根据给定的参数...
python怎么建立socket服务端
socket服务器再细分可分为多种了,tcp,udp,websocket,都是调用socket模块,但是具体实现起来有一点细微的差别
先给出一个tcp和udp通过socket协议实现的聊天室的例子
python聊天室(python2.7版本):
都是分别运行server.py和client.py,就可以进行通讯了。
TCP版本:
socket-tcp-server.py(服务端): #-*- encoding:utf-8 -*-#socket.getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0)#根据给定的参数host/port,相应的转换成一个包含用于创建socket对象的五元组,#参数host为域名,以字符串形式给出代表一个IPV4/IPV6地址或者None.#参数port如果字符串形式就代表一个服务名,比如“http”"ftp""email"等,或者为数字,或者为None#参数family为地主族,可以为AF_INET ,AF_INET6 ,AF_UNIX.#参数socktype可以为SOCK_STREAM(TCP)或者SOCK_DGRAM(UDP)#参数proto通常为0可以直接忽略#参数flags为AI_*的组合,比如AI_NUMERICHOST,它会影响函数的返回值#附注:给参数host,port传递None时建立在C基础,通过传递NULL。#该函数返回一个五元组(family, socktype, proto, canonname, sockaddr),同时第五个参数sockaddr也是一个二元组(address, port)#更多的方法及链接请访问# Echo server programfrom socket import *import sysimport threadingfrom time import ctimefrom time import localtimeimport tracebackimport timeimport subprocessreload(sys)sys.setdefaultencoding("utf8") HOST='127.0.0.1'PORT=8555 #设置侦听端口BUFSIZ=1024 class TcpServer(): def __init__(self): self.ADDR=(HOST, PORT) try: self.sock=socket(AF_INET, SOCK_STREAM) print '%d is open' % PORT self.sock.bind(self.ADDR) self.sock.listen(5) #设置退出条件 self.STOP_CHAT=False # 所有监听的客户端 self.clients = {} self.thrs = {} self.stops = [] except Exception,e: print "%d is down" % PORT return False def IsOpen(ip, port): s = socket(AF_INET, SOCK_STREAM) try: s.connect((ip, int(port))) # s.shutdown(2) # 利用shutdown()函数使socket双向数据传输变为单向数据传输。shutdown()需要一个单独的参数, # 该参数表示s了如何关闭socket。具体为:0表示禁止将来读;1表示禁止将来写;2表示禁止将来读和写。 print '%d is open' % port return True except: print '%d is down' % port return False def listen_client(self): while not self.STOP_CHAT: print(u'等待接入,侦听端口:%d' % (PORT)) self.tcpClientSock, self.addr=self.sock.accept() print(u'接受连接,客户端地址:',self.addr) address = self.addr #将建立的client socket链接放到列表self.clients中 self.clients[address] = self.tcpClientSock #分别将每个建立的链接放入进程中,接收且分发消息 self.thrs[address] = threading.Thread(target=self.readmsg, args=[address]) self.thrs[address].start() time.sleep(0.5) def readmsg(self,address): #如果地址不存在,则返回False if address not in self.clients: return False #得到发送消息的client socket client = self.clients[address] while True: try: #获取到消息内容data data=client.recv(BUFSIZ) except: print(e) self.close_client(address) break if not data: break #python3使用bytes,所以要进行编码 #s='%s发送给我的信息是:[%s] %s' %(addr[0],ctime(), data.decode('utf8')) #对日期进行一下格式化 ISOTIMEFORMAT='%Y-%m-%d %X' stime=time.strftime(ISOTIMEFORMAT, localtime()) s=u'%s发送给我的信息是:%s' %(str(address),data.decode('utf8')) #将获得的消息分发给链接中的client socket for k in self.clients: self.clients[k].send(s.encode('utf8')) self.clients[k].sendall('sendall:'+s.encode('utf8')) print str(k) print([stime], ':', data.decode('utf8')) #如果输入quit(忽略大小写),则程序退出 STOP_CHAT=(data.decode('utf8').upper()=="QUIT") if STOP_CHAT: print "quit" self.close_client(address) print "already quit" break def close_client(self,address): try: client = self.clients.pop(address) self.stops.append(address) client.close() for k in self.clients: self.clients[k].send(str(address) + u"已经离开了") except: pass print(str(address)+u'已经退出') if __name__ == '__main__': tserver = TcpServer() tserver.listen_client() ——————————华丽的分割线—————————— socket-tcp-client.py (客户端): #-*- encoding:utf-8 -*-from socket import *import sysimport threadingimport timereload(sys)sys.setdefaultencoding("utf8") #测试,连接本机HOST='127.0.0.1'#设置侦听端口PORT=8555BUFSIZ=1024 class TcpClient: ADDR=(HOST, PORT) def __init__(self): self.HOST = HOST self.PORT = PORT self.BUFSIZ = BUFSIZ #创建socket连接 self.client = socket(AF_INET, SOCK_STREAM) self.client.connect(self.ADDR) #起一个线程,监听接收的信息 self.trecv = threading.Thread(target=self.recvmsg) self.trecv.start() def sendmsg(self): #循环发送聊天消息,如果socket连接存在则一直循环,发送quit时关闭链接 while self.client.connect_ex(self.ADDR): data=raw_input('>:') if not data: break self.client.send(data.encode('utf8')) print(u'发送信息到%s:%s' %(self.HOST,data)) if data.upper()=="QUIT": self.client.close() print u"已关闭" break def recvmsg(self): #接收消息,如果链接一直存在,则持续监听接收消息 try: while self.client.connect_ex(self.ADDR): data=self.client.recv(self.BUFSIZ) print(u'从%s收到信息:%s' %(self.HOST,data.decode('utf8'))) except Exception,e: print str(e) if __name__ == '__main__': client=TcpClient() client.sendmsg()UDP版本:
socket-udp-server.py # -*- coding:utf8 -*- import sysimport timeimport tracebackimport threadingreload(sys)sys.setdefaultencoding('utf-8') import socketimport traceback HOST = "127.0.0.1"PORT = 9555CHECK_PERIOD = 20CHECK_TIMEOUT = 15 class UdpServer(object): def __init__(self): self.clients = [] self.beats = {} self.ADDR = (HOST,PORT) try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.bind(self.ADDR) # 绑定同一个域名下的所有机器 self.beattrs = threading.Thread(target=self.checkheartbeat) self.beattrs.start() except Exception,e: traceback.print_exc() return False def listen_client(self): while True: time.sleep(0.5) print "hohohohohoo" try: recvData,address = self.sock.recvfrom(2048) if not recvData: self.close_client(address) break if address in self.clients: senddata = u"%s发送给我的信息是:%s" %(str(address),recvData.decode('utf8')) if recvData.upper() == "QUIT": self.close_client(address) if recvData == "HEARTBEAT": self.heartbeat(address) continue else: self.clients.append(address) senddata = u"%s发送给我的信息是:%s" %(str(address),u'进入了聊天室') for c in self.clients: try: self.sock.sendto(senddata,c) except Exception,e: print str(e) self.close_client(c) except Exception,e: # traceback.print_exc() print str(e) pass def heartbeat(self,address): self.beats[address] = time.time() def checkheartbeat(self): while True: print "checkheartbeat" print self.beats try: for c in self.clients: print time.time() print self.beats[c] if self.beats[c] + CHECK_TIMEOUT <time.time(): print u"%s心跳超时,连接已经断开" %str(c) self.close_client(c) else: print u"checkp%s,没有断开" %str(c) except Exception,e: traceback.print_exc() print str(e) pass time.sleep(CHECK_PERIOD) def close_client(self,address): try: if address in self.clients: self.clients.remove(address) if self.beats.has_key(address): del self.beats[address] print self.clients for c in self.clients: self.sock.sendto(u'%s已经离开了' % str(address),c) print(str(address)+u'已经退出') except Exception,e: print str(e) raise if __name__ == "__main__": udpServer = UdpServer() udpServer.listen_client() ——————————华丽的分割线—————————— socket-udp-client.py:# -*- coding:utf8 -*- import sysimport threadingimport timereload(sys)sys.setdefaultencoding('utf-8') import socket HOST = "127.0.0.1"PORT = 9555#BEAT_PORT = 43278BEAT_PERIOD = 5 class UdpClient(object): def __init__(self): self.clientsock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) self.HOST = HOST self.ADDR = (HOST,PORT) self.clientsock.sendto(u'请求建立链接',self.ADDR) self.recvtrs = threading.Thread(target=self.recvmsg) self.recvtrs.start() self.hearttrs = threading.Thread(target=self.heartbeat) self.hearttrs.start() def sendmsg(self): while True: data = raw_input(">:") if not data: break self.clientsock.sendto(data.encode('utf-8'),self.ADDR) if data.upper() == 'QUIT': self.clientsock.close() break def heartbeat(self): while True: self.clientsock.sendto('HEARTBEAT',self.ADDR) time.sleep(BEAT_PERIOD) def recvmsg(self): while True: recvData,addr = self.clientsock.recvfrom(1024) if not recvData: break print(u'从%s收到信息:%s' %(self.HOST,recvData.decode('utf8'))) if __name__ == "__main__": udpClient = UdpClient() udpClient.sendmsg()2017-08-02
1
首先先建立一个python文件,命名为 socket_server1.py

2
下面是相关的步骤图.

3
先导入相关的模块.并且定义相关的主机及端口.

4
完整的socket_server1.py文件.

5
设置好之后,通过命令提示符测试(进行测试.开始-----运行-----cmd)

6
先使用python 运行下刚刚的那个文件. >>python socket_server1.py

7
客户端直接使用telnet代替测试一下.>>telnet 127.0.0.1 10086

8
然后在服务端的窗口上面会出现相关的客户端信息,在客户端的窗口上面,输入一个字符,服务器端会显示出来,并且客户端上面会返回一个大写的字符。

9
这个就是一个简单的 python的socket的服务器端了。只是测试,没有排错日志2018-08-02
mengvlog 阅读 6 次 更新于 2025-07-20 01:41:02 我来答关注问题0
  •  文暄生活科普 武汉茑萝:Python之socket模块使用详解(附带解决丢包、粘包问题)

    Python中socket模块是进行网络编程的基础,通过socket创建一个网络连接,可以实现数据在主机间的传输。在socket的使用中,首先需要调用socket()函数创建socket对象,参数family定义网络协议类型,例如AF_INET表示IPv4,SOCK_STREAM表示TCP协议;type则定义socket类型,如SOCK_STREAM用于流式socket,SOCK_DGRAM用于数...

  • 1. 服务端代码(模拟男生端):首先导入socket库,创建一个socket对象,绑定到指定的地址和端口,开始监听连接。当有客户端(女生端)连接时,发送表示困意的信息,比如“我好困呀” 。示例代码如下:pythonimport socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_address...

  • 首先先建立一个python文件,命名为 socket_server1.py2下面是相关的步骤图.3先导入相关的模块.并且定义相关的主机及端口.4完整的socket_server1.py文件.5设置好之后,通过命令提示符测试(进行测试.开始---运行---cmd)6先使用python 运行下刚刚的那个文件. >>python socket_server1.py7客户端直接使用telnet代替测试...

  • 2. 利用网络通信模拟传递(以Socket为例):如果想要模拟更真实的网络环境下传递状态。pythonimport socket# 创建服务器端server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.bind(('localhost', 12345))server_socket.listen(1)print("等待女生连接...")...

  • 可以使用 Java 的 Socket 编程实现前后端之间的数据交互。具体来说,可以在 Java 后端中创建一个服务器程序,在 Python 脚本中使用 Socket 将数据发送到该服务器。以下是一个简单的示例,演示了如何使用 Socket 在 Python 和 Java 之间传递数据:Python 脚本:```python import socket 创建 Socket 对象...

檬味博客在线解答立即免费咨询

报错相关话题

Copyright © 2023 WWW.MENGVLOG.COM - 檬味博客
返回顶部