Kimuksung
Hello. I am a thoughtful junior Data Engineer with a keen interest in distributed processing.

Airflow Mongodb

Hello.
Today I would like to talk about building a pipeline that connects to MongoDB and pulls data from it.

Of the two approaches covered here, approach 2 (using PyMongo and loading the data into S3) is the more convenient one.


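Connecting Airflow to MongoDB Atlas

1. Integrating via Apache.airflow.providers

Before diving in: I could not find any explanation of how to use the Apache.airflow.providers library, so I read through the hook's source code and built my implementation on that.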

```python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Mongo DB"""
from __future__ import annotations

from ssl import CERT_NONE
from types import TracebackType

import pymongo
from pymongo import MongoClient, ReplaceOne

from airflow.hooks.base import BaseHook

class MongoHook(BaseHook):
    """
    Interact with Mongo. This hook uses the Mongo conn_id.
    PyMongo Wrapper to Interact With Mongo Database
    Mongo Connection Documentation
    https://docs.mongodb.com/manual/reference/connection-string/index.html
    You can specify connection string options in extra field of your connection
    https://docs.mongodb.com/manual/reference/connection-string/index.html#connection-string-options

    If you want use DNS seedlist, set `srv` to True.

    ex.
        {"srv": true, "replicaSet": "test", "ssl": true, "connectTimeoutMS": 30000}

    :param mongo_conn_id: The :ref:`Mongo connection id <howto/connection:mongo>` to use
        when connecting to MongoDB.
    """

    conn_name_attr = 'conn_id'

    default_conn_name = 'mongo_default'

    conn_type = 'mongo'

    hook_name = 'MongoDB'

    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:

        super().__init__()
        self.mongo_conn_id = conn_id
        self.connection = self.get_connection(conn_id)
        self.extras = self.connection.extra_dejson.copy()
        self.client = None

        srv = self.extras.pop('srv', False)
        scheme = 'mongodb+srv' if srv else 'mongodb'

        creds = f'{self.connection.login}:{self.connection.password}@' if self.connection.login else ''
        port = '' if self.connection.port is None else f':{self.connection.port}'
        self.uri = f'{scheme}://{creds}{self.connection.host}{port}/{self.connection.schema}'

    def __enter__(self):
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        if self.client is not None:
            self.close_conn()

    def get_conn(self) -> MongoClient:
        """Fetches PyMongo Client"""
        if self.client is not None:
            return self.client

        # Mongo Connection Options dict that is unpacked when passed to MongoClient
        options = self.extras

        # If we are using SSL disable requiring certs from specific hostname
        if options.get('ssl', False):
            options.update({'ssl_cert_reqs': CERT_NONE})

        self.client = MongoClient(self.uri, **options)

        return self.client

    def close_conn(self) -> None:
        """Closes connection"""
        client = self.client
        if client is not None:
            client.close()
            self.client = None

    def get_collection(
        self, mongo_collection: str, mongo_db: str | None = None
    ) -> pymongo.collection.Collection:
        """
        Fetches a mongo collection object for querying.

        Uses connection schema as DB unless specified.
        """
        mongo_db = mongo_db if mongo_db is not None else self.connection.schema
        mongo_conn: MongoClient = self.get_conn()

        return mongo_conn.get_database(mongo_db).get_collection(mongo_collection)

    def aggregate(
        self, mongo_collection: str, aggregate_query: list, mongo_db: str | None = None, **kwargs
    ) -> pymongo.command_cursor.CommandCursor:
        """
        Runs an aggregation pipeline and returns the results
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.aggregate
        https://pymongo.readthedocs.io/en/stable/examples/aggregation.html
        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        return collection.aggregate(aggregate_query, **kwargs)

    def find(
        self,
        mongo_collection: str,
        query: dict,
        find_one: bool = False,
        mongo_db: str | None = None,
        projection: list | dict | None = None,
        **kwargs,
    ) -> pymongo.cursor.Cursor:
        """
        Runs a mongo find query and returns the results
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find
        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        if find_one:
            return collection.find_one(query, projection, **kwargs)
        else:
            return collection.find(query, projection, **kwargs)

    def insert_one(
        self, mongo_collection: str, doc: dict, mongo_db: str | None = None, **kwargs
    ) -> pymongo.results.InsertOneResult:
        """
        Inserts a single document into a mongo collection
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_one
        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        return collection.insert_one(doc, **kwargs)

    def insert_many(
        self, mongo_collection: str, docs: dict, mongo_db: str | None = None, **kwargs
    ) -> pymongo.results.InsertManyResult:
        """
        Inserts many docs into a mongo collection.
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_many
        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        return collection.insert_many(docs, **kwargs)

    def update_one(
        self,
        mongo_collection: str,
        filter_doc: dict,
        update_doc: dict,
        mongo_db: str | None = None,
        **kwargs,
    ) -> pymongo.results.UpdateResult:
        """
        Updates a single document in a mongo collection.
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.update_one

        :param mongo_collection: The name of the collection to update.
        :param filter_doc: A query that matches the documents to update.
        :param update_doc: The modifications to apply.
        :param mongo_db: The name of the database to use.
            Can be omitted; then the database from the connection string is used.

        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        return collection.update_one(filter_doc, update_doc, **kwargs)

    def update_many(
        self,
        mongo_collection: str,
        filter_doc: dict,
        update_doc: dict,
        mongo_db: str | None = None,
        **kwargs,
    ) -> pymongo.results.UpdateResult:
        """
        Updates one or more documents in a mongo collection.
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.update_many

        :param mongo_collection: The name of the collection to update.
        :param filter_doc: A query that matches the documents to update.
        :param update_doc: The modifications to apply.
        :param mongo_db: The name of the database to use.
            Can be omitted; then the database from the connection string is used.

        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        return collection.update_many(filter_doc, update_doc, **kwargs)

    def replace_one(
        self,
        mongo_collection: str,
        doc: dict,
        filter_doc: dict | None = None,
        mongo_db: str | None = None,
        **kwargs,
    ) -> pymongo.results.UpdateResult:
        """
        Replaces a single document in a mongo collection.
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.replace_one

        .. note::
            If no ``filter_doc`` is given, it is assumed that the replacement
            document contain the ``_id`` field which is then used as filters.

        :param mongo_collection: The name of the collection to update.
        :param doc: The new document.
        :param filter_doc: A query that matches the documents to replace.
            Can be omitted; then the _id field from doc will be used.
        :param mongo_db: The name of the database to use.
            Can be omitted; then the database from the connection string is used.
        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        if not filter_doc:
            filter_doc = {'_id': doc['_id']}

        return collection.replace_one(filter_doc, doc, **kwargs)

    def replace_many(
        self,
        mongo_collection: str,
        docs: list[dict],
        filter_docs: list[dict] | None = None,
        mongo_db: str | None = None,
        upsert: bool = False,
        collation: pymongo.collation.Collation | None = None,
        **kwargs,
    ) -> pymongo.results.BulkWriteResult:
        """
        Replaces many documents in a mongo collection.

        Uses bulk_write with multiple ReplaceOne operations
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write

        .. note::
            If no ``filter_docs`` are given, it is assumed that all
            replacement documents contain the ``_id`` field which are then
            used as filters.

        :param mongo_collection: The name of the collection to update.
        :param docs: The new documents.
        :param filter_docs: A list of queries that match the documents to replace.
            Can be omitted; then the _id fields from airflow.docs will be used.
        :param mongo_db: The name of the database to use.
            Can be omitted; then the database from the connection string is used.
        :param upsert: If ``True``, perform an insert if no documents
            match the filters for the replace operation.
        :param collation: An instance of
            :class:`~pymongo.collation.Collation`. This option is only
            supported on MongoDB 3.4 and above.

        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        if not filter_docs:
            filter_docs = [{'_id': doc['_id']} for doc in docs]

        requests = [
            ReplaceOne(filter_docs[i], docs[i], upsert=upsert, collation=collation) for i in range(len(docs))
        ]

        return collection.bulk_write(requests, **kwargs)

    def delete_one(
        self, mongo_collection: str, filter_doc: dict, mongo_db: str | None = None, **kwargs
    ) -> pymongo.results.DeleteResult:
        """
        Deletes a single document in a mongo collection.
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.delete_one

        :param mongo_collection: The name of the collection to delete from.
        :param filter_doc: A query that matches the document to delete.
        :param mongo_db: The name of the database to use.
            Can be omitted; then the database from the connection string is used.

        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        return collection.delete_one(filter_doc, **kwargs)

    def delete_many(
        self, mongo_collection: str, filter_doc: dict, mongo_db: str | None = None, **kwargs
    ) -> pymongo.results.DeleteResult:
        """
        Deletes one or more documents in a mongo collection.
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.delete_many

        :param mongo_collection: The name of the collection to delete from.
        :param filter_doc: A query that matches the documents to delete.
        :param mongo_db: The name of the database to use.
            Can be omitted; then the database from the connection string is used.

        """
        collection = self.get_collection(mongo_collection, mongo_db=mongo_db)

        return collection.delete_many(filter_doc, **kwargs)
```
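  • (Pitfall solved) The hook builds a uri value in __init__, and that value is what actually connects to MongoDB, so the Connection fields must be filled in so that this uri comes out correctly.
  • The MongoDB connection type only shows up in Airflow after the provider library (apache-airflow-providers-mongo) from Apache.airflow.providers is installed.
  • srv selects the URI scheme: true yields mongodb+srv (the DNS seedlist form used by MongoDB Atlas in the cloud), false yields plain mongodb (for example a local instance).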
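To make the first and third points concrete, here is a minimal sketch that mirrors the uri assembly in `MongoHook.__init__` from the source above. `build_mongo_uri` is a hypothetical helper written for illustration, not part of the provider, and the host and credential values are placeholders.

```python
from typing import Optional


def build_mongo_uri(
    host: str,
    login: Optional[str] = None,
    password: Optional[str] = None,
    port: Optional[int] = None,
    schema: str = "",
    srv: bool = False,
) -> str:
    """Mirror the uri assembly in MongoHook.__init__ (hypothetical helper)."""
    # srv=True selects the DNS seedlist scheme used by Atlas connection strings.
    scheme = "mongodb+srv" if srv else "mongodb"
    # Credentials are only added when a login is present.
    creds = f"{login}:{password}@" if login else ""
    port_part = "" if port is None else f":{port}"
    return f"{scheme}://{creds}{host}{port_part}/{schema}"


# Atlas-style connection: srv enabled, no port.
print(build_mongo_uri("atlasinfo", login="id", password="pw", srv=True))
# Local-style connection: plain scheme with an explicit port.
print(build_mongo_uri("localhost", port=27017, schema="test"))
```

Printing the uri for a given set of Connection fields is a quick way to see what the hook will hand to `MongoClient` before debugging anything on the MongoDB side.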
1.a) Airflow MongoDB Atlas Connection
  • URL: mongodb+srv://<id>:<pw>@atlasinfo/?retryWrites=true&w=majority
  • URL (reading from read-only nodes): mongodb+srv://<id>:<pw>@mongodb/?readPreference=secondary&readPreferenceTags=nodeType:READ_ONLY
  • Login = id
  • Password = pw
  • extra = {"srv": true}
  • host = atlasinfo
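The same fields can also be supplied without the UI. The sketch below assumes Airflow 2.3 or later, which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` environment variables; the values are the placeholders from the list above, and `mongo_default` is simply the hook's default connection id.

```python
import json
import os

# Placeholder values from the list above; replace with real credentials.
mongo_conn = {
    "conn_type": "mongo",
    "host": "atlasinfo",
    "login": "id",
    "password": "pw",
    "extra": {"srv": True},
}

# Airflow looks up the connection id "mongo_default" in AIRFLOW_CONN_MONGO_DEFAULT.
os.environ["AIRFLOW_CONN_MONGO_DEFAULT"] = json.dumps(mongo_conn)
print(os.environ["AIRFLOW_CONN_MONGO_DEFAULT"])
```

Setting `os.environ` only affects the current process, so in a real deployment the variable would be exported in the environment of the scheduler and workers instead.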

https://ifh.cc/g/x3TP0J.png

1.b) Server selection error
  • This error appears when the Connection is not configured correctly.
  • Once the Connection is set up as above, it no longer occurs.

    error = pymongo.errors.ServerSelectionTimeoutError
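One way to surface this error quickly, rather than waiting for a task to time out, is to ping the server with a short selection timeout. This is a sketch under assumptions: `can_reach_mongo` is a hypothetical helper, the 3000 ms timeout is an arbitrary choice, and the uri would be the one built from your own Connection.

```python
def can_reach_mongo(uri: str, timeout_ms: int = 3000) -> bool:
    """Return True if a server answers a ping within timeout_ms (hypothetical helper)."""
    # Imported lazily so this module still loads where pymongo is not installed.
    from pymongo import MongoClient
    from pymongo.errors import ServerSelectionTimeoutError

    # serverSelectionTimeoutMS shortens how long PyMongo waits before raising.
    client = MongoClient(uri, serverSelectionTimeoutMS=timeout_ms)
    try:
        # MongoClient connects lazily, so a command is needed to trigger selection.
        client.admin.command("ping")
        return True
    except ServerSelectionTimeoutError:
        return False
    finally:
        client.close()
```

Calling `can_reach_mongo("mongodb+srv://id:pw@atlasinfo/")` with real credentials before running the DAG separates connection problems from query problems.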
    
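1.c) MongoHook
  • Run a find query through the Hook.
  • The Connection must be configured beforehand for this to work.

```python
from typing import cast
from airflow.providers.mongo.hooks.mongo import MongoHook

# info, projection, add_key, i, and end are defined elsewhere in the DAG code.
cursor = MongoHook(info.mongo_conn_id).find(
    mongo_collection=info.mongo_collection,
    query=cast(dict, info.mongo_query),
    mongo_db=info.mongo_db,
    projection=projection(add_key),
).skip(i).limit(end)
```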
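The query above pages through the result with skip and limit. As a rough sketch of how such windows might be computed, assuming the total document count is known in advance and that limit is meant as a page size, the hypothetical helper `page_windows` below yields the (skip, limit) pairs; it is not part of the original code.

```python
from typing import Iterator, Tuple


def page_windows(total: int, page_size: int) -> Iterator[Tuple[int, int]]:
    """Yield (skip, limit) pairs that together cover `total` documents."""
    for skip in range(0, total, page_size):
        # The last page may be shorter than page_size.
        yield skip, min(page_size, total - skip)


# 25 documents in pages of 10 give three windows.
for skip, limit in page_windows(25, 10):
    print(skip, limit)  # each pair would be passed as cursor.skip(skip).limit(limit)
```

Keeping the window arithmetic in a small pure function makes it easy to check the page boundaries without touching the database.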

https://ifh.cc/g/FYbqKT.png

2. Using PyMongo

  • Use the PyMongo library for Python.
  • Connect through a client.

```python
import pymongo

# Connect to MongoDB Atlas; replace id and pw with real credentials.
myclient = pymongo.MongoClient(
    "mongodb+srv://id:pw@atlas.mongodb.net/?retryWrites=true&w=majority")
mydb = myclient[schema]
mycol = mydb[collections]

# collection, schema, collections, and searching_date are defined elsewhere in the project.
query = collection.get_query(collections, searching_date)
```
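  • Load path: DataFrame -> Parquet -> S3.
  • Uses the boto3 library.
  • The S3 credentials are placed in the Session, which is then passed to the writer.

    ```python
    import awswrangler as wr  # wr is assumed to be AWS SDK for pandas (awswrangler)

    # df, Bucket, s3_prefix, file_name, and session (a boto3 Session) are defined elsewhere.
    wr.s3.to_parquet(
        df=df,
        path=f"s3://{Bucket}/{s3_prefix}/{file_name}",
        boto3_session=session,
        dataset=True,
        mode="append",
    )
    ```
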
References