chanelcolgate / hydroelectric-project


Data Ingestion #12

Open chanelcolgate opened 2 years ago

chanelcolgate commented 2 years ago

Description

Actions

```python
import tensorflow as tf

# Write two raw records into a TFRecord file
with tf.io.TFRecordWriter("test.tfrecord") as w:
  w.write(b"First record")
  w.write(b"Second record")

# Read the records back
for record in tf.data.TFRecordDataset("test.tfrecord"):
  print(record)  # e.g., tf.Tensor(b'First record', shape=(), dtype=string)
```

- If TFRecord files contain tf.Example records, each record contains one or more features that represent the columns in our data. The data is then stored in binary files, which can be consumed efficiently. If you are interested in the internals of TFRecord files, we recommend the [TensorFlow documentation](https://www.tensorflow.org/tutorials/load_data/tfrecord).
- Storing your data as TFRecord and tf.Examples provides a few benefits:
  - The data structure is system independent since it relies on Protocol Buffers, a cross-platform, cross-language library, to serialize data.
  - TFRecord is optimized for downloading or writing large amounts of data quickly.
  - tf.Example, the data structure representing every data row within TFRecord, is also the default data structure in the TensorFlow ecosystem and, therefore, is used in all TFX components.
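To make the record structure concrete, here is a minimal sketch (not from the project; the feature name "value" and the file name "example.tfrecord" are made up) that writes a single tf.Example and parses it back:

```python
import tensorflow as tf

# Build a tf.Example with one float feature (feature name is hypothetical)
example = tf.train.Example(features=tf.train.Features(feature={
  "value": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0]))
}))

# Serialize the record into a TFRecord file
with tf.io.TFRecordWriter("example.tfrecord") as writer:
  writer.write(example.SerializeToString())

# Parse it back with a feature description
feature_description = {"value": tf.io.FixedLenFeature([], tf.float32)}
for raw in tf.data.TFRecordDataset("example.tfrecord"):
  parsed = tf.io.parse_single_example(raw, feature_description)
  print(parsed["value"])  # tf.Tensor(1.0, shape=(), dtype=float32)
```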
- Ingesting local CSV files: the CsvExampleGen component converts them to tf.Example records.
```python
import os

from tfx.components import CsvExampleGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.utils.dsl_utils import external_input

context = InteractiveContext(pipeline_root='./tfx')

base_dir = os.getcwd()
data_dir = "data"

# Define the data path
examples = external_input(os.path.join(base_dir, data_dir))

# Instantiate the pipeline component
example_gen = CsvExampleGen(input=examples)

# Execute the component interactively
context.run(example_gen)
```

- Importing existing TFRecord Files
  - Sometimes our data can't be expressed efficiently as CSVs. In these cases, it is recommended to convert the datasets to TFRecord data structures and then load the saved TFRecord files with the ImportExampleGen component.
```python
import os
from tfx.components import ImportExampleGen
from tfx.utils.dsl_utils import external_input

base_dir = os.getcwd()
data_dir = "tfrecord_data"
examples = external_input(os.path.join(base_dir, data_dir))
example_gen = ImportExampleGen(input=examples)

context.run(example_gen)
```

- Splitting one dataset into subsets: the output_config argument configures the split names and their ratio via hash buckets.
```python
import os

from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input

base_dir = os.getcwd()
data_dir = 'data'

# Define preferred splits (hash_buckets 6:2:2 gives a 60/20/20 split)
output = example_gen_pb2.Output(
  split_config=example_gen_pb2.SplitConfig(splits=[
    # Specify the ratio
    example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=6),
    example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
    example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=2)
  ])
)
examples = external_input(os.path.join(base_dir, data_dir))

# Add the output_config argument
example_gen = CsvExampleGen(input=examples, output_config=output)

context.run(example_gen)
```

  - Default Splits: If we don't specify any output configuration, the ExampleGen component splits the dataset into training and evaluation splits with a 2:1 ratio by default.
  - Preserving existing splits
```python
import os

from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input

base_dir = os.getcwd()
data_dir = 'data'
input = example_gen_pb2.Input(splits=[
  example_gen_pb2.Input.Split(name='train', pattern='train/*'),
  # Set existing subdirectories
  example_gen_pb2.Input.Split(name='test', pattern='test/*')
])
examples = external_input(os.path.join(base_dir, data_dir))
# Add the input_config argument
example_gen = CsvExampleGen(input=examples, input_config=input)

context.run(example_gen)
```
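The patterns above assume the input directory already contains the matching subfolders. A quick sanity check before running the component (a sketch reusing the data layout implied by the input_config):

```python
import glob
import os

# Verify that the train/ and test/ subdirectories contain files
data_path = os.path.join(os.getcwd(), "data")
for split, pattern in [("train", "train/*"), ("test", "test/*")]:
  matches = glob.glob(os.path.join(data_path, pattern))
  print(f"{split}: {len(matches)} file(s)")
```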

- Helper functions to wrap raw values in the corresponding tf.train.Feature types:
```python
import tensorflow as tf

def _bytes_feature(value):
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
  return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
```

```python
import csv
import tensorflow as tf

LOCAL_FILE_NAME = "data/consumer_complaints_with_narrative.csv"
# The output file name is chosen here for the example
tfrecord_writer = tf.io.TFRecordWriter("consumer_complaints.tfrecord")

with open(LOCAL_FILE_NAME) as csv_file:
  reader = csv.DictReader(csv_file, delimiter=",", quotechar='"')
  for index, row in enumerate(reader):
    # Convert only a small sample of rows for demonstration
    if index > 10 and index < 20:
      example = tf.train.Example(features=tf.train.Features(feature={
          "product": _bytes_feature(row['product'].encode('utf-8')),
          "sub_product": _bytes_feature(row['sub_product'].encode('utf-8')),
          "issue": _bytes_feature(row['issue'].encode('utf-8')),
          "sub_issue": _bytes_feature(row['sub_issue'].encode('utf-8')),
          "state": _bytes_feature(row['state'].encode('utf-8')),
          "zip_code": _bytes_feature(row['zip_code'].encode('utf-8')),
          "company": _bytes_feature(row['company'].encode('utf-8')),
          "company_response": _bytes_feature(row['company_response'].encode('utf-8')),
          "consumer_complaint_narrative": _bytes_feature(row['consumer_complaint_narrative'].encode('utf-8')),
          "timely_response": _bytes_feature(row['timely_response'].encode('utf-8')),
          "consumer_disputed": _bytes_feature(row['consumer_disputed'].encode('utf-8')),
      }))
      tfrecord_writer.write(example.SerializeToString())

tfrecord_writer.close()
```
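To verify the conversion, the records can be read back and parsed; this is a minimal sketch that lists only two of the string features in the feature description (the output file name matches the one chosen above):

```python
import tensorflow as tf

# Parse a subset of the string features back out of the written file
feature_description = {
  "product": tf.io.FixedLenFeature([], tf.string),
  "issue": tf.io.FixedLenFeature([], tf.string),
}
for raw in tf.data.TFRecordDataset("consumer_complaints.tfrecord").take(1):
  parsed = tf.io.parse_single_example(raw, feature_description)
  print(parsed["product"].numpy().decode("utf-8"))
```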

Estimate

Tests

chanelcolgate commented 2 years ago
```python
import logging, urllib3, shutil

logging.basicConfig(level=logging.INFO)

def download_dataset(url, LOCAL_FILE_NAME):
  # Stream the file at url into LOCAL_FILE_NAME
  urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
  c = urllib3.PoolManager()
  with c.request("GET", url, preload_content=False) as res, open(
      LOCAL_FILE_NAME, "wb"
  ) as out_file:
    shutil.copyfileobj(res, out_file)
  logging.info("Download completed.")

logging.info("Started download script")
URL_1 = 'https://raw.githubusercontent.com/chanelcolgate/hydroelectric-project/master/data/SMP2020-2021_48h.csv'
LOCAL_FILE_NAME_1 = 'SMP2020-2021_48h.csv'
download_dataset(URL_1, LOCAL_FILE_NAME_1)

URL_2 = 'https://raw.githubusercontent.com/chanelcolgate/hydroelectric-project/master/data/SMP_5678_2021.csv'
LOCAL_FILE_NAME_2 = 'SMP_5678_2021.csv'
download_dataset(URL_2, LOCAL_FILE_NAME_2)
```
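With both CSVs downloaded, a possible follow-up (my sketch, assuming the data/ input folder used by CsvExampleGen earlier in this issue) is to move them into the pipeline's input directory:

```python
import os
import shutil

# Move the downloaded CSVs into the data/ folder read by CsvExampleGen
os.makedirs("data", exist_ok=True)
for file_name in ["SMP2020-2021_48h.csv", "SMP_5678_2021.csv"]:
  shutil.move(file_name, os.path.join("data", file_name))
```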