Changes from all commits (33 commits)
95e772f  add certificates arg (ahirreddy, Nov 25, 2017)
d05966f  add certificate (ahirreddy, Nov 25, 2017)
26fd655  certificates (ahirreddy, Nov 25, 2017)
5127610  verify first (ahirreddy, Nov 25, 2017)
8d75954  Merge remote-tracking branch 'upstream/master' into container-fast-pull (ahirreddy, Feb 9, 2018)
55bc69b  Speed up pull by parallelizing layer fetching (ahirreddy, Feb 9, 2018)
acb6c8c  fix style (ahirreddy, Feb 9, 2018)
97284c4  zcat (ahirreddy, Feb 9, 2018)
2fcc700  disable v2 (ahirreddy, Feb 12, 2018)
fcad3e6  WIP2 (ahirreddy, Feb 12, 2018)
558bf46  revert stuff (ahirreddy, Feb 12, 2018)
af00d7f  require v2.2 image (ahirreddy, Feb 12, 2018)
bd5ead3  allow v2 (ahirreddy, Feb 13, 2018)
5dc1ab0  pull individual layers (ahirreddy, Feb 20, 2018)
a5117e1  [PROD-17859] Properly handle opaque whiteout entries in Docker layer … (JoshRosen, Sep 22, 2018)
5e6e83e  make it azure compatible (May 31, 2019)
68381ed  Merge pull request #5 from databricks/zihengl (May 31, 2019)
02fd12f  Add the ability to accept multiple cert tuples like the description s… (zreardond, Jul 19, 2019)
8cc2383  Merge remote-tracking branch 'upstream/master' into python-3-compat (ahirreddy, Jun 1, 2020)
74884be  Fix missing dependency (ahirreddy, Jun 6, 2020)
47f2138  remove outdated concurrent library (ahirreddy, Jun 12, 2020)
d48d058  remove subpar dependency (ahirreddy, Jun 18, 2020)
53482a6  Remove 2GiB layer size limit by writing in chunks (kevinqian-db, Mar 14, 2024)
20acfa3  Support chunked layer upload if layer size too large (kevinqian-db, Mar 15, 2024)
34592d0  Merge pull request #8 from databricks/kevinqian-db/fix_2gib_limit (kevinqian-db, May 2, 2024)
dd11af6  . (gasblaster, Jul 11, 2024)
611e74f  . (gasblaster, Jul 17, 2024)
2e97268  Merge pull request #9 from databricks/gasblaster/2024-07-11_identity-… (gasblaster, Jul 18, 2024)
8fee046  Change http chunk size from 2e9 to 2e7 bytes (gasblaster, Mar 25, 2025)
6d1eead  Merge pull request #10 from databricks/update-http-chunk-size (gasblaster, Mar 25, 2025)
1827af3  Retry on socket timeout (#12) (gasblaster, Mar 26, 2025)
1cd0c9f  Set chunk size to 2GB (#13) (gasblaster, Mar 28, 2025)
0802a55  Make push chunk size configurable (#14) (kevinqian-db, Apr 8, 2025)
25 changes: 17 additions & 8 deletions BUILD.bazel
@@ -29,49 +29,47 @@ py_library(
],
)

load("@subpar//:subpar.bzl", "par_binary")

par_binary(
py_binary(
name = "appender",
srcs = ["tools/docker_appender_.py"],
main = "tools/docker_appender_.py",
visibility = ["//visibility:public"],
deps = [":containerregistry"],
)

par_binary(
py_binary(
name = "puller",
srcs = ["tools/fast_puller_.py"],
main = "tools/fast_puller_.py",
visibility = ["//visibility:public"],
deps = [":containerregistry"],
)

par_binary(
py_binary(
name = "flatten",
srcs = ["tools/fast_flatten_.py"],
main = "tools/fast_flatten_.py",
visibility = ["//visibility:public"],
deps = [":containerregistry"],
)

par_binary(
py_binary(
name = "importer",
srcs = ["tools/fast_importer_.py"],
main = "tools/fast_importer_.py",
visibility = ["//visibility:public"],
deps = [":containerregistry"],
)

par_binary(
py_binary(
name = "pusher",
srcs = ["tools/fast_pusher_.py"],
main = "tools/fast_pusher_.py",
visibility = ["//visibility:public"],
deps = [":containerregistry"],
)

par_binary(
py_binary(
name = "digester",
srcs = ["tools/image_digester_.py"],
main = "tools/image_digester_.py",
@@ -119,3 +117,14 @@ sh_test(
":testenv.sh",
],
)

py_test(
name = "client_v2_2_unit_tests",
size = "large",
srcs = [
"client_v2_2_unit_tests.py",
":containerregistry",
],
main = "client_v2_2_unit_tests.py",
)

29 changes: 27 additions & 2 deletions client/docker_creds_.py
@@ -92,6 +92,29 @@ def suffix(self):
p = self.password.encode('utf8')
return base64.b64encode(u + b':' + p).decode('utf8')

class IdentityToken(SchemeProvider):
"""Implementation for providing ID token credentials."""

def __init__(self, token):
super(IdentityToken, self).__init__('Basic')
self._password = token

@property
def username(self):
# Azure Container Registry requires the user name to be this fixed GUID when
# authenticating with an identity (refresh) token; see:
# https://learn.microsoft.com/en-us/azure/container-registry/container-registry-authentication?tabs=azure-cli
return '00000000-0000-0000-0000-000000000000'

@property
def password(self):
return self._password

@property
def suffix(self):
u = self.username.encode('utf8')
p = self.password.encode('utf8')
return base64.b64encode(u + b':' + p).decode('utf8')


_USERNAME = '_token'

@@ -278,14 +301,16 @@ def Resolve(self, name):
for form in _FORMATS:
if form % name.registry in auths:
entry = auths[form % name.registry]
if 'auth' in entry:
if 'identitytoken' in entry:
token = entry['identitytoken']
return IdentityToken(token)
elif 'auth' in entry:
decoded = base64.b64decode(entry['auth']).decode('utf8')
username, password = decoded.split(':', 1)
return Basic(username, password)
elif 'username' in entry and 'password' in entry:
return Basic(entry['username'], entry['password'])
else:
# TODO(user): Support identitytoken
# TODO(user): Support registrytoken
raise Exception(
'Unsupported entry in "auth" section of Docker config: ' +
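
For context on the identitytoken branch added above, here is a self-contained sketch of the kind of Docker config entry it handles and the Basic authorization value it produces. The registry name and token below are made up; the fixed GUID user name is the value Azure Container Registry expects when the password is an identity token.

# Illustrative sketch (not part of this PR): how an "identitytoken" entry in
# ~/.docker/config.json maps to the Basic auth header ACR expects. The
# registry hostname and token are placeholders.
import base64
import json

config = json.loads("""
{
  "auths": {
    "myregistry.azurecr.io": {
      "identitytoken": "eyJhbGciOi...example-refresh-token..."
    }
  }
}
""")

entry = config['auths']['myregistry.azurecr.io']
token = entry['identitytoken']

# ACR requires this fixed GUID user name when the password is an identity token.
username = '00000000-0000-0000-0000-000000000000'
header_value = base64.b64encode(
    (username + ':' + token).encode('utf8')).decode('utf8')
print('Authorization: Basic ' + header_value)
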
8 changes: 6 additions & 2 deletions client/v2_2/docker_http_.py
@@ -335,7 +335,8 @@ def Request(self,
method = None,
body = None,
content_type = None,
accepted_mimes = None
accepted_mimes = None,
additional_headers = {}
):
"""Wrapper containing much of the boilerplate REST logic for Registry calls.

@@ -347,7 +348,8 @@ def Request(self,
body: the body to pass into the PUT request (or None for GET)
content_type: the mime-type of the request (or None for JSON).
content_type is ignored when body is None.
accepted_mimes: the list of acceptable mime-types
accepted_mimes: the list of acceptable mime-types.
additional_headers: additional headers to include in the request.

Raises:
BadStateException: an unexpected internal state has been encountered.
@@ -382,6 +384,8 @@ def Request(self,
if method in ('POST', 'PUT') and not body:
headers['content-length'] = '0'

headers.update(additional_headers)

resp, content = self._transport.request(
url, method, body=body, headers=headers)

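
A minimal, self-contained sketch of the new merge step (all header names and values below are invented): additional_headers is applied after the computed headers, which is what lets the chunked uploader added later in this PR attach a Content-Range header without changing the shared request-building logic.

# Sketch of the headers.update(additional_headers) behaviour added above.
computed_headers = {
    'content-type': 'application/octet-stream',
    'Authorization': 'Bearer <made-up-token>',
}
additional_headers = {'Content-Range': '0-1048575'}
computed_headers.update(additional_headers)

assert computed_headers['Content-Range'] == '0-1048575'
assert computed_headers['Authorization'] == 'Bearer <made-up-token>'
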
33 changes: 28 additions & 5 deletions client/v2_2/docker_image_.py
@@ -406,6 +406,18 @@ def is_compressed(name):
return name[0:2] == b'\x1f\x8b'


# Python versions earlier than 3.7 have a gzip.GzipFile bug that prevents writing
# content longer than 2^31 bytes in a single call. To work around this, we write
# the content in smaller chunks when it exceeds that size limit.
def _write_large_content_to_zipped_file(zipped, content, chunk_size=2**31-1):
if len(content) > chunk_size:
# Write the content in chunks
for i in range(0, len(content), chunk_size):
zipped.write(content[i:i+chunk_size])
else:
zipped.write(content)


class FromTarball(DockerImage):
"""This decodes the image tarball output of docker_build for upload."""

@@ -458,7 +470,7 @@ def _content(self,
zipped = gzip.GzipFile(
mode='wb', compresslevel=self._compresslevel, fileobj=buf)
try:
zipped.write(content)
_write_large_content_to_zipped_file(zipped, content)
finally:
zipped.close()
content = buf.getvalue()
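
A quick, self-contained sanity check of the chunked-write workaround, run with a deliberately tiny chunk size since exercising the real 2**31 - 1 threshold would need more than 2 GiB of test data; the payload bytes are invented.

# Sketch: the chunked writes still produce a valid gzip stream that round-trips.
import gzip
import io

def _write_large_content_to_zipped_file(zipped, content, chunk_size=2**31 - 1):
    if len(content) > chunk_size:
        for i in range(0, len(content), chunk_size):
            zipped.write(content[i:i + chunk_size])
    else:
        zipped.write(content)

payload = b'layer-bytes-' * 1000
buf = io.BytesIO()
zipped = gzip.GzipFile(mode='wb', fileobj=buf)
try:
    # Chunk size shrunk to 100 bytes purely for the example.
    _write_large_content_to_zipped_file(zipped, payload, chunk_size=100)
finally:
    zipped.close()

assert gzip.decompress(buf.getvalue()) == payload
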
@@ -799,18 +811,18 @@ def __exit__(self, unused_type, unused_value, unused_traceback):
pass


def _in_whiteout_dir(fs, name):
def _in_whiteout_dir(fs, opaque_whiteouts, name):
while name:
dirname = os.path.dirname(name)
if name == dirname:
break
if fs.get(dirname):
if fs.get(dirname) or dirname in opaque_whiteouts:
return True
name = dirname
return False


_WHITEOUT_PREFIX = '.wh.'
_OPAQUE_WHITEOUT_FILENAME = '.wh..wh..opq'


def extract(image, tar):
@@ -824,17 +836,27 @@ def extract(image, tar):
# to whether they are a tombstone or not.
fs = {}

opaque_whiteouts_in_higher_layers = set()

# Walk the layers, topmost first and add files. If we've seen them in a
# higher layer then we skip them
for layer in image.diff_ids():
buf = io.BytesIO(image.uncompressed_layer(layer))
with tarfile.open(mode='r:', fileobj=buf) as layer_tar:
opaque_whiteouts_in_this_layer = []
for tarinfo in layer_tar:
# If we see a whiteout file, then don't add anything to the tarball
# but ensure that any lower layers don't add a file with the whited
# out name.
basename = os.path.basename(tarinfo.name)
dirname = os.path.dirname(tarinfo.name)

# If we see an opaque whiteout file, then don't add anything to the
# tarball but ensure that any lower layers don't add files or
# directories which are siblings of the whiteout file.
if basename == _OPAQUE_WHITEOUT_FILENAME:
opaque_whiteouts_in_this_layer.append(dirname)

tombstone = basename.startswith(_WHITEOUT_PREFIX)
if tombstone:
basename = basename[len(_WHITEOUT_PREFIX):]
@@ -846,7 +868,7 @@ def extract(image, tar):
continue

# Check for a whited out parent directory
if _in_whiteout_dir(fs, name):
if _in_whiteout_dir(fs, opaque_whiteouts_in_higher_layers, name):
continue

# Mark this file as handled by adding its name.
@@ -858,3 +880,4 @@ def extract(image, tar):
tar.addfile(tarinfo, fileobj=layer_tar.extractfile(tarinfo))
else:
tar.addfile(tarinfo, fileobj=None)
opaque_whiteouts_in_higher_layers.update(opaque_whiteouts_in_this_layer)
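
To make the whiteout rules concrete, here is a simplified, self-contained re-implementation of the decision logic on in-memory path lists. The layer contents are invented, and the real code walks tar archives and also treats non-directory parents as tombstones; the point is only that a regular .wh.<name> tombstone hides a single path in lower layers, while a .wh..wh..opq marker hides every sibling under its directory in lower layers only.

# Simplified sketch of the whiteout handling in extract(); layers are lists of
# paths, topmost layer first, and all paths are made up.
import os

_WHITEOUT_PREFIX = '.wh.'
_OPAQUE_WHITEOUT_FILENAME = '.wh..wh..opq'

def visible_paths(layers_topmost_first):
    fs = {}              # path -> True (kept file) or False (tombstoned)
    opaque_dirs = set()  # directories opaquely whited out by higher layers
    kept = []
    for layer in layers_topmost_first:
        opaque_in_this_layer = []
        for name in layer:
            basename = os.path.basename(name)
            dirname = os.path.dirname(name)
            if basename == _OPAQUE_WHITEOUT_FILENAME:
                # Hide every sibling of this marker in lower layers only.
                opaque_in_this_layer.append(dirname)
                continue
            tombstone = basename.startswith(_WHITEOUT_PREFIX)
            if tombstone:
                name = os.path.join(dirname, basename[len(_WHITEOUT_PREFIX):])
            if name in fs:
                continue  # already handled by a higher layer
            # Skip anything under a whited-out or opaquely whited-out directory.
            parent, hidden = dirname, False
            while parent and parent != os.path.dirname(parent):
                if fs.get(parent) is False or parent in opaque_dirs:
                    hidden = True
                    break
                parent = os.path.dirname(parent)
            if hidden:
                continue
            fs[name] = not tombstone
            if not tombstone:
                kept.append(name)
        opaque_dirs.update(opaque_in_this_layer)
    return kept

top = ['app/.wh..wh..opq', 'app/new.txt', 'etc/.wh.old.conf']
bottom = ['app/cache/a.txt', 'app/old.txt', 'etc/old.conf', 'etc/keep.conf']
print(visible_paths([top, bottom]))  # ['app/new.txt', 'etc/keep.conf']
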
72 changes: 70 additions & 2 deletions client/v2_2/docker_session_.py
@@ -32,6 +32,13 @@
import six.moves.urllib.parse


# The 20 MB (2e7 byte) minimum chunk size balances performance under poor networking
# conditions and should only be used for slower registries. For now we default to the
# 2 GB maximum for CMv2 to avoid the partial-upload API entirely, because the GCR
# registry (us-central1-docker.pkg.dev) unexpectedly drops partial upload connections.
UPLOAD_CHUNK_SIZE_MAX = int(2e9)
UPLOAD_CHUNK_SIZE_MIN = int(2e7)


def _tag_or_digest(name):
if isinstance(name, docker_name.Tag):
return name.tag
Expand All @@ -48,7 +55,8 @@ def __init__(self,
creds,
transport,
mount = None,
threads = 1):
threads = 1,
chunk_size = UPLOAD_CHUNK_SIZE_MAX):
"""Constructor.

If multiple threads are used, the caller *must* ensure that the provided
Expand All @@ -70,6 +78,7 @@ def __init__(self,
docker_http.PUSH)
self._mount = mount
self._threads = threads
self._chunk_size = chunk_size

def _scheme_and_host(self):
return '{scheme}://{registry}'.format(
@@ -150,10 +159,61 @@ def _put_upload(self, image, digest):
method='PUT',
body=self._get_blob(image, digest),
accepted_codes=[six.moves.http_client.CREATED])

# pylint: disable=missing-docstring
def _patch_chunked_upload_image_body(self, image_body, digest):
if len(image_body) == 0:
raise ValueError('Empty image body')

# See https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-a-blob-in-chunks

mounted, location = self._start_upload(digest, self._mount)

if mounted:
logging.info('Layer %s mounted.', digest)
return

location = self._get_absolute_url(location)

# Upload the content in chunks. The loop body runs at least once, so resp is
# always set before it is used below.
for i in range(0, len(image_body), self._chunk_size):
chunk = image_body[i:i + self._chunk_size]
chunk_start, chunk_end_inclusive = i, i + len(chunk) - 1
logging.info('Pushing chunk(%d) for layer %s', i, digest)
resp, unused_content = self._transport.Request(
location,
method='PATCH',
body=chunk,
content_type='application/octet-stream',
additional_headers={
'Content-Range': '{start}-{end}'.format(start=chunk_start, end=chunk_end_inclusive)
},
accepted_codes=[
six.moves.http_client.NO_CONTENT, six.moves.http_client.ACCEPTED,
six.moves.http_client.CREATED
])
# Need to use the new location in the response.
location = self._get_absolute_url(resp.get('location'))

location = self._add_digest(resp['location'], digest)
location = self._get_absolute_url(location)
self._transport.Request(
location,
method='PUT',
body=None,
accepted_codes=[six.moves.http_client.CREATED])

# pylint: disable=missing-docstring
def _patch_upload(self, image,
digest):
image_body = self._get_blob(image, digest)
# When the layer is too large, the registry might reject a single-request upload.
# In that case, fall back to a chunked upload.
if len(image_body) > self._chunk_size:
logging.info('Uploading layer %s in chunks.', digest)
self._patch_chunked_upload_image_body(image_body, digest)
return

mounted, location = self._start_upload(digest, self._mount)

if mounted:
@@ -165,7 +225,7 @@ def _patch_upload(self, image,
resp, unused_content = self._transport.Request(
location,
method='PATCH',
body=self._get_blob(image, digest),
body=image_body,
content_type='application/octet-stream',
accepted_codes=[
six.moves.http_client.NO_CONTENT, six.moves.http_client.ACCEPTED,
@@ -199,6 +259,14 @@ def _put_blob(self, image, digest):
# POST /v2/<name>/blobs/uploads/ (no body*)
# PATCH /v2/<name>/blobs/uploads/<uuid> (full body)
# PUT /v2/<name>/blobs/uploads/<uuid> (no body)
# self._patch_upload(image, digest)
#
# When the layer is larger than the configured chunk size, we do a chunked upload instead:
# POST /v2/<name>/blobs/uploads/ (no body*)
# PATCH /v2/<name>/blobs/uploads/<uuid> (body chunk 1)
# PATCH /v2/<name>/blobs/uploads/<uuid> (body chunk ...)
# PUT /v2/<name>/blobs/uploads/<uuid> (no body)
# self._patch_chunked_upload_image_body(image_body, digest)
#
# * We attempt to perform a cross-repo mount if any repositories are
# specified in the "mount" parameter. This does a fast copy from a
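
For readers new to the chunked-push flow added here, the sketch below walks the same POST / PATCH / PUT sequence against an OCI registry using the requests library. The registry host, repository, and layer bytes are invented, authentication is omitted, and it is an illustration only, not a drop-in replacement for _patch_chunked_upload_image_body.

# Hypothetical walk-through of the chunked blob push sequence from the OCI
# distribution spec. Host, repository and layer contents are placeholders; a
# real registry also requires an Authorization header on every request.
import hashlib
import requests

registry = 'https://registry.example.com'
repo = 'library/demo'
layer = b'\x00' * (50 * 1024 * 1024)      # 50 MiB of fake layer bytes
digest = 'sha256:' + hashlib.sha256(layer).hexdigest()
chunk_size = 20 * 1024 * 1024             # mirrors UPLOAD_CHUNK_SIZE_MIN

# POST /v2/<name>/blobs/uploads/ starts an upload session.
resp = requests.post('{}/v2/{}/blobs/uploads/'.format(registry, repo))
location = resp.headers['Location']       # may be relative; real code resolves it

# PATCH each chunk with an inclusive Content-Range of the form <start>-<end>.
for start in range(0, len(layer), chunk_size):
    chunk = layer[start:start + chunk_size]
    resp = requests.patch(
        location,
        data=chunk,
        headers={
            'Content-Type': 'application/octet-stream',
            'Content-Range': '{}-{}'.format(start, start + len(chunk) - 1),
        })
    location = resp.headers['Location']   # use the returned Location for the next chunk

# PUT .../uploads/<uuid>?digest=<digest> finalizes the blob.
requests.put(location, params={'digest': digest})
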
7 changes: 6 additions & 1 deletion client/v2_2/save_.py
@@ -144,7 +144,8 @@ def tarball(name, image,
def fast(image,
directory,
threads = 1,
cache_directory = None):
cache_directory = None,
first_layer = 0):
"""Produce a FromDisk compatible file layout under the provided directory.

After calling this, the following filesystem will exist:
@@ -227,6 +228,10 @@ def valid(cached_layer, digest):
idx = 0
layers = []
for blob in reversed(image.fs_layers()):
if idx < first_layer:
idx += 1
continue

# Create a local copy
layer_name = os.path.join(directory, '%03d.tar.gz' % idx)
digest_name = os.path.join(directory, '%03d.sha256' % idx)
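
A small self-contained illustration of the new first_layer behaviour (the layer digests below are made up): indices below first_layer are skipped while the on-disk %03d numbering of the remaining layers is preserved, so for example first_layer=2 leaves 000.tar.gz and 001.tar.gz untouched and resumes writing at 002.tar.gz.

# Sketch of the skip logic added to fast(); fs_layers and first_layer are
# invented values, and idx mirrors the index used for the %03d file names.
first_layer = 2
fs_layers = ['sha256:ddd', 'sha256:ccc', 'sha256:bbb', 'sha256:aaa']  # made up

idx = 0
written = []
for blob in reversed(fs_layers):
    if idx < first_layer:
        idx += 1
        continue
    written.append(('%03d.tar.gz' % idx, blob))
    idx += 1

print(written)  # [('002.tar.gz', 'sha256:ccc'), ('003.tar.gz', 'sha256:ddd')]
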