대용량 데이터 처리를 위한 기술 활용 (Kafka, ES)
대용량 데이터 처리 및 Kafka + Bulk 활용
데이터 처리 흐름
- 
DB -> Kafka -> ElasticSearch 순서로 데이터가 전달됨 
- 
Kafka를 bucket 단위로 나누어 데이터 전송하여 효율적인 처리 
- 
ElasticSearch에는 Bulk API를 활용하여 대량의 데이터를 일괄 삽입 
주요 개선 사항
- 
Kafka를 Bucket 단위로 나눠서 전송 → 데이터 스트리밍 성능 최적화 
- 
ElasticSearch에 Bulk 처리 적용 → 인덱싱 성능 향상 및 네트워크 부하 감소 
- 
대용량 데이터(150,000개 이상)를 효율적으로 처리하여 빠른 데이터 동기화 가능 
Kafka Bucket 처리 예시
qryCnt := models.QryCount(y.Db, cntStr)
	fmt.Println("qryCnt:", qryCnt)
	bucketCnt := 0
	if qryCnt == 0 {
		bucketCnt = 0
	} else {
		bucketCnt = 1 + (int(qryCnt) / p.Limit)
	}
    fmt.Println("bucketCnt:", bucketCnt)
	for i := 0; i < bucketCnt; i++ {
		// for i := 0; i < 3; i++ {
		p.Offset = p.Limit * i
		sqlStr, _, _ := models_join.ListType1PlainQryStrGet(y, lt, p, q, "/list")
		if err := t.ProduceaBucketToKafka(y, "kkk", headers, sqlStr, topic); err != nil {
			return errors.New(e.PageQryErr("vrt452", ": "+err.Error()))
		}
	}👉 Bucket을 활용하여 병렬 처리 가능!
ElasticSearch Bulk API 예시
	for _, row := range v.Page {
		row.ClientCode = clientCode
		fmt.Printf("BuyerId: %d, LastSorderDate: %s, ClientCode : %s\n", row.Id, row.LastSorderDate, row.ClientCode)
		row.LastVistDate = row.LastSorderDate
		// Meta 데이터 작성
		meta := fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }%s`, esIndex, row.Id, "\n")
		bulkRequest.WriteString(meta)
		// 문서 데이터 작성
		data, _ := json.Marshal(row)
		bulkRequest.WriteString(string(data) + "\n")
		fmt.Println(string(data)) // 데이터를 출력하여 검증
		fmt.Printf("Prepared document for BuyerId: %d\n", row.Id)
	}
	// Bulk 요청 전송
	res, err := es.Bulk(bytes.NewReader(bulkRequest.Bytes()))
	if err != nil {
		return fmt.Errorf("bulk request failed: %w", err)
	}
	defer res.Body.Close()
	// Elasticsearch 응답 확인
	if res.IsError() {
		log.Printf("Bulk indexing failed: %s", res.String())
		return fmt.Errorf("bulk indexing failed")
	}👉 Bulk API를 활용하면 한 번에 많은 데이터를 삽입 가능!
✅ 이 방식은 대량의 데이터를 효율적으로 처리하고 성능을 최적화할 수 있도록 설계됨.
