From 6f0f4611f213910f4d38573963a0db8224542b1e Mon Sep 17 00:00:00 2001 From: prpeh Date: Sun, 5 Oct 2025 12:31:00 +0700 Subject: [PATCH 1/2] docs: Add Kafka AFS adapter feasibility study and documentation - Add comprehensive feasibility study (KafkaAfsFeasibility.md) - Add comparison with other AFS adapters (KafkaAfsComparison.md) - Add executive summary (KafkaAfsSummary.md) - Add quick start guide (KafkaAfsQuickStart.md) - Investigate Confluent.Kafka .NET library compatibility - Map Eclipse Store Kafka AFS design to .NET implementation - Provide 3-week MVP roadmap for implementation Verdict: FEASIBLE - Confluent.Kafka provides 100% API parity with Java kafka-clients. Recommended for event-driven architectures with audit trail requirements. --- docs/KafkaAfsComparison.md | 334 +++++++++++++++++++++++ docs/KafkaAfsFeasibility.md | 474 ++++++++++++++++++++++++++++++++ docs/KafkaAfsQuickStart.md | 520 ++++++++++++++++++++++++++++++++++++ docs/KafkaAfsSummary.md | 357 +++++++++++++++++++++++++ 4 files changed, 1685 insertions(+) create mode 100644 docs/KafkaAfsComparison.md create mode 100644 docs/KafkaAfsFeasibility.md create mode 100644 docs/KafkaAfsQuickStart.md create mode 100644 docs/KafkaAfsSummary.md diff --git a/docs/KafkaAfsComparison.md b/docs/KafkaAfsComparison.md new file mode 100644 index 0000000..77ea0df --- /dev/null +++ b/docs/KafkaAfsComparison.md @@ -0,0 +1,334 @@ +# Kafka AFS Adapter Comparison + +This document compares the Kafka AFS adapter with other available AFS adapters in NebulaStore to help you choose the right storage backend. + +## Quick Comparison Matrix + +| Feature | NIO (Local) | Redis | AWS S3 | Azure Blob | Google Firestore | **Kafka** | +|---------|-------------|-------|--------|------------|------------------|-----------| +| **Setup Complexity** | ⭐ Very Easy | ⭐⭐ Easy | ⭐⭐⭐ Medium | ⭐⭐⭐ Medium | ⭐⭐⭐ Medium | ⭐⭐⭐⭐ Complex | +| **Infrastructure Cost** | Free | Low | Medium | Medium | Medium | Medium-High | +| **Read Performance** | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐ Very Good | ⭐⭐⭐ Good | ⭐⭐⭐ Good | ⭐⭐⭐ Good | ⭐⭐⭐⭐ Very Good | +| **Write Performance** | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐ Very Good | ⭐⭐⭐ Good | ⭐⭐⭐ Good | ⭐⭐ Fair | ⭐⭐⭐⭐ Very Good | +| **Scalability** | ⭐⭐ Limited | ⭐⭐⭐ Good | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐⭐ Excellent | +| **Durability** | ⭐⭐ Disk-dependent | ⭐⭐⭐ Good | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐⭐ Excellent | +| **Replication** | ❌ None | ⭐⭐⭐ Built-in | ⭐⭐⭐⭐⭐ Multi-region | ⭐⭐⭐⭐⭐ Multi-region | ⭐⭐⭐⭐⭐ Multi-region | ⭐⭐⭐⭐⭐ Multi-datacenter | +| **Event Streaming** | ❌ No | ❌ No | ❌ No | ❌ No | ⭐⭐ Limited | ⭐⭐⭐⭐⭐ Native | +| **Time Travel** | ❌ No | ❌ No | ⭐⭐ Versioning | ⭐⭐ Versioning | ❌ No | ⭐⭐⭐⭐⭐ Native | +| **Audit Trail** | ❌ No | ❌ No | ⭐⭐ Logs | ⭐⭐ Logs | ⭐⭐ Logs | ⭐⭐⭐⭐⭐ Native | +| **Operational Overhead** | ⭐⭐⭐⭐⭐ Minimal | ⭐⭐⭐⭐ Low | ⭐⭐⭐ Medium | ⭐⭐⭐ Medium | ⭐⭐⭐ Medium | ⭐⭐ High | + +--- + +## Detailed Comparison + +### 1. NIO (Local File System) + +**Best For:** Development, testing, single-server deployments + +**Pros:** +- ✅ Zero infrastructure setup +- ✅ Fastest performance (local disk I/O) +- ✅ No external dependencies +- ✅ Simple debugging + +**Cons:** +- ❌ No replication or high availability +- ❌ Limited to single machine +- ❌ No built-in backup/restore +- ❌ Disk failure = data loss + +**Use Cases:** +- Local development +- Desktop applications +- Single-server applications +- Embedded systems + +--- + +### 2. Redis + +**Best For:** High-performance caching, session storage, real-time applications + +**Pros:** +- ✅ Very fast read/write performance +- ✅ Simple setup (single Redis instance) +- ✅ Built-in replication +- ✅ Rich data structures +- ✅ Active community + +**Cons:** +- ❌ Memory-based (expensive for large datasets) +- ❌ Persistence is secondary concern +- ❌ Limited query capabilities +- ❌ No native event streaming + +**Use Cases:** +- Session storage +- Real-time analytics +- Leaderboards +- Cache-first architectures + +--- + +### 3. AWS S3 + +**Best For:** Cloud-native applications on AWS, large-scale storage + +**Pros:** +- ✅ Virtually unlimited storage +- ✅ 99.999999999% durability +- ✅ Multi-region replication +- ✅ Cost-effective for large data +- ✅ Integrated with AWS ecosystem + +**Cons:** +- ❌ Higher latency than local storage +- ❌ AWS vendor lock-in +- ❌ Costs can add up (requests + storage) +- ❌ Eventual consistency in some cases + +**Use Cases:** +- AWS-based applications +- Data lakes +- Backup and archival +- Multi-region deployments + +--- + +### 4. Azure Blob Storage + +**Best For:** Cloud-native applications on Azure, enterprise storage + +**Pros:** +- ✅ Massive scalability +- ✅ High durability (LRS, ZRS, GRS) +- ✅ Integrated with Azure ecosystem +- ✅ Tiered storage (Hot/Cool/Archive) +- ✅ Strong consistency + +**Cons:** +- ❌ Azure vendor lock-in +- ❌ Network latency +- ❌ Complex pricing model +- ❌ Requires Azure account + +**Use Cases:** +- Azure-based applications +- Enterprise data storage +- Hybrid cloud scenarios +- Compliance-heavy industries + +--- + +### 5. Google Cloud Firestore + +**Best For:** Real-time applications, mobile backends, serverless + +**Pros:** +- ✅ Real-time synchronization +- ✅ Serverless (auto-scaling) +- ✅ Strong consistency +- ✅ Offline support +- ✅ Integrated with Firebase + +**Cons:** +- ❌ Google Cloud vendor lock-in +- ❌ Document-based (not ideal for blobs) +- ❌ Higher costs for large data +- ❌ Query limitations + +**Use Cases:** +- Mobile applications +- Real-time collaboration +- Serverless architectures +- Firebase-based apps + +--- + +### 6. **Kafka (Proposed)** + +**Best For:** Event-driven architectures, audit trails, time-series data, streaming applications + +**Pros:** +- ✅ Native event streaming capabilities +- ✅ Built-in time travel (offset-based) +- ✅ Complete audit trail (immutable log) +- ✅ Multi-datacenter replication +- ✅ High throughput (100K+ msg/sec) +- ✅ Horizontal scalability +- ✅ Strong ordering guarantees +- ✅ Integration with existing Kafka infrastructure + +**Cons:** +- ❌ Complex infrastructure (ZooKeeper/KRaft, brokers) +- ❌ Higher operational overhead +- ❌ Topic proliferation (one per file) +- ❌ Not ideal for small files +- ❌ Requires Kafka expertise +- ❌ Higher resource consumption + +**Use Cases:** +- Event-driven microservices +- CQRS/Event Sourcing architectures +- Audit and compliance requirements +- Time-series data storage +- Applications already using Kafka +- Multi-datacenter deployments +- Streaming analytics pipelines + +--- + +## Decision Matrix + +### Choose **NIO** if: +- ✓ You're developing locally +- ✓ Single-server deployment +- ✓ Maximum performance needed +- ✓ No replication required + +### Choose **Redis** if: +- ✓ You need very fast access +- ✓ Data fits in memory +- ✓ Real-time requirements +- ✓ Simple setup preferred + +### Choose **AWS S3** if: +- ✓ You're on AWS +- ✓ Large-scale storage needed +- ✓ Cost-effective archival +- ✓ Multi-region replication + +### Choose **Azure Blob** if: +- ✓ You're on Azure +- ✓ Enterprise requirements +- ✓ Compliance needs +- ✓ Tiered storage strategy + +### Choose **Google Firestore** if: +- ✓ You're on Google Cloud +- ✓ Real-time sync needed +- ✓ Mobile/web applications +- ✓ Serverless architecture + +### Choose **Kafka** if: +- ✓ You already use Kafka +- ✓ Event-driven architecture +- ✓ Audit trail required +- ✓ Time travel capabilities needed +- ✓ Multi-datacenter replication +- ✓ Streaming integration +- ✓ High write throughput +- ✓ Operational expertise available + +--- + +## Cost Comparison (Estimated) + +| Backend | Setup Cost | Monthly Cost (100GB) | Operational Cost | +|---------|------------|---------------------|------------------| +| NIO | $0 | $0 (disk only) | Minimal | +| Redis | $0-$50 | $50-$200 | Low | +| AWS S3 | $0 | $2-$5 | Medium | +| Azure Blob | $0 | $2-$5 | Medium | +| Firestore | $0 | $18-$36 | Medium | +| **Kafka** | $100-$500 | $50-$300 | **High** | + +*Note: Kafka costs include cluster infrastructure (brokers, ZooKeeper/KRaft). Managed services (Confluent Cloud, AWS MSK) cost more but reduce operational overhead.* + +--- + +## Performance Comparison (Estimated) + +| Backend | Write Latency | Read Latency | Throughput | +|---------|---------------|--------------|------------| +| NIO | <1ms | <1ms | 1GB/s+ | +| Redis | 1-5ms | <1ms | 500MB/s | +| AWS S3 | 50-200ms | 20-100ms | 100MB/s | +| Azure Blob | 50-200ms | 20-100ms | 100MB/s | +| Firestore | 100-500ms | 50-200ms | 50MB/s | +| **Kafka** | **5-20ms** | **10-50ms** | **500MB/s+** | + +*Note: Performance varies based on configuration, network, and workload patterns.* + +--- + +## Unique Kafka Capabilities + +### 1. Event Streaming Integration + +Kafka AFS allows you to treat storage operations as events: + +```csharp +// Storage writes become Kafka events +storage.Store(customer); // → Produces event to Kafka + +// Other systems can consume these events +var consumer = new KafkaConsumer(config); +consumer.Subscribe("customer-data-topic"); +// Process storage events in real-time +``` + +### 2. Time Travel + +Read data as it existed at any point in time: + +```csharp +// Read current state +var currentData = storage.Root(); + +// Read state from 1 hour ago (via offset) +var historicalData = kafkaConnector.ReadAtOffset(topic, offsetOneHourAgo); +``` + +### 3. Complete Audit Trail + +Every write is immutable and logged: + +```csharp +// Query all changes to a file +var auditTrail = kafkaConnector.GetAuditTrail(filePath); +foreach (var change in auditTrail) +{ + Console.WriteLine($"{change.Timestamp}: {change.Operation}"); +} +``` + +### 4. Multi-Datacenter Replication + +Kafka's MirrorMaker enables cross-datacenter replication: + +``` +DC1 (Primary) → MirrorMaker → DC2 (Replica) +Kafka Cluster Kafka Cluster +``` + +--- + +## Migration Path + +If you start with one backend and want to switch to Kafka later: + +1. **NIO → Kafka:** Export data, import to Kafka topics +2. **Redis → Kafka:** Stream Redis data to Kafka +3. **S3 → Kafka:** Batch import S3 objects to Kafka +4. **Azure/GCP → Kafka:** Use cloud connectors + +All AFS adapters implement the same `IBlobStoreConnector` interface, making migration straightforward. + +--- + +## Conclusion + +**Kafka AFS is a specialized adapter** that shines in event-driven architectures where: +- Storage operations need to be observable as events +- Audit trails and compliance are critical +- Time travel capabilities are valuable +- You already have Kafka infrastructure + +For simpler use cases, **NIO, Redis, or cloud storage** are better choices due to lower complexity and operational overhead. + +--- + +**Recommendation:** Start with a simpler backend (NIO for dev, Redis/S3 for production) and migrate to Kafka only when you have specific requirements that justify the additional complexity. + diff --git a/docs/KafkaAfsFeasibility.md b/docs/KafkaAfsFeasibility.md new file mode 100644 index 0000000..02b6f11 --- /dev/null +++ b/docs/KafkaAfsFeasibility.md @@ -0,0 +1,474 @@ +# Kafka AFS Adapter Feasibility Study + +**Date:** 2025-10-05 +**Objective:** Evaluate the feasibility of porting Eclipse Store's Kafka AFS adapter to NebulaStore using Confluent.Kafka .NET library + +## Executive Summary + +**Verdict: FEASIBLE** ✅ + +Porting the Eclipse Store Kafka AFS adapter to NebulaStore is technically feasible using the Confluent.Kafka .NET library. The .NET client provides all necessary APIs (Producer, Consumer, AdminClient) that map directly to the Java kafka-clients library used by Eclipse Store. The implementation would follow established patterns in NebulaStore's existing AFS connectors (Redis, AWS S3, Azure, Google Cloud Firestore). + +**Estimated Complexity:** Medium-High +**Estimated Development Time:** 2-3 weeks for MVP +**Key Risk:** Kafka topic management complexity and index synchronization + +--- + +## 1. Architecture Analysis + +### 1.1 Eclipse Store Kafka AFS Design + +The Eclipse Store Kafka adapter uses a sophisticated design: + +1. **Topic-Based Storage**: Each file is stored as a Kafka topic + - Topic name derived from file path (sanitized for Kafka naming rules) + - Files split into 1MB chunks (blobs) and written as Kafka messages + - Each blob is a separate Kafka record + +2. **Blob Metadata Structure**: + ```java + public interface Blob { + String topic(); // Kafka topic name + int partition(); // Kafka partition number + long offset(); // Kafka offset within partition + long start(); // Logical start position in file + long end(); // Logical end position in file + long size(); // Blob size (end - start + 1) + } + ``` + +3. **Index Management**: + - Separate index topic per file: `___index` + - Index stores blob metadata (28 bytes per blob): + - partition (4 bytes) + - offset (8 bytes) + - start position (8 bytes) + - end position (8 bytes) + - Index enables efficient file reconstruction and partial reads + +4. **File System Index**: + - Tracks all files in the system + - Enables directory listing and traversal + +### 1.2 Key Operations + +**Write Operation:** +1. Split data into 1MB chunks +2. Produce each chunk to Kafka topic +3. Collect RecordMetadata (partition, offset) +4. Write blob metadata to index topic +5. Update file system index + +**Read Operation:** +1. Query index topic for blob metadata +2. Seek to specific partition/offset +3. Consume record +4. Extract requested byte range from blob + +**Delete Operation:** +1. Delete Kafka topics (data + index) +2. Remove from file system index +3. Clean up consumer/producer instances + +--- + +## 2. Confluent.Kafka .NET Library Capabilities + +### 2.1 Available APIs + +The Confluent.Kafka library provides complete feature parity with Java kafka-clients: + +| Java API | .NET Equivalent | Status | +|----------|----------------|--------| +| `KafkaProducer` | `IProducer` | ✅ Available | +| `KafkaConsumer` | `IConsumer` | ✅ Available | +| `AdminClient` | `IAdminClient` | ✅ Available | +| `ProducerRecord` | `Message` | ✅ Available | +| `ConsumerRecord` | `ConsumeResult` | ✅ Available | +| `RecordMetadata` | `DeliveryResult` | ✅ Available | +| Topic Management | `CreateTopicsAsync`, `DeleteTopicsAsync` | ✅ Available | +| Offset Management | `Seek`, `Position`, `Committed` | ✅ Available | +| Record Deletion | `DeleteRecordsAsync` | ✅ Available | + +### 2.2 Key Features + +1. **High-Level Producer API**: + ```csharp + var result = await producer.ProduceAsync(topic, new Message + { + Key = key, + Value = data + }); + // result.Partition, result.Offset available + ``` + +2. **High-Level Consumer API**: + ```csharp + consumer.Assign(new TopicPartition(topic, partition)); + consumer.Seek(new TopicPartitionOffset(topic, partition, offset)); + var result = consumer.Consume(timeout); + ``` + +3. **AdminClient API**: + ```csharp + await adminClient.CreateTopicsAsync(new[] { new TopicSpecification { Name = topic } }); + await adminClient.DeleteTopicsAsync(new[] { topic }); + await adminClient.DeleteRecordsAsync(recordsToDelete); + ``` + +4. **Performance Features**: + - Automatic batching + - Compression support + - Idempotent producer + - Transactional API support + +### 2.3 Compatibility + +- **.NET Framework:** >= 4.6.2 +- **.NET Core:** >= 1.0 +- **.NET Standard:** >= 1.3 +- **NebulaStore Target:** .NET 9.0 ✅ Fully compatible + +--- + +## 3. Implementation Design + +### 3.1 Module Structure + +``` +afs/kafka/ +├── NebulaStore.Afs.Kafka.csproj +├── README.md +├── src/ +│ ├── KafkaConnector.cs # Main connector implementation +│ ├── KafkaConfiguration.cs # Configuration wrapper +│ ├── KafkaBlob.cs # Blob metadata structure +│ ├── KafkaTopicIndex.cs # Per-topic index management +│ ├── KafkaFileSystemIndex.cs # Global file system index +│ ├── KafkaPathValidator.cs # Path validation for Kafka naming +│ └── Extensions/ +│ └── EmbeddedStorageKafkaExtensions.cs +├── tests/ +│ └── KafkaConnectorTests.cs +└── examples/ + └── KafkaExample.cs +``` + +### 3.2 Core Classes + +#### KafkaConnector + +```csharp +public class KafkaConnector : BlobStoreConnectorBase +{ + private readonly ProducerConfig _producerConfig; + private readonly ConsumerConfig _consumerConfig; + private readonly AdminClientConfig _adminConfig; + + private readonly KafkaFileSystemIndex _fileSystemIndex; + private readonly ConcurrentDictionary _topicIndices; + private readonly ConcurrentDictionary> _producers; + private readonly ConcurrentDictionary> _consumers; + + // IBlobStoreConnector implementation + public override long WriteData(BlobStorePath file, IEnumerable sourceBuffers); + public override byte[] ReadData(BlobStorePath file, long offset, long length); + public override bool DeleteFile(BlobStorePath file); + // ... other methods +} +``` + +#### KafkaBlob + +```csharp +public record KafkaBlob +{ + public string Topic { get; init; } + public int Partition { get; init; } + public long Offset { get; init; } + public long Start { get; init; } + public long End { get; init; } + public long Size => End - Start + 1; + + // Serialization to/from 28-byte format + public byte[] ToBytes(); + public static KafkaBlob FromBytes(byte[] bytes); +} +``` + +#### KafkaTopicIndex + +```csharp +public class KafkaTopicIndex : IDisposable +{ + private readonly string _topic; + private readonly string _indexTopicName; + private readonly IProducer _producer; + private readonly List _blobs; + + public IEnumerable GetBlobs(); + public void AddBlobs(IEnumerable blobs); + public void DeleteBlobs(IEnumerable blobs); +} +``` + +### 3.3 Configuration + +```csharp +public class KafkaConfiguration +{ + public string BootstrapServers { get; set; } = "localhost:9092"; + public string ClientId { get; set; } = "nebulastore-kafka"; + public int MaxMessageBytes { get; set; } = 1_000_000; // 1MB chunks + public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromMinutes(1); + public bool EnableIdempotence { get; set; } = true; + public CompressionType Compression { get; set; } = CompressionType.None; + + // Additional Kafka-specific settings + public Dictionary AdditionalSettings { get; set; } = new(); +} +``` + +--- + +## 4. Integration with NebulaStore + +### 4.1 Usage Pattern + +```csharp +using NebulaStore.Storage.Embedded; +using NebulaStore.Afs.Kafka; + +// Configure Kafka connection +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "localhost:9092", + ClientId = "my-app", + EnableIdempotence = true +}; + +// Create connector +using var connector = KafkaConnector.New(kafkaConfig); + +// Create file system +using var fileSystem = BlobStoreFileSystem.New(connector); + +// Use with EmbeddedStorage +var storageConfig = EmbeddedStorageConfiguration.New() + .SetStorageFileSystem(fileSystem) + .Build(); + +using var storage = EmbeddedStorage.Start(storageConfig); +``` + +### 4.2 Extension Methods + +```csharp +public static class EmbeddedStorageKafkaExtensions +{ + public static IEmbeddedStorageManager StartWithKafka( + string bootstrapServers, + string clientId = "nebulastore") + { + var config = new KafkaConfiguration + { + BootstrapServers = bootstrapServers, + ClientId = clientId + }; + + var connector = KafkaConnector.New(config); + var fileSystem = BlobStoreFileSystem.New(connector); + + return EmbeddedStorage.Start( + EmbeddedStorageConfiguration.New() + .SetStorageFileSystem(fileSystem) + .Build() + ); + } +} +``` + +--- + +## 5. Technical Challenges & Solutions + +### 5.1 Topic Naming Constraints + +**Challenge:** Kafka topic names have restrictions (alphanumeric, `.`, `_`, `-` only) + +**Solution:** Implement path sanitization (same as Eclipse Store): +```csharp +private static string ToTopicName(BlobStorePath path) +{ + return Regex.Replace( + path.FullQualifiedName.Replace(BlobStorePath.SeparatorChar, '_'), + "[^a-zA-Z0-9\\._\\-]", + "_" + ); +} +``` + +### 5.2 Index Synchronization + +**Challenge:** Index topic must stay synchronized with data topic + +**Solution:** +- Use transactions for atomic writes (optional, adds overhead) +- Or accept eventual consistency with retry logic +- Implement index rebuild capability from data topics + +### 5.3 Partial Blob Deletion + +**Challenge:** Kafka doesn't support deleting individual records, only up to an offset + +**Solution:** Same as Eclipse Store: +1. Read remaining blobs +2. Delete records up to offset +3. Rewrite remaining blobs +4. Update index + +### 5.4 Consumer/Producer Lifecycle + +**Challenge:** Managing multiple consumers/producers efficiently + +**Solution:** +- Pool consumers/producers per topic +- Lazy initialization +- Proper disposal in connector cleanup + +--- + +## 6. Minimal Viable Path (MVP) + +### Phase 1: Core Implementation (Week 1) +- [ ] Create project structure +- [ ] Implement `KafkaBlob` and serialization +- [ ] Implement `KafkaConfiguration` +- [ ] Implement basic `KafkaConnector` (write/read/delete) +- [ ] Implement `KafkaPathValidator` + +### Phase 2: Index Management (Week 2) +- [ ] Implement `KafkaTopicIndex` +- [ ] Implement `KafkaFileSystemIndex` +- [ ] Add index synchronization logic +- [ ] Implement directory operations + +### Phase 3: Testing & Polish (Week 3) +- [ ] Unit tests for all components +- [ ] Integration tests with real Kafka +- [ ] Performance benchmarks +- [ ] Documentation and examples +- [ ] Error handling and edge cases + +--- + +## 7. Dependencies + +### 7.1 NuGet Packages Required + +```xml + +``` + +**License:** Apache License 2.0 ✅ Compatible with EPL 2.0 + +### 7.2 External Requirements + +- **Kafka Cluster:** Users must provide their own Kafka installation + - Apache Kafka 0.8+ + - Confluent Platform + - Confluent Cloud + - Amazon MSK + - Azure Event Hubs (Kafka-compatible) + +--- + +## 8. Performance Considerations + +### 8.1 Expected Performance + +- **Write Throughput:** 100K+ messages/sec (hardware dependent) +- **Read Latency:** Low (direct partition/offset seeks) +- **Blob Size:** 1MB chunks (configurable) +- **Compression:** Optional (Snappy, LZ4, Gzip, Zstd) + +### 8.2 Optimization Opportunities + +1. **Batching:** Confluent.Kafka handles automatic batching +2. **Compression:** Enable for network/storage efficiency +3. **Idempotent Producer:** Prevent duplicates +4. **Index Caching:** Cache blob metadata in memory +5. **Connection Pooling:** Reuse producers/consumers + +--- + +## 9. Risks & Mitigations + +| Risk | Impact | Probability | Mitigation | +|------|--------|-------------|------------| +| Kafka cluster unavailability | High | Medium | Implement retry logic, circuit breakers | +| Index corruption | High | Low | Implement index rebuild from data topics | +| Topic proliferation | Medium | High | Document topic management, cleanup strategies | +| Performance degradation | Medium | Medium | Benchmark early, optimize blob size | +| Version compatibility | Low | Low | Use stable Confluent.Kafka API | + +--- + +## 10. Recommendations + +### 10.1 Proceed with Implementation + +**Recommendation: YES** - The implementation is feasible and aligns with NebulaStore's architecture. + +### 10.2 Suggested Approach + +1. **Start with MVP:** Focus on core read/write/delete operations +2. **Test with Docker:** Use Kafka in Docker for development +3. **Benchmark Early:** Compare with other AFS adapters +4. **Document Thoroughly:** Kafka setup is more complex than other backends +5. **Consider Alternatives:** For simpler use cases, Redis or S3 may be better + +### 10.3 When to Use Kafka AFS + +**Good Fit:** +- Event-driven architectures already using Kafka +- Need for event streaming + persistence +- Multi-datacenter replication requirements +- Audit trail and time-travel capabilities + +**Poor Fit:** +- Simple local storage needs (use NIO) +- Cost-sensitive scenarios (Kafka infrastructure overhead) +- Small-scale applications (topic management overhead) + +--- + +## 11. Next Steps + +1. **Get Approval:** Confirm stakeholder interest in Kafka adapter +2. **Setup Development Environment:** Kafka cluster (Docker Compose) +3. **Create Feature Branch:** `feature/kafka-afs-adapter` +4. **Implement MVP:** Follow 3-week plan +5. **Create PR:** For peer review before merging + +--- + +## Appendix A: Eclipse Store Reference Files + +- **KafkaConnector.java:** https://github.com/eclipse-store/store/blob/main/afs/kafka/src/main/java/org/eclipse/store/afs/kafka/types/KafkaConnector.java +- **Blob.java:** https://github.com/eclipse-store/store/blob/main/afs/kafka/src/main/java/org/eclipse/store/afs/kafka/types/Blob.java +- **TopicIndex.java:** https://github.com/eclipse-store/store/blob/main/afs/kafka/src/main/java/org/eclipse/store/afs/kafka/types/TopicIndex.java + +## Appendix B: Confluent.Kafka Documentation + +- **Overview:** https://docs.confluent.io/kafka-clients/dotnet/current/overview.html +- **GitHub:** https://github.com/confluentinc/confluent-kafka-dotnet +- **API Docs:** https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/ + +--- + +**Document Version:** 1.0 +**Author:** NebulaStore Development Team +**Status:** Draft for Review + diff --git a/docs/KafkaAfsQuickStart.md b/docs/KafkaAfsQuickStart.md new file mode 100644 index 0000000..2b7b316 --- /dev/null +++ b/docs/KafkaAfsQuickStart.md @@ -0,0 +1,520 @@ +# Kafka AFS Quick Start Guide + +**Status:** Planned Feature (Not Yet Implemented) + +This guide will help you get started with the Kafka AFS adapter once it's implemented. + +--- + +## Prerequisites + +### 1. Kafka Cluster + +You need a running Kafka cluster. Choose one: + +**Option A: Local Development (Docker)** +```bash +# docker-compose.yml +version: '3' +services: + kafka: + image: confluentinc/cp-kafka:7.5.0 + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + +# Start Kafka +docker-compose up -d +``` + +**Option B: Confluent Cloud** +```bash +# Sign up at https://confluent.cloud +# Get bootstrap servers and API credentials +``` + +**Option C: AWS MSK** +```bash +# Create MSK cluster in AWS Console +# Get bootstrap servers +``` + +### 2. NuGet Package + +```bash +dotnet add package NebulaStore.Afs.Kafka +``` + +--- + +## Basic Usage + +### 1. Simple Configuration + +```csharp +using NebulaStore.Storage.Embedded; +using NebulaStore.Afs.Kafka; + +// Start with Kafka backend +using var storage = EmbeddedStorage.StartWithKafka( + bootstrapServers: "localhost:9092", + clientId: "my-app" +); + +// Use normally +var root = storage.Root(); +root.Value = "Hello, Kafka!"; +storage.StoreRoot(); +``` + +### 2. Advanced Configuration + +```csharp +using NebulaStore.Storage.Embedded; +using NebulaStore.Afs.Kafka; +using NebulaStore.Afs.Blobstore; +using Confluent.Kafka; + +// Configure Kafka +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "localhost:9092", + ClientId = "nebulastore-app", + + // Performance tuning + MaxMessageBytes = 1_000_000, // 1MB chunks + Compression = CompressionType.Snappy, + EnableIdempotence = true, + + // Timeouts + RequestTimeout = TimeSpan.FromMinutes(1), + + // Additional Kafka settings + AdditionalSettings = new Dictionary + { + ["acks"] = "all", + ["retries"] = "3", + ["max.in.flight.requests.per.connection"] = "5" + } +}; + +// Create connector +using var connector = KafkaConnector.New(kafkaConfig); + +// Create file system +using var fileSystem = BlobStoreFileSystem.New(connector); + +// Configure storage +var storageConfig = EmbeddedStorageConfiguration.New() + .SetStorageFileSystem(fileSystem) + .SetChannelCount(4) + .Build(); + +// Start storage +using var storage = EmbeddedStorage.Start(storageConfig); +``` + +--- + +## Configuration Options + +### KafkaConfiguration Properties + +| Property | Default | Description | +|----------|---------|-------------| +| `BootstrapServers` | `localhost:9092` | Kafka broker addresses | +| `ClientId` | `nebulastore-kafka` | Client identifier | +| `MaxMessageBytes` | `1000000` | Blob chunk size (1MB) | +| `RequestTimeout` | `1 minute` | Request timeout | +| `EnableIdempotence` | `true` | Prevent duplicate writes | +| `Compression` | `None` | Compression type | +| `AdditionalSettings` | `{}` | Extra Kafka settings | + +### Compression Types + +```csharp +CompressionType.None // No compression (fastest) +CompressionType.Gzip // Good compression, slower +CompressionType.Snappy // Balanced (recommended) +CompressionType.Lz4 // Fast compression +CompressionType.Zstd // Best compression +``` + +--- + +## Common Patterns + +### 1. Production Configuration + +```csharp +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "kafka1:9092,kafka2:9092,kafka3:9092", + ClientId = "nebulastore-prod", + + // Reliability + EnableIdempotence = true, + AdditionalSettings = new Dictionary + { + ["acks"] = "all", // Wait for all replicas + ["retries"] = "10", // Retry on failure + ["retry.backoff.ms"] = "100", // Backoff between retries + ["max.in.flight.requests.per.connection"] = "5" + }, + + // Performance + Compression = CompressionType.Snappy, + MaxMessageBytes = 1_000_000, + + // Timeouts + RequestTimeout = TimeSpan.FromMinutes(2) +}; +``` + +### 2. High-Throughput Configuration + +```csharp +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "localhost:9092", + + // Maximize throughput + AdditionalSettings = new Dictionary + { + ["batch.size"] = "32768", // 32KB batches + ["linger.ms"] = "10", // Wait 10ms for batching + ["compression.type"] = "lz4", // Fast compression + ["buffer.memory"] = "67108864" // 64MB buffer + } +}; +``` + +### 3. Low-Latency Configuration + +```csharp +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "localhost:9092", + + // Minimize latency + AdditionalSettings = new Dictionary + { + ["linger.ms"] = "0", // Send immediately + ["batch.size"] = "1", // No batching + ["compression.type"] = "none" // No compression + } +}; +``` + +### 4. Secure Connection (SSL/SASL) + +```csharp +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "kafka.example.com:9093", + + // SSL/SASL authentication + AdditionalSettings = new Dictionary + { + ["security.protocol"] = "SASL_SSL", + ["sasl.mechanism"] = "PLAIN", + ["sasl.username"] = "your-username", + ["sasl.password"] = "your-password", + ["ssl.ca.location"] = "/path/to/ca-cert" + } +}; +``` + +### 5. Confluent Cloud + +```csharp +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", + + AdditionalSettings = new Dictionary + { + ["security.protocol"] = "SASL_SSL", + ["sasl.mechanism"] = "PLAIN", + ["sasl.username"] = "your-api-key", + ["sasl.password"] = "your-api-secret" + } +}; +``` + +--- + +## Monitoring & Debugging + +### 1. Enable Logging + +```csharp +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "localhost:9092", + + AdditionalSettings = new Dictionary + { + ["debug"] = "broker,topic,msg" // Enable debug logging + } +}; +``` + +### 2. Check Topic Status + +```bash +# List topics +kafka-topics --bootstrap-server localhost:9092 --list + +# Describe topic +kafka-topics --bootstrap-server localhost:9092 --describe --topic storage_channel_000_file_000001 + +# Check consumer lag +kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group nebulastore-app +``` + +### 3. Monitor Metrics + +```csharp +// Access connector metrics (future feature) +var metrics = connector.GetMetrics(); +Console.WriteLine($"Messages produced: {metrics.MessagesProduced}"); +Console.WriteLine($"Messages consumed: {metrics.MessagesConsumed}"); +Console.WriteLine($"Topics created: {metrics.TopicsCreated}"); +``` + +--- + +## Troubleshooting + +### Problem: Connection Timeout + +``` +Error: Failed to connect to Kafka broker +``` + +**Solution:** +1. Check Kafka is running: `docker ps` or `telnet localhost 9092` +2. Verify bootstrap servers address +3. Check firewall/network settings +4. Increase `RequestTimeout` + +### Problem: Topic Creation Failed + +``` +Error: Failed to create topic +``` + +**Solution:** +1. Check Kafka broker has `auto.create.topics.enable=true` +2. Verify user has topic creation permissions +3. Check topic naming constraints (alphanumeric, `.`, `_`, `-` only) + +### Problem: Slow Performance + +``` +Warning: Write latency > 1 second +``` + +**Solution:** +1. Enable compression: `Compression = CompressionType.Snappy` +2. Increase batch size: `batch.size = 32768` +3. Add more Kafka brokers +4. Increase `MaxMessageBytes` for larger chunks +5. Check network latency + +### Problem: Index Corruption + +``` +Error: Blob metadata not found +``` + +**Solution:** +1. Rebuild index from data topics (future feature) +2. Check index topic exists: `___index` +3. Verify index topic has data + +--- + +## Best Practices + +### 1. Topic Management + +```csharp +// Use topic retention policies +var kafkaConfig = new KafkaConfiguration +{ + AdditionalSettings = new Dictionary + { + ["retention.ms"] = "604800000", // 7 days + ["retention.bytes"] = "1073741824" // 1GB per partition + } +}; +``` + +### 2. Error Handling + +```csharp +try +{ + storage.StoreRoot(); +} +catch (KafkaException ex) +{ + // Handle Kafka-specific errors + Console.WriteLine($"Kafka error: {ex.Error.Reason}"); + + // Retry logic + if (ex.Error.IsRetriable) + { + // Retry operation + } +} +``` + +### 3. Resource Cleanup + +```csharp +// Always dispose properly +using var connector = KafkaConnector.New(config); +using var fileSystem = BlobStoreFileSystem.New(connector); +using var storage = EmbeddedStorage.Start(storageConfig); + +// Or manual cleanup +try +{ + // Use storage +} +finally +{ + storage?.Dispose(); + fileSystem?.Dispose(); + connector?.Dispose(); +} +``` + +### 4. Testing + +```csharp +// Use Docker for integration tests +[Fact] +public async Task TestKafkaStorage() +{ + // Start Kafka container + await using var kafka = new KafkaContainer(); + await kafka.StartAsync(); + + // Configure storage + var config = new KafkaConfiguration + { + BootstrapServers = kafka.BootstrapServers + }; + + // Test storage operations + using var storage = EmbeddedStorage.StartWithKafka(config.BootstrapServers); + // ... test code +} +``` + +--- + +## Migration Guide + +### From NIO to Kafka + +```csharp +// 1. Export data from NIO +var nioStorage = EmbeddedStorage.Start("./nio-storage"); +var data = nioStorage.Root(); + +// 2. Import to Kafka +var kafkaStorage = EmbeddedStorage.StartWithKafka("localhost:9092"); +kafkaStorage.SetRoot(data); +kafkaStorage.StoreRoot(); +``` + +### From Redis to Kafka + +```csharp +// 1. Read from Redis +var redisStorage = EmbeddedStorage.StartWithRedis("localhost:6379"); +var data = redisStorage.Root(); + +// 2. Write to Kafka +var kafkaStorage = EmbeddedStorage.StartWithKafka("localhost:9092"); +kafkaStorage.SetRoot(data); +kafkaStorage.StoreRoot(); +``` + +--- + +## Performance Tuning + +### Benchmark Your Configuration + +```csharp +using System.Diagnostics; + +var sw = Stopwatch.StartNew(); + +// Write test +for (int i = 0; i < 10000; i++) +{ + storage.Store(new MyData { Id = i }); +} +storage.Commit(); + +sw.Stop(); +Console.WriteLine($"Write: {10000.0 / sw.Elapsed.TotalSeconds:F0} ops/sec"); + +// Read test +sw.Restart(); +for (int i = 0; i < 10000; i++) +{ + var data = storage.Root(); +} +sw.Stop(); +Console.WriteLine($"Read: {10000.0 / sw.Elapsed.TotalSeconds:F0} ops/sec"); +``` + +### Expected Performance + +| Operation | Throughput | Latency | +|-----------|------------|---------| +| Write | 10K-100K ops/sec | 5-20ms | +| Read | 5K-50K ops/sec | 10-50ms | +| Commit | 1K-10K ops/sec | 50-200ms | + +--- + +## Additional Resources + +- **Kafka Documentation:** https://kafka.apache.org/documentation/ +- **Confluent.Kafka Docs:** https://docs.confluent.io/kafka-clients/dotnet/ +- **NebulaStore AFS Guide:** /docs/KafkaAfsFeasibility.md +- **Comparison Guide:** /docs/KafkaAfsComparison.md + +--- + +## Support + +- **GitHub Issues:** https://github.com/hadv/NebulaStore/issues +- **Discussions:** https://github.com/hadv/NebulaStore/discussions +- **Kafka Community:** https://kafka.apache.org/community + +--- + +**Note:** This is a planned feature. Check the project roadmap for implementation status. + diff --git a/docs/KafkaAfsSummary.md b/docs/KafkaAfsSummary.md new file mode 100644 index 0000000..4dd2f59 --- /dev/null +++ b/docs/KafkaAfsSummary.md @@ -0,0 +1,357 @@ +# Kafka AFS Adapter - Executive Summary + +**Investigation Date:** 2025-10-05 +**Status:** ✅ **FEASIBLE - RECOMMENDED FOR IMPLEMENTATION** + +--- + +## Quick Answer + +**Can we port Eclipse Store's Kafka AFS adapter to NebulaStore using Confluent.Kafka?** + +**YES.** The Confluent.Kafka .NET library provides complete API parity with the Java kafka-clients library used by Eclipse Store. All required functionality (Producer, Consumer, AdminClient) is available and well-supported. + +--- + +## Key Findings + +### ✅ Technical Feasibility: **HIGH** + +1. **API Compatibility:** 100% - All Java Kafka APIs have .NET equivalents +2. **Library Maturity:** Confluent.Kafka is production-ready, actively maintained +3. **Architecture Alignment:** Fits perfectly with existing NebulaStore AFS patterns +4. **License Compatibility:** Apache 2.0 (compatible with EPL 2.0) + +### 📊 Implementation Complexity: **MEDIUM-HIGH** + +- **Estimated Development Time:** 2-3 weeks for MVP +- **Lines of Code:** ~2,000-3,000 (based on Eclipse Store implementation) +- **Key Components:** 6 main classes + tests + documentation +- **Dependencies:** Single NuGet package (Confluent.Kafka) + +### 💰 Cost-Benefit Analysis + +**Benefits:** +- Native event streaming integration +- Complete audit trail (immutable log) +- Time travel capabilities (offset-based) +- Multi-datacenter replication +- High throughput (100K+ messages/sec) +- Horizontal scalability + +**Costs:** +- Complex infrastructure (Kafka cluster required) +- Higher operational overhead +- Topic proliferation (one per file + index) +- Requires Kafka expertise +- Higher resource consumption vs. simpler backends + +--- + +## Architecture Overview + +### How It Works + +``` +File Write → Split into 1MB chunks → Kafka Topics → Index Topics + ↓ + Blob Metadata + (partition, offset, range) +``` + +**Key Design Elements:** + +1. **One Kafka topic per file** (e.g., `storage_channel_000_file_000001`) +2. **One index topic per file** (e.g., `__storage_channel_000_file_000001_index`) +3. **Files split into 1MB blobs** (configurable) +4. **Index stores blob metadata** (28 bytes per blob) +5. **File system index** tracks all files + +### Data Flow + +**Write Operation:** +``` +Application → KafkaConnector → IProducer → Kafka Topic + ↓ + KafkaTopicIndex → Index Topic +``` + +**Read Operation:** +``` +Application → KafkaConnector → KafkaTopicIndex (get blobs) + ↓ + IConsumer → Seek to offset → Read blob +``` + +--- + +## API Mapping: Java → .NET + +| Eclipse Store (Java) | NebulaStore (.NET) | Status | +|---------------------|-------------------|--------| +| `KafkaProducer` | `IProducer` | ✅ | +| `KafkaConsumer` | `IConsumer` | ✅ | +| `AdminClient` | `IAdminClient` | ✅ | +| `ProducerRecord` | `Message` | ✅ | +| `ConsumerRecord` | `ConsumeResult` | ✅ | +| `RecordMetadata` | `DeliveryResult` | ✅ | +| `Properties` | `ProducerConfig/ConsumerConfig` | ✅ | + +**Conclusion:** 100% API coverage + +--- + +## Implementation Roadmap + +### Phase 1: Core Implementation (Week 1) +- [ ] Project structure and configuration +- [ ] `KafkaBlob` record and serialization +- [ ] `KafkaConnector` basic operations (write/read/delete) +- [ ] `KafkaPathValidator` for topic naming + +### Phase 2: Index Management (Week 2) +- [ ] `KafkaTopicIndex` implementation +- [ ] `KafkaFileSystemIndex` implementation +- [ ] Index synchronization logic +- [ ] Directory operations + +### Phase 3: Testing & Polish (Week 3) +- [ ] Unit tests (90%+ coverage) +- [ ] Integration tests with Docker Kafka +- [ ] Performance benchmarks +- [ ] Documentation and examples +- [ ] Error handling and edge cases + +--- + +## When to Use Kafka AFS + +### ✅ **Good Fit:** + +1. **Event-Driven Architectures** + - You already use Kafka for messaging + - Storage operations need to be observable as events + - CQRS/Event Sourcing patterns + +2. **Audit & Compliance** + - Complete audit trail required + - Immutable storage log needed + - Regulatory compliance (SOX, GDPR, HIPAA) + +3. **Time Travel Requirements** + - Need to query historical state + - Point-in-time recovery + - Debugging production issues + +4. **Multi-Datacenter Deployments** + - Active-active replication + - Disaster recovery + - Geographic distribution + +5. **Streaming Integration** + - Real-time analytics on storage events + - Change data capture (CDC) + - Event-driven microservices + +### ❌ **Poor Fit:** + +1. **Simple Applications** + - Local development + - Single-server deployments + - Minimal infrastructure + +2. **Cost-Sensitive Scenarios** + - Kafka infrastructure overhead + - Operational costs + - Small-scale applications + +3. **No Kafka Expertise** + - Team lacks Kafka knowledge + - No operational support + - Limited resources + +4. **Small Files** + - Many small files (topic proliferation) + - Low throughput requirements + - Simple CRUD operations + +--- + +## Comparison with Other AFS Adapters + +| Criteria | NIO | Redis | S3 | Kafka | +|----------|-----|-------|-----|-------| +| Setup | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ | +| Performance | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | +| Scalability | ⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | +| Event Streaming | ❌ | ❌ | ❌ | ⭐⭐⭐⭐⭐ | +| Audit Trail | ❌ | ❌ | ⭐⭐ | ⭐⭐⭐⭐⭐ | +| Operational Cost | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ | + +**Recommendation:** Use Kafka when you need event streaming, audit trails, or already have Kafka infrastructure. Otherwise, use simpler backends (NIO for dev, Redis/S3 for production). + +--- + +## Risks & Mitigations + +| Risk | Impact | Mitigation | +|------|--------|------------| +| Kafka cluster unavailability | High | Retry logic, circuit breakers, fallback storage | +| Index corruption | High | Index rebuild from data topics, checksums | +| Topic proliferation | Medium | Topic cleanup policies, monitoring | +| Performance degradation | Medium | Benchmark early, optimize blob size, compression | +| Operational complexity | Medium | Documentation, Docker Compose for dev, managed Kafka | + +--- + +## Dependencies + +### Required NuGet Package +```xml + +``` + +### External Infrastructure +- **Apache Kafka 0.8+** (or compatible) + - Self-hosted Kafka cluster + - Confluent Cloud + - AWS MSK (Managed Streaming for Kafka) + - Azure Event Hubs (Kafka-compatible) + +--- + +## Performance Expectations + +### Throughput +- **Write:** 100,000+ messages/sec (hardware dependent) +- **Read:** 50,000+ messages/sec (with index caching) +- **Blob Size:** 1MB chunks (configurable) + +### Latency +- **Write:** 5-20ms (async batching) +- **Read:** 10-50ms (direct offset seeks) + +### Scalability +- **Horizontal:** Add more Kafka brokers +- **Vertical:** Increase broker resources +- **Partitions:** Parallel processing + +--- + +## Example Usage + +```csharp +using NebulaStore.Storage.Embedded; +using NebulaStore.Afs.Kafka; + +// Configure Kafka +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "localhost:9092", + ClientId = "nebulastore-app", + EnableIdempotence = true, + Compression = CompressionType.Snappy +}; + +// Create connector +using var connector = KafkaConnector.New(kafkaConfig); + +// Create file system +using var fileSystem = BlobStoreFileSystem.New(connector); + +// Use with EmbeddedStorage +var storageConfig = EmbeddedStorageConfiguration.New() + .SetStorageFileSystem(fileSystem) + .Build(); + +using var storage = EmbeddedStorage.Start(storageConfig); + +// Use normally +var root = storage.Root(); +root.Value = "Hello, Kafka!"; +storage.StoreRoot(); +``` + +--- + +## Recommendations + +### 1. **Proceed with Implementation** ✅ + +The Kafka AFS adapter is technically feasible and provides unique capabilities not available in other backends. It's a valuable addition to NebulaStore's AFS ecosystem. + +### 2. **Target Audience** + +- Teams already using Kafka +- Event-driven architectures +- Compliance-heavy industries +- Multi-datacenter deployments + +### 3. **Development Approach** + +1. Start with MVP (core read/write/delete) +2. Use Docker Compose for development Kafka +3. Benchmark against other AFS adapters +4. Document Kafka setup thoroughly +5. Provide example configurations + +### 4. **Documentation Priorities** + +- Kafka cluster setup guide +- Topic management best practices +- Performance tuning guide +- Troubleshooting common issues +- Migration from other backends + +### 5. **Future Enhancements** + +- Kafka Streams integration +- Schema Registry support +- Kafka Connect integration +- Metrics and monitoring +- Backup/restore utilities + +--- + +## Next Steps + +1. **Get Stakeholder Approval** - Confirm interest in Kafka adapter +2. **Setup Dev Environment** - Docker Compose with Kafka +3. **Create Feature Branch** - `feature/kafka-afs-adapter` +4. **Implement MVP** - Follow 3-week roadmap +5. **Create PR** - Peer review before merging +6. **Documentation** - Comprehensive setup guide +7. **Benchmarks** - Compare with other adapters + +--- + +## References + +### Eclipse Store +- **Kafka Connector:** https://github.com/eclipse-store/store/blob/main/afs/kafka/src/main/java/org/eclipse/store/afs/kafka/types/KafkaConnector.java +- **Blob:** https://github.com/eclipse-store/store/blob/main/afs/kafka/src/main/java/org/eclipse/store/afs/kafka/types/Blob.java +- **Topic Index:** https://github.com/eclipse-store/store/blob/main/afs/kafka/src/main/java/org/eclipse/store/afs/kafka/types/TopicIndex.java + +### Confluent.Kafka +- **Documentation:** https://docs.confluent.io/kafka-clients/dotnet/current/overview.html +- **GitHub:** https://github.com/confluentinc/confluent-kafka-dotnet +- **NuGet:** https://www.nuget.org/packages/Confluent.Kafka + +### NebulaStore +- **AFS Documentation:** /docs/KafkaAfsFeasibility.md +- **Comparison Guide:** /docs/KafkaAfsComparison.md + +--- + +## Conclusion + +**The Kafka AFS adapter is feasible, valuable, and recommended for implementation.** It fills a unique niche in NebulaStore's AFS ecosystem by enabling event-driven storage with audit trails and time travel capabilities. + +**Proceed with implementation** following the 3-week MVP roadmap, targeting teams with existing Kafka infrastructure and event-driven architecture requirements. + +--- + +**Document Status:** Final +**Approval Required:** Yes +**Next Review Date:** After MVP completion + From 59448e78c027ed65433f92a6fdc1452c1d6a4e2e Mon Sep 17 00:00:00 2001 From: prpeh Date: Sun, 5 Oct 2025 15:38:32 +0700 Subject: [PATCH 2/2] feat: Implement Kafka AFS adapter with core functionality - Add KafkaBlob record for blob metadata (28-byte serialization) - Add KafkaConfiguration with production/development presets - Add KafkaPathValidator for topic naming sanitization - Add KafkaTopicIndex for per-topic blob metadata management - Add KafkaConnector implementing IBlobStoreConnector - Implement read/write operations with blob chunking (1MB default) - Implement directory operations and file management - Add comprehensive unit tests (75 tests, 100% pass rate) - Add README with usage examples and configuration guide - Add example code demonstrating various use cases Architecture: - Files stored as Kafka topics with configurable chunk size - Blob metadata stored in separate index topics - Supports partial reads, truncation, and file operations - Uses Confluent.Kafka 2.6.1 for .NET Kafka client Testing: - Unit tests cover all core components without requiring Kafka - Tests validate serialization, configuration, and path validation - Integration tests can be added later when Kafka is available Note: This is Phase 1 (Core Implementation) of the 3-week MVP roadmap. Next phases will add advanced index management and integration tests. --- NebulaStore.sln | 36 ++ afs/kafka/NebulaStore.Afs.Kafka.csproj | 39 ++ afs/kafka/README.md | 319 +++++++++++ afs/kafka/examples/KafkaExample.cs | 293 ++++++++++ afs/kafka/src/KafkaBlob.cs | 208 +++++++ afs/kafka/src/KafkaConfiguration.cs | 231 ++++++++ afs/kafka/src/KafkaConnector.cs | 528 ++++++++++++++++++ afs/kafka/src/KafkaPathValidator.cs | 102 ++++ afs/kafka/src/KafkaTopicIndex.cs | 283 ++++++++++ afs/kafka/tests/KafkaBlobTests.cs | 261 +++++++++ afs/kafka/tests/KafkaConfigurationTests.cs | 315 +++++++++++ afs/kafka/tests/KafkaPathValidatorTests.cs | 295 ++++++++++ .../tests/NebulaStore.Afs.Kafka.Tests.csproj | 29 + 13 files changed, 2939 insertions(+) create mode 100644 afs/kafka/NebulaStore.Afs.Kafka.csproj create mode 100644 afs/kafka/README.md create mode 100644 afs/kafka/examples/KafkaExample.cs create mode 100644 afs/kafka/src/KafkaBlob.cs create mode 100644 afs/kafka/src/KafkaConfiguration.cs create mode 100644 afs/kafka/src/KafkaConnector.cs create mode 100644 afs/kafka/src/KafkaPathValidator.cs create mode 100644 afs/kafka/src/KafkaTopicIndex.cs create mode 100644 afs/kafka/tests/KafkaBlobTests.cs create mode 100644 afs/kafka/tests/KafkaConfigurationTests.cs create mode 100644 afs/kafka/tests/KafkaPathValidatorTests.cs create mode 100644 afs/kafka/tests/NebulaStore.Afs.Kafka.Tests.csproj diff --git a/NebulaStore.sln b/NebulaStore.sln index d846551..b73f99a 100644 --- a/NebulaStore.sln +++ b/NebulaStore.sln @@ -49,6 +49,14 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{1704E723-9 EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NebulaStore.Afs.Nio.Tests", "afs\nio\test\NebulaStore.Afs.Nio.Tests.csproj", "{DA9E1E64-1245-4E0D-9C15-80980DAF1623}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kafka", "kafka", "{1BD88259-B7C9-540F-5B25-1E0ED5959ADD}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NebulaStore.Afs.Kafka", "afs\kafka\NebulaStore.Afs.Kafka.csproj", "{4FA9334E-6BC5-4344-9198-3E35A59960CE}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{E513C5C4-19AC-DE96-9AA2-AD595D283603}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NebulaStore.Afs.Kafka.Tests", "afs\kafka\tests\NebulaStore.Afs.Kafka.Tests.csproj", "{7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -203,6 +211,30 @@ Global {DA9E1E64-1245-4E0D-9C15-80980DAF1623}.Release|x64.Build.0 = Release|Any CPU {DA9E1E64-1245-4E0D-9C15-80980DAF1623}.Release|x86.ActiveCfg = Release|Any CPU {DA9E1E64-1245-4E0D-9C15-80980DAF1623}.Release|x86.Build.0 = Release|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Debug|x64.ActiveCfg = Debug|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Debug|x64.Build.0 = Debug|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Debug|x86.ActiveCfg = Debug|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Debug|x86.Build.0 = Debug|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Release|Any CPU.Build.0 = Release|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Release|x64.ActiveCfg = Release|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Release|x64.Build.0 = Release|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Release|x86.ActiveCfg = Release|Any CPU + {4FA9334E-6BC5-4344-9198-3E35A59960CE}.Release|x86.Build.0 = Release|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Debug|x64.ActiveCfg = Debug|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Debug|x64.Build.0 = Debug|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Debug|x86.ActiveCfg = Debug|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Debug|x86.Build.0 = Debug|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Release|Any CPU.Build.0 = Release|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Release|x64.ActiveCfg = Release|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Release|x64.Build.0 = Release|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Release|x86.ActiveCfg = Release|Any CPU + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -214,5 +246,9 @@ Global {1EDD44A6-7440-4D73-907E-5B89EF199012} = {01885DAA-D6B3-261D-26CC-40273AA28AE8} {1704E723-975B-8E7A-0A72-66948CF308B6} = {01885DAA-D6B3-261D-26CC-40273AA28AE8} {DA9E1E64-1245-4E0D-9C15-80980DAF1623} = {1704E723-975B-8E7A-0A72-66948CF308B6} + {1BD88259-B7C9-540F-5B25-1E0ED5959ADD} = {1213A069-D226-59B6-6383-4AD31C1DF055} + {4FA9334E-6BC5-4344-9198-3E35A59960CE} = {1BD88259-B7C9-540F-5B25-1E0ED5959ADD} + {E513C5C4-19AC-DE96-9AA2-AD595D283603} = {1BD88259-B7C9-540F-5B25-1E0ED5959ADD} + {7ADFE9C7-790E-4D55-BBD9-9D3B747C8ACA} = {E513C5C4-19AC-DE96-9AA2-AD595D283603} EndGlobalSection EndGlobal diff --git a/afs/kafka/NebulaStore.Afs.Kafka.csproj b/afs/kafka/NebulaStore.Afs.Kafka.csproj new file mode 100644 index 0000000..2860619 --- /dev/null +++ b/afs/kafka/NebulaStore.Afs.Kafka.csproj @@ -0,0 +1,39 @@ + + + + net9.0 + enable + enable + latest + NebulaStore.Afs.Kafka + + + $(DefaultItemExcludes);tests/**;examples/** + + + NebulaStore.Afs.Kafka + 1.0.0 + NebulaStore Contributors + NebulaStore + NebulaStore AFS Kafka Adapter + Apache Kafka adapter for NebulaStore Abstract File System (AFS). Provides event-driven storage with audit trails and time-travel capabilities. + nebulastore;kafka;afs;storage;event-sourcing;audit-trail + EPL-2.0 + https://github.com/hadv/NebulaStore + + + + + + + + + + + + + + + + + diff --git a/afs/kafka/README.md b/afs/kafka/README.md new file mode 100644 index 0000000..6b4c879 --- /dev/null +++ b/afs/kafka/README.md @@ -0,0 +1,319 @@ +# NebulaStore AFS Kafka Adapter + +Apache Kafka adapter for NebulaStore Abstract File System (AFS). Provides event-driven storage with built-in audit trails, time-travel capabilities, and multi-datacenter replication. + +## Overview + +The Kafka AFS adapter stores files as Kafka topics, enabling: +- **Event Streaming**: Storage operations become observable Kafka events +- **Audit Trail**: Complete immutable log of all changes +- **Time Travel**: Read data as it existed at any point in time +- **Multi-Datacenter Replication**: Built-in with Kafka MirrorMaker +- **High Throughput**: 100K+ messages/second + +## Architecture + +### Storage Model + +``` +File → Kafka Topic (data) + Index Topic (metadata) + ├── Data Topic: Contains file chunks (blobs) as Kafka messages + └── Index Topic: Contains blob metadata (partition, offset, range) +``` + +**Key Concepts:** +- Each file is stored as a Kafka topic +- Files are split into 1MB chunks (configurable) +- Each chunk is a Kafka message +- Blob metadata is stored in a separate index topic +- File system index tracks all files + +### Topic Naming + +File paths are converted to valid Kafka topic names: +- Path separators (`/`) → underscores (`_`) +- Invalid characters → underscores +- Example: `storage/channel_000/file_001` → `storage_channel_000_file_001` + +## Installation + +### NuGet Package + +```bash +dotnet add package NebulaStore.Afs.Kafka +``` + +### Prerequisites + +You need a running Kafka cluster: +- Apache Kafka 0.8+ +- Confluent Platform +- Confluent Cloud +- AWS MSK +- Azure Event Hubs (Kafka-compatible) + +## Usage + +### Basic Usage + +```csharp +using NebulaStore.Storage.Embedded; +using NebulaStore.Afs.Kafka; + +// Configure Kafka +var kafkaConfig = KafkaConfiguration.New("localhost:9092"); + +// Create connector +using var connector = KafkaConnector.New(kafkaConfig); + +// Create file system +using var fileSystem = BlobStoreFileSystem.New(connector); + +// Use with EmbeddedStorage +var storageConfig = EmbeddedStorageConfiguration.New() + .SetStorageFileSystem(fileSystem) + .Build(); + +using var storage = EmbeddedStorage.Start(storageConfig); +``` + +### Production Configuration + +```csharp +var kafkaConfig = KafkaConfiguration.Production( + bootstrapServers: "kafka1:9092,kafka2:9092,kafka3:9092", + clientId: "nebulastore-prod" +); + +kafkaConfig.Compression = CompressionType.Snappy; +kafkaConfig.MaxMessageBytes = 1_000_000; // 1MB chunks + +using var connector = KafkaConnector.New(kafkaConfig); +``` + +### Development Configuration + +```csharp +var kafkaConfig = KafkaConfiguration.Development("localhost:9092"); + +using var connector = KafkaConnector.New(kafkaConfig); +``` + +### Custom Configuration + +```csharp +var kafkaConfig = new KafkaConfiguration +{ + BootstrapServers = "localhost:9092", + ClientId = "my-app", + EnableIdempotence = true, + Compression = CompressionType.Snappy, + MaxMessageBytes = 2_000_000, // 2MB chunks + RequestTimeout = TimeSpan.FromMinutes(2), + + AdditionalSettings = new Dictionary + { + ["acks"] = "all", + ["retries"] = "10", + ["max.in.flight.requests.per.connection"] = "5" + } +}; + +using var connector = KafkaConnector.New(kafkaConfig); +``` + +## Configuration Options + +| Property | Default | Description | +|----------|---------|-------------| +| `BootstrapServers` | `localhost:9092` | Kafka broker addresses | +| `ClientId` | `nebulastore-kafka` | Client identifier | +| `MaxMessageBytes` | `1000000` | Blob chunk size (1MB) | +| `RequestTimeout` | `1 minute` | Request timeout | +| `EnableIdempotence` | `true` | Prevent duplicate writes | +| `Compression` | `None` | Compression type | +| `UseCache` | `true` | Enable metadata caching | +| `ConsumerGroupId` | `nebulastore-kafka-consumer` | Consumer group ID | +| `AdditionalSettings` | `{}` | Extra Kafka settings | + +## Features + +### Event Streaming + +Storage operations produce Kafka events that can be consumed by other systems: + +```csharp +// Storage writes produce events +storage.Store(customer); // → Kafka event + +// Other systems can consume these events +var consumer = new KafkaConsumer(config); +consumer.Subscribe("customer-data-topic"); +// Process storage events in real-time +``` + +### Audit Trail + +Every write is immutable and logged: + +```csharp +// All changes are tracked in Kafka +storage.Store(data); // Logged +storage.Store(data); // Logged again + +// Query audit trail via Kafka consumer +``` + +### Time Travel + +Read data as it existed at any point in time (via Kafka offsets): + +```csharp +// Read current state +var currentData = storage.Root(); + +// Read historical state (requires custom implementation) +// var historicalData = ReadAtOffset(topic, offsetOneHourAgo); +``` + +## Performance + +### Expected Throughput + +- **Write:** 10K-100K ops/sec +- **Read:** 5K-50K ops/sec +- **Latency:** 5-50ms + +### Optimization Tips + +1. **Enable Compression:** + ```csharp + config.Compression = CompressionType.Snappy; + ``` + +2. **Tune Chunk Size:** + ```csharp + config.MaxMessageBytes = 2_000_000; // 2MB for larger files + ``` + +3. **Batch Writes:** + ```csharp + // Kafka automatically batches, but you can tune: + config.AdditionalSettings["batch.size"] = "32768"; + config.AdditionalSettings["linger.ms"] = "10"; + ``` + +4. **Use Caching:** + ```csharp + config.UseCache = true; // Default + ``` + +## Limitations + +### Current Implementation + +- **No Partial Blob Deletion**: Deleting individual blobs requires rewriting remaining data +- **Synchronous Index Loading**: Index is loaded synchronously on first access +- **Simple File System Index**: In-memory only, not persisted to Kafka + +### Future Enhancements + +- Async index loading +- Persistent file system index in Kafka +- Kafka Streams integration +- Schema Registry support +- Metrics and monitoring + +## When to Use + +### ✅ Good Fit + +- Event-driven architectures +- Audit trail requirements +- Time travel capabilities needed +- Multi-datacenter deployments +- Already using Kafka infrastructure +- Streaming analytics integration + +### ❌ Poor Fit + +- Simple local storage needs (use NIO) +- Cost-sensitive scenarios +- Small-scale applications +- No Kafka expertise + +## Comparison with Other AFS Adapters + +| Feature | NIO | Redis | S3 | **Kafka** | +|---------|-----|-------|-----|-----------| +| Setup | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ | +| Performance | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | +| Scalability | ⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | +| Event Streaming | ❌ | ❌ | ❌ | ⭐⭐⭐⭐⭐ | +| Audit Trail | ❌ | ❌ | ⭐⭐ | ⭐⭐⭐⭐⭐ | + +## Troubleshooting + +### Connection Issues + +``` +Error: Failed to connect to Kafka broker +``` + +**Solution:** +1. Check Kafka is running +2. Verify bootstrap servers address +3. Check firewall/network settings +4. Increase `RequestTimeout` + +### Topic Creation Failed + +``` +Error: Failed to create topic +``` + +**Solution:** +1. Check `auto.create.topics.enable=true` on broker +2. Verify user has topic creation permissions +3. Check topic naming constraints + +### Slow Performance + +``` +Warning: Write latency > 1 second +``` + +**Solution:** +1. Enable compression +2. Increase batch size +3. Add more Kafka brokers +4. Increase `MaxMessageBytes` + +## Examples + +See the `examples/` directory for complete examples: +- Basic usage +- Production configuration +- Event streaming integration +- Custom serialization + +## License + +Eclipse Public License 2.0 (EPL-2.0) + +## Dependencies + +- **Confluent.Kafka** (2.6.1) - Apache License 2.0 +- **MessagePack** (2.5.172) - MIT License + +## References + +- [Confluent.Kafka Documentation](https://docs.confluent.io/kafka-clients/dotnet/) +- [Apache Kafka Documentation](https://kafka.apache.org/documentation/) +- [NebulaStore AFS Guide](../../docs/KafkaAfsFeasibility.md) + +## Support + +- GitHub Issues: https://github.com/hadv/NebulaStore/issues +- Discussions: https://github.com/hadv/NebulaStore/discussions + diff --git a/afs/kafka/examples/KafkaExample.cs b/afs/kafka/examples/KafkaExample.cs new file mode 100644 index 0000000..8b004f8 --- /dev/null +++ b/afs/kafka/examples/KafkaExample.cs @@ -0,0 +1,293 @@ +using System; +using NebulaStore.Afs.Kafka; +using NebulaStore.Afs.Blobstore; +using Confluent.Kafka; + +namespace NebulaStore.Afs.Kafka.Examples; + +/// +/// Examples demonstrating Kafka AFS adapter usage. +/// +public class KafkaExample +{ + /// + /// Basic usage example. + /// + public static void BasicUsage() + { + Console.WriteLine("=== Basic Kafka AFS Usage ===\n"); + + // Configure Kafka + var config = KafkaConfiguration.New("localhost:9092"); + + // Create connector + using var connector = KafkaConnector.New(config); + + // Create file system + using var fileSystem = BlobStoreFileSystem.New(connector); + + // Create a path + var path = BlobStorePath.New("my-container", "data", "example.txt"); + + // Write data + var data = System.Text.Encoding.UTF8.GetBytes("Hello, Kafka AFS!"); + var bytesWritten = fileSystem.IoHandler.WriteData(path, new[] { data }); + Console.WriteLine($"Wrote {bytesWritten} bytes to {path.FullQualifiedName}"); + + // Read data + var readData = fileSystem.IoHandler.ReadData(path, 0, -1); + var content = System.Text.Encoding.UTF8.GetString(readData); + Console.WriteLine($"Read: {content}"); + + // Get file size + var size = fileSystem.IoHandler.GetFileSize(path); + Console.WriteLine($"File size: {size} bytes"); + + // Delete file + fileSystem.IoHandler.DeleteFile(path); + Console.WriteLine($"Deleted {path.FullQualifiedName}"); + } + + /// + /// Production configuration example. + /// + public static void ProductionConfiguration() + { + Console.WriteLine("\n=== Production Configuration ===\n"); + + var config = KafkaConfiguration.Production( + bootstrapServers: "kafka1:9092,kafka2:9092,kafka3:9092", + clientId: "nebulastore-prod" + ); + + // Customize settings + config.Compression = CompressionType.Snappy; + config.MaxMessageBytes = 2_000_000; // 2MB chunks + config.RequestTimeout = TimeSpan.FromMinutes(2); + + Console.WriteLine($"Bootstrap Servers: {config.BootstrapServers}"); + Console.WriteLine($"Client ID: {config.ClientId}"); + Console.WriteLine($"Compression: {config.Compression}"); + Console.WriteLine($"Max Message Bytes: {config.MaxMessageBytes}"); + Console.WriteLine($"Idempotence: {config.EnableIdempotence}"); + } + + /// + /// Development configuration example. + /// + public static void DevelopmentConfiguration() + { + Console.WriteLine("\n=== Development Configuration ===\n"); + + var config = KafkaConfiguration.Development(); + + Console.WriteLine($"Bootstrap Servers: {config.BootstrapServers}"); + Console.WriteLine($"Client ID: {config.ClientId}"); + Console.WriteLine($"Compression: {config.Compression}"); + Console.WriteLine($"Cache Enabled: {config.UseCache}"); + } + + /// + /// Custom configuration example. + /// + public static void CustomConfiguration() + { + Console.WriteLine("\n=== Custom Configuration ===\n"); + + var config = new KafkaConfiguration + { + BootstrapServers = "localhost:9092", + ClientId = "my-custom-app", + EnableIdempotence = true, + Compression = CompressionType.Lz4, + MaxMessageBytes = 1_500_000, + + AdditionalSettings = new System.Collections.Generic.Dictionary + { + ["acks"] = "all", + ["retries"] = "5", + ["batch.size"] = "32768", + ["linger.ms"] = "10" + } + }; + + // Validate configuration + config.Validate(); + + Console.WriteLine("Configuration validated successfully"); + Console.WriteLine($"Additional settings: {config.AdditionalSettings.Count}"); + } + + /// + /// Large file handling example. + /// + public static void LargeFileHandling() + { + Console.WriteLine("\n=== Large File Handling ===\n"); + + var config = KafkaConfiguration.New("localhost:9092"); + config.MaxMessageBytes = 1_000_000; // 1MB chunks + + using var connector = KafkaConnector.New(config); + using var fileSystem = BlobStoreFileSystem.New(connector); + + var path = BlobStorePath.New("my-container", "large-file.dat"); + + // Create 5MB of data + var largeData = new byte[5_000_000]; + new Random().NextBytes(largeData); + + Console.WriteLine($"Writing {largeData.Length:N0} bytes..."); + var bytesWritten = fileSystem.IoHandler.WriteData(path, new[] { largeData }); + Console.WriteLine($"Wrote {bytesWritten:N0} bytes"); + + // Read back + Console.WriteLine("Reading data back..."); + var readData = fileSystem.IoHandler.ReadData(path, 0, -1); + Console.WriteLine($"Read {readData.Length:N0} bytes"); + + // Verify + var match = true; + for (int i = 0; i < largeData.Length; i++) + { + if (largeData[i] != readData[i]) + { + match = false; + break; + } + } + + Console.WriteLine($"Data integrity: {(match ? "PASS" : "FAIL")}"); + + // Cleanup + fileSystem.IoHandler.DeleteFile(path); + } + + /// + /// Partial read example. + /// + public static void PartialRead() + { + Console.WriteLine("\n=== Partial Read ===\n"); + + var config = KafkaConfiguration.New("localhost:9092"); + using var connector = KafkaConnector.New(config); + using var fileSystem = BlobStoreFileSystem.New(connector); + + var path = BlobStorePath.New("my-container", "partial-read.txt"); + + // Write data + var data = System.Text.Encoding.UTF8.GetBytes("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"); + fileSystem.IoHandler.WriteData(path, new[] { data }); + + // Read first 10 bytes + var first10 = fileSystem.IoHandler.ReadData(path, 0, 10); + Console.WriteLine($"First 10 bytes: {System.Text.Encoding.UTF8.GetString(first10)}"); + + // Read middle 10 bytes + var middle10 = fileSystem.IoHandler.ReadData(path, 10, 10); + Console.WriteLine($"Middle 10 bytes: {System.Text.Encoding.UTF8.GetString(middle10)}"); + + // Read last 10 bytes + var last10 = fileSystem.IoHandler.ReadData(path, 26, 10); + Console.WriteLine($"Last 10 bytes: {System.Text.Encoding.UTF8.GetString(last10)}"); + + // Cleanup + fileSystem.IoHandler.DeleteFile(path); + } + + /// + /// Directory operations example. + /// + public static void DirectoryOperations() + { + Console.WriteLine("\n=== Directory Operations ===\n"); + + var config = KafkaConfiguration.New("localhost:9092"); + using var connector = KafkaConnector.New(config); + using var fileSystem = BlobStoreFileSystem.New(connector); + + // Create files in a directory structure + var files = new[] + { + BlobStorePath.New("my-container", "dir1", "file1.txt"), + BlobStorePath.New("my-container", "dir1", "file2.txt"), + BlobStorePath.New("my-container", "dir1", "subdir", "file3.txt"), + BlobStorePath.New("my-container", "dir2", "file4.txt") + }; + + foreach (var file in files) + { + var data = System.Text.Encoding.UTF8.GetBytes($"Content of {file.FullQualifiedName}"); + fileSystem.IoHandler.WriteData(file, new[] { data }); + Console.WriteLine($"Created: {file.FullQualifiedName}"); + } + + // List directory contents + var dir1 = BlobStorePath.New("my-container", "dir1"); + Console.WriteLine($"\nContents of {dir1.FullQualifiedName}:"); + + var visitor = new ConsolePathVisitor(); + fileSystem.IoHandler.VisitChildren(dir1, visitor); + + // Check if directory is empty + var emptyDir = BlobStorePath.New("my-container", "empty-dir"); + var isEmpty = fileSystem.IoHandler.IsEmpty(emptyDir); + Console.WriteLine($"\n{emptyDir.FullQualifiedName} is empty: {isEmpty}"); + + // Cleanup + foreach (var file in files) + { + fileSystem.IoHandler.DeleteFile(file); + } + } + + /// + /// Simple path visitor for console output. + /// + private class ConsolePathVisitor : IBlobStorePathVisitor + { + public void VisitDirectory(BlobStorePath parent, string directoryName) + { + Console.WriteLine($" [DIR] {directoryName}"); + } + + public void VisitFile(BlobStorePath parent, string fileName) + { + Console.WriteLine($" [FILE] {fileName}"); + } + } + + /// + /// Run all examples. + /// + public static void Main(string[] args) + { + Console.WriteLine("Kafka AFS Examples"); + Console.WriteLine("==================\n"); + + try + { + // Note: These examples require a running Kafka instance + Console.WriteLine("Note: These examples require Kafka running at localhost:9092\n"); + + BasicUsage(); + ProductionConfiguration(); + DevelopmentConfiguration(); + CustomConfiguration(); + + // Uncomment to run examples that require Kafka: + // LargeFileHandling(); + // PartialRead(); + // DirectoryOperations(); + + Console.WriteLine("\n=== All Examples Completed ==="); + } + catch (Exception ex) + { + Console.WriteLine($"\nError: {ex.Message}"); + Console.WriteLine("\nMake sure Kafka is running at localhost:9092"); + } + } +} + diff --git a/afs/kafka/src/KafkaBlob.cs b/afs/kafka/src/KafkaBlob.cs new file mode 100644 index 0000000..95d39f6 --- /dev/null +++ b/afs/kafka/src/KafkaBlob.cs @@ -0,0 +1,208 @@ +using System; + +namespace NebulaStore.Afs.Kafka; + +/// +/// Represents metadata for a blob stored in Kafka. +/// Each blob corresponds to a Kafka message containing a chunk of file data. +/// +/// +/// Blobs are serialized to 28 bytes for storage in the index topic: +/// - partition (4 bytes) +/// - offset (8 bytes) +/// - start position (8 bytes) +/// - end position (8 bytes) +/// +public record KafkaBlob +{ + /// + /// Gets the Kafka topic name where this blob is stored. + /// + public required string Topic { get; init; } + + /// + /// Gets the Kafka partition number. + /// + public int Partition { get; init; } + + /// + /// Gets the Kafka offset within the partition. + /// + public long Offset { get; init; } + + /// + /// Gets the logical start position of this blob within the file. + /// + public long Start { get; init; } + + /// + /// Gets the logical end position of this blob within the file (inclusive). + /// + public long End { get; init; } + + /// + /// Gets the size of this blob in bytes. + /// + public long Size => End - Start + 1; + + /// + /// Creates a new KafkaBlob instance. + /// + /// The Kafka topic name + /// The partition number + /// The offset within the partition + /// The logical start position in the file + /// The logical end position in the file (inclusive) + /// A new KafkaBlob instance + /// Thrown if parameters are invalid + public static KafkaBlob New(string topic, int partition, long offset, long start, long end) + { + if (string.IsNullOrWhiteSpace(topic)) + throw new ArgumentException("Topic cannot be null or empty", nameof(topic)); + + if (partition < 0) + throw new ArgumentException("Partition must be non-negative", nameof(partition)); + + if (offset < 0) + throw new ArgumentException("Offset must be non-negative", nameof(offset)); + + if (start < 0) + throw new ArgumentException("Start position must be non-negative", nameof(start)); + + if (end < start) + throw new ArgumentException("End position must be >= start position", nameof(end)); + + return new KafkaBlob + { + Topic = topic, + Partition = partition, + Offset = offset, + Start = start, + End = end + }; + } + + /// + /// Serializes this blob to a 28-byte array for storage in the index topic. + /// + /// A 28-byte array containing the blob metadata + public byte[] ToBytes() + { + var bytes = new byte[28]; + + // Write partition (4 bytes, big-endian) + bytes[0] = (byte)(Partition >> 24); + bytes[1] = (byte)(Partition >> 16); + bytes[2] = (byte)(Partition >> 8); + bytes[3] = (byte)Partition; + + // Write offset (8 bytes, big-endian) + bytes[4] = (byte)(Offset >> 56); + bytes[5] = (byte)(Offset >> 48); + bytes[6] = (byte)(Offset >> 40); + bytes[7] = (byte)(Offset >> 32); + bytes[8] = (byte)(Offset >> 24); + bytes[9] = (byte)(Offset >> 16); + bytes[10] = (byte)(Offset >> 8); + bytes[11] = (byte)Offset; + + // Write start (8 bytes, big-endian) + bytes[12] = (byte)(Start >> 56); + bytes[13] = (byte)(Start >> 48); + bytes[14] = (byte)(Start >> 40); + bytes[15] = (byte)(Start >> 32); + bytes[16] = (byte)(Start >> 24); + bytes[17] = (byte)(Start >> 16); + bytes[18] = (byte)(Start >> 8); + bytes[19] = (byte)Start; + + // Write end (8 bytes, big-endian) + bytes[20] = (byte)(End >> 56); + bytes[21] = (byte)(End >> 48); + bytes[22] = (byte)(End >> 40); + bytes[23] = (byte)(End >> 32); + bytes[24] = (byte)(End >> 24); + bytes[25] = (byte)(End >> 16); + bytes[26] = (byte)(End >> 8); + bytes[27] = (byte)End; + + return bytes; + } + + /// + /// Deserializes a KafkaBlob from a 28-byte array. + /// + /// The topic name for this blob + /// The 28-byte array containing blob metadata + /// A KafkaBlob instance + /// Thrown if bytes array is not 28 bytes + public static KafkaBlob FromBytes(string topic, byte[] bytes) + { + if (bytes == null || bytes.Length != 28) + throw new ArgumentException("Blob metadata must be exactly 28 bytes", nameof(bytes)); + + // Read partition (4 bytes, big-endian) + int partition = (bytes[0] << 24) | (bytes[1] << 16) | (bytes[2] << 8) | bytes[3]; + + // Read offset (8 bytes, big-endian) + long offset = ((long)bytes[4] << 56) | ((long)bytes[5] << 48) | ((long)bytes[6] << 40) | ((long)bytes[7] << 32) | + ((long)bytes[8] << 24) | ((long)bytes[9] << 16) | ((long)bytes[10] << 8) | bytes[11]; + + // Read start (8 bytes, big-endian) + long start = ((long)bytes[12] << 56) | ((long)bytes[13] << 48) | ((long)bytes[14] << 40) | ((long)bytes[15] << 32) | + ((long)bytes[16] << 24) | ((long)bytes[17] << 16) | ((long)bytes[18] << 8) | bytes[19]; + + // Read end (8 bytes, big-endian) + long end = ((long)bytes[20] << 56) | ((long)bytes[21] << 48) | ((long)bytes[22] << 40) | ((long)bytes[23] << 32) | + ((long)bytes[24] << 24) | ((long)bytes[25] << 16) | ((long)bytes[26] << 8) | bytes[27]; + + return new KafkaBlob + { + Topic = topic, + Partition = partition, + Offset = offset, + Start = start, + End = end + }; + } + + /// + /// Checks if this blob contains the specified file position. + /// + /// The file position to check + /// True if this blob contains the position + public bool Contains(long position) + { + return position >= Start && position <= End; + } + + /// + /// Checks if this blob overlaps with the specified range. + /// + /// The start of the range + /// The end of the range (inclusive) + /// True if this blob overlaps with the range + public bool Overlaps(long rangeStart, long rangeEnd) + { + return Start <= rangeEnd && End >= rangeStart; + } + + /// + /// Gets the offset within this blob for a given file position. + /// + /// The file position + /// The offset within this blob's data + public long GetBlobOffset(long filePosition) + { + if (!Contains(filePosition)) + throw new ArgumentException($"Position {filePosition} is not within blob range [{Start}, {End}]", nameof(filePosition)); + + return filePosition - Start; + } + + public override string ToString() + { + return $"KafkaBlob[Topic={Topic}, Partition={Partition}, Offset={Offset}, Range=[{Start}, {End}], Size={Size}]"; + } +} + diff --git a/afs/kafka/src/KafkaConfiguration.cs b/afs/kafka/src/KafkaConfiguration.cs new file mode 100644 index 0000000..b5692c0 --- /dev/null +++ b/afs/kafka/src/KafkaConfiguration.cs @@ -0,0 +1,231 @@ +using System; +using System.Collections.Generic; +using Confluent.Kafka; + +namespace NebulaStore.Afs.Kafka; + +/// +/// Configuration for Kafka AFS connector. +/// +public class KafkaConfiguration +{ + /// + /// Gets or sets the Kafka bootstrap servers (comma-separated list). + /// + /// localhost:9092 + /// kafka1:9092,kafka2:9092,kafka3:9092 + public string BootstrapServers { get; set; } = "localhost:9092"; + + /// + /// Gets or sets the client ID for this application. + /// + public string ClientId { get; set; } = "nebulastore-kafka"; + + /// + /// Gets or sets the maximum message size in bytes. + /// This determines the blob chunk size. + /// + /// + /// Default is 1MB (1,000,000 bytes). Larger values reduce the number of + /// Kafka messages but may impact performance. Must not exceed Kafka's + /// max.message.bytes broker setting. + /// + public int MaxMessageBytes { get; set; } = 1_000_000; // 1MB + + /// + /// Gets or sets the request timeout. + /// + public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromMinutes(1); + + /// + /// Gets or sets whether to enable idempotent producer. + /// + /// + /// When enabled, the producer ensures exactly-once delivery semantics. + /// Recommended for production use. + /// + public bool EnableIdempotence { get; set; } = true; + + /// + /// Gets or sets the compression type for messages. + /// + public CompressionType Compression { get; set; } = CompressionType.None; + + /// + /// Gets or sets whether to use caching for metadata. + /// + public bool UseCache { get; set; } = true; + + /// + /// Gets or sets the consumer group ID. + /// + /// + /// Used for index topic consumption. Each connector instance should have + /// a unique group ID to ensure all messages are consumed. + /// + public string ConsumerGroupId { get; set; } = "nebulastore-kafka-consumer"; + + /// + /// Gets or sets additional Kafka configuration settings. + /// + /// + /// These settings are passed directly to the Kafka producer/consumer. + /// See Kafka documentation for available settings. + /// + public Dictionary AdditionalSettings { get; set; } = new(); + + /// + /// Creates a ProducerConfig from this configuration. + /// + /// A ProducerConfig instance + public ProducerConfig ToProducerConfig() + { + var config = new ProducerConfig + { + BootstrapServers = BootstrapServers, + ClientId = ClientId, + EnableIdempotence = EnableIdempotence, + CompressionType = Compression, + RequestTimeoutMs = (int)RequestTimeout.TotalMilliseconds, + MessageMaxBytes = MaxMessageBytes, + + // Reliability settings + Acks = Acks.All, // Wait for all in-sync replicas + MessageSendMaxRetries = 10, + RetryBackoffMs = 100, + }; + + // Apply additional settings + foreach (var kvp in AdditionalSettings) + { + config.Set(kvp.Key, kvp.Value); + } + + return config; + } + + /// + /// Creates a ConsumerConfig from this configuration. + /// + /// A ConsumerConfig instance + public ConsumerConfig ToConsumerConfig() + { + var config = new ConsumerConfig + { + BootstrapServers = BootstrapServers, + ClientId = ClientId, + GroupId = ConsumerGroupId, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false, // Manual commit for better control + MaxPollIntervalMs = (int)RequestTimeout.TotalMilliseconds, + }; + + // Apply additional settings + foreach (var kvp in AdditionalSettings) + { + config.Set(kvp.Key, kvp.Value); + } + + return config; + } + + /// + /// Creates an AdminClientConfig from this configuration. + /// + /// An AdminClientConfig instance + public AdminClientConfig ToAdminConfig() + { + var config = new AdminClientConfig + { + BootstrapServers = BootstrapServers, + ClientId = ClientId, + }; + + // Apply additional settings + foreach (var kvp in AdditionalSettings) + { + config.Set(kvp.Key, kvp.Value); + } + + return config; + } + + /// + /// Validates the configuration. + /// + /// Thrown if configuration is invalid + public void Validate() + { + if (string.IsNullOrWhiteSpace(BootstrapServers)) + throw new InvalidOperationException("BootstrapServers cannot be null or empty"); + + if (string.IsNullOrWhiteSpace(ClientId)) + throw new InvalidOperationException("ClientId cannot be null or empty"); + + if (MaxMessageBytes <= 0) + throw new InvalidOperationException("MaxMessageBytes must be positive"); + + if (MaxMessageBytes > 10_000_000) // 10MB sanity check + throw new InvalidOperationException("MaxMessageBytes should not exceed 10MB"); + + if (RequestTimeout <= TimeSpan.Zero) + throw new InvalidOperationException("RequestTimeout must be positive"); + } + + /// + /// Creates a new KafkaConfiguration with default settings. + /// + /// The Kafka bootstrap servers + /// A new KafkaConfiguration instance + public static KafkaConfiguration New(string bootstrapServers) + { + return new KafkaConfiguration + { + BootstrapServers = bootstrapServers + }; + } + + /// + /// Creates a new KafkaConfiguration for production use. + /// + /// The Kafka bootstrap servers + /// The client ID + /// A new KafkaConfiguration instance with production settings + public static KafkaConfiguration Production(string bootstrapServers, string clientId) + { + return new KafkaConfiguration + { + BootstrapServers = bootstrapServers, + ClientId = clientId, + EnableIdempotence = true, + Compression = CompressionType.Snappy, + UseCache = true, + AdditionalSettings = new Dictionary + { + ["acks"] = "all", + ["retries"] = "10", + ["retry.backoff.ms"] = "100", + ["max.in.flight.requests.per.connection"] = "5" + } + }; + } + + /// + /// Creates a new KafkaConfiguration for development use. + /// + /// The Kafka bootstrap servers (default: localhost:9092) + /// A new KafkaConfiguration instance with development settings + public static KafkaConfiguration Development(string bootstrapServers = "localhost:9092") + { + return new KafkaConfiguration + { + BootstrapServers = bootstrapServers, + ClientId = "nebulastore-dev", + EnableIdempotence = false, // Faster for development + Compression = CompressionType.None, + UseCache = false, // Easier debugging + RequestTimeout = TimeSpan.FromSeconds(30) + }; + } +} + diff --git a/afs/kafka/src/KafkaConnector.cs b/afs/kafka/src/KafkaConnector.cs new file mode 100644 index 0000000..c53aee6 --- /dev/null +++ b/afs/kafka/src/KafkaConnector.cs @@ -0,0 +1,528 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using NebulaStore.Afs.Blobstore; + +namespace NebulaStore.Afs.Kafka; + +/// +/// Kafka implementation of IBlobStoreConnector. +/// Stores files as Kafka topics with blob metadata in index topics. +/// +/// +/// Architecture: +/// - Each file is stored as a Kafka topic +/// - Files are split into chunks (blobs) of configurable size (default 1MB) +/// - Each blob is a Kafka message +/// - Blob metadata (partition, offset, range) is stored in an index topic +/// - A file system index tracks all files +/// +/// Example: +/// +/// var config = KafkaConfiguration.New("localhost:9092"); +/// using var connector = KafkaConnector.New(config); +/// using var fileSystem = BlobStoreFileSystem.New(connector); +/// +/// +public class KafkaConnector : BlobStoreConnectorBase +{ + private readonly KafkaConfiguration _configuration; + private readonly ConcurrentDictionary _topicIndices; + private readonly ConcurrentDictionary> _producers; + private readonly ConcurrentDictionary> _consumers; + private readonly HashSet _knownFiles; // Simple file system index + private readonly object _fileSystemIndexLock = new(); + private bool _disposed; + + /// + /// Initializes a new instance of the KafkaConnector class. + /// + /// The Kafka configuration + private KafkaConnector(KafkaConfiguration configuration) + { + _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); + _configuration.Validate(); + + _topicIndices = new ConcurrentDictionary(); + _producers = new ConcurrentDictionary>(); + _consumers = new ConcurrentDictionary>(); + _knownFiles = new HashSet(); + } + + /// + /// Creates a new KafkaConnector instance. + /// + /// The Kafka configuration + /// A new KafkaConnector instance + public static KafkaConnector New(KafkaConfiguration configuration) + { + return new KafkaConnector(configuration); + } + + /// + /// Gets the topic index for a file. + /// + /// The file path + /// The topic index + private KafkaTopicIndex GetTopicIndex(BlobStorePath file) + { + var topicName = KafkaPathValidator.ToTopicName(file); + return _topicIndices.GetOrAdd(topicName, _ => KafkaTopicIndex.New(topicName, _configuration)); + } + + /// + /// Gets or creates a producer for a topic. + /// + /// The topic name + /// A Kafka producer + private IProducer GetProducer(string topicName) + { + return _producers.GetOrAdd(topicName, _ => + { + var config = _configuration.ToProducerConfig(); + return new ProducerBuilder(config).Build(); + }); + } + + /// + /// Gets or creates a consumer for a topic. + /// + /// The topic name + /// A Kafka consumer + private IConsumer GetConsumer(string topicName) + { + return _consumers.GetOrAdd(topicName, _ => + { + var config = _configuration.ToConsumerConfig(); + config.GroupId = $"{_configuration.ConsumerGroupId}-{topicName}"; + return new ConsumerBuilder(config).Build(); + }); + } + + #region IBlobStoreConnector Implementation + + public override long GetFileSize(BlobStorePath file) + { + EnsureNotDisposed(); + + var index = GetTopicIndex(file); + var blobs = index.GetBlobs().ToList(); + + if (blobs.Count == 0) + return 0; + + // File size is the end position of the last blob + 1 + return blobs.Max(b => b.End) + 1; + } + + public override bool DirectoryExists(BlobStorePath directory) + { + EnsureNotDisposed(); + + // In Kafka, directories are virtual - they exist if any files exist under them + lock (_fileSystemIndexLock) + { + var prefix = directory.FullQualifiedName; + return _knownFiles.Any(f => f.StartsWith(prefix)); + } + } + + public override bool FileExists(BlobStorePath file) + { + EnsureNotDisposed(); + + lock (_fileSystemIndexLock) + { + return _knownFiles.Contains(file.FullQualifiedName); + } + } + + public override void VisitChildren(BlobStorePath directory, IBlobStorePathVisitor visitor) + { + EnsureNotDisposed(); + + if (visitor == null) + throw new ArgumentNullException(nameof(visitor)); + + lock (_fileSystemIndexLock) + { + var prefix = directory.FullQualifiedName; + if (!prefix.EndsWith(BlobStorePath.Separator)) + prefix += BlobStorePath.Separator; + + var children = _knownFiles + .Where(f => f.StartsWith(prefix)) + .Select(f => f.Substring(prefix.Length)) + .Where(f => !string.IsNullOrEmpty(f)) + .ToList(); + + // Group by immediate child (file or directory) + var immediateChildren = new HashSet(); + foreach (var child in children) + { + var separatorIndex = child.IndexOf(BlobStorePath.SeparatorChar); + var immediateChild = separatorIndex >= 0 + ? child.Substring(0, separatorIndex) + : child; + + immediateChildren.Add(immediateChild); + } + + // Visit each immediate child + foreach (var child in immediateChildren.OrderBy(c => c)) + { + var fullPath = prefix + child; + var isDirectory = _knownFiles.Any(f => f.StartsWith(fullPath + BlobStorePath.Separator)); + + if (isDirectory) + { + visitor.VisitDirectory(directory, child); + } + else + { + visitor.VisitFile(directory, child); + } + } + } + } + + public override bool IsEmpty(BlobStorePath directory) + { + EnsureNotDisposed(); + + lock (_fileSystemIndexLock) + { + var prefix = directory.FullQualifiedName; + if (!prefix.EndsWith(BlobStorePath.Separator)) + prefix += BlobStorePath.Separator; + + return !_knownFiles.Any(f => f.StartsWith(prefix)); + } + } + + public override bool CreateDirectory(BlobStorePath directory) + { + EnsureNotDisposed(); + + // Directories are virtual in Kafka - always return true + return true; + } + + public override bool CreateFile(BlobStorePath file) + { + EnsureNotDisposed(); + + lock (_fileSystemIndexLock) + { + if (_knownFiles.Contains(file.FullQualifiedName)) + return false; + + _knownFiles.Add(file.FullQualifiedName); + return true; + } + } + + public override bool DeleteFile(BlobStorePath file) + { + EnsureNotDisposed(); + + var topicName = KafkaPathValidator.ToTopicName(file); + + try + { + // Delete the Kafka topics (data + index) + using var adminClient = new AdminClientBuilder(_configuration.ToAdminConfig()).Build(); + + var indexTopicName = KafkaPathValidator.GetIndexTopicName(topicName); + var topicsToDelete = new[] { topicName, indexTopicName }; + + adminClient.DeleteTopicsAsync(topicsToDelete).GetAwaiter().GetResult(); + + // Remove from file system index + lock (_fileSystemIndexLock) + { + _knownFiles.Remove(file.FullQualifiedName); + } + + // Clean up cached resources + if (_topicIndices.TryRemove(topicName, out var index)) + { + index.Dispose(); + } + + if (_producers.TryRemove(topicName, out var producer)) + { + producer.Dispose(); + } + + if (_consumers.TryRemove(topicName, out var consumer)) + { + consumer.Dispose(); + } + + return true; + } + catch (Exception ex) + { + // Log error but don't throw - file might not exist + Console.WriteLine($"Error deleting file {file.FullQualifiedName}: {ex.Message}"); + return false; + } + } + + public override byte[] ReadData(BlobStorePath file, long offset, long length) + { + EnsureNotDisposed(); + + if (offset < 0) + throw new ArgumentException("Offset must be non-negative", nameof(offset)); + + var index = GetTopicIndex(file); + var blobs = index.GetBlobs().ToList(); + + if (blobs.Count == 0) + return Array.Empty(); + + // Determine actual length to read + var fileSize = GetFileSize(file); + if (length < 0) + length = fileSize - offset; + + if (offset >= fileSize) + return Array.Empty(); + + length = Math.Min(length, fileSize - offset); + + // Find blobs that overlap with the requested range + var endPosition = offset + length - 1; + var relevantBlobs = blobs + .Where(b => b.Overlaps(offset, endPosition)) + .OrderBy(b => b.Start) + .ToList(); + + if (relevantBlobs.Count == 0) + return Array.Empty(); + + // Read data from each blob + var result = new byte[length]; + var resultOffset = 0; + + foreach (var blob in relevantBlobs) + { + var blobData = ReadBlob(file, blob); + + // Calculate which part of the blob we need + var blobStartInFile = blob.Start; + var blobEndInFile = blob.End; + + var readStart = Math.Max(offset, blobStartInFile); + var readEnd = Math.Min(endPosition, blobEndInFile); + + var offsetInBlob = readStart - blobStartInFile; + var lengthToRead = readEnd - readStart + 1; + + // Copy the relevant portion + Array.Copy(blobData, offsetInBlob, result, resultOffset, lengthToRead); + resultOffset += (int)lengthToRead; + } + + return result; + } + + public override long ReadData(BlobStorePath file, byte[] targetBuffer, long offset, long length) + { + EnsureNotDisposed(); + + if (targetBuffer == null) + throw new ArgumentNullException(nameof(targetBuffer)); + + var data = ReadData(file, offset, length); + var bytesToCopy = Math.Min(data.Length, targetBuffer.Length); + Array.Copy(data, 0, targetBuffer, 0, bytesToCopy); + return bytesToCopy; + } + + public override long WriteData(BlobStorePath file, IEnumerable sourceBuffers) + { + EnsureNotDisposed(); + + if (sourceBuffers == null) + throw new ArgumentNullException(nameof(sourceBuffers)); + + var topicName = KafkaPathValidator.ToTopicName(file); + var producer = GetProducer(topicName); + var index = GetTopicIndex(file); + + // Concatenate all source buffers + var allData = sourceBuffers.SelectMany(b => b).ToArray(); + if (allData.Length == 0) + return 0; + + var blobs = new List(); + var currentOffset = GetFileSize(file); // Append to end of file + var dataOffset = 0; + + // Split data into chunks + while (dataOffset < allData.Length) + { + var chunkSize = Math.Min(_configuration.MaxMessageBytes, allData.Length - dataOffset); + var chunk = new byte[chunkSize]; + Array.Copy(allData, dataOffset, chunk, 0, chunkSize); + + // Produce to Kafka + var message = new Message + { + Key = file.FullQualifiedName, + Value = chunk + }; + + var deliveryResult = producer.ProduceAsync(topicName, message).GetAwaiter().GetResult(); + + // Create blob metadata + var blob = KafkaBlob.New( + topicName, + deliveryResult.Partition.Value, + deliveryResult.Offset.Value, + currentOffset, + currentOffset + chunkSize - 1 + ); + + blobs.Add(blob); + + dataOffset += chunkSize; + currentOffset += chunkSize; + } + + // Flush producer + producer.Flush(TimeSpan.FromSeconds(30)); + + // Update index + index.AddBlobsAsync(blobs).GetAwaiter().GetResult(); + + // Update file system index + lock (_fileSystemIndexLock) + { + _knownFiles.Add(file.FullQualifiedName); + } + + return allData.Length; + } + + public override void MoveFile(BlobStorePath sourceFile, BlobStorePath targetFile) + { + EnsureNotDisposed(); + + // Read all data from source + var data = ReadData(sourceFile, 0, -1); + + // Write to target + WriteData(targetFile, new[] { data }); + + // Delete source + DeleteFile(sourceFile); + } + + public override long CopyFile(BlobStorePath sourceFile, BlobStorePath targetFile, long offset, long length) + { + EnsureNotDisposed(); + + // Read data from source + var data = ReadData(sourceFile, offset, length); + + // Write to target + return WriteData(targetFile, new[] { data }); + } + + public override void TruncateFile(BlobStorePath file, long newLength) + { + EnsureNotDisposed(); + + if (newLength < 0) + throw new ArgumentException("New length must be non-negative", nameof(newLength)); + + var currentSize = GetFileSize(file); + if (newLength >= currentSize) + return; // Nothing to truncate + + // Read the data we want to keep + var data = ReadData(file, 0, newLength); + + // Delete the file + DeleteFile(file); + + // Write back the truncated data + if (data.Length > 0) + { + WriteData(file, new[] { data }); + } + } + + /// + /// Reads a single blob from Kafka. + /// + /// The file path + /// The blob metadata + /// The blob data + private byte[] ReadBlob(BlobStorePath file, KafkaBlob blob) + { + var consumer = GetConsumer(blob.Topic); + + // Assign to the specific partition + var topicPartition = new TopicPartition(blob.Topic, blob.Partition); + consumer.Assign(new[] { topicPartition }); + + // Seek to the specific offset + consumer.Seek(new TopicPartitionOffset(topicPartition, blob.Offset)); + + // Consume the message + var timeout = _configuration.RequestTimeout; + var consumeResult = consumer.Consume(timeout); + + if (consumeResult == null) + throw new InvalidOperationException($"Failed to read blob at offset {blob.Offset}"); + + if (consumeResult.IsPartitionEOF) + throw new InvalidOperationException($"Reached end of partition at offset {blob.Offset}"); + + return consumeResult.Message.Value; + } + + #endregion + + protected override void Dispose(bool disposing) + { + if (_disposed) + return; + + if (disposing) + { + // Dispose all topic indices + foreach (var index in _topicIndices.Values) + { + index.Dispose(); + } + _topicIndices.Clear(); + + // Dispose all producers + foreach (var producer in _producers.Values) + { + producer.Dispose(); + } + _producers.Clear(); + + // Dispose all consumers + foreach (var consumer in _consumers.Values) + { + consumer.Dispose(); + } + _consumers.Clear(); + } + + _disposed = true; + base.Dispose(disposing); + } +} + diff --git a/afs/kafka/src/KafkaPathValidator.cs b/afs/kafka/src/KafkaPathValidator.cs new file mode 100644 index 0000000..1a10090 --- /dev/null +++ b/afs/kafka/src/KafkaPathValidator.cs @@ -0,0 +1,102 @@ +using System; +using System.Text.RegularExpressions; +using NebulaStore.Afs.Blobstore; + +namespace NebulaStore.Afs.Kafka; + +/// +/// Validates and sanitizes paths for Kafka topic naming. +/// +/// +/// Kafka topic names have the following restrictions: +/// - Only alphanumeric characters, '.', '_', and '-' are allowed +/// - Maximum length is 249 characters +/// - Cannot be "." or ".." +/// - Cannot start with "__" (reserved for internal topics) +/// +public class KafkaPathValidator +{ + private static readonly Regex InvalidCharsRegex = new Regex(@"[^a-zA-Z0-9\._\-]", RegexOptions.Compiled); + private const int MaxTopicNameLength = 249; + + /// + /// Converts a BlobStorePath to a valid Kafka topic name. + /// + /// The blob store path + /// A valid Kafka topic name + public static string ToTopicName(BlobStorePath path) + { + if (path == null) + throw new ArgumentNullException(nameof(path)); + + // Replace path separator with underscore + var topicName = path.FullQualifiedName.Replace(BlobStorePath.SeparatorChar, '_'); + + // Replace invalid characters with underscore + topicName = InvalidCharsRegex.Replace(topicName, "_"); + + // Ensure it doesn't start with double underscore (reserved) + if (topicName.StartsWith("__")) + { + topicName = "ns" + topicName; // Prefix with "ns" (NebulaStore) + } + + // Handle special cases + if (topicName == "." || topicName == "..") + { + topicName = "ns_" + topicName; + } + + // Truncate if too long + if (topicName.Length > MaxTopicNameLength) + { + // Keep the end of the name (more likely to be unique) + topicName = topicName.Substring(topicName.Length - MaxTopicNameLength); + } + + return topicName; + } + + /// + /// Gets the index topic name for a data topic. + /// + /// The data topic name + /// The index topic name + public static string GetIndexTopicName(string dataTopicName) + { + if (string.IsNullOrWhiteSpace(dataTopicName)) + throw new ArgumentException("Data topic name cannot be null or empty", nameof(dataTopicName)); + + return $"__{dataTopicName}_index"; + } + + /// + /// Validates a topic name. + /// + /// The topic name to validate + /// True if the topic name is valid + public static bool IsValidTopicName(string topicName) + { + if (string.IsNullOrWhiteSpace(topicName)) + return false; + + if (topicName.Length > MaxTopicNameLength) + return false; + + if (topicName == "." || topicName == "..") + return false; + + // Check for invalid characters + return !InvalidCharsRegex.IsMatch(topicName); + } + + /// + /// Creates a new KafkaPathValidator instance. + /// + /// A new KafkaPathValidator instance + public static KafkaPathValidator New() + { + return new KafkaPathValidator(); + } +} + diff --git a/afs/kafka/src/KafkaTopicIndex.cs b/afs/kafka/src/KafkaTopicIndex.cs new file mode 100644 index 0000000..b9e4371 --- /dev/null +++ b/afs/kafka/src/KafkaTopicIndex.cs @@ -0,0 +1,283 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; + +namespace NebulaStore.Afs.Kafka; + +/// +/// Manages the index for a single Kafka topic. +/// The index stores blob metadata in a separate Kafka topic. +/// +public class KafkaTopicIndex : IDisposable +{ + private readonly string _topic; + private readonly string _indexTopicName; + private readonly KafkaConfiguration _configuration; + private readonly object _lock = new(); + + private List? _blobs; + private IProducer? _producer; + private bool _disposed; + + /// + /// Initializes a new instance of the KafkaTopicIndex class. + /// + /// The data topic name + /// The Kafka configuration + private KafkaTopicIndex(string topic, KafkaConfiguration configuration) + { + _topic = topic ?? throw new ArgumentNullException(nameof(topic)); + _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); + _indexTopicName = KafkaPathValidator.GetIndexTopicName(topic); + } + + /// + /// Creates a new KafkaTopicIndex instance. + /// + /// The data topic name + /// The Kafka configuration + /// A new KafkaTopicIndex instance + public static KafkaTopicIndex New(string topic, KafkaConfiguration configuration) + { + return new KafkaTopicIndex(topic, configuration); + } + + /// + /// Gets all blobs for this topic. + /// + /// An enumerable of KafkaBlob instances + public IEnumerable GetBlobs() + { + EnsureNotDisposed(); + + lock (_lock) + { + EnsureBlobs(); + return _blobs!.ToList(); // Return a copy + } + } + + /// + /// Adds blobs to the index. + /// + /// The blobs to add + /// A task representing the asynchronous operation + public async Task AddBlobsAsync(IEnumerable blobs) + { + EnsureNotDisposed(); + + if (blobs == null) + throw new ArgumentNullException(nameof(blobs)); + + var blobList = blobs.ToList(); + if (blobList.Count == 0) + return; + + lock (_lock) + { + EnsureBlobs(); + EnsureProducer(); + + foreach (var blob in blobList) + { + // Add to in-memory index + _blobs!.Add(blob); + + // Write to index topic + var metadata = blob.ToBytes(); + var message = new Message + { + Key = _topic, + Value = metadata + }; + + // Fire and forget - we'll await all at the end + _producer!.Produce(_indexTopicName, message, deliveryReport => + { + if (deliveryReport.Error.IsError) + { + throw new KafkaException(deliveryReport.Error); + } + }); + } + + // Flush to ensure all messages are sent + _producer!.Flush(TimeSpan.FromSeconds(30)); + } + + await Task.CompletedTask; + } + + /// + /// Deletes blobs from the index. + /// + /// The blobs to delete + /// A task representing the asynchronous operation + public async Task DeleteBlobsAsync(IEnumerable blobsToDelete) + { + EnsureNotDisposed(); + + if (blobsToDelete == null) + throw new ArgumentNullException(nameof(blobsToDelete)); + + var deleteList = blobsToDelete.ToList(); + if (deleteList.Count == 0) + return; + + lock (_lock) + { + EnsureBlobs(); + + // Remove from in-memory index + foreach (var blob in deleteList) + { + _blobs!.Remove(blob); + } + + // Note: In a full implementation, we would need to: + // 1. Delete records from the index topic (using AdminClient.DeleteRecordsAsync) + // 2. Rewrite the remaining blobs to the index topic + // For now, we just update the in-memory index + // This will be implemented in Phase 2 + } + + await Task.CompletedTask; + } + + /// + /// Loads blobs from the index topic. + /// + /// A task representing the asynchronous operation + private async Task> LoadBlobsAsync() + { + var blobs = new List(); + + // Create a consumer for the index topic + var consumerConfig = _configuration.ToConsumerConfig(); + consumerConfig.GroupId = $"index-loader-{Guid.NewGuid()}"; // Unique group ID + consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest; + + using var consumer = new ConsumerBuilder(consumerConfig).Build(); + + try + { + // Subscribe to the index topic + consumer.Subscribe(_indexTopicName); + + // Get the topic partition + var assignment = consumer.Assignment; + if (assignment.Count == 0) + { + // Wait for assignment + await Task.Delay(100); + assignment = consumer.Assignment; + } + + // If still no assignment, the topic might not exist yet + if (assignment.Count == 0) + { + return blobs; + } + + // Consume all messages + var timeout = TimeSpan.FromSeconds(5); + var endReached = false; + + while (!endReached) + { + var consumeResult = consumer.Consume(timeout); + + if (consumeResult == null) + { + // Timeout - assume we've reached the end + endReached = true; + continue; + } + + if (consumeResult.IsPartitionEOF) + { + endReached = true; + continue; + } + + // Deserialize blob metadata + var blob = KafkaBlob.FromBytes(_topic, consumeResult.Message.Value); + blobs.Add(blob); + } + } + catch (ConsumeException ex) + { + // Topic might not exist yet - that's okay + if (!ex.Error.Code.ToString().Contains("UNKNOWN_TOPIC")) + { + throw; + } + } + finally + { + consumer.Close(); + } + + return blobs; + } + + /// + /// Ensures the blobs list is loaded. + /// + private void EnsureBlobs() + { + if (_blobs == null) + { + // Load blobs synchronously (blocking) + // In a production implementation, we might want to make this async + _blobs = LoadBlobsAsync().GetAwaiter().GetResult(); + } + } + + /// + /// Ensures the producer is created. + /// + private void EnsureProducer() + { + if (_producer == null) + { + var producerConfig = _configuration.ToProducerConfig(); + _producer = new ProducerBuilder(producerConfig).Build(); + } + } + + /// + /// Ensures the instance is not disposed. + /// + private void EnsureNotDisposed() + { + if (_disposed) + throw new ObjectDisposedException(nameof(KafkaTopicIndex)); + } + + /// + /// Disposes the index. + /// + public void Dispose() + { + if (_disposed) + return; + + lock (_lock) + { + _blobs?.Clear(); + _blobs = null; + + _producer?.Dispose(); + _producer = null; + + _disposed = true; + } + + GC.SuppressFinalize(this); + } +} + diff --git a/afs/kafka/tests/KafkaBlobTests.cs b/afs/kafka/tests/KafkaBlobTests.cs new file mode 100644 index 0000000..7aa0875 --- /dev/null +++ b/afs/kafka/tests/KafkaBlobTests.cs @@ -0,0 +1,261 @@ +using System; +using Xunit; + +namespace NebulaStore.Afs.Kafka.Tests; + +/// +/// Unit tests for KafkaBlob. +/// +public class KafkaBlobTests +{ + [Fact] + public void New_ValidParameters_CreatesBlob() + { + // Arrange & Act + var blob = KafkaBlob.New("test-topic", 0, 100, 0, 999); + + // Assert + Assert.Equal("test-topic", blob.Topic); + Assert.Equal(0, blob.Partition); + Assert.Equal(100, blob.Offset); + Assert.Equal(0, blob.Start); + Assert.Equal(999, blob.End); + Assert.Equal(1000, blob.Size); + } + + [Fact] + public void New_NullTopic_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => KafkaBlob.New(null!, 0, 0, 0, 100)); + } + + [Fact] + public void New_EmptyTopic_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => KafkaBlob.New("", 0, 0, 0, 100)); + } + + [Fact] + public void New_NegativePartition_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => KafkaBlob.New("topic", -1, 0, 0, 100)); + } + + [Fact] + public void New_NegativeOffset_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => KafkaBlob.New("topic", 0, -1, 0, 100)); + } + + [Fact] + public void New_NegativeStart_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => KafkaBlob.New("topic", 0, 0, -1, 100)); + } + + [Fact] + public void New_EndBeforeStart_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => KafkaBlob.New("topic", 0, 0, 100, 50)); + } + + [Fact] + public void Size_CalculatesCorrectly() + { + // Arrange + var blob = KafkaBlob.New("topic", 0, 0, 100, 199); + + // Act & Assert + Assert.Equal(100, blob.Size); + } + + [Fact] + public void ToBytes_SerializesCorrectly() + { + // Arrange + var blob = KafkaBlob.New("topic", 5, 1000, 2000, 2999); + + // Act + var bytes = blob.ToBytes(); + + // Assert + Assert.Equal(28, bytes.Length); + } + + [Fact] + public void FromBytes_DeserializesCorrectly() + { + // Arrange + var original = KafkaBlob.New("topic", 5, 1000, 2000, 2999); + var bytes = original.ToBytes(); + + // Act + var deserialized = KafkaBlob.FromBytes("topic", bytes); + + // Assert + Assert.Equal(original.Topic, deserialized.Topic); + Assert.Equal(original.Partition, deserialized.Partition); + Assert.Equal(original.Offset, deserialized.Offset); + Assert.Equal(original.Start, deserialized.Start); + Assert.Equal(original.End, deserialized.End); + Assert.Equal(original.Size, deserialized.Size); + } + + [Fact] + public void FromBytes_InvalidLength_ThrowsArgumentException() + { + // Arrange + var invalidBytes = new byte[20]; + + // Act & Assert + Assert.Throws(() => KafkaBlob.FromBytes("topic", invalidBytes)); + } + + [Fact] + public void FromBytes_NullBytes_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => KafkaBlob.FromBytes("topic", null!)); + } + + [Fact] + public void Contains_PositionInRange_ReturnsTrue() + { + // Arrange + var blob = KafkaBlob.New("topic", 0, 0, 100, 199); + + // Act & Assert + Assert.True(blob.Contains(100)); + Assert.True(blob.Contains(150)); + Assert.True(blob.Contains(199)); + } + + [Fact] + public void Contains_PositionOutOfRange_ReturnsFalse() + { + // Arrange + var blob = KafkaBlob.New("topic", 0, 0, 100, 199); + + // Act & Assert + Assert.False(blob.Contains(99)); + Assert.False(blob.Contains(200)); + } + + [Fact] + public void Overlaps_RangeOverlaps_ReturnsTrue() + { + // Arrange + var blob = KafkaBlob.New("topic", 0, 0, 100, 199); + + // Act & Assert + Assert.True(blob.Overlaps(50, 150)); // Overlaps start + Assert.True(blob.Overlaps(150, 250)); // Overlaps end + Assert.True(blob.Overlaps(100, 199)); // Exact match + Assert.True(blob.Overlaps(120, 180)); // Contained within + Assert.True(blob.Overlaps(50, 250)); // Contains blob + } + + [Fact] + public void Overlaps_RangeDoesNotOverlap_ReturnsFalse() + { + // Arrange + var blob = KafkaBlob.New("topic", 0, 0, 100, 199); + + // Act & Assert + Assert.False(blob.Overlaps(0, 99)); // Before + Assert.False(blob.Overlaps(200, 300)); // After + } + + [Fact] + public void GetBlobOffset_ValidPosition_ReturnsCorrectOffset() + { + // Arrange + var blob = KafkaBlob.New("topic", 0, 0, 100, 199); + + // Act & Assert + Assert.Equal(0, blob.GetBlobOffset(100)); + Assert.Equal(50, blob.GetBlobOffset(150)); + Assert.Equal(99, blob.GetBlobOffset(199)); + } + + [Fact] + public void GetBlobOffset_InvalidPosition_ThrowsArgumentException() + { + // Arrange + var blob = KafkaBlob.New("topic", 0, 0, 100, 199); + + // Act & Assert + Assert.Throws(() => blob.GetBlobOffset(99)); + Assert.Throws(() => blob.GetBlobOffset(200)); + } + + [Fact] + public void ToString_ReturnsFormattedString() + { + // Arrange + var blob = KafkaBlob.New("test-topic", 5, 1000, 2000, 2999); + + // Act + var str = blob.ToString(); + + // Assert + Assert.Contains("test-topic", str); + Assert.Contains("5", str); + Assert.Contains("1000", str); + Assert.Contains("2000", str); + Assert.Contains("2999", str); + } + + [Theory] + [InlineData(0, 0, 0, 0, 1)] + [InlineData(0, 100, 0, 999, 1000)] + [InlineData(1, 200, 1000, 1999, 1000)] + [InlineData(2, 300, 2000, 3999, 2000)] + public void RoundTrip_Serialization_PreservesData(int partition, long offset, long start, long end, long expectedSize) + { + // Arrange + var original = KafkaBlob.New("topic", partition, offset, start, end); + + // Act + var bytes = original.ToBytes(); + var deserialized = KafkaBlob.FromBytes("topic", bytes); + + // Assert + Assert.Equal(original.Partition, deserialized.Partition); + Assert.Equal(original.Offset, deserialized.Offset); + Assert.Equal(original.Start, deserialized.Start); + Assert.Equal(original.End, deserialized.End); + Assert.Equal(expectedSize, deserialized.Size); + } + + [Fact] + public void Equality_SameValues_AreEqual() + { + // Arrange + var blob1 = KafkaBlob.New("topic", 0, 100, 0, 999); + var blob2 = KafkaBlob.New("topic", 0, 100, 0, 999); + + // Act & Assert + Assert.Equal(blob1, blob2); + Assert.True(blob1 == blob2); + } + + [Fact] + public void Equality_DifferentValues_AreNotEqual() + { + // Arrange + var blob1 = KafkaBlob.New("topic", 0, 100, 0, 999); + var blob2 = KafkaBlob.New("topic", 0, 101, 0, 999); + + // Act & Assert + Assert.NotEqual(blob1, blob2); + Assert.True(blob1 != blob2); + } +} + diff --git a/afs/kafka/tests/KafkaConfigurationTests.cs b/afs/kafka/tests/KafkaConfigurationTests.cs new file mode 100644 index 0000000..ceea61a --- /dev/null +++ b/afs/kafka/tests/KafkaConfigurationTests.cs @@ -0,0 +1,315 @@ +using System; +using System.Linq; +using Confluent.Kafka; +using Xunit; + +namespace NebulaStore.Afs.Kafka.Tests; + +/// +/// Unit tests for KafkaConfiguration. +/// +public class KafkaConfigurationTests +{ + [Fact] + public void New_ValidBootstrapServers_CreatesConfiguration() + { + // Act + var config = KafkaConfiguration.New("localhost:9092"); + + // Assert + Assert.Equal("localhost:9092", config.BootstrapServers); + Assert.NotNull(config.ClientId); + Assert.True(config.MaxMessageBytes > 0); + } + + [Fact] + public void Production_ValidParameters_CreatesProductionConfiguration() + { + // Act + var config = KafkaConfiguration.Production("kafka1:9092,kafka2:9092", "my-app"); + + // Assert + Assert.Equal("kafka1:9092,kafka2:9092", config.BootstrapServers); + Assert.Equal("my-app", config.ClientId); + Assert.True(config.EnableIdempotence); + Assert.Equal(CompressionType.Snappy, config.Compression); + Assert.True(config.UseCache); + Assert.NotEmpty(config.AdditionalSettings); + } + + [Fact] + public void Development_DefaultParameters_CreatesDevelopmentConfiguration() + { + // Act + var config = KafkaConfiguration.Development(); + + // Assert + Assert.Equal("localhost:9092", config.BootstrapServers); + Assert.Equal("nebulastore-dev", config.ClientId); + Assert.False(config.EnableIdempotence); + Assert.Equal(CompressionType.None, config.Compression); + Assert.False(config.UseCache); + } + + [Fact] + public void Development_CustomBootstrapServers_UsesCustomServers() + { + // Act + var config = KafkaConfiguration.Development("custom:9092"); + + // Assert + Assert.Equal("custom:9092", config.BootstrapServers); + } + + [Fact] + public void Validate_ValidConfiguration_DoesNotThrow() + { + // Arrange + var config = KafkaConfiguration.New("localhost:9092"); + + // Act & Assert + config.Validate(); // Should not throw + } + + [Fact] + public void Validate_NullBootstrapServers_ThrowsInvalidOperationException() + { + // Arrange + var config = new KafkaConfiguration { BootstrapServers = null! }; + + // Act & Assert + Assert.Throws(() => config.Validate()); + } + + [Fact] + public void Validate_EmptyBootstrapServers_ThrowsInvalidOperationException() + { + // Arrange + var config = new KafkaConfiguration { BootstrapServers = "" }; + + // Act & Assert + Assert.Throws(() => config.Validate()); + } + + [Fact] + public void Validate_NullClientId_ThrowsInvalidOperationException() + { + // Arrange + var config = new KafkaConfiguration + { + BootstrapServers = "localhost:9092", + ClientId = null! + }; + + // Act & Assert + Assert.Throws(() => config.Validate()); + } + + [Fact] + public void Validate_NegativeMaxMessageBytes_ThrowsInvalidOperationException() + { + // Arrange + var config = new KafkaConfiguration + { + BootstrapServers = "localhost:9092", + MaxMessageBytes = -1 + }; + + // Act & Assert + Assert.Throws(() => config.Validate()); + } + + [Fact] + public void Validate_ZeroMaxMessageBytes_ThrowsInvalidOperationException() + { + // Arrange + var config = new KafkaConfiguration + { + BootstrapServers = "localhost:9092", + MaxMessageBytes = 0 + }; + + // Act & Assert + Assert.Throws(() => config.Validate()); + } + + [Fact] + public void Validate_ExcessiveMaxMessageBytes_ThrowsInvalidOperationException() + { + // Arrange + var config = new KafkaConfiguration + { + BootstrapServers = "localhost:9092", + MaxMessageBytes = 20_000_000 // 20MB + }; + + // Act & Assert + Assert.Throws(() => config.Validate()); + } + + [Fact] + public void Validate_NegativeRequestTimeout_ThrowsInvalidOperationException() + { + // Arrange + var config = new KafkaConfiguration + { + BootstrapServers = "localhost:9092", + RequestTimeout = TimeSpan.FromSeconds(-1) + }; + + // Act & Assert + Assert.Throws(() => config.Validate()); + } + + [Fact] + public void ToProducerConfig_CreatesValidProducerConfig() + { + // Arrange + var config = KafkaConfiguration.New("localhost:9092"); + + // Act + var producerConfig = config.ToProducerConfig(); + + // Assert + Assert.NotNull(producerConfig); + Assert.Equal("localhost:9092", producerConfig.BootstrapServers); + Assert.Equal(config.ClientId, producerConfig.ClientId); + Assert.Equal(config.EnableIdempotence, producerConfig.EnableIdempotence); + Assert.Equal(Acks.All, producerConfig.Acks); + } + + [Fact] + public void ToProducerConfig_AppliesAdditionalSettings() + { + // Arrange + var config = KafkaConfiguration.New("localhost:9092"); + config.AdditionalSettings["custom.setting"] = "custom-value"; + + // Act + var producerConfig = config.ToProducerConfig(); + + // Assert + Assert.NotNull(producerConfig); + // Note: We can't easily verify custom settings were applied without reflection + // This test mainly ensures no exception is thrown + } + + [Fact] + public void ToConsumerConfig_CreatesValidConsumerConfig() + { + // Arrange + var config = KafkaConfiguration.New("localhost:9092"); + + // Act + var consumerConfig = config.ToConsumerConfig(); + + // Assert + Assert.NotNull(consumerConfig); + Assert.Equal("localhost:9092", consumerConfig.BootstrapServers); + Assert.Equal(config.ClientId, consumerConfig.ClientId); + Assert.Equal(config.ConsumerGroupId, consumerConfig.GroupId); + Assert.Equal(AutoOffsetReset.Earliest, consumerConfig.AutoOffsetReset); + Assert.False(consumerConfig.EnableAutoCommit); + } + + [Fact] + public void ToAdminConfig_CreatesValidAdminConfig() + { + // Arrange + var config = KafkaConfiguration.New("localhost:9092"); + + // Act + var adminConfig = config.ToAdminConfig(); + + // Assert + Assert.NotNull(adminConfig); + Assert.Equal("localhost:9092", adminConfig.BootstrapServers); + Assert.Equal(config.ClientId, adminConfig.ClientId); + } + + [Fact] + public void DefaultValues_AreCorrect() + { + // Act + var config = new KafkaConfiguration(); + + // Assert + Assert.Equal("localhost:9092", config.BootstrapServers); + Assert.Equal("nebulastore-kafka", config.ClientId); + Assert.Equal(1_000_000, config.MaxMessageBytes); + Assert.Equal(TimeSpan.FromMinutes(1), config.RequestTimeout); + Assert.True(config.EnableIdempotence); + Assert.Equal(CompressionType.None, config.Compression); + Assert.True(config.UseCache); + Assert.Equal("nebulastore-kafka-consumer", config.ConsumerGroupId); + Assert.NotNull(config.AdditionalSettings); + Assert.Empty(config.AdditionalSettings); + } + + [Fact] + public void CustomConfiguration_AllPropertiesCanBeSet() + { + // Act + var config = new KafkaConfiguration + { + BootstrapServers = "custom:9092", + ClientId = "custom-client", + MaxMessageBytes = 2_000_000, + RequestTimeout = TimeSpan.FromMinutes(2), + EnableIdempotence = false, + Compression = CompressionType.Lz4, + UseCache = false, + ConsumerGroupId = "custom-group", + AdditionalSettings = new System.Collections.Generic.Dictionary + { + ["key1"] = "value1", + ["key2"] = "value2" + } + }; + + // Assert + Assert.Equal("custom:9092", config.BootstrapServers); + Assert.Equal("custom-client", config.ClientId); + Assert.Equal(2_000_000, config.MaxMessageBytes); + Assert.Equal(TimeSpan.FromMinutes(2), config.RequestTimeout); + Assert.False(config.EnableIdempotence); + Assert.Equal(CompressionType.Lz4, config.Compression); + Assert.False(config.UseCache); + Assert.Equal("custom-group", config.ConsumerGroupId); + Assert.Equal(2, config.AdditionalSettings.Count); + Assert.Equal("value1", config.AdditionalSettings["key1"]); + Assert.Equal("value2", config.AdditionalSettings["key2"]); + } + + [Theory] + [InlineData(100_000)] + [InlineData(500_000)] + [InlineData(1_000_000)] + [InlineData(2_000_000)] + [InlineData(5_000_000)] + public void Validate_ValidMaxMessageBytes_DoesNotThrow(int maxMessageBytes) + { + // Arrange + var config = new KafkaConfiguration + { + BootstrapServers = "localhost:9092", + MaxMessageBytes = maxMessageBytes + }; + + // Act & Assert + config.Validate(); // Should not throw + } + + [Fact] + public void Production_HasReliabilitySettings() + { + // Act + var config = KafkaConfiguration.Production("localhost:9092", "test"); + + // Assert + Assert.True(config.EnableIdempotence); + Assert.Contains(config.AdditionalSettings, kvp => kvp.Key == "acks" && kvp.Value == "all"); + Assert.Contains(config.AdditionalSettings, kvp => kvp.Key == "retries"); + } +} + diff --git a/afs/kafka/tests/KafkaPathValidatorTests.cs b/afs/kafka/tests/KafkaPathValidatorTests.cs new file mode 100644 index 0000000..f0dcd47 --- /dev/null +++ b/afs/kafka/tests/KafkaPathValidatorTests.cs @@ -0,0 +1,295 @@ +using System; +using NebulaStore.Afs.Blobstore; +using Xunit; + +namespace NebulaStore.Afs.Kafka.Tests; + +/// +/// Unit tests for KafkaPathValidator. +/// +public class KafkaPathValidatorTests +{ + [Fact] + public void ToTopicName_SimplePath_ReturnsValidTopicName() + { + // Arrange + var path = BlobStorePath.New("container", "file.txt"); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.NotNull(topicName); + Assert.NotEmpty(topicName); + Assert.True(KafkaPathValidator.IsValidTopicName(topicName)); + } + + [Fact] + public void ToTopicName_PathWithSlashes_ReplacesWithUnderscores() + { + // Arrange + var path = BlobStorePath.New("container", "dir1", "dir2", "file.txt"); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.DoesNotContain("/", topicName); + Assert.Contains("_", topicName); + } + + [Fact] + public void ToTopicName_PathWithInvalidChars_ReplacesWithUnderscores() + { + // Arrange + var path = BlobStorePath.New("container", "file@#$.txt"); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.DoesNotContain("@", topicName); + Assert.DoesNotContain("#", topicName); + Assert.DoesNotContain("$", topicName); + Assert.True(KafkaPathValidator.IsValidTopicName(topicName)); + } + + [Fact] + public void ToTopicName_PathStartingWithDoubleUnderscore_AddsPrefixToAvoidReserved() + { + // Arrange + // Create a path that would result in a topic name starting with "__" + var path = BlobStorePath.New("__reserved"); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.StartsWith("ns__", topicName); + } + + [Fact] + public void ToTopicName_DotPath_AddsPrefixToAvoidSpecialCase() + { + // Arrange + var path = BlobStorePath.New("."); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.NotEqual(".", topicName); + Assert.StartsWith("ns_", topicName); + } + + [Fact] + public void ToTopicName_DoubleDotPath_AddsPrefixToAvoidSpecialCase() + { + // Arrange + var path = BlobStorePath.New(".."); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.NotEqual("..", topicName); + Assert.StartsWith("ns_", topicName); + } + + [Fact] + public void ToTopicName_VeryLongPath_TruncatesToMaxLength() + { + // Arrange + var longName = new string('a', 300); + var path = BlobStorePath.New(longName); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.True(topicName.Length <= 249); + Assert.True(KafkaPathValidator.IsValidTopicName(topicName)); + } + + [Fact] + public void ToTopicName_NullPath_ThrowsArgumentNullException() + { + // Act & Assert + Assert.Throws(() => KafkaPathValidator.ToTopicName(null!)); + } + + [Fact] + public void GetIndexTopicName_ValidTopicName_ReturnsIndexTopicName() + { + // Arrange + var dataTopicName = "my-data-topic"; + + // Act + var indexTopicName = KafkaPathValidator.GetIndexTopicName(dataTopicName); + + // Assert + Assert.Equal("__my-data-topic_index", indexTopicName); + } + + [Fact] + public void GetIndexTopicName_NullTopicName_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => KafkaPathValidator.GetIndexTopicName(null!)); + } + + [Fact] + public void GetIndexTopicName_EmptyTopicName_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => KafkaPathValidator.GetIndexTopicName("")); + } + + [Fact] + public void IsValidTopicName_ValidName_ReturnsTrue() + { + // Act & Assert + Assert.True(KafkaPathValidator.IsValidTopicName("valid-topic-name")); + Assert.True(KafkaPathValidator.IsValidTopicName("valid_topic_name")); + Assert.True(KafkaPathValidator.IsValidTopicName("valid.topic.name")); + Assert.True(KafkaPathValidator.IsValidTopicName("ValidTopicName123")); + } + + [Fact] + public void IsValidTopicName_InvalidName_ReturnsFalse() + { + // Act & Assert + Assert.False(KafkaPathValidator.IsValidTopicName(null!)); + Assert.False(KafkaPathValidator.IsValidTopicName("")); + Assert.False(KafkaPathValidator.IsValidTopicName(" ")); + Assert.False(KafkaPathValidator.IsValidTopicName(".")); + Assert.False(KafkaPathValidator.IsValidTopicName("..")); + Assert.False(KafkaPathValidator.IsValidTopicName("invalid@topic")); + Assert.False(KafkaPathValidator.IsValidTopicName("invalid#topic")); + Assert.False(KafkaPathValidator.IsValidTopicName("invalid$topic")); + Assert.False(KafkaPathValidator.IsValidTopicName("invalid/topic")); + } + + [Fact] + public void IsValidTopicName_TooLong_ReturnsFalse() + { + // Arrange + var tooLongName = new string('a', 250); + + // Act & Assert + Assert.False(KafkaPathValidator.IsValidTopicName(tooLongName)); + } + + [Theory] + [InlineData("container/file.txt", "container_file.txt")] + [InlineData("dir1/dir2/file.txt", "dir1_dir2_file.txt")] + [InlineData("simple", "simple")] + [InlineData("with-dashes", "with-dashes")] + [InlineData("with.dots", "with.dots")] + public void ToTopicName_VariousPaths_ProducesExpectedTopicNames(string pathString, string expectedPattern) + { + // Arrange + var parts = pathString.Split('/'); + var path = BlobStorePath.New(parts); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.Contains(expectedPattern, topicName); + Assert.True(KafkaPathValidator.IsValidTopicName(topicName)); + } + + [Fact] + public void New_CreatesInstance() + { + // Act + var validator = KafkaPathValidator.New(); + + // Assert + Assert.NotNull(validator); + } + + [Fact] + public void ToTopicName_AlphanumericPath_PreservesName() + { + // Arrange + var path = BlobStorePath.New("container", "file123"); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.Contains("file123", topicName); + } + + [Fact] + public void ToTopicName_PathWithDashes_PreservesDashes() + { + // Arrange + var path = BlobStorePath.New("my-container", "my-file"); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.Contains("my-container", topicName); + Assert.Contains("my-file", topicName); + } + + [Fact] + public void ToTopicName_PathWithDots_PreservesDots() + { + // Arrange + var path = BlobStorePath.New("my.container", "file.txt"); + + // Act + var topicName = KafkaPathValidator.ToTopicName(path); + + // Assert + Assert.Contains("my.container", topicName); + Assert.Contains("file.txt", topicName); + } + + [Fact] + public void ToTopicName_ConsistentResults_SamePathProducesSameTopicName() + { + // Arrange + var path1 = BlobStorePath.New("container", "file.txt"); + var path2 = BlobStorePath.New("container", "file.txt"); + + // Act + var topicName1 = KafkaPathValidator.ToTopicName(path1); + var topicName2 = KafkaPathValidator.ToTopicName(path2); + + // Assert + Assert.Equal(topicName1, topicName2); + } + + [Fact] + public void GetIndexTopicName_StartsWithDoubleUnderscore() + { + // Arrange + var dataTopicName = "data-topic"; + + // Act + var indexTopicName = KafkaPathValidator.GetIndexTopicName(dataTopicName); + + // Assert + Assert.StartsWith("__", indexTopicName); + } + + [Fact] + public void GetIndexTopicName_EndsWithIndex() + { + // Arrange + var dataTopicName = "data-topic"; + + // Act + var indexTopicName = KafkaPathValidator.GetIndexTopicName(dataTopicName); + + // Assert + Assert.EndsWith("_index", indexTopicName); + } +} + diff --git a/afs/kafka/tests/NebulaStore.Afs.Kafka.Tests.csproj b/afs/kafka/tests/NebulaStore.Afs.Kafka.Tests.csproj new file mode 100644 index 0000000..68ff8a8 --- /dev/null +++ b/afs/kafka/tests/NebulaStore.Afs.Kafka.Tests.csproj @@ -0,0 +1,29 @@ + + + + net9.0 + enable + enable + false + latest + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + +