Releases: nats-io/nats.py
Release v2.12.0
Added
- Add options to send custom WebSocket headers on connect
custom_headers = {
    "Authorization": ["Bearer MySecretToken"],
    "X-Client-ID": ["my-client-123"],
    "Accept": ["application/json", "text/plain"]
}
nc = await nats.connect(
    "ws://localhost:4222",
    ws_connection_headers=custom_headers
)
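The header mapping in this release takes a list of values per header name. As a rough illustration only, a small helper could coerce a looser mapping (bare strings or lists) into that shape before connecting. The name `normalize_ws_headers` is hypothetical and is not part of nats.py.

```python
from typing import Dict, List, Mapping, Union

HeaderValue = Union[str, List[str]]


def normalize_ws_headers(headers: Mapping[str, HeaderValue]) -> Dict[str, List[str]]:
    """Hypothetical helper: coerce every header value into a list of strings."""
    normalized: Dict[str, List[str]] = {}
    for name, value in headers.items():
        # A bare string becomes a one-element list; lists are copied unchanged.
        normalized[name] = [value] if isinstance(value, str) else list(value)
    return normalized


custom_headers = normalize_ws_headers(
    {
        "Authorization": "Bearer MySecretToken",
        "Accept": ["application/json", "text/plain"],
    }
)
```

The resulting dictionary has the same shape as the one passed as `ws_connection_headers` in the release note.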
Fixed
- Fix filter_subject overriding filter_subjects #711
- Fix EOF processing while client is connecting #719
- Fix error when closing ws transport
- Fix test_object_list #728
Improved
Release v2.11.0
Breaking Change
⚠️ KV key validation now happens by default. To opt out of key validation, use the validate_keys option:
- Added validate_keys option to KV methods for controlling key validation #706
# Key validation enforced by default
await kv.put("valid-key", b"value")
# Opt out of validation for backwards compatibility
await kv.put("invalid.key.", b"value", validate_keys=False)
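To make the two keys in the example concrete, here is a minimal sketch of what a key check might look like. It assumes the commonly documented NATS KV key rules (letters, digits and `-`, `/`, `_`, `=`, `.`, with no leading or trailing dot). The function `is_valid_key` is hypothetical and may not match what nats.py actually enforces.

```python
import re

# Assumption: allowed characters follow the commonly documented NATS KV key
# rules; the real validation in nats.py may differ.
_VALID_KEY_RE = re.compile(r"[-/_=.a-zA-Z0-9]+")


def is_valid_key(key: str) -> bool:
    """Hypothetical check, not the nats.py implementation."""
    if not key or key.startswith(".") or key.endswith("."):
        return False
    return _VALID_KEY_RE.fullmatch(key) is not None
```

Under these assumptions `"valid-key"` passes and `"invalid.key."` fails because of its trailing dot, which is the case `validate_keys=False` lets through.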
Fixed
- Remove risk of getting coroutine 'Queue.get' was never awaited warning #687
- Avoid RuntimeWarning: coroutine 'Queue.get' was never awaited in test #691
- Consume the pending messages rather than asserting pending message count #690
- Fix typo when using subject_transforms #690
Improved
Release v2.10.0
Added
- Added KeysWithFilters method for key filtering in KV bucket (by @somratdutta in #602)
# Retrieve keys with filters
filtered_keys = await kv.keys(filters=['hello', 'greet'])
print(f'Filtered Keys: {filtered_keys}')
- Add discard_new_per_subject to StreamConfig (by @caspervonb in #609)
config = nats.js.api.StreamConfig(
    name=stream_name,
    discard=nats.js.api.DiscardPolicy.NEW,
    discard_new_per_subject=True,
    max_msgs_per_subject=100
)
await js.add_stream(config)
- Added support for passing pathlib.Path derived types to user_credentials (by @johnweldon in #623)
- Add an is_acked property to nats.aio.msg.Msg (by @charles-dyfis-net in #672)
Fixed
- Fixed typing of JetStreamContext.publish by @rijenkii in #605
- Fixed supporting REQUEST_TIMEOUT status code for a batch fetch with no_wait=True (by @diorcety in #618)
- Fixed deliver_subject in implicit subscription creation (by @m3nowak in #615)
- Fixed issue where flusher task stops running (by @debbyglance in #636)
- Fixed service start times to be utc (by @apollo13 in #640)
- Fixed discovered server callback not being awaited (by @caspervonb in #660)
Improved
Release v2.9.1
Bugfix release which includes:
Improved
- Improved server version semver handling (by @robinbowes in #679)
Release v2.9.0
Added
Thank you to @charbonnierg for the community implementation that served as a kick-off point.
import asyncio
import contextlib
import signal

import nats
import nats.micro


async def echo(req) -> None:
    """Echo the request data back to the client."""
    await req.respond(req.data)


async def main():
    # Define an event to signal when to quit
    quit_event = asyncio.Event()
    # Attach signal handler to the event loop
    loop = asyncio.get_event_loop()
    for sig in (signal.Signals.SIGINT, signal.Signals.SIGTERM):
        loop.add_signal_handler(sig, lambda *_: quit_event.set())
    # Create an async exit stack
    async with contextlib.AsyncExitStack() as stack:
        # Connect to NATS
        nc = await stack.enter_async_context(await nats.connect())
        # Add the service
        service = await stack.enter_async_context(
            await nats.micro.add_service(nc, name="demo_service", version="0.0.1")
        )
        group = service.add_group(name="demo")
        # Add an endpoint to the service
        await group.add_endpoint(
            name="echo",
            handler=echo,
        )
        # Wait for the quit event
        await quit_event.wait()


if __name__ == "__main__":
    asyncio.run(main())
- Added pagination to JetStream stream info (#594)
nc = await nats.connect()
js = nc.jetstream()
jsm = nc.jsm()
for i in range(300):
    await jsm.add_stream(name=f"stream_{i}")
streams_page_1 = await jsm.streams_info(offset=0)
streams_page_2 = await jsm.streams_info(offset=256)
Fixed
- Fixed resource leak in JetStream push subscription (#597)
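The stream info pagination added in this release is offset-based. The sketch below shows one way to walk every page. It assumes a page size of 256 (inferred from the offset=256 call in the example) and uses a hypothetical `fake_streams_info` coroutine in place of `jsm.streams_info`, so it runs against an in-memory list rather than a server.

```python
import asyncio
from typing import Awaitable, Callable, List

PAGE_SIZE = 256  # assumption: inferred from the offset=256 second-page call


async def collect_all(fetch_page: Callable[[int], Awaitable[List[str]]]) -> List[str]:
    """Request pages at increasing offsets until an empty page comes back."""
    items: List[str] = []
    offset = 0
    while True:
        page = await fetch_page(offset)
        if not page:
            break
        items.extend(page)
        offset += len(page)
    return items


async def demo() -> List[str]:
    names = [f"stream_{i}" for i in range(300)]

    async def fake_streams_info(offset: int) -> List[str]:
        # Stand-in for jsm.streams_info(offset=...): returns one page of names.
        return names[offset:offset + PAGE_SIZE]

    return await collect_all(fake_streams_info)


all_names = asyncio.run(demo())
```

Advancing the offset by the number of items actually received, rather than by a fixed page size, keeps the loop correct even if the real page size differs from the assumed one.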
Release v2.8.0
Added
- Added publish_async method to jetstream
ack_future = await js.publish_async("foo", b'bar')
await ack_future
- Added the ability to pass file contents as user_credentials (#546)
from nats.aio.client import RawCredentials
...
await nats.connect(user_credentials=RawCredentials("<creds file contents as string>"))
Fixed
- Fixed a race condition that could trigger error callbacks if a timeout and a response from a request were in conflict (#573).
Release v2.7.2
- Added heartbeat option to pull subscribers fetch API
await sub.fetch(1, timeout=1, heartbeat=0.1)
It can be useful to help distinguish API timeouts from not receiving messages:
try:
    await sub.fetch(100, timeout=1, heartbeat=0.2)
except nats.js.errors.FetchTimeoutError:
    # timeout due to not receiving messages
    pass
except asyncio.TimeoutError:
    # unexpected timeout
    pass
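The order of the except clauses matters if the fetch timeout error is a subclass of asyncio.TimeoutError, which the example's ordering suggests. The sketch below uses a stand-in class rather than the real `nats.js.errors.FetchTimeoutError` to show why the more specific clause has to come first. The `classify` function is hypothetical.

```python
import asyncio


class FetchTimeoutError(asyncio.TimeoutError):
    """Stand-in for nats.js.errors.FetchTimeoutError (assumed to be a subclass)."""


def classify(exc: BaseException) -> str:
    """Hypothetical helper that mirrors the try/except ordering of the example."""
    try:
        raise exc
    except FetchTimeoutError:
        # Listed first: the broader clause below would otherwise catch it.
        return "no messages"
    except asyncio.TimeoutError:
        return "unexpected timeout"
```

With the clauses swapped, both kinds of timeout would fall into the generic branch and the distinction would be lost.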
- Added subject_transform to add_stream
await js.add_stream(
    name="TRANSFORMS",
    subjects=["test", "foo"],
    subject_transform=nats.js.api.SubjectTransform(
        src=">", dest="transformed.>"
    ),
)
- Added subject_transform to sources as well:
transformed_source = nats.js.api.StreamSource(
    name="TRANSFORMS",
    # The source filters cannot overlap.
    subject_transforms=[
        nats.js.api.SubjectTransform(
            src="transformed.>", dest="fromtest.transformed.>"
        ),
        nats.js.api.SubjectTransform(
            src="foo.>", dest="fromtest.foo.>"
        ),
    ],
)
await js.add_stream(
    name="SOURCING",
    sources=[transformed_source],
)
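To illustrate what the src and dest patterns in these transforms do to a subject, here is a deliberately simplified sketch. It only handles patterns made of a literal prefix followed by a trailing `>` wildcard, as in the examples above. The function `apply_transform` is hypothetical, and the real mapping is performed by nats-server, which supports more pattern forms.

```python
from typing import Optional


def apply_transform(src: str, dest: str, subject: str) -> Optional[str]:
    """Simplified sketch: literal prefix plus trailing '>' patterns only."""
    if not src.endswith(">") or not dest.endswith(">"):
        raise ValueError("this sketch only handles patterns ending in '>'")
    src_prefix = src[:-1]    # "transformed.>" -> "transformed.", ">" -> ""
    dest_prefix = dest[:-1]  # "fromtest.transformed.>" -> "fromtest.transformed."
    # '>' must match at least one token after the prefix.
    if not subject.startswith(src_prefix) or len(subject) == len(src_prefix):
        return None
    return dest_prefix + subject[len(src_prefix):]


first_hop = apply_transform(">", "transformed.>", "test")
second_hop = apply_transform("transformed.>", "fromtest.transformed.>", first_hop)
```

Under these assumptions a message published on `test` would be stored as `transformed.test` in the first stream and as `fromtest.transformed.test` in the sourcing stream.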
- Added backoff option to add_consumer
await js.add_consumer(
    "events",
    durable_name="a",
    max_deliver=3,  # has to be greater than the length of the backoff array
    backoff=[1, 2],  # defined in seconds
    ack_wait=999999,  # ignored once using backoff
    max_ack_pending=3,
    filter_subject="events.>",
)
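The constraint in the comment above (max_deliver greater than the length of the backoff list) can be checked before calling add_consumer. The sketch below does that and lists the wait before each redelivery. It assumes a positive max_deliver and assumes the last backoff value is reused once the list is exhausted. The function `redelivery_delays` is hypothetical and not part of nats.py.

```python
from typing import List, Sequence


def redelivery_delays(max_deliver: int, backoff: Sequence[float]) -> List[float]:
    """Hypothetical helper: validate the settings and list the waits in seconds."""
    if max_deliver <= len(backoff):
        raise ValueError("max_deliver must be greater than len(backoff)")
    if not backoff:
        return []
    # One wait precedes each redelivery, so there are max_deliver - 1 waits.
    # Assumption: the last backoff value repeats once the list runs out.
    return [backoff[min(i, len(backoff) - 1)] for i in range(max_deliver - 1)]


delays = redelivery_delays(3, [1, 2])
```

For the settings in the example this gives a 1 second wait before the second delivery and a 2 second wait before the third.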
- Added compression to add_stream
await js.add_stream(
    name="COMPRESSION",
    subjects=["test", "foo"],
    compression="s2",
)
- Added metadata to add_stream
await js.add_stream(
    name="META",
    subjects=["test", "foo"],
    metadata={'foo': 'bar'},
)
Release v2.7.0
Added
- Added support for multiple filter consumers when using nats-server +v2.10
This is only supported when using the pull_subscribe_bind API:
await jsm.add_stream(name="multi", subjects=["a", "b", "c.>"])
cinfo = await jsm.add_consumer(
    "multi",
    name="myconsumer",
    filter_subjects=["a", "b"],
)
psub = await js.pull_subscribe_bind("multi", "myconsumer")
msgs = await psub.fetch(2)
for msg in msgs:
    await msg.ack()
- Added subjects_filter option to js.stream_info() API
stream = await js.add_stream(name="foo", subjects=["foo.>"])
for i in range(0, 5):
    await js.publish("foo.%d" % i, b'A')
si = await js.stream_info("foo", subjects_filter=">")
print(si.state.subjects)
# => {'foo.0': 1, 'foo.1': 1, 'foo.2': 1, 'foo.3': 1, 'foo.4': 1}
Changed
- Changed kv.watch default inactive_threshold cleanup timeout to be 5 minutes.
It can now be customized as well by passing inactive_threshold as argument in seconds:
w = await kv.watchall(inactive_threshold=30.0)
- Changed pull_subscribe_bind first argument to be called consumer instead of durable
since it also supports ephemeral consumers. This should be backwards compatible.
psub = await js.pull_subscribe_bind(consumer="myconsumer", stream="test")
Release v2.6.0
Added
- Added support for ephemeral pull consumers (#412)
Changed
- Changed default max control line to 4K as in the server since v2.2
Fixed
- Fixed ordered consumer implementation not being recreated when consumer deleted (#510)
- Fixed accounting issue with pending data which would have caused slow consumers on ordered consumers using next_msg
- Fixed subscribe to missing stream not raising NotFoundError (#499)
Full Changelog: v2.5.0...v2.6.0