Allow FlightSQL implementers to extend do_get() #2581

Closed
avantgardnerio opened this issue Aug 24, 2022 · 9 comments · Fixed by #2582
Labels: arrow-flight (Changes to the arrow-flight crate), enhancement (Any new improvement worthy of an entry in the changelog)

Comments

@avantgardnerio
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The Flight protocol has the same shortcoming as many protocols such as FTP and ZooKeeper: it expects that the interweb is a happy place where everyone knows their public IP address and everything is routable the same way from everywhere. Unfortunately this is not the case IRL, so I would argue that it is best practice to stream everything over a single connection. This has manifested itself in weeks of work recently in two facets:

  1. The Arrow FlightSQL JDBC driver doesn't support alternate endpoints, and PRs to fix this are currently on hold.
  2. When trying to deploy to Kubernetes (I would posit a common situation), it is very hard to get Kubernetes to behave in such a way that it can horizontally scale while each executor remains reachable at a unique address from the outside world.

Describe the solution you'd like

Rather than hack on Kubernetes routing and build a JDBC fork, it would be much more convenient if we could simply proxy all Flights through the existing connection to the scheduler. It was a relatively minor amount of work to get this working (PR incoming soon), and because the streams are served by coroutines, the performance impact should be minor: each stream is just another open file handle.

Unfortunately, that means the scheduler needs to be able to implement both the Flight and FlightSQL protocols, since there will be do_get requests coming in for SQL operations as well as plain Flights.

The change I would propose is to add a new method with a default implementation that gets called whenever a do_get request doesn't match any existing FlightSQL command, and to allow implementers to override and handle it if they so choose. The default behavior remains the same.
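To make this concrete, here is a rough sketch of what such a hook could look like (the trait shown, the method name `do_get_fallback`, and the `DoGetStream` alias are all illustrative assumptions here, not a committed API):

```rust
use arrow_flight::{FlightData, Ticket};
use futures::Stream;
use std::pin::Pin;
use tonic::{Request, Response, Status};

// Illustrative alias for the boxed stream a do_get handler returns
pub type DoGetStream =
    Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>;

#[tonic::async_trait]
pub trait FlightSqlService {
    /// Called when the ticket in a `do_get` request does not decode to any
    /// known FlightSQL command. The default implementation preserves the
    /// current behavior (reject the request); implementers can override it
    /// to serve plain Flight tickets as well.
    async fn do_get_fallback(
        &self,
        _request: Request<Ticket>,
    ) -> Result<Response<DoGetStream>, Status> {
        Err(Status::unimplemented("not a recognized FlightSQL request"))
    }
}
```

An implementer that doesn't care about plain Flight traffic changes nothing; one that does simply overrides the fallback.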

Describe alternatives you've considered

Solve problems in the networking layer instead of code.

@avantgardnerio added the enhancement label Aug 24, 2022
@tustvold
Contributor

tustvold commented Aug 25, 2022

Why can't you make use of a standard L7 router? Kubernetes supports this in many forms, from LoadBalancer Service types to Ingress resources. You could also just use an off-the-shelf router like nginx, envoy, etc...

If you do wish to support some sort of proxy for the scheduler, it might make more sense to expose this on a different port?

@avantgardnerio
Contributor Author

avantgardnerio commented Aug 25, 2022

I'd like to apologize for some of the wording in this issue. It was written in haste and much of it would be better rephrased as "X could be improved by Y".

Also, I was not aware of this document, or more specifically:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-pv-claim
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce

I believe I now understand how others are deploying Ballista in k8s: using shared storage so that any executor can access any data. That makes it possible to load-balance requests to any executor and still get the same data. I was storing data locally on the executor node itself and responding to queries with Flights carrying the address of the executor that processed the partition, as opposed to the address of the service that fronts all executors.

All that being said, now that I understand the recommended way of deploying, I think we can still make this easier on folks (and, as a bonus, work with the JDBC driver in its present state) by allowing the option of proxying the Flights through the scheduler. This would provide the best "works out of the box" experience, requiring only one connection to one address and avoiding any issues with NATs, routers, subnets, firewalls, VLANs, overzealous IT security, etc.
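As a rough sketch of what that proxying could look like on the scheduler side (the `proxy_do_get` function and the executor-address lookup are hypothetical, and `DoGetStream` is the same illustrative alias as in the sketch above):

```rust
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::{FlightData, Ticket};
use futures::Stream;
use std::pin::Pin;
use tonic::{Request, Response, Status};

type DoGetStream =
    Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>;

// Hypothetical scheduler-side proxy: resolve which executor holds the data
// (lookup elided), forward the ticket, and stream the results straight back.
async fn proxy_do_get(
    executor_addr: String,
    request: Request<Ticket>,
) -> Result<Response<DoGetStream>, Status> {
    let mut client = FlightServiceClient::connect(executor_addr)
        .await
        .map_err(|e| Status::unavailable(e.to_string()))?;
    // The scheduler never buffers whole result sets; each proxied stream is
    // just another open connection being polled by the async runtime
    let stream = client.do_get(request.into_inner()).await?.into_inner();
    Ok(Response::new(Box::pin(stream)))
}
```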

Edit: to put this all in context, my overarching goal is to add FlightSQL in addition to the native Ballista protocol (which is working in our forks).

@andygrove
Member

I agree that having the option for the JDBC driver to just interact with the scheduler, without requiring the ability to reach the executors, is highly desirable and will make it much easier for users to try Ballista out. Not all developers are DevOps or k8s experts. We should really try to make it easy for devs to get started, while making it possible for DevOps to deploy efficiently with a more advanced configuration.

@tustvold
Copy link
Contributor

tustvold commented Aug 25, 2022

Also, I was not aware of this document, or more specifically:

I believe I now understand how others are deploying Ballista in k8s - using shared storage to allow any executor to access any data.

That's not what that is doing: it creates a single PVC which can be attached to a single executor pod, and then associates a local path with a PV for that PVC. This likely will not work for more than a single executor, and won't work across nodes. It's not something I would advise doing...

If you need local scratch space I'd recommend using EmptyDir or similar. As for using shared storage to share data between executors, that is likely a dead end: Kubernetes is really not designed for that deployment model, not to mention most cloud providers don't support multi-mount block storage anyway.

In general horizontal scalability and local storage are somewhat incompatible. You can do a limited form of auto-scaling using StatefulSets, PVC templates, and a volume provisioner (normally provided by the cloud provider), but it is a massive operational headache that is best avoided at all costs.

I agree that having the option for the JDBC driver to just interact with the scheduler and not require the ability to reach the executors is highly desirable and will make it much easier for users to try Ballista out

Agree with the sentiment, disagree with the mechanism. Why not just provide k8s manifests that deploy Ballista idiomatically, in this case adding a Kubernetes Service for the executors? The user would then just run kubectl apply -f manifest.yaml and be done. Kubernetes is specifically designed to make this easy without needing application support.

That all being said, if Ballista wants to support proxying via the scheduler that's perfectly fine; it just seems a little odd to buy into all the overheads of Kubernetes and not actually use it 😅

@avantgardnerio
Contributor Author

avantgardnerio commented Aug 25, 2022

This likely will not work for more than a single executor, and won't work across nodes.
multi-mount block storage

Ah, yes, I was surprised to see it not be of type NFS, and this was why I hesitated to go that route - shared storage seemed like a nightmare.

a kubernetes Service for the executors

From the document you linked:

Kubernetes gives Pods their own IP addresses and a single DNS name for a set of Pods, and can load-balance across them.

This is what I still don't understand: if there is data locality to the executor that did the processing, and Kubernetes is load-balancing / DNS round-robining / whatever to a random pod, then what happens if it routes the client TCP connection to executor pod A when the Flight results reside on local disk on executor pod B?

Edit: the only answers I can think of are that the executors already proxy between each other, or that the executors are aware of their service IP address and advertise that, so the client skips DNS and goes straight to the IP, and the service exposes all IPs?

@avantgardnerio
Contributor Author

bgardner@bg-tensorbook:~/workspace/ballista/arrow-ballista/helm/ballista$ kubectl scale statefulsets ballista-executor --replicas=2
statefulset.apps/ballista-executor scaled
bgardner@bg-tensorbook:~/workspace/ballista/arrow-ballista/helm/ballista$ kubectl get services
NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)     AGE
ballista-executor    ClusterIP   10.0.209.211   <none>        50051/TCP   15h
ballista-scheduler   ClusterIP   10.0.165.171   <none>        50050/TCP   15h
kubernetes           ClusterIP   10.0.0.1       <none>        443/TCP     27d
bgardner@bg-tensorbook:~/workspace/ballista/arrow-ballista/helm/ballista$ kubectl get pods
NAME                   READY   STATUS    RESTARTS   AGE
ballista-executor-0    1/1     Running   0          15h
ballista-executor-1    1/1     Running   0          9s
ballista-scheduler-0   1/1     Running   0          15h

Seems like with 1 service and 2 pods there is still only 1 IP, so it must be executors proxying to each other?

@tustvold
Contributor

tustvold commented Aug 25, 2022

Seems like with 1 service and 2 pods there is still only 1 IP, so it must be executors proxying to each other?

That's the ClusterIP, which is a type of Service. It's a transparent L4 proxy provided by Kubernetes: when you make a new TCP connection to that IP it will transparently DNAT it to one of the pod IPs. If you run kubectl get pods -o wide you will get the pod IPs.

It is perhaps worth highlighting that since gRPC uses HTTP/2, which keeps long-lived TCP connections open across multiple requests, this L4 proxy will likely not yield a particularly balanced load distribution.

if there is data locality to the executor that did the processing

I have to confess to not being intimately familiar with Ballista, but the following is what I had presumed:

  • The executors are interchangeable with no data locality. They have some mechanism to distribute data between them as part of distributed execution, potentially via object storage or some communication protocol.
  • The user makes a single DoGet request and gets a stream of FlightData responses back. As a result, even when doing L7 load balancing, there is only one HTTP request to route per query, so you don't need to worry about session stickiness, etc. (see the sketch below).
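For what it's worth, a minimal client-side sketch of that single-request flow (the `fetch_results` helper is hypothetical; it assumes an opaque ticket obtained earlier, e.g. from a GetFlightInfo call):

```rust
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::Ticket;

async fn fetch_results(
    addr: String,
    ticket: Ticket,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut client = FlightServiceClient::connect(addr).await?;
    // One DoGet request per query: an L7 balancer only routes this once,
    // and the whole result set streams back over that single HTTP/2 stream
    let mut stream = client.do_get(ticket).await?.into_inner();
    while let Some(flight_data) = stream.message().await? {
        // Decode `flight_data` into record batches here
        println!("received {} body bytes", flight_data.data_body.len());
    }
    Ok(())
}
```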

@avantgardnerio
Contributor Author

@thinkharderdev do you know the answers to these questions? Sorry for the Ballista discussion on an arrow issue, but the arrow repo being home to FlightSQL is causing more of this lately.

@tustvold
Contributor

Happy for this discussion to move to the Ballista repo if you prefer, I don't mind either way. There's also no obligation to satisfy my curiosity 😅
