...
 
Commits (3)
......@@ -493,16 +493,6 @@ func (b *changefeedResumer) Resume(
details := b.job.Details().(jobspb.ChangefeedDetails)
progress := b.job.Progress()
// TODO(dan): This is a workaround for not being able to set an initial
// progress high-water when creating a job (currently only the progress
// details can be set). I didn't want to pick off the refactor to get this
// fix in, but it'd be nice to remove this hack.
if _, ok := details.Opts[optCursor]; ok {
if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
}
}
// We'd like to avoid failing a changefeed unnecessarily, so when an error
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
......@@ -514,6 +504,19 @@ func (b *changefeedResumer) Resume(
}
var err error
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
if err := b.job.CheckStatus(ctx); err != nil {
log.Infof(ctx, `CHANGEFEED job %d exiting due to status: %v`, jobID, err)
return err
}
// TODO(dan): This is a workaround for not being able to set an initial
// progress high-water when creating a job (currently only the progress
// details can be set). I didn't want to pick off the refactor to get this
// fix in, but it'd be nice to remove this hack.
if _, ok := details.Opts[optCursor]; ok {
if h := progress.GetHighWater(); h == nil || *h == (hlc.Timestamp{}) {
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
}
}
if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil {
return nil
}
......@@ -542,6 +545,7 @@ func (b *changefeedResumer) Resume(
// on the channel, causing the changefeed flow to block. Replace it with
// a dummy channel.
startedCh = make(chan tree.Datums, 1)
}
// We only hit this if `r.Next()` returns false, which right now only happens
// on context cancellation.
......
......@@ -1992,6 +1992,115 @@ func TestChangefeedMemBufferCapacity(t *testing.T) {
t.Run(`enterprise`, enterpriseTest(testFn))
}
// TestChangefeedRespectCursorOnRestart verifies that a changefeed created with
// a cursor, which hits a retryable error before writing its first high-water
// mark, is retried from that cursor instead of emitting a backfill of the rows
// that were written before it.
//
// Regression test for #48746.
func TestChangefeedRespectCursorOnRestart(t *testing.T) {
	defer leaktest.AfterTest(t)()

	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
		// Dig the changefeed testing knobs out of the test server's config.
		srv := f.Server().(*server.TestServer)
		distSQLKnobs := srv.Cfg.TestingKnobs.DistSQL.(*execinfra.TestingKnobs)
		knobs := distSQLKnobs.Changefeed.(*TestingKnobs)

		// injectedErrCh hands errors to the AfterSinkFlush hook; stopCh
		// unblocks the hook once the test body returns.
		injectedErrCh := make(chan error)
		stopCh := make(chan struct{})
		defer close(stopCh)
		knobs.AfterSinkFlush = func() error {
			select {
			case injected := <-injectedErrCh:
				// Once injectedErrCh is closed this receives nil, so later
				// flushes pass through without blocking.
				return injected
			case <-stopCh:
				return nil
			}
		}

		runner := sqlutils.MakeSQLRunner(db)
		runner.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
		runner.Exec(t, `INSERT INTO foo VALUES (0), (1), (2), (3)`)

		// Pick a cursor past the initial inserts; none of the rows above
		// should ever be emitted by the feed.
		var cursor string
		runner.QueryRow(t, `SELECT cluster_logical_timestamp() + 100`).Scan(&cursor)
		startTime := parseTimeToHLC(t, cursor)

		resolvedInterval := (10 * time.Millisecond).String()
		foo := feed(
			t, f,
			`CREATE CHANGEFEED FOR foo WITH cursor = $1, resolved = $2, updated`,
			cursor, resolvedInterval,
		).(*cdctest.TableFeed)
		defer closeFeed(t, foo)

		// Fail the first sink flush with a retryable error, then let every
		// subsequent flush succeed.
		injectedErrCh <- MarkRetryableError(errors.New("boom"))
		close(injectedErrCh)

		// Consume resolved timestamps until one lies beyond the cursor.
		for pastCursor := false; !pastCursor; {
			pastCursor = startTime.Less(expectResolvedTimestamp(t, foo))
		}

		// Only the row written after the cursor may show up.
		runner.Exec(t, `INSERT INTO foo VALUES (5)`)
		assertPayloadsStripTs(t, foo, []string{
			`foo: [5]->{"after": {"a": 5}}`,
		})
	}

	t.Run("enterprise", enterpriseTest(testFn))
}
// TestCanceledRangefeedsStop checks that a changefeed which is canceled after
// hitting a retryable error actually stops. Previously a canceled changefeed
// that kept hitting retryable errors would retry forever.
func TestCanceledRangefeedsStop(t *testing.T) {
	defer leaktest.AfterTest(t)()

	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
		// Dig the changefeed testing knobs out of the test server's config.
		srv := f.Server().(*server.TestServer)
		distSQLKnobs := srv.Cfg.TestingKnobs.DistSQL.(*execinfra.TestingKnobs)
		knobs := distSQLKnobs.Changefeed.(*TestingKnobs)

		// A receive on failFlushCh makes the AfterSinkFlush hook return a
		// retryable error; stopCh unblocks the hook once the test body returns.
		failFlushCh := make(chan struct{})
		stopCh := make(chan struct{})
		defer close(stopCh)
		knobs.AfterSinkFlush = func() error {
			select {
			case <-failFlushCh:
				return MarkRetryableError(errors.New("boom"))
			case <-stopCh:
				return nil
			}
		}

		runner := sqlutils.MakeSQLRunner(db)
		runner.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
		runner.Exec(t, `INSERT INTO foo VALUES (0), (1), (2), (3)`)

		resolvedInterval := (10 * time.Millisecond).String()
		foo := feed(
			t, f, `CREATE CHANGEFEED FOR foo WITH resolved = $1`, resolvedInterval,
		).(*cdctest.TableFeed)
		defer closeFeed(t, foo)

		assertPayloadsStripTs(t, foo, []string{
			`foo: [0]->{"after": {"a": 0}}`,
			`foo: [1]->{"after": {"a": 1}}`,
			`foo: [2]->{"after": {"a": 2}}`,
			`foo: [3]->{"after": {"a": 3}}`,
		})

		// Open a log scope for the remainder of the test; the log files are
		// searched below.
		defer log.Scope(t).Close(t)

		// Trigger one retryable failure, cancel the job, and then make every
		// later flush fail as well by closing the channel.
		failFlushCh <- struct{}{}
		runner.Exec(t, `CANCEL JOB $1`, foo.JobID)
		close(failFlushCh)

		// This pattern is used when a changefeed exits due to cancelation.
		exitPattern := regexp.MustCompile("exiting due to status")
		testutils.SucceedsSoon(t, func() error {
			log.Flush()
			entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, exitPattern)
			if err != nil {
				t.Fatal(err)
			}
			switch {
			case len(entries) > 1:
				t.Fatalf("only ever expected 1 log line, got %v", entries)
			case len(entries) != 1:
				// Not logged yet; have SucceedsSoon retry.
				return errors.Errorf("log line not found.")
			}
			return nil
		})
	}

	t.Run("enterprise", enterpriseTest(testFn))
}
// Regression test for #41694
func TestChangefeedRestartDuringBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
......