Retrying a subscription does not renew the id and may cause an error on the server because the id is already used #5217

Open
Tracked by #5648
damianpetla opened this issue Sep 1, 2023 · 23 comments
@damianpetla

Version

4.0.0-alpha.2

Summary

I have noticed a few crashes of my app when using subscriptions. It looks like the SDK has a problem parsing errors, but I am not sure exactly why. I have contacted my backend team, but maybe letting you know can resolve the issue faster.

My setup of ApolloClient looks like this

ApolloClient.Builder()
        .networkTransport(
            HttpNetworkTransport.Builder()
                .serverUrl(BuildConfig.BASE_URL)
                .okHttpClient(okHttpClient)
                .build()
        )
        .subscriptionNetworkTransport(
            WebSocketNetworkTransport.Builder()
                .protocol(GraphQLWsProtocol.Factory())
                .serverUrl(BuildConfig.BASE_URL)
                .okHttpClient(okHttpClient)
                .reopenWhen { throwable, attempt ->
                    if (attempt < 13) {
                        delay(2.0.pow(attempt.toDouble()).toLong())
                    } else {
                        delay(5000L)
                    }
                    true
                }
                .build()
        )
        .build()

and subscription like this

 apolloClient.subscription(BoostCompetitionSubscription(roomId))
            .toFlow()
            .mapNotNull { response ->
                val exception = response.exception
                if (exception is ApolloGraphQLException) {
                    Timber.e(BoostCompetitionException(roomId, exception.errors, exception))
                }
                try {
                    response.data?.boostCompetitions?.firstOrNull()?.toDomain()
                } catch (ex: Exception) {
                    Timber.e(BoostCompetitionParsingException(roomId, ex.message, ex))
                    null
                }
            }

Just before the crash, the reopenWhen set on WebSocketNetworkTransport was fired once with attempt 0.

Is there something I could do to prevent the app from crashing?

Steps to reproduce the behavior

No response

Logs

com.apollographql.apollo3.exception.JsonDataException: Expected BEGIN_ARRAY but was BEGIN_OBJECT at path errors
    at com.apollographql.apollo3.api.json.MapJsonReader.peek
    at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
    at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
    at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
    at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
    at com.apollographql.apollo3.api.internal.ResponseParser.readErrors
    at com.apollographql.apollo3.api.internal.ResponseParser.readErrors
    at com.apollographql.apollo3.api.internal.ResponseParser.parse
    at com.apollographql.apollo3.api.Operations.parseJsonResponse
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit$bridge
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke
    at kotlinx.coroutines.flow.internal.SafeCollector.emit
    at kotlinx.coroutines.flow.internal.SafeCollector.emit
    at kotlinx.coroutines.flow.internal.SafeCollector.emit
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invokeSuspend
    at androidx.compose.material.AnchoredDraggableKt$snapTo$2.invokeSuspend$bridge
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
    at androidx.compose.material.AnchoredDraggableKt$snapTo$2.invoke$bridge(SourceFile:0)
    at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$1.emit
    at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$2.emit
    at androidx.compose.ui.platform.WindowRecomposer_androidKt$createLifecycleAwareWindowRecomposer$2$onStateChanged$1$1$1$1.emit$bridge
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$filter$1$2.emit
    at androidx.compose.foundation.text.selection.SelectionMagnifierKt$rememberAnimatedMagnifierPosition$1$2.emit$bridge
    at kotlinx.coroutines.flow.SubscribedFlowCollector.emit
    at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl
    at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
    at kotlinx.coroutines.DispatchedTask.run
    at kotlinx.coroutines.DispatchedTask.run
    at kotlinx.coroutines.internal.LimitedDispatcher.run
    at kotlinx.coroutines.scheduling.TaskImpl.run
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run
@martinbonnin
Contributor

Looks like your backend is returning errors as an object instead of the expected list (spec). The response should look like

{
  "data": null,
  "errors": [
    {
      "message": "Name for character with ID 1002 could not be fetched.",
      "locations": [{ "line": 6, "column": 7 }],
      "path": ["hero", "heroFriends", 1, "name"]
    }
  ]
}

This should be fixed in your backend.
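To make the shape mismatch behind the stack trace concrete, here is a minimal sketch that classifies the `errors` field of an already-decoded JSON response. The `ErrorsShape` type and `classifyErrors` function are hypothetical names for illustration only; they are not part of Apollo Kotlin, and the real parser works on a streaming `JsonReader` rather than a `Map`.

```kotlin
// Hypothetical helper, not part of Apollo Kotlin: inspects the shape of the
// "errors" field in a JSON response that was already decoded into a Map.
sealed interface ErrorsShape {
    object Absent : ErrorsShape
    data class Valid(val count: Int) : ErrorsShape
    data class Invalid(val actualType: String) : ErrorsShape
}

fun classifyErrors(response: Map<String, Any?>): ErrorsShape {
    if (!response.containsKey("errors")) return ErrorsShape.Absent
    return when (response["errors"]) {
        // The shape shown above: a list of error objects.
        is List<*> -> ErrorsShape.Valid((response["errors"] as List<*>).size)
        // The shape behind "Expected BEGIN_ARRAY but was BEGIN_OBJECT".
        is Map<*, *> -> ErrorsShape.Invalid("object")
        null -> ErrorsShape.Invalid("null")
        else -> ErrorsShape.Invalid("other")
    }
}

fun main() {
    val compliant = mapOf(
        "data" to null,
        "errors" to listOf(mapOf("message" to "Name could not be fetched."))
    )
    val nonCompliant = mapOf(
        "data" to null,
        "errors" to mapOf("detail" to "an operation already exists with this id")
    )
    println(classifyErrors(compliant))
    println(classifyErrors(nonCompliant))
}
```

A check like this only reports the problem; the payload still has to be fixed on the server.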

I also agree this should be recoverable from the client side of things. Did you try catching errors in your Flow? Something like

apolloClient.subscription(subscription)
     .toFlow()
     .catch { 
       // recover here
     }

@damianpetla
Author

Hey @martinbonnin, thanks for the quick reply. I haven't tried a catch clause; I was just checking exceptions in mapNotNull and on the client itself. I will add catch and see how it goes. I saw you opened a PR, do I get it right that this will prevent crash? If so, when can I expect next version? Thanks!

@martinbonnin
Contributor

do I get it right that this will prevent crash?

Yes. Instead of a crash, you will get an ApolloResponse with null data and a non-null exception.

when can I expect next version?

We typically release ~1 a month these days but there is no fixed release schedule. You can already try in the SNAPSHOTs though

@damianpetla
Author

damianpetla commented Sep 6, 2023

Hey @martinbonnin, my team looked into the crash from the backend side and apparently it was the same issue as hasura/graphql-engine#3564. We are using Hasura.

If we understand it correctly, it was the client trying to connect with the same operationId.

This 👇 is what I got from the backend team. I replaced some data with <removed>.

{
  "detail": {
    "connection_info": {
      "msg": null,
      "token_expiry": "2023-08-31T20:09:54Z",
      "websocket_id": "<removed>"
    },
    "event": {
      "detail": {
        "operation_id": "cbeb464f-f79<removed>",
        "operation_name": "SomeOperation",
        "operation_type": {
          "detail": "an operation already exists with this id: <removed>",
          "type": "proto_err"
        },
        "parameterized_query_hash": null,
        "query": {
          "operationName": "SomeOperation",
          "query": "subscription <removed>"
        },
        "request_id": null
      },
      "type": "operation"
    },
    "user_vars": {
      "x-hasura-role": "user",
      "x-hasura-user-id": "<removed>"
    }
  },
  "level": "error",
  "timestamp": "2023-08-31T20:00:30.083+0000",
  "type": "websocket-log"
}

I am letting you know because there may be another issue with the Apollo SDK that you would like to investigate.

@martinbonnin
Contributor

martinbonnin commented Sep 6, 2023

Thanks for the follow up!

{
  "detail": { ... },
  "level": "error",
  "timestamp": "2023-08-31T20:00:30.083+0000",
  "type": "websocket-log"
}

Is that the content of a WebSocket message? Or the contents of the "errors" field? The graphql-ws protocol defines messages here. An error message should be like:

{
  id: '<unique-operation-id>';
  type: 'error';
  payload: GraphQLError[];
}

And a GraphQLError is defined here

All in all, we can make it so that Apollo Kotlin catches these errors and exposes them in ApolloResponse.exception alongside other possible errors but from my current understanding this looks like the Hasura server returns non-compliant data in the first place.

@damianpetla
Author

damianpetla commented Sep 19, 2023

@martinbonnin hey, thanks for the info. I just managed to reproduce this error. I was using a single collector in my view model on the flow from the subscription. When I set up a second collector on the same flow, the error appeared. Is it not allowed to have multiple collectors?

@martinbonnin
Contributor

Excellent question. From a quick look at the code, collecting the same Flow twice will send two start messages with the same requestUuid (code). I can see how that can confuse the server.

The quick fix is to re-use your ApolloCall, not Flow:

val call = apolloClient.subscription(subscription)

launch {
  // first collector
  call.toFlow().collect { ... }
}
launch {
  // second collector
  call.toFlow().collect { ... }
}

This will make sure a new requestUuid is allocated every time. Longer term we might want to allocate a new uuid in toFlow() but I'm not 100% sure the API implications of this yet.
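The difference between sharing a Flow and sharing a call can be sketched without Apollo at all. Everything below is a hypothetical stand-in (`FakeServer`, `FakeStream`, `FakeCall` are made-up names): it only models a server that rejects duplicate ids, a stream whose id is fixed at creation, and a call that allocates a fresh id per stream.

```kotlin
// Hypothetical stand-in for a server that rejects a start message whose id
// is already in use, like the behaviour reported in this thread.
class FakeServer {
    private val activeIds = mutableSetOf<String>()

    // Returns true if the operation was accepted, false if the id is a duplicate.
    fun start(id: String): Boolean = activeIds.add(id)
}

// Hypothetical stand-in for a Flow whose id was fixed when it was created:
// every collection re-sends the same id.
class FakeStream(private val server: FakeServer, val id: String) {
    fun collect(): Boolean = server.start(id)
}

// Hypothetical stand-in for a call: each toStream() allocates a fresh id.
class FakeCall(private val server: FakeServer) {
    private var nextId = 0
    fun toStream(): FakeStream = FakeStream(server, "op-${nextId++}")
}

fun main() {
    val server = FakeServer()
    val call = FakeCall(server)

    // Sharing one stream between two collectors: the second start is rejected.
    val shared = call.toStream()
    println(shared.collect()) // true
    println(shared.collect()) // false

    // Creating one stream per collector: both starts are accepted.
    println(call.toStream().collect()) // true
    println(call.toStream().collect()) // true
}
```

In this toy model the id lives in the stream, which is why handing the same stream to two collectors collides while asking the call for a new stream does not.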

@damianpetla
Author

Thanks @martinbonnin. Unfortunately I can't do that because we use KMM: Apollo lives in the shared module, which exposes a Flow, and the two collectors are in the Android module. I will use a single flow, not a big deal. I just wanted to raise this as a potential improvement: Apollo allowing multiple collectors.

@damianpetla
Author

damianpetla commented Sep 22, 2023

Hey @martinbonnin, this might be an SDK issue, I'm not sure, but I have been playing with subscriptions, turning the network off and on to see how reconnection behaves. I noticed this error:

WebSocket Closed
 code='1000' reason='{"server_error_msg":"4400: an operation already exists with this
id: c20d5533-40f8-4b2b-8487-024c5dba79a8"}

This is thrown after I turned the internet back on. I have a single subscription running and the error comes from retryWhen on ApolloClient directly.

EDIT:
I realised that to reproduce it I had to disable the internet first and then try to subscribe. Say you have a screen with a subscription. You turn off the internet before going to that screen, then you go there. You turn the internet back on and this error appears.
I also realised it's basically the same issue I reported with multiple collectors, but this time there is only a single collector. So maybe something for the SDK to handle?

@martinbonnin martinbonnin changed the title App crashes with Expected BEGIN_ARRAY but was BEGIN_OBJECT at path errors Retrying a subscription does not renew the id and may cause an error on the server because the id is already used Nov 8, 2023
@damianpetla
Author

Hey @martinbonnin, how are you doing? I was wondering how the fix for this issue is going. I see there is a draft PR already, but it's pretty old. Can we expect some progress any time soon?

@martinbonnin
Contributor

@damianpetla apologies for the super late response here. This ended up being quite the rabbit hole...

The tl;dr of that rabbit hole is that retrying at the WebSocket layer, as was previously done, has major limitations, as you found out:

  1. The WebSocket doesn't know about operation ids and cannot update them when retrying.
  2. In general, subscriptions can happen over other transports than websockets. Multipart HTTP being another example. So retrying at the WebSocket layer does not help there.

All in all, 4.0.0-beta.5 adds a retry option on the ApolloClient itself:

    ApolloClient.Builder()
        .serverUrl(sampleServer.graphqlUrl())
        .retryOnError { it.operation is Subscription }
        .subscriptionNetworkTransport(
            WebSocketNetworkTransport.Builder()
                .serverUrl(sampleServer.subscriptionsUrl())
                .build()
        )
        .build()

This code also uses an incubating WebSocketNetworkTransport that should make it a lot easier to debug things moving forward:

dependencies {
  implementation("com.apollographql.apollo3:apollo-websocket-network-transport-incubating")
}

For more details, you can also check the integration test here

The current plan is to replace the default implementation of WebSocketNetworkTransport in the next version if everything looks good with the incubating one. If you're still around and have the time to try it, I'd love to hear what you think. Again, deep apologies for the delay on this issue and thanks for raising it 🙏

@damianpetla
Author

damianpetla commented Mar 13, 2024

Hey @martinbonnin, thanks for the update, I was looking forward to those changes. No worries about the delay. We get this SDK for free, so we appreciate any feedback and support 😄

I have updated to beta5 now and added retryOnError. We will see how that works. Regarding the incubating transport, I wouldn't mind trying that as well but would appreciate some hints on it. We use it in prod, so I don't want to take too much risk with experimentation 🙈

Here is how we set up the Apollo client currently:

   ApolloClient.Builder()
        .networkTransport(
            HttpNetworkTransport.Builder()
                .serverUrl(applicationInfo.hasuraUrl)
                .httpEngine(KtorHttpEngine(httpClient))
                .build()
        )
        .retryOnError { it.operation is Subscription }
        .subscriptionNetworkTransport(
            WebSocketNetworkTransport.Builder()
                .webSocketEngine(KtorWebSocketEngine(httpClient))
                .protocol(GraphQLWsProtocol.Factory())
                .serverUrl(applicationInfo.hasuraUrl)
                /**
                 * On logout, ApolloClient stops subscriptions and by default waits 60 sec
                 * before closing the connection. The call below reduces that to 5 sec
                 * so another user has no chance to log in and re-use the previous connection.
                 */
                .idleTimeoutMillis(5000)
                .reopenWhen { throwable, attempt ->
                    log.i(
                        throwable = throwable,
                        messageString = "Reopen when attempt $attempt"
                    )
                    if (throwable is ApolloWebSocketClosedException) {
                        log.w(
                            throwable = throwable,
                            messageString = "Web socket closed"
                        )
                    }
                    /**
                     * Power of 13 attempts give ~5 sec delay.
                     * If it's higher attempt we keep 5 sec delay max.
                     */
                    exponentialDelay(attempt)
                }
                .build()
        )
        .build()
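The `exponentialDelay(attempt)` helper is not shown in this thread. As a rough guess at the arithmetic it performs, the sketch below mirrors the inline logic from the first post (2^attempt milliseconds for attempts below 13, then a flat 5000 ms). The names `backoffMillis` and `MAX_DELAY_MILLIS` are hypothetical, and the real helper presumably also calls `delay(...)` and returns `true`.

```kotlin
// Assumed cap, taken from the delay(5000L) branch in the first post.
const val MAX_DELAY_MILLIS = 5_000L

// Hypothetical pure helper: returns the delay in milliseconds for an attempt.
// 2^12 = 4096 ms is the largest power-of-two delay used; from attempt 13 on
// (2^13 = 8192 ms would exceed the cap) the flat cap applies.
fun backoffMillis(attempt: Long): Long {
    require(attempt >= 0) { "attempt must be non-negative" }
    if (attempt >= 13) return MAX_DELAY_MILLIS
    return 1L shl attempt.toInt()
}

fun main() {
    for (attempt in listOf(0L, 1L, 10L, 12L, 13L, 20L)) {
        println("attempt $attempt -> ${backoffMillis(attempt)} ms")
    }
}
```

Keeping the computation separate from the suspending `delay` call makes the schedule easy to unit test.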

Things that I would need to address:

  1. webSocketEngine cannot take KtorWebSocketEngine. Looks like there is just one implementation of WebSocketEngine inside incubating which is AppleWebSocketEngine
  2. protocol replaced with wsProtocol and there is no Factory for GraphQLWsProtocol but I guess it suppose to be created now with constructor?
  3. There is no reopenWhen which was called on network disconnections. Is it not needed anymore? Is every disconnection now passed to each subscription or is it handled silently by SDK?

Additionally, you can see that I am reducing idle timeout. I discovered that re-login user kept using previous connection. Do you know if there is a better way to invalidate connection?

Thanks!

@martinbonnin
Contributor

Hey thanks for the follow up!

  1. webSocketEngine cannot take KtorWebSocketEngine. Looks like there is just one implementation of WebSocketEngine inside incubating which is AppleWebSocketEngine

Indeed the WebSocketEngine has changed too. I'll work on ktor-engine-incubating. Follow up issue here

  2. protocol replaced with wsProtocol and there is no Factory for GraphQLWsProtocol but I guess it suppose to be created now with constructor?

TBH I'm not sure why this needed to be a factory. WsProtocol is mostly stateless and the constructors do not do any IO so it seemed better to bypass the Factory altogether.

  3. There is no reopenWhen which was called on network disconnections. Is it not needed anymore? Is every disconnection now passed to each subscription or is it handled silently by SDK?

Exactly. I realized recently that managing the reopen at the WebSocket layer was wrong because it doesn't work for multipart subscriptions.

In general, I like to see the WebSocket more and more as a raw socket. Just like HTTP 2 allows multiplexing several streams on a single raw socket, the WebSocket allows multiplexing several subscription streams. In terms of GraphQL APIs, how connections are pooled and reopened should not matter.

Using retryOnError {} gives more control to individually retry some subscriptions. For example, "cold" subscriptions (which depend on client state, think countTo(10)), as opposed to "warm" subscriptions like currentTime, might not want to be retried.

As a nice bonus, 4.0.0-beta.5 uses Android/iOS APIs to retry when network connectivity comes back instead of exponential backoff for reduced latency.

Additionally, you can see that I am reducing idle timeout. I discovered that re-login user kept using previous connection. Do you know if there is a better way to invalidate connection?

Right. That's actually the (big) difference between a GraphQL WebSocket and a raw HTTP2 socket: the GraphQL WebSocket is stateful. I'm not a huge fan of this. Can you share more details about how authentication is handled by your backend? In particular, how do you know a subscription has expired? I'm hoping you can handle that with a custom ApolloInterceptor.

@damianpetla
Author

Hey @martinbonnin, I have played a bit with retryOnError and it's being called on every new subscription with EmptyExecutionContext. While writing this message I realised I might have misunderstood this operator. Correct me if I am wrong, but it sets up a condition for later, when an actual error is encountered, right?

Btw, I also noticed that on individual operators I could call setRetryOnError and then read it inside retryOnError { it.operation.retryOnError }. So I could use that if I want to let individual calls decide whether they should be retried.

@martinbonnin
Contributor

martinbonnin commented Mar 13, 2024

Oooh my..., the KDoc for ApolloClient.Builder is severely lacking. Follow up issue here.

  • ApolloCall.retryOnError(Boolean) uses a boolean that lets individual operations decide if they want to be retried
  • ApolloClient.Builder.retryOnError((ApolloRequest) -> Boolean) uses a function that determines the default value if no retryOnError has been set on the ApolloCall. Usually you'll do { it.operation is Subscription } to only retry subscriptions (but you might want to retry queries too)

The actual logic is done in RetryOnErrorInterceptor. It leverages coroutines to retry the Flow whenever an error happens. If no error happens then no retry is done.
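The precedence between the two settings can be sketched in a few lines. This is not the code of RetryOnErrorInterceptor; `FakeRequest`, `OperationKind` and `shouldRetry` are hypothetical names that only model "a per-call Boolean wins, otherwise the client-level function decides".

```kotlin
enum class OperationKind { QUERY, MUTATION, SUBSCRIPTION }

// Hypothetical, simplified request: retryOnError is null when the call did
// not set it explicitly.
data class FakeRequest(val kind: OperationKind, val retryOnError: Boolean? = null)

// The per-call value takes precedence; the client-level function only
// supplies the default when nothing was set on the call.
fun shouldRetry(request: FakeRequest, clientDefault: (FakeRequest) -> Boolean): Boolean =
    request.retryOnError ?: clientDefault(request)

fun main() {
    // Mirrors the { it.operation is Subscription } default from this thread.
    val subscriptionsOnly: (FakeRequest) -> Boolean = { it.kind == OperationKind.SUBSCRIPTION }

    println(shouldRetry(FakeRequest(OperationKind.SUBSCRIPTION), subscriptionsOnly))        // true
    println(shouldRetry(FakeRequest(OperationKind.QUERY), subscriptionsOnly))               // false
    println(shouldRetry(FakeRequest(OperationKind.QUERY, true), subscriptionsOnly))         // true
    println(shouldRetry(FakeRequest(OperationKind.SUBSCRIPTION, false), subscriptionsOnly)) // false
}
```

The last line corresponds to a "cold" subscription that opts out of retries even though the client default would retry it.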

Hope this helps, since this is all pretty new there's not a lot of docs but I'll get working on this (edit: KDoc PR here).

@apollographql apollographql deleted a comment from github-actions bot Mar 21, 2024
@martinbonnin martinbonnin reopened this Mar 21, 2024
@martinbonnin
Contributor

@damianpetla 4.0.0-beta.6 has shipped with KDoc, documentation and migration guide: https://www.apollographql.com/docs/kotlin/v4/advanced/experimental-websockets. Let us know how that goes.

@damianpetla
Author

Hey @martinbonnin, I was trying it out but I don't know what to do with the KtorWebSocketEngine that we have been using so far with the previous WebSocketNetworkTransport.

@martinbonnin
Contributor

Right, this part is still needed... We're actually looking into breaking down the apollo-kotlin repo in smaller ones. KtorWebSocketEngine is a lot less stable than ApolloClient and it doesn't really make sense for them to share the versioning. KtorWebSocketEngine might be a good candidate for that... It might take a while to get it published and everything. In the meantime, I'll look into sending you a gist that you can copy/paste in your project.

@martinbonnin
Contributor

@damianpetla gist is here.

I have tested that it passes all the WebSocketNetworkTransportTest tests except one little corner case where it throws ApolloNetworkException instead of ApolloWebSocketClosedException here. I'll work on getting that tested and published properly but in the meantime you should be able to just copy/pasta this.

@damianpetla
Author

Hey @martinbonnin, I have tried your gist but for some reason subscriptions stopped emitting any data. I have logged the engine and I see frames being sent and received.

@martinbonnin
Contributor

Can you upload a small reproducer somewhere?

@damianpetla
Author

I'm not really sure how at the moment; building something on the side would probably take too much time. I might look into the code more later and try debugging further. Swamped with work currently 😓 Even trying it out was a bit of a stretch, but I'm not giving up yet so don't worry :)

@martinbonnin
Contributor

I hear you, thanks for looking into this! WebSockets are hard because the transport isn't specified so there are a lot of variants out there. To make things worse, they are very often behind authentication, making it really hard to reproduce the issues.

I've uploaded a small playground here that uses MockServer. Hopefully it can help reproduce/understand what's going wrong there. Let me know if you find out anything.
