DeepRec-AI / HybridBackend

A high-performance framework for training wide-and-deep recommender systems on heterogeneous clusters
Apache License 2.0

error: Variables not initialized: communicator/1/HbNcclCommHandleOp #78

Closed shijieliu closed 1 year ago

shijieliu commented 2 years ago

Current behavior

2022-10-19 12:39:39.948019: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2022-10-19 12:39:39.948020: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
INFO:tensorflow:Parsing ../data//train.csv
INFO:tensorflow:Parsing ../data//train.csv
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
(previous WARNING repeated 51 more times)
INFO:tensorflow:Aggregate 12 dense gradients (33.35MB) and 0 sparse gradients (0.00MB), skip 26 aggregated gradients
INFO:tensorflow:Aggregate 12 dense gradients (33.35MB) and 0 sparse gradients (0.00MB), skip 26 aggregated gradients
2022-10-19 12:39:43.135528: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 3792945000 Hz
2022-10-19 12:39:43.136209: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x53d08e0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2022-10-19 12:39:43.136227: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2022-10-19 12:39:43.137613: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2022-10-19 12:39:43.147796: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 3792945000 Hz
2022-10-19 12:39:43.148600: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x4190950 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2022-10-19 12:39:43.148638: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2022-10-19 12:39:43.150144: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2022-10-19 12:39:43.263791: I tensorflow/stream_executor/cuda/cuda_driver.cc:404] Cuda add device primary context 0x6a91880
2022-10-19 12:39:43.263977: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.264217: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x6a644c0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2022-10-19 12:39:43.264241: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
2022-10-19 12:39:43.264400: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.264809: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1687] Found device 0 with properties: 
name: NVIDIA GeForce RTX 2080 Ti major: 7 minor: 5 memoryClockRate(GHz): 1.635
pciBusID: 0000:08:00.0
2022-10-19 12:39:43.264837: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2022-10-19 12:39:43.267560: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
2022-10-19 12:39:43.267587: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
2022-10-19 12:39:43.272210: I tensorflow/stream_executor/cuda/cuda_driver.cc:404] Cuda add device primary context 0x51f3c70
2022-10-19 12:39:43.272373: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.272643: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x51bf3f0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2022-10-19 12:39:43.272666: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
2022-10-19 12:39:43.272784: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.272986: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1687] Found device 0 with properties: 
name: NVIDIA GeForce RTX 2080 Ti major: 7 minor: 5 memoryClockRate(GHz): 1.635
pciBusID: 0000:07:00.0
2022-10-19 12:39:43.273007: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2022-10-19 12:39:43.275711: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
2022-10-19 12:39:43.275737: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
2022-10-19 12:39:43.288994: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
2022-10-19 12:39:43.289162: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
2022-10-19 12:39:43.289498: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.11
2022-10-19 12:39:43.290069: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
2022-10-19 12:39:43.290150: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
2022-10-19 12:39:43.290231: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.290471: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.290643: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1815] Adding visible gpu devices: 0
2022-10-19 12:39:43.292542: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1170] Device interconnect StreamExecutor with strength 1 edge matrix:
2022-10-19 12:39:43.292558: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1176]      0 
2022-10-19 12:39:43.292564: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1189] 0:   N 
2022-10-19 12:39:43.292640: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.292843: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.293058: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1372] Created TensorFlow device (/job:worker/replica:0/task:0/device:GPU:0 with 9793 MB memory) -> physical GPU (device: 0, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:08:00.0, compute capability: 7.5)
2022-10-19 12:39:43.294188: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job chief -> {0 -> 127.0.0.1:20001}
2022-10-19 12:39:43.294199: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> localhost:20002}
2022-10-19 12:39:43.294986: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:374] Started server with target: grpc://localhost:20002
2022-10-19 12:39:43.297217: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
2022-10-19 12:39:43.297453: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
2022-10-19 12:39:43.297814: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.11
2022-10-19 12:39:43.298428: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
2022-10-19 12:39:43.298520: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
2022-10-19 12:39:43.298607: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.298845: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.299025: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1815] Adding visible gpu devices: 0
2022-10-19 12:39:43.301007: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1170] Device interconnect StreamExecutor with strength 1 edge matrix:
2022-10-19 12:39:43.301024: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1176]      0 
2022-10-19 12:39:43.301031: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1189] 0:   N 
2022-10-19 12:39:43.301114: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.301327: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.301552: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1372] Created TensorFlow device (/job:chief/replica:0/task:0/device:GPU:0 with 9729 MB memory) -> physical GPU (device: 0, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:07:00.0, compute capability: 7.5)
2022-10-19 12:39:43.302687: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job chief -> {0 -> localhost:20001}
2022-10-19 12:39:43.302705: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> 127.0.0.1:20002}
2022-10-19 12:39:43.303434: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:374] Started server with target: grpc://localhost:20001
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:run without loading checkpoint
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:run without loading checkpoint
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
Using TensorFlow version 1.15.5
Checking dataset...
Numbers of training dataset is 8000000
The training steps is 100
Traceback (most recent call last):
  File "benchmark_hb.py", line 405, in <module>
    main()
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/function.py", line 144, in wrapped_fn
    return fn(*args, **kwargs)
  File "benchmark_hb.py", line 339, in main
    config=sess_config) as sess:
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 174, in HybridBackendMonitoredTrainingSession
    sess = fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 633, in MonitoredTrainingSession
    stop_grace_period_secs=stop_grace_period_secs)
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 69, in __init__
    session_creator, hooks, should_recover=True, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 775, in __init__
    self._sess = _RecoverableSession(self._coordinated_creator)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1257, in __init__
    _WrappedSession.__init__(self, self._create_session())
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1262, in _create_session
    return self._sess_creator.create_session()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 928, in create_session
    self.tf_sess = self._session_creator.create_session()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 697, in create_session
    init_fn=self._scaffold.init_fn)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/session_manager.py", line 323, in prepare_session
    (_maybe_name(init_op), init_fn, self._local_init_op, msg))
RuntimeError: Init operations did not make model ready.  Init op: group_deps_2, init fn: None, local_init_op: name: "group_deps_1"
op: "NoOp"
input: "^group_deps_1/NoOp"
input: "^group_deps_1/NoOp_1"
device: "/job:chief/task:0/device:GPU:0"
, error: Variables not initialized: communicator/0/HbNcclCommHandleOp
Using TensorFlow version 1.15.5
Checking dataset...
Numbers of training dataset is 8000000
The training steps is 100
Traceback (most recent call last):
  File "benchmark_hb.py", line 405, in <module>
    main()
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/function.py", line 144, in wrapped_fn
    return fn(*args, **kwargs)
  File "benchmark_hb.py", line 339, in main
    config=sess_config) as sess:
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 174, in HybridBackendMonitoredTrainingSession
    sess = fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 633, in MonitoredTrainingSession
    stop_grace_period_secs=stop_grace_period_secs)
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 69, in __init__
    session_creator, hooks, should_recover=True, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 775, in __init__
    self._sess = _RecoverableSession(self._coordinated_creator)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1257, in __init__
    _WrappedSession.__init__(self, self._create_session())
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1262, in _create_session
    return self._sess_creator.create_session()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 928, in create_session
    self.tf_sess = self._session_creator.create_session()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 697, in create_session
    init_fn=self._scaffold.init_fn)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/session_manager.py", line 323, in prepare_session
    (_maybe_name(init_op), init_fn, self._local_init_op, msg))
RuntimeError: Init operations did not make model ready.  Init op: group_deps_2, init fn: None, local_init_op: name: "group_deps_1"
op: "NoOp"
input: "^group_deps_1/NoOp"
input: "^group_deps_1/NoOp_1"
device: "/job:worker/task:0/device:GPU:0"
, error: Variables not initialized: communicator/1/HbNcclCommHandleOp
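For context on why this `RuntimeError` is raised: TensorFlow's `SessionManager.prepare_session` first runs the scaffold's init ops and then evaluates a readiness check (essentially `tf.report_uninitialized_variables`); if any variable, such as HybridBackend's NCCL communicator handle, is still uninitialized afterwards, it raises exactly this error. A framework-free sketch of that logic, with illustrative names (`Variable`, `prepare_session` here are plain Python stand-ins, not the real TensorFlow classes):

```python
# Minimal, framework-free sketch of the readiness check performed by
# TF's SessionManager.prepare_session. All names are illustrative.

class Variable:
    def __init__(self, name):
        self.name = name
        self.initialized = False

def run_init_op(variables, covered_names):
    # The scaffold's init op only initializes the variables it knows about.
    for v in variables:
        if v.name in covered_names:
            v.initialized = True

def prepare_session(variables, covered_names):
    run_init_op(variables, covered_names)
    # Ready op: report any variables left uninitialized.
    not_ready = [v.name for v in variables if not v.initialized]
    if not_ready:
        raise RuntimeError(
            'Init operations did not make model ready. '
            'Variables not initialized: ' + ', '.join(not_ready))
    return 'session ready'

# An init op that omits the communicator handle reproduces the failure mode:
variables = [Variable('dense/kernel'),
             Variable('communicator/1/HbNcclCommHandleOp')]
try:
    prepare_session(variables, covered_names={'dense/kernel'})
except RuntimeError as e:
    print(e)
```

This suggests checking whether the custom `tf.train.Scaffold` in the script below replaces init ops that HybridBackend would otherwise use to initialize its communicator variables.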

Expected behavior

The code runs without errors.

System information

Code to reproduce

  1. Download the train dataset(in csv format) from https://storage.googleapis.com/dataset-uploader/criteo-kaggle/large_version/train.csv
  2. The training script
    
    # Copyright (c) 2022 Intel Corporation
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # ==============================================================================

from tensorflow.python.framework import dtypes
import numpy as np
from ast import arg
import time
import argparse
import tensorflow as tf
import os
import sys
import math
import collections
from tensorflow.python.client import timeline
import json

from tensorflow.python.framework import sparse_tensor
from tensorflow.python.feature_column import feature_column_v2 as fc
from tensorflow.python.ops import partitioned_variables
from tensorflow.python.framework import ops

os.environ["TF_GPU_THREAD_MODE"] = "global"
import hybridbackend.tensorflow as hb

# Set to INFO for tracking training; default is WARN. ERROR shows the fewest messages.
tf.logging.set_verbosity(tf.logging.INFO)
print("Using TensorFlow version %s" % (tf.__version__))

# Definition of some constants
CONTINUOUS_COLUMNS = ['I' + str(i) for i in range(1, 14)]  # 1-13 inclusive
CATEGORICAL_COLUMNS = ['C' + str(i) for i in range(1, 27)]  # 1-26 inclusive
LABEL_COLUMN = ['clicked']
TRAIN_DATA_COLUMNS = LABEL_COLUMN + CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
FEATURE_COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
HASH_BUCKET_SIZES = {
    'C1': 2500, 'C2': 2000, 'C3': 300000, 'C4': 250000, 'C5': 1000,
    'C6': 100, 'C7': 20000, 'C8': 4000, 'C9': 20, 'C10': 100000,
    'C11': 10000, 'C12': 250000, 'C13': 40000, 'C14': 100, 'C15': 100,
    'C16': 200000, 'C17': 50, 'C18': 10000, 'C19': 4000, 'C20': 20,
    'C21': 250000, 'C22': 100, 'C23': 100, 'C24': 250000, 'C25': 400,
    'C26': 100000
}

EMBEDDING_DIMENSIONS = {
    'C1': 64, 'C2': 64, 'C3': 128, 'C4': 128, 'C5': 64,
    'C6': 64, 'C7': 64, 'C8': 64, 'C9': 64, 'C10': 128,
    'C11': 64, 'C12': 128, 'C13': 64, 'C14': 64, 'C15': 64,
    'C16': 128, 'C17': 64, 'C18': 64, 'C19': 64, 'C20': 64,
    'C21': 128, 'C22': 64, 'C23': 64, 'C24': 128, 'C25': 64,
    'C26': 128
}

def transform_numeric(feature):
    r'''Transform numeric features.

    Note: statistics of Kaggle's Criteo dataset have been calculated
    in advance to save time.
    '''
    mins_list = [
        0.0, -3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
    ]
    range_list = [
        1539.0, 22069.0, 65535.0, 561.0, 2655388.0, 233523.0, 26297.0, 5106.0,
        24376.0, 9.0, 181.0, 1807.0, 6879.0
    ]

    def make_minmaxscaler(min, range):
        def minmaxscaler(col):
            return (col - min) / range
        return minmaxscaler

    numeric_list = []
    for column_name in CONTINUOUS_COLUMNS:
        i = CONTINUOUS_COLUMNS.index(column_name)
        normalizer_fn = make_minmaxscaler(mins_list[i], range_list[i])
        numeric = normalizer_fn(feature[column_name])
        numeric_list.append(tf.reshape(numeric, shape=[-1, 1]))
    return numeric_list

def transform_categorical(feature):
    r'''Transform categorical features.
    '''
    deep_features = []
    max_value = np.iinfo(dtypes.int64.as_numpy_dtype).max

    variables = []
    indices = []
    for column_name in CATEGORICAL_COLUMNS:
        ev_opt = tf.EmbeddingVariableOption(
            evict_option=None, filter_option=None)
        device_str = '/gpu'
        with tf.device(device_str), hb.scope(sharding=True):
            embedding_weights = tf.get_embedding_variable(
                f'{column_name}_weight',
                initializer=tf.random_normal_initializer(mean=0.0, stddev=0.05),
                embedding_dim=EMBEDDING_DIMENSIONS[column_name],
                ev_option=ev_opt)

        category = tf.strings.to_hash_bucket_fast(
            feature[column_name], max_value)
        sparse_tensor = fc._to_sparse_input_and_drop_ignore_values(category)
        sparse_tensor = tf.sparse.reshape(sparse_tensor, (-1, 1))

        deep_features.append(tf.nn.embedding_lookup_sparse(
            embedding_weights, sparse_tensor, None))

        variables.append(embedding_weights)
        indices.append(sparse_tensor)
    return deep_features

def stacked_dcn_v2(features, mlp_dims):
    r'''Stacked DCNv2.

    DCNv2: Improved Deep & Cross Network and Practical Lessons for Web-scale
    Learning to Rank Systems.

    See https://arxiv.org/abs/2008.13535 for more information.
    '''
    with tf.name_scope('cross'):
        cross_input = tf.concat(features, axis=-1)
        cross_input_shape = [-1, sum([f.shape[-1] for f in features])]
        cross_input = tf.reshape(cross_input, cross_input_shape)
        cross_input_sq = tf.layers.dense(
            cross_input, cross_input.shape[-1],
            activation=tf.nn.relu,
            kernel_initializer=tf.truncated_normal_initializer(),
            bias_initializer=tf.zeros_initializer())
        cross_output = cross_input * cross_input_sq + cross_input
        cross_output = tf.reshape(cross_output, [-1, cross_input.shape[1]])
        cross_output_dim = (len(features) * (len(features) + 1)) / 2

    with tf.name_scope('mlp'):
        prev_layer = cross_output
        prev_dim = cross_output_dim
        for i, d in enumerate(mlp_dims[:-1]):
            prev_layer = tf.layers.dense(
                prev_layer, d,
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(
                    mean=0.0,
                    stddev=math.sqrt(2.0 / (prev_dim + d))),
                bias_initializer=tf.random_normal_initializer(
                    mean=0.0,
                    stddev=math.sqrt(1.0 / d)),
                name=f'mlp_{i}')
            prev_dim = d
        return tf.layers.dense(
            prev_layer, mlp_dims[-1],
            activation=tf.nn.sigmoid,
            kernel_initializer=tf.random_normal_initializer(
                mean=0.0,
                stddev=math.sqrt(2.0 / (prev_dim + mlp_dims[-1]))),
            bias_initializer=tf.random_normal_initializer(
                mean=0.0,
                stddev=math.sqrt(1.0 / mlp_dims[-1])),
            name=f'mlp_{len(mlp_dims) - 1}')

# generate the dataset pipeline

def build_model_input(filename, batch_size, num_epochs):
    def parse_csv(value):
        tf.logging.info('Parsing {}'.format(filename))
        cont_defaults = [[0.0] for i in range(1, 14)]
        cate_defaults = [[' '] for i in range(1, 27)]
        label_defaults = [[0]]
        column_headers = TRAIN_DATA_COLUMNS
        record_defaults = label_defaults + cont_defaults + cate_defaults
        columns = tf.io.decode_csv(value, record_defaults=record_defaults)
        all_columns = collections.OrderedDict(zip(column_headers, columns))
        labels = all_columns.pop(LABEL_COLUMN[0])
        features = all_columns
        return features, labels

    '''Work Queue Feature'''
    if args.workqueue:
        from tensorflow.python.ops.work_queue import WorkQueue
        work_queue = WorkQueue([filename])
        # For multiple files:
        # work_queue = WorkQueue([filename, filename1, filename2, filename3])
        files = work_queue.input_dataset()
    else:
        files = filename
    # Extract lines from input files using the Dataset API.
    dataset = tf.data.TextLineDataset(files)
    dataset = dataset.shuffle(buffer_size=20000,
                              seed=args.seed)  # fix seed for reproducibility
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(parse_csv, num_parallel_calls=28)
    dataset = dataset.prefetch(2)
    return dataset

@hb.function()
def main():
    # check dataset and count dataset size
    print("Checking dataset...")
    train_file = args.data_location + '/train.csv'
    if not os.path.exists(train_file):
        print("Dataset does not exist in the given data_location.")
        sys.exit()
    no_of_training_examples = sum(1 for line in open(train_file))
    print("Numbers of training dataset is {}".format(no_of_training_examples))

    # set batch size, epochs & steps
    batch_size = args.batch_size

    if args.steps == 0:
        no_of_epochs = 1
        train_steps = math.ceil(
            (float(no_of_epochs) * no_of_training_examples) / batch_size)
    else:
        no_of_epochs = math.ceil(
            (float(batch_size) * args.steps) / no_of_training_examples)
        train_steps = args.steps
    print("The training steps is {}".format(train_steps))

    # set fixed random seed
    tf.set_random_seed(args.seed)

    # create data pipeline of train & test dataset
    with tf.device('/cpu:0'):
        train_dataset = build_model_input(train_file, batch_size, no_of_epochs)

        iterator = tf.data.Iterator.from_structure(train_dataset.output_types,
                                                   train_dataset.output_shapes)
        next_element = iterator.get_next()

    train_init_op = iterator.make_initializer(train_dataset)

    # create feature columns
    feature, labels = next_element[0], next_element[1]

    deep_features = transform_categorical(feature)
    wide_features = transform_numeric(feature)
    logits = stacked_dcn_v2(features=deep_features + wide_features,
                            mlp_dims=[1024, 1024, 512, 256, 1])
    loss = tf.reduce_mean(tf.keras.losses.binary_crossentropy(
        tf.reshape(labels, (-1, 1)), logits))

    step = tf.train.get_or_create_global_step()
    opt = tf.train.AdagradOptimizer(learning_rate=0.01)
    train_op = opt.minimize(loss, global_step=step)

    # Session config
    sess_config = tf.ConfigProto()

    # Session hooks
    hooks = []

    # if args.smartstaged and not args.tf:
    #     '''Smart staged feature'''
    #     next_element = tf.staged(next_element, num_threads=4, capacity=40)
    #     sess_config.graph_options.optimizer_options.do_smart_stage = True
    #     hooks.append(tf.make_prefetch_hook())
    # if args.op_fusion and not args.tf:
    #     '''Auto graph fusion'''
    #     sess_config.graph_options.optimizer_options.do_op_fusion = True
    # if args.micro_batch and not args.tf:
    #     '''Auto micro batch'''
    #     sess_config.graph_options.optimizer_options.micro_batch_num = args.micro_batch

    scaffold = tf.train.Scaffold(
        local_init_op=tf.group(
            tf.local_variables_initializer(), train_init_op),
    )

    stop_hook = tf.train.StopAtStepHook(last_step=train_steps)
    log_hook = tf.train.LoggingTensorHook(
        {
            'steps': step,
            'loss': loss,
        }, every_n_iter=1)
    hooks.append(stop_hook)
    hooks.append(log_hook)

    with tf.train.MonitoredTrainingSession(
            master='',
            hooks=hooks,
            scaffold=scaffold,
            config=sess_config) as sess:
        while not sess.should_stop():
            print(sess.run([feature]))
            sess.run([loss, train_op])
    print("Training completed.")

def boolean_string(string):
    low_string = string.lower()
    if low_string not in {'false', 'true'}:
        raise ValueError('Not a valid boolean string')
    return low_string == 'true'

# Get arg parser
def get_arg_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_location',
                        help='Full path of train data',
                        required=False,
                        default='./data')
    parser.add_argument('--steps',
                        help='set the number of steps on train dataset',
                        type=int,
                        default=0)
    parser.add_argument('--batch_size',
                        help='Batch size to train. Default is 512',
                        type=int,
                        default=512)
    parser.add_argument('--seed',
                        help='set the random seed for tensorflow',
                        type=int,
                        default=2021)
    parser.add_argument('--workqueue',
                        help='Whether to enable Work Queue. Defaults to False.',
                        type=boolean_string,
                        default=False)
    return parser

# Some of DeepRec's features are enabled by environment variables.
# This function sets those variables to enable the features; the
# docstring below introduces each of them.
def set_env_for_DeepRec():
    '''Set environment variables for DeepRec features enabled by ENV.

    More detailed information is available at
    https://deeprec.readthedocs.io/zh/latest/index.html.

    START_STATISTIC_STEP & STOP_STATISTIC_STEP: On CPU platforms, DeepRec
    supports memory optimization in both stand-alone and distributed
    training. It is on by default, and the default start and stop steps
    of collection are 1000 and 1100. Reduce the initial cold start time
    with the settings below.

    MALLOC_CONF: On CPU platforms, DeepRec can use memory optimization
    with the jemalloc library. Please preload libjemalloc.so with
    LD_PRELOAD=./libjemalloc.so.2 python ...
    '''
    os.environ['START_STATISTIC_STEP'] = '100'
    os.environ['STOP_STATISTIC_STEP'] = '110'
    os.environ['MALLOC_CONF'] = \
        'background_thread:true,metadata_thp:auto,dirty_decay_ms:20000,muzzy_decay_ms:20000'

if __name__ == '__main__':
    parser = get_arg_parser()
    args = parser.parse_args()

    set_env_for_DeepRec()

    main()
  3. Training command:

     python -m hybridbackend.run python benchmark_hb.py --data_location ../data/ --steps 100
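As a sanity check, the script's step/epoch arithmetic can be verified in isolation. With the dataset size from the log (8,000,000 rows), `--steps 100`, and the default batch size of 512, one epoch is more than enough, matching the "The training steps is 100" line in the output. This mirrors the logic in `main()` above:

```python
import math

def plan_training(no_of_training_examples, batch_size, steps):
    # Mirrors benchmark_hb.py's main(): either derive the step count from
    # one full epoch, or derive the epoch count from a fixed step budget.
    if steps == 0:
        no_of_epochs = 1
        train_steps = math.ceil(
            float(no_of_epochs) * no_of_training_examples / batch_size)
    else:
        no_of_epochs = math.ceil(
            float(batch_size) * steps / no_of_training_examples)
        train_steps = steps
    return no_of_epochs, train_steps

print(plan_training(8_000_000, 512, 100))  # (1, 100), as in the log
print(plan_training(8_000_000, 512, 0))    # one full epoch: (1, 15625)
```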


# Willing to contribute

Yes