Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
40 changes: 40 additions & 0 deletions components/Python/fileConnectors/dataframe_to_file/component.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"engineType": "Python",
"language": "Python",
"userStandalone": false,
"name": "dataframe_to_file",
"label": "Sink DataFrame to File",
"program": "main.py",
"componentClass": "MCenterComponentAdapter",
"modelBehavior": "Auxiliary",
"useMLOps": true,
"inputInfo": [
{
"description": "Pandas DataFrame",
"label": "dataframe",
"defaultComponent": "",
"type": "dataframe",
"group": "data"
}
],
"outputInfo": [
{
"description": "File name",
"label": "filename",
"defaultComponent": "",
"type": "str",
"group": "data"
}
],
"group": "Sinks",
"arguments": [
{
"key": "file-path",
"label": "save to file",
"type": "str",
"description": "Save DataFrame to file",
"optional": true
}
],
"version": 1
}
47 changes: 47 additions & 0 deletions components/Python/fileConnectors/dataframe_to_file/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import print_function

import argparse
import sys
import time
import os
import pandas

from parallelm.components import ConnectableComponent
from parallelm.mlops.stats.multi_line_graph import MultiLineGraph
from parallelm.mlops import mlops as mlops

class MCenterComponentAdapter(ConnectableComponent):
"""
Adapter for df_to_file
"""

def __init__(self, engine):
super(self.__class__, self).__init__(engine)

def _materialize(self, parent_data_objs, user_data):
df_results = parent_data_objs[0]
results_path = self._params.get('file-path')
return [df_to_file(df_results, results_path)]


def df_to_file(df_predict_results, filepath):
"""
Save DataFrame to file
"""
prog_start_time = time.time()
mlops.init()
suffix_time_stamp = str(int(time.time()))
save_file = filepath + '.' + suffix_time_stamp
sfile = open(save_file, 'w+')
pandas.DataFrame(df_predict_results).to_csv(save_file)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CSV saving:

  1. rename accordingly
  2. Add CSV options (like header and so on)

sfile.close()
mlops.done()
return save_file


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--file-path", default='/tmp/results', help="Save DataFrame to file")
options = parser.parse_args()
return options

Empty file.
38 changes: 38 additions & 0 deletions components/Python/fileConnectors/file_to_dataframe/component.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"engineType": "Python",
"language": "Python",
"userStandalone": false,
"name": "file_to_dataframe",
"label": "Source File to DataFrame",
"program": "main.py",
"componentClass": "MCenterComponentAdapter",
"modelBehavior": "Auxiliary",
"useMLOps": true,
"inputInfo": [{
"description": "File to read contents",
"label": "File-Name",
"defaultComponent": "",
"type": "str",
"group": "data"
}],
"outputInfo": [
{
"description": "Pandas Dataframe",
"label": "dataframe",
"defaultComponent": "",
"type": "dataframe",
"group": "data"
}
],
"group": "Connectors",
"arguments": [
{
"key": "file-path",
"label": "Dataset file to read",
"type": "str",
"description": "File to use for loading DataSet into DataFrame",
"optional": true
}
],
"version": 1
}
47 changes: 47 additions & 0 deletions components/Python/fileConnectors/file_to_dataframe/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import print_function

import argparse
import sys
import time
import os
import pandas

from parallelm.components import ConnectableComponent
from parallelm.mlops.stats.multi_line_graph import MultiLineGraph
from parallelm.mlops import mlops as mlops

class MCenterComponentAdapter(ConnectableComponent):
"""
Adapter for read_file_to_df
"""

def __init__(self, engine):
super(self.__class__, self).__init__(engine)

def _materialize(self, parent_data_objs, user_data):
file_path = str(parent_data_objs[0])
if file_path is None:
file_path = self._params.get('file_path')
return [read_file_to_df(file_path)]


def read_file_to_df(filepath):
"""
Read file and return DataFrame
"""
mlops.init()
if not os.path.exists(filepath):
print("stderr- failed to find {}".format(filepath), file=sys.stderr)
raise Exception("file path does not exist: {}".format(filepath))

test_data = pandas.read_csv(filepath)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so it is a csv read. Maybe:

  1. change the name of the component to include that.
  2. Expose csv loading options - like header, and so on

mlops.done()
return test_data


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--file-path", default='/tmp/test-data.csv', help="Dataset to read")
options = parser.parse_args()
return options