From 89962fbdf921026273517f52e629b4014303a139 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 8 Aug 2025 22:54:55 +0800 Subject: [PATCH 1/9] Add skip_instance_cache for gs file system Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 321851210e..959ab0d22f 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -214,6 +214,7 @@ def get_filesystem( elif protocol == "gs": if anonymous: kwargs["token"] = _ANON + kwargs["skip_instance_cache"] = True return fsspec.filesystem(protocol, **kwargs) # type: ignore elif protocol == "ftp": kwargs.update(fsspec.implementations.ftp.FTPFileSystem._get_kwargs_from_urls(path)) From 361ba6055181e8a7a73e896d64042c08f1dddaa2 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 8 Aug 2025 23:19:52 +0800 Subject: [PATCH 2/9] nit Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 959ab0d22f..3872b4d35d 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -212,9 +212,9 @@ def get_filesystem( s3kwargs.update(kwargs) return fsspec.filesystem(protocol, **s3kwargs) # type: ignore elif protocol == "gs": + kwargs["skip_instance_cache"] = True if anonymous: kwargs["token"] = _ANON - kwargs["skip_instance_cache"] = True return fsspec.filesystem(protocol, **kwargs) # type: ignore elif protocol == "ftp": kwargs.update(fsspec.implementations.ftp.FTPFileSystem._get_kwargs_from_urls(path)) From 7b2ab53b00e7fc074bb0e2abe010c718fa9f7041 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 8 Aug 2025 23:31:39 +0800 Subject: [PATCH 3/9] test Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 3872b4d35d..48590e131f 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -212,6 +212,7 @@ def get_filesystem( s3kwargs.update(kwargs) return fsspec.filesystem(protocol, **s3kwargs) # type: ignore elif protocol == "gs": + print("skip_instance_cacheskip_instance_cacheskip_instance_cache", flush=True) kwargs["skip_instance_cache"] = True if anonymous: kwargs["token"] = _ANON From 358df339042cc6a011e2d2d8e8e718624208c909 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 8 Aug 2025 23:41:49 +0800 Subject: [PATCH 4/9] test Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 48590e131f..59520cc8fc 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -116,6 +116,7 @@ def get_additional_fsspec_call_kwargs(protocol: typing.Union[str, tuple], method if method_name == "put" and protocol in ["s3", "gs"]: kwargs["chunksize"] = _WRITE_SIZE_CHUNK_BYTES + kwargs["skip_instance_cache"] = True return kwargs From 5c44719b6d8a12d0a4b0b8da110bba96c7fcde4f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 8 Aug 2025 23:48:39 +0800 Subject: [PATCH 5/9] test Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 59520cc8fc..e11c8eef51 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -213,8 +213,9 @@ def get_filesystem( s3kwargs.update(kwargs) return fsspec.filesystem(protocol, **s3kwargs) # type: ignore elif protocol == "gs": - print("skip_instance_cacheskip_instance_cacheskip_instance_cache", flush=True) kwargs["skip_instance_cache"] = True + print(f"skip_instance_cacheskip_instance_cacheskip_instance_cache {kwargs}", flush=True) + if anonymous: kwargs["token"] = _ANON return fsspec.filesystem(protocol, **kwargs) # type: ignore From 08c181b762e09af20230a8ad16daecbcd478beae Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 9 Aug 2025 00:02:15 +0800 Subject: [PATCH 6/9] test Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index e11c8eef51..a6bb05a8d3 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -228,6 +228,7 @@ def get_filesystem( ) kwargs.update(storage_options) + print("testtttttttttttttttttttttttt", flush=True) return fsspec.filesystem(protocol, **kwargs) async def get_async_filesystem_for_path( From e0aed9988c40ddd8a2bbd863d69f5420215fe060 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 9 Aug 2025 00:24:37 +0800 Subject: [PATCH 7/9] test Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index a6bb05a8d3..d321bfd54b 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -323,6 +323,7 @@ async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwa ) logger.info(f"Getting {from_path} to {to_path}") if isinstance(file_system, AsyncFileSystem): + kwargs["skip_instance_cache"] = True dst = await file_system._get(from_path, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212 else: dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs) From f6e6bdd133e11801112ef04d77167d0dafab2f4e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 9 Aug 2025 00:33:03 +0800 Subject: [PATCH 8/9] test Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index d321bfd54b..c8f6dee0eb 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -323,7 +323,7 @@ async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwa ) logger.info(f"Getting {from_path} to {to_path}") if isinstance(file_system, AsyncFileSystem): - kwargs["skip_instance_cache"] = True + kwargs["skip_instance_cache"] = "True" dst = await file_system._get(from_path, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212 else: dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs) From 0bdf91dc3e67bb4f44f1d2c997a8bd58b6e2c71b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 9 Aug 2025 00:52:27 +0800 Subject: [PATCH 9/9] test Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index c8f6dee0eb..c33f060c52 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -218,7 +218,10 @@ def get_filesystem( if anonymous: kwargs["token"] = _ANON - return fsspec.filesystem(protocol, **kwargs) # type: ignore + fs = fsspec.filesystem(protocol, **kwargs) + print(fs, flush=True) + print("kwargs", kwargs, flush=True) + return fs # type: ignore elif protocol == "ftp": kwargs.update(fsspec.implementations.ftp.FTPFileSystem._get_kwargs_from_urls(path)) return fsspec.filesystem(protocol, **kwargs)