대용량 데이터 처리를 위한 기술 활용 (Kafka, ES)

대용량 데이터 처리 및 Kafka + Bulk 활용 

 데이터 처리 흐름 

 

 

 DB -> Kafka -> ElasticSearch 순서로 데이터가 전달됨 

 

 

 Kafka를 bucket 단위로 나누어 데이터 전송 하여 효율적인 처리 

 

 

 ElasticSearch에는 Bulk API를 활용하여 대량의 데이터를 일괄 삽입 

 

 

 주요 개선 사항 

 

 

 Kafka를 Bucket 단위로 나눠서 전송 → 데이터 스트리밍 성능 최적화 

 

 

 ElasticSearch에 Bulk 처리 적용 → 인덱싱 성능 향상 및 네트워크 부하 감소 

 

 

 대용량 데이터(150,000개 이상)를 효율적으로 처리 하여 빠른 데이터 동기화 가능 

 

 

 Kafka Bucket 처리 예시 

 qryCnt := models.QryCount(y.Db, cntStr)

	fmt.Println("qryCnt:", qryCnt)

	bucketCnt := 0

	if qryCnt == 0 {

		bucketCnt = 0

	} else {

		bucketCnt = 1 + (int(qryCnt) / p.Limit)

	}

 fmt.Println("bucketCnt:", bucketCnt)

	for i := 0; i < bucketCnt; i++ {

		// for i := 0; i < 3; i++ {

		p.Offset = p.Limit * i

		sqlStr, _, _ := models_join.ListType1PlainQryStrGet(y, lt, p, q, "/list")

		if err := t.ProduceaBucketToKafka(y, "kkk", headers, sqlStr, topic); err != nil {

			return errors.New(e.PageQryErr("vrt452", ": "+err.Error()))

		}

	} 

 👉 Bucket을 활용하여 병렬 처리 가능! 

 ElasticSearch Bulk API 예시 

 	for _, row := range v.Page {

		row.ClientCode = clientCode

		fmt.Printf("BuyerId: %d, LastSorderDate: %s, ClientCode : %s\n", row.Id, row.LastSorderDate, row.ClientCode)

		row.LastVistDate = row.LastSorderDate

		// Meta 데이터 작성

		meta := fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }%s`, esIndex, row.Id, "\n")

		bulkRequest.WriteString(meta)

		// 문서 데이터 작성

		data, _ := json.Marshal(row)

		bulkRequest.WriteString(string(data) + "\n")

		fmt.Println(string(data)) // 데이터를 출력하여 검증

		fmt.Printf("Prepared document for BuyerId: %d\n", row.Id)

	}

	// Bulk 요청 전송

	res, err := es.Bulk(bytes.NewReader(bulkRequest.Bytes()))

	if err != nil {

		return fmt.Errorf("bulk request failed: %w", err)

	}

	defer res.Body.Close()

	// Elasticsearch 응답 확인

	if res.IsError() {

		log.Printf("Bulk indexing failed: %s", res.String())

		return fmt.Errorf("bulk indexing failed")

	} 

 👉 Bulk API를 활용하면 한 번에 많은 데이터를 삽입 가능! 

 ✅ 이 방식은 대량의 데이터를 효율적으로 처리하고 성능을 최적화할 수 있도록 설계됨.