-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcpp_consumer.cpp
More file actions
131 lines (110 loc) · 4.28 KB
/
cpp_consumer.cpp
File metadata and controls
131 lines (110 loc) · 4.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/**
* C++ Consumer - reads data from shared memory queue written by Python producer
*
* Usage: cpp_consumer <queue_name> <num_items> <element_size> <output_file>
*/
// // CRITICAL: Force ANSI mode for Python interoperability
// // Python's mmap uses CreateFileMappingA, not CreateFileMappingW
// #ifdef _WIN32
// // Must be BEFORE any Windows headers
// #undef UNICODE
// #undef _UNICODE
// #ifndef _MBCS
// #define _MBCS 1
// #endif
// #endif
#include <limits>
#include <slick/queue.h>
#include <iostream>
#include <fstream>
#include <cstring>
#include <cstdint>
#include <chrono>
#include <thread>
#include <array>
// // Compile-time verification that we're in ANSI mode
// #ifdef _WIN32
// #ifdef UNICODE
// #error "UNICODE is defined! This will break Python interop. Check build settings."
// #endif
// #ifdef _UNICODE
// #error "_UNICODE is defined! This will break Python interop. Check build settings."
// #endif
// #ifndef _MBCS
// #error "_MBCS is not defined! ANSI mode not enabled."
// #endif
// #endif
int main(int argc, char* argv[]) {
if (argc != 5) {
std::cerr << "Usage: " << argv[0] << " <queue_name> <num_items> <element_size> <output_file>\n";
return 1;
}
const char* queue_name = argv[1];
int num_items = std::atoi(argv[2]);
int element_size = std::atoi(argv[3]);
const char* output_file = argv[4];
std::cout << "C++ Consumer starting...\n";
std::cout << "Queue: " << queue_name << "\n";
std::cout << "Expected items: " << num_items << "\n";
std::cout << "Element size: " << element_size << "\n";
// Open existing queue (created by Python)
// Note: C++ SlickQueue constructor opens existing queue when only name is provided
// Use array type to match Python's element_size
using Element = std::array<uint8_t, 32>;
try {
slick::SlickQueue<Element> queue(queue_name);
std::cout << "Queue opened successfully!\n";
std::cout << " Queue size: " << queue.size() << "\n";
std::ofstream out(output_file);
if (!out) {
std::cerr << "Failed to open output file: " << output_file << "\n";
return 1;
}
uint64_t read_index = 0;
int consumed = 0;
int attempts = 0;
const int MAX_ATTEMPTS = 10000; // Avoid infinite loop
std::cout << "Starting read loop, expecting " << num_items << " items...\n";
std::cout.flush();
// Consume items
while (consumed < num_items && attempts < MAX_ATTEMPTS) {
attempts++;
// C++ read() takes read_index by reference and updates it
uint64_t prev_read_index = read_index;
auto [data, size] = queue.read(read_index);
// Debug: Print first few attempts
// if (attempts <= 5) {
// std::cout << "Attempt " << attempts << ": read_index=" << prev_read_index
// << " -> " << read_index << ", data="
// << (data ? "YES " : "NULL ") << ", size=" << size << "\n";
// }
if (data != nullptr) {
// Parse data: [worker_id (4 bytes), item_num (4 bytes)]
// data is now Element* (std::array<uint8_t, 32>*), access via data->data()
uint32_t worker_id, item_num;
std::memcpy(&worker_id, data->data(), sizeof(worker_id));
std::memcpy(&item_num, data->data() + 4, sizeof(item_num));
// Write to output file
out << worker_id << " " << item_num << "\n";
consumed++;
if (consumed % 100 == 0) {
std::cout << "Progress: " << consumed << "/" << num_items << "\n";
}
} else {
// No data available, small sleep
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
out.close();
if (consumed == num_items) {
std::cout << "C++ Consumer completed: " << consumed << " items read\n";
return 0;
} else {
std::cerr << "C++ Consumer timeout: only read " << consumed << "/" << num_items << " items\n";
return 1;
}
} catch (const std::exception& e) {
std::cerr << "C++ Consumer error: " << e.what() << "\n";
return 1;
}
}