Hello everyone,

I am running performance experiments with various Big Data tools. The idea is to assess, under a given set of premises, in which cases each tool, analysis method, and storage format is most efficient.
Environment:
Single Machine (local)
macOS (2 cores, 8 GB RAM)
Python 3.7.10
Dask 2021.4.0
Relational, tabular data
Original file with millions of rows and 70,000 columns
Initial-test-phase file with 7,000 rows and 70,000 columns (the one I am using now)
Indexes are important, with timestamps used for cross-column correlation
I'm experimenting with Pandas, Dask, Hadoop, and Spark, over the file/storage formats CSV, HDF5, Parquet, and HDFS.
In the initial experiments Pandas did very well, giving me what I wanted without memory problems. However, thinking about scalability for the later phases, I expect it to become a problem. Hence the need for the Dask tests.
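To make the scalability concern concrete, here is a back-of-envelope memory estimate (assuming float64 cells; the 2-million-row figure is only an example stand-in for the "millions of rows"):

    rows, cols = 7_000, 70_000
    print(rows * cols * 8 / 2**30)  # ~3.7 GiB: the current test file already strains 8 GB of RAM
    rows = 2_000_000                # example stand-in for the original "millions of rows"
    print(rows * cols * 8 / 2**40)  # ~1 TiB: far beyond a single machine's memory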
So, in the Dask case, we have:
Reading with read_csv: works, but too slow (ref.: ~1000 s to create the DataFrame and run basic operations). Similar in Pandas. (A timing sketch follows this list.)
Reading with read_parquet: works, very fast (ref.: ~32 s to create the DataFrame and run basic operations). Even better than Pandas.
Reading from HDFS: not tested yet.
Reading from HDF5: this is the topic of the question.
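For reference, the CSV and Parquet numbers above come from a pattern like the following (a hedged sketch: the paths and the "basic operations" are placeholders, not my exact benchmark):

    import time
    from dask import dataframe as dd

    def bench(read, path):
        t0 = time.perf_counter()
        df = read(path)          # lazy: only builds the task graph
        df.describe().compute()  # stand-in for "basic operations"
        return time.perf_counter() - t0

    print("csv:    ", bench(dd.read_csv, "data/*.csv"))
    print("parquet:", bench(dd.read_parquet, "data.parquet"))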
There is a particularity with HDF5: the file comes from the database of the company I work for, with a custom structure and no option to change it. Such a file cannot be read with read_hdf, for example. To read it in Pandas I use the h5py library: with h5py.File I assemble a dict and create the DataFrame from it. That works very well in Pandas, but in Dask I am facing some problems.
Pandas:
import h5py
import numpy as np
import pandas as pd

if __name__ == '__main__':
    # db_path and HDF_file are defined elsewhere
    f = h5py.File(db_path + HDF_file, 'r')
    # The numeric dataset names are the parameters (columns)
    myParams = sorted(int(item) for item in f.keys() if item.isdigit())
    # One column per parameter, indexed by the TIME dataset
    df = pd.DataFrame(
        index=np.transpose(f['TIME'][()]),
        data={pname: np.transpose(f[str(pname)][()]) for pname in myParams},
    )
    f.close()
Dask:
Basic Code:
import h5py
import os
import shutil
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dask import dataframe as dd
from dask import array as da
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster(n_workers=4)
    client = Client(cluster)
    # db_path and HDF_file are defined elsewhere
    f = h5py.File(db_path + HDF_file, 'r')
    # The numeric dataset names are the parameters (columns)
    paramsList = sorted(int(item) for item in f.keys() if item.isdigit())
    # myParams = paramsList[0:1000]  # optionally select a subset of parameters (columns)
    myParams = paramsList
Attempt 1) from_pandas. I believe this could be a good option for now, but once the data no longer fits in memory it won't work. Even so, the test was not successful: above roughly 40,000 parameters/columns the DataFrame is created, but any subsequent operation fails with "concurrent.futures._base.CancelledError". (See the note after the code below.)
    # Build the full pandas DataFrame in memory, then partition it
    df_DASK = dd.from_pandas(
        pd.DataFrame(
            index=np.transpose(f['TIME'][()]),
            data={pname: np.transpose(f[str(pname)][()]) for pname in myParams},
        ),
        npartitions=4,
    )
    f.close()
    result = df_DASK[["Param1", "Param2", "Param3"]].compute()
    # plot result
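A hedged guess about the CancelledError: with 4 workers sharing 8 GB of RAM, a worker that exceeds its memory budget gets killed, which cancels the tasks that depended on it. Setting an explicit per-worker limit and watching the dashboard could confirm this (the numbers below are assumptions to tune, not a recommendation):

    cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="3GB")
    client = Client(cluster)
    print(client.dashboard_link)  # watch per-worker memory while the graph runs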
Attempt 2) from_array. I build the DataFrame from the array of each parameter/column. It works well and fast up to about 40,000 columns; beyond that it returns the same error as before: "concurrent.futures._base.CancelledError".
    # Stack TIME plus all parameter arrays; transpose so parameters become columns
    df_DASK = dd.from_dask_array(
        da.from_array([f['TIME'][()]] + [f[str(pname)][()] for pname in myParams]).transpose(),
        columns=["TIME"] + myParams,
    )
    # sorted=True assumes the TIME values are monotonically increasing
    df_DASK = df_DASK.set_index("TIME", drop=True, sorted=True)
    f.close()
    result = df_DASK[["Param1", "Param2", "Param3"]].compute()
    # plot result
Attempt 3) I build the DataFrame by concatenating arrays. This is the only one that works with the full DB, but it takes a long time (~1000 s). It also emits the following warning: "distributed.worker - WARNING - gc.collect() took 1.716s. This is usually a sign that some tasks handle too many Python objects at the same time. Rechunking the work into smaller tasks might help." (A rechunking sketch follows the code below.)
    # One single-column dask DataFrame per parameter, concatenated column-wise
    df_arr = [dd.from_array(f['TIME'][()])] + [dd.from_array(f[str(pname)][()]) for pname in myParams]
    df_DASK = dd.concat(df_arr, axis=1)
    df_DASK.columns = ["TIME"] + myParams
    df_DASK = df_DASK.set_index("TIME", drop=True, sorted=True)
    f.close()
    result = df_DASK[["Param1", "Param2", "Param3"]].compute()
    # plot result
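On the rechunking hint: one thing I may try is wrapping the h5py datasets lazily with explicit chunks instead of materializing them with [()] first. A sketch assuming every parameter dataset is a 1-D array of equal length (the chunk size is a guess to tune; also note that open h5py handles do not serialize to separate worker processes, so this fits the threaded scheduler better than distributed):

    dsets = [f['TIME']] + [f[str(pname)] for pname in myParams]
    # Wrap each dataset lazily; the chunk size controls task granularity
    cols = [da.from_array(d, chunks=100_000) for d in dsets]
    arr = da.stack(cols, axis=1)  # shape: (n_rows, n_params + 1)
    df_DASK = dd.from_dask_array(arr, columns=["TIME"] + myParams)
    # keep f open until after .compute(); close it only at the end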
If I comment out the first two lines:

cluster = LocalCluster(n_workers=2)
client = Client(cluster)

it works faster in all cases, but then it is not distributed, and I will probably run into memory/efficiency issues.
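For completeness: without a Client, Dask falls back to its local threaded scheduler, which avoids inter-process serialization (probably why it is faster here). The scheduler can also be chosen explicitly per call:

    result = df_DASK[["Param1", "Param2", "Param3"]].compute(scheduler="threads")
    # or process-based, without distributed:
    # result = df_DASK[["Param1", "Param2", "Param3"]].compute(scheduler="processes")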
PS: The final tests will still run on a single machine, but an i7 with 8 cores and 16 GB RAM.
Thanks in advance, I appreciate it!