
[Bug]: PaneInfo not populated in Go SDK #31153

Open
2 of 16 tasks
camphillips22 opened this issue May 1, 2024 · 4 comments · May be fixed by #31174

@camphillips22
Contributor

camphillips22 commented May 1, 2024

What happened?

I'm attempting to use early triggering and PaneInfo to limit bundle sizes, so as to avoid running into Dataflow's 80MB limit, and have found that PaneInfo does not appear to be populated correctly.

Runner: Dataflow
Beam Version: 2.55.1

Here's a test that I believe demonstrates the problem:

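package panes // hypothetical package name; the report does not include one

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func init() {
	register.Function2x0(produceFn)
	register.Function4x0(getPanes)
	// produceFn's emitter carries an event time, so register it as well.
	register.Emitter2[beam.EventTime, int]()
	register.Emitter1[int]()
}

// produceFn emits ten elements. All of them share one event timestamp
// (one minute after now), so they fall into a single fixed window.
func produceFn(_ []byte, emit func(beam.EventTime, int)) {
	baseT := mtime.Now()
	for i := 0; i < 10; i++ {
		emit(baseT.Add(time.Minute), i)
	}
}

func Produce(s beam.Scope) beam.PCollection {
	return beam.ParDo(s, produceFn, beam.Impulse(s))
}

// getPanes logs the pane of each element and emits the pane's index.
func getPanes(ctx context.Context, pi typex.PaneInfo, _ int, emit func(int)) {
	log.Output(ctx, log.SevWarn, 0, fmt.Sprintf("got pane %+v", pi))
	emit(int(pi.Index))
}

// TestMain lets ptest select a runner from command-line flags.
func TestMain(m *testing.M) {
	ptest.Main(m)
}

func TestPanes(t *testing.T) {
	p, scp := beam.NewPipelineWithRoot()

	c := Produce(scp)
	windowed := beam.WindowInto(
		scp,
		window.NewFixedWindows(5*time.Minute),
		c,
		beam.Trigger(trigger.AfterEndOfWindow().
			EarlyFiring(
				trigger.Repeat(
					trigger.AfterCount(2),
				),
			),
		),
		beam.PanesDiscard(),
	)
	panes := beam.ParDo(scp, getPanes, windowed)
	paneIdxs := beam.WindowInto(scp, window.NewGlobalWindows(), panes)
	passert.Count(scp, paneIdxs, "pane idxs", 10)
	passert.EqualsList(scp, paneIdxs, []int{0, 0, 1, 1, 2, 0, 0, 1, 1, 2})
	ptest.RunAndValidate(t, p)
}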

Every log line reads:

got pane {Timing:0 IsFirst:false IsLast:false Index:0 NonSpeculativeIndex:0}

Even if my expected indexes are wrong (the test fails on the EqualsList assertion), I would expect these panes to be internally consistent. That is, I would expect at least one pane with IsFirst:true and at least one with IsLast:true.
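
The expectation above can be written down as a check. This is a minimal sketch using a local `paneInfo` struct that mirrors the fields printed in the log (it is not Beam's `typex.PaneInfo`); `checkPanes` is a hypothetical helper. It is slightly stricter than the report: given the distinct panes observed for one window, indexes must run 0..n-1, only the first pane may be `IsFirst`, and only the last may be `IsLast`.

```go
package main

import (
	"fmt"
	"sort"
)

// paneInfo mirrors the fields printed by the issue's log line.
// It is a local stand-in, not Beam's typex.PaneInfo.
type paneInfo struct {
	Timing              int
	IsFirst, IsLast     bool
	Index               int64
	NonSpeculativeIndex int64
}

// checkPanes reports whether the distinct panes seen for a single window are
// internally consistent: indexes run 0..n-1, only the first pane is marked
// IsFirst, and only the last pane is marked IsLast.
func checkPanes(panes []paneInfo) error {
	if len(panes) == 0 {
		return fmt.Errorf("no panes observed")
	}
	// Sort a copy by Index so the caller's slice is left untouched.
	sorted := append([]paneInfo(nil), panes...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Index < sorted[j].Index })
	for i, p := range sorted {
		if p.Index != int64(i) {
			return fmt.Errorf("pane %d has index %d", i, p.Index)
		}
		if p.IsFirst != (i == 0) {
			return fmt.Errorf("pane %d: IsFirst=%v", i, p.IsFirst)
		}
		if p.IsLast != (i == len(sorted)-1) {
			return fmt.Errorf("pane %d: IsLast=%v", i, p.IsLast)
		}
	}
	return nil
}

func main() {
	// The value logged in the issue: the zero value, marked neither first nor last.
	fmt.Println(checkPanes([]paneInfo{{}})) // pane 0: IsFirst=false

	// A single "no firing" pane is trivially both first and last.
	noFiring := paneInfo{Timing: 3, IsFirst: true, IsLast: true}
	fmt.Println(checkPanes([]paneInfo{noFiring})) // <nil>
}
```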

Issue Priority

Priority: 2 (default)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@camphillips22
Contributor Author

I messed up the bug report (submitted it too early) and it got classified as P3, whereas I think it should be P2.

@camphillips22 camphillips22 linked a pull request May 3, 2024 that will close this issue
@camphillips22
Contributor Author

I've put up a draft PR that essentially pipes pane info through all the FullValue writes. I now get the default NoFiringPane() in the log, but I'm still working out why the triggering info is not being populated as expected.

got paneinfo: {Timing:3 IsFirst:true IsLast:true Index:0 NonSpeculativeIndex:0}
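
A simplified model of the propagation problem the draft PR addresses. `value` is a hypothetical stand-in for an element moving between transforms (loosely inspired by the SDK-internal FullValue, whose real definition is not reproduced here), and `noFiringPane` mirrors the default printed above. The timing constants assume Beam's ordering of early, on time, late, unknown, which is consistent with `Timing:3` in that log line. A stage that builds its output without copying the input's pane leaves the zero value, matching the original symptom; a stage that copies it preserves the pane.

```go
package main

import "fmt"

// paneTiming mirrors the numeric pane timings seen in the logs (assumed ordering).
type paneTiming int

const (
	paneEarly paneTiming = iota
	paneOnTime
	paneLate
	paneUnknown
)

// pane is a local stand-in for a pane record; field order matches the log output.
type pane struct {
	Timing              paneTiming
	IsFirst, IsLast     bool
	Index               int64
	NonSpeculativeIndex int64
}

// noFiringPane mirrors the default logged after the draft PR.
func noFiringPane() pane {
	return pane{Timing: paneUnknown, IsFirst: true, IsLast: true}
}

// value is a hypothetical element wrapper carrying its pane alongside the data.
type value struct {
	Elm  int
	Pane pane
}

// dropPane models the bug: the output is built from scratch, so Pane is the zero value.
func dropPane(in value, f func(int) int) value {
	return value{Elm: f(in.Elm)}
}

// keepPane models the fix: the input's pane is copied onto the output.
func keepPane(in value, f func(int) int) value {
	return value{Elm: f(in.Elm), Pane: in.Pane}
}

func main() {
	// An element that has not passed through a trigger carries the no-firing pane.
	src := value{Elm: 1, Pane: noFiringPane()}
	double := func(x int) int { return 2 * x }

	fmt.Printf("%+v\n", dropPane(src, double).Pane) // {Timing:0 IsFirst:false IsLast:false Index:0 NonSpeculativeIndex:0}
	fmt.Printf("%+v\n", keepPane(src, double).Pane) // {Timing:3 IsFirst:true IsLast:true Index:0 NonSpeculativeIndex:0}
}
```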

@camphillips22
Contributor Author

Still working on figuring out why the triggering info is not being populated as expected.

I think the pane info was actually coming in correctly. I was misapplying the trigger, and I was trying to use PaneInfo before bundles had been committed to the backend, so things were not happening as expected.

@lostluck
Contributor

lostluck commented May 9, 2024

First, thanks for finding and reporting this!

Agreed that P2 is more appropriate for this issue generally; I've updated the labels.

But probably not higher than that. In principle, State and Timers should enable the same semantics, since they are lower-level primitives. BUT that won't work very well for executions on batch Dataflow, since timers behave differently when all data is available a priori.


This would be a blocker for getting triggers working properly on the Go SDK's local runner, Prism, which doesn't do anything with panes or triggers at present, though that work is coming up (see #29650 for the Prism implementation list).

Proper pane propagation would also allow implementing sophisticated streaming-enabled file sinks natively in the Go SDK; such sinks rely on correct pane information to output and update files written in an unbounded pipeline.


The example code demonstrates that the default pane isn't being set to the NoFiringPane. That is a bug, probably caused by a lack of propagation, and it should be fixed.

What it does not demonstrate is that the pane should be different because of a trigger.

IIRC, triggers only resolve at the downstream GBK/aggregation, so that is where there would be multiple firings, and therefore different panes.

Panes are only updated after a trigger is enacted ("fired") from a runner source, such as after a GBK. More precisely, the default "No Firing Pane" is expected until a trigger actually resolves. The "No Firing Pane" means the given pane was not produced by a trigger firing.

So the following pipeline should show different firings:

  • DoFn: produce some data.
  • WindowInto with triggers set to fire early (e.g., by number of elements), plus the default firing.
  • GBK.
  • DoFn: detect panes here.
  • Other DoFn: do more stuff.
  • DoFn: detect the same panes here.
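
The pipeline above can be illustrated without a runner. This self-contained model (all names are hypothetical, not Beam APIs) assigns panes at a simulated GBK for one window under `AfterEndOfWindow().EarlyFiring(Repeat(AfterCount(2)))` with discarding panes: an early pane fires every two elements, and an on-time pane fires at the end of the window with whatever remains. It assumes elements arrive one at a time (a real runner may batch them, so real early panes can hold more than two), that there is no allowed lateness, and that the on-time pane is emitted even when empty. The two "detect" stages only read the pane attached to each grouped output, so with correct propagation they see the same panes.

```go
package main

import "fmt"

// paneTiming is a local stand-in; only the two timings this model produces are defined.
type paneTiming int

const (
	early paneTiming = iota
	onTime
)

// pane is a hypothetical, trimmed-down pane record (not Beam's typex.PaneInfo).
type pane struct {
	Timing          paneTiming
	IsFirst, IsLast bool
	Index           int64
}

// grouped is one simulated GBK output: the elements of one firing and its pane.
type grouped struct {
	Pane  pane
	Elems []int
}

// simulateGBK models one window: early firings every n elements in discarding
// mode, then a single on-time firing that is always the last pane.
func simulateGBK(elems []int, n int) []grouped {
	var out []grouped
	var buf []int
	fire := func(t paneTiming, last bool) {
		p := pane{Timing: t, IsFirst: len(out) == 0, IsLast: last, Index: int64(len(out))}
		out = append(out, grouped{Pane: p, Elems: buf})
		buf = nil // discarding mode: fired elements are not repeated in later panes
	}
	for _, e := range elems {
		buf = append(buf, e)
		if len(buf) >= n {
			fire(early, false)
		}
	}
	fire(onTime, true)
	return out
}

// detect models a DoFn that reads the pane of each grouped output it receives.
func detect(in []grouped) []pane {
	ps := make([]pane, len(in))
	for i, g := range in {
		ps[i] = g.Pane
	}
	return ps
}

// doMore models an intermediate DoFn that transforms elements but keeps each pane.
func doMore(in []grouped) []grouped {
	out := make([]grouped, len(in))
	for i, g := range in {
		elems := make([]int, len(g.Elems))
		for j, e := range g.Elems {
			elems[j] = e * 10
		}
		out[i] = grouped{Pane: g.Pane, Elems: elems}
	}
	return out
}

func main() {
	// Five elements in one window, early firing every two elements.
	afterGBK := simulateGBK([]int{0, 1, 2, 3, 4}, 2)
	first := detect(afterGBK)
	second := detect(doMore(afterGBK))
	for i, p := range first {
		fmt.Printf("%+v elems=%v same=%v\n", p, afterGBK[i].Elems, p == second[i])
	}
}
```

With five elements the model produces early panes 0 and 1 (two elements each) and an on-time pane 2 holding the last element, which lines up with the 0, 0, 1, 1, 2 per-window pattern the reporter expected; both detection stages report identical panes.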
