Multiprocess table writing #1063

Open
jpjarnoux opened this issue Oct 6, 2023 · 3 comments

Comments

@jpjarnoux

jpjarnoux commented Oct 6, 2023

Hi,
From the documentation I understood that it should be possible to write in multiprocessing, but unfortunately I could not find a way to do it.
In my case, I have a lot of data that I want to store in different Tables of my file; they can be in separate Groups too. Every attempt I made either raised a pickling error or, when there was no error, left my tables empty.
I wrote a little example which represents what I'm trying to do.

#!/usr/bin/env python3
# coding:utf-8

import os
from random import randint, seed

import tables
from tqdm import tqdm
from multiprocessing import Lock
from concurrent.futures import ProcessPoolExecutor


table_lock = Lock()


def write_table(table: tables.Table, number_rows: int):
    tqdm_text = "#" + "{}".format(os.getpid()).zfill(3)  # label for this worker
    row = table.row
    for i in range(1, number_rows + 1):
        row["value"] = i
        row["square"] = i**2
        with table_lock:  # Lock is itself a context manager
            row.append()
    with table_lock:
        table.flush()


def main():
    num_processes = 5
    num_jobs = 2
    random_seed = 0
    seed(random_seed)
    h5f = tables.open_file("test.h5", mode="w")
    desc = {"value": tables.UInt32Col(),
            "square": tables.UInt32Col()}
    with ProcessPoolExecutor(max_workers=num_jobs) as executor:
        with tqdm(total=num_processes, unit="table") as pbar:
            futures = []
            for i in range(num_processes):
                expected_rows = randint(0, 100)
                table = h5f.create_table("/", f"table_{i}", description=desc, expectedrows=expected_rows)
                future = executor.submit(write_table, table, expected_rows)
                future.add_done_callback(lambda p: pbar.update())
                futures.append(future)

            for future in futures:
                future.result()


if __name__ == "__main__":
    main()
@avalentino
Member

Dear @jpjarnoux, sorry for the late reply.
I think that a good starting point to answer your question is https://www.pytables.org/FAQ.html#can-pytables-be-used-in-concurrent-access-scenarios.

Please also look at the examples directory. There are a few examples for multiprocessing and threading.

@jpjarnoux
Author

Hi,
Yes, I already looked at all of these, but in my case I'm not using arrays but Table objects, and it's not working as expected. I want to store string, int, and boolean values in a Table, and I assumed Table was the right choice to save this information.
Thanks
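For reference, a Table does support heterogeneous rows. A minimal sketch of such a description, assuming illustrative column names and sizes (not taken from the issue):

```python
import tables

# Hypothetical description mixing string, integer, and boolean columns.
# `pos` fixes the column order so tuples map to columns predictably.
desc = {
    "name": tables.StringCol(16, pos=0),  # fixed-width bytes
    "count": tables.Int64Col(pos=1),
    "flag": tables.BoolCol(pos=2),
}

with tables.open_file("mixed.h5", mode="w") as h5f:
    table = h5f.create_table("/", "mixed", description=desc)
    # Table.append accepts a sequence of row tuples in column order.
    table.append([(b"first", 1, True), (b"second", 2, False)])
    table.flush()
```

Note that StringCol stores fixed-width bytes, so values are written and read back as `bytes`, not `str`.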

@avalentino
Member

I think that the problem in your case is that table.row maintains state by updating internal attributes of the table object.
If possible, you should try to have a single writer and use a queue to transmit batches of data from the concurrent worker processes to the writer.
