DataDynamics / nifi-custom-processor

NiFi에서 동작하는 Custom Processor를 모아둔 프로젝트.
0 stars 0 forks source link
csv dbcp nar nifi parquet processor recordreader reportingtask

NiFi Custom Services

이 프로젝트는 NiFi에서 사용할 수 있는 Custom Processor, Controller Service, Reporting Task 등을 구현한 NiFi 프로젝트입니다.

Requirement

Build

# mvn clean package

NiFi 로그인 인증

NiFi 최초 설치시 SSL을 위한 구성이 자동으로 진행되며 이 경우 단일 사용자를 위한 로그인을 다음의 커맨드를 이용하여 설정합니다.

# cd <NIFI_HOME>/bin
# sh nifi.sh set-single-user-credentials admin adminadminadmin

NAR 파일 배포

빌드한 NAR 파일을 배포하려면 다음과 같이 NAR 파일을 복사합니다. 소스코드를 수정하고 재배포하는 경우에도 동일하게 진행하면 변경내용을 확인하여 NiFi가 내부적으로 NAR를 재배포합니다.

# cp nifi-custom-reporting-tasks-1.0.6.nar <NIFI_HOME>/extentions
# cd <NIFI_HOME>/bin
# sh nifi.sh start

Processor

PutKudu

기존 Put Kudu는 다음의 문제점이 있습니다.

따라서 PutKudu 모듈은 다음을 추가로 지원합니다.

테스트를 위해서 CSV 파일을 다음과 같이 작성합니다.

COL_INT,COL_FLOAT,COL_TIMESTAMP,COL_TIMESTAMP_MILLIS,COL_TIMESTAMP_MICROS
1,1.1,2022-11-11 11:11:11,2022-11-11 11:11:11.111,2022-11-11 11:11:111111

해당 CSV 파일의 Avro Schema는 다음과 같으며 Timstamp는 문자열로 우선 처리를 하게 되므로 string으로 자료형을 지정합니다.

{
  "namespace": "nifi",
  "type": "record",
  "name": "input",
  "fields": [
    {
      "name": "col_int",
      "type": ["null", "int"]
    },
    {
      "name": "col_float",
      "type": ["null", "float"]
    },
    {
      "name": "col_timestamp",
      "type": ["null", {"type": "string", "logicalType": "timestamp-millis"}]
    },
    {
      "name": "col_timestamp_millis",
      "type": ["null", {"type": "string", "logicalType": "timestamp-millis"}]
    },
    {
      "name": "col_timestamp_micros",
      "type": ["null", {"type": "string", "logicalType": "timestamp-micros"}]
    }
  ]
}

이제 데이터를 저장할 Kudu 테이블을 생성합니다.

CREATE TABLE input
(
    col_int INT,
    col_float FLOAT,
    col_timestamp TIMESTAMP,
    col_timestamp_millis TIMESTAMP,
    col_timestamp_micros TIMESTAMP,
    PRIMARY KEY(col_int)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

이제 Timestamp 컬럼에 대해서 문자열 Timestamp 컬럼을 파싱하기 위한 Timestamp Pattern을 다음과 같이 정의합니다. Impala와 Kudu의 경우 nanoseconds는 지원하지 않습니다.

{
  "formats": [
    {
      "column-name": "col_timestamp",
      "timestamp-pattern": "yyyy-MM-dd HH:mm:ss",
      "type": "TIMESTAMP_MILLIS"
    },
    {
      "column-name": "col_timestamp_millis",
      "timestamp-pattern": "yyyy-MM-dd HH:mm:ss.SSS",
      "type": "TIMESTAMP_MILLIS"
    },
    {
      "column-name": "col_timestamp_micros",
      "timestamp-pattern": "yyyy-MM-dd HH:mm:ss.SSSSSS",
      "type": "TIMESTAMP_MICROS"
    }
  ]
}

이제 Put Kudu를 실행하면 다음과 같이 UTC가 적용되어 -9시간으로 시간이 변경됩니다.

+---------+-------------------+---------------------+-------------------------------+-------------------------------+
| col_int | col_float         | col_timestamp       | col_timestamp_millis          | col_timestamp_micros          |
+---------+-------------------+---------------------+-------------------------------+-------------------------------+
| 1       | 1.100000023841858 | 2022-11-11 02:11:11 | 2022-11-11 02:11:11.111000000 | 2022-11-11 02:11:11.111111000 |
+---------+-------------------+---------------------+-------------------------------+-------------------------------+

Timestamp 컬럼의 시간을 +9로 입력하면 이제 11시로 정상적으로 표시됩니다.

+---------+-------------------+---------------------+-------------------------------+-------------------------------+
| col_int | col_float         | col_timestamp       | col_timestamp_millis          | col_timestamp_micros          |
+---------+-------------------+---------------------+-------------------------------+-------------------------------+
| 1       | 1.100000023841858 | 2022-11-11 11:11:11 | 2022-11-11 11:11:11.111000000 | 2022-11-11 11:11:11.111111000 |
+---------+-------------------+---------------------+-------------------------------+-------------------------------+

DatabaseProcessor

DBCP Connection Pool을 이용한 SQL을 실행시키는 예제 Processor입니다.

Impala JDBC Driver

Impala JDBC Driver로 테스트를 위해서 lib/ImpalaJDBC42.jar 파일을 NIFI가 설치되어 있는 적정 위치(예; /opt/cloudera/parcels/CFM/NIFI/lib)에 업로드합니다.

Bulk Oracle Insert Processor

Bulk Oracle Insert Processor는 Record Reader를 통해서 수신한 Record를 Avro Parser를 통해 Avro Schema를 확인하여 다음의 Bulk Insert를 위한 SQL을 생성해서 INSERT합니다.

INSERT ALL
INTO schema1.table1 ( TypeBoolean, TypeInt, TypeLong, TypeFloat, TypeDouble, TypeString, TypeBytesDecimal, TypeDate, TypeTimeInMillis, TypeTimeInMicros, TypeTimestampInMillis, TypeStringTimestampInMillis ) VALUES ( false, 1, 123123, 123123.0, 3.14, 'Hello World', 11.11, 19575, 32051998, 32051998834, 1691279651998, '2022-11-11 11:11:11.111' )
INTO schema1.table1 ( TypeBoolean, TypeInt, TypeLong, TypeFloat, TypeDouble, TypeString, TypeBytesDecimal, TypeDate, TypeTimeInMillis, TypeTimeInMicros, TypeTimestampInMillis, TypeStringTimestampInMillis ) VALUES ( false, 1, 123123, 123123.0, 3.14, 'Hello World', 11.11, 19575, 32051998, 32051998834, 1691279651998, '2022-11-11 11:11:11.111' )
INTO schema1.table1 ( TypeBoolean, TypeInt, TypeLong, TypeFloat, TypeDouble, TypeString, TypeBytesDecimal, TypeDate, TypeTimeInMillis, TypeTimeInMicros, TypeTimestampInMillis, TypeStringTimestampInMillis ) VALUES ( false, 1, 123123, 123123.0, 3.14, 'Hello World', 11.11, 19575, 32051998, 32051998834, 1691279651998, '2022-11-11 11:11:11.111' )
INTO schema1.table1 ( TypeBoolean, TypeInt, TypeLong, TypeFloat, TypeDouble, TypeString, TypeBytesDecimal, TypeDate, TypeTimeInMillis, TypeTimeInMicros, TypeTimestampInMillis, TypeStringTimestampInMillis ) VALUES ( 'NULL', 1, 'NULL', 'NULL', 'NULL', 'Hello World', 11.11, 19575, 32051999, 'NULL', 1691279651999, '2022-11-11 11:11:11.111' )
INTO schema1.table1 ( TypeBoolean, TypeInt, TypeLong, TypeFloat, TypeDouble, TypeString, TypeBytesDecimal, TypeDate, TypeTimeInMillis, TypeTimeInMicros, TypeTimestampInMillis, TypeStringTimestampInMillis ) VALUES ( 'NULL', 1, 'NULL', 'NULL', 'NULL', 'Hello World', 11.11, 19575, 32051999, 'NULL', 1691279651999, '2022-11-11 11:11:11.111' )
INTO schema1.table1 ( TypeBoolean, TypeInt, TypeLong, TypeFloat, TypeDouble, TypeString, TypeBytesDecimal, TypeDate, TypeTimeInMillis, TypeTimeInMicros, TypeTimestampInMillis, TypeStringTimestampInMillis ) VALUES ( 'NULL', 1, 'NULL', 'NULL', 'NULL', 'Hello World', 11.11, 19575, 32051999, 'NULL', 1691279651999, '2022-11-11 11:11:11.111' )
SELECT 1 FROM dual;

Reporting Tasks

기존에 NiFi에 포함되어 있는 Reporting Task의 경우 로그 메시지 출력을 통해 NiFi UI의 Bulletin에 표시되도록 할 수는 있습니다. 하지만 장애가 발생할 수 있는 중요한 정보는 알람이 필요한 경우가 많으므로 다음의 Reporting Task의 경우 외부 HTTP URI에 임계치를 초과하는 경우 알람을 발송할 수 있습니다.

기타

Apache Ant

build.xml 파일에서 SCP를 수행하기 위해서 IntelliJ IDEA에서 실행하는 경우 다음과 같이 JSCH가 필요합니다.

# wget https://repo1.maven.org/maven2/com/jcraft/jsch/0.1.55/jsch-0.1.55.jar
# mkdir -p ~/.ant/lib
# mv jsch-0.1.55.jar ~/.ant/lib