TelethonClient RAM consumption increases without limit over time #3235

Open · 3 tasks done
ehtec opened this issue Dec 7, 2021 · 28 comments
Labels
cleanup It would be nice if this code was cleaner or this was fixed (low priority)

Comments

@ehtec

ehtec commented Dec 7, 2021

Checklist

  • The error is in the library's code, and not in my own.
  • I have searched for this issue before posting it and there isn't a duplicate.
  • I ran pip install -U https://github.com/LonamiWebs/Telethon/archive/master.zip and triggered the bug in the latest version.

I experience increasing memory consumption when running Telethon in a channel-scanning setup with many parallel sessions over a long time. For each client, an asyncio worker task is created which retrieves channels / groups to scan from an asyncio.Queue. The worker executes a couple of coroutines on each channel. The relevant code is posted below.

Code that causes the issue

# check if a public name is a group or channel or neither
async def is_channel(client, public_name):
    public_name = public_name.lower()
    logging.info('Checking if {0} is a channel'.format(public_name))

    long_tg_api_sleep = await get_option("LONG_TG_API_SLEEP")

    try:
        await asyncio.sleep(long_tg_api_sleep)
        entity = await client.get_entity(public_name)
        if not isinstance(entity, Channel):
            return 2  # neither group nor channel
        elif entity.broadcast is True:
            return 1  # channel
        elif entity.broadcast is False:
            return -1  # chat
        else:
            return 0  # error

    except CancelledError:
        logging.info("Cancelling is_channel task")
        raise CancelledError
    except (UsernameNotOccupiedError, ValueError, UsernameInvalidError):
        logging.debug("Username {0} not occupied".format(public_name))
        return 3  # username not occupied
    except FloodWaitError:
        return 4  # floodwaiterror
    except Exception as e:
        logging.exception(e)
        return 0  # error
# download channel / group info: description / creation date / member count
async def get_channel_info(client, cur, conn, public_name):
    public_name = public_name.lower()
    try:
        logging.info("Downloading channel description of {0}".format(public_name))

        tg_api_sleep = await get_option("TG_API_SLEEP")

        await asyncio.sleep(tg_api_sleep)
        entity = await client.get_entity(public_name)

        await asyncio.sleep(tg_api_sleep)
        ch_full = await client(GetFullChannelRequest(channel=entity))
        channel_info = ch_full.full_chat.about

        # get channel title
        channel_title = entity.title

        # get member count
        member_count = ch_full.full_chat.participants_count

        # get creation date

        creation_date = None

        async for message in client.iter_messages(entity=entity, offset_date=datetime(2000, 1, 1), add_offset=-1, limit=1):
            creation_date = message.date

        logging.info("Adding channel info to database")

        # no relevant code here

    except CancelledError:
        logging.info("Cancelling get_channel_info task")
        raise CancelledError
    except Exception as e:
        logging.exception(e)
# download the profile photo of a group or channel and store it on the disk
async def get_profile_photo(client, public_name):
    public_name = public_name.lower()
    try:
        logging.info("Downloading profile photo of {0}".format(public_name))

        tg_api_sleep = await get_option("TG_API_SLEEP")

        await asyncio.sleep(tg_api_sleep)

        entity = await client.get_entity(public_name)

        path = os.path.join(TGSCRAPER_DATA_PATH, public_name, 'channel_picture')

        await client.download_profile_photo(entity, path)
        return True

    except CancelledError:
        logging.info("Cancelling get_profile_photo task")
        raise CancelledError
    except Exception as e:
        logging.exception(e)
        return False
# download group messages and files
# timestamp: use this argument to specify a timestamp in the past when to stop downloading messages
# file_timestamp: use this argument to specify a timestamp in the past when to continue downloading messages, but stop
#   downloading files. When timestamp > file_timestamp, the setting has no effect.
# this function only differs in terms of message number limits from download_channel.
async def download_group(client, cur, conn, public_name, timestamp=datetime(2005, 1, 1, 1),
                         file_timestamp=datetime(2005, 1, 1, 1), delete_files=False, max_msg_num=None,
                         offset_date=None):

    try:

        max_data_file_size = await get_option("MAX_DATA_FILE_SIZE")
        max_total_file_size = await get_option("MAX_TOTAL_FILE_SIZE")
        tg_api_sleep = await get_option("TG_API_SLEEP")
        measure_seconds = await get_option("MEASURE_SECONDS")
        trigger_requests_num = await get_option("TRIGGER_REQUESTS_NUM")
        max_nr_msgs_group = await get_option("MAX_NR_MSGS_GROUP")

        if bool(max_msg_num):
            max_nr_msgs_group = min(max_nr_msgs_group, max_msg_num)

        iter_options = {}

        if bool(offset_date):
            iter_options.update({'offset_date': offset_date})

        if not bool(public_name):
            raise ValueError("Empty public name supplied to download_group!")

        public_name = public_name.lower()

        if not os.path.isdir(os.path.join(TGSCRAPER_DATA_PATH, public_name)):
            os.mkdir(os.path.join(TGSCRAPER_DATA_PATH, public_name))
            os.mkdir(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'channel_picture'))
            os.mkdir(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'messages'))
            os.mkdir(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'images'))
            os.mkdir(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'documents'))
        else:
            if delete_files:
                clean_folder_contents(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'channel_picture'))
                clean_folder_contents(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'images'))
                clean_folder_contents(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'documents'))

        # message_array = []
        logging.info("Downloading messages from {0}".format(public_name))
        await asyncio.sleep(tg_api_sleep)
        entity = await client.get_input_entity(public_name)

        # filesize_counter = 0 #filesize in Bytes
        filesize_counter = folder_size(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'images')) + folder_size(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'documents'))

        i = 0  # count msgs nr

        lrtime = datetime.now()
        requestslist = []

        with open(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'messages', 'messages.csv'), 'w') as csvfile:
            mywriter = csv.writer(csvfile, delimiter=',')
            with open(os.path.join(TGSCRAPER_DATA_PATH, public_name, 'messages', 'messages.txt'), 'w') as textfile:
                async for message in client.iter_messages(entity, **iter_options):

                    # check if we need to sleep or the last request took long enough
                    deltat = datetime.now() - lrtime
                    if deltat.total_seconds() < tg_api_sleep:
                        await asyncio.sleep(tg_api_sleep - deltat.total_seconds())

                    lrtime = datetime.now()
                    requestslist.append(lrtime)

                    if i % trigger_requests_num == 0:
                        requestslist = [elem for elem in requestslist if (lrtime - elem).total_seconds() < measure_seconds]
                        client.rps = len(requestslist) / measure_seconds
                        async with db_lock:
                            await client.store_rps()

                    # await asyncio.sleep(TG_API_SLEEP)
                    i += 1
                    if i > max_nr_msgs_group:
                        logging.info("MAX_NR_MSGS_GROUP hit for group: {0}. Stopping.".format(public_name))
                        break

                    # check if message is before timestamp, if yes, stop downloading
                    if message.date < timestamp.replace(tzinfo=TIME_ZONE):
                        logging.info("Timestamp date reached: {0}; breaking.".format(message.date))
                        break

                    # logging.info(message.raw_text)
                    if message.text:
                        s1 = "{0}\n\n".format(message.text)
                        # message_array.append(message.text)
                        textfile.write(s1)

                        # also write to csv file
                        msgtime = message.date
                        encodedstr = b64encstr(message.text)
                        mywriter.writerow([str(msgtime), encodedstr])

                    # check if we should download files also
                    if message.date < file_timestamp.replace(tzinfo=TIME_ZONE):
                        logging.info("Do not download files for date: {0}".format(message.date))
                        continue

                    if filesize_counter < max_total_file_size:
                        filemeta = message.file
                        if not filemeta:
                            continue
                        if filemeta.ext not in ALLOWED_DATA_EXT:
                            logging.info("Invalid file extension: skipping {0}".format(message.file.name))
                            logging.info(filemeta.name)
                            continue
                        if not filemeta.size < max_data_file_size:
                            logging.info("File too big: {0} bytes. Skipping {1}".format(filemeta.size, filemeta.name))
                            continue
                        logging.info("Worker {1}: Downloading {0}...".format(filemeta.name, client.db_id))
                        logging.info("Size: {0}".format(filemeta.size))
                        filesize_counter += filemeta.size
                        path = os.path.join(TGSCRAPER_DATA_PATH, public_name, 'images')
                        if filemeta.ext in ALLOWED_DOCUMENT_EXT:
                            path = os.path.join(TGSCRAPER_DATA_PATH, public_name, 'documents')
                        # await asyncio.sleep(TG_API_SLEEP)

                        # check if we need to sleep or the last request took long enough
                        deltat = datetime.now() - lrtime
                        if deltat.total_seconds() < tg_api_sleep:
                            await asyncio.sleep(tg_api_sleep - deltat.total_seconds())

                        lrtime = datetime.now()
                        requestslist.append(lrtime)

                        await client.download_media(message=message, file=path)
                        logging.info("Download finished.")

        logging.warning("Downloaded group: {0}, downloaded file size: {1}".format(public_name, filesize_counter))

    except CancelledError:
        logging.info("Cancelling download_group task")
        raise CancelledError
    except Exception as e:
        logging.exception(e)

public_name is the channel / group name in all coroutines. The worker calls each of these coroutines on the channel / group name one by one before proceeding to the next element from the queue.

Garbage collection is enabled at the top of the file:

import gc
gc.enable()

Issue
The memory used by the script increases without limit, as can be seen with any system monitoring tool (e.g. gnome-system-monitor).

I used objgraph to get a little more information on what objects are causing the increased memory usage:

print(objgraph.show_growth())

is executed regularly. (show_growth() prints the table itself and returns None, which explains the trailing None in the output below.)

An example result:

FrameSummary      3330479    +10615
InputPeerUser     4979274     +8931
tuple              738165     +2375
Future             367760     +1211
cell               365260     +1204
function           407231     +1185
StackSummary       371151     +1142
Context            360580     +1117
InputPeerChannel    43058      +123
InputPeerChat        4390       +14
None
@ingria
Contributor

ingria commented Jan 1, 2022

Try clearing the entity cache periodically (see #3073)

@ehtec
Author

ehtec commented Jan 1, 2022

Try clearing the entity cache periodically (see #3073)

Thanks for the reply. So you suggest calling

client._entity_cache.clear()

if I have understood correctly how the entity cache is implemented in the TelegramClient?

This will result in the client re-fetching entities when they are used again, I guess. So I suppose I can call it every time the scraping of a channel is finished?
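
For context, this is roughly what the worker loop would look like with that change (simplified sketch; the queue and the database cursor/connection are set up elsewhere in my code, and return codes are ignored here):

import asyncio

async def worker(client, queue: asyncio.Queue, cur, conn):
    while True:
        public_name = await queue.get()
        try:
            await is_channel(client, public_name)
            await get_channel_info(client, cur, conn, public_name)
            await get_profile_photo(client, public_name)
            await download_group(client, cur, conn, public_name)
        finally:
            # drop the access hashes accumulated while scanning this channel
            client._entity_cache.clear()
            queue.task_done()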

@ingria
Contributor

ingria commented Jan 2, 2022

Telegram sends a lot of extra data for your requests. For example, if you request channel history, you’ll also get all linked users (e.g. from post comments) and chats (e.g. from reposted messages or mentions) together with message contents. All that stuff is saved to the entity cache.

In other words, entity cache saves all access_hashes of all objects (users, chats, channels, etc), so that you can reuse them later without calling expensive methods (e.g. resolveUsername).

As far as I understand, you only need your channels' access hashes, so you can either cache them in your app and disable the entity cache entirely, or clear the entity cache selectively (delete everything except InputPeerChannels).
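
A rough sketch of the first option (keeping the input peers in your own structure so a username is only resolved once, and the library's entity cache can stay disabled or be cleared freely; the dict here is just an in-memory example, you could persist it anywhere):

input_peers = {}  # username -> InputPeer*, owned by your app instead of the entity cache

async def resolve_once(client, public_name):
    public_name = public_name.lower()
    if public_name not in input_peers:
        # only the first lookup may hit resolveUsername; later calls reuse the stored peer
        input_peers[public_name] = await client.get_input_entity(public_name)
    return input_peers[public_name]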

@ehtec
Author

ehtec commented Jan 2, 2022

Okay. The route I have chosen for testing is just clearing the entire entity cache after the scraping of a channel is finished. This is no problem because, at least currently, I don't re-scan lots of channels regularly, and at the current state of the application, the probability that a channel is picked up by the same client a second time is extremely low (less than 1%, since I have lots of clients). So there isn't really a use for the entity cache here anyway, as every client has its own.

It would be interesting though to cache some data in my PostgreSQL database to avoid calling resolveUsername again, so each client can load this data from a database shared with the other clients. Do you know exactly which objects need to be stored to avoid calling resolveUsername again? I thought about storing the channel id instead of username but I don't know if this is sufficient.

I will keep you updated on whether clearing the entity cache solved the RAM issue. I restarted the application yesterday with the change, and it has gone up from 300 MB to 2.1 GB since. Maybe this is just because it needs some time to launch everything and stabilize. If it stays like that, it is absolutely acceptable for me. (I am not talking about increases of 300 MB / month like in the other issue; I am dealing with increases of 10 GB / week here, which is obviously a problem even if you have a lot of RAM.)

@ingria
Contributor

ingria commented Jan 5, 2022

I thought about storing the channel id instead of username but I don't know if this is sufficient

You’ll need to store access hash and channel ID of each channel.

https://core.telegram.org/constructor/inputPeerChannel

IDs are the same across different accounts, access hashes are unique (different accounts will have different access hashes).
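
For example, once (channel ID, access hash) pairs are stored per account, the input peer can be rebuilt without resolving the username again (sketch; loading the two values from your PostgreSQL table is up to you):

from telethon.tl.functions.channels import GetFullChannelRequest
from telethon.tl.types import InputPeerChannel

async def get_full_from_stored(client, stored_channel_id, stored_access_hash):
    # both values must come from the same account/session that originally stored them
    peer = InputPeerChannel(channel_id=stored_channel_id, access_hash=stored_access_hash)
    return await client(GetFullChannelRequest(channel=peer))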

@Lonami
Member

Lonami commented Jan 24, 2022

Update handling needs to know which entities it has a hash for, because it needs to call getDifference if it's missing a hash. They could be flushed to disk and cleared periodically, but I'm personally not affected by this, so I'm unlikely to look into it any time soon. Note that update handling can also be disabled entirely. If someone is interested in working on this let me know and I can offer some guidance.

v2 has some changes so that only entities from updates are stored in cache (as opposed to any request which may return entities).
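
For reference, disabling updates entirely is done when constructing the client (rough sketch, assuming a recent v1 version that accepts the receive_updates flag; api_id / api_hash are your usual credentials):

from telethon import TelegramClient

# updates are never requested or dispatched, so the update-handling state stays small
client = TelegramClient('session_name', api_id, api_hash, receive_updates=False)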

@Lonami Lonami added the cleanup It would be nice if this code was cleaner or this was fixed (low priority) label Jan 24, 2022
@ehtec
Author

ehtec commented Mar 6, 2022

Finally I have new test data. After cleaning the entity cache after every scan, I get objgraph outputs like this:

FrameSummary 17234282    +27989
tuple         3448227     +5843
function      1762376     +2880
cell          1720427     +2880
Future        1721537     +2852
Context       1715292     +2816
StackSummary  1724518     +2791
None

The RAM consumption growth has slowed down, but after 1-2 weeks of running, the remaining objects fill up the memory anyway.

@Lonami do you think disabling updates can help against this, and how do I disable them? I quickly checked the docs but could not find what I was looking for. I don't have any update handlers registered.

If you can provide some guidance on how to fix the RAM consumption issue, I'd be very thankful; I will need to fix this sooner or later. Either there is another cache in addition to the entity cache causing trouble, or there might be some unreleased objects created somewhere else. Maybe not all of it is in Telethon itself; it might as well be in my post-processing.

@Lonami
Member

Lonami commented Mar 6, 2022

My recommendation is to wait until v2 is released, because it's going to change a lot and would probably void any attempt at looking into this now. Unfortunately, the v2 release will probably come later rather than sooner.

@ehtec
Author

ehtec commented Mar 27, 2022

My recommendation is to wait until v2 is released, because it's going to change a lot and would probably void any attempt at looking into this now. Unfortunately, the v2 release will probably come later rather than sooner.

Ok, thank you. Is there an ETA?

@alexgoryushkin
Contributor

Hi Lonami, I also faced this problem. I think I found a memory leak: _event_handler_tasks in the client is cleared only when the connection is closed. While the client is running, it is constantly filled with new tasks, but completed tasks are for some reason not removed from it, or only some of them are.

The _event_handler_tasks set inspected via debugger:

[screenshot: the _event_handler_tasks set holding several hundred task objects]

@Lonami
Member

Lonami commented Dec 15, 2022

The only place where that set is filled is here:

task = self.loop.create_task(self._dispatch_update(updates_to_dispatch.popleft()))
self._event_handler_tasks.add(task)
task.add_done_callback(lambda _: self._event_handler_tasks.discard(task))

As far as I can tell, when a task is done, it cleans up after itself. This is pretty much what Python's documentation recommends doing (funny, I don't recall this being in their docs):

Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done. For reliable “fire-and-forget” background tasks, gather them in a collection:

background_tasks = set()

for i in range(10):
    task = asyncio.create_task(some_coro(param=i))

    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)

    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)

Perhaps you could try changing line 274 to the following:

task.add_done_callback(self._event_handler_tasks.discard)

and report back. As far as I can tell though, that won't be any better (in fact it may be worse if the task is wrapped in another way as it won't be discarded).

In any case, _event_handler_tasks "only" has 458 items in your screenshot, which, depending on the code, I would not call a leak (or a very large one, but maybe you took the screenshot early).

I suspect the leak may be somewhere else. But you can also comment out this line entirely and see if it helps (you may get some "task was garbage collected" messages though).

@alexgoryushkin
Contributor

Perhaps you could try changing line 274 to the following:

task.add_done_callback(self._event_handler_tasks.discard)

It really helped.

Comparison of len(client._event_handler_tasks) over time:
With task.add_done_callback(self._event_handler_tasks.discard):

2022-12-16 00:03:45.989679 0
2022-12-16 00:03:55.992020 0
2022-12-16 00:04:05.994626 0
2022-12-16 00:04:16.010222 0
2022-12-16 00:04:26.010428 1
2022-12-16 00:04:36.017246 0
2022-12-16 00:04:46.027102 0
2022-12-16 00:04:56.036805 0
2022-12-16 00:05:06.029107 0
2022-12-16 00:05:16.040542 0

With task.add_done_callback(lambda _: self._event_handler_tasks.discard(task)):

2022-12-16 00:05:52.361548 0
2022-12-16 00:06:02.367989 0
2022-12-16 00:06:12.382152 67
2022-12-16 00:06:22.382540 111
2022-12-16 00:06:32.394259 138
2022-12-16 00:06:42.400919 160
2022-12-16 00:06:52.414996 201
2022-12-16 00:07:02.434844 240
2022-12-16 00:07:12.451940 300

I will continue to monitor the behavior and will let you know if I find anything useful.

@Lonami
Member

Lonami commented Dec 15, 2022

Maybe it's the lambda in the loop capturing the task variable, so each callback only clears the latest task (which is an issue if there's more than one pending). I will push that change to the main branch, thanks.
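
A minimal reproduction of that closure behaviour, outside Telethon (the lambda looks up the name task when the callback runs, not when the lambda is created):

import asyncio

async def main():
    tasks = set()
    for _ in range(3):
        task = asyncio.create_task(asyncio.sleep(0))
        tasks.add(task)
        # every callback discards whatever "task" points to when it runs: the last task
        task.add_done_callback(lambda _: tasks.discard(task))
    await asyncio.sleep(0.1)
    print(len(tasks))  # 2: only the most recently created task was removed

asyncio.run(main())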

Lonami added a commit that referenced this issue Dec 15, 2022
See #3235. This should help tone down memory usage a little.
@alexgoryushkin
Contributor

I will push that change to the main branch, thanks.

There is an error in the commit: it should be task.add_done_callback(self._event_handler_tasks.discard), not task.add_done_callback(self._event_handler_tasks).

Lonami added a commit that referenced this issue Dec 16, 2022
See #3235. This should help tone down memory usage a little.
@ehtec
Author

ehtec commented Dec 18, 2022

Hi, and thanks to both of you for having a look at this. @alexgoryushkin, was this the major memory leak in your setup: is the RAM consumption approximately constant after the fix was applied, or was this just one of multiple culprits? It is difficult for me to test this at the moment, as we are moving to a new infrastructure.

@alexgoryushkin
Contributor

is the RAM consumption approximately constant after the fix was applied, or was this just one of multiple culprits?

Hi, after the changes the client's memory grows about half as fast, but it still grows, so there are still problems. Unfortunately, I don't have the opportunity to continue studying this issue right now, but I will try to return to it as soon as possible.

@alexgoryushkin
Contributor

I left the client running for a few hours after joining a few high-traffic chats. An issue with the entity cache has been identified: it grows indefinitely.
The affected client fields are _entity_cache and _mb_entity_cache.

Client size log in megabytes:

2022-12-22 20:04:07.457781 RAM (MB): client  0.49 | _entity_cache  0.01 | _mb_entity_cache  0.00 | client without these caches  0.48
2022-12-22 20:34:08.282286 RAM (MB): client  0.80 | _entity_cache  0.26 | _mb_entity_cache  0.13 | client without these caches  0.41
2022-12-22 21:04:09.570893 RAM (MB): client  1.16 | _entity_cache  0.45 | _mb_entity_cache  0.28 | client without these caches  0.43
2022-12-22 21:34:11.359633 RAM (MB): client  1.44 | _entity_cache  0.59 | _mb_entity_cache  0.38 | client without these caches  0.48
2022-12-22 22:04:13.476014 RAM (MB): client  1.67 | _entity_cache  0.76 | _mb_entity_cache  0.51 | client without these caches  0.39
2022-12-22 22:34:16.097890 RAM (MB): client  1.91 | _entity_cache  0.87 | _mb_entity_cache  0.62 | client without these caches  0.42

As far as I can tell, automatic cleanup is not currently provided, but it needs to be implemented. At the same time, it seems to me that the cleanup should be smart enough to keep only frequently used objects in memory; I think an LRU algorithm is suitable for this purpose.
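
Something along these lines is what I mean (a rough sketch of an LRU-bounded dict; a real patch would need to plug it into _entity_cache and _mb_entity_cache):

from collections import OrderedDict

class LRUDict(OrderedDict):
    """Mapping that keeps at most max_size recently used entries."""

    def __init__(self, max_size=8192):
        super().__init__()
        self.max_size = max_size

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self.move_to_end(key)  # mark as recently used
        return value

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self.move_to_end(key)
        while len(self) > self.max_size:
            self.popitem(last=False)  # evict the least recently used entry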

@ehtec
Author

ehtec commented Dec 23, 2022

@alexgoryushkin have you seen all the messages above? The entity cache problem was known to us;

client._entity_cache.clear()

worked for me. I did not know, however, that there is a second _mb_entity_cache; maybe clearing it the same way solves the problem once and for all. Good to see that without the caches, RAM consumption is constant, at least in the timeframe of your measurement.

@w1ld32

w1ld32 commented Jan 16, 2023

@ehtec have you found out how to clear _mb_entity_cache? client._mb_entity_cache.clear() doesn't work.

@ehtec
Author

ehtec commented Jan 16, 2023

@ehtec have you found out how to clear _mb_entity_cache? client._mb_entity_cache.clear() doesn't work.

No, I don't have this code running at the moment, so I cannot test. Try to figure out the content and type of this attribute by printing it; if it's an object, look at its declaration, maybe there is a similar method to clear it. If it's just a dictionary or list, try to empty it manually.

@danielWagnerr

danielWagnerr commented Feb 9, 2023

@w1ld32 just call _mb_entity_cache.hash_map.clear()

I'm having the same problem right now, hope this will help

@alexgoryushkin
Contributor

@ehtec, @w1ld32, @danielWagnerr
You can try installing my version of the library and testing the size limit on _entity_cache and _mb_entity_cache. I also added a cache_size_limit: int = 8192 parameter to the TelegramClient class constructor.
The LRU algorithm keeps only the most recently used entities in the cache.
This solves the problem for clients with a small number of chats, but if there are a lot of chats, it does not help, so apparently there is something else going on.
Unfortunately, I can't actively investigate this problem right now.
This is how you can install my version: pip install -U https://github.com/alexgoryushkin/Telethon/archive/v1.zip
It corresponds to today's latest version of the original library (v1 branch) with a small patch, 472367d.

@danielWagnerr

Thanks @alexgoryushkin, I'll use your changes

Also I'll keep investigating

Lonami added a commit that referenced this issue Apr 6, 2023
Progress towards #3989.
May also help with #3235.
Lonami added a commit that referenced this issue Apr 6, 2023
@alexgoryushkin
Contributor

I found an interesting detail. It seems _updates_queue doesn't have time to drain when too many updates are coming in, even if there are no update handlers.

This is a function to calculate the size of an object, honestly stolen from stackoverflow:

import sys
from gc import get_referents
from types import ModuleType, FunctionType

BLACKLIST = type, ModuleType, FunctionType

def sizeof(obj):
    if isinstance(obj, BLACKLIST):
        return 0
    seen_ids = set()
    size = 0
    objects = [obj]
    while objects:
        need_referents = []
        for obj in objects:
            if not isinstance(obj, BLACKLIST) and id(obj) not in seen_ids:
                seen_ids.add(id(obj))
                size += sys.getsizeof(obj)
                need_referents.append(obj)
        objects = get_referents(*need_referents)
    return size

Logging client memory consumption, updates to the queue, and number of tasks in asyncio:

logger.debug(f"RAM (MB): client {sizeof(client) / (2 ** 20):5.2f} | "
             f"_updates_queue {client._updates_queue.qsize()} | "
             f"all_tasks {len(asyncio.all_tasks(loop))} | "
             f"event_handlers {client.list_event_handlers()}")

Log output for 2 sessions:

[19.10.2023 11:52:09,977] [DEBUG] RAM (MB): client  6.39 | _updates_queue 0 | all_tasks 15 | event_handlers []
[19.10.2023 12:02:13,752] [DEBUG] RAM (MB): client 20.14 | _updates_queue 1281 | all_tasks 14 | event_handlers []
[19.10.2023 12:12:25,222] [DEBUG] RAM (MB): client 48.19 | _updates_queue 3995 | all_tasks 18 | event_handlers []
[19.10.2023 12:22:25,422] [DEBUG] RAM (MB): client  7.22 | _updates_queue 1 | all_tasks 18 | event_handlers []
[19.10.2023 12:32:25,593] [DEBUG] RAM (MB): client  7.23 | _updates_queue 0 | all_tasks 19 | event_handlers []
[19.10.2023 12:42:25,761] [DEBUG] RAM (MB): client  7.31 | _updates_queue 0 | all_tasks 19 | event_handlers []
[19.10.2023 12:52:26,038] [DEBUG] RAM (MB): client  7.35 | _updates_queue 0 | all_tasks 19 | event_handlers []
[19.10.2023 13:02:26,764] [DEBUG] RAM (MB): client  7.41 | _updates_queue 1 | all_tasks 18 | event_handlers []
[19.10.2023 13:12:26,931] [DEBUG] RAM (MB): client  7.66 | _updates_queue 0 | all_tasks 19 | event_handlers []

[19.10.2023 11:52:13,075] [DEBUG] RAM (MB): client  6.18 | _updates_queue 0 | all_tasks 14 | event_handlers []
[19.10.2023 12:02:17,197] [DEBUG] RAM (MB): client 13.77 | _updates_queue 808 | all_tasks 14 | event_handlers []
[19.10.2023 12:12:27,885] [DEBUG] RAM (MB): client 24.69 | _updates_queue 2065 | all_tasks 14 | event_handlers []
[19.10.2023 12:22:49,828] [DEBUG] RAM (MB): client 34.78 | _updates_queue 3182 | all_tasks 14 | event_handlers []
[19.10.2023 12:33:28,674] [DEBUG] RAM (MB): client 50.64 | _updates_queue 5040 | all_tasks 14 | event_handlers []
[19.10.2023 12:43:42,574] [DEBUG] RAM (MB): client 58.56 | _updates_queue 5935 | all_tasks 14 | event_handlers []
[19.10.2023 12:53:48,163] [DEBUG] RAM (MB): client 71.78 | _updates_queue 7546 | all_tasks 14 | event_handlers []
[19.10.2023 13:05:56,475] [DEBUG] RAM (MB): client 85.92 | _updates_queue 9200 | all_tasks 14 | event_handlers []
[19.10.2023 13:17:21,992] [DEBUG] RAM (MB): client 107.28 | _updates_queue 11750 | all_tasks 14 | event_handlers []

@Lonami
Member

Lonami commented Oct 20, 2023

v2 is still in alpha stage, but it will feature an update_queue_limit parameter (naming TBD): https://docs.telethon.dev/en/v2/modules/client.html

However, the recommendation on #3235 (comment) stands, and now it would be possible to test v2 early.

@alexgoryushkin
Contributor

By the way, I found a problem with my code. It turned out that in some places blocking operations were being performed. As a result, the event loop did not have time to process the updates correctly.

You can reproduce this behaviour with this event listener:

import asyncio, random, time
from telethon.events import NewMessage

async def my_event_handler(event: NewMessage.Event):
    if random.choice((True, False)):
        await asyncio.sleep(random.uniform(1, 20))
    else:
        time.sleep(random.uniform(1, 20))  # this blocks the whole event loop
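
A sketch of one way to avoid this (offloading blocking work to a thread so the loop keeps draining the update queue; time.sleep stands in for any blocking call):

import asyncio
import random
import time

async def my_event_handler(event):
    loop = asyncio.get_running_loop()
    # run the blocking call in the default thread pool executor instead of on the event loop
    await loop.run_in_executor(None, time.sleep, random.uniform(1, 20))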

@kotlin711

Is there a good solution to this problem?

@amao12580

Any update for this?
