python日常实用小脚本,Python小脚本
在日常工作中,我们总是会遇到各种各样的问题。下面这篇文章主要介绍一些常用的 Python 小脚本,并通过示例代码进行非常详细的介绍,有需要的朋友可以参考一下。
目录:序 · 打印十六进制字符串 · 文件合并 · 多线程下载图集 · 多线程下载图片 · 爬虫抓取信息 · 多线程下载电影名称 · 串口转tcp工具 · 远程读卡器服务端 · 黑客rtcp反向链接 · 调用C动态库示例 · tcp socket连接消息测试工具 · 消息拼接和加解密测试 · 二进制文件解析工具 · 抓取动漫图片 · 抓取网站模板 · 总结
目录
日常生活中经常会有一些小任务,手动处理会很麻烦。
用python做一些小的脚本处理可以提高很多效率。或者你可以用python作为工具,帮助提高办公效率。(比如我经常用python做计算器进行计算和字符转换)
以下是个人使用的一些python脚本的汇总,作为备忘录。
前言
用途:通信消息中的十六进制数据不好看,可以打印成十六进制字符串显示。
#编码=utf-8
#name: myutil.py
def print_hex1(s,prev=0x):
对于s:中的c
打印%sx %(上一次,订单(c)),
打印
def print _ hex :
对于s:中的c
打印x %(订单(c)),
打印
打印 myutil
def print_hex3(s,prev=0x):
i=0
对于s:中的c
打印“%s%s”,%(prev,s[i:i 2]),
i=2
打印
打印16进制字符串
之前MCU中生成的Hex应用文件不能直接刷入MCU,iap程序需要合并成一个文件才能烧录到MCU中。每当你打包有困难的时候,做一个脚本来处理它:
# path= c : \ \ Users \ \ test \ \ IAP _ CZ _ v204w . hex
#file=open(路径, r )
#for ll in file.readlines()
#打印ll
#编码=gb18030
导入时间
导入操作系统
定义prr():
打印“文件组合开始.”
path0=os.getcwd()
打印路径0
路径=路径0
#路径1=路径0
路径2=路径0
path=\\IAP_CZ_v204w.hex
#path1=\\NC_armStaSystem.hex
path2=\\
打印路径
s=raw_input(输入文件路径: )
路径1=s
#path1=\\NC_armStaSystem.hex
打印路径1
s=raw_input(输入文件名: )
路径2=s
path 2=time . strftime( _ % y % M % d % H % M % S )
路径2=。十六进制
打印路径2
prr()
尝试:
f1=打开(路径,“r”)
计数=0
对于f1.readlines()中的l :
#打印l
计数=1
#打印数量
f1.close()
f1=打开(路径,“r”)
f2=开路(路径1, r )
f3=开路(路径2,“w”)
while(count1):
l=f1.readline()
#打印l
f3 .写(l)
计数-=1
#打印数量
f3.flush()
对于f2.readlines()中的l :
f3 .写(l)
f3.flush()
f3.close()
print combination success!
except Exception,ex:
print excettion occured!
print ex
s=raw_input(press any key to continue...)
finally:
f1.close()
f2.close()
s=raw_input(press any key to continue...)
多线程下载图集
网上好看的动漫图集,如果手工下载太费时了。简单分析下网页地址规律,写个多线程脚本搞定。
#!/usr/bin/python
# -*- coding: utf-8 -*-
# filename: paxel.py
"""
It is a multi-thread downloading tool.
It was developed following axel.
Author: volans
E-mail: volansw [at] gmail.com
"""
import sys
import os
import time
import urllib
from threading import Thread
# Default HTTP proxy map (quotes restored; the scrape stripped them).
local_proxies = {'http': 'http://131.139.58.200:8080'}
class AxelPython(Thread, urllib.FancyURLopener):
Multi-thread downloading class.
run() is a vitural method of Thread.
def __init__(self, threadname, url, filename, ranges=0, proxies={}):
Thread.__init__(self, name=threadname)
urllib.FancyURLopener.__init__(self, proxies)
self.name = threadname
self.url = url
self.filename = filename
self.ranges = ranges
self.downloaded = 0
def run(self):
vertual function in Thread
try:
self.downloaded = os.path.getsize( self.filename )
except OSError:
#print never downloaded
self.downloaded = 0
# rebuild start poind
self.startpoint = self.ranges[0] + self.downloaded
# This part is completed
if self.startpoint >= self.ranges[1]:
print Part %s has been downloaded over. % self.filename
return
self.oneTimeSize = 16384 #16kByte/time
print task %s will download from %d to %d % (self.name, self.startpoint, self.ranges[1])
self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))
self.urlhandle = self.open( self.url )
data = self.urlhandle.read( self.oneTimeSize )
while data:
filehandle = open( self.filename, ab+ )
filehandle.write( data )
filehandle.close()
self.downloaded += len( data )
#print "%s" % (self.name)
#progress = u\r...
data = self.urlhandle.read( self.oneTimeSize )
def GetUrlFileSize(url, proxies={}):
    """Return the remote file size from the Content-Length header (0 if absent)."""
    urlHandler = urllib.urlopen(url, proxies=proxies)
    headers = urlHandler.info().headers
    length = 0
    for header in headers:
        # Header lines look like 'Content-Length: 12345'.
        if header.find('Length') != -1:
            length = header.split(':')[-1].strip()
            length = int(length)
    return length
def SpliteBlocks(totalsize, blocknumber):
    """Split totalsize bytes into blocknumber contiguous (start, end) ranges.

    The last range absorbs the remainder.  Uses // so the arithmetic stays
    integral on Python 3 as well (identical to / on Python 2 ints).
    """
    blocksize = totalsize // blocknumber
    ranges = []
    for i in range(0, blocknumber - 1):
        ranges.append((i * blocksize, i * blocksize + blocksize - 1))
    ranges.append((blocksize * (blocknumber - 1), totalsize - 1))
    return ranges
def islive(tasks):
    """Return True while at least one task thread is still running."""
    for task in tasks:
        # is_alive() has existed since Python 2.6; isAlive() is the
        # deprecated alias removed in Python 3.9.
        if task.is_alive():
            return True
    return False
def paxel(url, output, blocks=6, proxies=local_proxies):
    """Download url in `blocks` parallel byte ranges, then join the parts into output."""
    size = GetUrlFileSize(url, proxies)
    ranges = SpliteBlocks(size, blocks)
    threadname = ["thread_%d" % i for i in range(0, blocks)]
    filename = ["tmpfile_%d" % i for i in range(0, blocks)]
    tasks = []
    for i in range(0, blocks):
        task = AxelPython(threadname[i], url, filename[i], ranges[i])
        task.setDaemon(True)
        task.start()
        tasks.append(task)
    time.sleep(2)
    # Progress loop: poll the workers and redraw one status line.
    while islive(tasks):
        downloaded = sum([task.downloaded for task in tasks])
        process = downloaded / float(size) * 100
        show = u'\rFilesize:%d Downloaded:%d Completed:%.2f%%' % (size, downloaded, process)
        sys.stdout.write(show)
        sys.stdout.flush()
        time.sleep(0.5)
    # Concatenate the part files in order and clean them up.
    filehandle = open(output, 'wb+')
    for i in filename:
        f = open(i, 'rb')
        filehandle.write(f.read())
        f.close()
        try:
            os.remove(i)
        except OSError:
            # Best-effort cleanup; a missing temp file is not fatal.
            pass
    filehandle.close()
if __name__ == '__main__':
    # Demo: fetch one image with 4 threads and no proxy.
    url = "http://xz1.mm667.com/xz84/images/001.jpg"
    output = '001.jpg'
    paxel(url, output, blocks=4, proxies={})
多线程下载图片
多线程下载图片并存储到指定目录中,若目录不存在则自动创建。
# -*- coding: UTF-8 -*-
import re
import urllib
# Sample image URL (quotes restored; the scrape stripped them).
urls = 'http://xz5.mm667.com/xz82/images/01.jpg'
def getHtml(url):
    """Fetch url and return the raw response body."""
    return urllib.urlopen(url).read()
def getImg(html):
    """Find every jpg URL in html and download each to 0.jpg, 1.jpg, ..."""
    # Matches src="....jpg" followed by the literal text ' pic_ext'.
    reg = r'src="(.+?\.jpg)" pic_ext'
    imgre = re.compile(reg)
    imglist = imgre.findall(html)
    x = 0
    for imgurl in imglist:
        urllib.urlretrieve(imgurl, '%s.jpg' % x)
        x = x + 1
# Download every matching image from the demo page.
getImg(getHtml("http://tieba.baidu.com/p/2460150866"))
import re
import urllib
import threading
import time
import socket
socket.setdefaulttimeout(30)
urls=[]
j=0
for i in xrange(1,81):
if (i-1)%4 == 0:
j += 1
if ((j-1)%5) == 0 :
j=1
site=http://xz%d.mm667.com/xz%02d/images/ %(j,i)
urls.append(site)
print urls[i-1]
#print urls
urls.append(http://xz1.mm667.com/xz01/images/)
urls.append(http://xz1.mm667.com/xz02/images/)
urls.append(http://xz1.mm667.com/xz03/images/)
urls.append(http://xz1.mm667.com/xz04/images/)
urls.append(http://xz1.mm667.com/xz84/images/)
urls.append(http://xz2.mm667.com/xz85/images/)
urls.append(http://xz3.mm667.com/xz86/images/)
urls.append(http://xz1.mm667.com/s/)
urls.append(http://xz1.mm667.com/p/)
def mkdir(path):
# 引入模块
import os
# 去除首位空格
path=path.strip()
# 去除尾部 \ 符号
path=path.rstrip("\\")
# 判断路径是否存在
# 存在 True
# 不存在 False
isExists=os.path.exists(path)
# 判断结果
if not isExists:
# 如果不存在则创建目录
print path+u 创建成功
# 创建目录操作函数
os.makedirs(path)
return True
else:
# 如果目录存在则不创建,并提示目录已存在
print path+u 目录已存在
return False
def cbk(a,b,c):
回调函数
@a: 已经下载的数据块
@b: 数据块的大小
@c: 远程文件的大小
per = 100.0 * a * b / c
if per > 100:
per = 100
print %.2f%% % per
#url = 'http://www.sina.com.cn'
local = 'd:\\mysite\\pic1\\'   # download root on the local disk
d = 0                          # per-gallery subdirectory counter
mutex = threading.Lock()       # serializes the per-thread progress prints
# mutex1 = threading.Lock()
class MyThread(threading.Thread):
def __init__(self, url, name):
threading.Thread.__init__(self)
self.url=url
self.name=name
def run(self):
mutex.acquire()
print down from %s % self.url
time.sleep(1)
mutex.release()
try:
urllib.urlretrieve(self.url, self.name)
except Exception,e:
print e
time.sleep(1)
urllib.urlretrieve(self.url, self.name)
threads=[]
for u in urls[84:]:
d += 1
local = d:\\mysite\\pic1\\%d\\ %d
mkdir(local)
print download begin...
for i in xrange(40):
lcal = local
url=u
url += %03d.jpg %i
lcal += %03d.jpg %i
th = MyThread(url,lcal)
threads.append(th)
th.start()
# for t in threads:
# t.join()
print over! download finished
爬虫抓取信息
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Python爬虫,抓取一卡通相关企业信息
Anthor: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python2.7.5
Editor: Sublime Text2
"""
import urllib2, re, string
import threading, Queue, time
import sys
import os
from bs4 import BeautifulSoup
#from pprint import pprint
reload(sys)
sys.setdefaultencoding('utf8')   # Python 2 hack so gbk-encoded text can be written
_DATA = []                       # one list of scraped strings per page
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue()          # unbounded task queue of page URLs
_WORKER_THREAD_NUM = 3           # number of worker threads
_Num = 0                         # total records scraped
class MyThread(threading.Thread) :
    """Worker thread: runs the supplied callable in run()."""
    def __init__(self, func,num) :
        super(MyThread, self).__init__() # call the parent constructor
        self.func = func # the work function for this thread
        self.thread_num = num # worker index (debugging only)
    def run(self) :
        self.func()
        #print u'thread id:', self.thread_num
def worker() :
    """Drain SHARE_Q: fetch each queued URL and parse it for company data."""
    global SHARE_Q
    # NOTE(review): empty()/get() is racy with several workers — a worker can
    # block in get() after another thread takes the last item; confirm intent.
    while not SHARE_Q.empty():
        url = SHARE_Q.get() # take one task
        my_page = get_page(url)
        find_data(my_page) # collect the data on this page
        #write_into_file(temp_data)
        time.sleep(1)
        SHARE_Q.task_done()
def get_page(url) :
"""
根据所给的url爬取网页HTML
Args:
url: 表示当前要爬取页面的url
Returns:
返回抓取到整个页面的HTML(unicode编码)
Raises:
URLError:url引发的异常
"""
try :
html = urllib2.urlopen(url).read()
my_page = html.decode("gbk",ignore)
#my_page = unicode(html,utf-8,ignore).encode(utf-8,ignore)
#my_page = urllib2.urlopen(url).read().decode("utf8")
except urllib2.URLError, e :
if hasattr(e, "code"):
print "The server couldnt fulfill the request."
print "Error code: %s" % e.code
elif hasattr(e, "reason"):
print "We failed to reach a server. Please check your url and read the Reason"
print "Reason: %s" % e.reason
return my_page
def find_data(my_page) :
"""
通过返回的整个网页HTML, 正则匹配名称
Args:
my_page: 传入页面的HTML文本用于正则匹配
"""
global _Num
temp_data = []
items = BeautifulSoup(my_page).find_all("div", style="width:96%;margin:10px;border-bottom:1px #CCC dashed;padding-bottom:10px;")
for index, item in enumerate(items) :
#print item
#print item.h1
#print h.group()
#temp_data.append(item)
#print item.find(re.compile("^a"))
href = item.find(re.compile("^a"))
#soup = BeautifulSoup(item)
#公司名称
if item.a:
data = item.a.string.encode("gbk","ignore")
print data
temp_data.append(data)
goods = item.find_all("div", style="font-size:12px;")
#经营产品与联系方式
for i in goods:
data = i.get_text().encode("gbk","ignore")
temp_data.append(data)
print data
#b = item.find_all("b")
#print b
#链接地址
pat = re.compile(rhref="([^"]*)")
h = pat.search(str(item))
if h:
#print h.group(0)
href = h.group(1)
print href
temp_data.append(h.group(1))
_Num += 1
#b = item.find_all(text=re.compile("Dormouse"))
#pprint(goods)
#print href
#pat = re.compile(rtitle="([^"]*)")
#h = pat.search(str(href))
#if h:
#print h.group(1)
#temp_data.append(h.group(1))
_DATA.append(temp_data)
#headers = {User-Agent:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}##浏览器请求头(大部分网站没有这个请求头会报错、请务必加上哦)
#all_url = http://www.mzitu.com/all ##开始的URL地址
#start_html = requests.get(all_url, headers=headers) ##使用requests中的get方法来获取all_url(就是:http://www.mzitu.com/all这个地址)的内容 headers为上面设置的请求头、请务必参考requests官方文档解释
#print(start_html.text) ##打印出start_html (请注意,concent是二进制的数据,一般用于下载图片、视频、音频、等多媒体内容是才使用concent, 对于打印网页内容请使用text)
def main() :
global SHARE_Q
threads = []
start = time.clock()
douban_url = "http://company.yktworld.com/comapny_search.asp?page={page}"
#向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务
for index in xrange(20) :
SHARE_Q.put(douban_url.format(page = index * 1))
for i in xrange(_WORKER_THREAD_NUM) :
thread = MyThread(worker,i)
thread.start() #线程开始处理任务
threads.append(thread)
for thread in threads :
thread.join()
SHARE_Q.join()
i = 0
with open("down.txt", "w+") as my_file :
for page in _DATA :
i += 1
for name in page:
my_file.write(name + "\n")
print "Spider Successful!!!"
end = time.clock()
print u抓取完成!
print u总页数:,i
print u总条数:,_Num
print u一共用时:,end-start,u秒
if __name__ == __main__:
main()
爬虫多线程下载电影名称
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Python爬虫
Anthor: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python2.7.8
Editor: Sublime Text2
"""
import urllib2, re, string
import threading, Queue, time
import sys
import os
from bs4 import BeautifulSoup
reload(sys)
sys.setdefaultencoding('utf8')   # Python 2 hack for writing non-ascii text
_DATA = []                       # one list of movie names per page
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue()          # unbounded task queue of page URLs
_WORKER_THREAD_NUM = 3           # number of worker threads
rootpath = os.getcwd() + u'/抓取的内容/'
def makedir(path):
    """Create directory `path` (including parents) unless it already exists."""
    if os.path.isdir(path):
        return
    os.makedirs(path)
#创建抓取的根目录
#makedir(rootpath)
#显示下载进度
def Schedule(a,b,c):
a:已经下载的数据块
b:数据块的大小
c:远程文件的大小
per = 100.0 * a * b / c
if per > 100 :
per = 100
print %.2f%% % per
class MyThread(threading.Thread) :
    """Worker thread: runs the supplied callable in run()."""
    def __init__(self, func) :
        super(MyThread, self).__init__() # call the parent constructor
        self.func = func # the work function for this thread
    def run(self) :
        self.func()
def worker() :
print work thread start...\n
global SHARE_Q
while not SHARE_Q.empty():
url = SHARE_Q.get() #获得任务
my_page = get_page(url)
find_title(my_page) #获得当前页面的电影名
#write_into_file(temp_data)
time.sleep(1)
SHARE_Q.task_done()
def get_page(url) :
"""
根据所给的url爬取网页HTML
Args:
url: 表示当前要爬取页面的url
Returns:
返回抓取到整个页面的HTML(unicode编码)
Raises:
URLError:url引发的异常
"""
try :
html = urllib2.urlopen(url).read()
my_page = html.decode("utf8")
#my_page = unicode(html,utf-8,ignore).encode(utf-8,ignore)
#my_page = urllib2.urlopen(url).read().decode("utf8")
except urllib2.URLError, e :
if hasattr(e, "code"):
print "The server couldnt fulfill the request."
print "Error code: %s" % e.code
elif hasattr(e, "reason"):
print "We failed to reach a server. Please check your url and read the Reason"
print "Reason: %s" % e.reason
return my_page
def find_title(my_page) :
"""
通过返回的整个网页HTML, 正则匹配前100的电影名称
Args:
my_page: 传入页面的HTML文本用于正则匹配
"""
temp_data = []
movie_items = BeautifulSoup(my_page).findAll(h1)
for index, item in enumerate(movie_items) :
#print item
#print item.h1
pat = re.compile(rhref="([^"]*)")
h = pat.search(str(item))
if h:
#print h.group(0)
href = h.group(1)
print href
temp_data.append(h.group(1))
#print h.group()
#temp_data.append(item)
#print item.find(re.compile("^a"))
href = item.find(re.compile("^a"))
#soup = BeautifulSoup(item)
if item.a:
#print item.a.string
temp_data.append(item.a.string)
#print href
#pat = re.compile(rtitle="([^"]*)")
#h = pat.search(str(href))
#if h:
#print h.group(1)
#temp_data.append(h.group(1))
_DATA.append(temp_data)
def main() :
global SHARE_Q
threads = []
start = time.clock()
douban_url = "http://movie.misszm.com/page/{page}"
#向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务
for index in xrange(5) :
SHARE_Q.put(douban_url.format(page = index * 1))
for i in xrange(_WORKER_THREAD_NUM) :
thread = MyThread(worker)
thread.start() #线程开始处理任务
threads.append(thread)
for thread in threads :
thread.join()
SHARE_Q.join()
with open("movie.txt", "w+") as my_file :
for page in _DATA :
for movie_name in page:
my_file.write(movie_name + "\n")
print "Spider Successful!!!"
end = time.clock()
print u抓取完成!
print u一共用时:,end-start,u秒
if __name__ == __main__:
main()
串口转tcp工具
# coding=utf-8
# author: yangyongzhen
#QQ:534117529
#CardTest TcpServer - Simple Test Card Tool 1.00
import sys,threading,time;
import serial;
import binascii,encodings;
import re;
import os;
from socket import *
from struct import *;
#from myutil import *;
#name: myutil.py
mylock = threading.RLock()  # guards the shared snddata/rcvdata buffers
Server_IP = ''
# Fixed typo from the scrape: the rest of the file reads Server_Port,
# not Srever_Port (otherwise TcpClient raises NameError).
Server_Port = ''
def print_hex1(s,prev=0x):
for c in s:
print %s%02x %(prev,ord(c)),
def print_hex(s):
for c in s:
print %02x %(ord(c)),
def hexto_str(s):
    """Encode each byte of s as two-digit hex text, e.g. '\\x01\\xab' -> '01ab'."""
    # join() instead of the original quadratic += concatenation.
    return ''.join('%02x' % ord(c) for c in s)
def strto_hex(s):
    """Decode hex text back to raw bytes (Python 2 only 'hex' string codec)."""
    r = s.decode('hex')
    return r
#代表服务器为localhost
#在一个非保留端口号上进行监听
class ComThread:
def __init__(self, Port=0):
self.l_serial = None;
self.alive = False;
self.waitEnd = None;
self.port = Port;
#TCP部分
#self.sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.connection = None
#数据
self.snddata =
self.rcvdata =
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait();
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set();
self.alive = False;
self.stop();
def start(self):
self.l_serial = serial.Serial();
self.l_serial.port = self.port;
self.l_serial.baudrate = 115200;
self.l_serial.timeout = 2; #秒
self.l_serial.open();
if self.l_serial.isOpen():
self.waitEnd = threading.Event();
self.alive = True;
print open serial port %d ok!\n %(self.port+1)
print baudrate:115200 \n
self.thread_read = None;
self.thread_read = threading.Thread(target=self.FirstReader);
self.thread_read.setDaemon(1);
self.thread_read.start();
self.thread_write = None;
self.thread_write = threading.Thread(target=self.FirstWriter);
self.thread_write.setDaemon(1);
self.thread_write.start();
#TCP部分
self.thread_TcpClient = None;
self.thread_TcpClient = threading.Thread(target=self.TcpClient);
self.thread_TcpClient.setDaemon(1);
self.thread_TcpClient.start();
self.thread_TcpSend = None;
self.thread_TcpSend = threading.Thread(target=self.TcpSend);
self.thread_TcpSend.setDaemon(1);
self.thread_TcpSend.start();
return True;
else:
return False;
def FirstReader(self):
while self.alive:
# 接收间隔
time.sleep(0.1);
try:
data = ;
n = self.l_serial.inWaiting();
if n:
data = data+self.l_serial.read(n);
#for l in xrange(len(data)):
#print %02X % ord(data[l]),
# 发送数据
print u->请求:
print data;
mylock.acquire()
self.snddata = data
mylock.release()
#print_hex(data);
# 判断结束
except Exception, ex:
print str(ex);
self.waitEnd.set();
self.alive = False;
def FirstWriter(self):
while self.alive:
# 接收间隔
time.sleep(0.1);
try:
#snddata = raw_input(\nenter data send:\n)
if self.rcvdata!=:
self.l_serial.write(self.rcvdata);
print u-<应答:
print self.rcvdata;
mylock.acquire()
self.rcvdata = ;
mylock.release()
#print_hex(snddata);
except Exception, ex:
print str(ex);
self.waitEnd.set();
self.alive = False;
def TcpClient(self):
while True:
# 接收间隔
time.sleep(0.1);
self.connection = socket(AF_INET, SOCK_STREAM);
self.connection.connect((Server_IP, int(Server_Port)));
print Connect to Server OK!;
self.snddata =
self.rcvdata =
while True:
#读取客户端套接字的下一行
data = self.connection.recv(1024)
#如果没有数量的话,那么跳出循环
if not data: break
#发送一个回复至客户端
mylock.acquire()
self.snddata =
self.rcvdata = data
mylock.release()
#connection.send(Echo=> + data)
self.connection.close()
self.waitEnd.set();
self.alive = False;
def TcpSend(self):
while True:
# 接收间隔
time.sleep(0.1);
while True:
time.sleep(0.1);
try:
if not self.connection is None:
if self.snddata != :
self.connection.send(self.snddata)
mylock.acquire()
self.rcvdata =
self.snddata =
mylock.release()
except Exception, ex:
pass
def stop(self):
self.alive = False;
self.thread_read.join();
if self.l_serial.isOpen():
self.l_serial.close();
#测试用部分
if __name__ == __main__:
print Serial to Tcp Tool 1.00\n
print Author:yangyongzhen\n
print QQ:534117529\n
print Copyright (c) **cap 2015-2016.\n
Server_IP = raw_input(please enter ServerIP:)
print Server_IP: %s %(Server_IP)
Server_Port = raw_input(please enter ServerPort:)
print Server_Port: %s %(Server_Port)
com =raw_input(please enter com port(1-9):)
rt = ComThread(int(com)-1);
try:
if rt.start():
rt.waiting();
rt.stop();
else:
pass;
except Exception,se:
print str(se);
if rt.alive:
rt.stop();
os.system("pause")
print ;
print End OK .;
del rt;
远程读卡器server端
很早之前做过一个远程读卡器工具,原理就是在现场客服电脑上装个python做的tcpserver服务端,操控现场的读卡器。在公司内部做个客户端连接过去,这样实现在公司调试现场的卡片业务。
这个就是服务端工具的实现:
# coding=utf-8
# author: yangyongzhen
#QQ:534117529
#CardTest TcpServer - Simple Test Card Tool 1.00
import sys,threading,time;
import serial;
import binascii,encodings;
import re;
import os;
from socket import *
from struct import *;
#from myutil import *;
#name: myutil.py
mylock = threading.RLock()
def print_hex1(s,prev=0x):
for c in s:
print %s%02x %(prev,ord(c)),
def print_hex(s):
for c in s:
print %02x %(ord(c)),
def hexto_str(s):
r =
for c in s:
r += %02x %(ord(c))
return r
def strto_hex(s):
r = s.decode(hex)
return r
#代表服务器为localhost
#在一个非保留端口号上进行监听
class ComThread:
def __init__(self, Port=0):
self.l_serial = None;
self.alive = False;
self.waitEnd = None;
self.port = Port;
#TCP部分
self.myHost =
self.myPort = 5050
self.sockobj = socket(AF_INET, SOCK_STREAM)
self.connection = None
#数据
self.snddata =
self.rcvdata =
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait();
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set();
self.alive = False;
self.stop();
def start(self):
self.l_serial = serial.Serial();
self.l_serial.port = self.port;
self.l_serial.baudrate = 115200;
self.l_serial.timeout = 2; #秒
self.l_serial.open();
if self.l_serial.isOpen():
self.waitEnd = threading.Event();
self.alive = True;
print open serial port %d ok!\n %(self.port+1)
print baudrate:115200 \n
self.thread_read = None;
self.thread_read = threading.Thread(target=self.FirstReader);
self.thread_read.setDaemon(1);
self.thread_read.start();
self.thread_write = None;
self.thread_write = threading.Thread(target=self.FirstWriter);
self.thread_write.setDaemon(1);
self.thread_write.start();
#TCP部分
self.thread_TcpServer = None;
self.thread_TcpServer = threading.Thread(target=self.TcpServer);
self.thread_TcpServer.setDaemon(1);
self.thread_TcpServer.start();
self.thread_TcpSend = None;
self.thread_TcpSend = threading.Thread(target=self.TcpSend);
self.thread_TcpSend.setDaemon(1);
self.thread_TcpSend.start();
return True;
else:
return False;
def FirstReader(self):
while self.alive:
# 接收间隔
time.sleep(0.1);
try:
data = ;
n = self.l_serial.inWaiting();
if n:
data = data+self.l_serial.read(n);
#for l in xrange(len(data)):
#print %02X % ord(data[l]),
# 发送数据
print serial recv:
print data;
mylock.acquire()
self.snddata = data
mylock.release()
#print_hex(data);
# 判断结束
except Exception, ex:
print str(ex);
self.waitEnd.set();
self.alive = False;
def FirstWriter(self):
while self.alive:
# 接收间隔
time.sleep(0.1);
try:
#snddata = raw_input(\nenter data send:\n)
if self.rcvdata!=:
self.l_serial.write(self.rcvdata);
print serial send:
print self.rcvdata;
mylock.acquire()
self.rcvdata = ;
mylock.release()
#print_hex(snddata);
except Exception, ex:
print str(ex);
self.waitEnd.set();
self.alive = False;
def TcpServer(self):
self.sockobj.bind((self.myHost, self.myPort))
self.sockobj.listen(10)
print TcpServer listen at 5050 oK!\n
print Waiting for connect...\n
while True:
# 接收间隔
time.sleep(0.1);
self.connection, address = self.sockobj.accept()
print Server connected by, address
self.snddata =
self.rcvdata =
try:
while True:
#读取客户端套接字的下一行
data = self.connection.recv(1024)
#如果没有数量的。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。