Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add concurrent loading of shards to datasets.load_from_disk #6464

Merged
merged 8 commits into from
Jan 26, 2024

Conversation

kkoutini
Copy link
Contributor

@kkoutini kkoutini commented Dec 1, 2023

In some file systems (like luster), memory mapping arrow files takes time. This can be accelerated by performing the mmap in parallel on processes or threads.

  • Threads seem to be faster than processes when gathering the list of tables from the workers (see Slow dataloading with big datasets issue persists #2252).
  • I'm not sure if using threads would respect the IN_MEMORY_MAX_SIZE config.
  • I'm not sure if we need to expose num_proc from BaseReader.read to DatasetBuilder.as_dataset. Since  DatasetBuilder.as_dataset is used in many places beside load_dataset.

Tests on luster file system (on a shared partial node):

Loading 1231 shards of ~2GBs.
The files were pre-loaded in another process before the script runs (couldn't get a fresh node).

import logging
from time import perf_counter

import datasets
logger = datasets.logging.get_logger(__name__)
datasets.logging.set_verbosity_info()
logging.basicConfig(level=logging.DEBUG, format="%(message)s")

class catchtime:
    # context to measure loading time: https://stackoverflow.com/questions/33987060/python-context-manager-that-measures-time
    def __init__(self, debug_print="Time", logger=logger):
        self.debug_print = debug_print
        self.logger = logger

    def __enter__(self):
        self.start = perf_counter()
        return self

    def __exit__(self, type, value, traceback):
        self.time = perf_counter() - self.start
        readout = f"{self.debug_print}: {self.time:.3f} seconds"
        self.logger.info(readout)

dataset_path=""

# warmup
with catchtime("Loading in parallel", logger=logger):
   ds = datasets.load_from_disk(dataset_path,num_proc=16)

# num_proc=16
with catchtime("Loading in parallel", logger=logger):
   ds = datasets.load_from_disk(dataset_path,num_proc=16)
# num_proc=32
with catchtime("Loading in parallel", logger=logger):
   ds = datasets.load_from_disk(dataset_path,num_proc=32)
# num_proc=1
with catchtime("Loading in conseq", logger=logger):
   ds = datasets.load_from_disk(dataset_path,num_proc=1)

Run 1

open file: .../dataset_dict.json
Loading the dataset from disk using 16 threads: 100%|██████████| 1231/1231 [01:28<00:00, 13.96shards/s]
Loading in parallel: 88.690 seconds
open file: .../dataset_dict.json
Loading the dataset from disk using 16 threads: 100%|██████████| 1231/1231 [01:48<00:00, 11.31shards/s]
Loading in parallel: 109.339 seconds
open file: .../dataset_dict.json
Loading the dataset from disk using 32 threads: 100%|██████████| 1231/1231 [01:06<00:00, 18.56shards/s]
Loading in parallel: 66.931 seconds
open file: .../dataset_dict.json
Loading the dataset from disk: 100%|██████████| 1231/1231 [05:09<00:00,  3.98shards/s]
Loading in conseq: 309.792 seconds

Run 2

open file: .../dataset_dict.json
Loading the dataset from disk using 16 threads: 100%|██████████| 1231/1231 [01:38<00:00, 12.53shards/s]
Loading in parallel: 98.831 seconds
open file: .../dataset_dict.json
Loading the dataset from disk using 16 threads: 100%|██████████| 1231/1231 [02:01<00:00, 10.16shards/s]
Loading in parallel: 121.669 seconds
open file: .../dataset_dict.json
Loading the dataset from disk using 32 threads: 100%|██████████| 1231/1231 [01:07<00:00, 18.18shards/s]
Loading in parallel: 68.192 seconds
open file: .../dataset_dict.json
Loading the dataset from disk: 100%|██████████| 1231/1231 [05:19<00:00,  3.86shards/s]
Loading in conseq: 319.759 seconds

Run 3

open file: .../dataset_dict.json
Loading the dataset from disk using 16 threads: 100%|██████████| 1231/1231 [01:36<00:00, 12.74shards/s]
Loading in parallel: 96.936 seconds
open file: .../dataset_dict.json
Loading the dataset from disk using 16 threads: 100%|██████████| 1231/1231 [02:00<00:00, 10.24shards/s]
Loading in parallel: 120.761 seconds
open file: .../dataset_dict.json
Loading the dataset from disk using 32 threads: 100%|██████████| 1231/1231 [01:08<00:00, 18.04shards/s]
Loading in parallel: 68.666 seconds
open file: .../dataset_dict.json
Loading the dataset from disk: 100%|██████████| 1231/1231 [05:35<00:00,  3.67shards/s]
Loading in conseq: 335.777 seconds

fix #2252

@kkoutini kkoutini marked this pull request as ready for review December 1, 2023 18:31
@lhoestq
Copy link
Member

lhoestq commented Dec 6, 2023

If we use multithreading no need to ask for num_proc. And maybe we the same numbers of threads as tqdm by default (IIRC it's max(32, cpu_count() + 4)) - you can even use tqdm.contrib.concurrent.thread_map directly to simplify the code

Also you can ignore the IN_MEMORY_MAX_SIZE config for this. This parameter is kinda legacy.

Have you been able to run the benchmark on a fresh node ? The speed up doesn't seem that big in your first report

@kkoutini
Copy link
Contributor Author

kkoutini commented Dec 6, 2023

I got some fresh nodes with the 32 threads I'm loading the dataset with around 315 seconds (without any preloading). Sequentially, it used to take around 1865 seconds.
Ok I'll roll back the changes and switch to tqdm.contrib.concurrent.thread_map without the num_proc parameter.

@kkoutini
Copy link
Contributor Author

kkoutini commented Dec 6, 2023

I switched to tqdm.contrib.concurrent.thread_map the code looks much simpler now.

@kkoutini kkoutini reopened this Dec 6, 2023
Copy link
Member

@lhoestq lhoestq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a nice speed up !

I just have one comment:

Comment on lines 210 to 212
for f_dict in files:
pa_table: Table = self._get_table_from_filename(f_dict, in_memory=in_memory)
pa_tables.append(pa_table)
Copy link
Member

@lhoestq lhoestq Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can remove this, and also remove the pa_tables = [] a few lines above this

Suggested change
for f_dict in files:
pa_table: Table = self._get_table_from_filename(f_dict, in_memory=in_memory)
pa_tables.append(pa_table)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh! Thanks I missed that :)

@HuggingFaceDocBuilderDev

The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update.

@kkoutini kkoutini requested a review from lhoestq January 25, 2024 09:10
@lhoestq
Copy link
Member

lhoestq commented Jan 25, 2024

Thanks for the update ! Btw you should tell Jack Morris that you added this :) see https://x.com/jxmnop/status/1749812573984461145?s=20

The CI fail is unrelated to this PR - I'm trying to fix it on main right now

@kkoutini
Copy link
Contributor Author

Thanks for the update ! Btw you should tell Jack Morris that you added this :) see https://x.com/jxmnop/status/1749812573984461145?s=20

The CI fail is unrelated to this PR - I'm trying to fix it on main right now

Thank you! I'll let him know :)

@gblazex
Copy link

gblazex commented Jan 25, 2024

great work guys! letting you know here too

@lhoestq lhoestq merged commit 65434e4 into huggingface:main Jan 26, 2024
12 checks passed
Copy link

Show benchmarks

PyArrow==8.0.0

Show updated benchmarks!

Benchmark: benchmark_array_xd.json

metric read_batch_formatted_as_numpy after write_array2d read_batch_formatted_as_numpy after write_flattened_sequence read_batch_formatted_as_numpy after write_nested_sequence read_batch_unformated after write_array2d read_batch_unformated after write_flattened_sequence read_batch_unformated after write_nested_sequence read_col_formatted_as_numpy after write_array2d read_col_formatted_as_numpy after write_flattened_sequence read_col_formatted_as_numpy after write_nested_sequence read_col_unformated after write_array2d read_col_unformated after write_flattened_sequence read_col_unformated after write_nested_sequence read_formatted_as_numpy after write_array2d read_formatted_as_numpy after write_flattened_sequence read_formatted_as_numpy after write_nested_sequence read_unformated after write_array2d read_unformated after write_flattened_sequence read_unformated after write_nested_sequence write_array2d write_flattened_sequence write_nested_sequence
new / old (diff) 0.005268 / 0.011353 (-0.006085) 0.003520 / 0.011008 (-0.007488) 0.063247 / 0.038508 (0.024739) 0.032337 / 0.023109 (0.009228) 0.243251 / 0.275898 (-0.032647) 0.265816 / 0.323480 (-0.057664) 0.002960 / 0.007986 (-0.005025) 0.002733 / 0.004328 (-0.001595) 0.048965 / 0.004250 (0.044715) 0.044341 / 0.037052 (0.007289) 0.260352 / 0.258489 (0.001863) 0.288546 / 0.293841 (-0.005295) 0.027903 / 0.128546 (-0.100643) 0.010897 / 0.075646 (-0.064749) 0.210852 / 0.419271 (-0.208419) 0.036302 / 0.043533 (-0.007231) 0.247440 / 0.255139 (-0.007699) 0.263024 / 0.283200 (-0.020176) 0.017732 / 0.141683 (-0.123951) 1.144206 / 1.452155 (-0.307949) 1.206135 / 1.492716 (-0.286581)

Benchmark: benchmark_getitem_100B.json

metric get_batch_of_1024_random_rows get_batch_of_1024_rows get_first_row get_last_row
new / old (diff) 0.098404 / 0.018006 (0.080398) 0.310268 / 0.000490 (0.309778) 0.000231 / 0.000200 (0.000031) 0.000044 / 0.000054 (-0.000010)

Benchmark: benchmark_indices_mapping.json

metric select shard shuffle sort train_test_split
new / old (diff) 0.018342 / 0.037411 (-0.019070) 0.060620 / 0.014526 (0.046094) 0.074248 / 0.176557 (-0.102308) 0.121025 / 0.737135 (-0.616110) 0.075331 / 0.296338 (-0.221008)

Benchmark: benchmark_iterating.json

metric read 5000 read 50000 read_batch 50000 10 read_batch 50000 100 read_batch 50000 1000 read_formatted numpy 5000 read_formatted pandas 5000 read_formatted tensorflow 5000 read_formatted torch 5000 read_formatted_batch numpy 5000 10 read_formatted_batch numpy 5000 1000 shuffled read 5000 shuffled read 50000 shuffled read_batch 50000 10 shuffled read_batch 50000 100 shuffled read_batch 50000 1000 shuffled read_formatted numpy 5000 shuffled read_formatted_batch numpy 5000 10 shuffled read_formatted_batch numpy 5000 1000
new / old (diff) 0.293721 / 0.215209 (0.078512) 2.854259 / 2.077655 (0.776605) 1.520735 / 1.504120 (0.016615) 1.393490 / 1.541195 (-0.147705) 1.494905 / 1.468490 (0.026415) 0.573812 / 4.584777 (-4.010965) 2.418383 / 3.745712 (-1.327329) 2.803916 / 5.269862 (-2.465945) 1.741646 / 4.565676 (-2.824030) 0.063341 / 0.424275 (-0.360934) 0.004950 / 0.007607 (-0.002658) 0.341758 / 0.226044 (0.115714) 3.392918 / 2.268929 (1.123989) 1.867037 / 55.444624 (-53.577587) 1.571381 / 6.876477 (-5.305096) 1.582883 / 2.142072 (-0.559190) 0.663660 / 4.805227 (-4.141567) 0.119587 / 6.500664 (-6.381077) 0.042071 / 0.075469 (-0.033398)

Benchmark: benchmark_map_filter.json

metric filter map fast-tokenizer batched map identity map identity batched map no-op batched map no-op batched numpy map no-op batched pandas map no-op batched pytorch map no-op batched tensorflow
new / old (diff) 0.940976 / 1.841788 (-0.900811) 11.841958 / 8.074308 (3.767650) 10.510954 / 10.191392 (0.319562) 0.131927 / 0.680424 (-0.548497) 0.015373 / 0.534201 (-0.518828) 0.294245 / 0.579283 (-0.285038) 0.269355 / 0.434364 (-0.165009) 0.330173 / 0.540337 (-0.210165) 0.436809 / 1.386936 (-0.950127)
PyArrow==latest
Show updated benchmarks!

Benchmark: benchmark_array_xd.json

metric read_batch_formatted_as_numpy after write_array2d read_batch_formatted_as_numpy after write_flattened_sequence read_batch_formatted_as_numpy after write_nested_sequence read_batch_unformated after write_array2d read_batch_unformated after write_flattened_sequence read_batch_unformated after write_nested_sequence read_col_formatted_as_numpy after write_array2d read_col_formatted_as_numpy after write_flattened_sequence read_col_formatted_as_numpy after write_nested_sequence read_col_unformated after write_array2d read_col_unformated after write_flattened_sequence read_col_unformated after write_nested_sequence read_formatted_as_numpy after write_array2d read_formatted_as_numpy after write_flattened_sequence read_formatted_as_numpy after write_nested_sequence read_unformated after write_array2d read_unformated after write_flattened_sequence read_unformated after write_nested_sequence write_array2d write_flattened_sequence write_nested_sequence
new / old (diff) 0.005609 / 0.011353 (-0.005744) 0.003800 / 0.011008 (-0.007208) 0.055693 / 0.038508 (0.017185) 0.032606 / 0.023109 (0.009497) 0.302372 / 0.275898 (0.026474) 0.370530 / 0.323480 (0.047050) 0.004291 / 0.007986 (-0.003694) 0.002783 / 0.004328 (-0.001546) 0.049351 / 0.004250 (0.045101) 0.048186 / 0.037052 (0.011133) 0.290022 / 0.258489 (0.031533) 0.323358 / 0.293841 (0.029517) 0.053929 / 0.128546 (-0.074617) 0.011251 / 0.075646 (-0.064395) 0.058885 / 0.419271 (-0.360387) 0.033833 / 0.043533 (-0.009699) 0.283546 / 0.255139 (0.028407) 0.292416 / 0.283200 (0.009216) 0.017682 / 0.141683 (-0.124001) 1.141791 / 1.452155 (-0.310364) 1.202540 / 1.492716 (-0.290177)

Benchmark: benchmark_getitem_100B.json

metric get_batch_of_1024_random_rows get_batch_of_1024_rows get_first_row get_last_row
new / old (diff) 0.101240 / 0.018006 (0.083233) 0.313274 / 0.000490 (0.312784) 0.000255 / 0.000200 (0.000055) 0.000054 / 0.000054 (-0.000000)

Benchmark: benchmark_indices_mapping.json

metric select shard shuffle sort train_test_split
new / old (diff) 0.023144 / 0.037411 (-0.014268) 0.078418 / 0.014526 (0.063892) 0.089716 / 0.176557 (-0.086840) 0.129065 / 0.737135 (-0.608070) 0.090976 / 0.296338 (-0.205362)

Benchmark: benchmark_iterating.json

metric read 5000 read 50000 read_batch 50000 10 read_batch 50000 100 read_batch 50000 1000 read_formatted numpy 5000 read_formatted pandas 5000 read_formatted tensorflow 5000 read_formatted torch 5000 read_formatted_batch numpy 5000 10 read_formatted_batch numpy 5000 1000 shuffled read 5000 shuffled read 50000 shuffled read_batch 50000 10 shuffled read_batch 50000 100 shuffled read_batch 50000 1000 shuffled read_formatted numpy 5000 shuffled read_formatted_batch numpy 5000 10 shuffled read_formatted_batch numpy 5000 1000
new / old (diff) 0.294585 / 0.215209 (0.079376) 2.921350 / 2.077655 (0.843695) 1.600977 / 1.504120 (0.096857) 1.483218 / 1.541195 (-0.057977) 1.533599 / 1.468490 (0.065109) 0.580064 / 4.584777 (-4.004712) 2.463501 / 3.745712 (-1.282211) 2.905853 / 5.269862 (-2.364009) 1.799701 / 4.565676 (-2.765975) 0.065057 / 0.424275 (-0.359218) 0.005080 / 0.007607 (-0.002527) 0.352292 / 0.226044 (0.126248) 3.429664 / 2.268929 (1.160735) 1.970752 / 55.444624 (-53.473872) 1.697151 / 6.876477 (-5.179326) 1.751678 / 2.142072 (-0.390394) 0.679264 / 4.805227 (-4.125963) 0.118197 / 6.500664 (-6.382467) 0.041834 / 0.075469 (-0.033635)

Benchmark: benchmark_map_filter.json

metric filter map fast-tokenizer batched map identity map identity batched map no-op batched map no-op batched numpy map no-op batched pandas map no-op batched pytorch map no-op batched tensorflow
new / old (diff) 0.985756 / 1.841788 (-0.856032) 13.335160 / 8.074308 (5.260852) 11.524807 / 10.191392 (1.333415) 0.134892 / 0.680424 (-0.545532) 0.016855 / 0.534201 (-0.517346) 0.294599 / 0.579283 (-0.284685) 0.285988 / 0.434364 (-0.148376) 0.331423 / 0.540337 (-0.208914) 0.418765 / 1.386936 (-0.968171)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Slow dataloading with big datasets issue persists
4 participants