大文件 排序,外存文件的排序操作

  大文件 排序,外存文件的排序操作

  

问题一:一个文件含有5亿行,每行是一个随机整数,需要对该文件所有整数排序。

分治(pideConquer),参考大数据算法:对5亿数据进行排序

  对这个一个500000000行的total.txt进行排序,该文件大小4.6克。

  每读10000行就排序并写入到一个新的子文件里(这里使用的是快速排序)。

  

1.分割 排序

#!/usr/bin/python2.7

  导入时间

  def readline _ by _ yield(bfile):

  用打开(b文件, r )作为rf:

  对于rf:中的行

  屈服线

  极好的快速分类第:号

  if len(lst) 2:

  返回地表温度

  枢轴=lst[0]

  left=[ele for ele in lst[1:]if ele pivot]

  right=[ele for ele in lst[1:]if ele=pivot]

  返回快速排序(左)[透视,]快速排序(右)

  定义split_bfile(bfile):

  计数=0

  nums=[]

  对于读取产量行(bfile):中的行

  num=int(line)

  如果数字不在nums:中

  nums.append(数字)

  if 10000==len(nums):

  nums=快速排序

  带开(’子文件/子文件{}。 txt .格式(计数1),‘w’)为wf:

  wf.write(\n ).join([ str(i) for i in nums ])

  nums[:]=[]

  计数=1

  打印计数

  now=time.time()

  split_bfile(total.txt )

  运行时间。时间()-现在

  打印"运行时: {}"。格式(run_t)会生成50000 个小文件,每个小文件大小约在96K左右。

  程序在执行过程中,内存占用一直处在 5424kB 左右

  整个文件分割完耗时94146 秒。

  

2.合并

#!/usr/bin/python2.7

  # -*-编码: utf-8 -*-

  导入操作系统

  导入时间

  testdir=/ssd/subfile

  now=time.time()

  #第一步:获取全部文件描述符

  fds=[]

  对于os.listdir(testdir):中的f

  ff=os.path.join(testdir,f)

  fds.append(open(ff, r ))

  #第二步:每个文件获取第一行,即当前文件最小值

  nums=[]

  tmp_nums=[]

  对于fds:中的软驱

  num=int(fd.readline())

  tmp_nums.append(数字)

  #第三步:获取当前最小值放入暂存区,并读取对应文件的下一行;循环遍历。

  计数=0

  而1:

  val=min(tmp_nums)

  nums .追加

  idx=tmp_nums.index(val)

  next=fds[idx].读取线()

  # 文件读完了

  如果不是下一个:

  倒三角形

   fds[idx]

   del tmp_nums[idx]

   else:

   tmp_nums[idx] = int(next)

   # 暂存区保存1000个数,一次性写入硬盘,然后清空继续读。

   if 1000 == len(nums):

   with open('final_sorted.txt','a') as wf:

   wf.write('\n'.join([ str(i) for i in nums ]) + '\n')

   nums[:] = []

   if 499999999 == count:

   break

   count += 1

  with open('runtime.txt','w') as wf:

   wf.write('Runtime : {}'.format(time.time()-now))程序在执行过程中,内存占用一直处在240M左右

  跑了38个小时左右,才合并完不到5千万行数据...

  虽然降低了内存使用,但时间复杂度太高了;可以通过减少文件数(每个小文件存储行数增加)来进一步降低内存使用

  

问题二:一个文件有一千亿行数据,每行是一个IP地址,需要对IP地址进行排序。

IP地址转换成数字

# 方法一:手动计算

  In [62]: ip

  Out[62]: '10.3.81.150'

  In [63]: ip.split('.')[::-1]

  Out[63]: ['150', '81', '3', '10']

  In [64]: [ '{}-{}'.format(idx,num) for idx,num in enumerate(ip.split('.')[::-1]) ]

  Out[64]: ['0-150', '1-81', '2-3', '3-10']

  In [65]: [256**idx*int(num) for idx,num in enumerate(ip.split('.')[::-1])]

  Out[65]: [150, 20736, 196608, 167772160]

  In [66]: sum([256**idx*int(num) for idx,num in enumerate(ip.split('.')[::-1])])

  Out[66]: 167989654

  In [67]:

  # 方法二:使用C扩展库来计算

  In [71]: import socket,struct

  In [72]: socket.inet_aton(ip)

  Out[72]: b'\n\x03Q\x96'

  In [73]: struct.unpack("!I", socket.inet_aton(ip))

  # !表示使用网络字节顺序解析, 后面的I表示unsigned int, 对应Python里的integer or long

  Out[73]: (167989654,)

  In [74]: struct.unpack("!I", socket.inet_aton(ip))[0]

  Out[74]: 167989654

  In [75]: socket.inet_ntoa(struct.pack("!I", 167989654))

  Out[75]: '10.3.81.150'

  In [76]:

问题三:有一个1.3GB的文件(共一亿行),里面每一行都是一个字符串,请在文件中找出重复次数最多的字符串。

基本思想:迭代读大文件,把大文件拆分成多个小文件;最后再归并这些小文件。

  拆分的规则

   迭代读大文件,内存中维护一个字典,key是字符串,value是该字符串出现的次数;

  当字典维护的字符串种类达到10000(可自定义)的时候,把该字典按照key从小到大排序,然后写入小文件,每行是 key\tvalue;

  然后清空字典,继续往下读,直到大文件读完。

  归并的规则

   首先获取全部小文件的文件描述符,然后各自读出第一行(即每个小文件字符串ascii值最小的字符串),进行比较。

  找出ascii值最小的字符串,如果有重复的,这把各自出现的次数累加起来,然后把当前字符串和总次数存储到内存中的一个列表。

  然后把最小字符串所在的文件的读指针向下移,即从对应小文件里再读出一行进行下一轮比较。

  当内存中的列表个数达到10000时,则一次性把该列表内容写到一个最终文件里存储到硬盘上。同时清空列表,进行之后的比较。

  一直到读完全部的小文件,那么最后得到的最终文件就是一个按照字符串ascii值升序排序的大的文件,每一行的内容就是 字符串\t重复次数

  最后迭代去读这个最终文件,找出重复次数最多的即可。

  

1. 分割

def readline_by_yield(bfile):

   with open(bfile, 'r') as rf:

   for line in rf:

   yield line

  def split_bfile(bfile):

   count = 0

   d = {}

   for line in readline_by_yield(bfile):

   line = line.strip()

   if line not in d:

   d[line] = 0

   d[line] += 1

   if 10000 == len(d):

   text = ''

   for string in sorted(d):

   text += '{}\t{}\n'.format(string,d[string])

   with open('subfile/subfile{}.txt'.format(count+1),'w') as wf:

   wf.write(text.strip())

   d.clear()

   count += 1

   text = ''

   for string in sorted(d):

   text += '{}\t{}\n'.format(string,d[string])

   with open('subfile/subfile_end.txt','w') as wf:

   wf.write(text.strip())

  split_bfile('bigfile.txt')

2. 归并

import os

  import json

  import time

  import traceback

  testdir = '/ssd/subfile'

  now = time.time()

  # Step 1 : 获取全部文件描述符

  fds = []

  for f in os.listdir(testdir):

   ff = os.path.join(testdir,f)

   fds.append(open(ff,'r'))

  # Step 2 : 每个文件获取第一行

  tmp_strings = []

  tmp_count = []

  for fd in fds:

   line = fd.readline()

   string,count = line.strip().split('\t')

   tmp_strings.append(string)

   tmp_count.append(int(count))

  # Step 3 : 获取当前最小值放入暂存区,并读取对应文件的下一行;循环遍历。

  result = []

  need2del = []

  while True:

   min_str = min(tmp_strings)

   str_idx = [i for i,v in enumerate(tmp_strings) if v==min_str]

   str_count = sum([ int(tmp_count[idx]) for idx in str_idx ])

   result.append('{}\t{}\n'.format(min_str,str_count))

   for idx in str_idx:

   next = fds[idx].readline() # IndexError: list index out of range

   # 文件读完了

   if not next:

   need2del.append(idx)

   else:

   next_string,next_count = next.strip().split('\t')

   tmp_strings[idx] = next_string

   tmp_count[idx] = next_count

   # 暂存区保存10000个记录,一次性写入硬盘,然后清空继续读。

   if 10000 == len(result):

   with open('merged.txt','a') as wf:

   wf.write(''.join(result))

   result[:] = []

   # 注意: 文件读完需要删除文件描述符的时候, 需要逆序删除

   need2del.reverse()

   for idx in need2del:

   del fds[idx]

   del tmp_strings[idx]

   del tmp_count[idx]

   need2del[:] = []

   if 0 == len(fds):

   break

  with open('merged.txt','a') as wf:

   wf.write(''.join(result))

  result[:] = []

归并结果分析:

  分割时内存中维护的字典大小分割的小文件个数归并时需维护的文件描述符个数归并时内存占用归并耗时第一次1000090009000 ~ 0200M归并速度慢,暂未统计完成时间第二次100000900900 ~ 027M归并速度快,只需2572秒3. 查找出现次数最多的字符串及其次数

  

import time

  def read_line(filepath):

   with open(filepath,'r') as rf:

   for line in rf:

   yield line

  start_ts = time.time()

  max_str = None

  max_count = 0

  for line in read_line('merged.txt'):

   string,count = line.strip().split('\t')

   if int(count) > max_count:

   max_count = int(count)

   max_str = string

  print(max_str,max_count)

  print('Runtime {}'.format(time.time()-start_ts))

归并后的文件共9999788行,大小是256M;执行查找耗时27秒,内存占用6480KB。以上就是一文了解大文件排序/外存排序问题的详细内容,更多请关注盛行IT软件开发工作室其它相关文章!

  

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: