设为首页 收藏本站
查看: 978|回复: 0

[经验分享] storm操作zookeeper源码分析-cluster.clj

[复制链接]

尚未签到

发表于 2015-9-6 07:16:40 | 显示全部楼层 |阅读模式
  storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中)。backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState。clojure中的protocol可以看成java中的接口,封装了一组方法。ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协议定义如下:
ClusterState协议

(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  ;; if node does not exist, create persistent with this data
  (set-data [this path data])
  (get-data [this path watch?])
  (get-version [this path watch?])
  (get-data-with-version [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))  StormClusterState协议封装了一组storm与zookeeper进行交互的函数,可以将StormClusterState协议中的函数看成ClusterState协议中函数的"组合"。StormClusterState协议定义如下:
StormClusterState协议

(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (assignment-info-with-version [this storm-id callback])
  (assignment-version [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id node port error])
  (errors [this storm-id task-id])
  (disconnect [this]))  命名空间backtype.storm.cluster除了定义ClusterState和StormClusterState这两个重要协议外,还定义了两个重要函数:mk-distributed-cluster-state和mk-storm-cluster-state。
mk-distributed-cluster-state函数如下:
该函数返回一个实现了ClusterState协议的对象,通过这个对象就可以与zookeeper进行交互了。
mk-distributed-cluster-state函数

(defn mk-distributed-cluster-state
  ;; conf绑定了storm.yaml中的配置信息,是一个map对象
  [conf]
  ;; zk绑定一个zk client,Storm使用CuratorFramework与Zookeeper进行交互
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    ;; 创建storm集群在zookeeper上的根目录,默认值为/storm
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  ;; callbacks绑定回调函数集合,是一个map对象
  (let [callbacks (atom {})
        ;; active标示zookeeper集群状态
        active (atom true)
        ;; zk重新绑定新的zk client,该zk client设置了watcher,这样当zookeeper集群的状态发生变化时,zk server会给zk client发送相应的event,zk client设置的watcher会调用callbacks中相应回调函数来处理event
        ;; 启动nimbus时,callbacks是一个空集合,所以nimbus端收到event后不会调用任何回调函数;但是启动supervisor时,callbacks中注册了回调函数,所以当supervisor收到zk server发送的event后,会调用相应的回调函数
        ;; mk-client函数定义在zookeeper.clj文件中,请参见其定义部分
      zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         ;; :watcher绑定一个函数,指定zk client的默认watcher函数,state标示当前zk client的状态;type标示事件类型;path标示zookeeper上产生该事件的znode
                         ;; 该watcher函数主要功能就是执行callbacks集合中的函数,callbacks集合中的函数是在mk-storm-cluster-state函数中通过调用ClusterState的register函数添加的
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    ;; reify相当于java中的implements,这里表示实现一个协议
    (reify
     ClusterState
     ;; register函数用于将回调函数加入callbacks中,key是一个32位的标识
     (register
       [this callback]
       (let [id (uuid)]
         (swap! callbacks assoc id callback)
         id))
     ;; unregister函数用于将指定key的回调函数从callbacks中删除
     (unregister
       [this id]
       (swap! callbacks dissoc id))
     ;; 在zookeeper上添加一个临时节点
     (set-ephemeral-node
       [this path data]
       (zk/mkdirs zk (parent-path path))
       (if (zk/exists zk path false)
         (try-cause
           (zk/set-data zk path data) ; should verify that it's ephemeral
           (catch KeeperException$NoNodeException e
             (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
             (zk/create-node zk path data :ephemeral)
             ))
         (zk/create-node zk path data :ephemeral)))
     ;; 在zookeeper上添加一个顺序节点
     (create-sequential
       [this path data]
       (zk/create-node zk path data :sequential))
     ;; 修改某个节点数据
     (set-data
       [this path data]
       ;; note: this does not turn off any existing watches
       (if (zk/exists zk path false)
         (zk/set-data zk path data)
         (do
           (zk/mkdirs zk (parent-path path))
           (zk/create-node zk path data :persistent))))
     ;; 删除指定节点
     (delete-node
       [this path]
       (zk/delete-recursive zk path))
     ;; 获取指定节点数据。path标示节点路径;watch?是一个布尔类型值,表示是否需要对该节点进行"观察",如果watch?=true,当调用set-data函数修改该节点数据后,
     ;; 会给zk client发送一个事件,zk client接收事件后,会调用创建zk client时指定的默认watcher函数(即:watcher绑定的函数)
     (get-data
       [this path watch?]
       (zk/get-data zk path watch?))
     ;; 与get-data函数的区别就是获取指定节点数据的同时,获取节点数据的version,version表示节点数据修改的次数
     (get-data-with-version
       [this path watch?]
       (zk/get-data-with-version zk path watch?))
     ;; 获取指定节点的version,watch?的含义与get-data函数中的watch?相同
     (get-version
       [this path watch?]
       (zk/get-version zk path watch?))
     ;; 获取指定节点的子节点列表,watch?的含义与get-data函数中的watch?相同
     (get-children
       [this path watch?]
       (zk/get-children zk path watch?))
     ;; 在zookeeper上创建一个节点
     (mkdirs
       [this path]
       (zk/mkdirs zk path))
     ;; 关闭zk client
     (close
       [this]
       (reset! active false)
       (.close zk)))))  mk-storm-cluster-state函数定义如下:
mk-storm-cluster-state函数非常重要,该函数返回一个实现了StormClusterState协议的实例,通过该实例storm就可以更加方便与zookeeper进行交互在启动nimbus和supervisor的函数中均调用了
  mk-storm-cluster-state函数。关于nimbus和supervisor的启动将在之后的文章中介绍。
mk-storm-cluster-state函数

(defn mk-storm-cluster-state
  [cluster-state-spec]
  ;; satisfies?谓词相当于java中的instanceof,判断cluster-state-spec是不是ClusterState实例
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
      ;; 绑定topology id->回调函数的map,当/assignments/{topology id}数据发生变化时,zk client执行assignment-info-callback中topology id所对应的回调函数
        assignment-info-callback (atom {})
      ;; assignment-info-with-version-callback与assignment-info-callback类似
        assignment-info-with-version-callback (atom {})
      ;; assignment-version-callback与assignments-callback类似
        assignment-version-callback (atom {})
      ;; 当/supervisors标示的znode的子节点发生变化时,zk client执行supervisors-callback指向的函数
        supervisors-callback (atom nil)
      ;; 当/assignments标示的znode的子节点发生变化时,zk client执行assignments-callback指向的函数
        assignments-callback (atom nil)
      ;; 当/storms/{topology id}标示的znode的数据发生变化时,zk client执行storm-base-callback中topology id所对应的回调函数
        storm-base-callback (atom {})
      ;; register函数将"回调函数(fn ...)"添加到cluster-state的callbacks集合中,并返回标示该回调函数的uuid
        state-id (register
                   cluster-state
                ;; 定义"回调函数",type标示事件类型,path标示znode
                   (fn [type path]
                  ;; subtree绑定路径前缀如"assignments"、"storms"、"supervisors"等,args存放topology id
                     (let [[subtree & args] (tokenize-path path)]
                    ;; condp相当于java中的switch
                       (condp = subtree
                      ;; 当subtree="assignments"时,如果args为空,说明是/assignments的子节点发生变化,执行assignments-callback指向的回调函数,否则
                    ;; 说明/assignments/{topology id}标示的节点数据发生变化,执行assignment-info-callback指向的回调函数
                         ASSIGNMENTS-ROOT (if (empty? args)
                                            (issue-callback! assignments-callback)
                                            (issue-map-callback! assignment-info-callback (first args)))
                      ;; 当subtree="supervisors"时,说明是/supervisors的子节点发生变化,执行supervisors-callback指向的回调函数
                         SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                    ;; 当subtree="storms"时,说明是/storms/{topology id}标示的节点数据发生变化,执行storm-base-callback指向的回调函数
                         STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                         ;; this should never happen
                         (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
    ;; 在zookeeper上创建storm运行topology所必需的znode
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    ;; 返回一个实现StormClusterState协议的实例
    (reify
      StormClusterState
      ;; 获取/assignments的子节点列表,如果callback不为空,将其赋值给assignments-callback,并对/assignments添加"节点观察"
      (assignments
        [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
      ;; 获取/assignments/{storm-id}节点数据,即storm-id的分配信息,如果callback不为空,将其添加到assignment-info-callback中,并对/assignments/{storm-id}添加"数据观察"
      (assignment-info
        [this storm-id callback]
        (when callback
          (swap! assignment-info-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
      ;; 获取/assignments/{storm-id}节点数据包括version信息,如果callback不为空,将其添加到assignment-info-with-version-callback中,并对/assignments/{storm-id}添加"数据观察"
      (assignment-info-with-version
        [this storm-id callback]
        (when callback
          (swap! assignment-info-with-version-callback assoc storm-id callback))
        (let [{data :data version :version}
              (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
        {:data (maybe-deserialize data)
         :version version}))
      ;; 获取/assignments/{storm-id}节点数据的version信息,如果callback不为空,将其添加到assignment-version-callback中,并对/assignments/{storm-id}添加"数据观察"
      (assignment-version
        [this storm-id callback]
        (when callback
          (swap! assignment-version-callback assoc storm-id callback))
        (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
      ;; 获取storm集群中正在运行的topology id即/storms的子节点列表
      (active-storms
        [this]
        (get-children cluster-state STORMS-SUBTREE false))
      ;; 获取storm集群中所有有心跳的topology id即/workerbeats的子节点列表
      (heartbeat-storms
        [this]
        (get-children cluster-state WORKERBEATS-SUBTREE false))
      ;; 获取所有有错误的topology id即/errors的子节点列表
      (error-topologies
        [this]
        (get-children cluster-state ERRORS-SUBTREE false))
      ;; 获取指定storm-id进程的心跳信息,即/workerbeats/{storm-id}/{node-port}节点数据
      (get-worker-heartbeat
        [this storm-id node port]
        (-> cluster-state
            (get-data (workerbeat-path storm-id node port) false)
            maybe-deserialize))
      ;; 获取指定进程中所有线程的心跳信息
      (executor-beats
        [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
        ;; we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)
              all-heartbeats (for [[[node port] executors] node+port->executors]
                               (->> (get-worker-heartbeat this storm-id node port)
                                    (convert-executor-beats executors)
                                    ))]
          (apply merge all-heartbeats)))
      ;; 获取/supervisors的子节点列表,如果callback不为空,将其赋值给supervisors-callback,并对/supervisors添加"节点观察"
      (supervisors
        [this callback]
        (when callback
          (reset! supervisors-callback callback))
        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
      ;; 获取/supervisors/{supervisor-id}节点数据,即supervisor的心跳信息
      (supervisor-info
        [this supervisor-id]
        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
      ;; 设置进程心跳信息
      (worker-heartbeat!
        [this storm-id node port info]
        (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
      ;; 删除进程心跳信息
      (remove-worker-heartbeat!
        [this storm-id node port]
        (delete-node cluster-state (workerbeat-path storm-id node port)))
      ;; 创建指定storm-id的topology的用于存放心跳信息的节点
      (setup-heartbeats!
        [this storm-id]
        (mkdirs cluster-state (workerbeat-storm-root storm-id)))
      ;; 删除指定storm-id的topology的心跳信息节点
      (teardown-heartbeats!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (workerbeat-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
      ;; 删除指定storm-id的topology的错误信息节点
      (teardown-topology-errors!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (error-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown errors for " storm-id))))
      ;; 创建临时节点存放supervisor的心跳信息
      (supervisor-heartbeat!
        [this supervisor-id info]
        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
      ;; 创建/storms/{storm-id}节点
      (activate-storm!
        [this storm-id storm-base]
        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
      ;; 更新topology对应的StormBase对象,即更新/storm/{storm-id}节点
      (update-storm!
        [this storm-id new-elems]
        ;; base绑定storm-id在zookeeper上的StormBase对象
        (let [base (storm-base this storm-id nil)
              ;; executors绑定component名称->组件并行度的map
              executors (:component->executors base)
              ;; new-elems绑定合并后的组件并行度map,update函数将组件新并行度map合并到旧map中
              new-elems (update new-elems :component->executors (partial merge executors))]
          ;; 更新StormBase对象中的组件并行度map,并写入zookeeper的/storms/{storm-id}节点
          (set-data cluster-state (storm-path storm-id)
                    (-> base
                        (merge new-elems)
                        Utils/serialize))))
      ;; 获取storm-id的StormBase对象,即读取/storms/{storm-id}节点数据,如果callback不为空,将其赋值给storm-base-callback,并为/storms/{storm-id}节点添加"数据观察"
      (storm-base
        [this storm-id callback]
        (when callback
          (swap! storm-base-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
      ;; 删除storm-id的StormBase对象,即删除/storms/{storm-id}节点
      (remove-storm-base!
        [this storm-id]
        (delete-node cluster-state (storm-path storm-id)))
      ;; 更新storm-id的分配信息,即更新/assignments/{storm-id}节点数据
      (set-assignment!
        [this storm-id info]
        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
      ;; 删除storm-id的分配信息,同时删除其StormBase信息,即删除/assignments/{storm-id}节点和/storms/{storm-id}节点
      (remove-storm!
        [this storm-id]
        (delete-node cluster-state (assignment-path storm-id))
        (remove-storm-base! this storm-id))
      ;; 将组件异常信息写入zookeeper
      (report-error
        [this storm-id component-id node port error]
        ;; path绑定"/errors/{storm-id}/{component-id}"
        (let [path (error-path storm-id component-id)
              ;; data绑定异常信息,包括异常时间、异常堆栈信息、主机和端口
              data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
              ;; 创建/errors/{storm-id}/{component-id}节点
              _ (mkdirs cluster-state path)
              ;; 创建/errors/{storm-id}/{component-id}的子顺序节点,并写入异常信息
              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
              ;; to-kill绑定除去顺序节点编号最大的前10个节点的剩余节点的集合
              to-kill (->> (get-children cluster-state path false)
                           (sort-by parse-error-path)
                           reverse
                           (drop 10))]
          ;; 删除to-kill中包含的节点
          (doseq [k to-kill]
            (delete-node cluster-state (str path "/" k)))))
      ;; 得到给定的storm-id component-id下的异常信息
      (errors
        [this storm-id component-id]
        (let [path (error-path storm-id component-id)
              _ (mkdirs cluster-state path)
              children (get-children cluster-state path false)
              errors (dofor [c children]
                            (let [data (-> (get-data cluster-state (str path "/" c) false)
                                           maybe-deserialize)]
                              (when data
                                (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
                                )))
              ]
          (->> (filter not-nil? errors)
               (sort-by (comp - :time-secs)))))
      ;; 关闭连接,在关闭连接前,将回调函数从cluster-state的callbacks中删除
      (disconnect
        [this]
        (unregister cluster-state state-id)
        (when solo?
          (close cluster-state))))))  zookeeper.clj中mk-client函数定义如下:
mk-client函数创建一个CuratorFramework实例,为该实例注册了CuratorListener,当一个后台操作完成或者指定的watch被触发时将会执行CuratorListener中的eventReceived()。eventReceived中调用的wacher函数就是mk-distributed-cluster-state中:watcher绑定的函数。
mk-client函数

(defnk mk-client
  [conf servers port
   :root ""
   :watcher default-watcher
   :auth-conf nil]
  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
    (.. fk
        (getCuratorListenable)
        (addListener
          (reify CuratorListener
            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
                   (when (= (.getType e) CuratorEventType/WATCHED)
                     (let [^WatchedEvent event (.getWatchedEvent e)]
                       (watcher (zk-keeper-states (.getState event))
                                (zk-event-types (.getType event))
                                (.getPath event))))))))
    (.start fk)
    fk))  以上就是storm与zookeeper进行交互的源码分析,我觉得最重要的部分就是如何给zk client添加"wacher",storm的很多功能都是通过zookeeper的wacher机制实现的,如"分配信息领取"。添加"wacher"大概分为以下几个步骤:

  •   mk-distributed-cluster-state函数创建了一个zk client,并通过:watcher给该zk client指定了"wacher"函数,这个"wacher"函数只是简单调用ClusterState的callbacks集合中的函数,这样这个"wacher"函数执行哪些函数将由ClusterState实例决定
  • ClusterState实例提供register函数来更新callbacks集合,ClusterState实例被传递给了mk-storm-cluster-state函数,在mk-storm-cluster-state中调用register添加了一个函数(fn [type path] ... ),这个函数实现了"watcher"函数的全部逻辑
  •   mk-storm-cluster-state中注册的函数执行的具体内容由StormClusterState实例决定,对zookeeper节点添加"观察"也是通过StormClusterState实例实现的,这样我们就可以通过StormClusterState实例对我们感兴趣的节点添加"观察"和"回调函数",当节点或节点数据发生变化后,zk server就会给zk client发送"通知",zk client中的"wather"函数将被调用,进而我们注册的"回到函数"将被执行。
  这部分源码与zookeeper联系十分紧密,涉及了很多zookeeper中的概念和特性,如"数据观察"和"节点观察"等,有关zookeeper的wacher机制请参考
http://www.cnblogs.com/ggjucheng/p/3369946.html
http://www.cnblogs.com/zhangchaoyang/articles/3813217.html
storm并没有直接使用zookeeper的api,而是使用Curator框架,Curator框架简化了访问zookeeper的操作。关于Curator框架请参考
http://f.dataguru.cn/thread-120125-1-1.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-109840-1-1.html 上篇帖子: 一键安装zookeeper脚本制作篇 相关经验浅谈 下篇帖子: Thrift 个人实战--RPC服务的发布订阅实现(基于Zookeeper服务)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表