[[410477]] 【51CTO.com クイック翻訳】はじめにMySQL は、データを正規化された方法で表形式で保存する RDBMS プラットフォームですが、MongoDB は、コレクションにグループ化されたドキュメントとしてスキーマレスな方法で情報を保存する NoSQL データベースです。データの表現方法が完全に異なるため、MySQL テーブル データを MongoDB コレクションに移行するのは困難な作業のように思えるかもしれません。しかし、Python は強力な接続性とデータ処理機能を備えているため、これを簡単に実現できます。 この記事では、簡単な Python スクリプトを使用して MySQL テーブル データを MongoDB コレクションに移行するために必要な手順を詳しく説明します。スクリプトは Windows 上の Python 3.9.5 を使用して開発されました。ただし、どのプラットフォームでも Python 3 以降のバージョンで動作するはずです。 ステップ1: 必要なモジュールをインストールする 最初のステップは、MySQL および MongoDB データベース インスタンスに接続するために必要なモジュールをインストールすることです。 MySQL データベースに接続するには、 mysql.connectorを使用します。 MongoDB の場合は、Python から MongoDB に接続するための推奨モジュールであるpymongo を使用します。 必要なモジュールがまだインストールされていない場合は、次の PIP コマンドを実行してインストールします。 - pip mysqlコネクタをインストール pip pymongoをインストール
PIP は、Python パッケージまたはモジュール用のパッケージ マネージャーです。 ステップ2: MySQLテーブルからデータを読み取る 最初のステップは、ソース MySQL テーブルからデータを読み取り、ターゲット MongoDB データベースにデータをロードするために使用できる形式で準備することです。 MongoDB はデータを JSON ドキュメントとして保存する NoSQL データベースなので、ソース データを JSON 形式で生成するのが最適です。 Python には強力なデータ処理機能があり、データを JSON 形式に簡単に変換できることは注目に値します。 - mysql.connectorをインポートする
- mysqldb = mysql.connector.connect( ホスト = "localhost" 、データベース = "employees" 、ユーザー = "root" 、パスワード = "" )
- mycursor = mysqldb.cursor(dictionary= True ) mycursor.execute( "SELECT * from Categories;" ) myresult = mycursor.fetchall()
- 印刷(myresult)
スクリプトがエラーなしで完了すると、出力は次のようになります。 - [
- {
- 「id」 : 4 ,
- 「名前」 : 「薬」 、
- 「説明」 : 「<p>医薬品<br></p>」 、
- "作成日時" : "" ,
- 「更新日時」 : 「」
- },
- {
- 「id」 : 6 ,
- 「名前」 : 「食べ物」 、
- 「説明」 : 「<p>食品</p>」 、
- "作成日時" : "" ,
- 「更新日時」 : 「」
- },
- {
- 「id」 : 8 、
- 「名前」 : 「食料品」 、
- 「説明」 : 「<p>食料品<br></p>」 、
- "作成日時" : "" ,
- 「更新日時」 : 「」
- },
- {
- 「id」 : 9 、
- 「名前」 : 「ケーキ&焼き菓子」 、
- 「説明」 : 「<p>ケーキと焼き菓子<br></p>」 、
- 「作成日時」 :d 「」 、
- 「更新日時」 : 「」
- }
- ]
dictionary=Trueパラメータをカーソルに渡したため、出力は JSON 配列になることに注意してください。それ以外の場合、結果はリスト形式になります。ソース データが JSON 形式で取得できたので、それを MongoDB コレクションに移行できます。 ステップ3: MongoDBコレクションに書き込む JSON 形式でソース データを取得したら、次のステップではそのデータを MongoDB コレクションに挿入します。コレクションはドキュメントのグループであり、RDBMS のテーブル (またはリレーション) に相当します。これは、コレクション クラスのinsert_many()メソッドを呼び出すことによって実行できます。このメソッドは、挿入されたドキュメントのオブジェクト ID のリストを返します。このメソッドは、空のリストが引数として渡されると例外をスローするため、メソッド呼び出しの前に長さのチェックが行われることに注意してください。 - pymongoをインポートする
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_db名]
- mycol = mydb[ "カテゴリ" ]
- len(myresult) > 0 の場合:
- x = mycol.insert_many(myresult)
- 印刷(len(x.inserted_ids))
この手順を完了したら、MongoDB インスタンスをチェックして、データベースとコレクションが作成され、ドキュメントが挿入されたことを確認します。 MongoDB はスキーマレスであるため、ドキュメントを挿入するためにスキーマを定義する必要はなく、スキーマは動的に推測され、自動的に作成されることに注意してください。 MongoDB は、コード内で参照されるデータベースやコレクションがまだ存在しない場合は、それらを作成することもできます。 ステップ4: データをまとめる 以下は、MySQL からテーブルを読み取り、それを MongoDB のコレクションに挿入する完全なスクリプトです。 - mysql.connectorをインポートする
- pymongoをインポートする
- 既存のドキュメントを削除する = True
- mysql_host = "ローカルホスト"
- mysql_database= "mydatabase"
- mysql_schema = "myschema"
- mysql_user = "myuser"
- mysql_password= "********"
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- mysqldb = mysql.connector.connect(
- ホスト=mysql_host、
- データベース=mysql_database、
- ユーザー=mysql_user、
- パスワード=mysql_password
- )
- mycursor = mysqldb.cursor(辞書 = True )
- mycursor.execute( "カテゴリから * を選択;" )
- myresult = mycursor.fetchall()
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_db名]
- mycol = mydb[ "カテゴリ" ]
- len(myresult) > 0 の場合:
- x = mycol.insert_many(myresult)
- 印刷(len(x.inserted_ids))
ステップ5: MySQLスキーマ内のすべてのテーブルをロードするようにスクリプトを強化する このスクリプトは、MySQL からテーブルを読み取り、その結果を MongoDB コレクションにロードします。次のステップでは、ソース データベース内のすべてのテーブルのリストを反復処理し、結果を新しい MySQL コレクションにロードします。これは、特定のスキーマ内のテーブルのリストを提供する information_schema.tables メタデータ テーブルをクエリすることで実行できます。その後、結果を反復処理し、上記のスクリプトを呼び出して各テーブルのデータを移行できます。 -
- table_list_cursor = mysqldb.cursor()
- table_list_cursor.execute( "SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name;" , (mysql_schema,))
- テーブル = table_list_cursor.fetchall()
- テーブル内のテーブルの場合:
-
これは、移行ロジックを関数に抽象化することによっても実現できます。 -
- def migrate_table(db, col_name):
- mycursor = db.cursor(辞書 = True )
- mycursor.execute( "SELECT * FROM " + col_name + ";" )
- myresult = mycursor.fetchall()
- mycol = mydb[列名]
- 既存のドキュメントを削除する場合:
-
- mycol.delete_many({})
-
- len(myresult) > 0 の場合:
- x = mycol.insert_many(myresult)
- len(x.inserted_ids)を返す
- それ以外:
- 0を返す
ステップ6: スクリプトの進行状況を出力して読みやすくする スクリプトの進行状況は、print ステートメントを使用して伝達されます。色分けにより出力を読みやすくします。たとえば、成功したステートメントを緑色で印刷し、失敗したステートメントを赤色で印刷します。 - クラスbcolors:
- ヘッダー = '\033[95m'
- OKBLUE = '\033[94m'
- OKCYAN = '\033[96m'
- OKGREEN = '\033[92m'
- 警告 = '\033[93m'
- 失敗 = '\033[91m'
- ENDC = '\033[0m'
- 太字 = '\033[1m'
- 下線 = '\033[4m'
- print (f "{bcolors.HEADER}これはヘッダーです{bcolors.ENDC}" )
- print (f "{bcolors.OKBLUE}これは青色で印刷されます{bcolors.ENDC}" )
- print (f "{bcolors.OKGREEN}このメッセージは緑です{bcolors.ENDC}" )
最終結果- ソース MySQL データベース
- 移行後のターゲット MongoDB データベース
- VSCode での Python スクリプトと出力
完全なスクリプト- mysql.connectorをインポートする
- pymongoをインポートする
- 日時をインポート
- クラスbcolors:
- ヘッダー = '\033[95m'
- OKBLUE = '\033[94m'
- OKCYAN = '\033[96m'
- OKGREEN = '\033[92m'
- 警告 = '\033[93m'
- 失敗 = '\033[91m'
- ENDC = '\033[0m'
- 太字 = '\033[1m'
- 下線 = '\033[4m'
- begin_time = datetime.datetime.now()
- print (f "{bcolors.HEADER}スクリプトは{begin_time} {bcolors.ENDC}に開始されました" )
- 既存のドキュメントを削除します。
- mysql_host = "ローカルホスト"
- mysql_database= "mydatabase"
- mysql_schema = "myschema"
- mysql_user = "ルート"
- mysql_password=""
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- print (f "{bcolors.HEADER}データベース接続を初期化しています...{bcolors.ENDC}" )
- 印刷("")
-
- print (f "{bcolors.HEADER}MySQL サーバーに接続しています...{bcolors.ENDC}" )
- mysqldb = mysql.connector.connect(
- ホスト=mysql_host、
- データベース=mysql_database、
- ユーザー=mysql_user、
- パスワード=mysql_password
- )
- print (f "{bcolors.HEADER}MySQL サーバーへの接続に成功しました。{bcolors.ENDC}" )
-
- print (f "{bcolors.HEADER}MongoDB サーバーに接続しています...{bcolors.ENDC}" )
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_db名]
- print (f "{bcolors.HEADER}MongoDB サーバーへの接続に成功しました。{bcolors.ENDC}" )
- print (f "{bcolors.HEADER}データベース接続が正常に初期化されました。{bcolors.ENDC}" )
-
- print (f "{bcolors.HEADER}移行が開始されました...{bcolors.ENDC}" )
- dblist = myclient.list_database_names()
- dblistにmongodb_dbnameがある場合:
- print (f "{bcolors.OKBLUE}データベースが存在します。{bcolors.ENDC}" )
- それ以外:
- print (f "{bcolors.WARNING}データベースが存在しません。作成中です。{bcolors.ENDC}" )
-
- def migrate_table(db, col_name):
- mycursor = db.cursor(辞書 = True )
- mycursor.execute( "SELECT * FROM " + col_name + ";" )
- myresult = mycursor.fetchall()
- mycol = mydb[列名]
- 既存のドキュメントを削除する場合:
-
- mycol.delete_many({})
-
- len(myresult) > 0 の場合:
- x = mycol.insert_many(myresult)
- len(x.inserted_ids)を返す
- それ以外:
- 0を返す
- table_list_cursor = mysqldb.cursor()
- table_list_cursor.execute( "SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name LIMIT 15;" , (mysql_schema,))
- テーブル = table_list_cursor.fetchall()
- total_count = len(テーブル数)
- 成功回数 = 0
- 失敗回数 = 0
- テーブル内のテーブルの場合:
- 試す:
- print (f "{bcolors.OKCYAN}処理テーブル: {table[0]}...{bcolors.ENDC}" )
- 挿入された数 = migrate_table(mysqldb、テーブル[ 0 ])
- print (f "{bcolors.OKGREEN}テーブル {table[0]} の処理が完了しました。{inserted_count} 個のドキュメントが挿入されました。{bcolors.ENDC}" )
- 成功回数 += 1
- except例外を e として:
- print (f "{bcolors.FAIL} {e} {bcolors.ENDC}" )
- 失敗回数 += 1
- 印刷("")
- print ( "移行が完了しました。" )
- print (f "{bcolors.OKGREEN}{success_count} / {total_count} 個のテーブルが正常に移行されました。{bcolors.ENDC}" )
- fail_count > 0の場合:
- print (f "{bcolors.FAIL}{fail_count} テーブルの移行に失敗しました。上記のエラーを参照してください。{bcolors.ENDC}" )
- 終了時刻 = datetime.datetime.now()
- print (f "{bcolors.HEADER}スクリプトは次の時刻に完了しました: {end_time} {bcolors.ENDC}" )
- print (f "{bcolors.HEADER}合計実行時間: {end_time-begin_time} {bcolors.ENDC}" )
警告するこのスクリプトは、数百のテーブルとテーブルあたり数千の行を持つ小規模から中規模の MySQL データベースに適しています。数百万行の大規模なデータベースの場合、パフォーマンスに影響が出る可能性があります。実際の移行を開始する前に、テーブル リスト クエリと実際のテーブル選択クエリの両方で LIMIT キーワードを使用して、制限された行を検出します。 ダウンロードスクリプト全体は、GitHub からダウンロードできます。 https://github.com/zshameel/MySQL2MongoDB [51CTOによる翻訳。パートナーサイトに転載する場合は、元の翻訳者と出典を51CTO.comとして明記してください。 |