forked from embodyhub/EmbodyHub
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmultimodal_example.py
More file actions
74 lines (55 loc) · 2.28 KB
/
multimodal_example.py
File metadata and controls
74 lines (55 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""Multimodal Data Management Example
This example demonstrates how to use the MultimodalDataManager to handle
different types of data streams in an embodied AI application.
"""
import numpy as np
import time
from threading import Thread
from embodyhub.adapters.multimodal_data_manager import MultimodalDataManager
def generate_image_data(
    height: int = 64,
    width: int = 64,
    channels: int = 3,
) -> np.ndarray:
    """Generate simulated image data.

    Args:
        height: Number of pixel rows in the frame.
        width: Number of pixel columns in the frame.
        channels: Number of colour channels (the default 3 gives an RGB image).

    Returns:
        An array of shape ``(height, width, channels)`` filled with samples
        from ``np.random.rand``.
    """
    return np.random.rand(height, width, channels)
def generate_audio_data(
    sample_rate: int = 16000,
    duration: float = 1.0,
) -> np.ndarray:
    """Generate simulated audio data.

    Args:
        sample_rate: Samples per second (the default is 16 kHz).
        duration: Clip length in seconds.

    Returns:
        A 1-D array holding ``round(sample_rate * duration)`` samples from
        ``np.random.rand``; with the defaults that is 16000 samples.
    """
    # round() before int() so float error (e.g. 44100 * 0.7) cannot drop a sample.
    num_samples = int(round(sample_rate * duration))
    return np.random.rand(num_samples)
def generate_sensor_data(num_readings: int = 10) -> np.ndarray:
    """Generate simulated sensor data.

    Args:
        num_readings: How many sensor values to produce.

    Returns:
        A 1-D array of ``num_readings`` samples from ``np.random.rand``.
    """
    return np.random.rand(num_readings)
def data_callback(data: np.ndarray):
    """Print a one-line summary (shape and dtype) of a received sample.

    Args:
        data: The array handed to the subscriber.
    """
    summary = f"Received data shape: {data.shape}, dtype: {data.dtype}"
    print(summary)
def main(num_samples: int = 10, interval: float = 1.0):
    """Run the multimodal streaming demo end to end.

    Creates one stream per modality, subscribes ``data_callback`` to each,
    starts the data manager, publishes ``num_samples`` rounds of simulated
    data from a background thread, and finally stops the data manager.

    Args:
        num_samples: Number of publish rounds; each round publishes one
            sample to every stream.
        interval: Seconds to wait after each round.
    """
    # Function-scope import: the module header only imports Thread.
    from threading import Event

    # Stream name -> function producing one simulated sample for that stream.
    # Dict order fixes the create/subscribe/publish order.
    stream_sources = {
        'camera_feed': generate_image_data,
        'microphone_input': generate_audio_data,
        'sensor_readings': generate_sensor_data,
    }

    # Initialize data manager
    data_manager = MultimodalDataManager()

    # Create data streams
    for stream_name in stream_sources:
        data_manager.create_stream(stream_name)

    # Subscribe to streams
    for stream_name in stream_sources:
        data_manager.subscribe(stream_name, data_callback)

    # Set when main() is shutting down so the generator can exit early.
    cancelled = Event()

    # Simulate data generation
    def data_generator():
        for _ in range(num_samples):
            for stream_name, generate in stream_sources.items():
                data_manager.publish(stream_name, generate())
            # Interruptible replacement for time.sleep(interval): returns True
            # as soon as shutdown is requested.
            if cancelled.wait(interval):
                break

    # Run data generation in a separate thread
    generator_thread = Thread(target=data_generator)

    # Start data processing
    data_manager.start()
    try:
        generator_thread.start()
        # Wait for data generation to complete
        generator_thread.join()
    finally:
        # Reached on success, on error, and on Ctrl-C: make sure the generator
        # has finished before stopping the manager, and always stop it.
        cancelled.set()
        if generator_thread.is_alive():
            generator_thread.join()
        data_manager.stop()
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == '__main__':
    main()