# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import urlparse
import zlib

import google.appengine.ext.ndb as ndb

from google.appengine.api import app_identity

GCS_API_URL = 'https://storage.googleapis.com'
STORAGE_API_URL = 'https://www.googleapis.com/storage/v1/b'

# Upper bound on how much decompressed data get() will return; urlfetch
# responses are themselves capped at 32MB by App Engine.
MAX_SIZE = 30 * 1024 ** 2  # 30MiB


@ndb.tasklet
def get(url):
    """Asynchronously fetches a URL via urlfetch, returning the response body.

    Requests to *.googleapis.com hosts are authenticated as the application's
    service account. Rate-limit (429) and server (5xx) errors are retried with
    exponential backoff (up to six attempts), and gzip-encoded responses are
    inflated (up to MAX_SIZE) before being returned.

    Returns:
        a Future that resolves to the response body, or None if an error
        occurred.
    """
    context = ndb.get_context()

    headers = {
        'accept-encoding': 'gzip, *',
        'x-goog-api-version': '2',
    }

    url_result = urlparse.urlparse(url)
    if url_result.netloc.endswith('.googleapis.com'):
        auth_token, _ = app_identity.get_access_token(
            'https://www.googleapis.com/auth/cloud-platform')
        if auth_token:
            headers['Authorization'] = 'OAuth %s' % auth_token

    for retry in xrange(6):
        result = yield context.urlfetch(url, headers=headers)
        status = result.status_code

        if status == 429 or 500 <= status < 600:
            # Rate-limited or server error: back off exponentially and retry.
            yield ndb.sleep(2 ** retry)
            continue

        if status in (200, 206):
            content = result.content
            if result.headers.get('content-encoding') == 'gzip':
                # wbits = 15 | 16 tells zlib to expect gzip framing.
                dec = zlib.decompressobj(15 | 16)
                content = dec.decompress(result.content, MAX_SIZE)
                if dec.unconsumed_tail:
                    logging.warning('only decompressed %d KB, %d KB remain in buffer.',
                                    len(content) / 1024,
                                    len(dec.unconsumed_tail) / 1024)
            raise ndb.Return(content)

        # Any other status (e.g. 403, 404) is not retryable: give up.
        logging.error("unable to fetch '%s': status code %d", url, status)
        raise ndb.Return(None)
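

# Because get() is a tasklet, callers can start several fetches before
# blocking on any of them, letting urlfetch run the requests concurrently.
# A minimal sketch (not part of the original module; URLs hypothetical):
#
#   futures = [get(GCS_API_URL + '/my-bucket/%d.json' % i) for i in xrange(3)]
#   bodies = [f.get_result() for f in futures]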


def read(path):
    """Asynchronously reads a file from GCS.

    NOTE: for large files (>10MB), this may return a truncated response due to
    urlfetch API limits. We don't want to read large files currently, so this
    is not yet a problem.

    Args:
        path: the location of the object to read

    Returns:
        a Future that resolves to the file's data, or None if an error occurred.
    """
    url = GCS_API_URL + path
    return get(url)
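
# e.g. read('/my-bucket/logs/build-log.txt') fetches
# https://storage.googleapis.com/my-bucket/logs/build-log.txt
# (bucket and object names hypothetical).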


@ndb.tasklet
def listdirs(path):
    """Asynchronously list directories present on GCS.

    NOTE: This returns at most 1000 results. The API supports pagination, but
    it's not implemented here.

    Args:
        path: the GCS bucket directory to list subdirectories of

    Returns:
        a Future that resolves to a list of directories, or None if an error
        occurred.
    """
    if path[-1] != '/':
        path += '/'
    assert path[0] != '/'
    bucket, prefix = path.split('/', 1)
    url = '%s/%s/o?delimiter=/&prefix=%s' % (STORAGE_API_URL, bucket, prefix)
    res = yield get(url)
    if res is None:
        raise ndb.Return(None)
    prefixes = json.loads(res).get('prefixes', [])
    raise ndb.Return(['%s/%s' % (bucket, prefix) for prefix in prefixes])
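

# --- A minimal usage sketch (not part of the original module). Assumes an
# App Engine request context where ndb tasklets can run; the bucket and
# object names below are hypothetical.
def _example_usage():
    # Both calls return Futures immediately, so the two requests run
    # concurrently; get_result() blocks until each completes.
    build_log = read('/my-bucket/logs/123/build-log.txt')
    subdirs = listdirs('my-bucket/logs/')
    return build_log.get_result(), subdirs.get_result()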