Using multithreading in Python
When multithreading is a good fit
Multithreading suits cases like web crawlers, where threads spend most of their time waiting for requests to return. If instead you need to read several files at once and process them in parallel, the multiprocessing module is the better choice.
Two ways to use multithreading
The article "Python 线程池原理及实现" (Python thread pools: principles and implementation) explains this in detail and covers several thread-pool models; it is worth a read.
Using threads directly
Reference:
Essentially, this just spawns several threads and lets them run concurrently.
Main steps:
- Start the threads: create each Thread, call start() on it, and keep it in a thread_list
- Wait for every thread to finish before the program exits, by calling t.join() on each (see the sketch below)
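A minimal sketch of this pattern, assuming a made-up I/O-bound fetch() and a made-up URL list:

from threading import Thread
import time

def fetch(url):
    # stand-in for an I/O-bound task such as an HTTP request
    time.sleep(1)
    print('done %s' % url)

urls = ['http://example.com/a', 'http://example.com/b', 'http://example.com/c']

thread_list = []
for url in urls:
    t = Thread(target=fetch, args=(url,))
    t.start()                  # start the thread
    thread_list.append(t)      # keep it so we can join() it later

for t in thread_list:
    t.join()                   # block until every thread has finished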
Combining threads with a queue
References:
Python 线程池原理及实现 (explains this in detail)
Python Multithreading and Multiprocessing Tutorial
Main steps:
- Create a Queue
- put() the work items (e.g. URLs to download, function arguments) into the Queue
- Create the threads and start() them; each thread get()s what it needs from the queue (a URL, function arguments, and so on), processes it (in its run() method), and finally calls queue.task_done()
- Once every item in the Queue has been processed, the program exits (i.e. Queue.join() returns); see the sketch after this list
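A minimal sketch of the Queue pattern, assuming a made-up download() and made-up URLs; the ThreadWorker script below follows the same structure:

from threading import Thread
from Queue import Queue  # Python 3: from queue import Queue
import time

def download(url):
    time.sleep(1)  # stand-in for the real I/O work
    print('downloaded %s' % url)

class Worker(Thread):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue
        self.daemon = True  # let the process exit once the queue is drained

    def run(self):
        while True:
            url = self.queue.get()      # block until an item is available
            try:
                download(url)
            finally:
                self.queue.task_done()  # mark the item as processed

queue = Queue()
for url in ['http://example.com/a', 'http://example.com/b']:
    queue.put(url)

for _ in range(2):
    Worker(queue).start()

queue.join()  # returns once task_done() has been called for every put() item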
An example where threads are a poor fit: using multiple threads to read protobuf files in batches from HDFS or local disk, then filtering them on some condition.
Summary:
- What the GIL does: it prevents multiple threads from executing Python bytecode at the same time, ensuring only one thread runs at any given moment.
- File reads and writes are blocking (they would block an event loop).
Because of the GIL, even with several threads running, the only real speedup comes while getting the files from HDFS to local disk.
In practice only one file can be processed at a time, so during the later steps of reading the protobuf file and filtering by gid the threads are simply fighting over the GIL; multithreading is a poor fit here (a multiprocessing sketch follows the script below).
import argparse
import csv
import os
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
from Queue import Queue  # Python 3: from queue import Queue

import proto_parser_pb2


class ThreadWorker(Thread):
    # Worker thread: repeatedly pulls an hour directory off the queue and processes it with func
    def __init__(self, queue, func, args, retry=1):
        super(ThreadWorker, self).__init__()
        self.queue = queue
        self.func = func
        self.retry = retry
        self.args = args
        self.daemon = True  # let the process exit once the queue has been drained

    def run(self):
        while True:
            if not self.queue.empty():
                retry_count = 0
                success = False
                input_hour_dir = self.queue.get()
                while retry_count < self.retry:
                    try:
                        retry_count += 1
                        self.func(input_hour_dir, self.args)
                        success = True
                        break
                    except Exception as e:
                        print('Failed with %s' % e)
                if not success:
                    print('Failed after %d attempts' % retry_count)
                self.queue.task_done()
                print('Finished %s' % input_hour_dir)
def run(input_hour_dir, args):
    global all_gids
    valid_instances = 0
    all_instances = 0
    sizeof_size_t = 8
    if args.hdfs:
        [date, hour] = input_hour_dir.split('/')[-2:]
        output_name = 'valid_mp_' + '_'.join([date, hour]) + '.pb'
        hr = input_hour_dir + '/*.pb.snappy'
        tmp_file = os.path.join(args.output_dir, 'tmp_cascade_{}_{}'.format(date, hour))
        print('Hour file:%s, tmp_file:%s, output file: %s' % (hr, tmp_file, output_name))
        count_cmd = "hdfs dfs -ls {} | wc -l".format(hr)
        count_file_proc = Popen(count_cmd, shell=True, stderr=STDOUT, stdout=PIPE)
        count_file_proc.wait()
        if count_file_proc.returncode != 0:
            print('Failed to get number of files')
            return 1
        num_pb_snappy = int(count_file_proc.communicate()[0])
        print('Got %d .pb.snappy in %s' % (num_pb_snappy, hr))
        if not os.path.exists(tmp_file):
            if num_pb_snappy == 1:
                cmd = "hdfs dfs -get {} {}".format(hr, tmp_file)
            elif num_pb_snappy > 1:
                cmd = "hdfs dfs -getmerge {} {}".format(hr, tmp_file)
            else:
                print('Invalid hour file dir %s' % hr)
                return 1
            child_prc = Popen(cmd, stdout=PIPE, shell=True, stderr=STDOUT)
            child_prc.wait()
            if child_prc.returncode != 0:
                print('Fail to run cmd %s' % cmd)
                return 1
        else:
            print('Skip downloading file %s from hdfs' % tmp_file)
        decompressed_tmp = tmp_file + '.pb'
        if not os.path.exists(decompressed_tmp):
            cmd = 'python -m snappy -d {} {}'.format(tmp_file, decompressed_tmp)
            child_prc = Popen(cmd, shell=True, stderr=STDOUT, stdout=PIPE)
            child_prc.wait()
        # From this step onwards every subsequent step contends for the GIL
        readfile = open(decompressed_tmp, 'r')
    else:
        output_name = 'valid_mp' + input_hour_dir.strip().split('.')[-2] + '.pb'
        print('Input hour file %s, output file name:%s' % (input_hour_dir, output_name))
        # From this step onwards every subsequent step contends for the GIL
        readfile = open(input_hour_dir, 'r')
    instance = proto_parser_pb2.Instance()
    with open(os.path.join(args.output_dir, output_name), 'w') as writefile:
        while True:
            try:
                ...
                # instances that belong to valid mini programs
                if str(instance.line_id.gid) in all_gids:  # the global variable is contended for as well
                    raw_proto.extend([size_binary[::-1], proto])
                    writefile.write(''.join(raw_proto))
                    valid_instances += 1
                if all_instances % 1000000 == 0:
                    print('Read {} instances in {}/{}'.format(all_instances, date, hr))
            except Exception as e:
                print(e)
                print('Got {} mp instances of {}/{} in {} '.format(valid_instances,
                                                                   date,
                                                                   hour,
                                                                   all_instances))
                break
    # print 'get [{}] instances success'.format(args.count)
def main(args):
    if args.hdfs:
        cmd = "hdfs dfs -ls {}".format(args.source_dir) + "| awk '{print $8}'"
        hour_list, _ = Popen(cmd, stdout=PIPE, shell=True, stderr=STDOUT).communicate()
        hour_list = hour_list.strip().split('\n')
        # cpus = multiprocessing.cpu_count()
        # cores = min(n_hours, cpus)
        #
        # pool = Pool(3)
    else:
        hour_list = os.listdir(args.source_dir)
        hour_list = [os.path.join(args.source_dir, hr) for hr in hour_list]
    n_hr_files = len(hour_list)
    print('Got %d files' % n_hr_files)
    global all_gids
    with open(args.gid_csv, 'r') as f:
        reader = csv.reader(f)
        next(reader)
        all_gids = set(x[0] for x in reader)
    queue = Queue()
    for hr in hour_list:
        queue.put(hr)
    num_threads = min(args.num_threads, n_hr_files)
    for _ in range(num_threads):
        thread = ThreadWorker(queue, run, args)
        thread.start()
    queue.join()
    print('Finished')
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-dump_hdfs', dest='hdfs', action='store_true', help='Dump from hdfs')
    parser.add_argument('-source_dir', type=str, required=True, help='Source file dir.Can be local or hdfs')
    parser.add_argument('-output_dir', type=str, default='/home/tangweitao.walter/Download/test_multi_proc_dump_filter',
                        help='Output file location')
    parser.add_argument('-has_sort_id', type=int, default=1, help='has_sort_id')
    parser.add_argument('-kafka_dump', type=int, default=1, help='kafka_dump')
    parser.add_argument('-gid_csv', required=False, type=str, default="", help="gid_csv")
    parser.add_argument('-num_threads', required=False, type=int, default=3, help="number_of_threads")
    args = parser.parse_args()
    main(args)
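For comparison, a rough multiprocessing sketch of the same per-hour fan-out, as suggested at the top of this post. process_hour() is a hypothetical stand-in for the work run() does above, and the hour paths are made up; because each hour is handled in a separate process, the protobuf parsing and gid filtering no longer contend for a single GIL.

from multiprocessing import Pool

def process_hour(input_hour_dir):
    # hypothetical stand-in for run() above: download, decompress and filter one hour dir
    print('processing %s' % input_hour_dir)
    return input_hour_dir

if __name__ == '__main__':
    hour_list = ['/data/20200101/00', '/data/20200101/01']  # made-up paths
    pool = Pool(3)
    results = pool.map(process_hour, hour_list)  # each hour dir goes to a worker process
    pool.close()
    pool.join()
    print('Finished %d hour dirs' % len(results))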