garyfeng / flink-playgrounds

Apache Flink Playgrounds
https://flink.apache.org/
Apache License 2.0

Json connector fails #2

Closed garyfeng closed 4 years ago

garyfeng commented 4 years ago

Same problem as with Kafka (#1), this time with Json():

>>> from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka
>>> Kafka()
<pyflink.table.descriptors.Kafka object at 0x7f54553cc0f0>
>>> Json()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.7/dist-packages/apache_flink-1.10.dev0-py3.7.egg/pyflink/table/descriptors.py", line 553, in __init__
    self._j_json = gateway.jvm.Json()
TypeError: 'JavaPackage' object is not callable

The same solution should apply, but first I need to identify which JAR provides Json(). Is it even a connector?
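For context, Py4J returns a JavaPackage placeholder when it cannot resolve a name to a class on the JVM classpath, which is why calling Json() raises this TypeError. One way to find the JAR is to scan the built JARs for a Json.class entry. The following is only a minimal sketch: the helper name find_jars_containing is hypothetical, and it assumes the class file's simple name is Json.

```python
import zipfile
from pathlib import Path


def find_jars_containing(root, class_name):
    """Return paths of JARs under `root` that contain a class with the given simple name."""
    suffix = "/" + class_name + ".class"
    matches = []
    for jar in sorted(Path(root).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                names = zf.namelist()
        except zipfile.BadZipFile:
            continue  # skip corrupt or partially written archives
        # Prefix each entry with "/" so top-level classes match as well
        if any(("/" + name).endswith(suffix) for name in names):
            matches.append(str(jar))
    return matches
```

Calling find_jars_containing("flink", "Json") from the directory that holds the Flink source checkout (after the Maven build) would list the candidate JARs; the directory name "flink" is an assumption based on the clone step.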

garyfeng commented 4 years ago

Json is a FormatDescriptor, not a connector:

class Json(FormatDescriptor):
    """
    Format descriptor for JSON.
    """

garyfeng commented 4 years ago

The following Dockerfile fixed the issue. Note that it copies many connector and format JARs into PyFlink's deps/lib directory, not just the JSON one.

###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

###############################################################################
# Build a PyFlink docker
###############################################################################

FROM maven:3.6-jdk-8-slim AS builder

# compile flink
RUN apt-get update && apt-get install -y git curl 
RUN git clone https://github.com/apache/flink.git
RUN cd flink && git fetch origin release-1.10  && git checkout -b release-1.10 origin/release-1.10

# ran into an apt-get update error (sites not available);
# it turned out Docker's system clock was behind; restarting Docker fixed it
# assuming we have the flink source installed under ./flink
# ADD flink /usr/local/flink

# tried to fix the hive server-down issue by skipping it,
# overwriting the pom with a modified one,
# but this causes other problems
# COPY pom.xml ./flink/flink-connectors/pom.xml
RUN cd ./flink && mvn clean install -DskipTests -Dfast

# Install Python 3
RUN apt-get -y install python3 python3-distutils && \
    ln -s /usr/bin/python3 /usr/bin/python; 

# install setuptools and pip
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py &&  \
    python ./get-pip.py
# build the pyflink source distribution
# unpack the generated tar.gz and copy over the missing connector and format JARs
WORKDIR /flink/flink-python/ 
RUN python setup.py sdist bdist_wheel 
RUN cd dist && \
    tar -xf apache-flink-1.10.dev0.tar.gz && \
    cd ../.. && \
    cp flink-connectors/flink-connector-cassandra/target/flink-connector-cassandra_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-elasticsearch-base/target/flink-connector-elasticsearch-base_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-filesystem/target/flink-connector-filesystem_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-gcp-pubsub/target/flink-connector-gcp-pubsub_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-hive/target/flink-connector-hive_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-kafka-base/target/flink-connector-kafka-base_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-kinesis/target/flink-connector-kinesis_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-nifi/target/flink-connector-nifi_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-rabbitmq/target/flink-connector-rabbitmq_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-twitter/target/flink-connector-twitter_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-hadoop-compatibility/target/flink-hadoop-compatibility_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-hbase/target/flink-hbase_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-hcatalog/target/flink-hcatalog_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-jdbc/target/flink-jdbc_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-sql-connector-kafka/target/flink-sql-connector-kafka_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-sql-connector-elasticsearch6/target/flink-sql-connector-elasticsearch6_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-sql-connector-elasticsearch7/target/flink-sql-connector-elasticsearch7_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    \
    cp flink-formats/flink-avro/target/flink-avro-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-formats/flink-avro/target/flink-avro-1.10-SNAPSHOT-sql-jar.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-formats/flink-csv/target/flink-csv-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-formats/flink-csv/target/flink-csv-1.10-SNAPSHOT-sql-jar.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-formats/flink-json/target/flink-json-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-formats/flink-json/target/flink-json-1.10-SNAPSHOT-sql-jar.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-formats/flink-compress/target/flink-compress-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-formats/flink-parquet/target/flink-parquet_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib 

#    cp flink-connectors/flink-connector-kafka-base/target/original-flink-connector-kafka-base_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib

###############################################################################
# Build Operations Playground Image
###############################################################################

FROM flink:1.10.0-scala_2.11 AS final

# Install Python
RUN apt-get update && \
    apt-get -y install curl python3 python3-distutils && \
    ln -s /usr/bin/python3 /usr/bin/python; 
# install pip and kafka-python
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
RUN python ./get-pip.py
RUN python -m pip install kafka-python

# Copy over the fixed pyflink
WORKDIR /
COPY --from=builder /flink/flink-python/dist/apache-flink-1.10.dev0 /apache-flink-1.10.dev0

# install the fixed version
RUN cd /apache-flink-1.10.dev0 && \
    python setup.py install
# remember to export the FLINK_HOME env var
ENV FLINK_HOME=/usr/local/lib/python3.7/dist-packages/apache_flink-1.10.dev0-py3.7.egg/pyflink

# setting up the work env
WORKDIR /opt/flink/bin

# Copy pyflink word count example
COPY WordCount.py WordCount.py
COPY input.txt /tmp/input

# Copy pyflink click count example
COPY ClickEventGenerator.py ClickEventGenerator.py
COPY ClickEventCount.py ClickEventCount.py

garyfeng commented 4 years ago

This needs a redo in light of the findings in #3.

garyfeng commented 4 years ago

Fixed for now, but we may need to add more connector files in the future.
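To notice when a connector or format JAR is missing, a small check over the deps/lib directory might help. This is only a sketch: the helper name missing_jars and the prefix list are hypothetical, and the match is a plain file-name prefix comparison.

```python
from pathlib import Path


def missing_jars(lib_dir, required_prefixes):
    """Return the prefixes in `required_prefixes` that match no JAR file name in `lib_dir`."""
    jar_names = [p.name for p in Path(lib_dir).glob("*.jar")]
    return [
        prefix
        for prefix in required_prefixes
        if not any(name.startswith(prefix) for name in jar_names)
    ]
```

For example, missing_jars("/apache-flink-1.10.dev0/deps/lib", ["flink-json", "flink-sql-connector-kafka"]) returns an empty list when both JARs were copied, and otherwise returns the prefixes that still need to be added to the Dockerfile.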