diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 321851210e..c33f060c52 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -116,6 +116,7 @@ def get_additional_fsspec_call_kwargs(protocol: typing.Union[str, tuple], method if method_name == "put" and protocol in ["s3", "gs"]: kwargs["chunksize"] = _WRITE_SIZE_CHUNK_BYTES + kwargs["skip_instance_cache"] = True return kwargs @@ -212,9 +213,15 @@ def get_filesystem( s3kwargs.update(kwargs) return fsspec.filesystem(protocol, **s3kwargs) # type: ignore elif protocol == "gs": + kwargs["skip_instance_cache"] = True + print(f"skip_instance_cacheskip_instance_cacheskip_instance_cache {kwargs}", flush=True) + if anonymous: kwargs["token"] = _ANON - return fsspec.filesystem(protocol, **kwargs) # type: ignore + fs = fsspec.filesystem(protocol, **kwargs) + print(fs, flush=True) + print("kwargs", kwargs, flush=True) + return fs # type: ignore elif protocol == "ftp": kwargs.update(fsspec.implementations.ftp.FTPFileSystem._get_kwargs_from_urls(path)) return fsspec.filesystem(protocol, **kwargs) @@ -224,6 +231,7 @@ def get_filesystem( ) kwargs.update(storage_options) + print("testtttttttttttttttttttttttt", flush=True) return fsspec.filesystem(protocol, **kwargs) async def get_async_filesystem_for_path( @@ -318,6 +326,7 @@ async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwa ) logger.info(f"Getting {from_path} to {to_path}") if isinstance(file_system, AsyncFileSystem): + kwargs["skip_instance_cache"] = "True" dst = await file_system._get(from_path, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212 else: dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs)