Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share

TypeError while using Pool from multiprocessing (python 3.7)

I'm trying to sum up the sizes of all files in a directory, including recursive subdirectories. The relevant function (self._count) works totally fine if I just call it once. But for large amounts of files I want to use multiprocessing to make the program faster. Here are the relevant parts of the code.

self._sum_dicts sums up the values of matching keys across the given dicts.
self._get_file_type returns the category (the key for stats) the file shall be placed in.
self._categories holds a list of all possible categories.
number_of_threats specifies the number of workers that shall be used.
path holds the path to the directory mentioned in the first sentence.

import os
from multiprocessing import Pool

def _count(self, path):
    stats = dict.fromkeys(self._categories, 0)
    try:
        dir_list = os.listdir(path)
    except:
        # I do some warning here, but removed it for SSCCE
        return stats

    for element in dir_list:
        new_path = os.path.join(path, element)

        if os.path.isdir(new_path):
            add_stats = self._count(new_path)
            stats = self._sum_dicts([stats, add_stats])
        else:
            file_type = self._get_file_type(element)
            try:
                size = os.path.getsize(new_path)
            except Exception as e:
                # I do some warning here, but removed it for SSCCE
                continue

            stats[file_type] += size

    return stats

files = []
dirs = []
for e in dir_list:
    new_name = os.path.join(path, e)
    if os.path.isdir(new_name):
        dirs.append(new_name)
    else:
        files.append(new_name)

with Pool(processes=number_of_threats) as pool:
    res = pool.map(self._count, dirs)

self._stats = self._sum_dicts(res)

I know that this code won't consider files directly in path, but that is something I can easily add. When executing the code I get the following exception.

Exception has occurred: TypeError
cannot serialize '_io.TextIOWrapper' object
...
line ... in ...
res = pool.map(self._count, dirs)

I found out that this exception can occur when sharing resources between processes, which - as far as I can see - I only do with stats = dict.fromkeys(self._categories, 0). But replacing this line with hardcoded values doesn't fix the problem. Even placing a breakpoint at this line doesn't help me, because it is never reached.

Does anybody have an idea what the reason for this problem is and how I can fix this?



1 Answer


The problem is that you transmit self: pool.map(self._count, dirs) pickles the bound method, and with it the whole instance. If self holds a stream object, it can't be serialized.

Try moving the multiprocessed code outside the class, into a plain module-level function.
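A minimal sketch of that move, assuming the per-directory work only needs the path. For brevity this flattens the per-category stats into one total; keeping the category split would mean passing the category list to the worker as an explicit argument.

```python
import os
from multiprocessing import Pool

# Hypothetical module-level worker: a plain function is pickled by name,
# so only the path string travels to the worker process - no instance,
# no open streams.
def count_sizes(path):
    total = 0
    for root, _subdirs, files in os.walk(path):
        for name in files:
            try:
                total += os.path.getsize(os.path.join(root, name))
            except OSError:
                continue  # unreadable entry, skip it
    return total

if __name__ == "__main__":
    path = "."  # directory to scan
    dirs = [os.path.join(path, e) for e in os.listdir(path)
            if os.path.isdir(os.path.join(path, e))]
    with Pool(processes=4) as pool:
        results = pool.map(count_sizes, dirs)
    print(sum(results))
```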

Python multiprocessing launches a new interpreter, and every object it hands over must be picklable (serializable); if it isn't, it fails. Usually it doesn't crash where you think it crashed, but when trying to transmit the object. I converted code from threads to multiprocessing and got a lot of weird errors even though I didn't send or use those objects directly - but I used their parent (self).
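The failure mode can be reproduced in a few lines, assuming a hypothetical class whose instance holds an open stream. Pickling the bound method drags the whole instance along, and the stream attribute is what blows up:

```python
import os
import pickle

class Scanner:
    def __init__(self):
        # Hypothetical stream attribute - any open file handle will do
        self.log = open(os.devnull, "w")

    def count(self, path):
        return path

scanner = Scanner()
try:
    # This is essentially what Pool.map does with self._count
    pickle.dumps(scanner.count)
except TypeError as exc:
    print(type(exc).__name__)  # TypeError
finally:
    scanner.log.close()
```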

