Connecting Observable #318

Closed
dayaftereh opened this issue May 5, 2021 · 4 comments · May be fixed by #334
Labels
question Question regarding how RxGo is working etc.

Comments

dayaftereh commented May 5, 2021

I'm not sure whether the following behavior is a bug or whether I'm on the wrong track. I want a producer observable, fed from a single channel, that uses rxgo.WithPublishStrategy() to broadcast its items to child observables. I create each child observable by passing the producer's Observable#Observe channel to rxgo.FromChannel. However, when a child observable is canceled through its context (created with context.WithCancel()), the parent observable stops emitting items.

using: github.com/reactivex/rxgo/v2 v2.5.0

To Reproduce
The following test reproduces the behavior:

import (
	"context"
	"log"
	"testing"
	"time"

	"github.com/reactivex/rxgo/v2"
)

func TestRxGo(t *testing.T) {
	producerCtx, cancelProducer := context.WithCancel(context.Background())
	// create an infinite producer that stops when its context is canceled
	producer := rxgo.Create(
		[]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
			log.Print("create producer")
			defer func() {
				log.Print("producer done")
			}()

			i := 0
			for {
				timeout := time.After(time.Second * 1)
				select {
				case <-ctx.Done():
					return
				case <-timeout:
					next <- rxgo.Of(i)
					log.Printf("producer fire %d", i)
				}
				i = i + 1
			}
		}},
		rxgo.WithPublishStrategy(),
		rxgo.WithContext(producerCtx),
		rxgo.WithBackPressureStrategy(rxgo.Drop),
	)

	// connect to the producer
	producer.Connect(producerCtx)

	// create a child observable to receive items from producer
	append := func(index int, parent context.Context) rxgo.Observable {
		observable := rxgo.FromChannel(
			producer.Observe(),
			rxgo.WithBackPressureStrategy(rxgo.Drop),
			rxgo.WithContext(parent),
		)
		observable.DoOnNext(func(i interface{}) {
			log.Printf("Observable %d: %v", index, i)
		})

		return observable
	}

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)

	// create the first observable with own context for cancel
	observable1Ctx, cancelObservable1 := context.WithCancel(context.Background())
	append(1, observable1Ctx)

	// create the second observable with own context for cancel
	observable2Ctx, cancelObservable2 := context.WithCancel(context.Background())
	append(2, observable2Ctx)

	go func() {
		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)

		log.Print("cancel 1 observable")
		cancelObservable1()

		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)

		log.Print("cancel 2 observable")
		cancelObservable2()
	}()

	log.Print("wait for done")
	<-observable2Ctx.Done()
	log.Print("all observable canceled")

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)

	log.Printf("cancel the producer")
	cancelProducer()

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)
}

The console output is:

=== RUN   TestRxGo
2021/05/05 14:57:32 sleep 5
2021/05/05 14:57:32 create producer
2021/05/05 14:57:33 producer fire 0
2021/05/05 14:57:34 producer fire 1
2021/05/05 14:57:35 producer fire 2
2021/05/05 14:57:36 producer fire 3
2021/05/05 14:57:37 wait for done
2021/05/05 14:57:37 sleep 5
2021/05/05 14:57:37 producer fire 4
2021/05/05 14:57:37 Observable 1: 4
2021/05/05 14:57:37 Observable 2: 4
2021/05/05 14:57:38 producer fire 5
2021/05/05 14:57:38 Observable 2: 5
2021/05/05 14:57:38 Observable 1: 5
2021/05/05 14:57:39 producer fire 6
2021/05/05 14:57:39 Observable 2: 6
2021/05/05 14:57:39 Observable 1: 6
2021/05/05 14:57:40 producer fire 7
2021/05/05 14:57:40 Observable 2: 7
2021/05/05 14:57:40 Observable 1: 7
2021/05/05 14:57:41 producer fire 8
2021/05/05 14:57:41 Observable 2: 8
2021/05/05 14:57:41 Observable 1: 8
2021/05/05 14:57:42 cancel 1 observable
2021/05/05 14:57:42 sleep 5
2021/05/05 14:57:42 producer fire 9

##--> Missing Observable 2
##--> Missing producer fire

2021/05/05 14:57:47 cancel 2 observable
2021/05/05 14:57:47 all observable canceled
2021/05/05 14:57:47 sleep 5

##--> Missing producer fire

2021/05/05 14:57:52 cancel the producer

##--> Missing producer done

2021/05/05 14:57:52 sleep 5
--- PASS: TestRxGo (25.04s)
PASS

Expected behavior

I want the producer observable to keep emitting items without blocking while child observables are created and canceled dynamically.

@dayaftereh dayaftereh added the question Question regarding how RxGo is working etc. label May 5, 2021
@dayaftereh
Author

By using Observable.ForEach to forward the items from the producer, I'm close to a working example. It needs an ugly workaround with a boolean flag, though, because Observable.ForEach keeps receiving items from the producer and I found no way to stop it.

Here is the test:

func TestRxGoWithForEach(t *testing.T) {
	producerCtx, cancelProducer := context.WithCancel(context.Background())
	// create an infinite producer that stops when its context is canceled
	producer := rxgo.Create(
		[]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
			log.Print("create producer")
			defer func() {
				log.Print("producer done")
			}()

			i := 0
			for {
				timeout := time.After(time.Second * 1)
				select {
				case <-ctx.Done():
					return
				case <-timeout:
					next <- rxgo.Of(i)
					log.Printf("producer fire %d", i)
				}
				i = i + 1
			}
		}},
		rxgo.WithPublishStrategy(),
		rxgo.WithContext(producerCtx),
		rxgo.WithBackPressureStrategy(rxgo.Drop),
	)

	// connect to the producer
	producer.Connect(producerCtx)

	// create a child observable to receive items from producer
	append := func(index int, parent context.Context) rxgo.Observable {

		observableCtx, cancel := context.WithCancel(context.Background())

		observable := rxgo.Create(
			[]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
				log.Printf("create observable %d", index)
				defer func() {
					log.Printf("observable done %d", index)
				}()

				done := false

				wait := producer.ForEach(
					func(i interface{}) {
						if !done {
							next <- rxgo.Of(i)
						} else {
							log.Printf("ForEach observable %d", index)
						}
					},
					func(err error) {
						if !done {
							next <- rxgo.Error(err)
						}
					},
					func() {
						cancel()
					},
					rxgo.WithContext(ctx),
				)

				select {
				case <-observableCtx.Done():
				case <-wait:
				}

				done = true
			}},
			rxgo.WithPublishStrategy(),
			rxgo.WithContext(observableCtx),
			rxgo.WithBackPressureStrategy(rxgo.Drop),
		)

		_, abort := observable.Connect(context.Background())

		go func() {
			<-parent.Done()
			abort()
			cancel()
		}()

		observable.DoOnNext(func(i interface{}) {
			log.Printf("Observable %d: %v", index, i)
		})

		return observable
	}

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)

	// create the first observable with own context for cancel
	observable1Ctx, cancelObservable1 := context.WithCancel(context.Background())
	append(1, observable1Ctx)

	// create the second observable with own context for cancel
	observable2Ctx, cancelObservable2 := context.WithCancel(context.Background())
	append(2, observable2Ctx)

	go func() {
		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)

		log.Print("cancel 1 observable")
		cancelObservable1()

		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)

		log.Print("cancel 2 observable")
		cancelObservable2()
	}()

	log.Print("wait for done")
	<-observable2Ctx.Done()
	log.Print("all observable canceled")

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)

	log.Printf("cancel the producer")
	cancelProducer()

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)
}

and the console output:

=== RUN   TestRxGoWithForEach
2021/05/06 08:45:40 sleep 5
2021/05/06 08:45:40 create producer
2021/05/06 08:45:41 producer fire 0
2021/05/06 08:45:42 producer fire 1
2021/05/06 08:45:43 producer fire 2
2021/05/06 08:45:44 producer fire 3
2021/05/06 08:45:45 create observable 1
2021/05/06 08:45:45 create observable 2
2021/05/06 08:45:45 wait for done
2021/05/06 08:45:45 sleep 5
2021/05/06 08:45:45 producer fire 4
2021/05/06 08:45:45 Observable 2: 4
2021/05/06 08:45:45 Observable 1: 4
2021/05/06 08:45:46 producer fire 5
2021/05/06 08:45:46 Observable 2: 5
2021/05/06 08:45:46 Observable 1: 5
2021/05/06 08:45:47 producer fire 6
2021/05/06 08:45:47 Observable 2: 6
2021/05/06 08:45:47 Observable 1: 6
2021/05/06 08:45:48 producer fire 7
2021/05/06 08:45:48 Observable 2: 7
2021/05/06 08:45:48 Observable 1: 7
2021/05/06 08:45:49 producer fire 8
2021/05/06 08:45:49 Observable 1: 8
2021/05/06 08:45:49 Observable 2: 8
2021/05/06 08:45:50 cancel 1 observable
2021/05/06 08:45:50 sleep 5
2021/05/06 08:45:50 observable done 1
2021/05/06 08:45:50 producer fire 9
2021/05/06 08:45:50 ForEach observable 1
2021/05/06 08:45:50 Observable 2: 9
2021/05/06 08:45:51 producer fire 10
2021/05/06 08:45:51 Observable 2: 10
2021/05/06 08:45:51 ForEach observable 1
2021/05/06 08:45:52 producer fire 11
2021/05/06 08:45:52 Observable 2: 11
2021/05/06 08:45:52 ForEach observable 1
2021/05/06 08:45:53 producer fire 12
2021/05/06 08:45:53 Observable 2: 12
2021/05/06 08:45:53 ForEach observable 1
2021/05/06 08:45:54 producer fire 13
2021/05/06 08:45:54 Observable 2: 13
2021/05/06 08:45:54 ForEach observable 1
2021/05/06 08:45:55 cancel 2 observable
2021/05/06 08:45:55 all observable canceled
2021/05/06 08:45:55 sleep 5
2021/05/06 08:45:55 observable done 2
2021/05/06 08:45:55 producer fire 14
2021/05/06 08:45:55 ForEach observable 2
2021/05/06 08:45:55 ForEach observable 1
2021/05/06 08:45:56 producer fire 15
2021/05/06 08:45:56 ForEach observable 2
2021/05/06 08:45:56 ForEach observable 1
2021/05/06 08:45:57 producer fire 16
2021/05/06 08:45:57 ForEach observable 2
2021/05/06 08:45:57 ForEach observable 1
2021/05/06 08:45:58 producer fire 17
2021/05/06 08:45:58 ForEach observable 2
2021/05/06 08:45:58 ForEach observable 1
2021/05/06 08:45:59 producer fire 18
2021/05/06 08:45:59 ForEach observable 2
2021/05/06 08:45:59 ForEach observable 1
2021/05/06 08:46:00 cancel the producer
2021/05/06 08:46:00 sleep 5
2021/05/06 08:46:00 producer done
--- PASS: TestRxGoWithForEach (25.05s)
PASS

@su225

su225 commented Nov 4, 2021

I think I know why that blocks:

func (i *channelIterable) produce(ctx context.Context) {
	defer func() {
		i.mutex.RLock()
		for _, subscriber := range i.subscribers {
			close(subscriber)
		}
		i.mutex.RUnlock()
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case item, ok := <-i.next:
			if !ok {
				return
			}
			i.mutex.RLock()
			for _, subscriber := range i.subscribers {
				subscriber <- item
			}
			i.mutex.RUnlock()
		}
	}
}

Once a subscription context is canceled (Observable 1 and Observable 2 in your example), nobody reads from that subscriber channel anymore, so the send `subscriber <- item` blocks and produce gets stuck in this for loop. @dayaftereh - once both adding and removing subscribers are taken into account, this is a subscription leak: canceling one subscription context blocks the whole pipeline :(

@su225

su225 commented Nov 4, 2021

@dayaftereh - I made some changes in the linked PR to allow unsubscribing. For testing purposes, I changed the producer part of your example to a channel-based producer. Here is the modified test:

func TestRxGo(t *testing.T) {
	producerCtx, cancelProducer := context.WithCancel(context.Background())
	// create an infinite producer that stops when its context is canceled
	itemCh := make(chan Item)
	go func() {
		defer func() {
			log.Print("producer done")
			close(itemCh)
		}()
		i := 0
		for {
			select {
			case <-producerCtx.Done():
				return
			case <-time.After(1 * time.Second):
				select {
				case <-producerCtx.Done():
					return
				case itemCh <- Of(i):
					log.Printf("producer fire %d", i)
				}
				i++
			}
		}
	}()

	producer := FromChannel(itemCh,
		WithPublishStrategy(),
		WithContext(producerCtx),
		WithBackPressureStrategy(Drop))

	// connect to the producer
	producer.Connect(producerCtx)

	// create a child observable to receive items from producer
	append := func(index int, parent context.Context) {
		producer.DoOnNext(func(i interface{}) {
			log.Printf("Observable %d: %v", index, i)
		}, WithContext(parent))
	}

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)

	// create the first observable with own context for cancel
	observable1Ctx, cancelObservable1 := context.WithCancel(context.Background())
	append(1, observable1Ctx)

	// create the second observable with own context for cancel
	observable2Ctx, cancelObservable2 := context.WithCancel(context.Background())
	append(2, observable2Ctx)

	go func() {
		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)

		log.Print("cancel 1 observable")
		cancelObservable1()

		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)

		log.Print("cancel 2 observable")
		cancelObservable2()
	}()

	log.Print("wait for done")
	<-observable2Ctx.Done()
	log.Print("all observable canceled")

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)

	log.Printf("cancel the producer")
	cancelProducer()

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)
}

And here is the output:

=== RUN   TestRxGo
2021/11/04 13:34:22 sleep 5
2021/11/04 13:34:23 producer fire 0
2021/11/04 13:34:24 producer fire 1
2021/11/04 13:34:25 producer fire 2
2021/11/04 13:34:26 producer fire 3
2021/11/04 13:34:27 wait for done
2021/11/04 13:34:27 sleep 5
2021/11/04 13:34:27 producer fire 4
2021/11/04 13:34:27 Observable 2: 4
2021/11/04 13:34:27 Observable 1: 4
2021/11/04 13:34:28 producer fire 5
2021/11/04 13:34:28 Observable 2: 5
2021/11/04 13:34:28 Observable 1: 5
2021/11/04 13:34:29 producer fire 6
2021/11/04 13:34:29 Observable 2: 6
2021/11/04 13:34:29 Observable 1: 6
2021/11/04 13:34:30 producer fire 7
2021/11/04 13:34:30 Observable 1: 7
2021/11/04 13:34:30 Observable 2: 7
2021/11/04 13:34:31 producer fire 8
2021/11/04 13:34:31 Observable 2: 8
2021/11/04 13:34:31 Observable 1: 8
2021/11/04 13:34:32 cancel 1 observable
2021/11/04 13:34:32 sleep 5
2021/11/04 13:34:32 producer fire 9
2021/11/04 13:34:32 Observable 2: 9
2021/11/04 13:34:33 producer fire 10
2021/11/04 13:34:33 Observable 2: 10
2021/11/04 13:34:34 producer fire 11
2021/11/04 13:34:34 Observable 2: 11
2021/11/04 13:34:35 producer fire 12
2021/11/04 13:34:35 Observable 2: 12
2021/11/04 13:34:36 producer fire 13
2021/11/04 13:34:36 Observable 2: 13
2021/11/04 13:34:37 cancel 2 observable
2021/11/04 13:34:37 all observable canceled
2021/11/04 13:34:37 sleep 5
2021/11/04 13:34:37 producer fire 14
2021/11/04 13:34:38 producer fire 15
2021/11/04 13:34:39 producer fire 16
2021/11/04 13:34:40 producer fire 17
2021/11/04 13:34:41 producer fire 18
2021/11/04 13:34:42 cancel the producer
2021/11/04 13:34:42 sleep 5
2021/11/04 13:34:42 producer done
--- PASS: TestRxGo (25.01s)
PASS

@dayaftereh
Author

Perfect.
