
Cloud-Native Flink on K8s: Explanation and Hands-On Operations

I. Overview
At its core, Flink is a streaming dataflow execution engine that supports both stream-processing and batch-processing applications on the same runtime. For distributed computation over data streams it provides data distribution, data communication, and fault-tolerance mechanisms.
Flink official site: https://flink.apache.org/
Documentation for each version: https://nightlies.apache.org/flink/
Flink on k8s official docs: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/
You can also refer to my earlier article: Big Data Hadoop — Real-Time Computing / Stream Computing Engine Flink (Flink Environment Deployment)
GitHub: https://github.com/apache/flink/tree/release-1.14.6/

II. Flink Run Modes

Official docs: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/overview/
Flink on YARN has three run modes (see the sketch below):
- yarn-session (Session Mode)
- yarn-cluster (Per-Job Mode)
- application (Application Mode)
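As a quick orientation, the mode is selected with the --target option of the Flink CLI. A minimal sketch, assuming a YARN setup (the application id application_XXXX_YY is a placeholder):

# Session mode: submit to an already-running session cluster
./bin/flink run --target yarn-session -Dyarn.application.id=application_XXXX_YY ./examples/streaming/TopSpeedWindowing.jar

# Per-Job mode (YARN only, deprecated since Flink 1.15): one dedicated cluster per job
./bin/flink run --target yarn-per-job ./examples/streaming/TopSpeedWindowing.jar

# Application mode: the job's main() runs inside the cluster
./bin/flink run-application --target yarn-application ./examples/streaming/TopSpeedWindowing.jar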
[Note] Per-Job mode is deprecated: it is only supported by YARN, was deprecated in Flink 1.15, and will be dropped in FLINK-26000.

III. Flink on K8s in Practice
1) Download Flink
Download page: https://flink.apache.org/downloads.html

wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz

2) Build the base image

docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12

3) Session mode
A Flink Session cluster runs as a long-lived Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster; each job has to be submitted to the cluster after the cluster has been deployed.
A Flink Session cluster deployment on Kubernetes consists of at least three components:
- a Deployment that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports

1. Native Kubernetes mode
Configuration parameters: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace

[1] Build the image

Dockerfile:

FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Use ENV rather than RUN export so the locale setting persists in the image
ENV LANG zh_CN.UTF-8
Build the image:

docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12

[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Create the Flink cluster

./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
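Since the REST service is exposed as a NodePort, you can look up the assigned port before opening the Web UI. A small sketch, assuming native Kubernetes mode keeps its default REST service name of <cluster-id>-rest:

kubectl get svc -n flink
# Print only the NodePort of the REST service
kubectl get svc my-first-flink-cluster-rest -n flink -o jsonpath='{.spec.ports[0].nodePort}'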
[4] Submit a job

./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    ./examples/streaming/TopSpeedWindowing.jar

# With resource configuration
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.taskmanager.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.cpu=4000m \
    -Dexternal-resource.limits.kubernetes.memory=10Gi \
    -Dexternal-resource.requests.kubernetes.cpu=2000m \
    -Dexternal-resource.requests.kubernetes.memory=8Gi \
    ./examples/streaming/WordCount.jar
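Jobs running in the session can later be managed with the same --target and cluster id; a sketch (the job id is a placeholder):

# List jobs in the session cluster
./bin/flink list \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink

# Cancel a job by its id
./bin/flink cancel \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    <jobId>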
[Note] Mind the JDK version: JDK 8 is currently known to work.
[5] Check the status

kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink
[6] Delete the Flink cluster

kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force

2. Standalone mode

[1] Build the image
The default user is flink; here I change it to admin (adjust the user to your company's needs). The original startup script can be pulled from a pod running the image above.
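One way to grab the stock script, assuming the official image keeps its entrypoint at /docker-entrypoint.sh (the case for the stock flink images):

# Copy the original entrypoint script out of the official image
docker run --rm --entrypoint cat apache/flink:1.14.6-scala_2.12 /docker-entrypoint.sh > docker-entrypoint.sh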
Startup script docker-entrypoint.sh (the original gosu/su-exec user flink has been replaced with admin):

#!/usr/bin/env bash

###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
  if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
    return 0
  fi

  echo "Enabling required built-in plugins"
  for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ";" " "); do
    echo "Linking ${target_plugin} to plugin directory"
    plugin_name=${target_plugin%.jar}

    mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
    if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
      echo "Plugin ${target_plugin} does not exist. Exiting."
      exit 1
    else
      ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
      echo "Successfully enabled ${target_plugin}"
    fi
  done
}

set_config_option() {
  local option=$1
  local value=$2

  # escape periods for usage in regular expressions
  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

  # either override an existing entry, or append a new one
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
  else
        echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, the Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")

    echo "Starting History Server"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Task Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
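Note how prepare_configuration appends FLINK_PROPERTIES to conf/flink-conf.yaml before the process starts: this lets you override configuration per container without rebuilding the image. A sketch against the stock image, which uses the same mechanism:

# Override configuration at container start via FLINK_PROPERTIES (multi-line value)
docker run --rm \
    -e FLINK_PROPERTIES=$'taskmanager.numberOfTaskSlots: 4\nparallelism.default: 2' \
    apache/flink:1.14.6-scala_2.12 jobmanager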
Compose the Dockerfile:

FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the timezone (default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# The entrypoint script runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
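After building the image in the next step, a quick smoke test might look like the following; both commands bypass the entrypoint and only rely on what the Dockerfile installs:

# Verify the bundled JDK
docker run --rm --entrypoint java myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 -version
# Verify the Flink distribution is in place
docker run --rm --entrypoint ls myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 /opt/apache/flink-1.14.6/bin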
Build the image:

docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Remove the image (if needed)
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Compose the YAML files

flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

jobmanager-service.yaml (optional, only needed in non-HA mode):

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml (optional; exposes the JobManager REST port as a NodePort on the Kubernetes nodes):

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

taskmanager-query-state-service.yaml (optional; exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes):

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
The configuration files above are shared (they are reused in the Application Standalone mode below).

jobmanager-session-deployment-non-ha.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

taskmanager-session-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

[4] Create the Flink cluster

kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
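Before submitting anything you may want to block until the pods are actually ready; a small sketch:

# Wait until all flink pods report Ready (up to 5 minutes)
kubectl wait --for=condition=Ready pod -l app=flink -n flink --timeout=300s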
Reverse-engineering a Dockerfile from an image:

alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
Check:

kubectl get pods,svc -n flink -owide
Web UI: http://192.168.182.110:30081/#/overview
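The REST endpoint behind the same NodePort can also be probed from the command line; a sketch using this deployment's node IP and port:

# Cluster overview (taskmanagers, slots, jobs) as JSON via the Flink REST API
curl http://192.168.182.110:30081/overview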
[5] Submit a job

./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
  kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
[6] Delete the Flink cluster

kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force

[7] Access the Flink Web UI
The port is the nodePort defined in jobmanager-rest-service.yaml.
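If you prefer not to hard-code it, the nodePort can be read back from the Service; a sketch:

# Print the NodePort assigned to the JobManager REST service
kubectl get svc flink-jobmanager-rest -n flink -o jsonpath='{.spec.ports[0].nodePort}'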
  http://192.168.182.110:30081/#/overview
4) Application mode (recommended)
A basic Flink Application cluster deployment on Kubernetes consists of three components:
- an Application that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports

1. Native Kubernetes mode (commonly used)

[1] Build the image

Dockerfile:

FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Use ENV rather than RUN export so the locale setting persists in the image
ENV LANG zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
Build the image:

docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
# Remove the image (if needed)
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Create the Flink cluster and submit the job

./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
[Note] local:// is the only supported scheme in Application mode. Here local refers to the filesystem inside the pod/container, not the host machine.
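Because the jar must already exist inside the image, it is worth verifying the path before submitting; a sketch (the official base image's FLINK_HOME is /opt/flink, hence the usrlib path):

# Confirm the application jar was baked into the image
docker run --rm --entrypoint ls myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 /opt/flink/usrlib/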
Check:

kubectl get pods,svc -n flink
  kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
[4] Delete the Flink cluster

kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force

2. Standalone mode

[1] Build the image
The startup script docker-entrypoint.sh is identical to the one shown in the Standalone session mode above.
The Dockerfile is likewise the same as in the Standalone session mode above (make sure the chmod +x line targets /opt/apache/docker-entrypoint.sh, the path the script is copied to).

Build the image:

docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Remove the image (if needed)
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Compose the YAML files
flink-configuration-configmap.yaml, jobmanager-service.yaml, jobmanager-rest-service.yaml, and taskmanager-query-state-service.yaml are identical to the files of the same name in the Standalone session mode above.
jobmanager-application-non-ha.yaml (non-HA):

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
          args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/apache/flink-1.14.6/conf
            - name: job-artifacts-volume
              mountPath: /opt/apache/flink-1.14.6/usrlib
          securityContext:
            runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /mnt/nfsdata/flink/application/job-artifacts
[Note] Mind the job-artifacts mount here (/mnt/nfsdata/flink/application/job-artifacts); it should preferably be a shared directory.
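The Job expects its artifacts under that hostPath on every node, so the jar has to be staged there beforehand. A sketch, using the paths from the YAML above (the WordCount jar ships with the Flink distribution):

# Stage the job artifact on the shared directory (repeat on every node unless it is NFS-backed)
mkdir -p /mnt/nfsdata/flink/application/job-artifacts
cp flink-1.14.6/examples/batch/WordCount.jar /mnt/nfsdata/flink/application/job-artifacts/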
taskmanager-job-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts

[4] Create the Flink cluster and submit the job

kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
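Once everything is up, the WordCount job should run to completion; one way to check, assuming the labels from the YAML above:

# The Job is done when COMPLETIONS shows 1/1
kubectl get job flink-jobmanager -n flink
# Follow the JobManager logs
kubectl logs -f -l app=flink,component=jobmanager -n flink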
Check:

kubectl get pods,svc -n flink
[5] Delete the Flink cluster

kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force

[6] Inspect

kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
This concludes the explanation and hands-on walkthrough of Flink on K8s. Only the official examples are used here for demonstration; enterprise case studies will follow. If you have any questions, feel free to leave me a message. I will keep sharing tutorials on cloud native + big data, so please stay tuned.
