Skip to content

Commit aed3af6

Browse files
committed
chore(pipe): more e2e tests
1 parent a9243f5 commit aed3af6

File tree

1 file changed

+361
-0
lines changed

1 file changed

+361
-0
lines changed

pkg/apps/pipe/ssh_test.go

Lines changed: 361 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,3 +1011,364 @@ func TestSub_WithoutKeepAliveExitsAfterPublisher(t *testing.T) {
10111011
_ = subSession.Close()
10121012
}
10131013
}
1014+
1015+
func TestPub_EmptyMessage(t *testing.T) {
1016+
server := NewTestSSHServer(t)
1017+
defer server.Shutdown()
1018+
1019+
user := GenerateUser("alice")
1020+
RegisterUserWithServer(server, user)
1021+
1022+
subClient, err := user.NewClient()
1023+
if err != nil {
1024+
t.Fatalf("failed to connect subscriber: %v", err)
1025+
}
1026+
defer func() { _ = subClient.Close() }()
1027+
1028+
pubClient, err := user.NewClient()
1029+
if err != nil {
1030+
t.Fatalf("failed to connect publisher: %v", err)
1031+
}
1032+
defer func() { _ = pubClient.Close() }()
1033+
1034+
// Start subscriber
1035+
subSession, err := subClient.NewSession()
1036+
if err != nil {
1037+
t.Fatalf("failed to create sub session: %v", err)
1038+
}
1039+
defer func() { _ = subSession.Close() }()
1040+
1041+
subStdout, err := subSession.StdoutPipe()
1042+
if err != nil {
1043+
t.Fatalf("failed to get sub stdout: %v", err)
1044+
}
1045+
1046+
if err := subSession.Start("sub emptytopic -c"); err != nil {
1047+
t.Fatalf("failed to start sub: %v", err)
1048+
}
1049+
1050+
time.Sleep(100 * time.Millisecond)
1051+
1052+
// Publish with -e flag (empty message) - should not require stdin
1053+
output, err := user.RunCommand(pubClient, "pub emptytopic -e -c")
1054+
if err != nil {
1055+
t.Logf("pub -e completed: %v, output: %s", err, output)
1056+
}
1057+
1058+
// Subscriber should receive something (even if empty/minimal)
1059+
// The -e flag sends a 1-byte buffer
1060+
received := make([]byte, 10)
1061+
n, err := subStdout.Read(received)
1062+
if err != nil && err != io.EOF {
1063+
t.Logf("read result: n=%d, err=%v", n, err)
1064+
}
1065+
1066+
// With -e flag, we expect to receive at least 1 byte
1067+
if n < 1 {
1068+
t.Errorf("subscriber should receive empty message signal, got %d bytes", n)
1069+
}
1070+
}
1071+
1072+
func TestPipe_AccessControl(t *testing.T) {
1073+
server := NewTestSSHServer(t)
1074+
defer server.Shutdown()
1075+
1076+
alice := GenerateUser("alice")
1077+
bob := GenerateUser("bob")
1078+
RegisterUserWithServer(server, alice)
1079+
RegisterUserWithServer(server, bob)
1080+
1081+
aliceClient, err := alice.NewClient()
1082+
if err != nil {
1083+
t.Fatalf("failed to connect alice: %v", err)
1084+
}
1085+
defer func() { _ = aliceClient.Close() }()
1086+
1087+
bobClient, err := bob.NewClient()
1088+
if err != nil {
1089+
t.Fatalf("failed to connect bob: %v", err)
1090+
}
1091+
defer func() { _ = bobClient.Close() }()
1092+
1093+
// Alice creates a pipe with access control allowing bob
1094+
aliceSession, err := aliceClient.NewSession()
1095+
if err != nil {
1096+
t.Fatalf("failed to create alice session: %v", err)
1097+
}
1098+
defer func() { _ = aliceSession.Close() }()
1099+
1100+
aliceStdin, err := aliceSession.StdinPipe()
1101+
if err != nil {
1102+
t.Fatalf("failed to get alice stdin: %v", err)
1103+
}
1104+
1105+
aliceStdout, err := aliceSession.StdoutPipe()
1106+
if err != nil {
1107+
t.Fatalf("failed to get alice stdout: %v", err)
1108+
}
1109+
1110+
if err := aliceSession.Start("pipe accesspipe -a alice,bob -c"); err != nil {
1111+
t.Fatalf("failed to start alice pipe: %v", err)
1112+
}
1113+
1114+
time.Sleep(100 * time.Millisecond)
1115+
1116+
// Bob joins the pipe using alice's namespace
1117+
bobSession, err := bobClient.NewSession()
1118+
if err != nil {
1119+
t.Fatalf("failed to create bob session: %v", err)
1120+
}
1121+
defer func() { _ = bobSession.Close() }()
1122+
1123+
bobStdin, err := bobSession.StdinPipe()
1124+
if err != nil {
1125+
t.Fatalf("failed to get bob stdin: %v", err)
1126+
}
1127+
1128+
bobStdout, err := bobSession.StdoutPipe()
1129+
if err != nil {
1130+
t.Fatalf("failed to get bob stdout: %v", err)
1131+
}
1132+
1133+
if err := bobSession.Start("pipe alice/accesspipe -c"); err != nil {
1134+
t.Fatalf("failed to start bob pipe: %v", err)
1135+
}
1136+
1137+
time.Sleep(100 * time.Millisecond)
1138+
1139+
// Alice sends message to bob
1140+
aliceMsg := "hello bob\n"
1141+
_, err = aliceStdin.Write([]byte(aliceMsg))
1142+
if err != nil {
1143+
t.Fatalf("alice failed to write: %v", err)
1144+
}
1145+
1146+
bobReceived := make([]byte, 100)
1147+
n, _ := bobStdout.Read(bobReceived)
1148+
if !strings.Contains(string(bobReceived[:n]), "hello bob") {
1149+
t.Errorf("bob did not receive alice's message, got: %q", string(bobReceived[:n]))
1150+
}
1151+
1152+
// Bob sends message to alice
1153+
bobMsg := "hello alice\n"
1154+
_, err = bobStdin.Write([]byte(bobMsg))
1155+
if err != nil {
1156+
t.Fatalf("bob failed to write: %v", err)
1157+
}
1158+
1159+
aliceReceived := make([]byte, 100)
1160+
n, _ = aliceStdout.Read(aliceReceived)
1161+
if !strings.Contains(string(aliceReceived[:n]), "hello alice") {
1162+
t.Errorf("alice did not receive bob's message, got: %q", string(aliceReceived[:n]))
1163+
}
1164+
}
1165+
1166+
func TestPipe_Replay(t *testing.T) {
1167+
server := NewTestSSHServer(t)
1168+
defer server.Shutdown()
1169+
1170+
user := GenerateUser("alice")
1171+
RegisterUserWithServer(server, user)
1172+
1173+
client, err := user.NewClient()
1174+
if err != nil {
1175+
t.Fatalf("failed to connect: %v", err)
1176+
}
1177+
defer func() { _ = client.Close() }()
1178+
1179+
// Start pipe with replay flag (-r)
1180+
session, err := client.NewSession()
1181+
if err != nil {
1182+
t.Fatalf("failed to create session: %v", err)
1183+
}
1184+
defer func() { _ = session.Close() }()
1185+
1186+
stdin, err := session.StdinPipe()
1187+
if err != nil {
1188+
t.Fatalf("failed to get stdin: %v", err)
1189+
}
1190+
1191+
stdout, err := session.StdoutPipe()
1192+
if err != nil {
1193+
t.Fatalf("failed to get stdout: %v", err)
1194+
}
1195+
1196+
if err := session.Start("pipe replaytopic -r -c"); err != nil {
1197+
t.Fatalf("failed to start pipe: %v", err)
1198+
}
1199+
1200+
time.Sleep(100 * time.Millisecond)
1201+
1202+
// Send a message - with -r flag, should receive it back
1203+
testMsg := "echo back\n"
1204+
_, err = stdin.Write([]byte(testMsg))
1205+
if err != nil {
1206+
t.Fatalf("failed to write: %v", err)
1207+
}
1208+
1209+
received := make([]byte, 100)
1210+
n, err := stdout.Read(received)
1211+
if err != nil && err != io.EOF {
1212+
t.Logf("read error: %v", err)
1213+
}
1214+
1215+
if !strings.Contains(string(received[:n]), "echo back") {
1216+
t.Errorf("with -r flag, sender should receive own message back, got: %q", string(received[:n]))
1217+
}
1218+
}
1219+
1220+
func TestAccessControl_UnauthorizedUserDenied(t *testing.T) {
1221+
server := NewTestSSHServer(t)
1222+
defer server.Shutdown()
1223+
1224+
alice := GenerateUser("alice")
1225+
bob := GenerateUser("bob")
1226+
charlie := GenerateUser("charlie")
1227+
RegisterUserWithServer(server, alice)
1228+
RegisterUserWithServer(server, bob)
1229+
RegisterUserWithServer(server, charlie)
1230+
1231+
aliceClient, err := alice.NewClient()
1232+
if err != nil {
1233+
t.Fatalf("failed to connect alice: %v", err)
1234+
}
1235+
defer func() { _ = aliceClient.Close() }()
1236+
1237+
charlieClient, err := charlie.NewClient()
1238+
if err != nil {
1239+
t.Fatalf("failed to connect charlie: %v", err)
1240+
}
1241+
defer func() { _ = charlieClient.Close() }()
1242+
1243+
// Alice creates a topic with access only for alice and bob (not charlie)
1244+
aliceSession, err := aliceClient.NewSession()
1245+
if err != nil {
1246+
t.Fatalf("failed to create alice session: %v", err)
1247+
}
1248+
defer func() { _ = aliceSession.Close() }()
1249+
1250+
if err := aliceSession.Start("sub restrictedtopic -a alice,bob -c"); err != nil {
1251+
t.Fatalf("failed to start alice sub: %v", err)
1252+
}
1253+
1254+
time.Sleep(100 * time.Millisecond)
1255+
1256+
// Charlie tries to publish to alice's restricted topic - should be denied
1257+
output, err := charlie.RunCommandWithStdin(charlieClient, "pub alice/restrictedtopic -c", "unauthorized message")
1258+
if err != nil {
1259+
t.Logf("charlie pub completed with error (expected): %v", err)
1260+
}
1261+
1262+
// Charlie should get access denied or the message should not be delivered
1263+
if strings.Contains(output, "access denied") {
1264+
t.Logf("charlie correctly received access denied")
1265+
} else {
1266+
t.Logf("charlie output: %q (access control may work differently)", output)
1267+
}
1268+
}
1269+
1270+
func TestPubSub_MultipleSubscribers(t *testing.T) {
1271+
server := NewTestSSHServer(t)
1272+
defer server.Shutdown()
1273+
1274+
user := GenerateUser("alice")
1275+
RegisterUserWithServer(server, user)
1276+
1277+
pubClient, err := user.NewClient()
1278+
if err != nil {
1279+
t.Fatalf("failed to connect publisher: %v", err)
1280+
}
1281+
defer func() { _ = pubClient.Close() }()
1282+
1283+
sub1Client, err := user.NewClient()
1284+
if err != nil {
1285+
t.Fatalf("failed to connect subscriber 1: %v", err)
1286+
}
1287+
defer func() { _ = sub1Client.Close() }()
1288+
1289+
sub2Client, err := user.NewClient()
1290+
if err != nil {
1291+
t.Fatalf("failed to connect subscriber 2: %v", err)
1292+
}
1293+
defer func() { _ = sub2Client.Close() }()
1294+
1295+
sub3Client, err := user.NewClient()
1296+
if err != nil {
1297+
t.Fatalf("failed to connect subscriber 3: %v", err)
1298+
}
1299+
defer func() { _ = sub3Client.Close() }()
1300+
1301+
// Start three subscribers
1302+
sub1Session, err := sub1Client.NewSession()
1303+
if err != nil {
1304+
t.Fatalf("failed to create sub1 session: %v", err)
1305+
}
1306+
defer func() { _ = sub1Session.Close() }()
1307+
1308+
sub1Stdout, err := sub1Session.StdoutPipe()
1309+
if err != nil {
1310+
t.Fatalf("failed to get sub1 stdout: %v", err)
1311+
}
1312+
1313+
if err := sub1Session.Start("sub fanout -c"); err != nil {
1314+
t.Fatalf("failed to start sub1: %v", err)
1315+
}
1316+
1317+
sub2Session, err := sub2Client.NewSession()
1318+
if err != nil {
1319+
t.Fatalf("failed to create sub2 session: %v", err)
1320+
}
1321+
defer func() { _ = sub2Session.Close() }()
1322+
1323+
sub2Stdout, err := sub2Session.StdoutPipe()
1324+
if err != nil {
1325+
t.Fatalf("failed to get sub2 stdout: %v", err)
1326+
}
1327+
1328+
if err := sub2Session.Start("sub fanout -c"); err != nil {
1329+
t.Fatalf("failed to start sub2: %v", err)
1330+
}
1331+
1332+
sub3Session, err := sub3Client.NewSession()
1333+
if err != nil {
1334+
t.Fatalf("failed to create sub3 session: %v", err)
1335+
}
1336+
defer func() { _ = sub3Session.Close() }()
1337+
1338+
sub3Stdout, err := sub3Session.StdoutPipe()
1339+
if err != nil {
1340+
t.Fatalf("failed to get sub3 stdout: %v", err)
1341+
}
1342+
1343+
if err := sub3Session.Start("sub fanout -c"); err != nil {
1344+
t.Fatalf("failed to start sub3: %v", err)
1345+
}
1346+
1347+
time.Sleep(100 * time.Millisecond)
1348+
1349+
// Publish a single message
1350+
testMessage := "broadcast message"
1351+
_, err = user.RunCommandWithStdin(pubClient, "pub fanout -c", testMessage)
1352+
if err != nil {
1353+
t.Logf("pub completed: %v", err)
1354+
}
1355+
1356+
// All three subscribers should receive the message
1357+
received1 := make([]byte, 100)
1358+
n1, _ := sub1Stdout.Read(received1)
1359+
if !strings.Contains(string(received1[:n1]), testMessage) {
1360+
t.Errorf("subscriber 1 did not receive message, got: %q", string(received1[:n1]))
1361+
}
1362+
1363+
received2 := make([]byte, 100)
1364+
n2, _ := sub2Stdout.Read(received2)
1365+
if !strings.Contains(string(received2[:n2]), testMessage) {
1366+
t.Errorf("subscriber 2 did not receive message, got: %q", string(received2[:n2]))
1367+
}
1368+
1369+
received3 := make([]byte, 100)
1370+
n3, _ := sub3Stdout.Read(received3)
1371+
if !strings.Contains(string(received3[:n3]), testMessage) {
1372+
t.Errorf("subscriber 3 did not receive message, got: %q", string(received3[:n3]))
1373+
}
1374+
}

0 commit comments

Comments
 (0)