대용량 데이터 처리를 위한 기술 활용 (Kafka, ES)
대용량 데이터 처리 및 Kafka + Bulk 활용
데이터 처리 흐름
-
DB -> Kafka -> ElasticSearch 순서로 데이터가 전달됨
-
Kafka를 bucket 단위로 나누어 데이터 전송하여 효율적인 처리
-
ElasticSearch에는 Bulk API를 활용하여 대량의 데이터를 일괄 삽입
주요 개선 사항
-
Kafka를 Bucket 단위로 나눠서 전송 → 데이터 스트리밍 성능 최적화
-
ElasticSearch에 Bulk 처리 적용 → 인덱싱 성능 향상 및 네트워크 부하 감소
-
대용량 데이터(150,000개 이상)를 효율적으로 처리하여 빠른 데이터 동기화 가능
Kafka Bucket 처리 예시
qryCnt := models.QryCount(y.Db, cntStr)
fmt.Println("qryCnt:", qryCnt)
bucketCnt := 0
if qryCnt == 0 {
bucketCnt = 0
} else {
bucketCnt = 1 + (int(qryCnt) / p.Limit)
}
fmt.Println("bucketCnt:", bucketCnt)
for i := 0; i < bucketCnt; i++ {
// for i := 0; i < 3; i++ {
p.Offset = p.Limit * i
sqlStr, _, _ := models_join.ListType1PlainQryStrGet(y, lt, p, q, "/list")
if err := t.ProduceaBucketToKafka(y, "kkk", headers, sqlStr, topic); err != nil {
return errors.New(e.PageQryErr("vrt452", ": "+err.Error()))
}
}
👉 Bucket을 활용하여 병렬 처리 가능!
ElasticSearch Bulk API 예시
for _, row := range v.Page {
row.ClientCode = clientCode
fmt.Printf("BuyerId: %d, LastSorderDate: %s, ClientCode : %s\n", row.Id, row.LastSorderDate, row.ClientCode)
row.LastVistDate = row.LastSorderDate
// Meta 데이터 작성
meta := fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }%s`, esIndex, row.Id, "\n")
bulkRequest.WriteString(meta)
// 문서 데이터 작성
data, _ := json.Marshal(row)
bulkRequest.WriteString(string(data) + "\n")
fmt.Println(string(data)) // 데이터를 출력하여 검증
fmt.Printf("Prepared document for BuyerId: %d\n", row.Id)
}
// Bulk 요청 전송
res, err := es.Bulk(bytes.NewReader(bulkRequest.Bytes()))
if err != nil {
return fmt.Errorf("bulk request failed: %w", err)
}
defer res.Body.Close()
// Elasticsearch 응답 확인
if res.IsError() {
log.Printf("Bulk indexing failed: %s", res.String())
return fmt.Errorf("bulk indexing failed")
}
👉 Bulk API를 활용하면 한 번에 많은 데이터를 삽입 가능!
✅ 이 방식은 대량의 데이터를 효율적으로 처리하고 성능을 최적화할 수 있도록 설계됨.
No Comments