# 대용량 데이터 처리를 위한 기술 활용 (Kafka, ES)

## 대용량 데이터 처리 및 Kafka + Bulk 활용

### **데이터 처리 흐름**

1. **DB -&gt; Kafka -&gt; ElasticSearch** 순서로 데이터가 전달됨
2. **Kafka를 bucket 단위로 나누어 데이터 전송**하여 효율적인 처리
3. **ElasticSearch에는 Bulk API를 활용하여 대량의 데이터를 일괄 삽입**

### **주요 개선 사항**

1. **Kafka를 Bucket 단위로 나눠서 전송** → 데이터 스트리밍 성능 최적화
2. **ElasticSearch에 Bulk 처리 적용** → 인덱싱 성능 향상 및 네트워크 부하 감소
3. **대용량 데이터(150,000개 이상)를 효율적으로 처리**하여 빠른 데이터 동기화 가능

### **Kafka Bucket 처리 예시**

```go
qryCnt := models.QryCount(y.Db, cntStr)
	fmt.Println("qryCnt:", qryCnt)

	bucketCnt := 0
	if qryCnt == 0 {
		bucketCnt = 0
	} else {
		bucketCnt = 1 + (int(qryCnt) / p.Limit)
	}

    fmt.Println("bucketCnt:", bucketCnt)
	for i := 0; i < bucketCnt; i++ {
		// for i := 0; i < 3; i++ {
		p.Offset = p.Limit * i
		sqlStr, _, _ := models_join.ListType1PlainQryStrGet(y, lt, p, q, "/list")
		if err := t.ProduceaBucketToKafka(y, "kkk", headers, sqlStr, topic); err != nil {
			return errors.New(e.PageQryErr("vrt452", ": "+err.Error()))
		}
	}
```

👉 **Bucket을 활용하여 병렬 처리 가능!**

### **ElasticSearch Bulk API 예시**

```go
	for _, row := range v.Page {
		row.ClientCode = clientCode
		fmt.Printf("BuyerId: %d, LastSorderDate: %s, ClientCode : %s\n", row.Id, row.LastSorderDate, row.ClientCode)
		row.LastVistDate = row.LastSorderDate
		// Meta 데이터 작성
		meta := fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }%s`, esIndex, row.Id, "\n")
		bulkRequest.WriteString(meta)

		// 문서 데이터 작성
		data, _ := json.Marshal(row)
		bulkRequest.WriteString(string(data) + "\n")
		fmt.Println(string(data)) // 데이터를 출력하여 검증

		fmt.Printf("Prepared document for BuyerId: %d\n", row.Id)
	}

	// Bulk 요청 전송
	res, err := es.Bulk(bytes.NewReader(bulkRequest.Bytes()))
	if err != nil {
		return fmt.Errorf("bulk request failed: %w", err)
	}
	defer res.Body.Close()

	// Elasticsearch 응답 확인
	if res.IsError() {
		log.Printf("Bulk indexing failed: %s", res.String())
		return fmt.Errorf("bulk indexing failed")
	}
```

👉 **Bulk API를 활용하면 한 번에 많은 데이터를 삽입 가능!**

✅ **이 방식은 대량의 데이터를 효율적으로 처리하고 성능을 최적화할 수 있도록 설계됨.**