12 | 12 |
13 | 13 | args = parser.parse_args()
14 | 14 |
| 15 | +import requests
| 16 | +
| 17 | +def fetch_metadata(attribute):
| 18 | +    url = f"http://metadata.google.internal/computeMetadata/v1/instance/attributes/{attribute}"
| 19 | +    headers = {"Metadata-Flavor": "Google"}
| 20 | +    try:
| 21 | +        response = requests.get(url, headers=headers, timeout=5)
| 22 | +        response.raise_for_status()
| 23 | +        return response.text
| 24 | +    except Exception as e:
| 25 | +        print(f"Failed to fetch metadata attribute '{attribute}': {e}")
| 26 | +        return "unknown"
| 27 | +
| 28 | +machine_type = fetch_metadata("MACHINE_TYPE")
| 29 | +gcsfuse_version = fetch_metadata("GCSFUSE_VERSION")
| 30 | +
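MACHINE_TYPE and GCSFUSE_VERSION are custom instance attributes, so they resolve only when the benchmark VM was created with that metadata; everywhere else the helper falls back to "unknown". A minimal sketch of that behaviour, assuming the fetch_metadata() added above is in scope:

# Off-GCE, metadata.google.internal is unreachable, so the except branch returns "unknown";
# on a VM where the attribute is not set, the 404 from raise_for_status() does the same.
print(fetch_metadata("MACHINE_TYPE"))     # e.g. "n2-standard-16" when the attribute is set
print(fetch_metadata("GCSFUSE_VERSION"))  # "unknown" when it is not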
15 | 31 | # Load the results file
16 | 32 | with open(args.result_file) as f:
17 | 33 |     try:

36 | 52 | # Create table if it doesn't exist
37 | 53 | schema = [
38 | 54 |     bigquery.SchemaField("job_name", "STRING"),
| 55 | +    bigquery.SchemaField("gcsfuse_version", "STRING"),
| 56 | +    bigquery.SchemaField("machine_type", "STRING"),
39 | 57 |     bigquery.SchemaField("start_time", "TIMESTAMP"),
40 | 58 |     bigquery.SchemaField("file_size", "STRING"),
41 | 59 |     bigquery.SchemaField("block_size", "STRING"),
42 | 60 |     bigquery.SchemaField("nrfiles", "INTEGER"),
43 | 61 |     bigquery.SchemaField("read_bandwidth_MiBps", "FLOAT"),
44 | 62 |     bigquery.SchemaField("write_bandwidth_MiBps", "FLOAT"),
45 | 63 |     bigquery.SchemaField("IOPS", "FLOAT"),
46 | | -    bigquery.SchemaField("duration_seconds", "FLOAT"),
| 64 | +    bigquery.SchemaField("avg_latency_ms", "FLOAT"),
47 | 65 | ]
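The table-creation call itself sits outside this hunk; for context, a minimal sketch of how a schema like the one above is typically applied with google-cloud-bigquery (the project/dataset/table ID is a made-up placeholder, not taken from this PR):

from google.cloud import bigquery

client = bigquery.Client()
table_id = "my-project.fio_benchmarks.results"  # hypothetical ID, not from this PR
schema = [
    bigquery.SchemaField("job_name", "STRING"),
    bigquery.SchemaField("avg_latency_ms", "FLOAT"),
]
# exists_ok=True makes the call idempotent, matching the "create if it doesn't exist" intent.
table = client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)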
48 | 66 |
49 | 67 | try:

59 | 77 | rows = []
60 | 78 | for job in data.get("jobs", []):
61 | 79 |     jobname = job.get("jobname")
62 | | -    # Correctly access job options using .get() for nested keys
63 | 80 |     job_options = job.get("job options", {})
64 | 81 |
65 | | -    # Use get with a default value for each option and handle string conversion
66 | | -    file_size = job_options.get("filesize", data.get("global options",{}).get("filesize", "unknown"))
67 | | -    block_size = job_options.get("bs", data.get("global options",{}).get("bs", "unknown"))
68 | | -
69 | | -    # Convert nrfiles to int, handle missing values and potential string values
70 | | -    nrfiles_str = job_options.get("nrfiles", data.get("global options",{}).get("nrfiles"))
| 82 | +    file_size = job_options.get("filesize", data.get("global options", {}).get("filesize", "unknown"))
| 83 | +    block_size = job_options.get("bs", data.get("global options", {}).get("bs", "unknown"))
| 84 | +
| 85 | +    nrfiles_str = job_options.get("nrfiles", data.get("global options", {}).get("nrfiles"))
71 | 86 |     nrfiles = int(nrfiles_str) if nrfiles_str and isinstance(nrfiles_str, str) and nrfiles_str.isdigit() else 0
72 | 87 |
73 | | -    read_bw = job.get("read", {}).get("bw_bytes", 0) / (1024 * 1024)
74 | | -    write_bw = job.get("write", {}).get("bw_bytes", 0) / (1024 * 1024)
75 | | -    iops = job.get("read", {}).get("iops", 0.0) + job.get("write", {}).get("iops", 0.0)
| 88 | +    read = job.get("read", {})
| 89 | +    write = job.get("write", {})
| 90 | +
| 91 | +    read_bw = read.get("bw_bytes", 0) / (1024 * 1024)
| 92 | +    write_bw = write.get("bw_bytes", 0) / (1024 * 1024)
| 93 | +    iops = read.get("iops", 0.0) + write.get("iops", 0.0)
| 94 | +
| 95 | +    read_lat_ns = read.get("lat_ns", {}).get("mean")
| 96 | +    write_lat_ns = write.get("lat_ns", {}).get("mean")
| 97 | +
| 98 | +    if read_lat_ns is not None and write_lat_ns is not None:
| 99 | +        avg_latency_ms = ((read_lat_ns + write_lat_ns) / 2) / 1_000_000
| 100 | +    elif read_lat_ns is not None:
| 101 | +        avg_latency_ms = read_lat_ns / 1_000_000
| 102 | +    elif write_lat_ns is not None:
| 103 | +        avg_latency_ms = write_lat_ns / 1_000_000
| 104 | +    else:
| 105 | +        avg_latency_ms = 0.0
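As a sanity check on the conversions above, a small self-contained sketch with made-up fio-style numbers (illustrative values, not from any real run):

# A hand-written dict shaped like one fio job entry, read side only.
job = {
    "read":  {"bw_bytes": 104857600, "iops": 25600.0, "lat_ns": {"mean": 2_000_000}},
    "write": {},
}
read = job.get("read", {})
write = job.get("write", {})
read_bw = read.get("bw_bytes", 0) / (1024 * 1024)       # 104857600 / 1048576 = 100.0
iops = read.get("iops", 0.0) + write.get("iops", 0.0)   # 25600.0
read_lat_ns = read.get("lat_ns", {}).get("mean")        # 2_000_000 ns
avg_latency_ms = read_lat_ns / 1_000_000                # write latency missing -> 2.0 ms
print(read_bw, iops, avg_latency_ms)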
76 | 106 |
77 | 107 |     rows.append({
78 | 108 |         "job_name": jobname,
| 109 | +        "gcsfuse_version": gcsfuse_version,
| 110 | +        "machine_type": machine_type,
79 | 111 |         "start_time": start_time,
80 | 112 |         "file_size": file_size,
81 | 113 |         "block_size": block_size,
82 | 114 |         "nrfiles": nrfiles,
83 | 115 |         "read_bandwidth_MiBps": read_bw,
84 | 116 |         "write_bandwidth_MiBps": write_bw,
85 | 117 |         "IOPS": iops,
86 | | -        "duration_seconds": job.get("job_runtime", 0) / 1000,
| 118 | +        "avg_latency_ms": avg_latency_ms,
87 | 119 |     })
88 | 120 |
89 | 121 | # Insert rows
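The insert step is cut off at this point in the diff; a minimal sketch of one common way to load the row dicts built above, assuming the same hypothetical client and table ID as in the earlier sketch:

from google.cloud import bigquery

client = bigquery.Client()
table_id = "my-project.fio_benchmarks.results"  # placeholder, not from this PR
# insert_rows_json streams the dicts and returns a list of per-row errors (empty on success).
errors = client.insert_rows_json(table_id, rows)
if errors:
    print(f"BigQuery insert errors: {errors}")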