Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] KIP-891: Multiversioning in Kafka Connect - Connector Multiversion Integration Test #18360

Open
wants to merge 233 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
233 commits
Select commit Hold shift + click to select a range
49b37c7
Add multiversioning apis to isolation
snehashisp Aug 23, 2024
1b740cc
add method for header converter
snehashisp Aug 23, 2024
3aba36e
init commit
snehashisp Aug 31, 2024
c96a8d1
update validation logic for connector versions
snehashisp Sep 19, 2024
017df62
add getter for all plugins for a class
snehashisp Sep 19, 2024
0f1cdaf
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 19, 2024
855e77a
minor edits
snehashisp Sep 21, 2024
4744f4f
Allow super to load class but do a version check
snehashisp Sep 21, 2024
c708ade
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 21, 2024
8beac31
validate the version when loaded from parent correctly
snehashisp Sep 21, 2024
df16999
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 21, 2024
6fb6471
Tested connector multiple verions
snehashisp Sep 22, 2024
84ed0da
revert changes to filestream sink
snehashisp Sep 22, 2024
814ddb7
bug in loadclass, should use fullname
snehashisp Sep 22, 2024
7905708
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 22, 2024
7497ef7
add available version to exception and minor changes
snehashisp Sep 22, 2024
27aba58
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 22, 2024
b9c0ab5
validation supporting connector version
snehashisp Oct 1, 2024
d9cad93
fix lf to cr
snehashisp Oct 1, 2024
7df37cf
Add configs and recommendors
snehashisp Oct 1, 2024
cbbbc95
converter tested
snehashisp Oct 2, 2024
5282a3f
Add converter validatoins
snehashisp Oct 2, 2024
7ad8a32
Remove changes to json
snehashisp Oct 5, 2024
2cdf67c
Validation and recommendors for versioning transformations and predic…
snehashisp Oct 10, 2024
53068f8
update recommendation logic
snehashisp Oct 10, 2024
7676408
minor fixes
snehashisp Oct 28, 2024
2119a91
Add default version getter
snehashisp Oct 28, 2024
eb3e693
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 28, 2024
9d83a1c
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Oct 28, 2024
dd150a5
add defaults
snehashisp Oct 28, 2024
79a7e02
Update default version
snehashisp Oct 28, 2024
f973fd0
add a null check
snehashisp Oct 29, 2024
cce53be
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 29, 2024
ec7892b
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Oct 29, 2024
baea3cf
Update log and comment
snehashisp Oct 29, 2024
5238dd0
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 29, 2024
ae243dc
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Oct 29, 2024
b7cf615
Update worker for versioned plugins
snehashisp Oct 29, 2024
efbe73b
adress backwards compatiblity
snehashisp Oct 30, 2024
f228fbc
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Oct 30, 2024
ddec0d8
Rename default version to latest version
snehashisp Oct 30, 2024
ab0c8d8
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 30, 2024
d5f765b
Add logic for getting defaults
snehashisp Oct 30, 2024
84fb4fe
fix worker converter fetching
snehashisp Oct 31, 2024
9ded1af
fix header converter version
snehashisp Oct 31, 2024
a385dd4
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 31, 2024
a490ac1
Remove existing default logic
snehashisp Oct 31, 2024
d929a02
remove unwanted imports
snehashisp Oct 31, 2024
edd1067
Remove unwanted imports
snehashisp Oct 31, 2024
8f79bc3
Update imports
snehashisp Oct 31, 2024
648691b
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Oct 31, 2024
9604dfe
refactor default version getting logic
snehashisp Oct 31, 2024
a9fe400
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Oct 31, 2024
74a0d8f
Use current classloader when version is not present
snehashisp Oct 31, 2024
7190808
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 31, 2024
c604c74
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Oct 31, 2024
680fffc
checkpoint
snehashisp Nov 9, 2024
bfa73e7
Remove transformation and predicate getters
snehashisp Nov 9, 2024
0cc0f8a
Merge branch 'multiversioning' into mvn-validation
snehashisp Nov 9, 2024
49f1522
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Nov 9, 2024
fbbb0b8
add rawtype annotation
snehashisp Nov 9, 2024
bd4b665
Merge branch 'multiversioning' into mvn-validation
snehashisp Nov 9, 2024
105e746
Merge branch 'multiversioning' into mvn-validation-transforms
snehashisp Nov 9, 2024
5d7a335
remove json changes
snehashisp Nov 10, 2024
69381f3
Add version configs to worker
snehashisp Nov 10, 2024
4edbe2a
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Nov 10, 2024
010dbd2
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Nov 10, 2024
6eb0f4c
worker and related changes
snehashisp Nov 10, 2024
8c334b9
Add a static loader swap method
snehashisp Nov 13, 2024
4a8f0d2
Merge branch 'multiversioning' into mvn-validation
snehashisp Nov 13, 2024
2c7693a
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Nov 13, 2024
e374438
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Nov 13, 2024
ba581ad
Use a different static swaploader
snehashisp Nov 13, 2024
c51270c
add null checks
snehashisp Nov 15, 2024
78080b9
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Nov 15, 2024
054624a
Add version to status
snehashisp Nov 16, 2024
530af08
Add version metrics
snehashisp Nov 24, 2024
5b39309
Add connector metadata to task configs
snehashisp Nov 24, 2024
8ac4d56
Minor updates to get connectors
snehashisp Nov 24, 2024
ff108b0
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Nov 24, 2024
4e3cc99
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Nov 24, 2024
62d5d05
Partial commit of metrics changes
snehashisp Nov 24, 2024
70e2dff
fix transformation, remove some changes
snehashisp Nov 24, 2024
cd3f21b
Merge branch 'mvn-runtime' into mvn-status
snehashisp Nov 24, 2024
4306dcf
Complete metrics code, need testing
snehashisp Nov 24, 2024
dac6c62
Add tags for transformation and predicates
snehashisp Nov 28, 2024
42dcaed
fix converter initialisation
snehashisp Nov 28, 2024
a100807
Merge branch 'mvn-runtime' into mvn-status
snehashisp Nov 28, 2024
27cb9b1
add comment for servie loading bug and rename some methods
snehashisp Nov 29, 2024
720d619
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Nov 29, 2024
1f04289
remove some unndeded code
snehashisp Nov 29, 2024
3b39e90
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Nov 29, 2024
7dbe1a7
update worker
snehashisp Nov 29, 2024
d5ecd91
Merge branch 'mvn-runtime' into mvn-status
snehashisp Nov 29, 2024
5fef3f4
Resolve comments on 1st review
snehashisp Dec 5, 2024
5e9f3c7
Remove extra code
snehashisp Dec 5, 2024
d796f22
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
b2b51c6
Add plugin version utils for parsing versions
snehashisp Dec 5, 2024
da2c5f5
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
f8e2837
Remove unused imports
snehashisp Dec 5, 2024
093e9ef
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Dec 5, 2024
46b7656
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 5, 2024
302b47d
Merge branch 'mvn-runtime' into mvn-status
snehashisp Dec 5, 2024
aff9ac1
Use PluginVersionUtils version range requirement
snehashisp Dec 5, 2024
56fd5af
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Dec 5, 2024
ee8a3be
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 5, 2024
6dca4af
Merge branch 'mvn-runtime' into mvn-status
snehashisp Dec 5, 2024
1c35aa9
Should return loader
snehashisp Dec 5, 2024
9c1c50d
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
ca43739
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Dec 5, 2024
b8e1363
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 5, 2024
3020d00
Merge branch 'mvn-runtime' into mvn-status
snehashisp Dec 5, 2024
8d64cfa
address first set of comments
snehashisp Dec 5, 2024
73ecfb1
Add a delegated newPlugin class
snehashisp Dec 5, 2024
037d9af
Update abstract herder converter plugin fetching
snehashisp Dec 5, 2024
229cfa9
Update pluginversion util
snehashisp Dec 5, 2024
a5e67de
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 5, 2024
155271b
fix incorrect classloader equality check
snehashisp Dec 5, 2024
1d41648
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
5efd761
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Dec 5, 2024
c578ceb
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 5, 2024
f09d508
Merge branch 'mvn-runtime' into mvn-status
snehashisp Dec 5, 2024
8c88d4c
fix whitespace issue
snehashisp Dec 5, 2024
d6e3392
Update version loading logic and add return delegating loader
snehashisp Dec 5, 2024
f969159
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
b6e2226
Combine loader logic
snehashisp Dec 5, 2024
e05bf9e
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
7d995ee
Use instanced PluginVersionUtil
snehashisp Dec 5, 2024
1ac60e4
Update default fetching and recommendor logic
snehashisp Dec 6, 2024
821a7fe
Don't expose some plugins methods
snehashisp Dec 6, 2024
b3c28f7
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 6, 2024
6d1aed4
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Dec 6, 2024
261e697
Update suffix for verison
snehashisp Dec 6, 2024
f52a8fc
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Dec 6, 2024
9e859dd
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 6, 2024
44bd97b
Use plugins.newPlugin
snehashisp Dec 6, 2024
5b41efe
add safe swap loader instead of static swaploader
snehashisp Dec 6, 2024
773d9b0
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 6, 2024
475514d
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Dec 6, 2024
0879414
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 6, 2024
d50bdd4
Merge branch 'mvn-runtime' into mvn-status
snehashisp Dec 6, 2024
e325ae3
use safe loader, revert classloader changes
snehashisp Dec 6, 2024
445e1ca
Merge branch 'mvn-runtime' into mvn-status
snehashisp Dec 6, 2024
dbbc843
Merge branch 'trunk' of https://github.com/apache/kafka into mvn-vali…
snehashisp Dec 8, 2024
0e89ba4
Cleanup and changes post multiversioning merge
snehashisp Dec 8, 2024
17ceac2
fix checkstyle
snehashisp Dec 8, 2024
3e6c910
Update and Isolate getConnector logic
snehashisp Dec 8, 2024
47d4991
fix tests for herders
snehashisp Dec 8, 2024
b0aee4c
attempt to fix large diff in distributed herder
snehashisp Dec 8, 2024
e0c3b16
spotlessJavaFix
snehashisp Dec 9, 2024
9a8fc1b
updates based on comments
snehashisp Dec 9, 2024
7efacbf
remove plugins.recommendor, use header converter defaults
snehashisp Dec 9, 2024
b980c03
minor changes
snehashisp Dec 9, 2024
871ae0a
Do validations for invalid versions
snehashisp Dec 10, 2024
f7d964d
Update plugin loading logic
snehashisp Dec 10, 2024
cba6924
test fix and minor adjustments
snehashisp Dec 10, 2024
520681c
bug fixes
snehashisp Dec 11, 2024
dc4f686
revert unwanted changes
snehashisp Dec 11, 2024
18b8d16
add recommendor for invalid version, attempt to remove unwanted update
snehashisp Dec 11, 2024
978e84b
remove changes in connector config
snehashisp Dec 11, 2024
b29b373
more unwanted change removal
snehashisp Dec 11, 2024
a0f2bd0
fix test
snehashisp Dec 11, 2024
72d86aa
checkstyle fix and remove gitignore changes
snehashisp Dec 11, 2024
14cf633
remove unwanted changes
snehashisp Dec 11, 2024
357d461
newline in gitignore
snehashisp Dec 11, 2024
642221d
Merge branch 'mvn-validation' into mvn-validation-transforms
snehashisp Dec 12, 2024
0c57bf9
remove unwanted changes
snehashisp Dec 12, 2024
87fb31a
Merge branch 'trunk' of https://github.com/apache/kafka into mvn-vali…
snehashisp Dec 13, 2024
4d8ec98
Attempt to fix diff
snehashisp Dec 13, 2024
53a7c78
checkstyle fix
snehashisp Dec 13, 2024
58869c1
fix bug in plugin versionloading and some tests
snehashisp Dec 13, 2024
e1a3ce0
add back concrete class check
snehashisp Dec 13, 2024
2fcf63a
fix abstract herder test
snehashisp Dec 13, 2024
6e997e9
Validate plugin type while getting verison to avoid showing wrong def…
snehashisp Dec 15, 2024
8458124
bug fixes to version fetching
snehashisp Dec 15, 2024
8d59940
checkstyle fix
snehashisp Dec 15, 2024
e0fa30d
add back config exception
snehashisp Dec 17, 2024
f3e82ab
use version plugin loading exception in validation
snehashisp Dec 18, 2024
56f8cd5
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 18, 2024
17788f7
add versioned plugin builder
snehashisp Dec 22, 2024
7e047ef
Add buildable versioned plugins
snehashisp Dec 23, 2024
3554ea8
Add transformation and predicate versioned test plugins
snehashisp Dec 23, 2024
17e0c46
fix bugs and add loading test
snehashisp Dec 23, 2024
8a221a4
Merge branch 'mvn-validation-transforms' into mvn-plugins-test
snehashisp Dec 23, 2024
09f717d
More multiversioning tests
snehashisp Dec 23, 2024
2cfbf62
fix newPlugin
snehashisp Dec 23, 2024
3cd1b4a
Merge branch 'mvn-validation-transforms' into mvn-plugins-test
snehashisp Dec 23, 2024
1d64748
add tests for different version ranges
snehashisp Dec 24, 2024
4590b4f
separate out source and sink connector, add test for versioned conver…
snehashisp Dec 24, 2024
ce13181
make plugin initialisation static
snehashisp Dec 24, 2024
82072a8
Add cache connector tests
snehashisp Dec 24, 2024
37560be
cached connectors should throw original exception
snehashisp Dec 25, 2024
7d92500
Merge branch 'mvn-validation-transforms' into mvn-plugins-test
snehashisp Dec 25, 2024
bc5e124
Merge branch 'mvn-plugins-test' into mvn-validation-tests
snehashisp Dec 25, 2024
1d400c1
add some validation tests
snehashisp Dec 25, 2024
551482f
Add connector config def tests
snehashisp Dec 25, 2024
388a22a
add plugin recommender tests
snehashisp Dec 26, 2024
0a1322d
Merge branch 'mvn-plugins-test' into mvn-validation-tests
snehashisp Dec 26, 2024
1705b19
add validation tests
snehashisp Dec 26, 2024
ac394b0
reorder operations in validate
snehashisp Dec 26, 2024
d6fd2fd
Merge branch 'mvn-validation-transforms' into mvn-plugins-test
snehashisp Dec 26, 2024
1966dd9
Merge branch 'mvn-plugins-test' into mvn-validation-tests
snehashisp Dec 26, 2024
639c38a
add more validation test
snehashisp Dec 27, 2024
128a07b
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 28, 2024
8b7835e
Fix checkstyle
snehashisp Dec 28, 2024
b9204d6
Remove unused method
snehashisp Dec 28, 2024
33f3f64
Merge branch 'mvn-validation-transforms' into mvn-runtime
snehashisp Dec 28, 2024
7e37d47
Header converter should check abstract config
snehashisp Dec 28, 2024
c6388bd
fix tests
snehashisp Dec 29, 2024
e3e579e
Merge branch 'mvn-runtime' into mvn-status
snehashisp Dec 30, 2024
1745443
changes after merge
snehashisp Dec 30, 2024
64a1f6f
instantiate connector and update tests
snehashisp Dec 30, 2024
05b199f
Merge branch 'mvn-runtime' into mvn-status
snehashisp Dec 30, 2024
4a560fa
more changes
snehashisp Dec 30, 2024
7b66579
checkstyle fix
snehashisp Dec 30, 2024
8c140a0
update tests
snehashisp Dec 30, 2024
395a722
Merge remote-tracking branch 'origin' into mvn-status
snehashisp Dec 30, 2024
d28ca2c
Merge branch 'mvn-validation-tests' into mvn-integration-tests
snehashisp Dec 30, 2024
ba41792
add task to versioned connector
snehashisp Dec 30, 2024
f318486
Merge branch 'mvn-plugins-test' into mvn-validation-tests
snehashisp Dec 30, 2024
e667aef
Merge branch 'mvn-validation-tests' into mvn-integration-tests
snehashisp Dec 30, 2024
abceeb7
initial commit with config builder
snehashisp Dec 30, 2024
8463beb
update taskconfigs
snehashisp Dec 30, 2024
fe7e67b
Merge branch 'mvn-plugins-test' into mvn-validation-tests
snehashisp Dec 30, 2024
74eb162
Merge branch 'mvn-validation-tests' into mvn-integration-tests
snehashisp Dec 30, 2024
e42425c
testing tests
snehashisp Dec 31, 2024
6483470
should extend sourcetask
snehashisp Dec 31, 2024
d166e23
Merge branch 'mvn-plugins-test' into mvn-validation-tests
snehashisp Dec 31, 2024
c143f54
Merge branch 'mvn-validation-tests' into mvn-integration-tests
snehashisp Dec 31, 2024
369374b
correct sourcetask
snehashisp Dec 31, 2024
9712626
Merge branch 'mvn-plugins-test' into mvn-validation-tests
snehashisp Dec 31, 2024
c037b34
Merge branch 'mvn-validation-tests' into mvn-integration-tests
snehashisp Dec 31, 2024
a2ed431
finalise tests
snehashisp Dec 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,83 +201,91 @@ public boolean isReady() {
@Override
public void onStartup(String connector) {
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RUNNING,
workerId, generation()));
workerId, generation(), worker.connectorVersion(connector)));
}

@Override
public void onStop(String connector) {
statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.STOPPED,
workerId, generation()));
workerId, generation(), worker.connectorVersion(connector)));
}

@Override
public void onPause(String connector) {
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.PAUSED,
workerId, generation()));
workerId, generation(), worker.connectorVersion(connector)));
}

@Override
public void onResume(String connector) {
statusBackingStore.put(new ConnectorStatus(connector, TaskStatus.State.RUNNING,
workerId, generation()));
workerId, generation(), worker.connectorVersion(connector)));
}

@Override
public void onShutdown(String connector) {
statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.UNASSIGNED,
workerId, generation()));
workerId, generation(), worker.connectorVersion(connector)));
}

@Override
public void onFailure(String connector, Throwable cause) {
statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.FAILED,
trace(cause), workerId, generation()));
trace(cause), workerId, generation(), worker.connectorVersion(connector)));
}

@Override
public void onStartup(ConnectorTaskId id) {
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation()));
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation(), null,
worker.taskVersion(id)));
}

@Override
public void onFailure(ConnectorTaskId id, Throwable cause) {
statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause)));
statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause),
worker.taskVersion(id)));
}

@Override
public void onShutdown(ConnectorTaskId id) {
statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation()));
statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation(), null,
worker.taskVersion(id)));
}

@Override
public void onResume(ConnectorTaskId id) {
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation()));
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation(), null,
worker.taskVersion(id)));
}

@Override
public void onPause(ConnectorTaskId id) {
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED, workerId, generation()));
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED, workerId, generation(), null,
worker.taskVersion(id)));
}

@Override
public void onDeletion(String connector) {
for (TaskStatus status : statusBackingStore.getAll(connector))
onDeletion(status.id());
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation()));
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation(),
worker.connectorVersion(connector)));
}

@Override
public void onDeletion(ConnectorTaskId id) {
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation()));
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation(), null,
worker.taskVersion(id)));
}

public void onRestart(String connector) {
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RESTARTING,
workerId, generation()));
workerId, generation(), worker.connectorVersion(connector)));
}

public void onRestart(ConnectorTaskId id) {
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING, workerId, generation()));
statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING, workerId, generation(), null,
worker.taskVersion(id)));
}

@Override
Expand Down Expand Up @@ -345,12 +353,12 @@ public ConnectorStateInfo connectorStatus(String connName) {
Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);

ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
connector.state().toString(), connector.workerId(), connector.trace());
connector.state().toString(), connector.workerId(), connector.trace(), connector.version());
List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();

for (TaskStatus status : tasks) {
taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(),
status.state().toString(), status.workerId(), status.trace()));
status.state().toString(), status.workerId(), status.trace(), status.version()));
}

Collections.sort(taskStates);
Expand Down Expand Up @@ -386,7 +394,7 @@ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
throw new NotFoundException("No status found for task " + id);

return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(),
status.workerId(), status.trace());
status.workerId(), status.trace(), status.version());
}

protected Map<String, ConfigValue> validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map<String, String> config) {
Expand Down Expand Up @@ -619,7 +627,8 @@ public Optional<RestartPlan> buildRestartPlan(RestartRequest request) {
ConnectorStateInfo.ConnectorState connectorInfoState = new ConnectorStateInfo.ConnectorState(
connectorState.toString(),
connectorStatus.workerId(),
connectorStatus.trace()
connectorStatus.trace(),
connectorStatus.version()
);

// Collect the task states, If requested, mark the task as restarting
Expand All @@ -631,7 +640,8 @@ public Optional<RestartPlan> buildRestartPlan(RestartRequest request) {
taskStatus.id().task(),
taskState.toString(),
taskStatus.workerId(),
taskStatus.trace()
taskStatus.trace(),
taskStatus.version()
);
})
.collect(Collectors.toList());
Expand Down Expand Up @@ -859,9 +869,11 @@ ConfigInfos validateConnectorConfig(

addNullValuedErrors(connectorProps, validatedConnectorConfig);

ConfigInfos connectorConfigInfo = validateConnectorPluginSpecifiedConfigs(connectorProps, validatedConnectorConfig, enrichedConfigDef, connector, reportStage);
// the order of operations here is important, converter validations can add error messages to the connector config
// which are collected and converted to ConfigInfos in validateConnectorPluginSpecifiedConfigs
ConfigInfos converterConfigInfo = validateAllConverterConfigs(connectorProps, validatedConnectorConfig, connectorLoader, reportStage);
ConfigInfos clientOverrideInfo = validateClientOverrides(connectorProps, connectorType, connector.getClass(), reportStage, doLog);
ConfigInfos connectorConfigInfo = validateConnectorPluginSpecifiedConfigs(connectorProps, validatedConnectorConfig, enrichedConfigDef, connector, reportStage);

return mergeConfigInfos(connType,
connectorConfigInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,29 @@ public enum State {
private final State state;
private final String trace;
private final String workerId;
private final String version;
private final int generation;

public AbstractStatus(T id,
State state,
String workerId,
int generation,
String trace) {
String trace,
String version) {
this.id = id;
this.state = state;
this.workerId = workerId;
this.generation = generation;
this.trace = trace;
this.version = version;
}

public AbstractStatus(T id,
State state,
String workerId,
int generation,
String trace) {
this(id, state, workerId, generation, trace, null);
}

public T id() {
Expand All @@ -68,12 +79,17 @@ public int generation() {
return generation;
}

public String version() {
return version;
}

@Override
public String toString() {
return "Status{" +
"id=" + id +
", state=" + state +
", workerId='" + workerId + '\'' +
", version='" + version + '\'' +
", generation=" + generation +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
Expand All @@ -68,6 +69,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
Expand Down Expand Up @@ -230,11 +232,12 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
StatusBackingStore statusBackingStore,
Executor closeExecutor,
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {

super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
retryWithToleranceOperator, transformationChain, errorReportersSupplier,
time, statusBackingStore);
time, statusBackingStore, pluginLoaderSwapper);

this.workerConfig = workerConfig;
this.task = task;
Expand Down Expand Up @@ -382,6 +385,11 @@ public void execute() {
finalOffsetCommit(false);
}

@Override
public String taskVersion() {
return task.version();
}

/**
* Try to send a batch of records. If a send fails and is retriable, this saves the remainder of the batch so it can
* be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException.
Expand Down Expand Up @@ -485,11 +493,17 @@ protected ProducerRecord<byte[], byte[]> convertTransformedRecord(ProcessingCont

RecordHeaders headers = retryWithToleranceOperator.execute(context, () -> convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverter.getClass());

byte[] key = retryWithToleranceOperator.execute(context, () -> keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key()),
Stage.KEY_CONVERTER, keyConverter.getClass());
byte[] key = retryWithToleranceOperator.execute(context, () -> {
try (LoaderSwap swap = pluginLoaderSwapper.apply(keyConverter.getClass().getClassLoader())) {
return keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key());
}
}, Stage.KEY_CONVERTER, keyConverter.getClass());

byte[] value = retryWithToleranceOperator.execute(context, () -> valueConverter.fromConnectData(record.topic(), headers, record.valueSchema(), record.value()),
Stage.VALUE_CONVERTER, valueConverter.getClass());
byte[] value = retryWithToleranceOperator.execute(context, () -> {
try (LoaderSwap swap = pluginLoaderSwapper.apply(valueConverter.getClass().getClassLoader())) {
return valueConverter.fromConnectData(record.topic(), headers, record.valueSchema(), record.value());
}
}, Stage.VALUE_CONVERTER, valueConverter.getClass());

if (context.failed()) {
return null;
Expand Down Expand Up @@ -545,8 +559,11 @@ protected RecordHeaders convertHeaderFor(SourceRecord record) {
String topic = record.topic();
for (Header header : headers) {
String key = header.key();
byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
result.add(key, rawHeader);
try (LoaderSwap swap = pluginLoaderSwapper.apply(headerConverter.getClass().getClassLoader())) {
byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
result.add(key, rawHeader);
}

}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime;

import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;

Expand All @@ -31,8 +32,8 @@ public class CachedConnectors {
private static final String LATEST_VERSION = "latest";

private final Map<String, Map<String, Connector>> connectors;
private final Map<String, Exception> invalidConnectors;
private final Map<String, Map<String, Exception>> invalidVersions;
private final Map<String, Throwable> invalidConnectors;
private final Map<String, Map<String, VersionedPluginLoadingException>> invalidVersions;
private final Plugins plugins;

public CachedConnectors(Plugins plugins) {
Expand All @@ -42,14 +43,14 @@ public CachedConnectors(Plugins plugins) {
this.invalidVersions = new ConcurrentHashMap<>();
}

private void validate(String connectorName, VersionRange range) throws Exception {
private void validate(String connectorName, VersionRange range) throws ConnectException, VersionedPluginLoadingException {
if (invalidConnectors.containsKey(connectorName)) {
throw new Exception(invalidConnectors.get(connectorName));
throw new ConnectException(invalidConnectors.get(connectorName));
}

String version = range == null ? LATEST_VERSION : range.toString();
if (invalidVersions.containsKey(connectorName) && invalidVersions.get(connectorName).containsKey(version)) {
throw new Exception(invalidVersions.get(connectorName).get(version));
throw new VersionedPluginLoadingException(invalidVersions.get(connectorName).get(version).getMessage());
}
}

Expand Down
Loading
Loading