
hdfs write fails when using AsyncWriter #183

@yujiapingyu

Description

import hashlib

def hdfs_copy_stream(src, dst, namenode=None):
    try:
        md5 = hashlib.md5()
        offset = 0  # running count of bytes copied so far
        clt = get_client(src, namenode)  # project helper returning an hdfs client
        # Stream the source in 64 KiB chunks, feeding each chunk straight
        # into the destination writer while updating the checksum.
        with clt.read(src, offset=offset, chunk_size=2 ** 16) as reader:
            with clt.write(dst, overwrite=True) as writer:
                for chunk in reader:
                    md5.update(chunk)
                    offset += len(chunk)
                    writer.write(chunk)
        md5_value = md5.hexdigest()
        print('md5 = {}, length = {}'.format(md5_value, offset))
        return RunState.Done, (md5_value, offset)  # RunState is a project enum
    except Exception as e:
        print("copy file {} to {} failed: {}".format(src, dst, e))
        return RunState.Error, None

Hi, I implemented a copy method like the one above.
It works well for large files, but for small files of a few tens of KB the function sometimes returns successfully while the target path on HDFS ends up as a zero-length file. I checked for a long time and found no problem in my code.
When I add a time.sleep after each write, like this:

offset += len(chunk)
writer.write(chunk)
time.sleep(0.001)

the problem goes away.
I really don't know why, so I'm here to ask for your help.
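
For reference, one variant I could try is to skip the writer context manager entirely and hand the chunk generator to client.write() via its data= parameter (which is part of the HdfsCLI API). This is an untested sketch, and get_client is the same project helper as above:

import hashlib

def hdfs_copy_stream_alt(src, dst, namenode=None):
    md5 = hashlib.md5()
    length = [0]  # mutable cell so the generator can update it
    clt = get_client(src, namenode)  # same project helper as above

    def chunks():
        # Read the source in 64 KiB chunks, updating the checksum and
        # byte count as a side effect of yielding each chunk.
        with clt.read(src, chunk_size=2 ** 16) as reader:
            for chunk in reader:
                md5.update(chunk)
                length[0] += len(chunk)
                yield chunk

    # With data= supplied, write() consumes the generator and returns only
    # after the upload request completes, rather than relying on the
    # writer's context-manager teardown.
    clt.write(dst, data=chunks(), overwrite=True)
    return md5.hexdigest(), length[0]

I don't know whether this avoids the same code path internally, but it might help narrow down whether the problem is in the AsyncWriter teardown.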
