Conversation
… configurable. Also moved the SqlAlchemyDB to the setup() method for better connection pool handling
Thanks @chishankar-work for this PR (and sorry for being late to review it). I have just a few minor comments.
P.S. you might want to check #38 if you will be running with SQLAlchemy 1.4.
```python
assert isinstance(element, dict)
self.records.append(element)
self.record_counter = self.record_counter + 1
self._db.write_record(self.table_config, element)
```
Shouldn't we remove this call now?
```python
self.record_counter = self.record_counter + 1
self._db.write_record(self.table_config, element)

if (self.record_counter > self.max_batch_size):
    self.commit_records()
```
```python
def commit_records(self):
    if self.record_counter() == 0:
```
Change `self.record_counter()` to `self.record_counter` (it is an attribute, not a method).
| """ | ||
|
|
||
| def __init__(self, source_config, table_config, *args, **kwargs): | ||
| def __init__(self, source_config, table_config, max_batch_size=1000, *args, **kwargs): |
Let's add this to the class documentation and describe the new behavior.
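One possible wording for that documentation (a hypothetical sketch only; the project's actual class name and docstring will differ):

```python
class Write:
    """Writes each element of the input PCollection to a relational database.

    Records are buffered per bundle and committed in batches rather than
    one at a time.

    Args:
        source_config: connection configuration for the target database.
        table_config: configuration of the destination table.
        max_batch_size: maximum number of buffered records before a commit
            is issued (default 1000); any remainder is flushed in
            finish_bundle().
    """

    def __init__(self, source_config, table_config, max_batch_size=1000,
                 *args, **kwargs):
        self.max_batch_size = max_batch_size
```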
| @@ -358,7 +361,6 @@ def write_record(self, session, create_insert_f, record_dict): | |||
| record=record_dict | |||
| ) | |||
| session.execute(insert_stmt) | |||
@mohaseeb Correct. I ended up creating my own batch-insert mod based on your project, and the only thing I changed was removing the assumption that `record` is a dict. SQLAlchemy, when generating an insert statement, supports both a single row (a dict) and multiple rows (a list of dicts).
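For reference, SQLAlchemy's `insert()` accepts either shape when executed; a minimal sketch using an in-memory SQLite table (the table and data are invented for illustration):

```python
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()
users = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)
metadata.create_all(engine)

# engine.begin() commits on exit, so this works in SQLAlchemy 1.x and 2.x
with engine.begin() as conn:
    # a single row: one dict
    conn.execute(users.insert(), {"id": 1, "name": "a"})
    # multiple rows: a list of dicts (an executemany under the hood)
    conn.execute(users.insert(), [{"id": 2, "name": "b"},
                                  {"id": 3, "name": "c"}])

with engine.begin() as conn:
    rows = conn.execute(users.select()).fetchall()
```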
The idea is to emulate the way JDBCIO writes to SQL.

When calling `write_record` it does the `.execute()` and then the `.commit()` in sequence, writing and committing each record to disk one at a time. The proposed change allows for more effective batching while better managing connection pooling to CSQL.

- Removed the `session.commit()` from `write_record`, which is called on every element in the `_WriteToRelationalDBFn` ParDo. Instead, we just call `.execute()` on each record and then commit to disk all at once.
- Instead of building the engine at the start of each bundle, moved `self._db = SqlAlchemyDB(self.source_config)` to the `.setup()` method so it is only created once for the object and handles connection pooling for the sessions that are opened and closed at the start and finish of each bundle.
- Handled the `.commit()` logic in the DoFn. In `start_bundle`, create `record_counter = 0` and `records = []`. This lets us build the commits up to size and ensures that they don't get too big.
- In cases where the bundles are small or divide unevenly, leaving a chunk with fewer than 1000 records, we can directly call `commit_records` in `finish_bundle()` to take care of the remaining elements in the bundle and flush the buffer.
- Made `max_batch_size` a configurable value with a default of 1000. This can be changed easily by the user by doing something such as:
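The batching flow described above, including the configurable `max_batch_size`, can be sketched framework-free; `FakeDB` and `BatchingWriteFn` below are invented stand-ins for illustration, not the PR's actual classes:

```python
class FakeDB:
    """Stand-in for SqlAlchemyDB: records .execute()s, applies them on commit."""
    def __init__(self):
        self.committed = []
        self._pending = []

    def write_record(self, table_config, record):
        self._pending.append(record)  # .execute() only, no commit here

    def commit(self):
        self.committed.extend(self._pending)
        self._pending = []


class BatchingWriteFn:
    """Simplified sketch of the batching DoFn lifecycle described in the PR."""
    def __init__(self, table_config=None, max_batch_size=1000):
        self.table_config = table_config
        self.max_batch_size = max_batch_size

    def setup(self):
        # In the real DoFn this builds SqlAlchemyDB once per worker.
        self._db = FakeDB()

    def start_bundle(self):
        self.record_counter = 0
        self.records = []

    def process(self, element):
        assert isinstance(element, dict)
        self.records.append(element)
        self.record_counter = self.record_counter + 1
        self._db.write_record(self.table_config, element)
        if self.record_counter > self.max_batch_size:
            self.commit_records()

    def commit_records(self):
        if self.record_counter == 0:
            return
        self._db.commit()
        self.record_counter = 0
        self.records = []

    def finish_bundle(self):
        # Flush any leftover records smaller than a full batch.
        self.commit_records()
```

For example, `BatchingWriteFn(max_batch_size=5000)` would commit in batches of up to 5000 records instead of the default 1000.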