garyfeng / flink-playgrounds

Apache Flink Playgrounds
https://flink.apache.org/
Apache License 2.0

Kafka connector not working for pyflink #1

Closed garyfeng closed 4 years ago

garyfeng commented 4 years ago

Running the Python PyFlink StreamTable example fails with the following error:

>docker-compose exec client python ClickEventCount.py
Traceback (most recent call last):
  File "ClickEventCount.py", line 24, in <module>
    Kafka()
  File "/usr/local/lib/python3.7/dist-packages/pyflink/table/descriptors.py", line 705, in __init__
    self._j_kafka = gateway.jvm.Kafka()
TypeError: 'JavaPackage' object is not callable

From https://blog.csdn.net/ghostyusheng/article/details/102696867 (translated from Chinese):

If you follow the tutorial above, you will find that instantiating Kafka() from pyflink.table.descriptors fails, and the official docs offer no solution. After repeated trial and error, the correct approach is: copy the flink-connector-kafka-base_2.11-1.9-SNAPSHOT.jar and original-flink-connector-kafka-base_2.11-1.9-SNAPSHOT.jar JARs from flink/flink-connectors/flink-connector-kafka-base/target into flink/flink-python/dist/apache-flink-1.9.dev0/deps/lib, then use tar czvf to repackage the PyFlink dependency, uninstall the old package with pip3 uninstall apache-flink, and pip3 install the newly packaged .tar.gz. (Copyright notice: this is an original article by CSDN blogger "ghostyusheng", licensed under CC 4.0 BY-SA; reproduction must include the original link and this notice. Original link: https://blog.csdn.net/ghostyusheng/article/details/102696867)

Logging into the client container, we find a Python 3.7 installation with the Flink distribution at /usr/local/lib/python3.7/dist-packages/pyflink/lib, containing flink-dist_2.11-1.10.0.jar, flink-table_2.11-1.10.0.jar, slf4j-log4j12-1.7.15.jar, flink-table-blink_2.11-1.10.0.jar, and log4j-1.2.17.jar.

Sounds like we need to copy the flink-connector-kafka-base*.jar and original-flink-connector-kafka-base_2.11-1.9-SNAPSHOT.jar here.

According to the above post, we need to reinstall pyflink from the above source. But maybe adding the JAR files to the right place will do the trick?
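To confirm the diagnosis before rebuilding anything, here is a minimal sketch that lists what is actually on PyFlink's classpath (the path is the one observed in this container; adjust for other installs):

# List the JARs PyFlink ships in its lib folder; if no flink-connector-kafka*
# JAR shows up, the Kafka descriptor cannot resolve on the JVM side.
import glob

for jar in sorted(glob.glob(
        "/usr/local/lib/python3.7/dist-packages/pyflink/lib/*.jar")):
    print(jar)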

garyfeng commented 4 years ago

I was able to find the JAR file at https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-base_2.11/1.10.0 and download the 104K jar file.
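For reproducibility, the download can also be scripted (a sketch; the URL follows Maven Central's standard repository layout for the coordinates above):

# Fetch the released Kafka connector JAR from Maven Central instead of
# building it from source.
import urllib.request

url = ("https://repo1.maven.org/maven2/org/apache/flink/"
       "flink-connector-kafka-base_2.11/1.10.0/"
       "flink-connector-kafka-base_2.11-1.10.0.jar")
urllib.request.urlretrieve(url, "flink-connector-kafka-base_2.11-1.10.0.jar")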

garyfeng commented 4 years ago

I tried a brute-force fix with the following, to no avail; same error.

# to fix #1: Kafka connector not working for pyflink
# https://github.com/garyfeng/flink-playgrounds/issues/1
COPY ./python/flink-connector-kafka-base_2.11-1.10.0.jar /usr/local/lib/python3.7/dist-packages/pyflink/lib/flink-connector-kafka-base_2.11-1.10.0.jar
garyfeng commented 4 years ago

The offending code is in "/usr/local/lib/python3.7/dist-packages/pyflink/table/descriptors.py", at line 705:

class Kafka(ConnectorDescriptor):
    """
    Connector descriptor for the Apache Kafka message queue.
    """

    def __init__(self):
        gateway = get_gateway()
        self._j_kafka = gateway.jvm.Kafka()
        super(Kafka, self).__init__(self._j_kafka)

It looks like a JVM setup issue: gateway.jvm.Kafka() resolves to a py4j JavaPackage placeholder instead of the Java class, which means the Kafka connector class is not on the JVM classpath.
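Calling a py4j JavaPackage is exactly what raises this TypeError, so the gateway can be probed directly. A small diagnostic sketch (the fully qualified class name is assumed from the Flink sources):

# A resolvable class comes back as a py4j JavaClass; a missing one comes
# back as a JavaPackage placeholder.
from pyflink.java_gateway import get_gateway

gateway = get_gateway()
candidate = gateway.jvm.org.apache.flink.table.descriptors.Kafka
print(type(candidate))  # JavaPackage here means the connector JAR is absent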

garyfeng commented 4 years ago

Taking the route of compiling flink, as indicated in https://blog.csdn.net/ghostyusheng/article/details/102696867

Instead of installing Java and Maven on my local machine, I used the same Maven Docker image that builds the official Flink Playground example; modified Dockerfile below. This passed for release 1.9 (see the original post). I want to test this out with Flink v1.10.

FROM maven:3.6-jdk-8-slim AS builder

# compile flink
RUN apt-get update && apt-get install -y git curl 
RUN git clone https://github.com/apache/flink.git
RUN cd flink && git fetch origin release-1.10  && git checkout -b release-1.10 origin/release-1.10

# ran into an apt-get update error message (sites not available);
# turns out Docker's system clock was behind; restarting Docker fixed it
# assuming we have the flink source installed under ./flink
# ADD flink /usr/local/flink
RUN cd ./flink && mvn clean install -DskipTests -Dfast

The idea is to build the JAR files, package them into a local PyFlink package, and COPY that to the next build stage (the official Flink docker image), to install Python and this patched PyFlink version. This is what the OP indicated.

The compile failed because datanucleus.org happened to be down :-(. It was working in the afternoon.

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  11:45 min
[INFO] Finished at: 2020-03-17T22:49:34Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project flink-connector-hive_2.11: Could not resolve dependencies for project org.apache.flink:flink-connector-hive_2.11:jar:1.10-SNAPSHOT: Could not transfer artifact javax.jms:jms:jar:1.1 from/to datanucleus (http://www.datanucleus.org/downloads/maven2): Transfer failed for http://www.datanucleus.org/downloads/maven2/javax/jms/jms/1.1/jms-1.1.jar: Connect to www.datanucleus.org:80 [www.datanucleus.org/80.86.85.8] failed: Connection refused (Connection refused) -> [Help 1]
garyfeng commented 4 years ago

Note that, according to the OP, these source JARs need to be copied from ./flink-connectors/flink-connector-kafka-base/target/.

Once you run python3 setup.py sdist bdist_wheel, the setup process packs apache-flink-1.9.dev0.tar.gz and apache_flink-1.9.dev0-py2.py3-none-any.whl under the folder flink-python/dist.

Then unpack the tarball: tar -xf apache-flink-1.9.dev0.tar.gz, which gives the folder flink-python/dist/apache-flink-1.9.dev0.

Then we can copy the Kafka JAR files over. In this example Flink v1.9 was installed in /usr/local/flink.

root@b169c1b7c81b:/usr/local/flink/flink-python/dist/apache-flink-1.9.dev0/deps/lib# cp /usr/local/flink/flink-connectors/flink-connector-kafka-base/target/flink-connector-kafka-base_2.11-1.9-SNAPSHOT.jar .
root@b169c1b7c81b:/usr/local/flink/flink-python/dist/apache-flink-1.9.dev0/deps/lib# cp /usr/local/flink/flink-connectors/flink-connector-kafka-base/target/original-flink-connector-kafka-base_2.11-1.9-SNAPSHOT.jar .

Then I ran the following to install:

cd apache-flink-1.9.dev0
python setup.py install

To check:

pip3 list |grep dev

The actual JAR files end up installed at /usr/local/lib/python3.7/dist-packages/apache_flink-1.9.dev0-py3.7.egg/pyflink/lib. I wonder if I can just copy these JARs into a properly installed apache-flink or pyflink package.
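That shortcut would look something like this (untested; a sketch using the paths from this container):

# Hypothetical shortcut: copy the freshly built connector JARs straight into
# the lib folder of an already-installed PyFlink instead of re-running
# setup.py.
import shutil
from pathlib import Path

src = Path("/usr/local/flink/flink-connectors/flink-connector-kafka-base/target")
dst = Path("/usr/local/lib/python3.7/dist-packages/pyflink/lib")

for jar in src.glob("*flink-connector-kafka-base_2.11-1.9-SNAPSHOT.jar"):
    shutil.copy2(jar, dst / jar.name)
    print(f"copied {jar.name} -> {dst}")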

Note that this was done with a half-installed Docker image for Flink 1.9. It needs to be modified for v1.10, as follows:

python3 setup.py sdist bdist_wheel
cd flink-python/dist
tar -xf apache-flink-1.10.dev0.tar.gz
cd ../..
cp flink-connectors/flink-connector-kafka-base/target/flink-connector-kafka-base_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib
cp flink-connectors/flink-connector-kafka-base/target/original-flink-connector-kafka-base_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib

cd flink-python/dist/apache-flink-1.10.dev0
python setup.py install
pip3 list |grep dev
garyfeng commented 4 years ago

In addition, it looks like the packaging of apache-flink is missing the connectors as dependencies. How do we get those included? In setup.py?

garyfeng commented 4 years ago

I couldn't find anything in setup.py that explicitly declares the JAR dependencies. It looks like we will just need to copy all the connector JAR files mentioned in pyflink/table/descriptors.py into the pyflink/lib folder of python3.7/dist-packages/apache_flink-1.10.dev0-py3.7.egg.
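To build that checklist, the descriptor classes PyFlink exposes can be enumerated (a sketch; each connector or format descriptor listed implies a JAR that must be present in pyflink/lib):

# Enumerate the descriptor classes defined in pyflink.table.descriptors as a
# checklist of connector/format JARs (Kafka, Json, ...) needed at runtime.
import inspect
from pyflink.table import descriptors

for name, cls in inspect.getmembers(descriptors, inspect.isclass):
    if cls.__module__ == descriptors.__name__:
        print(name)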

I tried to install pyflink v1.9 on a Flink v1.10 machine and got the following error message:

>>> from pyflink.table.descriptors import Kafka
>>> Kafka()
ERROR:root:Could not find valid FLINK_HOME(Flink distribution directory) in current environment.

That's to be expected. But at least pyflink seems to work.
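The FLINK_HOME lookup can be satisfied by exporting the variable before the gateway first launches, which is what later steps in this thread do. The same thing from inside Python (a sketch; the egg path is the install location from this thread):

# Set FLINK_HOME before the first gateway launch so PyFlink can locate the
# Flink distribution.
import os

os.environ["FLINK_HOME"] = (
    "/usr/local/lib/python3.7/dist-packages/"
    "apache_flink-1.9.dev0-py3.7.egg/pyflink")

from pyflink.table.descriptors import Kafka
Kafka()  # should now find the distribution instead of failing on FLINK_HOME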

garyfeng commented 4 years ago

The http://www.datanucleus.org/ site is still down today.

It turns out this is referenced in hive-site.xml as an alternative to the default values; see https://cwiki.apache.org/confluence/display/Hive/AdminManual+Configuration#AdminManualConfiguration-hive-site.xmlandhive-default.xml.template

Nothing else references the site. I can disable Hive as a connector if the downtime continues.

garyfeng commented 4 years ago

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html#adding-connector-and-library-dependencies documents the need to include connector dependencies in your application; in particular, they recommend packing everything into a single JAR for your application. Yet looking at the pom.xml for the flink-python module, no connector dependency is specified:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>flink-parent</artifactId>
        <groupId>org.apache.flink</groupId>
        <version>1.9-SNAPSHOT</version>
        <relativePath>..</relativePath>
    </parent>

    <artifactId>flink-python_${scala.binary.version}</artifactId>
    <name>flink-python</name>

    <packaging>jar</packaging>

    <dependencies>

        <!-- core dependencies -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Python API dependencies -->

        <dependency>
            <groupId>net.sf.py4j</groupId>
            <artifactId>py4j</artifactId>
            <version>${py4j.version}</version>
        </dependency>
        <dependency>
            <groupId>net.razorvine</groupId>
            <artifactId>pyrolite</artifactId>
            <version>4.13</version>
            <exclusions>
                <exclusion>
                    <groupId>net.razorvine</groupId>
                    <artifactId>serpent</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-antrun-plugin</artifactId>
                <executions>
                    <execution>
                        <id>clean</id>
                        <phase>clean</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <target>
                                <delete includeEmptyDirs="true">
                                    <fileset dir="${project.basedir}/pyflink"
                                             includes="**/*.pyc,**/__pycache__"/>
                                    <fileset dir="${project.basedir}/pyflink">
                                        <and>
                                            <size value="0"/>
                                            <type type="dir"/>
                                        </and>
                                    </fileset>
                                </delete>
                                <delete file="${project.basedir}/lib/pyflink.zip"/>
                                <delete dir="${project.basedir}/target"/>
                                <delete dir="${project.basedir}/build"/>
                                <delete dir="${project.basedir}/dist"/>
                                <delete dir="${project.basedir}/apache_flink.egg-info"/>
                            </target>
                        </configuration>
                    </execution>
                    <execution>
                        <id>generate-resources</id>
                        <phase>generate-resources</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <target>
                                <delete includeEmptyDirs="true">
                                    <fileset dir="${project.basedir}/pyflink"
                                             includes="**/*.pyc,**/__pycache__"/>
                                    <fileset dir="${project.basedir}/pyflink">
                                        <and>
                                            <size value="0"/>
                                            <type type="dir"/>
                                        </and>
                                    </fileset>
                                </delete>
                                <delete file="${project.basedir}/lib/pyflink.zip"/>
                                <zip destfile="${project.basedir}/lib/pyflink.zip">
                                    <fileset dir="${project.basedir}"
                                             includes="pyflink/**/*"/>
                                </zip>
                            </target>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <includes combine.children="append">
                                    <include>net.razorvine:*</include>
                                    <include>net.sf.py4j:*</include>
                                </includes>
                            </artifactSet>
                            <relocations combine.children="append">
                                <relocation>
                                    <pattern>py4j</pattern>
                                    <shadedPattern>org.apache.flink.api.python.shaded.py4j</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>net.razorvine</pattern>
                                    <shadedPattern>org.apache.flink.api.python.shaded.net.razorvine</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
garyfeng commented 4 years ago

http://www.datanucleus.org/ is back online. Compiling Flink v1.10 now. Will test whether moving the JAR files into the Python package works.

garyfeng commented 4 years ago

This fixed it for Flink v1.10. Will incorporate this into the Dockerfile:

# compile flink
RUN apt-get update && apt-get install -y git curl 
RUN git clone https://github.com/apache/flink.git
RUN cd flink && git fetch origin release-1.10  && git checkout -b release-1.10 origin/release-1.10

# ran into an apt-get update error message (sites not available);
# turns out Docker's system clock was behind; restarting Docker fixed it
# assuming we have the flink source installed under ./flink
# ADD flink /usr/local/flink

# fix the hive server-down issue by skipping it
# overwriting the pom with the modified one
# but this caused other problems
# COPY pom.xml ./flink/flink-connectors/pom.xml
RUN cd ./flink && mvn clean install -DskipTests -Dfast

# Install python3
# Install Python
RUN apt-get -y install python3 python3-distutils && \
    ln -s /usr/bin/python3 /usr/bin/python; 

From this one, we build with docker build -t flink_1.10 . and then run it with docker run -it flink_1.10 /bin/bash. Then we manually test out these steps:

# install setuptools and pip
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python ./get-pip.py
# create the pyflink official version
python3 setup.py sdist bdist_wheel
cd dist
# unzip the official tar.gz, and copy over the 2 missing JARs
tar -xf apache-flink-1.10.dev0.tar.gz
cd ../..
cp flink-connectors/flink-connector-kafka-base/target/flink-connector-kafka-base_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib
cp flink-connectors/flink-connector-kafka-base/target/original-flink-connector-kafka-base_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib
# install the fixed version
cd flink-python/dist/apache-flink-1.10.dev0
python setup.py install
# remember to export the FLINK_HOME env var
export FLINK_HOME=/usr/local/lib/python3.7/dist-packages/apache_flink-1.10.dev0-py3.7.egg/pyflink
# verify that this is installed
pip3 list |grep dev

Now we start python and test this out therein:

from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka
Kafka()

This should give you an object like <pyflink.table.descriptors.Kafka object at 0x7fc4d5c42208>. If Python exits instead, something is wrong.
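A slightly broader smoke test (a sketch) exercises every descriptor the playground imports, so a missing connector or format JAR surfaces as a caught error rather than an opaque failure:

# Instantiate each descriptor used by the playground; a missing JAR shows up
# as "'JavaPackage' object is not callable".
from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka

for cls in (Schema, Rowtime, Json, Kafka):
    try:
        print(cls.__name__, "->", cls())
    except TypeError as err:
        print(cls.__name__, "FAILED:", err)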

garyfeng commented 4 years ago

The following Dockerfile works around the Kafka() bug:

###############################################################################
FROM maven:3.6-jdk-8-slim AS builder

# compile flink
RUN apt-get update && apt-get install -y git curl 
RUN git clone https://github.com/apache/flink.git
RUN cd flink && git fetch origin release-1.10  && git checkout -b release-1.10 origin/release-1.10

# ran into an apt-get update error message (sites not available);
# turns out Docker's system clock was behind; restarting Docker fixed it
# assuming we have the flink source installed under ./flink
# ADD flink /usr/local/flink

# fix the hive server-down issue by skipping it
# overwriting the pom with the modified one
# but this caused other problems
# COPY pom.xml ./flink/flink-connectors/pom.xml
RUN cd ./flink && mvn clean install -DskipTests -Dfast

# Install python3
# Install Python
RUN apt-get -y install python3 python3-distutils && \
    ln -s /usr/bin/python3 /usr/bin/python; 

# install setuptools and pip
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py &&  \
    python ./get-pip.py
# create the pyflink official version
# unzip the official tar.gz, and copy over the 2 missing JARs
WORKDIR /flink/flink-python/ 
RUN python setup.py sdist bdist_wheel 
RUN cd dist && \
    tar -xf apache-flink-1.10.dev0.tar.gz && \
    cd ../.. && \
    cp flink-connectors/flink-connector-kafka-base/target/flink-connector-kafka-base_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib && \
    cp flink-connectors/flink-connector-kafka-base/target/original-flink-connector-kafka-base_2.11-1.10-SNAPSHOT.jar flink-python/dist/apache-flink-1.10.dev0/deps/lib

###############################################################################
# Build Operations Playground Image
###############################################################################

FROM flink:1.10.0-scala_2.11 AS final

# Install Python
RUN apt-get update && \
    apt-get -y install curl python3 python3-distutils && \
    ln -s /usr/bin/python3 /usr/bin/python; 
# install pyFlink & kafka-python 
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
RUN python ./get-pip.py
RUN python -m pip install kafka-python

# Copy over the fixed pyflink
WORKDIR /
COPY --from=builder /flink/flink-python/dist/apache-flink-1.10.dev0 /apache-flink-1.10.dev0

# install the fixed version
RUN cd /apache-flink-1.10.dev0 && \
    python setup.py install
# remember to export the FLINK_HOME env var
ENV FLINK_HOME=/usr/local/lib/python3.7/dist-packages/apache_flink-1.10.dev0-py3.7.egg/pyflink

# setting up the work env
WORKDIR /opt/flink/bin

# Copy pyflink word count example
COPY WordCount.py WordCount.py
COPY input.txt /tmp/input

# Copy pyflink click count example
COPY ClickEventGenerator.py ClickEventGenerator.py
COPY ClickEventCount.py ClickEventCount.py

This, however, gives the same error when I call Json(). Need to find the JAR for the JSON format and do this over again.
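If the Json descriptor follows the same pattern, it presumably comes from the flink-json artifact built under flink-formats, so the same copy step should apply. An unverified sketch (both paths are assumptions based on this thread's source tree, not confirmed):

# Assumed fix, mirroring the Kafka recipe: copy the flink-json JAR into the
# unpacked sdist's deps/lib before reinstalling.
import shutil
from pathlib import Path

src = Path("/flink/flink-formats/flink-json/target/"
           "flink-json-1.10-SNAPSHOT.jar")
dst = Path("/flink/flink-python/dist/apache-flink-1.10.dev0/deps/lib")

if src.exists():
    shutil.copy2(src, dst / src.name)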

garyfeng commented 4 years ago

Turns out we don't need the original-*.jar files. See #2 for a more complete solution. Note that #3 shows there are still missing JAR files.

garyfeng commented 4 years ago

This needs a redo in light of #3's findings.

garyfeng commented 4 years ago

Fixed for now, but we may need to add more connector files in the future.