From 197f694cae70eca7adb6b7a09e759e5202ec67f6 Mon Sep 17 00:00:00 2001 From: Pedram Pejman Date: Sun, 25 Oct 2020 19:20:14 -0700 Subject: [PATCH 1/2] Adds basic E2E TFX pipeline --- .gitignore | 3 + learning/bert_model.py | 128 +++++++++++++++++++ learning/bert_tokenizer.py | 182 +++++++++++++++++++++++++++ learning/speedo_pipeline.py | 156 +++++++++++++++++++++++ learning/speedo_pipeline_e2e_test.py | 86 +++++++++++++ learning/speedo_utils.py | 96 ++++++++++++++ 6 files changed, 651 insertions(+) create mode 100644 learning/bert_model.py create mode 100644 learning/bert_tokenizer.py create mode 100644 learning/speedo_pipeline.py create mode 100644 learning/speedo_pipeline_e2e_test.py create mode 100644 learning/speedo_utils.py diff --git a/.gitignore b/.gitignore index a48caef..bfcc24d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ functions/*/node_modules **/.DS_STORE manifest.json +**/*_sa.json +learning/tfx/metadata/* +learning/tfx/pipelines/* diff --git a/learning/bert_model.py b/learning/bert_model.py new file mode 100644 index 0000000..276f8b9 --- /dev/null +++ b/learning/bert_model.py @@ -0,0 +1,128 @@ +# Lint as: python2, python3 +# Copyright 2020 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Configurable fine-tuning BERT models for various tasks.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from typing import Text, Optional, List, Union + +import tensorflow as tf +import tensorflow.keras as keras + + +def build_bert_classifier(bert_layer: tf.keras.layers.Layer, + max_len: int, + num_classes: int, + dropout: float = 0.1, + activation: Optional[Text] = None): + """BERT Keras model for classification. + + Connect configurable fully connected layers on top of the BERT + pooled_output. + + Args: + bert_layer: A tensorflow_hub.KerasLayer intence of BERT layer. + max_len: The maximum length of preprocessed tokens. + num_classes: Number of unique classes in the labels. Determines the output + shape of the classification layer. + dropout: Dropout rate to be used for the classification layer. + activation: Activation function to use. If you don't specify anything, no + activation is applied (ie. "linear" activation: a(x) = x). + + Returns: + A Keras model. + """ + input_layer_names = ["input_word_ids", "input_mask", "segment_ids"] + + input_layers = [ + keras.layers.Input(shape=(max_len,), dtype=tf.int64, name=name) + for name in input_layer_names + ] + + converted_layers = [tf.cast(k, tf.int32) for k in input_layers] + + pooled_output, _ = bert_layer(converted_layers) + output = keras.layers.Dropout(dropout)(pooled_output) + output = keras.layers.Dense(num_classes, activation=activation)(output) + model = keras.Model(input_layers, output) + return model + + +def compile_bert_classifier( + model: tf.keras.Model, + loss: tf.keras.losses = tf.keras.losses.SparseCategoricalCrossentropy( + from_logits=True), + learning_rate: float = 2e-5, + metrics: List[Union[Text, tf.keras.metrics.Metric]] = None): + """Compile the BERT classifier using suggested parameters. + + Args: + model: A keras model. Most likely the output of build_bert_classifier. + loss: tf.keras.losses. 
The suggested loss function expects integer labels
+ (e.g. 0, 1, 2). If the labels are one-hot encoded, consider using
+ tf.keras.losses.CategoricalCrossentropy with from_logits set to true.
+ learning_rate: Suggested learning rate to be used in
+ tf.keras.optimizer.Adam. The three suggested learning_rates for
+ fine-tuning are [2e-5, 3e-5, 5e-5].
+ metrics: Default None will use ['sparse_categorical_accuracy']. An array of
+ strings or tf.keras.metrics.
+
+ Returns:
+ None.
+ """
+ if metrics is None:
+ metrics = ["sparse_categorical_accuracy"]
+
+ model.compile(
+ optimizer=tf.keras.optimizers.Adam(learning_rate),
+ loss=loss,
+ metrics=metrics)
+
+
+def build_and_compile_bert_classifier(
+ bert_layer: tf.keras.layers.Layer,
+ max_len: int,
+ num_classes: int,
+ learning_rate: float = 5e-5,
+ metrics: List[Union[Text, tf.keras.metrics.Metric]] = None):
+ """Build and compile keras BERT classification model.
+
+ Apart from the necessary inputs, use default/suggested parameters in build
+ and compile BERT classifier functions.
+
+ Args:
+ bert_layer: A tensorflow_hub.KerasLayer instance of BERT layer.
+ max_len: The maximum length of preprocessed tokens.
+ num_classes: Number of unique classes in the labels. Determines the output
+ shape of the classification layer.
+ learning_rate: Suggested learning rate to be used in
+ tf.keras.optimizer.Adam. The three suggested learning_rates for
+ fine-tuning are [2e-5, 3e-5, 5e-5].
+ metrics: Default None will use ['sparse_categorical_accuracy']. An array of
+ strings or tf.keras.metrics.
+
+ Returns:
+ A compiled keras BERT Classification model. 
+ """ + if metrics is None: + metrics = ["sparse_categorical_accuracy"] + + model = build_bert_classifier(bert_layer, max_len, num_classes) + + compile_bert_classifier(model, learning_rate=learning_rate, metrics=metrics) + return model + diff --git a/learning/bert_tokenizer.py b/learning/bert_tokenizer.py new file mode 100644 index 0000000..5b5017d --- /dev/null +++ b/learning/bert_tokenizer.py @@ -0,0 +1,182 @@ +# Lint as: python2, python3 +# Copyright 2020 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Prepressing using tensorflow_text BertTokenizer.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from typing import Text + +import tensorflow as tf +import tensorflow_hub as hub +import tensorflow_text as text + +from tensorflow.python.eager.context import eager_mode # pylint: disable=g-direct-tensorflow-import + + +_CLS = '[CLS]' +_PAD = '[PAD]' +_SEP = '[SEP]' + + +class BertPreprocessor(object): + """Bert Tokenizer built ontop of tensorflow_text.BertTokenizer.""" + + def __init__(self, model_link: Text): + self._model_link = model_link + self._model = hub.KerasLayer(model_link) + self._find_special_tokens() + + def _find_special_tokens(self): + """Find the special token ID's for [CLS] [PAD] [SEP]. + + Since each Bert model is trained on different vocabulary, it's important + to find the special token indices pertaining to that model. 
+ Since in Transform, tensorflow_hub.KerasLayer loads a symbolic tensor, turn + on eager mode to get the actual vocab_file location. + """ + + with eager_mode(): + model = hub.KerasLayer(self._model_link) + vocab = model.resolved_object.vocab_file.asset_path.numpy() + self._do_lower_case = model.resolved_object.do_lower_case.numpy() + with tf.io.gfile.GFile(vocab, 'r') as f: + lines = f.read().split('\n') + self._sep_id = lines.index(_SEP) + self._cls_id = lines.index(_CLS) + self._pad_id = lines.index(_PAD) + + def tokenize_single_sentence_unpad(self, + sequence: tf.Tensor, + max_len: int = 128, + add_cls: bool = True, + add_sep: bool = True): + """Tokenize a sentence with the BERT model vocab file and without padding. + + Add special tokens according to config. + + Args: + sequence: Tensor of shape [batch_size, 1]. + max_len: The number of tokens after padding and truncating. + add_cls: Whether to add CLS token at the front of each sequence. + add_sep: Whether to add SEP token at the end of each sequence. + + Returns: + word_ids: Ragged tokenized sequences [batch_size, None]. + """ + vocab_file_path = self._model.resolved_object.vocab_file.asset_path + tokenizer = text.BertTokenizer( + vocab_file_path, + lower_case=self._do_lower_case, + token_out_type=tf.int64) + word_ids = tokenizer.tokenize(sequence) + # Tokenizer default puts tokens into array of size 1. 
merge_dims flattens it + word_ids = word_ids.merge_dims(-2, -1) + if add_cls: + cls_token = tf.fill([tf.shape(sequence)[0], 1], + tf.constant(self._cls_id, dtype=tf.int64)) + + word_ids = tf.concat([cls_token, word_ids], 1) + + if add_sep: + sep_token = tf.fill([tf.shape(sequence)[0], 1], + tf.constant(self._sep_id, dtype=tf.int64)) + + word_ids = word_ids[:, :max_len - 1] + word_ids = tf.concat([word_ids, sep_token], 1) + + return word_ids + + def tokenize_single_sentence_pad(self, + sequence: tf.Tensor, + max_len: int = 128, + add_cls: bool = True, + add_sep: bool = True): + """Tokenize a single sentence according to the vocab used by the Bert model. + + Add special tokens according to config. + + Args: + sequence: Tensor of shape [batch_size, 1]. + max_len: The number of tokens after padding and truncating. + add_cls: Whether to add CLS token at the front of each sequence. + add_sep: Whether to add SEP token at the end of each sequence. + + Returns: + word_ids: Tokenized sequences [batch_size, max_len]. + input_mask: Mask padded tokens [batch_size, max_len]. + segment_ids: Distinguish multiple sequences [batch_size, max_len]. + """ + word_ids = self.tokenize_single_sentence_unpad(sequence, max_len, add_cls, + add_sep) + + word_ids = word_ids.to_tensor( + shape=[None, max_len], + default_value=tf.constant(self._pad_id, dtype=tf.int64)) + + input_mask = tf.cast(tf.not_equal(word_ids, self._pad_id), tf.int64) + segment_ids = tf.fill(tf.shape(input_mask), tf.constant(0, dtype=tf.int64)) + + return word_ids, input_mask, segment_ids + + def tokenize_sentence_pair(self, sequence_a: tf.Tensor, sequence_b: tf.Tensor, + max_len: int): + """Tokenize a sequence pair. + + Tokenize each sequence with self.tokenize_single_sentence. Then add CLS + token in front of the first sequence, add SEP tokens between the two + sequences and at the end of the second sequence. 
+ + Args: + sequence_a: [batch_size, 1] + sequence_b: [batch_size, 1] + max_len: The length of the concatenated tokenized sentences. + + Returns: + word_ids: Tokenized sequences [batch_size, max_len]. + input_mask: Mask padded tokens [batch_size, max_len]. + segment_ids: Distinguish multiple sequences [batch_size, max_len]. + """ + # TODO(dzats): the issue here is nuanced. Depending on the dataset, one + # might want to keep the entire first sentence, or the second. Consider + # alternate truncate stratagies. + sentence_len = max_len // 2 + word_id_a = self.tokenize_single_sentence_unpad( + sequence_a, + sentence_len, + True, + True, + ) + + word_id_b = self.tokenize_single_sentence_unpad( + sequence_b, + sentence_len, + False, + True, + ) + + word_ids = tf.concat([word_id_a, word_id_b], 1) + word_ids = word_ids.to_tensor( + shape=[None, max_len], + default_value=tf.constant(self._pad_id, dtype=tf.int64)) + + input_mask = tf.cast(tf.not_equal(word_ids, self._pad_id), tf.int64) + # Fill a ragged tensor of zero with word_id_a's shape + segment_ids = tf.cast(word_id_a < 0, tf.int64) + segment_ids = segment_ids.to_tensor( + shape=[None, max_len], default_value=tf.constant(1, dtype=tf.int64)) + return word_ids, input_mask, segment_ids + diff --git a/learning/speedo_pipeline.py b/learning/speedo_pipeline.py new file mode 100644 index 0000000..8a0b5a9 --- /dev/null +++ b/learning/speedo_pipeline.py @@ -0,0 +1,156 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +from typing import Text + +import absl +import tensorflow_model_analysis as tfma +from tfx.extensions.google_cloud_big_query.example_gen.component import BigQueryExampleGen +from tfx.components import Evaluator +from tfx.components import ExampleValidator +from tfx.components import Pusher +from tfx.components import ResolverNode +from tfx.components import SchemaGen +from tfx.components import StatisticsGen +from tfx.components import 
Trainer +from tfx.components import Transform +from tfx.components.trainer.executor import GenericExecutor +from tfx.orchestration import metadata +from tfx.orchestration import pipeline +from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner +from tfx.components.base import executor_spec +from tfx.dsl.experimental import latest_blessed_model_resolver +from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner +from tfx.proto import example_gen_pb2 +from tfx.proto import pusher_pb2 +from tfx.proto import trainer_pb2 +from tfx.types import Channel +from tfx.types.standard_artifacts import Model +from tfx.types.standard_artifacts import ModelBlessing +from tfx.utils.dsl_utils import external_input + +os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.join(os.getcwd(), "speedo_bq_user_sa.json") + +_pipeline_name = 'speedo_pipeline' +_project_root = os.getcwd() +_data_root = os.path.join(_project_root, 'data', 'simple') +_module_file = os.path.join(_project_root, 'speedo_utils.py') +_serving_model_dir = os.path.join(_project_root, 'serving_model', _pipeline_name) + +_tfx_root = os.path.join(_project_root, 'tfx') +_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name) +_metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name, + 'metadata.db') + +_gcp_project_id = 'speedo-249504' +_gcs_tmp_dir = 'gs://speedo-dev-tmp/' + +def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text, + module_file: Text, serving_model_dir: Text, + metadata_path: Text, + direct_num_workers: int) -> pipeline.Pipeline: + """Returns a pipeline composed of a single ExampleGen component""" + + query = """ + SELECT + features.title, + features.description, + features.channel, + playback_rate.playback_rate + FROM + speedo_dev.playback_rate""" + + example_gen = BigQueryExampleGen(query=query) + statistics_gen = StatisticsGen(examples=example_gen.outputs['examples']) + schema_gen = 
SchemaGen(statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
+ example_validator = ExampleValidator(
+ statistics=statistics_gen.outputs['statistics'],
+ schema=schema_gen.outputs['schema'])
+
+ transform = Transform(examples=example_gen.outputs['examples'],
+ schema=schema_gen.outputs['schema'],
+ module_file=module_file)
+
+ trainer = Trainer(
+ module_file=module_file,
+ custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
+ examples=transform.outputs['transformed_examples'],
+ transform_graph=transform.outputs['transform_graph'],
+ schema=schema_gen.outputs['schema'],
+ train_args=trainer_pb2.TrainArgs(num_steps=2),
+ eval_args=trainer_pb2.EvalArgs(num_steps=1))
+
+ model_resolver = ResolverNode(
+ instance_name='latest_blessed_model_resolver',
+ resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
+ model=Channel(type=Model),
+ model_blessing=Channel(type=ModelBlessing))
+
+ # Uses TFMA to compute an evaluation statistics over features of a model and
+ # perform quality validation of a candidate model (compared to a baseline).
+ eval_config = tfma.EvalConfig(
+ model_specs=[tfma.ModelSpec(label_key='playback_rate')],
+ slicing_specs=[tfma.SlicingSpec()],
+ metrics_specs=[
+ tfma.MetricsSpec(metrics=[
+ tfma.MetricConfig(
+ class_name='SparseCategoricalAccuracy',
+ threshold=tfma.MetricThreshold(
+ value_threshold=tfma.GenericValueThreshold(
+ # Adjust the threshold when training on the
+ # full dataset.
+ lower_bound={'value': 0.5}),
+ change_threshold=tfma.GenericChangeThreshold(
+ direction=tfma.MetricDirection.HIGHER_IS_BETTER,
+ absolute={'value': -1e-2})))
+ ])
+ ])
+
+ evaluator = Evaluator(
+ examples=example_gen.outputs['examples'],
+ model=trainer.outputs['model'],
+ baseline_model=model_resolver.outputs['model'],
+ # Change threshold will be ignored if there is no baseline (first run). 
+ eval_config=eval_config)
+
+ # Checks whether the model passed the validation steps and pushes the model
+ # to a file destination if check passed.
+ pusher = Pusher(
+ model=trainer.outputs['model'],
+ model_blessing=evaluator.outputs['blessing'],
+ push_destination=pusher_pb2.PushDestination(
+ filesystem=pusher_pb2.PushDestination.Filesystem(
+ base_directory=serving_model_dir)))
+
+ return pipeline.Pipeline(
+ pipeline_name=pipeline_name,
+ pipeline_root=pipeline_root,
+ components=[
+ example_gen,
+ statistics_gen,
+ schema_gen,
+ example_validator,
+ transform,
+ trainer,
+ model_resolver,
+ evaluator,
+ pusher,
+ ],
+ enable_cache=True,
+ metadata_connection_config=metadata.sqlite_metadata_connection_config(
+ metadata_path),
+ beam_pipeline_args=['--direct_num_workers=%d' % direct_num_workers,
+ '--temp_location=%s' % _gcs_tmp_dir,
+ '--project=%s' % _gcp_project_id])
+
+BeamDagRunner().run(
+ _create_pipeline(
+ pipeline_name=_pipeline_name,
+ pipeline_root=_pipeline_root,
+ data_root=_data_root,
+ module_file=_module_file,
+ serving_model_dir=_serving_model_dir,
+ metadata_path=_metadata_path,
+ direct_num_workers=0))
diff --git a/learning/speedo_pipeline_e2e_test.py b/learning/speedo_pipeline_e2e_test.py
new file mode 100644
index 0000000..414e504
--- /dev/null
+++ b/learning/speedo_pipeline_e2e_test.py
@@ -0,0 +1,86 @@
+"""E2E Tests for Speedo TFX Pipeline."""
+
+import os
+from typing import Text
+
+import tensorflow as tf
+
+import speedo_pipeline
+from tfx.orchestration import metadata
+from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
+
+class SpeedoPipelineEndToEndTest(tf.test.TestCase):
+ def setUp(self):
+ super(SpeedoPipelineEndToEndTest, self).setUp()
+ self._test_dir = os.path.join(
+ os.environ.get('TEST_UNDECLARED_OUTPUTS_DIR', self.get_temp_dir()),
+ self._testMethodName)
+
+ self._pipeline_name = 'speedo_test'
+ self._data_root = os.path.join(os.path.dirname(__file__), 'data')
+ 
self._module_file = os.path.join(
+ os.path.dirname(__file__), 'speedo_utils.py')
+ self._serving_model_dir = os.path.join(self._test_dir, 'serving_model')
+ self._pipeline_root = os.path.join(self._test_dir, 'tfx', 'pipelines', self._pipeline_name)
+ self._metadata_path = os.path.join(self._test_dir, 'tfx', 'metadata', self._pipeline_name, 'metadata.db')
+
+ def assertExecutedOnce(self, component: Text) -> None:
+ """Check the component is executed exactly once."""
+ component_path = os.path.join(self._pipeline_root, component)
+ self.assertTrue(tf.io.gfile.exists(component_path))
+ outputs = tf.io.gfile.listdir(component_path)
+ for output in outputs:
+ executions = tf.io.gfile.listdir(os.path.join(component_path, output))
+ self.assertEqual(1, len(executions))
+
+
+ def assertPipelineExecution(self) -> None:
+ components = ['BigQueryExampleGen', 'Evaluator', 'ExampleValidator', 'Pusher', 'SchemaGen', 'StatisticsGen', 'Trainer', 'Transform']
+ for component in components:
+ self.assertExecutedOnce(component)
+
+ def testPipeline(self) -> None:
+ pipeline = speedo_pipeline._create_pipeline(
+ pipeline_name=self._pipeline_name,
+ data_root=self._data_root,
+ module_file=self._module_file,
+ serving_model_dir=self._serving_model_dir,
+ pipeline_root=self._pipeline_root,
+ metadata_path=self._metadata_path,
+ direct_num_workers=1)
+
+ BeamDagRunner().run(pipeline)
+
+ self.assertTrue(tf.io.gfile.exists(self._serving_model_dir))
+ self.assertTrue(tf.io.gfile.exists(self._metadata_path))
+ expected_execution_count = 9  # 8 components + 1 resolver
+ metadata_config = metadata.sqlite_metadata_connection_config(self._metadata_path)
+
+ with metadata.Metadata(metadata_config) as m:
+ artifact_count = len(m.store.get_artifacts())
+ execution_count = len(m.store.get_executions())
+ self.assertGreaterEqual(artifact_count, execution_count)
+ self.assertEqual(expected_execution_count, execution_count)
+
+ self.assertPipelineExecution()
+
+ # Runs pipeline 
the second time. + BeamDagRunner().run(pipeline) + + with metadata.Metadata(metadata_config) as m: + # Artifact count is increased by 3 caused by Evaluator and Pusher. + self.assertGreaterEqual(3 + artifact_count, execution_count) + artifact_count = len(m.store.get_artifacts()) + self.assertEqual(expected_execution_count * 2, len(m.store.get_executions())) + + # Runs pipeline the third time. + BeamDagRunner().run(pipeline) + + with metadata.Metadata(metadata_config) as m: + # Artifact count is unchanged. + self.assertGreaterEqual(artifact_count, execution_count) + self.assertEqual(expected_execution_count * 3, len(m.store.get_executions())) + +if __name__ == '__main__': + tf.compat.v1.enable_v2_behavior() + tf.test.main() diff --git a/learning/speedo_utils.py b/learning/speedo_utils.py new file mode 100644 index 0000000..3f5a642 --- /dev/null +++ b/learning/speedo_utils.py @@ -0,0 +1,96 @@ +from typing import List, Text +import tensorflow as tf +import tensorflow_hub as hub +import tensorflow_transform as tft +from tfx.components.trainer.executor import TrainerFnArgs +from bert_model import build_and_compile_bert_classifier +from bert_tokenizer import BertPreprocessor + +_BERT_LINK = 'https://tfhub.dev/tensorflow/bert_en_cased_L-12_H-768_A-12/2' + +_TRAIN_BATCH_SIZE = 16 +_EVAL_BATCH_SIZE = 16 +_TEXT_FEATURE_KEY = 'channel' +_LABEL_KEY = 'playback_rate' +_MAX_LEN = 256 +_EPOCHS = 1 + +def preprocessing_fn(inputs): + """tf.transform's callback function for preprocessing inputs.""" + input_word_ids, input_mask, segment_ids = _tokenize(inputs[_TEXT_FEATURE_KEY]) + labels = inputs[_LABEL_KEY] - 1 + + return { + _LABEL_KEY: labels, + 'input_word_ids': input_word_ids, + 'input_mask': input_mask, + 'segment_ids': segment_ids + } + +def run_fn(fn_args: TrainerFnArgs): + """Train the model based on given args.""" + + tf_transform_output = tft.TFTransformOutput(fn_args.transform_output) + train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 
batch_size=_TRAIN_BATCH_SIZE) + eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, batch_size=_EVAL_BATCH_SIZE) + + mirrored_strategy = tf.distribute.MirroredStrategy() + with mirrored_strategy.scope(): + bert_layer = hub.KerasLayer(_BERT_LINK, trainable=True) + model = build_and_compile_bert_classifier(bert_layer, _MAX_LEN, 2) + + model.fit( + train_dataset, + epochs=_EPOCHS, + steps_per_epoch=fn_args.train_steps, + validation_data=eval_dataset, + validation_steps=fn_args.eval_steps) + + serving_fn = _get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function( + tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')) + + signatures = { + 'serving_default': serving_fn + } + + model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures) + +def _gzip_reader_fn(filenames): + """Small utility returning a record reader that can read gzip'ed files.""" + return tf.data.TFRecordDataset(filenames, compression_type='GZIP') + +def _tokenize(sequence): + """Tokenize the sentence and insert appropriate tokens.""" + processor = BertPreprocessor(_BERT_LINK) + return processor.tokenize_single_sentence_pad( + tf.reshape(sequence, [-1]), max_len=_MAX_LEN) + +def _input_fn(file_pattern: List[Text], tf_transform_output: tft.TFTransformOutput, + batch_size: int = 200) -> tf.data.Dataset: + """Generates features and label for tuning/training.""" + transformed_feature_spec = ( + tf_transform_output.transformed_feature_spec().copy()) + dataset = tf.data.experimental.make_batched_features_dataset( + file_pattern=file_pattern, + batch_size=batch_size, + features=transformed_feature_spec, + reader=_gzip_reader_fn, + label_key=_LABEL_KEY) + + return dataset.prefetch(tf.data.experimental.AUTOTUNE) + +def _get_serve_tf_examples_fn(model, tf_transform_output): + """Returns inference function: serialized tf.Example -> infernece results.""" + + model.tft_layer = tf_transform_output.transform_features_layer() + + @tf.function + def 
serve_tf_examples_fn(serialized_tf_examples): + feature_spec = tf_transform_output.raw_feature_spec() + feature_spec.pop(_LABEL_KEY) + parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec) + + transformed_features = model.tft_layer(parsed_features) + return model(transformed_features) + + return serve_tf_examples_fn From de0af5c3c12a478733edede882323a2ef615b002 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 26 Oct 2020 02:21:00 +0000 Subject: [PATCH 2/2] Bump node-fetch from 2.6.0 to 2.6.1 in /functions/recordPlaybackRate Bumps [node-fetch](https://github.com/bitinn/node-fetch) from 2.6.0 to 2.6.1. - [Release notes](https://github.com/bitinn/node-fetch/releases) - [Changelog](https://github.com/node-fetch/node-fetch/blob/master/docs/CHANGELOG.md) - [Commits](https://github.com/bitinn/node-fetch/compare/v2.6.0...v2.6.1) Signed-off-by: dependabot[bot] --- functions/recordPlaybackRate/package-lock.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/functions/recordPlaybackRate/package-lock.json b/functions/recordPlaybackRate/package-lock.json index 0cfb65b..9633258 100644 --- a/functions/recordPlaybackRate/package-lock.json +++ b/functions/recordPlaybackRate/package-lock.json @@ -4835,9 +4835,9 @@ } }, "node-fetch": { - "version": "2.6.0", - "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.6.0.tgz", - "integrity": "sha512-8dG4H5ujfvFiqDmVu9fQ5bOHUC15JMjMY/Zumv26oOvvVJjM67KF8koCWIabKQ1GJIa9r2mMZscBq/TbdOcmNA==" + "version": "2.6.1", + "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.6.1.tgz", + "integrity": "sha512-V4aYg89jEoVRxRb2fJdAg8FHvI7cEyYdVAh94HH0UIK8oJxUfkjlDQN9RbMx+bEjP7+ggMiFRprSti032Oipxw==" }, "node-forge": { "version": "0.8.5",