DUICUO

K8S - Exec Websocketインターフェースを使用してPod間のファイルコピーを実装する

[[380142]]

必要

次のような問題を通常はどうやって解決するのでしょうか?

新しいポッドを作成し、別のポッドからこの新しいポッドにファイルをコピーして分析を行いたいのですが、どうすればよいでしょうか?

`kubectl cp` を使用してファイルをコピーするのと同様に、プロジェクト内の Pod 間でファイルのコピーを実装するにはどうすればよいですか?

新しい Pod はインスタンス Pod と PVC を共有する必要がありますか? それとも、認証コンテキストを使用してコマンドラインを実行する kubectl インスタンスでラップする必要がありますか?

導入

この記事では、Kubernetes の exec websocket インターフェースを tar と組み合わせて使用​​し、ファイル ツリーをデータ ストリームに圧縮してから解凍して復元することで、2 つの Pod 間でファイルをコピーする方法を説明します。

exec インターフェースの詳細については、https://www.cnblogs.com/a00ium/p/10905279.html を参照してください。

画像をご覧ください。

プロセスの説明

  • まず、コルーチン間のシグナル通知のためにシグナルチャネルが初期化されます。シグナルを受信したコルーチンは、一時停止、ループの終了、チャネルのクローズなどの操作を実行します。
  • 初期化プロセスでは、バイト配列 `[]byte` であるデータチャネル `srcStdOutCh` を使用します。このチャネルは、ソース Pod の標準出力をチャネルに入力するために使用され、宛先 Pod の標準入力に送信されるデータは、このデータチャネルから読み取られます。
  • exec インターフェースのアクセス アドレス (クラスター接続、トークン)、tar 圧縮コマンド、標準入出力、tty、ポッド名、コンテナ名、その他のパラメータを連結します。`tar czf - /var/log/xxx.log` は、ファイル ツリー構造がデータ ストリームに圧縮されることを示します。
  • WebSocket の Dialer メソッドが呼び出され、ソース Pod コンテナとの WebSocket 接続が確立され、goroutine が開始されて標準出力がデータ チャネル srcStdOutCh に書き込まれます。
  • ソース pod exec インターフェイスを参照して、宛先 pod exec アクセス接続を構築します。`tar xzf - -C /tmp` は、標準入力からデータ ストリームを読み取り、それをファイル ツリー構造に解凍することを示します (注: 解凍されたファイル ディレクトリ ツリー構造が含まれます)。
  • 宛先PodとのWSS接続を確立し、データチャネルsrcStdOutChからソースPodの標準出力を読み取り、宛先Podの標準入力に書き込むgoroutineを起動します。データチャネルからの読み取りがタイムアウトした場合、データ転送が完了したことを意味します。この時点で、宛先コンテナへのデータ入力を停止し、メインgoroutineに終了通知を送信して、ソースPodとのWSS接続を閉じます。

予防

  • wesocketがソースPodに接続すると、標準出力に空のデータや、tarコマンドの出力など、他の干渉データが含まれる場合があります。そのため、データフィルタリングのためにデータを受信する際には、フィルターコールバック関数を渡す必要があります。
  • 宛先コンテナにデータを送信する際、送信元コンテナが受信した最初のバイトを削除する必要があります。このバイトは通常1で、標準出力フラグを表します。宛先コンテナにデータを送信する際には不要です。
  • データを送信するときは、標準入力に送信されていることを示すために、最初のバイトを 0 に設定する必要があります。

参照コード

cp.go

  1. /*
  2. 概要: 1. バッファリングされていないチャネルでは、書き込み前に読み取りが必要です。2. WebSocket の ReadMessage メソッドはブロックされています。読み取りを中断するには、接続を閉じてエラーをキャッチします。
  3. /
  4. パッケージ cpFilePod2Pod
  5.  
  6. 輸入 (
  7. 「暗号/TLS」  
  8. 「エラー」  
  9. 「fmt」  
  10. "ログ"  
  11. 「ネット/URL」  
  12. 「正規表現 
  13. 「文字列」  
  14. 「同期」  
  15. "時間"  
  16.  
  17. 「github.com/gorilla/websocket」  
  18.  
  19. // フィルタコールバック関数を定義する
  20. type filterCallback func(入力文字列) bool
  21.  
  22. // ミューテックスロック付きのWebSocket接続オブジェクト
  23. WsConn構造体型{
  24. Conn *websocket.Conn
  25. mu sync.Mutex
  26. }
  27.  
  28. // 文字列を送信し、自動的に改行文字を追加します
  29. func (self *WsConn) Send(送信者文字列, str文字列) {
  30. self.mu.Lock()
  31. self.mu.Unlock() を延期する
  32. Kubernetes exec websocket インターフェースを使用してデータを送信する場合、データが標準入力に送信されていることを示すために、最初のバイトを 0 に設定する必要があります。
  33. データ := []バイト{0}
  34. データ = append(data, []byte(str+ "\n" )...)
  35. err := self.Conn.WriteMessage(websocket.BinaryMessage, data) // バイナリデータ型を送信
  36. err != nil の場合 {
  37. log.Printf( "送信エラー、%s" , err.Error())
  38. }
  39. log.Printf( "%s, データ:%s, バイ​​ト数:%+v" , 送信者, str, []byte(str+ "\n" ))
  40. }
  41.  
  42. // 改行文字を追加せずに文字列を送信します。内部のバイト フィルタリングなどの操作が実行されます。
  43. func (self *WsConn) SendWithFilter(送信者文字列, str文字列) {
  44. self.mu.Lock()
  45. self.mu.Unlock() を延期する
  46. // log.Printf( "宛先コンテナにデータを送信しました:%s" , str)
  47. str = strings.ReplaceAll(str, "\r\n" , "\n" ) // /r=13, /n=10、Windows の改行文字を Linux の改行文字に変換します
  48. //最初のバイト (標準出力 1、バイト: [0 1 ...]) を削除します。ソース コンテナからのバイト出力の最初のビットは標準出力 1 を識別するため、このフラグは宛先コンテナにバイトを送信するときに削除する必要があります。
  49. //WebSocket 接続が確立されると、データを送信するときはバイト バッファーの最初のバイトを stdin (buf[0] = 0) に設定する必要があり、データを受信するときは最初のバイトをチェックして、stdout (buf[0] = 1) または stderr (buf[0] = 2) のいずれかを使用する必要があります。
  50. strByte := append([]byte(str)[:0], []byte(str)[1:]...)
  51. データ := []バイト{0}
  52. データ = append(データ、strByte...)
  53. エラー:= self.Conn.WriteMessage(websocket.BinaryMessage, データ)
  54. log.Printf( "宛先コンテナの標準入力にデータを送信しました:\n%s、バイト数:%d、バイト数:%+v" 、文字列(データ)、長さ(データ)、データ)
  55. err != nil の場合 {
  56. log.Printf( "送信エラー、%s" , err.Error())
  57. }
  58. }
  59.  
  60. // 接続からデータ ストリームを取得し、バイト配列チャネルに書き込み、内部でフィルター (コールバック関数) を実行します。
  61. func (self *WsConn) Receive(receiver string, ch chan []byte, filter filterCallback) error {
  62. self.mu.Lock()
  63. self.mu.Unlock() を延期する
  64. msgType, msgByte, err := self.Conn.ReadMessage() // ブロッキング読み取り、タイプ 2 はバイナリデータ、1 はテキスト、-1 は接続が閉じられていることを示します: websocket: close 1000 (正常)
  65. log.Printf( "%s, 読み取ったデータ:%s, タイプ:%d, バイト数:%d, バイト数:%+v" , レシーバー, string(msgByte), msgType, len(msgByte), msgByte)
  66. err != nil の場合 {
  67. log.Printf( "%s, 読み取りエラー、%s" , レシーバー, err.Error())
  68. エラーを返す
  69. }
  70. フィルター(文字列(msgByte)) && len(msgByte) > 1 の場合 {
  71. ch <- メッセージバイト
  72. }それ以外{
  73. log.Printf( "%s, データが満たされていないため、データを直接破棄します。文字:%s、バイト数:%d、バイト数:%v" , レシーバー, string(msgByte), len(msgByte), msgByte)
  74. }
  75. nilを返す
  76. }
  77.  
  78. func NewWsConn(ホスト文字列、パス文字列、パラメータマップ[文字列]文字列、ヘッダーマップ[文字列][]文字列) (*websocket.Conn, error) {
  79. パラメータ配列:=[]文字列{}
  80. k , v := 範囲パラメータ {
  81. paramArray = append(paramArray, fmt.Sprintf( "%s=%s" , k, v))
  82. }
  83. u := url.URL{スキーム: "wss" , ホスト: host, パス: path, RawQuery: strings. Join (paramArray, "&" )}
  84. log.Printf( "API:%s" , u.String())
  85. ダイヤラー:= websocket.Dialer{TLSClientConfig: &tls.Config{RootCAs: nil, InsecureSkipVerify: true }}
  86. conn, _, err := dialer.Dial(u.String(), ヘッダー)
  87. err != nil の場合 {
  88. nilを返します。errors.New(fmt.Sprintf( "接続エラー:%s" , err.Error()))
  89. }
  90. conn、nilを返す
  91. }
  92.  
  93. //コア: tar -cf - フォルダー構造を持つデータをデータ ストリームに変換し、tar -xf - データ ストリームを Linux ファイル システムに変換します。
  94. CpPod2Pod()関数 {
  95. // メイン関数に終了できることを通知するシグナルチャネル
  96. signalExit := make(chan bool, 1)
  97. 遅延クローズ(signalExit)
  98.  
  99. //ターゲットコンテナにデータを送信しないように信号を送信します
  100. signalStopDstSend := make(chan bool, 1)
  101. 遅延クローズ(signalStopDstSend)
  102.  
  103. // ソースコンテナからのデータの読み取りを防止するためのシグナルを送信します
  104. signalStopSrcRead := make(chan bool, 1)
  105. 遅延クローズ(signalStopSrcRead)
  106.  
  107. //宛先コンテナからデータを読み取らないように信号を送信します
  108. signalStopDstRead := make(chan bool, 1)
  109. 遅延クローズ(signalStopDstRead)
  110.  
  111. // 印刷しない出力データを宛先コンテナーに送信します。
  112. signalStopPrintDstStdout := make(chan bool, 1)
  113. 遅延クローズ(signalStopPrintDstStdout)
  114.  
  115. //ポッドに接続
  116. ホスト := "172.16.xxx.xxx:6443"  
  117. トークン := "xxx"  
  118. headers := map[string][]string{ "authorization" : {fmt.Sprintf( "Bearer %s" , token)}}
  119.  
  120. パスソース:= "/api/v1/namespaces/xxx/pods/xxx/exec"  
  121. commandSrc := "tar&command=czf&command=-&command=/var/log/mysql/slow.log" //tar czf - ソースファイル
  122. paraSrc := map[string]string{ "stdout" : "1" , "stdin" : "0" , "stderr" : "1" , "tty" : "0" , "container" : "xxx" , "command" : commandSrc}
  123. srcConn, err := NewWsConn(ホスト, pathSrc, paraSrc, ヘッダー)
  124. err != nil の場合 {
  125. log.Printf( "ソースポッド接続エラー、%s" 、 err.Error())
  126. }
  127.  
  128. パス宛先:= "/api/v1/namespaces/xxx/pods/xxx/exec"  
  129. commandDst := "tar&command=xzf&command=-&command=-C&command=/tmp" // tar xzf - -C /tmp
  130. // paraDst := map[string]string{ "stdout" : "1" , "stdin" : "1" , "stderr" : "1" , "tty" : "0" , "container" : "xxx" , "command" : commandDst}
  131. paraDst := map[string]string{ "stdout" : "0" , "stdin" : "1" , "stderr" : "0" , "tty" : "0" , "container" : "xxx" , "command" : commandDst} //宛先Podの標準出力とエラー出力をオフにする
  132. dstConn, err := NewWsConn(ホスト, pathDst, paraDst, ヘッダー)
  133. err != nil の場合 {
  134. log.Printf( "宛先ポッド接続エラー、%s" 、 err.Error())
  135. }
  136.  
  137. wsSrc := WsConn{
  138. コネチカット州: srcConn、
  139. }
  140.  
  141. wsDst := WsConn{
  142. コネチカット州: dstConn、
  143. }
  144.  
  145. srcConn.Close ()を延期する
  146. dstConn.Close ()を延期する
  147.  
  148. srcStdOutCh := make(chan []byte, 2048)
  149. dstStdOutCh := make(chan []byte)
  150. 遅延クローズ(srcStdOutCh)
  151. 遅延クローズ(dstStdOutCh)
  152.  
  153. // ソースコンテナの標準出力をデータチャネルに受信する
  154. ゴー関数() {
  155. 私 := 1
  156. のために{
  157. log.Printf( "%d回目、ソースコンテナから標準出力を読み取りました" , i)
  158. i++
  159. // ソース コンテナーの標準出力から不要なデータをフィルター処理するための匿名フィルター コールバック メソッドを定義します。
  160. err := wsSrc.Receive( "ソースコンテナ" , srcStdOutCh, func(入力文字列) bool {
  161. 入力 == "cat /var/log/mysql/slow.log"の場合{
  162. 戻る 間違い 
  163. // }それ以外の場合、一致する場合、_ := regexp.MatchString( "root@(.+)#" , input); 一致 {
  164. //戻る 間違い 
  165. // }それ以外の場合、一致する場合、_ := regexp.MatchString( "cat /(.+).log" , input); 一致 {
  166. //戻る 間違い 
  167. // }それ以外の場合、一致する場合、_ := regexp.MatchString( "cat /tmp/(.+)" , input); 一致 {
  168. //戻る 間違い 
  169. }それ以外の場合、一致する場合、_ := regexp.MatchString( "tar: 先頭の(.+)を削除します" , input); 一致 {
  170. 戻る 間違い 
  171. } else if len(input) == 0 { // 空のメッセージをフィルタリングする
  172. // log.Printf( "標準エラー出力が読み取られました" )
  173. 戻る 間違い 
  174. }
  175. 戻る 真実 
  176. })
  177. err != nil の場合 {
  178. log.Printf( "ソースコンテナからの標準出力の読み取りに失敗しました" )
  179. // シグナル終了 <- true  
  180. 壊す
  181. }
  182. // time.Sleep ( time .マイクロ秒 * 100)
  183. }
  184. }()
  185.  
  186. /* 宛先コンテナの標準出力を読み取るために、ここでは並行コルーチンを有効にできないことに注意してください。これを有効にすると、データ送信中のコルーチンとロックの競合が発生し、宛先コンテナへのデータ送信がブロックされる可能性があります。 */
  187. // 宛先コンテナから標準出力を取得し、データチャネルに格納します
  188. // ゴー関数() {
  189. // i := 0
  190. //のために{
  191. // このフィルターは直接trueを返します。これは単なるプレースホルダーです。
  192. // err := wsDst.Receive( "宛先コンテナ" , dstStdOutCh, func(入力文字列) bool {
  193. //戻る 真実 
  194. // })
  195. // err != nil の場合 {
  196. // log.Printf( "宛先コンテナからのデータの読み取りに失敗しました" )
  197. // 壊す
  198. // }
  199. // wsDst.Send()
  200. // time.Sleep ( time .マイクロ秒 * 100000)
  201. // }
  202. // log.Printf( "宛先コンテナからデータを読み取り中、%d 回目の反復" , i)
  203. // i++
  204. // }()
  205.  
  206. // データ チャネルから宛先コンテナの標準出力を読み取り、印刷します。
  207. // ゴー関数() {
  208. // BreakPrintDstPodStdout:
  209. //のために{
  210. //選択{
  211. //ケースデータ := <-dstStdOutCh:
  212. // log.Printf( "宛先コンテナの標準出力:%s" , string(data))
  213. // // time.Sleep ( time .マイクロ秒 * 200)
  214. //ケース<-signalStopPrintDstStdout:
  215. // log.Printf( "シグナルを受信しました。宛先コンテナの標準出力への出力を停止します" )
  216. // //閉じる(dataOutput)
  217. //閉じる(dataCh)
  218. // // signalStopRead <- true  
  219. // log.Printf( "読み取り停止信号を送信しました" )
  220. // //閉じる(dataOutput)
  221. //閉じる(dataCh)
  222. // BreakPrintDstPodStdout を中断する
  223. // }
  224. // // time.Sleep ( time .マイクロ秒 * 100)
  225. // }
  226. // }()
  227.  
  228. // ソース コンテナの標準出力データ チャネルからデータを取得し、それを宛先コンテナの標準入力に送信します。
  229. // タイムアウト期間を定義する
  230. タイムアウト秒数:= 3
  231. タイマー := time .NewTimer( time . Second * time . Duration(timeOutSecond))
  232. ブレイク2メイン:
  233. のために{
  234. 選択{
  235. ケースデータ := <-srcStdOutCh:
  236. wsDst.SendWithFilter( "宛先コンテナに送信する" , string(data))
  237. // time.Sleep ( time .ミリ秒 * 200)
  238. timer.Reset(時間.*時間. Duration(timeOutSecond))
  239. ケース<-timer.C:
  240. // time .Sleep( time . Second * 5)
  241. log.Printf( "================== ソースコンテナの標準出力、新しいデータなし、タイムアウト、宛先コンテナへのデータ送信を停止 ==================" )
  242. // log.Printf( "シグナル送信: 宛先コンテナの標準出力への出力を停止します" )
  243. // signalStopPrintDstStdout <- true  
  244. log.Printf( "シグナル送信: ソースコンテナからのデータの読み取りを停止します" )
  245. wsSrc.Conn.Close ()
  246. // log.Printf( "シグナル送信: 宛先コンテナからのデータの読み取りを停止します" )
  247. // wsDst.Conn.Close ()
  248. log.Printf( "シグナルが送信されました: メイン関数は終了できます。" )
  249. シグナル終了 <- true  
  250. log.Printf( "すべての信号が送信されました" )
  251. log.Printf( "================== ループ終了 ===================" )
  252. ブレークBreak2Main
  253. }
  254. // time.Sleep ( time .マイクロ秒 * 1000)
  255. }
  256.  
  257. // signalStopRead <- true  
  258. <-signalExit // 信号が受信されるまでチャネルをブロックします。
  259. // signalStopRead <- true  
  260. log.Printf( "メイン関数はシグナルを受信し、終了の準備をしています" )。
  261. //閉じる(dataCh)
  262. // time .Sleep(時間.)
  263. //閉じる(データ出力)
  264. // time .Sleep(時間.)
  265. // {}を選択
  266. }

cp_test.go

  1. パッケージ cpFilePod2Pod
  2.  
  3. 輸入 (
  4. "ログ"  
  5. 「テスト」  
  6.  
  7. // Go test -race -test.run TestCpPod2Pod このディレクトリに移動してテストを実行します。
  8. TestCpPod2Pod関数(t *テスト.T) {
  9. log.Printf( "テストを開始しています" )
  10. CpPod2Pod()
  11. }

  1. 参考結果:
  2. ソースコンテナ:
  3. root@xxx-mysql-0:/var/log/mysql# md5sum slow.log
  4. 16577613b6ea957ecb5d9d5e976d9c50 遅い.log
  5. 宛先コンテナ:
  6. root@xxx-75bdcdb8cf-hq9wf:/tmp/var/log/mysql# md5sum slow.log
  7. 16577613b6ea957ecb5d9d5e976d9c50 遅い.log

参考資料

Kubernetes exec API 接続分析: https://www.cnblogs.com/a00ium/p/10905279.html

kubernetes-client-go-implementation-kubectl-copy:https://ica10888.com/2019/08/31/kubernetes-client-go-%E5%AE%9E%E7%8E%B0-kubectl-copy.html