Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Daqifi.Core.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</ItemGroup>

<ItemGroup Condition="'$(DaqifiCoreProjectPath)' == ''">
<PackageReference Include="Daqifi.Core" Version="0.18.0" />
<PackageReference Include="Daqifi.Core" Version="0.19.4" />
</ItemGroup>

</Project>
172 changes: 135 additions & 37 deletions Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ private static async Task<int> RunStreamingSessionAsync(CliOptions options)
}

var messageCount = 0;

// The device sends analog and digital data in separate protobuf
// messages that share the same timestamp. We buffer the pending
// analog message and merge it with the subsequent digital message
// before writing a single combined output row.
DaqifiOutMessage? pendingAnalog = null;
var pendingLock = new object();

device.MessageReceived += (_, eventArgs) =>
{
if (stopCts.IsCancellationRequested)
Expand All @@ -192,24 +200,50 @@ private static async Task<int> RunStreamingSessionAsync(CliOptions options)
return;
}

var currentCount = Interlocked.Increment(ref messageCount);
if (options.MessageLimit > 0 && currentCount > options.MessageLimit)
if (options.ShowStatusMessages && ProtobufProtocolHandler.DetectMessageType(message) == ProtobufMessageType.Status)
{
WriteStatusSummary(outputWriter, message);
return;
}

if (IsStreamLikeMessage(message))
if (!IsStreamLikeMessage(message))
{
WriteStreamSample(outputWriter, message, options.OutputFormat);
}
else if (options.ShowStatusMessages && ProtobufProtocolHandler.DetectMessageType(message) == ProtobufMessageType.Status)
{
WriteStatusSummary(outputWriter, message);
return;
}

if (options.MessageLimit > 0 && currentCount >= options.MessageLimit)
lock (pendingLock)
{
stopCts.Cancel();
var hasAnalog = message.AnalogInData.Count > 0 || message.AnalogInDataFloat.Count > 0;
var hasDigital = message.DigitalData.Length > 0;

if (hasAnalog && !hasDigital)
{
// Flush any stale pending message before buffering the new one
if (pendingAnalog != null)
{
WriteMergedSample(outputWriter, pendingAnalog, null, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts);
}

pendingAnalog = message;
return;
}

if (hasDigital && pendingAnalog != null && pendingAnalog.MsgTimeStamp == message.MsgTimeStamp)
{
// Matching pair — merge and write
WriteMergedSample(outputWriter, pendingAnalog, message, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts);
pendingAnalog = null;
return;
}

// Digital-only with no matching analog, or timestamp mismatch
if (pendingAnalog != null)
{
WriteMergedSample(outputWriter, pendingAnalog, null, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts);
pendingAnalog = null;
}

WriteMergedSample(outputWriter, message, null, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts);
}
};

Expand Down Expand Up @@ -249,6 +283,16 @@ private static async Task<int> RunStreamingSessionAsync(CliOptions options)
device.Send(ScpiMessageProducer.StopStreaming);
Console.WriteLine("Streaming stopped.");

// Flush any buffered analog-only message that never got a matching digital
lock (pendingLock)
{
if (pendingAnalog != null)
{
WriteMergedSample(outputWriter, pendingAnalog, null, options.OutputFormat, ref messageCount, options.MessageLimit, stopCts);
pendingAnalog = null;
}
}

if (options.MinSamples > 0 && messageCount < options.MinSamples)
{
Console.Error.WriteLine(
Expand Down Expand Up @@ -663,8 +707,39 @@ private static async Task<int> RunSdCardOperationAsync(CliOptions options)
{
streamingDevice.StreamingFrequency = options.SampleRate;

// Enable channels before starting SD card logging. Core's
// StartSdCardLoggingAsync only forwards the channel mask — it does
// not enable channels itself. Without an explicit mask we enable all
// ADC channels (the device reports AnalogInputChannels in its
// capabilities after InitializeAsync) and DIO ports so the log
// file is not empty.
var channelMask = options.ChannelMask;
if (!string.IsNullOrWhiteSpace(channelMask) && !IsValidChannelMask(channelMask))
{
Console.Error.WriteLine($"Invalid channel mask: {channelMask}");
return 1;
}

if (string.IsNullOrWhiteSpace(channelMask))
{
var adcCount = streamingDevice.Metadata.Capabilities.AnalogInputChannels;
if (adcCount > 0)
{
channelMask = new string('1', adcCount);
}
}

if (!string.IsNullOrWhiteSpace(channelMask))
{
streamingDevice.Send(ScpiMessageProducer.EnableAdcChannels(channelMask));
await Task.Delay(100);
}

streamingDevice.Send(ScpiMessageProducer.EnableDioPorts());
await Task.Delay(100);

await streamingDevice.StartSdCardLoggingAsync(
channelMask: options.ChannelMask,
channelMask: channelMask,
format: options.SdLogFormat);
Console.WriteLine("SD card logging started.");

Expand Down Expand Up @@ -1071,20 +1146,38 @@ private static bool IsStreamLikeMessage(DaqifiOutMessage message)
message.DigitalData.Length > 0;
}

private static void WriteStreamSample(TextWriter writer, DaqifiOutMessage message, OutputFormat format)
private static void WriteMergedSample(
TextWriter writer,
DaqifiOutMessage analogMessage,
DaqifiOutMessage? digitalMessage,
OutputFormat format,
ref int messageCount,
int messageLimit,
CancellationTokenSource stopCts)
{
var currentCount = Interlocked.Increment(ref messageCount);
if (messageLimit > 0 && currentCount > messageLimit)
{
return;
}

switch (format)
{
case OutputFormat.Jsonl:
writer.WriteLine(ToJsonLine(message));
writer.WriteLine(ToJsonLine(analogMessage, digitalMessage));
break;
case OutputFormat.Csv:
writer.WriteLine(ToCsvLine(message));
writer.WriteLine(ToCsvLine(analogMessage, digitalMessage));
break;
default:
writer.WriteLine(ToTextLine(message));
writer.WriteLine(ToTextLine(analogMessage, digitalMessage));
break;
}

if (messageLimit > 0 && currentCount >= messageLimit)
{
stopCts.Cancel();
}
}

private static void WriteStatusSummary(TextWriter writer, DaqifiOutMessage message)
Expand All @@ -1094,19 +1187,19 @@ private static void WriteStatusSummary(TextWriter writer, DaqifiOutMessage messa
$"fw={message.DeviceFwRev ?? "unknown"} sn={message.DeviceSn}");
}

private static string ToTextLine(DaqifiOutMessage message)
private static string ToTextLine(DaqifiOutMessage analogMsg, DaqifiOutMessage? digitalMsg)
{
var builder = new StringBuilder();
if (message.MsgTimeStamp != 0)
if (analogMsg.MsgTimeStamp != 0)
{
builder.Append("ts=");
builder.Append(message.MsgTimeStamp.ToString(CultureInfo.InvariantCulture));
builder.Append(analogMsg.MsgTimeStamp.ToString(CultureInfo.InvariantCulture));
builder.Append(' ');
}

var analogValues = message.AnalogInDataFloat.Count > 0
? message.AnalogInDataFloat.Select(value => value.ToString("F3", CultureInfo.InvariantCulture)).ToList()
: message.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList();
var analogValues = analogMsg.AnalogInDataFloat.Count > 0
? analogMsg.AnalogInDataFloat.Select(value => value.ToString("F3", CultureInfo.InvariantCulture)).ToList()
: analogMsg.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList();

if (analogValues.Count > 0)
{
Expand All @@ -1119,43 +1212,48 @@ private static string ToTextLine(DaqifiOutMessage message)
builder.Append(']');
}

if (message.DigitalData.Length > 0)
// Prefer the paired digital message; fall back to analog message's own digital data
var digitalSource = digitalMsg ?? analogMsg;
if (digitalSource.DigitalData.Length > 0)
{
var digital = BitConverter.ToString(message.DigitalData.ToByteArray());
var digital = BitConverter.ToString(digitalSource.DigitalData.ToByteArray());
builder.Append(" digital=");
builder.Append(digital);
}

return builder.ToString();
}

private static string ToCsvLine(DaqifiOutMessage message)
private static string ToCsvLine(DaqifiOutMessage analogMsg, DaqifiOutMessage? digitalMsg)
{
var analogValues = message.AnalogInDataFloat.Count > 0
? message.AnalogInDataFloat.Select(value => value.ToString("F6", CultureInfo.InvariantCulture)).ToList()
: message.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList();
var analogValues = analogMsg.AnalogInDataFloat.Count > 0
? analogMsg.AnalogInDataFloat.Select(value => value.ToString("F6", CultureInfo.InvariantCulture)).ToList()
: analogMsg.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList();

var timestamp = message.MsgTimeStamp.ToString(CultureInfo.InvariantCulture);
var timestamp = analogMsg.MsgTimeStamp.ToString(CultureInfo.InvariantCulture);
var analog = string.Join(",", analogValues);
var digital = message.DigitalData.Length > 0
? BitConverter.ToString(message.DigitalData.ToByteArray())

var digitalSource = digitalMsg ?? analogMsg;
var digital = digitalSource.DigitalData.Length > 0
? BitConverter.ToString(digitalSource.DigitalData.ToByteArray())
: string.Empty;

return $"{timestamp},{analog},{digital}";
}

private static string ToJsonLine(DaqifiOutMessage message)
private static string ToJsonLine(DaqifiOutMessage analogMsg, DaqifiOutMessage? digitalMsg)
{
var analogValues = message.AnalogInDataFloat.Count > 0
? message.AnalogInDataFloat.Select(value => value.ToString("F6", CultureInfo.InvariantCulture)).ToList()
: message.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList();
var analogValues = analogMsg.AnalogInDataFloat.Count > 0
? analogMsg.AnalogInDataFloat.Select(value => value.ToString("F6", CultureInfo.InvariantCulture)).ToList()
: analogMsg.AnalogInData.Select(value => value.ToString(CultureInfo.InvariantCulture)).ToList();

var digitalBytes = message.DigitalData.Length > 0
? BitConverter.ToString(message.DigitalData.ToByteArray())
var digitalSource = digitalMsg ?? analogMsg;
var digitalBytes = digitalSource.DigitalData.Length > 0
? BitConverter.ToString(digitalSource.DigitalData.ToByteArray())
: string.Empty;

return "{" +
$"\"ts\":{message.MsgTimeStamp.ToString(CultureInfo.InvariantCulture)}," +
$"\"ts\":{analogMsg.MsgTimeStamp.ToString(CultureInfo.InvariantCulture)}," +
$"\"analog\":[{string.Join(",", analogValues)}]," +
$"\"digital\":\"{digitalBytes}\"" +
"}";
Expand Down