diff --git a/Daqifi.Core.Cli.csproj b/Daqifi.Core.Cli.csproj index 5ce0972..3448e2c 100644 --- a/Daqifi.Core.Cli.csproj +++ b/Daqifi.Core.Cli.csproj @@ -12,7 +12,7 @@ - + diff --git a/Program.cs b/Program.cs index dc3bc6d..cd55f2c 100644 --- a/Program.cs +++ b/Program.cs @@ -180,6 +180,14 @@ private static async Task RunStreamingSessionAsync(CliOptions options) } var messageCount = 0; + + // The device sends analog and digital data in separate protobuf + // messages that share the same timestamp. We buffer the pending + // analog message and merge it with the subsequent digital message + // before writing a single combined output row. + DaqifiOutMessage? pendingAnalog = null; + var pendingLock = new object(); + device.MessageReceived += (_, eventArgs) => { if (stopCts.IsCancellationRequested) @@ -192,24 +200,50 @@ private static async Task RunStreamingSessionAsync(CliOptions options) return; } - var currentCount = Interlocked.Increment(ref messageCount); - if (options.MessageLimit > 0 && currentCount > options.MessageLimit) + if (options.ShowStatusMessages && ProtobufProtocolHandler.DetectMessageType(message) == ProtobufMessageType.Status) { + WriteStatusSummary(outputWriter, message); return; } - if (IsStreamLikeMessage(message)) + if (!IsStreamLikeMessage(message)) { - WriteStreamSample(outputWriter, message, options.OutputFormat); - } - else if (options.ShowStatusMessages && ProtobufProtocolHandler.DetectMessageType(message) == ProtobufMessageType.Status) - { - WriteStatusSummary(outputWriter, message); + return; } - if (options.MessageLimit > 0 && currentCount >= options.MessageLimit) + lock (pendingLock) { - stopCts.Cancel(); + var hasAnalog = message.AnalogInData.Count > 0 || message.AnalogInDataFloat.Count > 0; + var hasDigital = message.DigitalData.Length > 0; + + if (hasAnalog && !hasDigital) + { + // Flush any stale pending message before buffering the new one + if (pendingAnalog != null) + { + WriteMergedSample(outputWriter, pendingAnalog, null, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts); + } + + pendingAnalog = message; + return; + } + + if (hasDigital && pendingAnalog != null && pendingAnalog.MsgTimeStamp == message.MsgTimeStamp) + { + // Matching pair — merge and write + WriteMergedSample(outputWriter, pendingAnalog, message, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts); + pendingAnalog = null; + return; + } + + // Digital-only with no matching analog, or timestamp mismatch + if (pendingAnalog != null) + { + WriteMergedSample(outputWriter, pendingAnalog, null, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts); + pendingAnalog = null; + } + + WriteMergedSample(outputWriter, message, null, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts); } }; @@ -249,6 +283,16 @@ private static async Task RunStreamingSessionAsync(CliOptions options) device.Send(ScpiMessageProducer.StopStreaming); Console.WriteLine("Streaming stopped."); + // Flush any buffered analog-only message that never got a matching digital + lock (pendingLock) + { + if (pendingAnalog != null) + { + WriteMergedSample(outputWriter, pendingAnalog, null, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts); + pendingAnalog = null; + } + } + if (options.MinSamples > 0 && messageCount < options.MinSamples) { Console.Error.WriteLine( @@ -663,8 +707,39 @@ private static async Task RunSdCardOperationAsync(CliOptions options) { streamingDevice.StreamingFrequency = options.SampleRate; + // Enable channels before starting SD card logging. Core's + // StartSdCardLoggingAsync only forwards the channel mask — it does + // not enable channels itself. Without an explicit mask we enable all + // ADC channels (the device reports AnalogInputChannels in its + // capabilities after InitializeAsync) and DIO ports so the log + // file is not empty. + var channelMask = options.ChannelMask; + if (!string.IsNullOrWhiteSpace(channelMask) && !IsValidChannelMask(channelMask)) + { + Console.Error.WriteLine($"Invalid channel mask: {channelMask}"); + return 1; + } + + if (string.IsNullOrWhiteSpace(channelMask)) + { + var adcCount = streamingDevice.Metadata.Capabilities.AnalogInputChannels; + if (adcCount > 0) + { + channelMask = new string('1', adcCount); + } + } + + if (!string.IsNullOrWhiteSpace(channelMask)) + { + streamingDevice.Send(ScpiMessageProducer.EnableAdcChannels(channelMask)); + await Task.Delay(100); + } + + streamingDevice.Send(ScpiMessageProducer.EnableDioPorts()); + await Task.Delay(100); + await streamingDevice.StartSdCardLoggingAsync( - channelMask: options.ChannelMask, + channelMask: channelMask, format: options.SdLogFormat); Console.WriteLine("SD card logging started."); @@ -1071,20 +1146,38 @@ private static bool IsStreamLikeMessage(DaqifiOutMessage message) message.DigitalData.Length > 0; } - private static void WriteStreamSample(TextWriter writer, DaqifiOutMessage message, OutputFormat format) + private static void WriteMergedSample( + TextWriter writer, + DaqifiOutMessage analogMessage, + DaqifiOutMessage? digitalMessage, + OutputFormat format, + ref int messageCount, + int messageLimit, + CancellationTokenSource stopCts) { + var currentCount = Interlocked.Increment(ref messageCount); + if (messageLimit > 0 && currentCount > messageLimit) + { + return; + } + switch (format) { case OutputFormat.Jsonl: - writer.WriteLine(ToJsonLine(message)); + writer.WriteLine(ToJsonLine(analogMessage, digitalMessage)); break; case OutputFormat.Csv: - writer.WriteLine(ToCsvLine(message)); + writer.WriteLine(ToCsvLine(analogMessage, digitalMessage)); break; default: - writer.WriteLine(ToTextLine(message)); + writer.WriteLine(ToTextLine(analogMessage, digitalMessage)); break; } + + if (messageLimit > 0 && currentCount >= messageLimit) + { + stopCts.Cancel(); + } } private static void WriteStatusSummary(TextWriter writer, DaqifiOutMessage message) @@ -1094,19 +1187,19 @@ private static void WriteStatusSummary(TextWriter writer, DaqifiOutMessage messa $"fw={message.DeviceFwRev ?? "unknown"} sn={message.DeviceSn}"); } - private static string ToTextLine(DaqifiOutMessage message) + private static string ToTextLine(DaqifiOutMessage analogMsg, DaqifiOutMessage? digitalMsg) { var builder = new StringBuilder(); - if (message.MsgTimeStamp != 0) + if (analogMsg.MsgTimeStamp != 0) { builder.Append("ts="); - builder.Append(message.MsgTimeStamp.ToString(CultureInfo.InvariantCulture)); + builder.Append(analogMsg.MsgTimeStamp.ToString(CultureInfo.InvariantCulture)); builder.Append(' '); } - var analogValues = message.AnalogInDataFloat.Count > 0 - ? message.AnalogInDataFloat.Select(value => value.ToString("F3", CultureInfo.InvariantCulture)).ToList() - : message.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList(); + var analogValues = analogMsg.AnalogInDataFloat.Count > 0 + ? analogMsg.AnalogInDataFloat.Select(value => value.ToString("F3", CultureInfo.InvariantCulture)).ToList() + : analogMsg.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList(); if (analogValues.Count > 0) { @@ -1119,9 +1212,11 @@ private static string ToTextLine(DaqifiOutMessage message) builder.Append(']'); } - if (message.DigitalData.Length > 0) + // Prefer the paired digital message; fall back to analog message's own digital data + var digitalSource = digitalMsg ?? analogMsg; + if (digitalSource.DigitalData.Length > 0) { - var digital = BitConverter.ToString(message.DigitalData.ToByteArray()); + var digital = BitConverter.ToString(digitalSource.DigitalData.ToByteArray()); builder.Append(" digital="); builder.Append(digital); } @@ -1129,33 +1224,36 @@ private static string ToTextLine(DaqifiOutMessage message) return builder.ToString(); } - private static string ToCsvLine(DaqifiOutMessage message) + private static string ToCsvLine(DaqifiOutMessage analogMsg, DaqifiOutMessage? digitalMsg) { - var analogValues = message.AnalogInDataFloat.Count > 0 - ? message.AnalogInDataFloat.Select(value => value.ToString("F6", CultureInfo.InvariantCulture)).ToList() - : message.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList(); + var analogValues = analogMsg.AnalogInDataFloat.Count > 0 + ? analogMsg.AnalogInDataFloat.Select(value => value.ToString("F6", CultureInfo.InvariantCulture)).ToList() + : analogMsg.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList(); - var timestamp = message.MsgTimeStamp.ToString(CultureInfo.InvariantCulture); + var timestamp = analogMsg.MsgTimeStamp.ToString(CultureInfo.InvariantCulture); var analog = string.Join(",", analogValues); - var digital = message.DigitalData.Length > 0 - ? BitConverter.ToString(message.DigitalData.ToByteArray()) + + var digitalSource = digitalMsg ?? analogMsg; + var digital = digitalSource.DigitalData.Length > 0 + ? BitConverter.ToString(digitalSource.DigitalData.ToByteArray()) : string.Empty; return $"{timestamp},{analog},{digital}"; } - private static string ToJsonLine(DaqifiOutMessage message) + private static string ToJsonLine(DaqifiOutMessage analogMsg, DaqifiOutMessage? digitalMsg) { - var analogValues = message.AnalogInDataFloat.Count > 0 - ? message.AnalogInDataFloat.Select(value => value.ToString("F6", CultureInfo.InvariantCulture)).ToList() - : message.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList(); + var analogValues = analogMsg.AnalogInDataFloat.Count > 0 + ? analogMsg.AnalogInDataFloat.Select(value => value.ToString("F6", CultureInfo.InvariantCulture)).ToList() + : analogMsg.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList(); - var digitalBytes = message.DigitalData.Length > 0 - ? BitConverter.ToString(message.DigitalData.ToByteArray()) + var digitalSource = digitalMsg ?? analogMsg; + var digitalBytes = digitalSource.DigitalData.Length > 0 + ? BitConverter.ToString(digitalSource.DigitalData.ToByteArray()) : string.Empty; return "{" + - $"\"ts\":{message.MsgTimeStamp.ToString(CultureInfo.InvariantCulture)}," + + $"\"ts\":{analogMsg.MsgTimeStamp.ToString(CultureInfo.InvariantCulture)}," + $"\"analog\":[{string.Join(",", analogValues)}]," + $"\"digital\":\"{digitalBytes}\"" + "}";