python日常实用小脚本,Python小脚本

  python日常实用小脚本,Python小脚本

  在日常工作中,我们总是会遇到各种各样的问题。下面这篇文章主要介绍一些常用python脚本的信息,通过示例代码非常详细的介绍。有需要的朋友可以参考一下。

  00-1010序打印十六进制字符串文件合并多线程下载图集多线程下载图片爬虫抓取信息多线程下载电影名称串口到tcp工具远程读卡器服务器黑客rtcp反向链接调用C动态库示例tcp socket连接消息测试工具消息拼接和加解密测试二进制文件解析工具抓取动画图片抓取网站模板总结

  

目录

  日常生活中经常会有一些小任务,手动处理会很麻烦。

  用python做一些小的脚本处理可以提高很多效率。或者你可以用python作为工具,帮助提高办公效率。(比如我经常用python做计算器进行计算和字符转换)

  以下是个人使用的一些python脚本的汇总,作为备忘录。

  

前言

  用途:通信消息中的十六进制数据不好看,可以打印成十六进制字符串显示。

  #编码=utf-8

  #name: myutil.py

  def print_hex1(s,prev=0x):

  对于s:中的c

  打印%sx %(上一次,订单(c)),

  打印

  def print _ hex :

  对于s:中的c

  打印x %(订单(c)),

  打印

  打印 myutil

  def print_hex3(s,prev=0x):

  i=0

  对于s:中的c

  打印“%s%s”,%(prev,s[i:i 2]),

  i=2

  打印

  

打印16进制字符串

  之前MCU中生成的Hex应用文件不能直接刷入MCU,iap程序需要合并成一个文件才能烧录到MCU中。每当你打包有困难的时候,做一个脚本来处理它:

  # path= c : \ \ Users \ \ test \ \ IAP _ CZ _ v204w . hex

  #file=open(路径, r )

  #for ll in file.readlines()

  #打印ll

  #编码=gb18030

  导入时间

  导入操作系统

  定义prr():

  打印“文件组合开始.”

  path0=os.getcwd()

  打印路径0

  路径=路径0

  #路径1=路径0

  路径2=路径0

  path=\\IAP_CZ_v204w.hex

  #path1=\\NC_armStaSystem.hex

  path2=\\

  打印路径

  s=raw_input(输入文件路径: )

  路径1=s

  #path1=\\NC_armStaSystem.hex

  打印路径1

  s=raw_input(输入文件名: )

  路径2=s

  path 2=time . strftime( _ % y % M % d % H % M % S )

  路径2=。十六进制

  打印路径2

  prr()

  尝试:

  f1=打开(路径,“r”)

  计数=0

  对于f1.readlines()中的l :

  #打印l

  计数=1

  #打印数量

  f1.close()

  f1=打开(路径,“r”)

  f2=开路(路径1, r )

  f3=开路(路径2,“w”)

  while(count1):

  l=f1.readline()

  #打印l

  f3 .写(l)

  计数-=1

  #打印数量

  f3.flush()

  对于f2.readlines()中的l :

  f3 .写(l)

  f3.flush()

  f3.close()

  print combination success!

  except Exception,ex:

   print excettion occured!

   print ex

   s=raw_input(press any key to continue...)

  finally:

   f1.close()

   f2.close()

   s=raw_input(press any key to continue...)

  

  

  

多线程下载图集

  网上好看的动漫图集,如果手工下载太费时了。简单分析下网页地址规律,写个多线程脚本搞定。

  

#!/usr/bin/python

  # -*- coding: utf-8 -*-

  # filename: paxel.py

  It is a multi-thread downloading tool

   It was developed follow axel.

   Author: volans

   E-mail: volansw [at] gmail.com

  import sys

  import os

  import time

  import urllib

  from threading import Thread

  local_proxies = {http: http://131.139.58.200:8080}

  class AxelPython(Thread, urllib.FancyURLopener):

   Multi-thread downloading class.

   run() is a vitural method of Thread.

   def __init__(self, threadname, url, filename, ranges=0, proxies={}):

   Thread.__init__(self, name=threadname)

   urllib.FancyURLopener.__init__(self, proxies)

   self.name = threadname

   self.url = url

   self.filename = filename

   self.ranges = ranges

   self.downloaded = 0

   def run(self):

   vertual function in Thread

   try:

   self.downloaded = os.path.getsize( self.filename )

   except OSError:

   #print never downloaded

   self.downloaded = 0

   # rebuild start poind

   self.startpoint = self.ranges[0] + self.downloaded

   # This part is completed

   if self.startpoint >= self.ranges[1]:

   print Part %s has been downloaded over. % self.filename

   return

   self.oneTimeSize = 16384 #16kByte/time

   print task %s will download from %d to %d % (self.name, self.startpoint, self.ranges[1])

   self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))

   self.urlhandle = self.open( self.url )

   data = self.urlhandle.read( self.oneTimeSize )

   while data:

   filehandle = open( self.filename, ab+ )

   filehandle.write( data )

   filehandle.close()

   self.downloaded += len( data )

   #print "%s" % (self.name)

   #progress = u\r...

   data = self.urlhandle.read( self.oneTimeSize )

  def GetUrlFileSize(url, proxies={}):

   urlHandler = urllib.urlopen( url, proxies=proxies )

   headers = urlHandler.info().headers

   length = 0

   for header in headers:

   if header.find(Length) != -1:

   length = header.split(:)[-1].strip()

   length = int(length)

   return length

  def SpliteBlocks(totalsize, blocknumber):

   blocksize = totalsize/blocknumber

   ranges = []

   for i in range(0, blocknumber-1):

   ranges.append((i*blocksize, i*blocksize +blocksize - 1))

   ranges.append(( blocksize*(blocknumber-1), totalsize -1 ))

   return ranges

  def islive(tasks):

   for task in tasks:

   if task.isAlive():

   return True

   return False

  def paxel(url, output, blocks=6, proxies=local_proxies):

   paxel

   size = GetUrlFileSize( url, proxies )

   ranges = SpliteBlocks( size, blocks )

   threadname = [ "thread_%d" % i for i in range(0, blocks) ]

   filename = [ "tmpfile_%d" % i for i in range(0, blocks) ]

   tasks = []

   for i in range(0,blocks):

   task = AxelPython( threadname[i], url, filename[i], ranges[i] )

   task.setDaemon( True )

   task.start()

   tasks.append( task )

   time.sleep( 2 )

   while islive(tasks):

   downloaded = sum( [task.downloaded for task in tasks] )

   process = downloaded/float(size)*100

   show = u\rFilesize:%d Downloaded:%d Completed:%.2f%% % (size, downloaded, process)

   sys.stdout.write(show)

   sys.stdout.flush()

   time.sleep( 0.5 )

   filehandle = open( output, wb+ )

   for i in filename:

   f = open( i, rb )

   filehandle.write( f.read() )

   f.close()

   try:

   os.remove(i)

   pass

   except:

   pass

   filehandle.close()

  if __name__ == __main__:

   url = "http://xz1.mm667.com/xz84/images/001.jpg"

   output = 001.jpg

   paxel( url, output, blocks=4, proxies={} )

  

  

  

多线程下载图片

  多线程下载图片并存储到指定目录中,若目录不存在则自动创建。

  

# -*- coding: UTF-8 -*-

  import re

  import urllib

  urls=http://xz5.mm667.com/xz82/images/01.jpg

  def getHtml(url):

   page = urllib.urlopen(url)

   html = page.read()

   return html

  def getImg(html):

   reg = rsrc="(.+?\.jpg)" pic_ext

   imgre = re.compile(reg)

   imglist = imgre.findall(html)

   x = 0

   for imgurl in imglist:

   urllib.urlretrieve(imgurl,%s.jpg % x)

   x = x + 1

  html = getHtml("http://tieba.baidu.com/p/2460150866")

  getImg(html)

  import re

  import urllib

  import threading

  import time

  import socket

  socket.setdefaulttimeout(30)

  urls=[]

  j=0

  for i in xrange(1,81):

   if (i-1)%4 == 0:

   j += 1

   if ((j-1)%5) == 0 :

   j=1

   site=http://xz%d.mm667.com/xz%02d/images/ %(j,i)

   urls.append(site)

   print urls[i-1]

  #print urls

  urls.append(http://xz1.mm667.com/xz01/images/)

  urls.append(http://xz1.mm667.com/xz02/images/)

  urls.append(http://xz1.mm667.com/xz03/images/)

  urls.append(http://xz1.mm667.com/xz04/images/)

  urls.append(http://xz1.mm667.com/xz84/images/)

  urls.append(http://xz2.mm667.com/xz85/images/)

  urls.append(http://xz3.mm667.com/xz86/images/)

  urls.append(http://xz1.mm667.com/s/)

  urls.append(http://xz1.mm667.com/p/)

  def mkdir(path):

   # 引入模块

   import os

   # 去除首位空格

   path=path.strip()

   # 去除尾部 \ 符号

   path=path.rstrip("\\")

   # 判断路径是否存在

   # 存在 True

   # 不存在 False

   isExists=os.path.exists(path)

   # 判断结果

   if not isExists:

   # 如果不存在则创建目录

   print path+u 创建成功

   # 创建目录操作函数

   os.makedirs(path)

   return True

   else:

   # 如果目录存在则不创建,并提示目录已存在

   print path+u 目录已存在

   return False

  def cbk(a,b,c):

   回调函数

   @a: 已经下载的数据块

   @b: 数据块的大小

   @c: 远程文件的大小

   per = 100.0 * a * b / c

   if per > 100:

   per = 100

   print %.2f%% % per

  #url = http://www.sina.com.cn

  local = d:\\mysite\\pic1\\

  d=0

  mutex = threading.Lock()

  # mutex1 = threading.Lock()

  class MyThread(threading.Thread):

   def __init__(self, url, name):

   threading.Thread.__init__(self)

   self.url=url

   self.name=name

   def run(self):

   mutex.acquire()

   print

   print down from %s % self.url

   time.sleep(1)

   mutex.release()

   try:

   urllib.urlretrieve(self.url, self.name)

   except Exception,e:

   print e

   time.sleep(1)

   urllib.urlretrieve(self.url, self.name)

  threads=[]

  for u in urls[84:]:

   d += 1

   local = d:\\mysite\\pic1\\%d\\ %d

   mkdir(local)

   print download begin...

   for i in xrange(40):

   lcal = local

   url=u

   url += %03d.jpg %i

   lcal += %03d.jpg %i

   th = MyThread(url,lcal)

   threads.append(th)

   th.start()

  # for t in threads:

  # t.join()

  print over! download finished

  

  

  

爬虫抓取信息

  

#!/usr/bin/env python

  # -*- coding:utf-8 -*-

  """

  Python爬虫,抓取一卡通相关企业信息

  Anthor: yangyongzhen

  Version: 0.0.2

  Date: 2014-12-14

  Language: Python2.7.5

  Editor: Sublime Text2

  """

  import urllib2, re, string

  import threading, Queue, time

  import sys

  import os

  from bs4 import BeautifulSoup

  #from pprint import pprint

  reload(sys)

  sys.setdefaultencoding(utf8)

  _DATA = []

  FILE_LOCK = threading.Lock()

  SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列

  _WORKER_THREAD_NUM = 3 #设置线程的个数

  _Num = 0 #总条数

  class MyThread(threading.Thread) :

   def __init__(self, func,num) :

   super(MyThread, self).__init__() #调用父类的构造函数

   self.func = func #传入线程函数逻辑

   self.thread_num = num

   def run(self) :

   self.func()

   #print u线程ID:,self.thread_num

  def worker() :

   global SHARE_Q

   while not SHARE_Q.empty():

   url = SHARE_Q.get() #获得任务

   my_page = get_page(url)

   find_data(my_page) #获得当前页面的数据

   #write_into_file(temp_data)

   time.sleep(1)

   SHARE_Q.task_done()

  def get_page(url) :

   """

   根据所给的url爬取网页HTML

   Args:

   url: 表示当前要爬取页面的url

   Returns:

   返回抓取到整个页面的HTML(unicode编码)

   Raises:

   URLError:url引发的异常

   """

   try :

   html = urllib2.urlopen(url).read()

   my_page = html.decode("gbk",ignore)

   #my_page = unicode(html,utf-8,ignore).encode(utf-8,ignore)

   #my_page = urllib2.urlopen(url).read().decode("utf8")

   except urllib2.URLError, e :

   if hasattr(e, "code"):

   print "The server couldnt fulfill the request."

   print "Error code: %s" % e.code

   elif hasattr(e, "reason"):

   print "We failed to reach a server. Please check your url and read the Reason"

   print "Reason: %s" % e.reason

   return my_page

  def find_data(my_page) :

   """

   通过返回的整个网页HTML, 正则匹配名称

   Args:

   my_page: 传入页面的HTML文本用于正则匹配

   """

   global _Num

   temp_data = []

   items = BeautifulSoup(my_page).find_all("div", style="width:96%;margin:10px;border-bottom:1px #CCC dashed;padding-bottom:10px;")

   for index, item in enumerate(items) :

   #print item

   #print item.h1

   #print h.group()

   #temp_data.append(item)

   #print item.find(re.compile("^a"))

   href = item.find(re.compile("^a"))

   #soup = BeautifulSoup(item)

   #公司名称

   if item.a:

   data = item.a.string.encode("gbk","ignore")

   print data

   temp_data.append(data)

   goods = item.find_all("div", style="font-size:12px;")

   #经营产品与联系方式

   for i in goods:

   data = i.get_text().encode("gbk","ignore")

   temp_data.append(data)

   print data

   #b = item.find_all("b")

   #print b

   #链接地址

   pat = re.compile(rhref="([^"]*)")

   h = pat.search(str(item))

   if h:

   #print h.group(0)

   href = h.group(1)

   print href

   temp_data.append(h.group(1))

   _Num += 1

   #b = item.find_all(text=re.compile("Dormouse"))

   #pprint(goods)

   #print href

   #pat = re.compile(rtitle="([^"]*)")

   #h = pat.search(str(href))

   #if h:

   #print h.group(1)

   #temp_data.append(h.group(1))

   _DATA.append(temp_data)

  #headers = {User-Agent:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}##浏览器请求头(大部分网站没有这个请求头会报错、请务必加上哦)

  #all_url = http://www.mzitu.com/all ##开始的URL地址

  #start_html = requests.get(all_url, headers=headers) ##使用requests中的get方法来获取all_url(就是:http://www.mzitu.com/all这个地址)的内容 headers为上面设置的请求头、请务必参考requests官方文档解释

  #print(start_html.text) ##打印出start_html (请注意,concent是二进制的数据,一般用于下载图片、视频、音频、等多媒体内容是才使用concent, 对于打印网页内容请使用text)

  def main() :

   global SHARE_Q

   threads = []

   start = time.clock()

   douban_url = "http://company.yktworld.com/comapny_search.asp?page={page}"

   #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务

   for index in xrange(20) :

   SHARE_Q.put(douban_url.format(page = index * 1))

   for i in xrange(_WORKER_THREAD_NUM) :

   thread = MyThread(worker,i)

   thread.start() #线程开始处理任务

   threads.append(thread)

   for thread in threads :

   thread.join()

   SHARE_Q.join()

   i = 0

   with open("down.txt", "w+") as my_file :

   for page in _DATA :

   i += 1

   for name in page:

   my_file.write(name + "\n")

   print "Spider Successful!!!"

   end = time.clock()

   print u抓取完成!

   print u总页数:,i

   print u总条数:,_Num

   print u一共用时:,end-start,u秒

  if __name__ == __main__:

   main()

  

  

  

爬虫多线程下载电影名称

  

#!/usr/bin/env python

  # -*- coding:utf-8 -*-

  """

  Python爬虫

  Anthor: yangyongzhen

  Version: 0.0.2

  Date: 2014-12-14

  Language: Python2.7.8

  Editor: Sublime Text2

  """

  import urllib2, re, string

  import threading, Queue, time

  import sys

  import os

  from bs4 import BeautifulSoup

  reload(sys)

  sys.setdefaultencoding(utf8)

  _DATA = []

  FILE_LOCK = threading.Lock()

  SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列

  _WORKER_THREAD_NUM = 3 #设置线程的个数

  rootpath = os.getcwd()+u/抓取的内容/

  def makedir(path):

   if not os.path.isdir(path):

   os.makedirs(path)

  #创建抓取的根目录

  #makedir(rootpath)

  #显示下载进度

  def Schedule(a,b,c):

   a:已经下载的数据块

   b:数据块的大小

   c:远程文件的大小

   per = 100.0 * a * b / c

   if per > 100 :

   per = 100

   print %.2f%% % per

  class MyThread(threading.Thread) :

   def __init__(self, func) :

   super(MyThread, self).__init__() #调用父类的构造函数

   self.func = func #传入线程函数逻辑

   def run(self) :

   self.func()

  def worker() :

   print work thread start...\n

   global SHARE_Q

   while not SHARE_Q.empty():

   url = SHARE_Q.get() #获得任务

   my_page = get_page(url)

   find_title(my_page) #获得当前页面的电影名

   #write_into_file(temp_data)

   time.sleep(1)

   SHARE_Q.task_done()

  def get_page(url) :

   """

   根据所给的url爬取网页HTML

   Args:

   url: 表示当前要爬取页面的url

   Returns:

   返回抓取到整个页面的HTML(unicode编码)

   Raises:

   URLError:url引发的异常

   """

   try :

   html = urllib2.urlopen(url).read()

   my_page = html.decode("utf8")

   #my_page = unicode(html,utf-8,ignore).encode(utf-8,ignore)

   #my_page = urllib2.urlopen(url).read().decode("utf8")

   except urllib2.URLError, e :

   if hasattr(e, "code"):

   print "The server couldnt fulfill the request."

   print "Error code: %s" % e.code

   elif hasattr(e, "reason"):

   print "We failed to reach a server. Please check your url and read the Reason"

   print "Reason: %s" % e.reason

   return my_page

  def find_title(my_page) :

   """

   通过返回的整个网页HTML, 正则匹配前100的电影名称

   Args:

   my_page: 传入页面的HTML文本用于正则匹配

   """

   temp_data = []

   movie_items = BeautifulSoup(my_page).findAll(h1)

   for index, item in enumerate(movie_items) :

   #print item

   #print item.h1

   pat = re.compile(rhref="([^"]*)")

   h = pat.search(str(item))

   if h:

   #print h.group(0)

   href = h.group(1)

   print href

   temp_data.append(h.group(1))

   #print h.group()

   #temp_data.append(item)

   #print item.find(re.compile("^a"))

   href = item.find(re.compile("^a"))

   #soup = BeautifulSoup(item)

   if item.a:

   #print item.a.string

   temp_data.append(item.a.string)

   #print href

   #pat = re.compile(rtitle="([^"]*)")

   #h = pat.search(str(href))

   #if h:

   #print h.group(1)

   #temp_data.append(h.group(1))

   _DATA.append(temp_data)

  def main() :

   global SHARE_Q

   threads = []

   start = time.clock()

   douban_url = "http://movie.misszm.com/page/{page}"

   #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务

   for index in xrange(5) :

   SHARE_Q.put(douban_url.format(page = index * 1))

   for i in xrange(_WORKER_THREAD_NUM) :

   thread = MyThread(worker)

   thread.start() #线程开始处理任务

   threads.append(thread)

   for thread in threads :

   thread.join()

   SHARE_Q.join()

   with open("movie.txt", "w+") as my_file :

   for page in _DATA :

   for movie_name in page:

   my_file.write(movie_name + "\n")

   print "Spider Successful!!!"

   end = time.clock()

   print u抓取完成!

   print u一共用时:,end-start,u秒

  if __name__ == __main__:

   main()

  

  

  

串口转tcp工具

  

#coding=utf-8

  #author:yangyongzhen

  #QQ:534117529

  #CardTest TcpServer - Simple Test Card Tool 1.00

  import sys,threading,time;

  import serial;

  import binascii,encodings;

  import re;

  import os;

  from socket import *

  from struct import *;

  #from myutil import *;

  #name: myutil.py

  mylock = threading.RLock()

  Server_IP =

  Srever_Port =

  def print_hex1(s,prev=0x):

   for c in s:

   print %s%02x %(prev,ord(c)),

   print

  def print_hex(s):

   for c in s:

   print %02x %(ord(c)),

   print

  def hexto_str(s):

   r =

   for c in s:

   r += %02x %(ord(c))

   return r

  def strto_hex(s):

   r = s.decode(hex)

   return r

  #代表服务器为localhost

  #在一个非保留端口号上进行监听

  class ComThread:

   def __init__(self, Port=0):

   self.l_serial = None;

   self.alive = False;

   self.waitEnd = None;

   self.port = Port;

   #TCP部分

   #self.sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

   self.connection = None

   #数据

   self.snddata =

   self.rcvdata =

   def waiting(self):

   if not self.waitEnd is None:

   self.waitEnd.wait();

   def SetStopEvent(self):

   if not self.waitEnd is None:

   self.waitEnd.set();

   self.alive = False;

   self.stop();

   def start(self):

   self.l_serial = serial.Serial();

   self.l_serial.port = self.port;

   self.l_serial.baudrate = 115200;

   self.l_serial.timeout = 2; #秒

   self.l_serial.open();

   if self.l_serial.isOpen():

   self.waitEnd = threading.Event();

   self.alive = True;

   print open serial port %d ok!\n %(self.port+1)

   print baudrate:115200 \n

   self.thread_read = None;

   self.thread_read = threading.Thread(target=self.FirstReader);

   self.thread_read.setDaemon(1);

   self.thread_read.start();

   self.thread_write = None;

   self.thread_write = threading.Thread(target=self.FirstWriter);

   self.thread_write.setDaemon(1);

   self.thread_write.start();

   #TCP部分

   self.thread_TcpClient = None;

   self.thread_TcpClient = threading.Thread(target=self.TcpClient);

   self.thread_TcpClient.setDaemon(1);

   self.thread_TcpClient.start();

   self.thread_TcpSend = None;

   self.thread_TcpSend = threading.Thread(target=self.TcpSend);

   self.thread_TcpSend.setDaemon(1);

   self.thread_TcpSend.start();

   return True;

   else:

   return False;

   def FirstReader(self):

   while self.alive:

   # 接收间隔

   time.sleep(0.1);

   try:

   data = ;

   n = self.l_serial.inWaiting();

   if n:

   data = data+self.l_serial.read(n);

   #for l in xrange(len(data)):

   #print %02X % ord(data[l]),

   # 发送数据

   print u->请求:

   print data;

   mylock.acquire()

   self.snddata = data

   mylock.release()

   #print_hex(data);

   # 判断结束

   except Exception, ex:

   print str(ex);

   self.waitEnd.set();

   self.alive = False;

   def FirstWriter(self):

   while self.alive:

   # 接收间隔

   time.sleep(0.1);

   try:

   #snddata = raw_input(\nenter data send:\n)

   if self.rcvdata!=:

   self.l_serial.write(self.rcvdata);

   print u-<应答:

   print self.rcvdata;

   mylock.acquire()

   self.rcvdata = ;

   mylock.release()

   #print_hex(snddata);

   except Exception, ex:

   print str(ex);

   self.waitEnd.set();

   self.alive = False;

   def TcpClient(self):

   while True:

   # 接收间隔

   time.sleep(0.1);

   self.connection = socket(AF_INET, SOCK_STREAM);

   self.connection.connect((Server_IP, int(Server_Port)));

   print Connect to Server OK!;

   self.snddata =

   self.rcvdata =

   while True:

   #读取客户端套接字的下一行

   data = self.connection.recv(1024)

   #如果没有数量的话,那么跳出循环

   if not data: break

   #发送一个回复至客户端

   mylock.acquire()

   self.snddata =

   self.rcvdata = data

   mylock.release()

   #connection.send(Echo=> + data)

   self.connection.close()

   self.waitEnd.set();

   self.alive = False;

   def TcpSend(self):

   while True:

   # 接收间隔

   time.sleep(0.1);

   while True:

   time.sleep(0.1);

   try:

   if not self.connection is None:

   if self.snddata != :

   self.connection.send(self.snddata)

   mylock.acquire()

   self.rcvdata =

   self.snddata =

   mylock.release()

   except Exception, ex:

   pass

   def stop(self):

   self.alive = False;

   self.thread_read.join();

   if self.l_serial.isOpen():

   self.l_serial.close();

  #测试用部分

  if __name__ == __main__:

   print Serial to Tcp Tool 1.00\n

   print Author:yangyongzhen\n

   print QQ:534117529\n

   print Copyright (c) **cap 2015-2016.\n

   Server_IP = raw_input(please enter ServerIP:)

   print Server_IP: %s %(Server_IP)

   Server_Port = raw_input(please enter ServerPort:)

   print Server_Port: %s %(Server_Port)

   com =raw_input(please enter com port(1-9):)

   rt = ComThread(int(com)-1);

   try:

   if rt.start():

   rt.waiting();

   rt.stop();

   else:

   pass;

   except Exception,se:

   print str(se);

   if rt.alive:

   rt.stop();

   os.system("pause")

   print ;

   print End OK .;

   del rt;

  

  

  

远程读卡器server端

  很早之前做过一个远程读卡器工具,原理就是在现场客服电脑上装个python做的tcpserver服务端,操控现场的读卡器。在公司内部做个客户端连接过去,这样实现在公司调试现场的卡片业务。

  这个就是服务端工具的实现:

  

#coding=utf-8

  #author:yangyongzhen

  #QQ:534117529

  #CardTest TcpServer - Simple Test Card Tool 1.00

  import sys,threading,time;

  import serial;

  import binascii,encodings;

  import re;

  import os;

  from socket import *

  from struct import *;

  #from myutil import *;

  #name: myutil.py

  mylock = threading.RLock()

  def print_hex1(s,prev=0x):

   for c in s:

   print %s%02x %(prev,ord(c)),

   print

  def print_hex(s):

   for c in s:

   print %02x %(ord(c)),

   print

  def hexto_str(s):

   r =

   for c in s:

   r += %02x %(ord(c))

   return r

  def strto_hex(s):

   r = s.decode(hex)

   return r

  #代表服务器为localhost

  #在一个非保留端口号上进行监听

  class ComThread:

   def __init__(self, Port=0):

   self.l_serial = None;

   self.alive = False;

   self.waitEnd = None;

   self.port = Port;

   #TCP部分

   self.myHost =

   self.myPort = 5050

   self.sockobj = socket(AF_INET, SOCK_STREAM)

   self.connection = None

   #数据

   self.snddata =

   self.rcvdata =

   def waiting(self):

   if not self.waitEnd is None:

   self.waitEnd.wait();

   def SetStopEvent(self):

   if not self.waitEnd is None:

   self.waitEnd.set();

   self.alive = False;

   self.stop();

   def start(self):

   self.l_serial = serial.Serial();

   self.l_serial.port = self.port;

   self.l_serial.baudrate = 115200;

   self.l_serial.timeout = 2; #秒

   self.l_serial.open();

   if self.l_serial.isOpen():

   self.waitEnd = threading.Event();

   self.alive = True;

   print open serial port %d ok!\n %(self.port+1)

   print baudrate:115200 \n

   self.thread_read = None;

   self.thread_read = threading.Thread(target=self.FirstReader);

   self.thread_read.setDaemon(1);

   self.thread_read.start();

   self.thread_write = None;

   self.thread_write = threading.Thread(target=self.FirstWriter);

   self.thread_write.setDaemon(1);

   self.thread_write.start();

   #TCP部分

   self.thread_TcpServer = None;

   self.thread_TcpServer = threading.Thread(target=self.TcpServer);

   self.thread_TcpServer.setDaemon(1);

   self.thread_TcpServer.start();

   self.thread_TcpSend = None;

   self.thread_TcpSend = threading.Thread(target=self.TcpSend);

   self.thread_TcpSend.setDaemon(1);

   self.thread_TcpSend.start();

   return True;

   else:

   return False;

   def FirstReader(self):

   while self.alive:

   # 接收间隔

   time.sleep(0.1);

   try:

   data = ;

   n = self.l_serial.inWaiting();

   if n:

   data = data+self.l_serial.read(n);

   #for l in xrange(len(data)):

   #print %02X % ord(data[l]),

   # 发送数据

   print serial recv:

   print data;

   mylock.acquire()

   self.snddata = data

   mylock.release()

   #print_hex(data);

   # 判断结束

   except Exception, ex:

   print str(ex);

   self.waitEnd.set();

   self.alive = False;

   def FirstWriter(self):

   while self.alive:

   # 接收间隔

   time.sleep(0.1);

   try:

   #snddata = raw_input(\nenter data send:\n)

   if self.rcvdata!=:

   self.l_serial.write(self.rcvdata);

   print serial send:

   print self.rcvdata;

   mylock.acquire()

   self.rcvdata = ;

   mylock.release()

   #print_hex(snddata);

   except Exception, ex:

   print str(ex);

   self.waitEnd.set();

   self.alive = False;

   def TcpServer(self):

   self.sockobj.bind((self.myHost, self.myPort))

   self.sockobj.listen(10)

   print TcpServer listen at 5050 oK!\n

   print Waiting for connect...\n

   while True:

   # 接收间隔

   time.sleep(0.1);

   self.connection, address = self.sockobj.accept()

   print Server connected by, address

   self.snddata =

   self.rcvdata =

   try:

   while True:

   #读取客户端套接字的下一行

   data = self.connection.recv(1024)

   #如果没有数量的。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: