Skip to content

Commit

Permalink
wip persist
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 15, 2024
1 parent 8cb1209 commit cff0892
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
28 changes: 11 additions & 17 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,12 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// MessageLSN pairs a pglogrepl.Message with a pglogrepl.LSN so the two can be
// stored together as one value.
// NOTE(review): in this rendered diff the type is referenced only by the
// MessageLSN-typed txBuffer declarations and the MessageLSN{...} literals in
// processMessage, which look like the removed side of the change — confirm
// before relying on it.
type MessageLSN struct {
// msg is the logical replication message being carried.
msg pglogrepl.Message
// lsn is the LSN stored alongside msg.
lsn pglogrepl.LSN
}

// PostgresCDCSource is the state handed to the CDC record processor while
// replication messages are processed. It embeds both the connector and the
// CDC configuration, so their fields and methods are promoted onto it.
type PostgresCDCSource struct {
*PostgresConnector
*PostgresCDCConfig
// typeMap is a pgtype.Map.
// NOTE(review): presumably used to decode column values — confirm against its users.
typeMap *pgtype.Map
// commitLock holds a BeginMessage.
// NOTE(review): presumably the transaction currently being processed — confirm.
commitLock *pglogrepl.BeginMessage
// txBuffer accumulates entries per transaction id: processMessage appends to
// txBuffer[msg.Xid] for the V2 insert/update/delete/relation messages. The map
// is only allocated by NewPostgresCDCSource when cdcConfig.Version >= 2.
// NOTE(review): two declarations follow because this is a rendered diff with
// the old ([]MessageLSN) and new ([][]byte) lines interleaved; only one of
// them can exist in the real file.
txBuffer map[uint32][]MessageLSN
txBuffer map[uint32][][]byte
// inStream is set to true when a StreamStartMessageV2 is processed.
inStream bool
}

Expand All @@ -60,9 +55,9 @@ type startReplicationOpts struct {
}

func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
var txBuffer map[uint32][]MessageLSN
var txBuffer map[uint32][][]byte
if cdcConfig.Version >= 2 {
txBuffer = make(map[uint32][]MessageLSN)
txBuffer = make(map[uint32][][]byte)
}
return &PostgresCDCSource{
PostgresConnector: c,
Expand Down Expand Up @@ -476,7 +471,7 @@ func PullCdcRecords[Items model.Items](

logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
if err := recordProcessor.processXLogData(ctx, p, xld, clientXLogPos); err != nil {
if err := recordProcessor.processXLogData(ctx, p, xld, msg.Data[1:], clientXLogPos); err != nil {
return fmt.Errorf("error processing message: %w", err)
}

Expand All @@ -502,6 +497,7 @@ func (rp *cdcRecordProcessor[Items]) processXLogData(
ctx context.Context,
p *PostgresCDCSource,
xld pglogrepl.XLogData,
xldbytes []byte,
currentClientXlogPos pglogrepl.LSN,
) error {
var logicalMsg pglogrepl.Message
Expand Down Expand Up @@ -534,15 +530,15 @@ func (rp *cdcRecordProcessor[Items]) processMessage(
case *pglogrepl.InsertMessage:
return rp.processInsertMessage(p, lsn, msg)
case *pglogrepl.InsertMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.InsertMessage, lsn: lsn})
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], xldbytes)
case *pglogrepl.UpdateMessage:
return rp.processUpdateMessage(p, lsn, msg)
case *pglogrepl.UpdateMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.UpdateMessage, lsn: lsn})
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], xldbytes)
case *pglogrepl.DeleteMessage:
return rp.processDeleteMessage(p, lsn, msg)
case *pglogrepl.DeleteMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.DeleteMessage, lsn: lsn})
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], xldbytes)
case *pglogrepl.CommitMessage:
// for a commit message, update the last checkpoint id for the record batch.
logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
Expand Down Expand Up @@ -572,7 +568,7 @@ func (rp *cdcRecordProcessor[Items]) processMessage(

return rp.processRelationMessage(ctx, p, currentClientXlogPos, msg)
case *pglogrepl.RelationMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.RelationMessage, lsn: lsn})
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], xldbytes)
case *pglogrepl.StreamStartMessageV2:
p.inStream = true
case *pglogrepl.StreamStopMessageV2:
Expand Down Expand Up @@ -748,8 +744,7 @@ func (rp *cdcRecordProcessor[Items]) processDeleteMessage(
}
isFullReplica := rp.pullRequest.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec)
if err != nil {
if err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -778,8 +773,7 @@ func (rp *cdcRecordProcessor[Items]) processDeleteMessage(

// A delete can only be followed by an INSERT, which does not need backfilling
// No need to store DeleteRecords in memory or disk.
err = rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec)
if err != nil {
if err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec); err != nil {
return err
}
}
Expand Down
7 changes: 7 additions & 0 deletions nexus/catalog/migrations/V26__v2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- v2cdc stores one row per (flow_name, xid, lsn).
-- NOTE(review): presumably persists the buffered logical replication messages
-- of in-progress (streamed) transactions for a flow — confirm against the Go
-- code that reads and writes this table.
create table v2cdc (
-- name of the flow the row belongs to
flow_name text,
-- Transaction id. Stored as bigint rather than the xid type: xid has no
-- default btree operator class, so it cannot be part of the primary key
-- below. bigint holds every unsigned 32-bit transaction id without loss.
xid bigint,
-- WAL position associated with the row
lsn pg_lsn,
-- array of raw byte strings, one element per stored message
stream bytea[],
primary key (flow_name, xid, lsn)
);

0 comments on commit cff0892

Please sign in to comment.