diff --git a/materialize-s3-iceberg/catalog.go b/materialize-s3-iceberg/catalog.go
index dc0aa76b14..a3693175ef 100644
--- a/materialize-s3-iceberg/catalog.go
+++ b/materialize-s3-iceberg/catalog.go
@@ -211,42 +211,34 @@ func (c *catalog) UpdateResource(_ context.Context, spec *pf.MaterializationSpec
 	}, nil
 }
 
-type tableAppend struct {
-	Table              string   `json:"table"`
-	PreviousCheckpoint string   `json:"prev_checkpoint"`
-	NextCheckpoint     string   `json:"next_checkpoint"`
-	FilePaths          []string `json:"file_paths"`
-}
-
 func (c *catalog) appendFiles(
 	ctx context.Context,
 	materialization string,
-	tableAppends []tableAppend,
+	tablePath []string,
+	filePaths []string,
+	prevCheckpoint string,
+	nextCheckpoint string,
 ) error {
-	input, err := json.Marshal(tableAppends)
-	if err != nil {
-		return nil
-	}
+	fqn := pathToFQN(tablePath)
 
 	b, err := runIcebergctl(
 		ctx,
 		c.cfg,
 		"append-files",
 		materialization,
-		string(input),
+		fqn,
+		prevCheckpoint,
+		nextCheckpoint,
+		strings.Join(filePaths, ","),
 	)
 	if err != nil {
 		return err
 	}
 
 	if len(b) > 0 {
-		output := make(map[string]string)
-		if err := json.Unmarshal(b, &output); err != nil {
-			return err
-		}
-
 		log.WithFields(log.Fields{
-			"output": output,
+			"table":  fqn,
+			"output": string(b),
 		}).Info("append files")
 	}
 
diff --git a/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py b/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py
index 9d04050788..98747cfd98 100644
--- a/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py
+++ b/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py
@@ -298,88 +298,25 @@ def alter_table(
         update.update_column(path=c, required=False)
 
 
-class TableAppend(BaseModel):
-    table: str
-    prev_checkpoint: str
-    next_checkpoint: str
-    file_paths: list[str]
-
-
-async def append_to_table(
-    catalog: Catalog, materialization: str, table_append: TableAppend
-) -> str:
-    table = table_append.table
-    prev_checkpoint = table_append.prev_checkpoint
-    next_checkpoint = table_append.next_checkpoint
-
-    tbl = catalog.load_table(table)
-    checkpoints = TypeAdapter(dict[str, str]).validate_json(
-        tbl.properties.get("flow_checkpoints_v1", "{}")
-    )
-    cp = checkpoints.get(
-        materialization, ""
-    )  # prev_checkpoint will be unset if this is the first commit to the table
-
-    if cp == next_checkpoint:
-        return f"checkpoint is already '{next_checkpoint}'"
-    elif cp != "" and cp != prev_checkpoint:
-        # An absent checkpoint table property is allowed to accommodate cases
-        # where the user may have manually dropped the table and the
-        # materialization automatically re-created it, outside the normal
-        # backfill counter increment process.
-        raise Exception(
-            f"checkpoint from snapshot ({cp}) did not match either previous ({prev_checkpoint}) or next ({next_checkpoint}) checkpoint"
-        )
-
-    # Files are only added if the table checkpoint property has the prior checkpoint. The checkpoint
-    # property is updated to the current checkpoint in an atomic operation with appending the files.
-    # Note that this is not 100% correct exactly-once semantics, since there is a potential race
-    # between retrieving the table properties and appending the files, where a zombie process could
-    # append the same files concurrently. In principal Iceberg catalogs support the atomic
-    # operations necessary for true exactly-once semantics, but we'd need to work with the catalog
-    # at a lower level than PyIceberg currently makes available.
-    checkpoints[materialization] = next_checkpoint
-    txn = tbl.transaction()
-    txn.add_files(table_append.file_paths)
-    txn.set_properties({"flow_checkpoints_v1": json.dumps(checkpoints)})
-    txn.commit_transaction()
-
-    tbl = catalog.load_table(table)
-    return f"updated flow_checkpoints_v1 property to {next_checkpoint}"
-
-
-async def run_appends(
-    catalog: Catalog, materialization: str, table_appends: list[TableAppend]
-) -> dict[str, str]:
-    sem = asyncio.Semaphore(5)
-
-    async def run_append(table_append: TableAppend) -> tuple[str, str]:
-        async with sem:
-            return (
-                table_append.table,
-                await append_to_table(catalog, materialization, table_append),
-            )
-
-    return dict(
-        await asyncio.gather(
-            *(run_append(table_append) for table_append in table_appends)
-        )
-    )
-
-
 @run.command()
 @click.argument("materialization", type=str)
-@click.argument("table-appends", type=str)
+@click.argument("table", type=str)
+@click.argument("prev-checkpoint", type=str)
+@click.argument("next-checkpoint", type=str)
+@click.argument("file-paths", type=str)
 @click.pass_context
 def append_files(
     ctx: Context,
     materialization: str,
-    table_appends: str,
+    table: str,
+    prev_checkpoint: str,
+    next_checkpoint: str,
+    file_paths: str,
 ):
     '''
-    Appends files per the provided list.
+    Appends files at "file-paths" to the table.
 
-    The "prev_checkpoint" and "next_checkpoint" properties are used to provide a best-effort
+    The "prev-checkpoint" and "next-checkpoint" arguments are used to provide a best-effort
     avoidance of duplicating data from appending the same files that have previously been appended.
     A possible scenario is this: Files are successfully appended to Table1 and Table2 but not Table3
     in response to the connector receiving a StartCommit message, but the connector is restarted
@@ -389,10 +326,10 @@ def append_files(
     need to have the files appended.
 
     When a table is updated to append files, its "checkpoint" property is updated to
-    "next_checkpoint", and only tables with "checkpoint" equal to "prev_checkpoint" are appended to.
+    "next-checkpoint", and only tables with "checkpoint" equal to "prev-checkpoint" are appended to.
     The previously described scenario would then play out like this:
 
-    1) The materialization connector persists values for "prev_checkpoint" and "next_checkpoint" of
+    1) The materialization connector persists values for "prev-checkpoint" and "next-checkpoint" of
        "0001" and "0002", respectively, in its driver checkpoint via StartedCommit.
 
     2) During the partial completion of the transaction, Table1 and Table2 are updated to have a
@@ -410,12 +347,39 @@ def append_files(
     catalog = ctx.obj["catalog"]
     assert isinstance(catalog, Catalog)
 
+    tbl = catalog.load_table(table)
+    checkpoints = TypeAdapter(dict[str, str]).validate_json(tbl.properties.get("flow_checkpoints_v1", "{}"))
+    cp = checkpoints.get(materialization, "")  # prev_checkpoint will be unset if this is the first commit to the table
 
-    res = asyncio.run(
-        run_appends(catalog, materialization, TypeAdapter(list[TableAppend]).validate_json(table_appends))
-    )
-    print(json.dumps(res))
+    if cp == next_checkpoint:
+        print(f"checkpoint is already '{next_checkpoint}'")
+        return  # already appended these files
+    elif cp != "" and cp != prev_checkpoint:
+        # An absent checkpoint table property is allowed to accommodate cases
+        # where the user may have manually dropped the table and the
+        # materialization automatically re-created it, outside the normal
+        # backfill counter increment process.
+        raise Exception(
+            f"checkpoint from snapshot ({cp}) did not match either previous ({prev_checkpoint}) or next ({next_checkpoint}) checkpoint"
+        )
+
+    # Files are only added if the table checkpoint property has the prior checkpoint. The checkpoint
+    # property is updated to the current checkpoint in an atomic operation with appending the files.
+    # Note that this is not 100% correct exactly-once semantics, since there is a potential race
+    # between retrieving the table properties and appending the files, where a zombie process could
+    # append the same files concurrently. In principle Iceberg catalogs support the atomic
+    # operations necessary for true exactly-once semantics, but we'd need to work with the catalog
+    # at a lower level than PyIceberg currently makes available.
+    checkpoints[materialization] = next_checkpoint
+    txn = tbl.transaction()
+    txn.add_files(file_paths.split(","))
+    txn.set_properties({"flow_checkpoints_v1": json.dumps(checkpoints)})
+    txn.commit_transaction()
+    # TODO(whb): This additional logging should not really be necessary, but is
+    # included for now to assist in troubleshooting potential errors.
+    tbl = catalog.load_table(table)
+    print(f"{table} updated with flow_checkpoints_v1 property of {tbl.properties.get('flow_checkpoints_v1')}")
 
 
 if __name__ == "__main__":
     run(auto_envvar_prefix="ICEBERG")
diff --git a/materialize-s3-iceberg/transactor.go b/materialize-s3-iceberg/transactor.go
index 027e433a00..1098ed933b 100644
--- a/materialize-s3-iceberg/transactor.go
+++ b/materialize-s3-iceberg/transactor.go
@@ -247,8 +247,6 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
 }
 
 func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) {
-	var appends []tableAppend
-
 	for _, b := range t.bindings {
 		bindingState := t.state.BindingStates[b.stateKey]
 
@@ -256,22 +254,21 @@ func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
 			continue // no data for this binding
 		}
 
-		appends = append(appends, tableAppend{
-			Table:              pathToFQN(b.path),
-			PreviousCheckpoint: bindingState.PreviousCheckpoint,
-			NextCheckpoint:     bindingState.CurrentCheckpoint,
-			FilePaths:          bindingState.FileKeys,
+		ll := log.WithFields(log.Fields{
+			"table":              pathToFQN(b.path),
+			"previousCheckpoint": bindingState.PreviousCheckpoint,
+			"currentCheckpoint":  bindingState.CurrentCheckpoint,
 		})
 
-		bindingState.FileKeys = nil // reset for next txn
-	}
-
-	if len(appends) > 0 {
+		ll.Info("starting appendFiles for table")
 		appendCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
 		defer cancel()
-		if err := t.catalog.appendFiles(appendCtx, t.materialization, appends); err != nil {
-			return nil, fmt.Errorf("appendFiles: %w", err)
+		if err := t.catalog.appendFiles(appendCtx, t.materialization, b.path, bindingState.FileKeys, bindingState.PreviousCheckpoint, bindingState.CurrentCheckpoint); err != nil {
+			return nil, fmt.Errorf("appendFiles for %s: %w", b.path, err)
 		}
+		ll.Info("finished appendFiles for table")
+
+		bindingState.FileKeys = nil // reset for next txn
 	}
 
 	checkpointJSON, err := json.Marshal(t.state)
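
As context for review: the compare-and-set fencing that `append-files` now performs inline can be sketched as a standalone PyIceberg snippet. This is a minimal illustration of the protocol described in the command's docstring, assuming a configured PyIceberg catalog; the name `fenced_append` is hypothetical and not part of this change.

import json

from pyiceberg.catalog import Catalog


def fenced_append(
    catalog: Catalog,
    materialization: str,
    table: str,
    file_paths: list[str],
    prev_checkpoint: str,
    next_checkpoint: str,
) -> None:
    # Hypothetical sketch of the fencing protocol above; not connector code.
    tbl = catalog.load_table(table)
    checkpoints = json.loads(tbl.properties.get("flow_checkpoints_v1", "{}"))
    cp = checkpoints.get(materialization, "")

    if cp == next_checkpoint:
        return  # this transaction already committed; the retry is a no-op
    if cp not in ("", prev_checkpoint):
        raise Exception(f"unexpected checkpoint {cp!r} in table properties")

    # Advance the checkpoint and append the files in a single commit, so any
    # later retry of the same transaction hits the early return above.
    checkpoints[materialization] = next_checkpoint
    txn = tbl.transaction()
    txn.add_files(file_paths)
    txn.set_properties({"flow_checkpoints_v1": json.dumps(checkpoints)})
    txn.commit_transaction()

The point mirrored here is that the checkpoint-property update and the file append share a single Iceberg commit, which is what makes a retried invocation a no-op rather than a duplicate append.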