Skip to content

Commit

Permalink
Fix the e2e test in vineyard operator.
Browse files Browse the repository at this point in the history
Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji committed Sep 3, 2024
1 parent 19badf1 commit f0f43b9
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 10 deletions.
6 changes: 6 additions & 0 deletions k8s/config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ leaderElection:
profiles:
- schedulerName: vineyard-scheduler
plugins:
filter:
disabled:
- name: "*"
preScore:
disabled:
- name: "*"
score:
enabled:
- name: Vineyard
Expand Down
2 changes: 1 addition & 1 deletion k8s/test/e2e/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM ghcr.io/v6d-io/v6d/vineyard-python-dev:latest
FROM ghcr.io/v6d-io/v6d/vineyard-python-dev:latest_x86_64

WORKDIR /

Expand Down
2 changes: 1 addition & 1 deletion k8s/test/e2e/assembly-demo/assembly-job1.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def generate_df(index):
stream = RecordBatchStream.new(vineyard_client)
vineyard_client.persist(stream.id)
print(stream.id)
writer = stream.writer
writer = stream.open_writer(vineyard_client)
total_chunks = 10
for idx in range(total_chunks):
time.sleep(idx)
Expand Down
2 changes: 1 addition & 1 deletion k8s/test/e2e/assembly-demo/assembly-local.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

stream_id = env_dist['STREAM_ID']
stream = vineyard_client.get(stream_id)
reader = stream.reader
reader = stream.open_reader(vineyard_client)

index = 0
global_meta = vineyard.ObjectMeta()
Expand Down
2 changes: 1 addition & 1 deletion k8s/test/e2e/assembly-demo/distributed-job1.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def generate_df(index):

sys.stdout = sys.__stdout__
print(meta.id, flush=True)
writer = stream.writer
writer = stream.open_writer(vineyard_client)
total_chunks = 10
for idx in range(total_chunks):
time.sleep(idx)
Expand Down
2 changes: 1 addition & 1 deletion k8s/test/e2e/assembly-demo/distributed-job2.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def generate_df(index):
tup = vineyard_client.create_metadata(meta)
vineyard_client.persist(tup)

writer = stream.writer
writer = stream.open_writer(vineyard_client)
total_chunks = 10
for idx in range(total_chunks):
time.sleep(idx)
Expand Down
4 changes: 2 additions & 2 deletions k8s/test/e2e/deploy-raw-backup-and-recover/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,15 @@ verify:
- query: |
kubectl get pod -l app=get-local-object -n vineyard-job -oname | \
awk -F '/' '{print $2}' | \
head -n 1 | \
tail -n 1 | \
xargs kubectl logs -n vineyard-job | \
yq e '{"sum": .}' - | \
yq e 'to_entries' -
expected: ../verify/values.yaml
- query: |
kubectl get pod -l app=get-distributed-object -n vineyard-job -oname | \
awk -F '/' '{print $2}' | \
head -n 1 | \
tail -n 1 | \
xargs kubectl logs -n vineyard-job | \
yq e '{"sum": .}' - | \
yq e 'to_entries' -
Expand Down
1 change: 1 addition & 0 deletions k8s/test/e2e/serialize/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ verify:
awk -F '/' '{print $2}' | \
head -n 1 | \
xargs kubectl logs -n vineyard-system | \
grep "test passed" | \
yq e '{"result": .}' - | \
yq e 'to_entries' -
expected: ../verify/serialize.yaml
Expand Down
4 changes: 2 additions & 2 deletions python/vineyard/io/tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_recordbatch_stream(vineyard_client):
total_chunks = 10

def producer(stream: RecordBatchStream, dtypes, produced: List):
writer = stream.writer
writer = stream.open_writer(vineyard_client)
for idx in range(total_chunks):
time.sleep(idx)
chunk = generate_random_dataframe(dtypes, 2) # np.random.randint(10, 100))
Expand All @@ -57,7 +57,7 @@ def producer(stream: RecordBatchStream, dtypes, produced: List):
writer.finish()

def consumer(stream: RecordBatchStream, produced: List):
reader = stream.reader
reader = stream.open_reader(vineyard_client)
index = 0
while True:
try:
Expand Down
33 changes: 32 additions & 1 deletion test/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,6 @@ def run_python_deploy_tests(meta, allocator, endpoints, test_args, with_migratio
flush=True,
)


def run_io_adaptor_tests(meta, allocator, endpoints, test_args):
meta_prefix = 'vineyard_test_%s' % time.time()
metadata_settings = make_metadata_settings(meta, endpoints, meta_prefix)
Expand Down Expand Up @@ -1043,6 +1042,37 @@ def run_io_adaptor_tests(meta, allocator, endpoints, test_args):
flush=True,
)

def run_stream_test(meta, allocator, endpoints, test_args):
meta_prefix = 'vineyard_test_%s' % time.time()
metadata_settings = make_metadata_settings(meta, endpoints, meta_prefix)

with start_vineyardd(
metadata_settings,
['--allocator', allocator],
default_ipc_socket=VINEYARD_CI_IPC_SOCKET,
) as (_, rpc_socket_port):
start_time = time.time()
subprocess.check_call(
[
'pytest',
'-s',
'-vvv',
'--exitfirst',
'--durations=0',
'--log-cli-level',
'DEBUG',
'python/vineyard/io/tests',
*test_args,
'--vineyard-ipc-socket=%s' % VINEYARD_CI_IPC_SOCKET,
'--vineyard-endpoint=localhost:%s' % rpc_socket_port,
],
cwd=os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'),
)
print(
'running python stream %s tests use %s seconds'
% (time.time() - start_time),
flush=True,
)

def run_fuse_test(meta, allocator, endpoints, test_args):
meta_prefix = 'vineyard_test_%s' % time.time()
Expand Down Expand Up @@ -1257,6 +1287,7 @@ def execute_tests(args):

if args.with_io:
run_io_adaptor_tests(args.meta, args.allocator, endpoints, python_test_args)
run_stream_test(args.meta, args.allocator, endpoints, python_test_args)

if args.with_fuse:
run_fuse_test(args.meta, args.allocator, endpoints, python_test_args)
Expand Down

0 comments on commit f0f43b9

Please sign in to comment.