diff --git a/common/tl-files/common.tl b/common/tl-files/common.tl index 7459d8b920..9f802796d7 100644 --- a/common/tl-files/common.tl +++ b/common/tl-files/common.tl @@ -42,6 +42,34 @@ pair {X:Type} {Y:Type} a:X b:Y = Pair X Y; true = True; // this can be used as void type and serialized to empty array in PHP +exactlyOnce.uuid lo:long hi:long = exactlyOnce.Uuid; + +exactlyOnce.prepareRequest persistent_query_uuid:%exactlyOnce.Uuid = exactlyOnce.PersistentRequest; +exactlyOnce.commitRequest persistent_query_uuid:%exactlyOnce.Uuid persistent_slot_uuid:%exactlyOnce.Uuid = exactlyOnce.PersistentRequest; + +tracing.traceID#2f4ac855 lo:long hi:long = tracing.TraceID; + +tracing.traceContext#c463a95c + fields_mask: # + trace_id: %(tracing.TraceID) + parent_id: fields_mask.2?long + source_id: fields_mask.3?string + + // status = reserved_status_0 | (reserved_status_1 << 1) + // status == 0 - drop + // status == 1 - record + // status == 2 - defer + reserved_status_0: fields_mask.0?true + reserved_status_1: fields_mask.1?true + + reserved_level_0: fields_mask.4?true + reserved_level_1: fields_mask.5?true + reserved_level_2: fields_mask.6?true + + debug_flag: fields_mask.7?true + + = tracing.TraceContext; + stat#9d56e6b2 %(Dictionary string) = Stat; rpcInvokeReqExtra#f3ef81a9 {flags:#} @@ -62,6 +90,9 @@ rpcInvokeReqExtra#f3ef81a9 {flags:#} supported_compression_version:flags.25?int // note, that client support compression, to possibly compress answers random_delay:flags.26?double // starting query would be delayed by random number, not grater than given return_view_number:flags.27?%True // Barsic related parameter: return view number in response + persistent_query: flags.28?exactlyOnce.PersistentRequest + trace_context:flags.29?%(tracing.TraceContext) + execution_context:flags.30?string // contains serialized execution context. = RpcInvokeReqExtra flags; rpcReqResultExtra#c5011709 {flags:#} diff --git a/common/tl/constants/common.h b/common/tl/constants/common.h index fd435d5f16..1d4e5242b9 100644 --- a/common/tl/constants/common.h +++ b/common/tl/constants/common.h @@ -45,6 +45,17 @@ namespace vk { namespace tl { namespace common { +namespace tracing::traceContext { +static constexpr uint32_t return_reserved_status_0 = 1 << 0; +static constexpr uint32_t return_reserved_status_1 = 1 << 1; +static constexpr uint32_t parent_id = 1 << 2; +static constexpr uint32_t source_id = 1 << 3; +static constexpr uint32_t return_reserved_level_0 = 1 << 4; +static constexpr uint32_t return_reserved_level_1 = 1 << 5; +static constexpr uint32_t return_reserved_level_2 = 1 << 6; +static constexpr uint32_t return_debug = 1 << 7; +} // namespace tracing::traceContext + namespace rpc_invoke_req_extra_flags { inline constexpr uint32_t return_binlog_pos = 1U << 0U; inline constexpr uint32_t return_binlog_time = 1U << 1U; @@ -63,7 +74,9 @@ inline constexpr uint32_t supported_compression_version = 1U << 25U; inline constexpr uint32_t random_delay = 1U << 26U; inline constexpr uint32_t return_view_number = 1U << 27U; inline constexpr uint32_t persistent_query = 1U << 28U; -inline constexpr uint32_t ALL = 0x1ebd00df; +inline constexpr uint32_t trace_context = 1U << 29U; +inline constexpr uint32_t execution_context = 1U << 30U; +inline constexpr uint32_t ALL = 0x7ebd00df; } // namespace rpc_invoke_req_extra_flags namespace rpc_req_result_extra_flags { diff --git a/common/tl/query-header.cpp b/common/tl/query-header.cpp index 854944541f..0cf617c29d 100644 --- a/common/tl/query-header.cpp +++ b/common/tl/query-header.cpp @@ -90,6 +90,24 @@ static int tl_fetch_query_flags(tl_query_header_t* header) { return -1; } } + if (flags & flag::persistent_query) { + header->persistent_query.fetch(); + if (tl_fetch_error()) { + return -1; + } + } + if (flags & flag::trace_context) { + header->trace_context.fetch(); + if (tl_fetch_error()) { + return -1; + } + } + if (flags & flag::execution_context) { + vk::tl::fetch_string(header->execution_context); + if (tl_fetch_error()) { + return -1; + } + } return 0; } @@ -318,6 +336,15 @@ void tl_store_header(const tl_query_header_t* header) { if (flags & flag::random_delay) { tl_store_double(header->random_delay); } + if (flags & flag::persistent_query) { + header->persistent_query.store(); + } + if (flags & flag::trace_context) { + header->trace_context.store(); + } + if (flags & flag::execution_context) { + vk::tl::store_string(header->execution_context); + } } else if (header->actor_id) { tl_store_int(TL_RPC_DEST_ACTOR); tl_store_long(header->actor_id); diff --git a/common/tl/query-header.h b/common/tl/query-header.h index 9b11476e7b..d4ac039a4f 100644 --- a/common/tl/query-header.h +++ b/common/tl/query-header.h @@ -9,6 +9,7 @@ #include "common/pid.h" #include "common/tl/constants/common.h" +#include "common/tl/tl-types.h" enum class result_header_type { result, error, wrapped_error }; @@ -29,6 +30,9 @@ struct tl_query_header_t { std::vector string_forward_keys; int supported_compression_version{}; double random_delay{}; + exactlyOnce::PersistentRequest persistent_query{}; + tracing::traceContext trace_context{}; + std::string execution_context; }; struct tl_query_answer_header_t { diff --git a/common/tl/tl-types.h b/common/tl/tl-types.h new file mode 100644 index 0000000000..8a4a9af401 --- /dev/null +++ b/common/tl/tl-types.h @@ -0,0 +1,207 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2026 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include +#include +#include + +#include "common/tl/constants/common.h" +#include "common/tl/fetch.h" +#include "common/tl/parse.h" +#include "common/tl/store.h" + +namespace exactlyOnce { + +struct uuid final { + int64_t lo{}; + int64_t hi{}; + + bool fetch() { + lo = tl_fetch_long(); + if (tl_fetch_error()) { + return false; + } + hi = tl_fetch_long(); + return !tl_fetch_error(); + } + + void store() const { + tl_store_long(lo); + tl_store_long(hi); + } +}; + +struct prepareRequest final { + exactlyOnce::uuid persistent_query_uuid{}; + + bool fetch() { + return persistent_query_uuid.fetch(); + } + + void store() const { + persistent_query_uuid.store(); + } +}; + +struct commitRequest final { + exactlyOnce::uuid persistent_query_uuid{}; + exactlyOnce::uuid persistent_slot_uuid{}; + + bool fetch() { + return persistent_query_uuid.fetch() && persistent_slot_uuid.fetch(); + } + + void store() const { + persistent_query_uuid.store(); + persistent_slot_uuid.store(); + } +}; + +class PersistentRequest final { + static constexpr int32_t PREPARE_REQUEST_MAGIC = 0xc8d7'1b66; + static constexpr int32_t COMMIT_REQUEST_MAGIC = 0x6836'b983; + +public: + std::variant request; + + bool fetch() { + const int32_t magic{tl_fetch_int()}; + if (tl_fetch_error()) { + return false; + } + if (exactlyOnce::prepareRequest prepare_request{}; magic == PREPARE_REQUEST_MAGIC && prepare_request.fetch()) { + request.emplace(prepare_request); + return true; + } + if (exactlyOnce::commitRequest commit_request{}; magic == COMMIT_REQUEST_MAGIC && commit_request.fetch()) { + request.emplace(commit_request); + return true; + } + return false; + } + + void store() const { + std::visit( + [](const auto& value) { + using value_t = std::decay_t; + if constexpr (std::is_same_v) { + tl_store_int(PREPARE_REQUEST_MAGIC); + } else if constexpr (std::is_same_v) { + tl_store_int(COMMIT_REQUEST_MAGIC); + } else { + // condition is strange because of c++17 compiler. It is equivalent to `false` + static_assert(sizeof(value_t) && false, "exactlyOnce::PersistentRequest only supports prepareRequest and commitRequest"); + } + value.store(); + }, + request); + } +}; +} // namespace exactlyOnce + +namespace tracing { + +struct traceID final { + int64_t lo{}; + int64_t hi{}; + + bool fetch() { + lo = tl_fetch_long(); + if (tl_fetch_error()) { + return false; + } + hi = tl_fetch_long(); + return !tl_fetch_error(); + } + + void store() const { + tl_store_long(lo); + tl_store_long(hi); + } +}; + +class traceContext final { + static constexpr uint32_t RETURN_RESERVED_STATUS_0_FLAG = vk::tl::common::tracing::traceContext::return_reserved_status_0; + static constexpr uint32_t RETURN_RESERVED_STATUS_1_FLAG = vk::tl::common::tracing::traceContext::return_reserved_status_1; + static constexpr uint32_t PARENT_ID_FLAG = vk::tl::common::tracing::traceContext::parent_id; + static constexpr uint32_t SOURCE_ID_FLAG = vk::tl::common::tracing::traceContext::source_id; + static constexpr uint32_t RETURN_RESERVED_LEVEL_0_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_0; + static constexpr uint32_t RETURN_RESERVED_LEVEL_1_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_1; + static constexpr uint32_t RETURN_RESERVED_LEVEL_2_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_2; + static constexpr uint32_t RETURN_DEBUG_FLAG = vk::tl::common::tracing::traceContext::return_debug; + +public: + tracing::traceID trace_id{}; + std::optional opt_parent_id; + std::optional opt_source_id; + + // status = reserved_status_0 | (reserved_status_1 << 1) + // status == 0 - drop + // status == 1 - record + // status == 2 - defer + bool reserved_status_0{}; + bool reserved_status_1{}; + + bool reserved_level_0{}; + bool reserved_level_1{}; + bool reserved_level_2{}; + + bool debug_flag{}; + + bool fetch() { + const int32_t fields_mask{tl_fetch_int()}; + bool ok{!tl_fetch_error()}; + + ok = ok && trace_id.fetch(); + if (ok && static_cast(fields_mask & PARENT_ID_FLAG)) { + opt_parent_id.emplace(tl_fetch_long()); + ok = ok && !tl_fetch_error(); + } + if (ok && static_cast(fields_mask & SOURCE_ID_FLAG)) { + std::string value; + vk::tl::fetch_string(value); + opt_source_id.emplace(std::move(value)); + ok = ok && !tl_fetch_error(); + } + + reserved_status_0 = static_cast(fields_mask & RETURN_RESERVED_STATUS_0_FLAG); + reserved_status_1 = static_cast(fields_mask & RETURN_RESERVED_STATUS_1_FLAG); + reserved_level_0 = static_cast(fields_mask & RETURN_RESERVED_LEVEL_0_FLAG); + reserved_level_1 = static_cast(fields_mask & RETURN_RESERVED_LEVEL_1_FLAG); + reserved_level_2 = static_cast(fields_mask & RETURN_RESERVED_LEVEL_2_FLAG); + debug_flag = static_cast(fields_mask & RETURN_DEBUG_FLAG); + + return ok; + } + + void store() const { + tl_store_int(get_flags()); + trace_id.store(); + if (opt_parent_id) { + tl_store_long(opt_parent_id.value()); + } + if (opt_source_id) { + vk::tl::store_string(opt_source_id.value()); + } + } + + uint32_t get_flags() const noexcept { + uint32_t flags{}; + flags |= static_cast(reserved_status_0); + flags |= static_cast(reserved_status_1) << 1; + flags |= static_cast(reserved_level_0) << 4; + flags |= static_cast(reserved_level_1) << 5; + flags |= static_cast(reserved_level_2) << 6; + flags |= static_cast(debug_flag) << 7; + + flags |= static_cast(opt_parent_id.has_value()) << 2; + flags |= static_cast(opt_source_id.has_value()) << 3; + return flags; + } +}; +} // namespace tracing diff --git a/runtime-light/server/rpc/init-functions.cpp b/runtime-light/server/rpc/init-functions.cpp index 01f746b301..c11d911f9d 100644 --- a/runtime-light/server/rpc/init-functions.cpp +++ b/runtime-light/server/rpc/init-functions.cpp @@ -8,7 +8,9 @@ #include #include #include +#include #include +#include #include "runtime-common/core/allocator/script-allocator.h" #include "runtime-common/core/runtime-core.h" @@ -38,6 +40,9 @@ constexpr std::string_view RPC_EXTRA_INT_FORWARD = "RPC_EXTRA_INT_FORWARD"; constexpr std::string_view RPC_EXTRA_CUSTOM_TIMEOUT_MS = "RPC_EXTRA_CUSTOM_TIMEOUT_MS"; constexpr std::string_view RPC_EXTRA_SUPPORTED_COMPRESSION_VERSION = "RPC_EXTRA_SUPPORTED_COMPRESSION_VERSION"; constexpr std::string_view RPC_EXTRA_RANDOM_DELAY = "RPC_EXTRA_RANDOM_DELAY"; +constexpr std::string_view RPC_EXTRA_PERSISTENT_QUERY = "RPC_EXTRA_PERSISTENT_QUERY"; +constexpr std::string_view RPC_EXTRA_TRACE_CONTEXT = "RPC_EXTRA_TRACE_CONTEXT"; +constexpr std::string_view RPC_EXTRA_EXECUTION_CONTEXT = "RPC_EXTRA_EXECUTION_CONTEXT"; void process_rpc_invoke_req_extra(const tl::rpcInvokeReqExtra& extra, PhpScriptBuiltInSuperGlobals& superglobals) noexcept { if (extra.opt_wait_binlog_pos.has_value()) { @@ -81,6 +86,96 @@ void process_rpc_invoke_req_extra(const tl::rpcInvokeReqExtra& extra, PhpScriptB auto random_delay{*extra.opt_random_delay}; superglobals.v$_SERVER.set_value(string{RPC_EXTRA_RANDOM_DELAY.data(), RPC_EXTRA_RANDOM_DELAY.size()}, random_delay.value); } + if (extra.opt_persistent_query) { + const auto& persistent_query{*extra.opt_persistent_query}; + std::visit( + [&superglobals](const auto& value) noexcept { + using value_t = std::remove_cvref_t; + + constexpr std::string_view persistent_query_uuid_key_name{"persistent_query_uuid"}; + constexpr std::string_view persistent_slot_uuid_key_name{"persistent_slot_uuid"}; + constexpr std::string_view lo_key_name{"lo"}; + constexpr std::string_view hi_key_name{"hi"}; + + if constexpr (std::is_same_v) { + array persistent_query_uuid{array_size{2, false}}; + persistent_query_uuid.emplace_value(string{lo_key_name.data(), static_cast(lo_key_name.size())}, + static_cast(value.persistent_query_uuid.lo.value)); + persistent_query_uuid.emplace_value(string{hi_key_name.data(), static_cast(hi_key_name.size())}, + static_cast(value.persistent_query_uuid.hi.value)); + + array out{array_size{1, false}}; + out.emplace_value(string{persistent_query_uuid_key_name.data(), static_cast(persistent_query_uuid_key_name.size())}, + std::move(persistent_query_uuid)); + + superglobals.v$_SERVER.set_value(string{RPC_EXTRA_PERSISTENT_QUERY.data(), RPC_EXTRA_PERSISTENT_QUERY.size()}, std::move(out)); + } else if constexpr (std::is_same_v) { + array persistent_query_uuid{array_size{2, false}}; + persistent_query_uuid.emplace_value(string{lo_key_name.data(), static_cast(lo_key_name.size())}, + static_cast(value.persistent_query_uuid.lo.value)); + persistent_query_uuid.emplace_value(string{hi_key_name.data(), static_cast(hi_key_name.size())}, + static_cast(value.persistent_query_uuid.hi.value)); + + array persistent_slot_uuid{array_size{2, false}}; + persistent_slot_uuid.emplace_value(string{lo_key_name.data(), static_cast(lo_key_name.size())}, + static_cast(value.persistent_slot_uuid.lo.value)); + persistent_slot_uuid.emplace_value(string{hi_key_name.data(), static_cast(hi_key_name.size())}, + static_cast(value.persistent_slot_uuid.hi.value)); + + array out{array_size{2, false}}; + out.emplace_value(string{persistent_query_uuid_key_name.data(), static_cast(persistent_query_uuid_key_name.size())}, + std::move(persistent_query_uuid)); + out.emplace_value(string{persistent_slot_uuid_key_name.data(), static_cast(persistent_slot_uuid_key_name.size())}, + std::move(persistent_slot_uuid)); + + superglobals.v$_SERVER.set_value(string{RPC_EXTRA_PERSISTENT_QUERY.data(), RPC_EXTRA_PERSISTENT_QUERY.size()}, std::move(out)); + } else { + static_assert(false, "exactlyOnce::PersistentRequest only supports prepareRequest and commitRequest"); + } + }, + persistent_query.request); + } + if (extra.opt_trace_context) { + const auto& trace_context{*extra.opt_trace_context}; + + constexpr std::string_view fields_mask_key_name{"fields_mask"}; + constexpr std::string_view trace_id_key_name{"trace_id"}; + constexpr std::string_view lo_key_name{"lo"}; + constexpr std::string_view hi_key_name{"hi"}; + constexpr std::string_view parent_id_key_name{"parent_id"}; + constexpr std::string_view source_id_key_name{"source_id"}; + + // + 2 for fields_mask and trace_id, + 1 if there is a parent_id, + 1 if there is a source_id + const int64_t out_size{2 + static_cast(trace_context.opt_parent_id.has_value()) + static_cast(trace_context.opt_source_id.has_value())}; + + array trace_id{array_size{2, false}}; + trace_id.emplace_value(string{lo_key_name.data(), static_cast(lo_key_name.size())}, + static_cast(trace_context.trace_id.lo.value)); + trace_id.emplace_value(string{hi_key_name.data(), static_cast(hi_key_name.size())}, + static_cast(trace_context.trace_id.hi.value)); + + array out{array_size{out_size, false}}; + out.emplace_value(string{fields_mask_key_name.data(), static_cast(fields_mask_key_name.size())}, + static_cast(trace_context.get_flags().value)); + out.emplace_value(string{trace_id_key_name.data(), static_cast(trace_id_key_name.size())}, std::move(trace_id)); + + if (trace_context.opt_parent_id) { + out.emplace_value(string{parent_id_key_name.data(), static_cast(parent_id_key_name.size())}, + static_cast(trace_context.opt_parent_id->value)); + } + if (trace_context.opt_source_id) { + const std::string_view& opt_source_id_value{trace_context.opt_source_id->value}; + out.emplace_value(string{source_id_key_name.data(), static_cast(source_id_key_name.size())}, + string{opt_source_id_value.data(), static_cast(opt_source_id_value.size())}); + } + + superglobals.v$_SERVER.set_value(string{RPC_EXTRA_TRACE_CONTEXT.data(), RPC_EXTRA_TRACE_CONTEXT.size()}, std::move(out)); + } + if (extra.opt_execution_context) { + const std::string_view& execution_context{extra.opt_execution_context->value}; + superglobals.v$_SERVER.set_value(string{RPC_EXTRA_EXECUTION_CONTEXT.data(), RPC_EXTRA_EXECUTION_CONTEXT.size()}, + string{execution_context.data(), static_cast(execution_context.size())}); + } } } // namespace @@ -115,7 +210,7 @@ void init_server(kphp::component::stream&& request_stream, kphp::stl::vector((*invoke_rpc.opt_extra).flags.value)); + superglobals.v$_SERVER.set_value(string{RPC_EXTRA_FLAGS.data(), RPC_EXTRA_FLAGS.size()}, static_cast((*invoke_rpc.opt_extra).get_flags().value)); process_rpc_invoke_req_extra(*invoke_rpc.opt_extra, superglobals); } kphp::log::info("rpc server initialized with: " @@ -127,7 +222,7 @@ void init_server(kphp::component::stream&& request_stream, kphp::stl::vector {:#x}", invoke_rpc.net_pid.get_pid(), invoke_rpc.net_pid.get_port(), invoke_rpc.query_id.value, invoke_rpc.opt_actor_id.has_value() ? (*invoke_rpc.opt_actor_id).value : 0, - invoke_rpc.opt_extra.has_value() ? (*invoke_rpc.opt_extra).flags.value : 0, request_magic.value); + invoke_rpc.opt_extra.has_value() ? (*invoke_rpc.opt_extra).get_flags().value : 0, request_magic.value); } } // namespace kphp::rpc diff --git a/runtime-light/tl/tl-types.cpp b/runtime-light/tl/tl-types.cpp index 46c83bb095..db6912cff6 100644 --- a/runtime-light/tl/tl-types.cpp +++ b/runtime-light/tl/tl-types.cpp @@ -147,30 +147,40 @@ bool CertInfoItem::fetch(tl::fetcher& tlf) noexcept { // ===== RPC ===== bool rpcInvokeReqExtra::fetch(tl::fetcher& tlf) noexcept { + tl::mask flags{}; bool ok{flags.fetch(tlf)}; if (ok && static_cast(flags.value & WAIT_BINLOG_POS_FLAG)) { - ok = ok && opt_wait_binlog_pos.emplace().fetch(tlf); + ok &= opt_wait_binlog_pos.emplace().fetch(tlf); } if (ok && static_cast(flags.value & STRING_FORWARD_KEYS_FLAG)) { - ok = ok && opt_string_forward_keys.emplace().fetch(tlf); + ok &= opt_string_forward_keys.emplace().fetch(tlf); } if (ok && static_cast(flags.value & INT_FORWARD_KEYS_FLAG)) { - ok = ok && opt_int_forward_keys.emplace().fetch(tlf); + ok &= opt_int_forward_keys.emplace().fetch(tlf); } if (ok && static_cast(flags.value & STRING_FORWARD_FLAG)) { - ok = ok && opt_string_forward.emplace().fetch(tlf); + ok &= opt_string_forward.emplace().fetch(tlf); } if (ok && static_cast(flags.value & INT_FORWARD_FLAG)) { - ok = ok && opt_int_forward.emplace().fetch(tlf); + ok &= opt_int_forward.emplace().fetch(tlf); } if (ok && static_cast(flags.value & CUSTOM_TIMEOUT_MS_FLAG)) { - ok = ok && opt_custom_timeout_ms.emplace().fetch(tlf); + ok &= opt_custom_timeout_ms.emplace().fetch(tlf); } if (ok && static_cast(flags.value & SUPPORTED_COMPRESSION_VERSION_FLAG)) { - ok = ok && opt_supported_compression_version.emplace().fetch(tlf); + ok &= opt_supported_compression_version.emplace().fetch(tlf); } if (ok && static_cast(flags.value & RANDOM_DELAY_FLAG)) { - ok = ok && opt_random_delay.emplace().fetch(tlf); + ok &= opt_random_delay.emplace().fetch(tlf); + } + if (ok && static_cast(flags.value & PERSISTENT_QUERY_FLAG)) { + ok &= opt_persistent_query.emplace().fetch(tlf); + } + if (ok && static_cast(flags.value & TRACE_CONTEXT_FLAG)) { + ok &= opt_trace_context.emplace().fetch(tlf); + } + if (ok && static_cast(flags.value & EXECUTION_CONTEXT_FLAG)) { + ok &= opt_execution_context.emplace().fetch(tlf); } return_binlog_pos = static_cast(flags.value & RETURN_BINLOG_POS_FLAG); @@ -184,6 +194,31 @@ bool rpcInvokeReqExtra::fetch(tl::fetcher& tlf) noexcept { return ok; } +tl::mask rpcInvokeReqExtra::get_flags() const noexcept { + tl::mask flags{.value = static_cast(return_binlog_pos)}; + + flags.value |= static_cast(return_binlog_time) << 1; + flags.value |= static_cast(return_pid) << 2; + flags.value |= static_cast(return_request_sizes) << 3; + flags.value |= static_cast(return_failed_subqueries) << 4; + flags.value |= static_cast(return_query_stats) << 6; + flags.value |= static_cast(no_result) << 7; + flags.value |= static_cast(return_view_number) << 27; + + flags.value |= static_cast(opt_wait_binlog_pos.has_value()) << 16; + flags.value |= static_cast(opt_string_forward_keys.has_value()) << 18; + flags.value |= static_cast(opt_int_forward_keys.has_value()) << 19; + flags.value |= static_cast(opt_string_forward.has_value()) << 20; + flags.value |= static_cast(opt_int_forward.has_value()) << 21; + flags.value |= static_cast(opt_custom_timeout_ms.has_value()) << 23; + flags.value |= static_cast(opt_supported_compression_version.has_value()) << 25; + flags.value |= static_cast(opt_random_delay.has_value()) << 26; + flags.value |= static_cast(opt_persistent_query.has_value()) << 28; + flags.value |= static_cast(opt_trace_context.has_value()) << 29; + flags.value |= static_cast(opt_execution_context.has_value()) << 30; + return flags; +} + void rpcReqResultExtra::store(tl::storer& tls) const noexcept { flags.store(tls); if (static_cast(flags.value & BINLOG_POS_FLAG)) { diff --git a/runtime-light/tl/tl-types.h b/runtime-light/tl/tl-types.h index 94fef84975..b297a66e49 100644 --- a/runtime-light/tl/tl-types.h +++ b/runtime-light/tl/tl-types.h @@ -943,6 +943,135 @@ class HttpResponse final { // ===== RPC ===== +namespace exactlyOnce { + +struct uuid final { + tl::i64 lo{}; + tl::i64 hi{}; + + bool fetch(tl::fetcher& tlf) noexcept { + return lo.fetch(tlf) && hi.fetch(tlf); + } +}; + +struct prepareRequest final { + tl::exactlyOnce::uuid persistent_query_uuid{}; + + bool fetch(tl::fetcher& tlf) noexcept { + return persistent_query_uuid.fetch(tlf); + } +}; + +struct commitRequest final { + tl::exactlyOnce::uuid persistent_query_uuid{}; + tl::exactlyOnce::uuid persistent_slot_uuid{}; + + bool fetch(tl::fetcher& tlf) noexcept { + return persistent_query_uuid.fetch(tlf) && persistent_slot_uuid.fetch(tlf); + } +}; + +class PersistentRequest final { + static constexpr uint32_t PREPARE_REQUEST_MAGIC = 0xc8d7'1b66U; + static constexpr uint32_t COMMIT_REQUEST_MAGIC = 0x6836'b983U; + +public: + std::variant request; + + bool fetch(tl::fetcher& tlf) noexcept { + tl::magic magic{}; + if (!magic.fetch(tlf)) { + return false; + } + if (tl::exactlyOnce::prepareRequest prepare_request{}; magic.expect(PREPARE_REQUEST_MAGIC) && prepare_request.fetch(tlf)) { + request.emplace(prepare_request); + return true; + } + if (tl::exactlyOnce::commitRequest commit_request{}; magic.expect(COMMIT_REQUEST_MAGIC) && commit_request.fetch(tlf)) { + request.emplace(commit_request); + return true; + } + return false; + } +}; +} // namespace exactlyOnce + +namespace tracing { + +struct traceID final { + tl::i64 lo{}; + tl::i64 hi{}; + + bool fetch(tl::fetcher& tlf) noexcept { + return lo.fetch(tlf) && hi.fetch(tlf); + } +}; + +class traceContext final { + static constexpr uint32_t RETURN_RESERVED_STATUS_0_FLAG = vk::tl::common::tracing::traceContext::return_reserved_status_0; + static constexpr uint32_t RETURN_RESERVED_STATUS_1_FLAG = vk::tl::common::tracing::traceContext::return_reserved_status_1; + static constexpr uint32_t PARENT_ID_FLAG = vk::tl::common::tracing::traceContext::parent_id; + static constexpr uint32_t SOURCE_ID_FLAG = vk::tl::common::tracing::traceContext::source_id; + static constexpr uint32_t RETURN_RESERVED_LEVEL_0_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_0; + static constexpr uint32_t RETURN_RESERVED_LEVEL_1_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_1; + static constexpr uint32_t RETURN_RESERVED_LEVEL_2_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_2; + static constexpr uint32_t RETURN_DEBUG_FLAG = vk::tl::common::tracing::traceContext::return_debug; + +public: + tl::tracing::traceID trace_id{}; + std::optional opt_parent_id; + std::optional opt_source_id; + + // status = reserved_status_0 | (reserved_status_1 << 1) + // status == 0 - drop + // status == 1 - record + // status == 2 - defer + bool reserved_status_0{}; + bool reserved_status_1{}; + + bool reserved_level_0{}; + bool reserved_level_1{}; + bool reserved_level_2{}; + + bool debug_flag{}; + + bool fetch(tl::fetcher& tlf) noexcept { + tl::u32 fields_mask{}; + bool ok{fields_mask.fetch(tlf)}; + + ok = ok && trace_id.fetch(tlf); + if (ok && static_cast(fields_mask.value & PARENT_ID_FLAG)) { + ok = ok && opt_parent_id.emplace().fetch(tlf); + } + if (ok && static_cast(fields_mask.value & SOURCE_ID_FLAG)) { + ok = ok && opt_source_id.emplace().fetch(tlf); + } + + reserved_status_0 = static_cast(fields_mask.value & RETURN_RESERVED_STATUS_0_FLAG); + reserved_status_1 = static_cast(fields_mask.value & RETURN_RESERVED_STATUS_1_FLAG); + reserved_level_0 = static_cast(fields_mask.value & RETURN_RESERVED_LEVEL_0_FLAG); + reserved_level_1 = static_cast(fields_mask.value & RETURN_RESERVED_LEVEL_1_FLAG); + reserved_level_2 = static_cast(fields_mask.value & RETURN_RESERVED_LEVEL_2_FLAG); + debug_flag = static_cast(fields_mask.value & RETURN_DEBUG_FLAG); + + return ok; + } + + tl::mask get_flags() const noexcept { + tl::mask flags{.value = static_cast(reserved_status_0)}; + flags.value |= static_cast(reserved_status_1) << 1; + flags.value |= static_cast(reserved_level_0) << 4; + flags.value |= static_cast(reserved_level_1) << 5; + flags.value |= static_cast(reserved_level_2) << 6; + flags.value |= static_cast(debug_flag) << 7; + + flags.value |= static_cast(opt_parent_id.has_value()) << 2; + flags.value |= static_cast(opt_source_id.has_value()) << 3; + return flags; + } +}; +} // namespace tracing + class rpcInvokeReqExtra final { static constexpr uint32_t RETURN_BINLOG_POS_FLAG = vk::tl::common::rpc_invoke_req_extra_flags::return_binlog_pos; static constexpr uint32_t RETURN_BINLOG_TIME_FLAG = vk::tl::common::rpc_invoke_req_extra_flags::return_binlog_time; @@ -960,9 +1089,11 @@ class rpcInvokeReqExtra final { static constexpr uint32_t SUPPORTED_COMPRESSION_VERSION_FLAG = vk::tl::common::rpc_invoke_req_extra_flags::supported_compression_version; static constexpr uint32_t RANDOM_DELAY_FLAG = vk::tl::common::rpc_invoke_req_extra_flags::random_delay; static constexpr uint32_t RETURN_VIEW_NUMBER_FLAG = vk::tl::common::rpc_invoke_req_extra_flags::return_view_number; + static constexpr uint32_t PERSISTENT_QUERY_FLAG = vk::tl::common::rpc_invoke_req_extra_flags::persistent_query; + static constexpr uint32_t TRACE_CONTEXT_FLAG = vk::tl::common::rpc_invoke_req_extra_flags::trace_context; + static constexpr uint32_t EXECUTION_CONTEXT_FLAG = vk::tl::common::rpc_invoke_req_extra_flags::execution_context; public: - tl::mask flags{}; bool return_binlog_pos{}; bool return_binlog_time{}; bool return_pid{}; @@ -978,9 +1109,14 @@ class rpcInvokeReqExtra final { std::optional opt_custom_timeout_ms; std::optional opt_supported_compression_version; std::optional opt_random_delay; + std::optional opt_persistent_query; + std::optional opt_trace_context; + std::optional opt_execution_context; bool return_view_number{}; bool fetch(tl::fetcher& tlf) noexcept; + + tl::mask get_flags() const noexcept; }; struct RpcInvokeReqExtra final { diff --git a/runtime/interface.cpp b/runtime/interface.cpp index b61ed96b60..54ffe00633 100644 --- a/runtime/interface.cpp +++ b/runtime/interface.cpp @@ -13,14 +13,22 @@ #include #include #include +#include +#include #include +#include +#include #include "common/algorithms/string-algorithms.h" #include "common/macos-ports.h" #include "common/tl/constants/common.h" #include "common/wrappers/overloaded.h" +#include "common/tl/query-header.h" +#include "common/tl/tl-types.h" +#include "compiler/helper.h" #include "net/net-connections.h" +#include "runtime-common/core/runtime-core.h" #include "runtime-common/stdlib/serialization/serialization-context.h" #include "runtime-common/stdlib/server/url-functions.h" #include "runtime-common/stdlib/string/string-context.h" @@ -1485,6 +1493,93 @@ static void save_rpc_query_headers(const tl_query_header_t& header, mixed& v$_SE if (header.flags & flag::random_delay) { v$_SERVER.set_value(string("RPC_EXTRA_RANDOM_DELAY"), header.random_delay); } + if (header.flags & flag::persistent_query) { + std::visit( + [&v$_SERVER](const auto& value) noexcept { + using value_t = std::decay_t; + + constexpr std::string_view persistent_query_uuid_key_name{"persistent_query_uuid"}; + constexpr std::string_view persistent_slot_uuid_key_name{"persistent_slot_uuid"}; + constexpr std::string_view lo_key_name{"lo"}; + constexpr std::string_view hi_key_name{"hi"}; + + if constexpr (std::is_same_v) { + array persistent_query_uuid{array_size{2, false}}; + persistent_query_uuid.emplace_value(string{lo_key_name.data(), static_cast(lo_key_name.size())}, + static_cast(value.persistent_query_uuid.lo)); + persistent_query_uuid.emplace_value(string{hi_key_name.data(), static_cast(hi_key_name.size())}, + static_cast(value.persistent_query_uuid.hi)); + + array out{array_size{1, false}}; + out.emplace_value(string{persistent_query_uuid_key_name.data(), static_cast(persistent_query_uuid_key_name.size())}, + std::move(persistent_query_uuid)); + + v$_SERVER.set_value(string{"RPC_EXTRA_PERSISTENT_QUERY"}, std::move(out)); + } else if constexpr (std::is_same_v) { + array persistent_query_uuid{array_size{2, false}}; + persistent_query_uuid.emplace_value(string{lo_key_name.data(), static_cast(lo_key_name.size())}, + static_cast(value.persistent_query_uuid.lo)); + persistent_query_uuid.emplace_value(string{hi_key_name.data(), static_cast(hi_key_name.size())}, + static_cast(value.persistent_query_uuid.hi)); + + array persistent_slot_uuid{array_size{2, false}}; + persistent_slot_uuid.emplace_value(string{lo_key_name.data(), static_cast(lo_key_name.size())}, + static_cast(value.persistent_slot_uuid.lo)); + persistent_slot_uuid.emplace_value(string{hi_key_name.data(), static_cast(hi_key_name.size())}, + static_cast(value.persistent_slot_uuid.hi)); + + array out{array_size{2, false}}; + out.emplace_value(string{persistent_query_uuid_key_name.data(), static_cast(persistent_query_uuid_key_name.size())}, + std::move(persistent_query_uuid)); + out.emplace_value(string{persistent_slot_uuid_key_name.data(), static_cast(persistent_slot_uuid_key_name.size())}, + std::move(persistent_slot_uuid)); + + v$_SERVER.set_value(string{"RPC_EXTRA_PERSISTENT_QUERY"}, std::move(out)); + } else { + // condition is strange because of c++17 compiler. It is equivalent to `false` + static_assert(sizeof(value_t) && false, "exactlyOnce::PersistentRequest only supports prepareRequest and commitRequest"); + } + }, + header.persistent_query.request); + } + if (header.flags & flag::trace_context) { + const auto& trace_context{header.trace_context}; + + constexpr std::string_view fields_mask_key_name{"fields_mask"}; + constexpr std::string_view trace_id_key_name{"trace_id"}; + constexpr std::string_view lo_key_name{"lo"}; + constexpr std::string_view hi_key_name{"hi"}; + constexpr std::string_view parent_id_key_name{"parent_id"}; + constexpr std::string_view source_id_key_name{"source_id"}; + + // + 2 for fields_mask and trace_id, + 1 if there is a parent_id, + 1 if there is a source_id + const int64_t out_size{2 + static_cast(trace_context.opt_parent_id.has_value()) + static_cast(trace_context.opt_source_id.has_value())}; + + array trace_id{array_size{2, false}}; + trace_id.emplace_value(string{lo_key_name.data(), static_cast(lo_key_name.size())}, static_cast(trace_context.trace_id.lo)); + trace_id.emplace_value(string{hi_key_name.data(), static_cast(hi_key_name.size())}, static_cast(trace_context.trace_id.hi)); + + array out{array_size{out_size, false}}; + out.emplace_value(string{fields_mask_key_name.data(), static_cast(fields_mask_key_name.size())}, + static_cast(trace_context.get_flags())); + out.emplace_value(string{trace_id_key_name.data(), static_cast(trace_id_key_name.size())}, std::move(trace_id)); + + if (trace_context.opt_parent_id) { + out.emplace_value(string{parent_id_key_name.data(), static_cast(parent_id_key_name.size())}, + static_cast(*trace_context.opt_parent_id)); + } + if (trace_context.opt_source_id) { + const std::string& opt_source_id_value{*trace_context.opt_source_id}; + out.emplace_value(string{source_id_key_name.data(), static_cast(source_id_key_name.size())}, + string{opt_source_id_value.data(), static_cast(opt_source_id_value.size())}); + } + + v$_SERVER.set_value(string{"RPC_EXTRA_TRACE_CONTEXT"}, std::move(out)); + } + if (header.flags & flag::execution_context) { + const std::string& execution_context{header.execution_context}; + v$_SERVER.set_value(string{"RPC_EXTRA_EXECUTION_CONTEXT"}, string{execution_context.data(), static_cast(execution_context.size())}); + } } static void init_superglobals_impl(const http_query_data& http_data, const rpc_query_data& rpc_data, const job_query_data& job_data, diff --git a/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActor.php b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActor.php index c04d9257c1..7c7db7179c 100644 --- a/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActor.php +++ b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActor.php @@ -78,7 +78,7 @@ public function getTLFunctionName() { * * @return TL\RpcFunctionFetcher */ - public function typedStore(){ + public function typedStore() { return null; } @@ -87,7 +87,7 @@ public function typedStore(){ * * @return TL\RpcFunctionFetcher */ - public function typedFetch(){ + public function typedFetch() { return null; } diff --git a/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActorFlags.php b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActorFlags.php index 234a3b514c..dc39dc528b 100644 --- a/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActorFlags.php +++ b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActorFlags.php @@ -88,7 +88,7 @@ public function getTLFunctionName() { * * @return TL\RpcFunctionFetcher */ - public function typedStore(){ + public function typedStore() { return null; } @@ -97,7 +97,7 @@ public function typedStore(){ * * @return TL\RpcFunctionFetcher */ - public function typedFetch(){ + public function typedFetch() { return null; } diff --git a/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestFlags.php b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestFlags.php index 8dc1637f81..6846670808 100644 --- a/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestFlags.php +++ b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestFlags.php @@ -83,7 +83,7 @@ public function getTLFunctionName() { * * @return TL\RpcFunctionFetcher */ - public function typedStore(){ + public function typedStore() { return null; } @@ -92,7 +92,7 @@ public function typedStore(){ * * @return TL\RpcFunctionFetcher */ - public function typedFetch(){ + public function typedFetch() { return null; } diff --git a/tests/python/tests/rpc/php/VK/TL/_common/Types/rpcInvokeReqExtra.php b/tests/python/tests/rpc/php/VK/TL/_common/Types/rpcInvokeReqExtra.php index 018cdfa4d3..eb87e3561b 100644 --- a/tests/python/tests/rpc/php/VK/TL/_common/Types/rpcInvokeReqExtra.php +++ b/tests/python/tests/rpc/php/VK/TL/_common/Types/rpcInvokeReqExtra.php @@ -8,6 +8,8 @@ namespace VK\TL\_common\Types; +use VK\TL; + /** * @kphp-tl-class */ @@ -61,6 +63,15 @@ class rpcInvokeReqExtra { /** Field mask for $return_view_number field */ const BIT_RETURN_VIEW_NUMBER_27 = (1 << 27); + /** Field mask for $persistent_query field */ + const BIT_PERSISTENT_QUERY_28 = (1 << 28); + + /** Field mask for $trace_context field */ + const BIT_TRACE_CONTEXT_29 = (1 << 29); + + /** Field mask for $execution_context field */ + const BIT_EXECUTION_CONTEXT_30 = (1 << 30); + /** @var boolean */ public $return_binlog_pos = false; @@ -109,6 +120,15 @@ class rpcInvokeReqExtra { /** @var boolean */ public $return_view_number = false; + /** @var TL\exactlyOnce\Types\exactlyOnce_PersistentRequest|null */ + public $persistent_query = null; + + /** @var TL\tracing\Types\tracing_traceContext|null */ + public $trace_context = null; + + /** @var string|null */ + public $execution_context = null; + /** * @kphp-inline */ @@ -185,6 +205,18 @@ public function calculateFlags() { $mask |= self::BIT_RETURN_VIEW_NUMBER_27; } + if ($this->persistent_query) { + $mask |= self::BIT_PERSISTENT_QUERY_28; + } + + if ($this->trace_context) { + $mask |= self::BIT_TRACE_CONTEXT_29; + } + + if ($this->execution_context !== null) { + $mask |= self::BIT_EXECUTION_CONTEXT_30; + } + return $mask; } diff --git a/tests/python/tests/rpc/php/VK/TL/exactlyOnce/Types/exactlyOnce_PersistentRequest.php b/tests/python/tests/rpc/php/VK/TL/exactlyOnce/Types/exactlyOnce_PersistentRequest.php new file mode 100644 index 0000000000..2a7fb7b4e0 --- /dev/null +++ b/tests/python/tests/rpc/php/VK/TL/exactlyOnce/Types/exactlyOnce_PersistentRequest.php @@ -0,0 +1,24 @@ +persistent_query_uuid = $persistent_query_uuid; + $this->persistent_slot_uuid = $persistent_slot_uuid; + } + +} diff --git a/tests/python/tests/rpc/php/VK/TL/exactlyOnce/Types/exactlyOnce_prepareRequest.php b/tests/python/tests/rpc/php/VK/TL/exactlyOnce/Types/exactlyOnce_prepareRequest.php new file mode 100644 index 0000000000..49c6ca2b8f --- /dev/null +++ b/tests/python/tests/rpc/php/VK/TL/exactlyOnce/Types/exactlyOnce_prepareRequest.php @@ -0,0 +1,28 @@ +persistent_query_uuid = $persistent_query_uuid; + } + +} diff --git a/tests/python/tests/rpc/php/VK/TL/exactlyOnce/Types/exactlyOnce_uuid.php b/tests/python/tests/rpc/php/VK/TL/exactlyOnce/Types/exactlyOnce_uuid.php new file mode 100644 index 0000000000..5d233a828d --- /dev/null +++ b/tests/python/tests/rpc/php/VK/TL/exactlyOnce/Types/exactlyOnce_uuid.php @@ -0,0 +1,31 @@ +lo = $lo; + $this->hi = $hi; + } + +} diff --git a/tests/python/tests/rpc/php/VK/TL/tracing/Types/tracing_traceContext.php b/tests/python/tests/rpc/php/VK/TL/tracing/Types/tracing_traceContext.php new file mode 100644 index 0000000000..e4dbca8bb6 --- /dev/null +++ b/tests/python/tests/rpc/php/VK/TL/tracing/Types/tracing_traceContext.php @@ -0,0 +1,122 @@ +fields_mask = $fields_mask; + $this->trace_id = $trace_id; + } + + /** + * @return int + */ + public function calculateFieldsMask() { + $mask = 0; + + if ($this->reserved_status_0) { + $mask |= self::BIT_RESERVED_STATUS_0_0; + } + + if ($this->reserved_status_1) { + $mask |= self::BIT_RESERVED_STATUS_1_1; + } + + if ($this->parent_id !== null) { + $mask |= self::BIT_PARENT_ID_2; + } + + if ($this->source_id !== null) { + $mask |= self::BIT_SOURCE_ID_3; + } + + if ($this->reserved_level_0) { + $mask |= self::BIT_RESERVED_LEVEL_0_4; + } + + if ($this->reserved_level_1) { + $mask |= self::BIT_RESERVED_LEVEL_1_5; + } + + if ($this->reserved_level_2) { + $mask |= self::BIT_RESERVED_LEVEL_2_6; + } + + if ($this->debug_flag) { + $mask |= self::BIT_DEBUG_FLAG_7; + } + + return $mask; + } + +} diff --git a/tests/python/tests/rpc/php/VK/TL/tracing/Types/tracing_traceID.php b/tests/python/tests/rpc/php/VK/TL/tracing/Types/tracing_traceID.php new file mode 100644 index 0000000000..1870bb2128 --- /dev/null +++ b/tests/python/tests/rpc/php/VK/TL/tracing/Types/tracing_traceID.php @@ -0,0 +1,31 @@ +lo = $lo; + $this->hi = $hi; + } + +}